linux-xfs.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Dave Chinner <david@fromorbit.com>
To: linux-xfs@vger.kernel.org
Subject: [PATCH 27/27] libxfs: convert sync IO buftarg engine to AIO
Date: Thu, 15 Oct 2020 18:21:55 +1100	[thread overview]
Message-ID: <20201015072155.1631135-28-david@fromorbit.com> (raw)
In-Reply-To: <20201015072155.1631135-1-david@fromorbit.com>

From: Dave Chinner <dchinner@redhat.com>

Simple per-ag thread based completion engine. Will have issues with
large AG counts.

XXX: should this be combined with the struct btcache?

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 include/atomic.h           |   7 +-
 include/builddefs.in       |   2 +-
 include/platform_defs.h.in |   1 +
 libxfs/buftarg.c           | 202 +++++++++++++++++++++++++++++++------
 libxfs/xfs_buf.h           |   6 ++
 libxfs/xfs_buftarg.h       |   7 ++
 6 files changed, 191 insertions(+), 34 deletions(-)

diff --git a/include/atomic.h b/include/atomic.h
index 5860d7897ae5..8727fc4ddae9 100644
--- a/include/atomic.h
+++ b/include/atomic.h
@@ -27,8 +27,11 @@ typedef	int64_t	atomic64_t;
 #define atomic_inc_return(a)	uatomic_add_return(a, 1)
 #define atomic_dec_return(a)	uatomic_sub_return(a, 1)
 
-#define atomic_inc(a)		atomic_inc_return(a)
-#define atomic_dec(a)		atomic_inc_return(a)
+#define atomic_add(a, v)	uatomic_add(a, v)
+#define atomic_sub(a, v)	uatomic_sub(a, v)
+
+#define atomic_inc(a)		uatomic_inc(a)
+#define atomic_dec(a)		uatomic_dec(a)
 
 #define atomic_dec_and_test(a)	(atomic_dec_return(a) == 0)
 
diff --git a/include/builddefs.in b/include/builddefs.in
index 78eddf4a9852..c20a48f6258c 100644
--- a/include/builddefs.in
+++ b/include/builddefs.in
@@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
 LIBBLKID = @libblkid@
 LIBDEVMAPPER = @libdevmapper@
 LIBINIH = @libinih@
-LIBXFS = $(TOPDIR)/libxfs/libxfs.la
+LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio
 LIBFROG = $(TOPDIR)/libfrog/libfrog.la
 LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
 LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
index 8af43f3b8d8a..7c30a43eb951 100644
--- a/include/platform_defs.h.in
+++ b/include/platform_defs.h.in
@@ -24,6 +24,7 @@
 #include <stdbool.h>
 #include <libgen.h>
 #include <urcu.h>
+#include <libaio.h>
 
 typedef struct filldir		filldir_t;
 
diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
index df968c66c205..e1e5f41b423c 100644
--- a/libxfs/buftarg.c
+++ b/libxfs/buftarg.c
@@ -259,11 +259,16 @@ xfs_buftarg_alloc(
 	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
 		goto error_lru;
 
-	if (xfs_buftarg_mempressue_init(btp))
+	if (xfs_buftarg_aio_init(&btp->bt_aio))
 		goto error_pcp;
 
+	if (xfs_buftarg_mempressue_init(btp))
+		goto error_aio;
+
 	return btp;
 
+error_aio:
+	xfs_buftarg_aio_destroy(btp->bt_aio);
 error_pcp:
 	percpu_counter_destroy(&btp->bt_io_count);
 error_lru:
@@ -286,6 +291,7 @@ xfs_buftarg_free(
 	if (btp->bt_psi_fd >= 0)
 		close(btp->bt_psi_fd);
 
+	xfs_buftarg_aio_destroy(btp->bt_aio);
 	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
 	percpu_counter_destroy(&btp->bt_io_count);
 	platform_flush_device(btp->bt_fd, btp->bt_bdev);
@@ -329,39 +335,132 @@ xfs_buf_allocate_memory(
  */
 
 /*
- * XXX: this will be replaced by an AIO submission engine in future. In the mean
- * time, just complete the IO synchronously so all the machinery still works.
+ * AIO context for dispatch and completion.
+ *
+ * Run completion polling in a separate thread; the poll timeout will stop it
+ * from spinning in tight loops when there is nothing to do.
  */
-static int
-submit_io(
-	struct xfs_buf *bp,
-	int		fd,
-	void		*buf,
-	xfs_daddr_t	blkno,
-	int		size,
-	bool		write)
+#define MAX_AIO_EVENTS 1024
+struct xfs_btaio {
+	io_context_t	ctxp;
+	int		aio_fd;
+	int		aio_in_flight;
+	pthread_t	completion_tid;
+	bool		done;
+};
+
+static void
+xfs_buf_aio_ioend(
+	struct io_event		*ev)
 {
-	int		ret;
+	struct xfs_buf		*bp = (struct xfs_buf *)ev->data;
 
-	if (!write)
-		ret = pread(fd, buf, size, BBTOB(blkno));
-	else
-		ret = pwrite(fd, buf, size, BBTOB(blkno));
-	if (ret < 0)
-		ret = -errno;
-	else if (ret != size)
-		ret = -EIO;
-	else
-		ret = 0;
 	/*
-	 * This is a bit of a hack until we get AIO that runs completions.
-	 * Success is treated as a completion here, but IO errors are handled as
-	 * a submission error and are handled by the caller. AIO will clean this
-	 * up.
+	 * don't overwrite existing errors - otherwise we can lose errors on
+	 * buffers that require multiple bios to complete.
+	 *
+	 * We check that the returned length was the same as specified for this
+	 * IO. Note that this only works for read and write - if we start
+	 * using readv/writev for discontiguous buffers then this needs more
+	 * work.
 	 */
-	if (!ret)
+	if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
+		int error = ev->res < 0 ? (int)ev->res : -EIO;
+
+		cmpxchg(&bp->b_io_error, 0, error);
+	}
+
+	if (atomic_dec_and_test(&bp->b_io_remaining))
 		xfs_buf_ioend(bp);
-	return ret;
+}
+
+static void
+get_io_completions(
+	struct xfs_btaio	*aio,
+	int			threshold)
+{
+	struct io_event		ioevents[MAX_AIO_EVENTS];
+	struct timespec		tout = {
+		.tv_nsec = 100*1000*1000,	/* 100ms */
+	};
+	int			i, r;
+
+	/* gather up some completions */
+	r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
+	if (r < 0)  {
+		fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
+		if (r == -EINTR)
+			return;
+		exit(1);
+	}
+	if (r == 0) {
+		/* timeout, return to caller to check for what to do next */
+		return;
+	}
+
+	atomic_sub(&aio->aio_in_flight, r);
+	for (i = 0; i < r; ++i)
+		xfs_buf_aio_ioend(&ioevents[i]);
+}
+
+static void *
+aio_completion_thread(
+	void			*arg)
+{
+	struct xfs_btaio	*aio = arg;
+
+	while (!aio->done) {
+		get_io_completions(aio, 1);
+	}
+	return NULL;
+}
+
+static int
+submit_aio(
+	struct xfs_buf		*bp,
+	void			*buf,
+	xfs_daddr_t		blkno,
+	int			size)
+{
+	struct xfs_btaio	*aio = bp->b_target->bt_aio;
+	int			r;
+
+	if (!aio->aio_fd)
+		aio->aio_fd = bp->b_target->bt_fd;
+
+	/*
+	 * Reserve and bound the number of in flight IOs to keep the number of
+	 * pending IOs from overrunning the tail of the event loop. This also
+	 * serves to throttle incoming IOs without burning CPU by spinning.
+	 */
+	while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
+		get_io_completions(aio, 1);
+
+	if (bp->b_flags & XBF_WRITE)
+		io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+	else
+		io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+
+	bp->b_iocb.data = bp;
+	do {
+		struct iocb		*iocb;
+
+		iocb = &bp->b_iocb;
+		r = io_submit(aio->ctxp, 1, &iocb);
+		if (r == 1)
+			return 0;	/* successful submission */
+		fprintf(stderr, "io_submit returned %d\n", r);
+		if (r != -EAGAIN)
+			break;
+		/* On EAGAIN, reap some completions and try again. */
+		get_io_completions(aio, 1);
+	} while (1);
+
+	if (bp->b_flags & LIBXFS_B_EXIT)
+		exit(1);
+
+	atomic_dec(&aio->aio_in_flight);
+	return r;
 }
 
 static void
@@ -373,7 +472,6 @@ xfs_buftarg_submit_io_map(
 {
 	int		size;
 	int		offset;
-	bool		rw = (bp->b_flags & XBF_WRITE);
 	int		error;
 
 	offset = *buf_offset;
@@ -388,8 +486,7 @@ xfs_buftarg_submit_io_map(
 
 	atomic_inc(&bp->b_io_remaining);
 
-	error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
-			  bp->b_maps[map].bm_bn, size, rw);
+	error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
 	if (error) {
 		/*
 		 * This is guaranteed not to be the last io reference count
@@ -474,6 +571,49 @@ xfs_buftarg_submit_io(
 	}
 }
 
+int
+xfs_buftarg_aio_init(
+	struct xfs_btaio	**aiop)
+{
+	struct xfs_btaio	*aio;
+	int			r;
+
+	aio = calloc(1, sizeof(*aio));
+	if (!aio)
+		return -ENOMEM;
+
+	r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
+	if (r) {
+		printf("FAIL! io_setup returned %d\n", r);
+		goto free_aio;
+	}
+
+	r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
+			aio);
+	if (r) {
+		printf("FAIL! aio thread create returned %d\n", r);
+		goto free_aio;
+	}
+
+	*aiop = aio;
+	return 0;
+
+free_aio:
+	free(aio);
+	return r;
+}
+
+void
+xfs_buftarg_aio_destroy(
+	struct xfs_btaio	*aio)
+{
+	if (!aio)
+		return;
+
+	aio->done = true;
+	pthread_join(aio->completion_tid, NULL);
+	free(aio);
+}
 /*
  * Return a buffer associated to external memory via xfs_buf_associate_memory()
  * back to it's empty state.
diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
index 4b6dff885165..29fbaaab4abb 100644
--- a/libxfs/xfs_buf.h
+++ b/libxfs/xfs_buf.h
@@ -81,6 +81,12 @@ struct xfs_buf {
 	struct completion	b_iowait;
 	struct semaphore	b_sema;
 	xfs_buf_iodone_t	b_iodone;
+
+	/*
+	 * XXX: AIO needs as many iocbs as we have maps for discontig
+	 * buffers to work correctly.
+	 */
+	struct iocb		b_iocb;
 };
 
 struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
index 61c4a3164d23..62aa6f236537 100644
--- a/libxfs/xfs_buftarg.h
+++ b/libxfs/xfs_buftarg.h
@@ -16,6 +16,7 @@ struct xfs_buf_ops;
 struct xfs_buf;
 struct xfs_buf_map;
 struct xfs_mount;
+struct xfs_btaio;
 
 /* this needs to die */
 #define LIBXFS_BBTOOFF64(bbs)	(((xfs_off_t)(bbs)) << BBSHIFT)
@@ -55,6 +56,9 @@ struct xfs_buftarg {
 	bool			bt_exiting;
 	bool			bt_low_mem;
 	struct completion	bt_low_mem_wait;
+
+	/* AIO submission/completion structures */
+	struct xfs_btaio	*bt_aio;
 };
 
 /* We purged a dirty buffer and lost a write. */
@@ -90,6 +94,9 @@ int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
 void xfs_buftarg_submit_io(struct xfs_buf *bp);
 void xfs_buf_mark_dirty(struct xfs_buf *bp);
 
+int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
+void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
+
 /*
  * Cached buffer memory manangement
  */
-- 
2.28.0


  parent reply	other threads:[~2020-10-15  7:22 UTC|newest]

Thread overview: 49+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-10-15  7:21 [PATCH 00/27] [RFC, WIP] xfsprogs: xfs_buf unification and AIO Dave Chinner
2020-10-15  7:21 ` [PATCH 01/27] xfsprogs: remove unused buffer tracing code Dave Chinner
2020-10-15  7:21 ` [PATCH 02/27] xfsprogs: remove unused IO_DEBUG functionality Dave Chinner
2020-11-16  2:31   ` Eric Sandeen
2020-10-15  7:21 ` [PATCH 03/27] libxfs: get rid of b_bcount from xfs_buf Dave Chinner
2020-11-23 19:53   ` Eric Sandeen
2020-10-15  7:21 ` [PATCH 04/27] libxfs: rename buftarg->dev to btdev Dave Chinner
2020-11-16  2:33   ` Eric Sandeen
2020-10-15  7:21 ` [PATCH 05/27] xfsprogs: get rid of ancient btree tracing fragments Dave Chinner
2020-11-16  2:35   ` Eric Sandeen
2020-10-15  7:21 ` [PATCH 06/27] xfsprogs: remove xfs_buf_t typedef Dave Chinner
2020-10-15 15:22   ` Darrick J. Wong
2020-10-15 20:54     ` Dave Chinner
2020-10-15  7:21 ` [PATCH 07/27] xfsprogs: introduce liburcu support Dave Chinner
2020-10-15  7:21 ` [PATCH 08/27] libxfs: add spinlock_t wrapper Dave Chinner
2020-10-15  7:21 ` [PATCH 09/27] atomic: convert to uatomic Dave Chinner
2020-10-15  7:21 ` [PATCH 10/27] libxfs: add kernel-compatible completion API Dave Chinner
2020-10-15 17:09   ` Darrick J. Wong
2020-10-19 22:21     ` Dave Chinner
2020-10-15  7:21 ` [PATCH 11/27] libxfs: add wrappers for kernel semaphores Dave Chinner
2020-10-15  7:21 ` [PATCH 12/27] xfsprogs: convert use-once buffer reads to uncached IO Dave Chinner
2020-10-15 17:12   ` Darrick J. Wong
2020-10-19 22:36     ` Dave Chinner
2020-10-15  7:21 ` [PATCH 13/27] libxfs: introduce userspace buftarg infrastructure Dave Chinner
2020-10-15  7:21 ` [PATCH 14/27] xfs: rename libxfs_buftarg_init to libxfs_open_devices() Dave Chinner
2020-10-15  7:21 ` [PATCH 15/27] libxfs: introduce userspace buftarg infrastructure Dave Chinner
2020-10-15 17:16   ` Darrick J. Wong
2020-10-15  7:21 ` [PATCH 16/27] libxfs: add a synchronous IO engine to the buftarg Dave Chinner
2020-10-15  7:21 ` [PATCH 17/27] xfsprogs: convert libxfs_readbufr to libxfs_buf_read_uncached Dave Chinner
2020-10-15  7:21 ` [PATCH 18/27] libxfs: convert libxfs_bwrite to buftarg IO Dave Chinner
2020-10-15  7:21 ` [PATCH 19/27] libxfs: add cache infrastructure to buftarg Dave Chinner
2020-10-15  7:21 ` [PATCH 20/27] libxfs: add internal lru to btcache Dave Chinner
2020-10-15  7:21 ` [PATCH 21/27] libxfs: Add kernel list_lru wrapper Dave Chinner
2020-10-15  7:21 ` [PATCH 22/27] libxfs: introduce new buffer cache infrastructure Dave Chinner
2020-10-15 17:46   ` Darrick J. Wong
2020-10-15  7:21 ` [PATCH 23/27] libxfs: use PSI information to detect memory pressure Dave Chinner
2020-10-15 17:56   ` Darrick J. Wong
2020-10-15 21:20     ` Dave Chinner
2020-10-15  7:21 ` [PATCH 24/27] libxfs: add a buftarg cache shrinker implementation Dave Chinner
2020-10-15 18:01   ` Darrick J. Wong
2020-10-15 21:33     ` Dave Chinner
2020-10-15  7:21 ` [PATCH 25/27] libxfs: switch buffer cache implementations Dave Chinner
2020-10-15  7:21 ` [PATCH 26/27] build: set platform_defs.h.in dependency correctly Dave Chinner
2020-10-15  7:21 ` Dave Chinner [this message]
2020-10-15 18:26   ` [PATCH 27/27] libxfs: convert sync IO buftarg engine to AIO Darrick J. Wong
2020-10-15 21:42     ` Dave Chinner
2020-10-15  7:29 ` [PATCH 00/27] [RFC, WIP] xfsprogs: xfs_buf unification and AIO Dave Chinner
2020-10-15 18:37 ` Darrick J. Wong
2020-10-15 22:35   ` Dave Chinner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201015072155.1631135-28-david@fromorbit.com \
    --to=david@fromorbit.com \
    --cc=linux-xfs@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).