All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe
@ 2022-04-14 17:57 Paolo Bonzini
  2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
                   ` (9 more replies)
  0 siblings, 10 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

The main point of this series is patch 7, which removes the dubious and
probably wrong use of atomics in block/nbd.c.  This in turn is enabled
mostly by the cleanups in patches 3-5.  Together, they introduce a
QemuMutex that synchronizes the NBD client coroutines, the reconnect_delay
timer and nbd_cancel_in_flight() as well.

The fixes happen to remove an incorrect use of qemu_co_queue_restart_all
and qemu_co_enter_next on the s->free_sema CoQueue, which was not guarded
by s->send_mutex.

The rest is bugfixes, simplifying the code a bit, and extra documentation.

v1->v2:
- cleaner patch 1
- fix grammar in patch 4
- split out patch 6

Paolo Bonzini (9):
  nbd: safeguard against waking up invalid coroutine
  nbd: mark more coroutine_fns
  nbd: remove peppering of nbd_client_connected
  nbd: keep send_mutex/free_sema handling outside
    nbd_co_do_establish_connection
  nbd: use a QemuMutex to synchronize yanking, reconnection and
    coroutines
  nbd: code motion and function renaming
  nbd: move s->state under requests_lock
  nbd: take receive_mutex when reading requests[].receiving
  nbd: document what is protected by the CoMutexes

 block/coroutines.h |   4 +-
 block/nbd.c        | 298 +++++++++++++++++++++++----------------------
 2 files changed, 154 insertions(+), 148 deletions(-)

-- 
2.35.1



^ permalink raw reply	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:02   ` Eric Blake
  2022-04-15  9:53   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns Paolo Bonzini
                   ` (8 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

The .reply_possible field of s->requests is never set to false.  This is
not a problem as it is only a safeguard to detect protocol errors,
but it's sloppy.  In fact, the field is actually not necessary at all,
because .coroutine is set to NULL in NBD_FOREACH_REPLY_CHUNK after
receiving the last chunk.  Thus, replace .reply_possible with .coroutine
and move the check before deciding the fate of this request.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 691d4b05dc..d29bee1122 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -58,7 +58,6 @@ typedef struct {
     Coroutine *coroutine;
     uint64_t offset;        /* original offset of the request */
     bool receiving;         /* sleeping in the yield in nbd_receive_replies */
-    bool reply_possible;    /* reply header not yet received */
 } NBDClientRequest;
 
 typedef enum NBDClientState {
@@ -454,15 +453,15 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
             nbd_channel_error(s, -EINVAL);
             return -EINVAL;
         }
+        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+            nbd_channel_error(s, -EINVAL);
+            return -EINVAL;
+        }
         if (s->reply.handle == handle) {
             /* We are done */
             return 0;
         }
-        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
-        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].reply_possible) {
-            nbd_channel_error(s, -EINVAL);
-            return -EINVAL;
-        }
         nbd_recv_coroutine_wake_one(&s->requests[ind2]);
     }
 }
@@ -505,7 +504,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
-    s->requests[i].reply_possible = true;
 
     request->handle = INDEX_TO_HANDLE(s, i);
 
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
  2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-15 10:12   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected Paolo Bonzini
                   ` (7 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

Several coroutine functions in block/nbd.c are not marked as such.  This
patch adds a few more markers; it is not exhaustive, but it focuses
especially on:

- places that wake other coroutines, because aio_co_wake() has very
different semantics inside a coroutine (queuing after yield vs. entering
immediately);

- functions with _co_ in their names, to avoid confusion

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 64 ++++++++++++++++++++++++++---------------------------
 1 file changed, 32 insertions(+), 32 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index d29bee1122..3cc4ea4430 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -132,7 +132,7 @@ static bool nbd_client_connected(BDRVNBDState *s)
     return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
 }
 
-static bool nbd_recv_coroutine_wake_one(NBDClientRequest *req)
+static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
         req->receiving = false;
@@ -143,7 +143,7 @@ static bool nbd_recv_coroutine_wake_one(NBDClientRequest *req)
     return false;
 }
 
-static void nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
 {
     int i;
 
@@ -154,7 +154,7 @@ static void nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
     }
 }
 
-static void nbd_channel_error(BDRVNBDState *s, int ret)
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
 {
     if (nbd_client_connected(s)) {
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
@@ -466,9 +466,9 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
     }
 }
 
-static int nbd_co_send_request(BlockDriverState *bs,
-                               NBDRequest *request,
-                               QEMUIOVector *qiov)
+static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
+                                            NBDRequest *request,
+                                            QEMUIOVector *qiov)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
@@ -721,9 +721,9 @@ static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
     return 0;
 }
 
-static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
-                                              uint64_t orig_offset,
-                                              QEMUIOVector *qiov, Error **errp)
+static int coroutine_fn
+nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
+                                   QEMUIOVector *qiov, Error **errp)
 {
     QEMUIOVector sub_qiov;
     uint64_t offset;
@@ -1039,8 +1039,8 @@ break_loop:
     return false;
 }
 
-static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
-                                      int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
+                                                   int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
 
@@ -1053,9 +1053,9 @@ static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
-                                        uint64_t offset, QEMUIOVector *qiov,
-                                        int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
+                                                     uint64_t offset, QEMUIOVector *qiov,
+                                                     int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1105,10 +1105,10 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
-                                            uint64_t handle, uint64_t length,
-                                            NBDExtent *extent,
-                                            int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
+                                                         uint64_t handle, uint64_t length,
+                                                         NBDExtent *extent,
+                                                         int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1165,8 +1165,8 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
     return iter.ret;
 }
 
-static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
-                          QEMUIOVector *write_qiov)
+static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request,
+                                       QEMUIOVector *write_qiov)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1202,9 +1202,9 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
-                                int64_t bytes, QEMUIOVector *qiov,
-                                BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
+                                             int64_t bytes, QEMUIOVector *qiov,
+                                             BdrvRequestFlags flags)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1261,9 +1261,9 @@ static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
-                                 int64_t bytes, QEMUIOVector *qiov,
-                                 BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
+                                              int64_t bytes, QEMUIOVector *qiov,
+                                              BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1286,8 +1286,8 @@ static int nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, qiov);
 }
 
-static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
-                                       int64_t bytes, BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+                                                    int64_t bytes, BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1321,7 +1321,7 @@ static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_client_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = { .type = NBD_CMD_FLUSH };
@@ -1336,8 +1336,8 @@ static int nbd_client_co_flush(BlockDriverState *bs)
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
-                                  int64_t bytes)
+static int coroutine_fn nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
+                                               int64_t bytes)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1914,7 +1914,7 @@ fail:
     return ret;
 }
 
