All of lore.kernel.org
 help / color / mirror / Atom feed
* [patch,rfc] aio: allocate kiocbs in batches
@ 2011-07-05 20:35 Jeff Moyer
  2011-09-01  0:13 ` Andrew Morton
  0 siblings, 1 reply; 5+ messages in thread
From: Jeff Moyer @ 2011-07-05 20:35 UTC (permalink / raw)
  To: linux-aio, linux-fsdevel

Hi,

When looking at perf data for my aio-stress test runs against some
relatively fast storage, I noticed that __aio_get_req was taking up more
CPU than I expected.  The command line for aio-stress was:
  aio-stress -O -r 8 -d 256 -b 32 -l -s 16384 /dev/sdk
which means we're sending 32 iocbs to io_submit at a time.  Here's the
perf report -g output:

Events: 41K cycles
-      7.33%  aio-stress  [kernel.kallsyms]             [k] _raw_spin_lock_irq
   - _raw_spin_lock_irq
      + 75.38% scsi_request_fn
      - 10.67% __aio_get_req
           io_submit_one

The spinlock is the ctx_lock, taken when allocating the request.  This
seemed like pretty low-hanging fruit to me, so I cooked up a patch to
allocate 32 kiocbs at a time.  This is the result:

Events: 50K cycles
-      5.54%  aio-stress  [kernel.kallsyms]             [k] _raw_spin_lock_irq
   - _raw_spin_lock_irq
      + 83.23% scsi_request_fn
      - 5.14% io_submit_one

So, aio-stress now takes up ~2% less CPU.  This seems like a worth-while
thing to do, but I don't have a system handy that would show any real
performance gains from it.  The higher the number of iocbs passed to
io_submit, the more of a win this will be.

FWIW, I tested this with multiple different batch sizes (1, 32, 33, 64,
256) and it didn't blow up.

Comments?

Cheers,
Jeff

Signed-off-by: Jeff Moyer <jmoyer@redhat.com>

diff --git a/fs/aio.c b/fs/aio.c
index e29ec48..ae6066f 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -440,8 +440,6 @@ void exit_aio(struct mm_struct *mm)
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req = NULL;
-	struct aio_ring *ring;
-	int okay = 0;
 
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
 	if (unlikely(!req))
@@ -459,40 +457,107 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 	INIT_LIST_HEAD(&req->ki_run_list);
 	req->ki_eventfd = NULL;
 
-	/* Check if the completion queue has enough free space to
-	 * accept an event from this io.
+	return req;
+}
+
+#define KIOCB_STASH_SIZE	32
+struct kiocb_stash {
+	unsigned head;
+	unsigned tail;
+	unsigned total;
+	unsigned used;
+	struct kiocb *kiocbs[KIOCB_STASH_SIZE];
+};
+static void kiocb_stash_init(struct kiocb_stash *stash, int nr)
+{
+	stash->head = stash->tail = 0;
+	stash->total = nr;
+	stash->used = 0;
+}
+static void kiocb_stash_free(struct kiocb_stash *stash, int used)
+{
+	int i;
+
+	for (i = used; i < stash->total; i++)
+		kmem_cache_free(kiocb_cachep, stash->kiocbs[i]);
+}
+static inline unsigned kiocb_stash_avail(struct kiocb_stash *stash)
+{
+	return stash->tail - stash->head;
+}
+
+/*
+ * Allocate nr kiocbs.  This aovids taking and dropping the lock a
+ * lot during setup.
+ */
+static int kiocb_stash_refill(struct kioctx *ctx, struct kiocb_stash *stash)
+{
+	int i, nr = KIOCB_STASH_SIZE;
+	int avail;
+	int called_fput = 0;
+	struct aio_ring *ring;
+
+	/*
+	 * Allocate the requests up-front.  Later, we'll trim the list
+	 * if there isn't enough room in the completion ring.
 	 */
+	stash->head = 0;
+	if ((stash->total - stash->used) % KIOCB_STASH_SIZE)
+		/* don't need to allocate a full batch */
+		nr = stash->total - stash->used;
+
+	for (i = 0; i < nr; i++) {
+		stash->kiocbs[i] = __aio_get_req(ctx);
+		if (!stash->kiocbs[i] && !called_fput) {
+			/* Handle a potential starvation case --
+			 * should be exceedingly rare as requests will
+			 * be stuck on fput_head only if the
+			 * aio_fput_routine is delayed and the
+			 * requests were the last user of the struct
+			 * file.
+			 */
+			aio_fput_routine(NULL);
+			called_fput = 1;
+			i--;
+			continue;
+		}
+	}
+	stash->tail = i;
+
 	spin_lock_irq(&ctx->ctx_lock);
 	ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
-	if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
-		list_add(&req->ki_list, &ctx->active_reqs);
+
+	avail = aio_ring_avail(&ctx->ring_info, ring) + ctx->reqs_active;
+	if (avail < stash->tail) {
+		/* Trim back the number of requests. */
+		for (i = avail; i < stash->tail; i++)
+			kmem_cache_free(kiocb_cachep, stash->kiocbs[i]);
+		stash->tail = avail;
+	}
+	for (i = 0; i < stash->tail; i++) {
+		list_add(&stash->kiocbs[i]->ki_list, &ctx->active_reqs);
 		ctx->reqs_active++;
-		okay = 1;
 	}
 	kunmap_atomic(ring, KM_USER0);
 	spin_unlock_irq(&ctx->ctx_lock);
 
-	if (!okay) {
-		kmem_cache_free(kiocb_cachep, req);
-		req = NULL;
-	}
-
-	return req;
+	if (stash->tail == 0)
+		return -1;
+	return 0;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+					struct kiocb_stash *stash)
 {
-	struct kiocb *req;
-	/* Handle a potential starvation case -- should be exceedingly rare as 
-	 * requests will be stuck on fput_head only if the aio_fput_routine is 
-	 * delayed and the requests were the last user of the struct file.
-	 */
-	req = __aio_get_req(ctx);
-	if (unlikely(NULL == req)) {
-		aio_fput_routine(NULL);
-		req = __aio_get_req(ctx);
+	int ret;
+
+	if (kiocb_stash_avail(stash) == 0) {
+		ret = kiocb_stash_refill(ctx, stash);
+		if (ret)
+			return NULL;
 	}
-	return req;
+	stash->used++;
+	return stash->kiocbs[stash->head++];
 }
 
 static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
@@ -1515,7 +1580,8 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-			 struct iocb *iocb, bool compat)
+			 struct iocb *iocb, struct kiocb_stash *stash,
+			 bool compat)
 {
 	struct kiocb *req;
 	struct file *file;
@@ -1541,7 +1607,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	if (unlikely(!file))
 		return -EBADF;
 
-	req = aio_get_req(ctx);		/* returns with 2 references to req */
+	req = aio_get_req(ctx, stash);	/* returns with 2 references to req */
 	if (unlikely(!req)) {
 		fput(file);
 		return -EAGAIN;
@@ -1621,8 +1687,9 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 {
 	struct kioctx *ctx;
 	long ret = 0;
-	int i;
+	int i = 0;
 	struct blk_plug plug;
+	struct kiocb_stash kiocbs;
 
 	if (unlikely(nr < 0))
 		return -EINVAL;
@@ -1639,6 +1706,8 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 		return -EINVAL;
 	}
 
+	kiocb_stash_init(&kiocbs, nr);
+
 	blk_start_plug(&plug);
 
 	/*
@@ -1659,12 +1728,13 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 			break;
 		}
 
-		ret = io_submit_one(ctx, user_iocb, &tmp, compat);
+		ret = io_submit_one(ctx, user_iocb, &tmp, &kiocbs, compat);
 		if (ret)
 			break;
 	}
 	blk_finish_plug(&plug);
 
+	kiocb_stash_free(&kiocbs, i);
 	put_ioctx(ctx);
 	return i ? i : ret;
 }

^ permalink raw reply related	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2011-09-08 17:54 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2011-07-05 20:35 [patch,rfc] aio: allocate kiocbs in batches Jeff Moyer
2011-09-01  0:13 ` Andrew Morton
2011-09-01 13:56   ` Jeff Moyer
2011-09-01 22:35     ` Andrew Morton
2011-09-08 17:54       ` Jeff Moyer

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.