Linux-Block Archive on lore.kernel.org
 help / color / Atom feed
* [PATCHSET 0/2] Add unbounded io_wq for io_uring
@ 2019-11-06 19:40 Jens Axboe
  2019-11-06 19:40 ` [PATCH 1/2] io-wq: pass in io_wq to work handler Jens Axboe
  2019-11-06 19:40 ` [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times Jens Axboe
  0 siblings, 2 replies; 3+ messages in thread
From: Jens Axboe @ 2019-11-06 19:40 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block

Currently we have just the one workqueue for each io_uring ctx, which
supports any kind of IO. This is problematic with requests that have
very different lifetimes. For example, disk IO is always bounded in
time, while networked IO can take a long time. The same goes for POLL
commands, which also have unbounded execution time.

The first patch is just a prep patch; patch number two adds support for
determining the io_wq placement based on the request type.

Patches are against my for-5.5/io_uring-test branch.

 fs/io-wq.c    |   8 +--
 fs/io-wq.h    |   2 +-
 fs/io_uring.c | 145 ++++++++++++++++++++++++++++++++++++--------------
 3 files changed, 110 insertions(+), 45 deletions(-)

-- 
 Jens Axboe



^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH 1/2] io-wq: pass in io_wq to work handler
  2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
@ 2019-11-06 19:40 ` Jens Axboe
  2019-11-06 19:40 ` [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times Jens Axboe
  1 sibling, 0 replies; 3+ messages in thread
From: Jens Axboe @ 2019-11-06 19:40 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, Jens Axboe

Pass the io_wq that the work is executing on to the work handler. No
functional changes in this patch; we'll need this in a future patch.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io-wq.c    | 8 ++++----
 fs/io-wq.h    | 2 +-
 fs/io_uring.c | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index ba40a7ee31c3..4ebbdd068ebf 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -318,7 +318,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 			work->flags |= IO_WQ_WORK_HAS_MM;
 
 		old_work = work;
-		work->func(&work);
+		work->func(wq, &work);
 
 		spin_lock_irq(&wqe->lock);
 		worker->cur_work = NULL;
@@ -685,7 +685,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
 
 	if (found) {
 		work->flags |= IO_WQ_WORK_CANCEL;
-		work->func(&work);
+		work->func(wqe->wq, &work);
 		return IO_WQ_CANCEL_OK;
 	}
 
@@ -757,7 +757,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
 
 	if (found) {
 		work->flags |= IO_WQ_WORK_CANCEL;
-		work->func(&work);
+		work->func(wqe->wq, &work);
 		return IO_WQ_CANCEL_OK;
 	}
 
@@ -801,7 +801,7 @@ struct io_wq_flush_data {
 	struct completion done;
 };
 
-static void io_wq_flush_func(struct io_wq_work **workptr)
+static void io_wq_flush_func(struct io_wq *wq, struct io_wq_work **workptr)
 {
 	struct io_wq_work *work = *workptr;
 	struct io_wq_flush_data *data;
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 3de192dc73fc..9fe8c97bcbd2 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -21,7 +21,7 @@ enum io_wq_cancel {
 
 struct io_wq_work {
 	struct list_head list;
-	void (*func)(struct io_wq_work **);
+	void (*func)(struct io_wq *, struct io_wq_work **);
 	unsigned flags;
 	struct files_struct *files;
 };
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 6c13411896b5..ad452be9f3bc 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -369,7 +369,7 @@ struct io_submit_state {
 	unsigned int		ios_left;
 };
 
-static void io_wq_submit_work(struct io_wq_work **workptr);
+static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr);
 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 				 long res);
 static void __io_free_req(struct io_kiocb *req);
@@ -1930,7 +1930,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	io_commit_cqring(ctx);
 }
 
-static void io_poll_complete_work(struct io_wq_work **workptr)
+static void io_poll_complete_work(struct io_wq *wq, struct io_wq_work **workptr)
 {
 	struct io_wq_work *work = *workptr;
 	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
@@ -2436,7 +2436,7 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	return 0;
 }
 
-static void io_wq_submit_work(struct io_wq_work **workptr)
+static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr)
 {
 	struct io_wq_work *work = *workptr;
 	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-- 
2.24.0


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times
  2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
  2019-11-06 19:40 ` [PATCH 1/2] io-wq: pass in io_wq to work handler Jens Axboe
@ 2019-11-06 19:40 ` Jens Axboe
  1 sibling, 0 replies; 3+ messages in thread
From: Jens Axboe @ 2019-11-06 19:40 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, Jens Axboe

We currently have just the one io_wq for the ctx, which is used for
both disk and network IO. Generally it can be a problem to mix the two,
since disk IO is bounded in time, and network IO is not. This can lead
to situations where we could tie up all workers with unbounded work,
leaving no room for bounded work.

Add a separate io_wq for unbounded work times.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 139 ++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 102 insertions(+), 37 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index ad452be9f3bc..4418139c1fce 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -213,7 +213,7 @@ struct io_ring_ctx {
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
-	struct io_wq		*io_wq;
+	struct io_wq		*io_wq[2];
 	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
 	wait_queue_head_t	sqo_wait;
@@ -496,37 +496,55 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 		 opcode == IORING_OP_WRITE_FIXED);
 }
 
