summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fs/io-wq.c50
1 files changed, 35 insertions, 15 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 9174007ce107..243eb1d4d2bd 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -33,6 +33,7 @@ enum {
enum {
IO_WQ_BIT_EXIT = 0, /* wq exiting */
IO_WQ_BIT_CANCEL = 1, /* cancel work on list */
+ IO_WQ_BIT_ERROR = 2, /* error on setup */
};
enum {
@@ -562,14 +563,14 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
spin_unlock_irq(&wqe->lock);
}
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
struct io_wqe_acct *acct =&wqe->acct[index];
struct io_worker *worker;
worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker)
- return;
+ return false;
refcount_set(&worker->ref, 1);
worker->nulls_node.pprev = NULL;
@@ -581,7 +582,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
"io_wqe_worker-%d/%d", index, wqe->node);
if (IS_ERR(worker->task)) {
kfree(worker);
- return;
+ return false;
}
spin_lock_irq(&wqe->lock);
@@ -599,6 +600,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
atomic_inc(&wq->user->processes);
wake_up_process(worker->task);
+ return true;
}
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -606,9 +608,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
{
struct io_wqe_acct *acct = &wqe->acct[index];
- /* always ensure we have one bounded worker */
- if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers)
- return true;
/* if we have available workers or no work, no need */
if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
return false;
@@ -621,10 +620,19 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
static int io_wq_manager(void *data)
{
struct io_wq *wq = data;
+ int i;
- while (!kthread_should_stop()) {
- int i;
+ /* create fixed workers */
+ refcount_set(&wq->refs, wq->nr_wqes);
+ for (i = 0; i < wq->nr_wqes; i++) {
+ if (create_io_worker(wq, wq->wqes[i], IO_WQ_ACCT_BOUND))
+ continue;
+ goto err;
+ }
+ complete(&wq->done);
+
+ while (!kthread_should_stop()) {
for (i = 0; i < wq->nr_wqes; i++) {
struct io_wqe *wqe = wq->wqes[i];
bool fork_worker[2] = { false, false };
@@ -645,6 +653,12 @@ static int io_wq_manager(void *data)
}
return 0;
+err:
+ set_bit(IO_WQ_BIT_ERROR, &wq->state);
+ set_bit(IO_WQ_BIT_EXIT, &wq->state);
+ if (refcount_sub_and_test(wq->nr_wqes - i, &wq->refs))
+ complete(&wq->done);
+ return 0;
}
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
@@ -982,7 +996,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
wq->user = user;
i = 0;
- refcount_set(&wq->refs, wq->nr_wqes);
for_each_online_node(node) {
struct io_wqe *wqe;
@@ -1020,14 +1033,22 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
if (!IS_ERR(wq->manager)) {
wake_up_process(wq->manager);
+ wait_for_completion(&wq->done);
+ if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
+ ret = -ENOMEM;
+ goto err;
+ }
+ reinit_completion(&wq->done);
return wq;
}
ret = PTR_ERR(wq->manager);
- wq->manager = NULL;
-err:
complete(&wq->done);
- io_wq_destroy(wq);
+err:
+ for (i = 0; i < wq->nr_wqes; i++)
+ kfree(wq->wqes[i]);
+ kfree(wq->wqes);
+ kfree(wq);
return ERR_PTR(ret);
}
@@ -1041,10 +1062,9 @@ void io_wq_destroy(struct io_wq *wq)
{
int i;
- if (wq->manager) {
- set_bit(IO_WQ_BIT_EXIT, &wq->state);
+ set_bit(IO_WQ_BIT_EXIT, &wq->state);
+ if (wq->manager)
kthread_stop(wq->manager);
- }
rcu_read_lock();
for (i = 0; i < wq->nr_wqes; i++) {