All of lore.kernel.org
 help / color / mirror / Atom feed
From: Anthony Liguori <aliguori@us.ibm.com>
To: qemu-devel@nongnu.org
Cc: kvm@vger.kernel.org
Subject: [PATCH][RFC] Linux AIO support when using O_DIRECT
Date: Mon, 23 Mar 2009 10:45:24 -0500	[thread overview]
Message-ID: <1237823124-6417-1-git-send-email-aliguori@us.ibm.com> (raw)

This is just a first cut.  It needs a fair bit of cleanup before it can be
committed.  I also think we need to fix up the AIO abstractions a bit.

I wanted to share it, though, in case anyone is interested in doing some
performance comparisons.  It seems to work, although I haven't exercised it
very much.

diff --git a/Makefile b/Makefile
index 82fec80..afc6b41 100644
--- a/Makefile
+++ b/Makefile
@@ -61,6 +61,9 @@ else
 ifdef CONFIG_AIO
 BLOCK_OBJS += posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+BLOCK_OBJS += linux-aio.o
+endif
 BLOCK_OBJS += block-raw-posix.o
 endif
 
diff --git a/Makefile.target b/Makefile.target
index 41366ee..df2a794 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -514,6 +514,9 @@ else
 ifdef CONFIG_AIO
 OBJS+=posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+OBJS+=linux-aio.o
+endif
 OBJS+=block-raw-posix.o
 endif
 
diff --git a/block-raw-posix.c b/block-raw-posix.c
index 1a1a178..e355cf4 100644
--- a/block-raw-posix.c
+++ b/block-raw-posix.c
@@ -29,6 +29,9 @@
 #ifdef CONFIG_AIO
 #include "posix-aio-compat.h"
 #endif
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
 
 #ifdef CONFIG_COCOA
 #include <paths.h>
@@ -68,6 +71,10 @@
 #include <sys/diskslice.h>
 #endif
 
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
+
 //#define DEBUG_FLOPPY
 
 //#define DEBUG_BLOCK
@@ -98,6 +105,17 @@
    reopen it to see if the disk has been changed */
 #define FD_OPEN_TIMEOUT 1000
 
