linux-man.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Jens Axboe <axboe@kernel.dk>
To: linux-aio@kvack.org, linux-block@vger.kernel.org,
	linux-man@vger.kernel.org, linux-api@vger.kernel.org
Cc: hch@lst.de, jmoyer@redhat.com, avi@scylladb.com,
	Jens Axboe <axboe@kernel.dk>
Subject: [PATCH 14/18] io_uring: add submission polling
Date: Mon, 28 Jan 2019 14:35:34 -0700	[thread overview]
Message-ID: <20190128213538.13486-15-axboe@kernel.dk> (raw)
In-Reply-To: <20190128213538.13486-1-axboe@kernel.dk>

This enables an application to do IO, without ever entering the kernel.
By using the SQ ring to fill in new sqes and watching for completions
on the CQ ring, we can submit and reap IOs without doing a single system
call. The kernel side thread will poll for new submissions, and in case
of HIPRI/polled IO, it'll also poll for completions.

By default, we allow 1 second of active spinning. This can by changed
by passing in a different grace period at io_uring_register(2) time.
If the thread exceeds this idle time without having any work to do, it
will set:

sq_ring->flags |= IORING_SQ_NEED_WAKEUP.

The application will have to call io_uring_enter() to start things back
up again. If IO is kept busy, that will never be needed. Basically an
application that has this feature enabled will guard it's
io_uring_enter(2) call with:

read_barrier();
if (*sq_ring->flags & IORING_SQ_NEED_WAKEUP)
	io_uring_enter(fd, 0, 0, IORING_ENTER_SQ_WAKEUP);

instead of calling it unconditionally.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 232 +++++++++++++++++++++++++++++++++-
 include/uapi/linux/io_uring.h |  12 +-
 2 files changed, 237 insertions(+), 7 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 77993972879b..34e786eb2e2d 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -24,6 +24,7 @@
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
 #include <linux/anon_inodes.h>
@@ -79,14 +80,16 @@ struct io_ring_ctx {
 		unsigned		cached_sq_head;
 		unsigned		sq_entries;
 		unsigned		sq_mask;
-		unsigned		sq_thread_cpu;
+		unsigned		sq_thread_idle;
 		struct io_uring_sqe	*sq_sqes;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
 	struct workqueue_struct	*sqo_wq;
+	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
 	struct files_struct	*sqo_files;
+	wait_queue_head_t	sqo_wait;
 
 	struct {
 		/* CQ ring */
@@ -262,6 +265,8 @@ static void __io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
 }
 
 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
@@ -1113,6 +1118,168 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 	return false;
 }
 
