All of lore.kernel.org
 help / color / mirror / Atom feed
From: Kevin Wolf <kwolf@redhat.com>
To: anthony@codemonkey.ws
Cc: kwolf@redhat.com, qemu-devel@nongnu.org
Subject: [Qemu-devel] [PATCH 25/30] sheepdog: use coroutines
Date: Mon, 29 Aug 2011 16:53:33 +0200	[thread overview]
Message-ID: <1314629618-8308-26-git-send-email-kwolf@redhat.com> (raw)
In-Reply-To: <1314629618-8308-1-git-send-email-kwolf@redhat.com>

From: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>

This makes the sheepdog block driver support bdrv_co_readv/writev
instead of bdrv_aio_readv/writev.

With this patch, Sheepdog network I/O becomes fully asynchronous.  The
block driver yields back when send/recv returns EAGAIN, and is resumed
when the sheepdog network connection is ready for the operation.

Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/sheepdog.c |  150 +++++++++++++++++++++++++++++++++--------------------
 1 files changed, 93 insertions(+), 57 deletions(-)

diff --git a/block/sheepdog.c b/block/sheepdog.c
index 57b6e1a..c1f6e07 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -274,7 +274,7 @@ struct SheepdogAIOCB {
     int ret;
     enum AIOCBState aiocb_type;
 
-    QEMUBH *bh;
+    Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
     int canceled;
@@ -295,6 +295,10 @@ typedef struct BDRVSheepdogState {
     char *port;
     int fd;
 
+    CoMutex lock;
+    Coroutine *co_send;
+    Coroutine *co_recv;
+
     uint32_t aioreq_seq_num;
     QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
 } BDRVSheepdogState;
@@ -346,19 +350,16 @@ static const char * sd_strerror(int err)
 /*
  * Sheepdog I/O handling:
  *
- * 1. In the sd_aio_readv/writev, read/write requests are added to the
- *    QEMU Bottom Halves.
- *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- *    requests to the server and link the requests to the
- *    outstanding_list in the BDRVSheepdogState.  we exits the
- *    function without waiting for receiving the response.
+ * 1. In sd_co_rw_vector, we send the I/O requests to the server and
+ *    link the requests to the outstanding_list in the
+ *    BDRVSheepdogState.  The function exits without waiting for
+ *    receiving the response.
  *
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 2. We receive the response in aio_read_response, the fd handler to
  *    the sheepdog connection.  If metadata update is needed, we send
  *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  The AIOCB callback is not called until all
- *    the requests belonging to the AIOCB are finished.
+ *    completion function.  We switch back to sd_co_readv/writev after
+ *    all the requests belonging to the AIOCB are finished.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -398,7 +399,7 @@ static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 static void sd_finish_aiocb(SheepdogAIOCB *acb)
 {
     if (!acb->canceled) {
-        acb->common.cb(acb->common.opaque, acb->ret);
+        qemu_coroutine_enter(acb->coroutine, NULL);
     }
     qemu_aio_release(acb);
 }
@@ -411,7 +412,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
      * Sheepdog cannot cancel the requests which are already sent to
      * the servers, so we just complete the request with -EIO here.
      */
-    acb->common.cb(acb->common.opaque, -EIO);
+    acb->ret = -EIO;
+    qemu_coroutine_enter(acb->coroutine, NULL);
     acb->canceled = 1;
 }
 
@@ -435,24 +437,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 
     acb->aio_done_func = NULL;
     acb->canceled = 0;
-    acb->bh = NULL;
+    acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     QLIST_INIT(&acb->aioreq_head);
     return acb;
 }
 
-static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
-{
-    if (acb->bh) {
-        error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
-        return -EIO;
-    }
-
-    acb->bh = qemu_bh_new(cb, acb);
-    qemu_bh_schedule(acb->bh);
-    return 0;
-}
-
 #ifdef _WIN32
 
 struct msghdr {
@@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
 again:
     ret = do_send_recv(sockfd, iov, len, iov_offset, write);
     if (ret < 0) {
-        if (errno == EINTR || errno == EAGAIN) {
+        if (errno == EINTR) {
+            goto again;
+        }
+        if (errno == EAGAIN) {
+            if (qemu_in_coroutine()) {
+                qemu_coroutine_yield();
+            }
             goto again;
         }
         error_report("failed to recv a rsp, %s", strerror(errno));
@@ -793,14 +789,14 @@ static void aio_read_response(void *opaque)
     unsigned long idx;
 
     if (QLIST_EMPTY(&s->outstanding_aio_head)) {
-        return;
+        goto out;
     }
 
     /* read a header */
     ret = do_read(fd, &rsp, sizeof(rsp));
     if (ret) {
         error_report("failed to get the header, %s", strerror(errno));
-        return;
+        goto out;
     }
 
     /* find the right aio_req from the outstanding_aio list */
@@ -811,7 +807,7 @@ static void aio_read_response(void *opaque)
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        return;
+        goto out;
     }
 
     acb = aio_req->aiocb;
@@ -847,7 +843,7 @@ static void aio_read_response(void *opaque)
                        aio_req->iov_offset);
         if (ret) {
             error_report("failed to get the data, %s", strerror(errno));
-            return;
+            goto out;
         }
         break;
     }
@@ -861,10 +857,30 @@ static void aio_read_response(void *opaque)
     if (!rest) {
         /*
          * We've finished all requests which belong to the AIOCB, so
-         * we can call the callback now.
+         * we can switch back to sd_co_readv/writev now.
          */
         acb->aio_done_func(acb);
     }