-static int nbd_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_co_flush(BlockDriverState *bs)
 {
     return nbd_client_co_flush(bs);
 }
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
  2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
  2022-04-14 17:57 ` [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-15 17:01   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection Paolo Bonzini
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

It is unnecessary to check nbd_client_connected() because every time
s->state is moved out of NBD_CLIENT_CONNECTED the socket is shut down
and all coroutines are resumed.

The only case where it was actually needed is when the NBD
server disconnects and there is no reconnect-delay.  In that
case, nbd_receive_replies() does not set s->reply.handle and
nbd_co_do_receive_one_chunk() cannot continue.  For that one case,
check the return value of nbd_receive_replies().

As to the others:

* nbd_receive_replies() can put the current coroutine to sleep if another
reply is ongoing; then it will be woken by nbd_channel_error(), called
by the ongoing reply.  Or it can try itself to read a reply header and
fail, thus calling nbd_channel_error() itself.

* nbd_co_send_request() will write the body of the request and fail

* nbd_reply_chunk_iter_receive() will call nbd_co_receive_one_chunk()
and then nbd_co_do_receive_one_chunk(), which will handle the failure as
above; or it will just detect a previous call to nbd_iter_channel_error()
via iter->ret < 0.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 3cc4ea4430..a30603ce87 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -409,10 +409,6 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
             return 0;
         }
 
-        if (!nbd_client_connected(s)) {
-            return -EIO;
-        }
-
         if (s->reply.handle != 0) {
             /*
              * Some other request is being handled now. It should already be
@@ -512,7 +508,7 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (nbd_client_connected(s) && rc >= 0) {
+        if (rc >= 0) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -829,8 +825,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     }
     *request_ret = 0;
 
-    nbd_receive_replies(s, handle);
-    if (!nbd_client_connected(s)) {
+    ret = nbd_receive_replies(s, handle);
+    if (ret < 0) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -982,11 +978,6 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (!nbd_client_connected(s)) {
-        error_setg(&local_err, "Connection closed");
-        nbd_iter_channel_error(iter, -EIO, &local_err);
-        goto break_loop;
-    }
 
     if (iter->done) {
         /* Previous iteration was last. */
@@ -1007,7 +998,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
+    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
         goto break_loop;
     }
 
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (2 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:11   ` Eric Blake
  2022-04-16 11:54   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines Paolo Bonzini
                   ` (5 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

Elevate s->in_flight early so that other incoming requests will wait
on the CoQueue in nbd_co_send_request; restart them after getting back
from nbd_reconnect_attempt.  This could be after the reconnect timer or
nbd_cancel_in_flight have cancelled the attempt, so there is no
need anymore to cancel the requests there.

nbd_co_send_request now handles both stopping and restarting pending
requests after a successful connection, and there is no need to
hold send_mutex in nbd_co_do_establish_connection.  The current setup
is confusing because nbd_co_do_establish_connection is called both with
send_mutex taken and without it.  Before the patch it uses free_sema which
(at least in theory...) is protected by send_mutex, after the patch it
does not anymore.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/coroutines.h |  4 +--
 block/nbd.c        | 66 ++++++++++++++++++++++------------------------
 2 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/block/coroutines.h b/block/coroutines.h
index b293e943c8..8f6e438ef3 100644
--- a/block/coroutines.h
+++ b/block/coroutines.h
@@ -59,7 +59,7 @@ int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
                                         QEMUIOVector *qiov, int64_t pos);
 
 int coroutine_fn
-nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
+nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
 
 
 int coroutine_fn
@@ -109,7 +109,7 @@ bdrv_common_block_status_above(BlockDriverState *bs,
                                BlockDriverState **file,
                                int *depth);
 int generated_co_wrapper
-nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
+nbd_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
 
 int generated_co_wrapper
 blk_do_preadv(BlockBackend *blk, int64_t offset, int64_t bytes,
diff --git a/block/nbd.c b/block/nbd.c
index a30603ce87..62dd338ef3 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -187,9 +187,6 @@ static void reconnect_delay_timer_cb(void *opaque)
     if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
         nbd_co_establish_connection_cancel(s->conn);
-        while (qemu_co_enter_next(&s->free_sema, NULL)) {
-            /* Resume all queued requests */
-        }
     }
 
     reconnect_delay_timer_del(s);
@@ -310,11 +307,10 @@ static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
 }
 
 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
-                                                Error **errp)
+                                                bool blocking, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
-    bool blocking = nbd_client_connecting_wait(s);
     IO_CODE();
 
     assert(!s->ioc);
@@ -350,7 +346,6 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
 
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
-    qemu_co_queue_restart_all(&s->free_sema);
 
     return 0;
 }
@@ -358,25 +353,25 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
 /* called under s->send_mutex */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
-    assert(nbd_client_connecting(s));
-    assert(s->in_flight == 0);
-
-    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
-        !s->reconnect_delay_timer)
-    {
-        /*
-         * It's first reconnect attempt after switching to
-         * NBD_CLIENT_CONNECTING_WAIT
-         */
-        reconnect_delay_timer_init(s,
-            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
-            s->reconnect_delay * NANOSECONDS_PER_SECOND);
-    }
+    bool blocking = nbd_client_connecting_wait(s);
 
     /*
      * Now we are sure that nobody is accessing the channel, and no one will
      * try until we set the state to CONNECTED.
      */
+    assert(nbd_client_connecting(s));
+    assert(s->in_flight == 1);
+
+    if (blocking && !s->reconnect_delay_timer) {
+        /*
+         * It's the first reconnect attempt after switching to
+         * NBD_CLIENT_CONNECTING_WAIT
+         */
+        g_assert(s->reconnect_delay);
+        reconnect_delay_timer_init(s,
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+            s->reconnect_delay * NANOSECONDS_PER_SECOND);
+    }
 
     /* Finalize previous connection if any */
     if (s->ioc) {
@@ -387,7 +382,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    nbd_co_do_establish_connection(s->bs, NULL);
+    qemu_co_mutex_unlock(&s->send_mutex);
+    nbd_co_do_establish_connection(s->bs, blocking, NULL);
+    qemu_co_mutex_lock(&s->send_mutex);
 
     /*
      * The reconnect attempt is done (maybe successfully, maybe not), so
@@ -472,21 +469,21 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     qemu_co_mutex_lock(&s->send_mutex);
 
     while (s->in_flight == MAX_NBD_REQUESTS ||
-           (!nbd_client_connected(s) && s->in_flight > 0))
-    {
+           (!nbd_client_connected(s) && s->in_flight > 0)) {
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
-    if (nbd_client_connecting(s)) {
-        nbd_reconnect_attempt(s);
-    }
-
-    if (!nbd_client_connected(s)) {
-        rc = -EIO;
-        goto err;
-    }
-
     s->in_flight++;
+    if (!nbd_client_connected(s)) {
+        if (nbd_client_connecting(s)) {
+            nbd_reconnect_attempt(s);
+            qemu_co_queue_restart_all(&s->free_sema);
+        }
+        if (!nbd_client_connected(s)) {
+            rc = -EIO;
+            goto err;
+        }
+    }
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
         if (s->requests[i].coroutine == NULL) {
@@ -526,8 +523,8 @@ err:
         nbd_channel_error(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
-            s->in_flight--;
         }
+        s->in_flight--;
         qemu_co_queue_next(&s->free_sema);
     }
     qemu_co_mutex_unlock(&s->send_mutex);
@@ -1883,7 +1880,7 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     }
 
     s->state = NBD_CLIENT_CONNECTING_WAIT;
-    ret = nbd_do_establish_connection(bs, errp);
+    ret = nbd_do_establish_connection(bs, true, errp);
     if (ret < 0) {
         goto fail;
     }
@@ -2049,7 +2046,6 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
     }
 
     nbd_co_establish_connection_cancel(s->conn);
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (3 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:33   ` Eric Blake
  2022-04-16 12:18   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming Paolo Bonzini
                   ` (4 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

The condition for waiting on the s->free_sema queue depends on
both s->in_flight and s->state.  The latter is currently using
atomics, but this is quite dubious and probably wrong.

Because s->state is written in the main thread too, for example by
the yank callback, it cannot be protected by a CoMutex.  Introduce a
separate lock that can be used by nbd_co_send_request(); later on this
lock will also be used for s->state.  There will not be any contention
on the lock unless there is a yank or reconnect, so this is not
performance sensitive.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 62dd338ef3..a2414566d1 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
+    /*
+     * Protects free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
     CoQueue free_sema;
-
-    CoMutex receive_mutex;
     int in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
+    QEMUTimer *reconnect_delay_timer;
+
+    CoMutex send_mutex;
+    CoMutex receive_mutex;
     NBDClientState state;
 
-    QEMUTimer *reconnect_delay_timer;
     QEMUTimer *open_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
     BlockDriverState *bs;
 
@@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     return 0;
 }
 
-/* called under s->send_mutex */
+/* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     bool blocking = nbd_client_connecting_wait(s);
@@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
     nbd_co_do_establish_connection(s->bs, blocking, NULL);
-    qemu_co_mutex_lock(&s->send_mutex);
+    qemu_mutex_lock(&s->requests_lock);
 
     /*
      * The reconnect attempt is done (maybe successfully, maybe not), so
@@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
+    qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
            (!nbd_client_connected(s) && s->in_flight > 0)) {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
@@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
+        qemu_mutex_lock(&s->requests_lock);
+err:
         nbd_channel_error(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
         }
         s->in_flight--;
         qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
@@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
     qemu_co_queue_next(&s->free_sema);
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
@@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
     qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
     }
+    qemu_mutex_unlock(&s->requests_lock);
 
     nbd_co_establish_connection_cancel(s->conn);
 }
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (4 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:37   ` Eric Blake
  2022-04-16 12:37   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock Paolo Bonzini
                   ` (3 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

Prepare for the next patch, so that the diff is less confusing.

nbd_client_connecting is moved closer to the definition point.

nbd_client_connecting_wait() is kept only for the reconnection
logic; when it is used to check if a request has to be reissued,
use the renamed function nbd_client_will_reconnect().  In the
next patch, the two cases will have different locking requirements.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index a2414566d1..37d466e435 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -254,18 +254,15 @@ static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
     timer_mod(s->open_timer, expire_time_ns);
 }
 
-static bool nbd_client_connecting(BDRVNBDState *s)
-{
-    NBDClientState state = qatomic_load_acquire(&s->state);
-    return state == NBD_CLIENT_CONNECTING_WAIT ||
-        state == NBD_CLIENT_CONNECTING_NOWAIT;
-}
-
 static bool nbd_client_connecting_wait(BDRVNBDState *s)
 {
     return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
 }
 
+static bool nbd_client_will_reconnect(BDRVNBDState *s)
+{
+    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+}
 /*
  * Update @bs with information learned during a completed negotiation process.
  * Return failure if the server's advertised options are incompatible with the
@@ -355,6 +352,13 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     return 0;
 }
 
+static bool nbd_client_connecting(BDRVNBDState *s)
+{
+    NBDClientState state = qatomic_load_acquire(&s->state);
+    return state == NBD_CLIENT_CONNECTING_WAIT ||
+        state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
+
 /* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
@@ -1190,7 +1194,7 @@ static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
@@ -1249,7 +1253,7 @@ static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offse
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
@@ -1407,7 +1411,7 @@ static int coroutine_fn nbd_client_co_block_status(
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     if (ret < 0 || request_ret < 0) {
         return ret ? ret : request_ret;
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (5 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:40   ` Eric Blake
  2022-04-16 13:37   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving Paolo Bonzini
                   ` (2 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

Remove the confusing, and most likely wrong, atomics.  The only function
that used to be somewhat in a hot path was nbd_client_connected(),
but it is not anymore after the previous patches.

The same logic is used both to check if a request had to be reissued
and also in nbd_reconnecting_attempt().  The former cases are outside
requests_lock, while nbd_reconnecting_attempt() does have the lock,
therefore the two have been separated in the previous commit.
nbd_client_will_reconnect() can simply take s->requests_lock, while
nbd_reconnecting_attempt() can inline the access now that no
complicated atomics are involved.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 78 ++++++++++++++++++++++++++++-------------------------
 1 file changed, 41 insertions(+), 37 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 37d466e435..b5aac2548c 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,7 +35,6 @@
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
-#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
@@ -72,10 +71,11 @@ typedef struct BDRVNBDState {
     NBDExportInfo info;
 
     /*
-     * Protects free_sema, in_flight, requests[].coroutine,
+     * Protects state, free_sema, in_flight, requests[].coroutine,
      * reconnect_delay_timer.
      */
     QemuMutex requests_lock;
+    NBDClientState state;
     CoQueue free_sema;
     int in_flight;
     NBDClientRequest requests[MAX_NBD_REQUESTS];
@@ -83,7 +83,6 @@ typedef struct BDRVNBDState {
 
     CoMutex send_mutex;
     CoMutex receive_mutex;
-    NBDClientState state;
 
     QEMUTimer *open_timer;
 
@@ -132,11 +131,6 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
     s->x_dirty_bitmap = NULL;
 }
 
-static bool nbd_client_connected(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
-}
-
 static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
@@ -159,14 +153,15 @@ static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
     }
 }
 
-static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
+/* Called with s->requests_lock held.  */
+static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
 {
-    if (nbd_client_connected(s)) {
+    if (s->state == NBD_CLIENT_CONNECTED) {
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
     }
 
     if (ret == -EIO) {
-        if (nbd_client_connected(s)) {
+        if (s->state == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
@@ -177,6 +172,12 @@ static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
     nbd_recv_coroutines_wake(s, true);
 }
 
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
+{
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    nbd_channel_error_locked(s, ret);
+}
+
 static void reconnect_delay_timer_del(BDRVNBDState *s)
 {
     if (s->reconnect_delay_timer) {
@@ -189,20 +190,18 @@ static void reconnect_delay_timer_cb(void *opaque)
 {
     BDRVNBDState *s = opaque;
 
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        nbd_co_establish_connection_cancel(s->conn);
-    }
-
     reconnect_delay_timer_del(s);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+            return;
+        }
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+    }
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
 {
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
-        return;
-    }
-
     assert(!s->reconnect_delay_timer);
     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                              QEMU_CLOCK_REALTIME,
@@ -225,7 +224,9 @@ static void nbd_teardown_connection(BlockDriverState *bs)
         s->ioc = NULL;
     }
 
-    s->state = NBD_CLIENT_QUIT;
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_QUIT;
+    }
 }
 
 static void open_timer_del(BDRVNBDState *s)
@@ -254,15 +255,15 @@ static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
     timer_mod(s->open_timer, expire_time_ns);
 }
 
-static bool nbd_client_connecting_wait(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
-}
-
 static bool nbd_client_will_reconnect(BDRVNBDState *s)
 {
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+    /*
+     * Called only after a socket error, so this is not performance sensitive.
+     */
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
+
 /*
  * Update @bs with information learned during a completed negotiation process.
  * Return failure if the server's advertised options are incompatible with the
@@ -347,22 +348,24 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
 
     /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_CONNECTED;
+    }
 
     return 0;
 }
 
+/* Called with s->requests_lock held.  */
 static bool nbd_client_connecting(BDRVNBDState *s)
 {
-    NBDClientState state = qatomic_load_acquire(&s->state);
-    return state == NBD_CLIENT_CONNECTING_WAIT ||
-        state == NBD_CLIENT_CONNECTING_NOWAIT;
+    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
 }
 
 /* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
-    bool blocking = nbd_client_connecting_wait(s);
+    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
 
     /*
      * Now we are sure that nobody is accessing the channel, and no one will
@@ -477,17 +480,17 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
 
     qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
-           (!nbd_client_connected(s) && s->in_flight > 0)) {
+           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
         qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
-    if (!nbd_client_connected(s)) {
+    if (s->state != NBD_CLIENT_CONNECTED) {
         if (nbd_client_connecting(s)) {
             nbd_reconnect_attempt(s);
             qemu_co_queue_restart_all(&s->free_sema);
         }
-        if (!nbd_client_connected(s)) {
+        if (s->state != NBD_CLIENT_CONNECTED) {
             rc = -EIO;
             goto err;
         }
@@ -530,7 +533,7 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     if (rc < 0) {
         qemu_mutex_lock(&s->requests_lock);
 err:
-        nbd_channel_error(s, rc);
+        nbd_channel_error_locked(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
         }
@@ -1443,8 +1446,9 @@ static void nbd_yank(void *opaque)
     BlockDriverState *bs = opaque;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+    QEMU_LOCK_GUARD(&s->requests_lock);
     qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    s->state = NBD_CLIENT_QUIT;
 }
 
 static void nbd_client_close(BlockDriverState *bs)
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (6 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:42   ` Eric Blake
  2022-04-16 13:54   ` Vladimir Sementsov-Ogievskiy
  2022-04-14 17:57 ` [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes Paolo Bonzini
  2022-04-16 19:03 ` [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Lukas Straub
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
Read it under the same mutex as well.  Waking up receivers on errors happens
after each reply finishes processing, in nbd_co_receive_one_chunk().
If there is no currently-active reply, there are two cases:

* either there is no active request at all, in which case no
element of request[] can have .receiving = true

* or nbd_receive_replies() must be running and owns receive_mutex;
in that case it will get back to nbd_co_receive_one_chunk() because
the socket has been shutdown, and all waiting coroutines will wake up
in turn.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index b5aac2548c..31c684772e 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -131,6 +131,7 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
     s->x_dirty_bitmap = NULL;
 }
 
+/* Called with s->receive_mutex taken.  */
 static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
@@ -142,12 +143,13 @@ static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
     return false;
 }
 
-static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
 {
     int i;
 
+    QEMU_LOCK_GUARD(&s->receive_mutex);
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
+        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
             return;
         }
     }
@@ -168,8 +170,6 @@ static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
     } else {
         s->state = NBD_CLIENT_QUIT;
     }
-
-    nbd_recv_coroutines_wake(s, true);
 }
 
 static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
@@ -432,11 +432,10 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 
             qemu_coroutine_yield();
             /*
-             * We may be woken for 3 reasons:
+             * We may be woken for 2 reasons:
              * 1. From this function, executing in parallel coroutine, when our
              *    handle is received.
-             * 2. From nbd_channel_error(), when connection is lost.
-             * 3. From nbd_co_receive_one_chunk(), when previous request is
+             * 2. From nbd_co_receive_one_chunk(), when previous request is
              *    finished and s->reply.handle set to 0.
              * Anyway, it's OK to lock the mutex and go to the next iteration.
              */
@@ -928,7 +927,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    nbd_recv_coroutines_wake(s, false);
+    nbd_recv_coroutines_wake(s);
 
     return ret;
 }
-- 
2.35.1




^ permalink raw reply related	[flat|nested] 33+ messages in thread

* [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (7 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving Paolo Bonzini
@ 2022-04-14 17:57 ` Paolo Bonzini
  2022-04-14 19:42   ` Eric Blake
  2022-04-16 14:00   ` Vladimir Sementsov-Ogievskiy
  2022-04-16 19:03 ` [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Lukas Straub
  9 siblings, 2 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-14 17:57 UTC (permalink / raw)
  To: qemu-devel; +Cc: v.sementsov-og, eblake

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/block/nbd.c b/block/nbd.c
index 31c684772e..d0d94b40bd 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -81,12 +81,18 @@ typedef struct BDRVNBDState {
     NBDClientRequest requests[MAX_NBD_REQUESTS];
     QEMUTimer *reconnect_delay_timer;
 
+    /* Protects sending data on the socket.  */
     CoMutex send_mutex;
+
+    /*
+     * Protects receiving reply headers from the socket, as well as the
+     * fields reply and requests[].receiving
+     */
     CoMutex receive_mutex;
+    NBDReply reply;
 
     QEMUTimer *open_timer;
 
-    NBDReply reply;
     BlockDriverState *bs;
 
     /* Connection parameters */
-- 
2.35.1



^ permalink raw reply related	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine
  2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
@ 2022-04-14 19:02   ` Eric Blake
  2022-04-15  9:53   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:02 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:48PM +0200, Paolo Bonzini wrote:
> The .reply_possible field of s->requests is never set to false.  This is
> not a problem as it is only a safeguard to detect protocol errors,
> but it's sloppy.  In fact, the field is actually not necessary at all,
> because .coroutine is set to NULL in NBD_FOREACH_REPLY_CHUNK after
> receiving the last chunk.  Thus, replace .reply_possible with .coroutine
> and move the check before deciding the fate of this request.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 12 +++++-------
>  1 file changed, 5 insertions(+), 7 deletions(-)

Ah, indeed nicer than the v1 approach.

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection
  2022-04-14 17:57 ` [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection Paolo Bonzini
@ 2022-04-14 19:11   ` Eric Blake
  2022-04-16 11:54   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:11 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:51PM +0200, Paolo Bonzini wrote:
> Elevate s->in_flight early so that other incoming requests will wait
> on the CoQueue in nbd_co_send_request; restart them after getting back
> from nbd_reconnect_attempt.  This could be after the reconnect timer or
> nbd_cancel_in_flight have cancelled the attempt, so there is no
> need anymore to cancel the requests there.
> 
> nbd_co_send_request now handles both stopping and restarting pending
> requests after a successful connection, and there is no need to
> hold send_mutex in nbd_co_do_establish_connection.  The current setup
> is confusing because nbd_co_do_establish_connection is called both with
> send_mutex taken and without it.  Before the patch it uses free_sema which
> (at least in theory...) is protected by send_mutex, after the patch it
> does not anymore.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/coroutines.h |  4 +--
>  block/nbd.c        | 66 ++++++++++++++++++++++------------------------
>  2 files changed, 33 insertions(+), 37 deletions(-)
> 
> diff --git a/block/coroutines.h b/block/coroutines.h
> index b293e943c8..8f6e438ef3 100644
> --- a/block/coroutines.h
> +++ b/block/coroutines.h
> @@ -59,7 +59,7 @@ int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
>                                          QEMUIOVector *qiov, int64_t pos);
>  
>  int coroutine_fn
> -nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
> +nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);