+typedef struct AIOOperations    /* per-backend table of AIO entry points */
+{
+    struct qemu_aiocb *(*get_aiocb)(void);              /* allocate a control block */
+    void (*put_aiocb)(struct qemu_aiocb *);             /* free a control block */
+    int (*read)(struct qemu_aiocb *);                   /* submit a read; callers treat < 0 as failure */
+    int (*write)(struct qemu_aiocb *);                  /* submit a write; callers treat < 0 as failure */
+    int (*error)(struct qemu_aiocb *);                  /* callers test for 0, EINPROGRESS and ECANCELED */
+    ssize_t (*get_result)(struct qemu_aiocb *aiocb);    /* result, compared against aio_nbytes */
+    int (*cancel)(int fd, struct qemu_aiocb *aiocb);    /* returns a QEMU_PAIO_* code */
+} AIOOperations;
+
 typedef struct BDRVRawState {
     int fd;
     int type;
@@ -111,8 +129,31 @@ typedef struct BDRVRawState {
     int fd_media_changed;
 #endif
     uint8_t* aligned_buf;
+    AIOOperations *aio_ops;
 } BDRVRawState;
 
+static AIOOperations posix_aio_ops = {    /* default backend, implemented by posix-aio-compat.c */
+    .get_aiocb = qemu_paio_get_aiocb,
+    .put_aiocb = qemu_paio_put_aiocb,
+    .read = qemu_paio_read,
+    .write = qemu_paio_write,
+    .error = qemu_paio_error,
+    .get_result = qemu_paio_return,
+    .cancel = qemu_paio_cancel,
+};
+
+#ifdef CONFIG_LINUX_AIO
+static AIOOperations linux_aio_ops = {    /* linux-aio.c backend, selected when BDRV_O_NOCACHE is set */
+    .get_aiocb = qemu_laio_get_aiocb,
+    .put_aiocb = qemu_laio_put_aiocb,
+    .read = qemu_laio_read,
+    .write = qemu_laio_write,
+    .error = qemu_laio_error,
+    .get_result = qemu_laio_return,
+    .cancel = qemu_laio_cancel,
+};    
+#endif
+
 static int posix_aio_init(void);
 
 static int fd_open(BlockDriverState *bs);
@@ -124,6 +165,14 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
     s->lseek_err_cnt = 0;
 
     open_flags = O_BINARY;
@@ -463,7 +512,7 @@ static int raw_write(BlockDriverState *bs, int64_t sector_num,
 
 typedef struct RawAIOCB {
     BlockDriverAIOCB common;
-    struct qemu_paiocb aiocb;
+    struct qemu_aiocb *aiocb;
     struct RawAIOCB *next;
     int ret;
 } RawAIOCB;
@@ -496,19 +545,24 @@ static void posix_aio_read(void *opaque)
     for(;;) {
         pacb = &s->first_aio;
         for(;;) {
+            BDRVRawState *s;
+
             acb = *pacb;
             if (!acb)
                 goto the_end;
-            ret = qemu_paio_error(&acb->aiocb);
+
+            s = acb->common.bs->opaque;
+            ret = s->aio_ops->error(acb->aiocb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
-                    ret = qemu_paio_return(&acb->aiocb);
-                    if (ret == acb->aiocb.aio_nbytes)
+                    ret = s->aio_ops->get_result(acb->aiocb);
+                    if (ret == acb->aiocb->aio_nbytes)
                         ret = 0;
                     else
                         ret = -EINVAL;
@@ -519,6 +573,7 @@ static void posix_aio_read(void *opaque)
                 *pacb = acb->next;
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
                 break;
             } else {
@@ -553,7 +608,6 @@ static int posix_aio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    struct qemu_paioinit ai;
   
     if (posix_aio_state)
         return 0;
@@ -579,6 +633,8 @@ static int posix_aio_init(void)
 
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 
+    struct qemu_paioinit ai;
+
     memset(&ai, 0, sizeof(ai));
     ai.aio_threads = 64;
     ai.aio_num = 64;
@@ -600,16 +656,15 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
         return NULL;
 
     acb = qemu_aio_get(bs, cb, opaque);
-    if (!acb)
-        return NULL;
-    acb->aiocb.aio_fildes = s->fd;
-    acb->aiocb.ev_signo = SIGUSR2;
-    acb->aiocb.aio_buf = buf;
+    acb->aiocb = s->aio_ops->get_aiocb();
+    acb->aiocb->aio_fildes = s->fd;
+    acb->aiocb->ev_signo = SIGUSR2;
+    acb->aiocb->aio_buf = buf;
     if (nb_sectors < 0)
-        acb->aiocb.aio_nbytes = -nb_sectors;
+        acb->aiocb->aio_nbytes = -nb_sectors;
     else
-        acb->aiocb.aio_nbytes = nb_sectors * 512;
-    acb->aiocb.aio_offset = sector_num * 512;
+        acb->aiocb->aio_nbytes = nb_sectors * 512;
+    acb->aiocb->aio_offset = sector_num * 512;
     acb->next = posix_aio_state->first_aio;
     posix_aio_state->first_aio = acb;
     return acb;
@@ -618,7 +673,9 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
 static void raw_aio_em_cb(void* opaque)
 {
     RawAIOCB *acb = opaque;
+    BDRVRawState *s = acb->common.bs->opaque;
     acb->common.cb(acb->common.opaque, acb->ret);
+    s->aio_ops->put_aiocb(acb->aiocb);
     qemu_aio_release(acb);
 }
 
@@ -633,7 +690,9 @@ static void raw_aio_remove(RawAIOCB *acb)
             fprintf(stderr, "raw_aio_remove: aio request not found!\n");
             break;
         } else if (*pacb == acb) {
+            BDRVRawState *s = acb->common.bs->opaque;
             *pacb = acb->next;
+            s->aio_ops->put_aiocb(acb->aiocb);
             qemu_aio_release(acb);
             break;
         }
@@ -656,6 +715,7 @@ static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pread(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -665,7 +725,7 @@ static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_read(&acb->aiocb) < 0) {
+    if (s->aio_ops->read(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -687,6 +747,7 @@ static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pwrite(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -696,7 +757,7 @@ static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, (uint8_t*)buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_write(&acb->aiocb) < 0) {
+    if (s->aio_ops->write(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -707,12 +768,13 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     int ret;
     RawAIOCB *acb = (RawAIOCB *)blockacb;
+    BDRVRawState *s = acb->common.bs->opaque;
 
-    ret = qemu_paio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
+    ret = s->aio_ops->cancel(acb->aiocb->aio_fildes, acb->aiocb);
     if (ret == QEMU_PAIO_NOTCANCELED) {
         /* fail safe: if the aio could not be canceled, we wait for
            it */
-        while (qemu_paio_error(&acb->aiocb) == EINPROGRESS);
+        while (s->aio_ops->error(acb->aiocb) == EINPROGRESS);
     }
 
     raw_aio_remove(acb);
@@ -938,6 +1000,14 @@ static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
 #ifdef CONFIG_COCOA
     if (strstart(filename, "/dev/cdrom", NULL)) {
         kern_return_t kernResult;
diff --git a/configure b/configure
index 5c62c59..4913a3f 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@ build_docs="no"
 uname_release=""
 curses="yes"
 aio="yes"
+linuxaio="yes"
 nptl="yes"
 mixemu="no"
 bluez="yes"
@@ -463,6 +464,8 @@ for opt do
   ;;
   --disable-aio) aio="no"
   ;;
+  --disable-linux-aio) linuxaio="no"
+  ;;
   --disable-blobs) blobs="no"
   ;;
   --kerneldir=*) kerneldir="$optarg"
@@ -577,6 +580,7 @@ echo "  --enable-uname-release=R Return R for uname -r in usermode emulation"
 echo "  --sparc_cpu=V            Build qemu for Sparc architecture v7, v8, v8plus, v8plusa, v9"
 echo "  --disable-vde            disable support for vde network"
 echo "  --disable-aio            disable AIO support"
+echo "  --disable-linux-aio      disable Linux AIO support"
 echo "  --disable-blobs          disable installing provided firmware blobs"
 echo "  --kerneldir=PATH         look for kernel includes in PATH"
 echo ""
@@ -1082,6 +1086,22 @@ EOF
 fi
 
 ##########################################
+# linux-aio probe
+
+if test "$linuxaio" = "yes" ; then
+    linuxaio=no
+    cat > $TMPC <<EOF
+#include <libaio.h>
+#include <sys/eventfd.h>
+int main(void) { io_setup; io_set_eventfd; eventfd; return 0; }
+EOF
+    if $cc $ARCH_CFLAGS -o $TMPE -laio $TMPC 2> /dev/null ; then
+	linuxaio=yes
+	AIOLIBS="$AIOLIBS -laio"
+    fi
+fi
+
+##########################################
 # iovec probe
 cat > $TMPC <<EOF
 #include <sys/types.h>
@@ -1204,6 +1224,7 @@ echo "uname -r          $uname_release"
 echo "NPTL support      $nptl"
 echo "vde support       $vde"
 echo "AIO support       $aio"
+echo "Linux AIO support $linuxaio"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
 echo "fdt support       $fdt"
@@ -1500,6 +1521,10 @@ if test "$aio" = "yes" ; then
   echo "#define CONFIG_AIO 1" >> $config_h
   echo "CONFIG_AIO=yes" >> $config_mak
 fi
+if test "$linuxaio" = "yes" ; then
+  echo "#define CONFIG_LINUX_AIO 1" >> $config_h
+  echo "CONFIG_LINUX_AIO=yes" >> $config_mak
+fi
 if test "$blobs" = "yes" ; then
   echo "INSTALL_BLOBS=yes" >> $config_mak
 fi
diff --git a/linux-aio.c b/linux-aio.c
new file mode 100644
index 0000000..959407c
--- /dev/null
+++ b/linux-aio.c
@@ -0,0 +1,295 @@
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "linux-aio.h"
+#include "sys-queue.h"
+#include "osdep.h"
+#include "qemu-aio.h"
+
+#include <sys/eventfd.h>
+#include <libaio.h>
+
+/* Events requested from io_setup() and fetched per io_getevents() call. */
+#define MAX_EVENTS 64
+
+/* One Linux AIO request: the generic control block plus libaio state. */
+struct qemu_laiocb
+{
+    struct qemu_aiocb common;       /* generic part handed to callers */
+    struct qemu_laio_state *ctx;    /* state the request was submitted to */
+    struct iocb iocb;               /* libaio control block */
+    ssize_t ret;                    /* -EINPROGRESS until a result is stored */
+};
+
+/* State shared by all Linux AIO requests. */
+struct qemu_laio_state
+{
+    int efd;            /* eventfd attached to every submitted iocb */
+    io_context_t ctx;   /* context created by io_setup() */
+    int count;          /* requests submitted and not yet accounted for */
+};
+
+static struct qemu_laio_state *qemu_laio_state;
+
+/* Map the embedded 'common' member back to its enclosing qemu_laiocb. */
+static struct qemu_laiocb *aiocb_to_laiocb(struct qemu_aiocb *aiocb)
+{
+    return container_of(aiocb, struct qemu_laiocb, common);
+}
+
+/* Allocate a zero-filled request and return its generic part. */
+struct qemu_aiocb *qemu_laio_get_aiocb(void)
+{
+    struct qemu_laiocb *laiocb;
+
+    laiocb = qemu_mallocz(sizeof(*laiocb));
+    return &laiocb->common;
+}
+
+/* Free a request obtained from qemu_laio_get_aiocb(). */
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+
+    qemu_free(laiocb);
+}
+
+/*
+ * eventfd read handler: drain the eventfd counter, fetch the completed
+ * requests from the kernel and store their results.
+ *
+ * Only the results are recorded here.  The signal of the last completed
+ * request is then raised, presumably so that the signal-driven completion
+ * path in block-raw-posix.c picks the results up.
+ */
+static void qemu_laio_completion_cb(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    uint64_t val;
+    ssize_t ret;
+    struct io_event events[MAX_EVENTS];
+    int ev_signo = -1;
+
+    while (1) {
+        do {
+            ret = read(s->efd, &val, sizeof(val));
+        } while (ret == -1 && errno == EINTR);
+
+        /* EAGAIN (nothing pending), any other error or a short read. */
+        if (ret != (ssize_t)sizeof(val))
+            break;
+
+        /* Fetch in batches: io_getevents() rejects a min_nr larger than
+         * the nr it is given, which would lose the completions. */
+        while (val > 0) {
+            struct timespec ts = { 0 };
+            long want = val < MAX_EVENTS ? (long)val : MAX_EVENTS;
+            int nevents, i;
+
+            /* libaio returns -errno directly instead of setting errno. */
+            do {
+                nevents = io_getevents(s->ctx, want, want, events, &ts);
+            } while (nevents == -EINTR);
+
+            if (nevents <= 0)
+                break;
+
+            for (i = 0; i < nevents; i++) {
+                struct iocb *iocb = events[i].obj;
+                struct qemu_laiocb *laiocb =
+                    container_of(iocb, struct qemu_laiocb, iocb);
+
+                laiocb->ret = (ssize_t)(((uint64_t)events[i].res2 << 32) |
+                                        events[i].res);
+                s->count--;
+                ev_signo = laiocb->common.ev_signo;
+            }
+
+            val -= nevents;
+        }
+    }
+
+    /* FIXME this is cheating */
+    if (ev_signo != -1)
+        kill(getpid(), ev_signo);
+}
+
+/* Flush handler: returns 1 while requests are outstanding, 0 otherwise. */
+static int qemu_laio_flush_cb(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+
+    return s->count > 0;
+}
+
+/*
+ * Set up the shared Linux AIO state on first use; later calls do nothing.
+ *
+ * Returns 0 on success or a negative errno value on failure.  On failure
+ * everything is released and qemu_laio_state stays NULL, so a later call
+ * retries instead of treating a freed pointer as initialized state.
+ */
+int qemu_laio_init(void)
+{
+    struct qemu_laio_state *s;
+    int ret;
+
+    if (qemu_laio_state != NULL)
+        return 0;
+
+    s = qemu_mallocz(sizeof(*s));
+
+    s->efd = eventfd(0, 0);
+    if (s->efd == -1) {
+        ret = -errno;
+        goto fail_free;
+    }
+
+    /* libaio returns -errno directly instead of setting errno. */
+    ret = io_setup(MAX_EVENTS, &s->ctx);
+    if (ret != 0)
+        goto fail_close;
+
+    /* The completion handler reads the eventfd until EAGAIN, so a blocking
+     * descriptor would stall it. */
+    if (fcntl(s->efd, F_SETFL, O_NONBLOCK) == -1) {
+        ret = -errno;
+        goto fail_destroy;
+    }
+
+    /* FIXME we could use a separate thread to read from eventfd. */
+    /* This will not generate a signal upon IO completion which means that
+     * the VCPU may keep spinning unless there's an IO thread. */
+    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb,
+                            NULL, qemu_laio_flush_cb, s);
+
+    qemu_laio_state = s;
+    return 0;
+
+fail_destroy:
+    io_destroy(s->ctx);
+fail_close:
+    close(s->efd);
+fail_free:
+    qemu_free(s);
+    return ret;
+}
+
+/*
+ * Queue one read or write described by aiocb on the shared AIO context.
+ *
+ * Returns 0 when the request has been queued, or a negative errno value
+ * when it could not be.
+ */
+static int qemu_laio_submit(struct qemu_aiocb *aiocb, int is_write)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct iocb *iocbs = &laiocb->iocb;
+    int ret;
+
+    /* qemu_laio_init() failed or was never called. */
+    if (qemu_laio_state == NULL)
+        return -EINVAL;
+
+    if (is_write)
+        io_prep_pwrite(&laiocb->iocb, aiocb->aio_fildes, aiocb->aio_buf,
+                       aiocb->aio_nbytes, aiocb->aio_offset);
+    else
+        io_prep_pread(&laiocb->iocb, aiocb->aio_fildes, aiocb->aio_buf,
+                      aiocb->aio_nbytes, aiocb->aio_offset);
+
+    io_set_eventfd(&laiocb->iocb, qemu_laio_state->efd);
+
+    laiocb->ctx = qemu_laio_state;
+    laiocb->ret = -EINPROGRESS;
+
+    qemu_laio_state->count++;
+
+    /* io_submit() returns the number of iocbs queued or -errno. */
+    ret = io_submit(qemu_laio_state->ctx, 1, &iocbs);
+    if (ret != 1) {
+        /* Nothing was queued, so no completion will ever balance the
+         * count; undo it or qemu_laio_flush_cb() reports pending I/O
+         * forever. */
+        qemu_laio_state->count--;
+        if (ret >= 0)
+            ret = -EAGAIN;
+        laiocb->ret = ret;
+        return ret;
+    }
+
+    return 0;
+}
+
+/* Submit an asynchronous read; see qemu_laio_submit() for the result. */
+int qemu_laio_read(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 0);
+}
+
+/* Submit an asynchronous write; see qemu_laio_submit() for the result. */
+int qemu_laio_write(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 1);
+}
+
+/*
+ * Status of a request as a positive errno value: EINPROGRESS while it is
+ * pending, 0 once it has a non-negative result.
+ */
+int qemu_laio_error(struct qemu_aiocb *aiocb)
+{
+    ssize_t ret = qemu_laio_return(aiocb);
+
+    if (ret < 0)
+        ret = -ret;
+    else
+        ret = 0;
+
+    return ret;
+}
+
+/* Raw stored result: -EINPROGRESS, another negative errno, or the value
+ * reported by the completion event. */
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+
+    return laiocb->ret;
+}
+
+/*
+ * Try to cancel a request.  The fd argument is unused.
+ *
+ * Returns QEMU_PAIO_ALLDONE if the request is no longer in progress,
+ * QEMU_PAIO_CANCELED if io_cancel() succeeded, and QEMU_PAIO_NOTCANCELED
+ * otherwise.
+ */
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct io_event event;
+
+    if (laiocb->ret != -EINPROGRESS)
+        return QEMU_PAIO_ALLDONE;
+
+    if (io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event) != 0)
+        return QEMU_PAIO_NOTCANCELED;
+
+    /* A cancelled request is handed back through 'event' instead of the
+     * completion queue, so the completion handler never sees it and the
+     * outstanding count has to be dropped here. */
+    laiocb->ret = -ECANCELED;
+    laiocb->ctx->count--;
+    return QEMU_PAIO_CANCELED;
+}
diff --git a/linux-aio.h b/linux-aio.h
new file mode 100644
index 0000000..002270c
--- /dev/null
+++ b/linux-aio.h
@@ -0,0 +1,28 @@
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_LINUX_AIO_H
+#define QEMU_LINUX_AIO_H
+
+#include "posix-aio-compat.h"
+
+struct qemu_aiocb *qemu_laio_get_aiocb(void);
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb);
+
+int qemu_laio_init(void);
+int qemu_laio_read(struct qemu_aiocb *aiocb);
+int qemu_laio_write(struct qemu_aiocb *aiocb);
+int qemu_laio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb);
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb);
+
+#endif
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6b547f4..752001f 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -18,10 +18,24 @@
 #include <string.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include "qemu-common.h"
 #include "osdep.h"
 
 #include "posix-aio-compat.h"
 
