linux-fsdevel.vger.kernel.org archive mirror
* [PATCHSET 0/5] Support for polled aio
@ 2018-11-17 23:53 Jens Axboe
  2018-11-17 23:53 ` [PATCH 1/5] aio: use assigned completion handler Jens Axboe
                   ` (4 more replies)
  0 siblings, 5 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel

Up until now, IO polling has been exclusively available through preadv2
and pwritev2, both fully synchronous interfaces. This works fine for
completely synchronous use cases, but that's about it. If QD=1 wasn't
enough to reach the performance goals, the only alternative was to
increase the thread count. Unfortunately, that isn't very efficient,
both in terms of CPU utilization (each thread will use 100% of CPU
time) and in terms of achievable performance.

With all of the recent advances in polling (non-irq polling, efficiency
gains, multiple pollable queues, etc), it's now feasible to add polling
support to aio - this patchset does just that.

An iocb flag is added, IOCB_FLAG_HIPRI, similarly to how we have
RWF_HIPRI for preadv2/pwritev2. It's applicable to the commands that
read/write data, like IOCB_CMD_PREAD/IOCB_CMD_PWRITE and the vectored
variants.  Submission works the same as before.
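
As a rough illustration (not part of the patchset itself; it assumes the
IOCB_FLAG_HIPRI value from patch 4 and uses the raw syscalls rather than
libaio), a polled submission looks something like this from userspace:

#define _GNU_SOURCE
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

#ifndef IOCB_FLAG_HIPRI
#define IOCB_FLAG_HIPRI	(1 << 2)	/* added in patch 4 */
#endif

static long io_setup(unsigned nr, aio_context_t *ctx)
{
	return syscall(__NR_io_setup, nr, ctx);
}

static long io_submit(aio_context_t ctx, long nr, struct iocb **iocbs)
{
	return syscall(__NR_io_submit, ctx, nr, iocbs);
}

int main(void)
{
	struct iocb iocb, *iocbs[] = { &iocb };
	aio_context_t ctx = 0;
	char buf[4096] __attribute__((aligned(4096)));
	/* any pollable block device will do; the path is just an example */
	int fd = open("/dev/nvme0n1", O_RDONLY | O_DIRECT);

	if (fd < 0 || io_setup(32, &ctx) < 0)
		return 1;

	memset(&iocb, 0, sizeof(iocb));
	iocb.aio_fildes = fd;
	iocb.aio_lio_opcode = IOCB_CMD_PREAD;
	iocb.aio_buf = (unsigned long) buf;
	iocb.aio_nbytes = sizeof(buf);
	iocb.aio_offset = 0;
	iocb.aio_flags = IOCB_FLAG_HIPRI;	/* poll for this iocb's completion */

	return io_submit(ctx, 1, iocbs) == 1 ? 0 : 1;
}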

The polling happens off io_getevents(), when the application is looking
for completions. That also works like before, with the only difference
being that events aren't waited for; they are actively found by polling
on the device side.

The only real difference in terms of completions is that polling does
NOT use the user-exposed libaio ring. This is just not feasible, as the
application needs to be the one that actively polls for the events.
Hence the ring isn't supported for polled IO, and the internals
completely ignore it.
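
And a minimal sketch of the reaping side (again illustrative only, using
the raw syscall; 'ctx' is a context that has only seen polled
submissions like the one above):

#include <unistd.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

/*
 * For a polled context nothing is posted to the mmap'ed completion
 * ring; this io_getevents() call is what actually finds and completes
 * the events, copying them into 'evs'.
 */
static long reap_polled_events(aio_context_t ctx, struct io_event *evs,
			       long min_nr, long nr)
{
	return syscall(__NR_io_getevents, ctx, min_nr, nr, evs, NULL);
}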

Outside of that, it's illegal to mix polled with non-polled IO on the
same io_context. There's no way to set up an io_context with the
information that we will be polling on it (always add flags to new
syscalls...), hence we need to track this internally. For polled IO, we
can never wait for events; we have to actively find them. I didn't want
to add counters to the io_context to inc/dec for each IO, so I just made
this illegal. If an application attempts to submit both polled and
non-polled IO on the same io_context, it will get an -EINVAL return at
io_submit() time.

Performance results have been very promising. For an internal Facebook
flash storage device, we're seeing a 20% increase in performance, with
an identical reduction in latencies. Notably, this is comparing an
already highly tuned setup against simply turning on polling, so I'm
sure there's still extra room for improvement there. Note that at these
speeds and feeds, polling ends up NOT using more CPU time than we did
without polling!

On that same box, I ran microbenchmarks and was able to increase peak
performance by 25%. The box was previously pegged at around 2.4M IOPS;
with polling turned on, bandwidth was maxed out at 12.5GB/sec doing
3.2M IOPS. All of this with 2 million FEWER interrupts/second and 2M+
fewer context switches.

In terms of efficiency, a tester was able to get 800K+ IOPS out of a
_single_ thread at QD=16 on a device. These kinds of results are simply
unheard of.

You can find this code in my aio-poll branch, and that branch (and
these patches) are on top of my mq-perf branch.

 fs/aio.c                     | 495 ++++++++++++++++++++++++++++++++---
 fs/block_dev.c               |   2 +
 fs/direct-io.c               |   4 +-
 fs/iomap.c                   |   7 +-
 include/linux/fs.h           |   1 +
 include/uapi/linux/aio_abi.h |   2 +
 6 files changed, 478 insertions(+), 33 deletions(-)

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH 1/5] aio: use assigned completion handler
  2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
@ 2018-11-17 23:53 ` Jens Axboe
  2018-11-19  8:06   ` Christoph Hellwig
  2018-11-17 23:53 ` [PATCH 2/5] aio: fix failure to put the file pointer Jens Axboe
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

We know this is a read/write request, but in preparation for
having different kinds of those, ensure that we call the assigned
handler instead of assuming it's aio_complete_rw().

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/aio.c b/fs/aio.c
index 301e6314183b..b36691268b6c 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1484,7 +1484,7 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 		ret = -EINTR;
 		/*FALLTHRU*/
 	default:
-		aio_complete_rw(req, ret, 0);
+		req->ki_complete(req, ret, 0);
 	}
 }
 
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [PATCH 2/5] aio: fix failure to put the file pointer
  2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
  2018-11-17 23:53 ` [PATCH 1/5] aio: use assigned completion handler Jens Axboe
@ 2018-11-17 23:53 ` Jens Axboe
  2018-11-19  8:07   ` Christoph Hellwig
  2018-11-17 23:53 ` [PATCH 3/5] aio: add iocb->ki_blk_qc field Jens Axboe
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

If the ioprio capability check fails, we return without putting
the file pointer.

Fixes: d9a08a9e616b ("fs: Add aio iopriority support")
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fs/aio.c b/fs/aio.c
index b36691268b6c..3d9bc81cf500 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1436,6 +1436,7 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
 		ret = ioprio_check_cap(iocb->aio_reqprio);
 		if (ret) {
 			pr_debug("aio ioprio check cap error: %d\n", ret);
+			fput(req->ki_filp);
 			return ret;
 		}
 
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [PATCH 3/5] aio: add iocb->ki_blk_qc field
  2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
  2018-11-17 23:53 ` [PATCH 1/5] aio: use assigned completion handler Jens Axboe
  2018-11-17 23:53 ` [PATCH 2/5] aio: fix failure to put the file pointer Jens Axboe
@ 2018-11-17 23:53 ` Jens Axboe
  2018-11-19  1:59   ` Dave Chinner
  2018-11-17 23:53 ` [PATCH 4/5] aio: support for IO polling Jens Axboe
  2018-11-17 23:53 ` [PATCH 5/5] aio: add support for file based polled IO Jens Axboe
  4 siblings, 1 reply; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Add the field and have the blockdev direct_IO() helpers set it.
This is in preparation for being able to poll for iocb completion.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/block_dev.c     | 2 ++
 include/linux/fs.h | 1 +
 2 files changed, 3 insertions(+)

diff --git a/fs/block_dev.c b/fs/block_dev.c
index d233a59ea364..8a2fed18e3fc 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -236,6 +236,7 @@ __blkdev_direct_IO_simple(struct kiocb *iocb, struct iov_iter *iter,
 		bio.bi_opf |= REQ_HIPRI;
 
 	qc = submit_bio(&bio);
+	WRITE_ONCE(iocb->ki_blk_qc, qc);
 	for (;;) {
 		__set_current_state(TASK_UNINTERRUPTIBLE);
 
@@ -396,6 +397,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 				bio->bi_opf |= REQ_HIPRI;
 
 			qc = submit_bio(bio);
+			WRITE_ONCE(iocb->ki_blk_qc, qc);
 			break;
 		}
 
diff --git a/include/linux/fs.h b/include/linux/fs.h
index c95c0807471f..032761d9b218 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -310,6 +310,7 @@ struct kiocb {
 	int			ki_flags;
 	u16			ki_hint;
 	u16			ki_ioprio; /* See linux/ioprio.h */
+	u32			ki_blk_qc;
 } __randomize_layout;
 
 static inline bool is_sync_kiocb(struct kiocb *kiocb)
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [PATCH 4/5] aio: support for IO polling
  2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
                   ` (2 preceding siblings ...)
  2018-11-17 23:53 ` [PATCH 3/5] aio: add iocb->ki_blk_qc field Jens Axboe
@ 2018-11-17 23:53 ` Jens Axboe
  2018-11-19  8:11   ` Christoph Hellwig
  2018-11-17 23:53 ` [PATCH 5/5] aio: add support for file based polled IO Jens Axboe
  4 siblings, 1 reply; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
like their non-polled counterparts, except we expect to poll for
their completion. The polling happens at io_getevents() time, and
works just like non-polled IO.

Polled IO doesn't support the user mapped completion ring. Events
must be reaped through the io_getevents() system call. For non-irq
driven poll devices, there's no way to support completion reaping
from userspace by just looking at the ring. The application itself
is the one that pulls completion entries.

It's illegal to mix polled and non-polled IO on the same ioctx.
This is because we need to know at io_getevents() time if we have
any non-polled IOs pending. For polled IO, we can never sleep in
the read_events() path, since polled IO may not trigger async
completions through an IRQ. In lieu of having some way of marking
an ioctx as strictly polled or non-polled, we track this state
and return -EINVAL if the application mixes the two types.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c                     | 490 ++++++++++++++++++++++++++++++++---
 include/uapi/linux/aio_abi.h |   2 +
 2 files changed, 463 insertions(+), 29 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 3d9bc81cf500..500da3ffc376 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -86,6 +86,11 @@ struct ctx_rq_wait {
 	atomic_t count;
 };
 
+enum {
+	CTX_TYPE_NORMAL = 0,
+	CTX_TYPE_POLLED,
+};
+
 struct kioctx {
 	struct percpu_ref	users;
 	atomic_t		dead;
@@ -96,6 +101,12 @@ struct kioctx {
 
 	struct __percpu kioctx_cpu *cpu;
 
+	/*
+	 * CTX_TYPE_ bits set. Used to not mix and match polled and
+	 * normal IO.
+	 */
+	unsigned long		io_type;
+
 	/*
 	 * For percpu reqs_available, number of slots we move to/from global
 	 * counter at a time:
@@ -138,6 +149,12 @@ struct kioctx {
 		atomic_t	reqs_available;
 	} ____cacheline_aligned_in_smp;
 
+	struct {
+		spinlock_t poll_lock;
+		struct list_head poll_pending;
+		struct list_head poll_done;
+	} ____cacheline_aligned_in_smp;
+
 	struct {
 		spinlock_t	ctx_lock;
 		struct list_head active_reqs;	/* used for cancellation */
@@ -191,6 +208,9 @@ struct aio_kiocb {
 
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
+
+	struct list_head	ki_poll_list;
+
 	refcount_t		ki_refcnt;
 
 	/*
@@ -198,6 +218,12 @@ struct aio_kiocb {
 	 * this is the underlying eventfd context to deliver events to.
 	 */
 	struct eventfd_ctx	*ki_eventfd;
+
+	/*
+	 * For polled IO, stash completion info here
+	 */
+	long			ki_poll_res;
+	long			ki_poll_res2;
 };
 
 /*------ sysctl variables----*/
@@ -214,6 +240,8 @@ static struct vfsmount *aio_mnt;
 static const struct file_operations aio_ring_fops;
 static const struct address_space_operations aio_ctx_aops;
 
+static void aio_reap_polled_events(struct kioctx *);
+
 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
 {
 	struct file *file;
@@ -732,6 +760,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
+	spin_lock_init(&ctx->poll_lock);
+	INIT_LIST_HEAD(&ctx->poll_pending);
+	INIT_LIST_HEAD(&ctx->poll_done);
+
 	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
 		goto err;
 
@@ -814,6 +846,8 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
 	RCU_INIT_POINTER(table->table[ctx->id], NULL);
 	spin_unlock(&mm->ioctx_lock);
 
+	aio_reap_polled_events(ctx);
+
 	/* free_ioctx_reqs() will do the necessary RCU synchronization */
 	wake_up_all(&ctx->wait);
 
@@ -997,11 +1031,11 @@ static void user_refill_reqs_available(struct kioctx *ctx)
  *	Allocate a slot for an aio request.
  * Returns NULL if no requests are free.
  */
-static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx, bool needs_ring)
 {
 	struct aio_kiocb *req;
 
-	if (!get_reqs_available(ctx)) {
+	if (needs_ring && !get_reqs_available(ctx)) {
 		user_refill_reqs_available(ctx);
 		if (!get_reqs_available(ctx))
 			return NULL;
@@ -1013,11 +1047,13 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
 
 	percpu_ref_get(&ctx->reqs);
 	INIT_LIST_HEAD(&req->ki_list);
+	INIT_LIST_HEAD(&req->ki_poll_list);
 	refcount_set(&req->ki_refcnt, 0);
 	req->ki_ctx = ctx;
 	return req;
 out_put:
-	put_reqs_available(ctx, 1);
+	if (needs_ring)
+		put_reqs_available(ctx, 1);
 	return NULL;
 }
 
@@ -1057,6 +1093,14 @@ static inline void iocb_put(struct aio_kiocb *iocb)
 	}
 }
 
+static void iocb_put_many(struct kioctx *ctx, void  **iocbs, int nr)
+{
+	if (nr) {
+		kmem_cache_free_bulk(kiocb_cachep, nr, iocbs);
+		percpu_ref_put_many(&ctx->reqs, nr);
+	}
+}
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1240,6 +1284,299 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret < 0 || *i >= min_nr;
 }
 
+struct aio_iopoll_data {
+	unsigned int blk_qc;
+	struct block_device *bdev;
+};
+
+static int aio_io_poll(struct aio_iopoll_data *pd, bool wait)
+{
+#ifdef CONFIG_BLOCK
+	/*
+	 * Should only happen if someone sets ->ki_blk_qc at random,
+	 * not being a blockdev target. We'll just ignore it, the IO
+	 * will complete normally without being polled.
+	 */
+	if (pd->bdev)
+		return blk_poll(bdev_get_queue(pd->bdev), pd->blk_qc, wait);
+#endif
+
+	return 0;
+}
+
+static struct block_device *aio_bdev_host(struct kiocb *req)
+{
+	struct inode *inode = req->ki_filp->f_mapping->host;
+
+	if (S_ISBLK(inode->i_mode))
+		return I_BDEV(inode);
+
+	return NULL;
+}
+
+#define AIO_POLL_STACK	8
+
+struct aio_poll_data {
+	struct io_event __user *evs;
+	int off;
+	long max;
+	void *iocbs[AIO_POLL_STACK];
+	int to_free;
+};
+
+/*
+ * Process the done_list of iocbs, copy to user space, and free them.
+ * Migh return with data->iocbs holding entries, in which case
+ * data->to_free is non-zero and the caller should free them.
+ */
+static long aio_poll_reap(struct kioctx *ctx, struct aio_poll_data *data)
+	__releases(&ctx->poll_lock)
+	__acquires(&ctx->poll_lock)
+{
+	struct aio_kiocb *iocb;
+	int ret, nr = 0;
+
+restart:
+	while (!list_empty(&ctx->poll_done)) {
+		struct io_event __user *uev;
+		struct io_event ev;
+
+		if (data->to_free == ARRAY_SIZE(data->iocbs)) {
+			iocb_put_many(ctx, data->iocbs, data->to_free);
+			data->to_free = 0;
+		}
+
+		iocb = list_first_entry(&ctx->poll_done, struct aio_kiocb,
+						ki_poll_list);
+		list_del(&iocb->ki_poll_list);
+
+		data->iocbs[data->to_free++] = iocb;
+		if (!data->evs) {
+			nr++;
+			continue;
+		}
+
+		ev.obj = (u64)(unsigned long)iocb->ki_user_iocb;
+		ev.data = iocb->ki_user_data;
+		ev.res = iocb->ki_poll_res;
+		ev.res2 = iocb->ki_poll_res2;
+
+		uev = data->evs + nr + data->off;
+		if (!__copy_to_user_inatomic(uev, &ev, sizeof(*uev))) {
+			nr++;
+			if (nr + data->off < data->max)
+				continue;
+			break;
+		}
+
+		/*
+		 * Unexpected slow path, drop lock and attempt copy. If this
+		 * also fails, we're done. If it worked, we got another event
+		 * and we restart the list check since we dropped the lock.
+		 */
+		spin_unlock_irq(&ctx->poll_lock);
+		ret = copy_to_user(uev, &ev, sizeof(*uev));
+		spin_lock_irq(&ctx->poll_lock);
+		if (!ret) {
+			nr++;
+			if (nr + data->off < data->max)
+				goto restart;
+
+			break;
+		}
+
+		if (!nr)
+			nr = -EFAULT;
+		break;
+	}
+
+	return nr;
+}
+
+/*
+ * Reap done events, if any
+ */
+static long aio_poll_find(struct kioctx *ctx, struct io_event __user *evs,
+			  int off, long max)
+{
+	struct aio_poll_data data = {
+		.evs		= evs,
+		.off		= off,
+		.max		= max,
+		.to_free	= 0
+	};
+	int ret;
+
+	if (list_empty_careful(&ctx->poll_done))
+		return 0;
+
+	spin_lock_irq(&ctx->poll_lock);
+	ret = aio_poll_reap(ctx, &data);
+	spin_unlock_irq(&ctx->poll_lock);
+
+	if (data.to_free)
+		iocb_put_many(ctx, data.iocbs, data.to_free);
+
+	return ret;
+}
+
+static void aio_poll_for_events(struct kioctx *ctx, struct aio_iopoll_data *pd,
+				unsigned int nr_pd, int off, long min, long max)
+{
+	int i, polled = 0;
+
+	/*
+	 * Poll for needed events with wait == true, anything
+	 * after that we just check if we have more, up to max.
+	 */
+	for (i = 0; i < nr_pd; i++) {
+		bool wait = polled + off >= min;
+
+		polled += aio_io_poll(&pd[i], wait);
+		if (polled + off >= max)
+			break;
+
+		/*
+		 * If we have entries waiting to be reaped, stop polling
+		 */
+		if (!list_empty_careful(&ctx->poll_done))
+			break;
+	}
+}
+
+static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
+			      int off, unsigned int *entries, long min, long max)
+{
+	struct aio_iopoll_data pd[AIO_POLL_STACK];
+	struct aio_kiocb *iocb;
+	unsigned int nr_pd;
+	int ret, pre = 0;
+
+	if (list_empty_careful(&ctx->poll_pending))
+		goto out;
+
+	spin_lock_irq(&ctx->poll_lock);
+
+	/*
+	 * Check if we already have done events that satisfy what we need
+	 */
+	while (!list_empty(&ctx->poll_done)) {
+		struct aio_poll_data data = {
+			.evs = event,
+			.off = off,
+			.max = max,
+			.to_free = 0
+		};
+
+		ret = aio_poll_reap(ctx, &data);
+		if (!ret)
+			break;
+		else if (ret < 0 || ret + off >= min) {
+			spin_unlock_irq(&ctx->poll_lock);
+
+			if (data.to_free)
+				iocb_put_many(ctx, data.iocbs, data.to_free);
+
+			return ret;
+		}
+
+		if (data.to_free)
+			iocb_put_many(ctx, data.iocbs, data.to_free);
+
+		pre = ret;
+		off += ret;
+	}
+
+	/*
+	 * Find up to 'max_nr' worth of events to poll for, including the
+	 * events we already successfully polled
+	 */
+	nr_pd = 0;
+	list_for_each_entry(iocb, &ctx->poll_pending, ki_poll_list) {
+		struct kiocb *kiocb = &iocb->rw;
+		blk_qc_t qc;
+
+		/*
+		 * Not submitted yet, don't poll for it
+		 */
+		qc = READ_ONCE(kiocb->ki_blk_qc);
+		if (qc == BLK_QC_T_NONE)
+			continue;
+
+		pd[nr_pd].blk_qc = qc;
+		pd[nr_pd].bdev = aio_bdev_host(kiocb);
+
+		++nr_pd;
+		if (nr_pd == ARRAY_SIZE(pd) || nr_pd + off >= max)
+			break;
+	}
+	spin_unlock_irq(&ctx->poll_lock);
+
+	if (nr_pd) {
+		*entries = nr_pd;
+		aio_poll_for_events(ctx, pd, nr_pd, off, min, max);
+	}
+
+out:
+	ret = aio_poll_find(ctx, event, off, max);
+	if (ret >= 0)
+		return pre + ret;
+	else if (pre)
+		return pre;
+
+	return ret;
+}
+
+/*
+ * We can't just wait for polled events to come to us, we have to actively
+ * find and complete them.
+ */
+static void aio_reap_polled_events(struct kioctx *ctx)
+{
+	unsigned int loop, found;
+
+	if (!test_bit(CTX_TYPE_POLLED, &ctx->io_type))
+		return;
+
+	spin_lock_irq(&ctx->poll_lock);
+	while (!list_empty(&ctx->poll_pending) || !list_empty(&ctx->poll_done)) {
+		loop = 0;
+		spin_unlock_irq(&ctx->poll_lock);
+		found = __aio_check_polled(ctx, NULL, 0, &loop, 1, UINT_MAX);
+		spin_lock_irq(&ctx->poll_lock);
+	}
+	spin_unlock_irq(&ctx->poll_lock);
+}
+
+static int aio_check_polled(struct kioctx *ctx, long min_nr, long nr,
+			    struct io_event __user *event)
+{
+	unsigned int found;
+	int this, ret = 0;
+
+	if (!access_ok(VERIFY_WRITE, event, nr * sizeof(*event)))
+		return -EFAULT;
+
+	do {
+		int tmin;
+
+		if (ret && need_resched())
+			break;
+
+		found = 0;
+		tmin = ret >= min_nr ? 0 : min_nr - ret;
+		this = __aio_check_polled(ctx, event, ret, &found, tmin, nr);
+		if (this < 0) {
+			if (!ret)
+				ret = this;
+			break;
+		}
+		ret += this;
+	} while (found && ret < min_nr);
+
+	return ret;
+}
+
 static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
@@ -1391,13 +1728,8 @@ static void aio_remove_iocb(struct aio_kiocb *iocb)
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 }
 
-static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+static void kiocb_end_write(struct kiocb *kiocb)
 {
-	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
-
-	if (!list_empty_careful(&iocb->ki_list))
-		aio_remove_iocb(iocb);
-
 	if (kiocb->ki_flags & IOCB_WRITE) {
 		struct inode *inode = file_inode(kiocb->ki_filp);
 
@@ -1409,21 +1741,66 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 		file_end_write(kiocb->ki_filp);
 	}
+}
+
+static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+{
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+
+	if (!list_empty_careful(&iocb->ki_list))
+		aio_remove_iocb(iocb);
+
+	kiocb_end_write(kiocb);
 
 	fput(kiocb->ki_filp);
 	aio_complete(iocb, res, res2);
 }
 
-static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
+static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
+{
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+	struct kioctx *ctx = iocb->ki_ctx;
+	struct file *filp = kiocb->ki_filp;
+	unsigned long flags;
+
+	kiocb_end_write(kiocb);
+
+	iocb->ki_poll_res = res;
+	iocb->ki_poll_res2 = res2;
+
+	spin_lock_irqsave(&ctx->poll_lock, flags);
+	list_move_tail(&iocb->ki_poll_list, &ctx->poll_done);
+	spin_unlock_irqrestore(&ctx->poll_lock, flags);
+
+	fput(filp);
+}
+
+static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 {
+	struct kiocb *req = &kiocb->rw;
 	int ret;
 
 	req->ki_filp = fget(iocb->aio_fildes);
 	if (unlikely(!req->ki_filp))
 		return -EBADF;
-	req->ki_complete = aio_complete_rw;
-	req->ki_pos = iocb->aio_offset;
+
 	req->ki_flags = iocb_flags(req->ki_filp);
+
+	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
+		struct kioctx *ctx = kiocb->ki_ctx;
+
+		req->ki_flags |= IOCB_HIPRI;
+		req->ki_blk_qc = BLK_QC_T_NONE;
+		req->ki_complete = aio_complete_rw_poll;
+
+		spin_lock_irq(&ctx->poll_lock);
+		list_add_tail(&kiocb->ki_poll_list, &ctx->poll_pending);
+		spin_unlock_irq(&ctx->poll_lock);
+	} else {
+		req->ki_complete = aio_complete_rw;
+	}
+
+	req->ki_pos = iocb->aio_offset;
 	if (iocb->aio_flags & IOCB_FLAG_RESFD)
 		req->ki_flags |= IOCB_EVENTFD;
 	req->ki_hint = ki_hint_validate(file_write_hint(req->ki_filp));
@@ -1489,15 +1866,16 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 	}
 }
 
-static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
-		bool compat)
+static ssize_t aio_read(struct aio_kiocb *kiocb, struct iocb *iocb,
+			bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1522,15 +1900,16 @@ static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
 	return ret;
 }
 
-static ssize_t aio_write(struct kiocb *req, struct iocb *iocb, bool vectored,
-		bool compat)
+static ssize_t aio_write(struct aio_kiocb *kiocb, struct iocb *iocb,
+			 bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1774,11 +2153,30 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, struct iocb *iocb)
 	return 0;
 }
 
+/*
+ * We can't mix and match polled and non-polled IO on the same ctx. If this
+ * ctx is already of the appropriate type, we're good. If not and none is
+ * set, mark us that type and return success. If a type is set and it
+ * differs from the new IO, return -EINVAL.
+ */
+static int aio_test_and_mark_type(struct kioctx *ctx, int ctx_type)
+{
+	if (test_bit(ctx_type, &ctx->io_type)) {
+		return 0;
+	} else if (!ctx->io_type) {
+		set_bit(ctx_type, &ctx->io_type);
+		return 0;
+	}
+
+	return -EINVAL;
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 bool compat)
 {
 	struct aio_kiocb *req;
 	struct iocb iocb;
+	int ctx_type;
 	ssize_t ret;
 
 	if (unlikely(copy_from_user(&iocb, user_iocb, sizeof(iocb))))
@@ -1800,7 +2198,15 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		return -EINVAL;
 	}
 
-	req = aio_get_req(ctx);
+	if (iocb.aio_flags & IOCB_FLAG_HIPRI)
+		ctx_type = CTX_TYPE_POLLED;
+	else
+		ctx_type = CTX_TYPE_NORMAL;
+
+	/*
+	 * Polled IO doesn't need ring reservations
+	 */
+	req = aio_get_req(ctx, ctx_type == CTX_TYPE_NORMAL);
 	if (unlikely(!req))
 		return -EAGAIN;
 
@@ -1830,24 +2236,45 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 
 	switch (iocb.aio_lio_opcode) {
 	case IOCB_CMD_PREAD:
-		ret = aio_read(&req->rw, &iocb, false, compat);
+		ret = aio_test_and_mark_type(ctx, ctx_type);
+		if (ret)
+			break;
+		ret = aio_read(req, &iocb, false, compat);
 		break;
 	case IOCB_CMD_PWRITE:
-		ret = aio_write(&req->rw, &iocb, false, compat);
+		ret = aio_test_and_mark_type(ctx, ctx_type);
+		if (ret)
+			break;
+		ret = aio_write(req, &iocb, false, compat);
 		break;
 	case IOCB_CMD_PREADV:
-		ret = aio_read(&req->rw, &iocb, true, compat);
+		ret = aio_test_and_mark_type(ctx, ctx_type);
+		if (ret)
+			break;
+		ret = aio_read(req, &iocb, true, compat);
 		break;
 	case IOCB_CMD_PWRITEV:
-		ret = aio_write(&req->rw, &iocb, true, compat);
+		ret = aio_test_and_mark_type(ctx, ctx_type);
+		if (ret)
+			break;
+		ret = aio_write(req, &iocb, true, compat);
 		break;
 	case IOCB_CMD_FSYNC:
+		ret = aio_test_and_mark_type(ctx, false);
+		if (ret)
+			break;
 		ret = aio_fsync(&req->fsync, &iocb, false);
 		break;
 	case IOCB_CMD_FDSYNC:
+		ret = aio_test_and_mark_type(ctx, false);
+		if (ret)
+			break;
 		ret = aio_fsync(&req->fsync, &iocb, true);
 		break;
 	case IOCB_CMD_POLL:
+		ret = aio_test_and_mark_type(ctx, false);
+		if (ret)
+			break;
 		ret = aio_poll(req, &iocb);
 		break;
 	default:
@@ -1861,11 +2288,12 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	 * arranged for that to be done asynchronously.  Anything non-zero
 	 * means that we need to destroy req ourselves.
 	 */
-	if (ret)
-		goto out_put_req;
-	return 0;
+	if (!ret)
+		return 0;
+
 out_put_req:
-	put_reqs_available(ctx, 1);
+	if (ctx_type == CTX_TYPE_NORMAL)
+		put_reqs_available(ctx, 1);
 	percpu_ref_put(&ctx->reqs);
 	if (req->ki_eventfd)
 		eventfd_ctx_put(req->ki_eventfd);
@@ -2043,8 +2471,12 @@ static long do_io_getevents(aio_context_t ctx_id,
 	long ret = -EINVAL;
 
 	if (likely(ioctx)) {
-		if (likely(min_nr <= nr && min_nr >= 0))
-			ret = read_events(ioctx, min_nr, nr, events, until);
+		if (likely(min_nr <= nr && min_nr >= 0)) {
+			if (test_bit(CTX_TYPE_POLLED, &ioctx->io_type))
+				ret = aio_check_polled(ioctx, min_nr, nr, events);
+			else
+				ret = read_events(ioctx, min_nr, nr, events, until);
+		}
 		percpu_ref_put(&ioctx->users);
 	}
 
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index ce43d340f010..bfa553a2324b 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -50,9 +50,11 @@ enum {
  *
  * IOCB_FLAG_RESFD - Set if the "aio_resfd" member of the "struct iocb"
  *                   is valid.
+ * IOCB_FLAG_HIPRI - Use IO completion polling
  */
 #define IOCB_FLAG_RESFD		(1 << 0)
 #define IOCB_FLAG_IOPRIO	(1 << 1)
+#define IOCB_FLAG_HIPRI		(1 << 2)
 
 /* read() from /dev/aio returns these structures. */
 struct io_event {
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* [PATCH 5/5] aio: add support for file based polled IO
  2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
                   ` (3 preceding siblings ...)
  2018-11-17 23:53 ` [PATCH 4/5] aio: support for IO polling Jens Axboe
@ 2018-11-17 23:53 ` Jens Axboe
  2018-11-19  1:57   ` Dave Chinner
  2018-11-19  8:12   ` Christoph Hellwig
  4 siblings, 2 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-17 23:53 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Needs further work, but this should work fine on normal setups
with a file system on a pollable block device.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c       | 2 ++
 fs/direct-io.c | 4 +++-
 fs/iomap.c     | 7 +++++--
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 500da3ffc376..e02085fe10d7 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1310,6 +1310,8 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
 
 	if (S_ISBLK(inode->i_mode))
 		return I_BDEV(inode);
+	else if (inode->i_sb && inode->i_sb->s_bdev)
+		return inode->i_sb->s_bdev;
 
 	return NULL;
 }
diff --git a/fs/direct-io.c b/fs/direct-io.c
index a5a4e5a1423e..34de494e9061 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -477,8 +477,10 @@ static inline void dio_bio_submit(struct dio *dio, struct dio_submit *sdio)
 	if (sdio->submit_io) {
 		sdio->submit_io(bio, dio->inode, sdio->logical_offset_in_bio);
 		dio->bio_cookie = BLK_QC_T_NONE;
-	} else
+	} else {
 		dio->bio_cookie = submit_bio(bio);
+		WRITE_ONCE(dio->iocb->ki_blk_qc, dio->bio_cookie);
+	}
 
 	sdio->bio = NULL;
 	sdio->boundary = 0;
diff --git a/fs/iomap.c b/fs/iomap.c
index 74c1f37f0fd6..4cf412b6230a 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -1555,6 +1555,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 	struct page *page = ZERO_PAGE(0);
 	int flags = REQ_SYNC | REQ_IDLE;
 	struct bio *bio;
+	blk_qc_t qc;
 
 	bio = bio_alloc(GFP_KERNEL, 1);
 	bio_set_dev(bio, iomap->bdev);
@@ -1570,7 +1571,9 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
 
 	atomic_inc(&dio->ref);
-	return submit_bio(bio);
+	qc = submit_bio(bio);
+	WRITE_ONCE(dio->iocb->ki_blk_qc, qc);
+	return qc;
 }
 
 static loff_t
@@ -1680,7 +1683,7 @@ iomap_dio_bio_actor(struct inode *inode, loff_t pos, loff_t length,
 		atomic_inc(&dio->ref);
 
 		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
-		dio->submit.cookie = submit_bio(bio);
+		dio->iocb->ki_blk_qc = dio->submit.cookie = submit_bio(bio);
 	} while (nr_pages);
 
 	if (need_zeroout) {
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* Re: [PATCH 5/5] aio: add support for file based polled IO
  2018-11-17 23:53 ` [PATCH 5/5] aio: add support for file based polled IO Jens Axboe
@ 2018-11-19  1:57   ` Dave Chinner
  2018-11-19  2:58     ` Jens Axboe
  2018-11-19  8:12   ` Christoph Hellwig
  1 sibling, 1 reply; 17+ messages in thread
From: Dave Chinner @ 2018-11-19  1:57 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

On Sat, Nov 17, 2018 at 04:53:17PM -0700, Jens Axboe wrote:
> Needs further work, but this should work fine on normal setups
> with a file system on a pollable block device.

What should work fine? I've got no idea what this patch is actually
providing....

> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/aio.c       | 2 ++
>  fs/direct-io.c | 4 +++-
>  fs/iomap.c     | 7 +++++--
>  3 files changed, 10 insertions(+), 3 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index 500da3ffc376..e02085fe10d7 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -1310,6 +1310,8 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
>  
>  	if (S_ISBLK(inode->i_mode))
>  		return I_BDEV(inode);
> +	else if (inode->i_sb && inode->i_sb->s_bdev)
> +		return inode->i_sb->s_bdev;

XFS might be doing AIO to files on real-time device, not
inode->i_sb->s_bdev. So this may well be the wrong block device
for the IO being submitted.

>  
>  	return NULL;
>  }
> diff --git a/fs/direct-io.c b/fs/direct-io.c
> index a5a4e5a1423e..34de494e9061 100644
> --- a/fs/direct-io.c
> +++ b/fs/direct-io.c
> @@ -477,8 +477,10 @@ static inline void dio_bio_submit(struct dio *dio, struct dio_submit *sdio)
>  	if (sdio->submit_io) {
>  		sdio->submit_io(bio, dio->inode, sdio->logical_offset_in_bio);
>  		dio->bio_cookie = BLK_QC_T_NONE;
> -	} else
> +	} else {
>  		dio->bio_cookie = submit_bio(bio);
> +		WRITE_ONCE(dio->iocb->ki_blk_qc, dio->bio_cookie);
> +	}
>  
>  	sdio->bio = NULL;
>  	sdio->boundary = 0;
> diff --git a/fs/iomap.c b/fs/iomap.c
> index 74c1f37f0fd6..4cf412b6230a 100644
> --- a/fs/iomap.c
> +++ b/fs/iomap.c
> @@ -1555,6 +1555,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>  	struct page *page = ZERO_PAGE(0);
>  	int flags = REQ_SYNC | REQ_IDLE;
>  	struct bio *bio;
> +	blk_qc_t qc;
>  
>  	bio = bio_alloc(GFP_KERNEL, 1);
>  	bio_set_dev(bio, iomap->bdev);
> @@ -1570,7 +1571,9 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>  	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
>  
>  	atomic_inc(&dio->ref);
> -	return submit_bio(bio);
> +	qc = submit_bio(bio);
> +	WRITE_ONCE(dio->iocb->ki_blk_qc, qc);
> +	return qc;
>  }

Why is this added to sub-block zeroing IO calls? It gets overwritten
by the data IO submission, so this value is going to change as the
IO progresses. What does making these partial IOs visible provide,
especially as they then get overwritten by the next submissions?
Indeed, how does one wait on all IOs in the DIO to complete if we
are only tracking one of many?

Cheers,

Dave.
-- 
Dave Chinner
david@fromorbit.com

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 3/5] aio: add iocb->ki_blk_qc field
  2018-11-17 23:53 ` [PATCH 3/5] aio: add iocb->ki_blk_qc field Jens Axboe
@ 2018-11-19  1:59   ` Dave Chinner
  2018-11-19  2:59     ` Jens Axboe
  0 siblings, 1 reply; 17+ messages in thread
From: Dave Chinner @ 2018-11-19  1:59 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

On Sat, Nov 17, 2018 at 04:53:15PM -0700, Jens Axboe wrote:
> Add the field and have the blockdev direct_IO() helpers set it.
> This is in preparation for being able to poll for iocb completion.

What purpose does this field serve?

And why add it to just blockdev direct_IO and not the other direct
IO paths, too?

Cheers,

Dave.
-- 
Dave Chinner
david@fromorbit.com

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 5/5] aio: add support for file based polled IO
  2018-11-19  1:57   ` Dave Chinner
@ 2018-11-19  2:58     ` Jens Axboe
  0 siblings, 0 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-19  2:58 UTC (permalink / raw)
  To: Dave Chinner; +Cc: linux-block, linux-aio, linux-fsdevel

On 11/18/18 6:57 PM, Dave Chinner wrote:
> On Sat, Nov 17, 2018 at 04:53:17PM -0700, Jens Axboe wrote:
>> Needs further work, but this should work fine on normal setups
>> with a file system on a pollable block device.
> 
> What should work fine? I've got no idea what this patch is actually
> providing....

Sorry, I should have made it clearer that this one is just a test
patch to enable polled aio for files; it's not meant to be considered
complete.

>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>> ---
>>  fs/aio.c       | 2 ++
>>  fs/direct-io.c | 4 +++-
>>  fs/iomap.c     | 7 +++++--
>>  3 files changed, 10 insertions(+), 3 deletions(-)
>>
>> diff --git a/fs/aio.c b/fs/aio.c
>> index 500da3ffc376..e02085fe10d7 100644
>> --- a/fs/aio.c
>> +++ b/fs/aio.c
>> @@ -1310,6 +1310,8 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
>>  
>>  	if (S_ISBLK(inode->i_mode))
>>  		return I_BDEV(inode);
>> +	else if (inode->i_sb && inode->i_sb->s_bdev)
>> +		return inode->i_sb->s_bdev;
> 
> XFS might be doing AIO to files on real-time device, not
> inode->i_sb->s_bdev. So this may well be the wrong block device
> for the IO being submitted.

See the patch description; XFS doing AIO on a real-time device is not a
"normal setup".

It doesn't work for btrfs either, for instance. As far as I can tell, there
are a few routes that we can go here:

1) Have an fs op that returns the bdev.
2) Set it after submit time, like we do in other spots. This won't work
   if we straddle devices.

I don't feel that strongly about it, and frankly I'm fine with just
dropping non-bdev support for now. The patch is provided for testing
for the 99% of use cases, which is a file system on top of a single
device.

>> diff --git a/fs/iomap.c b/fs/iomap.c
>> index 74c1f37f0fd6..4cf412b6230a 100644
>> --- a/fs/iomap.c
>> +++ b/fs/iomap.c
>> @@ -1555,6 +1555,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>>  	struct page *page = ZERO_PAGE(0);
>>  	int flags = REQ_SYNC | REQ_IDLE;
>>  	struct bio *bio;
>> +	blk_qc_t qc;
>>  
>>  	bio = bio_alloc(GFP_KERNEL, 1);
>>  	bio_set_dev(bio, iomap->bdev);
>> @@ -1570,7 +1571,9 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>>  	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
>>  
>>  	atomic_inc(&dio->ref);
>> -	return submit_bio(bio);
>> +	qc = submit_bio(bio);
>> +	WRITE_ONCE(dio->iocb->ki_blk_qc, qc);
>> +	return qc;
>>  }
> 
> Why is this added to sub-block zeroing IO calls? It gets overwritten
> by the data IO submission, so this value is going to change as the
> IO progresses. What does making these partial IOs visible provide,
> especially as they then get overwritten by the next submissions?
> Indeed, how does one wait on all IOs in the DIO to complete if we
> are only tracking one of many?

That does look like a mistake; the zeroing should just be ignored.
We only care about the actual IO.

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 3/5] aio: add iocb->ki_blk_qc field
  2018-11-19  1:59   ` Dave Chinner
@ 2018-11-19  2:59     ` Jens Axboe
  0 siblings, 0 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-19  2:59 UTC (permalink / raw)
  To: Dave Chinner; +Cc: linux-block, linux-aio, linux-fsdevel

On 11/18/18 6:59 PM, Dave Chinner wrote:
> On Sat, Nov 17, 2018 at 04:53:15PM -0700, Jens Axboe wrote:
>> Add the field and have the blockdev direct_IO() helpers set it.
>> This is in preparation for being able to poll for iocb completion.
> 
> What purpose does this field serve?

It's for later lookup to find the queue to poll on.

> And why add it to just blockdev direct_IO and not the other direct
> IO paths, too?

Only block devices support polling.

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 1/5] aio: use assigned completion handler
  2018-11-17 23:53 ` [PATCH 1/5] aio: use assigned completion handler Jens Axboe
@ 2018-11-19  8:06   ` Christoph Hellwig
  0 siblings, 0 replies; 17+ messages in thread
From: Christoph Hellwig @ 2018-11-19  8:06 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

On Sat, Nov 17, 2018 at 04:53:13PM -0700, Jens Axboe wrote:
> We know this is a read/write request, but in preparation for
> having different kinds of those, ensure that we call the assigned
> handler instead of assuming it's aio_complete_rq().
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>

Looks good,

Reviewed-by: Christoph Hellwig <hch@lst.de>

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 2/5] aio: fix failure to put the file pointer
  2018-11-17 23:53 ` [PATCH 2/5] aio: fix failure to put the file pointer Jens Axboe
@ 2018-11-19  8:07   ` Christoph Hellwig
  2018-11-19 15:39     ` Jens Axboe
  0 siblings, 1 reply; 17+ messages in thread
From: Christoph Hellwig @ 2018-11-19  8:07 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

On Sat, Nov 17, 2018 at 04:53:14PM -0700, Jens Axboe wrote:
> If the ioprio capability check fails, we return without putting
> the file pointer.
> 
> Fixes: d9a08a9e616b ("fs: Add aio iopriority support")
> Signed-off-by: Jens Axboe <axboe@kernel.dk>

Looks good.  Please also send it to Al so that it can go into 4.20
and -stable.

Reviewed-by: Christoph Hellwig <hch@lst.de>

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 4/5] aio: support for IO polling
  2018-11-17 23:53 ` [PATCH 4/5] aio: support for IO polling Jens Axboe
@ 2018-11-19  8:11   ` Christoph Hellwig
  2018-11-19 13:32     ` Christoph Hellwig
  0 siblings, 1 reply; 17+ messages in thread
From: Christoph Hellwig @ 2018-11-19  8:11 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

[-- Attachment #1: Type: text/plain, Size: 648 bytes --]

I like this idea, but there are a couple of issues here.

First, the flag per command really doesn't work - we need a creation
time flag.  Unfortunately the existing io_setup system call doesn't
take flags, so we'll need to add a new one.

Second, we need a check that the polling mode is actually supported
for a given file descriptor.

Third, this hardcodes block device knowledge into the aio code.  We
really need to move it into a method.  See my third attached patch
for an (untested) idea of how to make that work.  The first two are
just cleanups.

Fourth, we already have aio ->poll support.  So this needs a different
naming scheme, e.g. *iopoll*.


[-- Attachment #2: 0001-aio-split-get_reqs_available-from-aio_get_req.patch --]
[-- Type: text/plain, Size: 3103 bytes --]

>From 8b0d8f2e723bcf52d010c46130eb759770a0dc11 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@lst.de>
Date: Mon, 19 Nov 2018 08:13:19 +0100
Subject: aio: split get_reqs_available from aio_get_req

This makes the polled case nice to handle, and matches the put side.

Signed-off-by: Christoph Hellwig <hch@lst.de>
---
 fs/aio.c | 41 ++++++++++++++++++++++-------------------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index e02085fe10d7..348f04129035 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -935,7 +935,7 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr)
 	local_irq_restore(flags);
 }
 
-static bool get_reqs_available(struct kioctx *ctx)
+static bool __get_reqs_available(struct kioctx *ctx)
 {
 	struct kioctx_cpu *kcpu;
 	bool ret = false;
@@ -1027,23 +1027,25 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 	spin_unlock_irq(&ctx->completion_lock);
 }
 
+static bool get_reqs_available(struct kioctx *ctx)
+{
+	if (__get_reqs_available(ctx))
+		return true;
+	user_refill_reqs_available(ctx);
+	return __get_reqs_available(ctx);
+}
+
 /* aio_get_req
  *	Allocate a slot for an aio request.
  * Returns NULL if no requests are free.
  */
-static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx, bool needs_ring)
+static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct aio_kiocb *req;
 
-	if (needs_ring && !get_reqs_available(ctx)) {
-		user_refill_reqs_available(ctx);
-		if (!get_reqs_available(ctx))
-			return NULL;
-	}
-
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
-		goto out_put;
+		return NULL;
 
 	percpu_ref_get(&ctx->reqs);
 	INIT_LIST_HEAD(&req->ki_list);
@@ -1051,10 +1053,6 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx, bool needs_ring)
 	refcount_set(&req->ki_refcnt, 0);
 	req->ki_ctx = ctx;
 	return req;
-out_put:
-	if (needs_ring)
-		put_reqs_available(ctx, 1);
-	return NULL;
 }
 
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
@@ -2200,17 +2198,21 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		return -EINVAL;
 	}
 
-	if (iocb.aio_flags & IOCB_FLAG_HIPRI)
+	if (iocb.aio_flags & IOCB_FLAG_HIPRI) {
 		ctx_type = CTX_TYPE_POLLED;
-	else
+	} else {
 		ctx_type = CTX_TYPE_NORMAL;
+		if (!get_reqs_available(ctx))
+			return -EAGAIN;
+	}
 
 	/*
 	 * Polled IO doesn't need ring reservations
 	 */
-	req = aio_get_req(ctx, ctx_type == CTX_TYPE_NORMAL);
+	ret = -EAGAIN;
+	req = aio_get_req(ctx);
 	if (unlikely(!req))
-		return -EAGAIN;
+		goto out_put_reqs_available;
 
 	if (iocb.aio_flags & IOCB_FLAG_RESFD) {
 		/*
@@ -2294,12 +2296,13 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		return 0;
 
 out_put_req:
-	if (ctx_type == CTX_TYPE_NORMAL)
-		put_reqs_available(ctx, 1);
 	percpu_ref_put(&ctx->reqs);
 	if (req->ki_eventfd)
 		eventfd_ctx_put(req->ki_eventfd);
 	kmem_cache_free(kiocb_cachep, req);
+out_put_reqs_available:
+	if (ctx_type == CTX_TYPE_NORMAL)
+		put_reqs_available(ctx, 1);
 	return ret;
 }
 
-- 
2.19.1


[-- Attachment #3: 0002-cleanup-the-aio_poll_reap-related-flow.patch --]
[-- Type: text/plain, Size: 6506 bytes --]

>From a42c0af6c96b47d9b915e4efdaa0211e9b6b5253 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@lst.de>
Date: Sun, 18 Nov 2018 16:24:22 +0100
Subject: cleanup the aio_poll_reap related flow

Should not change behavior except for fixing a bug where the number
of returned iocbs was incorrectly overwritten when we actually loop
on poll_done for the first call.

I don't really understand why we loop there but not the second time
we call it.  Nor do I really understand the nested loop in the callers
of __aio_check_polled, but that is for another time..
---
 fs/aio.c | 152 ++++++++++++++++++-------------------------------------
 1 file changed, 50 insertions(+), 102 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 348f04129035..d9198f99ed97 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1091,7 +1091,7 @@ static inline void iocb_put(struct aio_kiocb *iocb)
 	}
 }
 
-static void iocb_put_many(struct kioctx *ctx, void  **iocbs, int nr)
+static void iocb_put_many(struct kioctx *ctx, void **iocbs, int nr)
 {
 	if (nr) {
 		kmem_cache_free_bulk(kiocb_cachep, nr, iocbs);
@@ -1316,42 +1316,33 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
 
 #define AIO_POLL_STACK	8
 
-struct aio_poll_data {
-	struct io_event __user *evs;
-	int off;
-	long max;
-	void *iocbs[AIO_POLL_STACK];
-	int to_free;
-};
-
 /*
- * Process the done_list of iocbs, copy to user space, and free them.
- * Migh return with data->iocbs holding entries, in which case
- * data->to_free is non-zero and the caller should free them.
+ * Process the done_list of iocbs, copy to user space, and free them.  Might
+ * return with iocbs holding entries, in which case *to_free is non-zero and
+ * the caller should free them.
  */
-static long aio_poll_reap(struct kioctx *ctx, struct aio_poll_data *data)
+static long aio_poll_reap(struct kioctx *ctx, struct io_event __user *evs,
+		int off, long max, void **iocbs, int *to_free)
 	__releases(&ctx->poll_lock)
 	__acquires(&ctx->poll_lock)
 {
 	struct aio_kiocb *iocb;
 	int ret, nr = 0;
 
-restart:
-	while (!list_empty(&ctx->poll_done)) {
+	while ((iocb = list_first_entry_or_null(&ctx->poll_done,
+			struct aio_kiocb, ki_poll_list))) {
 		struct io_event __user *uev;
 		struct io_event ev;
 
-		if (data->to_free == ARRAY_SIZE(data->iocbs)) {
-			iocb_put_many(ctx, data->iocbs, data->to_free);
-			data->to_free = 0;
+		if (*to_free == AIO_POLL_STACK) {
+			iocb_put_many(ctx, iocbs, *to_free);
+			*to_free = 0;
 		}
 
-		iocb = list_first_entry(&ctx->poll_done, struct aio_kiocb,
-						ki_poll_list);
 		list_del(&iocb->ki_poll_list);
+		iocbs[*to_free++] = iocb;
 
-		data->iocbs[data->to_free++] = iocb;
-		if (!data->evs) {
+		if (!evs) {
 			nr++;
 			continue;
 		}
@@ -1361,65 +1352,26 @@ static long aio_poll_reap(struct kioctx *ctx, struct aio_poll_data *data)
 		ev.res = iocb->ki_poll_res;
 		ev.res2 = iocb->ki_poll_res2;
 
-		uev = data->evs + nr + data->off;
-		if (!__copy_to_user_inatomic(uev, &ev, sizeof(*uev))) {
-			nr++;
-			if (nr + data->off < data->max)
-				continue;
-			break;
+		uev = evs + nr + off;
+		if (unlikely(__copy_to_user_inatomic(uev, &ev, sizeof(*uev)))) {
+			/*
+			 * Unexpected slow path, drop lock and attempt copy
+			 * again.  If this also fails we are done.
+			 */
+			spin_unlock_irq(&ctx->poll_lock);
+			ret = copy_to_user(uev, &ev, sizeof(*uev));
+			spin_lock_irq(&ctx->poll_lock);
+			if (ret)
+				return nr ? nr : -EFAULT;
 		}
 
-		/*
-		 * Unexpected slow path, drop lock and attempt copy. If this
-		 * also fails, we're done. If it worked, we got another event
-		 * and we restart the list check since we dropped the lock.
-		 */
-		spin_unlock_irq(&ctx->poll_lock);
-		ret = copy_to_user(uev, &ev, sizeof(*uev));
-		spin_lock_irq(&ctx->poll_lock);
-		if (!ret) {
-			nr++;
-			if (nr + data->off < data->max)
-				goto restart;
-
+		if (++nr + off == max)
 			break;
-		}
-
-		if (!nr)
-			nr = -EFAULT;
-		break;
 	}
 
 	return nr;
 }
 
-/*
- * Reap done events, if any
- */
-static long aio_poll_find(struct kioctx *ctx, struct io_event __user *evs,
-			  int off, long max)
-{
-	struct aio_poll_data data = {
-		.evs		= evs,
-		.off		= off,
-		.max		= max,
-		.to_free	= 0
-	};
-	int ret;
-
-	if (list_empty_careful(&ctx->poll_done))
-		return 0;
-
-	spin_lock_irq(&ctx->poll_lock);
-	ret = aio_poll_reap(ctx, &data);
-	spin_unlock_irq(&ctx->poll_lock);
-
-	if (data.to_free)
-		iocb_put_many(ctx, data.iocbs, data.to_free);
-
-	return ret;
-}
-
 static void aio_poll_for_events(struct kioctx *ctx, struct aio_iopoll_data *pd,
 				unsigned int nr_pd, int off, long min, long max)
 {
@@ -1448,42 +1400,32 @@ static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
 			      int off, unsigned int *entries, long min, long max)
 {
 	struct aio_iopoll_data pd[AIO_POLL_STACK];
+	void *iocbs[AIO_POLL_STACK];
+	int to_free = 0;
 	struct aio_kiocb *iocb;
 	unsigned int nr_pd;
-	int ret, pre = 0;
+	int ret, found = 0;
 
 	if (list_empty_careful(&ctx->poll_pending))
 		goto out;
 
-	spin_lock_irq(&ctx->poll_lock);
-
 	/*
 	 * Check if we already have done events that satisfy what we need
 	 */
-	while (!list_empty(&ctx->poll_done)) {
-		struct aio_poll_data data = {
-			.evs = event,
-			.off = off,
-			.max = max,
-			.to_free = 0
-		};
-
-		ret = aio_poll_reap(ctx, &data);
-		if (!ret)
-			break;
-		else if (ret < 0 || ret + off >= min) {
+	spin_lock_irq(&ctx->poll_lock);
+	while ((ret = aio_poll_reap(ctx, event, off, max, iocbs, &to_free))) {
+		if (ret < 0 || ret + off >= min) {
 			spin_unlock_irq(&ctx->poll_lock);
-
-			if (data.to_free)
-				iocb_put_many(ctx, data.iocbs, data.to_free);
-
+			if (to_free)
+				iocb_put_many(ctx, iocbs, to_free);
 			return ret;
 		}
 
-		if (data.to_free)
-			iocb_put_many(ctx, data.iocbs, data.to_free);
-
-		pre = ret;
+		if (to_free) {
+			iocb_put_many(ctx, iocbs, to_free);
+			to_free = 0;
+		}
+		found += ret;
 		off += ret;
 	}
 
@@ -1518,13 +1460,19 @@ static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
 	}
 
 out:
-	ret = aio_poll_find(ctx, event, off, max);
-	if (ret >= 0)
-		return pre + ret;
-	else if (pre)
-		return pre;
+	if (!list_empty_careful(&ctx->poll_done)) {
+		spin_lock_irq(&ctx->poll_lock);
+		ret = aio_poll_reap(ctx, event, off, max, iocbs, &to_free);
+		spin_unlock_irq(&ctx->poll_lock);
+	
+		if (to_free)
+			iocb_put_many(ctx, iocbs, to_free);
+		if (ret < 0)
+			return ret;
+		found += ret;
+	}
 
-	return ret;
+	return found;
 }
 
 /*
-- 
2.19.1


[-- Attachment #4: 0003-use-array-of-iocbs.patch --]
[-- Type: text/plain, Size: 14316 bytes --]

>From f44b92fc87f2f83946af12db9faea5916016a486 Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@lst.de>
Date: Sun, 18 Nov 2018 17:17:55 +0100
Subject: use array of iocbs

XXX: will need some protection against concurrent reaps
---
 fs/aio.c              | 121 +++++++++++++-----------------------------
 fs/block_dev.c        |  20 +++++--
 fs/direct-io.c        |   4 +-
 fs/iomap.c            |  53 +++++++++++-------
 fs/xfs/xfs_file.c     |   1 +
 include/linux/fs.h    |   2 +-
 include/linux/iomap.h |   1 +
 7 files changed, 90 insertions(+), 112 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index d9198f99ed97..cb2fead2ab7c 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -89,6 +89,9 @@ struct ctx_rq_wait {
 enum {
 	CTX_TYPE_NORMAL = 0,
 	CTX_TYPE_POLLED,
+
+	/* currently undergoing a polling io_getevents */
+	CTX_TYPE_POLLING,
 };
 
 struct kioctx {
@@ -1282,38 +1285,6 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret < 0 || *i >= min_nr;
 }
 
-struct aio_iopoll_data {
-	unsigned int blk_qc;
-	struct block_device *bdev;
-};
-
-static int aio_io_poll(struct aio_iopoll_data *pd, bool wait)
-{
-#ifdef CONFIG_BLOCK
-	/*
-	 * Should only happen if someone sets ->ki_blk_qc at random,
-	 * not being a blockdev target. We'll just ignore it, the IO
-	 * will complete normally without being polled.
-	 */
-	if (pd->bdev)
-		return blk_poll(bdev_get_queue(pd->bdev), pd->blk_qc, wait);
-#endif
-
-	return 0;
-}
-
-static struct block_device *aio_bdev_host(struct kiocb *req)
-{
-	struct inode *inode = req->ki_filp->f_mapping->host;
-
-	if (S_ISBLK(inode->i_mode))
-		return I_BDEV(inode);
-	else if (inode->i_sb && inode->i_sb->s_bdev)
-		return inode->i_sb->s_bdev;
-
-	return NULL;
-}
-
 #define AIO_POLL_STACK	8
 
 /*
@@ -1372,39 +1343,12 @@ static long aio_poll_reap(struct kioctx *ctx, struct io_event __user *evs,
 	return nr;
 }
 
-static void aio_poll_for_events(struct kioctx *ctx, struct aio_iopoll_data *pd,
-				unsigned int nr_pd, int off, long min, long max)
-{
-	int i, polled = 0;
-
-	/*
-	 * Poll for needed events with wait == true, anything
-	 * after that we just check if we have more, up to max.
-	 */
-	for (i = 0; i < nr_pd; i++) {
-		bool wait = polled + off >= min;
-
-		polled += aio_io_poll(&pd[i], wait);
-		if (polled + off >= max)
-			break;
-
-		/*
-		 * If we have entries waiting to be reaped, stop polling
-		 */
-		if (!list_empty_careful(&ctx->poll_done))
-			break;
-	}
-}
-
 static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
-			      int off, unsigned int *entries, long min, long max)
+			      int off, unsigned int *to_poll, long min, long max)
 {
-	struct aio_iopoll_data pd[AIO_POLL_STACK];
 	void *iocbs[AIO_POLL_STACK];
-	int to_free = 0;
 	struct aio_kiocb *iocb;
-	unsigned int nr_pd;
-	int ret, found = 0;
+	int to_free = 0, found = 0, polled = 0, ret, i;
 
 	if (list_empty_careful(&ctx->poll_pending))
 		goto out;
@@ -1433,30 +1377,27 @@ static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
 	 * Find up to 'max_nr' worth of events to poll for, including the
 	 * events we already successfully polled
 	 */
-	nr_pd = 0;
 	list_for_each_entry(iocb, &ctx->poll_pending, ki_poll_list) {
-		struct kiocb *kiocb = &iocb->rw;
-		blk_qc_t qc;
-
-		/*
-		 * Not submitted yet, don't poll for it
-		 */
-		qc = READ_ONCE(kiocb->ki_blk_qc);
-		if (qc == BLK_QC_T_NONE)
-			continue;
-
-		pd[nr_pd].blk_qc = qc;
-		pd[nr_pd].bdev = aio_bdev_host(kiocb);
-
-		++nr_pd;
-		if (nr_pd == ARRAY_SIZE(pd) || nr_pd + off >= max)
+		iocbs[*to_poll] = iocb;
+		(*to_poll)++;
+		if (*to_poll == AIO_POLL_STACK || *to_poll + off >= max)
 			break;
 	}
 	spin_unlock_irq(&ctx->poll_lock);
 
-	if (nr_pd) {
-		*entries = nr_pd;
-		aio_poll_for_events(ctx, pd, nr_pd, off, min, max);
+	/*
+	 * Poll for needed events with wait == true, anything after that we just
+	 * check if we have more, up to max.
+	 *
+	 * If we have entries waiting to be reaped, stop polling
+	 */
+	for (i = 0; i < *to_poll; i++) {
+		bool wait = polled + off >= min;
+
+		iocb = iocbs[*to_poll];
+		polled += iocb->rw.ki_filp->f_op->iopoll(&iocb->rw, wait);
+		if (polled + off >= max || !list_empty_careful(&ctx->poll_done))
+			break;
 	}
 
 out:
@@ -1502,6 +1443,10 @@ static int aio_check_polled(struct kioctx *ctx, long min_nr, long nr,
 	unsigned int found;
 	int this, ret = 0;
 
+	/* We can only allow a single thread to poll a context at a time */
+	if (test_and_set_bit(CTX_TYPE_POLLING, &ctx->io_type))
+		return -EBUSY;
+
 	if (!access_ok(VERIFY_WRITE, event, nr * sizeof(*event)))
 		return -EFAULT;
 
@@ -1522,6 +1467,7 @@ static int aio_check_polled(struct kioctx *ctx, long min_nr, long nr,
 		ret += this;
 	} while (found && ret < min_nr);
 
+	clear_bit(CTX_TYPE_POLLING, &ctx->io_type);
 	return ret;
 }
 
@@ -1737,14 +1683,19 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
 		struct kioctx *ctx = kiocb->ki_ctx;
 
+		ret = -EOPNOTSUPP;
+		if (!(req->ki_flags & IOCB_DIRECT) ||
+		    !req->ki_filp->f_op->iopoll)
+			goto out_fput;
+
 		req->ki_flags |= IOCB_HIPRI;
-		req->ki_blk_qc = BLK_QC_T_NONE;
 		req->ki_complete = aio_complete_rw_poll;
 
 		spin_lock_irq(&ctx->poll_lock);
 		list_add_tail(&kiocb->ki_poll_list, &ctx->poll_pending);
 		spin_unlock_irq(&ctx->poll_lock);
 	} else {
+		req->ki_flags &= ~IOCB_HIPRI;
 		req->ki_complete = aio_complete_rw;
 	}
 
@@ -1761,8 +1712,7 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 		ret = ioprio_check_cap(iocb->aio_reqprio);
 		if (ret) {
 			pr_debug("aio ioprio check cap error: %d\n", ret);
-			fput(req->ki_filp);
-			return ret;
+			goto out_fput;
 		}
 
 		req->ki_ioprio = iocb->aio_reqprio;
@@ -1771,7 +1721,10 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 
 	ret = kiocb_set_rw_flags(req, iocb->aio_rw_flags);
 	if (unlikely(ret))
-		fput(req->ki_filp);
+		goto out_fput;
+	return 0;
+out_fput:
+	fput(req->ki_filp);
 	return ret;
 }
 
diff --git a/fs/block_dev.c b/fs/block_dev.c
index 8a2fed18e3fc..8ba58e280ac6 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -236,7 +236,6 @@ __blkdev_direct_IO_simple(struct kiocb *iocb, struct iov_iter *iter,
 		bio.bi_opf |= REQ_HIPRI;
 
 	qc = submit_bio(&bio);
-	WRITE_ONCE(iocb->ki_blk_qc, qc);
 	for (;;) {
 		__set_current_state(TASK_UNINTERRUPTIBLE);
 
@@ -274,6 +273,7 @@ struct blkdev_dio {
 	};
 	size_t			size;
 	atomic_t		ref;
+	blk_qc_t		qc;
 	bool			multi_bio : 1;
 	bool			should_dirty : 1;
 	bool			is_sync : 1;
@@ -282,6 +282,14 @@ struct blkdev_dio {
 
 static struct bio_set blkdev_dio_pool;
 
+static bool blkdev_iopoll(struct kiocb *kiocb, bool wait)
+{
+	struct blkdev_dio *dio = kiocb->private;
+	struct block_device *bdev = I_BDEV(kiocb->ki_filp->f_mapping->host);
+
+	return blk_poll(bdev_get_queue(bdev), READ_ONCE(dio->qc), wait);
+}
+
 static void blkdev_bio_end_io(struct bio *bio)
 {
 	struct blkdev_dio *dio = bio->bi_private;
@@ -336,7 +344,6 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	bool is_poll = (iocb->ki_flags & IOCB_HIPRI) != 0;
 	bool is_read = (iov_iter_rw(iter) == READ), is_sync;
 	loff_t pos = iocb->ki_pos;
-	blk_qc_t qc = BLK_QC_T_NONE;
 	int ret = 0;
 
 	if ((pos | iov_iter_alignment(iter)) &
@@ -356,6 +363,9 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	dio->size = 0;
 	dio->multi_bio = false;
 	dio->should_dirty = is_read && iter_is_iovec(iter);
+	dio->qc = BLK_QC_T_NONE;
+
+	iocb->private = dio;
 
 	/*
 	 * Don't plug for HIPRI/polled IO, as those should go straight
@@ -396,8 +406,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			if (iocb->ki_flags & IOCB_HIPRI)
 				bio->bi_opf |= REQ_HIPRI;
 
-			qc = submit_bio(bio);
-			WRITE_ONCE(iocb->ki_blk_qc, qc);
+			WRITE_ONCE(dio->qc, submit_bio(bio));
 			break;
 		}
 
@@ -425,7 +434,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			break;
 
 		if (!(iocb->ki_flags & IOCB_HIPRI) ||
-		    !blk_poll(bdev_get_queue(bdev), qc, true))
+		    !blk_poll(bdev_get_queue(bdev), dio->qc, true))
 			io_schedule();
 	}
 	__set_current_state(TASK_RUNNING);
@@ -2063,6 +2072,7 @@ const struct file_operations def_blk_fops = {
 	.llseek		= block_llseek,
 	.read_iter	= blkdev_read_iter,
 	.write_iter	= blkdev_write_iter,
+	.iopoll		= blkdev_iopoll,
 	.mmap		= generic_file_mmap,
 	.fsync		= blkdev_fsync,
 	.unlocked_ioctl	= block_ioctl,
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 34de494e9061..a5a4e5a1423e 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -477,10 +477,8 @@ static inline void dio_bio_submit(struct dio *dio, struct dio_submit *sdio)
 	if (sdio->submit_io) {
 		sdio->submit_io(bio, dio->inode, sdio->logical_offset_in_bio);
 		dio->bio_cookie = BLK_QC_T_NONE;
-	} else {
+	} else
 		dio->bio_cookie = submit_bio(bio);
-		WRITE_ONCE(dio->iocb->ki_blk_qc, dio->bio_cookie);
-	}
 
 	sdio->bio = NULL;
 	sdio->boundary = 0;
diff --git a/fs/iomap.c b/fs/iomap.c
index 4cf412b6230a..e5cd9dbe78a8 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -1419,14 +1419,14 @@ struct iomap_dio {
 	unsigned		flags;
 	int			error;
 	bool			wait_for_completion;
+	blk_qc_t		cookie;
+	struct request_queue	*last_queue;
 
 	union {
 		/* used during submission and for synchronous completion: */
 		struct {
 			struct iov_iter		*iter;
 			struct task_struct	*waiter;
-			struct request_queue	*last_queue;
-			blk_qc_t		cookie;
 		} submit;
 
 		/* used for aio completion: */
@@ -1436,6 +1436,30 @@ struct iomap_dio {
 	};
 };
 
+bool iomap_dio_iopoll(struct kiocb *kiocb, bool wait)
+{
+	struct iomap_dio *dio = kiocb->private;
+	struct request_queue *q = READ_ONCE(dio->last_queue);
+
+	if (!q)
+		return false;
+	return blk_poll(q, READ_ONCE(dio->cookie), wait);
+}
+EXPORT_SYMBOL_GPL(iomap_dio_iopoll);
+
+static void iomap_dio_submit_bio(struct iomap_dio *dio, struct iomap *iomap,
+		struct bio *bio)
+{
+	atomic_inc(&dio->ref);
+
+	/*
+	 * iomap_dio_iopoll can race with us.  A non-zero last_queue marks that
+	 * we are ready to poll.
+	 */
+	WRITE_ONCE(dio->cookie, submit_bio(bio));
+	WRITE_ONCE(dio->last_queue, bdev_get_queue(iomap->bdev));
+}
+
 static ssize_t iomap_dio_complete(struct iomap_dio *dio)
 {
 	struct kiocb *iocb = dio->iocb;
@@ -1548,14 +1572,13 @@ static void iomap_dio_bio_end_io(struct bio *bio)
 	}
 }
 
-static blk_qc_t
+static void
 iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 		unsigned len)
 {
 	struct page *page = ZERO_PAGE(0);
 	int flags = REQ_SYNC | REQ_IDLE;
 	struct bio *bio;
-	blk_qc_t qc;
 
 	bio = bio_alloc(GFP_KERNEL, 1);
 	bio_set_dev(bio, iomap->bdev);
@@ -1569,11 +1592,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 	get_page(page);
 	__bio_add_page(bio, page, len, 0);
 	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
-
-	atomic_inc(&dio->ref);
-	qc = submit_bio(bio);
-	WRITE_ONCE(dio->iocb->ki_blk_qc, qc);
-	return qc;
+	iomap_dio_submit_bio(dio, iomap, bio);
 }
 
 static loff_t
@@ -1679,11 +1698,7 @@ iomap_dio_bio_actor(struct inode *inode, loff_t pos, loff_t length,
 		copied += n;
 
 		nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
-
-		atomic_inc(&dio->ref);
-
-		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
-		dio->iocb->ki_blk_qc = dio->submit.cookie = submit_bio(bio);
+		iomap_dio_submit_bio(dio, iomap, bio);
 	} while (nr_pages);
 
 	if (need_zeroout) {
@@ -1785,6 +1800,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio = kmalloc(sizeof(*dio), GFP_KERNEL);
 	if (!dio)
 		return -ENOMEM;
+	iocb->private = dio;
 
 	dio->iocb = iocb;
 	atomic_set(&dio->ref, 1);
@@ -1794,11 +1810,11 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio->error = 0;
 	dio->flags = 0;
 	dio->wait_for_completion = is_sync_kiocb(iocb);
+	dio->cookie = BLK_QC_T_NONE;
+	dio->last_queue = NULL;
 
 	dio->submit.iter = iter;
 	dio->submit.waiter = current;
-	dio->submit.cookie = BLK_QC_T_NONE;
-	dio->submit.last_queue = NULL;
 
 	if (iov_iter_rw(iter) == READ) {
 		if (pos >= dio->i_size)
@@ -1897,9 +1913,8 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 				break;
 
 			if (!(iocb->ki_flags & IOCB_HIPRI) ||
-			    !dio->submit.last_queue ||
-			    !blk_poll(dio->submit.last_queue,
-					 dio->submit.cookie, true))
+			    !dio->last_queue ||
+			    !blk_poll(dio->last_queue, dio->cookie, true))
 				io_schedule();
 		}
 		__set_current_state(TASK_RUNNING);
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index 53c9ab8fb777..603e705781a4 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -1203,6 +1203,7 @@ const struct file_operations xfs_file_operations = {
 	.write_iter	= xfs_file_write_iter,
 	.splice_read	= generic_file_splice_read,
 	.splice_write	= iter_file_splice_write,
+	.iopoll		= iomap_dio_iopoll,
 	.unlocked_ioctl	= xfs_file_ioctl,
 #ifdef CONFIG_COMPAT
 	.compat_ioctl	= xfs_file_compat_ioctl,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 032761d9b218..1d46a10aef6c 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -310,7 +310,6 @@ struct kiocb {
 	int			ki_flags;
 	u16			ki_hint;
 	u16			ki_ioprio; /* See linux/ioprio.h */
-	u32			ki_blk_qc;
 } __randomize_layout;
 
 static inline bool is_sync_kiocb(struct kiocb *kiocb)
@@ -1782,6 +1781,7 @@ struct file_operations {
 	ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
 	ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
 	ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
+	bool (*iopoll)(struct kiocb *kiocb, bool wait);
 	int (*iterate) (struct file *, struct dir_context *);
 	int (*iterate_shared) (struct file *, struct dir_context *);
 	__poll_t (*poll) (struct file *, struct poll_table_struct *);
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index 9a4258154b25..2cbe87ad1878 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -162,6 +162,7 @@ typedef int (iomap_dio_end_io_t)(struct kiocb *iocb, ssize_t ret,
 		unsigned flags);
 ssize_t iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		const struct iomap_ops *ops, iomap_dio_end_io_t end_io);
+bool iomap_dio_iopoll(struct kiocb *kiocb, bool wait);
 
 #ifdef CONFIG_SWAP
 struct file;
-- 
2.19.1


^ permalink raw reply related	[flat|nested] 17+ messages in thread

* Re: [PATCH 5/5] aio: add support for file based polled IO
  2018-11-17 23:53 ` [PATCH 5/5] aio: add support for file based polled IO Jens Axboe
  2018-11-19  1:57   ` Dave Chinner
@ 2018-11-19  8:12   ` Christoph Hellwig
  1 sibling, 0 replies; 17+ messages in thread
From: Christoph Hellwig @ 2018-11-19  8:12 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel

On Sat, Nov 17, 2018 at 04:53:17PM -0700, Jens Axboe wrote:
> Needs further work, but this should work fine on normal setups
> with a file system on a pollable block device.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/aio.c       | 2 ++
>  fs/direct-io.c | 4 +++-
>  fs/iomap.c     | 7 +++++--
>  3 files changed, 10 insertions(+), 3 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index 500da3ffc376..e02085fe10d7 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -1310,6 +1310,8 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
>  
>  	if (S_ISBLK(inode->i_mode))
>  		return I_BDEV(inode);
> +	else if (inode->i_sb && inode->i_sb->s_bdev)
> +		return inode->i_sb->s_bdev;

This is broken for multi-device filesystems, e.g. XFS with the RT
device, or btrfs.  See my patch in reply to your last one for an idea
how to solve this.
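
The approach taken in the follow-up patch is to let the filesystem
itself resolve what to poll, via a per-file ->iopoll method.  A minimal
sketch of such a hook (struct and field names here are illustrative,
not the exact patch code):

static bool example_dio_iopoll(struct kiocb *kiocb, bool wait)
{
	/* dio state the filesystem stashed in the kiocb at submission time */
	struct example_dio *dio = kiocb->private;
	struct request_queue *q = READ_ONCE(dio->last_queue);

	/* bio not submitted yet, or not a pollable target */
	if (!q)
		return false;
	return blk_poll(q, READ_ONCE(dio->cookie), wait);
}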

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 4/5] aio: support for IO polling
  2018-11-19  8:11   ` Christoph Hellwig
@ 2018-11-19 13:32     ` Christoph Hellwig
  2018-11-19 16:07       ` Jens Axboe
  0 siblings, 1 reply; 17+ messages in thread
From: Christoph Hellwig @ 2018-11-19 13:32 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel


I just saw the patch that avoids the irq disabling show up in your
tree this morning.  I think we can do even better by using slightly
lazy lists that are not updated from ->ki_complete context.

Please take a look at the patch below - this replaces patch 3 from
my previous mail, that is it is on top of what you send to the list
plus my first two patches.

Completely untested again of course..
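
Condensed from the patch below, the core of the new scheme is roughly:

/* completion side (may run from irq context): no list manipulation */
set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
atomic_inc(&ctx->poll_completed);

/* polling side: grab a private batch under the submission-side lock */
spin_lock(&ctx->poll_lock);
list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
spin_unlock(&ctx->poll_lock);

/*
 * poll_completing is only touched by the single polling task (enforced
 * by CTX_TYPE_POLLING), so it can be walked lock free, reaping every
 * iocb that has IOCB_POLL_COMPLETED set.
 */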

---
From cf9fd90d13a025d53b26ba54202c2898ba4bf0ef Mon Sep 17 00:00:00 2001
From: Christoph Hellwig <hch@lst.de>
Date: Sun, 18 Nov 2018 17:17:55 +0100
Subject: change aio poll list management
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Have a submitted list, which the iocb is added to on submission
and batch-removed from in __aio_check_polled.  The actual I/O
completion only ever marks the iocb completed using a bit flag.

The completion code then walks the list completely lock free
after a quick splice under the (no longer irq-disabling) lock,
because we prevent multiple contexts from polling at the same time.

Also move the actual blk_poll call into a filesystem method,
which makes the aio code better abstracted out, allows checking
if a given file actually supports it, and last but not least
adds support for filesystems with multiple block devices.

Signed-off-by: Christoph Hellwig <hch@lst.de>
---
 fs/aio.c              | 328 +++++++++++++++++-------------------------
 fs/block_dev.c        |  20 ++-
 fs/direct-io.c        |   4 +-
 fs/iomap.c            |  53 ++++---
 fs/xfs/xfs_file.c     |   1 +
 include/linux/fs.h    |   2 +-
 include/linux/iomap.h |   1 +
 7 files changed, 186 insertions(+), 223 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index d9198f99ed97..8fa106db9b64 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -89,6 +89,9 @@ struct ctx_rq_wait {
 enum {
 	CTX_TYPE_NORMAL = 0,
 	CTX_TYPE_POLLED,
+
+	/* currently undergoing a polling io_getevents */
+	CTX_TYPE_POLLING,
 };
 
 struct kioctx {
@@ -151,8 +154,7 @@ struct kioctx {
 
 	struct {
 		spinlock_t poll_lock;
-		struct list_head poll_pending;
-		struct list_head poll_done;
+		struct list_head poll_submitted;
 	} ____cacheline_aligned_in_smp;
 
 	struct {
@@ -175,6 +177,9 @@ struct kioctx {
 	struct file		*aio_ring_file;
 
 	unsigned		id;
+
+	struct list_head poll_completing;
+	atomic_t poll_completed;
 };
 
 struct fsync_iocb {
@@ -209,21 +214,27 @@ struct aio_kiocb {
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
 
+	unsigned long		ki_flags;
+#define IOCB_POLL_COMPLETED	0
 	struct list_head	ki_poll_list;
 
 	refcount_t		ki_refcnt;
 
-	/*
-	 * If the aio_resfd field of the userspace iocb is not zero,
-	 * this is the underlying eventfd context to deliver events to.
-	 */
-	struct eventfd_ctx	*ki_eventfd;
+	union {
+		/*
+		 * If the aio_resfd field of the userspace iocb is not zero,
+		 * this is the underlying eventfd context to deliver events to.
+		 */
+		struct eventfd_ctx	*ki_eventfd;
 
-	/*
-	 * For polled IO, stash completion info here
-	 */
-	long			ki_poll_res;
-	long			ki_poll_res2;
+		/*
+		 * For polled IO, stash completion info here
+		 */
+		struct {
+			long			res;
+			long			res2;
+		} ki_iopoll;
+	};
 };
 
 /*------ sysctl variables----*/
@@ -761,8 +772,9 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
 	spin_lock_init(&ctx->poll_lock);
-	INIT_LIST_HEAD(&ctx->poll_pending);
-	INIT_LIST_HEAD(&ctx->poll_done);
+	INIT_LIST_HEAD(&ctx->poll_submitted);
+	INIT_LIST_HEAD(&ctx->poll_completing);
+	atomic_set(&ctx->poll_completed, 0);
 
 	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
 		goto err;
@@ -1282,38 +1294,6 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret < 0 || *i >= min_nr;
 }
 
-struct aio_iopoll_data {
-	unsigned int blk_qc;
-	struct block_device *bdev;
-};
-
-static int aio_io_poll(struct aio_iopoll_data *pd, bool wait)
-{
-#ifdef CONFIG_BLOCK
-	/*
-	 * Should only happen if someone sets ->ki_blk_qc at random,
-	 * not being a blockdev target. We'll just ignore it, the IO
-	 * will complete normally without being polled.
-	 */
-	if (pd->bdev)
-		return blk_poll(bdev_get_queue(pd->bdev), pd->blk_qc, wait);
-#endif
-
-	return 0;
-}
-
-static struct block_device *aio_bdev_host(struct kiocb *req)
-{
-	struct inode *inode = req->ki_filp->f_mapping->host;
-
-	if (S_ISBLK(inode->i_mode))
-		return I_BDEV(inode);
-	else if (inode->i_sb && inode->i_sb->s_bdev)
-		return inode->i_sb->s_bdev;
-
-	return NULL;
-}
-
 #define AIO_POLL_STACK	8
 
 /*
@@ -1322,157 +1302,119 @@ static struct block_device *aio_bdev_host(struct kiocb *req)
  * the caller should free them.
  */
 static long aio_poll_reap(struct kioctx *ctx, struct io_event __user *evs,
-		int off, long max, void **iocbs, int *to_free)
-	__releases(&ctx->poll_lock)
-	__acquires(&ctx->poll_lock)
+		unsigned int *nr_events, long max)
 {
-	struct aio_kiocb *iocb;
-	int ret, nr = 0;
+	void *iocbs[AIO_POLL_STACK];
+	struct aio_kiocb *iocb, *n;
+	int to_free = 0, ret = 0;
 
-	while ((iocb = list_first_entry_or_null(&ctx->poll_done,
-			struct aio_kiocb, ki_poll_list))) {
-		struct io_event __user *uev;
+	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_poll_list) {
 		struct io_event ev;
 
-		if (*to_free == AIO_POLL_STACK) {
-			iocb_put_many(ctx, iocbs, *to_free);
-			*to_free = 0;
+		if (!test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
+			continue;
+
+		if (to_free == AIO_POLL_STACK) {
+			iocb_put_many(ctx, iocbs, to_free);
+			to_free = 0;
 		}
 
 		list_del(&iocb->ki_poll_list);
-		iocbs[*to_free++] = iocb;
+		iocbs[to_free++] = iocb;
 
 		if (!evs) {
-			nr++;
+			(*nr_events)++;
 			continue;
 		}
 
 		ev.obj = (u64)(unsigned long)iocb->ki_user_iocb;
 		ev.data = iocb->ki_user_data;
-		ev.res = iocb->ki_poll_res;
-		ev.res2 = iocb->ki_poll_res2;
-
-		uev = evs + nr + off;
-		if (unlikely(__copy_to_user_inatomic(uev, &ev, sizeof(*uev)))) {
-			/*
-			 * Unexpected slow path, drop lock and attempt copy
-			 * again.  If this also fails we are done.
-			 */
-			spin_unlock_irq(&ctx->poll_lock);
-			ret = copy_to_user(uev, &ev, sizeof(*uev));
-			spin_lock_irq(&ctx->poll_lock);
-			if (ret)
-				return nr ? nr : -EFAULT;
+		ev.res = iocb->ki_iopoll.res;
+		ev.res2 = iocb->ki_iopoll.res2;
+		if (copy_to_user(evs + *nr_events, &ev, sizeof(ev))) {
+			ret = -EFAULT;
+			break;
 		}
 
-		if (++nr + off == max)
+		if (++(*nr_events) == max)
 			break;
 	}
 
-	return nr;
-}
-
-static void aio_poll_for_events(struct kioctx *ctx, struct aio_iopoll_data *pd,
-				unsigned int nr_pd, int off, long min, long max)
-{
-	int i, polled = 0;
-
-	/*
-	 * Poll for needed events with wait == true, anything
-	 * after that we just check if we have more, up to max.
-	 */
-	for (i = 0; i < nr_pd; i++) {
-		bool wait = polled + off >= min;
-
-		polled += aio_io_poll(&pd[i], wait);
-		if (polled + off >= max)
-			break;
-
-		/*
-		 * If we have entries waiting to be reaped, stop polling
-		 */
-		if (!list_empty_careful(&ctx->poll_done))
-			break;
-	}
+	if (to_free)
+		iocb_put_many(ctx, iocbs, to_free);
+	return ret;
 }
 
 static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
-			      int off, unsigned int *entries, long min, long max)
+			      unsigned int *nr_events, long min, long max)
 {
-	struct aio_iopoll_data pd[AIO_POLL_STACK];
-	void *iocbs[AIO_POLL_STACK];
-	int to_free = 0;
 	struct aio_kiocb *iocb;
-	unsigned int nr_pd;
-	int ret, found = 0;
-
-	if (list_empty_careful(&ctx->poll_pending))
-		goto out;
+	unsigned int poll_completed;
+	int to_poll = 0, polled = 0, ret;
 
 	/*
 	 * Check if we already have done events that satisfy what we need
 	 */
-	spin_lock_irq(&ctx->poll_lock);
-	while ((ret = aio_poll_reap(ctx, event, off, max, iocbs, &to_free))) {
-		if (ret < 0 || ret + off >= min) {
-			spin_unlock_irq(&ctx->poll_lock);
-			if (to_free)
-				iocb_put_many(ctx, iocbs, to_free);
+	if (!list_empty(&ctx->poll_completing)) {
+		ret = aio_poll_reap(ctx, event, nr_events, max);
+		if (ret < 0)
 			return ret;
-		}
+		if (*nr_events >= min)
+			return 0;
+	}
 
-		if (to_free) {
-			iocb_put_many(ctx, iocbs, to_free);
-			to_free = 0;
-		}
-		found += ret;
-		off += ret;
+	/*
+	 * Take in a new working set from the submitted list if possible.
+	 */
+	if (!list_empty_careful(&ctx->poll_submitted)) {
+		spin_lock(&ctx->poll_lock);
+		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
+		spin_unlock(&ctx->poll_lock);
 	}
 
+	if (list_empty(&ctx->poll_completing))
+		return 0;
+
+	/*
+	 * Check again now that we have a new batch.
+	 */
+	ret = aio_poll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	if (*nr_events >= min)
+		return 0;
+
 	/*
 	 * Find up to 'max_nr' worth of events to poll for, including the
 	 * events we already successfully polled
 	 */
-	nr_pd = 0;
-	list_for_each_entry(iocb, &ctx->poll_pending, ki_poll_list) {
-		struct kiocb *kiocb = &iocb->rw;
-		blk_qc_t qc;
-
+	poll_completed = atomic_read(&ctx->poll_completed);
+	list_for_each_entry(iocb, &ctx->poll_completing, ki_poll_list) {
 		/*
-		 * Not submitted yet, don't poll for it
+		 * Poll for needed events with wait == true, anything after
+		 * that we just check if we have more, up to max.
 		 */
-		qc = READ_ONCE(kiocb->ki_blk_qc);
-		if (qc == BLK_QC_T_NONE)
-			continue;
+		bool wait = polled + *nr_events >= min;
 
-		pd[nr_pd].blk_qc = qc;
-		pd[nr_pd].bdev = aio_bdev_host(kiocb);
-
-		++nr_pd;
-		if (nr_pd == ARRAY_SIZE(pd) || nr_pd + off >= max)
+		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
 			break;
-	}
-	spin_unlock_irq(&ctx->poll_lock);
 
-	if (nr_pd) {
-		*entries = nr_pd;
-		aio_poll_for_events(ctx, pd, nr_pd, off, min, max);
-	}
+		if (++to_poll + *nr_events >= max)
+			break;
 
-out:
-	if (!list_empty_careful(&ctx->poll_done)) {
-		spin_lock_irq(&ctx->poll_lock);
-		ret = aio_poll_reap(ctx, event, off, max, iocbs, &to_free);
-		spin_unlock_irq(&ctx->poll_lock);
-	
-		if (to_free)
-			iocb_put_many(ctx, iocbs, to_free);
-		if (ret < 0)
-			return ret;
-		found += ret;
+		polled += iocb->rw.ki_filp->f_op->iopoll(&iocb->rw, wait);
+		if (polled + *nr_events >= max)
+			break;
+		if (poll_completed != atomic_read(&ctx->poll_completed))
+			break;
 	}
 
-	return found;
+	ret = aio_poll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	if (*nr_events >= min)
+		return 0;
+	return to_poll;
 }
 
 /*
@@ -1481,48 +1423,41 @@ static int __aio_check_polled(struct kioctx *ctx, struct io_event __user *event,
  */
 static void aio_reap_polled_events(struct kioctx *ctx)
 {
-	unsigned int loop, found;
-
 	if (!test_bit(CTX_TYPE_POLLED, &ctx->io_type))
 		return;
 
-	spin_lock_irq(&ctx->poll_lock);
-	while (!list_empty(&ctx->poll_pending) || !list_empty(&ctx->poll_done)) {
-		loop = 0;
-		spin_unlock_irq(&ctx->poll_lock);
-		found = __aio_check_polled(ctx, NULL, 0, &loop, 1, UINT_MAX);
-		spin_lock_irq(&ctx->poll_lock);
+	while (!list_empty_careful(&ctx->poll_submitted) ||
+	       !list_empty(&ctx->poll_completing)) {
+		unsigned int nr_events = 0;
+
+		__aio_check_polled(ctx, NULL, &nr_events, 1, UINT_MAX);
 	}
-	spin_unlock_irq(&ctx->poll_lock);
 }
 
 static int aio_check_polled(struct kioctx *ctx, long min_nr, long nr,
 			    struct io_event __user *event)
 {
-	unsigned int found;
-	int this, ret = 0;
+	unsigned int nr_events = 0;
+	int ret = 0;
 
-	if (!access_ok(VERIFY_WRITE, event, nr * sizeof(*event)))
-		return -EFAULT;
+	/* We can only allow a single thread to poll a context at a time */
+	if (test_and_set_bit(CTX_TYPE_POLLING, &ctx->io_type))
+		return -EBUSY;
 
-	do {
-		int tmin;
+	while (!nr_events || !need_resched()) {
+		int tmin = 0;
 
-		if (ret && need_resched())
-			break;
+		if (nr_events < min_nr)
+			tmin = min_nr - nr_events;
 
-		found = 0;
-		tmin = ret >= min_nr ? 0 : min_nr - ret;
-		this = __aio_check_polled(ctx, event, ret, &found, tmin, nr);
-		if (this < 0) {
-			if (!ret)
-				ret = this;
+		ret = __aio_check_polled(ctx, event, &nr_events, tmin, nr);
+		if (ret <= 0)
 			break;
-		}
-		ret += this;
-	} while (found && ret < min_nr);
+		ret = 0;
+	}
 
-	return ret;
+	clear_bit(CTX_TYPE_POLLING, &ctx->io_type);
+	return nr_events ? nr_events : ret;
 }
 
 static long read_events(struct kioctx *ctx, long min_nr, long nr,
@@ -1707,19 +1642,15 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
 {
 	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
-	struct kioctx *ctx = iocb->ki_ctx;
 	struct file *filp = kiocb->ki_filp;
-	unsigned long flags;
 
 	kiocb_end_write(kiocb);
 
-	iocb->ki_poll_res = res;
-	iocb->ki_poll_res2 = res2;
-
-	spin_lock_irqsave(&ctx->poll_lock, flags);
-	list_move_tail(&iocb->ki_poll_list, &ctx->poll_done);
-	spin_unlock_irqrestore(&ctx->poll_lock, flags);
+	iocb->ki_iopoll.res = res;
+	iocb->ki_iopoll.res2 = res2;
 
+	set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
+	atomic_inc(&iocb->ki_ctx->poll_completed);
 	fput(filp);
 }
 
@@ -1737,14 +1668,19 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
 		struct kioctx *ctx = kiocb->ki_ctx;
 
+		ret = -EOPNOTSUPP;
+		if (!(req->ki_flags & IOCB_DIRECT) ||
+		    !req->ki_filp->f_op->iopoll)
+			goto out_fput;
+
 		req->ki_flags |= IOCB_HIPRI;
-		req->ki_blk_qc = BLK_QC_T_NONE;
 		req->ki_complete = aio_complete_rw_poll;
 
-		spin_lock_irq(&ctx->poll_lock);
-		list_add_tail(&kiocb->ki_poll_list, &ctx->poll_pending);
-		spin_unlock_irq(&ctx->poll_lock);
+		spin_lock(&ctx->poll_lock);
+		list_add_tail(&kiocb->ki_poll_list, &ctx->poll_submitted);
+		spin_unlock(&ctx->poll_lock);
 	} else {
+		req->ki_flags &= ~IOCB_HIPRI;
 		req->ki_complete = aio_complete_rw;
 	}
 
@@ -1761,8 +1697,7 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 		ret = ioprio_check_cap(iocb->aio_reqprio);
 		if (ret) {
 			pr_debug("aio ioprio check cap error: %d\n", ret);
-			fput(req->ki_filp);
-			return ret;
+			goto out_fput;
 		}
 
 		req->ki_ioprio = iocb->aio_reqprio;
@@ -1771,7 +1706,10 @@ static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
 
 	ret = kiocb_set_rw_flags(req, iocb->aio_rw_flags);
 	if (unlikely(ret))
-		fput(req->ki_filp);
+		goto out_fput;
+	return 0;
+out_fput:
+	fput(req->ki_filp);
 	return ret;
 }
 
diff --git a/fs/block_dev.c b/fs/block_dev.c
index 8a2fed18e3fc..8ba58e280ac6 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -236,7 +236,6 @@ __blkdev_direct_IO_simple(struct kiocb *iocb, struct iov_iter *iter,
 		bio.bi_opf |= REQ_HIPRI;
 
 	qc = submit_bio(&bio);
-	WRITE_ONCE(iocb->ki_blk_qc, qc);
 	for (;;) {
 		__set_current_state(TASK_UNINTERRUPTIBLE);
 
@@ -274,6 +273,7 @@ struct blkdev_dio {
 	};
 	size_t			size;
 	atomic_t		ref;
+	blk_qc_t		qc;
 	bool			multi_bio : 1;
 	bool			should_dirty : 1;
 	bool			is_sync : 1;
@@ -282,6 +282,14 @@ struct blkdev_dio {
 
 static struct bio_set blkdev_dio_pool;
 
+static bool blkdev_iopoll(struct kiocb *kiocb, bool wait)
+{
+	struct blkdev_dio *dio = kiocb->private;
+	struct block_device *bdev = I_BDEV(kiocb->ki_filp->f_mapping->host);
+
+	return blk_poll(bdev_get_queue(bdev), READ_ONCE(dio->qc), wait);
+}
+
 static void blkdev_bio_end_io(struct bio *bio)
 {
 	struct blkdev_dio *dio = bio->bi_private;
@@ -336,7 +344,6 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	bool is_poll = (iocb->ki_flags & IOCB_HIPRI) != 0;
 	bool is_read = (iov_iter_rw(iter) == READ), is_sync;
 	loff_t pos = iocb->ki_pos;
-	blk_qc_t qc = BLK_QC_T_NONE;
 	int ret = 0;
 
 	if ((pos | iov_iter_alignment(iter)) &
@@ -356,6 +363,9 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	dio->size = 0;
 	dio->multi_bio = false;
 	dio->should_dirty = is_read && iter_is_iovec(iter);
+	dio->qc = BLK_QC_T_NONE;
+
+	iocb->private = dio;
 
 	/*
 	 * Don't plug for HIPRI/polled IO, as those should go straight
@@ -396,8 +406,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			if (iocb->ki_flags & IOCB_HIPRI)
 				bio->bi_opf |= REQ_HIPRI;
 
-			qc = submit_bio(bio);
-			WRITE_ONCE(iocb->ki_blk_qc, qc);
+			WRITE_ONCE(dio->qc, submit_bio(bio));
 			break;
 		}
 
@@ -425,7 +434,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			break;
 
 		if (!(iocb->ki_flags & IOCB_HIPRI) ||
-		    !blk_poll(bdev_get_queue(bdev), qc, true))
+		    !blk_poll(bdev_get_queue(bdev), dio->qc, true))
 			io_schedule();
 	}
 	__set_current_state(TASK_RUNNING);
@@ -2063,6 +2072,7 @@ const struct file_operations def_blk_fops = {
 	.llseek		= block_llseek,
 	.read_iter	= blkdev_read_iter,
 	.write_iter	= blkdev_write_iter,
+	.iopoll		= blkdev_iopoll,
 	.mmap		= generic_file_mmap,
 	.fsync		= blkdev_fsync,
 	.unlocked_ioctl	= block_ioctl,
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 34de494e9061..a5a4e5a1423e 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -477,10 +477,8 @@ static inline void dio_bio_submit(struct dio *dio, struct dio_submit *sdio)
 	if (sdio->submit_io) {
 		sdio->submit_io(bio, dio->inode, sdio->logical_offset_in_bio);
 		dio->bio_cookie = BLK_QC_T_NONE;
-	} else {
+	} else
 		dio->bio_cookie = submit_bio(bio);
-		WRITE_ONCE(dio->iocb->ki_blk_qc, dio->bio_cookie);
-	}
 
 	sdio->bio = NULL;
 	sdio->boundary = 0;
diff --git a/fs/iomap.c b/fs/iomap.c
index 4cf412b6230a..e5cd9dbe78a8 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -1419,14 +1419,14 @@ struct iomap_dio {
 	unsigned		flags;
 	int			error;
 	bool			wait_for_completion;
+	blk_qc_t		cookie;
+	struct request_queue	*last_queue;
 
 	union {
 		/* used during submission and for synchronous completion: */
 		struct {
 			struct iov_iter		*iter;
 			struct task_struct	*waiter;
-			struct request_queue	*last_queue;
-			blk_qc_t		cookie;
 		} submit;
 
 		/* used for aio completion: */
@@ -1436,6 +1436,30 @@ struct iomap_dio {
 	};
 };
 
+bool iomap_dio_iopoll(struct kiocb *kiocb, bool wait)
+{
+	struct iomap_dio *dio = kiocb->private;
+	struct request_queue *q = READ_ONCE(dio->last_queue);
+
+	if (!q)
+		return false;
+	return blk_poll(q, READ_ONCE(dio->cookie), wait);
+}
+EXPORT_SYMBOL_GPL(iomap_dio_iopoll);
+
+static void iomap_dio_submit_bio(struct iomap_dio *dio, struct iomap *iomap,
+		struct bio *bio)
+{
+	atomic_inc(&dio->ref);
+
+	/*
+	 * iomap_dio_iopoll can race with us.  A non-zero last_queue marks that
+	 * we are ready to poll.
+	 */
+	WRITE_ONCE(dio->cookie, submit_bio(bio));
+	WRITE_ONCE(dio->last_queue, bdev_get_queue(iomap->bdev));
+}
+
 static ssize_t iomap_dio_complete(struct iomap_dio *dio)
 {
 	struct kiocb *iocb = dio->iocb;
@@ -1548,14 +1572,13 @@ static void iomap_dio_bio_end_io(struct bio *bio)
 	}
 }
 
-static blk_qc_t
+static void
 iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 		unsigned len)
 {
 	struct page *page = ZERO_PAGE(0);
 	int flags = REQ_SYNC | REQ_IDLE;
 	struct bio *bio;
-	blk_qc_t qc;
 
 	bio = bio_alloc(GFP_KERNEL, 1);
 	bio_set_dev(bio, iomap->bdev);
@@ -1569,11 +1592,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 	get_page(page);
 	__bio_add_page(bio, page, len, 0);
 	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
-
-	atomic_inc(&dio->ref);
-	qc = submit_bio(bio);
-	WRITE_ONCE(dio->iocb->ki_blk_qc, qc);
-	return qc;
+	iomap_dio_submit_bio(dio, iomap, bio);
 }
 
 static loff_t
@@ -1679,11 +1698,7 @@ iomap_dio_bio_actor(struct inode *inode, loff_t pos, loff_t length,
 		copied += n;
 
 		nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
-
-		atomic_inc(&dio->ref);
-
-		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
-		dio->iocb->ki_blk_qc = dio->submit.cookie = submit_bio(bio);
+		iomap_dio_submit_bio(dio, iomap, bio);
 	} while (nr_pages);
 
 	if (need_zeroout) {
@@ -1785,6 +1800,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio = kmalloc(sizeof(*dio), GFP_KERNEL);
 	if (!dio)
 		return -ENOMEM;
+	iocb->private = dio;
 
 	dio->iocb = iocb;
 	atomic_set(&dio->ref, 1);
@@ -1794,11 +1810,11 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio->error = 0;
 	dio->flags = 0;
 	dio->wait_for_completion = is_sync_kiocb(iocb);
+	dio->cookie = BLK_QC_T_NONE;
+	dio->last_queue = NULL;
 
 	dio->submit.iter = iter;
 	dio->submit.waiter = current;
-	dio->submit.cookie = BLK_QC_T_NONE;
-	dio->submit.last_queue = NULL;
 
 	if (iov_iter_rw(iter) == READ) {
 		if (pos >= dio->i_size)
@@ -1897,9 +1913,8 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 				break;
 
 			if (!(iocb->ki_flags & IOCB_HIPRI) ||
-			    !dio->submit.last_queue ||
-			    !blk_poll(dio->submit.last_queue,
-					 dio->submit.cookie, true))
+			    !dio->last_queue ||
+			    !blk_poll(dio->last_queue, dio->cookie, true))
 				io_schedule();
 		}
 		__set_current_state(TASK_RUNNING);
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index 53c9ab8fb777..603e705781a4 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -1203,6 +1203,7 @@ const struct file_operations xfs_file_operations = {
 	.write_iter	= xfs_file_write_iter,
 	.splice_read	= generic_file_splice_read,
 	.splice_write	= iter_file_splice_write,
+	.iopoll		= iomap_dio_iopoll,
 	.unlocked_ioctl	= xfs_file_ioctl,
 #ifdef CONFIG_COMPAT
 	.compat_ioctl	= xfs_file_compat_ioctl,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 032761d9b218..1d46a10aef6c 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -310,7 +310,6 @@ struct kiocb {
 	int			ki_flags;
 	u16			ki_hint;
 	u16			ki_ioprio; /* See linux/ioprio.h */
-	u32			ki_blk_qc;
 } __randomize_layout;
 
 static inline bool is_sync_kiocb(struct kiocb *kiocb)
@@ -1782,6 +1781,7 @@ struct file_operations {
 	ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
 	ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
 	ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
+	bool (*iopoll)(struct kiocb *kiocb, bool wait);
 	int (*iterate) (struct file *, struct dir_context *);
 	int (*iterate_shared) (struct file *, struct dir_context *);
 	__poll_t (*poll) (struct file *, struct poll_table_struct *);
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index 9a4258154b25..2cbe87ad1878 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -162,6 +162,7 @@ typedef int (iomap_dio_end_io_t)(struct kiocb *iocb, ssize_t ret,
 		unsigned flags);
 ssize_t iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		const struct iomap_ops *ops, iomap_dio_end_io_t end_io);
+bool iomap_dio_iopoll(struct kiocb *kiocb, bool wait);
 
 #ifdef CONFIG_SWAP
 struct file;
-- 
2.19.1

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* Re: [PATCH 2/5] aio: fix failure to put the file pointer
  2018-11-19  8:07   ` Christoph Hellwig
@ 2018-11-19 15:39     ` Jens Axboe
  0 siblings, 0 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-19 15:39 UTC (permalink / raw)
  To: Christoph Hellwig; +Cc: linux-block, linux-aio, linux-fsdevel

On 11/19/18 1:07 AM, Christoph Hellwig wrote:
> On Sat, Nov 17, 2018 at 04:53:14PM -0700, Jens Axboe wrote:
>> If the ioprio capability check fails, we return without putting
>> the file pointer.
>>
>> Fixes: d9a08a9e616b ("fs: Add aio iopriority support")
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> 
> Looks good.  Please also send it to Al so that it can go into 4.20
> and -stable.
> 
> Reviewed-by: Christoph Hellwig <hch@lst.de>

Al already took it, but it's still in the series since it's not in
Linus's tree yet.

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH 4/5] aio: support for IO polling
  2018-11-19 13:32     ` Christoph Hellwig
@ 2018-11-19 16:07       ` Jens Axboe
  0 siblings, 0 replies; 17+ messages in thread
From: Jens Axboe @ 2018-11-19 16:07 UTC (permalink / raw)
  To: Christoph Hellwig; +Cc: linux-block, linux-aio, linux-fsdevel

On 11/19/18 6:32 AM, Christoph Hellwig wrote:
> 
> I just saw the patch that avoids the irq disabling show up in your
> tree this morning.  I think we can do even better by using slightly
> lazy lists that are not updated from ->ki_complete context.

I totally agree, it's just a first step. One restriction you added
is that only one task can be in io_getevents at a time for an
io_context; I didn't want to go that far. One issue with that is
that we can find events that are for other io contexts (if you
saw my patch for the stack list, that's in there too). So you have
to deal with the case where multiple tasks can be completing events for
the same io_context, regardless of whether you try to block that
behavior at io_getevents() entry time.
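
For reference, the restriction under discussion boils down to a guard
like this around the polling path (aio_poll_and_reap is a placeholder
for the actual poll/reap loop, not a real function):

/* only one task may poll a given io_context at a time */
if (test_and_set_bit(CTX_TYPE_POLLING, &ctx->io_type))
	return -EBUSY;

ret = aio_poll_and_reap(ctx, event, min_nr, nr);

clear_bit(CTX_TYPE_POLLING, &ctx->io_type);
return ret;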

> Please take a look at the patch below - this replaces patch 3 from
> my previous mail, that is it is on top of what you send to the list
> plus my first two patches.

Thanks, I'll take a look. I'll split out the fops change, as that is
indeed how I wanted to get this done ultimately as well, since it's the
only way to support more complicated setups.

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 17+ messages in thread

end of thread, other threads:[~2018-11-20  2:31 UTC | newest]

Thread overview: 17+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-11-17 23:53 [PATCHSET 0/5] Support for polled aio Jens Axboe
2018-11-17 23:53 ` [PATCH 1/5] aio: use assigned completion handler Jens Axboe
2018-11-19  8:06   ` Christoph Hellwig
2018-11-17 23:53 ` [PATCH 2/5] aio: fix failure to put the file pointer Jens Axboe
2018-11-19  8:07   ` Christoph Hellwig
2018-11-19 15:39     ` Jens Axboe
2018-11-17 23:53 ` [PATCH 3/5] aio: add iocb->ki_blk_qc field Jens Axboe
2018-11-19  1:59   ` Dave Chinner
2018-11-19  2:59     ` Jens Axboe
2018-11-17 23:53 ` [PATCH 4/5] aio: support for IO polling Jens Axboe
2018-11-19  8:11   ` Christoph Hellwig
2018-11-19 13:32     ` Christoph Hellwig
2018-11-19 16:07       ` Jens Axboe
2018-11-17 23:53 ` [PATCH 5/5] aio: add support for file based polled IO Jens Axboe
2018-11-19  1:57   ` Dave Chinner
2018-11-19  2:58     ` Jens Axboe
2018-11-19  8:12   ` Christoph Hellwig
