* [PATCHSET v2] Support for polled aio
@ 2018-11-20 17:19 Jens Axboe
  2018-11-20 17:19 ` [PATCH 1/8] fs: add file_operations ->iopoll() handler Jens Axboe
                   ` (7 more replies)
  0 siblings, 8 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel

For the grand introduction to this feature, see my original posting
here:

https://lore.kernel.org/linux-block/20181117235317.7366-1-axboe@kernel.dk/

Since last time, a number of significant changes have been made:

- Addition of the io_setup2() system call. I didn't like tracking
  internally whether an io_context was polled or not; it's much better
  to make this explicit. The system call is like io_setup(), except we
  pass in a flags argument. In the flags, the application can set
  IOCTX_FLAG_IOPOLL to ask for this context to be polled. The syscall
  is only wired up for x86-64 for now.

- Addition of fops->iopoll() to properly handle polling on files.

- Various optimizations to how we track and handle poll requests
  internally.

- Split some patches into prep patches, to reduce the size of the
  final poll support patch.

- Various cleanups, tweaks, improvements.

In terms of efficiency, I've got one device that maxes out at 845K
4k IOPS, and polled IO can reach that with just a single thread/core.
For comparison, non-polled IO on the same device reaches 600K IOPS. On
a slightly faster device, I can reach 910K IOPS on a single thread. All
of this is on a 2.2GHz server box; a faster clock rate will go faster.

In terms of how to use polling, I'd advise you to check out the basic
changes made to fio to support it. It's simple:

1) Ensure that you call io_setup2() with IOCTX_FLAG_IOPOLL, instead
   of io_queue_init() or io_setup().
2) Set iocb->u.c.flags to IOCB_FLAG_HIPRI.

That's it, everything else works like before.
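
If you want to see it without fio, here's a minimal sketch of the raw
syscall usage under the ABI in this series. Illustrative only: it
assumes x86-64 (where io_setup2() is wired up as syscall 335), uses the
uapi struct iocb from <linux/aio_abi.h> directly rather than libaio
(whose iocb->u.c.flags maps onto the kernel's aio_flags), and expects
the fd to be opened with O_DIRECT with buffer/length/offset satisfying
the usual direct IO alignment rules:

	#include <linux/aio_abi.h>
	#include <sys/syscall.h>
	#include <unistd.h>
	#include <string.h>

	#ifndef __NR_io_setup2
	#define __NR_io_setup2		335	/* x86-64 only, for now */
	#endif
	#ifndef IOCTX_FLAG_IOPOLL
	#define IOCTX_FLAG_IOPOLL	(1 << 0)
	#endif
	#ifndef IOCB_FLAG_HIPRI
	#define IOCB_FLAG_HIPRI		(1 << 2)
	#endif

	static int polled_pread(int fd, void *buf, size_t len, off_t off)
	{
		aio_context_t ctx = 0;
		struct iocb iocb, *iocbp = &iocb;
		struct io_event ev;
		int ret;

		/* polled context, instead of io_setup()/io_queue_init() */
		ret = syscall(__NR_io_setup2, 32, IOCTX_FLAG_IOPOLL, &ctx);
		if (ret < 0)
			return ret;

		memset(&iocb, 0, sizeof(iocb));
		iocb.aio_fildes = fd;
		iocb.aio_lio_opcode = IOCB_CMD_PREAD;
		iocb.aio_buf = (unsigned long) buf;
		iocb.aio_nbytes = len;
		iocb.aio_offset = off;
		iocb.aio_flags = IOCB_FLAG_HIPRI;	/* polled iocb */

		ret = syscall(__NR_io_submit, ctx, 1, &iocbp);
		if (ret == 1)
			/* completion is found by polling, not by IRQ */
			ret = syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL);

		syscall(__NR_io_destroy, ctx);
		return ret;
	}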

Patches are against my mq-perf branch, and can also be found in my
aio poll branch.

 arch/x86/entry/syscalls/syscall_64.tbl |   1 +
 fs/aio.c                               | 469 +++++++++++++++++++++----
 fs/block_dev.c                         |  25 +-
 fs/iomap.c                             |  50 ++-
 fs/xfs/xfs_file.c                      |   1 +
 include/linux/fs.h                     |   1 +
 include/linux/iomap.h                  |   1 +
 include/linux/syscalls.h               |   2 +
 include/uapi/asm-generic/unistd.h      |   4 +-
 include/uapi/linux/aio_abi.h           |   4 +
 kernel/sys_ni.c                        |   1 +
 11 files changed, 461 insertions(+), 98 deletions(-)

-- 
Jens Axboe




* [PATCH 1/8] fs: add file_operations ->iopoll() handler
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 2/8] block: wire up block device ->iopoll() Jens Axboe
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

In preparation for adding async aio iopolling.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/fs.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/include/linux/fs.h b/include/linux/fs.h
index a1ab233e6469..9109c287437a 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -1781,6 +1781,7 @@ struct file_operations {
 	ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
 	ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
 	ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
+	int (*iopoll)(struct kiocb *kiocb, bool spin);
 	int (*iterate) (struct file *, struct dir_context *);
 	int (*iterate_shared) (struct file *, struct dir_context *);
 	__poll_t (*poll) (struct file *, struct poll_table_struct *);
-- 
2.17.1



* [PATCH 2/8] block: wire up block device ->iopoll()
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
  2018-11-20 17:19 ` [PATCH 1/8] fs: add file_operations ->iopoll() handler Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll() Jens Axboe
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/block_dev.c | 25 ++++++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/fs/block_dev.c b/fs/block_dev.c
index d233a59ea364..711cd5a3469e 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -273,6 +273,7 @@ struct blkdev_dio {
 	};
 	size_t			size;
 	atomic_t		ref;
+	blk_qc_t		qc;
 	bool			multi_bio : 1;
 	bool			should_dirty : 1;
 	bool			is_sync : 1;
@@ -281,6 +282,21 @@ struct blkdev_dio {
 
 static struct bio_set blkdev_dio_pool;
 
+static int blkdev_iopoll(struct kiocb *kiocb, bool wait)
+{
+	struct blkdev_dio *dio = READ_ONCE(kiocb->private);
+
+	/* dio can be NULL here, if the IO hasn't been submitted yet */
+	if (dio) {
+		struct block_device *bdev;
+
+		bdev = I_BDEV(kiocb->ki_filp->f_mapping->host);
+		return blk_poll(bdev_get_queue(bdev), READ_ONCE(dio->qc), wait);
+	}
+
+	return 0;
+}
+
 static void blkdev_bio_end_io(struct bio *bio)
 {
 	struct blkdev_dio *dio = bio->bi_private;
@@ -335,7 +351,6 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	bool is_poll = (iocb->ki_flags & IOCB_HIPRI) != 0;
 	bool is_read = (iov_iter_rw(iter) == READ), is_sync;
 	loff_t pos = iocb->ki_pos;
-	blk_qc_t qc = BLK_QC_T_NONE;
 	int ret = 0;
 
 	if ((pos | iov_iter_alignment(iter)) &
@@ -355,6 +370,9 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	dio->size = 0;
 	dio->multi_bio = false;
 	dio->should_dirty = is_read && iter_is_iovec(iter);
+	dio->qc = BLK_QC_T_NONE;
+
+	WRITE_ONCE(iocb->private, dio);
 
 	/*
 	 * Don't plug for HIPRI/polled IO, as those should go straight
@@ -395,7 +413,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			if (iocb->ki_flags & IOCB_HIPRI)
 				bio->bi_opf |= REQ_HIPRI;
 
-			qc = submit_bio(bio);
+			dio->qc = submit_bio(bio);
 			break;
 		}
 
@@ -423,7 +441,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 			break;
 
 		if (!(iocb->ki_flags & IOCB_HIPRI) ||
-		    !blk_poll(bdev_get_queue(bdev), qc, true))
+		    !blk_poll(bdev_get_queue(bdev), dio->qc, true))
 			io_schedule();
 	}
 	__set_current_state(TASK_RUNNING);
@@ -2061,6 +2079,7 @@ const struct file_operations def_blk_fops = {
 	.llseek		= block_llseek,
 	.read_iter	= blkdev_read_iter,
 	.write_iter	= blkdev_write_iter,
+	.iopoll		= blkdev_iopoll,
 	.mmap		= generic_file_mmap,
 	.fsync		= blkdev_fsync,
 	.unlocked_ioctl	= block_ioctl,
-- 
2.17.1



* [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll()
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
  2018-11-20 17:19 ` [PATCH 1/8] fs: add file_operations ->iopoll() handler Jens Axboe
  2018-11-20 17:19 ` [PATCH 2/8] block: wire up block device ->iopoll() Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-21  9:15   ` Benny Halevy
  2018-11-20 17:19 ` [PATCH 4/8] aio: use assigned completion handler Jens Axboe
                   ` (4 subsequent siblings)
  7 siblings, 1 reply; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Add an iomap implementation of fops->iopoll() and wire it up for
XFS.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/iomap.c            | 50 +++++++++++++++++++++++++++++--------------
 fs/xfs/xfs_file.c     |  1 +
 include/linux/iomap.h |  1 +
 3 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/fs/iomap.c b/fs/iomap.c
index 74c1f37f0fd6..faf96198f99a 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -1419,14 +1419,14 @@ struct iomap_dio {
 	unsigned		flags;
 	int			error;
 	bool			wait_for_completion;
+	blk_qc_t		cookie;
+	struct request_queue	*last_queue;
 
 	union {
 		/* used during submission and for synchronous completion: */
 		struct {
 			struct iov_iter		*iter;
 			struct task_struct	*waiter;
-			struct request_queue	*last_queue;
-			blk_qc_t		cookie;
 		} submit;
 
 		/* used for aio completion: */
@@ -1436,6 +1436,30 @@ struct iomap_dio {
 	};
 };
 
+int iomap_dio_iopoll(struct kiocb *kiocb, bool spin)
+{
+	struct iomap_dio *dio = kiocb->private;
+	struct request_queue *q = READ_ONCE(dio->last_queue);
+
+	if (!q || dio->cookie == BLK_QC_T_NONE)
+		return 0;
+	return blk_poll(q, READ_ONCE(dio->cookie), spin);
+}
+EXPORT_SYMBOL_GPL(iomap_dio_iopoll);
+
+static void iomap_dio_submit_bio(struct iomap_dio *dio, struct iomap *iomap,
+		struct bio *bio)
+{
+	atomic_inc(&dio->ref);
+
+	/*
+	 * iomap_dio_iopoll can race with us.  A non-zero last_queue marks that
+	 * we are ready to poll.
+	 */
+	WRITE_ONCE(dio->cookie, submit_bio(bio));
+	WRITE_ONCE(dio->last_queue, bdev_get_queue(iomap->bdev));
+}
+
 static ssize_t iomap_dio_complete(struct iomap_dio *dio)
 {
 	struct kiocb *iocb = dio->iocb;
@@ -1548,7 +1572,7 @@ static void iomap_dio_bio_end_io(struct bio *bio)
 	}
 }
 
-static blk_qc_t
+static void
 iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 		unsigned len)
 {
@@ -1568,9 +1592,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
 	get_page(page);
 	__bio_add_page(bio, page, len, 0);
 	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
-
-	atomic_inc(&dio->ref);
-	return submit_bio(bio);
+	iomap_dio_submit_bio(dio, iomap, bio);
 }
 
 static loff_t
@@ -1676,11 +1698,7 @@ iomap_dio_bio_actor(struct inode *inode, loff_t pos, loff_t length,
 		copied += n;
 
 		nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
-
-		atomic_inc(&dio->ref);
-
-		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
-		dio->submit.cookie = submit_bio(bio);
+		iomap_dio_submit_bio(dio, iomap, bio);
 	} while (nr_pages);
 
 	if (need_zeroout) {
@@ -1782,6 +1800,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio = kmalloc(sizeof(*dio), GFP_KERNEL);
 	if (!dio)
 		return -ENOMEM;
+	iocb->private = dio;
 
 	dio->iocb = iocb;
 	atomic_set(&dio->ref, 1);
@@ -1791,11 +1810,11 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio->error = 0;
 	dio->flags = 0;
 	dio->wait_for_completion = is_sync_kiocb(iocb);
+	dio->cookie = BLK_QC_T_NONE;
+	dio->last_queue = NULL;
 
 	dio->submit.iter = iter;
 	dio->submit.waiter = current;
-	dio->submit.cookie = BLK_QC_T_NONE;
-	dio->submit.last_queue = NULL;
 
 	if (iov_iter_rw(iter) == READ) {
 		if (pos >= dio->i_size)
@@ -1894,9 +1913,8 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 				break;
 
 			if (!(iocb->ki_flags & IOCB_HIPRI) ||
-			    !dio->submit.last_queue ||
-			    !blk_poll(dio->submit.last_queue,
-					 dio->submit.cookie, true))
+			    !dio->last_queue ||
+			    !blk_poll(dio->last_queue, dio->cookie, true))
 				io_schedule();
 		}
 		__set_current_state(TASK_RUNNING);
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index 53c9ab8fb777..603e705781a4 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -1203,6 +1203,7 @@ const struct file_operations xfs_file_operations = {
 	.write_iter	= xfs_file_write_iter,
 	.splice_read	= generic_file_splice_read,
 	.splice_write	= iter_file_splice_write,
+	.iopoll		= iomap_dio_iopoll,
 	.unlocked_ioctl	= xfs_file_ioctl,
 #ifdef CONFIG_COMPAT
 	.compat_ioctl	= xfs_file_compat_ioctl,
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index 9a4258154b25..0fefb5455bda 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -162,6 +162,7 @@ typedef int (iomap_dio_end_io_t)(struct kiocb *iocb, ssize_t ret,
 		unsigned flags);
 ssize_t iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		const struct iomap_ops *ops, iomap_dio_end_io_t end_io);
+int iomap_dio_iopoll(struct kiocb *kiocb, bool spin);
 
 #ifdef CONFIG_SWAP
 struct file;
-- 
2.17.1



* [PATCH 4/8] aio: use assigned completion handler
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
                   ` (2 preceding siblings ...)
  2018-11-20 17:19 ` [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll() Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 5/8] aio: fix failure to put the file pointer Jens Axboe
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

We know this is a read/write request, but in preparation for
having different kinds of those, ensure that we call the assigned
handler instead of assuming it's aio_complete_rw().

Reviewed-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fs/aio.c b/fs/aio.c
index b984918be4b7..574a14b43037 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1484,7 +1484,7 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 		ret = -EINTR;
 		/*FALLTHRU*/
 	default:
-		aio_complete_rw(req, ret, 0);
+		req->ki_complete(req, ret, 0);
 	}
 }
 
-- 
2.17.1



* [PATCH 5/8] aio: fix failure to put the file pointer
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
                   ` (3 preceding siblings ...)
  2018-11-20 17:19 ` [PATCH 4/8] aio: use assigned completion handler Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 6/8] aio: add io_setup2() system call Jens Axboe
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

If the ioprio capability check fails, we return without putting
the file pointer.

Fixes: d9a08a9e616b ("fs: Add aio iopriority support")
Reviewed-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fs/aio.c b/fs/aio.c
index 574a14b43037..611e9824279d 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1436,6 +1436,7 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
 		ret = ioprio_check_cap(iocb->aio_reqprio);
 		if (ret) {
 			pr_debug("aio ioprio check cap error: %d\n", ret);
+			fput(req->ki_filp);
 			return ret;
 		}
 
-- 
2.17.1



* [PATCH 6/8] aio: add io_setup2() system call
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
                   ` (4 preceding siblings ...)
  2018-11-20 17:19 ` [PATCH 5/8] aio: fix failure to put the file pointer Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 7/8] aio: separate out ring reservation from req allocation Jens Axboe
  2018-11-20 17:19 ` [PATCH 8/8] aio: support for IO polling Jens Axboe
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

This is just like io_setup(), except we get to pass in flags for what
kind of behavior we want.
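
There's no libaio wrapper for it yet, so a caller would go through
syscall(2) directly. A minimal sketch, assuming __NR_io_setup2 is
available (335 on x86-64 per this patch); note that at this point in
the series flags must still be zero, the IOPOLL flag only gets accepted
in the final patch:

	aio_context_t ctx = 0;

	/* io_setup2(nr_events, flags, ctxp) */
	if (syscall(__NR_io_setup2, 128, 0, &ctx) < 0)
		perror("io_setup2");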

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 arch/x86/entry/syscalls/syscall_64.tbl |  1 +
 fs/aio.c                               | 67 +++++++++++++++++---------
 include/linux/syscalls.h               |  2 +
 include/uapi/asm-generic/unistd.h      |  4 +-
 kernel/sys_ni.c                        |  1 +
 5 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/arch/x86/entry/syscalls/syscall_64.tbl b/arch/x86/entry/syscalls/syscall_64.tbl
index f0b1709a5ffb..67c357225fb0 100644
--- a/arch/x86/entry/syscalls/syscall_64.tbl
+++ b/arch/x86/entry/syscalls/syscall_64.tbl
@@ -343,6 +343,7 @@
 332	common	statx			__x64_sys_statx
 333	common	io_pgetevents		__x64_sys_io_pgetevents
 334	common	rseq			__x64_sys_rseq
+335	common	io_setup2		__x64_sys_io_setup2
 
 #
 # x32-specific system call numbers start at 512 to avoid cache impact
diff --git a/fs/aio.c b/fs/aio.c
index 611e9824279d..8453bb849c32 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -683,7 +683,7 @@ static void aio_nr_sub(unsigned nr)
 /* ioctx_alloc
  *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  */
-static struct kioctx *ioctx_alloc(unsigned nr_events)
+static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
 {
 	struct mm_struct *mm = current->mm;
 	struct kioctx *ctx;
@@ -1269,6 +1269,44 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret;
 }
 
+static struct kioctx *io_setup_flags(unsigned long ctx, unsigned int nr_events,
+				     unsigned int flags)
+{
+	if (unlikely(ctx || nr_events == 0)) {
+		pr_debug("EINVAL: ctx %lu nr_events %u\n",
+		         ctx, nr_events);
+		return ERR_PTR(-EINVAL);
+	}
+
+	return ioctx_alloc(nr_events, flags);
+}
+
+SYSCALL_DEFINE3(io_setup2, u32, nr_events, u32, flags,
+		aio_context_t __user *, ctxp)
+{
+	struct kioctx *ioctx;
+	unsigned long ctx;
+	long ret;
+
+	if (flags)
+		return -EINVAL;
+
+	ret = get_user(ctx, ctxp);
+	if (unlikely(ret))
+		goto out;
+
+	ioctx = io_setup_flags(ctx, nr_events, flags);
+	ret = PTR_ERR(ioctx);
+	if (!IS_ERR(ioctx)) {
+		ret = put_user(ioctx->user_id, ctxp);
+		if (ret)
+			kill_ioctx(current->mm, ioctx, NULL);
+		percpu_ref_put(&ioctx->users);
+	}
+out:
+	return ret;
+}
+
 /* sys_io_setup:
  *	Create an aio_context capable of receiving at least nr_events.
  *	ctxp must not point to an aio_context that already exists, and
@@ -1284,7 +1322,7 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
  */
 SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 {
-	struct kioctx *ioctx = NULL;
+	struct kioctx *ioctx;
 	unsigned long ctx;
 	long ret;
 
@@ -1292,14 +1330,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 	if (unlikely(ret))
 		goto out;
 
-	ret = -EINVAL;
-	if (unlikely(ctx || nr_events == 0)) {
-		pr_debug("EINVAL: ctx %lu nr_events %u\n",
-		         ctx, nr_events);
-		goto out;
-	}
-
-	ioctx = ioctx_alloc(nr_events);
+	ioctx = io_setup_flags(ctx, nr_events, 0);
 	ret = PTR_ERR(ioctx);
 	if (!IS_ERR(ioctx)) {
 		ret = put_user(ioctx->user_id, ctxp);
@@ -1307,7 +1338,6 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 			kill_ioctx(current->mm, ioctx, NULL);
 		percpu_ref_put(&ioctx->users);
 	}
-
 out:
 	return ret;
 }
@@ -1315,7 +1345,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 #ifdef CONFIG_COMPAT
 COMPAT_SYSCALL_DEFINE2(io_setup, unsigned, nr_events, u32 __user *, ctx32p)
 {
-	struct kioctx *ioctx = NULL;
+	struct kioctx *ioctx;
 	unsigned long ctx;
 	long ret;
 
@@ -1323,23 +1353,14 @@ COMPAT_SYSCALL_DEFINE2(io_setup, unsigned, nr_events, u32 __user *, ctx32p)
 	if (unlikely(ret))
 		goto out;
 
-	ret = -EINVAL;
-	if (unlikely(ctx || nr_events == 0)) {
-		pr_debug("EINVAL: ctx %lu nr_events %u\n",
-		         ctx, nr_events);
-		goto out;
-	}
-
-	ioctx = ioctx_alloc(nr_events);
+	ioctx = io_setup_flags(ctx, nr_events, 0);
 	ret = PTR_ERR(ioctx);
 	if (!IS_ERR(ioctx)) {
-		/* truncating is ok because it's a user address */
-		ret = put_user((u32)ioctx->user_id, ctx32p);
+		ret = put_user(ioctx->user_id, ctx32p);
 		if (ret)
 			kill_ioctx(current->mm, ioctx, NULL);
 		percpu_ref_put(&ioctx->users);
 	}
-
 out:
 	return ret;
 }
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index 2ac3d13a915b..455fabc5b0ac 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -287,6 +287,8 @@ static inline void addr_limit_user_check(void)
  */
 #ifndef CONFIG_ARCH_HAS_SYSCALL_WRAPPER
 asmlinkage long sys_io_setup(unsigned nr_reqs, aio_context_t __user *ctx);
+asmlinkage long sys_io_setup2(unsigned nr_reqs, unsigned flags,
+				aio_context_t __user *ctx);
 asmlinkage long sys_io_destroy(aio_context_t ctx);
 asmlinkage long sys_io_submit(aio_context_t, long,
 			struct iocb __user * __user *);
diff --git a/include/uapi/asm-generic/unistd.h b/include/uapi/asm-generic/unistd.h
index 538546edbfbd..b4527ed373b0 100644
--- a/include/uapi/asm-generic/unistd.h
+++ b/include/uapi/asm-generic/unistd.h
@@ -738,9 +738,11 @@ __SYSCALL(__NR_statx,     sys_statx)
 __SC_COMP(__NR_io_pgetevents, sys_io_pgetevents, compat_sys_io_pgetevents)
 #define __NR_rseq 293
 __SYSCALL(__NR_rseq, sys_rseq)
+#define __NR_io_setup2 294
+__SYSCALL(__NR_io_setup2, sys_io_setup2)
 
 #undef __NR_syscalls
-#define __NR_syscalls 294
+#define __NR_syscalls 295
 
 /*
  * 32 bit systems traditionally used different
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index df556175be50..17c8b4393669 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -37,6 +37,7 @@ asmlinkage long sys_ni_syscall(void)
  */
 
 COND_SYSCALL(io_setup);
+COND_SYSCALL(io_setup2);
 COND_SYSCALL_COMPAT(io_setup);
 COND_SYSCALL(io_destroy);
 COND_SYSCALL(io_submit);
-- 
2.17.1



* [PATCH 7/8] aio: separate out ring reservation from req allocation
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
                   ` (5 preceding siblings ...)
  2018-11-20 17:19 ` [PATCH 6/8] aio: add io_setup2() system call Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-20 17:19 ` [PATCH 8/8] aio: support for IO polling Jens Axboe
  7 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

This is in preparation for certain types of IO not needing a ring
reservation.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c | 30 +++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 8453bb849c32..8bbb0b77d9c4 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -901,7 +901,7 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr)
 	local_irq_restore(flags);
 }
 
-static bool get_reqs_available(struct kioctx *ctx)
+static bool __get_reqs_available(struct kioctx *ctx)
 {
 	struct kioctx_cpu *kcpu;
 	bool ret = false;
@@ -993,6 +993,14 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 	spin_unlock_irq(&ctx->completion_lock);
 }
 
+static bool get_reqs_available(struct kioctx *ctx)
+{
+	if (__get_reqs_available(ctx))
+		return true;
+	user_refill_reqs_available(ctx);
+	return __get_reqs_available(ctx);
+}
+
 /* aio_get_req
  *	Allocate a slot for an aio request.
  * Returns NULL if no requests are free.
@@ -1001,24 +1009,15 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct aio_kiocb *req;
 
-	if (!get_reqs_available(ctx)) {
-		user_refill_reqs_available(ctx);
-		if (!get_reqs_available(ctx))
-			return NULL;
-	}
-
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
-		goto out_put;
+		return NULL;
 
 	percpu_ref_get(&ctx->reqs);
 	INIT_LIST_HEAD(&req->ki_list);
 	refcount_set(&req->ki_refcnt, 0);
 	req->ki_ctx = ctx;
 	return req;
-out_put:
-	put_reqs_available(ctx, 1);
-	return NULL;
 }
 
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
@@ -1821,9 +1820,13 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		return -EINVAL;
 	}
 
+	if (!get_reqs_available(ctx))
+		return -EAGAIN;
+
+	ret = -EAGAIN;
 	req = aio_get_req(ctx);
 	if (unlikely(!req))
-		return -EAGAIN;
+		goto out_put_reqs_available;
 
 	if (iocb.aio_flags & IOCB_FLAG_RESFD) {
 		/*
@@ -1886,11 +1889,12 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		goto out_put_req;
 	return 0;
 out_put_req:
-	put_reqs_available(ctx, 1);
 	percpu_ref_put(&ctx->reqs);
 	if (req->ki_eventfd)
 		eventfd_ctx_put(req->ki_eventfd);
 	kmem_cache_free(kiocb_cachep, req);
+out_put_reqs_available:
+	put_reqs_available(ctx, 1);
 	return ret;
 }
 
-- 
2.17.1



* [PATCH 8/8] aio: support for IO polling
  2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
                   ` (6 preceding siblings ...)
  2018-11-20 17:19 ` [PATCH 7/8] aio: separate out ring reservation from req allocation Jens Axboe
@ 2018-11-20 17:19 ` Jens Axboe
  2018-11-21 11:12   ` Benny Halevy
  2018-11-22 11:13   ` Jan Kara
  7 siblings, 2 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-20 17:19 UTC (permalink / raw)
  To: linux-block, linux-aio, linux-fsdevel; +Cc: Jens Axboe

Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
like their non-polled counterparts, except we expect to poll for
completion of them. The polling happens at io_getevents() time, and
works just like non-polled IO.

To setup an io_context for polled IO, the application must call
io_setup2() with IOCTX_FLAG_IOPOLL as one of the flags. It is illegal
to mix and match polled and non-polled IO on an io_context.

Polled IO doesn't support the user mapped completion ring. Events
must be reaped through the io_getevents() system call. For non-irq
driven poll devices, there's no way to support completion reaping
from userspace by just looking at the ring. The application itself
is the one that pulls completion entries.
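
For illustration (not part of the patch itself), this is roughly what
the reaping side looks like from an application; 'ctx' is assumed to be
a context created with IOCTX_FLAG_IOPOLL and 'inflight' the number of
polled iocbs submitted on it:

	struct io_event events[8];
	int done = 0;

	while (done < inflight) {
		int ret;

		/*
		 * No mapped ring to peek at: io_getevents() is what drives
		 * ->iopoll() on the inflight requests until min_nr events
		 * have been found. Only one task may poll a given context
		 * at a time, a second caller gets -EBUSY.
		 */
		ret = syscall(__NR_io_getevents, ctx, 1, 8, events, NULL);
		if (ret < 0)
			break;
		done += ret;
	}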

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/aio.c                     | 377 ++++++++++++++++++++++++++++++-----
 include/uapi/linux/aio_abi.h |   4 +
 2 files changed, 336 insertions(+), 45 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 8bbb0b77d9c4..ea93847d25d1 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -94,6 +94,8 @@ struct kioctx {
 
 	unsigned long		user_id;
 
+	unsigned int		flags;
+
 	struct __percpu kioctx_cpu *cpu;
 
 	/*
@@ -138,6 +140,19 @@ struct kioctx {
 		atomic_t	reqs_available;
 	} ____cacheline_aligned_in_smp;
 
+	/* iopoll submission state */
+	struct {
+		spinlock_t poll_lock;
+		struct list_head poll_submitted;
+	} ____cacheline_aligned_in_smp;
+
+	/* iopoll completion state */
+	struct {
+		struct list_head poll_completing;
+		unsigned long getevents_busy;
+		atomic_t poll_completed;
+	} ____cacheline_aligned_in_smp;
+
 	struct {
 		spinlock_t	ctx_lock;
 		struct list_head active_reqs;	/* used for cancellation */
@@ -191,13 +206,24 @@ struct aio_kiocb {
 
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
+
+	unsigned long		ki_flags;
+#define IOCB_POLL_COMPLETED	0
+
 	refcount_t		ki_refcnt;
 
-	/*
-	 * If the aio_resfd field of the userspace iocb is not zero,
-	 * this is the underlying eventfd context to deliver events to.
-	 */
-	struct eventfd_ctx	*ki_eventfd;
+	union {
+		/*
+		 * If the aio_resfd field of the userspace iocb is not zero,
+		 * this is the underlying eventfd context to deliver events to.
+		 */
+		struct eventfd_ctx	*ki_eventfd;
+
+		/*
+		 * For polled IO, stash completion info here
+		 */
+		struct io_event		ki_ev;
+	};
 };
 
 /*------ sysctl variables----*/
@@ -214,6 +240,8 @@ static struct vfsmount *aio_mnt;
 static const struct file_operations aio_ring_fops;
 static const struct address_space_operations aio_ctx_aops;
 
+static void aio_iopoll_reap_events(struct kioctx *);
+
 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
 {
 	struct file *file;
@@ -451,11 +479,15 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 	int i;
 	struct file *file;
 
-	/* Compensate for the ring buffer's head/tail overlap entry */
-	nr_events += 2;	/* 1 is required, 2 for good luck */
-
+	/*
+	 * Compensate for the ring buffer's head/tail overlap entry.
+	 * IO polling doesn't require any io event entries
+	 */
 	size = sizeof(struct aio_ring);
-	size += sizeof(struct io_event) * nr_events;
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
+		nr_events += 2;	/* 1 is required, 2 for good luck */
+		size += sizeof(struct io_event) * nr_events;
+	}
 
 	nr_pages = PFN_UP(size);
 	if (nr_pages < 0)
@@ -720,6 +752,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
 	if (!ctx)
 		return ERR_PTR(-ENOMEM);
 
+	ctx->flags = flags;
 	ctx->max_reqs = max_reqs;
 
 	spin_lock_init(&ctx->ctx_lock);
@@ -732,6 +765,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
+	spin_lock_init(&ctx->poll_lock);
+	INIT_LIST_HEAD(&ctx->poll_submitted);
+	INIT_LIST_HEAD(&ctx->poll_completing);
+	atomic_set(&ctx->poll_completed, 0);
+
 	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
 		goto err;
 
@@ -814,6 +852,8 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
 	RCU_INIT_POINTER(table->table[ctx->id], NULL);
 	spin_unlock(&mm->ioctx_lock);
 
+	aio_iopoll_reap_events(ctx);
+
 	/* free_ioctx_reqs() will do the necessary RCU synchronization */
 	wake_up_all(&ctx->wait);
 
@@ -1056,6 +1096,24 @@ static inline void iocb_put(struct aio_kiocb *iocb)
 	}
 }
 
+static void iocb_put_many(struct kioctx *ctx, void **iocbs, int *nr)
+{
+	if (nr) {
+		kmem_cache_free_bulk(kiocb_cachep, *nr, iocbs);
+		percpu_ref_put_many(&ctx->reqs, *nr);
+		*nr = 0;
+	}
+}
+
+static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
+			   long res, long res2)
+{
+	ev->obj = (u64)(unsigned long)iocb->ki_user_iocb;
+	ev->data = iocb->ki_user_data;
+	ev->res = res;
+	ev->res2 = res2;
+}
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1083,10 +1141,7 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
 	event = ev_page + pos % AIO_EVENTS_PER_PAGE;
 
-	event->obj = (u64)(unsigned long)iocb->ki_user_iocb;
-	event->data = iocb->ki_user_data;
-	event->res = res;
-	event->res2 = res2;
+	aio_fill_event(event, iocb, res, res2);
 
 	kunmap_atomic(ev_page);
 	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
@@ -1239,6 +1294,165 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
 	return ret < 0 || *i >= min_nr;
 }
 
+#define AIO_POLL_STACK	8
+
+/*
+ * Process completed iocb iopoll entries, copying the result to userspace.
+ */
+static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
+			    unsigned int *nr_events, long max)
+{
+	void *iocbs[AIO_POLL_STACK];
+	struct aio_kiocb *iocb, *n;
+	int to_free = 0, ret = 0;
+
+	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
+		if (!test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
+			continue;
+		if (to_free == AIO_POLL_STACK)
+			iocb_put_many(ctx, iocbs, &to_free);
+
+		list_del(&iocb->ki_list);
+		iocbs[to_free++] = iocb;
+
+		fput(iocb->rw.ki_filp);
+
+		if (!evs) {
+			(*nr_events)++;
+			continue;
+		}
+
+		if (copy_to_user(evs + *nr_events, &iocb->ki_ev,
+		    sizeof(iocb->ki_ev))) {
+			ret = -EFAULT;
+			break;
+		}
+		if (++(*nr_events) == max)
+			break;
+	}
+
+	if (to_free)
+		iocb_put_many(ctx, iocbs, &to_free);
+
+	return ret;
+}
+
+static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
+			      unsigned int *nr_events, long min, long max)
+{
+	struct aio_kiocb *iocb;
+	unsigned int poll_completed;
+	int to_poll, polled, ret;
+
+	/*
+	 * Check if we already have done events that satisfy what we need
+	 */
+	if (!list_empty(&ctx->poll_completing)) {
+		ret = aio_iopoll_reap(ctx, event, nr_events, max);
+		if (ret < 0)
+			return ret;
+		if (*nr_events >= min)
+			return 0;
+	}
+
+	/*
+	 * Take in a new working set from the submitted list if possible.
+	 */
+	if (!list_empty_careful(&ctx->poll_submitted)) {
+		spin_lock(&ctx->poll_lock);
+		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
+		spin_unlock(&ctx->poll_lock);
+	}
+
+	if (list_empty(&ctx->poll_completing))
+		return 0;
+
+	/*
+	 * Check again now that we have a new batch.
+	 */
+	ret = aio_iopoll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	if (*nr_events >= min)
+		return 0;
+
+	/*
+	 * Find up to 'max_nr' worth of events to poll for, including the
+	 * events we already successfully polled
+	 */
+	polled = to_poll = 0;
+	poll_completed = atomic_read(&ctx->poll_completed);
+	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
+		/*
+		 * Poll for needed events with wait == true, anything after
+		 * that we just check if we have more, up to max.
+		 */
+		bool wait = polled + *nr_events >= min;
+		struct kiocb *kiocb = &iocb->rw;
+
+		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
+			break;
+		if (++to_poll + *nr_events >= max)
+			break;
+
+		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
+		if (polled + *nr_events >= max)
+			break;
+		if (poll_completed != atomic_read(&ctx->poll_completed))
+			break;
+	}
+
+	ret = aio_iopoll_reap(ctx, event, nr_events, max);
+	if (ret < 0)
+		return ret;
+	if (*nr_events >= min)
+		return 0;
+	return to_poll;
+}
+
+/*
+ * We can't just wait for polled events to come to us, we have to actively
+ * find and complete them.
+ */
+static void aio_iopoll_reap_events(struct kioctx *ctx)
+{
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		return;
+
+	while (!list_empty_careful(&ctx->poll_submitted) ||
+	       !list_empty(&ctx->poll_completing)) {
+		unsigned int nr_events = 0;
+
+		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
+	}
+}
+
+static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
+			    struct io_event __user *event)
+{
+	unsigned int nr_events = 0;
+	int ret = 0;
+
+	/* * Only allow one thread polling at a time */
+	if (test_and_set_bit(0, &ctx->getevents_busy))
+		return -EBUSY;
+
+	while (!nr_events || !need_resched()) {
+		int tmin = 0;
+
+		if (nr_events < min_nr)
+			tmin = min_nr - nr_events;
+
+		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
+		if (ret <= 0)
+			break;
+		ret = 0;
+	}
+
+	clear_bit(0, &ctx->getevents_busy);
+	return nr_events ? nr_events : ret;
+}
+
 static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
@@ -1287,7 +1501,7 @@ SYSCALL_DEFINE3(io_setup2, u32, nr_events, u32, flags,
 	unsigned long ctx;
 	long ret;
 
-	if (flags)
+	if (flags & ~IOCTX_FLAG_IOPOLL)
 		return -EINVAL;
 
 	ret = get_user(ctx, ctxp);
@@ -1411,13 +1625,8 @@ static void aio_remove_iocb(struct aio_kiocb *iocb)
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 }
 
-static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+static void kiocb_end_write(struct kiocb *kiocb)
 {
-	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
-
-	if (!list_empty_careful(&iocb->ki_list))
-		aio_remove_iocb(iocb);
-
 	if (kiocb->ki_flags & IOCB_WRITE) {
 		struct inode *inode = file_inode(kiocb->ki_filp);
 
@@ -1429,19 +1638,42 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
 		file_end_write(kiocb->ki_filp);
 	}
+}
+
+static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
+{
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+
+	if (!list_empty_careful(&iocb->ki_list))
+		aio_remove_iocb(iocb);
+
+	kiocb_end_write(kiocb);
 
 	fput(kiocb->ki_filp);
 	aio_complete(iocb, res, res2);
 }
 
-static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
+static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
 {
+	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+	struct kioctx *ctx = iocb->ki_ctx;
+
+	kiocb_end_write(kiocb);
+
+	aio_fill_event(&iocb->ki_ev, iocb, res, res2);
+	set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
+	atomic_inc(&ctx->poll_completed);
+}
+
+static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
+{
+	struct kioctx *ctx = kiocb->ki_ctx;
+	struct kiocb *req = &kiocb->rw;
 	int ret;
 
 	req->ki_filp = fget(iocb->aio_fildes);
 	if (unlikely(!req->ki_filp))
 		return -EBADF;
-	req->ki_complete = aio_complete_rw;
 	req->ki_pos = iocb->aio_offset;
 	req->ki_flags = iocb_flags(req->ki_filp);
 	if (iocb->aio_flags & IOCB_FLAG_RESFD)
@@ -1456,8 +1688,7 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
 		ret = ioprio_check_cap(iocb->aio_reqprio);
 		if (ret) {
 			pr_debug("aio ioprio check cap error: %d\n", ret);
-			fput(req->ki_filp);
-			return ret;
+			goto out_fput;
 		}
 
 		req->ki_ioprio = iocb->aio_reqprio;
@@ -1466,7 +1697,41 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
 
 	ret = kiocb_set_rw_flags(req, iocb->aio_rw_flags);
 	if (unlikely(ret))
-		fput(req->ki_filp);
+		goto out_fput;
+
+	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
+		/* shares space in the union, and is rather pointless.. */
+		ret = -EINVAL;
+		if (iocb->aio_flags & IOCB_FLAG_RESFD)
+			goto out_fput;
+
+		/* can't submit polled IO to a non-polled ctx */
+		if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+			goto out_fput;
+
+		ret = -EOPNOTSUPP;
+		if (!(req->ki_flags & IOCB_DIRECT) ||
+		    !req->ki_filp->f_op->iopoll)
+			goto out_fput;
+
+		req->ki_flags |= IOCB_HIPRI;
+		req->ki_complete = aio_complete_rw_poll;
+
+		spin_lock(&ctx->poll_lock);
+		list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
+		spin_unlock(&ctx->poll_lock);
+	} else {
+		/* can't submit non-polled IO to a polled ctx */
+		ret = -EINVAL;
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			goto out_fput;
+
+		req->ki_complete = aio_complete_rw;
+	}
+
+	return 0;
+out_fput:
+	fput(req->ki_filp);
 	return ret;
 }
 
@@ -1509,15 +1774,16 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 	}
 }
 
-static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
-		bool compat)
+static ssize_t aio_read(struct aio_kiocb *kiocb, struct iocb *iocb,
+			bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1542,15 +1808,16 @@ static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
 	return ret;
 }
 
-static ssize_t aio_write(struct kiocb *req, struct iocb *iocb, bool vectored,
-		bool compat)
+static ssize_t aio_write(struct aio_kiocb *kiocb, struct iocb *iocb,
+			 bool vectored, bool compat)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+	struct kiocb *req = &kiocb->rw;
 	struct iov_iter iter;
 	struct file *file;
 	ssize_t ret;
 
-	ret = aio_prep_rw(req, iocb);
+	ret = aio_prep_rw(kiocb, iocb);
 	if (ret)
 		return ret;
 	file = req->ki_filp;
@@ -1820,7 +2087,8 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		return -EINVAL;
 	}
 
-	if (!get_reqs_available(ctx))
+	/* Poll IO doesn't need ring reservations */
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
 		return -EAGAIN;
 
 	ret = -EAGAIN;
@@ -1843,35 +2111,45 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		}
 	}
 
-	ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
-	if (unlikely(ret)) {
-		pr_debug("EFAULT: aio_key\n");
-		goto out_put_req;
+	/* polled IO isn't cancelable, don't bother copying the key */
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
+		ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
+		if (unlikely(ret)) {
+			pr_debug("EFAULT: aio_key\n");
+			goto out_put_req;
+		}
 	}
 
 	req->ki_user_iocb = user_iocb;
 	req->ki_user_data = iocb.aio_data;
 
+	ret = -EINVAL;
 	switch (iocb.aio_lio_opcode) {
 	case IOCB_CMD_PREAD:
-		ret = aio_read(&req->rw, &iocb, false, compat);
+		ret = aio_read(req, &iocb, false, compat);
 		break;
 	case IOCB_CMD_PWRITE:
-		ret = aio_write(&req->rw, &iocb, false, compat);
+		ret = aio_write(req, &iocb, false, compat);
 		break;
 	case IOCB_CMD_PREADV:
-		ret = aio_read(&req->rw, &iocb, true, compat);
+		ret = aio_read(req, &iocb, true, compat);
 		break;
 	case IOCB_CMD_PWRITEV:
-		ret = aio_write(&req->rw, &iocb, true, compat);
+		ret = aio_write(req, &iocb, true, compat);
 		break;
 	case IOCB_CMD_FSYNC:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_fsync(&req->fsync, &iocb, false);
 		break;
 	case IOCB_CMD_FDSYNC:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_fsync(&req->fsync, &iocb, true);
 		break;
 	case IOCB_CMD_POLL:
+		if (ctx->flags & IOCTX_FLAG_IOPOLL)
+			break;
 		ret = aio_poll(req, &iocb);
 		break;
 	default:
@@ -1894,7 +2172,8 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		eventfd_ctx_put(req->ki_eventfd);
 	kmem_cache_free(kiocb_cachep, req);
 out_put_reqs_available:
-	put_reqs_available(ctx, 1);
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		put_reqs_available(ctx, 1);
 	return ret;
 }
 
@@ -1930,7 +2209,9 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 	if (nr > ctx->nr_events)
 		nr = ctx->nr_events;
 
-	blk_start_plug(&plug);
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		blk_start_plug(&plug);
+
 	for (i = 0; i < nr; i++) {
 		struct iocb __user *user_iocb;
 
@@ -1943,7 +2224,9 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 		if (ret)
 			break;
 	}
-	blk_finish_plug(&plug);
+
+	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+		blk_finish_plug(&plug);
 
 	percpu_ref_put(&ctx->users);
 	return i ? i : ret;
@@ -2068,8 +2351,12 @@ static long do_io_getevents(aio_context_t ctx_id,
 	long ret = -EINVAL;
 
 	if (likely(ioctx)) {
-		if (likely(min_nr <= nr && min_nr >= 0))
-			ret = read_events(ioctx, min_nr, nr, events, until);
+		if (likely(min_nr <= nr && min_nr >= 0)) {
+			if (ioctx->flags & IOCTX_FLAG_IOPOLL)
+				ret = aio_iopoll_check(ioctx, min_nr, nr, events);
+			else
+				ret = read_events(ioctx, min_nr, nr, events, until);
+		}
 		percpu_ref_put(&ioctx->users);
 	}
 
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index 8387e0af0f76..3b98b5fbacde 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -52,9 +52,11 @@ enum {
  *                   is valid.
  * IOCB_FLAG_IOPRIO - Set if the "aio_reqprio" member of the "struct iocb"
  *                    is valid.
+ * IOCB_FLAG_HIPRI - Use IO completion polling
  */
 #define IOCB_FLAG_RESFD		(1 << 0)
 #define IOCB_FLAG_IOPRIO	(1 << 1)
+#define IOCB_FLAG_HIPRI		(1 << 2)
 
 /* read() from /dev/aio returns these structures. */
 struct io_event {
@@ -106,6 +108,8 @@ struct iocb {
 	__u32	aio_resfd;
 }; /* 64 bytes */
 
+#define IOCTX_FLAG_IOPOLL	(1 << 0)
+
 #undef IFBIG
 #undef IFLITTLE
 
-- 
2.17.1



* Re: [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll()
  2018-11-20 17:19 ` [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll() Jens Axboe
@ 2018-11-21  9:15   ` Benny Halevy
  2018-11-21 13:27     ` Jens Axboe
  0 siblings, 1 reply; 16+ messages in thread
From: Benny Halevy @ 2018-11-21  9:15 UTC (permalink / raw)
  To: Jens Axboe, linux-block, linux-aio, linux-fsdevel

On Tue, 2018-11-20 at 10:19 -0700, Jens Axboe wrote:
> Add an iomap implementation of fops->iopoll() and wire it up for
> XFS.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/iomap.c            | 50 +++++++++++++++++++++++++++++--------------
>  fs/xfs/xfs_file.c     |  1 +
>  include/linux/iomap.h |  1 +
>  3 files changed, 36 insertions(+), 16 deletions(-)
> 
> diff --git a/fs/iomap.c b/fs/iomap.c
> index 74c1f37f0fd6..faf96198f99a 100644
> --- a/fs/iomap.c
> +++ b/fs/iomap.c
> @@ -1419,14 +1419,14 @@ struct iomap_dio {
>  	unsigned		flags;
>  	int			error;
>  	bool			wait_for_completion;
> +	blk_qc_t		cookie;
> +	struct request_queue	*last_queue;
>  
>  	union {
>  		/* used during submission and for synchronous completion: */
>  		struct {
>  			struct iov_iter		*iter;
>  			struct task_struct	*waiter;
> -			struct request_queue	*last_queue;
> -			blk_qc_t		cookie;
>  		} submit;
>  
>  		/* used for aio completion: */
> @@ -1436,6 +1436,30 @@ struct iomap_dio {
>  	};
>  };
>  
> +int iomap_dio_iopoll(struct kiocb *kiocb, bool spin)
> +{
> +	struct iomap_dio *dio = kiocb->private;
> +	struct request_queue *q = READ_ONCE(dio->last_queue);
> +
> +	if (!q || dio->cookie == BLK_QC_T_NONE)
> +		return 0;
> +	return blk_poll(q, READ_ONCE(dio->cookie), spin);

How about:
	blk_qc_t cookie = READ_ONCE(dio->cookie);

	if (!q || cookie == BLK_QC_T_NONE)
		return 0;
	return blk_poll(q, cookie, spin);

Is there a chance the dio->cookie in the if () condition
will be stale?

> 
> +}
> +EXPORT_SYMBOL_GPL(iomap_dio_iopoll);
> +
> +static void iomap_dio_submit_bio(struct iomap_dio *dio, struct iomap *iomap,
> +		struct bio *bio)
> +{
> +	atomic_inc(&dio->ref);
> +
> +	/*
> +	 * iomap_dio_iopoll can race with us.  A non-zero last_queue marks that
> +	 * we are ready to poll.
> +	 */
> +	WRITE_ONCE(dio->cookie, submit_bio(bio));
> +	WRITE_ONCE(dio->last_queue, bdev_get_queue(iomap->bdev));
> +}
> +
>  static ssize_t iomap_dio_complete(struct iomap_dio *dio)
>  {
>  	struct kiocb *iocb = dio->iocb;
> @@ -1548,7 +1572,7 @@ static void iomap_dio_bio_end_io(struct bio *bio)
>  	}
>  }
>  
> -static blk_qc_t
> +static void
>  iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>  		unsigned len)
>  {
> @@ -1568,9 +1592,7 @@ iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
>  	get_page(page);
>  	__bio_add_page(bio, page, len, 0);
>  	bio_set_op_attrs(bio, REQ_OP_WRITE, flags);
> -
> -	atomic_inc(&dio->ref);
> -	return submit_bio(bio);
> +	iomap_dio_submit_bio(dio, iomap, bio);
>  }
>  
>  static loff_t
> @@ -1676,11 +1698,7 @@ iomap_dio_bio_actor(struct inode *inode, loff_t pos, loff_t length,
>  		copied += n;
>  
>  		nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
> -
> -		atomic_inc(&dio->ref);
> -
> -		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
> -		dio->submit.cookie = submit_bio(bio);
> +		iomap_dio_submit_bio(dio, iomap, bio);
>  	} while (nr_pages);
>  
>  	if (need_zeroout) {
> @@ -1782,6 +1800,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  	dio = kmalloc(sizeof(*dio), GFP_KERNEL);
>  	if (!dio)
>  		return -ENOMEM;
> +	iocb->private = dio;
>  
>  	dio->iocb = iocb;
>  	atomic_set(&dio->ref, 1);
> @@ -1791,11 +1810,11 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  	dio->error = 0;
>  	dio->flags = 0;
>  	dio->wait_for_completion = is_sync_kiocb(iocb);
> +	dio->cookie = BLK_QC_T_NONE;
> +	dio->last_queue = NULL;
>  
>  	dio->submit.iter = iter;
>  	dio->submit.waiter = current;
> -	dio->submit.cookie = BLK_QC_T_NONE;
> -	dio->submit.last_queue = NULL;
>  
>  	if (iov_iter_rw(iter) == READ) {
>  		if (pos >= dio->i_size)
> @@ -1894,9 +1913,8 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  				break;
>  
>  			if (!(iocb->ki_flags & IOCB_HIPRI) ||
> -			    !dio->submit.last_queue ||
> -			    !blk_poll(dio->submit.last_queue,
> -					 dio->submit.cookie, true))
> +			    !dio->last_queue ||
> +			    !blk_poll(dio->last_queue, dio->cookie, true))
>  				io_schedule();
>  		}
>  		__set_current_state(TASK_RUNNING);
> diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
> index 53c9ab8fb777..603e705781a4 100644
> --- a/fs/xfs/xfs_file.c
> +++ b/fs/xfs/xfs_file.c
> @@ -1203,6 +1203,7 @@ const struct file_operations xfs_file_operations = {
>  	.write_iter	= xfs_file_write_iter,
>  	.splice_read	= generic_file_splice_read,
>  	.splice_write	= iter_file_splice_write,
> +	.iopoll		= iomap_dio_iopoll,
>  	.unlocked_ioctl	= xfs_file_ioctl,
>  #ifdef CONFIG_COMPAT
>  	.compat_ioctl	= xfs_file_compat_ioctl,
> diff --git a/include/linux/iomap.h b/include/linux/iomap.h
> index 9a4258154b25..0fefb5455bda 100644
> --- a/include/linux/iomap.h
> +++ b/include/linux/iomap.h
> @@ -162,6 +162,7 @@ typedef int (iomap_dio_end_io_t)(struct kiocb *iocb, ssize_t ret,
>  		unsigned flags);
>  ssize_t iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  		const struct iomap_ops *ops, iomap_dio_end_io_t end_io);
> +int iomap_dio_iopoll(struct kiocb *kiocb, bool spin);
>  
>  #ifdef CONFIG_SWAP
>  struct file;



* Re: [PATCH 8/8] aio: support for IO polling
  2018-11-20 17:19 ` [PATCH 8/8] aio: support for IO polling Jens Axboe
@ 2018-11-21 11:12   ` Benny Halevy
  2018-11-21 13:26     ` Jens Axboe
  2018-11-22 11:13   ` Jan Kara
  1 sibling, 1 reply; 16+ messages in thread
From: Benny Halevy @ 2018-11-21 11:12 UTC (permalink / raw)
  To: Jens Axboe, linux-block, linux-aio, linux-fsdevel

On Tue, 2018-11-20 at 10:19 -0700, Jens Axboe wrote:
> Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
> like their non-polled counterparts, except we expect to poll for
> completion of them. The polling happens at io_getevents() time, and
> works just like non-polled IO.
> 
> To setup an io_context for polled IO, the application must call
> io_setup2() with IOCTX_FLAG_IOPOLL as one of the flags. It is illegal
> to mix and match polled and non-polled IO on an io_context.
> 
> Polled IO doesn't support the user mapped completion ring. Events
> must be reaped through the io_getevents() system call. For non-irq
> driven poll devices, there's no way to support completion reaping
> from userspace by just looking at the ring. The application itself
> is the one that pulls completion entries.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/aio.c                     | 377 ++++++++++++++++++++++++++++++-----
>  include/uapi/linux/aio_abi.h |   4 +
>  2 files changed, 336 insertions(+), 45 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index 8bbb0b77d9c4..ea93847d25d1 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -94,6 +94,8 @@ struct kioctx {
>  
>  	unsigned long		user_id;
>  
> +	unsigned int		flags;
> +
>  	struct __percpu kioctx_cpu *cpu;
>  
>  	/*
> @@ -138,6 +140,19 @@ struct kioctx {
>  		atomic_t	reqs_available;
>  	} ____cacheline_aligned_in_smp;
>  
> +	/* iopoll submission state */
> +	struct {
> +		spinlock_t poll_lock;
> +		struct list_head poll_submitted;
> +	} ____cacheline_aligned_in_smp;
> +
> +	/* iopoll completion state */
> +	struct {
> +		struct list_head poll_completing;
> +		unsigned long getevents_busy;
> +		atomic_t poll_completed;
> +	} ____cacheline_aligned_in_smp;
> +
>  	struct {
>  		spinlock_t	ctx_lock;
>  		struct list_head active_reqs;	/* used for cancellation */
> @@ -191,13 +206,24 @@ struct aio_kiocb {
>  
>  	struct list_head	ki_list;	/* the aio core uses this
>  						 * for cancellation */
> +
> +	unsigned long		ki_flags;
> +#define IOCB_POLL_COMPLETED	0
> +
>  	refcount_t		ki_refcnt;
>  
> -	/*
> -	 * If the aio_resfd field of the userspace iocb is not zero,
> -	 * this is the underlying eventfd context to deliver events to.
> -	 */
> -	struct eventfd_ctx	*ki_eventfd;
> +	union {
> +		/*
> +		 * If the aio_resfd field of the userspace iocb is not zero,
> +		 * this is the underlying eventfd context to deliver events to.
> +		 */
> +		struct eventfd_ctx	*ki_eventfd;
> +
> +		/*
> +		 * For polled IO, stash completion info here
> +		 */
> +		struct io_event		ki_ev;
> +	};
>  };
>  
>  /*------ sysctl variables----*/
> @@ -214,6 +240,8 @@ static struct vfsmount *aio_mnt;
>  static const struct file_operations aio_ring_fops;
>  static const struct address_space_operations aio_ctx_aops;
>  
> +static void aio_iopoll_reap_events(struct kioctx *);
> +
>  static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
>  {
>  	struct file *file;
> @@ -451,11 +479,15 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
>  	int i;
>  	struct file *file;
>  
> -	/* Compensate for the ring buffer's head/tail overlap entry */
> -	nr_events += 2;	/* 1 is required, 2 for good luck */
> -
> +	/*
> +	 * Compensate for the ring buffer's head/tail overlap entry.
> +	 * IO polling doesn't require any io event entries
> +	 */
>  	size = sizeof(struct aio_ring);
> -	size += sizeof(struct io_event) * nr_events;
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
> +		nr_events += 2;	/* 1 is required, 2 for good luck */
> +		size += sizeof(struct io_event) * nr_events;
> +	}
>  
>  	nr_pages = PFN_UP(size);
>  	if (nr_pages < 0)
> @@ -720,6 +752,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
>  	if (!ctx)
>  		return ERR_PTR(-ENOMEM);
>  
> +	ctx->flags = flags;
>  	ctx->max_reqs = max_reqs;
>  
>  	spin_lock_init(&ctx->ctx_lock);
> @@ -732,6 +765,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
>  
>  	INIT_LIST_HEAD(&ctx->active_reqs);
>  
> +	spin_lock_init(&ctx->poll_lock);
> +	INIT_LIST_HEAD(&ctx->poll_submitted);
> +	INIT_LIST_HEAD(&ctx->poll_completing);
> +	atomic_set(&ctx->poll_completed, 0);
> +
>  	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
>  		goto err;
>  
> @@ -814,6 +852,8 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
>  	RCU_INIT_POINTER(table->table[ctx->id], NULL);
>  	spin_unlock(&mm->ioctx_lock);
>  
> +	aio_iopoll_reap_events(ctx);
> +
>  	/* free_ioctx_reqs() will do the necessary RCU synchronization */
>  	wake_up_all(&ctx->wait);
>  
> @@ -1056,6 +1096,24 @@ static inline void iocb_put(struct aio_kiocb *iocb)
>  	}
>  }
>  
> +static void iocb_put_many(struct kioctx *ctx, void **iocbs, int *nr)
> +{
> +	if (nr) {
> +		kmem_cache_free_bulk(kiocb_cachep, *nr, iocbs);
> +		percpu_ref_put_many(&ctx->reqs, *nr);
> +		*nr = 0;
> +	}
> +}
> +
> +static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
> +			   long res, long res2)
> +{
> +	ev->obj = (u64)(unsigned long)iocb->ki_user_iocb;
> +	ev->data = iocb->ki_user_data;
> +	ev->res = res;
> +	ev->res2 = res2;
> +}
> +
>  /* aio_complete
>   *	Called when the io request on the given iocb is complete.
>   */
> @@ -1083,10 +1141,7 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
>  	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
>  	event = ev_page + pos % AIO_EVENTS_PER_PAGE;
>  
> -	event->obj = (u64)(unsigned long)iocb->ki_user_iocb;
> -	event->data = iocb->ki_user_data;
> -	event->res = res;
> -	event->res2 = res2;
> +	aio_fill_event(event, iocb, res, res2);
>  
>  	kunmap_atomic(ev_page);
>  	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
> @@ -1239,6 +1294,165 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
>  	return ret < 0 || *i >= min_nr;
>  }
>  
> +#define AIO_POLL_STACK	8
> +
> +/*
> + * Process completed iocb iopoll entries, copying the result to userspace.
> + */
> +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> +			    unsigned int *nr_events, long max)
> +{
> +	void *iocbs[AIO_POLL_STACK];
> +	struct aio_kiocb *iocb, *n;
> +	int to_free = 0, ret = 0;

To be on the safe side, how about checking that *nr_events < max
when evs is non-NULL, and returning -EINVAL otherwise?

> +
> +	list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
> +		if (!test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> +			continue;
> +		if (to_free == AIO_POLL_STACK)
> +			iocb_put_many(ctx, iocbs, &to_free);
> +
> +		list_del(&iocb->ki_list);
> +		iocbs[to_free++] = iocb;
> +
> +		fput(iocb->rw.ki_filp);
> +
> +		if (!evs) {
> +			(*nr_events)++;
> +			continue;
> +		}
> +
> +		if (copy_to_user(evs + *nr_events, &iocb->ki_ev,
> +		    sizeof(iocb->ki_ev))) {
> +			ret = -EFAULT;
> +			break;
> +		}
> +		if (++(*nr_events) == max)
> +			break;
> +	}
> +
> +	if (to_free)
> +		iocb_put_many(ctx, iocbs, &to_free);
> +
> +	return ret;
> +}
> +
> +static int __aio_iopoll_check(struct kioctx *ctx, struct io_event __user *event,
> +			      unsigned int *nr_events, long min, long max)
> +{
> +	struct aio_kiocb *iocb;
> +	unsigned int poll_completed;
> +	int to_poll, polled, ret;
> +
> +	/*
> +	 * Check if we already have done events that satisfy what we need
> +	 */
> +	if (!list_empty(&ctx->poll_completing)) {
> +		ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +		if (ret < 0)
> +			return ret;
> +		if (*nr_events >= min)
> +			return 0;
> +	}
> +
> +	/*
> +	 * Take in a new working set from the submitted list if possible.
> +	 */
> +	if (!list_empty_careful(&ctx->poll_submitted)) {
> +		spin_lock(&ctx->poll_lock);
> +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> +		spin_unlock(&ctx->poll_lock);
> +	}
> +
> +	if (list_empty(&ctx->poll_completing))
> +		return 0;

Could be somewhat optimized like this:

	if (list_empty_careful(&ctx->poll_submitted))
		return 0;

	spin_lock(&ctx->poll_lock);
	list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
	spin_unlock(&ctx->poll_lock);
	if (list_empty(&ctx->poll_completing))
		return 0;

Or, possibly...
	if (list_empty_careful(&ctx->poll_submitted) ||
	    ({
		spin_lock(&ctx->poll_lock);
		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
		spin_unlock(&ctx->poll_lock);
		list_empty(&ctx->poll_completing);
	    }))
		return 0;

> +
> +	/*
> +	 * Check again now that we have a new batch.
> +	 */
> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +	if (ret < 0)
> +		return ret;
> +	if (*nr_events >= min)
> +		return 0;
> +
> +	/*
> +	 * Find up to 'max_nr' worth of events to poll for, including the

What's max_nr? You mean 'max'?

> +	 * events we already successfully polled
> +	 */
> +	polled = to_poll = 0;
> +	poll_completed = atomic_read(&ctx->poll_completed);
> +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> +		/*
> +		 * Poll for needed events with wait == true, anything after
> +		 * that we just check if we have more, up to max.
> +		 */
> +		bool wait = polled + *nr_events >= min;
> +		struct kiocb *kiocb = &iocb->rw;
> +
> +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> +			break;
> +		if (++to_poll + *nr_events >= max)
> +			break;
> +
> +		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);

Could iopoll return a negative value? (Currently not in this patchset,
but would it be possible in the future?)

> +		if (polled + *nr_events >= max)
> +			break;
> +		if (poll_completed != atomic_read(&ctx->poll_completed))
> +			break;
> +	}
> +
> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> +	if (ret < 0)
> +		return ret;
> +	if (*nr_events >= min)
> +		return 0;
> +	return to_poll;

What does the returned value mean?
If the intention is only to return a value greater than zero,
how about just returning to_poll > 0?

> +}
> +
> +/*
> + * We can't just wait for polled events to come to us, we have to actively
> + * find and complete them.
> + */
> +static void aio_iopoll_reap_events(struct kioctx *ctx)
> +{
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		return;
> +
> +	while (!list_empty_careful(&ctx->poll_submitted) ||
> +	       !list_empty(&ctx->poll_completing)) {
> +		unsigned int nr_events = 0;
> +
> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);

BUG_ON(__aio_iopoll_check() < 0) ?

> +	}
> +}
> +
> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> +			    struct io_event __user *event)
> +{
> +	unsigned int nr_events = 0;
> +	int ret = 0;
> +
> +	/* * Only allow one thread polling at a time */

nit: extra '* '

> +	if (test_and_set_bit(0, &ctx->getevents_busy))
> +		return -EBUSY;
> +
> +	while (!nr_events || !need_resched()) {
> +		int tmin = 0;
> +
> +		if (nr_events < min_nr)
> +			tmin = min_nr - nr_events;
> +
> +		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
> +		if (ret <= 0)
> +			break;
> +		ret = 0;
> +	}
> +
> +	clear_bit(0, &ctx->getevents_busy);
> +	return nr_events ? nr_events : ret;
> +}
> +
>  static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  			struct io_event __user *event,
>  			ktime_t until)
> @@ -1287,7 +1501,7 @@ SYSCALL_DEFINE3(io_setup2, u32, nr_events, u32, flags,
>  	unsigned long ctx;
>  	long ret;
>  
> -	if (flags)
> +	if (flags & ~IOCTX_FLAG_IOPOLL)
>  		return -EINVAL;
>  
>  	ret = get_user(ctx, ctxp);
> @@ -1411,13 +1625,8 @@ static void aio_remove_iocb(struct aio_kiocb *iocb)
>  	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
>  }
>  
> -static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
> +static void kiocb_end_write(struct kiocb *kiocb)
>  {
> -	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> -
> -	if (!list_empty_careful(&iocb->ki_list))
> -		aio_remove_iocb(iocb);
> -
>  	if (kiocb->ki_flags & IOCB_WRITE) {
>  		struct inode *inode = file_inode(kiocb->ki_filp);
>  
> @@ -1429,19 +1638,42 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
>  			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
>  		file_end_write(kiocb->ki_filp);
>  	}
> +}
> +
> +static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
> +{
> +	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> +
> +	if (!list_empty_careful(&iocb->ki_list))
> +		aio_remove_iocb(iocb);
> +
> +	kiocb_end_write(kiocb);
>  
>  	fput(kiocb->ki_filp);
>  	aio_complete(iocb, res, res2);
>  }
>  
> -static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
> +static void aio_complete_rw_poll(struct kiocb *kiocb, long res, long res2)
>  {
> +	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
> +	struct kioctx *ctx = iocb->ki_ctx;
> +
> +	kiocb_end_write(kiocb);
> +
> +	aio_fill_event(&iocb->ki_ev, iocb, res, res2);
> +	set_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags);
> +	atomic_inc(&ctx->poll_completed);
> +}
> +
> +static int aio_prep_rw(struct aio_kiocb *kiocb, struct iocb *iocb)
> +{
> +	struct kioctx *ctx = kiocb->ki_ctx;
> +	struct kiocb *req = &kiocb->rw;
>  	int ret;
>  
>  	req->ki_filp = fget(iocb->aio_fildes);
>  	if (unlikely(!req->ki_filp))
>  		return -EBADF;
> -	req->ki_complete = aio_complete_rw;
>  	req->ki_pos = iocb->aio_offset;
>  	req->ki_flags = iocb_flags(req->ki_filp);
>  	if (iocb->aio_flags & IOCB_FLAG_RESFD)
> @@ -1456,8 +1688,7 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
>  		ret = ioprio_check_cap(iocb->aio_reqprio);
>  		if (ret) {
>  			pr_debug("aio ioprio check cap error: %d\n", ret);
> -			fput(req->ki_filp);
> -			return ret;
> +			goto out_fput;
>  		}
>  
>  		req->ki_ioprio = iocb->aio_reqprio;
> @@ -1466,7 +1697,41 @@ static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
>  
>  	ret = kiocb_set_rw_flags(req, iocb->aio_rw_flags);
>  	if (unlikely(ret))
> -		fput(req->ki_filp);
> +		goto out_fput;
> +
> +	if (iocb->aio_flags & IOCB_FLAG_HIPRI) {
> +		/* shares space in the union, and is rather pointless.. */
> +		ret = -EINVAL;
> +		if (iocb->aio_flags & IOCB_FLAG_RESFD)
> +			goto out_fput;
> +
> +		/* can't submit polled IO to a non-polled ctx */
> +		if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +			goto out_fput;
> +
> +		ret = -EOPNOTSUPP;
> +		if (!(req->ki_flags & IOCB_DIRECT) ||
> +		    !req->ki_filp->f_op->iopoll)
> +			goto out_fput;
> +
> +		req->ki_flags |= IOCB_HIPRI;
> +		req->ki_complete = aio_complete_rw_poll;
> +
> +		spin_lock(&ctx->poll_lock);
> +		list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
> +		spin_unlock(&ctx->poll_lock);
> +	} else {
> +		/* can't submit non-polled IO to a polled ctx */
> +		ret = -EINVAL;
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			goto out_fput;
> +
> +		req->ki_complete = aio_complete_rw;
> +	}
> +
> +	return 0;
> +out_fput:
> +	fput(req->ki_filp);
>  	return ret;
>  }
>  
> @@ -1509,15 +1774,16 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
>  	}
>  }
>  
> -static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
> -		bool compat)
> +static ssize_t aio_read(struct aio_kiocb *kiocb, struct iocb *iocb,
> +			bool vectored, bool compat)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
> +	struct kiocb *req = &kiocb->rw;
>  	struct iov_iter iter;
>  	struct file *file;
>  	ssize_t ret;
>  
> -	ret = aio_prep_rw(req, iocb);
> +	ret = aio_prep_rw(kiocb, iocb);
>  	if (ret)
>  		return ret;
>  	file = req->ki_filp;
> @@ -1542,15 +1808,16 @@ static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
>  	return ret;
>  }
>  
> -static ssize_t aio_write(struct kiocb *req, struct iocb *iocb, bool vectored,
> -		bool compat)
> +static ssize_t aio_write(struct aio_kiocb *kiocb, struct iocb *iocb,
> +			 bool vectored, bool compat)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
> +	struct kiocb *req = &kiocb->rw;
>  	struct iov_iter iter;
>  	struct file *file;
>  	ssize_t ret;
>  
> -	ret = aio_prep_rw(req, iocb);
> +	ret = aio_prep_rw(kiocb, iocb);
>  	if (ret)
>  		return ret;
>  	file = req->ki_filp;
> @@ -1820,7 +2087,8 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
>  		return -EINVAL;
>  	}
>  
> -	if (!get_reqs_available(ctx))
> +	/* Poll IO doesn't need ring reservations */
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
>  		return -EAGAIN;
>  
>  	ret = -EAGAIN;
> @@ -1843,35 +2111,45 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
>  		}
>  	}
>  
> -	ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
> -	if (unlikely(ret)) {
> -		pr_debug("EFAULT: aio_key\n");
> -		goto out_put_req;
> +	/* polled IO isn't cancelable, don't bother copying the key */
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
> +		ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
> +		if (unlikely(ret)) {
> +			pr_debug("EFAULT: aio_key\n");
> +			goto out_put_req;
> +		}
>  	}
>  
>  	req->ki_user_iocb = user_iocb;
>  	req->ki_user_data = iocb.aio_data;
>  
> +	ret = -EINVAL;
>  	switch (iocb.aio_lio_opcode) {
>  	case IOCB_CMD_PREAD:
> -		ret = aio_read(&req->rw, &iocb, false, compat);
> +		ret = aio_read(req, &iocb, false, compat);
>  		break;
>  	case IOCB_CMD_PWRITE:
> -		ret = aio_write(&req->rw, &iocb, false, compat);
> +		ret = aio_write(req, &iocb, false, compat);
>  		break;
>  	case IOCB_CMD_PREADV:
> -		ret = aio_read(&req->rw, &iocb, true, compat);
> +		ret = aio_read(req, &iocb, true, compat);
>  		break;
>  	case IOCB_CMD_PWRITEV:
> -		ret = aio_write(&req->rw, &iocb, true, compat);
> +		ret = aio_write(req, &iocb, true, compat);
>  		break;
>  	case IOCB_CMD_FSYNC:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_fsync(&req->fsync, &iocb, false);
>  		break;
>  	case IOCB_CMD_FDSYNC:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_fsync(&req->fsync, &iocb, true);
>  		break;
>  	case IOCB_CMD_POLL:
> +		if (ctx->flags & IOCTX_FLAG_IOPOLL)
> +			break;
>  		ret = aio_poll(req, &iocb);
>  		break;
>  	default:
> @@ -1894,7 +2172,8 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
>  		eventfd_ctx_put(req->ki_eventfd);
>  	kmem_cache_free(kiocb_cachep, req);
>  out_put_reqs_available:
> -	put_reqs_available(ctx, 1);
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		put_reqs_available(ctx, 1);
>  	return ret;
>  }
>  
> @@ -1930,7 +2209,9 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
>  	if (nr > ctx->nr_events)
>  		nr = ctx->nr_events;
>  
> -	blk_start_plug(&plug);
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		blk_start_plug(&plug);
> +
>  	for (i = 0; i < nr; i++) {
>  		struct iocb __user *user_iocb;
>  
> @@ -1943,7 +2224,9 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
>  		if (ret)
>  			break;
>  	}
> -	blk_finish_plug(&plug);
> +
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		blk_finish_plug(&plug);
>  
>  	percpu_ref_put(&ctx->users);
>  	return i ? i : ret;
> @@ -2068,8 +2351,12 @@ static long do_io_getevents(aio_context_t ctx_id,
>  	long ret = -EINVAL;
>  
>  	if (likely(ioctx)) {
> -		if (likely(min_nr <= nr && min_nr >= 0))
> -			ret = read_events(ioctx, min_nr, nr, events, until);
> +		if (likely(min_nr <= nr && min_nr >= 0)) {
> +			if (ioctx->flags & IOCTX_FLAG_IOPOLL)
> +				ret = aio_iopoll_check(ioctx, min_nr, nr, events);
> +			else
> +				ret = read_events(ioctx, min_nr, nr, events, until);
> +		}
>  		percpu_ref_put(&ioctx->users);
>  	}
>  
> diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
> index 8387e0af0f76..3b98b5fbacde 100644
> --- a/include/uapi/linux/aio_abi.h
> +++ b/include/uapi/linux/aio_abi.h
> @@ -52,9 +52,11 @@ enum {
>   *                   is valid.
>   * IOCB_FLAG_IOPRIO - Set if the "aio_reqprio" member of the "struct iocb"
>   *                    is valid.
> + * IOCB_FLAG_HIPRI - Use IO completion polling
>   */
>  #define IOCB_FLAG_RESFD		(1 << 0)
>  #define IOCB_FLAG_IOPRIO	(1 << 1)
> +#define IOCB_FLAG_HIPRI		(1 << 2)
>  
>  /* read() from /dev/aio returns these structures. */
>  struct io_event {
> @@ -106,6 +108,8 @@ struct iocb {
>  	__u32	aio_resfd;
>  }; /* 64 bytes */
>  
> +#define IOCTX_FLAG_IOPOLL	(1 << 0)
> +
>  #undef IFBIG
>  #undef IFLITTLE
>  


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 8/8] aio: support for IO polling
  2018-11-21 11:12   ` Benny Halevy
@ 2018-11-21 13:26     ` Jens Axboe
  2018-11-21 13:51       ` Benny Halevy
  0 siblings, 1 reply; 16+ messages in thread
From: Jens Axboe @ 2018-11-21 13:26 UTC (permalink / raw)
  To: Benny Halevy, linux-block, linux-aio, linux-fsdevel

On 11/21/18 4:12 AM, Benny Halevy wrote:
>> +#define AIO_POLL_STACK	8
>> +
>> +/*
>> + * Process completed iocb iopoll entries, copying the result to userspace.
>> + */
>> +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
>> +			    unsigned int *nr_events, long max)
>> +{
>> +	void *iocbs[AIO_POLL_STACK];
>> +	struct aio_kiocb *iocb, *n;
>> +	int to_free = 0, ret = 0;
> 
> To be on the safe side, how about checking that if (evs)
> *nr_events < max, otherwise, return -EINVAL?

Good point, I think we should re-arrange the loop a bit to move the
check up at the top to guard for entries == max at entry. I've done
that.
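
In concrete terms, roughly this at the top of aio_iopoll_reap(), before
we start iterating (untested sketch, the final form may differ a bit):

	/* don't reap into an already full userspace event array */
	if (evs && *nr_events >= max)
		return 0;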

>> +	/*
>> +	 * Take in a new working set from the submitted list if possible.
>> +	 */
>> +	if (!list_empty_careful(&ctx->poll_submitted)) {
>> +		spin_lock(&ctx->poll_lock);
>> +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
>> +		spin_unlock(&ctx->poll_lock);
>> +	}
>> +
>> +	if (list_empty(&ctx->poll_completing))
>> +		return 0;
> 
> Could be somewhat optimized like this:
> 
> 	if (list_empty_careful(&ctx->poll_submitted))
> 		return 0;
> 
> 	spin_lock(&ctx->poll_lock);
> 	list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> 	spin_unlock(&ctx->poll_lock);
> 	if (list_empty(&ctx->poll_completing))
> 		return 0;
> 
> Or, possibly...
> 	if (list_empty_careful(&ctx->poll_submitted) ||
> 	    ({
> 		spin_lock(&ctx->poll_lock);
> 		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> 		spin_unlock(&ctx->poll_lock);
> 		list_empty(&ctx->poll_completing);
> 	    }))
> 		return 0;

I think the readability of the existing version is better.

>> +	/*
>> +	 * Check again now that we have a new batch.
>> +	 */
>> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
>> +	if (ret < 0)
>> +		return ret;
>> +	if (*nr_events >= min)
>> +		return 0;
>> +
>> +	/*
>> +	 * Find up to 'max_nr' worth of events to poll for, including the
> 
> What's max_nr? You mean 'max'?

It should, corrected.

>> +	 * events we already successfully polled
>> +	 */
>> +	polled = to_poll = 0;
>> +	poll_completed = atomic_read(&ctx->poll_completed);
>> +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
>> +		/*
>> +		 * Poll for needed events with wait == true, anything after
>> +		 * that we just check if we have more, up to max.
>> +		 */
>> +		bool wait = polled + *nr_events >= min;
>> +		struct kiocb *kiocb = &iocb->rw;
>> +
>> +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
>> +			break;
>> +		if (++to_poll + *nr_events >= max)
>> +			break;
>> +
>> +		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> 
> Could iopoll return a negative value? (Currently not in this patchset,
> but would it be possible in the future?)

That's a good point, I've added a separate check for this. Given that
it's a regular fops handler, it should be perfectly valid to return
-ERROR.
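
In the poll loop that would look something along these lines (sketch
only, the exact error propagation may still change):

	int err;

	err = kiocb->ki_filp->f_op->iopoll(kiocb, wait);
	if (err < 0)
		return err;
	polled += err;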

>> +		if (polled + *nr_events >= max)
>> +			break;
>> +		if (poll_completed != atomic_read(&ctx->poll_completed))
>> +			break;
>> +	}
>> +
>> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
>> +	if (ret < 0)
>> +		return ret;
>> +	if (*nr_events >= min)
>> +		return 0;
>> +	return to_poll;
> 
> What does the returned value mean?
> If the intention is only to return a value greater than zero,
> how about just returning to_poll > 0?

It just means that you could call us again, if > 0, and < 0 is an error
specifically.

>> +/*
>> + * We can't just wait for polled events to come to us, we have to actively
>> + * find and complete them.
>> + */
>> +static void aio_iopoll_reap_events(struct kioctx *ctx)
>> +{
>> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
>> +		return;
>> +
>> +	while (!list_empty_careful(&ctx->poll_submitted) ||
>> +	       !list_empty(&ctx->poll_completing)) {
>> +		unsigned int nr_events = 0;
>> +
>> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> 
> BUG_ON(__aio_iopoll_check() < 0) ?

Ho hum...

>> +	}
>> +}
>> +
>> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
>> +			    struct io_event __user *event)
>> +{
>> +	unsigned int nr_events = 0;
>> +	int ret = 0;
>> +
>> +	/* * Only allow one thread polling at a time */
> 
> nit: extra '* '

Removed.

Thanks for your review!

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll()
  2018-11-21  9:15   ` Benny Halevy
@ 2018-11-21 13:27     ` Jens Axboe
  0 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-21 13:27 UTC (permalink / raw)
  To: Benny Halevy, linux-block, linux-aio, linux-fsdevel

On 11/21/18 2:15 AM, Benny Halevy wrote:
>> +int iomap_dio_iopoll(struct kiocb *kiocb, bool spin)
>> +{
>> +	struct iomap_dio *dio = kiocb->private;
>> +	struct request_queue *q = READ_ONCE(dio->last_queue);
>> +
>> +	if (!q || dio->cookie == BLK_QC_T_NONE)
>> +		return 0;
>> +	return blk_poll(q, READ_ONCE(dio->cookie), spin);
> 
> How about:
> 	blk_qc_t cookie = READ_ONCE(dio->cookie);
> 
> 	if (!q || cookie == BLK_QC_T_NONE)
> 		return 0;
> 	return blk_poll(q, cookie, spin);
> 
> Is there a chance the dio->cookie in the if () condition
> will be stale?

I've rewritten this one; more importantly, it needs to guard against
submission by ensuring that 'dio' is valid.
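
One way to do that (sketch only, not necessarily what the rewrite ends
up looking like) is to not dereference the dio on the poll side at all
and stash what's needed in the kiocb instead, assuming the submission
path stores the queue in kiocb->private and the completion cookie in a
new ki_cookie style field:

	int iomap_dio_iopoll(struct kiocb *kiocb, bool spin)
	{
		struct request_queue *q = READ_ONCE(kiocb->private);

		if (!q)
			return 0;
		return blk_poll(q, READ_ONCE(kiocb->ki_cookie), spin);
	}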

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 8/8] aio: support for IO polling
  2018-11-21 13:26     ` Jens Axboe
@ 2018-11-21 13:51       ` Benny Halevy
  0 siblings, 0 replies; 16+ messages in thread
From: Benny Halevy @ 2018-11-21 13:51 UTC (permalink / raw)
  To: Jens Axboe, linux-block, linux-aio, linux-fsdevel

On Wed, 2018-11-21 at 06:26 -0700, Jens Axboe wrote:
> On 11/21/18 4:12 AM, Benny Halevy wrote:
> > > +#define AIO_POLL_STACK	8
> > > +
> > > +/*
> > > + * Process completed iocb iopoll entries, copying the result to userspace.
> > > + */
> > > +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> > > +			    unsigned int *nr_events, long max)
> > > +{
> > > +	void *iocbs[AIO_POLL_STACK];
> > > +	struct aio_kiocb *iocb, *n;
> > > +	int to_free = 0, ret = 0;
> > 
> > To be on the safe side, how about checking that if (evs)
> > *nr_events < max, otherwise, return -EINVAL?
> 
> Good point, I think we should re-arrange the loop a bit to move the
> check up at the top to guard for entries == max at entry. I've done
> that.
> 
> > > +	/*
> > > +	 * Take in a new working set from the submitted list if possible.
> > > +	 */
> > > +	if (!list_empty_careful(&ctx->poll_submitted)) {
> > > +		spin_lock(&ctx->poll_lock);
> > > +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > > +		spin_unlock(&ctx->poll_lock);
> > > +	}
> > > +
> > > +	if (list_empty(&ctx->poll_completing))
> > > +		return 0;
> > 
> > Could be somewhat optimized like this:
> > 
> > 	if (list_empty_careful(&ctx->poll_submitted))
> > 		return 0;
> > 
> > 	spin_lock(&ctx->poll_lock);
> > 	list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > 	spin_unlock(&ctx->poll_lock);
> > 	if (list_empty(&ctx->poll_completing))
> > 		return 0;
> > 
> > Or, possibly...
> > 	if (list_empty_careful(&ctx->poll_submitted) ||
> > 	    ({
> > 		spin_lock(&ctx->poll_lock);
> > 		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > 		spin_unlock(&ctx->poll_lock);
> > 		list_empty(&ctx->poll_completing);
> > 	    }))
> > 		return 0;
> 
> I think the readability of the existing version is better.
> 
> > > +	/*
> > > +	 * Check again now that we have a new batch.
> > > +	 */
> > > +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > +	if (ret < 0)
> > > +		return ret;
> > > +	if (*nr_events >= min)
> > > +		return 0;
> > > +
> > > +	/*
> > > +	 * Find up to 'max_nr' worth of events to poll for, including the
> > 
> > What's max_nr? You mean 'max'?
> 
> It should, corrected.
> 
> > > +	 * events we already successfully polled
> > > +	 */
> > > +	polled = to_poll = 0;
> > > +	poll_completed = atomic_read(&ctx->poll_completed);
> > > +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> > > +		/*
> > > +		 * Poll for needed events with wait == true, anything after
> > > +		 * that we just check if we have more, up to max.
> > > +		 */
> > > +		bool wait = polled + *nr_events >= min;
> > > +		struct kiocb *kiocb = &iocb->rw;
> > > +
> > > +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> > > +			break;
> > > +		if (++to_poll + *nr_events >= max)
> > > +			break;
> > > +
> > > +		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> > 
> > Could iopoll return a negative value? (Currently not in this patchset,
> > but would it be possible in the future?)
> 
> That's a good point, I've added a separate check for this. Given that
> it's a regular fops handler, it should be perfectly valid to return
> -ERROR.
> 
> > > +		if (polled + *nr_events >= max)
> > > +			break;
> > > +		if (poll_completed != atomic_read(&ctx->poll_completed))
> > > +			break;
> > > +	}
> > > +
> > > +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > +	if (ret < 0)
> > > +		return ret;
> > > +	if (*nr_events >= min)
> > > +		return 0;
> > > +	return to_poll;
> > 
> > What does the returned value mean?
> > If the intention is only to return a value greater than zero,
> > how about just returning to_poll > 0?
> 
> It just means that you could call us again, if > 0, and < 0 is an error
> specifically.
> 
> > > +/*
> > > + * We can't just wait for polled events to come to us, we have to actively
> > > + * find and complete them.
> > > + */
> > > +static void aio_iopoll_reap_events(struct kioctx *ctx)
> > > +{
> > > +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> > > +		return;
> > > +
> > > +	while (!list_empty_careful(&ctx->poll_submitted) ||
> > > +	       !list_empty(&ctx->poll_completing)) {
> > > +		unsigned int nr_events = 0;
> > > +
> > > +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> > 
> > BUG_ON(__aio_iopoll_check() < 0) ?
> 
> Ho hum...
> 
> > > +	}
> > > +}
> > > +
> > > +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> > > +			    struct io_event __user *event)
> > > +{
> > > +	unsigned int nr_events = 0;
> > > +	int ret = 0;
> > > +
> > > +	/* * Only allow one thread polling at a time */
> > 
> > nit: extra '* '
> 
> Removed.
> 
> Thanks for your review!

You're welcome. Always happy to help!
Thanks for introducing this interface!
We intend to make use of it in scylla, via seastar
(See http://seastar.io/ https://github.com/scylladb/seastar)

Benny



^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 8/8] aio: support for IO polling
  2018-11-20 17:19 ` [PATCH 8/8] aio: support for IO polling Jens Axboe
  2018-11-21 11:12   ` Benny Halevy
@ 2018-11-22 11:13   ` Jan Kara
  2018-11-22 21:01     ` Jens Axboe
  1 sibling, 1 reply; 16+ messages in thread
From: Jan Kara @ 2018-11-22 11:13 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-block, linux-aio, linux-fsdevel


On Tue 20-11-18 10:19:53, Jens Axboe wrote:
> +/*
> + * We can't just wait for polled events to come to us, we have to actively
> + * find and complete them.
> + */
> +static void aio_iopoll_reap_events(struct kioctx *ctx)
> +{
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		return;
> +
> +	while (!list_empty_careful(&ctx->poll_submitted) ||
> +	       !list_empty(&ctx->poll_completing)) {
> +		unsigned int nr_events = 0;
> +
> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> +	}
> +}
> +
> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> +			    struct io_event __user *event)
> +{
> +	unsigned int nr_events = 0;
> +	int ret = 0;
> +
> +	/* * Only allow one thread polling at a time */
> +	if (test_and_set_bit(0, &ctx->getevents_busy))
> +		return -EBUSY;
> +
> +	while (!nr_events || !need_resched()) {
> +		int tmin = 0;
> +
> +		if (nr_events < min_nr)
> +			tmin = min_nr - nr_events;
> +
> +		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
> +		if (ret <= 0)
> +			break;
> +		ret = 0;
> +	}
> +
> +	clear_bit(0, &ctx->getevents_busy);
> +	return nr_events ? nr_events : ret;
> +}

Hum, what if userspace calls io_destroy() while another process is polling
for events on the same kioctx? It seems we'd be reaping events from two
processes in parallel in that case which will result in various
"interesting" effects like ctx->poll_completing list corruption...

								Honza
-- 
Jan Kara <jack@suse.com>
SUSE Labs, CR

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 8/8] aio: support for IO polling
  2018-11-22 11:13   ` Jan Kara
@ 2018-11-22 21:01     ` Jens Axboe
  0 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2018-11-22 21:01 UTC (permalink / raw)
  To: Jan Kara; +Cc: linux-block, linux-aio, linux-fsdevel

On 11/22/18 4:13 AM, Jan Kara wrote:
> 
> On Tue 20-11-18 10:19:53, Jens Axboe wrote:
>> +/*
>> + * We can't just wait for polled events to come to us, we have to actively
>> + * find and complete them.
>> + */
>> +static void aio_iopoll_reap_events(struct kioctx *ctx)
>> +{
>> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
>> +		return;
>> +
>> +	while (!list_empty_careful(&ctx->poll_submitted) ||
>> +	       !list_empty(&ctx->poll_completing)) {
>> +		unsigned int nr_events = 0;
>> +
>> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
>> +	}
>> +}
>> +
>> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
>> +			    struct io_event __user *event)
>> +{
>> +	unsigned int nr_events = 0;
>> +	int ret = 0;
>> +
>> +	/* * Only allow one thread polling at a time */
>> +	if (test_and_set_bit(0, &ctx->getevents_busy))
>> +		return -EBUSY;
>> +
>> +	while (!nr_events || !need_resched()) {
>> +		int tmin = 0;
>> +
>> +		if (nr_events < min_nr)
>> +			tmin = min_nr - nr_events;
>> +
>> +		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
>> +		if (ret <= 0)
>> +			break;
>> +		ret = 0;
>> +	}
>> +
>> +	clear_bit(0, &ctx->getevents_busy);
>> +	return nr_events ? nr_events : ret;
>> +}
> 
> Hum, what if userspace calls io_destroy() while another process is polling
> for events on the same kioctx? It seems we'd be reaping events from two
> processes in parallel in that case which will result in various
> "interesting" effects like ctx->poll_completing list corruption...

I've replaced the ->getevents_busy with a mutex, and we also protect
the ->dead check inside that mutex. That ensures that destroy can't
proceed while a potential caller is inside getevents(), and that
getevents() sees if the ctx is being destroyed.
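
Something along these lines (sketch only; the mutex name and the exact
error returns here are assumptions, not final):

	static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
				    struct io_event __user *event)
	{
		unsigned int nr_events = 0;
		int ret = 0;

		/* only allow one thread to poll at a time */
		if (mutex_lock_killable(&ctx->getevents_lock))
			return -EINTR;
		if (unlikely(atomic_read(&ctx->dead))) {
			ret = -EINVAL;
			goto out;
		}

		while (!nr_events || !need_resched()) {
			int tmin = 0;

			if (nr_events < min_nr)
				tmin = min_nr - nr_events;

			ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
			if (ret <= 0)
				break;
			ret = 0;
		}
	out:
		mutex_unlock(&ctx->getevents_lock);
		return nr_events ? nr_events : ret;
	}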

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 16+ messages in thread

Thread overview: 16+ messages
2018-11-20 17:19 [PATCHSET v2] Support for polled aio Jens Axboe
2018-11-20 17:19 ` [PATCH 1/8] fs: add file_operations ->iopoll() handler Jens Axboe
2018-11-20 17:19 ` [PATCH 2/8] block: wire up block device ->iopoll() Jens Axboe
2018-11-20 17:19 ` [PATCH 3/8] iomap/xfs: wire up file_operations ->iopoll() Jens Axboe
2018-11-21  9:15   ` Benny Halevy
2018-11-21 13:27     ` Jens Axboe
2018-11-20 17:19 ` [PATCH 4/8] aio: use assigned completion handler Jens Axboe
2018-11-20 17:19 ` [PATCH 5/8] aio: fix failure to put the file pointer Jens Axboe
2018-11-20 17:19 ` [PATCH 6/8] aio: add io_setup2() system call Jens Axboe
2018-11-20 17:19 ` [PATCH 7/8] aio: separate out ring reservation from req allocation Jens Axboe
2018-11-20 17:19 ` [PATCH 8/8] aio: support for IO polling Jens Axboe
2018-11-21 11:12   ` Benny Halevy
2018-11-21 13:26     ` Jens Axboe
2018-11-21 13:51       ` Benny Halevy
2018-11-22 11:13   ` Jan Kara
2018-11-22 21:01     ` Jens Axboe