+#include "sys-queue.h"
+
+struct qemu_paiocb    /* request as tracked by the posix-aio-compat backend */
+{
+    struct qemu_aiocb common;    /* generic part; aiocb_to_paiocb() maps it back */
+
+    /* private */
+    TAILQ_ENTRY(qemu_paiocb) node;    /* list linkage; request_list is a TAILQ of these */
+    int is_write;    /* non-zero: aio_thread() uses pwrite(), otherwise pread() */
+    ssize_t ret;     /* NOTE(review): presumably the request result; its accessors are outside the visible hunks */
+    int active;      /* NOTE(review): usage is not visible in this patch -- confirm */
+};
+
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 static pthread_t thread_id;
@@ -31,6 +45,11 @@ static int cur_threads = 0;
 static int idle_threads = 0;
 static TAILQ_HEAD(, qemu_paiocb) request_list;
 
+static struct qemu_paiocb *aiocb_to_paiocb(struct qemu_aiocb *aiocb)
+{   /* map the embedded 'common' member back to its enclosing qemu_paiocb */
+    return container_of(aiocb, struct qemu_paiocb, common);
+}
+
 static void die2(int err, const char *what)
 {
     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
@@ -116,19 +135,19 @@ static void *aio_thread(void *unused)
         idle_threads--;
         mutex_unlock(&lock);
 
-        while (offset < aiocb->aio_nbytes) {
+        while (offset < aiocb->common.aio_nbytes) {
             ssize_t len;
 
             if (aiocb->is_write)
-                len = pwrite(aiocb->aio_fildes,
-                             (const char *)aiocb->aio_buf + offset,
-                             aiocb->aio_nbytes - offset,
-                             aiocb->aio_offset + offset);
+                len = pwrite(aiocb->common.aio_fildes,
+                             (const char *)aiocb->common.aio_buf + offset,
+                             aiocb->common.aio_nbytes - offset,
+                             aiocb->common.aio_offset + offset);
             else
-                len = pread(aiocb->aio_fildes,
-                            (char *)aiocb->aio_buf + offset,
-                            aiocb->aio_nbytes - offset,
-                            aiocb->aio_offset + offset);
+                len = pread(aiocb->common.aio_fildes,
+                            (char *)aiocb->common.aio_buf + offset,
+                            aiocb->common.aio_nbytes - offset,
+                            aiocb->common.aio_offset + offset);
 
             if (len == -1 && errno == EINTR)
                 continue;
@@ -146,7 +165,7 @@ static void *aio_thread(void *unused)
         idle_threads++;
         mutex_unlock(&lock);
 
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+        if (kill(pid, aiocb->common.ev_signo)) die("kill failed");
     }
 
     idle_threads--;
@@ -193,18 +212,21 @@ static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
     return 0;
 }
 
-int qemu_paio_read(struct qemu_paiocb *aiocb)
+int qemu_paio_read(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 0);
 }
 