Long line; probably worth wrapping.  But that's cosmetic; I could do
it while applying to my tree.

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
  2022-04-14 17:57 ` [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines Paolo Bonzini
@ 2022-04-14 19:33   ` Eric Blake
  2022-04-16 12:18   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:33 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:52PM +0200, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state.  The latter is currently using
> atomics, but this is quite dubious and probably wrong.
> 
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex.  Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state.  There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>  1 file changed, 27 insertions(+), 19 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming
  2022-04-14 17:57 ` [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming Paolo Bonzini
@ 2022-04-14 19:37   ` Eric Blake
  2022-04-16 12:37   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:37 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:53PM +0200, Paolo Bonzini wrote:
> Prepare for the next patch, so that the diff is less confusing.
> 
> nbd_client_connecting is moved closer to the definition point.
> 
> nbd_client_connecting_wait() is kept only for the reconnection
> logic; when it is used to check if a request has to be reissued,
> use the renamed function nbd_client_will_reconnect().  In the
> next patch, the two cases will have different locking requirements.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 24 ++++++++++++++----------
>  1 file changed, 14 insertions(+), 10 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

Yes, this split makes the next patch easier to read ;)

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock
  2022-04-14 17:57 ` [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock Paolo Bonzini
@ 2022-04-14 19:40   ` Eric Blake
  2022-04-16 13:37   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:40 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:54PM +0200, Paolo Bonzini wrote:
> Remove the confusing, and most likely wrong, atomics.  The only function
> that used to be somewhat in a hot path was nbd_client_connected(),
> but it is not anymore after the previous patches.
> 
> The same logic is used both to check if a request had to be reissued
> and also in nbd_reconnecting_attempt().  The former cases are outside
> requests_lock, while nbd_reconnecting_attempt() does have the lock,
> therefore the two have been separated in the previous commit.
> nbd_client_will_reconnect() can simply take s->requests_lock, while
> nbd_reconnecting_attempt() can inline the access now that no
> complicated atomics are involved.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 78 ++++++++++++++++++++++++++++-------------------------
>  1 file changed, 41 insertions(+), 37 deletions(-)
>

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving
  2022-04-14 17:57 ` [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving Paolo Bonzini
@ 2022-04-14 19:42   ` Eric Blake
  2022-04-16 13:54   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:42 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:55PM +0200, Paolo Bonzini wrote:
> requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
> Read it under the same mutex as well.  Waking up receivers on errors happens
> after each reply finishes processing, in nbd_co_receive_one_chunk().
> If there is no currently-active reply, there are two cases:
> 
> * either there is no active request at all, in which case no
> element of request[] can have .receiving = true
> 
> * or nbd_receive_replies() must be running and owns receive_mutex;
> in that case it will get back to nbd_co_receive_one_chunk() because
> the socket has been shutdown, and all waiting coroutines will wake up
> in turn.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 15 +++++++--------
>  1 file changed, 7 insertions(+), 8 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes
  2022-04-14 17:57 ` [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes Paolo Bonzini
@ 2022-04-14 19:42   ` Eric Blake
  2022-04-16 14:00   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-14 19:42 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, qemu-devel

