qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown
@ 2016-02-08 16:14 Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 01/16] aio: introduce aio_context_in_iothread Paolo Bonzini
                   ` (15 more replies)
  0 siblings, 16 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This is the infrastructure part of the aio_context_acquire/release pushdown,
which in turn is the first step towards a real multiqueue block layer in
QEMU.  The next step is to touch all the drivers and move calls to the
aio_context_acquire/release functions from aio-*.c to the drivers.  This
will be done in a separate patch series, which I plan to post before soft
freeze.

Although many lines are added, more than half of them are in documentation
and formal models of the code, as well as in the implementation of the new
"lockcnt" synchronization primitive.  The code is also very heavily commented.

The first four patches are new, as the issue they fix was found after posting
the previous patch.  Everything else is more or less the same as before.

Paolo

v1->v2: Update documentation [Stefan]
        Remove g_usleep from testcase [Stefan]

Paolo Bonzini (16):
  aio: introduce aio_context_in_iothread
  aio: do not really acquire/release the main AIO context
  aio: introduce aio_poll_internal
  aio: only call aio_poll_internal from iothread
  iothread: release AioContext around aio_poll
  qemu-thread: introduce QemuRecMutex
  aio: convert from RFifoLock to QemuRecMutex
  aio: rename bh_lock to list_lock
  qemu-thread: introduce QemuLockCnt
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  qemu-thread: optimize QemuLockCnt with futexes on Linux
  aio: tweak walking in dispatch phase
  aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  aio: document locking
  aio: push aio_context_acquire/release down to dispatching

 aio-posix.c                     |  86 +++++----
 aio-win32.c                     | 106 ++++++-----
 async.c                         | 279 +++++++++++++++++++++++----
 block/io.c                      |  14 +-
 docs/aio_poll_drain.promela     | 210 +++++++++++++++++++++
 docs/aio_poll_drain_bug.promela | 158 ++++++++++++++++
 docs/aio_poll_sync_io.promela   |  88 +++++++++
 docs/lockcnt.txt                | 342 ++++++++++++++++++++++++++++++++++
 docs/multiple-iothreads.txt     |  63 ++++---
 include/block/aio.h             |  69 ++++---
 include/qemu/futex.h            |  36 ++++
 include/qemu/rfifolock.h        |  54 ------
 include/qemu/thread-posix.h     |   6 +
 include/qemu/thread-win32.h     |  10 +
 include/qemu/thread.h           |  23 +++
 iothread.c                      |  20 +-
 stubs/iothread-lock.c           |   5 +
 tests/.gitignore                |   1 -
 tests/Makefile                  |   2 -
 tests/test-aio.c                |  22 ++-
 tests/test-rfifolock.c          |  91 ---------
 trace-events                    |  10 +
 util/Makefile.objs              |   2 +-
 util/lockcnt.c                  | 404 ++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c        |  38 ++--
 util/qemu-thread-win32.c        |  25 +++
 util/rfifolock.c                |  78 --------
 27 files changed, 1792 insertions(+), 450 deletions(-)
 create mode 100644 docs/aio_poll_drain.promela
 create mode 100644 docs/aio_poll_drain_bug.promela
 create mode 100644 docs/aio_poll_sync_io.promela
 create mode 100644 docs/lockcnt.txt
 create mode 100644 include/qemu/futex.h
 delete mode 100644 include/qemu/rfifolock.h
 delete mode 100644 tests/test-rfifolock.c
 create mode 100644 util/lockcnt.c
 delete mode 100644 util/rfifolock.c

-- 
2.5.0


* [Qemu-devel] [PATCH 01/16] aio: introduce aio_context_in_iothread
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 02/16] aio: do not really acquire/release the main AIO context Paolo Bonzini
                   ` (14 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This will be used by the synchronous I/O helpers, to choose between
aio_poll or QemuEvent.
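
As a rough illustration of the check described above, here is a minimal
standalone sketch.  It is not the QEMU code: the struct definitions,
get_main_context() and enter_iothread() are hypothetical stand-ins, and
__thread is the GCC/Clang thread-local extension that the patch itself uses.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for QEMU's AioContext and IOThread. */
typedef struct AioContext { int id; } AioContext;
typedef struct IOThread { AioContext *ctx; } IOThread;

/* Stand-in for the context returned by qemu_get_aio_context(). */
static AioContext main_ctx = { 0 };

/* Set once at the start of an I/O thread; stays NULL in every other thread. */
static __thread IOThread *my_iothread;

static AioContext *get_main_context(void)
{
    return &main_ctx;
}

/* Record that the calling thread is the I/O thread described by t. */
static void enter_iothread(IOThread *t)
{
    assert(t != NULL && t->ctx != NULL);
    my_iothread = t;
}

/* True if the calling thread is the one that runs ctx's event loop.
 * Threads that never called enter_iothread() are treated as belonging
 * to the main context. */
static bool context_in_iothread(AioContext *ctx)
{
    AioContext *mine = my_iothread ? my_iothread->ctx : get_main_context();
    return ctx == mine;
}
```

A synchronous helper could branch on context_in_iothread(ctx) to decide
whether it may poll directly or has to wait for the owning thread instead.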

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/block/aio.h | 7 +++++++
 iothread.c          | 9 +++++++++
 2 files changed, 16 insertions(+)

diff --git a/include/block/aio.h b/include/block/aio.h
index e086e3b..9434665 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -435,6 +435,13 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
 }
 
 /**
+ * @ctx: the aio context
+ *
+ * Return whether we are running in the I/O thread that manages @ctx.
+ */
+bool aio_context_in_iothread(AioContext *ctx);
+
+/**
  * aio_context_setup:
  * @ctx: the aio context
  *
diff --git a/iothread.c b/iothread.c
index f183d38..8d40bb0 100644
--- a/iothread.c
+++ b/iothread.c
@@ -20,6 +20,7 @@
 #include "qmp-commands.h"
 #include "qemu/error-report.h"
 #include "qemu/rcu.h"
+#include "qemu/main-loop.h"
 
 typedef ObjectClass IOThreadClass;
 
@@ -28,6 +29,13 @@ typedef ObjectClass IOThreadClass;
 #define IOTHREAD_CLASS(klass) \
    OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
 
+static __thread IOThread *my_iothread;
+
+bool aio_context_in_iothread(AioContext *ctx)
+{
+    return ctx == (my_iothread ? my_iothread->ctx : qemu_get_aio_context());
+}
+
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
@@ -35,6 +43,7 @@ static void *iothread_run(void *opaque)
 
     rcu_register_thread();
 
+    my_iothread = iothread;
     qemu_mutex_lock(&iothread->init_done_lock);
     iothread->thread_id = qemu_get_thread_id();
     qemu_cond_signal(&iothread->init_done_cond);
-- 
2.5.0


* [Qemu-devel] [PATCH 02/16] aio: do not really acquire/release the main AIO context
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 01/16] aio: introduce aio_context_in_iothread Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 03/16] aio: introduce aio_poll_internal Paolo Bonzini
                   ` (13 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

The main AIO context is used in many places that are not aware
of AioContexts at all.  bdrv_drain will soon do a release/acquire
itself, which for the main AIO context would break because code
calls bdrv_drain on it without acquiring anything.

Very soon, bdrv will be ready for removal of aio_context_acquire
from non-block-layer code.  The idea is that the AioContext will be
acquired by bdrv_*, and no one will care what's running in the
main I/O thread or in the dataplane thread.  Even if there are two
concurrent instances of the I/O thread, locks protect the data
structures; this evolves naturally to the multiqueue case where
there are multiple I/O threads touching the same BlockDriverState.

When this happens, aio_context_acquire/release can go away, replaced
by fine-grained locks, and this hack will also go away with it.
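
The shape of the hack can be sketched in isolation as follows.  This is a
toy model under stated assumptions: the per-context lock is reduced to a
nesting counter, and Ctx, big_lock_held, ctx_acquire and ctx_release are
hypothetical names, not QEMU APIs.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy context: the real lock is replaced by a nesting counter. */
typedef struct Ctx {
    bool is_main;    /* true for the main-loop context */
    int lock_depth;  /* how many times the per-context lock is held */
} Ctx;

/* Hypothetical stand-in for "the big QEMU lock is held by this thread". */
static bool big_lock_held;

static void ctx_acquire(Ctx *ctx)
{
    if (ctx->is_main) {
        /* The main context is protected by the big lock; only check it. */
        assert(big_lock_held);
    } else {
        ctx->lock_depth++;
    }
}

static void ctx_release(Ctx *ctx)
{
    if (ctx->is_main) {
        assert(big_lock_held);
    } else {
        assert(ctx->lock_depth > 0);
        ctx->lock_depth--;
    }
}
```

In this model, acquiring the main context never touches a per-context lock,
so code that drains it without an explicit acquire has nothing to unbalance.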

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/async.c b/async.c
index d4dd2cc..d083564 100644
--- a/async.c
+++ b/async.c
@@ -369,10 +369,18 @@ void aio_context_unref(AioContext *ctx)
 
 void aio_context_acquire(AioContext *ctx)
 {
-    rfifolock_lock(&ctx->lock);
+    if (ctx == qemu_get_aio_context()) {
+        assert(qemu_mutex_iothread_locked());
+    } else {
+        rfifolock_lock(&ctx->lock);
+    }
 }
 
 void aio_context_release(AioContext *ctx)
 {
-    rfifolock_unlock(&ctx->lock);
+    if (ctx == qemu_get_aio_context()) {
+        assert(qemu_mutex_iothread_locked());
+    } else {
+        rfifolock_unlock(&ctx->lock);
+    }
 }
-- 
2.5.0


* [Qemu-devel] [PATCH 03/16] aio: introduce aio_poll_internal
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 01/16] aio: introduce aio_context_in_iothread Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 02/16] aio: do not really acquire/release the main AIO context Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread Paolo Bonzini
                   ` (12 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

Move the implementation of aio_poll to aio_poll_internal, and introduce
a wrapper for public use.  For now it just asserts that aio_poll is
being used correctly---either from the thread that manages the context,
or with the QEMU global mutex held.

The next patch, however, will completely separate the two cases.
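
The wrapper pattern is small enough to show standalone.  The sketch below
only illustrates "assert the calling convention, then delegate"; PollCtx,
global_mutex_held, poll_internal and poll_public are hypothetical names, and
the internal function just counts calls instead of polling anything.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy context: remembers whether the caller owns it and how often it ran. */
typedef struct PollCtx {
    bool owned_by_caller;  /* stand-in for "caller is the managing thread" */
    int polls;             /* number of internal poll iterations */
} PollCtx;

/* Hypothetical stand-in for "the global mutex is held". */
static bool global_mutex_held;

/* Internal worker: here it only counts calls and reports no progress. */
static bool poll_internal(PollCtx *ctx, bool blocking)
{
    (void)blocking;
    ctx->polls++;
    return false;
}

/* Public entry point: check the calling convention, then delegate. */
static bool poll_public(PollCtx *ctx, bool blocking)
{
    assert(global_mutex_held || ctx->owned_by_caller);
    return poll_internal(ctx, blocking);
}
```

Keeping the check in one public wrapper means the two legal callers can later
be given different implementations without touching any call site.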

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c         | 2 +-
 aio-win32.c         | 2 +-
 async.c             | 8 ++++++++
 include/block/aio.h | 6 ++++++
 4 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index fa7f8ab..4dc075c 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -401,7 +401,7 @@ static void add_pollfd(AioHandler *node)
     npfd++;
 }
 
-bool aio_poll(AioContext *ctx, bool blocking)
+bool aio_poll_internal(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
     int i, ret;
diff --git a/aio-win32.c b/aio-win32.c
index 6aaa32a..86ad822 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -281,7 +281,7 @@ bool aio_dispatch(AioContext *ctx)
     return progress;
 }
 
-bool aio_poll(AioContext *ctx, bool blocking)
+bool aio_poll_internal(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
     HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
diff --git a/async.c b/async.c
index d083564..01c4891 100644
--- a/async.c
+++ b/async.c
@@ -300,6 +300,14 @@ void aio_notify_accept(AioContext *ctx)
     }
 }
 
+bool aio_poll(AioContext *ctx, bool blocking)
+{
+    assert(qemu_mutex_iothread_locked() ||
+           aio_context_in_iothread(ctx));
+
+    return aio_poll_internal(ctx, blocking);
+}
+
 static void aio_timerlist_notify(void *opaque)
 {
     aio_notify(opaque);
diff --git a/include/block/aio.h b/include/block/aio.h
index 9434665..986be97 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -287,6 +287,12 @@ bool aio_pending(AioContext *ctx);
  */
 bool aio_dispatch(AioContext *ctx);
 
+/* Same as aio_poll, but only meant for use in the I/O thread.
+ *
+ * This is used internally in the implementation of aio_poll.
+ */
+bool aio_poll_internal(AioContext *ctx, bool blocking);
+
 /* Progress in completing AIO work to occur.  This can issue new pending
  * aio as a result of executing I/O completion or bh callbacks.
  *
-- 
2.5.0


* [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (2 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 03/16] aio: introduce aio_poll_internal Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 22:22   ` Eric Blake
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 05/16] iothread: release AioContext around aio_poll Paolo Bonzini
                   ` (11 subsequent siblings)
  15 siblings, 1 reply; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

aio_poll is not thread safe; it can report progress incorrectly when
called from the main thread.  The bug remains latent as long as
every call is made within aio_context_acquire/aio_context_release,
but this will change soon.

The details of the bug are pretty simple, but fixing it in an
efficient way is thorny.  There are plenty of comments and formal
models in the patch, so I refer the reader to those.
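
To give a feel for one of the races, here is a deliberately simplified,
single-threaded model of the event/progress handshake.  It is only a sketch:
SyncModel and all function names are hypothetical, the two fields merely
mimic ctx->sync_io_event and ctx->progress, and it shows just the
stale-event problem.  As the comments in the patch explain, resetting the
event first is not sufficient by itself; the real fix also runs two
iterations.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model of the state shared by the main thread and the I/O thread. */
typedef struct SyncModel {
    bool event;     /* mimics ctx->sync_io_event */
    bool progress;  /* mimics ctx->progress */
} SyncModel;

/* I/O thread: publish the outcome of one poll iteration. */
static void io_iteration_done(SyncModel *m, bool made_progress)
{
    if (made_progress) {
        m->progress = true;
    }
    m->event = true;
}

/* Main thread: one wait/reset/exchange step.  Returns false where the
 * real code would block waiting for the event; otherwise stores the
 * progress flag in *result and clears it. */
static bool main_wait_iteration(SyncModel *m, bool *result)
{
    assert(result != NULL);
    if (!m->event) {
        return false;
    }
    m->event = false;
    *result = m->progress;
    m->progress = false;
    return true;
}

/* Naive non-blocking poll: consumes whatever event is already set. */
static bool naive_nonblocking(SyncModel *m, bool *result)
{
    return main_wait_iteration(m, result);
}

/* Variant that discards leftovers of earlier iterations first. */
static bool reset_then_wait(SyncModel *m, bool *result)
{
    m->event = false;
    return main_wait_iteration(m, result);
}
```

With a stale event left over from an earlier iteration, naive_nonblocking
returns at once and reports no progress even though the I/O thread has not
run since; reset_then_wait instead has to wait for a fresh iteration.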

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                         | 218 +++++++++++++++++++++++++++++++++++++++-
 block/io.c                      |  14 ++-
 docs/aio_poll_drain.promela     | 210 ++++++++++++++++++++++++++++++++++++++
 docs/aio_poll_drain_bug.promela | 158 +++++++++++++++++++++++++++++
 docs/aio_poll_sync_io.promela   |  88 ++++++++++++++++
 docs/multiple-iothreads.txt     |  20 ++--
 include/block/aio.h             |  12 ++-
 stubs/iothread-lock.c           |   5 +
 8 files changed, 703 insertions(+), 22 deletions(-)
 create mode 100644 docs/aio_poll_drain.promela
 create mode 100644 docs/aio_poll_drain_bug.promela
 create mode 100644 docs/aio_poll_sync_io.promela

diff --git a/async.c b/async.c
index 01c4891..c93a27b 100644
--- a/async.c
+++ b/async.c
@@ -300,12 +300,224 @@ void aio_notify_accept(AioContext *ctx)
     }
 }
 
+/* aio_poll_internal is not thread-safe; it only reports progress
+ * correctly when called from one thread, because it has no
+ * history of what happened in different threads.  When called
+ * from two threads, there is a race:
+ *
+ *      main thread                       I/O thread
+ *      -----------------------           --------------------------
+ *      blk_drain
+ *        bdrv_requests_pending -> true
+ *                                        aio_poll_internal
+ *                                          process last request
+ *        aio_poll_internal
+ *
+ * Now aio_poll_internal will never exit, because there is no pending
+ * I/O on the AioContext.
+ *
+ * Therefore, aio_poll is a wrapper around aio_poll_internal that allows
+ * usage from _two_ threads: the I/O thread of course, and the main thread.
+ * When called from the main thread, aio_poll just asks the I/O thread
+ * for a nudge as soon as the next call to aio_poll is complete.
+ * Because we use QemuEvent, and QemuEvent supports a single consumer
+ * only, this only works when the calling thread holds the big QEMU lock.
+ *
+ * Because aio_poll is generally used in a loop, it's okay to have
+ * spurious wakeups.  Therefore, the I/O thread calls qemu_event_set
+ * very liberally (it helps that qemu_event_set is cheap on an
+ * already-set event).
+ * Similarly it is okay to return true when no progress was made
+ * (as long as this doesn't happen forever, or you get livelock).
+ *
+ * The important thing is that you need to report progress from
+ * aio_poll(ctx, false) correctly.  This is complicated and the
+ * implementation builds on event_notifier_set plus
+ * aio_poll(ctx, true).
+ *
+ * The implementation consists of the following functions:
+ * - aio_poll_and_wake: runs in the iothread and takes care of
+ *   waking up the main thread after aio_poll_internal returns
+ *
+ * - aio_wait_iteration: implementation of aio_poll(ctx, true)
+ *   for the main thread
+ *
+ * - aio_force_iteration: implementation of aio_poll(ctx, false)
+ *   for the main thread
+ */
+static bool aio_poll_and_wake(AioContext *ctx, bool blocking)
+{
+    bool progress = aio_poll_internal(ctx, blocking);
+    smp_wmb();
+    if (progress) {
+        ctx->progress = true;
+    }
+
+    qemu_event_set(&ctx->sync_io_event);
+    return progress;
+}
+
+static bool aio_wait_iteration(AioContext *ctx)
+{
+    /* Wait until at least one iteration has passed since the main thread
+     * last called aio_poll.
+     */
+    qemu_event_wait(&ctx->sync_io_event);
+
+    /* Remember how aio_poll is used---in a loop, until a guard condition
+     * becomes true.   By resetting the event here, we ensure that the guard
+     * condition will be checked before the next call to qemu_event_wait.
+     * The above race is resolved as follows.
+     *
+     *      main thread                       I/O thread
+     *      -----------------------           --------------------------
+     *      blk_drain
+     *        bdrv_requests_pending -> true
+     *                                        aio_poll_internal
+     *                                          process last request
+     *                                        qemu_event_set
+     *        qemu_event_wait
+     */
+    qemu_event_reset(&ctx->sync_io_event);
+
+    return atomic_xchg(&ctx->progress, false);
+}
+
+/* The first idea is to simply implement non-blocking aio_poll as
+ *
+ *     // cannot use aio_notify; because of the notify_me optimization,
+ *     // aio_notify might do nothing and then poll will block
+ *     event_notifier_set(&ctx->notifier);
+ *     progress = aio_wait_iteration(ctx);
+ *
+ * which doesn't work because of a relatively simple race.  Remember
+ * that spurious wakeups of aio_poll are common.  What can happen then is:
+ *
+ *     main thread                             I/O thread
+ *     ---------------------------             -------------------------
+ *                                             qemu_event_set();
+ *                                             ...
+ *                                             qemu_bh_schedule()
+ *     aio_poll(ctx, true)
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *                                             ctx->progress = true;
+ *         progress=xchg(ctx->progress, false)
+ *                                             qemu_event_set();
+ *     ...
+ *     aio_poll(ctx, false)
+ *       event_notifier_set()
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *       <<returns false>>
+ *
+ * aio_poll's contract ensures that *some* progress is made if possible;
+ * hence the bottom half should be executed before aio_poll(ctx, false)
+ * returns, and aio_poll should return true.  The failure happened because
+ * aio_poll_internal has not run at all since the last qemu_event_wait;
+ * the execution threads interleaved so that aio_poll(ctx, false)
+ * immediately returns false.
+ *
+ *
+ * The next idea then is to add a reset of the event in the non-blocking
+ * aio_poll.  This has the same problem, just a little harder to trigger:
+ *
+ *     main thread                             I/O thread
+ *     ---------------------------             -----------------------------
+ *                                             qemu_event_set();
+ *                                             qemu_bh_schedule()
+ *     aio_poll(ctx, true)
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *                                             ctx->progress = true;
+ *         progress=xchg(ctx->progress, false)
+ *     ...
+ *     aio_poll(ctx, false)
+ *       qemu_event_reset();
+ *                                             qemu_event_set();
+ *       event_notifier_set()
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *         progress=xchg(ctx->progress, false)
+ *       <<returns false>>
+ *
+ *
+ * Let's change approach and try running aio_wait_iteration *twice*.
+ * This doesn't work either, but for a different reason.  If the main
+ * thread is "slow", an entire execution of the loop happens in the I/O
+ * thread, and a preceding aio_poll(ctx, true) can leave ctx->progress
+ * equal to false.  But unlike the previous example, some progress was
+ * made *after* qemu_event_wait returned:
+ *
+ *     main thread                             I/O thread
+ *     ---------------------------             -----------------------------
+ *                                             qemu_event_set();
+ *                                             ...
+ *     aio_poll(ctx, true)
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *                                             ctx->progress = true;
+ *                                             qemu_event_set();
+ *                                             ...
+ *                                             qemu_bh_schedule()
+ *         progress=xchg(ctx->progress, false)
+ *     aio_poll(ctx, false)
+ *       event_notifier_set()
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *         progress=xchg(ctx->progress, false)
+ *                                             qemu_event_set();
+ *       event_notifier_set()
+ *       aio_wait_iteration
+ *         qemu_event_wait();
+ *         qemu_event_reset();
+ *         progress|=xchg(ctx->progress, false)
+ *       <<returns false>>
+ *
+ * However, applying both workarounds solves the problems.  The initial
+ * qemu_event_reset removes leftovers of previously completed iterations
+ * (such as in the last example).  Then, the double iteration ensures
+ * at least one execution of the dispatch phase to happen.  The dispatch
+ * phase will then execute pending bottom halves and return progress
+ * correctly.
+ */
+static bool aio_force_iteration(AioContext *ctx)
+{
+    bool progress;
+    qemu_event_reset(&ctx->sync_io_event);
+
+    event_notifier_set(&ctx->notifier);
+    progress = aio_wait_iteration(ctx);
+
+    event_notifier_set(&ctx->notifier);
+    progress |= aio_wait_iteration(ctx);
+
+    return progress;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
-    assert(qemu_mutex_iothread_locked() ||
-           aio_context_in_iothread(ctx));
+    bool progress;
+
+    if (aio_context_in_iothread(ctx)) {
+        return aio_poll_and_wake(ctx, blocking);
+    }
 
-    return aio_poll_internal(ctx, blocking);
+    assert(qemu_mutex_iothread_locked());
+    aio_context_release(ctx);
+    if (blocking) {
+        progress = aio_wait_iteration(ctx);
+    } else {
+        progress = aio_force_iteration(ctx);
+    }
+    aio_context_acquire(ctx);
+    return progress;
 }
 
 static void aio_timerlist_notify(void *opaque)
diff --git a/block/io.c b/block/io.c
index 343ff1f..8555594 100644
--- a/block/io.c
+++ b/block/io.c
@@ -270,15 +270,19 @@ static void bdrv_drain_recurse(BlockDriverState *bs)
  */
 void bdrv_drain(BlockDriverState *bs)
 {
-    bool busy = true;
+    bool busy;
 
     bdrv_drain_recurse(bs);
-    while (busy) {
+    do {
         /* Keep iterating */
          bdrv_flush_io_queue(bs);
-         busy = bdrv_requests_pending(bs);
-         busy |= aio_poll(bdrv_get_aio_context(bs), busy);
-    }
+         if (bdrv_requests_pending(bs)) {
+             aio_poll(bdrv_get_aio_context(bs), true);
+             busy = true;
+         } else {
+             busy = aio_poll(bdrv_get_aio_context(bs), false);
+         }
+    } while (busy);
 }
 
 /*
diff --git a/docs/aio_poll_drain.promela b/docs/aio_poll_drain.promela
new file mode 100644
index 0000000..302b333
--- /dev/null
+++ b/docs/aio_poll_drain.promela
@@ -0,0 +1,210 @@
+/*
+ * Demonstrate the behavior of blk_drain() when called from the
+ * iothread.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This file is in the public domain.  If you really want a license,
+ * the WTFPL will do.
+ *
+ * To simulate it:
+ *     spin -p docs/aio_poll_drain.promela
+ *
+ * To verify it:
+ *     spin -a docs/aio_poll_drain.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ *
+ * To verify it (with a bug planted):
+ *     spin -DBUG1 -a docs/aio_poll_drain.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ *
+ * (try BUG2/BUG3/BUG4 too)
+ */
+
+#define REQS         3
+#define REQS_INITIAL 1
+
+int req_submitted = 0;
+int req_completed = 0;
+int req_pending   = 0;
+
+
+/* This is a mutex.  */
+int mutex = 0;
+#define LOCK(mutex)   atomic { mutex == 0 -> mutex = 1; }
+#define UNLOCK(mutex) mutex = 0
+
+/* A QEMUBH (e.g. from thread-pool.c) */
+bool io_bh_scheduled;
+bool spawn_bh_scheduled;
+
+/* The AioContext's EventNotifier */
+bool aio_notify;
+
+/* The AioContext's QemuEvent.  Typically the first call to aio_poll
+ * produces a spurious wakeup, which is modeled by these initializers.
+ */
+bool progress = 1;
+bool sync_io_event = 1;
+
+proctype io_worker(int req_id)
+{
+    LOCK(mutex);
+        req_completed++;
+    UNLOCK(mutex);
+
+    io_bh_scheduled = 1;
+    aio_notify = 1;
+}
+
+#define SUBMIT_ONE_REQ {                         \
+       run io_worker(req_submitted);             \
+       req_submitted++;                          \
+       req_pending++;                            \
+    }
+
+active proctype iothread()
+{
+    bool old;
+    bool aio_poll_progress;
+
+    /* The iothread keeps calling aio_poll(..., true).  At the end
+     * of the simulation it is stuck here waiting for a request,
+     * so mark this as a valid end state.
+     */
+end:
+    do
+        :: aio_notify -> {
+            /* aio_notify_accept */
+            aio_notify = 0;
+
+            aio_poll_progress = 0;
+
+            /* Execute the "bottom half" which can spawn more requests */
+            atomic { old = spawn_bh_scheduled; spawn_bh_scheduled = 0; }
+            if
+                :: old -> {
+                    LOCK(mutex);
+                        if
+                           :: req_submitted < REQS -> SUBMIT_ONE_REQ
+                           :: else                 -> skip;
+                        fi;
+                    UNLOCK(mutex);
+                    aio_poll_progress = 1;
+                }
+                :: else -> skip;
+            fi;
+
+            /* Execute the bottom half which spawns the other bh */
+            atomic { old = io_bh_scheduled; io_bh_scheduled = 0; }
+            if
+                :: old -> {
+                    LOCK(mutex);
+                        req_pending = req_pending - req_completed;
+                        req_completed = 0;
+                        assert(req_pending >= 0);
+                    UNLOCK(mutex);
+                    aio_poll_progress = 1;
+                    spawn_bh_scheduled = 1;
+                    aio_notify = 1;
+                }
+                :: else -> skip;
+            fi;
+
+            /* The above is aio_poll_internal, this is aio_poll */
+            if
+                :: aio_poll_progress -> progress = 1;
+                :: else -> skip;
+            fi;
+            sync_io_event = 1;
+        }
+    od;
+}
+
+#define AIO_POLL_ITERATION             \
+    if                                 \
+        :: sync_io_event -> skip;      \
+    fi;                                \
+    sync_io_event = 0;                 \
+    atomic {                           \
+        aio_poll_result = aio_poll_result | progress;    \
+        progress = 0;                  \
+    }
+    
+#define AIO_POLL                       \
+    aio_poll_result = 0;               \
+    AIO_POLL_ITERATION
+
+#if defined BUG1
+/* Skip the nonblocking aio_poll altogether */
+#define AIO_POLL_NONBLOCKING aio_poll_result = 0
+
+#elif defined BUG2
+/* Simple but wrong implementation */
+#define AIO_POLL_NONBLOCKING           \
+    aio_poll_result = 0;               \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION
+
+#elif defined BUG3
+/* Workaround 1 is not enough */
+#define AIO_POLL_NONBLOCKING           \
+    sync_io_event = 0;                 \
+    aio_poll_result = 0;               \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION
+
+#elif defined BUG4
+/* Workaround 2 is not enough */
+#define AIO_POLL_NONBLOCKING           \
+    aio_poll_result = 0;               \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION                 \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION
+
+#else
+/* Combining them works! */
+#define AIO_POLL_NONBLOCKING           \
+    sync_io_event = 0;                 \
+    aio_poll_result = 0;               \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION                 \
+    aio_notify = 1;                    \
+    AIO_POLL_ITERATION
+#endif
+
+active proctype mainthread()
+{
+    bool aio_poll_result;
+    bool busy = true;
+
+    LOCK(mutex);
+        /* Start with a few requests */
+        do
+           :: req_submitted < REQS_INITIAL -> SUBMIT_ONE_REQ
+           :: else                         -> break;
+        od;
+    UNLOCK(mutex);
+
+    do
+        :: !busy -> break;
+
+        :: else -> {
+            /* This is blk_drain */
+            LOCK(mutex);
+                busy = req_pending > 0;
+            UNLOCK(mutex);
+            if
+               :: busy -> AIO_POLL;
+               :: else -> AIO_POLL_NONBLOCKING; busy = aio_poll_result;
+            fi;
+        }
+    od;
+
+    LOCK(mutex);
+        assert(!req_pending && req_submitted == REQS);
+    UNLOCK(mutex);
+}
diff --git a/docs/aio_poll_drain_bug.promela b/docs/aio_poll_drain_bug.promela
new file mode 100644
index 0000000..26f351d
--- /dev/null
+++ b/docs/aio_poll_drain_bug.promela
@@ -0,0 +1,158 @@
+/*
+ * Demonstrate the behavior of blk_drain() when called from the
+ * iothread.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This file is in the public domain.  If you really want a license,
+ * the WTFPL will do.
+ *
+ * To simulate it:
+ *     spin -p docs/aio_poll_drain_bug.promela
+ *
+ * To verify it:
+ *     spin -a docs/aio_poll_drain_bug.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ *
+ * Note that the model verifies successfully, but it is actually buggy.
+ * The reason is that the bottom half structure in the iothread is
+ * overly simplified.
+ *
+ * This explains why blk_drain() needs to use non-blocking aio_poll,
+ * for whose modeling see aio_poll_drain.promela.
+ */
+
+#define REQS         3
+#define REQS_INITIAL 1
+
+int req_submitted = 0;
+int req_completed = 0;
+int req_pending   = 0;
+
+
+/* This is a mutex.  */
+int mutex = 0;
+#define LOCK(mutex)   atomic { mutex == 0 -> mutex = 1; }
+#define UNLOCK(mutex) mutex = 0
+
+/* A QEMUBH (e.g. from thread-pool.c) */
+bool io_bh_scheduled;
+
+/* The AioContext's EventNotifier */
+bool aio_notify;
+
+/* The AioContext's QemuEvent.  Typically the first call to aio_poll
+ * produces a spurious wakeup.  In aio_poll_drain.promela, the
+ * initializers are set to 1 to model this.  This model is buggy
+ * so we skip this additional torture...
+ */
+bool progress;
+bool sync_io_event;
+
+proctype io_worker(int req_id)
+{
+    LOCK(mutex);
+        req_completed++;
+    UNLOCK(mutex);
+
+    io_bh_scheduled = 1;
+    aio_notify = 1;
+}
+
+#define SUBMIT_ONE_REQ {                         \
+       run io_worker(req_submitted);             \
+       req_submitted++;                          \
+       req_pending++;                            \
+    }
+
+active proctype iothread()
+{
+    bool old;
+    bool aio_poll_progress;
+
+    /* The iothread keeps calling aio_poll(..., true).  At the end
+     * of the simulation it is stuck here waiting for a request,
+     * so mark this as a valid end state.
+     */
+end:
+    do
+        :: aio_notify -> {
+            /* aio_notify_accept */
+            aio_notify = 0;
+
+            aio_poll_progress = 0;
+
+            atomic { old = io_bh_scheduled; io_bh_scheduled = 0; }
+            if
+                :: old -> {
+                    LOCK(mutex);
+                        req_pending = req_pending - req_completed;
+                        req_completed = 0;
+                        assert(req_pending >= 0);
+
+                        if
+                           :: req_submitted < REQS -> SUBMIT_ONE_REQ
+                           :: else                 -> skip;
+                        fi;
+                    UNLOCK(mutex);
+                    aio_poll_progress = 1;
+                }
+                :: else -> skip;
+            fi;
+
+            /* The above is aio_poll_internal, this is aio_poll */
+            if
+                :: aio_poll_progress -> progress = 1;
+                :: else -> skip;
+            fi;
+            sync_io_event = 1;
+        }
+    od;
+}
+
+#define AIO_POLL_ITERATION             \
+    if                                 \
+        :: sync_io_event -> skip;      \
+    fi;                                \
+    sync_io_event = 0;                 \
+    atomic {                           \
+        aio_poll_result = aio_poll_result | progress;    \
+        progress = 0;                  \
+    }
+
+#define AIO_POLL                       \
+    aio_poll_result = 0;               \
+    AIO_POLL_ITERATION
+
+active proctype mainthread()
+{
+    bool aio_poll_result;
+    bool busy = true;
+
+    LOCK(mutex);
+        /* Start with a few requests */
+        do
+           :: req_submitted < REQS_INITIAL -> SUBMIT_ONE_REQ
+           :: else                         -> break;
+        od;
+    UNLOCK(mutex);
+
+    do
+        :: true -> {
+            /* This is blk_drain */
+            LOCK(mutex);
+                busy = req_pending > 0;
+            UNLOCK(mutex);
+            if
+               :: busy -> AIO_POLL;
+               :: else -> break;
+            fi;
+        }
+    od;
+
+    LOCK(mutex);
+        assert(!req_pending && req_submitted == REQS);
+    UNLOCK(mutex);
+}
+
diff --git a/docs/aio_poll_sync_io.promela b/docs/aio_poll_sync_io.promela
new file mode 100644
index 0000000..3851d8e
--- /dev/null
+++ b/docs/aio_poll_sync_io.promela
@@ -0,0 +1,88 @@
+/*
+ * Demonstrate the behavior of aio_poll(ctx, true) when called from the
+ * iothread.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This file is in the public domain.  If you really want a license,
+ * the WTFPL will do.
+ *
+ * To simulate it:
+ *     spin -p docs/aio_poll_sync_io.promela
+ *
+ * To verify it:
+ *     spin -a docs/aio_poll_sync_io.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ *
+ * To verify it (with a bug planted in the model):
+ *     spin -a -DBUG docs/aio_poll_sync_io.promela
+ *     gcc -O2 pan.c -DSAFETY
+ *     ./a.out
+ */
+
+/* A QEMUBH (e.g. from thread-pool.c) */
+bool io_bh_scheduled;
+
+/* The AioContext's EventNotifier */
+bool aio_notify;
+
+/* The "guard condition" for synchronous I/O */
+bool io_done;
+
+/* The AioContext's QemuEvent */
+bool sync_io_event;
+
+active proctype io_worker()
+{
+    /* qemu_bh_schedule */
+    io_bh_scheduled = 1;
+    aio_notify = 1;
+}
+
+active proctype iothread()
+{
+    bool old;
+
+    /* The iothread keeps calling aio_poll(..., true).  At the end
+     * of the simulation it is stuck here waiting for a request,
+     * so mark this as a valid end state.
+     */
+end:
+    do
+        :: aio_notify -> {
+            /* aio_notify_accept */
+            aio_notify = 0;
+
+            /* Execute the "bottom half" */
+
+            atomic { old = io_bh_scheduled; io_bh_scheduled = 0; }
+            if
+                :: old -> { io_done = 1; }
+                :: else -> skip;
+            fi;
+
+            /* The above is aio_poll_internal, this is aio_poll */
+#ifndef BUG
+            sync_io_event = 1;
+#endif
+        }
+    od;
+}
+
+active proctype mainthread()
+{
+    do
+        :: io_done -> break;
+
+        :: else -> {
+            /* qemu_event_wait */
+            if
+               :: sync_io_event -> skip;
+            fi;
+
+            /* qemu_event_reset */
+            sync_io_event = 0;
+        }
+    od;
+}
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 40b8419..ac3e74c 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -88,17 +88,15 @@ descriptors, event notifiers, timers, or BHs across threads:
 notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
 necessary.
 
-2. Other threads wishing to access the AioContext must use
-aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
-context is acquired no other thread can access it or run event loop iterations
-in this AioContext.
-
-aio_context_acquire()/aio_context_release() calls may be nested.  This
-means you can call them if you're not sure whether #1 applies.
-
-There is currently no lock ordering rule if a thread needs to acquire multiple
-AioContexts simultaneously.  Therefore, it is only safe for code holding the
-QEMU global mutex to acquire other AioContexts.
+2. Other threads wishing to access the AioContext must take the QEMU global
+mutex *as well as* call aio_context_acquire()/aio_context_release() for
+mutual exclusion.  The QEMU global mutex must be taken outside the call
+to aio_context_acquire().
+
+No lock ordering rule is necessary if a thread needs to acquire multiple
+AioContexts simultaneously.  Because the IOThreads won't ever acquire
+multiple AioContexts, it is always safe for the owner of the QEMU global
+mutex to acquire any number of them.
 
 Side note: the best way to schedule a function call across threads is to create
 a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
diff --git a/include/block/aio.h b/include/block/aio.h
index 986be97..191bd3e 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -129,6 +129,12 @@ struct AioContext {
     int epollfd;
     bool epoll_enabled;
     bool epoll_available;
+
+    /* Whether there has been progress on the AioContext since
+     * the last time this field was set to false.
+     */
+    bool progress;
+    QemuEvent sync_io_event;
 };
 
 /**
@@ -296,9 +302,9 @@ bool aio_poll_internal(AioContext *ctx, bool blocking);
 /* Progress in completing AIO work to occur.  This can issue new pending
  * aio as a result of executing I/O completion or bh callbacks.
  *
- * Return whether any progress was made by executing AIO or bottom half
- * handlers.  If @blocking == true, this should always be true except
- * if someone called aio_notify.
+ * Return whether any progress was made since the last call to aio_poll
+ * in the execution of AIO or bottom half handlers.  If @blocking ==
+ * true, this should always be true except if someone called aio_notify.
  *
  * If there are no pending bottom halves, but there are pending AIO
  * operations, it may not be possible to make any progress without
diff --git a/stubs/iothread-lock.c b/stubs/iothread-lock.c
index 9b6db2e..aa61c20 100644
--- a/stubs/iothread-lock.c
+++ b/stubs/iothread-lock.c
@@ -14,3 +14,8 @@ void qemu_mutex_lock_iothread(void)
 void qemu_mutex_unlock_iothread(void)
 {
 }
+
+bool aio_context_in_iothread(AioContext *ctx)
+{
+    return true;
+}
-- 
2.5.0
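
For readers without spin at hand, the handshake modeled in
docs/aio_poll_sync_io.promela can be replayed by hand.  The sketch below
is a single-threaded C transcription of one interleaving, not QEMU code.
The names Model, worker_step, iothread_step, mainthread_wait and replay
are hypothetical, and the "bug" argument stands in for -DBUG.

```c
#include <assert.h>
#include <stdbool.h>

/* State shared by the three processes of the Promela model. */
typedef struct {
    bool io_bh_scheduled;   /* a QEMUBH is pending */
    bool aio_notify;        /* the AioContext's EventNotifier */
    bool io_done;           /* guard condition for synchronous I/O */
    bool sync_io_event;     /* the AioContext's QemuEvent */
} Model;

/* io_worker: qemu_bh_schedule() */
static void worker_step(Model *m)
{
    m->io_bh_scheduled = true;
    m->aio_notify = true;
}

/* iothread: one aio_poll() iteration.  Returns false if it would block. */
static bool iothread_step(Model *m, bool bug)
{
    if (!m->aio_notify) {
        return false;
    }
    m->aio_notify = false;              /* aio_notify_accept */
    if (m->io_bh_scheduled) {           /* run the bottom half */
        m->io_bh_scheduled = false;
        m->io_done = true;
    }
    if (!bug) {
        m->sync_io_event = true;        /* wake up synchronous waiters */
    }
    return true;
}

/* mainthread: the "else" branch, entered after io_done was seen to be
 * false.  Returns false if qemu_event_wait would block.
 */
static bool mainthread_wait(Model *m)
{
    if (!m->sync_io_event) {
        return false;
    }
    m->sync_io_event = false;           /* qemu_event_reset */
    return true;
}

/* Replay the interleaving in which the main thread tests io_done
 * before the iothread has run.  Returns true if the main thread
 * terminates.
 */
bool replay(bool bug)
{
    Model m = { false, false, false, false };
    bool saw_done = m.io_done;          /* main thread checks the guard */

    worker_step(&m);
    iothread_step(&m, bug);
    if (!saw_done && !mainthread_wait(&m)) {
        return false;                   /* stuck in qemu_event_wait */
    }
    return m.io_done;                   /* next loop iteration breaks */
}
```

With bug == false the main thread is woken and then sees io_done; with
bug == true it stays blocked, which is the invalid end state that the
-DBUG verification run is meant to report.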

* [Qemu-devel] [PATCH 05/16] iothread: release AioContext around aio_poll
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (3 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 06/16] qemu-thread: introduce QemuRecMutex Paolo Bonzini
                   ` (10 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This is the first step towards having fine-grained critical sections in
dataplane threads, which resolves lock ordering problems between
address_space_* functions (which need the BQL when doing MMIO, even
after we complete RCU-based dispatch) and the AioContext.

Because AioContext does not use contention callbacks anymore, the
unit test has to be changed.

Previously applied as a0710f7995f914e3044e5899bd8ff6c43c62f916 and
then reverted.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---

v1->v2: Update function names in documentation [Stefan]
        Remove g_usleep from testcase [Stefan]
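
With notify_dummy_bh gone, the only special case left in aio_bh_poll is
that idle bottom halves do not count as progress.  A minimal model of
that rule follows; ModelBH and model_bh_poll are hypothetical names, and
a plain array and plain loads stand in for the real QEMUBH list and
atomic_xchg.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for QEMUBH. */
typedef struct {
    bool scheduled;
    bool idle;
    bool deleted;
    int  calls;     /* how many times the callback has run */
} ModelBH;

/* Run every scheduled, non-deleted bottom half once.  Returns 1 if at
 * least one non-idle bottom half ran, 0 otherwise.
 */
int model_bh_poll(ModelBH *bhs, size_t n)
{
    int ret = 0;
    size_t i;

    for (i = 0; i < n; i++) {
        ModelBH *bh = &bhs[i];

        if (bh->deleted || !bh->scheduled) {
            continue;
        }
        bh->scheduled = false;
        /* Idle BHs don't count as progress */
        if (!bh->idle) {
            ret = 1;
        }
        bh->idle = false;
        bh->calls++;    /* stands in for invoking the callback */
    }
    return ret;
}
```

A pass that only runs an idle bottom half still invokes it, but reports
no progress to the caller.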

 async.c                     | 22 ++++------------------
 docs/multiple-iothreads.txt | 40 +++++++++++++++++++++++-----------------
 include/block/aio.h         |  3 ---
 iothread.c                  | 11 ++---------
 tests/test-aio.c            | 22 ++++++++++++++--------
 5 files changed, 43 insertions(+), 55 deletions(-)

diff --git a/async.c b/async.c
index c93a27b..e092b1c 100644
--- a/async.c
+++ b/async.c
@@ -85,8 +85,8 @@ int aio_bh_poll(AioContext *ctx)
          * aio_notify again if necessary.
          */
         if (!bh->deleted && atomic_xchg(&bh->scheduled, 0)) {
-            /* Idle BHs and the notify BH don't count as progress */
-            if (!bh->idle && bh != ctx->notify_dummy_bh) {
+            /* Idle BHs don't count as progress */
+            if (!bh->idle) {
                 ret = 1;
             }
             bh->idle = 0;
@@ -238,7 +238,6 @@ aio_ctx_finalize(GSource     *source)
 {
     AioContext *ctx = (AioContext *) source;
 
-    qemu_bh_delete(ctx->notify_dummy_bh);
     thread_pool_free(ctx->thread_pool);
 
     qemu_mutex_lock(&ctx->bh_lock);
@@ -525,19 +524,6 @@ static void aio_timerlist_notify(void *opaque)
     aio_notify(opaque);
 }
 
-static void aio_rfifolock_cb(void *opaque)
-{
-    AioContext *ctx = opaque;
-
-    /* Kick owner thread in case they are blocked in aio_poll() */
-    qemu_bh_schedule(ctx->notify_dummy_bh);
-}
-
-static void notify_dummy_bh(void *opaque)
-{
-    /* Do nothing, we were invoked just to force the event loop to iterate */
-}
-
 static void event_notifier_dummy_cb(EventNotifier *e)
 {
 }
@@ -566,10 +552,10 @@ AioContext *aio_context_new(Error **errp)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
-    rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
+    rfifolock_init(&ctx->lock, NULL, NULL);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
-    ctx->notify_dummy_bh = aio_bh_new(ctx, notify_dummy_bh, NULL);
+    qemu_event_init(&ctx->sync_io_event, true);
 
     return ctx;
 fail:
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index ac3e74c..c5d38e9 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -103,13 +103,10 @@ a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
 acquire/release or locking is needed for the qemu_bh_schedule() call.  But be
 sure to acquire the AioContext for aio_bh_new() if necessary.
 
-The relationship between AioContext and the block layer
--------------------------------------------------------
-The AioContext originates from the QEMU block layer because it provides a
-scoped way of running event loop iterations until all work is done.  This
-feature is used to complete all in-flight block I/O requests (see
-bdrv_drain_all()).  Nowadays AioContext is a generic event loop that can be
-used by any QEMU subsystem.
+AioContext and the block layer
+------------------------------
+The AioContext originates from the QEMU block layer, even though nowadays
+AioContext is a generic event loop that can be used by any QEMU subsystem.
 
 The block layer has support for AioContext integrated.  Each BlockDriverState
 is associated with an AioContext using bdrv_set_aio_context() and
@@ -120,13 +117,22 @@ Block layer code must therefore expect to run in an IOThread and avoid using
 old APIs that implicitly use the main loop.  See the "How to program for
 IOThreads" above for information on how to do that.
 
-If main loop code such as a QMP function wishes to access a BlockDriverState it
-must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure the
-IOThread does not run in parallel.
-
-Long-running jobs (usually in the form of coroutines) are best scheduled in the
-BlockDriverState's AioContext to avoid the need to acquire/release around each
-bdrv_*() call.  Be aware that there is currently no mechanism to get notified
-when bdrv_set_aio_context() moves this BlockDriverState to a different
-AioContext (see bdrv_detach_aio_context()/bdrv_attach_aio_context()), so you
-may need to add this if you want to support long-running jobs.
+If main loop code such as a QMP function wishes to access a BlockDriverState
+it must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure
+that callbacks in the IOThread do not run in parallel.
+
+Code running in the monitor typically needs to ensure that past
+requests from the guest are completed.  When a block device is running
+in an IOThread, the IOThread can also process requests from the guest
+(via ioeventfd).  To achieve both objectives, wrap the code between
+bdrv_drained_begin() and bdrv_drained_end(), thus creating a "drained
+section".  The functions must be called between aio_context_acquire()
+and aio_context_release().  You can freely release and re-acquire the
+AioContext within a drained section.
+
+Long-running jobs (usually in the form of coroutines) are best scheduled in
+the BlockDriverState's AioContext to avoid the need to acquire/release around
+each bdrv_*() call.  The functions bdrv_add/remove_aio_context_notifier,
+or alternatively blk_add/remove_aio_context_notifier if you use BlockBackends,
+can be used to get a notification whenever bdrv_set_aio_context() moves a
+BlockDriverState to a different AioContext.
diff --git a/include/block/aio.h b/include/block/aio.h
index 191bd3e..7223daf 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -114,9 +114,6 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Scheduling this BH forces the event loop it iterate */
-    QEMUBH *notify_dummy_bh;
-
     /* Thread pool for performing work and receiving completion callbacks */
     struct ThreadPool *thread_pool;
 
diff --git a/iothread.c b/iothread.c
index 8d40bb0..f66ec95 100644
--- a/iothread.c
+++ b/iothread.c
@@ -39,7 +39,6 @@ bool aio_context_in_iothread(AioContext *ctx)
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
-    bool blocking;
 
     rcu_register_thread();
 
@@ -49,14 +48,8 @@ static void *iothread_run(void *opaque)
     qemu_cond_signal(&iothread->init_done_cond);
     qemu_mutex_unlock(&iothread->init_done_lock);
 
-    while (!iothread->stopping) {
-        aio_context_acquire(iothread->ctx);
-        blocking = true;
-        while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
-            /* Progress was made, keep going */
-            blocking = false;
-        }
-        aio_context_release(iothread->ctx);
+    while (!atomic_read(&iothread->stopping)) {
+        aio_poll(iothread->ctx, true);
     }
 
     rcu_unregister_thread();
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 6ccea98..3fe27e7 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -99,6 +99,7 @@ static void event_ready_cb(EventNotifier *e)
 
 typedef struct {
     QemuMutex start_lock;
+    EventNotifier notifier;
     bool thread_acquired;
 } AcquireTestData;
 
@@ -110,6 +111,11 @@ static void *test_acquire_thread(void *opaque)
     qemu_mutex_lock(&data->start_lock);
     qemu_mutex_unlock(&data->start_lock);
 
+    /* event_notifier_set might be called either before or after
+     * the main thread's call to poll().  The test case's outcome
+     * should be the same in either case.
+     */
+    event_notifier_set(&data->notifier);
     aio_context_acquire(ctx);
     aio_context_release(ctx);
 
@@ -124,20 +130,19 @@ static void set_event_notifier(AioContext *ctx, EventNotifier *notifier,
     aio_set_event_notifier(ctx, notifier, false, handler);
 }
 
-static void dummy_notifier_read(EventNotifier *unused)
+static void dummy_notifier_read(EventNotifier *n)
 {
-    g_assert(false); /* should never be invoked */
+    event_notifier_test_and_clear(n);
 }
 
 static void test_acquire(void)
 {
     QemuThread thread;
-    EventNotifier notifier;
     AcquireTestData data;
 
     /* Dummy event notifier ensures aio_poll() will block */
-    event_notifier_init(&notifier, false);
-    set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    event_notifier_init(&data.notifier, false);
+    set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
     g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
 
     qemu_mutex_init(&data.start_lock);
@@ -151,12 +156,13 @@ static void test_acquire(void)
     /* Block in aio_poll(), let other thread kick us and acquire context */
     aio_context_acquire(ctx);
     qemu_mutex_unlock(&data.start_lock); /* let the thread run */
-    g_assert(!aio_poll(ctx, true));
+    g_assert(aio_poll(ctx, true));
+    g_assert(!data.thread_acquired);
     aio_context_release(ctx);
 
     qemu_thread_join(&thread);
-    set_event_notifier(ctx, &notifier, NULL);
-    event_notifier_cleanup(&notifier);
+    set_event_notifier(ctx, &data.notifier, NULL);
+    event_notifier_cleanup(&data.notifier);
 
     g_assert(data.thread_acquired);
 }
-- 
2.5.0

* [Qemu-devel] [PATCH 06/16] qemu-thread: introduce QemuRecMutex
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (4 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 05/16] iothread: release AioContext around aio_poll Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex Paolo Bonzini
                   ` (9 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

GRecMutex is only available since glib 2.32, so we cannot use it.
Introduce a recursive mutex in qemu-thread; it will replace
RFifoLock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
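
For reference, this is roughly what the POSIX side amounts to when
written out on its own.  RecMutex, rec_mutex_init and rec_mutex_nest are
hypothetical names, not the QEMU API.  Unlike the patch, the sketch
returns errors instead of calling error_exit and destroys the attribute
object after use.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

/* Hypothetical wrapper, not the QEMU type. */
typedef struct {
    pthread_mutex_t lock;
} RecMutex;

/* Returns 0 on success, an errno value otherwise. */
int rec_mutex_init(RecMutex *m)
{
    pthread_mutexattr_t attr;
    int err;

    err = pthread_mutexattr_init(&attr);
    if (err) {
        return err;
    }
    err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    if (!err) {
        err = pthread_mutex_init(&m->lock, &attr);
    }
    pthread_mutexattr_destroy(&attr);
    return err;
}

/* Lock the mutex "depth" times from the calling thread, then unlock
 * it as many times as it was locked.  Returns 0 if every lock call
 * succeeded.
 */
int rec_mutex_nest(RecMutex *m, int depth)
{
    int held = 0;
    int err = 0;

    while (held < depth && (err = pthread_mutex_lock(&m->lock)) == 0) {
        held++;
    }
    while (held > 0) {
        pthread_mutex_unlock(&m->lock);
        held--;
    }
    return err;
}
```

With a default (non-recursive) mutex the second pthread_mutex_lock call
from the same thread would deadlock or fail, depending on the mutex type.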
 include/qemu/thread-posix.h |  6 ++++++
 include/qemu/thread-win32.h | 10 ++++++++++
 include/qemu/thread.h       |  3 +++
 util/qemu-thread-posix.c    | 13 +++++++++++++
 util/qemu-thread-win32.c    | 25 +++++++++++++++++++++++++
 5 files changed, 57 insertions(+)

diff --git a/include/qemu/thread-posix.h b/include/qemu/thread-posix.h
index eb5c7a1..2fb6b90 100644
--- a/include/qemu/thread-posix.h
+++ b/include/qemu/thread-posix.h
@@ -3,6 +3,12 @@
 #include "pthread.h"
 #include <semaphore.h>
 
+typedef QemuMutex QemuRecMutex;
+#define qemu_rec_mutex_destroy qemu_mutex_destroy
+#define qemu_rec_mutex_lock qemu_mutex_lock
+#define qemu_rec_mutex_trylock qemu_mutex_trylock
+#define qemu_rec_mutex_unlock qemu_mutex_unlock
+
 struct QemuMutex {
     pthread_mutex_t lock;
 };
diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
index 385ff5f..50999c5 100644
--- a/include/qemu/thread-win32.h
+++ b/include/qemu/thread-win32.h
@@ -7,6 +7,16 @@ struct QemuMutex {
     LONG owner;
 };
 
+typedef struct QemuRecMutex QemuRecMutex;
+struct QemuRecMutex {
+    CRITICAL_SECTION lock;
+};
+
+void qemu_rec_mutex_destroy(QemuRecMutex *mutex);
+void qemu_rec_mutex_lock(QemuRecMutex *mutex);
+int qemu_rec_mutex_trylock(QemuRecMutex *mutex);
+void qemu_rec_mutex_unlock(QemuRecMutex *mutex);
+
 struct QemuCond {
     LONG waiters, target;
     HANDLE sema;
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 5114ec8..981f3dc 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,6 +25,9 @@ void qemu_mutex_lock(QemuMutex *mutex);
 int qemu_mutex_trylock(QemuMutex *mutex);
 void qemu_mutex_unlock(QemuMutex *mutex);
 
+/* Prototypes for other functions are in thread-posix.h/thread-win32.h.  */
+void qemu_rec_mutex_init(QemuRecMutex *mutex);
+
 void qemu_cond_init(QemuCond *cond);
 void qemu_cond_destroy(QemuCond *cond);
 
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 74a3023..1aec83f 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -80,6 +80,19 @@ void qemu_mutex_unlock(QemuMutex *mutex)
         error_exit(err, __func__);
 }
 
+void qemu_rec_mutex_init(QemuRecMutex *mutex)
+{
+    int err;
+    pthread_mutexattr_t attr;
+
+    pthread_mutexattr_init(&attr);
+    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+    err = pthread_mutex_init(&mutex->lock, &attr);
+    if (err) {
+        error_exit(err, __func__);
+    }
+}
+
 void qemu_cond_init(QemuCond *cond)
 {
     int err;
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 98a5ddf..171d83c 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -79,6 +79,31 @@ void qemu_mutex_unlock(QemuMutex *mutex)
     LeaveCriticalSection(&mutex->lock);
 }
 
+void qemu_rec_mutex_init(QemuRecMutex *mutex)
+{
+    InitializeCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_destroy(QemuRecMutex *mutex)
+{
+    DeleteCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_lock(QemuRecMutex *mutex)
+{
+    EnterCriticalSection(&mutex->lock);
+}
+
+int qemu_rec_mutex_trylock(QemuRecMutex *mutex)
+{
+    return !TryEnterCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_unlock(QemuRecMutex *mutex)
+{
+    LeaveCriticalSection(&mutex->lock);
+}
+
 void qemu_cond_init(QemuCond *cond)
 {
     memset(cond, 0, sizeof(*cond));
-- 
2.5.0

* [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (5 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 06/16] qemu-thread: introduce QemuRecMutex Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 08/16] aio: rename bh_lock to list_lock Paolo Bonzini
                   ` (8 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

It is simpler and a bit faster, and QEMU does not need the contention
callbacks (and thus the fairness) anymore.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
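
RFifoLock got its fairness from a ticket lock: each waiter takes the
next ticket from "tail" and may enter once "head" reaches that ticket.
The single-threaded sketch below models only that bookkeeping.  The
names are hypothetical, and the mutex, condition variable, nesting and
contention callback of util/rfifolock.c are left out.  POSIX makes no
comparable FIFO promise for the recursive mutex that replaces it.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    unsigned int head;  /* ticket currently allowed to hold the lock */
    unsigned int tail;  /* next ticket to hand out */
} TicketModel;

/* A thread that wants the lock takes the next ticket. */
unsigned int ticket_take(TicketModel *t)
{
    return t->tail++;
}

/* The thread may enter only when its ticket is the active one. */
bool ticket_may_enter(const TicketModel *t, unsigned int ticket)
{
    return ticket == t->head;
}

/* Unlocking hands the lock to the holder of the next ticket. */
void ticket_release(TicketModel *t)
{
    t->head++;
}
```

Two waiters that take tickets in the order A, B are always served in
that order, which is the property this patch gives up.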
 async.c                  |  8 ++---
 include/block/aio.h      |  3 +-
 include/qemu/rfifolock.h | 54 ----------------------------
 tests/.gitignore         |  1 -
 tests/Makefile           |  2 --
 tests/test-rfifolock.c   | 91 ------------------------------------------------
 util/Makefile.objs       |  1 -
 util/rfifolock.c         | 78 -----------------------------------------
 8 files changed, 5 insertions(+), 233 deletions(-)
 delete mode 100644 include/qemu/rfifolock.h
 delete mode 100644 tests/test-rfifolock.c
 delete mode 100644 util/rfifolock.c

diff --git a/async.c b/async.c
index e092b1c..edb2846 100644
--- a/async.c
+++ b/async.c
@@ -254,7 +254,7 @@ aio_ctx_finalize(GSource     *source)
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
-    rfifolock_destroy(&ctx->lock);
+    qemu_rec_mutex_destroy(&ctx->lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
@@ -552,7 +552,7 @@ AioContext *aio_context_new(Error **errp)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
-    rfifolock_init(&ctx->lock, NULL, NULL);
+    qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
     qemu_event_init(&ctx->sync_io_event, true);
@@ -578,7 +578,7 @@ void aio_context_acquire(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_lock(&ctx->lock);
+        qemu_rec_mutex_lock(&ctx->lock);
     }
 }
 
@@ -587,6 +587,6 @@ void aio_context_release(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_unlock(&ctx->lock);
+        qemu_rec_mutex_unlock(&ctx->lock);
     }
 }
diff --git a/include/block/aio.h b/include/block/aio.h
index 7223daf..3f055d2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -19,7 +19,6 @@
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
-#include "qemu/rfifolock.h"
 #include "qemu/timer.h"
 
 typedef struct BlockAIOCB BlockAIOCB;
@@ -52,7 +51,7 @@ struct AioContext {
     GSource source;
 
     /* Protects all fields from multi-threaded access */
-    RFifoLock lock;
+    QemuRecMutex lock;
 
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
deleted file mode 100644
index b23ab53..0000000
--- a/include/qemu/rfifolock.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or later.
- * See the COPYING file in the top-level directory.
- *
- */
-
-#ifndef QEMU_RFIFOLOCK_H
-#define QEMU_RFIFOLOCK_H
-
-#include "qemu/thread.h"
-
-/* Recursive FIFO lock
- *
- * This lock provides more features than a plain mutex:
- *
- * 1. Fairness - enforces FIFO order.
- * 2. Nesting - can be taken recursively.
- * 3. Contention callback - optional, called when thread must wait.
- *
- * The recursive FIFO lock is heavyweight so prefer other synchronization
- * primitives if you do not need its features.
- */
-typedef struct {
-    QemuMutex lock;             /* protects all fields */
-
-    /* FIFO order */
-    unsigned int head;          /* active ticket number */
-    unsigned int tail;          /* waiting ticket number */
-    QemuCond cond;              /* used to wait for our ticket number */
-
-    /* Nesting */
-    QemuThread owner_thread;    /* thread that currently has ownership */
-    unsigned int nesting;       /* amount of nesting levels */
-
-    /* Contention callback */
-    void (*cb)(void *);         /* called when thread must wait, with ->lock
-                                 * held so it may not recursively lock/unlock
-                                 */
-    void *cb_opaque;
-} RFifoLock;
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
-void rfifolock_destroy(RFifoLock *r);
-void rfifolock_lock(RFifoLock *r);
-void rfifolock_unlock(RFifoLock *r);
-
-#endif /* QEMU_RFIFOLOCK_H */
diff --git a/tests/.gitignore b/tests/.gitignore
index 787c95c..b3da3f1 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -51,7 +51,6 @@ test-qmp-introspect.[ch]
 test-qmp-marshal.c
 test-qmp-output-visitor
 test-rcu-list
-test-rfifolock
 test-string-input-visitor
 test-string-output-visitor
 test-thread-pool
diff --git a/tests/Makefile b/tests/Makefile
index 650e654..9da0fcf 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -39,7 +39,6 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
-check-unit-$(CONFIG_POSIX) += tests/test-rfifolock$(EXESUF)
 check-unit-y += tests/test-throttle$(EXESUF)
 gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
 gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -403,7 +402,6 @@ tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
-tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
deleted file mode 100644
index 0572ebb..0000000
--- a/tests/test-rfifolock.c
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * RFifoLock tests
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi    <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- */
-
-#include <glib.h>
-#include "qemu-common.h"
-#include "qemu/rfifolock.h"
-
-static void test_nesting(void)
-{
-    RFifoLock lock;
-
-    /* Trivial test, ensure the lock is recursive */
-    rfifolock_init(&lock, NULL, NULL);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_destroy(&lock);
-}
-
-typedef struct {
-    RFifoLock lock;
-    int fd[2];
-} CallbackTestData;
-
-static void rfifolock_cb(void *opaque)
-{
-    CallbackTestData *data = opaque;
-    int ret;
-    char c = 0;
-
-    ret = write(data->fd[1], &c, sizeof(c));
-    g_assert(ret == 1);
-}
-
-static void *callback_thread(void *opaque)
-{
-    CallbackTestData *data = opaque;
-
-    /* The other thread holds the lock so the contention callback will be
-     * invoked...
-     */
-    rfifolock_lock(&data->lock);
-    rfifolock_unlock(&data->lock);
-    return NULL;
-}
-
-static void test_callback(void)
-{
-    CallbackTestData data;
-    QemuThread thread;
-    int ret;
-    char c;
-
-    rfifolock_init(&data.lock, rfifolock_cb, &data);
-    ret = qemu_pipe(data.fd);
-    g_assert(ret == 0);
-
-    /* Hold lock but allow the callback to kick us by writing to the pipe */
-    rfifolock_lock(&data.lock);
-    qemu_thread_create(&thread, "callback_thread",
-                       callback_thread, &data, QEMU_THREAD_JOINABLE);
-    ret = read(data.fd[0], &c, sizeof(c));
-    g_assert(ret == 1);
-    rfifolock_unlock(&data.lock);
-    /* If we got here then the callback was invoked, as expected */
-
-    qemu_thread_join(&thread);
-    close(data.fd[0]);
-    close(data.fd[1]);
-    rfifolock_destroy(&data.lock);
-}
-
-int main(int argc, char **argv)
-{
-    g_test_init(&argc, &argv, NULL);
-    g_test_add_func("/nesting", test_nesting);
-    g_test_add_func("/callback", test_callback);
-    return g_test_run();
-}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index a8a777e..c0223c6 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -23,7 +23,6 @@ util-obj-y += crc32c.o
 util-obj-y += throttle.o
 util-obj-y += getauxval.o
 util-obj-y += readline.o
-util-obj-y += rfifolock.o
 util-obj-y += rcu.o
 util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
 util-obj-y += qemu-coroutine-sleep.o
diff --git a/util/rfifolock.c b/util/rfifolock.c
deleted file mode 100644
index c22f5fe..0000000
--- a/util/rfifolock.c
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- *
- */
-
-#include "qemu/osdep.h"
-#include "qemu/rfifolock.h"
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
-{
-    qemu_mutex_init(&r->lock);
-    r->head = 0;
-    r->tail = 0;
-    qemu_cond_init(&r->cond);
-    r->nesting = 0;
-    r->cb = cb;
-    r->cb_opaque = opaque;
-}
-
-void rfifolock_destroy(RFifoLock *r)
-{
-    qemu_cond_destroy(&r->cond);
-    qemu_mutex_destroy(&r->lock);
-}
-
-/*
- * Theory of operation:
- *
- * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
- * the lock enqueue themselves by incrementing the tail index.  When the lock
- * is unlocked, the head is incremented and waiting threads are notified.
- *
- * Recursive locking does not take a ticket since the head is only incremented
- * when the outermost recursive caller unlocks.
- */
-void rfifolock_lock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-
-    /* Take a ticket */
-    unsigned int ticket = r->tail++;
-
-    if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
-        r->tail--; /* put ticket back, we're nesting */
-    } else {
-        while (ticket != r->head) {
-            /* Invoke optional contention callback */
-            if (r->cb) {
-                r->cb(r->cb_opaque);
-            }
-            qemu_cond_wait(&r->cond, &r->lock);
-        }
-    }
-
-    qemu_thread_get_self(&r->owner_thread);
-    r->nesting++;
-    qemu_mutex_unlock(&r->lock);
-}
-
-void rfifolock_unlock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-    assert(r->nesting > 0);
-    assert(qemu_thread_is_self(&r->owner_thread));
-    if (--r->nesting == 0) {
-        r->head++;
-        qemu_cond_broadcast(&r->cond);
-    }
-    qemu_mutex_unlock(&r->lock);
-}
-- 
2.5.0


* [Qemu-devel] [PATCH 08/16] aio: rename bh_lock to list_lock
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (6 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex Paolo Bonzini
@ 2016-02-08 16:14 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt Paolo Bonzini
                   ` (7 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:14 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This will be used for AioHandlers too.  There is going to be little
or no contention, so it is better to reuse the same lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 16 ++++++++--------
 include/block/aio.h |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/async.c b/async.c
index edb2846..fc4c173 100644
--- a/async.c
+++ b/async.c
@@ -51,12 +51,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -98,7 +98,7 @@ int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->bh_lock);
+        qemu_mutex_lock(&ctx->list_lock);
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -109,7 +109,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->bh_lock);
+        qemu_mutex_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -240,7 +240,7 @@ aio_ctx_finalize(GSource     *source)
 
     thread_pool_free(ctx->thread_pool);
 
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -250,12 +250,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->bh_lock);
+    qemu_mutex_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -551,7 +551,7 @@ AioContext *aio_context_new(Error **errp)
                            (EventNotifierHandler *)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 3f055d2..322a10e 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -88,7 +88,7 @@ struct AioContext {
     uint32_t notify_me;
 
     /* lock to protect between bh's adders and deleter */
-    QemuMutex bh_lock;
+    QemuMutex list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
-- 
2.5.0


* [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (7 preceding siblings ...)
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 08/16] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 22:38   ` Eric Blake
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 10/16] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
                   ` (6 subsequent siblings)
  15 siblings, 1 reply; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

A QemuLockCnt comprises a counter and a mutex, with primitives
to increment and decrement the counter, and to take and release the
mutex.  It can be used to do lock-free visits to a data structure
whenever mutexes would be too heavy-weight and the critical section
is too long for RCU.

This could be implemented simply by protecting the counter with the
mutex, but QemuLockCnt is harder to misuse and more efficient.
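
As a rough illustration of the idea (not code from this patch), the
counter/mutex pairing can be sketched with C11 atomics and a pthread
mutex.  All sketch_* names and the SketchLockCnt type are hypothetical;
the real implementation is util/lockcnt.c in the diff below.  The point
shown is that only the 0 -> 1 and 1 -> 0 transitions of the counter need
the mutex, while every other increment or decrement is a plain
compare-and-swap:

```c
/* Illustrative sketch only: sketch_* names are hypothetical, not QEMU API. */
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t mutex;
    atomic_uint count;      /* number of visits in progress */
} SketchLockCnt;

static void sketch_init(SketchLockCnt *lc)
{
    pthread_mutex_init(&lc->mutex, NULL);
    atomic_init(&lc->count, 0);
}

/* Start a visit.  Only the 0 -> 1 transition takes the mutex. */
static void sketch_inc(SketchLockCnt *lc)
{
    unsigned old = atomic_load(&lc->count);
    for (;;) {
        if (old == 0) {
            /* Wait for any critical section (e.g. a free) to finish. */
            pthread_mutex_lock(&lc->mutex);
            atomic_fetch_add(&lc->count, 1);
            pthread_mutex_unlock(&lc->mutex);
            return;
        }
        /* On failure, "old" is reloaded with the current value. */
        if (atomic_compare_exchange_weak(&lc->count, &old, old + 1)) {
            return;
        }
    }
}

/* End a visit.  Returns true, with the mutex held, if it was the last. */
static bool sketch_dec_and_lock(SketchLockCnt *lc)
{
    unsigned val = atomic_load(&lc->count);
    while (val > 1) {
        if (atomic_compare_exchange_weak(&lc->count, &val, val - 1)) {
            return false;
        }
    }
    /* The count may reach zero: do the final decrement under the mutex. */
    pthread_mutex_lock(&lc->mutex);
    if (atomic_fetch_sub(&lc->count, 1) == 1) {
        return true;
    }
    pthread_mutex_unlock(&lc->mutex);
    return false;
}

static void sketch_unlock(SketchLockCnt *lc)
{
    pthread_mutex_unlock(&lc->mutex);
}
```

A caller would bracket a visit with sketch_inc() and
sketch_dec_and_lock(), and reclaim deleted nodes only when the latter
returns true, before calling sketch_unlock().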

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt      | 343 ++++++++++++++++++++++++++++++++++++++++++++++++++
 include/qemu/thread.h |  17 +++
 util/Makefile.objs    |   1 +
 util/lockcnt.c        | 122 ++++++++++++++++++
 4 files changed, 483 insertions(+)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 util/lockcnt.c

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
new file mode 100644
index 0000000..fc5d240
--- /dev/null
+++ b/docs/lockcnt.txt
@@ -0,0 +1,343 @@
+DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
+===================================================
+
+QEMU often uses reference counts to track data structures that are being
+accessed and should not be freed.  For example, a loop that invokes
+callbacks like this is not safe:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
+by stashing away its "next" pointer.  However, ioh->fd_write could
+actually delete the next node from the list.  The simplest way to
+avoid this is to mark the node as deleted, and remove it from the
+list in the above loop:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+
+If however this loop must also be reentrant, i.e. it is possible that
+ioh->fd_write invokes the loop again, some kind of counting is needed:
+
+    walking_handlers++;
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (walking_handlers == 1) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    walking_handlers--;
+
+One may think of using the RCU primitives, rcu_read_lock() and
+rcu_read_unlock(); effectively, the RCU nesting count would take
+the place of the walking_handlers global variable.  Indeed,
+reference counting and RCU have similar purposes, but their usage in
+general is complementary:
+
+- reference counting is fine-grained and limited to a single data
+  structure; RCU delays reclamation of *all* RCU-protected data
+  structures;
+
+- reference counting works even in the presence of code that keeps
+  a reference for a long time; RCU critical sections in principle
+  should be kept short;
+
+- reference counting is often applied to code that is not thread-safe
+  but is reentrant; in fact, usage of reference counting in QEMU predates
+  the introduction of threads by many years.  RCU is generally used to
+  protect readers from other threads freeing memory after concurrent
+  modifications to a data structure.
+
+- reclaiming data can be done by a separate thread in the case of RCU;
+  this can improve performance, but also delay reclamation undesirably.
+  With reference counting, reclamation is deterministic.
+
+This file documents QemuLockCnt, an abstraction for using reference
+counting in code that has to be both thread-safe and reentrant.
+
+
+QemuLockCnt concepts
+--------------------
+
+A QemuLockCnt comprises both a counter and a mutex; it has primitives
+to increment and decrement the counter, and to take and release the
+mutex.  The counter notes how many visits to the data structures are
+taking place (the visits could be from different threads, or there could
+be multiple reentrant visits from the same thread).  The basic rules
+governing the counter/mutex pair then are the following:
+
+- Data protected by the QemuLockCnt must not be freed unless the
+  counter is zero and the mutex is taken.
+
+- A new visit cannot be started while the counter is zero and the
+  mutex is taken.
+
+Most of the time, the mutex protects all writes to the data structure,
+not just frees, though there could be cases where this is not necessary.
+
+Reads, instead, can be done without taking the mutex, as long as the
+readers and writers use the same macros that are used for RCU, for
+example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc.  This is
+because the reads are done outside a lock and a set or QLIST_INSERT_HEAD
+can happen concurrently with the read.  The RCU API ensures that the
+processor and the compiler see all required memory barriers.
+
+This could be implemented simply by protecting the counter with the
+mutex, for example:
+
+    // (1)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    walking_handlers++;
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+    ...
+
+    // (2)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    if (--walking_handlers == 0) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+    }
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+Here, no frees can happen in the code represented by the ellipsis.
+If another thread is executing critical section (2), that part of
+the code cannot be entered, because the thread will not be able
+to increment the walking_handlers variable.  And of course
+during the visit any other thread will see a nonzero value for
+walking_handlers, as in the single-threaded code.
+
+Note that it is possible for multiple concurrent accesses to delay
+the cleanup arbitrarily; in other words, for the walking_handlers
+counter to never become zero.  For this reason, this technique is
+more easily applicable if concurrent access to the structure is rare.
+
+However, critical sections are easy to forget since you have to do
+them for each modification of the counter.  QemuLockCnt ensures that
+all modifications of the counter take the lock appropriately, and it
+can also be more efficient in two ways:
+
+- it avoids taking the lock for many operations (for example
+  incrementing the counter while it is non-zero);
+
+- on some platforms, one could implement QemuLockCnt to hold the
+  lock and the mutex in a single word, making it no more expensive
+  than simply managing a counter using atomic operations (see
+  docs/atomics.txt).  This is not implemented yet, but can be
+  very helpful if concurrent access to the data structure is
+  expected to be rare.
+
+
+Using the same mutex for frees and writes can still incur some small
+inefficiencies; for example, a visit can never start if the counter is
+zero and the mutex is taken---even if the mutex is taken by a write,
+which in principle need not block a visit of the data structure.
+However, these are usually not a problem if any of the following
+assumptions are valid:
+
+- concurrent access is possible but rare
+
+- writes are rare
+
+- writes are frequent, but this kind of write (e.g. appending to a
+  list) has a very small critical section.
+
+For example, QEMU uses QemuLockCnt to manage an AioContext's list of
+bottom halves and file descriptor handlers.  Modifications to the list
+of file descriptor handlers are rare.  Creation of a new bottom half is
+frequent and can happen on a fast path; however: 1) it is almost never
+concurrent with a visit to the list of bottom halves; 2) it only has
+three instructions in the critical path, two assignments and a smp_wmb().
+
+
+QemuLockCnt API
+---------------
+
+    void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+
+        Initialize lockcnt's counter to zero and prepare its mutex
+        for usage.
+
+    void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+
+        Destroy lockcnt's mutex.
+
+    void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+
+        If the lockcnt's count is zero, wait for critical sections
+        to finish and increment lockcnt's count to 1.  If the count
+        is not zero, just increment it.
+
+        Because this function can wait on the mutex, it must not be
+        called while the lockcnt's mutex is held by the current thread.
+        For the same reason, qemu_lockcnt_inc can also contribute to
+        AB-BA deadlocks.  This is a sample deadlock scenario:
+
+              thread 1                      thread 2
+              -------------------------------------------------------
+              qemu_lockcnt_lock(&lc1);
+                                            qemu_lockcnt_lock(&lc2);
+              qemu_lockcnt_inc(&lc2);
+                                            qemu_lockcnt_inc(&lc1);
+
+    void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+
+        Decrement lockcnt's count.
+
+    bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+
+        Decrement the count.  If the new count is zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+
+        If the count is 1, decrement the count to zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+
+        Lock the lockcnt's mutex.  Remember that concurrent visits
+        are not blocked unless the count is also zero.  You can
+        use qemu_lockcnt_count to check for this inside a critical
+        section.
+
+    void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+
+        Release the lockcnt's mutex.
+
+    void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+
+        This is the same as
+
+            qemu_lockcnt_unlock(lockcnt);
+            qemu_lockcnt_inc(lockcnt);
+
+        but more efficient.
+
+    unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
+        Return the lockcnt's count.  The count can change at any time;
+        still, while the lockcnt is locked, one can usefully
+        check whether the count is non-zero.
+
+
+QemuLockCnt usage
+-----------------
+
+This section explains the typical usage patterns for QemuLockCnt functions.
+
+Setting a variable to a non-NULL value can be done between
+qemu_lockcnt_lock and qemu_lockcnt_unlock:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!xyz) {
+        new_xyz = g_new(XYZ, 1);
+        ...
+        atomic_rcu_set(&xyz, new_xyz);
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+Accessing the value can be done between qemu_lockcnt_inc and
+qemu_lockcnt_dec:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    qemu_lockcnt_dec(&xyz_lockcnt);
+
+Freeing the object can similarly use qemu_lockcnt_lock and
+qemu_lockcnt_unlock, but you also need to ensure that the count
+is zero (i.e. there is no concurrent visit).  Because qemu_lockcnt_inc
+takes the QemuLockCnt's lock when the count is zero, the count cannot
+become non-zero while the object is being freed.  For example:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+If an object has to be freed right after a visit, you can combine
+the decrement, the locking and the check on count as follows:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+        qemu_lockcnt_unlock(&xyz_lockcnt);
+    }
+
+QemuLockCnt can also be used to access a list as follows:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+        qemu_lockcnt_unlock(&io_handlers_lockcnt);
+    }
+
+Again, the RCU primitives are used because new items can be added to the
+list during the walk.  QLIST_FOREACH_RCU ensures that the processor and
+the compiler see the appropriate memory barriers.
+
+An alternative pattern uses qemu_lockcnt_dec_if_lock:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+                qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    qemu_lockcnt_dec(&io_handlers_lockcnt);
+
+Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock,
+because there is no special task to do if the count goes from 1 to 0.
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 981f3dc..9fadca4 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuLockCnt QemuLockCnt;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -65,4 +66,20 @@ struct Notifier;
 void qemu_thread_atexit_add(struct Notifier *notifier);
 void qemu_thread_atexit_remove(struct Notifier *notifier);
 
+struct QemuLockCnt {
+    QemuMutex mutex;
+    unsigned count;
+};
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
 #endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index c0223c6..f1886ae 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,4 +1,5 @@
 util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
+util-obj-y += lockcnt.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
diff --git a/util/lockcnt.c b/util/lockcnt.c
new file mode 100644
index 0000000..304f9d9
--- /dev/null
+++ b/util/lockcnt.c
@@ -0,0 +1,122 @@
+/*
+ * QemuLockCnt implementation
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *   Paolo Bonzini <pbonzini@redhat.com>
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <time.h>
+#include <signal.h>
+#include <stdint.h>
+#include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include "qemu/thread.h"
+#include "qemu/atomic.h"
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_init(&lockcnt->mutex);
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_destroy(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int old;
+    for (;;) {
+        old = atomic_mb_read(&lockcnt->count);
+        if (old == 0) {
+            qemu_lockcnt_lock(lockcnt);
+            qemu_lockcnt_inc_and_unlock(lockcnt);
+            return;
+        } else {
+            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
+                return;
+            }
+        }
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_dec(&lockcnt->count);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    while (val > 1) {
+        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
+        if (old != val) {
+            val = old;
+            continue;
+        }
+
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_unlock(lockcnt);
+    return false;
+}
+
+/* Decrement a counter and return locked if it is decremented to zero.
+ * Otherwise do nothing.
+ *
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_mb_read(&lockcnt->count);
+    if (val > 1) {
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_inc_and_unlock(lockcnt);
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_lock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    atomic_inc(&lockcnt->count);
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count);
+}
-- 
2.5.0


* [Qemu-devel] [PATCH 10/16] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (8 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
                   ` (5 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This will make it possible to walk the list of bottom halves without
holding the AioContext lock---and in turn to call bottom half
handlers without holding the lock.
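
To illustrate why the walk can go without a lock (a sketch under stated
assumptions, not code from this patch): a writer fully initializes a node
and then publishes it with a release store, and the reader follows the
pointers with loads that are ordered before the accesses to the node.
Below, C11 acquire loads stand in for atomic_rcu_read, and the
SketchNode/SketchList types and sketch_* functions are hypothetical:

```c
/* Illustrative sketch only: sketch_* names are hypothetical, not QEMU API. */
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct SketchNode SketchNode;
struct SketchNode {
    int value;
    SketchNode *_Atomic next;
};

typedef struct {
    SketchNode *_Atomic first;
} SketchList;

static void sketch_list_init(SketchList *list)
{
    atomic_init(&list->first, (SketchNode *)NULL);
}

/* Writer side.  Concurrent writers would still need to serialize,
 * e.g. under the list lock; that part is omitted here.
 */
static SketchNode *sketch_push(SketchList *list, int value)
{
    SketchNode *n = malloc(sizeof(*n));
    if (!n) {
        abort();
    }
    n->value = value;
    atomic_init(&n->next,
                atomic_load_explicit(&list->first, memory_order_relaxed));
    /* Release: the fields above are visible before the node is reachable. */
    atomic_store_explicit(&list->first, n, memory_order_release);
    return n;
}

/* Reader side: walks the list without taking any lock. */
static int sketch_sum(SketchList *list)
{
    int sum = 0;
    SketchNode *n = atomic_load_explicit(&list->first, memory_order_acquire);
    while (n) {
        sum += n->value;
        n = atomic_load_explicit(&n->next, memory_order_acquire);
    }
    return sum;
}
```

This only covers insertion.  Removal still has to wait until no walk is
in progress, which is what the counter half of QemuLockCnt is for.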

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 31 ++++++++++++++-----------------
 include/block/aio.h | 12 +++++-------
 2 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/async.c b/async.c
index fc4c173..bc7e142 100644
--- a/async.c
+++ b/async.c
@@ -51,12 +51,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -71,13 +71,11 @@ int aio_bh_poll(AioContext *ctx)
     QEMUBH *bh, **bhp, *next;
     int ret;
 
-    ctx->walking_bh++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
-        /* Make sure that fetching bh happens before accessing its members */
-        smp_read_barrier_depends();
-        next = bh->next;
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+        next = atomic_rcu_read(&bh->next);
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -94,11 +92,8 @@ int aio_bh_poll(AioContext *ctx)
         }
     }
 
-    ctx->walking_bh--;
-
     /* remove deleted bhs */
-    if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->list_lock);
+    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -109,7 +104,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->list_lock);
+        qemu_lockcnt_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -165,7 +160,8 @@ aio_compute_timeout(AioContext *ctx)
     int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
+         bh = atomic_rcu_read(&bh->next)) {
         if (!bh->deleted && bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -240,7 +236,8 @@ aio_ctx_finalize(GSource     *source)
 
     thread_pool_free(ctx->thread_pool);
 
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
+    assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -250,12 +247,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->list_lock);
+    qemu_lockcnt_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -551,7 +548,7 @@ AioContext *aio_context_new(Error **errp)
                            (EventNotifierHandler *)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->list_lock);
+    qemu_lockcnt_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 322a10e..fb0ff21 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -87,17 +87,15 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* lock to protect between bh's adders and deleter */
-    QemuMutex list_lock;
+    /* A lock to protect between bh's adders and deleter, and to ensure
+     * that no callbacks are removed while we're walking and dispatching
+     * them.
+     */
+    QemuLockCnt list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
-    /* A simple lock used to protect the first_bh list, and ensure that
-     * no callbacks are removed while we're walking and dispatching callbacks.
-     */
-    int walking_bh;
-
     /* Used by aio_notify.
      *
      * "notified" is used to avoid expensive event_notifier_test_and_clear
-- 
2.5.0


* [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (9 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 10/16] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 12/16] aio: tweak walking in dispatch phase Paolo Bonzini
                   ` (4 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

This is complex, but I think it is reasonably documented in the source.
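
For readers who want the general idea before reading the source, here is
a minimal sketch of keeping a lock flag and a counter in one word.  The
bit layout, the SketchWord type and the sketch_* names are assumptions
made for illustration and are not claimed to match util/lockcnt.c; the
blocking slow path (futex wait/wake) is left out entirely, so the
functions simply report failure where a real implementation would sleep:

```c
/* Illustrative sketch only: layout and names are hypothetical. */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define SKETCH_LOCKED     1u   /* bit 0: lock is held */
#define SKETCH_COUNT_STEP 2u   /* the count lives in the remaining bits */

typedef struct {
    atomic_uint word;
} SketchWord;

static unsigned sketch_count(SketchWord *w)
{
    return atomic_load(&w->word) / SKETCH_COUNT_STEP;
}

/* Try to start a visit.  Fails only if the count is zero and the lock
 * is held; a real implementation would wait instead of failing.
 */
static bool sketch_try_inc(SketchWord *w)
{
    unsigned old = atomic_load(&w->word);
    for (;;) {
        if (old == SKETCH_LOCKED) {
            return false;
        }
        /* On failure, "old" is reloaded with the current value. */
        if (atomic_compare_exchange_weak(&w->word, &old,
                                         old + SKETCH_COUNT_STEP)) {
            return true;
        }
    }
}

static void sketch_dec(SketchWord *w)
{
    atomic_fetch_sub(&w->word, SKETCH_COUNT_STEP);
}

/* Try to take the lock; returns false if it was already held. */
static bool sketch_try_lock(SketchWord *w)
{
    unsigned old = atomic_fetch_or(&w->word, SKETCH_LOCKED);
    return !(old & SKETCH_LOCKED);
}

static void sketch_unlock(SketchWord *w)
{
    atomic_fetch_and(&w->word, ~SKETCH_LOCKED);
}
```

With this kind of layout the uncontended paths are single atomic
operations on one word, which is the property the documentation change
below describes for the fast path.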

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   3 +
 trace-events             |  10 ++
 util/lockcnt.c           | 282 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  25 +----
 6 files changed, 336 insertions(+), 29 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index fc5d240..594764b 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the lock
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..c3d1089
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void futex_wake(void *f, int n)
+{
+    futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void futex_wait(void *f, unsigned val)
+{
+    while (futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 9fadca4..22d92d2 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -1,6 +1,7 @@
 #ifndef __QEMU_THREAD_H
 #define __QEMU_THREAD_H 1
 
+#include "config-host.h"
 #include <inttypes.h>
 #include <stdbool.h>
 
@@ -67,7 +68,9 @@ void qemu_thread_atexit_add(struct Notifier *notifier);
 void qemu_thread_atexit_remove(struct Notifier *notifier);
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/trace-events b/trace-events
index c9ac144..daa091e 100644
--- a/trace-events
+++ b/trace-events
@@ -1434,6 +1434,16 @@ hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long c
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
+
 # target-s390x/mmu_helper.c
 get_skeys_nonzero(int rc) "SKEY: Call to get_skeys unexpectedly returned %d"
 set_skeys_nonzero(int rc) "SKEY: Call to set_skeys unexpectedly returned %d"
diff --git a/util/lockcnt.c b/util/lockcnt.c
index 304f9d9..56eb29e 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -18,7 +18,288 @@
 #include <sys/time.h>
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0
+#define QEMU_LOCKCNT_STATE_LOCKED  1
+#define QEMU_LOCKCNT_STATE_WAITING 2
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free.  On
+ * success, return true and set *val to new_if_free; on failure, set
+ * *val to the value that the cmpxchg found in lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count, and set *waited if a futex wait was done.
+ *
+ * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
+ * if calling this function a second time after it has returned
+ * false.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            int new = val - QEMU_LOCKCNT_COUNT_STEP;
+            val = atomic_cmpxchg(&lockcnt->count, val, new);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.  Assume
+                 * there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -120,3 +401,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return lockcnt->count;
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 1aec83f..b1e3cdb 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -293,26 +289,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
 static inline void futex_wake(QemuEvent *ev, int n)
 {
-- 
2.5.0
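
The word layout described in the util/lockcnt.c comment above (bits 0-1 for the lock state, bits 2-31 for the counter) can be sketched outside QEMU as follows. This is only an illustration under stated assumptions: it uses C11 <stdatomic.h> instead of QEMU's atomic_* macros, and the helper names (make_word, word_state, word_count, try_inc_nonzero) are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Same layout as the patch: bits 0-1 are the lock state, bits 2-31 the count. */
#define STATE_MASK    3u
#define STATE_FREE    0u
#define STATE_LOCKED  1u
#define STATE_WAITING 2u
#define COUNT_STEP    4u
#define COUNT_SHIFT   2

/* Hypothetical helpers for packing and unpacking the combined word. */
static unsigned word_state(unsigned word) { return word & STATE_MASK; }
static unsigned word_count(unsigned word) { return word >> COUNT_SHIFT; }

static unsigned make_word(unsigned count, unsigned state)
{
    assert(state <= STATE_WAITING);
    return (count << COUNT_SHIFT) | state;
}

/* Lock-free increment that only applies while the count is already
 * non-zero.  Adding COUNT_STEP leaves the two state bits untouched.
 * Returns false when the count is zero; a real implementation would
 * then have to go through the lock (the slow path in the patch).
 */
static bool try_inc_nonzero(atomic_uint *word)
{
    unsigned val = atomic_load(word);

    while (val >= COUNT_STEP) {
        /* On failure the current value is reloaded into val. */
        if (atomic_compare_exchange_weak(word, &val, val + COUNT_STEP)) {
            return true;
        }
    }
    return false;
}
```

Because the counter occupies the upper bits, "count is non-zero" reduces to the single comparison val >= COUNT_STEP, whatever the lock state is. The same test appears in qemu_lockcnt_inc above.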


* [Qemu-devel] [PATCH 12/16] aio: tweak walking in dispatch phase
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (10 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 13/16] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
                   ` (3 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

Preparing for the following patch, use QLIST_FOREACH_SAFE and
modify the placement of walking_handlers increment/decrement.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 27 +++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 25 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 4dc075c..450da51 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -295,7 +295,7 @@ bool aio_pending(AioContext *ctx)
 
 bool aio_dispatch(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
@@ -311,12 +311,10 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -338,17 +336,18 @@ bool aio_dispatch(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
+
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
 
diff --git a/aio-win32.c b/aio-win32.c
index 86ad822..f1a8780 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -209,20 +209,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -257,17 +255,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.5.0
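
The walking pattern that this patch rearranges can be illustrated with a small standalone list. The sketch below is not QEMU code: HandlerList, list_remove, list_dispatch and demo_cb are hypothetical names, and a hand-written singly linked list stands in for QLIST. It assumes a single thread, as the walking_handlers counter does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct Handler {
    struct Handler *next;
    int id;
    bool deleted;
} Handler;

typedef struct {
    Handler *head;
    int walking;                /* plays the role of ctx->walking_handlers */
} HandlerList;

static void list_add(HandlerList *l, int id)
{
    Handler *h = calloc(1, sizeof(*h));
    assert(h);
    h->id = id;
    h->next = l->head;
    l->head = h;
}

/* Unlink a node that is known to be on the list, then free it. */
static void list_unlink(HandlerList *l, Handler *victim)
{
    Handler **p = &l->head;
    while (*p != victim) {
        p = &(*p)->next;
    }
    *p = victim->next;
    free(victim);
}

/* While a walk is in progress only mark the node; the walker frees it. */
static void list_remove(HandlerList *l, int id)
{
    for (Handler *h = l->head; h; h = h->next) {
        if (h->id == id && !h->deleted) {
            if (l->walking) {
                h->deleted = true;
            } else {
                list_unlink(l, h);
            }
            return;
        }
    }
}

/* The counter is raised once around the whole loop and dropped only
 * around the actual removal, as in the patch.  Saving "next" before the
 * body is safe because removals are deferred while walking != 0.
 */
static int list_dispatch(HandlerList *l, void (*cb)(HandlerList *, int))
{
    Handler *h, *next;
    int calls = 0;

    l->walking++;
    for (h = l->head; h; h = next) {
        next = h->next;
        if (!h->deleted) {
            cb(l, h->id);
            calls++;
        }
        if (h->deleted) {
            l->walking--;
            if (!l->walking) {
                list_unlink(l, h);
            }
            l->walking++;
        }
    }
    l->walking--;
    return calls;
}

/* Demo callback: handler 3 removes itself and its successor, handler 2. */
static void demo_cb(HandlerList *l, int id)
{
    if (id == 3) {
        list_remove(l, 3);
        list_remove(l, 2);
    }
}
```

With handlers 1, 2 and 3 added in that order, list_dispatch(&l, demo_cb) invokes the callback for 3 and 1 only, and both marked nodes are gone when it returns.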


* [Qemu-devel] [PATCH 13/16] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (11 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 12/16] aio: tweak walking in dispatch phase Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 14/16] aio-win32: " Paolo Bonzini
                   ` (2 subsequent siblings)
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 54 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 450da51..cbdc6e4 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -16,7 +16,7 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #ifdef CONFIG_EPOLL
 #include <sys/epoll.h>
@@ -213,6 +213,8 @@ void aio_set_fd_handler(AioContext *ctx,
     bool is_new = false;
     bool deleted = false;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
@@ -220,14 +222,14 @@ void aio_set_fd_handler(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 deleted = true;
@@ -238,7 +240,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
@@ -254,6 +256,7 @@ void aio_set_fd_handler(AioContext *ctx,
     }
 
     aio_epoll_update(ctx, node, is_new);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
     if (deleted) {
         g_free(node);
@@ -277,20 +280,30 @@ bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
 bool aio_dispatch(AioContext *ctx)
@@ -311,13 +324,12 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
-        revents = node->pfd.revents & node->pfd.events;
-        node->pfd.revents = 0;
+        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
 
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
@@ -337,16 +349,15 @@ bool aio_dispatch(AioContext *ctx)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
@@ -421,12 +432,11 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
-
+    qemu_lockcnt_inc(&ctx->list_lock);
     assert(npfd == 0);
 
     /* fill pollfds */
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->pfd.events
             && !aio_epoll_enabled(ctx)
             && aio_node_check(ctx, node->is_external)) {
@@ -463,12 +473,12 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
-            nodes[i]->pfd.revents = pollfds[i].revents;
+            atomic_or(&nodes[i]->pfd.revents, pollfds[i].revents);
         }
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx)) {
-- 
2.5.0
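
The patch above has no commit message, so the following is an interpretation rather than the author's stated rationale. Switching pfd.revents from plain stores to atomic_or (in the poller) and atomic_xchg (in the dispatcher) presumably ensures that an event reported by one thread is neither overwritten by another poller nor lost between the dispatcher's read and its reset to zero. A minimal sketch of that pairing, using C11 <stdatomic.h> as a stand-in for QEMU's macros and hypothetical names (EV_IN, EV_OUT, report_events, take_events):

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical event bits, standing in for G_IO_IN and G_IO_OUT. */
#define EV_IN  1
#define EV_OUT 4

/* Poller side: accumulate newly seen events instead of overwriting the
 * field, so that bits set by another thread survive.
 */
static void report_events(atomic_int *revents, int new_events)
{
    atomic_fetch_or(revents, new_events);
}

/* Dispatcher side: fetch and clear in a single atomic step, then keep
 * only the events the handler is interested in.
 */
static int take_events(atomic_int *revents, int interest_mask)
{
    return atomic_exchange(revents, 0) & interest_mask;
}
```

Each reported bit is returned by exactly one take_events call. An event that arrives right after the exchange stays in the field for the next dispatch instead of being wiped by a later store of zero.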


* [Qemu-devel] [PATCH 14/16] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (12 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 13/16] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 15/16] aio: document locking Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 16/16] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-win32.c | 81 +++++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 49 insertions(+), 32 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index f1a8780..862f48c 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -43,6 +43,7 @@ void aio_set_fd_handler(AioContext *ctx,
     /* fd is a SOCKET in our case */
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->pfd.fd == fd && !node->deleted) {
             break;
@@ -52,14 +53,14 @@ void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
         }
 
         node->pfd.events = 0;
@@ -97,6 +98,7 @@ void aio_set_fd_handler(AioContext *ctx,
                        FD_CONNECT | FD_WRITE | FD_OOB);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -107,6 +109,7 @@ void aio_set_event_notifier(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->e == e && !node->deleted) {
             break;
@@ -118,14 +121,14 @@ void aio_set_event_notifier(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -139,7 +142,7 @@ void aio_set_event_notifier(AioContext *ctx,
             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
             node->pfd.events = G_IO_IN;
             node->is_external = is_external;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -147,6 +150,7 @@ void aio_set_event_notifier(AioContext *ctx,
         node->io_notify = io_notify;
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -157,10 +161,16 @@ bool aio_prepare(AioContext *ctx)
     bool have_select_revents = false;
     fd_set rfds, wfds;
 
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
     /* fill fd sets */
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->io_read) {
             FD_SET ((SOCKET)node->pfd.fd, &rfds);
         }
@@ -170,61 +180,71 @@ bool aio_prepare(AioContext *ctx)
     }
 
     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-            node->pfd.revents = 0;
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             if (FD_ISSET(node->pfd.fd, &rfds)) {
-                node->pfd.revents |= G_IO_IN;
+                atomic_or(&node->pfd.revents, G_IO_IN);
                 have_select_revents = true;
             }
 
             if (FD_ISSET(node->pfd.fd, &wfds)) {
-                node->pfd.revents |= G_IO_OUT;
+                atomic_or(&node->pfd.revents, G_IO_OUT);
                 have_select_revents = true;
             }
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return have_select_revents;
 }
 
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->pfd.revents && node->io_notify) {
-            return true;
+            result = true;
+            break;
         }
 
         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
 
-    return false;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node, *tmp;
+    AioHandler *node;
     bool progress = false;
+    AioHandler *tmp;
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
-        int revents = node->pfd.revents;
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
+        int revents = atomic_xchg(&node->pfd.revents, 0);
 
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
-            node->pfd.revents = 0;
             node->io_notify(node->e);
 
             /* aio_notify() does not count as progress */
@@ -235,7 +255,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
-            node->pfd.revents = 0;
             if ((revents & G_IO_IN) && node->io_read) {
                 node->io_read(node->opaque);
                 progress = true;
@@ -256,16 +275,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -301,20 +319,19 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
+    qemu_lockcnt_inc(&ctx->list_lock);
     have_select_revents = aio_prepare(ctx);
 
-    ctx->walking_handlers++;
-
     /* fill fd sets */
     count = 0;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_notify
             && aio_node_check(ctx, node->is_external)) {
             events[count++] = event_notifier_get_handle(node->e);
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
-- 
2.5.0


* [Qemu-devel] [PATCH 15/16] aio: document locking
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (13 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 14/16] aio-win32: " Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 16/16] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/multiple-iothreads.txt |  5 ++---
 include/block/aio.h         | 32 ++++++++++++++++----------------
 2 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index c5d38e9..8ec3777 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -84,9 +84,8 @@ How to synchronize with an IOThread
 AioContext is not thread-safe so some rules must be followed when using file
 descriptors, event notifiers, timers, or BHs across threads:
 
-1. AioContext functions can be called safely from file descriptor, event
-notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
-necessary.
+1. AioContext functions can always be called safely.  They handle their
+own locking internally.
 
 2. Other threads wishing to access the AioContext must take the QEMU global
 mutex *as well as* call aio_context_acquire()/aio_context_release() for
diff --git a/include/block/aio.h b/include/block/aio.h
index fb0ff21..cea92b0 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -50,18 +50,12 @@ typedef void IOHandler(void *opaque);
 struct AioContext {
     GSource source;
 
-    /* Protects all fields from multi-threaded access */
+    /* Used by AioContext users to protect from multi-threaded access.  */
     QemuRecMutex lock;
 
-    /* The list of registered AIO handlers */
+    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
-    /* This is a simple lock used to protect the aio_handlers list.
-     * Specifically, it's used to ensure that no callbacks are removed while
-     * we're walking and dispatching callbacks.
-     */
-    int walking_handlers;
-
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -87,9 +81,9 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* A lock to protect between bh's adders and deleter, and to ensure
-     * that no callbacks are removed while we're walking and dispatching
-     * them.
+    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
+     * and to ensure that no callbacks are removed while we're walking and
+     * dispatching them.
      */
     QemuLockCnt list_lock;
 
@@ -111,10 +105,14 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Thread pool for performing work and receiving completion callbacks */
+    /* Thread pool for performing work and receiving completion callbacks.
+     * Has its own locking.
+     */
     struct ThreadPool *thread_pool;
 
-    /* TimerLists for calling timers - one per clock type */
+    /* TimerLists for calling timers - one per clock type.  Has its own
+     * locking.
+     */
     QEMUTimerListGroup tlg;
 
     int external_disable_cnt;
@@ -162,9 +160,11 @@ void aio_context_unref(AioContext *ctx);
  * automatically takes care of calling aio_context_acquire and
  * aio_context_release.
  *
- * Access to timers and BHs from a thread that has not acquired AioContext
- * is possible.  Access to callbacks for now must be done while the AioContext
- * is owned by the thread (FIXME).
+ * Note that this is separate from bdrv_drained_begin/bdrv_drained_end.  A
+ * thread still has to call those to avoid being interrupted by the guest.
+ *
+ * Bottom halves, timers and callbacks can be created or removed without
+ * acquiring the AioContext.
  */
 void aio_context_acquire(AioContext *ctx);
 
-- 
2.5.0


* [Qemu-devel] [PATCH 16/16] aio: push aio_context_acquire/release down to dispatching
  2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
                   ` (14 preceding siblings ...)
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 15/16] aio: document locking Paolo Bonzini
@ 2016-02-08 16:15 ` Paolo Bonzini
  15 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-08 16:15 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

The AioContext data structures are now protected by list_lock and/or
they are walked with FOREACH_RCU primitives.  There is no longer any need
to acquire the AioContext for the entire duration of aio_dispatch.
Instead, just acquire it before invoking each callback and release it
afterwards.  The next step is then to push it further down.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
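For illustration only, the locking pattern this patch moves to can be
sketched outside QEMU as below.  Ctx, Handler, ctx_init, ctx_dispatch and
count_cb are hypothetical stand-ins, not QEMU types or functions, and a
POSIX recursive mutex plays the role of ctx->lock.  The sketch walks the
handler list without any protection and assumes nobody modifies it
concurrently; the real code relies on list_lock and the FOREACH_RCU
macros for that part.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-ins for AioContext/AioHandler; not QEMU's real types. */
typedef struct Handler {
    void (*cb)(void *opaque);
    void *opaque;
    struct Handler *next;
} Handler;

typedef struct {
    pthread_mutex_t lock;   /* plays the role of ctx->lock (recursive) */
    Handler *handlers;      /* assumed stable while ctx_dispatch() walks it */
} Ctx;

static void ctx_init(Ctx *ctx)
{
    pthread_mutexattr_t attr;
    int rc;

    rc = pthread_mutexattr_init(&attr);
    assert(rc == 0);
    rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    assert(rc == 0);
    rc = pthread_mutex_init(&ctx->lock, &attr);
    assert(rc == 0);
    (void)rc;
    pthread_mutexattr_destroy(&attr);
    ctx->handlers = NULL;
}

/* Example callback: counts how many times it ran. */
static void count_cb(void *opaque)
{
    ++*(int *)opaque;
}

/*
 * Walk the list without holding ctx->lock; take the lock only around
 * each callback invocation and drop it again right afterwards.
 * Returns 1 if at least one callback ran, 0 otherwise.
 */
static int ctx_dispatch(Ctx *ctx)
{
    int progress = 0;

    for (Handler *h = ctx->handlers; h != NULL; h = h->next) {
        pthread_mutex_lock(&ctx->lock);
        h->cb(h->opaque);
        pthread_mutex_unlock(&ctx->lock);
        progress = 1;
    }
    return progress;
}
```

A caller would run ctx_init once, link Handler entries into ctx.handlers
and then call ctx_dispatch.  Because the mutex is recursive, a callback
that takes the same lock again does not deadlock.
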
 aio-posix.c | 15 ++++++---------
 aio-win32.c | 15 +++++++--------
 async.c     |  2 ++
 3 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index cbdc6e4..015d41a 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -334,7 +334,9 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             node->io_read) {
+            aio_context_acquire(ctx);
             node->io_read(node->opaque);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -344,7 +346,9 @@ bool aio_dispatch(AioContext *ctx)
         if (!node->deleted &&
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             node->io_write) {
+            aio_context_acquire(ctx);
             node->io_write(node->opaque);
+            aio_context_release(ctx);
             progress = true;
         }
 
@@ -360,7 +364,9 @@ bool aio_dispatch(AioContext *ctx)
     qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
+    aio_context_release(ctx);
 
     return progress;
 }
@@ -418,7 +424,6 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
     bool progress;
     int64_t timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -447,9 +452,6 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
     timeout = blocking ? aio_compute_timeout(ctx) : 0;
 
     /* wait until next event */
-    if (timeout) {
-        aio_context_release(ctx);
-    }
     if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
         AioHandler epoll_handler;
 
@@ -464,9 +466,6 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
     if (blocking) {
         atomic_sub(&ctx->notify_me, 2);
     }
-    if (timeout) {
-        aio_context_acquire(ctx);
-    }
 
     aio_notify_accept(ctx);
 
@@ -485,8 +484,6 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    aio_context_release(ctx);
-
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index 862f48c..7e90dfd 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -245,7 +245,9 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
+            aio_context_acquire(ctx);
             node->io_notify(node->e);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -256,11 +258,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
             if ((revents & G_IO_IN) && node->io_read) {
+                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
+                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
 
@@ -305,7 +311,6 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
     int count;
     int timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -347,17 +352,11 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
 
         timeout = blocking && !have_select_revents
             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
-        if (timeout) {
-            aio_context_release(ctx);
-        }
         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
         if (blocking) {
             assert(first);
             atomic_sub(&ctx->notify_me, 2);
         }
-        if (timeout) {
-            aio_context_acquire(ctx);
-        }
 
         if (first) {
             aio_notify_accept(ctx);
@@ -380,8 +379,8 @@ bool aio_poll_internal(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-
     aio_context_release(ctx);
     return progress;
 }
diff --git a/async.c b/async.c
index bc7e142..9eab833 100644
--- a/async.c
+++ b/async.c
@@ -88,7 +88,9 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
+            aio_context_acquire(ctx);
             aio_bh_call(bh);
+            aio_context_release(ctx);
         }
     }
 
-- 
2.5.0


* Re: [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread
  2016-02-08 16:14 ` [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread Paolo Bonzini
@ 2016-02-08 22:22   ` Eric Blake
  0 siblings, 0 replies; 21+ messages in thread
From: Eric Blake @ 2016-02-08 22:22 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: stefanha


On 02/08/2016 09:14 AM, Paolo Bonzini wrote:
> aio_poll is not thread safe; it can report progress incorrectly when
> called from the main thread.  The bug remains latent as long as
> all of it is called within aio_context_acquire/aio_context_release,
> but this will change soon.
> 
> The details of the bug are pretty simple, but fixing it in an
> efficient way is thorny.  There are plenty of comments and formal
> models in the patch, so I will refer to it.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---

> +++ b/async.c
> @@ -300,12 +300,224 @@ void aio_notify_accept(AioContext *ctx)
>      }
>  }
>  
> +/* aio_poll_internal is not thread-safe; it only reports progress
> + * correctly when called from one thread, because it has no
> + * history of what happened in different threads.  When called
> + * from two threads, there is a race:
> + *
> + *      main thread                       I/O thread
> + *      -----------------------           --------------------------
> + *      blk_drain
> + *        bdrv_requests_pending -> true
> + *                                        aio_poll_internal
> + *                                          process last request
> + *        aio_poll_internal
> + *
> + * Now aio_poll_internal will never exit, because there is no pending
> + * I/O on the AioContext.
> + *
> + * Therefore, aio_poll is a wrapper around aio_poll_internal that allows
> + * usage from _two_ threads: the I/O thread of course, and the main thread.
> + * When called from the main thread, aio_poll just asks the I/O thread
> + * for a nudge as soon as the next call to aio_poll is complete.
> + * Because we use QemuEvent, and QemuEvent supports a single consumer
> + * only, this only works when the calling thread holds the big QEMU lock.
> + *
> + * Because aio_poll is used in a loop, spurious wakeups are okay.
> + * Therefore, the I/O thread calls qemu_event_set very liberally
> + * (it helps that qemu_event_set is cheap on an already-set event).
> + * generally used in a loop, it's okay to have spurious wakeups.

Incomplete sentence due to bad rebase leftovers?

> + * Similarly it is okay to return true when no progress was made
> + * (as long as this doesn't happen forever, or you get livelock).
> + *
> +


-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org



* Re: [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt
  2016-02-08 16:15 ` [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2016-02-08 22:38   ` Eric Blake
  0 siblings, 0 replies; 21+ messages in thread
From: Eric Blake @ 2016-02-08 22:38 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: stefanha


On 02/08/2016 09:15 AM, Paolo Bonzini wrote:
> A QemuLockCnt comprises a counter and a mutex, with primitives
> to increment and decrement the counter, and to take and release the
> mutex.  It can be used to do lock-free visits to a data structure
> whenever mutexes would be too heavy-weight and the critical section
> is too long for RCU.
> 
> This could be implemented simply by protecting the counter with the
> mutex, but QemuLockCnt is harder to misuse and more efficient.
> 
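
As a point of comparison, the simple variant mentioned above (a counter
protected by the mutex) might look roughly like the sketch below.  All of
the naive_lockcnt_* names are hypothetical and built on plain pthreads;
this is not the code in util/lockcnt.c, only an illustration of the
counter-plus-mutex idea.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical naive locked counter: every operation takes the mutex. */
typedef struct {
    pthread_mutex_t mutex;
    unsigned count;         /* number of visitors currently inside */
} NaiveLockCnt;

static void naive_lockcnt_init(NaiveLockCnt *lc)
{
    pthread_mutex_init(&lc->mutex, NULL);
    lc->count = 0;
}

/* A visitor enters the data structure. */
static void naive_lockcnt_inc(NaiveLockCnt *lc)
{
    pthread_mutex_lock(&lc->mutex);
    lc->count++;
    pthread_mutex_unlock(&lc->mutex);
}

/* A visitor leaves without wanting to reclaim anything. */
static void naive_lockcnt_dec(NaiveLockCnt *lc)
{
    pthread_mutex_lock(&lc->mutex);
    assert(lc->count > 0);
    lc->count--;
    pthread_mutex_unlock(&lc->mutex);
}

/*
 * A visitor leaves.  If it was the last one, return true with the mutex
 * still held, so the caller can free deleted elements and must then call
 * naive_lockcnt_unlock().  Otherwise return false with the mutex released.
 */
static bool naive_lockcnt_dec_and_lock(NaiveLockCnt *lc)
{
    pthread_mutex_lock(&lc->mutex);
    assert(lc->count > 0);
    if (--lc->count == 0) {
        return true;
    }
    pthread_mutex_unlock(&lc->mutex);
    return false;
}

static void naive_lockcnt_unlock(NaiveLockCnt *lc)
{
    pthread_mutex_unlock(&lc->mutex);
}
```

Every visit pays for a mutex lock and unlock even when nothing is being
deleted, which is the cost the description says QemuLockCnt avoids.
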
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/lockcnt.txt      | 343 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  include/qemu/thread.h |  17 +++
>  util/Makefile.objs    |   1 +
>  util/lockcnt.c        | 122 ++++++++++++++++++
>  4 files changed, 483 insertions(+)
>  create mode 100644 docs/lockcnt.txt
>  create mode 100644 util/lockcnt.c
> 
> diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
> new file mode 100644
> index 0000000..fc5d240
> --- /dev/null
> +++ b/docs/lockcnt.txt
> @@ -0,0 +1,343 @@
> +DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
> +===================================================

Worth an explicit mention that this document is GPLv2+ (or an explicit
choice of a different license)?

> +
> +QEMU often uses reference counts to track data structures that are being
> +accessed and should not be freed.  For example, a loop that invoke

s/invoke/invokes/

> +callbacks like this is not safe:
> +

but overall a nice writeup.

I'll leave the code review to others, though.

> +++ b/util/lockcnt.c
> @@ -0,0 +1,122 @@
> +/*
> + * QemuLockCnt implementation
> + *
> + * Copyright Red Hat, Inc. 2015
> + *
> + * Author:
> + *   Paolo Bonzini <pbonzini@redhat.com>
> + */
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <errno.h>
> +#include <time.h>
> +#include <signal.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <sys/time.h>

You'll want to use qemu/osdep.h in place of several of these headers.

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org



* [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex
  2016-02-09 11:45 [Qemu-devel] [PATCH v3 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
@ 2016-02-09 11:46 ` Paolo Bonzini
  0 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-02-09 11:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

It is simpler and a bit faster, and QEMU does not need the contention
callbacks (and thus the fairness) anymore.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                  |  8 ++---
 include/block/aio.h      |  3 +-
 include/qemu/rfifolock.h | 54 ----------------------------
 tests/.gitignore         |  1 -
 tests/Makefile           |  2 --
 tests/test-rfifolock.c   | 91 ------------------------------------------------
 util/Makefile.objs       |  1 -
 util/rfifolock.c         | 78 -----------------------------------------
 8 files changed, 5 insertions(+), 233 deletions(-)
 delete mode 100644 include/qemu/rfifolock.h
 delete mode 100644 tests/test-rfifolock.c
 delete mode 100644 util/rfifolock.c

diff --git a/async.c b/async.c
index 17a11fe..fc11b1c 100644
--- a/async.c
+++ b/async.c
@@ -254,7 +254,7 @@ aio_ctx_finalize(GSource     *source)
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
-    rfifolock_destroy(&ctx->lock);
+    qemu_rec_mutex_destroy(&ctx->lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
@@ -551,7 +551,7 @@ AioContext *aio_context_new(Error **errp)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
-    rfifolock_init(&ctx->lock, NULL, NULL);
+    qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
     qemu_event_init(&ctx->sync_io_event, true);
@@ -577,7 +577,7 @@ void aio_context_acquire(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_lock(&ctx->lock);
+        qemu_rec_mutex_lock(&ctx->lock);
     }
 }
 
@@ -586,6 +586,6 @@ void aio_context_release(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_unlock(&ctx->lock);
+        qemu_rec_mutex_unlock(&ctx->lock);
     }
 }
diff --git a/include/block/aio.h b/include/block/aio.h
index 7223daf..3f055d2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -19,7 +19,6 @@
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
-#include "qemu/rfifolock.h"
 #include "qemu/timer.h"
 
 typedef struct BlockAIOCB BlockAIOCB;
@@ -52,7 +51,7 @@ struct AioContext {
     GSource source;
 
     /* Protects all fields from multi-threaded access */
-    RFifoLock lock;
+    QemuRecMutex lock;
 
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
deleted file mode 100644
index b23ab53..0000000
--- a/include/qemu/rfifolock.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or later.
- * See the COPYING file in the top-level directory.
- *
- */
-
-#ifndef QEMU_RFIFOLOCK_H
-#define QEMU_RFIFOLOCK_H
-
-#include "qemu/thread.h"
-
-/* Recursive FIFO lock
- *
- * This lock provides more features than a plain mutex:
- *
- * 1. Fairness - enforces FIFO order.
- * 2. Nesting - can be taken recursively.
- * 3. Contention callback - optional, called when thread must wait.
- *
- * The recursive FIFO lock is heavyweight so prefer other synchronization
- * primitives if you do not need its features.
- */
-typedef struct {
-    QemuMutex lock;             /* protects all fields */
-
-    /* FIFO order */
-    unsigned int head;          /* active ticket number */
-    unsigned int tail;          /* waiting ticket number */
-    QemuCond cond;              /* used to wait for our ticket number */
-
-    /* Nesting */
-    QemuThread owner_thread;    /* thread that currently has ownership */
-    unsigned int nesting;       /* amount of nesting levels */
-
-    /* Contention callback */
-    void (*cb)(void *);         /* called when thread must wait, with ->lock
-                                 * held so it may not recursively lock/unlock
-                                 */
-    void *cb_opaque;
-} RFifoLock;
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
-void rfifolock_destroy(RFifoLock *r);
-void rfifolock_lock(RFifoLock *r);
-void rfifolock_unlock(RFifoLock *r);
-
-#endif /* QEMU_RFIFOLOCK_H */
diff --git a/tests/.gitignore b/tests/.gitignore
index 787c95c..b3da3f1 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -51,7 +51,6 @@ test-qmp-introspect.[ch]
 test-qmp-marshal.c
 test-qmp-output-visitor
 test-rcu-list
-test-rfifolock
 test-string-input-visitor
 test-string-output-visitor
 test-thread-pool
diff --git a/tests/Makefile b/tests/Makefile
index 650e654..9da0fcf 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -39,7 +39,6 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
-check-unit-$(CONFIG_POSIX) += tests/test-rfifolock$(EXESUF)
 check-unit-y += tests/test-throttle$(EXESUF)
 gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
 gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -403,7 +402,6 @@ tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
-tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
deleted file mode 100644
index 0572ebb..0000000
--- a/tests/test-rfifolock.c
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * RFifoLock tests
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi    <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- */
-
-#include <glib.h>
-#include "qemu-common.h"
-#include "qemu/rfifolock.h"
-
-static void test_nesting(void)
-{
-    RFifoLock lock;
-
-    /* Trivial test, ensure the lock is recursive */
-    rfifolock_init(&lock, NULL, NULL);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_destroy(&lock);
-}
-
-typedef struct {
-    RFifoLock lock;
-    int fd[2];
-} CallbackTestData;
-
-static void rfifolock_cb(void *opaque)
-{
-    CallbackTestData *data = opaque;
-    int ret;
-    char c = 0;
-
-    ret = write(data->fd[1], &c, sizeof(c));
-    g_assert(ret == 1);
-}
-
-static void *callback_thread(void *opaque)
-{
-    CallbackTestData *data = opaque;
-
-    /* The other thread holds the lock so the contention callback will be
-     * invoked...
-     */
-    rfifolock_lock(&data->lock);
-    rfifolock_unlock(&data->lock);
-    return NULL;
-}
-
-static void test_callback(void)
-{
-    CallbackTestData data;
-    QemuThread thread;
-    int ret;
-    char c;
-
-    rfifolock_init(&data.lock, rfifolock_cb, &data);
-    ret = qemu_pipe(data.fd);
-    g_assert(ret == 0);
-
-    /* Hold lock but allow the callback to kick us by writing to the pipe */
-    rfifolock_lock(&data.lock);
-    qemu_thread_create(&thread, "callback_thread",
-                       callback_thread, &data, QEMU_THREAD_JOINABLE);
-    ret = read(data.fd[0], &c, sizeof(c));
-    g_assert(ret == 1);
-    rfifolock_unlock(&data.lock);
-    /* If we got here then the callback was invoked, as expected */
-
-    qemu_thread_join(&thread);
-    close(data.fd[0]);
-    close(data.fd[1]);
-    rfifolock_destroy(&data.lock);
-}
-
-int main(int argc, char **argv)
-{
-    g_test_init(&argc, &argv, NULL);
-    g_test_add_func("/nesting", test_nesting);
-    g_test_add_func("/callback", test_callback);
-    return g_test_run();
-}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index a8a777e..c0223c6 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -23,7 +23,6 @@ util-obj-y += crc32c.o
 util-obj-y += throttle.o
 util-obj-y += getauxval.o
 util-obj-y += readline.o
-util-obj-y += rfifolock.o
 util-obj-y += rcu.o
 util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
 util-obj-y += qemu-coroutine-sleep.o
diff --git a/util/rfifolock.c b/util/rfifolock.c
deleted file mode 100644
index c22f5fe..0000000
--- a/util/rfifolock.c
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- *
- */
-
-#include "qemu/osdep.h"
-#include "qemu/rfifolock.h"
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
-{
-    qemu_mutex_init(&r->lock);
-    r->head = 0;
-    r->tail = 0;
-    qemu_cond_init(&r->cond);
-    r->nesting = 0;
-    r->cb = cb;
-    r->cb_opaque = opaque;
-}
-
-void rfifolock_destroy(RFifoLock *r)
-{
-    qemu_cond_destroy(&r->cond);
-    qemu_mutex_destroy(&r->lock);
-}
-
-/*
- * Theory of operation:
- *
- * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
- * the lock enqueue themselves by incrementing the tail index.  When the lock
- * is unlocked, the head is incremented and waiting threads are notified.
- *
- * Recursive locking does not take a ticket since the head is only incremented
- * when the outermost recursive caller unlocks.
- */
-void rfifolock_lock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-
-    /* Take a ticket */
-    unsigned int ticket = r->tail++;
-
-    if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
-        r->tail--; /* put ticket back, we're nesting */
-    } else {
-        while (ticket != r->head) {
-            /* Invoke optional contention callback */
-            if (r->cb) {
-                r->cb(r->cb_opaque);
-            }
-            qemu_cond_wait(&r->cond, &r->lock);
-        }
-    }
-
-    qemu_thread_get_self(&r->owner_thread);
-    r->nesting++;
-    qemu_mutex_unlock(&r->lock);
-}
-
-void rfifolock_unlock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-    assert(r->nesting > 0);
-    assert(qemu_thread_is_self(&r->owner_thread));
-    if (--r->nesting == 0) {
-        r->head++;
-        qemu_cond_broadcast(&r->cond);
-    }
-    qemu_mutex_unlock(&r->lock);
-}
-- 
2.5.0


* [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex
  2016-01-15 15:12 [Qemu-devel] [PATCH 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
@ 2016-01-15 15:12 ` Paolo Bonzini
  0 siblings, 0 replies; 21+ messages in thread
From: Paolo Bonzini @ 2016-01-15 15:12 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha

It is simpler and a bit faster, and QEMU does not need the contention
callbacks (and thus the fairness) anymore.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                  |  8 ++---
 include/block/aio.h      |  3 +-
 include/qemu/rfifolock.h | 54 ----------------------------
 tests/.gitignore         |  1 -
 tests/Makefile           |  2 --
 tests/test-rfifolock.c   | 91 ------------------------------------------------
 util/Makefile.objs       |  1 -
 util/rfifolock.c         | 78 -----------------------------------------
 8 files changed, 5 insertions(+), 233 deletions(-)
 delete mode 100644 include/qemu/rfifolock.h
 delete mode 100644 tests/test-rfifolock.c
 delete mode 100644 util/rfifolock.c

diff --git a/async.c b/async.c
index e4ef8b8..6eee89d 100644
--- a/async.c
+++ b/async.c
@@ -253,7 +253,7 @@ aio_ctx_finalize(GSource     *source)
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
-    rfifolock_destroy(&ctx->lock);
+    qemu_rec_mutex_destroy(&ctx->lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
@@ -549,7 +549,7 @@ AioContext *aio_context_new(Error **errp)
                            event_notifier_dummy_cb);
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
-    rfifolock_init(&ctx->lock, NULL, NULL);
+    qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
     qemu_event_init(&ctx->sync_io_event, true);
@@ -575,7 +575,7 @@ void aio_context_acquire(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_lock(&ctx->lock);
+        qemu_rec_mutex_lock(&ctx->lock);
     }
 }
 
@@ -584,6 +584,6 @@ void aio_context_release(AioContext *ctx)
     if (ctx == qemu_get_aio_context()) {
         assert(qemu_mutex_iothread_locked());
     } else {
-        rfifolock_unlock(&ctx->lock);
+        qemu_rec_mutex_unlock(&ctx->lock);
     }
 }
diff --git a/include/block/aio.h b/include/block/aio.h
index 7223daf..3f055d2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -19,7 +19,6 @@
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
-#include "qemu/rfifolock.h"
 #include "qemu/timer.h"
 
 typedef struct BlockAIOCB BlockAIOCB;
@@ -52,7 +51,7 @@ struct AioContext {
     GSource source;
 
     /* Protects all fields from multi-threaded access */
-    RFifoLock lock;
+    QemuRecMutex lock;
 
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
deleted file mode 100644
index b23ab53..0000000
--- a/include/qemu/rfifolock.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or later.
- * See the COPYING file in the top-level directory.
- *
- */
-
-#ifndef QEMU_RFIFOLOCK_H
-#define QEMU_RFIFOLOCK_H
-
-#include "qemu/thread.h"
-
-/* Recursive FIFO lock
- *
- * This lock provides more features than a plain mutex:
- *
- * 1. Fairness - enforces FIFO order.
- * 2. Nesting - can be taken recursively.
- * 3. Contention callback - optional, called when thread must wait.
- *
- * The recursive FIFO lock is heavyweight so prefer other synchronization
- * primitives if you do not need its features.
- */
-typedef struct {
-    QemuMutex lock;             /* protects all fields */
-
-    /* FIFO order */
-    unsigned int head;          /* active ticket number */
-    unsigned int tail;          /* waiting ticket number */
-    QemuCond cond;              /* used to wait for our ticket number */
-
-    /* Nesting */
-    QemuThread owner_thread;    /* thread that currently has ownership */
-    unsigned int nesting;       /* amount of nesting levels */
-
-    /* Contention callback */
-    void (*cb)(void *);         /* called when thread must wait, with ->lock
-                                 * held so it may not recursively lock/unlock
-                                 */
-    void *cb_opaque;
-} RFifoLock;
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
-void rfifolock_destroy(RFifoLock *r);
-void rfifolock_lock(RFifoLock *r);
-void rfifolock_unlock(RFifoLock *r);
-
-#endif /* QEMU_RFIFOLOCK_H */
diff --git a/tests/.gitignore b/tests/.gitignore
index 787c95c..b3da3f1 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -51,7 +51,6 @@ test-qmp-introspect.[ch]
 test-qmp-marshal.c
 test-qmp-output-visitor
 test-rcu-list
-test-rfifolock
 test-string-input-visitor
 test-string-output-visitor
 test-thread-pool
diff --git a/tests/Makefile b/tests/Makefile
index b7352f1..73a6971 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -39,7 +39,6 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
-check-unit-$(CONFIG_POSIX) += tests/test-rfifolock$(EXESUF)
 check-unit-y += tests/test-throttle$(EXESUF)
 gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
 gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -403,7 +402,6 @@ tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
-tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
deleted file mode 100644
index 0572ebb..0000000
--- a/tests/test-rfifolock.c
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * RFifoLock tests
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi    <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- */
-
-#include <glib.h>
-#include "qemu-common.h"
-#include "qemu/rfifolock.h"
-
-static void test_nesting(void)
-{
-    RFifoLock lock;
-
-    /* Trivial test, ensure the lock is recursive */
-    rfifolock_init(&lock, NULL, NULL);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_lock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_unlock(&lock);
-    rfifolock_destroy(&lock);
-}
-
-typedef struct {
-    RFifoLock lock;
-    int fd[2];
-} CallbackTestData;
-
-static void rfifolock_cb(void *opaque)
-{
-    CallbackTestData *data = opaque;
-    int ret;
-    char c = 0;
-
-    ret = write(data->fd[1], &c, sizeof(c));
-    g_assert(ret == 1);
-}
-
-static void *callback_thread(void *opaque)
-{
-    CallbackTestData *data = opaque;
-
-    /* The other thread holds the lock so the contention callback will be
-     * invoked...
-     */
-    rfifolock_lock(&data->lock);
-    rfifolock_unlock(&data->lock);
-    return NULL;
-}
-
-static void test_callback(void)
-{
-    CallbackTestData data;
-    QemuThread thread;
-    int ret;
-    char c;
-
-    rfifolock_init(&data.lock, rfifolock_cb, &data);
-    ret = qemu_pipe(data.fd);
-    g_assert(ret == 0);
-
-    /* Hold lock but allow the callback to kick us by writing to the pipe */
-    rfifolock_lock(&data.lock);
-    qemu_thread_create(&thread, "callback_thread",
-                       callback_thread, &data, QEMU_THREAD_JOINABLE);
-    ret = read(data.fd[0], &c, sizeof(c));
-    g_assert(ret == 1);
-    rfifolock_unlock(&data.lock);
-    /* If we got here then the callback was invoked, as expected */
-
-    qemu_thread_join(&thread);
-    close(data.fd[0]);
-    close(data.fd[1]);
-    rfifolock_destroy(&data.lock);
-}
-
-int main(int argc, char **argv)
-{
-    g_test_init(&argc, &argv, NULL);
-    g_test_add_func("/nesting", test_nesting);
-    g_test_add_func("/callback", test_callback);
-    return g_test_run();
-}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 8620a80..8fd03d4 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -23,7 +23,6 @@ util-obj-y += crc32c.o
 util-obj-y += throttle.o
 util-obj-y += getauxval.o
 util-obj-y += readline.o
-util-obj-y += rfifolock.o
 util-obj-y += rcu.o
 util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
 util-obj-y += qemu-coroutine-sleep.o
diff --git a/util/rfifolock.c b/util/rfifolock.c
deleted file mode 100644
index afbf748..0000000
--- a/util/rfifolock.c
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- *  Stefan Hajnoczi   <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- *
- */
-
-#include <assert.h>
-#include "qemu/rfifolock.h"
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
-{
-    qemu_mutex_init(&r->lock);
-    r->head = 0;
-    r->tail = 0;
-    qemu_cond_init(&r->cond);
-    r->nesting = 0;
-    r->cb = cb;
-    r->cb_opaque = opaque;
-}
-
-void rfifolock_destroy(RFifoLock *r)
-{
-    qemu_cond_destroy(&r->cond);
-    qemu_mutex_destroy(&r->lock);
-}
-
-/*
- * Theory of operation:
- *
- * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
- * the lock enqueue themselves by incrementing the tail index.  When the lock
- * is unlocked, the head is incremented and waiting threads are notified.
- *
- * Recursive locking does not take a ticket since the head is only incremented
- * when the outermost recursive caller unlocks.
- */
-void rfifolock_lock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-
-    /* Take a ticket */
-    unsigned int ticket = r->tail++;
-
-    if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
-        r->tail--; /* put ticket back, we're nesting */
-    } else {
-        while (ticket != r->head) {
-            /* Invoke optional contention callback */
-            if (r->cb) {
-                r->cb(r->cb_opaque);
-            }
-            qemu_cond_wait(&r->cond, &r->lock);
-        }
-    }
-
-    qemu_thread_get_self(&r->owner_thread);
-    r->nesting++;
-    qemu_mutex_unlock(&r->lock);
-}
-
-void rfifolock_unlock(RFifoLock *r)
-{
-    qemu_mutex_lock(&r->lock);
-    assert(r->nesting > 0);
-    assert(qemu_thread_is_self(&r->owner_thread));
-    if (--r->nesting == 0) {
-        r->head++;
-        qemu_cond_broadcast(&r->cond);
-    }
-    qemu_mutex_unlock(&r->lock);
-}
-- 
2.5.0
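
Note on what the conversion keeps: of the three RFifoLock features listed in
the deleted header (fairness, nesting, contention callback), only nesting is
exercised by the removed test_nesting() case. A minimal sketch of that property
follows, assuming the replacement behaves like a POSIX recursive mutex. The
names SketchRecMutex, sketch_check, sketch_rec_mutex_init and
sketch_nesting_demo are hypothetical and are not part of QEMU. The sketch does
not provide FIFO ordering or a contention callback.

```c
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700   /* exposes PTHREAD_MUTEX_RECURSIVE in strict modes */
#endif
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for a recursive mutex type; not QEMU's definition. */
typedef struct {
    pthread_mutex_t lock;
} SketchRecMutex;

/* Abort on any pthread error so failures are not silently ignored. */
static void sketch_check(int err)
{
    if (err != 0) {
        abort();
    }
}

/* Initialize the mutex with the recursive type so the owner may re-lock it. */
static void sketch_rec_mutex_init(SketchRecMutex *m)
{
    pthread_mutexattr_t attr;

    sketch_check(pthread_mutexattr_init(&attr));
    sketch_check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE));
    sketch_check(pthread_mutex_init(&m->lock, &attr));
    sketch_check(pthread_mutexattr_destroy(&attr));
}

/*
 * Same shape as the removed test_nesting(): take the lock three times from
 * one thread, then release it three times.  Returns the nesting depth reached.
 */
int sketch_nesting_demo(void)
{
    SketchRecMutex m;
    int depth = 0;
    int i;

    sketch_rec_mutex_init(&m);
    for (i = 0; i < 3; i++) {
        sketch_check(pthread_mutex_lock(&m.lock));   /* nested acquire */
        depth++;
    }
    for (i = 0; i < 3; i++) {
        sketch_check(pthread_mutex_unlock(&m.lock)); /* one unlock per lock */
    }
    sketch_check(pthread_mutex_destroy(&m.lock));
    return depth;
}
```

Calling sketch_nesting_demo() from a main() and linking with -pthread is enough
to try it. With a non-recursive default mutex the second lock call would
deadlock or fail, which is the behaviour the nesting test guarded against.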



Thread overview: 21+ messages
2016-02-08 16:14 [Qemu-devel] [PATCH v2 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 01/16] aio: introduce aio_context_in_iothread Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 02/16] aio: do not really acquire/release the main AIO context Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 03/16] aio: introduce aio_poll_internal Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 04/16] aio: only call aio_poll_internal from iothread Paolo Bonzini
2016-02-08 22:22   ` Eric Blake
2016-02-08 16:14 ` [Qemu-devel] [PATCH 05/16] iothread: release AioContext around aio_poll Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 06/16] qemu-thread: introduce QemuRecMutex Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex Paolo Bonzini
2016-02-08 16:14 ` [Qemu-devel] [PATCH 08/16] aio: rename bh_lock to list_lock Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 09/16] qemu-thread: introduce QemuLockCnt Paolo Bonzini
2016-02-08 22:38   ` Eric Blake
2016-02-08 16:15 ` [Qemu-devel] [PATCH 10/16] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 12/16] aio: tweak walking in dispatch phase Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 13/16] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 14/16] aio-win32: " Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 15/16] aio: document locking Paolo Bonzini
2016-02-08 16:15 ` [Qemu-devel] [PATCH 16/16] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
  -- strict thread matches above, loose matches on Subject: below --
2016-02-09 11:45 [Qemu-devel] [PATCH v3 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
2016-02-09 11:46 ` [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex Paolo Bonzini
2016-01-15 15:12 [Qemu-devel] [PATCH 00/16] aio: first part of aio_context_acquire/release pushdown Paolo Bonzini
2016-01-15 15:12 ` [Qemu-devel] [PATCH 07/16] aio: convert from RFifoLock to QemuRecMutex Paolo Bonzini
