summaryrefslogtreecommitdiff
path: root/fs/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--fs/io-wq.c96
1 files changed, 66 insertions, 30 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 843d4a7bcd6e..7d2ed8c7dd31 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -129,7 +129,8 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
+static void io_wqe_dec_running(struct io_worker *worker);
static bool io_worker_get(struct io_worker *worker)
{
@@ -168,26 +169,21 @@ static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
- unsigned flags;
if (refcount_dec_and_test(&worker->ref))
complete(&worker->ref_done);
wait_for_completion(&worker->ref_done);
- preempt_disable();
- current->flags &= ~PF_IO_WORKER;
- flags = worker->flags;
- worker->flags = 0;
- if (flags & IO_WORKER_F_RUNNING)
- atomic_dec(&acct->nr_running);
- worker->flags = 0;
- preempt_enable();
-
raw_spin_lock_irq(&wqe->lock);
- if (flags & IO_WORKER_F_FREE)
+ if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
acct->nr_workers--;
+ preempt_disable();
+ io_wqe_dec_running(worker);
+ worker->flags = 0;
+ current->flags &= ~PF_IO_WORKER;
+ preempt_enable();
raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu);
@@ -214,15 +210,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
struct hlist_nulls_node *n;
struct io_worker *worker;
- n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
- if (is_a_nulls(n))
- return false;
-
- worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
- if (io_worker_get(worker)) {
- wake_up_process(worker->task);
+ /*
+ * Iterate free_list and see if we can find an idle worker to
+ * activate. If a given worker is on the free_list but in the process
+ * of exiting, keep trying.
+ */
+ hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+ if (!io_worker_get(worker))
+ continue;
+ if (wake_up_process(worker->task)) {
+ io_worker_release(worker);
+ return true;
+ }
io_worker_release(worker);
- return true;
}
return false;
@@ -247,10 +247,21 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
ret = io_wqe_activate_free_worker(wqe);
rcu_read_unlock();
- if (!ret && acct->nr_workers < acct->max_workers) {
- atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
- create_io_worker(wqe->wq, wqe, acct->index);
+ if (!ret) {
+ bool do_create = false, first = false;
+
+ raw_spin_lock_irq(&wqe->lock);
+ if (acct->nr_workers < acct->max_workers) {
+ atomic_inc(&acct->nr_running);
+ atomic_inc(&wqe->wq->worker_refs);
+ if (!acct->nr_workers)
+ first = true;
+ acct->nr_workers++;
+ do_create = true;
+ }
+ raw_spin_unlock_irq(&wqe->lock);
+ if (do_create)
+ create_io_worker(wqe->wq, wqe, acct->index, first);
}
}
@@ -271,10 +282,28 @@ static void create_worker_cb(struct callback_head *cb)
{
struct create_worker_data *cwd;
struct io_wq *wq;
+ struct io_wqe *wqe;
+ struct io_wqe_acct *acct;
+ bool do_create = false, first = false;
cwd = container_of(cb, struct create_worker_data, work);
- wq = cwd->wqe->wq;
- create_io_worker(wq, cwd->wqe, cwd->index);
+ wqe = cwd->wqe;
+ wq = wqe->wq;
+ acct = &wqe->acct[cwd->index];
+ raw_spin_lock_irq(&wqe->lock);
+ if (acct->nr_workers < acct->max_workers) {
+ if (!acct->nr_workers)
+ first = true;
+ acct->nr_workers++;
+ do_create = true;
+ }
+ raw_spin_unlock_irq(&wqe->lock);
+ if (do_create) {
+ create_io_worker(wq, wqe, cwd->index, first);
+ } else {
+ atomic_dec(&acct->nr_running);
+ io_worker_ref_put(wq);
+ }
kfree(cwd);
}
@@ -612,7 +641,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
raw_spin_unlock_irq(&worker->wqe->lock);
}
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
{
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker;
@@ -635,6 +664,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
kfree(worker);
fail:
atomic_dec(&acct->nr_running);
+ raw_spin_lock_irq(&wqe->lock);
+ acct->nr_workers--;
+ raw_spin_unlock_irq(&wqe->lock);
io_worker_ref_put(wq);
return;
}
@@ -650,9 +682,8 @@ fail:
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
- if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+ if (first && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
- acct->nr_workers++;
raw_spin_unlock_irq(&wqe->lock);
wake_up_new_task(tsk);
}
@@ -731,7 +762,12 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
int work_flags;
unsigned long flags;
- if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
+ /*
+ * If io-wq is exiting for this task, or if the request has explicitly
+ * been marked as one that should not get executed, cancel it here.
+ */
+ if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
+ (work->flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wqe);
return;
}