On Thu, Apr 14, 2022 at 07:57:56PM +0200, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 8 +++++++-
>  1 file changed, 7 insertions(+), 1 deletion(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine
  2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
  2022-04-14 19:02   ` Eric Blake
@ 2022-04-15  9:53   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-15  9:53 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> The .reply_possible field of s->requests is never set to false.  This is
> not a problem as it is only a safeguard to detect protocol errors,
> but it's sloppy.  In fact, the field is actually not necessary at all,
> because .coroutine is set to NULL in NBD_FOREACH_REPLY_CHUNK after
> receiving the last chunk.  Thus, replace .reply_possible with .coroutine
> and move the check before deciding the fate of this request.
> 
> Signed-off-by: Paolo Bonzini<pbonzini@redhat.com>

Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns
  2022-04-14 17:57 ` [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns Paolo Bonzini
@ 2022-04-15 10:12   ` Vladimir Sementsov-Ogievskiy
  0 siblings, 0 replies; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-15 10:12 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> Several coroutine functions in block/nbd.c are not marked as such.  This
> patch adds a few more markers; it is not exhaustive, but it focuses
> especially on:
> 
> - places that wake other coroutines, because aio_co_wake() has very
> different semantics inside a coroutine (queuing after yield vs. entering
> immediately);
> 
> - functions with_co_  in their names, to avoid confusion
> 
> Reviewed-by: Eric Blake<eblake@redhat.com>
> Signed-off-by: Paolo Bonzini<pbonzini@redhat.com>

nbd_reply_chunk_iter_receive is forgotten :)

Also, many function headers becomes >80 character length, checkpatch should complain.

That all is not critical and may be fixed by follow-up.

Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected
  2022-04-14 17:57 ` [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected Paolo Bonzini
@ 2022-04-15 17:01   ` Vladimir Sementsov-Ogievskiy
  2022-04-23  8:30     ` Paolo Bonzini
  0 siblings, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-15 17:01 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> It is unnecessary to check nbd_client_connected() because every time
> s->state is moved out of NBD_CLIENT_CONNECTED the socket is shut down
> and all coroutines are resumed.
> 
> The only case where it was actually needed is when the NBD
> server disconnects and there is no reconnect-delay.  In that
> case, nbd_receive_replies() does not set s->reply.handle and
> nbd_co_do_receive_one_chunk() cannot continue.  For that one case,
> check the return value of nbd_receive_replies().
> 
> As to the others:
> 
> * nbd_receive_replies() can put the current coroutine to sleep if another
> reply is ongoing; then it will be woken by nbd_channel_error(), called
> by the ongoing reply.  Or it can try itself to read a reply header and
> fail, thus calling nbd_channel_error() itself.
> 
> * nbd_co_send_request() will write the body of the request and fail
> 
> * nbd_reply_chunk_iter_receive() will call nbd_co_receive_one_chunk()
> and then nbd_co_do_receive_one_chunk(), which will handle the failure as
> above; or it will just detect a previous call to nbd_iter_channel_error()
> via iter->ret < 0.
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>   block/nbd.c | 17 ++++-------------
>   1 file changed, 4 insertions(+), 13 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 3cc4ea4430..a30603ce87 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -409,10 +409,6 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
>               return 0;
>           }
>   
> -        if (!nbd_client_connected(s)) {
> -            return -EIO;
> -        }
> -
>           if (s->reply.handle != 0) {
>               /*
>                * Some other request is being handled now. It should already be
> @@ -512,7 +508,7 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       if (qiov) {
>           qio_channel_set_cork(s->ioc, true);
>           rc = nbd_send_request(s->ioc, request);
> -        if (nbd_client_connected(s) && rc >= 0) {
> +        if (rc >= 0) {
>               if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
>                                          NULL) < 0) {
>                   rc = -EIO;
> @@ -829,8 +825,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
>       }
>       *request_ret = 0;
>   
> -    nbd_receive_replies(s, handle);
> -    if (!nbd_client_connected(s)) {
> +    ret = nbd_receive_replies(s, handle);
> +    if (ret < 0) {
>           error_setg(errp, "Connection closed");
>           return -EIO;
>       }
> @@ -982,11 +978,6 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>       NBDReply local_reply;
>       NBDStructuredReplyChunk *chunk;
>       Error *local_err = NULL;
> -    if (!nbd_client_connected(s)) {
> -        error_setg(&local_err, "Connection closed");
> -        nbd_iter_channel_error(iter, -EIO, &local_err);
> -        goto break_loop;
> -    }

Probably we should still check iter->ret here. It's strange to start new iteration, when user set iter->ret in previous iteration of NBD_FOREACH_REPLY_CHUNK()

Or, maybe we should set iter->done in nbd_iter_channel_error ?

>   
>       if (iter->done) {
>           /* Previous iteration was last. */
> @@ -1007,7 +998,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>       }
>   
>       /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
> -    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
> +    if (nbd_reply_is_simple(reply) || iter->ret < 0) {

And then here, may be we can just goto break_loop from previous "if (ret < 0)". Then we'll not have to check iter->ret.

>           goto break_loop;
>       }
>   

anyway:
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

(a bit weak, as it really hard to imagine all these paths and possible consequences :/

-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection
  2022-04-14 17:57 ` [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection Paolo Bonzini
  2022-04-14 19:11   ` Eric Blake
@ 2022-04-16 11:54   ` Vladimir Sementsov-Ogievskiy
  2022-04-23  8:34     ` Paolo Bonzini
  1 sibling, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 11:54 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> Elevate s->in_flight early so that other incoming requests will wait
> on the CoQueue in nbd_co_send_request; restart them after getting back
> from nbd_reconnect_attempt.  This could be after the reconnect timer or
> nbd_cancel_in_flight have cancelled the attempt, so there is no
> need anymore to cancel the requests there.
> 
> nbd_co_send_request now handles both stopping and restarting pending
> requests after a successful connection, and there is no need to
> hold send_mutex in nbd_co_do_establish_connection.  The current setup
> is confusing because nbd_co_do_establish_connection is called both with
> send_mutex taken and without it.  Before the patch it uses free_sema which
> (at least in theory...) is protected by send_mutex, after the patch it
> does not anymore.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Seems good to me, still again, hard to imagine, can it lead to some new dead-locks or not. Seems not, at least I don't see the problem. We will see.

Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

> ---
>   block/coroutines.h |  4 +--
>   block/nbd.c        | 66 ++++++++++++++++++++++------------------------
>   2 files changed, 33 insertions(+), 37 deletions(-)
> 
> diff --git a/block/coroutines.h b/block/coroutines.h
> index b293e943c8..8f6e438ef3 100644
> --- a/block/coroutines.h
> +++ b/block/coroutines.h
> @@ -59,7 +59,7 @@ int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
>                                           QEMUIOVector *qiov, int64_t pos);
>   
>   int coroutine_fn
> -nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
> +nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
>   
>   
>   int coroutine_fn
> @@ -109,7 +109,7 @@ bdrv_common_block_status_above(BlockDriverState *bs,
>                                  BlockDriverState **file,
>                                  int *depth);
>   int generated_co_wrapper
> -nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
> +nbd_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
>   
>   int generated_co_wrapper
>   blk_do_preadv(BlockBackend *blk, int64_t offset, int64_t bytes,
> diff --git a/block/nbd.c b/block/nbd.c
> index a30603ce87..62dd338ef3 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -187,9 +187,6 @@ static void reconnect_delay_timer_cb(void *opaque)
>       if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>           nbd_co_establish_connection_cancel(s->conn);
> -        while (qemu_co_enter_next(&s->free_sema, NULL)) {
> -            /* Resume all queued requests */
> -        }
>       }
>   
>       reconnect_delay_timer_del(s);

Seems, removing the timer is not needed here too. We do nbd_co_establish_connection_cancel(), and timer will be removed in nbd_reconnect_attempt anyway.

More over, we don't need to keep timer in the state at all: it should become local thing for nbd_reconnect_attempt. A kind of call nbd_co_do_establish_connection() with timeout. That could be refactored later, using same way as in 4-5 patches of my "[PATCH v4 0/7] copy-before-write: on-cbw-error and cbw-timeout" series.

> @@ -310,11 +307,10 @@ static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
>   }
>   
>   int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
> -                                                Error **errp)
> +                                                bool blocking, Error **errp)
>   {
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>       int ret;
> -    bool blocking = nbd_client_connecting_wait(s);
>       IO_CODE();
>   
>       assert(!s->ioc);
> @@ -350,7 +346,6 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>   
>       /* successfully connected */
>       s->state = NBD_CLIENT_CONNECTED;
> -    qemu_co_queue_restart_all(&s->free_sema);
>   
>       return 0;
>   }
> @@ -358,25 +353,25 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>   /* called under s->send_mutex */
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
> -    assert(nbd_client_connecting(s));
> -    assert(s->in_flight == 0);
> -
> -    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
> -        !s->reconnect_delay_timer)
> -    {
> -        /*
> -         * It's first reconnect attempt after switching to
> -         * NBD_CLIENT_CONNECTING_WAIT
> -         */
> -        reconnect_delay_timer_init(s,
> -            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
> -            s->reconnect_delay * NANOSECONDS_PER_SECOND);
> -    }
> +    bool blocking = nbd_client_connecting_wait(s);
>   
>       /*
>        * Now we are sure that nobody is accessing the channel, and no one will
>        * try until we set the state to CONNECTED.
>        */
> +    assert(nbd_client_connecting(s));
> +    assert(s->in_flight == 1);
> +
> +    if (blocking && !s->reconnect_delay_timer) {
> +        /*
> +         * It's the first reconnect attempt after switching to
> +         * NBD_CLIENT_CONNECTING_WAIT
> +         */
> +        g_assert(s->reconnect_delay);
> +        reconnect_delay_timer_init(s,
> +            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
> +            s->reconnect_delay * NANOSECONDS_PER_SECOND);
> +    }
>   
>       /* Finalize previous connection if any */
>       if (s->ioc) {
> @@ -387,7 +382,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>           s->ioc = NULL;
>       }
>   
> -    nbd_co_do_establish_connection(s->bs, NULL);
> +    qemu_co_mutex_unlock(&s->send_mutex);
> +    nbd_co_do_establish_connection(s->bs, blocking, NULL);
> +    qemu_co_mutex_lock(&s->send_mutex);


Hmm. So, that breaks a critical section.

We can do it because in_flight=1 and we are not connected => all other requests will wait in the queue.