+static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
+			  unsigned int nr, bool mm_fault)
+{
+	struct io_submit_state state, *statep = NULL;
+	int ret, i, submitted = 0;
+
+	if (nr > IO_PLUG_THRESHOLD) {
+		io_submit_state_start(&state, ctx, nr);
+		statep = &state;
+	}
+
+	for (i = 0; i < nr; i++) {
+		if (unlikely(mm_fault))
+			ret = -EFAULT;
+		else
+			ret = io_submit_sqe(ctx, &sqes[i], statep);
+		if (!ret) {
+			submitted++;
+			continue;
+		}
+
+		io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
+	}
+
+	if (statep)
+		io_submit_state_end(&state);
+
+	return submitted;
+}
+
+static int io_sq_thread(void *data)
+{
+	struct sqe_submit sqes[IO_IOPOLL_BATCH];
+	struct io_ring_ctx *ctx = data;
+	struct mm_struct *cur_mm = NULL;
+	struct files_struct *old_files;
+	mm_segment_t old_fs;
+	DEFINE_WAIT(wait);
+	unsigned inflight;
+	unsigned long timeout;
+
+	old_files = current->files;
+	current->files = ctx->sqo_files;
+
+	old_fs = get_fs();
+	set_fs(USER_DS);
+
+	timeout = inflight = 0;
+	while (!kthread_should_stop()) {
+		bool all_fixed, mm_fault = false;
+		int i;
+
+		if (inflight) {
+			unsigned nr_events = 0;
+
+			if (ctx->flags & IORING_SETUP_IOPOLL) {
+				/*
+				 * We disallow the app entering submit/complete
+				 * with polling, so no need to lock the ring.
+				 */
+				io_iopoll_check(ctx, &nr_events, 0);
+			} else {
+				/*
+				 * Normal IO, just pretend everything completed.
+				 * We don't have to poll completions for that.
+				 */
+				nr_events = inflight;
+			}
+
+			inflight -= nr_events;
+			if (!inflight)
+				timeout = jiffies + ctx->sq_thread_idle;
+		}
+
+		if (!io_get_sqring(ctx, &sqes[0])) {
+			/*
+			 * We're polling. If we're within the defined idle
+			 * period, then let us spin without work before going
+			 * to sleep.
+			 */
+			if (inflight || !time_after(jiffies, timeout)) {
+				cpu_relax();
+				continue;
+			}
+
+			/*
+			 * Drop cur_mm before scheduling, we can't hold it for
+			 * long periods (or over schedule()). Do this before
+			 * adding ourselves to the waitqueue, as the unuse/drop
+			 * may sleep.
+			 */
+			if (cur_mm) {
+				unuse_mm(cur_mm);
+				mmput(cur_mm);
+				cur_mm = NULL;
+			}
+
+			prepare_to_wait(&ctx->sqo_wait, &wait,
+						TASK_INTERRUPTIBLE);
+
+			/* Tell userspace we may need a wakeup call */
+			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+
+			if (!io_get_sqring(ctx, &sqes[0])) {
+				if (kthread_should_park())
+					kthread_parkme();
+				if (kthread_should_stop()) {
+					finish_wait(&ctx->sqo_wait, &wait);
+					break;
+				}
+				if (signal_pending(current))
+					flush_signals(current);
+				schedule();
+				finish_wait(&ctx->sqo_wait, &wait);
+
+				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+				smp_wmb();
+				continue;
+			}
+			finish_wait(&ctx->sqo_wait, &wait);
+
+			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+		}
+
+		i = 0;
+		all_fixed = true;
+		do {
+			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
+				all_fixed = false;
+
+			i++;
+			if (i == ARRAY_SIZE(sqes))
+				break;
+		} while (io_get_sqring(ctx, &sqes[i]));
+
+		io_commit_sqring(ctx);
+
+		/* Unless all new commands are FIXED regions, grab mm */
+		if (!all_fixed && !cur_mm) {
+			mm_fault = !mmget_not_zero(ctx->sqo_mm);
+			if (!mm_fault) {
+				use_mm(ctx->sqo_mm);
+				cur_mm = ctx->sqo_mm;
+			}
+		}
+
+		inflight += io_submit_sqes(ctx, sqes, i, mm_fault);
+	}
+
+	io_iopoll_reap_events(ctx);
+
+	current->files = old_files;
+	set_fs(old_fs);
+	if (cur_mm) {
+		unuse_mm(cur_mm);
+		mmput(cur_mm);
+	}
+	return 0;
+}
+
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
 	struct io_submit_state state, *statep = NULL;