-int qemu_paio_write(struct qemu_paiocb *aiocb)
+int qemu_paio_write(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 1);
 }
 
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+ssize_t qemu_paio_return(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     ssize_t ret;
 
     mutex_lock(&lock);
@@ -214,9 +236,9 @@ ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_error(struct qemu_paiocb *aiocb)
+int qemu_paio_error(struct qemu_aiocb *cb)
 {
-    ssize_t ret = qemu_paio_return(aiocb);
+    ssize_t ret = qemu_paio_return(cb);
 
     if (ret < 0)
         ret = -ret;
@@ -226,8 +248,9 @@ int qemu_paio_error(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+int qemu_paio_cancel(int fd, struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     int ret;
 
     mutex_lock(&lock);
@@ -243,3 +266,18 @@ int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
 
     return ret;
 }
+
+/* Allocate a zero-filled qemu_paiocb and hand back its embedded common part. */
+struct qemu_aiocb *qemu_paio_get_aiocb(void)
+{
+    struct qemu_paiocb *req = qemu_mallocz(sizeof(struct qemu_paiocb));
+
+    return &req->common;
+}
+
+/* Release a control block previously obtained from qemu_paio_get_aiocb(). */
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    /* The enclosing qemu_paiocb is what was allocated, so free that. */
+    qemu_free(aiocb_to_paiocb(aiocb));
+}
diff --git a/posix-aio-compat.h b/posix-aio-compat.h
index 0bc10f5..b9aa3f9 100644
--- a/posix-aio-compat.h
+++ b/posix-aio-compat.h
@@ -18,25 +18,17 @@
 #include <unistd.h>
 #include <signal.h>
 
-#include "sys-queue.h"
-
 #define QEMU_PAIO_CANCELED     0x01
 #define QEMU_PAIO_NOTCANCELED  0x02
 #define QEMU_PAIO_ALLDONE      0x03
 
-struct qemu_paiocb
+struct qemu_aiocb
 {
     int aio_fildes;
     void *aio_buf;
     size_t aio_nbytes;
     int ev_signo;
     off_t aio_offset;
-
-    /* private */
-    TAILQ_ENTRY(qemu_paiocb) node;
-    int is_write;
-    ssize_t ret;
-    int active;
 };
 
 struct qemu_paioinit
@@ -46,11 +38,14 @@ struct qemu_paioinit
     unsigned int aio_idle_time;
 };
 
+struct qemu_aiocb *qemu_paio_get_aiocb(void);
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb);
+
 int qemu_paio_init(struct qemu_paioinit *aioinit);
