summaryrefslogtreecommitdiff
path: root/net/rxrpc/output.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-08-27 14:27:56 +0100
committerDavid Howells <dhowells@redhat.com>2022-11-08 16:42:28 +0000
commit5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb (patch)
treebb10e432f005b9988c542d079a1c6f0200b4fc64 /net/rxrpc/output.c
parentd4d02d8bb5c412d977af7ea7c7ea91977a6a64dc (diff)
downloadlwn-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.tar.gz
lwn-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.zip
rxrpc: Get rid of the Rx ring
Get rid of the Rx ring and replace it with a pair of queues instead. One queue gets the packets that are in-sequence and are ready for processing by recvmsg(); the other queue gets the out-of-sequence packets for addition to the first queue as the holes get filled. The annotation ring is removed and replaced with a SACK table. The SACK table has the bits set that correspond exactly to the sequence number of the packet being acked. The SACK ring is copied when an ACK packet is being assembled and rotated so that the first ACK is in byte 0. Flow control handling is altered so that packets that are moved to the in-sequence queue are hard-ACK'd even before they're consumed - and then the Rx window size in the ACK packet (rsize) is shrunk down to compensate (even going to 0 if the window is full). Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/output.c')
-rw-r--r--net/rxrpc/output.c95
1 files changed, 58 insertions, 37 deletions
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index f7bb792c8aa1..0a4f37d7b6b5 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -74,47 +74,64 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call)
*/
static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_call *call,
- struct rxrpc_txbuf *txb,
- rxrpc_seq_t *_hard_ack,
- rxrpc_seq_t *_top)
+ struct rxrpc_txbuf *txb)
{
struct rxrpc_ackinfo ackinfo;
- unsigned int tmp;
- rxrpc_seq_t hard_ack, top, seq;
- int ix;
+ unsigned int qsize;
+ rxrpc_seq_t window, wtop, wrap_point, ix, first;
+ int rsize;
+ u64 wtmp;
u32 mtu, jmax;
u8 *ackp = txb->acks;
+ u8 sack_buffer[sizeof(call->ackr_sack_table)] __aligned(8);
- tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
- tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
- if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
- txb->ack.reason == RXRPC_ACK_IDLE)) {
- rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
- return 0;
- }
-
+ atomic_set(&call->ackr_nr_unacked, 0);
+ atomic_set(&call->ackr_nr_consumed, 0);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
/* Barrier against rxrpc_input_data(). */
- hard_ack = READ_ONCE(call->rx_hard_ack);
- top = smp_load_acquire(&call->rx_top);
- *_hard_ack = hard_ack;
- *_top = top;
-
- txb->ack.firstPacket = htonl(hard_ack + 1);
- txb->ack.previousPacket = htonl(call->ackr_highest_seq);
- txb->ack.nAcks = top - hard_ack;
-
- if (txb->ack.nAcks) {
- seq = hard_ack + 1;
- do {
- ix = seq & RXRPC_RXTX_BUFF_MASK;
- if (call->rxtx_buffer[ix])
- *ackp++ = RXRPC_ACK_TYPE_ACK;
- else
- *ackp++ = RXRPC_ACK_TYPE_NACK;
- seq++;
- } while (before_eq(seq, top));
+retry:
+ wtmp = atomic64_read_acquire(&call->ackr_window);
+ window = lower_32_bits(wtmp);
+ wtop = upper_32_bits(wtmp);
+ txb->ack.firstPacket = htonl(window);
+ txb->ack.nAcks = 0;
+
+ if (after(wtop, window)) {
+ /* Try to copy the SACK ring locklessly. We can use the copy,
+ * only if the now-current top of the window didn't go past the
+ * previously read base - otherwise we can't know whether we
+ * have old data or new data.
+ */
+ memcpy(sack_buffer, call->ackr_sack_table, sizeof(sack_buffer));
+ wrap_point = window + RXRPC_SACK_SIZE - 1;
+ wtmp = atomic64_read_acquire(&call->ackr_window);
+ window = lower_32_bits(wtmp);
+ wtop = upper_32_bits(wtmp);
+ if (after(wtop, wrap_point)) {
+ cond_resched();
+ goto retry;
+ }
+
+ /* The buffer is maintained as a ring with an invariant mapping
+ * between bit position and sequence number, so we'll probably
+ * need to rotate it.
+ */
+ txb->ack.nAcks = wtop - window;
+ ix = window % RXRPC_SACK_SIZE;
+ first = sizeof(sack_buffer) - ix;
+
+ if (ix + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
+ memcpy(txb->acks, sack_buffer + ix, txb->ack.nAcks);
+ } else {
+ memcpy(txb->acks, sack_buffer + ix, first);
+ memcpy(txb->acks + first, sack_buffer,
+ txb->ack.nAcks - first);
+ }
+
+ ackp += txb->ack.nAcks;
+ } else if (before(wtop, window)) {
+ pr_warn("ack window backward %x %x", window, wtop);
} else if (txb->ack.reason == RXRPC_ACK_DELAY) {
txb->ack.reason = RXRPC_ACK_IDLE;
}
@@ -122,16 +139,18 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize;
jmax = rxrpc_rx_jumbo_max;
+ qsize = (window - 1) - call->rx_consumed;
+ rsize = max_t(int, call->rx_winsize - qsize, 0);
ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
ackinfo.maxMTU = htonl(mtu);
- ackinfo.rwind = htonl(call->rx_winsize);
+ ackinfo.rwind = htonl(rsize);
ackinfo.jumbo_max = htonl(jmax);
*ackp++ = 0;
*ackp++ = 0;
*ackp++ = 0;
memcpy(ackp, &ackinfo, sizeof(ackinfo));
- return top - hard_ack + 3 + sizeof(ackinfo);
+ return txb->ack.nAcks + 3 + sizeof(ackinfo);
}
/*
@@ -188,7 +207,6 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
struct msghdr msg;
struct kvec iov[1];
rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top;
size_t len, n;
int ret, rtt_slot = -1;
@@ -212,7 +230,7 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
clear_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
spin_lock_bh(&call->lock);
- n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
+ n = rxrpc_fill_out_ack(conn, call, txb);
spin_unlock_bh(&call->lock);
if (n == 0) {
kfree(pkt);
@@ -236,6 +254,9 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
+ /* Grab the highest received seq as late as possible */
+ txb->ack.previousPacket = htonl(call->rx_highest_seq);
+
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
call->peer->last_tx_at = ktime_get_seconds();