@@ -1198,6 +1365,17 @@ static int __io_uring_enter(struct io_ring_ctx *ctx, unsigned to_submit,
 {
 	int submitted, ret;
 
+	/*
+	 * For SQ polling, the thread will do all submissions and completions.
+	 * Just return the requested submit count, and wake the thread if
+	 * we were asked to.
+	 */
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (flags & IORING_ENTER_SQ_WAKEUP)
+			wake_up(&ctx->sqo_wait);
+		return to_submit;
+	}
+
 	submitted = ret = 0;
 	if (to_submit) {
 		submitted = io_ring_submit(ctx, to_submit);
@@ -1276,10 +1454,12 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 	return ret;
 }
 
-static int io_sq_offload_start(struct io_ring_ctx *ctx)
+static int io_sq_offload_start(struct io_ring_ctx *ctx,
+			       struct io_uring_params *p)
 {
 	int ret;
 
+	init_waitqueue_head(&ctx->sqo_wait);
 	ctx->sqo_mm = current->mm;
 
 	/*
@@ -1292,6 +1472,34 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
 	if (!ctx->sqo_files)
 		goto err;
 
+	ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
+	if (!ctx->sq_thread_idle)
+		ctx->sq_thread_idle = HZ;
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (p->flags & IORING_SETUP_SQ_AFF) {
+			int cpu;
+
+			cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
+			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
+							ctx, cpu,
+							"io_uring-sq");
+		} else {
+			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
+							"io_uring-sq");
+		}
+		if (IS_ERR(ctx->sqo_thread)) {
+			ret = PTR_ERR(ctx->sqo_thread);
+			ctx->sqo_thread = NULL;
+			goto err;
+		}
+		wake_up_process(ctx->sqo_thread);
+	} else if (p->flags & IORING_SETUP_SQ_AFF) {
+		/* Can't have SQ_AFF without SQPOLL */
+		ret = -EINVAL;
+		goto err;
+	}
+
 	/* Do QD, or 2 * CPUS, whatever is smallest */
 	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
 			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
@@ -1302,6 +1510,11 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
 
 	return 0;
 err:
+	if (ctx->sqo_thread) {
+		kthread_park(ctx->sqo_thread);
+		kthread_stop(ctx->sqo_thread);
+		ctx->sqo_thread = NULL;
+	}
 	if (ctx->sqo_files)
 		ctx->sqo_files = NULL;
 	ctx->sqo_mm = NULL;
@@ -1560,8 +1773,13 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
 
 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
+	if (ctx->sqo_thread) {
+		kthread_park(ctx->sqo_thread);
+		kthread_stop(ctx->sqo_thread);
+	}
 	destroy_workqueue(ctx->sqo_wq);
-	io_iopoll_reap_events(ctx);
+	if (!(ctx->flags & IORING_SETUP_SQPOLL))
+		io_iopoll_reap_events(ctx);
 	io_sqe_buffer_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 
@@ -1602,7 +1820,8 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
 
-	io_iopoll_reap_events(ctx);
+	if (!(ctx->flags & IORING_SETUP_SQPOLL))
+		io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
 	io_ring_ctx_free(ctx);
 }
@@ -1771,7 +1990,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
 	if (ret)
 		goto err;
 
-	ret = io_sq_offload_start(ctx);
+	ret = io_sq_offload_start(ctx, p);
 	if (ret)
 		goto err;
 
@@ -1820,7 +2039,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 			return -EINVAL;
 	}
 