-int qemu_paio_read(struct qemu_paiocb *aiocb);
-int qemu_paio_write(struct qemu_paiocb *aiocb);
-int qemu_paio_error(struct qemu_paiocb *aiocb);
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb);
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb);
+int qemu_paio_read(struct qemu_aiocb *aiocb);
+int qemu_paio_write(struct qemu_aiocb *aiocb);
+int qemu_paio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_paio_return(struct qemu_aiocb *aiocb);
+int qemu_paio_cancel(int fd, struct qemu_aiocb *aiocb);
 
 #endif

WARNING: multiple messages have this Message-ID (diff)
From: Anthony Liguori <aliguori@us.ibm.com>
To: qemu-devel@nongnu.org
Cc: kvm@vger.kernel.org
Subject: [Qemu-devel] [PATCH][RFC] Linux AIO support when using O_DIRECT
Date: Mon, 23 Mar 2009 10:45:24 -0500	[thread overview]
Message-ID: <1237823124-6417-1-git-send-email-aliguori@us.ibm.com> (raw)

This is just a first cut.  It needs a fair bit of cleanup before it can be
committed.  I also think we need to fix up the AIO abstractions a bit.

I wanted to share it, though, in case anyone is interested in doing some
performance comparisons.  It seems to work, although I haven't exercised it
very much.

diff --git a/Makefile b/Makefile
index 82fec80..afc6b41 100644
--- a/Makefile
+++ b/Makefile
@@ -61,6 +61,9 @@ else
 ifdef CONFIG_AIO
 BLOCK_OBJS += posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+BLOCK_OBJS += linux-aio.o
+endif
 BLOCK_OBJS += block-raw-posix.o
 endif
 
diff --git a/Makefile.target b/Makefile.target
index 41366ee..df2a794 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -514,6 +514,9 @@ else
 ifdef CONFIG_AIO
 OBJS+=posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+OBJS+=linux-aio.o
+endif
 OBJS+=block-raw-posix.o
 endif
 
diff --git a/block-raw-posix.c b/block-raw-posix.c
index 1a1a178..e355cf4 100644
--- a/block-raw-posix.c
+++ b/block-raw-posix.c
@@ -29,6 +29,9 @@
 #ifdef CONFIG_AIO
 #include "posix-aio-compat.h"
 #endif
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
 
 #ifdef CONFIG_COCOA
 #include <paths.h>
@@ -68,6 +71,10 @@
 #include <sys/diskslice.h>
 #endif
 
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
+
 //#define DEBUG_FLOPPY
 
 //#define DEBUG_BLOCK
@@ -98,6 +105,17 @@
    reopen it to see if the disk has been changed */
 #define FD_OPEN_TIMEOUT 1000
 
