All of lore.kernel.org
 help / color / mirror / Atom feed
From: Eric Blake <eblake@redhat.com>
To: qemu-devel@nongnu.org
Cc: "open list:Network Block Dev..." <qemu-block@nongnu.org>,
	Sergio Lopez <slp@redhat.com>
Subject: [PULL 04/13] nbd/server: Quiesce coroutines on context switch
Date: Wed, 20 Jan 2021 20:36:48 -0600	[thread overview]
Message-ID: <20210121023657.1186241-5-eblake@redhat.com> (raw)
In-Reply-To: <20210121023657.1186241-1-eblake@redhat.com>

From: Sergio Lopez <slp@redhat.com>

When switching between AIO contexts we need to me make sure that both
recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
QEMU may crash while attaching the new context with an error like
this one:

aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'

To achieve this we need a local implementation of
'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
by 'nbd/client.c') that allows us to interrupt the operation and to
know when recv_coroutine is yielding.

With this in place, we delegate detaching the AIO context to the
owning context with a BH ('nbd_aio_detach_bh') scheduled using
'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
channel by setting 'client->quiescing' to 'true', and either waits for
the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
'nbd_read_eof', actively enters the coroutine to interrupt it.

RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
Signed-off-by: Sergio Lopez <slp@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-Id: <20201214170519.223781-4-slp@redhat.com>
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 106 insertions(+), 14 deletions(-)

diff --git a/nbd/server.c b/nbd/server.c
index 613ed2634ada..7229f487d296 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -132,6 +132,9 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;

+    bool read_yielding;
+    bool quiescing;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     return 0;
 }

-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+    bool partial = false;
+
+    assert(size);
+    while (size > 0) {
+        struct iovec iov = { .iov_base = buffer, .iov_len = size };
+        ssize_t len;
+
+        len = qio_channel_readv(client->ioc, &iov, 1, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            client->read_yielding = true;
+            qio_channel_yield(client->ioc, G_IO_IN);
+            client->read_yielding = false;
+            if (client->quiescing) {
+                return -EAGAIN;
+            }
+            continue;
+        } else if (len < 0) {
+            return -EIO;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+                return -EIO;
+            } else {
+                return 0;
+            }
+        }
+
+        partial = true;
+        size -= len;
+        buffer = (uint8_t *) buffer + len;
+    }
+    return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
                                Error **errp)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     int ret;

-    ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
     if (ret < 0) {
         return ret;
     }
@@ -1480,11 +1529,37 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)

     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
+
+        assert(client->recv_coroutine == NULL);
+        assert(client->send_coroutine == NULL);
+
+        if (client->quiescing) {
+            client->quiescing = false;
+            nbd_client_receive_next_request(client);
+        }
+    }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        qio_channel_detach_aio_context(client->ioc);
+        client->quiescing = true;
+
         if (client->recv_coroutine) {
-            aio_co_schedule(ctx, client->recv_coroutine);
+            if (client->read_yielding) {
+                qemu_aio_coroutine_enter(exp->common.ctx,
+                                         client->recv_coroutine);
+            } else {
+                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+            }
         }
+
         if (client->send_coroutine) {
-            aio_co_schedule(ctx, client->send_coroutine);
+            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
         }
     }
 }
@@ -1492,13 +1567,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
-    NBDClient *client;

     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);

-    QTAILQ_FOREACH(client, &exp->clients, next) {
-        qio_channel_detach_aio_context(client->ioc);
-    }
+    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);

     exp->common.ctx = NULL;
 }
@@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,

 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
  */
 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
                                   Error **errp)
 {
     NBDClient *client = req->client;
     int valid_flags;
+    int ret;

     g_assert(qemu_in_coroutine());
     assert(client->recv_coroutine == qemu_coroutine_self());
-    if (nbd_receive_request(client->ioc, request, errp) < 0) {
-        return -EIO;
+    ret = nbd_receive_request(client, request, errp);
+    if (ret < 0) {
+        return  ret;
     }

     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
         return;
     }

+    if (client->quiescing) {
+        /*
+         * We're switching between AIO contexts. Don't attempt to receive a new
+         * request and kick the main context which may be waiting for us.
+         */
+        nbd_client_put(client);
+        client->recv_coroutine = NULL;
+        aio_wait_kick();
+        return;
+    }
+
     req = nbd_request_get(client);
     ret = nbd_co_receive_request(req, &request, &local_err);
     client->recv_coroutine = NULL;
@@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
         goto done;
     }

+    if (ret == -EAGAIN) {
+        assert(client->quiescing);
+        goto done;
+    }
+
     nbd_client_receive_next_request(client);
     if (ret == -EIO) {
         goto disconnect;
@@ -2565,7 +2656,8 @@ disconnect:

 static void nbd_client_receive_next_request(NBDClient *client)
 {
-    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+        !client->quiescing) {
         nbd_client_get(client);
         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
-- 
2.30.0



  parent reply	other threads:[~2021-01-21  2:41 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-21  2:36 [PULL 00/13] NBD patches through 2021-01-20 Eric Blake
2021-01-21  2:36 ` [PULL 01/13] qemu-nbd: Fix a memleak in qemu_nbd_client_list() Eric Blake
2021-01-21  2:36 ` [PULL 02/13] qemu-nbd: Fix a memleak in nbd_client_thread() Eric Blake
2021-01-21  2:36 ` [PULL 03/13] block: Honor blk_set_aio_context() context requirements Eric Blake
2021-01-21  2:36   ` Eric Blake
2021-01-21  2:36 ` Eric Blake [this message]
2021-01-21  2:36 ` [PULL 05/13] iotests/277: use dot slash for nbd-fault-injector.py running Eric Blake
2021-01-21  2:36 ` [PULL 06/13] iotests/303: use dot slash for qcow2.py running Eric Blake
2021-01-21  2:36 ` [PULL 07/13] iotests: fix some whitespaces in test output files Eric Blake
2021-01-21  2:36 ` [PULL 08/13] iotests: make tests executable Eric Blake
2021-01-21  2:36 ` [PULL 09/13] iotests/294: add shebang line Eric Blake
2021-01-21  2:36 ` [PULL 10/13] iotests: define group in each iotest Eric Blake
2021-01-21  9:19   ` Vladimir Sementsov-Ogievskiy
2021-01-21  2:36 ` [PULL 11/13] iotests/264: fix style Eric Blake
2021-01-21  2:36 ` [PULL 12/13] iotests.py: fix qemu_tool_pipe_and_status() Eric Blake
2021-01-21  2:36 ` [PULL 13/13] iotests.py: qemu_io(): reuse qemu_tool_pipe_and_status() Eric Blake
2021-01-21 11:35 ` [PULL 00/13] NBD patches through 2021-01-20 Peter Maydell

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210121023657.1186241-5-eblake@redhat.com \
    --to=eblake@redhat.com \
    --cc=qemu-block@nongnu.org \
    --cc=qemu-devel@nongnu.org \
    --cc=slp@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.