Still. during nbd_co_do_establish_connection() state may become CONNECTED. That theoretically means that parallel requests may start.

Is it bad? Seems not.. Bad thing that comes into my mind is that parallel request fails, and go to reconnect, and start own timer, but we remove it now after locking the mutex back. But that's not possible as parallel request should wait for in-flight=0 to start own reconnect-attempt. And that is not possible, as we keep our in-flight point.


>   
>       /*
>        * The reconnect attempt is done (maybe successfully, maybe not), so
> @@ -472,21 +469,21 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       qemu_co_mutex_lock(&s->send_mutex);
>   
>       while (s->in_flight == MAX_NBD_REQUESTS ||
> -           (!nbd_client_connected(s) && s->in_flight > 0))
> -    {
> +           (!nbd_client_connected(s) && s->in_flight > 0)) {
>           qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
>       }
>   
> -    if (nbd_client_connecting(s)) {
> -        nbd_reconnect_attempt(s);
> -    }
> -
> -    if (!nbd_client_connected(s)) {
> -        rc = -EIO;
> -        goto err;
> -    }
> -
>       s->in_flight++;

I like how this works.

> +    if (!nbd_client_connected(s)) {
> +        if (nbd_client_connecting(s)) {
> +            nbd_reconnect_attempt(s);
> +            qemu_co_queue_restart_all(&s->free_sema);
> +        }
> +        if (!nbd_client_connected(s)) {
> +            rc = -EIO;
> +            goto err;
> +        }
> +    }
>   
>       for (i = 0; i < MAX_NBD_REQUESTS; i++) {
>           if (s->requests[i].coroutine == NULL) {
> @@ -526,8 +523,8 @@ err:
>           nbd_channel_error(s, rc);
>           if (i != -1) {
>               s->requests[i].coroutine = NULL;
> -            s->in_flight--;
>           }
> +        s->in_flight--;
>           qemu_co_queue_next(&s->free_sema);
>       }
>       qemu_co_mutex_unlock(&s->send_mutex);
> @@ -1883,7 +1880,7 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>       }
>   
>       s->state = NBD_CLIENT_CONNECTING_WAIT;
> -    ret = nbd_do_establish_connection(bs, errp);
> +    ret = nbd_do_establish_connection(bs, true, errp);
>       if (ret < 0) {
>           goto fail;
>       }
> @@ -2049,7 +2046,6 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
>   
>       if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> -        qemu_co_queue_restart_all(&s->free_sema);

As I understand, we always have one request running (or no requests at all, but that's OK too) and it will wake all others (altogether, or they will wake each other in a chain). So, we don't need to wake them here.

>       }
>   
>       nbd_co_establish_connection_cancel(s->conn);


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
  2022-04-14 17:57 ` [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines Paolo Bonzini
  2022-04-14 19:33   ` Eric Blake
@ 2022-04-16 12:18   ` Vladimir Sementsov-Ogievskiy
  2022-04-23  8:37     ` Paolo Bonzini
  1 sibling, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 12:18 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state.  The latter is currently using
> atomics, but this is quite dubious and probably wrong.
> 
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex.  Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state.  There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>   block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>   1 file changed, 27 insertions(+), 19 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 62dd338ef3..a2414566d1 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
>       QIOChannel *ioc; /* The current I/O channel */
>       NBDExportInfo info;
>   
> -    CoMutex send_mutex;
> +    /*
> +     * Protects free_sema, in_flight, requests[].coroutine,
> +     * reconnect_delay_timer.
> +     */

requests[].coroutine is read without mutex in nbd_receive_replies

reconnect_delay_timer_del() is called without mutex in nbd_cancel_in_flight() and in reconnect_delay_timer_cb()..

Is it OK? Worth improving the comment?