+typedef struct AIOOperations    /* per-backend table of AIO entry points */
+{
+    struct qemu_aiocb *(*get_aiocb)(void);              /* allocate a control block */
+    void (*put_aiocb)(struct qemu_aiocb *);             /* free a control block */
+    int (*read)(struct qemu_aiocb *);                   /* submit a read; callers treat < 0 as failure */
+    int (*write)(struct qemu_aiocb *);                  /* submit a write; callers treat < 0 as failure */
+    int (*error)(struct qemu_aiocb *);                  /* callers test for 0, EINPROGRESS and ECANCELED */
+    ssize_t (*get_result)(struct qemu_aiocb *aiocb);    /* result, compared against aio_nbytes */
+    int (*cancel)(int fd, struct qemu_aiocb *aiocb);    /* returns a QEMU_PAIO_* code */
+} AIOOperations;
+
 typedef struct BDRVRawState {
     int fd;
     int type;
@@ -111,8 +129,31 @@ typedef struct BDRVRawState {
     int fd_media_changed;
 #endif
     uint8_t* aligned_buf;
+    AIOOperations *aio_ops;
 } BDRVRawState;
 
+static AIOOperations posix_aio_ops = {    /* default backend, implemented by posix-aio-compat.c */
+    .get_aiocb = qemu_paio_get_aiocb,
+    .put_aiocb = qemu_paio_put_aiocb,
+    .read = qemu_paio_read,
+    .write = qemu_paio_write,
+    .error = qemu_paio_error,
+    .get_result = qemu_paio_return,
+    .cancel = qemu_paio_cancel,
+};
+
+#ifdef CONFIG_LINUX_AIO
+static AIOOperations linux_aio_ops = {    /* linux-aio.c backend, selected when BDRV_O_NOCACHE is set */
+    .get_aiocb = qemu_laio_get_aiocb,
+    .put_aiocb = qemu_laio_put_aiocb,
+    .read = qemu_laio_read,
+    .write = qemu_laio_write,
+    .error = qemu_laio_error,
+    .get_result = qemu_laio_return,
+    .cancel = qemu_laio_cancel,
+};    
+#endif
+
 static int posix_aio_init(void);
 
 static int fd_open(BlockDriverState *bs);
@@ -124,6 +165,14 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
     s->lseek_err_cnt = 0;
 
     open_flags = O_BINARY;
@@ -463,7 +512,7 @@ static int raw_write(BlockDriverState *bs, int64_t sector_num,
 
 typedef struct RawAIOCB {
     BlockDriverAIOCB common;
-    struct qemu_paiocb aiocb;
+    struct qemu_aiocb *aiocb;
     struct RawAIOCB *next;
     int ret;
 } RawAIOCB;
@@ -496,19 +545,24 @@ static void posix_aio_read(void *opaque)
     for(;;) {
         pacb = &s->first_aio;
         for(;;) {
+            BDRVRawState *s;
+
             acb = *pacb;
             if (!acb)
                 goto the_end;
-            ret = qemu_paio_error(&acb->aiocb);
+
+            s = acb->common.bs->opaque;
+            ret = s->aio_ops->error(acb->aiocb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
-                    ret = qemu_paio_return(&acb->aiocb);
-                    if (ret == acb->aiocb.aio_nbytes)
+                    ret = s->aio_ops->get_result(acb->aiocb);
+                    if (ret == acb->aiocb->aio_nbytes)
                         ret = 0;
                     else
                         ret = -EINVAL;
@@ -519,6 +573,7 @@ static void posix_aio_read(void *opaque)
                 *pacb = acb->next;
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
                 break;
             } else {
@@ -553,7 +608,6 @@ static int posix_aio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    struct qemu_paioinit ai;
   
     if (posix_aio_state)
         return 0;
@@ -579,6 +633,8 @@ static int posix_aio_init(void)
 
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 
+    struct qemu_paioinit ai;
+
     memset(&ai, 0, sizeof(ai));
     ai.aio_threads = 64;
     ai.aio_num = 64;
@@ -600,16 +656,15 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
         return NULL;
 
     acb = qemu_aio_get(bs, cb, opaque);
-    if (!acb)
-        return NULL;
-    acb->aiocb.aio_fildes = s->fd;
-    acb->aiocb.ev_signo = SIGUSR2;
-    acb->aiocb.aio_buf = buf;
+    acb->aiocb = s->aio_ops->get_aiocb();
+    acb->aiocb->aio_fildes = s->fd;
+    acb->aiocb->ev_signo = SIGUSR2;
+    acb->aiocb->aio_buf = buf;
     if (nb_sectors < 0)
-        acb->aiocb.aio_nbytes = -nb_sectors;
+        acb->aiocb->aio_nbytes = -nb_sectors;
     else
-        acb->aiocb.aio_nbytes = nb_sectors * 512;
-    acb->aiocb.aio_offset = sector_num * 512;
+        acb->aiocb->aio_nbytes = nb_sectors * 512;
+    acb->aiocb->aio_offset = sector_num * 512;
     acb->next = posix_aio_state->first_aio;
     posix_aio_state->first_aio = acb;
     return acb;
@@ -618,7 +673,9 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
 static void raw_aio_em_cb(void* opaque)
 {
     RawAIOCB *acb = opaque;
+    BDRVRawState *s = acb->common.bs->opaque;
     acb->common.cb(acb->common.opaque, acb->ret);
+    s->aio_ops->put_aiocb(acb->aiocb);
     qemu_aio_release(acb);
 }
 
@@ -633,7 +690,9 @@ static void raw_aio_remove(RawAIOCB *acb)
             fprintf(stderr, "raw_aio_remove: aio request not found!\n");
             break;
         } else if (*pacb == acb) {
+            BDRVRawState *s = acb->common.bs->opaque;
             *pacb = acb->next;
+            s->aio_ops->put_aiocb(acb->aiocb);
             qemu_aio_release(acb);
             break;
         }
@@ -656,6 +715,7 @@ static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pread(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -665,7 +725,7 @@ static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_read(&acb->aiocb) < 0) {
+    if (s->aio_ops->read(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -687,6 +747,7 @@ static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pwrite(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -696,7 +757,7 @@ static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, (uint8_t*)buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_write(&acb->aiocb) < 0) {
+    if (s->aio_ops->write(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -707,12 +768,13 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     int ret;
     RawAIOCB *acb = (RawAIOCB *)blockacb;
+    BDRVRawState *s = acb->common.bs->opaque;
 
-    ret = qemu_paio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
+    ret = s->aio_ops->cancel(acb->aiocb->aio_fildes, acb->aiocb);
     if (ret == QEMU_PAIO_NOTCANCELED) {
         /* fail safe: if the aio could not be canceled, we wait for
            it */
-        while (qemu_paio_error(&acb->aiocb) == EINPROGRESS);
+        while (s->aio_ops->error(acb->aiocb) == EINPROGRESS);
     }
 
     raw_aio_remove(acb);
@@ -938,6 +1000,14 @@ static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
 #ifdef CONFIG_COCOA
     if (strstart(filename, "/dev/cdrom", NULL)) {
         kern_return_t kernResult;
diff --git a/configure b/configure
index 5c62c59..4913a3f 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@ build_docs="no"
 uname_release=""
 curses="yes"
 aio="yes"
+linuxaio="yes"
 nptl="yes"
 mixemu="no"
 bluez="yes"
@@ -463,6 +464,8 @@ for opt do
   ;;
   --disable-aio) aio="no"
   ;;
+  --disable-linux-aio) linuxaio="no"
+  ;;
   --disable-blobs) blobs="no"
   ;;
   --kerneldir=*) kerneldir="$optarg"
@@ -577,6 +580,7 @@ echo "  --enable-uname-release=R Return R for uname -r in usermode emulation"
 echo "  --sparc_cpu=V            Build qemu for Sparc architecture v7, v8, v8plus, v8plusa, v9"
 echo "  --disable-vde            disable support for vde network"
 echo "  --disable-aio            disable AIO support"
+echo "  --disable-linux-aio      disable Linux AIO support"
 echo "  --disable-blobs          disable installing provided firmware blobs"
 echo "  --kerneldir=PATH         look for kernel includes in PATH"
 echo ""
@@ -1082,6 +1086,22 @@ EOF
 fi
 
 ##########################################
+# linux-aio probe
+
+if test "$linuxaio" = "yes" ; then
+    linuxaio=no    # set back to yes only if the test program below links
+    cat > $TMPC <<EOF
+#include <libaio.h>
+#include <sys/eventfd.h>
+int main(void) { io_setup; io_set_eventfd; eventfd; return 0; }
+EOF
+    if $cc $ARCH_CFLAGS -o $TMPE $TMPC -laio 2> /dev/null ; then
+        linuxaio=yes
+        AIOLIBS="$AIOLIBS -laio"
+    fi
+fi
+
+##########################################
 # iovec probe
 cat > $TMPC <<EOF
 #include <sys/types.h>
@@ -1204,6 +1224,7 @@ echo "uname -r          $uname_release"
 echo "NPTL support      $nptl"
 echo "vde support       $vde"
 echo "AIO support       $aio"
+echo "Linux AIO support $linuxaio"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
 echo "fdt support       $fdt"
@@ -1500,6 +1521,10 @@ if test "$aio" = "yes" ; then
   echo "#define CONFIG_AIO 1" >> $config_h
   echo "CONFIG_AIO=yes" >> $config_mak
 fi
+if test "$linuxaio" = "yes" ; then
+  echo "#define CONFIG_LINUX_AIO 1" >> $config_h
+  echo "CONFIG_LINUX_AIO=yes" >> $config_mak
+fi
 if test "$blobs" = "yes" ; then
   echo "INSTALL_BLOBS=yes" >> $config_mak
 fi
diff --git a/linux-aio.c b/linux-aio.c
new file mode 100644
index 0000000..959407c
--- /dev/null
+++ b/linux-aio.c
@@ -0,0 +1,207 @@
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "linux-aio.h"
+#include "sys-queue.h"
+#include "osdep.h"
+#include "qemu-aio.h"
+
+#include <sys/eventfd.h>
+#include <libaio.h>
+
+#define MAX_EVENTS 64
+
+struct qemu_laiocb
+{
+    struct qemu_aiocb common;    /* generic part; callers are handed &common */
+    struct qemu_laio_state *ctx; /* state the request was submitted on */
+    struct iocb iocb;            /* libaio control block passed to io_submit() */
+    ssize_t ret;                 /* -EINPROGRESS from submit until a result is stored */
+};
+
+struct qemu_laio_state
+{
+    int efd;            /* eventfd attached to each iocb via io_set_eventfd() */
+    io_context_t ctx;   /* AIO context created by io_setup() */
+    int count;          /* requests submitted but not yet reaped */
+};
+
+static struct qemu_laio_state *qemu_laio_state; /* allocated by qemu_laio_init() */
+/* Recover the enclosing qemu_laiocb from its embedded generic aiocb. */
+static struct qemu_laiocb *aiocb_to_laiocb(struct qemu_aiocb *aiocb)
+{
+    return container_of(aiocb, struct qemu_laiocb, common);
+}
+
+/* Allocate a zero-filled Linux AIO request and return its generic part. */
+struct qemu_aiocb *qemu_laio_get_aiocb(void)
+{
+    struct qemu_laiocb *req = qemu_mallocz(sizeof(*req));
+
+    return &req->common;
+}
+
+/* Free a request previously handed out by qemu_laio_get_aiocb(). */
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    /* aiocb is embedded, so the allocation to free is its container */
+    qemu_free(aiocb_to_laiocb(aiocb));
+}
+
+/* eventfd read handler: reap every completed request, record its result,
+ * and finally send the last reaped request's ev_signo to this process. */
+static void qemu_laio_completion_cb(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    struct io_event events[MAX_EVENTS];
+    struct timespec ts = { 0 };     /* zero timeout: never block in here */
+    uint64_t val;
+    ssize_t ret;
+    int nevents, i;
+    int ev_signo = -1;
+
+    while (1) {
+        do {
+            ret = read(s->efd, &val, sizeof(val));
+        } while (ret == -1 && errno == EINTR);
+
+        /* EAGAIN means the counter is drained; stop on any other failure too */
+        if (ret != (ssize_t)sizeof(val))
+            break;
+
+        /* Reap with min_nr 0: the eventfd count may exceed MAX_EVENTS, which
+         * io_getevents() rejects as min_nr.  libaio returns -errno directly. */
+        do {
+            nevents = io_getevents(s->ctx, 0, MAX_EVENTS, events, &ts);
+            for (i = 0; i < nevents; i++) {
+                struct qemu_laiocb *laiocb =
+                    container_of(events[i].obj, struct qemu_laiocb, iocb);
+                laiocb->ret = (ssize_t)(((uint64_t)events[i].res2 << 32) |
+                                        events[i].res);
+                s->count--;
+                ev_signo = laiocb->common.ev_signo;
+            }
+        } while (nevents == MAX_EVENTS || nevents == -EINTR);
+    }
+
+    /* FIXME this is cheating */
+    if (ev_signo != -1)
+        kill(getpid(), ev_signo);
+}
+
+/* Flush callback registered through qemu_aio_set_fd_handler(): returns 1
+ * while requests are still in flight and 0 once none remain. */
+static int qemu_laio_flush_cb(void *opaque)
+{
+    struct qemu_laio_state *state = opaque;
+
+    /* count goes up on submit and down as completions are reaped */
+    return state->count > 0 ? 1 : 0;
+}
+
+/* One-time setup of the shared Linux AIO state; returns 0 or -EINVAL. */
+int qemu_laio_init(void)
+{
+    struct qemu_laio_state *s;
+
+    if (qemu_laio_state != NULL)
+        return 0;
+
+    s = qemu_mallocz(sizeof(*s));
+    s->efd = eventfd(0, 0);
+    if (s->efd == -1 || io_setup(MAX_EVENTS, &s->ctx) != 0) {
+        if (s->efd != -1)
+            close(s->efd);
+        qemu_free(s);
+        return -EINVAL;
+    }
+    fcntl(s->efd, F_SETFL, O_NONBLOCK);
+
+    /* FIXME we could use a separate thread to read from eventfd.  This will
+     * not generate a signal upon IO completion which means that the VCPU may
+     * keep spinning unless there's an IO thread. */
+    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
+                            qemu_laio_flush_cb, s);
+    qemu_laio_state = s;    /* publish only a fully initialised state */
+    return 0;
+}
+
+/* Queue one request; returns io_submit()'s result (negative on failure). */
+static int qemu_laio_submit(struct qemu_aiocb *aiocb, int is_write)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct iocb *iocbs = &laiocb->iocb;
+    int ret;
+
+    if (is_write)
+        io_prep_pwrite(iocbs, aiocb->aio_fildes, aiocb->aio_buf,
+                       aiocb->aio_nbytes, aiocb->aio_offset);
+    else
+        io_prep_pread(iocbs, aiocb->aio_fildes, aiocb->aio_buf,
+                      aiocb->aio_nbytes, aiocb->aio_offset);
+    io_set_eventfd(iocbs, qemu_laio_state->efd);
+    laiocb->ctx = qemu_laio_state;
+    laiocb->ret = -EINPROGRESS;
+    ret = io_submit(qemu_laio_state->ctx, 1, &iocbs);
+    if (ret > 0)    /* count only requests that will produce a completion */
+        qemu_laio_state->count++;
+    return ret;
+}
+
+int qemu_laio_read(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 0); /* is_write = 0: asynchronous read */
+}
+
+int qemu_laio_write(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 1); /* is_write = 1: asynchronous write */
+}
+
+/* Report a request's status as a non-negative errno-style value: the
+ * negated result while qemu_laio_return() is negative, and 0 once a
+ * non-negative result has been recorded. */
+int qemu_laio_error(struct qemu_aiocb *aiocb)
+{
+    ssize_t result = qemu_laio_return(aiocb);
+
+    if (result >= 0)
+        return 0;
+    return -result;
+}
+
+/* Return the result currently recorded for the request. */
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb)
+{
+    /* ret lives in the enclosing qemu_laiocb, not in the generic part */
+    return aiocb_to_laiocb(aiocb)->ret;
+}
+
+/* Try to cancel one request.  Returns QEMU_PAIO_ALLDONE if a result was
+ * already recorded, QEMU_PAIO_CANCELED if io_cancel() succeeded, and
+ * QEMU_PAIO_NOTCANCELED otherwise.  The fd argument is not used. */
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct io_event event;
+
+    /* anything other than -EINPROGRESS means a result is already stored */
+    if (laiocb->ret != -EINPROGRESS)
+        return QEMU_PAIO_ALLDONE;
+
+    if (io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event) != 0)
+        return QEMU_PAIO_NOTCANCELED;
+
+    laiocb->ret = -ECANCELED;
+    return QEMU_PAIO_CANCELED;
+}
diff --git a/linux-aio.h b/linux-aio.h
new file mode 100644
index 0000000..002270c
--- /dev/null
+++ b/linux-aio.h
@@ -0,0 +1,28 @@
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_LINUX_AIO_H
+#define QEMU_LINUX_AIO_H
+
+#include "posix-aio-compat.h"
+
+struct qemu_aiocb *qemu_laio_get_aiocb(void);
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb);
+
+int qemu_laio_init(void);
+int qemu_laio_read(struct qemu_aiocb *aiocb);
+int qemu_laio_write(struct qemu_aiocb *aiocb);
+int qemu_laio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb);
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb);
+
+#endif
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6b547f4..752001f 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -18,10 +18,24 @@
 #include <string.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include "qemu-common.h"
 #include "osdep.h"
 
 #include "posix-aio-compat.h"
 