+out:
+    s->co_recv = NULL;
+}
+
+static void co_read_response(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    if (!s->co_recv) {
+        s->co_recv = qemu_coroutine_create(aio_read_response);
+    }
+
+    qemu_coroutine_enter(s->co_recv, opaque);
+}
+
+static void co_write_request(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    qemu_coroutine_enter(s->co_send, NULL);
 }
 
 static int aio_flush_request(void *opaque)
@@ -924,7 +940,7 @@ static int get_sheep_fd(BDRVSheepdogState *s)
         return -1;
     }
 
-    qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
+    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request,
                             NULL, s);
     return fd;
 }
@@ -1091,6 +1107,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     hdr.id = aio_req->id;
 
+    qemu_co_mutex_lock(&s->lock);
+    s->co_send = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
+                            aio_flush_request, NULL, s);
     set_cork(s->fd, 1);
 
     /* send a header */
@@ -1109,6 +1129,9 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     }
 
     set_cork(s->fd, 0);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
+                            aio_flush_request, NULL, s);
+    qemu_co_mutex_unlock(&s->lock);
 
     return 0;
 }
@@ -1225,6 +1248,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 
     bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
     strncpy(s->name, vdi, sizeof(s->name));
+    qemu_co_mutex_init(&s->lock);
     g_free(buf);
     return 0;
 out:
@@ -1491,7 +1515,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this calls the AIOCB callback.
+ * Otherwise, this switches back to sd_co_readv/writev.
  */
 static void sd_write_done(SheepdogAIOCB *acb)
 {
@@ -1587,8 +1611,11 @@ out:
  * waiting the response.  The responses are received in the
  * `aio_read_response' function which is called from the main loop as
  * a fd handler.
+ *
+ * Returns 1 when we need to wait for a response, 0 when no request was
+ * sent, and -errno in error cases.
  */
-static void sd_readv_writev_bh_cb(void *p)
+static int sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -1600,9 +1627,6 @@ static void sd_readv_writev_bh_cb(void *p)
     SheepdogInode *inode = &s->inode;
     AIOReq *aio_req;
 
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-
     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
         /*
          * In the case we open the snapshot VDI, Sheepdog creates the
@@ -1684,42 +1708,47 @@ static void sd_readv_writev_bh_cb(void *p)
     }
 out:
     if (QLIST_EMPTY(&acb->aioreq_head)) {
-        sd_finish_aiocb(acb);
+        return acb->ret;
     }
+    return 1;
 }
 
-static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
-                                       QEMUIOVector *qiov, int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque)
+static int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
+    int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
         /* TODO: shouldn't block here */
         if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
-            return NULL;
+            return -EIO;
         }
         bs->total_sectors = sector_num + nb_sectors;
     }
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
-static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
-                                      QEMUIOVector *qiov, int nb_sectors,
-                                      BlockDriverCompletionFunc *cb,
-                                      void *opaque)
+static int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                       int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
-    int i;
+    int i, ret;
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aiocb_type = AIOCB_READ_UDATA;
     acb->aio_done_func = sd_finish_aiocb;
 
@@ -1731,8 +1760,15 @@ static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
         memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
     }
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2062,8 +2098,8 @@ BlockDriver bdrv_sheepdog = {
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
-    .bdrv_aio_readv     = sd_aio_readv,
-    .bdrv_aio_writev    = sd_aio_writev,
+    .bdrv_co_readv  = sd_co_readv,
+    .bdrv_co_writev = sd_co_writev,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,
-- 
1.7.6

  parent reply	other threads:[~2011-08-29 14:51 UTC|newest]

Thread overview: 32+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2011-08-29 14:53 [Qemu-devel] [PULL 00/30] Block patches Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 01/30] coroutine: Add CoRwlock support Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 02/30] block: parse cache mode flags in a single place Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 03/30] block: add cache=directsync parameter to -drive Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 04/30] qcow2: Fix DEBUG_* compilation Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 05/30] qemu-img: Use qemu_blockalign Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 06/30] qcow2: fix typo in documentation for qcow2_get_cluster_offset() Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 07/30] qcow: initialize coroutine mutex Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 08/30] qemu-img: print error codes when convert fails Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 09/30] block/curl: Handle failed reads gracefully Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 10/30] block: include flush requests in info blockstats Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 11/30] posix-aio-compat: fix latency issues Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 12/30] qcow/qcow2: Allocate QCowAIOCB structure using stack Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 13/30] qcow: QCowAIOCB field cleanup Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 14/30] qcow: move some blocks of code to avoid useless variable initialization Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 15/30] qcow: Remove QCowAIOCB Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 16/30] qcow: remove old #undefined code Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 17/30] qcow2: Removed unused AIOCB fields Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 18/30] qcow2: removed cur_nr_sectors field in QCowAIOCB Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 19/30] qcow2: remove l2meta from QCowAIOCB Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 20/30] qcow2: remove cluster_offset " Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 21/30] qcow2: remove common " Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 22/30] qcow2: reindent and use while before the big jump Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 23/30] qcow2: Removed QCowAIOCB entirely Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 24/30] qcow2: remove memory leak Kevin Wolf
2011-08-29 14:53 ` Kevin Wolf [this message]
2011-08-29 14:53 ` [Qemu-devel] [PATCH 26/30] qcow2: use always stderr for debugging Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 27/30] qcow2: remove unused qcow2_create_refcount_update function Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 28/30] block: explicit I/O accounting Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 29/30] block: latency accounting Kevin Wolf
2011-08-29 14:53 ` [Qemu-devel] [PATCH 30/30] qemu-img: Require larger zero areas for sparse handling Kevin Wolf
2011-08-29 19:15 ` [Qemu-devel] [PULL 00/30] Block patches Anthony Liguori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1314629618-8308-26-git-send-email-kwolf@redhat.com \
    --to=kwolf@redhat.com \
    --cc=anthony@codemonkey.ws \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.