> +    QemuMutex requests_lock;
>       CoQueue free_sema;
> -
> -    CoMutex receive_mutex;
>       int in_flight;
> +    NBDClientRequest requests[MAX_NBD_REQUESTS];
> +    QEMUTimer *reconnect_delay_timer;
> +
> +    CoMutex send_mutex;
> +    CoMutex receive_mutex;
>       NBDClientState state;
>   
> -    QEMUTimer *reconnect_delay_timer;
>       QEMUTimer *open_timer;
>   
> -    NBDClientRequest requests[MAX_NBD_REQUESTS];
>       NBDReply reply;
>       BlockDriverState *bs;
>   
> @@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>       return 0;
>   }
>   
> -/* called under s->send_mutex */
> +/* Called with s->requests_lock taken.  */
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
>       bool blocking = nbd_client_connecting_wait(s);
> @@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>           s->ioc = NULL;
>       }
>   
> -    qemu_co_mutex_unlock(&s->send_mutex);
> +    qemu_mutex_unlock(&s->requests_lock);
>       nbd_co_do_establish_connection(s->bs, blocking, NULL);
> -    qemu_co_mutex_lock(&s->send_mutex);
> +    qemu_mutex_lock(&s->requests_lock);
>   
>       /*
>        * The reconnect attempt is done (maybe successfully, maybe not), so
> @@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>       int rc, i = -1;
>   
> -    qemu_co_mutex_lock(&s->send_mutex);
> -
> +    qemu_mutex_lock(&s->requests_lock);
>       while (s->in_flight == MAX_NBD_REQUESTS ||
>              (!nbd_client_connected(s) && s->in_flight > 0)) {
> -        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
> +        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
>       }
>   
>       s->in_flight++;
> @@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,

Prior to this patch, if request sending fails, we'll not send further requests. After the patch, we can send more requests after failure on unlocking send_mutex.

May be that's not bad..

What's wrong if we keep send_mutex critical section as is and just lock requests_lock additionally inside send_mutex-critical-section?


>           }
>       }
>   
> -    g_assert(qemu_in_coroutine());
>       assert(i < MAX_NBD_REQUESTS);
> -
>       s->requests[i].coroutine = qemu_coroutine_self();
>       s->requests[i].offset = request->from;
>       s->requests[i].receiving = false;
> +    qemu_mutex_unlock(&s->requests_lock);
>   
> +    qemu_co_mutex_lock(&s->send_mutex);
>       request->handle = INDEX_TO_HANDLE(s, i);
>   
>       assert(s->ioc);
> @@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       } else {
>           rc = nbd_send_request(s->ioc, request);
>       }
> +    qemu_co_mutex_unlock(&s->send_mutex);
>   
> -err:
>       if (rc < 0) {
> +        qemu_mutex_lock(&s->requests_lock);
> +err:
>           nbd_channel_error(s, rc);
>           if (i != -1) {
>               s->requests[i].coroutine = NULL;
>           }
>           s->in_flight--;
>           qemu_co_queue_next(&s->free_sema);
> +        qemu_mutex_unlock(&s->requests_lock);
>       }
> -    qemu_co_mutex_unlock(&s->send_mutex);
>       return rc;
>   }
>   
> @@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>       return true;
>   
>   break_loop:
> +    qemu_mutex_lock(&s->requests_lock);
>       s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
> -
> -    qemu_co_mutex_lock(&s->send_mutex);
>       s->in_flight--;
>       qemu_co_queue_next(&s->free_sema);
> -    qemu_co_mutex_unlock(&s->send_mutex);
> +    qemu_mutex_unlock(&s->requests_lock);
>   
>       return false;
>   }
> @@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>   
>       s->bs = bs;
> -    qemu_co_mutex_init(&s->send_mutex);
> +    qemu_mutex_init(&s->requests_lock);
>       qemu_co_queue_init(&s->free_sema);
> +    qemu_co_mutex_init(&s->send_mutex);
>       qemu_co_mutex_init(&s->receive_mutex);
>   
>       if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
> @@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
>   
>       reconnect_delay_timer_del(s);
>   
> +    qemu_mutex_lock(&s->requests_lock);
>       if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>       }
> +    qemu_mutex_unlock(&s->requests_lock);
>   
>       nbd_co_establish_connection_cancel(s->conn);
>   }


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming
  2022-04-14 17:57 ` [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming Paolo Bonzini
  2022-04-14 19:37   ` Eric Blake
@ 2022-04-16 12:37   ` Vladimir Sementsov-Ogievskiy
  2022-04-23  8:38     ` Paolo Bonzini
  1 sibling, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 12:37 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> Prepare for the next patch, so that the diff is less confusing.
> 
> nbd_client_connecting is moved closer to the definition point.

Amm. To usage-point you mean?
The original idea was to keep simple state-reading helpers definitions together :)

> 
> nbd_client_connecting_wait() is kept only for the reconnection
> logic; when it is used to check if a request has to be reissued,
> use the renamed function nbd_client_will_reconnect().  In the
> next patch, the two cases will have different locking requirements.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

> ---
>   block/nbd.c | 24 ++++++++++++++----------
>   1 file changed, 14 insertions(+), 10 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index a2414566d1..37d466e435 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -254,18 +254,15 @@ static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
>       timer_mod(s->open_timer, expire_time_ns);
>   }
>   
> -static bool nbd_client_connecting(BDRVNBDState *s)
> -{
> -    NBDClientState state = qatomic_load_acquire(&s->state);
> -    return state == NBD_CLIENT_CONNECTING_WAIT ||
> -        state == NBD_CLIENT_CONNECTING_NOWAIT;
> -}
> -
>   static bool nbd_client_connecting_wait(BDRVNBDState *s)
>   {
>       return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
>   }
>   
> +static bool nbd_client_will_reconnect(BDRVNBDState *s)
> +{
> +    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
> +}
>   /*
>    * Update @bs with information learned during a completed negotiation process.
>    * Return failure if the server's advertised options are incompatible with the
> @@ -355,6 +352,13 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>       return 0;
>   }
>   
> +static bool nbd_client_connecting(BDRVNBDState *s)
> +{
> +    NBDClientState state = qatomic_load_acquire(&s->state);
> +    return state == NBD_CLIENT_CONNECTING_WAIT ||
> +        state == NBD_CLIENT_CONNECTING_NOWAIT;
> +}
> +
>   /* Called with s->requests_lock taken.  */
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
> @@ -1190,7 +1194,7 @@ static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request
>               error_free(local_err);
>               local_err = NULL;
>           }
> -    } while (ret < 0 && nbd_client_connecting_wait(s));
> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>   
>       return ret ? ret : request_ret;
>   }
> @@ -1249,7 +1253,7 @@ static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offse
>               error_free(local_err);
>               local_err = NULL;
>           }
> -    } while (ret < 0 && nbd_client_connecting_wait(s));
> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>   
>       return ret ? ret : request_ret;
>   }
> @@ -1407,7 +1411,7 @@ static int coroutine_fn nbd_client_co_block_status(
>               error_free(local_err);
>               local_err = NULL;
>           }
> -    } while (ret < 0 && nbd_client_connecting_wait(s));
> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>   
>       if (ret < 0 || request_ret < 0) {
>           return ret ? ret : request_ret;


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock
  2022-04-14 17:57 ` [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock Paolo Bonzini
  2022-04-14 19:40   ` Eric Blake
@ 2022-04-16 13:37   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 13:37 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> Remove the confusing, and most likely wrong, atomics.  The only function
> that used to be somewhat in a hot path was nbd_client_connected(),
> but it is not anymore after the previous patches.
> 
> The same logic is used both to check if a request had to be reissued
> and also in nbd_reconnecting_attempt().  The former cases are outside
> requests_lock, while nbd_reconnecting_attempt() does have the lock,
> therefore the two have been separated in the previous commit.
> nbd_client_will_reconnect() can simply take s->requests_lock, while
> nbd_reconnecting_attempt() can inline the access now that no
> complicated atomics are involved.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>


Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

> ---
>   block/nbd.c | 78 ++++++++++++++++++++++++++++-------------------------
>   1 file changed, 41 insertions(+), 37 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 37d466e435..b5aac2548c 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -35,7 +35,6 @@
>   #include "qemu/option.h"
>   #include "qemu/cutils.h"
>   #include "qemu/main-loop.h"
> -#include "qemu/atomic.h"
>   
>   #include "qapi/qapi-visit-sockets.h"
>   #include "qapi/qmp/qstring.h"
> @@ -72,10 +71,11 @@ typedef struct BDRVNBDState {
>       NBDExportInfo info;
>   
>       /*
> -     * Protects free_sema, in_flight, requests[].coroutine,
> +     * Protects state, free_sema, in_flight, requests[].coroutine,
>        * reconnect_delay_timer.
>        */
>       QemuMutex requests_lock;
> +    NBDClientState state;
>       CoQueue free_sema;
>       int in_flight;
>       NBDClientRequest requests[MAX_NBD_REQUESTS];
> @@ -83,7 +83,6 @@ typedef struct BDRVNBDState {
>   
>       CoMutex send_mutex;
>       CoMutex receive_mutex;
> -    NBDClientState state;
>   
>       QEMUTimer *open_timer;
>   
> @@ -132,11 +131,6 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
>       s->x_dirty_bitmap = NULL;
>   }
>   
> -static bool nbd_client_connected(BDRVNBDState *s)
> -{
> -    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
> -}
> -
>   static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
>   {
>       if (req->receiving) {
> @@ -159,14 +153,15 @@ static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
>       }
>   }
>   
> -static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
> +/* Called with s->requests_lock held.  */
> +static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
>   {
> -    if (nbd_client_connected(s)) {
> +    if (s->state == NBD_CLIENT_CONNECTED) {
>           qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
>       }
>   Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>
>       if (ret == -EIO) {
> -        if (nbd_client_connected(s)) {
> +        if (s->state == NBD_CLIENT_CONNECTED) {
>               s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
>                                               NBD_CLIENT_CONNECTING_NOWAIT;
>           }
> @@ -177,6 +172,12 @@ static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
>       nbd_recv_coroutines_wake(s, true);
>   }
>   
> +static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
> +{
> +    QEMU_LOCK_GUARD(&s->requests_lock);
> +    nbd_channel_error_locked(s, ret);
> +}
> +
>   static void reconnect_delay_timer_del(BDRVNBDState *s)
>   {
>       if (s->reconnect_delay_timer) {
> @@ -189,20 +190,18 @@ static void reconnect_delay_timer_cb(void *opaque)
>   {
>       BDRVNBDState *s = opaque;
>   
> -    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
> -        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> -        nbd_co_establish_connection_cancel(s->conn);
> -    }
> -
>       reconnect_delay_timer_del(s);
> +    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
> +        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
> +            return;
> +        }
> +        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> +    }
> +    nbd_co_establish_connection_cancel(s->conn);
>   }
>   
>   static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
>   {
> -    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
> -        return;
> -    }
> -
>       assert(!s->reconnect_delay_timer);
>       s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
>                                                QEMU_CLOCK_REALTIME,
> @@ -225,7 +224,9 @@ static void nbd_teardown_connection(BlockDriverState *bs)
>           s->ioc = NULL;
>       }
>   
> -    s->state = NBD_CLIENT_QUIT;
> +    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
> +        s->state = NBD_CLIENT_QUIT;
> +    }
>   }
>   
>   static void open_timer_del(BDRVNBDState *s)
> @@ -254,15 +255,15 @@ static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
>       timer_mod(s->open_timer, expire_time_ns);
>   }
>   
> -static bool nbd_client_connecting_wait(BDRVNBDState *s)
> -{
> -    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
> -}
> -
>   static bool nbd_client_will_reconnect(BDRVNBDState *s)
>   {
> -    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
> +    /*
> +     * Called only after a socket error, so this is not performance sensitive.
> +     */
> +    QEMU_LOCK_GUARD(&s->requests_lock);
> +    return s->state == NBD_CLIENT_CONNECTING_WAIT;
>   }
> +
>   /*
>    * Update @bs with information learned during a completed negotiation process.
>    * Return failure if the server's advertised options are incompatible with the
> @@ -347,22 +348,24 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>       qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
>   
>       /* successfully connected */
> -    s->state = NBD_CLIENT_CONNECTED;
> +    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
> +        s->state = NBD_CLIENT_CONNECTED;
> +    }
>   
>       return 0;
>   }
>   
> +/* Called with s->requests_lock held.  */
>   static bool nbd_client_connecting(BDRVNBDState *s)
>   {
> -    NBDClientState state = qatomic_load_acquire(&s->state);
> -    return state == NBD_CLIENT_CONNECTING_WAIT ||
> -        state == NBD_CLIENT_CONNECTING_NOWAIT;
> +    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
> +        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
>   }
>   
>   /* Called with s->requests_lock taken.  */
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
> -    bool blocking = nbd_client_connecting_wait(s);
> +    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
>   
>       /*
>        * Now we are sure that nobody is accessing the channel, and no one will
> @@ -477,17 +480,17 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>   
>       qemu_mutex_lock(&s->requests_lock);
>       while (s->in_flight == MAX_NBD_REQUESTS ||
> -           (!nbd_client_connected(s) && s->in_flight > 0)) {
> +           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
>           qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
>       }
>   
>       s->in_flight++;
> -    if (!nbd_client_connected(s)) {
> +    if (s->state != NBD_CLIENT_CONNECTED) {
>           if (nbd_client_connecting(s)) {
>               nbd_reconnect_attempt(s);
>               qemu_co_queue_restart_all(&s->free_sema);
>           }
> -        if (!nbd_client_connected(s)) {
> +        if (s->state != NBD_CLIENT_CONNECTED) {
>               rc = -EIO;
>               goto err;
>           }
> @@ -530,7 +533,7 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       if (rc < 0) {
>           qemu_mutex_lock(&s->requests_lock);
>   err:
> -        nbd_channel_error(s, rc);
> +        nbd_channel_error_locked(s, rc);
>           if (i != -1) {
>               s->requests[i].coroutine = NULL;
>           }
> @@ -1443,8 +1446,9 @@ static void nbd_yank(void *opaque)
>       BlockDriverState *bs = opaque;
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>   
> -    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
> +    QEMU_LOCK_GUARD(&s->requests_lock);
>       qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
> +    s->state = NBD_CLIENT_QUIT;

This last addition looks like a fix, that could be a separate commit.

>   }
>   
>   static void nbd_client_close(BlockDriverState *bs)


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving
  2022-04-14 17:57 ` [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving Paolo Bonzini
  2022-04-14 19:42   ` Eric Blake
@ 2022-04-16 13:54   ` Vladimir Sementsov-Ogievskiy
  1 sibling, 0 replies; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 13:54 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
> Read it under the same mutex as well.  Waking up receivers on errors happens
> after each reply finishes processing, in nbd_co_receive_one_chunk().
> If there is no currently-active reply, there are two cases:
> 
> * either there is no active request at all, in which case no
> element of request[] can have .receiving = true
> 
> * or nbd_receive_replies() must be running and owns receive_mutex;
> in that case it will get back to nbd_co_receive_one_chunk() because
> the socket has been shutdown, and all waiting coroutines will wake up
> in turn.
> 

Seems that removing "nbd_recv_coroutines_wake(s, true);" from nbd_channel_error_locked() could be a separate preparing patch. It's a separate thing:
no sense to wake all receving coroutines on error, if they will wake each-other in a chain anyway..

> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>


Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

> ---
>   block/nbd.c | 15 +++++++--------
>   1 file changed, 7 insertions(+), 8 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index b5aac2548c..31c684772e 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -131,6 +131,7 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
>       s->x_dirty_bitmap = NULL;
>   }
>   
> +/* Called with s->receive_mutex taken.  */
>   static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
>   {
>       if (req->receiving) {
> @@ -142,12 +143,13 @@ static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
>       return false;
>   }
>   
> -static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
> +static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
>   {
>       int i;
>   
> +    QEMU_LOCK_GUARD(&s->receive_mutex);
>       for (i = 0; i < MAX_NBD_REQUESTS; i++) {
> -        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
> +        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
>               return;
>           }
>       }
> @@ -168,8 +170,6 @@ static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
>       } else {
>           s->state = NBD_CLIENT_QUIT;
>       }
> -
> -    nbd_recv_coroutines_wake(s, true);
>   }
>   
>   static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
> @@ -432,11 +432,10 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
>   
>               qemu_coroutine_yield();
>               /*
> -             * We may be woken for 3 reasons:
> +             * We may be woken for 2 reasons:
>                * 1. From this function, executing in parallel coroutine, when our
>                *    handle is received.
> -             * 2. From nbd_channel_error(), when connection is lost.
> -             * 3. From nbd_co_receive_one_chunk(), when previous request is
> +             * 2. From nbd_co_receive_one_chunk(), when previous request is
>                *    finished and s->reply.handle set to 0.
>                * Anyway, it's OK to lock the mutex and go to the next iteration.
>                */
> @@ -928,7 +927,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
>       }
>       s->reply.handle = 0;
>   
> -    nbd_recv_coroutines_wake(s, false);
> +    nbd_recv_coroutines_wake(s);
>   
>       return ret;
>   }


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes
  2022-04-14 17:57 ` [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes Paolo Bonzini
  2022-04-14 19:42   ` Eric Blake
@ 2022-04-16 14:00   ` Vladimir Sementsov-Ogievskiy
  2022-04-23  8:41     ` Paolo Bonzini
  1 sibling, 1 reply; 33+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2022-04-16 14:00 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: eblake

