diff options
author | David Howells <dhowells@redhat.com> | 2022-11-25 12:43:50 +0000 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2022-12-01 13:36:40 +0000 |
commit | 3cec055c56958c5498eeb3ed9fb2aef2d28c030f (patch) | |
tree | e4ca198e2d35ebebc1e1116f50caca90b176868c /net/rxrpc/conn_object.c | |
parent | 3feda9d69c83983b530cea6287ba4fea0e5c3b87 (diff) | |
download | lwn-3cec055c56958c5498eeb3ed9fb2aef2d28c030f.tar.gz lwn-3cec055c56958c5498eeb3ed9fb2aef2d28c030f.zip |
rxrpc: Don't hold a ref for connection workqueue
Currently, rxrpc gives the connection's work item a ref on the connection
when it queues it - and this is called from the timer expiration function.
The problem comes when queue_work() fails (ie. the work item is already
queued): the timer routine must put the ref - but this may cause the
cleanup code to run.
This has the unfortunate effect that the cleanup code may then be run in
softirq context - which means that any spinlocks it might need to touch
have to be guarded to disable softirqs (ie. they need a "_bh" suffix).
(1) Don't give a ref to the work item.
(2) Simplify handling of service connections by adding a separate active
count so that the refcount isn't also used for this.
(3) Connection destruction for both client and service connections can
then be cleaned up by putting rxrpc_put_connection() out of line and
making a tidy progression through the destruction code (offloaded to a
workqueue if put from softirq or processor function context). The RCU
part of the cleanup then only deals with the freeing at the end.
(4) Make rxrpc_queue_conn() return immediately if it sees the active count
is -1 rather then queuing the connection.
(5) Make sure that the cleanup routine waits for the work item to
complete.
(6) Stash the rxrpc_net pointer in the conn struct so that the rcu free
routine can use it, even if the local endpoint has been freed.
Unfortunately, neither the timer nor the work item can simply get around
the problem by just using refcount_inc_not_zero() as the waits would still
have to be done, and there would still be the possibility of having to put
the ref in the expiration function.
Note the connection work item is mostly going to go away with the main
event work being transferred to the I/O thread, so the wait in (6) will
become obsolete.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/conn_object.c')
-rw-r--r-- | net/rxrpc/conn_object.c | 169 |
1 files changed, 94 insertions, 75 deletions
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index f7c271a740ed..c2e05ea29f12 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -19,7 +19,9 @@ unsigned int __read_mostly rxrpc_connection_expiry = 10 * 60; unsigned int __read_mostly rxrpc_closed_conn_expiry = 10; -static void rxrpc_destroy_connection(struct rcu_head *); +static void rxrpc_clean_up_connection(struct work_struct *work); +static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, + unsigned long reap_at); static void rxrpc_connection_timer(struct timer_list *timer) { @@ -32,7 +34,8 @@ static void rxrpc_connection_timer(struct timer_list *timer) /* * allocate a new connection */ -struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) +struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet, + gfp_t gfp) { struct rxrpc_connection *conn; @@ -42,10 +45,12 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) if (conn) { INIT_LIST_HEAD(&conn->cache_link); timer_setup(&conn->timer, &rxrpc_connection_timer, 0); - INIT_WORK(&conn->processor, &rxrpc_process_connection); + INIT_WORK(&conn->processor, rxrpc_process_connection); + INIT_WORK(&conn->destructor, rxrpc_clean_up_connection); INIT_LIST_HEAD(&conn->proc_link); INIT_LIST_HEAD(&conn->link); skb_queue_head_init(&conn->rx_queue); + conn->rxnet = rxnet; conn->security = &rxrpc_no_security; spin_lock_init(&conn->state_lock); conn->debug_id = atomic_inc_return(&rxrpc_debug_id); @@ -224,53 +229,20 @@ void rxrpc_disconnect_call(struct rxrpc_call *call) set_bit(RXRPC_CALL_DISCONNECTED, &call->flags); conn->idle_timestamp = jiffies; -} - -/* - * Kill off a connection. - */ -void rxrpc_kill_connection(struct rxrpc_connection *conn) -{ - struct rxrpc_net *rxnet = conn->local->rxnet; - - ASSERT(!rcu_access_pointer(conn->channels[0].call) && - !rcu_access_pointer(conn->channels[1].call) && - !rcu_access_pointer(conn->channels[2].call) && - !rcu_access_pointer(conn->channels[3].call)); - ASSERT(list_empty(&conn->cache_link)); - - write_lock(&rxnet->conn_lock); - list_del_init(&conn->proc_link); - write_unlock(&rxnet->conn_lock); - - /* Drain the Rx queue. Note that even though we've unpublished, an - * incoming packet could still be being added to our Rx queue, so we - * will need to drain it again in the RCU cleanup handler. - */ - rxrpc_purge_queue(&conn->rx_queue); - - /* Leave final destruction to RCU. The connection processor work item - * must carry a ref on the connection to prevent us getting here whilst - * it is queued or running. - */ - call_rcu(&conn->rcu, rxrpc_destroy_connection); + if (atomic_dec_and_test(&conn->active)) + rxrpc_set_service_reap_timer(conn->rxnet, + jiffies + rxrpc_connection_expiry); } /* * Queue a connection's work processor, getting a ref to pass to the work * queue. */ -bool rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why) +void rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why) { - int r; - - if (!__refcount_inc_not_zero(&conn->ref, &r)) - return false; - if (rxrpc_queue_work(&conn->processor)) - trace_rxrpc_conn(conn->debug_id, why, r + 1); - else - rxrpc_put_connection(conn, rxrpc_conn_put_already_queued); - return true; + if (atomic_read(&conn->active) >= 0 && + rxrpc_queue_work(&conn->processor)) + rxrpc_see_connection(conn, why); } /* @@ -328,50 +300,95 @@ static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, } /* - * Release a service connection - */ -void rxrpc_put_service_conn(struct rxrpc_connection *conn, - enum rxrpc_conn_trace why) -{ - unsigned int debug_id = conn->debug_id; - int r; - - __refcount_dec(&conn->ref, &r); - trace_rxrpc_conn(debug_id, r - 1, why); - if (r - 1 == 1) - rxrpc_set_service_reap_timer(conn->local->rxnet, - jiffies + rxrpc_connection_expiry); -} - -/* * destroy a virtual connection */ -static void rxrpc_destroy_connection(struct rcu_head *rcu) +static void rxrpc_rcu_free_connection(struct rcu_head *rcu) { struct rxrpc_connection *conn = container_of(rcu, struct rxrpc_connection, rcu); + struct rxrpc_net *rxnet = conn->rxnet; _enter("{%d,u=%d}", conn->debug_id, refcount_read(&conn->ref)); trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref), rxrpc_conn_free); + kfree(conn); - ASSERTCMP(refcount_read(&conn->ref), ==, 0); + if (atomic_dec_and_test(&rxnet->nr_conns)) + wake_up_var(&rxnet->nr_conns); +} + +/* + * Clean up a dead connection. + */ +static void rxrpc_clean_up_connection(struct work_struct *work) +{ + struct rxrpc_connection *conn = + container_of(work, struct rxrpc_connection, destructor); + struct rxrpc_net *rxnet = conn->rxnet; + + ASSERT(!rcu_access_pointer(conn->channels[0].call) && + !rcu_access_pointer(conn->channels[1].call) && + !rcu_access_pointer(conn->channels[2].call) && + !rcu_access_pointer(conn->channels[3].call)); + ASSERT(list_empty(&conn->cache_link)); del_timer_sync(&conn->timer); + cancel_work_sync(&conn->processor); /* Processing may restart the timer */ + del_timer_sync(&conn->timer); + + write_lock(&rxnet->conn_lock); + list_del_init(&conn->proc_link); + write_unlock(&rxnet->conn_lock); + rxrpc_purge_queue(&conn->rx_queue); + rxrpc_kill_client_conn(conn); + conn->security->clear(conn); key_put(conn->key); rxrpc_put_bundle(conn->bundle, rxrpc_bundle_put_conn); rxrpc_put_peer(conn->peer, rxrpc_peer_put_conn); - - if (atomic_dec_and_test(&conn->local->rxnet->nr_conns)) - wake_up_var(&conn->local->rxnet->nr_conns); rxrpc_put_local(conn->local, rxrpc_local_put_kill_conn); - kfree(conn); - _leave(""); + /* Drain the Rx queue. Note that even though we've unpublished, an + * incoming packet could still be being added to our Rx queue, so we + * will need to drain it again in the RCU cleanup handler. + */ + rxrpc_purge_queue(&conn->rx_queue); + + call_rcu(&conn->rcu, rxrpc_rcu_free_connection); +} + +/* + * Drop a ref on a connection. + */ +void rxrpc_put_connection(struct rxrpc_connection *conn, + enum rxrpc_conn_trace why) +{ + unsigned int debug_id; + bool dead; + int r; + + if (!conn) + return; + + debug_id = conn->debug_id; + dead = __refcount_dec_and_test(&conn->ref, &r); + trace_rxrpc_conn(debug_id, r - 1, why); + if (dead) { + del_timer(&conn->timer); + cancel_work(&conn->processor); + + if (in_softirq() || work_busy(&conn->processor) || + timer_pending(&conn->timer)) + /* Can't use the rxrpc workqueue as we need to cancel/flush + * something that may be running/waiting there. + */ + schedule_work(&conn->destructor); + else + rxrpc_clean_up_connection(&conn->destructor); + } } /* @@ -383,6 +400,7 @@ void rxrpc_service_connection_reaper(struct work_struct *work) struct rxrpc_net *rxnet = container_of(work, struct rxrpc_net, service_conn_reaper); unsigned long expire_at, earliest, idle_timestamp, now; + int active; LIST_HEAD(graveyard); @@ -393,8 +411,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work) write_lock(&rxnet->conn_lock); list_for_each_entry_safe(conn, _p, &rxnet->service_conns, link) { - ASSERTCMP(refcount_read(&conn->ref), >, 0); - if (likely(refcount_read(&conn->ref) > 1)) + ASSERTCMP(atomic_read(&conn->active), >=, 0); + if (likely(atomic_read(&conn->active) > 0)) continue; if (conn->state == RXRPC_CONN_SERVICE_PREALLOC) continue; @@ -405,8 +423,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work) if (conn->local->service_closed) expire_at = idle_timestamp + rxrpc_closed_conn_expiry * HZ; - _debug("reap CONN %d { u=%d,t=%ld }", - conn->debug_id, refcount_read(&conn->ref), + _debug("reap CONN %d { a=%d,t=%ld }", + conn->debug_id, atomic_read(&conn->active), (long)expire_at - (long)now); if (time_before(now, expire_at)) { @@ -416,10 +434,11 @@ void rxrpc_service_connection_reaper(struct work_struct *work) } } - /* The usage count sits at 1 whilst the object is unused on the - * list; we reduce that to 0 to make the object unavailable. + /* The activity count sits at 0 whilst the conn is unused on + * the list; we reduce that to -1 to make the conn unavailable. */ - if (!refcount_dec_if_one(&conn->ref)) + active = 0; + if (!atomic_try_cmpxchg(&conn->active, &active, -1)) continue; rxrpc_see_connection(conn, rxrpc_conn_see_reap_service); @@ -443,8 +462,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work) link); list_del_init(&conn->link); - ASSERTCMP(refcount_read(&conn->ref), ==, 0); - rxrpc_kill_connection(conn); + ASSERTCMP(atomic_read(&conn->active), ==, -1); + rxrpc_put_connection(conn, rxrpc_conn_put_service_reaped); } _leave(""); |