diff options
author | David Howells <dhowells@redhat.com> | 2020-01-23 13:13:41 +0000 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2022-12-01 13:36:42 +0000 |
commit | 5e6ef4f1017c7f844e305283bbd8875af475e2fc (patch) | |
tree | a1af5b9ab3f538d84a50214be6c41fd0700d4bca /net/rxrpc/call_object.c | |
parent | 393a2a2007d13df7ae54c94328b45b6c2269b6a9 (diff) | |
download | lwn-5e6ef4f1017c7f844e305283bbd8875af475e2fc.tar.gz lwn-5e6ef4f1017c7f844e305283bbd8875af475e2fc.zip |
rxrpc: Make the I/O thread take over the call and local processor work
Move the functions from the call->processor and local->processor work items
into the domain of the I/O thread.
The call event processor, now called from the I/O thread, then takes over
the job of cranking the call state machine, processing incoming packets and
transmitting DATA, ACK and ABORT packets. In a future patch,
rxrpc_send_ACK() will transmit the ACK on the spot rather than queuing it
for later transmission.
The call event processor becomes purely received-skb driven. It only
transmits things in response to events. We use "pokes" to queue a dummy
skb to make it do things like start/resume transmitting data. Timer expiry
also results in pokes.
The connection event processor, becomes similar, though crypto events, such
as dealing with CHALLENGE and RESPONSE packets is offloaded to a work item
to avoid doing crypto in the I/O thread.
The local event processor is removed and VERSION response packets are
generated directly from the packet parser. Similarly, ABORTs generated in
response to protocol errors will be transmitted immediately rather than
being pushed onto a queue for later transmission.
Changes:
========
ver #2)
- Fix a couple of introduced lock context imbalances.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/call_object.c')
-rw-r--r-- | net/rxrpc/call_object.c | 56 |
1 files changed, 34 insertions, 22 deletions
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index 7570b4e67bc5..d441a715d988 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -71,7 +71,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t) if (call->state < RXRPC_CALL_COMPLETE) { trace_rxrpc_timer_expired(call, jiffies); - rxrpc_queue_call(call, rxrpc_call_queue_timer); + rxrpc_poke_call(call, rxrpc_call_poke_timer); } } @@ -148,7 +148,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp, &rxrpc_call_user_mutex_lock_class_key); timer_setup(&call->timer, rxrpc_call_timer_expired, 0); - INIT_WORK(&call->processor, rxrpc_process_call); INIT_WORK(&call->destroyer, rxrpc_destroy_call); INIT_LIST_HEAD(&call->link); INIT_LIST_HEAD(&call->chan_wait_link); @@ -163,7 +162,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp, init_waitqueue_head(&call->waitq); spin_lock_init(&call->notify_lock); spin_lock_init(&call->tx_lock); - spin_lock_init(&call->acks_ack_lock); rwlock_init(&call->state_lock); refcount_set(&call->ref, 1); call->debug_id = debug_id; @@ -252,6 +250,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call) call->ack_lost_at = j; call->resend_at = j; call->ping_at = j; + call->keepalive_at = j; call->expect_rx_by = j; call->expect_req_by = j; call->expect_term_by = j; @@ -430,6 +429,29 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, call->state = RXRPC_CALL_SERVER_SECURING; call->cong_tstamp = skb->tstamp; + spin_lock(&conn->state_lock); + + switch (conn->state) { + case RXRPC_CONN_SERVICE_UNSECURED: + case RXRPC_CONN_SERVICE_CHALLENGING: + call->state = RXRPC_CALL_SERVER_SECURING; + break; + case RXRPC_CONN_SERVICE: + call->state = RXRPC_CALL_SERVER_RECV_REQUEST; + break; + + case RXRPC_CONN_REMOTELY_ABORTED: + __rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, + conn->abort_code, conn->error); + break; + case RXRPC_CONN_LOCALLY_ABORTED: + __rxrpc_abort_call("CON", call, 1, + conn->abort_code, conn->error); + break; + default: + BUG(); + } + /* Set the channel for this call. We don't get channel_lock as we're * only defending against the data_ready handler (which we're called * from) and the RESPONSE packet parser (which is only really @@ -440,6 +462,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, conn->channels[chan].call_counter = call->call_id; conn->channels[chan].call_id = call->call_id; rcu_assign_pointer(conn->channels[chan].call, call); + spin_unlock(&conn->state_lock); spin_lock(&conn->peer->lock); hlist_add_head(&call->error_link, &conn->peer->error_targets); @@ -450,15 +473,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, } /* - * Queue a call's work processor. - */ -void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why) -{ - if (rxrpc_queue_work(&call->processor)) - trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why); -} - -/* * Note the re-emergence of a call. */ void rxrpc_see_call(struct rxrpc_call *call, enum rxrpc_call_trace why) @@ -470,14 +484,15 @@ void rxrpc_see_call(struct rxrpc_call *call, enum rxrpc_call_trace why) } } -bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why) +struct rxrpc_call *rxrpc_try_get_call(struct rxrpc_call *call, + enum rxrpc_call_trace why) { int r; - if (!__refcount_inc_not_zero(&call->ref, &r)) - return false; + if (!call || !__refcount_inc_not_zero(&call->ref, &r)) + return NULL; trace_rxrpc_call(call->debug_id, r + 1, 0, why); - return true; + return call; } /* @@ -638,8 +653,6 @@ static void rxrpc_destroy_call(struct work_struct *work) struct rxrpc_txbuf *txb; del_timer_sync(&call->timer); - cancel_work_sync(&call->processor); /* The processor may restart the timer */ - del_timer_sync(&call->timer); rxrpc_cleanup_ring(call); while ((txb = list_first_entry_or_null(&call->tx_sendmsg, @@ -652,8 +665,8 @@ static void rxrpc_destroy_call(struct work_struct *work) list_del(&txb->call_link); rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned); } + rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned); - rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack); rxrpc_put_connection(call->conn, rxrpc_conn_put_call); rxrpc_put_peer(call->peer, rxrpc_peer_put_call); rxrpc_put_local(call->local, rxrpc_local_put_call); @@ -670,10 +683,9 @@ void rxrpc_cleanup_call(struct rxrpc_call *call) ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags)); - del_timer_sync(&call->timer); - cancel_work(&call->processor); + del_timer(&call->timer); - if (rcu_read_lock_held() || work_busy(&call->processor)) + if (rcu_read_lock_held()) /* Can't use the rxrpc workqueue as we need to cancel/flush * something that may be running/waiting there. */ |