-	if (p.flags & ~IORING_SETUP_IOPOLL)
+	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
+			IORING_SETUP_SQ_AFF))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 8323320077ec..cb3592d618fe 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -44,6 +44,8 @@ struct io_uring_sqe {
  * io_uring_setup() flags
  */
 #define IORING_SETUP_IOPOLL	(1 << 0)	/* io_context is polled */
+#define IORING_SETUP_SQPOLL	(1 << 1)	/* SQ poll thread */
+#define IORING_SETUP_SQ_AFF	(1 << 2)	/* sq_thread_cpu is valid */
 
 #define IORING_OP_NOP		0
 #define IORING_OP_READV		1
@@ -87,6 +89,11 @@ struct io_sqring_offsets {
 	__u32 resv[3];
 };
 
+/*
+ * sq_ring->flags
+ */
+#define IORING_SQ_NEED_WAKEUP	(1 << 0) /* needs io_uring_enter wakeup */
+
 struct io_cqring_offsets {
 	__u32 head;
 	__u32 tail;
@@ -101,6 +108,7 @@ struct io_cqring_offsets {
  * io_uring_enter(2) flags
  */
 #define IORING_ENTER_GETEVENTS	(1 << 0)
+#define IORING_ENTER_SQ_WAKEUP	(1 << 1)
 
 /*
  * Passed in for io_uring_setup(2). Copied back with updated info on success
@@ -109,7 +117,9 @@ struct io_uring_params {
 	__u32 sq_entries;
 	__u32 cq_entries;
 	__u32 flags;
-	__u16 resv[10];
+	__u16 sq_thread_cpu;
+	__u16 sq_thread_idle;
+	__u16 resv[8];
 	struct io_sqring_offsets sq_off;
 	struct io_cqring_offsets cq_off;
 };
-- 
2.17.1

--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org.  For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>

  parent reply	other threads:[~2019-01-28 21:35 UTC|newest]

Thread overview: 62+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-01-28 21:35 [PATCHSET v8] io_uring IO interface Jens Axboe
2019-01-28 21:35 ` [PATCH 01/18] fs: add an iopoll method to struct file_operations Jens Axboe
2019-01-28 21:35 ` [PATCH 02/18] block: wire up block device iopoll method Jens Axboe
2019-01-28 21:35 ` [PATCH 03/18] block: add bio_set_polled() helper Jens Axboe
2019-01-28 21:35 ` [PATCH 04/18] iomap: wire up the iopoll method Jens Axboe
2019-01-28 21:35 ` [PATCH 05/18] Add io_uring IO interface Jens Axboe
2019-01-28 21:53   ` Jeff Moyer
2019-01-28 21:56     ` Jens Axboe
2019-01-28 22:32   ` Jann Horn
2019-01-28 23:46     ` Jens Axboe
2019-01-28 23:59       ` Jann Horn
2019-01-29  0:03         ` Jens Axboe
2019-01-29  0:31           ` Jens Axboe
2019-01-29  0:34             ` Jann Horn
2019-01-29  0:55               ` Jens Axboe
2019-01-29  0:58                 ` Jann Horn
2019-01-29  1:01                   ` Jens Axboe
2019-02-01 16:57         ` Matt Mullins
2019-02-01 17:04           ` Jann Horn
2019-02-01 17:23             ` Jann Horn
2019-02-01 18:05               ` Al Viro
2019-01-29  1:07   ` Jann Horn
2019-01-29  2:21     ` Jann Horn
2019-01-29  2:54       ` Jens Axboe
2019-01-29  3:46       ` Jens Axboe
2019-01-29 15:56         ` Jann Horn
2019-01-29 16:06           ` Jens Axboe
2019-01-29  2:21     ` Jens Axboe
2019-01-29  1:29   ` Jann Horn
2019-01-29  1:31     ` Jens Axboe
2019-01-29  1:32       ` Jann Horn
2019-01-29  2:23         ` Jens Axboe
2019-01-29  7:12   ` Bert Wesarg
2019-01-29 12:12   ` Florian Weimer
2019-01-29 13:35     ` Jens Axboe
2019-01-28 21:35 ` [PATCH 06/18] io_uring: add fsync support Jens Axboe
2019-01-28 21:35 ` [PATCH 07/18] io_uring: support for IO polling Jens Axboe
2019-01-29 17:24   ` Christoph Hellwig
2019-01-29 18:31     ` Jens Axboe
2019-01-29 19:10       ` Jens Axboe
2019-01-29 20:35         ` Jeff Moyer
2019-01-29 20:37           ` Jens Axboe
2019-01-28 21:35 ` [PATCH 08/18] fs: add fget_many() and fput_many() Jens Axboe
2019-01-28 21:35 ` [PATCH 09/18] io_uring: use fget/fput_many() for file references Jens Axboe
2019-01-28 21:56   ` Jann Horn
2019-01-28 22:03     ` Jens Axboe
2019-01-28 21:35 ` [PATCH 10/18] io_uring: batch io_kiocb allocation Jens Axboe
2019-01-29 17:26   ` Christoph Hellwig
2019-01-29 18:14     ` Jens Axboe
2019-01-28 21:35 ` [PATCH 11/18] block: implement bio helper to add iter bvec pages to bio Jens Axboe
2019-01-28 21:35 ` [PATCH 12/18] io_uring: add support for pre-mapped user IO buffers Jens Axboe
2019-01-28 23:35   ` Jann Horn
2019-01-28 23:50     ` Jens Axboe
2019-01-29  0:36       ` Jann Horn
2019-01-29  1:25         ` Jens Axboe
2019-01-28 21:35 ` [PATCH 13/18] io_uring: add file set registration Jens Axboe
2019-01-28 21:35 ` Jens Axboe [this message]
2019-01-28 21:35 ` [PATCH 15/18] io_uring: add io_kiocb ref count Jens Axboe
2019-01-29 17:26   ` Christoph Hellwig
2019-01-28 21:35 ` [PATCH 16/18] io_uring: add support for IORING_OP_POLL Jens Axboe
2019-01-28 21:35 ` [PATCH 17/18] io_uring: allow workqueue item to handle multiple buffered requests Jens Axboe
2019-01-28 21:35 ` [PATCH 18/18] io_uring: add io_uring_event cache hit information Jens Axboe

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190128213538.13486-15-axboe@kernel.dk \
    --to=axboe@kernel.dk \
    --cc=avi@scylladb.com \
    --cc=hch@lst.de \
    --cc=jmoyer@redhat.com \
    --cc=linux-aio@kvack.org \
    --cc=linux-api@vger.kernel.org \
    --cc=linux-block@vger.kernel.org \
    --cc=linux-man@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).