* [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections
@ 2015-08-06 13:35 Paolo Bonzini
  2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
                   ` (17 more replies)
  0 siblings, 18 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:35 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

This series represents a major enabler for two AioContext projects:
true multiqueue (i.e. multiple AioContexts for the same BDS) and
thread-safe virtio-scsi dataplane.

The patches decouple AioContext's internal locking from the BDS's locking
in two steps.  Patches 1 to 9 make aio_poll and the iothread run
without taking any external locks---only user callbacks are protected
by aio_context_acquire/aio_context_release.  Patches 12 to 16 then push
the lock down to the callbacks themselves, so that the AioContext lock
becomes nothing more than a convenience.

For multiqueue the next step would be to have fine-grained locks so
that e.g. each dataplane thread can proceed independently.  For
thread-safe virtio-scsi dataplane, most of dma-helpers.c is already
running without taking the AioContext lock, and all that's left is to
ensure that dma-helpers.c code is _always_ run without that lock
taken.  Of course the latter is a much smaller endeavor.
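
To make the end state concrete, here is a sketch (not code from the
series; my_read_cb and MyState are invented names).  Up to patch 9,
the dispatch code wraps every user callback in the AioContext lock:

    aio_context_acquire(ctx);
    node->io_read(node->opaque);    /* callback runs under the lock */
    aio_context_release(ctx);

After patches 12 to 18 the wrapper is gone, and a callback that still
needs the AioContext lock takes it explicitly:

    static void my_read_cb(void *opaque)
    {
        MyState *s = opaque;

        aio_context_acquire(s->ctx);
        /* ... touch state protected by the AioContext lock ... */
        aio_context_release(s->ctx);
    }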

I'm fairly confident that patches 1 to 11 are correct and would like to
get them in before patches 12 to 18.  (This second half is much less
tested; in particular, I have not tested Win32 or quorum.)  I'm
nevertheless posting the whole thing as an RFC and not just for review,
since I want to discuss this and the next steps forward at KVM Forum.

Applies on top of the three SCSI patches I have just sent.

Paolo

Paolo Bonzini (18):
  iothread: release iothread around aio_poll
  aio: rename bh_lock to list_lock
  qemu-thread: introduce QemuLockCnt
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  aio: tweak walking in dispatch phase
  aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  aio: document locking
  aio: push aio_context_acquire/release down to dispatching
  async: optimize aio_bh_poll
  qemu-timer: optimize timerlist_run_timers
  block: explicitly acquire aiocontext in callbacks that need it
  block: explicitly acquire aiocontext in bottom halves that need it
  block: explicitly acquire aiocontext in timers that need it
  quorum: use atomics for rewrite_count
  quorum: split quorum_fifo_aio_cb from quorum_aio_cb
  block: explicitly acquire aiocontext in aio callbacks that need it
  aio: update locking documentation

 aio-posix.c                     |  78 +++++-----
 aio-win32.c                     |  97 +++++++------
 async.c                         |  63 ++++----
 block/blkverify.c               |   6 +-
 block/curl.c                    |  43 ++++--
 block/gluster.c                 |   2 +
 block/io.c                      |   7 +
 block/iscsi.c                   |  10 ++
 block/linux-aio.c               |  14 +-
 block/mirror.c                  |  12 +-
 block/nbd-client.c              |  14 +-
 block/nfs.c                     |  10 ++
 block/qed.c                     |   2 +
 block/quorum.c                  |  60 +++++---
 block/sheepdog.c                |  29 ++--
 block/ssh.c                     |  45 +++---
 block/throttle-groups.c         |   2 +
 block/win32-aio.c               |   8 +-
 dma-helpers.c                   |   7 +-
 docs/lockcnt.txt                | 308 ++++++++++++++++++++++++++++++++++++++++
 docs/multiple-iothreads.txt     |  95 ++++++++++---
 hw/block/dataplane/virtio-blk.c |   2 +
 hw/block/virtio-blk.c           |   8 ++
 hw/scsi/scsi-bus.c              |   2 +
 hw/scsi/scsi-disk.c             |  18 +++
 hw/scsi/scsi-generic.c          |  20 ++-
 hw/scsi/virtio-scsi-dataplane.c |   6 +
 include/block/aio.h             |  33 ++---
 include/qemu/thread.h           |  17 +++
 iothread.c                      |  11 +-
 nbd.c                           |   4 +
 qemu-coroutine-sleep.c          |   5 +
 qemu-timer.c                    |   4 +
 tests/test-aio.c                |  19 +--
 thread-pool.c                   |  14 +-
 util/Makefile.objs              |   2 +-
 util/lockcnt.c                  | 123 ++++++++++++++++
 37 files changed, 933 insertions(+), 267 deletions(-)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 util/lockcnt.c

-- 
2.4.3


* [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
@ 2015-08-06 13:35 ` Paolo Bonzini
  2015-09-09  6:06   ` Fam Zheng
  2015-09-28  9:50   ` Stefan Hajnoczi
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock Paolo Bonzini
                   ` (16 subsequent siblings)
  17 siblings, 2 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:35 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

This is the first step towards having fine-grained critical sections in
dataplane threads, which resolves lock ordering problems between
address_space_* functions (which need the BQL when doing MMIO, even
after we complete RCU-based dispatch) and the AioContext.

Because AioContext no longer uses the RFifoLock contention callback,
the unit test has to be changed.

Previously applied as a0710f7995f914e3044e5899bd8ff6c43c62f916 and
then reverted.
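
Note that without the contention callback, a thread that wants the
owner to leave aio_poll() now has to kick the event loop itself before
acquiring the context, which is what the updated unit test does.  As a
sketch (data->notifier is the test's dummy event notifier):

    event_notifier_set(&data->notifier);  /* make aio_poll() return */
    aio_context_acquire(ctx);             /* no longer blocks forever */
    /* ... critical section ... */
    aio_context_release(ctx);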

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                     | 22 +++-------------------
 docs/multiple-iothreads.txt | 25 +++++++++++++++----------
 include/block/aio.h         |  3 ---
 iothread.c                  | 11 ++---------
 tests/test-aio.c            | 19 +++++++++++--------
 5 files changed, 31 insertions(+), 49 deletions(-)

diff --git a/async.c b/async.c
index efce14b..f992914 100644
--- a/async.c
+++ b/async.c
@@ -79,8 +79,8 @@ int aio_bh_poll(AioContext *ctx)
          * aio_notify again if necessary.
          */
         if (!bh->deleted && atomic_xchg(&bh->scheduled, 0)) {
-            /* Idle BHs and the notify BH don't count as progress */
-            if (!bh->idle && bh != ctx->notify_dummy_bh) {
+            /* Idle BHs don't count as progress */
+            if (!bh->idle) {
                 ret = 1;
             }
             bh->idle = 0;
@@ -232,7 +232,6 @@ aio_ctx_finalize(GSource     *source)
 {
     AioContext *ctx = (AioContext *) source;
 
-    qemu_bh_delete(ctx->notify_dummy_bh);
     thread_pool_free(ctx->thread_pool);
 
     qemu_mutex_lock(&ctx->bh_lock);
@@ -299,19 +298,6 @@ static void aio_timerlist_notify(void *opaque)
     aio_notify(opaque);
 }
 
-static void aio_rfifolock_cb(void *opaque)
-{
-    AioContext *ctx = opaque;
-
-    /* Kick owner thread in case they are blocked in aio_poll() */
-    qemu_bh_schedule(ctx->notify_dummy_bh);
-}
-
-static void notify_dummy_bh(void *opaque)
-{
-    /* Do nothing, we were invoked just to force the event loop to iterate */
-}
-
 static void event_notifier_dummy_cb(EventNotifier *e)
 {
 }
@@ -333,11 +319,9 @@ AioContext *aio_context_new(Error **errp)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
-    rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
+    rfifolock_init(&ctx->lock, NULL, NULL);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
-    ctx->notify_dummy_bh = aio_bh_new(ctx, notify_dummy_bh, NULL);
-
     return ctx;
 }
 
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 40b8419..723cc7e 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -105,13 +105,10 @@ a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
 acquire/release or locking is needed for the qemu_bh_schedule() call.  But be
 sure to acquire the AioContext for aio_bh_new() if necessary.
 
-The relationship between AioContext and the block layer
--------------------------------------------------------
-The AioContext originates from the QEMU block layer because it provides a
-scoped way of running event loop iterations until all work is done.  This
-feature is used to complete all in-flight block I/O requests (see
-bdrv_drain_all()).  Nowadays AioContext is a generic event loop that can be
-used by any QEMU subsystem.
+AioContext and the block layer
+------------------------------
+The AioContext originates from the QEMU block layer, even though nowadays
+AioContext is a generic event loop that can be used by any QEMU subsystem.
 
 The block layer has support for AioContext integrated.  Each BlockDriverState
 is associated with an AioContext using bdrv_set_aio_context() and
@@ -122,9 +119,17 @@ Block layer code must therefore expect to run in an IOThread and avoid using
 old APIs that implicitly use the main loop.  See the "How to program for
 IOThreads" above for information on how to do that.
 
-If main loop code such as a QMP function wishes to access a BlockDriverState it
-must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure the
-IOThread does not run in parallel.
+If main loop code such as a QMP function wishes to access a BlockDriverState
+it must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure
+that callbacks in the IOThread do not run in parallel.
+
+Code running in the monitor typically uses bdrv_drain() to ensure that
+past requests from the guest are completed.  When a block device is
+running in an IOThread, the IOThread can also process requests from
+the guest (via ioeventfd).  These requests have to be blocked with
+aio_disable_clients() before calling bdrv_drain().  You can then reenable
+guest requests with aio_enable_clients() before finally releasing the
+AioContext and completing the monitor command.
 
 Long-running jobs (usually in the form of coroutines) are best scheduled in the
 BlockDriverState's AioContext to avoid the need to acquire/release around each
diff --git a/include/block/aio.h b/include/block/aio.h
index 400b1b0..9dd32e0 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -114,9 +114,6 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Scheduling this BH forces the event loop it iterate */
-    QEMUBH *notify_dummy_bh;
-
     /* Thread pool for performing work and receiving completion callbacks */
     struct ThreadPool *thread_pool;
 
diff --git a/iothread.c b/iothread.c
index da6ce7b..8f6d2e4 100644
--- a/iothread.c
+++ b/iothread.c
@@ -30,7 +30,6 @@ typedef ObjectClass IOThreadClass;
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
-    bool blocking;
 
     rcu_register_thread();
 
@@ -39,14 +38,8 @@ static void *iothread_run(void *opaque)
     qemu_cond_signal(&iothread->init_done_cond);
     qemu_mutex_unlock(&iothread->init_done_lock);
 
-    while (!iothread->stopping) {
-        aio_context_acquire(iothread->ctx);
-        blocking = true;
-        while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
-            /* Progress was made, keep going */
-            blocking = false;
-        }
-        aio_context_release(iothread->ctx);
+    while (!atomic_read(&iothread->stopping)) {
+        aio_poll(iothread->ctx, true);
     }
 
     rcu_unregister_thread();
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 217e337..b4bd3f1 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -99,6 +99,7 @@ static void event_ready_cb(EventNotifier *e)
 
 typedef struct {
     QemuMutex start_lock;
+    EventNotifier notifier;
     bool thread_acquired;
 } AcquireTestData;
 
@@ -110,6 +111,8 @@ static void *test_acquire_thread(void *opaque)
     qemu_mutex_lock(&data->start_lock);
     qemu_mutex_unlock(&data->start_lock);
 
+    g_usleep(500000);
+    event_notifier_set(&data->notifier);
     aio_context_acquire(ctx);
     aio_context_release(ctx);
 
@@ -118,20 +121,19 @@ static void *test_acquire_thread(void *opaque)
     return NULL;
 }
 
-static void dummy_notifier_read(EventNotifier *unused)
+static void dummy_notifier_read(EventNotifier *n)
 {
-    g_assert(false); /* should never be invoked */
+    event_notifier_test_and_clear(n);
 }
 
 static void test_acquire(void)
 {
     QemuThread thread;
-    EventNotifier notifier;
     AcquireTestData data;
 
     /* Dummy event notifier ensures aio_poll() will block */
-    event_notifier_init(&notifier, false);
-    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    event_notifier_init(&data.notifier, false);
+    aio_set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
     g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
 
     qemu_mutex_init(&data.start_lock);
@@ -145,12 +147,13 @@ static void test_acquire(void)
     /* Block in aio_poll(), let other thread kick us and acquire context */
     aio_context_acquire(ctx);
     qemu_mutex_unlock(&data.start_lock); /* let the thread run */
-    g_assert(!aio_poll(ctx, true));
+    g_assert(aio_poll(ctx, true));
+    g_assert(!data.thread_acquired);
     aio_context_release(ctx);
 
     qemu_thread_join(&thread);
-    aio_set_event_notifier(ctx, &notifier, NULL);
-    event_notifier_cleanup(&notifier);
+    aio_set_event_notifier(ctx, &data.notifier, NULL);
+    event_notifier_cleanup(&data.notifier);
 
     g_assert(data.thread_acquired);
 }
-- 
2.4.3


* [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
  2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-09-09  6:08   ` Fam Zheng
  2015-09-28 10:09   ` Stefan Hajnoczi
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt Paolo Bonzini
                   ` (15 subsequent siblings)
  17 siblings, 2 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

This will be used for AioHandlers too.  There is going to be little
or no contention, so it is better to reuse the same lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 16 ++++++++--------
 include/block/aio.h |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/async.c b/async.c
index f992914..7e97614 100644
--- a/async.c
+++ b/async.c
@@ -50,12 +50,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -92,7 +92,7 @@ int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->bh_lock);
+        qemu_mutex_lock(&ctx->list_lock);
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -103,7 +103,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->bh_lock);
+        qemu_mutex_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -234,7 +234,7 @@ aio_ctx_finalize(GSource     *source)
 
     thread_pool_free(ctx->thread_pool);
 
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -244,12 +244,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, NULL);
     event_notifier_cleanup(&ctx->notifier);
     rfifolock_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->bh_lock);
+    qemu_mutex_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -318,7 +318,7 @@ AioContext *aio_context_new(Error **errp)
                            (EventNotifierHandler *)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->list_lock);
     rfifolock_init(&ctx->lock, NULL, NULL);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 9dd32e0..026f6c1 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -89,7 +89,7 @@ struct AioContext {
     uint32_t notify_me;
 
     /* lock to protect between bh's adders and deleter */
-    QemuMutex bh_lock;
+    QemuMutex list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
-- 
2.4.3


* [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
  2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-09-09  8:49   ` Fam Zheng
  2015-09-28 10:15   ` Stefan Hajnoczi
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 04/18] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
                   ` (14 subsequent siblings)
  17 siblings, 2 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

A QemuLockCnt comprises a counter and a mutex, with primitives
to increment and decrement the counter, and to take and release the
mutex.  It can be used to do lock-free visits to a data structure
whenever mutexes would be too heavy-weight and the critical section
is too long for RCU.

This could be implemented simply by protecting the counter with the
mutex, but QemuLockCnt is harder to misuse and more efficient.
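
The intended shape of a visit looks like this (a sketch that mirrors
the pattern documented in docs/lockcnt.txt below; xyz_lockcnt is a
made-up name):

    qemu_lockcnt_inc(&xyz_lockcnt);       /* announce the visit */
    /* ... lock-free reads of the xyz data structure ... */
    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
        /* last visitor: freeing removed elements is safe here */
        qemu_lockcnt_unlock(&xyz_lockcnt);
    }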

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt      | 308 ++++++++++++++++++++++++++++++++++++++++++++++++++
 include/qemu/thread.h |  17 +++
 util/Makefile.objs    |   2 +-
 util/lockcnt.c        | 123 ++++++++++++++++++++
 4 files changed, 449 insertions(+), 1 deletion(-)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 util/lockcnt.c

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
new file mode 100644
index 0000000..2a72002
--- /dev/null
+++ b/docs/lockcnt.txt
@@ -0,0 +1,308 @@
+DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
+===================================================
+
+QEMU often uses reference counts to track data structures that are being
+accessed and should not be freed.  For example, a loop that invokes
+callbacks like this is not safe:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
+by stashing away its "next" pointer.  However, ioh->fd_write could
+actually delete the next node from the list.  The simplest way to
+avoid this is to mark the node as deleted, and remove it from the
+list in the above loop:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+            continue;
+        }
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+If however this loop must also be reentrant, i.e. it is possible that
+ioh->fd_write invokes the loop again, some kind of counting is needed:
+
+    walking_handlers++;
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (walking_handlers == 1 && ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+            continue;
+        }
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+    walking_handlers--;
+
+One may think of using the RCU primitives, rcu_read_lock() and
+rcu_read_unlock(); effectively, the RCU nesting count would take
+the place of the walking_handlers global variable.  Indeed,
+reference counting and RCU have similar purposes, but their usage in
+general is complementary:
+
+- reference counting is fine-grained and limited to a single data
+  structure; RCU delays reclamation of *all* RCU-protected data
+  structures;
+
+- reference counting works even in the presence of code that keeps
+  a reference for a long time; RCU critical sections in principle
+  should be kept short;
+
+- reference counting is often applied to code that is not thread-safe
+  but is reentrant; in fact, usage of reference counting in QEMU predates
+  the introduction of threads by many years.  RCU is generally used to
+  protect readers from other threads freeing memory after concurrent
+  modifications to a data structure.
+
+- reclaiming data can be done by a separate thread in the case of RCU;
+  this can improve performance, but also delay reclamation undesirably.
+  With reference counting, reclamation is deterministic.
+
+This file documents QemuLockCnt, an abstraction for using reference
+counting in code that has to be both thread-safe and reentrant.
+
+
+QemuLockCnt concepts
+--------------------
+
+A QemuLockCnt comprises both a counter and a mutex; it has primitives
+to increment and decrement the counter, and to take and release the
+mutex.  The counter notes how many visits to the data structures are
+taking place (the visits could be from different threads, or there could
+be multiple reentrant visits from the same thread).  The basic rules
+governing the counter/mutex pair then are the following:
+
+- Data protected by the QemuLockCnt must not be freed unless the
+  counter is zero and the mutex is taken.
+
+- A new visit cannot be started while the counter is zero and the
+  mutex is taken.
+
+Reads can be done without taking the mutex, as long as the readers and
+writers respectively use atomic_rcu_read and atomic_rcu_write.  Most
+of the time, the mutex protects all writes to the data structure, not
+just frees, though there could be cases where this is not necessary.
+
+This could be implemented simply by protecting the counter with the
+mutex, for example:
+
+    // (1)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    walking_handlers++;
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+    ...
+
+    // (2)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    if (--walking_handlers == 0) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+    }
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+Here, no frees can happen in the code represented by the ellipsis.
+If another thread is executing critical section (2), that part of
+the code cannot be entered, because the thread will not be able
+to increment the walking_handlers variable.  And of course
+during the visit any other thread will see a nonzero value for
+walking_handlers, as in the single-threaded code.
+
+However, because the modifications of the count happen in very small
+critical sections, the locking around them is easy to forget.
+QemuLockCnt ensures that all modifications of the counter take the
+lock appropriately, and it can also be more efficient in two ways:
+
+- it avoids taking the lock for many operations (for example
+  incrementing the counter while it is non-zero);
+
+- on some platforms, one could implement QemuLockCnt to hold the
+  lock and the counter in a single word, making it no more expensive
+  than simply managing a counter using atomic operations (see
+  docs/atomics.txt).  This is not implemented yet, but can be
+  very helpful if concurrent access to the data structure is
+  expected to be rare.
+
+
+Using the same mutex for frees and writes can still incur some small
+inefficiencies; for example, a visit can never start if the counter is
+zero and the mutex is taken---even if the mutex is taken by a write,
+which in principle need not block a visit of the data structure.
+However, these are usually not a problem if any of the following
+assumptions are valid:
+
+- concurrent access is possible but rare
+
+- writes are rare
+
+- writes are frequent, but this kind of write (e.g. appending to a
+  list) has a very small critical section.
+
+For example, QEMU uses QemuLockCnt to manage an AioContext's list of
+bottom halves and file descriptor handlers.  Modifications to the list
+of file descriptor handlers are rare.  Creation of a new bottom half is
+frequent and can happen on a fast path; however: 1) it is almost never
+concurrent with a visit to the list of bottom halves; 2) it only has
+three instructions in the critical path, two assignments and a smp_wmb().
+
+
+QemuLockCnt API
+---------------
+
+    void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+
+        Initialize lockcnt's counter to zero and prepare its mutex
+        for usage.
+
+    void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+
+        Destroy lockcnt's mutex.
+
+    void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+
+        If the lockcnt's count is zero, wait for critical sections
+        to finish and increment lockcnt's count to 1.  If the count
+        is not zero, just increment it.
+
+        Because this function can wait on the mutex, it must not be
+        called while the lockcnt's mutex is held by the current thread.
+        For the same reason, qemu_lockcnt_inc can also contribute to
+        AB-BA deadlocks.  This is a sample deadlock scenario:
+
+              thread 1                      thread 2
+              -------------------------------------------------------
+              qemu_lockcnt_lock(&lc1);
+                                            qemu_lockcnt_lock(&lc2);
+              qemu_lockcnt_inc(&lc2);
+                                            qemu_lockcnt_inc(&lc1);
+
+    void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+
+        Decrement lockcnt's count.
+
+    bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+
+        Decrement the count.  If the new count is zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+
+        If the count is 1, decrement the count to zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+
+        Lock the lockcnt's mutex.  Remember that concurrent visits
+        are not blocked unless the count is also zero.  You can
+        use qemu_lockcnt_count to check for this inside a critical
+        section.
+
+    void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+
+        Release the lockcnt's mutex.
+
+    void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+
+        This is the same as
+
+            qemu_lockcnt_unlock(lockcnt);
+            qemu_lockcnt_inc(lockcnt);
+
+        but more efficient.
+
+    int qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
+        Return the lockcnt's count.  The count can change at any time;
+        still, while the lockcnt is locked, one can usefully
+        check whether the count is non-zero.
+
+
+QemuLockCnt usage
+-----------------
+
+The typical pattern for QemuLockCnt functions is as follows.
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        ... access xyz ...
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+        qemu_lockcnt_unlock(&xyz_lockcnt);
+    }
+
+Modifications are done using qemu_lockcnt_lock and qemu_lockcnt_unlock:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    xyz = g_malloc(sizeof(*xyz));
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+If an object has to be freed outside a visit, you can use the following
+scheme:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+In both the first and the third code snippets, g_free is only executed
+if the count is zero.  Holding the mutex at that point prevents
+qemu_lockcnt_inc from making the count non-zero again until the free
+has completed.
+
+
+QemuLockCnt can also be used to access a list as follows:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+        qemu_lockcnt_unlock(&io_handlers_lockcnt);
+    }
+
+An alternative pattern uses qemu_lockcnt_dec_if_lock:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted && qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+            qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
+            continue;
+        }
+
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+    qemu_lockcnt_dec(&io_handlers_lockcnt);
+
+Here you can use qemu_lockcnt_dec because there is no special task
+to do if the count goes from 1 to 0.
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 5114ec8..39d5959 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuLockCnt QemuLockCnt;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -62,4 +63,20 @@ struct Notifier;
 void qemu_thread_atexit_add(struct Notifier *notifier);
 void qemu_thread_atexit_remove(struct Notifier *notifier);
 
+struct QemuLockCnt {
+    QemuMutex mutex;
+    unsigned count;
+};
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+int qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
 #endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 114d657..46dd01c 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,4 +1,4 @@
-util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
+util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o lockcnt.o
 util-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o event_notifier-win32.o
 util-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o event_notifier-posix.o qemu-openpty.o
 util-obj-y += envlist.o path.o module.o
diff --git a/util/lockcnt.c b/util/lockcnt.c
new file mode 100644
index 0000000..63057ba
--- /dev/null
+++ b/util/lockcnt.c
@@ -0,0 +1,123 @@
+/*
+ * QemuLockCnt implementation
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *   Paolo Bonzini <pbonzini@redhat.com>
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <time.h>
+#include <signal.h>
+#include <stdint.h>
+#include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include "qemu/thread.h"
+#include "qemu/atomic.h"
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_init(&lockcnt->mutex);
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_destroy(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int old;
+    for (;;) {
+        old = atomic_mb_read(&lockcnt->count);
+        if (old == 0) {
+            qemu_lockcnt_lock(lockcnt);
+            atomic_inc(&lockcnt->count);
+            qemu_lockcnt_unlock(lockcnt);
+            return;
+        } else {
+            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
+                return;
+            }
+        }
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_dec(&lockcnt->count);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    while (val > 1) {
+        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
+        if (old != val) {
+            val = old;
+            continue;
+        }
+
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_unlock(lockcnt);
+    return false;
+}
+
+/* Decrement a counter and return locked if it is decremented to zero.
+ * Otherwise do nothing.
+ *
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_mb_read(&lockcnt->count);
+    if (val > 1) {
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_inc_and_unlock(lockcnt);
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_lock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    atomic_inc(&lockcnt->count);
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+int qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count);
+}
-- 
2.4.3


* [Qemu-devel] [PATCH 04/18] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (2 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 05/18] aio: tweak walking in dispatch phase Paolo Bonzini
                   ` (13 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

This will make it possible to walk the list of bottom halves without
holding the AioContext lock---and in turn to call bottom half
handlers without holding the lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 28 ++++++++++++----------------
 include/block/aio.h | 12 +++++-------
 2 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/async.c b/async.c
index 7e97614..69b76dc 100644
--- a/async.c
+++ b/async.c
@@ -50,12 +50,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -65,13 +65,11 @@ int aio_bh_poll(AioContext *ctx)
     QEMUBH *bh, **bhp, *next;
     int ret;
 
-    ctx->walking_bh++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
-        /* Make sure that fetching bh happens before accessing its members */
-        smp_read_barrier_depends();
-        next = bh->next;
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+        next = atomic_rcu_read(&bh->next);
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -88,11 +86,8 @@ int aio_bh_poll(AioContext *ctx)
         }
     }
 
-    ctx->walking_bh--;
-
     /* remove deleted bhs */
-    if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->list_lock);
+    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -103,7 +98,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->list_lock);
+        qemu_lockcnt_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -234,7 +229,8 @@ aio_ctx_finalize(GSource     *source)
 
     thread_pool_free(ctx->thread_pool);
 
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
+    assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -244,12 +240,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, NULL);
     event_notifier_cleanup(&ctx->notifier);
     rfifolock_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->list_lock);
+    qemu_lockcnt_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -318,7 +314,7 @@ AioContext *aio_context_new(Error **errp)
                            (EventNotifierHandler *)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->list_lock);
+    qemu_lockcnt_init(&ctx->list_lock);
     rfifolock_init(&ctx->lock, NULL, NULL);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 026f6c1..3988d28 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -88,17 +88,15 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* lock to protect between bh's adders and deleter */
-    QemuMutex list_lock;
+    /* A lock to protect concurrent BH adders and deleters, and to
+     * ensure that no callbacks are removed while we're walking and
+     * dispatching them.
+     */
+    QemuLockCnt list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
-    /* A simple lock used to protect the first_bh list, and ensure that
-     * no callbacks are removed while we're walking and dispatching callbacks.
-     */
-    int walking_bh;
-
     /* Used by aio_notify.
      *
      * "notified" is used to avoid expensive event_notifier_test_and_clear
-- 
2.4.3


* [Qemu-devel] [PATCH 05/18] aio: tweak walking in dispatch phase
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (3 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 04/18] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 06/18] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
                   ` (12 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 26 ++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index d477033..b8aaa92 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -124,7 +124,7 @@ bool aio_pending(AioContext *ctx)
 
 bool aio_dispatch(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
@@ -140,12 +140,10 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -167,17 +165,17 @@ bool aio_dispatch(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
+
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
 
diff --git a/aio-win32.c b/aio-win32.c
index 50a6867..2b4b156 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -203,20 +203,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -251,17 +249,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.4.3


* [Qemu-devel] [PATCH 06/18] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (4 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 05/18] aio: tweak walking in dispatch phase Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 07/18] aio-win32: " Paolo Bonzini
                   ` (11 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 55 +++++++++++++++++++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 22 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index b8aaa92..d2b51fc 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -15,7 +15,7 @@
 
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 
 struct AioHandler
@@ -49,6 +49,8 @@ void aio_set_fd_handler(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
@@ -56,14 +58,14 @@ void aio_set_fd_handler(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -74,7 +76,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -87,6 +89,7 @@ void aio_set_fd_handler(AioContext *ctx,
         node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -106,20 +109,30 @@ bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
 bool aio_dispatch(AioContext *ctx)
@@ -140,13 +153,12 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
-        revents = node->pfd.revents & node->pfd.events;
-        node->pfd.revents = 0;
+        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
 
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
@@ -166,15 +178,15 @@ bool aio_dispatch(AioContext *ctx)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
@@ -249,12 +261,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
-
+    qemu_lockcnt_inc(&ctx->list_lock);
     assert(npfd == 0);
 
     /* fill pollfds */
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->pfd.events) {
             add_pollfd(node);
         }
@@ -279,12 +290,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
-            nodes[i]->pfd.revents = pollfds[i].revents;
+            atomic_or(&nodes[i]->pfd.revents, pollfds[i].revents);
         }
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx)) {
-- 
2.4.3


* [Qemu-devel] [PATCH 07/18] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (5 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 06/18] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 08/18] aio: document locking Paolo Bonzini
                   ` (10 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-win32.c | 81 +++++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 49 insertions(+), 32 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index 2b4b156..a7994b3 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -40,6 +40,7 @@ void aio_set_fd_handler(AioContext *ctx,
     /* fd is a SOCKET in our case */
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->pfd.fd == fd && !node->deleted) {
             break;
@@ -49,14 +50,14 @@ void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -69,7 +70,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
         }
 
         node->pfd.events = 0;
@@ -93,6 +94,7 @@ void aio_set_fd_handler(AioContext *ctx,
                        FD_CONNECT | FD_WRITE | FD_OOB);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -102,6 +104,7 @@ void aio_set_event_notifier(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->e == e && !node->deleted) {
             break;
@@ -113,14 +116,14 @@ void aio_set_event_notifier(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -133,7 +136,7 @@ void aio_set_event_notifier(AioContext *ctx,
             node->e = e;
             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
             node->pfd.events = G_IO_IN;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -141,6 +144,7 @@ void aio_set_event_notifier(AioContext *ctx,
         node->io_notify = io_notify;
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -151,10 +155,16 @@ bool aio_prepare(AioContext *ctx)
     bool have_select_revents = false;
     fd_set rfds, wfds;
 
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
     /* fill fd sets */
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->io_read) {
             FD_SET ((SOCKET)node->pfd.fd, &rfds);
         }
@@ -164,61 +174,71 @@ bool aio_prepare(AioContext *ctx)
     }
 
     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-            node->pfd.revents = 0;
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             if (FD_ISSET(node->pfd.fd, &rfds)) {
-                node->pfd.revents |= G_IO_IN;
+                atomic_or(&node->pfd.revents, G_IO_IN);
                 have_select_revents = true;
             }
 
             if (FD_ISSET(node->pfd.fd, &wfds)) {
-                node->pfd.revents |= G_IO_OUT;
+                atomic_or(&node->pfd.revents, G_IO_OUT);
                 have_select_revents = true;
             }
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return have_select_revents;
 }
 
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->pfd.revents && node->io_notify) {
-            return true;
+            result = true;
+            break;
         }
 
         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
 
-    return false;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
     AioHandler *node, *tmp;
     bool progress = false;
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
-        int revents = node->pfd.revents;
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
+        int revents = atomic_xchg(&node->pfd.revents, 0);
 
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
-            node->pfd.revents = 0;
             node->io_notify(node->e);
 
             /* aio_notify() does not count as progress */
@@ -229,7 +249,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
-            node->pfd.revents = 0;
             if ((revents & G_IO_IN) && node->io_read) {
                 node->io_read(node->opaque);
                 progress = true;
@@ -250,16 +269,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(tmp, node);
                 g_free(tmp);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -295,19 +313,18 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
+    qemu_lockcnt_inc(&ctx->list_lock);
     have_select_revents = aio_prepare(ctx);
 
-    ctx->walking_handlers++;
-
     /* fill fd sets */
     count = 0;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_notify) {
             events[count++] = event_notifier_get_handle(node->e);
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 08/18] aio: document locking
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (6 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 07/18] aio-win32: " Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 09/18] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
                   ` (9 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/multiple-iothreads.txt |  5 ++---
 include/block/aio.h         | 24 +++++++++++-------------
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 723cc7e..4197f62 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -84,9 +84,8 @@ How to synchronize with an IOThread
 AioContext is not thread-safe so some rules must be followed when using file
 descriptors, event notifiers, timers, or BHs across threads:
 
-1. AioContext functions can be called safely from file descriptor, event
-notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
-necessary.
+1. AioContext functions can always be called safely.  They handle their
+own locking internally.
 
 2. Other threads wishing to access the AioContext must use
 aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
diff --git a/include/block/aio.h b/include/block/aio.h
index 3988d28..5d4ca01 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -51,18 +51,12 @@ typedef void IOHandler(void *opaque);
 struct AioContext {
     GSource source;
 
-    /* Protects all fields from multi-threaded access */
+    /* Used by AioContext users to protect from multi-threaded access.  */
     RFifoLock lock;
 
-    /* The list of registered AIO handlers */
+    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
-    /* This is a simple lock used to protect the aio_handlers list.
-     * Specifically, it's used to ensure that no callbacks are removed while
-     * we're walking and dispatching callbacks.
-     */
-    int walking_handlers;
-
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -88,9 +82,9 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* A lock to protect between bh's adders and deleter, and to ensure
-     * that no callbacks are removed while we're walking and dispatching
-     * them.
+    /* A lock to protect QEMUBH and AioHandler adders and deleters from
+     * each other, and to ensure that no callbacks are removed while we're
+     * walking and dispatching them.
      */
     QemuLockCnt list_lock;
 
@@ -112,10 +106,14 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Thread pool for performing work and receiving completion callbacks */
+    /* Thread pool for performing work and receiving completion callbacks.
+     * Has its own locking.
+     */
     struct ThreadPool *thread_pool;
 
-    /* TimerLists for calling timers - one per clock type */
+    /* TimerLists for calling timers - one per clock type.  They have
+     * their own locking.
+     */
     QEMUTimerListGroup tlg;
 };
 
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 09/18] aio: push aio_context_acquire/release down to dispatching
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (7 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 08/18] aio: document locking Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 10/18] async: optimize aio_bh_poll Paolo Bonzini
                   ` (8 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

The AioContext data structures are now protected by list_lock and/or
walked with FOREACH_RCU primitives.  There is no longer any need to
acquire the AioContext lock for the entire duration of aio_dispatch.
Instead, just acquire it around the invocation of each callback.
The next step is then to push it even further down, into the
callbacks themselves.
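
In sketch form, dispatch now looks roughly like this (using the
list_lock and RCU list walk introduced earlier in the series);
only the user callback itself runs under the AioContext lock:

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            node->io_read) {
            aio_context_acquire(ctx);
            node->io_read(node->opaque);
            aio_context_release(ctx);
        }
        /* ... the write side is handled the same way ... */
    }
    qemu_lockcnt_dec(&ctx->list_lock);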

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 15 ++++++---------
 aio-win32.c | 16 ++++++++--------
 async.c     |  2 ++
 3 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index d2b51fc..4944595 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -163,7 +163,9 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             node->io_read) {
+            aio_context_acquire(ctx);
             node->io_read(node->opaque);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -173,7 +175,9 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             node->io_write) {
+            aio_context_acquire(ctx);
             node->io_write(node->opaque);
+            aio_context_release(ctx);
             progress = true;
         }
 
@@ -189,7 +193,9 @@ bool aio_dispatch(AioContext *ctx)
     qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
+    aio_context_release(ctx);
 
     return progress;
 }
@@ -247,7 +253,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     bool progress;
     int64_t timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -274,16 +279,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
     timeout = blocking ? aio_compute_timeout(ctx) : 0;
 
     /* wait until next event */
-    if (timeout) {
-        aio_context_release(ctx);
-    }
     ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
     if (blocking) {
         atomic_sub(&ctx->notify_me, 2);
     }
-    if (timeout) {
-        aio_context_acquire(ctx);
-    }
 
     aio_notify_accept(ctx);
 
@@ -302,7 +301,5 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    aio_context_release(ctx);
-
     return progress;
 }
diff --git a/aio-win32.c b/aio-win32.c
index a7994b3..929ed49 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -239,7 +239,9 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
+            aio_context_acquire(ctx);
             node->io_notify(node->e);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -250,11 +252,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
             if ((revents & G_IO_IN) && node->io_read) {
+                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
+                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
 
@@ -299,7 +305,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int count;
     int timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -340,17 +345,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
         timeout = blocking && !have_select_revents
             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
-        if (timeout) {
-            aio_context_release(ctx);
-        }
         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
         if (blocking) {
             assert(first);
             atomic_sub(&ctx->notify_me, 2);
         }
-        if (timeout) {
-            aio_context_acquire(ctx);
-        }
 
         if (first) {
             aio_notify_accept(ctx);
@@ -373,8 +372,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-
     aio_context_release(ctx);
+
     return progress;
 }
diff --git a/async.c b/async.c
index 69b76dc..198c53d 100644
--- a/async.c
+++ b/async.c
@@ -82,7 +82,9 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
+            aio_context_acquire(ctx);
             bh->cb(bh->opaque);
+            aio_context_release(ctx);
         }
     }
 
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 10/18] async: optimize aio_bh_poll
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (8 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 09/18] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 11/18] qemu-timer: optimize timerlist_run_timers Paolo Bonzini
                   ` (7 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Avoid entering the slow path of qemu_lockcnt_dec_and_lock if
no bottom half has to be deleted.  If a bottom half deletes itself,
it will be picked up on the next walk of the list, or when the
AioContext itself is finalized.
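
As a sketch, the fast path/slow path split in aio_bh_poll becomes:

    qemu_lockcnt_inc(&ctx->list_lock);
    /* ... walk the list, noting whether any bh->deleted was seen ... */
    if (!deleted) {
        /* Fast path: just decrement the counter, no lock taken. */
        qemu_lockcnt_dec(&ctx->list_lock);
        return ret;
    }

    /* Slow path: only the last walker gets the lock and reclaims. */
    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
        /* ... unlink and free the deleted bottom halves ... */
        qemu_lockcnt_unlock(&ctx->list_lock);
    }
    return ret;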

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/async.c b/async.c
index 198c53d..1fce3e4 100644
--- a/async.c
+++ b/async.c
@@ -64,19 +64,24 @@ int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
+    bool deleted = false;
 
     qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
     for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
         next = atomic_rcu_read(&bh->next);
+        if (bh->deleted) {
+            deleted = true;
+            continue;
+        }
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
          * thread sees the zero before bh->cb has run, and thus will call
          * aio_notify again if necessary.
          */
-        if (!bh->deleted && atomic_xchg(&bh->scheduled, 0)) {
+        if (atomic_xchg(&bh->scheduled, 0)) {
             /* Idle BHs don't count as progress */
             if (!bh->idle) {
                 ret = 1;
@@ -89,6 +94,11 @@ int aio_bh_poll(AioContext *ctx)
     }
 
     /* remove deleted bhs */
+    if (!deleted) {
+        qemu_lockcnt_dec(&ctx->list_lock);
+        return ret;
+    }
+
     if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
@@ -102,7 +112,6 @@ int aio_bh_poll(AioContext *ctx)
         }
         qemu_lockcnt_unlock(&ctx->list_lock);
     }
-
     return ret;
 }
 
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 11/18] qemu-timer: optimize timerlist_run_timers
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (9 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 10/18] async: optimize aio_bh_poll Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callbacks that need it Paolo Bonzini
                   ` (6 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

The case of no active timers is relatively common.  Check it
outside the lock.
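
The check is a single atomic read in front of the existing locked
region:

    if (!atomic_read(&timer_list->active_timers)) {
        /* Lock-free fast path.  Assumption: a thread that makes the
         * list non-empty also wakes up the event loop (timer_mod
         * notifies it), so a stale zero only defers the timers to
         * the next iteration.
         */
        return false;
    }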

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 qemu-timer.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/qemu-timer.c b/qemu-timer.c
index 2463fe6..dc11ab9 100644
--- a/qemu-timer.c
+++ b/qemu-timer.c
@@ -476,6 +476,10 @@ bool timerlist_run_timers(QEMUTimerList *timer_list)
     QEMUTimerCB *cb;
     void *opaque;
 
+    if (!atomic_read(&timer_list->active_timers)) {
+        return false;
+    }
+
     qemu_event_reset(&timer_list->timers_done_ev);
     if (!timer_list->clock->enabled) {
         goto out;
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callbacks that need it
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (10 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 11/18] qemu-timer: optimize timerlist_run_timers Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 13/18] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
                   ` (5 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block
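
The recurring shape in this patch: split each handler into a
FOO_locked function that keeps the existing body, plus a thin
callback that wraps it in acquire/release.  Sketched on the curl
driver from the hunks below:

    static void curl_multi_do_locked(CURLState *s)
    {
        /* ... previous body of curl_multi_do ... */
    }

    static void curl_multi_do(void *arg)
    {
        CURLState *s = (CURLState *)arg;

        aio_context_acquire(s->s->aio_context);
        curl_multi_do_locked(s);
        aio_context_release(s->s->aio_context);
    }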

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c                     |  4 ----
 aio-win32.c                     |  6 ------
 block/curl.c                    | 16 ++++++++++++---
 block/iscsi.c                   |  4 ++++
 block/nbd-client.c              | 14 +++++++++++--
 block/nfs.c                     |  6 ++++++
 block/sheepdog.c                | 29 ++++++++++++++++----------
 block/ssh.c                     | 45 ++++++++++++++++++++---------------------
 block/win32-aio.c               | 10 +++++----
 hw/block/dataplane/virtio-blk.c |  2 ++
 hw/scsi/virtio-scsi-dataplane.c |  6 ++++++
 nbd.c                           |  4 ++++
 12 files changed, 93 insertions(+), 53 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 4944595..58f0937 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -163,9 +163,7 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             node->io_read) {
-            aio_context_acquire(ctx);
             node->io_read(node->opaque);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -175,9 +173,7 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             node->io_write) {
-            aio_context_acquire(ctx);
             node->io_write(node->opaque);
-            aio_context_release(ctx);
             progress = true;
         }
 
diff --git a/aio-win32.c b/aio-win32.c
index 929ed49..f6608b3 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -239,9 +239,7 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
-            aio_context_acquire(ctx);
             node->io_notify(node->e);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -252,15 +250,11 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
             if ((revents & G_IO_IN) && node->io_read) {
-                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
-                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
 
diff --git a/block/curl.c b/block/curl.c
index 032cc8a..b572828 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -330,9 +330,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
     }
 }
 
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
 {
-    CURLState *s = (CURLState *)arg;
     int running;
     int r;
 
@@ -346,12 +345,23 @@ static void curl_multi_do(void *arg)
 
 }
 
+static void curl_multi_do(void *arg)
+{
+    CURLState *s = (CURLState *)arg;
+
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
+    aio_context_release(s->s->aio_context);
+}
+
 static void curl_multi_read(void *arg)
 {
     CURLState *s = (CURLState *)arg;
 
-    curl_multi_do(arg);
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
     curl_multi_check_completion(s->s);
+    aio_context_release(s->s->aio_context);
 }
 
 static void curl_multi_timeout_do(void *arg)
diff --git a/block/iscsi.c b/block/iscsi.c
index 5002916..1c3f99b 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -326,8 +326,10 @@ iscsi_process_read(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLIN);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void
@@ -336,8 +338,10 @@ iscsi_process_write(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLOUT);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
diff --git a/block/nbd-client.c b/block/nbd-client.c
index e1bb919..020399e 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -56,9 +56,8 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->sock = -1;
 }
 
-static void nbd_reply_ready(void *opaque)
+static void nbd_reply_ready_locked(BlockDriverState *bs)
 {
-    BlockDriverState *bs = opaque;
     NbdClientSession *s = nbd_get_client_session(bs);
     uint64_t i;
     int ret;
@@ -95,11 +94,22 @@ fail:
     nbd_teardown_connection(bs);
 }
 
+static void nbd_reply_ready(void *opaque)
+{
+    BlockDriverState *bs = opaque;
+
+    aio_context_acquire(bdrv_get_aio_context(bs));
+    nbd_reply_ready_locked(bs);
+    aio_context_release(bdrv_get_aio_context(bs));
+}
+
 static void nbd_restart_write(void *opaque)
 {
     BlockDriverState *bs = opaque;
 
+    aio_context_acquire(bdrv_get_aio_context(bs));
     qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine, NULL);
+    aio_context_release(bdrv_get_aio_context(bs));
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
diff --git a/block/nfs.c b/block/nfs.c
index c026ff6..05b02f5 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -75,15 +75,21 @@ static void nfs_set_events(NFSClient *client)
 static void nfs_process_read(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLIN);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_process_write(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLOUT);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_co_init_task(NFSClient *client, NFSRPC *task)
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 9585beb..2f2731e 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -608,13 +608,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
     return ret;
 }
 
-static void restart_co_req(void *opaque)
-{
-    Coroutine *co = opaque;
-
-    qemu_coroutine_enter(co, NULL);
-}
-
 typedef struct SheepdogReqCo {
     int sockfd;
     AioContext *aio_context;
@@ -624,12 +617,21 @@ typedef struct SheepdogReqCo {
     unsigned int *rlen;
     int ret;
     bool finished;
+    Coroutine *co;
 } SheepdogReqCo;
 
+static void restart_co_req(void *opaque)
+{
+    SheepdogReqCo *srco = opaque;
+
+    aio_context_acquire(srco->aio_context);
+    qemu_coroutine_enter(srco->co, NULL);
+    aio_context_release(srco->aio_context);
+}
+
 static coroutine_fn void do_co_req(void *opaque)
 {
     int ret;
-    Coroutine *co;
     SheepdogReqCo *srco = opaque;
     int sockfd = srco->sockfd;
     SheepdogReq *hdr = srco->hdr;
@@ -637,15 +639,15 @@ static coroutine_fn void do_co_req(void *opaque)
     unsigned int *wlen = srco->wlen;
     unsigned int *rlen = srco->rlen;
 
-    co = qemu_coroutine_self();
-    aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, co);
+    srco->co = qemu_coroutine_self();
+    aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, srco);
 
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
         goto out;
     }
 
-    aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, co);
+    aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, srco);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret != sizeof(*hdr)) {
@@ -672,6 +674,7 @@ out:
      * set each handler to NULL. */
     aio_set_fd_handler(srco->aio_context, sockfd, NULL, NULL, NULL);
 
+    srco->co = NULL;
     srco->ret = ret;
     srco->finished = true;
 }
@@ -904,14 +907,18 @@ static void co_read_response(void *opaque)
         s->co_recv = qemu_coroutine_create(aio_read_response);
     }
 
+    aio_context_acquire(s->aio_context);
     qemu_coroutine_enter(s->co_recv, opaque);
+    aio_context_release(s->aio_context);
 }
 
 static void co_write_request(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
+    aio_context_acquire(s->aio_context);
     qemu_coroutine_enter(s->co_send, NULL);
+    aio_context_release(s->aio_context);
 }
 
 /*
diff --git a/block/ssh.c b/block/ssh.c
index 8d06739..783df9e 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -775,20 +775,34 @@ static int ssh_has_zero_init(BlockDriverState *bs)
     return has_zero_init;
 }
 
+typedef struct BDRVSSHRestart {
+    Coroutine *co;
+    AioContext *ctx;
+} BDRVSSHRestart;
+
 static void restart_coroutine(void *opaque)
 {
-    Coroutine *co = opaque;
+    BDRVSSHRestart *restart = opaque;
 
-    DPRINTF("co=%p", co);
+    DPRINTF("ctx=%p co=%p", restart->ctx, restart->co);
 
-    qemu_coroutine_enter(co, NULL);
+    aio_context_acquire(restart->ctx);
+    qemu_coroutine_enter(restart->co, NULL);
+    aio_context_release(restart->ctx);
 }
 
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
 {
     int r;
     IOHandler *rd_handler = NULL, *wr_handler = NULL;
-    Coroutine *co = qemu_coroutine_self();
+    BDRVSSHRestart restart = {
+        .ctx = bdrv_get_aio_context(bs),
+        .co = qemu_coroutine_self()
+    };
 
     r = libssh2_session_block_directions(s->session);
 
@@ -803,25 +817,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
             rd_handler, wr_handler);
 
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
-                       rd_handler, wr_handler, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
-                                          BlockDriverState *bs)
-{
-    DPRINTF("s->sock=%d", s->sock);
-    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
-    set_fd_handler(s, bs);
+                       rd_handler, wr_handler, &restart);
     qemu_coroutine_yield();
-    clear_fd_handler(s, bs);
+    DPRINTF("s->sock=%d - back", s->sock);
+    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, NULL, NULL, NULL);
 }
 
 /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 64e8682..a78d149 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -40,7 +40,7 @@ struct QEMUWin32AIOState {
     HANDLE hIOCP;
     EventNotifier e;
     int count;
-    bool is_aio_context_attached;
+    AioContext *aio_ctx;
 };
 
 typedef struct QEMUWin32AIOCB {
@@ -87,7 +87,9 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
     }
 
 
+    aio_context_acquire(s->aio_ctx);
     waiocb->common.cb(waiocb->common.opaque, ret);
+    aio_context_release(s->aio_ctx);
     qemu_aio_unref(waiocb);
 }
 
@@ -175,13 +177,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &aio->e, NULL);
-    aio->is_aio_context_attached = false;
+    aio->aio_ctx = NULL;
 }
 
 void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *new_context)
 {
-    aio->is_aio_context_attached = true;
+    aio->aio_ctx = new_context;
     aio_set_event_notifier(new_context, &aio->e, win32_aio_completion_cb);
 }
 
@@ -210,7 +212,7 @@ out_free_state:
 
 void win32_aio_cleanup(QEMUWin32AIOState *aio)
 {
-    assert(!aio->is_aio_context_attached);
+    assert(!aio->aio_ctx);
     CloseHandle(aio->hIOCP);
     event_notifier_cleanup(&aio->e);
     g_free(aio);
diff --git a/hw/block/dataplane/virtio-blk.c b/hw/block/dataplane/virtio-blk.c
index 6106e46..7c11fbd 100644
--- a/hw/block/dataplane/virtio-blk.c
+++ b/hw/block/dataplane/virtio-blk.c
@@ -95,6 +95,7 @@ static void handle_notify(EventNotifier *e)
     VirtIOBlock *vblk = VIRTIO_BLK(s->vdev);
 
     event_notifier_test_and_clear(&s->host_notifier);
+    aio_context_acquire(s->ctx);
     blk_io_plug(s->conf->conf.blk);
     for (;;) {
         MultiReqBuffer mrb = {};
@@ -135,6 +136,7 @@ static void handle_notify(EventNotifier *e)
         }
     }
     blk_io_unplug(s->conf->conf.blk);
+    aio_context_release(s->ctx);
 }
 
 /* Context: QEMU global mutex held */
diff --git a/hw/scsi/virtio-scsi-dataplane.c b/hw/scsi/virtio-scsi-dataplane.c
index 5575648..b588b692 100644
--- a/hw/scsi/virtio-scsi-dataplane.c
+++ b/hw/scsi/virtio-scsi-dataplane.c
@@ -112,9 +112,11 @@ static void virtio_scsi_iothread_handle_ctrl(EventNotifier *notifier)
     VirtIOSCSIReq *req;
 
     event_notifier_test_and_clear(notifier);
+    aio_context_acquire(s->ctx);
     while ((req = virtio_scsi_pop_req_vring(s, vring))) {
         virtio_scsi_handle_ctrl_req(s, req);
     }
+    aio_context_release(s->ctx);
 }
 
 static void virtio_scsi_iothread_handle_event(EventNotifier *notifier)
@@ -130,9 +132,11 @@ static void virtio_scsi_iothread_handle_event(EventNotifier *notifier)
         return;
     }
 
+    aio_context_acquire(s->ctx);
     if (s->events_dropped) {
         virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
     }
+    aio_context_release(s->ctx);
 }
 
 static void virtio_scsi_iothread_handle_cmd(EventNotifier *notifier)
@@ -144,6 +148,7 @@ static void virtio_scsi_iothread_handle_cmd(EventNotifier *notifier)
     QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
 
     event_notifier_test_and_clear(notifier);
+    aio_context_acquire(s->ctx);
     while ((req = virtio_scsi_pop_req_vring(s, vring))) {
         if (virtio_scsi_handle_cmd_req_prepare(s, req)) {
             QTAILQ_INSERT_TAIL(&reqs, req, next);
@@ -153,6 +158,7 @@ static void virtio_scsi_iothread_handle_cmd(EventNotifier *notifier)
     QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
         virtio_scsi_handle_cmd_req_submit(s, req);
     }
+    aio_context_release(s->ctx);
 }
 
 /* assumes s->ctx held */
diff --git a/nbd.c b/nbd.c
index 06b501b..497189c 100644
--- a/nbd.c
+++ b/nbd.c
@@ -1436,6 +1436,10 @@ static void nbd_restart_write(void *opaque)
 static void nbd_set_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
+        /* Note that the handlers do not expect any concurrency; qemu-nbd
+         * does not instantiate multiple AioContexts yet, nor does it call
+         * aio_poll/aio_dispatch from multiple threads.
+         */
         aio_set_fd_handler(client->exp->ctx, client->sock,
                            client->can_read ? nbd_read : NULL,
                            client->send_coroutine ? nbd_restart_write : NULL,
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 13/18] block: explicitly acquire aiocontext in bottom halves that need it
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (11 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callbacks that need it Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 14/18] block: explicitly acquire aiocontext in timers " Paolo Bonzini
                   ` (4 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block
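
Bottom halves get the same treatment: the BH callback looks up the
AioContext it belongs to and holds the lock only around the
completion callback or coroutine re-entry, as in the blkdebug hunk
below:

    static void error_callback_bh(void *opaque)
    {
        struct BlkdebugAIOCB *acb = opaque;
        AioContext *ctx = bdrv_get_aio_context(acb->common.bs);

        qemu_bh_delete(acb->bh);
        aio_context_acquire(ctx);
        acb->common.cb(acb->common.opaque, acb->ret);
        aio_context_release(ctx);
        qemu_aio_unref(acb);
    }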

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c               |  2 --
 block/archipelago.c   |  3 +++
 block/blkdebug.c      |  4 ++++
 block/blkverify.c     |  3 +++
 block/block-backend.c |  4 ++++
 block/curl.c          | 25 +++++++++++++++++--------
 block/gluster.c       |  2 ++
 block/io.c            |  6 ++++++
 block/iscsi.c         |  4 ++++
 block/linux-aio.c     |  7 +++++++
 block/nfs.c           |  4 ++++
 block/null.c          |  4 ++++
 block/qed.c           |  3 +++
 block/rbd.c           |  4 ++++
 dma-helpers.c         |  7 +++++--
 hw/block/virtio-blk.c |  2 ++
 hw/scsi/scsi-bus.c    |  2 ++
 thread-pool.c         |  2 ++
 18 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/async.c b/async.c
index 1fce3e4..6186901 100644
--- a/async.c
+++ b/async.c
@@ -87,9 +87,7 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
-            aio_context_acquire(ctx);
             bh->cb(bh->opaque);
-            aio_context_release(ctx);
         }
     }
 
diff --git a/block/archipelago.c b/block/archipelago.c
index 855655c..7f69a3f 100644
--- a/block/archipelago.c
+++ b/block/archipelago.c
@@ -312,9 +312,12 @@ static void qemu_archipelago_complete_aio(void *opaque)
 {
     AIORequestData *reqdata = (AIORequestData *) opaque;
     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
+    AioContext *ctx = bdrv_get_aio_context(aio_cb->common.bs);
 
     qemu_bh_delete(aio_cb->bh);
+    aio_context_acquire(ctx);
     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
+    aio_context_release(ctx);
     aio_cb->status = 0;
 
     qemu_aio_unref(aio_cb);
diff --git a/block/blkdebug.c b/block/blkdebug.c
index bc247f4..2130811 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -458,8 +458,12 @@ out:
 static void error_callback_bh(void *opaque)
 {
     struct BlkdebugAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
     qemu_bh_delete(acb->bh);
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
+    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/blkverify.c b/block/blkverify.c
index d277e63..510c198 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -185,13 +185,16 @@ static BlkverifyAIOCB *blkverify_aio_get(BlockDriverState *bs, bool is_write,
 static void blkverify_aio_bh(void *opaque)
 {
     BlkverifyAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
     if (acb->buf) {
         qemu_iovec_destroy(&acb->raw_qiov);
         qemu_vfree(acb->buf);
     }
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
+    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/block-backend.c b/block/block-backend.c
index aee8a12..185ba32 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -529,8 +529,12 @@ int blk_write_zeroes(BlockBackend *blk, int64_t sector_num,
 static void error_callback_bh(void *opaque)
 {
     struct BlockBackendAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
     qemu_bh_delete(acb->bh);
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
+    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/curl.c b/block/curl.c
index b572828..446a6d9 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -651,10 +651,14 @@ static void curl_readv_bh_cb(void *p)
 {
     CURLState *state;
     int running;
+    int ret = -EINPROGRESS;
 
     CURLAIOCB *acb = p;
-    BDRVCURLState *s = acb->common.bs->opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVCURLState *s = bs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(bs);
 
+    aio_context_acquire(ctx);
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
 
@@ -668,7 +672,7 @@ static void curl_readv_bh_cb(void *p)
             qemu_aio_unref(acb);
             // fall through
         case FIND_RET_WAIT:
-            return;
+            goto out;
         default:
             break;
     }
@@ -676,9 +680,8 @@ static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(acb->common.bs, s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_unref(acb);
-        return;
+        ret = -EIO;
+        goto out;
     }
 
     acb->start = 0;
@@ -692,9 +695,8 @@ static void curl_readv_bh_cb(void *p)
     state->orig_buf = g_try_malloc(state->buf_len);
     if (state->buf_len && state->orig_buf == NULL) {
         curl_clean_state(state);
-        acb->common.cb(acb->common.opaque, -ENOMEM);
-        qemu_aio_unref(acb);
-        return;
+        ret = -ENOMEM;
+        goto out;
     }
     state->acb[0] = acb;
 
@@ -707,6 +709,13 @@ static void curl_readv_bh_cb(void *p)
 
     /* Tell curl it needs to kick things off */
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+    if (ret != -EINPROGRESS) {
+        acb->common.cb(acb->common.opaque, ret);
+        qemu_aio_unref(acb);
+    }
+    aio_context_release(ctx);
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
diff --git a/block/gluster.c b/block/gluster.c
index 1eb3a8c..35d4230 100644
--- a/block/gluster.c
+++ b/block/gluster.c
@@ -232,7 +232,9 @@ static void qemu_gluster_complete_aio(void *opaque)
 
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
+    aio_context_acquire(acb->aio_context);
     qemu_coroutine_enter(acb->coroutine, NULL);
+    aio_context_release(acb->aio_context);
 }
 
 /*
diff --git a/block/io.c b/block/io.c
index d4bc83b..74f5705 100644
--- a/block/io.c
+++ b/block/io.c
@@ -2002,12 +2002,15 @@ static const AIOCBInfo bdrv_em_aiocb_info = {
 static void bdrv_aio_bh_cb(void *opaque)
 {
     BlockAIOCBSync *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     if (!acb->is_write && acb->ret >= 0) {
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
+    aio_context_release(ctx);
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
     qemu_aio_unref(acb);
@@ -2083,10 +2086,13 @@ static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
 static void bdrv_co_em_bh(void *opaque)
 {
     BlockAIOCBCoroutine *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     assert(!acb->need_bh);
     qemu_bh_delete(acb->bh);
+    aio_context_acquire(ctx);
     bdrv_co_complete(acb);
+    aio_context_release(ctx);
 }
 
 static void bdrv_co_maybe_schedule_bh(BlockAIOCBCoroutine *acb)
diff --git a/block/iscsi.c b/block/iscsi.c
index 1c3f99b..9948e70 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -150,9 +150,13 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
 static void iscsi_co_generic_bh_cb(void *opaque)
 {
     struct IscsiTask *iTask = opaque;
+    AioContext *ctx = iTask->iscsilun->aio_context;
+
     iTask->complete = 1;
     qemu_bh_delete(iTask->bh);
+    aio_context_acquire(ctx);
     qemu_coroutine_enter(iTask->co, NULL);
+    aio_context_release(ctx);
 }
 
 static void iscsi_retry_timer_expired(void *opaque)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index c991443..bc83e5c 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -46,6 +46,8 @@ typedef struct {
 } LaioQueue;
 
 struct qemu_laio_state {
+    AioContext *aio_context;
+
     io_context_t ctx;
     EventNotifier e;
 
@@ -109,6 +111,7 @@ static void qemu_laio_completion_bh(void *opaque)
     struct qemu_laio_state *s = opaque;
 
     /* Fetch more completion events when empty */
+    aio_context_acquire(s->aio_context);
     if (s->event_idx == s->event_max) {
         do {
             struct timespec ts = { 0 };
@@ -141,6 +144,8 @@ static void qemu_laio_completion_bh(void *opaque)
     if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
         ioq_submit(s);
     }
+
+    aio_context_release(s->aio_context);
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -289,12 +294,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    s->aio_context = NULL;
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->aio_context = new_context;
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }
diff --git a/block/nfs.c b/block/nfs.c
index 05b02f5..ffab087 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -103,9 +103,13 @@ static void nfs_co_init_task(NFSClient *client, NFSRPC *task)
 static void nfs_co_generic_bh_cb(void *opaque)
 {
     NFSRPC *task = opaque;
+    AioContext *ctx = task->client->aio_context;
+
     task->complete = 1;
     qemu_bh_delete(task->bh);
+    aio_context_acquire(ctx);
     qemu_coroutine_enter(task->co, NULL);
+    aio_context_release(ctx);
 }
 
 static void
diff --git a/block/null.c b/block/null.c
index 7d08323..dd1b170 100644
--- a/block/null.c
+++ b/block/null.c
@@ -117,7 +117,11 @@ static const AIOCBInfo null_aiocb_info = {
 static void null_bh_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
+    aio_context_release(ctx);
     qemu_bh_delete(acb->bh);
     qemu_aio_unref(acb);
 }
diff --git a/block/qed.c b/block/qed.c
index 954ed00..d47d7e1 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -910,12 +910,15 @@ static void qed_aio_complete_bh(void *opaque)
     BlockCompletionFunc *cb = acb->common.cb;
     void *user_opaque = acb->common.opaque;
     int ret = acb->bh_ret;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
     qemu_aio_unref(acb);
 
     /* Invoke callback */
+    aio_context_acquire(ctx);
     cb(user_opaque, ret);
+    aio_context_release(ctx);
 }
 
 static void qed_aio_complete(QEDAIOCB *acb, int ret)
diff --git a/block/rbd.c b/block/rbd.c
index a60a19d..6206dc3 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -376,6 +376,7 @@ static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
     int64_t r;
 
     r = rcb->ret;
@@ -408,7 +409,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    aio_context_release(ctx);
 
     qemu_aio_unref(acb);
 }
diff --git a/dma-helpers.c b/dma-helpers.c
index 4faec5d..68f6f07 100644
--- a/dma-helpers.c
+++ b/dma-helpers.c
@@ -69,6 +69,7 @@ void qemu_sglist_destroy(QEMUSGList *qsg)
 
 typedef struct {
     BlockAIOCB common;
+    AioContext *ctx;
     BlockBackend *blk;
     BlockAIOCB *acb;
     QEMUSGList *sg;
@@ -153,8 +154,7 @@ static void dma_blk_cb(void *opaque, int ret)
 
     if (dbs->iov.size == 0) {
         trace_dma_map_wait(dbs);
-        dbs->bh = aio_bh_new(blk_get_aio_context(dbs->blk),
-                             reschedule_dma, dbs);
+        dbs->bh = aio_bh_new(dbs->ctx, reschedule_dma, dbs);
         cpu_register_map_client(dbs->bh);
         return;
     }
@@ -163,8 +163,10 @@ static void dma_blk_cb(void *opaque, int ret)
         qemu_iovec_discard_back(&dbs->iov, dbs->iov.size & ~BDRV_SECTOR_MASK);
     }
 
+    aio_context_acquire(dbs->ctx);
     dbs->acb = dbs->io_func(dbs->blk, dbs->sector_num, &dbs->iov,
                             dbs->iov.size / 512, dma_blk_cb, dbs);
+    aio_context_release(dbs->ctx);
     assert(dbs->acb);
 }
 
@@ -201,6 +203,7 @@ BlockAIOCB *dma_blk_io(
 
     dbs->acb = NULL;
     dbs->blk = blk;
+    dbs->ctx = blk_get_aio_context(blk);
     dbs->sg = sg;
     dbs->sector_num = sector_num;
     dbs->sg_cur_index = 0;
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index 1556c9c..4462ad2 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -620,6 +620,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
 
     s->rq = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     while (req) {
         VirtIOBlockReq *next = req->next;
         virtio_blk_handle_request(req, &mrb);
@@ -629,6 +630,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
     if (mrb.num_reqs) {
         virtio_blk_submit_multireq(s->blk, &mrb);
     }
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 static void virtio_blk_dma_restart_cb(void *opaque, int running,
diff --git a/hw/scsi/scsi-bus.c b/hw/scsi/scsi-bus.c
index ffac8f4..c42e854 100644
--- a/hw/scsi/scsi-bus.c
+++ b/hw/scsi/scsi-bus.c
@@ -102,6 +102,7 @@ static void scsi_dma_restart_bh(void *opaque)
     qemu_bh_delete(s->bh);
     s->bh = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
     QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
         scsi_req_ref(req);
         if (req->retry) {
@@ -119,6 +120,7 @@ static void scsi_dma_restart_bh(void *opaque)
         }
         scsi_req_unref(req);
     }
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 void scsi_req_retry(SCSIRequest *req)
diff --git a/thread-pool.c b/thread-pool.c
index ac909f4..1039188 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -165,6 +165,7 @@ static void thread_pool_completion_bh(void *opaque)
     ThreadPool *pool = opaque;
     ThreadPoolElement *elem, *next;
 
+    aio_context_acquire(pool->ctx);
 restart:
     QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
         if (elem->state != THREAD_DONE) {
@@ -191,6 +192,7 @@ restart:
             qemu_aio_unref(elem);
         }
     }
+    aio_context_release(pool->ctx);
 }
 
 static void thread_pool_cancel(BlockAIOCB *acb)
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 14/18] block: explicitly acquire aiocontext in timers that need it
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (12 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 13/18] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 15/18] quorum: use atomics for rewrite_count Paolo Bonzini
                   ` (3 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block
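
Timer callbacks follow the same pattern; when the callback has no
handy back-pointer, the AioContext is stored at setup time, as in
the coroutine sleep hunk below:

    static void co_sleep_cb(void *opaque)
    {
        CoSleepCB *sleep_cb = opaque;
        /* Copy the context out first: sleep_cb lives on the sleeping
         * coroutine's stack and may be gone once the coroutine has
         * been re-entered and has returned from co_aio_sleep_ns.
         */
        AioContext *ctx = sleep_cb->ctx;

        aio_context_acquire(ctx);
        qemu_coroutine_enter(sleep_cb->co, NULL);
        aio_context_release(ctx);
    }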

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c             | 2 --
 aio-win32.c             | 2 --
 block/curl.c            | 2 ++
 block/iscsi.c           | 2 ++
 block/null.c            | 4 ++++
 block/qed.c             | 2 ++
 block/throttle-groups.c | 2 ++
 qemu-coroutine-sleep.c  | 5 +++++
 8 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 58f0937..6cc7ada 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -189,9 +189,7 @@ bool aio_dispatch(AioContext *ctx)
     qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
-    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-    aio_context_release(ctx);
 
     return progress;
 }
diff --git a/aio-win32.c b/aio-win32.c
index f6608b3..ed0306a 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -366,9 +366,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
-    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-    aio_context_release(ctx);
 
     return progress;
 }
diff --git a/block/curl.c b/block/curl.c
index 446a6d9..fc70b01 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -374,9 +374,11 @@ static void curl_multi_timeout_do(void *arg)
         return;
     }
 
+    aio_context_acquire(s->aio_context);
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
     curl_multi_check_completion(s);
+    aio_context_release(s->aio_context);
 #else
     abort();
 #endif
diff --git a/block/iscsi.c b/block/iscsi.c
index 9948e70..2a4e6bc 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -1155,6 +1155,7 @@ static void iscsi_nop_timed_event(void *opaque)
 {
     IscsiLun *iscsilun = opaque;
 
+    aio_context_acquire(iscsilun->aio_context);
     if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
         error_report("iSCSI: NOP timeout. Reconnecting...");
         iscsilun->request_timed_out = true;
@@ -1165,6 +1166,7 @@ static void iscsi_nop_timed_event(void *opaque)
 
     timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
diff --git a/block/null.c b/block/null.c
index dd1b170..9bddc1b 100644
--- a/block/null.c
+++ b/block/null.c
@@ -129,7 +129,11 @@ static void null_bh_cb(void *opaque)
 static void null_timer_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
+    aio_context_release(ctx);
     timer_deinit(&acb->timer);
     qemu_aio_unref(acb);
 }
diff --git a/block/qed.c b/block/qed.c
index d47d7e1..adc9b73 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -330,10 +330,12 @@ static void qed_need_check_timer_cb(void *opaque)
 
     trace_qed_need_check_timer_cb(s);
 
+    aio_context_acquire(bdrv_get_aio_context(s->bs));
     qed_plug_allocating_write_reqs(s);
 
     /* Ensure writes are on disk before clearing flag */
     bdrv_aio_flush(s->bs, qed_clear_need_check, s);
+    aio_context_release(bdrv_get_aio_context(s->bs));
 }
 
 static void qed_start_need_check_timer(BDRVQEDState *s)
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 1abc6fc..9d8a620 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -369,7 +369,9 @@ static void timer_cb(BlockDriverState *bs, bool is_write)
     qemu_mutex_unlock(&tg->lock);
 
     /* Run the request that was waiting for this timer */
+    aio_context_acquire(bdrv_get_aio_context(bs));
     empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
+    aio_context_release(bdrv_get_aio_context(bs));
 
     /* If the request queue was empty then we have to take care of
      * scheduling the next one */
diff --git a/qemu-coroutine-sleep.c b/qemu-coroutine-sleep.c
index 9abb7fd..6014e7c 100644
--- a/qemu-coroutine-sleep.c
+++ b/qemu-coroutine-sleep.c
@@ -18,13 +18,17 @@
 typedef struct CoSleepCB {
     QEMUTimer *ts;
     Coroutine *co;
+    AioContext *ctx;
 } CoSleepCB;
 
 static void co_sleep_cb(void *opaque)
 {
     CoSleepCB *sleep_cb = opaque;
+    AioContext *ctx = sleep_cb->ctx;
 
+    aio_context_acquire(ctx);
     qemu_coroutine_enter(sleep_cb->co, NULL);
+    aio_context_release(ctx);
 }
 
 void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
@@ -32,6 +36,7 @@ void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
 {
     CoSleepCB sleep_cb = {
         .co = qemu_coroutine_self(),
+        .ctx = ctx,
     };
     sleep_cb.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &sleep_cb);
     timer_mod(sleep_cb.ts, qemu_clock_get_ns(type) + ns);
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 15/18] quorum: use atomics for rewrite_count
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (13 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 14/18] block: explicitly acquire aiocontext in timers " Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 16/18] quorum: split quorum_fifo_aio_cb from quorum_aio_cb Paolo Bonzini
                   ` (2 subsequent siblings)
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

This is simpler than taking and releasing the AioContext lock.  Both
quorum_rewrite_aio_cb and quorum_aio_finalize will be called without
any lock taken.
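
In miniature (matching the hunks below):

    /* Publisher: store the count, with a barrier, before firing the
     * correcting rewrites.
     */
    atomic_mb_set(&acb->rewrite_count, count);

    /* Each completion decrements; only the callback that performs
     * the 1 -> 0 transition finalizes, and it needs no lock to know
     * it is the last one.
     */
    if (atomic_fetch_dec(&acb->rewrite_count) == 1) {
        quorum_aio_finalize(acb);
    }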

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/quorum.c | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/block/quorum.c b/block/quorum.c
index 4e66221..0a42962 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -250,15 +250,10 @@ static void quorum_rewrite_aio_cb(void *opaque, int ret)
 {
     QuorumAIOCB *acb = opaque;
 
-    /* one less rewrite to do */
-    acb->rewrite_count--;
-
     /* wait until all rewrite callbacks have completed */
-    if (acb->rewrite_count) {
-        return;
+    if (atomic_fetch_dec(&acb->rewrite_count) == 1) {
+        quorum_aio_finalize(acb);
     }
-
-    quorum_aio_finalize(acb);
 }
 
 static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb);
@@ -361,7 +356,7 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
     }
 
     /* quorum_rewrite_aio_cb will count down this to zero */
-    acb->rewrite_count = count;
+    atomic_mb_set(&acb->rewrite_count, count);
 
     /* now fire the correcting rewrites */
     QLIST_FOREACH(version, &acb->votes.vote_list, next) {
-- 
2.4.3

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 16/18] quorum: split quorum_fifo_aio_cb from quorum_aio_cb
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (14 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 15/18] quorum: use atomics for rewrite_count Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 17/18] block: explicitly acquire aiocontext in aio callbacks that need it Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 18/18] aio: update locking documentation Paolo Bonzini
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

The two cases of quorum_aio_cb are called from different paths; make
that explicit by splitting the function and adding an assertion.
Also note that the FIFO read pattern does not do rewrites.
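
After the split, the FIFO read path registers its dedicated callback
directly (as in the read_fifo_child hunk below), leaving the vote
bookkeeping alone in quorum_aio_cb:

    bdrv_aio_readv(s->bs[acb->child_iter], acb->sector_num,
                   &acb->qcrs[acb->child_iter].qiov, acb->nb_sectors,
                   quorum_fifo_aio_cb, &acb->qcrs[acb->child_iter]);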

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/quorum.c | 37 +++++++++++++++++++++++--------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/block/quorum.c b/block/quorum.c
index 0a42962..026528e 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -271,28 +271,37 @@ static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
     }
 }
 
-static void quorum_aio_cb(void *opaque, int ret)
+static void quorum_fifo_aio_cb(void *opaque, int ret)
 {
     QuorumChildRequest *sacb = opaque;
     QuorumAIOCB *acb = sacb->parent;
     BDRVQuorumState *s = acb->common.bs->opaque;
-    bool rewrite = false;
 
-    if (acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO) {
-        /* We try to read next child in FIFO order if we fail to read */
-        if (ret < 0 && ++acb->child_iter < s->num_children) {
-            read_fifo_child(acb);
-            return;
-        }
+    assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
 
-        if (ret == 0) {
-            quorum_copy_qiov(acb->qiov, &acb->qcrs[acb->child_iter].qiov);
-        }
-        acb->vote_ret = ret;
-        quorum_aio_finalize(acb);
+    /* We try to read next child in FIFO order if we fail to read */
+    if (ret < 0 && ++acb->child_iter < s->num_children) {
+        read_fifo_child(acb);
         return;
     }
 
+    if (ret == 0) {
+        quorum_copy_qiov(acb->qiov, &acb->qcrs[acb->child_iter].qiov);
+    }
+    acb->vote_ret = ret;
+
+    /* FIXME: rewrite failed children if acb->child_iter > 0? */
+
+    quorum_aio_finalize(acb);
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumChildRequest *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    bool rewrite = false;
+
     sacb->ret = ret;
     acb->count++;
     if (ret == 0) {
@@ -658,7 +667,7 @@ static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
                      acb->qcrs[acb->child_iter].buf);
     bdrv_aio_readv(s->bs[acb->child_iter], acb->sector_num,
                    &acb->qcrs[acb->child_iter].qiov, acb->nb_sectors,
-                   quorum_aio_cb, &acb->qcrs[acb->child_iter]);
+                   quorum_fifo_aio_cb, &acb->qcrs[acb->child_iter]);
 
     return &acb->common;
 }
-- 
2.4.3

* [Qemu-devel] [PATCH 17/18] block: explicitly acquire aiocontext in aio callbacks that need it
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (15 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 16/18] quorum: split quorum_fifo_aio_cb from quorum_aio_cb Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 18/18] aio: update locking documentation Paolo Bonzini
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

todo: qed
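
In outline, every AIO completion callback below is converted like this
(a condensed sketch of the recurring pattern, not any single hunk;
"foo_complete" and "s" are stand-ins for each driver's callback and
state):

    static void foo_complete(void *opaque, int ret)
    {
        AioContext *ctx = blk_get_aio_context(s->conf.blk);

        aio_context_acquire(ctx);
        /* ... work that touches block-layer or device state ... */
        aio_context_release(ctx);
    }

Bottom halves and simple completion trampolines go the other way and
drop their acquire/release pairs, because the lock is now taken by the
AIO callback itself or by the code that re-enters the coroutine (see
bdrv_co_io_em_complete and thread_pool_co_cb).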

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/archipelago.c    |  3 ---
 block/blkdebug.c       |  4 ----
 block/blkverify.c      |  9 ++++-----
 block/block-backend.c  |  4 ----
 block/curl.c           |  2 +-
 block/io.c             | 13 +++++++------
 block/linux-aio.c      |  7 ++++---
 block/mirror.c         | 12 +++++++++---
 block/null.c           |  8 --------
 block/qed.c            |  3 ---
 block/quorum.c         | 12 ++++++++++++
 block/rbd.c            |  4 ----
 block/win32-aio.c      |  2 --
 hw/block/virtio-blk.c  |  6 ++++++
 hw/scsi/scsi-disk.c    | 18 ++++++++++++++++++
 hw/scsi/scsi-generic.c | 20 +++++++++++++++++---
 thread-pool.c          | 12 +++++++++++-
 17 files changed, 89 insertions(+), 50 deletions(-)

diff --git a/block/archipelago.c b/block/archipelago.c
index 7f69a3f..855655c 100644
--- a/block/archipelago.c
+++ b/block/archipelago.c
@@ -312,12 +312,9 @@ static void qemu_archipelago_complete_aio(void *opaque)
 {
     AIORequestData *reqdata = (AIORequestData *) opaque;
     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
-    AioContext *ctx = bdrv_get_aio_context(aio_cb->common.bs);
 
     qemu_bh_delete(aio_cb->bh);
-    aio_context_acquire(ctx);
     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
-    aio_context_release(ctx);
     aio_cb->status = 0;
 
     qemu_aio_unref(aio_cb);
diff --git a/block/blkdebug.c b/block/blkdebug.c
index 2130811..bc247f4 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -458,12 +458,8 @@ out:
 static void error_callback_bh(void *opaque)
 {
     struct BlkdebugAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
     qemu_bh_delete(acb->bh);
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
-    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/blkverify.c b/block/blkverify.c
index 510c198..e670622 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -185,23 +185,22 @@ static BlkverifyAIOCB *blkverify_aio_get(BlockDriverState *bs, bool is_write,
 static void blkverify_aio_bh(void *opaque)
 {
     BlkverifyAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
     if (acb->buf) {
         qemu_iovec_destroy(&acb->raw_qiov);
         qemu_vfree(acb->buf);
     }
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
-    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
 static void blkverify_aio_cb(void *opaque, int ret)
 {
     BlkverifyAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
+    aio_context_acquire(ctx);
     switch (++acb->done) {
     case 1:
         acb->ret = ret;
@@ -216,11 +215,11 @@ static void blkverify_aio_cb(void *opaque, int ret)
             acb->verify(acb);
         }
 
-        acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
-                             blkverify_aio_bh, acb);
+        acb->bh = aio_bh_new(ctx, blkverify_aio_bh, acb);
         qemu_bh_schedule(acb->bh);
         break;
     }
+    aio_context_release(ctx);
 }
 
 static void blkverify_verify_readv(BlkverifyAIOCB *acb)
diff --git a/block/block-backend.c b/block/block-backend.c
index 185ba32..aee8a12 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -529,12 +529,8 @@ int blk_write_zeroes(BlockBackend *blk, int64_t sector_num,
 static void error_callback_bh(void *opaque)
 {
     struct BlockBackendAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
     qemu_bh_delete(acb->bh);
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
-    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/curl.c b/block/curl.c
index fc70b01..0e062eb 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -713,11 +713,11 @@ static void curl_readv_bh_cb(void *p)
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
 out:
+    aio_context_release(ctx);
     if (ret != -EINPROGRESS) {
         acb->common.cb(acb->common.opaque, ret);
         qemu_aio_unref(acb);
     }
-    aio_context_release(ctx);
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
diff --git a/block/io.c b/block/io.c
index 74f5705..7bfe6a1 100644
--- a/block/io.c
+++ b/block/io.c
@@ -2002,15 +2002,12 @@ static const AIOCBInfo bdrv_em_aiocb_info = {
 static void bdrv_aio_bh_cb(void *opaque)
 {
     BlockAIOCBSync *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     if (!acb->is_write && acb->ret >= 0) {
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
-    aio_context_release(ctx);
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
     qemu_aio_unref(acb);
@@ -2086,13 +2083,10 @@ static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
 static void bdrv_co_em_bh(void *opaque)
 {
     BlockAIOCBCoroutine *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     assert(!acb->need_bh);
     qemu_bh_delete(acb->bh);
-    aio_context_acquire(ctx);
     bdrv_co_complete(acb);
-    aio_context_release(ctx);
 }
 
 static void bdrv_co_maybe_schedule_bh(BlockAIOCBCoroutine *acb)
@@ -2243,15 +2237,19 @@ void qemu_aio_unref(void *p)
 
 typedef struct CoroutineIOCompletion {
     Coroutine *coroutine;
+    AioContext *ctx;
     int ret;
 } CoroutineIOCompletion;
 
 static void bdrv_co_io_em_complete(void *opaque, int ret)
 {
     CoroutineIOCompletion *co = opaque;
+    AioContext *ctx = co->ctx;
 
     co->ret = ret;
+    aio_context_acquire(ctx);
     qemu_coroutine_enter(co->coroutine, NULL);
+    aio_context_release(ctx);
 }
 
 static int coroutine_fn bdrv_co_io_em(BlockDriverState *bs, int64_t sector_num,
@@ -2260,6 +2258,7 @@ static int coroutine_fn bdrv_co_io_em(BlockDriverState *bs, int64_t sector_num,
 {
     CoroutineIOCompletion co = {
         .coroutine = qemu_coroutine_self(),
+        .ctx = bdrv_get_aio_context(bs),
     };
     BlockAIOCB *acb;
 
@@ -2331,6 +2330,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
         BlockAIOCB *acb;
         CoroutineIOCompletion co = {
             .coroutine = qemu_coroutine_self(),
+            .ctx = bdrv_get_aio_context(bs),
         };
 
         acb = bs->drv->bdrv_aio_flush(bs, bdrv_co_io_em_complete, &co);
@@ -2455,6 +2455,7 @@ int coroutine_fn bdrv_co_discard(BlockDriverState *bs, int64_t sector_num,
             BlockAIOCB *acb;
             CoroutineIOCompletion co = {
                 .coroutine = qemu_coroutine_self(),
+                .ctx = bdrv_get_aio_context(bs),
             };
 
             acb = bs->drv->bdrv_aio_discard(bs, sector_num, nb_sectors,
diff --git a/block/linux-aio.c b/block/linux-aio.c
index bc83e5c..4f21918 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -71,8 +71,7 @@ static inline ssize_t io_event_ret(struct io_event *ev)
 /*
  * Completes an AIO request (calls the callback and frees the ACB).
  */
-static void qemu_laio_process_completion(struct qemu_laio_state *s,
-    struct qemu_laiocb *laiocb)
+static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
 {
     int ret;
 
@@ -138,7 +137,9 @@ static void qemu_laio_completion_bh(void *opaque)
         laiocb->ret = io_event_ret(&s->events[s->event_idx]);
         s->event_idx++;
 
-        qemu_laio_process_completion(s, laiocb);
+        aio_context_release(s->aio_context);
+        qemu_laio_process_completion(laiocb);
+        aio_context_acquire(s->aio_context);
     }
 
     if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
diff --git a/block/mirror.c b/block/mirror.c
index fc4d8f5..923732c 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -127,6 +127,8 @@ static void mirror_write_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(bdrv_get_aio_context(s->common.bs));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -137,12 +139,15 @@ static void mirror_write_complete(void *opaque, int ret)
         }
     }
     mirror_iteration_done(op, ret);
+    aio_context_release(bdrv_get_aio_context(s->common.bs));
 }
 
 static void mirror_read_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(bdrv_get_aio_context(s->common.bs));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -153,10 +158,11 @@ static void mirror_read_complete(void *opaque, int ret)
         }
 
         mirror_iteration_done(op, ret);
-        return;
+    } else {
+        bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
+                        mirror_write_complete, op);
     }
-    bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
-                    mirror_write_complete, op);
+    aio_context_release(bdrv_get_aio_context(s->common.bs));
 }
 
 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
diff --git a/block/null.c b/block/null.c
index 9bddc1b..7d08323 100644
--- a/block/null.c
+++ b/block/null.c
@@ -117,11 +117,7 @@ static const AIOCBInfo null_aiocb_info = {
 static void null_bh_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
-    aio_context_release(ctx);
     qemu_bh_delete(acb->bh);
     qemu_aio_unref(acb);
 }
@@ -129,11 +125,7 @@ static void null_bh_cb(void *opaque)
 static void null_timer_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
-    aio_context_release(ctx);
     timer_deinit(&acb->timer);
     qemu_aio_unref(acb);
 }
diff --git a/block/qed.c b/block/qed.c
index adc9b73..9b2c786 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -912,15 +912,12 @@ static void qed_aio_complete_bh(void *opaque)
     BlockCompletionFunc *cb = acb->common.cb;
     void *user_opaque = acb->common.opaque;
     int ret = acb->bh_ret;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
     qemu_aio_unref(acb);
 
     /* Invoke callback */
-    aio_context_acquire(ctx);
     cb(user_opaque, ret);
-    aio_context_release(ctx);
 }
 
 static void qed_aio_complete(QEDAIOCB *acb, int ret)
diff --git a/block/quorum.c b/block/quorum.c
index 026528e..7c06080 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -149,6 +149,7 @@ static AIOCBInfo quorum_aiocb_info = {
     .cancel_async       = quorum_aio_cancel,
 };
 
+/* Called _without_ acquiring AioContext.  */
 static void quorum_aio_finalize(QuorumAIOCB *acb)
 {
     int i, ret = 0;
@@ -276,12 +277,15 @@ static void quorum_fifo_aio_cb(void *opaque, int ret)
     QuorumChildRequest *sacb = opaque;
     QuorumAIOCB *acb = sacb->parent;
     BDRVQuorumState *s = acb->common.bs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
+    aio_context_acquire(ctx);
     assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
 
     /* We try to read next child in FIFO order if we fail to read */
     if (ret < 0 && ++acb->child_iter < s->num_children) {
         read_fifo_child(acb);
+        aio_context_release(ctx);
         return;
     }
 
@@ -292,6 +296,7 @@ static void quorum_fifo_aio_cb(void *opaque, int ret)
 
     /* FIXME: rewrite failed children if acb->child_iter > 0? */
 
+    aio_context_release(ctx);
     quorum_aio_finalize(acb);
 }
 
@@ -300,8 +305,11 @@ static void quorum_aio_cb(void *opaque, int ret)
     QuorumChildRequest *sacb = opaque;
     QuorumAIOCB *acb = sacb->parent;
     BDRVQuorumState *s = acb->common.bs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
     bool rewrite = false;
 
+    aio_context_acquire(ctx);
+
     sacb->ret = ret;
     acb->count++;
     if (ret == 0) {
@@ -311,7 +319,9 @@ static void quorum_aio_cb(void *opaque, int ret)
     }
     assert(acb->count <= s->num_children);
     assert(acb->success_count <= s->num_children);
+
     if (acb->count < s->num_children) {
+        aio_context_release(ctx);
         return;
     }
 
@@ -322,6 +332,8 @@ static void quorum_aio_cb(void *opaque, int ret)
         quorum_has_too_much_io_failed(acb);
     }
 
+    aio_context_release(ctx);
+
     /* if no rewrite is done the code will finish right away */
     if (!rewrite) {
         quorum_aio_finalize(acb);
diff --git a/block/rbd.c b/block/rbd.c
index 6206dc3..a60a19d 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -376,7 +376,6 @@ static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
     int64_t r;
 
     r = rcb->ret;
@@ -409,10 +408,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    aio_context_release(ctx);
 
     qemu_aio_unref(acb);
 }
diff --git a/block/win32-aio.c b/block/win32-aio.c
index a78d149..33f7bcf 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -87,9 +87,7 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
     }
 
 
-    aio_context_acquire(s->aio_ctx);
     waiocb->common.cb(waiocb->common.opaque, ret);
-    aio_context_release(s->aio_ctx);
     qemu_aio_unref(waiocb);
 }
 
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index 4462ad2..fd018a3 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -87,7 +87,9 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
 static void virtio_blk_rw_complete(void *opaque, int ret)
 {
     VirtIOBlockReq *next = opaque;
+    VirtIOBlock *s = next->dev;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     while (next) {
         VirtIOBlockReq *req = next;
         next = req->mr_next;
@@ -120,12 +122,15 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
         block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
         virtio_blk_free_request(req);
     }
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 static void virtio_blk_flush_complete(void *opaque, int ret)
 {
     VirtIOBlockReq *req = opaque;
+    VirtIOBlock *s = req->dev;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     if (ret) {
         if (virtio_blk_handle_rw_error(req, -ret, 0)) {
             return;
@@ -135,6 +140,7 @@ static void virtio_blk_flush_complete(void *opaque, int ret)
     virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
     block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
     virtio_blk_free_request(req);
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 #ifdef __linux__
diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c
index bada9a7..fb82d20 100644
--- a/hw/scsi/scsi-disk.c
+++ b/hw/scsi/scsi-disk.c
@@ -169,6 +169,8 @@ static void scsi_aio_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
@@ -184,6 +186,7 @@ static void scsi_aio_complete(void *opaque, int ret)
     scsi_req_complete(&r->req, GOOD);
 
 done:
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
     scsi_req_unref(&r->req);
 }
 
@@ -273,8 +276,10 @@ static void scsi_dma_complete(void *opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     scsi_dma_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_read_complete(void * opaque, int ret)
@@ -285,6 +290,8 @@ static void scsi_read_complete(void * opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
@@ -306,6 +313,7 @@ static void scsi_read_complete(void * opaque, int ret)
 
 done:
     scsi_req_unref(&r->req);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 /* Actually issue a read to the block device.  */
@@ -355,8 +363,10 @@ static void scsi_do_read_cb(void *opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     scsi_do_read(opaque, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 /* Read more data from scsi device into buffer.  */
@@ -481,8 +491,10 @@ static void scsi_write_complete(void * opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     scsi_write_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_write_data(SCSIRequest *req)
@@ -1625,11 +1637,14 @@ static void scsi_unmap_complete(void *opaque, int ret)
 {
     UnmapCBData *data = opaque;
     SCSIDiskReq *r = data->r;
+    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     scsi_unmap_complete_noio(data, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_disk_emulate_unmap(SCSIDiskReq *r, uint8_t *inbuf)
@@ -1696,6 +1711,8 @@ static void scsi_write_same_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
@@ -1730,6 +1747,7 @@ done:
     scsi_req_unref(&r->req);
     qemu_vfree(data->iov.iov_base);
     g_free(data);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_disk_emulate_write_same(SCSIDiskReq *r, uint8_t *inbuf)
diff --git a/hw/scsi/scsi-generic.c b/hw/scsi/scsi-generic.c
index 1b6350b..503ea18 100644
--- a/hw/scsi/scsi-generic.c
+++ b/hw/scsi/scsi-generic.c
@@ -145,10 +145,14 @@ done:
 static void scsi_command_complete(void *opaque, int ret)
 {
     SCSIGenericReq *r = (SCSIGenericReq *)opaque;
+    SCSIDevice *s = r->req.dev;
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
     scsi_command_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 static int execute_command(BlockBackend *blk,
@@ -184,9 +188,11 @@ static void scsi_read_complete(void * opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
     if (ret || r->req.io_canceled) {
         scsi_command_complete_noio(r, ret);
-        return;
+        goto done;
     }
 
     len = r->io_header.dxfer_len - r->io_header.resid;
@@ -195,7 +201,7 @@ static void scsi_read_complete(void * opaque, int ret)
     r->len = -1;
     if (len == 0) {
         scsi_command_complete_noio(r, 0);
-        return;
+        goto done;
     }
 
     /* Snoop READ CAPACITY output to set the blocksize.  */
@@ -212,6 +218,9 @@ static void scsi_read_complete(void * opaque, int ret)
 
     scsi_req_data(&r->req, len);
     scsi_req_unref(&r->req);
+
+done:
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 /* Read more data from scsi device into buffer.  */
@@ -247,9 +256,11 @@ static void scsi_write_complete(void * opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
     if (ret || r->req.io_canceled) {
         scsi_command_complete_noio(r, ret);
-        return;
+        goto done;
     }
 
     if (r->req.cmd.buf[0] == MODE_SELECT && r->req.cmd.buf[4] == 12 &&
@@ -259,6 +270,9 @@ static void scsi_write_complete(void * opaque, int ret)
     }
 
     scsi_command_complete_noio(r, ret);
+
+done:
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 /* Write data to a scsi device.  Returns nonzero on failure.
diff --git a/thread-pool.c b/thread-pool.c
index 1039188..9e6d5c9 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -185,7 +185,9 @@ restart:
              */
             qemu_bh_schedule(pool->completion_bh);
 
+            aio_context_release(pool->ctx);
             elem->common.cb(elem->common.opaque, elem->ret);
+            aio_context_acquire(pool->ctx);
             qemu_aio_unref(elem);
             goto restart;
         } else {
@@ -261,21 +263,29 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 
 typedef struct ThreadPoolCo {
     Coroutine *co;
+    AioContext *ctx;
     int ret;
 } ThreadPoolCo;
 
 static void thread_pool_co_cb(void *opaque, int ret)
 {
     ThreadPoolCo *co = opaque;
+    AioContext *ctx = co->ctx;
 
     co->ret = ret;
+    aio_context_acquire(ctx);
     qemu_coroutine_enter(co->co, NULL);
+    aio_context_release(ctx);
 }
 
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                        void *arg)
 {
-    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
+    ThreadPoolCo tpc = {
+        .co = qemu_coroutine_self(),
+        .ctx = pool->ctx,
+        .ret = -EINPROGRESS
+    };
     assert(qemu_in_coroutine());
     thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
     qemu_coroutine_yield();
-- 
2.4.3

* [Qemu-devel] [PATCH 18/18] aio: update locking documentation
  2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
                   ` (16 preceding siblings ...)
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 17/18] block: explicitly acquire aiocontext in aio callbacks that need it Paolo Bonzini
@ 2015-08-06 13:36 ` Paolo Bonzini
  17 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-08-06 13:36 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, famz, stefanha, qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/multiple-iothreads.txt | 65 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 57 insertions(+), 8 deletions(-)

diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 4197f62..729c64b 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -47,8 +47,11 @@ How to program for IOThreads
 The main difference between legacy code and new code that can run in an
 IOThread is dealing explicitly with the event loop object, AioContext
 (see include/block/aio.h).  Code that only works in the main loop
-implicitly uses the main loop's AioContext.  Code that supports running
-in IOThreads must be aware of its AioContext.
+implicitly uses the main loop's AioContext, and does not need to handle
+locking because it implicitly uses the QEMU global mutex.
+
+Code that supports running in IOThreads, instead, must be aware of its
+AioContext and of synchronization.
 
 AioContext supports the following services:
  * File descriptor monitoring (read/write/error on POSIX hosts)
@@ -79,6 +82,11 @@ iothread_get_aio_context() or for the main loop using qemu_get_aio_context().
 Code that takes an AioContext argument works both in IOThreads or the main
 loop, depending on which AioContext instance the caller passes in.
 
+When running in an IOThread, handlers for bottom halves, file descriptors,
+and timers have to handle their own synchronization.  Block layer AIO
+callbacks also have to handle synchronization.  How to do this is the
+topic of the next sections.
+
 How to synchronize with an IOThread
 -----------------------------------
 AioContext is not thread-safe so some rules must be followed when using file
@@ -87,13 +95,15 @@ descriptors, event notifiers, timers, or BHs across threads:
 1. AioContext functions can always be called safely.  They handle their
 own locking internally.
 
-2. Other threads wishing to access the AioContext must use
-aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
-context is acquired no other thread can access it or run event loop iterations
-in this AioContext.
+2. Other threads wishing to access block devices on an IOThread's AioContext
+must use a mutex; the block layer does not attempt to provide mutual
+exclusion of any kind.  aio_context_acquire() and aio_context_release()
+are the typical choice; aio_context_acquire()/aio_context_release() calls
+may be nested.
 
-aio_context_acquire()/aio_context_release() calls may be nested.  This
-means you can call them if you're not sure whether #1 applies.
+#2 typically includes bottom half handlers, file descriptor handlers,
+timer handlers and AIO callbacks.  All of these typically acquire and
+release the AioContext.
 
 There is currently no lock ordering rule if a thread needs to acquire multiple
 AioContexts simultaneously.  Therefore, it is only safe for code holding the
@@ -130,9 +140,48 @@ aio_disable_clients() before calling bdrv_drain().  You can then reenable
 guest requests with aio_enable_clients() before finally releasing the
 AioContext and completing the monitor command.
 
+AioContext and coroutines
+-------------------------
 Long-running jobs (usually in the form of coroutines) are best scheduled in the
 BlockDriverState's AioContext to avoid the need to acquire/release around each
 bdrv_*() call.  Be aware that there is currently no mechanism to get notified
 when bdrv_set_aio_context() moves this BlockDriverState to a different
 AioContext (see bdrv_detach_aio_context()/bdrv_attach_aio_context()), so you
 may need to add this if you want to support long-running jobs.
+
+Usually, the coroutine is created and first entered in the main loop.  After
+some time it will yield and, when entered again, it will run in the
+IOThread.  In general, however, the coroutine never has to worry about
+releasing and acquiring the AioContext.  The release will happen in the
+function that entered the coroutine; the acquire will happen (e.g. in
+a bottom half or AIO callback) before the coroutine is entered.  For
+example:
+
+1) the coroutine yields because it waits on I/O (e.g. during a call to
+bdrv_co_readv) or sleeps for a determinate period of time (e.g. with
+co_aio_sleep_ns).  In this case, functions such as bdrv_co_io_em_complete
+or co_sleep_cb take the AioContext lock before re-entering.
+
+2) the coroutine explicitly yields.  In this case, the code implementing
+the long-running job will have a bottom half, AIO callback or similar that
+should acquire the AioContext before re-entering the coroutine.
+
+3) the coroutine yields through a CoQueue (directly or indirectly, e.g.
+through a CoMutex or CoRwLock).  This also "just works", but the mechanism
+is more subtle.  Suppose coroutine C1 has just released a mutex and C2
+was waiting on it.  C2 is woken up at C1's next yield point, or just
+before C1 terminates; C2 then runs *before qemu_coroutine_enter(C1)
+returns to its caller*.  Assuming that C1 was entered with a sequence like
+
+    aio_context_acquire(ctx);
+    qemu_coroutine_enter(co);     // enters C1
+    aio_context_release(ctx);
+
+... then C2 will also be protected by the AioContext lock.
+
+
+Bullet 3 provides the following rule for using coroutine mutual exclusion
+in multiple threads:
+
+    *** If two coroutines can exclude each other through a CoMutex or
+    *** CoRwLock or CoQueue, they should all run under the same AioContext.
-- 
2.4.3

* Re: [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll
  2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
@ 2015-09-09  6:06   ` Fam Zheng
  2015-09-09  7:31     ` Paolo Bonzini
  2015-09-28  9:50   ` Stefan Hajnoczi
  1 sibling, 1 reply; 29+ messages in thread
From: Fam Zheng @ 2015-09-09  6:06 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, stefanha, qemu-devel, qemu-block

On Thu, 08/06 15:35, Paolo Bonzini wrote:
> This is the first step towards having fine-grained critical sections in
> dataplane threads, which resolves lock ordering problems between
> address_space_* functions (which need the BQL when doing MMIO, even
> after we complete RCU-based dispatch) and the AioContext.
> 
> Because AioContext does not use contention callbacks anymore, the
> unit test has to be changed.
> 
> Previously applied as a0710f7995f914e3044e5899bd8ff6c43c62f916 and
> then reverted.
> 
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c                     | 22 +++-------------------
>  docs/multiple-iothreads.txt | 25 +++++++++++++++----------
>  include/block/aio.h         |  3 ---
>  iothread.c                  | 11 ++---------
>  tests/test-aio.c            | 19 +++++++++++--------
>  5 files changed, 31 insertions(+), 49 deletions(-)
> 
> diff --git a/async.c b/async.c
> index efce14b..f992914 100644
> --- a/async.c
> +++ b/async.c
> @@ -79,8 +79,8 @@ int aio_bh_poll(AioContext *ctx)
>           * aio_notify again if necessary.
>           */
>          if (!bh->deleted && atomic_xchg(&bh->scheduled, 0)) {
> -            /* Idle BHs and the notify BH don't count as progress */
> -            if (!bh->idle && bh != ctx->notify_dummy_bh) {
> +            /* Idle BHs don't count as progress */
> +            if (!bh->idle) {
>                  ret = 1;
>              }
>              bh->idle = 0;
> @@ -232,7 +232,6 @@ aio_ctx_finalize(GSource     *source)
>  {
>      AioContext *ctx = (AioContext *) source;
>  
> -    qemu_bh_delete(ctx->notify_dummy_bh);
>      thread_pool_free(ctx->thread_pool);
>  
>      qemu_mutex_lock(&ctx->bh_lock);
> @@ -299,19 +298,6 @@ static void aio_timerlist_notify(void *opaque)
>      aio_notify(opaque);
>  }
>  
> -static void aio_rfifolock_cb(void *opaque)
> -{
> -    AioContext *ctx = opaque;
> -
> -    /* Kick owner thread in case they are blocked in aio_poll() */
> -    qemu_bh_schedule(ctx->notify_dummy_bh);
> -}
> -
> -static void notify_dummy_bh(void *opaque)
> -{
> -    /* Do nothing, we were invoked just to force the event loop to iterate */
> -}
> -
>  static void event_notifier_dummy_cb(EventNotifier *e)
>  {
>  }
> @@ -333,11 +319,9 @@ AioContext *aio_context_new(Error **errp)
>                             event_notifier_dummy_cb);
>      ctx->thread_pool = NULL;
>      qemu_mutex_init(&ctx->bh_lock);
> -    rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
> +    rfifolock_init(&ctx->lock, NULL, NULL);
>      timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
>  
> -    ctx->notify_dummy_bh = aio_bh_new(ctx, notify_dummy_bh, NULL);
> -
>      return ctx;
>  }
>  
> diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
> index 40b8419..723cc7e 100644
> --- a/docs/multiple-iothreads.txt
> +++ b/docs/multiple-iothreads.txt
> @@ -105,13 +105,10 @@ a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
>  acquire/release or locking is needed for the qemu_bh_schedule() call.  But be
>  sure to acquire the AioContext for aio_bh_new() if necessary.
>  
> -The relationship between AioContext and the block layer
> --------------------------------------------------------
> -The AioContext originates from the QEMU block layer because it provides a
> -scoped way of running event loop iterations until all work is done.  This
> -feature is used to complete all in-flight block I/O requests (see
> -bdrv_drain_all()).  Nowadays AioContext is a generic event loop that can be
> -used by any QEMU subsystem.
> +AioContext and the block layer
> +------------------------------
> +The AioContext originates from the QEMU block layer, even though nowadays
> +AioContext is a generic event loop that can be used by any QEMU subsystem.
>  
>  The block layer has support for AioContext integrated.  Each BlockDriverState
>  is associated with an AioContext using bdrv_set_aio_context() and
> @@ -122,9 +119,17 @@ Block layer code must therefore expect to run in an IOThread and avoid using
>  old APIs that implicitly use the main loop.  See the "How to program for
>  IOThreads" above for information on how to do that.
>  
> -If main loop code such as a QMP function wishes to access a BlockDriverState it
> -must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure the
> -IOThread does not run in parallel.
> +If main loop code such as a QMP function wishes to access a BlockDriverState
> +it must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure
> +that callbacks in the IOThread do not run in parallel.
> +
> +Code running in the monitor typically uses bdrv_drain() to ensure that
> +past requests from the guest are completed.  When a block device is
> +running in an IOThread, the IOThread can also process requests from
> +the guest (via ioeventfd).  These requests have to be blocked with
> +aio_disable_clients() before calling bdrv_drain().  You can then reenable
> +guest requests with aio_enable_clients() before finally releasing the
> +AioContext and completing the monitor command.

This patch will probably go in before aio_disable_clients, if that ever
lands, but I'm not quite confident about the interface yet: listing a
precise set of clients from the monitor is an ugly coupling between
modules:

    aio_disable_clients(bdrv_get_aio_context(bs),
                        AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD | AIO_CLIENT_FOO);

    ...

    aio_enable_clients(bdrv_get_aio_context(bs),
                       AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD | AIO_CLIENT_FOO);

I prefer at least an abstraction:

    bdrv_quiesce_begin(bs);

    ...

    bdrv_quiesce_end(bs);
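
Something like this sketch, with the mask kept inside the block layer
(reusing the illustrative flag names from above):

    void bdrv_quiesce_begin(BlockDriverState *bs)
    {
        aio_disable_clients(bdrv_get_aio_context(bs),
                            AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD);
    }

    void bdrv_quiesce_end(BlockDriverState *bs)
    {
        aio_enable_clients(bdrv_get_aio_context(bs),
                           AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD);
    }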

My point is: maybe we should leave documenting this to whichever series
introduces it?

Fam

>  
>  Long-running jobs (usually in the form of coroutines) are best scheduled in the
>  BlockDriverState's AioContext to avoid the need to acquire/release around each
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 400b1b0..9dd32e0 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -114,9 +114,6 @@ struct AioContext {
>      bool notified;
>      EventNotifier notifier;
>  
> -    /* Scheduling this BH forces the event loop it iterate */
> -    QEMUBH *notify_dummy_bh;
> -
>      /* Thread pool for performing work and receiving completion callbacks */
>      struct ThreadPool *thread_pool;
>  
> diff --git a/iothread.c b/iothread.c
> index da6ce7b..8f6d2e4 100644
> --- a/iothread.c
> +++ b/iothread.c
> @@ -30,7 +30,6 @@ typedef ObjectClass IOThreadClass;
>  static void *iothread_run(void *opaque)
>  {
>      IOThread *iothread = opaque;
> -    bool blocking;
>  
>      rcu_register_thread();
>  
> @@ -39,14 +38,8 @@ static void *iothread_run(void *opaque)
>      qemu_cond_signal(&iothread->init_done_cond);
>      qemu_mutex_unlock(&iothread->init_done_lock);
>  
> -    while (!iothread->stopping) {
> -        aio_context_acquire(iothread->ctx);
> -        blocking = true;
> -        while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
> -            /* Progress was made, keep going */
> -            blocking = false;
> -        }
> -        aio_context_release(iothread->ctx);
> +    while (!atomic_read(&iothread->stopping)) {
> +        aio_poll(iothread->ctx, true);
>      }
>  
>      rcu_unregister_thread();
> diff --git a/tests/test-aio.c b/tests/test-aio.c
> index 217e337..b4bd3f1 100644
> --- a/tests/test-aio.c
> +++ b/tests/test-aio.c
> @@ -99,6 +99,7 @@ static void event_ready_cb(EventNotifier *e)
>  
>  typedef struct {
>      QemuMutex start_lock;
> +    EventNotifier notifier;
>      bool thread_acquired;
>  } AcquireTestData;
>  
> @@ -110,6 +111,8 @@ static void *test_acquire_thread(void *opaque)
>      qemu_mutex_lock(&data->start_lock);
>      qemu_mutex_unlock(&data->start_lock);
>  
> +    g_usleep(500000);
> +    event_notifier_set(&data->notifier);
>      aio_context_acquire(ctx);
>      aio_context_release(ctx);
>  
> @@ -118,20 +121,19 @@ static void *test_acquire_thread(void *opaque)
>      return NULL;
>  }
>  
> -static void dummy_notifier_read(EventNotifier *unused)
> +static void dummy_notifier_read(EventNotifier *n)
>  {
> -    g_assert(false); /* should never be invoked */
> +    event_notifier_test_and_clear(n);
>  }
>  
>  static void test_acquire(void)
>  {
>      QemuThread thread;
> -    EventNotifier notifier;
>      AcquireTestData data;
>  
>      /* Dummy event notifier ensures aio_poll() will block */
> -    event_notifier_init(&notifier, false);
> -    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
> +    event_notifier_init(&data.notifier, false);
> +    aio_set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
>      g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
>  
>      qemu_mutex_init(&data.start_lock);
> @@ -145,12 +147,13 @@ static void test_acquire(void)
>      /* Block in aio_poll(), let other thread kick us and acquire context */
>      aio_context_acquire(ctx);
>      qemu_mutex_unlock(&data.start_lock); /* let the thread run */
> -    g_assert(!aio_poll(ctx, true));
> +    g_assert(aio_poll(ctx, true));
> +    g_assert(!data.thread_acquired);
>      aio_context_release(ctx);
>  
>      qemu_thread_join(&thread);
> -    aio_set_event_notifier(ctx, &notifier, NULL);
> -    event_notifier_cleanup(&notifier);
> +    aio_set_event_notifier(ctx, &data.notifier, NULL);
> +    event_notifier_cleanup(&data.notifier);
>  
>      g_assert(data.thread_acquired);
>  }
> -- 
> 2.4.3
> 
> 

* Re: [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2015-09-09  6:08   ` Fam Zheng
  2015-09-28 10:09   ` Stefan Hajnoczi
  1 sibling, 0 replies; 29+ messages in thread
From: Fam Zheng @ 2015-09-09  6:08 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, qemu-block, qemu-devel, stefanha

On Thu, 08/06 15:36, Paolo Bonzini wrote:
> This will be used for AioHandlers too.  There is going to be little
> or no contention, so it is better to reuse the same lock.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Reviewed-by: Fam Zheng <famz@redhat.com>

> ---
>  async.c             | 16 ++++++++--------
>  include/block/aio.h |  2 +-
>  2 files changed, 9 insertions(+), 9 deletions(-)
> 
> diff --git a/async.c b/async.c
> index f992914..7e97614 100644
> --- a/async.c
> +++ b/async.c
> @@ -50,12 +50,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>          .cb = cb,
>          .opaque = opaque,
>      };
> -    qemu_mutex_lock(&ctx->bh_lock);
> +    qemu_mutex_lock(&ctx->list_lock);
>      bh->next = ctx->first_bh;
>      /* Make sure that the members are ready before putting bh into list */
>      smp_wmb();
>      ctx->first_bh = bh;
> -    qemu_mutex_unlock(&ctx->bh_lock);
> +    qemu_mutex_unlock(&ctx->list_lock);
>      return bh;
>  }
>  
> @@ -92,7 +92,7 @@ int aio_bh_poll(AioContext *ctx)
>  
>      /* remove deleted bhs */
>      if (!ctx->walking_bh) {
> -        qemu_mutex_lock(&ctx->bh_lock);
> +        qemu_mutex_lock(&ctx->list_lock);
>          bhp = &ctx->first_bh;
>          while (*bhp) {
>              bh = *bhp;
> @@ -103,7 +103,7 @@ int aio_bh_poll(AioContext *ctx)
>                  bhp = &bh->next;
>              }
>          }
> -        qemu_mutex_unlock(&ctx->bh_lock);
> +        qemu_mutex_unlock(&ctx->list_lock);
>      }
>  
>      return ret;
> @@ -234,7 +234,7 @@ aio_ctx_finalize(GSource     *source)
>  
>      thread_pool_free(ctx->thread_pool);
>  
> -    qemu_mutex_lock(&ctx->bh_lock);
> +    qemu_mutex_lock(&ctx->list_lock);
>      while (ctx->first_bh) {
>          QEMUBH *next = ctx->first_bh->next;
>  
> @@ -244,12 +244,12 @@ aio_ctx_finalize(GSource     *source)
>          g_free(ctx->first_bh);
>          ctx->first_bh = next;
>      }
> -    qemu_mutex_unlock(&ctx->bh_lock);
> +    qemu_mutex_unlock(&ctx->list_lock);
>  
>      aio_set_event_notifier(ctx, &ctx->notifier, NULL);
>      event_notifier_cleanup(&ctx->notifier);
>      rfifolock_destroy(&ctx->lock);
> -    qemu_mutex_destroy(&ctx->bh_lock);
> +    qemu_mutex_destroy(&ctx->list_lock);
>      timerlistgroup_deinit(&ctx->tlg);
>  }
>  
> @@ -318,7 +318,7 @@ AioContext *aio_context_new(Error **errp)
>                             (EventNotifierHandler *)
>                             event_notifier_dummy_cb);
>      ctx->thread_pool = NULL;
> -    qemu_mutex_init(&ctx->bh_lock);
> +    qemu_mutex_init(&ctx->list_lock);
>      rfifolock_init(&ctx->lock, NULL, NULL);
>      timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
>  
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 9dd32e0..026f6c1 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -89,7 +89,7 @@ struct AioContext {
>      uint32_t notify_me;
>  
>      /* lock to protect between bh's adders and deleter */
> -    QemuMutex bh_lock;
> +    QemuMutex list_lock;
>  
>      /* Anchor of the list of Bottom Halves belonging to the context */
>      struct QEMUBH *first_bh;
> -- 
> 2.4.3
> 
> 
> 

* Re: [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll
  2015-09-09  6:06   ` Fam Zheng
@ 2015-09-09  7:31     ` Paolo Bonzini
  0 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-09-09  7:31 UTC (permalink / raw)
  To: Fam Zheng; +Cc: kwolf, qemu-block, qemu-devel, stefanha



On 09/09/2015 08:06, Fam Zheng wrote:
> This patch will probably go in before aio_disable_clients,

Actually I think it's blocked by aio_disable_clients.

> if any, but I'm not
> quite confident about the interface yet: listing a precise set of clients from
> monitor is an ugly coupling between modules:
> 
>     aio_disable_clients(bdrv_get_aio_context(bs),
>                         AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD | AIO_CLIENT_FOO);
> 
>     ...
> 
>     aio_enable_clients(bdrv_get_aio_context(bs),
>                        AIO_CLIENT_NBD | AIO_CLIENT_IOEVENTFD | AIO_CLIENT_FOO);
> 
> I prefer at least an abstraction:
> 
>     bdrv_quiesce_begin(bs);
> 
>     ...
> 
>     bdrv_quiesce_end(bs);

That's fine.

Paolo

> My point is: maybe we should leave documenting this to whichever series
> introduces it?

* Re: [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2015-09-09  8:49   ` Fam Zheng
  2015-09-09  9:14     ` Paolo Bonzini
  2015-09-28 10:15   ` Stefan Hajnoczi
  1 sibling, 1 reply; 29+ messages in thread
From: Fam Zheng @ 2015-09-09  8:49 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, qemu-block, qemu-devel, stefanha

On Thu, 08/06 15:36, Paolo Bonzini wrote:
> +QemuLockCnt usage
> +-----------------
> +
> +The typical pattern for QemuLockCnt functions is as follows.
> +
> +    qemu_lockcnt_inc(&xyz_lockcnt);
> +    if (xyz) {
> +        ... access xyz ...
> +    }
> +
> +    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
> +        g_free(xyz);
> +        xyz = NULL;
> +        qemu_lockcnt_unlock(&xyz_lockcnt);
> +    }
> +
> +Modifications are done using qemu_lockcnt_lock and qemu_lockcnt_unlock:
> +
> +    qemu_lockcnt_lock(&xyz_lockcnt);
> +    xyz = g_malloc(xyz);
> +    qemu_lockcnt_unlock(&xyz_lockcnt);
> +
> +If an object has to be freed outside a visit, you can use the following
> +scheme:
> +
> +    qemu_lockcnt_lock(&xyz_lockcnt);
> +    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
> +        g_free(xyz);
> +        xyz = NULL;
> +    }
> +    qemu_lockcnt_unlock(&xyz_lockcnt);
> +
> +In both the first and the third code snippets, g_free is only executed
> +if count is zero.  qemu_lockcnt_inc prevents the count from becoming
> +non-zero while the object is being freed.
> +
> +
> +QemuLockCnt can also be used to access a list as follows:
> +
> +    qemu_lockcnt_inc(&io_handlers_lockcnt);
> +    QLIST_FOREACH_RCU(ioh, &io_handlers, pioh) {
> +        if (ioh->revents & G_IO_OUT) {
> +            ioh->fd_write(ioh->opaque);
> +        }
> +    }

I'm confused: the comment on QLIST_FOREACH_RCU says "list traversal
must occur within an RCU critical section", but there is no rcu_read_lock
here.  Why?

Fam

> +
> +    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
> +        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
> +            if (ioh->deleted) {
> +                QLIST_REMOVE(ioh, next);
> +                g_free(ioh);
> +            }
> +        }
> +        qemu_lockcnt_unlock(&io_handlers_lockcnt);
> +    }
> +
> +An alternative pattern uses qemu_lockcnt_dec_if_lock:
> +
> +    qemu_lockcnt_inc(&io_handlers_lockcnt);
> +    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
> +        if (ioh->deleted && qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
> +            QLIST_REMOVE(ioh, next);
> +            g_free(ioh);
> +            qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
> +            continue;
> +        }
> +
> +        if (ioh->revents & G_IO_OUT) {
> +            ioh->fd_write(ioh->opaque);
> +        }
> +    }
> +    qemu_lockcnt_dec(&io_handlers_lockcnt);
> +
> +Here you can use qemu_lockcnt_dec because there is no special task
> +to do if the count goes from 1 to 0.

* Re: [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt
  2015-09-09  8:49   ` Fam Zheng
@ 2015-09-09  9:14     ` Paolo Bonzini
  0 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-09-09  9:14 UTC (permalink / raw)
  To: Fam Zheng; +Cc: kwolf, qemu-block, qemu-devel, stefanha



On 09/09/2015 10:49, Fam Zheng wrote:
>> > +    qemu_lockcnt_inc(&io_handlers_lockcnt);
>> > +    QLIST_FOREACH_RCU(ioh, &io_handlers, pioh) {
>> > +        if (ioh->revents & G_IO_OUT) {
>> > +            ioh->fd_write(ioh->opaque);
>> > +        }
>> > +    }
> I'm confused: the comment on QLIST_FOREACH_RCU says "list traversal
> must occur within an RCU critical section", but there is no rcu_read_lock
> here.  Why?

Right, the comment should be updated.

RCU can be seen as a "global reference count" that prevents freeing an
object between rcu_read_lock and rcu_read_unlock.  Here the reference
count is provided by the LockCnt.

The difference between QLIST_FOREACH and QLIST_FOREACH_RCU is just that
the latter has an extra smp_read_barrier_depends.  The barrier is needed
for all lockless visits.  I think QLIST_FOREACH_RCU is more expressive
than QLIST_FOREACH_LOCKLESS or something like that.
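
Roughly (from memory, not the literal macro):

    #define QLIST_FOREACH_RCU(var, head, field)                       \
            for ((var) = atomic_rcu_read(&(head)->lh_first);          \
                 (var);                                               \
                 (var) = atomic_rcu_read(&(var)->field.le_next))

atomic_rcu_read is a relaxed load plus smp_read_barrier_depends, so a
reader that sees the pointer to a new node is also guaranteed to see
the node's initialized contents.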

Paolo

* Re: [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll
  2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
  2015-09-09  6:06   ` Fam Zheng
@ 2015-09-28  9:50   ` Stefan Hajnoczi
  2015-09-28 10:14     ` Paolo Bonzini
  1 sibling, 1 reply; 29+ messages in thread
From: Stefan Hajnoczi @ 2015-09-28  9:50 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, famz, qemu-devel, qemu-block

On Thu, Aug 06, 2015 at 03:35:59PM +0200, Paolo Bonzini wrote:
> This is the first step towards having fine-grained critical sections in
> dataplane threads, which resolves lock ordering problems between
> address_space_* functions (which need the BQL when doing MMIO, even
> after we complete RCU-based dispatch) and the AioContext.
> 
> Because AioContext does not use contention callbacks anymore, the
> unit test has to be changed.
> 
> Previously applied as a0710f7995f914e3044e5899bd8ff6c43c62f916 and
> then reverted.

commit da5e1de95bb235330d7724316e7a29239d1359d5
Author: Stefan Hajnoczi <stefanha@redhat.com>
Date:   Wed Jun 3 10:15:33 2015 +0100

    Revert "iothread: release iothread around aio_poll"
    
    This reverts commit a0710f7995f914e3044e5899bd8ff6c43c62f916.
    
    In qemu-devel email message <556DBF87.2020908@de.ibm.com>, Christian
    Borntraeger writes:
    
      Having many guests all with a kernel/ramdisk (via -kernel) and
      several null block devices will result in hangs. All hanging
      guests are in partition detection code waiting for an I/O to return
      so very early maybe even the first I/O.
    
      Reverting that commit "fixes" the hangs.
    
    Reverting this commit for the 2.4 release.  More time is needed to
    investigate and correct this patch.

Did we ever find the root cause for hangs caused by this patch?

* Re: [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock Paolo Bonzini
  2015-09-09  6:08   ` Fam Zheng
@ 2015-09-28 10:09   ` Stefan Hajnoczi
  1 sibling, 0 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2015-09-28 10:09 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, famz, qemu-devel, qemu-block

On Thu, Aug 06, 2015 at 03:36:00PM +0200, Paolo Bonzini wrote:
> This will be used for AioHandlers too.  There is going to be little
> or no contention, so it is better to reuse the same lock.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c             | 16 ++++++++--------
>  include/block/aio.h |  2 +-
>  2 files changed, 9 insertions(+), 9 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

* Re: [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll
  2015-09-28  9:50   ` Stefan Hajnoczi
@ 2015-09-28 10:14     ` Paolo Bonzini
  0 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-09-28 10:14 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: kwolf, famz, qemu-devel, qemu-block



On 28/09/2015 11:50, Stefan Hajnoczi wrote:
> On Thu, Aug 06, 2015 at 03:35:59PM +0200, Paolo Bonzini wrote:
>> This is the first step towards having fine-grained critical sections in
>> dataplane threads, which resolves lock ordering problems between
>> address_space_* functions (which need the BQL when doing MMIO, even
>> after we complete RCU-based dispatch) and the AioContext.
>>
>> Because AioContext does not use contention callbacks anymore, the
>> unit test has to be changed.
>>
>> Previously applied as a0710f7995f914e3044e5899bd8ff6c43c62f916 and
>> then reverted.
> 
> commit da5e1de95bb235330d7724316e7a29239d1359d5
> Author: Stefan Hajnoczi <stefanha@redhat.com>
> Date:   Wed Jun 3 10:15:33 2015 +0100
> 
>     Revert "iothread: release iothread around aio_poll"
>     
>     This reverts commit a0710f7995f914e3044e5899bd8ff6c43c62f916.
>     
>     In qemu-devel email message <556DBF87.2020908@de.ibm.com>, Christian
>     Borntraeger writes:
>     
>       Having many guests all with a kernel/ramdisk (via -kernel) and
>       several null block devices will result in hangs. All hanging
>       guests are in partition detection code waiting for an I/O to return
>       so very early maybe even the first I/O.
>     
>       Reverting that commit "fixes" the hangs.
>     
>     Reverting this commit for the 2.4 release.  More time is needed to
>     investigate and correct this patch.
> 
> Did we ever find the root cause for hangs caused by this patch?

It was fixed by commit 53ec73e ("block: Use bdrv_drain to replace
uncessary bdrv_drain_all", 2015-05-29)'s change to bdrv_set_aio_context.
 We never investigated the root cause, but I'd guess it's gone after the
2.4-rc bugfixes to AioContext.

Paolo

* Re: [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt
  2015-08-06 13:36 ` [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt Paolo Bonzini
  2015-09-09  8:49   ` Fam Zheng
@ 2015-09-28 10:15   ` Stefan Hajnoczi
  2015-09-28 10:17     ` Paolo Bonzini
  1 sibling, 1 reply; 29+ messages in thread
From: Stefan Hajnoczi @ 2015-09-28 10:15 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, famz, qemu-devel, qemu-block

On Thu, Aug 06, 2015 at 03:36:01PM +0200, Paolo Bonzini wrote:
> +int qemu_lockcnt_count(QemuLockCnt *lockcnt);

Why use int here when the counter field is unsigned?

* Re: [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt
  2015-09-28 10:15   ` Stefan Hajnoczi
@ 2015-09-28 10:17     ` Paolo Bonzini
  0 siblings, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2015-09-28 10:17 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: kwolf, famz, qemu-devel, qemu-block



On 28/09/2015 12:15, Stefan Hajnoczi wrote:
> On Thu, Aug 06, 2015 at 03:36:01PM +0200, Paolo Bonzini wrote:
>> > +int qemu_lockcnt_count(QemuLockCnt *lockcnt);
> Why use int here when the counter field is unsigned?

Nice catch, will fix.

Paolo

Thread overview: 29+ messages
2015-08-06 13:35 [Qemu-devel] [RFC PATCH 00/18] Fine-grained AioContext critical sections Paolo Bonzini
2015-08-06 13:35 ` [Qemu-devel] [PATCH 01/18] iothread: release iothread around aio_poll Paolo Bonzini
2015-09-09  6:06   ` Fam Zheng
2015-09-09  7:31     ` Paolo Bonzini
2015-09-28  9:50   ` Stefan Hajnoczi
2015-09-28 10:14     ` Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 02/18] aio: rename bh_lock to list_lock Paolo Bonzini
2015-09-09  6:08   ` Fam Zheng
2015-09-28 10:09   ` Stefan Hajnoczi
2015-08-06 13:36 ` [Qemu-devel] [PATCH 03/18] qemu-thread: introduce QemuLockCnt Paolo Bonzini
2015-09-09  8:49   ` Fam Zheng
2015-09-09  9:14     ` Paolo Bonzini
2015-09-28 10:15   ` Stefan Hajnoczi
2015-09-28 10:17     ` Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 04/18] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 05/18] aio: tweak walking in dispatch phase Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 06/18] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 07/18] aio-win32: " Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 08/18] aio: document locking Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 09/18] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 10/18] async: optimize aio_bh_poll Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 11/18] qemu-timer: optimize timerlist_run_timers Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 12/18] block: explicitly acquire aiocontext in callbacks that need it Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 13/18] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 14/18] block: explicitly acquire aiocontext in timers " Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 15/18] quorum: use atomics for rewrite_count Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 16/18] quorum: split quorum_fifo_aio_cb from quorum_aio_cb Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 17/18] block: explicitly acquire aiocontext in aio callbacks that need it Paolo Bonzini
2015-08-06 13:36 ` [Qemu-devel] [PATCH 18/18] aio: update locking documentation Paolo Bonzini