-static inline bool io_prep_async_work(struct io_kiocb *req)
+static inline struct io_wq *io_prep_async_work(struct io_ring_ctx *ctx,
+					       struct io_kiocb *req,
+					       bool *do_hashed)
 {
-	bool do_hashed = false;
+	struct io_wq *wq = ctx->io_wq[0];
 
+	*do_hashed = false;
 	if (req->submit.sqe) {
 		switch (req->submit.sqe->opcode) {
 		case IORING_OP_WRITEV:
 		case IORING_OP_WRITE_FIXED:
-			do_hashed = true;
+			*do_hashed = true;
+			/* fall-through */
+		case IORING_OP_READV:
+		case IORING_OP_READ_FIXED:
+		case IORING_OP_SENDMSG:
+		case IORING_OP_RECVMSG:
+		case IORING_OP_ACCEPT:
+		case IORING_OP_POLL_ADD:
+			/*
+			 * We know REQ_F_ISREG is not set on some of these
+			 * opcodes, but this enables us to keep the check in
+			 * just one place.
+			 */
+			if (!(req->flags & REQ_F_ISREG))
+				wq = ctx->io_wq[1];
 			break;
 		}
 		if (io_sqe_needs_user(req->submit.sqe))
 			req->work.flags |= IO_WQ_WORK_NEEDS_USER;
 	}
 
-	return do_hashed;
+	return wq;
 }
 
 static inline void io_queue_async_work(struct io_ring_ctx *ctx,
 				       struct io_kiocb *req)
 {
-	bool do_hashed = io_prep_async_work(req);
+	struct io_wq *wq;
+	bool do_hashed;
+
+	wq = io_prep_async_work(ctx, req, &do_hashed);
 
 	trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
 					req->flags);
-	if (!do_hashed) {
-		io_wq_enqueue(ctx->io_wq, &req->work);
-	} else {
-		io_wq_enqueue_hashed(ctx->io_wq, &req->work,
-					file_inode(req->file));
-	}
+	if (!do_hashed)
+		io_wq_enqueue(wq, &req->work);
+	else
+		io_wq_enqueue_hashed(wq, &req->work, file_inode(req->file));
 }
 
 static void io_kill_timeout(struct io_kiocb *req)
@@ -2282,12 +2300,12 @@ static bool io_cancel_cb(struct io_wq_work *work, void *data)
 	return req->user_data == (unsigned long) data;
 }
 
-static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
+static int io_async_cancel_one(struct io_wq *wq, void *sqe_addr)
 {
 	enum io_wq_cancel cancel_ret;
 	int ret = 0;
 
-	cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
+	cancel_ret = io_wq_cancel_cb(wq, io_cancel_cb, sqe_addr);
 	switch (cancel_ret) {
 	case IO_WQ_CANCEL_OK:
 		ret = 0;
@@ -2308,7 +2326,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	void *sqe_addr;
-	int ret;
+	int i, ret;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
@@ -2317,7 +2335,11 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		return -EINVAL;
 
 	sqe_addr = (void *) (unsigned long) READ_ONCE(sqe->addr);
-	ret = io_async_cancel_one(ctx, sqe_addr);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+		ret = io_async_cancel_one(ctx->io_wq[i], sqe_addr);
+		if (ret != IO_WQ_CANCEL_NOTFOUND)
+			break;
+	}
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
@@ -2481,10 +2503,20 @@ static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr)
 	/* async context always use a copy of the sqe */
 	kfree(sqe);
 
-	/* if a dependent link is ready, pass it back */
+	/*
+	 * If a dependent link is ready and is on the same io_wq as us, pass it
+	 * back for immediate execution. If it's on a different io_wq, enqueue
+	 * it separately.
+	 */
 	if (!ret && nxt) {
-		io_prep_async_work(nxt);
-		*workptr = &nxt->work;
+		struct io_wq *nxt_wq;
+		bool do_hashed;
+
+		nxt_wq = io_prep_async_work(ctx, nxt, &do_hashed);
+		if (nxt_wq == wq)
+			*workptr = &nxt->work;
+		else
+			io_wq_enqueue(nxt_wq, &nxt->work);
 	}
 }
 