+#include "sys-queue.h"
+
+struct qemu_paiocb
+{
+    struct qemu_aiocb common;       /* generic part; see aiocb_to_paiocb() */
+
+    /* private */
+    TAILQ_ENTRY(qemu_paiocb) node;  /* presumably linkage for request_list - confirm */
+    int is_write;                   /* aio_thread uses pwrite() if set, else pread() */
+    ssize_t ret;                    /* NOTE(review): presumably the request result - confirm */
+    int active;                     /* NOTE(review): usage not visible in this hunk - confirm */
+};
+
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 static pthread_t thread_id;
@@ -31,6 +45,11 @@ static int cur_threads = 0;
 static int idle_threads = 0;
 static TAILQ_HEAD(, qemu_paiocb) request_list;
 
+static struct qemu_paiocb *aiocb_to_paiocb(struct qemu_aiocb *aiocb)
+{
+    return container_of(aiocb, struct qemu_paiocb, common); /* common is embedded */
+}
+
 static void die2(int err, const char *what)
 {
     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
@@ -116,19 +135,19 @@ static void *aio_thread(void *unused)
         idle_threads--;
         mutex_unlock(&lock);
 
-        while (offset < aiocb->aio_nbytes) {
+        while (offset < aiocb->common.aio_nbytes) {
             ssize_t len;
 
             if (aiocb->is_write)
-                len = pwrite(aiocb->aio_fildes,
-                             (const char *)aiocb->aio_buf + offset,
-                             aiocb->aio_nbytes - offset,
-                             aiocb->aio_offset + offset);
+                len = pwrite(aiocb->common.aio_fildes,
+                             (const char *)aiocb->common.aio_buf + offset,
+                             aiocb->common.aio_nbytes - offset,
+                             aiocb->common.aio_offset + offset);
             else
-                len = pread(aiocb->aio_fildes,
-                            (char *)aiocb->aio_buf + offset,
-                            aiocb->aio_nbytes - offset,
-                            aiocb->aio_offset + offset);
+                len = pread(aiocb->common.aio_fildes,
+                            (char *)aiocb->common.aio_buf + offset,
+                            aiocb->common.aio_nbytes - offset,
+                            aiocb->common.aio_offset + offset);
 
             if (len == -1 && errno == EINTR)
                 continue;
@@ -146,7 +165,7 @@ static void *aio_thread(void *unused)
         idle_threads++;
         mutex_unlock(&lock);
 
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+        if (kill(pid, aiocb->common.ev_signo)) die("kill failed");
     }
 
     idle_threads--;
@@ -193,18 +212,21 @@ static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
     return 0;
 }
 
-int qemu_paio_read(struct qemu_paiocb *aiocb)
+int qemu_paio_read(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 0);
 }
 