14.04.2022 20:57, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>   block/nbd.c | 8 +++++++-
>   1 file changed, 7 insertions(+), 1 deletion(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 31c684772e..d0d94b40bd 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -81,12 +81,18 @@ typedef struct BDRVNBDState {
>       NBDClientRequest requests[MAX_NBD_REQUESTS];
>       QEMUTimer *reconnect_delay_timer;
>   
> +    /* Protects sending data on the socket.  */
>       CoMutex send_mutex;
> +
> +    /*
> +     * Protects receiving reply headers from the socket, as well as the
> +     * fields reply and requests[].receiving

I think, worth noting, that s->reply is used without mutex after nbd_receive_replies() success and till setting s->reply.handle to 0 in nbd_co_receive_one_chunk()..

Should "s->reply.handle = 0" be done under mutex as well? And may be, in same critical section as nbd_recv_coroutines_wake() ?

> +     */
>       CoMutex receive_mutex;
> +    NBDReply reply;
>   
>       QEMUTimer *open_timer;
>   
> -    NBDReply reply;
>       BlockDriverState *bs;
>   
>       /* Connection parameters */


-- 
Best regards,
Vladimir


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe
  2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
                   ` (8 preceding siblings ...)
  2022-04-14 17:57 ` [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes Paolo Bonzini
@ 2022-04-16 19:03 ` Lukas Straub
  2022-04-22 20:39   ` Eric Blake
  9 siblings, 1 reply; 33+ messages in thread
From: Lukas Straub @ 2022-04-16 19:03 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: v.sementsov-og, eblake, qemu-devel

[-- Attachment #1: Type: text/plain, Size: 1595 bytes --]

On Thu, 14 Apr 2022 19:57:47 +0200
Paolo Bonzini <pbonzini@redhat.com> wrote:

> The main point of this series is patch 7, which removes the dubious and
> probably wrong use of atomics in block/nbd.c.  This in turn is enabled
> mostly by the cleanups in patches 3-5.  Together, they introduce a
> QemuMutex that synchronizes the NBD client coroutines, the reconnect_delay
> timer and nbd_cancel_in_flight() as well.
> 
> The fixes happen to remove an incorrect use of qemu_co_queue_restart_all
> and qemu_co_enter_next on the s->free_sema CoQueue, which was not guarded
> by s->send_mutex.
> 
> The rest is bugfixes, simplifying the code a bit, and extra documentation.
> 
> v1->v2:
> - cleaner patch 1
> - fix grammar in patch 4
> - split out patch 6
> 
> Paolo Bonzini (9):
>   nbd: safeguard against waking up invalid coroutine
>   nbd: mark more coroutine_fns
>   nbd: remove peppering of nbd_client_connected
>   nbd: keep send_mutex/free_sema handling outside
>     nbd_co_do_establish_connection
>   nbd: use a QemuMutex to synchronize yanking, reconnection and
>     coroutines
>   nbd: code motion and function renaming
>   nbd: move s->state under requests_lock
>   nbd: take receive_mutex when reading requests[].receiving
>   nbd: document what is protected by the CoMutexes
> 
>  block/coroutines.h |   4 +-
>  block/nbd.c        | 298 +++++++++++++++++++++++----------------------
>  2 files changed, 154 insertions(+), 148 deletions(-)
> 

For the whole series:

Reviewed-by: Lukas Straub <lukasstraub2@web.de>

Regards,
Lukas

-- 


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe
  2022-04-16 19:03 ` [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Lukas Straub
@ 2022-04-22 20:39   ` Eric Blake
  0 siblings, 0 replies; 33+ messages in thread
From: Eric Blake @ 2022-04-22 20:39 UTC (permalink / raw)
  To: Lukas Straub; +Cc: Paolo Bonzini, v.sementsov-og, qemu-devel

On Sat, Apr 16, 2022 at 07:03:57PM +0000, Lukas Straub wrote:
> On Thu, 14 Apr 2022 19:57:47 +0200
> Paolo Bonzini <pbonzini@redhat.com> wrote:
> 
> > The main point of this series is patch 7, which removes the dubious and
> > probably wrong use of atomics in block/nbd.c.  This in turn is enabled
> > mostly by the cleanups in patches 3-5.  Together, they introduce a
> > QemuMutex that synchronizes the NBD client coroutines, the reconnect_delay
> > timer and nbd_cancel_in_flight() as well.
> > 
> > The fixes happen to remove an incorrect use of qemu_co_queue_restart_all
> > and qemu_co_enter_next on the s->free_sema CoQueue, which was not guarded
> > by s->send_mutex.
> > 
> > The rest is bugfixes, simplifying the code a bit, and extra documentation.

> For the whole series:
> 
> Reviewed-by: Lukas Straub <lukasstraub2@web.de>

I've queued the series through my NBD tree for a pull request in the next week.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected
  2022-04-15 17:01   ` Vladimir Sementsov-Ogievskiy
@ 2022-04-23  8:30     ` Paolo Bonzini
  0 siblings, 0 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-23  8:30 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: eblake

Hi,

thanks for the careful review and sorry I'm only replying now.

On 4/15/22 19:01, Vladimir Sementsov-Ogievskiy wrote:
>> @@ -982,11 +978,6 @@ static bool 
>> nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>>       NBDReply local_reply;
>>       NBDStructuredReplyChunk *chunk;
>>       Error *local_err = NULL;
>> -    if (!nbd_client_connected(s)) {
>> -        error_setg(&local_err, "Connection closed");
>> -        nbd_iter_channel_error(iter, -EIO, &local_err);
>> -        goto break_loop;
>> -    }
> 
> Probably we should still check iter->ret here. It's strange to start new 
> iteration, when user set iter->ret in previous iteration of 
> NBD_FOREACH_REPLY_CHUNK()
> 
> Or, maybe we should set iter->done in nbd_iter_channel_error ?

Yes, this second one is a possibility.  I chose to check iter->ret below 
because it was a bit more self-contained ("before reading again check 
that the error code is not overwritten"), but it is also less obvious.

Paolo

>>       if (iter->done) {
>>           /* Previous iteration was last. */
>> @@ -1007,7 +998,7 @@ static bool 
>> nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>>       }
>>       /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple 
>> reply. */
>> -    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
>> +    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
> 
> And then here, may be we can just goto break_loop from previous "if (ret 
> < 0)". Then we'll not have to check iter->ret.
> 
>>           goto break_loop;
>>       }
> 
> anyway:
> Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>
> 
> (a bit weak, as it really hard to imagine all these paths and possible 
> consequences :/
> 



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection
  2022-04-16 11:54   ` Vladimir Sementsov-Ogievskiy
@ 2022-04-23  8:34     ` Paolo Bonzini
  0 siblings, 0 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-23  8:34 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: eblake

On 4/16/22 13:54, Vladimir Sementsov-Ogievskiy wrote:
>> @@ -187,9 +187,6 @@ static void reconnect_delay_timer_cb(void *opaque)
>>       if (qatomic_load_acquire(&s->state) == 
>> NBD_CLIENT_CONNECTING_WAIT) {
>>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>>           nbd_co_establish_connection_cancel(s->conn);
>> -        while (qemu_co_enter_next(&s->free_sema, NULL)) {
>> -            /* Resume all queued requests */
>> -        }
>>       }
>>       reconnect_delay_timer_del(s);
> 
> Seems, removing the timer is not needed here too. We do 
> nbd_co_establish_connection_cancel(), and timer will be removed in 
> nbd_reconnect_attempt anyway.
> 
> More over, we don't need to keep timer in the state at all: it should 
> become local thing for nbd_reconnect_attempt. A kind of call 
> nbd_co_do_establish_connection() with timeout. That could be refactored 
> later, using same way as in 4-5 patches of my "[PATCH v4 0/7] 
> copy-before-write: on-cbw-error and cbw-timeout" series.

Yes, good point.

>> nbd_reconnect_attempt(BDRVNBDState *s)
>>           s->ioc = NULL;
>>       }
>> -    nbd_co_do_establish_connection(s->bs, NULL);
>> +    qemu_co_mutex_unlock(&s->send_mutex);
>> +    nbd_co_do_establish_connection(s->bs, blocking, NULL);
>> +    qemu_co_mutex_lock(&s->send_mutex);
> 
> 
> Hmm. So, that breaks a critical section.
> 
> We can do it because in_flight=1 and we are not connected => all other 
> requests will wait in the queue.
> 
> Still. during nbd_co_do_establish_connection() state may become 
> CONNECTED. That theoretically means that parallel requests may start.
> 
> Is it bad? Seems not.. Bad thing that comes into my mind is that 
> parallel request fails, and go to reconnect, and start own timer, but we 
> remove it now after locking the mutex back. But that's not possible as 
> parallel request should wait for in-flight=0 to start own 
> reconnect-attempt. And that is not possible, as we keep our in-flight 
> point.

Yes that was even intended. :>  Synchronizing on in_flight == 0 before 
reconnecting is the important bit that simplifies a lot of things.

>> @@ -2049,7 +2046,6 @@ static void 
>> nbd_cancel_in_flight(BlockDriverState *bs)
>>       if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
>>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>> -        qemu_co_queue_restart_all(&s->free_sema);
> 
> As I understand, we always have one request running (or no requests at 
> all, but that's OK too) and it will wake all others (altogether, or they 
> will wake each other in a chain). So, we don't need to wake them here.

Exactly, the "let each coroutine wake up the next one" pattern can be 
generalized to the error cases because the wakeups cascade until 
in_flight becomes 0.

Paolo

>>       }
>>       nbd_co_establish_connection_cancel(s->conn);
> 
> 



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
  2022-04-16 12:18   ` Vladimir Sementsov-Ogievskiy
@ 2022-04-23  8:37     ` Paolo Bonzini
  0 siblings, 0 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-23  8:37 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: eblake

On 4/16/22 14:18, Vladimir Sementsov-Ogievskiy wrote:
> 14.04.2022 20:57, Paolo Bonzini wrote:
>> The condition for waiting on the s->free_sema queue depends on
>> both s->in_flight and s->state.  The latter is currently using
>> atomics, but this is quite dubious and probably wrong.
>>
>> Because s->state is written in the main thread too, for example by
>> the yank callback, it cannot be protected by a CoMutex.  Introduce a
>> separate lock that can be used by nbd_co_send_request(); later on this
>> lock will also be used for s->state.  There will not be any contention
>> on the lock unless there is a yank or reconnect, so this is not
>> performance sensitive.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>   block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>>   1 file changed, 27 insertions(+), 19 deletions(-)
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 62dd338ef3..a2414566d1 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
>>       QIOChannel *ioc; /* The current I/O channel */
>>       NBDExportInfo info;
>> -    CoMutex send_mutex;
>> +    /*
>> +     * Protects free_sema, in_flight, requests[].coroutine,
>> +     * reconnect_delay_timer.
>> +     */
> 
> requests[].coroutine is read without mutex in nbd_receive_replies
> 
> reconnect_delay_timer_del() is called without mutex in 
> nbd_cancel_in_flight() and in reconnect_delay_timer_cb()..
> 
> Is it OK? Worth improving the comment?

Yeah, let me check the reconnect_delay_timer idea you had in the 
previous patch, too.

> Prior to this patch, if request sending fails, we'll not send further 
> requests. After the patch, we can send more requests after failure on 
> unlocking send_mutex.
>
> What's wrong if we keep send_mutex critical section as is and just lock 
> requests_lock additionally inside send_mutex-critical-section?

I wanted to keep send_mutex just for the socket, basically.  The cse you 
point out exists but it is harmless.

Paolo


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming
  2022-04-16 12:37   ` Vladimir Sementsov-Ogievskiy
@ 2022-04-23  8:38     ` Paolo Bonzini
  0 siblings, 0 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-23  8:38 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: eblake

On 4/16/22 14:37, Vladimir Sementsov-Ogievskiy wrote:
> 14.04.2022 20:57, Paolo Bonzini wrote:
>> Prepare for the next patch, so that the diff is less confusing.
>>
>> nbd_client_connecting is moved closer to the definition point.
> 
> Amm. To usage-point you mean?
> The original idea was to keep simple state-reading helpers definitions 
> together :)

Yes and it makes sense.  The new idea is to keep requests_lock functions 
together instead. :)

