On Fri, May 24, 2019 at 07:33:34PM +0530, Aarushi Mehta wrote: > Signed-off-by: Aarushi Mehta > --- > MAINTAINERS | 1 + > block/Makefile.objs | 2 + > block/io_uring.c | 306 ++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 309 insertions(+) > create mode 100644 block/io_uring.c > > diff --git a/MAINTAINERS b/MAINTAINERS > index b8fc1e3fe3..770d562c6c 100644 > --- a/MAINTAINERS > +++ b/MAINTAINERS > @@ -2510,6 +2510,7 @@ R: Stefan Hajnoczi > L: qemu-block@nongnu.org > S: Maintained > F: stubs/io_uring.c > +F: block/io_uring.c > > > L: qemu-block@nongnu.original > diff --git a/block/Makefile.objs b/block/Makefile.objs > index 7a81892a52..262d413c6d 100644 > --- a/block/Makefile.objs > +++ b/block/Makefile.objs > @@ -18,6 +18,7 @@ block-obj-y += block-backend.o snapshot.o qapi.o > block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o > block-obj-$(CONFIG_POSIX) += file-posix.o > block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o > +block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o > block-obj-y += null.o mirror.o commit.o io.o create.o > block-obj-y += throttle-groups.o > block-obj-$(CONFIG_LINUX) += nvme.o > @@ -61,5 +62,6 @@ block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o > dmg-lzfse.o-libs := $(LZFSE_LIBS) > qcow.o-libs := -lz > linux-aio.o-libs := -laio > +io_uring.o-libs := -luring > parallels.o-cflags := $(LIBXML2_CFLAGS) > parallels.o-libs := $(LIBXML2_LIBS) > diff --git a/block/io_uring.c b/block/io_uring.c > new file mode 100644 > index 0000000000..817ec055db > --- /dev/null > +++ b/block/io_uring.c > @@ -0,0 +1,306 @@ > +/* > + * Linux io_uring support. > + * > + * Copyright (C) 2009 IBM, Corp. > + * Copyright (C) 2009 Red Hat, Inc. > + * > + * This work is licensed under the terms of the GNU GPL, version 2 or later. > + * See the COPYING file in the top-level directory. > + */ > +#include Please move this include below osdep.h as per ./HACKING "1.2. Include directives". > + > +#include "qemu/osdep.h" > +#include "qemu-common.h" > +#include "block/aio.h" > +#include "qemu/queue.h" > +#include "block/block.h" > +#include "block/raw-aio.h" > +#include "qemu/event_notifier.h" Unused, please drop. > +#include "qemu/coroutine.h" > +#include "qapi/error.h" > + > +#define MAX_EVENTS 128 > + > +typedef struct LuringAIOCB { > + BlockAIOCB common; > + Coroutine *co; > + struct io_uring_sqe sqeq; > + int ret; > + QSIMPLEQ_ENTRY(LuringAIOCB) next; > +} LuringAIOCB; > + > +typedef struct LuringQueue { > + int plugged; > + unsigned int in_queue; > + unsigned int in_flight; > + bool blocked; > + QSIMPLEQ_HEAD(, LuringAIOCB) pending; > +} LuringQueue; > + > +typedef struct LuringState { > + AioContext *aio_context; > + > + struct io_uring ring; > + > + /* io queue for submit at batch. Protected by AioContext lock. */ > + LuringQueue io_q; > + > + /* I/O completion processing. Only runs in I/O thread. */ > + QEMUBH *completion_bh; > + int event_idx; > + int event_max; > +} LuringState; > + > +static void ioq_submit(LuringState *s); > + > +static inline int io_cqe_ret(struct io_uring_cqe *cqe) > +{ > + return cqe->res; > +} > + > +/** > + * qemu_luring_process_completions: > + * @s: AIO state > + * > + * Fetches completed I/O requests, consumes cqes and invokes their callbacks. > + * > + */ > +static void qemu_luring_process_completions(LuringState *s) > +{ > + struct io_uring_cqe *cqes; > + > + qemu_bh_schedule(s->completion_bh); Please add a comment explaining why this is necessary: /* * Request completion callbacks can run the nested event loop. * Schedule ourselves so the nested event loop will "see" remaining * completed requests and process them. Without this, completion * callbacks that wait for other requests using a nested event loop * would hang forever. */ > + > + while ((s->event_max = s->io_q.in_flight) && > + !io_uring_peek_cqe(&s->ring, &cqes)) { > + for (s->event_idx = 0; s->event_idx < s->event_max;) { > + io_uring_cqe_seen(&s->ring, cqes); What is the purpose of event_max/event_idx given that we can consume cqes via io_uring_peek_cqe() + io_uring_cqe_seen()? I think just this will do: while (!io_uring_peek_cqe(&s->ring, &cqes)) { int ret = io_cqe_ret(cqes); /* We're done accessing cqes, replenish the cq ring */ io_uring_cqe_seen(&s->ring, cqes); > + > + LuringAIOCB *luringcb; > + luringcb = g_malloc0(sizeof(luringcb)); We need to fetch the original LuringAIOCB associated with this request instead of allocating a new one. The original LuringAIOCB contains the coroutine and async completion callback function pointer that we need. io_uring allows us to pass user data along with the sqe and it gets returned in the cqe. During request submission: io_uring_sqe_set_data(sqes, luringcb); During request completion: LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes); > + luringcb->ret = io_cqe_ret(cqes); > + /* Change counters one-by-one because we can be nested. */ > + s->io_q.in_flight--; > + s->event_idx++; We must invoke completion and possibly unref luringcb here: if (luringcb->co) { /* If the coroutine is already entered it must be in ioq_submit() and * will notice laio->ret has been filled in when it eventually runs * later. Coroutines cannot be entered recursively so avoid doing * that! */ if (!qemu_coroutine_entered(luringcb->co)) { aio_wake(luringcb->co); } } else { luringcb->common.cb(luringcb->common.opaque, luringcb->ret); qemu_aio_unref(luringcb); } Without this code the request is never completed. How are you testing this code (you mentioned the guest boots)? Is it possible that the Linux AIO or thread-pool code path is being taken accidentally and we're actually not running io_uring yet? > + } > + } > + > + qemu_bh_cancel(s->completion_bh); > + > + /* > + *If we are nested we have to notify the level above that we are done > + * by setting event_max to zero, upper level will then jump out of it's > + * own `for` loop. If we are the last all counters dropped to zero. > + */ > + s->event_max = 0; > + s->event_idx = 0; > +} > + > +static void qemu_luring_process_completions_and_submit(LuringState *s) > +{ > + aio_context_acquire(s->aio_context); > + qemu_luring_process_completions(s); > + > + if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { Please use in_queue instead of checking io_q.pending since there might be requests in the sq ring that also need to be submitted. > + ioq_submit(s); > + } > + aio_context_release(s->aio_context); > +} > + > +static void qemu_luring_completion_bh(void *opaque) > +{ > + LuringState *s = opaque; > + qemu_luring_process_completions_and_submit(s); > +} > + > +static void qemu_luring_completion_cb(void *opaque) > +{ > + LuringState *s = opaque; > + qemu_luring_process_completions_and_submit(s); > +} > + > +static const AIOCBInfo luring_aiocb_info = { > + .aiocb_size = sizeof(LuringAIOCB), > +}; > + > + > +static void ioq_init(LuringQueue *io_q) > +{ > + QSIMPLEQ_INIT(&io_q->pending); > + io_q->plugged = 0; > + io_q->in_queue = 0; > + io_q->in_flight = 0; > + io_q->blocked = false; > +} > + > +static void ioq_submit(LuringState *s) > +{ > + int ret, len; > + LuringAIOCB *luringcb; > + QSIMPLEQ_HEAD(, LuringAIOCB) completed; > + > + while (s->io_q.in_flight >= MAX_EVENTS && s->io_q.in_queue) { Try to submit requests as long as MAX_EVENTS has not been reached: s/>=/ + len = 0; > + QSIMPLEQ_FOREACH(luringcb, &s->io_q.pending, next) { > + if (s->io_q.in_flight + len++ >= MAX_EVENTS) { > + break; > + } Can we rely on io_uring_get_sqe() failing when the sq ring is exhausted? That way there's no need for this if statement. > + struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring); > + if (sqes) { /* Prep sqe for subission */ > + memset(sqes, 0, sizeof(*sqes)); memset is unnecessary since the next statement overwrites all of sqes. > + *sqes = luringcb->sqeq; > + QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); Careful, a special API is needed when you want to modify the queue during iteration. Please see QSIMPLEQ_FOREACH_SAFE(). > + } else { > + break; > + } The simpler form of this if statement is: if (!sqes) { break; } ...the success case without nesting... This is just a style suggestion. I find code easier to read without nesting. Feel free to leave it if you prefer your way. > + } > + > + ret = io_uring_submit(&s->ring); > + if (ret == -EAGAIN) { > + break; > + } Actually in all error cases since we don't want to increment in_flight/in_queue with -errno: if (ret < 0) { break; } Please add a TODO comment for error handling, the error is currently ignored and this could be a problem. In the EAGAIN case the kernel is unable to process more requests temporarily and we should try again later. But we don't try again later, so this could result in hung I/O. > + > + s->io_q.in_flight += ret; > + s->io_q.in_queue -= ret; > + QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, luringcb, next, &completed); Hmm...what is this doing? QSIMPLEQ_REMOVE_HEAD() was already called earlier on, so why modify the list again? > + } > + s->io_q.blocked = (s->io_q.in_queue > 0); > + > + if (s->io_q.in_flight) { > + /* > + * We can try to complete something just right away if there are > + * still requests in-flight. > + */ > + qemu_luring_process_completions(s); > + } > +} > + > +void luring_io_plug(BlockDriverState *bs, LuringState *s) > +{ > + s->io_q.plugged++; > +} > + > +void luring_io_unplug(BlockDriverState *bs, LuringState *s) > +{ > + assert(s->io_q.plugged); > + if (--s->io_q.plugged == 0 && > + !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { Remember to take into account requests in the sq ring, they also need to be submitted: s/!QSIMPLEQ_EMPTY(&s->io_q.pending)/s->io_q.in_queue > 0/ > + ioq_submit(s); > + } > +} > + > +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s, > + uint64_t offset, QEMUIOVector *qiov, int type) > +{ > + struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring); > + if (!sqes) { > + sqes = &luringcb->sqeq; > + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, luringcb, next); > + } > + > + switch (type) { > + case QEMU_AIO_WRITE: > + io_uring_prep_writev(sqes, fd, qiov->iov, qiov->niov, offset); > + break; > + case QEMU_AIO_READ: > + io_uring_prep_readv(sqes, fd, qiov->iov, qiov->niov, offset); > + break; > + case QEMU_AIO_FLUSH: > + io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC); > + break; > + default: > + fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n", > + __func__, type); > + abort(); > + } > + > + s->io_q.in_queue++; It's easy to think that "in_queue" is just the length of io_q.pending, but that's incorrect. "pending" and "in_queue" have different semantics and it's a bit subtle: * The "pending" queue is only used when the sq ring is full. * The "in_queue" counter indicates the total number of requests on the io_q.pending list and in the sq_ring but not yet consumed by the kernel. Changing the names would help. Here is my suggestion, maybe you have a better idea: * Call the list io_q.sq_overflow so it's clear these are requests that didn't fit into the sq ring. * Call the counter num_unsubmitted. I don't know about this one, it's hard to find a good name :). Maybe it can be left as in_queue. > + if (!s->io_q.blocked && > + (!s->io_q.plugged || > + s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) { > + ioq_submit(s); > + } > + > + return 0; > +} > + > +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd, > + uint64_t offset, QEMUIOVector *qiov, int type) > +{ > + int ret; > + LuringAIOCB luringcb = { > + .co = qemu_coroutine_self(), > + .ret = -EINPROGRESS, > + }; > + > + ret = luring_do_submit(fd, &luringcb, s, offset, qiov, type); > + if (ret < 0) { > + return ret; > + } > + > + if (luringcb.ret == -EINPROGRESS) { > + qemu_coroutine_yield(); > + } > + return luringcb.ret; > +} > + > +BlockAIOCB *luring_submit(BlockDriverState *bs, LuringState *s, int fd, > + int64_t sector_num, QEMUIOVector *qiov, BlockCompletionFunc *cb, > + void *opaque, int type) > +{ > + LuringAIOCB *luringcb; > + off_t offset = sector_num * BDRV_SECTOR_SIZE; > + int ret; > + > + luringcb = qemu_aio_get(&luring_aiocb_info, bs, cb, opaque); > + luringcb->ret = -EINPROGRESS; > + ret = luring_do_submit(fd, luringcb, s, offset, qiov, type); > + if (ret < 0) { > + qemu_aio_unref(luringcb); > + return NULL; > + } > + > + return &luringcb->common; > +} > + > +void luring_detach_aio_context(LuringState *s, AioContext *old_context) > +{ > + aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL, > + &s); > + qemu_bh_delete(s->completion_bh); > + s->aio_context = NULL; > +} > + > +void luring_attach_aio_context(LuringState *s, AioContext *new_context) > +{ > + s->aio_context = new_context; > + s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s); > + aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false, > + (IOHandler *)qemu_luring_completion_cb, NULL, NULL, &s); Casting function pointers is suspicious because it often indicates unportable code (it relies on assumptions about the calling convention that the compiler is using). Luckily this type cast seems unnecessary and can be dropped: static void qemu_luring_completion_cb(void *opaque) == typedef void IOHandler(void *opaque); > +} > + > +LuringState *luring_init(Error **errp) > +{ > + int rc; > + LuringState *s; > + s = g_malloc0(sizeof(*s)); > + struct io_uring *ring = &s->ring; > + rc = io_uring_queue_init(MAX_EVENTS, ring, 0); > + if (rc == -1) { > + error_setg_errno(errp, errno, "failed to init linux io_uring ring"); > + goto out_close_efd; > + } > + > + ioq_init(&s->io_q); > + return s; > + > +out_close_efd: There is no eventfd so "efd" is outdated. Given that there are no other gotos, you could just inline this return code and avoid the goto altogether. > + g_free(s); > + return NULL; > +} > + > +void luring_cleanup(LuringState *s) > +{ > + io_uring_queue_exit(&s->ring); > + g_free(s); > +} > -- > 2.17.1