-int qemu_paio_write(struct qemu_paiocb *aiocb)
+int qemu_paio_write(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 1);
 }
 
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+ssize_t qemu_paio_return(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     ssize_t ret;
 
     mutex_lock(&lock);
@@ -214,9 +236,9 @@ ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_error(struct qemu_paiocb *aiocb)
+int qemu_paio_error(struct qemu_aiocb *cb)
 {
-    ssize_t ret = qemu_paio_return(aiocb);
+    ssize_t ret = qemu_paio_return(cb);
 
     if (ret < 0)
         ret = -ret;
@@ -226,8 +248,9 @@ int qemu_paio_error(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+int qemu_paio_cancel(int fd, struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     int ret;
 
     mutex_lock(&lock);
@@ -243,3 +266,18 @@ int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
 
     return ret;
 }
+
+/* Allocate a zero-filled POSIX AIO request and return its generic part. */
+struct qemu_aiocb *qemu_paio_get_aiocb(void)
+{
+    struct qemu_paiocb *req = qemu_mallocz(sizeof(*req));
+
+    return &req->common;
+}
+
+/* Free a request previously handed out by qemu_paio_get_aiocb(). */
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    /* aiocb is embedded, so the allocation to free is its container */
+    qemu_free(aiocb_to_paiocb(aiocb));
+}
diff --git a/posix-aio-compat.h b/posix-aio-compat.h
index 0bc10f5..b9aa3f9 100644
--- a/posix-aio-compat.h
+++ b/posix-aio-compat.h
@@ -18,25 +18,17 @@
 #include <unistd.h>
 #include <signal.h>
 
-#include "sys-queue.h"
-
 #define QEMU_PAIO_CANCELED     0x01
 #define QEMU_PAIO_NOTCANCELED  0x02
 #define QEMU_PAIO_ALLDONE      0x03
 
-struct qemu_paiocb
+struct qemu_aiocb
 {
     int aio_fildes;
     void *aio_buf;
     size_t aio_nbytes;
     int ev_signo;
     off_t aio_offset;
-
-    /* private */
-    TAILQ_ENTRY(qemu_paiocb) node;
-    int is_write;
-    ssize_t ret;
-    int active;
 };
 
 struct qemu_paioinit
@@ -46,11 +38,14 @@ struct qemu_paioinit
     unsigned int aio_idle_time;
 };
 
+struct qemu_aiocb *qemu_paio_get_aiocb(void);
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb);
+
 int qemu_paio_init(struct qemu_paioinit *aioinit);
-int qemu_paio_read(struct qemu_paiocb *aiocb);
-int qemu_paio_write(struct qemu_paiocb *aiocb);
-int qemu_paio_error(struct qemu_paiocb *aiocb);
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb);
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb);
+int qemu_paio_read(struct qemu_aiocb *aiocb);
+int qemu_paio_write(struct qemu_aiocb *aiocb);
+int qemu_paio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_paio_return(struct qemu_aiocb *aiocb);
+int qemu_paio_cancel(int fd, struct qemu_aiocb *aiocb);
 
 #endif

             reply	other threads:[~2009-03-23 15:45 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2009-03-23 15:45 Anthony Liguori [this message]
2009-03-23 15:45 ` [Qemu-devel] [PATCH][RFC] Linux AIO support when using O_DIRECT Anthony Liguori
2009-03-23 16:17 ` Avi Kivity
2009-03-23 17:14   ` Anthony Liguori
2009-03-23 17:29     ` Christoph Hellwig
2009-03-23 17:29       ` Christoph Hellwig
2009-03-23 18:10       ` Anthony Liguori
2009-03-23 18:10         ` Anthony Liguori
2009-03-23 18:48         ` Christoph Hellwig
2009-03-23 19:35           ` Avi Kivity
2009-03-23 19:35             ` Avi Kivity
2009-03-23 17:32     ` Christoph Hellwig
2009-03-23 17:32       ` Christoph Hellwig
2009-03-23 19:58     ` Avi Kivity
2009-03-23 20:32       ` Anthony Liguori
2009-03-23 17:26   ` Christoph Hellwig

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1237823124-6417-1-git-send-email-aliguori@us.ibm.com \
    --to=aliguori@us.ibm.com \
    --cc=kvm@vger.kernel.org \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.