@@ -2598,8 +2630,13 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
-	if (prev)
-		ret = io_async_cancel_one(ctx, (void *) prev->user_data);
+	if (prev) {
+		struct io_wq *wq;
+		bool tmp;
+
+		wq = io_prep_async_work(ctx, prev, &tmp);
+		ret = io_async_cancel_one(wq, (void *) prev->user_data);
+	}
 
 	io_cqring_add_event(ctx, req->user_data, ret);
 	io_put_req(req, NULL);
@@ -3274,11 +3311,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	int i;
+
 	io_sq_thread_stop(ctx);
 
-	if (ctx->io_wq) {
-		io_wq_destroy(ctx->io_wq);
-		ctx->io_wq = NULL;
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+		if (ctx->io_wq[i]) {
+			io_wq_destroy(ctx->io_wq[i]);
+			ctx->io_wq[i] = NULL;
+		}
 	}
 }
 
@@ -3286,9 +3327,11 @@ static void io_finish_async(struct io_ring_ctx *ctx)
 static void io_destruct_skb(struct sk_buff *skb)
 {
 	struct io_ring_ctx *ctx = skb->sk->sk_user_data;
+	int i;
 
-	if (ctx->io_wq)
-		io_wq_flush(ctx->io_wq);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		if (ctx->io_wq[i])
+			io_wq_flush(ctx->io_wq[i]);
 
 	unix_destruct_scm(skb);
 }
@@ -3731,12 +3774,27 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 		goto err;
 	}
 
-	/* Do QD, or 4 * CPUS, whatever is smallest */
+	/*
+	 * This is for the bounded time requests, generally disk IO.
+	 * Do QD, or 4 * CPUS, whatever is smallest
+	 */
 	concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
-	ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
-	if (IS_ERR(ctx->io_wq)) {
-		ret = PTR_ERR(ctx->io_wq);
-		ctx->io_wq = NULL;
+	ctx->io_wq[0] = io_wq_create(concurrency, ctx->sqo_mm);
+	if (IS_ERR(ctx->io_wq[0])) {
+		ret = PTR_ERR(ctx->io_wq[0]);
+		ctx->io_wq[0] = NULL;
+		goto err;
+	}
+
+	/*
+	 * This pool is for unbounded request times, things that could
+	 * take an indeterminate amount of time to complete. Use a separate
+	 * pool for those, to provide fairness with the bounded queue.
+	 */
+	ctx->io_wq[1] = io_wq_create(ctx->cq_entries, ctx->sqo_mm);
+	if (IS_ERR(ctx->io_wq[1])) {
+		ret = PTR_ERR(ctx->io_wq[1]);
+		ctx->io_wq[1] = NULL;
 		goto err;
 	}
 
@@ -4114,6 +4172,8 @@ static int io_uring_fasync(int fd, struct file *file, int on)
 
 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 {
+	int i;
+
 	mutex_lock(&ctx->uring_lock);
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
@@ -4121,8 +4181,9 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	io_kill_timeouts(ctx);
 	io_poll_remove_all(ctx);
 
-	if (ctx->io_wq)
-		io_wq_cancel_all(ctx->io_wq);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		if (ctx->io_wq[i])
+			io_wq_cancel_all(ctx->io_wq[i]);
 
 	io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
@@ -4138,7 +4199,7 @@ static int io_uring_release(struct inode *inode, struct file *file)
 	return 0;
 }
 
-static void io_uring_cancel_files(struct io_ring_ctx *ctx,
+static void io_uring_cancel_files(struct io_ring_ctx *ctx, struct io_wq *wq,
 				  struct files_struct *files)
 {
 	struct io_kiocb *req;
@@ -4150,7 +4211,7 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
 		spin_lock_irq(&ctx->inflight_lock);
 		list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
 			if (req->work.files == files) {
-				ret = io_wq_cancel_work(ctx->io_wq, &req->work);
+				ret = io_wq_cancel_work(wq, &req->work);
 				break;
 			}
 		}
@@ -4178,10 +4239,14 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
 static int io_uring_flush(struct file *file, void *data)
 {
 	struct io_ring_ctx *ctx = file->private_data;
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		io_uring_cancel_files(ctx, ctx->io_wq[i], data);
 
-	io_uring_cancel_files(ctx, data);
 	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
-		io_wq_cancel_all(ctx->io_wq);
+		for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+			io_wq_cancel_all(ctx->io_wq[i]);
 	return 0;
 }
 
-- 
2.24.0


^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, back to index

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
2019-11-06 19:40 ` [PATCH 1/2] io-wq: pass in io_wq to work handler Jens Axboe
2019-11-06 19:40 ` [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times Jens Axboe

Linux-Block Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/linux-block/0 linux-block/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 linux-block linux-block/ https://lore.kernel.org/linux-block \
		linux-block@vger.kernel.org
	public-inbox-index linux-block

Example config snippet for mirrors

Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.linux-block


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git