summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/io_uring_types.h3
-rw-r--r--io_uring/io_uring.c68
-rw-r--r--io_uring/io_uring.h9
-rw-r--r--io_uring/notif.c2
-rw-r--r--io_uring/notif.h2
-rw-r--r--io_uring/rw.c2
6 files changed, 61 insertions, 25 deletions
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 4a6ce03a4903..fa621a508a01 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -296,7 +296,7 @@ struct io_ring_ctx {
spinlock_t completion_lock;
bool poll_multi_queue;
- bool cq_waiting;
+ atomic_t cq_wait_nr;
/*
* ->iopoll_list is protected by the ctx->uring_lock for
@@ -566,6 +566,7 @@ struct io_kiocb {
atomic_t refs;
atomic_t poll_refs;
struct io_task_work io_task_work;
+ unsigned nr_tw;
/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
union {
struct hlist_node hash_node;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 786ecfa01c54..8a327a81beaf 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1300,35 +1300,59 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
}
}
-static void io_req_local_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
+ unsigned nr_wait, nr_tw, nr_tw_prev;
struct llist_node *first;
+ if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+ flags &= ~IOU_F_TWQ_LAZY_WAKE;
+
first = READ_ONCE(ctx->work_llist.first);
do {
+ nr_tw_prev = 0;
+ if (first) {
+ struct io_kiocb *first_req = container_of(first,
+ struct io_kiocb,
+ io_task_work.node);
+ /*
+ * Might be executed at any moment, rely on
+ * SLAB_TYPESAFE_BY_RCU to keep it alive.
+ */
+ nr_tw_prev = READ_ONCE(first_req->nr_tw);
+ }
+ nr_tw = nr_tw_prev + 1;
+ /* Large enough to fail the nr_wait comparison below */
+ if (!(flags & IOU_F_TWQ_LAZY_WAKE))
+ nr_tw = -1U;
+
+ req->nr_tw = nr_tw;
req->io_task_work.node.next = first;
} while (!try_cmpxchg(&ctx->work_llist.first, &first,
&req->io_task_work.node));
- if (first)
- return;
-
- /* needed for the following wake up */
- smp_mb__after_atomic();
-
- if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
- io_move_task_work_from_local(ctx);
- return;
+ if (!first) {
+ if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
+ io_move_task_work_from_local(ctx);
+ return;
+ }
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ if (ctx->has_evfd)
+ io_eventfd_signal(ctx);
}
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
- if (ctx->has_evfd)
- io_eventfd_signal(ctx);
-
- if (READ_ONCE(ctx->cq_waiting))
- wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+ nr_wait = atomic_read(&ctx->cq_wait_nr);
+ /* no one is waiting */
+ if (!nr_wait)
+ return;
+ /* either not enough or the previous add has already woken it up */
+ if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+ return;
+ /* pairs with set_current_state() in io_cqring_wait() */
+ smp_mb__after_atomic();
+ wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
@@ -1339,7 +1363,7 @@ void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
rcu_read_lock();
- io_req_local_work_add(req);
+ io_req_local_work_add(req, flags);
rcu_read_unlock();
return;
}
@@ -2625,7 +2649,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
unsigned long check_cq;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- WRITE_ONCE(ctx->cq_waiting, 1);
+ int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
+
+ atomic_set(&ctx->cq_wait_nr, nr_wait);
set_current_state(TASK_INTERRUPTIBLE);
} else {
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
@@ -2634,7 +2660,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
ret = io_cqring_wait_schedule(ctx, &iowq);
__set_current_state(TASK_RUNNING);
- WRITE_ONCE(ctx->cq_waiting, 0);
+ atomic_set(&ctx->cq_wait_nr, 0);
if (ret < 0)
break;
@@ -4517,7 +4543,7 @@ static int __init io_uring_init(void)
io_uring_optable_init();
req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
- SLAB_ACCOUNT);
+ SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
return 0;
};
__initcall(io_uring_init);
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index cb4309a2acdc..ef449e43d493 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -18,6 +18,15 @@
enum {
/* don't use deferred task_work */
IOU_F_TWQ_FORCE_NORMAL = 1,
+
+ /*
+ * A hint to not wake right away but delay until there are enough of
+ * tw's queued to match the number of CQEs the task is waiting for.
+ *
+ * Must not be used wirh requests generating more than one CQE.
+ * It's also ignored unless IORING_SETUP_DEFER_TASKRUN is set.
+ */
+ IOU_F_TWQ_LAZY_WAKE = 2,
};
enum {
diff --git a/io_uring/notif.c b/io_uring/notif.c
index 172105eb347d..e1846a25dde1 100644
--- a/io_uring/notif.c
+++ b/io_uring/notif.c
@@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
struct io_kiocb *notif = cmd_to_io_kiocb(nd);
if (refcount_dec_and_test(&uarg->refcnt))
- io_req_task_work_add(notif);
+ __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
diff --git a/io_uring/notif.h b/io_uring/notif.h
index c88c800cd89d..6dd1b30a468f 100644
--- a/io_uring/notif.h
+++ b/io_uring/notif.h
@@ -33,7 +33,7 @@ static inline void io_notif_flush(struct io_kiocb *notif)
/* drop slot's master ref */
if (refcount_dec_and_test(&nd->uarg.refcnt))
- io_req_task_work_add(notif);
+ __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
diff --git a/io_uring/rw.c b/io_uring/rw.c
index f14868624f41..6c7d2654770e 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -304,7 +304,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
return;
io_req_set_res(req, io_fixup_rw_res(req, res), 0);
req->io_task_work.func = io_req_rw_complete;
- io_req_task_work_add(req);
+ __io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
}
static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)