Paolo


>>
>> nbd_client_connecting_wait() is kept only for the reconnection
>> logic; when it is used to check if a request has to be reissued,
>> use the renamed function nbd_client_will_reconnect().  In the
>> next patch, the two cases will have different locking requirements.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> 
> Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>
> 
>> ---
>>   block/nbd.c | 24 ++++++++++++++----------
>>   1 file changed, 14 insertions(+), 10 deletions(-)
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index a2414566d1..37d466e435 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -254,18 +254,15 @@ static void open_timer_init(BDRVNBDState *s, 
>> uint64_t expire_time_ns)
>>       timer_mod(s->open_timer, expire_time_ns);
>>   }
>> -static bool nbd_client_connecting(BDRVNBDState *s)
>> -{
>> -    NBDClientState state = qatomic_load_acquire(&s->state);
>> -    return state == NBD_CLIENT_CONNECTING_WAIT ||
>> -        state == NBD_CLIENT_CONNECTING_NOWAIT;
>> -}
>> -
>>   static bool nbd_client_connecting_wait(BDRVNBDState *s)
>>   {
>>       return qatomic_load_acquire(&s->state) == 
>> NBD_CLIENT_CONNECTING_WAIT;
>>   }
>> +static bool nbd_client_will_reconnect(BDRVNBDState *s)
>> +{
>> +    return qatomic_load_acquire(&s->state) == 
>> NBD_CLIENT_CONNECTING_WAIT;
>> +}
>>   /*
>>    * Update @bs with information learned during a completed 
>> negotiation process.
>>    * Return failure if the server's advertised options are 
>> incompatible with the
>> @@ -355,6 +352,13 @@ int coroutine_fn 
>> nbd_co_do_establish_connection(BlockDriverState *bs,
>>       return 0;
>>   }
>> +static bool nbd_client_connecting(BDRVNBDState *s)
>> +{
>> +    NBDClientState state = qatomic_load_acquire(&s->state);
>> +    return state == NBD_CLIENT_CONNECTING_WAIT ||
>> +        state == NBD_CLIENT_CONNECTING_NOWAIT;
>> +}
>> +
>>   /* Called with s->requests_lock taken.  */
>>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>>   {
>> @@ -1190,7 +1194,7 @@ static int coroutine_fn 
>> nbd_co_request(BlockDriverState *bs, NBDRequest *request
>>               error_free(local_err);
>>               local_err = NULL;
>>           }
>> -    } while (ret < 0 && nbd_client_connecting_wait(s));
>> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>>       return ret ? ret : request_ret;
>>   }
>> @@ -1249,7 +1253,7 @@ static int coroutine_fn 
>> nbd_client_co_preadv(BlockDriverState *bs, int64_t offse
>>               error_free(local_err);
>>               local_err = NULL;
>>           }
>> -    } while (ret < 0 && nbd_client_connecting_wait(s));
>> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>>       return ret ? ret : request_ret;
>>   }
>> @@ -1407,7 +1411,7 @@ static int coroutine_fn nbd_client_co_block_status(
>>               error_free(local_err);
>>               local_err = NULL;
>>           }
>> -    } while (ret < 0 && nbd_client_connecting_wait(s));
>> +    } while (ret < 0 && nbd_client_will_reconnect(s));
>>       if (ret < 0 || request_ret < 0) {
>>           return ret ? ret : request_ret;
> 
> 



^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes
  2022-04-16 14:00   ` Vladimir Sementsov-Ogievskiy
@ 2022-04-23  8:41     ` Paolo Bonzini
  0 siblings, 0 replies; 33+ messages in thread
From: Paolo Bonzini @ 2022-04-23  8:41 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: eblake

On 4/16/22 16:00, Vladimir Sementsov-Ogievskiy wrote:
> 14.04.2022 20:57, Paolo Bonzini wrote:
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>   block/nbd.c | 8 +++++++-
>>   1 file changed, 7 insertions(+), 1 deletion(-)
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 31c684772e..d0d94b40bd 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -81,12 +81,18 @@ typedef struct BDRVNBDState {
>>       NBDClientRequest requests[MAX_NBD_REQUESTS];
>>       QEMUTimer *reconnect_delay_timer;
>> +    /* Protects sending data on the socket.  */
>>       CoMutex send_mutex;
>> +
>> +    /*
>> +     * Protects receiving reply headers from the socket, as well as the
>> +     * fields reply and requests[].receiving
> 
> I think, worth noting, that s->reply is used without mutex after 
> nbd_receive_replies() success and till setting s->reply.handle to 0 in 
> nbd_co_receive_one_chunk()..
> 
> Should "s->reply.handle = 0" be done under mutex as well? And may be, in 
> same critical section as nbd_recv_coroutines_wake() ?

Could be an idea.  It could also be a store-release but no reason to be 
fancy:

diff --git a/block/nbd.c b/block/nbd.c
index 0bd9b674a9..cd760bfd50 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -149,11 +149,11 @@ static bool coroutine_fn 
nbd_recv_coroutine_wake_one(NBDClientRequest *req)
      return false;
  }

+/* Called with s->receive_mutex taken.  */
  static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
  {
      int i;

-    QEMU_LOCK_GUARD(&s->receive_mutex);
      for (i = 0; i < MAX_NBD_REQUESTS; i++) {
          if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
              return;
@@ -924,9 +924,11 @@ static coroutine_fn int nbd_co_receive_one_chunk(
          /* For assert at loop start in nbd_connection_entry */
          *reply = s->reply;
      }
-    s->reply.handle = 0;

-    nbd_recv_coroutines_wake(s);
+    WITH_QEMU_LOCK_GUARD(&s->receive_mutex) {
+        s->reply.handle = 0;
+        nbd_recv_coroutines_wake(s);
+    }

      return ret;
  }

Paolo

>> +     */
>>       CoMutex receive_mutex;
>> +    NBDReply reply;
>>       QEMUTimer *open_timer;
>> -    NBDReply reply;
>>       BlockDriverState *bs;
>>       /* Connection parameters */
> 
> 



^ permalink raw reply related	[flat|nested] 33+ messages in thread

end of thread, other threads:[~2022-04-23  8:42 UTC | newest]

Thread overview: 33+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-04-14 17:57 [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Paolo Bonzini
2022-04-14 17:57 ` [PATCH v2 for-7.1 1/9] nbd: safeguard against waking up invalid coroutine Paolo Bonzini
2022-04-14 19:02   ` Eric Blake
2022-04-15  9:53   ` Vladimir Sementsov-Ogievskiy
2022-04-14 17:57 ` [PATCH v2 for-7.1 2/9] nbd: mark more coroutine_fns Paolo Bonzini
2022-04-15 10:12   ` Vladimir Sementsov-Ogievskiy
2022-04-14 17:57 ` [PATCH v2 for-7.1 3/9] nbd: remove peppering of nbd_client_connected Paolo Bonzini
2022-04-15 17:01   ` Vladimir Sementsov-Ogievskiy
2022-04-23  8:30     ` Paolo Bonzini
2022-04-14 17:57 ` [PATCH v2 for-7.1 4/9] nbd: keep send_mutex/free_sema handling outside nbd_co_do_establish_connection Paolo Bonzini
2022-04-14 19:11   ` Eric Blake
2022-04-16 11:54   ` Vladimir Sementsov-Ogievskiy
2022-04-23  8:34     ` Paolo Bonzini
2022-04-14 17:57 ` [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines Paolo Bonzini
2022-04-14 19:33   ` Eric Blake
2022-04-16 12:18   ` Vladimir Sementsov-Ogievskiy
2022-04-23  8:37     ` Paolo Bonzini
2022-04-14 17:57 ` [PATCH v2 for-7.1 6/9] nbd: code motion and function renaming Paolo Bonzini
2022-04-14 19:37   ` Eric Blake
2022-04-16 12:37   ` Vladimir Sementsov-Ogievskiy
2022-04-23  8:38     ` Paolo Bonzini
2022-04-14 17:57 ` [PATCH v2 for-7.1 7/9] nbd: move s->state under requests_lock Paolo Bonzini
2022-04-14 19:40   ` Eric Blake
2022-04-16 13:37   ` Vladimir Sementsov-Ogievskiy
2022-04-14 17:57 ` [PATCH v2 for-7.1 8/9] nbd: take receive_mutex when reading requests[].receiving Paolo Bonzini
2022-04-14 19:42   ` Eric Blake
2022-04-16 13:54   ` Vladimir Sementsov-Ogievskiy
2022-04-14 17:57 ` [PATCH v2 for-7.1 9/9] nbd: document what is protected by the CoMutexes Paolo Bonzini
2022-04-14 19:42   ` Eric Blake
2022-04-16 14:00   ` Vladimir Sementsov-Ogievskiy
2022-04-23  8:41     ` Paolo Bonzini
2022-04-16 19:03 ` [PATCH v2 for-7.1 0/9] nbd: actually make s->state thread-safe Lukas Straub
2022-04-22 20:39   ` Eric Blake

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.