diff options
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 417 |
1 files changed, 99 insertions, 318 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index f9eff8f62ddb..d94bd4e3a60e 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -56,7 +56,6 @@ #include <linux/mmu_context.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/workqueue.h> #include <linux/kthread.h> #include <linux/blkdev.h> #include <linux/bvec.h> @@ -77,6 +76,7 @@ #include <uapi/linux/io_uring.h> #include "internal.h" +#include "io-wq.h" #define IORING_MAX_ENTRIES 32768 #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES) @@ -165,16 +165,6 @@ struct io_mapped_ubuf { unsigned int nr_bvecs; }; -struct async_list { - spinlock_t lock; - atomic_t cnt; - struct list_head list; - - struct file *file; - off_t io_start; - size_t io_len; -}; - struct io_ring_ctx { struct { struct percpu_ref refs; @@ -209,7 +199,7 @@ struct io_ring_ctx { } ____cacheline_aligned_in_smp; /* IO offload */ - struct workqueue_struct *sqo_wq[2]; + struct io_wq *io_wq; struct task_struct *sqo_thread; /* if using sq thread polling */ struct mm_struct *sqo_mm; wait_queue_head_t sqo_wait; @@ -262,8 +252,6 @@ struct io_ring_ctx { struct list_head cancel_list; } ____cacheline_aligned_in_smp; - struct async_list pending_async[2]; - #if defined(CONFIG_UNIX) struct socket *ring_sock; #endif @@ -333,7 +321,7 @@ struct io_kiocb { u32 result; u32 sequence; - struct work_struct work; + struct io_wq_work work; }; #define IO_PLUG_THRESHOLD 2 @@ -359,7 +347,7 @@ struct io_submit_state { unsigned int ios_left; }; -static void io_sq_wq_submit_work(struct work_struct *work); +static void io_wq_submit_work(struct io_wq_work **workptr); static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, long res); static void __io_free_req(struct io_kiocb *req); @@ -391,7 +379,6 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref) static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) { struct io_ring_ctx *ctx; - int i; ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); if (!ctx) @@ -409,11 +396,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_completion(&ctx->sqo_thread_started); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); - for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { - spin_lock_init(&ctx->pending_async[i].lock); - INIT_LIST_HEAD(&ctx->pending_async[i].list); - atomic_set(&ctx->pending_async[i].cnt, 0); - } spin_lock_init(&ctx->completion_lock); INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->cancel_list); @@ -479,22 +461,45 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx) } } -static inline void io_queue_async_work(struct io_ring_ctx *ctx, - struct io_kiocb *req) +static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) { - int rw = 0; + u8 opcode = READ_ONCE(sqe->opcode); + + return !(opcode == IORING_OP_READ_FIXED || + opcode == IORING_OP_WRITE_FIXED); +} + +static inline bool io_prep_async_work(struct io_kiocb *req) +{ + bool do_hashed = false; if (req->submit.sqe) { switch (req->submit.sqe->opcode) { case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: - rw = !(req->rw.ki_flags & IOCB_DIRECT); + do_hashed = true; break; } + if (io_sqe_needs_user(req->submit.sqe)) + req->work.flags |= IO_WQ_WORK_NEEDS_USER; } - trace_io_uring_queue_async_work(ctx, rw, req, &req->work, req->flags); - queue_work(ctx->sqo_wq[rw], &req->work); + return do_hashed; +} + +static inline void io_queue_async_work(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + bool do_hashed = io_prep_async_work(req); + + trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work, + req->flags); + if (!do_hashed) { + io_wq_enqueue(ctx->io_wq, &req->work); + } else { + io_wq_enqueue_hashed(ctx->io_wq, &req->work, + file_inode(req->file)); + } } static void io_kill_timeout(struct io_kiocb *req) @@ -647,6 +652,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, /* one is dropped after submission, the other at completion */ refcount_set(&req->refs, 2); req->result = 0; + INIT_IO_WORK(&req->work, io_wq_submit_work); return req; out: percpu_ref_put(&ctx->refs); @@ -693,12 +699,10 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr) * If we're in async work, we can continue processing the chain * in this context instead of having to queue up new async work. */ - if (nxtptr && current_work()) { + if (nxtptr && current_work()) *nxtptr = nxt; - } else { - INIT_WORK(&nxt->work, io_sq_wq_submit_work); + else io_queue_async_work(req->ctx, nxt); - } } } @@ -757,12 +761,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr) nxt = io_put_req_find_next(req); if (nxt) { - if (nxtptr) { + if (nxtptr) *nxtptr = nxt; - } else { - INIT_WORK(&nxt->work, io_sq_wq_submit_work); + else io_queue_async_work(nxt->ctx, nxt); - } } } @@ -1324,65 +1326,6 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw, return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); } -static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) -{ - if (al->file == kiocb->ki_filp) { - off_t start, end; - - /* - * Allow merging if we're anywhere in the range of the same - * page. Generally this happens for sub-page reads or writes, - * and it's beneficial to allow the first worker to bring the - * page in and the piggy backed work can then work on the - * cached page. - */ - start = al->io_start & PAGE_MASK; - end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; - if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) - return true; - } - - al->file = NULL; - return false; -} - -/* - * Make a note of the last file/offset/direction we punted to async - * context. We'll use this information to see if we can piggy back a - * sequential request onto the previous one, if it's still hasn't been - * completed by the async worker. - */ -static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) -{ - struct async_list *async_list = &req->ctx->pending_async[rw]; - struct kiocb *kiocb = &req->rw; - struct file *filp = kiocb->ki_filp; - - if (io_should_merge(async_list, kiocb)) { - unsigned long max_bytes; - - /* Use 8x RA size as a decent limiter for both reads/writes */ - max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3); - if (!max_bytes) - max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3); - - /* If max len are exceeded, reset the state */ - if (async_list->io_len + len <= max_bytes) { - req->flags |= REQ_F_SEQ_PREV; - async_list->io_len += len; - } else { - async_list->file = NULL; - } - } - - /* New file? Reset state. */ - if (async_list->file != filp) { - async_list->io_start = kiocb->ki_pos; - async_list->io_len = len; - async_list->file = filp; - } -} - /* * For files that don't have ->read_iter() and ->write_iter(), handle them * by looping over ->read() or ->write() manually. @@ -1477,13 +1420,10 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s, ret2 > 0 && ret2 < read_size) ret2 = -EAGAIN; /* Catch -EAGAIN return for forced non-blocking submission */ - if (!force_nonblock || ret2 != -EAGAIN) { + if (!force_nonblock || ret2 != -EAGAIN) kiocb_done(kiocb, ret2, nxt, s->in_async); - } else { - if (!s->in_async) - io_async_list_note(READ, req, iov_count); + else ret = -EAGAIN; - } } kfree(iovec); return ret; @@ -1517,11 +1457,8 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, iov_count = iov_iter_count(&iter); ret = -EAGAIN; - if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { - if (!s->in_async) - io_async_list_note(WRITE, req, iov_count); + if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) goto out_free; - } ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); if (!ret) { @@ -1546,13 +1483,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s, ret2 = call_write_iter(file, kiocb, &iter); else ret2 = loop_rw_iter(WRITE, file, kiocb, &iter); - if (!force_nonblock || ret2 != -EAGAIN) { + if (!force_nonblock || ret2 != -EAGAIN) kiocb_done(kiocb, ret2, nxt, s->in_async); - } else { - if (!s->in_async) - io_async_list_note(WRITE, req, iov_count); + else ret = -EAGAIN; - } } out_free: kfree(iovec); @@ -1794,14 +1728,18 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, io_commit_cqring(ctx); } -static void io_poll_complete_work(struct work_struct *work) +static void io_poll_complete_work(struct io_wq_work **workptr) { + struct io_wq_work *work = *workptr; struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_poll_iocb *poll = &req->poll; struct poll_table_struct pt = { ._key = poll->events }; struct io_ring_ctx *ctx = req->ctx; __poll_t mask = 0; + if (work->flags & IO_WQ_WORK_CANCEL) + WRITE_ONCE(poll->canceled, true); + if (!READ_ONCE(poll->canceled)) mask = vfs_poll(poll->file, &pt) & poll->events; @@ -1894,7 +1832,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EBADF; req->submit.sqe = NULL; - INIT_WORK(&req->work, io_poll_complete_work); + INIT_IO_WORK(&req->work, io_poll_complete_work); events = READ_ONCE(sqe->poll_events); poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; @@ -2152,7 +2090,6 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); req->submit.sqe = sqe_copy; - INIT_WORK(&req->work, io_sq_wq_submit_work); trace_io_uring_defer(ctx, req, false); list_add_tail(&req->list, &ctx->defer_list); spin_unlock_irq(&ctx->completion_lock); @@ -2235,186 +2172,54 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, return 0; } -static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx, - const struct io_uring_sqe *sqe) -{ - switch (sqe->opcode) { - case IORING_OP_READV: - case IORING_OP_READ_FIXED: - return &ctx->pending_async[READ]; - case IORING_OP_WRITEV: - case IORING_OP_WRITE_FIXED: - return &ctx->pending_async[WRITE]; - default: - return NULL; - } -} - -static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) -{ - u8 opcode = READ_ONCE(sqe->opcode); - - return !(opcode == IORING_OP_READ_FIXED || - opcode == IORING_OP_WRITE_FIXED); -} - -static void io_sq_wq_submit_work(struct work_struct *work) +static void io_wq_submit_work(struct io_wq_work **workptr) { + struct io_wq_work *work = *workptr; struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_ring_ctx *ctx = req->ctx; - struct mm_struct *cur_mm = NULL; - struct async_list *async_list; - LIST_HEAD(req_list); - mm_segment_t old_fs; - int ret; + struct sqe_submit *s = &req->submit; + const struct io_uring_sqe *sqe = s->sqe; + struct io_kiocb *nxt = NULL; + int ret = 0; - async_list = io_async_list_from_sqe(ctx, req->submit.sqe); -restart: - do { - struct sqe_submit *s = &req->submit; - const struct io_uring_sqe *sqe = s->sqe; - unsigned int flags = req->flags; - struct io_kiocb *nxt = NULL; + /* Ensure we clear previously set non-block flag */ + req->rw.ki_flags &= ~IOCB_NOWAIT; - /* Ensure we clear previously set non-block flag */ - req->rw.ki_flags &= ~IOCB_NOWAIT; + if (work->flags & IO_WQ_WORK_CANCEL) + ret = -ECANCELED; - ret = 0; - if (io_sqe_needs_user(sqe) && !cur_mm) { - if (!mmget_not_zero(ctx->sqo_mm)) { - ret = -EFAULT; - } else { - cur_mm = ctx->sqo_mm; - use_mm(cur_mm); - old_fs = get_fs(); - set_fs(USER_DS); - } - } + if (!ret) { + s->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0; + s->in_async = true; + do { + ret = __io_submit_sqe(ctx, req, s, &nxt, false); + /* + * We can get EAGAIN for polled IO even though we're + * forcing a sync submission from here, since we can't + * wait for request slots on the block side. + */ + if (ret != -EAGAIN) + break; + cond_resched(); + } while (1); + } - if (!ret) { - s->has_user = cur_mm != NULL; - s->in_async = true; - do { - ret = __io_submit_sqe(ctx, req, s, &nxt, false); - /* - * We can get EAGAIN for polled IO even though - * we're forcing a sync submission from here, - * since we can't wait for request slots on the - * block side. - */ - if (ret != -EAGAIN) - break; - cond_resched(); - } while (1); - } + /* drop submission reference */ + io_put_req(req, NULL); - /* drop submission reference */ + if (ret) { + io_cqring_add_event(ctx, sqe->user_data, ret); io_put_req(req, NULL); - - if (ret) { - io_cqring_add_event(ctx, sqe->user_data, ret); - io_put_req(req, NULL); - } - - /* async context always use a copy of the sqe */ - kfree(sqe); - - /* if a dependent link is ready, do that as the next one */ - if (!ret && nxt) { - req = nxt; - continue; - } - - /* req from defer and link list needn't decrease async cnt */ - if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE)) - goto out; - - if (!async_list) - break; - if (!list_empty(&req_list)) { - req = list_first_entry(&req_list, struct io_kiocb, - list); - list_del(&req->list); - continue; - } - if (list_empty(&async_list->list)) - break; - - req = NULL; - spin_lock(&async_list->lock); - if (list_empty(&async_list->list)) { - spin_unlock(&async_list->lock); - break; - } - list_splice_init(&async_list->list, &req_list); - spin_unlock(&async_list->lock); - - req = list_first_entry(&req_list, struct io_kiocb, list); - list_del(&req->list); - } while (req); - - /* - * Rare case of racing with a submitter. If we find the count has - * dropped to zero AND we have pending work items, then restart - * the processing. This is a tiny race window. - */ - if (async_list) { - ret = atomic_dec_return(&async_list->cnt); - while (!ret && !list_empty(&async_list->list)) { - spin_lock(&async_list->lock); - atomic_inc(&async_list->cnt); - list_splice_init(&async_list->list, &req_list); - spin_unlock(&async_list->lock); - - if (!list_empty(&req_list)) { - req = list_first_entry(&req_list, - struct io_kiocb, list); - list_del(&req->list); - goto restart; - } - ret = atomic_dec_return(&async_list->cnt); - } - } - -out: - if (cur_mm) { - set_fs(old_fs); - unuse_mm(cur_mm); - mmput(cur_mm); } -} - -/* - * See if we can piggy back onto previously submitted work, that is still - * running. We currently only allow this if the new request is sequential - * to the previous one we punted. - */ -static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) -{ - bool ret; - if (!list) - return false; - if (!(req->flags & REQ_F_SEQ_PREV)) - return false; - if (!atomic_read(&list->cnt)) - return false; + /* async context always use a copy of the sqe */ + kfree(sqe); - ret = true; - spin_lock(&list->lock); - list_add_tail(&req->list, &list->list); - /* - * Ensure we see a simultaneous modification from io_sq_wq_submit_work() - */ - smp_mb(); - if (!atomic_read(&list->cnt)) { - list_del_init(&req->list); - ret = false; + /* if a dependent link is ready, pass it back */ + if (!ret && nxt) { + io_prep_async_work(nxt); + *workptr = &nxt->work; } - spin_unlock(&list->lock); - - trace_io_uring_add_to_prev(req, ret); - return ret; } static bool io_op_needs_file(const struct io_uring_sqe *sqe) @@ -2488,17 +2293,9 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); if (sqe_copy) { - struct async_list *list; - s->sqe = sqe_copy; memcpy(&req->submit, s, sizeof(*s)); - list = io_async_list_from_sqe(ctx, s->sqe); - if (!io_add_to_prev_work(list, req)) { - if (list) - atomic_inc(&list->cnt); - INIT_WORK(&req->work, io_sq_wq_submit_work); - io_queue_async_work(ctx, req); - } + io_queue_async_work(ctx, req); /* * Queued up for async execution, worker will release @@ -3109,15 +2906,11 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx) static void io_finish_async(struct io_ring_ctx *ctx) { - int i; - io_sq_thread_stop(ctx); - for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { - if (ctx->sqo_wq[i]) { - destroy_workqueue(ctx->sqo_wq[i]); - ctx->sqo_wq[i] = NULL; - } + if (ctx->io_wq) { + io_wq_destroy(ctx->io_wq); + ctx->io_wq = NULL; } } @@ -3125,11 +2918,9 @@ static void io_finish_async(struct io_ring_ctx *ctx) static void io_destruct_skb(struct sk_buff *skb) { struct io_ring_ctx *ctx = skb->sk->sk_user_data; - int i; - for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) - if (ctx->sqo_wq[i]) - flush_workqueue(ctx->sqo_wq[i]); + if (ctx->io_wq) + io_wq_flush(ctx->io_wq); unix_destruct_scm(skb); } @@ -3473,6 +3264,7 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg, static int io_sq_offload_start(struct io_ring_ctx *ctx, struct io_uring_params *p) { + unsigned concurrency; int ret; init_waitqueue_head(&ctx->sqo_wait); @@ -3516,25 +3308,10 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx, goto err; } - /* Do QD, or 2 * CPUS, whatever is smallest */ - ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", - WQ_UNBOUND | WQ_FREEZABLE, - min(ctx->sq_entries - 1, 2 * num_online_cpus())); - if (!ctx->sqo_wq[0]) { - ret = -ENOMEM; - goto err; - } - - /* - * This is for buffered writes, where we want to limit the parallelism - * due to file locking in file systems. As "normal" buffered writes - * should parellelize on writeout quite nicely, limit us to having 2 - * pending. This avoids massive contention on the inode when doing - * buffered async writes. - */ - ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", - WQ_UNBOUND | WQ_FREEZABLE, 2); - if (!ctx->sqo_wq[1]) { + /* Do QD, or 4 * CPUS, whatever is smallest */ + concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); + ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm); + if (!ctx->io_wq) { ret = -ENOMEM; goto err; } @@ -3919,6 +3696,10 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) io_kill_timeouts(ctx); io_poll_remove_all(ctx); + + if (ctx->io_wq) + io_wq_cancel_all(ctx->io_wq); + io_iopoll_reap_events(ctx); wait_for_completion(&ctx->ctx_done); io_ring_ctx_free(ctx); |