diff options
Diffstat (limited to 'net/rxrpc/sendmsg.c')
-rw-r--r-- | net/rxrpc/sendmsg.c | 106 |
1 files changed, 78 insertions, 28 deletions
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 6abb8eec1b2b..84dc6c94f23b 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -94,9 +94,11 @@ no_wait: */ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win) { + rxrpc_seq_t tx_bottom = READ_ONCE(call->tx_bottom); + if (_tx_win) - *_tx_win = call->tx_bottom; - return call->tx_prepared - call->tx_bottom < 256; + *_tx_win = tx_bottom; + return call->send_top - tx_bottom < 256; } /* @@ -132,13 +134,13 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx, rxrpc_seq_t tx_start, tx_win; signed long rtt, timeout; - rtt = READ_ONCE(call->peer->srtt_us) >> 3; + rtt = READ_ONCE(call->srtt_us) >> 3; rtt = usecs_to_jiffies(rtt) * 2; if (rtt < 2) rtt = 2; timeout = rtt; - tx_start = smp_load_acquire(&call->acks_hard_ack); + tx_start = READ_ONCE(call->tx_bottom); for (;;) { set_current_state(TASK_UNINTERRUPTIBLE); @@ -195,8 +197,8 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, DECLARE_WAITQUEUE(myself, current); int ret; - _enter(",{%u,%u,%u,%u}", - call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize); + _enter(",{%u,%u,%u}", + call->tx_bottom, call->tx_top, call->tx_winsize); add_wait_queue(&call->waitq, &myself); @@ -240,37 +242,77 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, struct rxrpc_txbuf *txb, rxrpc_notify_end_tx_t notify_end_tx) { + struct rxrpc_txqueue *sq = call->send_queue; rxrpc_seq_t seq = txb->seq; bool poke, last = txb->flags & RXRPC_LAST_PACKET; - + int ix = seq & RXRPC_TXQ_MASK; rxrpc_inc_stat(call->rxnet, stat_tx_data); - ASSERTCMP(txb->seq, ==, call->tx_prepared + 1); - - /* We have to set the timestamp before queueing as the retransmit - * algorithm can see the packet as soon as we queue it. - */ - txb->last_sent = ktime_get_real(); + ASSERTCMP(txb->seq, ==, call->send_top + 1); if (last) trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last); else trace_rxrpc_txqueue(call, rxrpc_txqueue_queue); + if (WARN_ON_ONCE(sq->bufs[ix])) + trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup); + else + trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue); + /* Add the packet to the call's output buffer */ - spin_lock(&call->tx_lock); - poke = list_empty(&call->tx_sendmsg); - list_add_tail(&txb->call_link, &call->tx_sendmsg); - call->tx_prepared = seq; - if (last) + poke = (READ_ONCE(call->tx_bottom) == call->send_top); + sq->bufs[ix] = txb; + /* Order send_top after the queue->next pointer and txb content. */ + smp_store_release(&call->send_top, seq); + if (last) { + set_bit(RXRPC_CALL_TX_NO_MORE, &call->flags); rxrpc_notify_end_tx(rx, call, notify_end_tx); - spin_unlock(&call->tx_lock); + call->send_queue = NULL; + } if (poke) rxrpc_poke_call(call, rxrpc_call_poke_start); } /* + * Allocate a new txqueue unit and add it to the transmission queue. + */ +static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call) +{ + struct rxrpc_txqueue *tq; + + tq = kzalloc(sizeof(*tq), sk->sk_allocation); + if (!tq) + return -ENOMEM; + + tq->xmit_ts_base = KTIME_MIN; + for (int i = 0; i < RXRPC_NR_TXQUEUE; i++) + tq->segment_xmit_ts[i] = UINT_MAX; + + if (call->send_queue) { + tq->qbase = call->send_top + 1; + call->send_queue->next = tq; + call->send_queue = tq; + } else if (WARN_ON(call->tx_queue)) { + kfree(tq); + return -ENOMEM; + } else { + /* We start at seq 1, so pretend seq 0 is hard-acked. */ + tq->nr_reported_acks = 1; + tq->segment_acked = 1UL; + tq->qbase = 0; + call->tx_qbase = 0; + call->send_queue = tq; + call->tx_qtail = tq; + call->tx_queue = tq; + } + + trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc); + return 0; +} + +/* * send data through a socket * - must be called in process context * - The caller holds the call user access mutex, but not the socket lock. @@ -288,6 +330,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, bool more = msg->msg_flags & MSG_MORE; int ret, copied = 0; + if (test_bit(RXRPC_CALL_TX_NO_MORE, &call->flags)) { + trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send, + call->cid, call->call_id, call->rx_consumed, + 0, -EPROTO); + return -EPROTO; + } + timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT); ret = rxrpc_wait_to_be_connected(call, &timeo); @@ -344,6 +393,13 @@ reload: if (!rxrpc_check_tx_space(call, NULL)) goto wait_for_space; + /* See if we need to begin/extend the Tx queue. */ + if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) { + ret = rxrpc_alloc_txqueue(sk, call); + if (ret < 0) + goto maybe_error; + } + /* Work out the maximum size of a packet. Assume that * the security header is going to be in the padded * region (enc blocksize), but the trailer is not. @@ -360,10 +416,10 @@ reload: /* append next segment of data to the current buffer */ if (msg_data_left(msg) > 0) { - size_t copy = min_t(size_t, txb->space, msg_data_left(msg)); + size_t copy = umin(txb->space, msg_data_left(msg)); _debug("add %zu", copy); - if (!copy_from_iter_full(txb->kvec[0].iov_base + txb->offset, + if (!copy_from_iter_full(txb->data + txb->offset, copy, &msg->msg_iter)) goto efault; _debug("added"); @@ -385,16 +441,10 @@ reload: (msg_data_left(msg) == 0 && !more)) { if (msg_data_left(msg) == 0 && !more) txb->flags |= RXRPC_LAST_PACKET; - else if (call->tx_top - call->acks_hard_ack < - call->tx_winsize) - txb->flags |= RXRPC_MORE_PACKETS; ret = call->security->secure_packet(call, txb); if (ret < 0) goto out; - - txb->kvec[0].iov_len += txb->len; - txb->len = txb->kvec[0].iov_len; rxrpc_queue_packet(rx, call, txb, notify_end_tx); txb = NULL; } @@ -655,7 +705,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) } else { switch (rxrpc_call_state(call)) { case RXRPC_CALL_CLIENT_AWAIT_CONN: - case RXRPC_CALL_SERVER_SECURING: + case RXRPC_CALL_SERVER_RECV_REQUEST: if (p.command == RXRPC_CMD_SEND_ABORT) break; fallthrough; |