diff options
Diffstat (limited to 'net/rxrpc/sendmsg.c')
-rw-r--r-- | net/rxrpc/sendmsg.c | 222 |
1 files changed, 90 insertions, 132 deletions
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 7376794a0308..1f8040d82395 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -15,7 +15,6 @@ #include <linux/gfp.h> #include <linux/skbuff.h> #include <linux/export.h> -#include <linux/circ_buf.h> #include <net/sock.h> #include <net/af_rxrpc.h> #include "ar-internal.h" @@ -38,24 +37,28 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, DECLARE_WAITQUEUE(myself, current); int ret; - _enter(",{%d},%ld", - CIRC_SPACE(call->acks_head, ACCESS_ONCE(call->acks_tail), - call->acks_winsz), - *timeo); + _enter(",{%u,%u,%u}", + call->tx_hard_ack, call->tx_top, call->tx_winsize); add_wait_queue(&call->waitq, &myself); for (;;) { set_current_state(TASK_INTERRUPTIBLE); ret = 0; - if (CIRC_SPACE(call->acks_head, ACCESS_ONCE(call->acks_tail), - call->acks_winsz) > 0) + if (call->tx_top - call->tx_hard_ack < + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) break; + if (call->state >= RXRPC_CALL_COMPLETE) { + ret = -call->error; + break; + } if (signal_pending(current)) { ret = sock_intr_errno(*timeo); break; } + trace_rxrpc_transmit(call, rxrpc_transmit_wait); release_sock(&rx->sk); *timeo = schedule_timeout(*timeo); lock_sock(&rx->sk); @@ -68,36 +71,55 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx, } /* - * attempt to schedule an instant Tx resend + * Schedule an instant Tx resend. */ -static inline void rxrpc_instant_resend(struct rxrpc_call *call) +static inline void rxrpc_instant_resend(struct rxrpc_call *call, int ix) { - read_lock_bh(&call->state_lock); - if (try_to_del_timer_sync(&call->resend_timer) >= 0) { - clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); - if (call->state < RXRPC_CALL_COMPLETE && - !test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events)) + spin_lock_bh(&call->lock); + + if (call->state < RXRPC_CALL_COMPLETE) { + call->rxtx_annotations[ix] = RXRPC_TX_ANNO_RETRANS; + if (!test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events)) rxrpc_queue_call(call); } - read_unlock_bh(&call->state_lock); + + spin_unlock_bh(&call->lock); } /* - * queue a packet for transmission, set the resend timer and attempt - * to send the packet immediately + * Queue a DATA packet for transmission, set the resend timeout and send the + * packet immediately */ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb, bool last) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - int ret; + rxrpc_seq_t seq = sp->hdr.seq; + int ret, ix; + u8 annotation = RXRPC_TX_ANNO_UNACK; - _net("queue skb %p [%d]", skb, call->acks_head); + _net("queue skb %p [%d]", skb, seq); - ASSERT(call->acks_window != NULL); - call->acks_window[call->acks_head] = (unsigned long) skb; + ASSERTCMP(seq, ==, call->tx_top + 1); + + if (last) + annotation |= RXRPC_TX_ANNO_LAST; + + /* We have to set the timestamp before queueing as the retransmit + * algorithm can see the packet as soon as we queue it. + */ + skb->tstamp = ktime_get_real(); + + ix = seq & RXRPC_RXTX_BUFF_MASK; + rxrpc_get_skb(skb, rxrpc_skb_tx_got); + call->rxtx_annotations[ix] = annotation; smp_wmb(); - call->acks_head = (call->acks_head + 1) & (call->acks_winsz - 1); + call->rxtx_buffer[ix] = skb; + call->tx_top = seq; + if (last) + trace_rxrpc_transmit(call, rxrpc_transmit_queue_last); + else + trace_rxrpc_transmit(call, rxrpc_transmit_queue); if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) { _debug("________awaiting reply/ACK__________"); @@ -119,60 +141,26 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb, write_unlock_bh(&call->state_lock); } - _proto("Tx DATA %%%u { #%u }", sp->hdr.serial, sp->hdr.seq); - - sp->need_resend = false; - sp->resend_at = jiffies + rxrpc_resend_timeout; - if (!test_and_set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags)) { - _debug("run timer"); - call->resend_timer.expires = sp->resend_at; - add_timer(&call->resend_timer); - } - - /* attempt to cancel the rx-ACK timer, deferring reply transmission if - * we're ACK'ing the request phase of an incoming call */ - ret = -EAGAIN; - if (try_to_del_timer_sync(&call->ack_timer) >= 0) { - /* the packet may be freed by rxrpc_process_call() before this - * returns */ - if (rxrpc_is_client_call(call)) - rxrpc_expose_client_call(call); - ret = rxrpc_send_data_packet(call->conn, skb); - _net("sent skb %p", skb); - } else { - _debug("failed to delete ACK timer"); - } + if (seq == 1 && rxrpc_is_client_call(call)) + rxrpc_expose_client_call(call); + ret = rxrpc_send_data_packet(call, skb); if (ret < 0) { _debug("need instant resend %d", ret); - sp->need_resend = true; - rxrpc_instant_resend(call); - } + rxrpc_instant_resend(call, ix); + } else { + unsigned long resend_at; - _leave(""); -} + resend_at = jiffies + msecs_to_jiffies(rxrpc_resend_timeout); -/* - * Convert a host-endian header into a network-endian header. - */ -static void rxrpc_insert_header(struct sk_buff *skb) -{ - struct rxrpc_wire_header whdr; - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + if (time_before(resend_at, call->resend_at)) { + call->resend_at = resend_at; + rxrpc_set_timer(call, rxrpc_timer_set_for_send); + } + } - whdr.epoch = htonl(sp->hdr.epoch); - whdr.cid = htonl(sp->hdr.cid); - whdr.callNumber = htonl(sp->hdr.callNumber); - whdr.seq = htonl(sp->hdr.seq); - whdr.serial = htonl(sp->hdr.serial); - whdr.type = sp->hdr.type; - whdr.flags = sp->hdr.flags; - whdr.userStatus = sp->hdr.userStatus; - whdr.securityIndex = sp->hdr.securityIndex; - whdr._rsvd = htons(sp->hdr._rsvd); - whdr.serviceId = htons(sp->hdr.serviceId); - - memcpy(skb->head, &whdr, sizeof(whdr)); + rxrpc_free_skb(skb, rxrpc_skb_tx_freed); + _leave(""); } /* @@ -203,18 +191,22 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, skb = call->tx_pending; call->tx_pending = NULL; - rxrpc_see_skb(skb); + rxrpc_see_skb(skb, rxrpc_skb_tx_seen); copied = 0; do { + /* Check to see if there's a ping ACK to reply to. */ + if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE) + rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK); + if (!skb) { size_t size, chunk, max, space; _debug("alloc"); - if (CIRC_SPACE(call->acks_head, - ACCESS_ONCE(call->acks_tail), - call->acks_winsz) <= 0) { + if (call->tx_top - call->tx_hard_ack >= + min_t(unsigned int, call->tx_winsize, + call->cong_cwnd + call->cong_extra)) { ret = -EAGAIN; if (msg->msg_flags & MSG_DONTWAIT) goto maybe_error; @@ -224,7 +216,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, goto maybe_error; } - max = call->conn->params.peer->maxdata; + max = RXRPC_JUMBO_DATALEN; max -= call->conn->security_size; max &= ~(call->conn->size_align - 1UL); @@ -235,7 +227,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, space = chunk + call->conn->size_align; space &= ~(call->conn->size_align - 1UL); - size = space + call->conn->header_size; + size = space + call->conn->security_size; _debug("SIZE: %zu/%zu/%zu", chunk, space, size); @@ -245,15 +237,15 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, if (!skb) goto maybe_error; - rxrpc_new_skb(skb); + rxrpc_new_skb(skb, rxrpc_skb_tx_new); _debug("ALLOC SEND %p", skb); ASSERTCMP(skb->mark, ==, 0); - _debug("HS: %u", call->conn->header_size); - skb_reserve(skb, call->conn->header_size); - skb->len += call->conn->header_size; + _debug("HS: %u", call->conn->security_size); + skb_reserve(skb, call->conn->security_size); + skb->len += call->conn->security_size; sp = rxrpc_skb(skb); sp->remain = chunk; @@ -313,36 +305,23 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, memset(skb_put(skb, pad), 0, pad); } - seq = atomic_inc_return(&call->sequence); + seq = call->tx_top + 1; - sp->hdr.epoch = conn->proto.epoch; - sp->hdr.cid = call->cid; - sp->hdr.callNumber = call->call_id; sp->hdr.seq = seq; - sp->hdr.serial = atomic_inc_return(&conn->serial); - sp->hdr.type = RXRPC_PACKET_TYPE_DATA; - sp->hdr.userStatus = 0; - sp->hdr.securityIndex = conn->security_ix; sp->hdr._rsvd = 0; - sp->hdr.serviceId = call->service_id; + sp->hdr.flags = conn->out_clientflag; - sp->hdr.flags = conn->out_clientflag; if (msg_data_left(msg) == 0 && !more) sp->hdr.flags |= RXRPC_LAST_PACKET; - else if (CIRC_SPACE(call->acks_head, - ACCESS_ONCE(call->acks_tail), - call->acks_winsz) > 1) + else if (call->tx_top - call->tx_hard_ack < + call->tx_winsize) sp->hdr.flags |= RXRPC_MORE_PACKETS; - if (more && seq & 1) - sp->hdr.flags |= RXRPC_REQUEST_ACK; ret = conn->security->secure_packet( - call, skb, skb->mark, - skb->head + sizeof(struct rxrpc_wire_header)); + call, skb, skb->mark, skb->head); if (ret < 0) goto out; - rxrpc_insert_header(skb); rxrpc_queue_packet(call, skb, !msg_data_left(msg) && !more); skb = NULL; } @@ -356,9 +335,9 @@ out: return ret; call_terminated: - rxrpc_free_skb(skb); + rxrpc_free_skb(skb, rxrpc_skb_tx_freed); _leave(" = %d", -call->error); - return ret; + return -call->error; maybe_error: if (copied) @@ -452,28 +431,6 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, } /* - * abort a call, sending an ABORT packet to the peer - */ -static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code) -{ - if (call->state >= RXRPC_CALL_COMPLETE) - return; - - write_lock_bh(&call->state_lock); - - if (__rxrpc_abort_call(call, abort_code, ECONNABORTED)) { - del_timer_sync(&call->resend_timer); - del_timer_sync(&call->ack_timer); - clear_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events); - clear_bit(RXRPC_CALL_EV_ACK, &call->events); - clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); - rxrpc_queue_call(call); - } - - write_unlock_bh(&call->state_lock); -} - -/* * Create a new client call for sendmsg(). */ static struct rxrpc_call * @@ -534,7 +491,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) call = rxrpc_accept_call(rx, user_call_ID, NULL); if (IS_ERR(call)) return PTR_ERR(call); - rxrpc_put_call(call); + rxrpc_put_call(call, rxrpc_call_put); return 0; } @@ -548,7 +505,6 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) return PTR_ERR(call); } - rxrpc_see_call(call); _debug("CALL %d USR %lx ST %d on CONN %p", call->debug_id, call->user_call_ID, call->state, call->conn); @@ -556,8 +512,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) /* it's too late for this call */ ret = -ESHUTDOWN; } else if (cmd == RXRPC_CMD_SEND_ABORT) { - rxrpc_send_abort(call, abort_code); ret = 0; + if (rxrpc_abort_call("CMD", call, 0, abort_code, ECONNABORTED)) + ret = rxrpc_send_call_packet(call, + RXRPC_PACKET_TYPE_ABORT); } else if (cmd != RXRPC_CMD_SEND_DATA) { ret = -EINVAL; } else if (rxrpc_is_client_call(call) && @@ -573,7 +531,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ret = rxrpc_send_data(rx, call, msg, len); } - rxrpc_put_call(call); + rxrpc_put_call(call, rxrpc_call_put); _leave(" = %d", ret); return ret; } @@ -626,20 +584,20 @@ EXPORT_SYMBOL(rxrpc_kernel_send_data); * @sock: The socket the call is on * @call: The call to be aborted * @abort_code: The abort code to stick into the ABORT packet + * @error: Local error value + * @why: 3-char string indicating why. * * Allow a kernel service to abort a call, if it's still in an abortable state. */ void rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call, - u32 abort_code) + u32 abort_code, int error, const char *why) { - _enter("{%d},%d", call->debug_id, abort_code); + _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why); lock_sock(sock->sk); - _debug("CALL %d USR %lx ST %d on CONN %p", - call->debug_id, call->user_call_ID, call->state, call->conn); - - rxrpc_send_abort(call, abort_code); + if (rxrpc_abort_call(why, call, 0, abort_code, error)) + rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT); release_sock(sock->sk); _leave(""); |