All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2
@ 2017-01-20 16:43 Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake Paolo Bonzini
                   ` (18 more replies)
  0 siblings, 19 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

This series pushes down aio_context_acquire/release to the point
where we can actually reason on using different fine-grained mutexes.

The main infrastructure is introduced in patch 1.  The new API aio_co_wake
starts a coroutine with aio_context_acquire/release protection, which
requires tracking each coroutine's "home" AioContext.  aio_co_schedule
instead takes care of moving a sleeping coroutine to a different
AioContext, also ensuring that it runs under aio_context_acquire/release.
This is useful to implement bdrv_set_aio_context, as a simpler alternative
to bottom halves.  Even though one-shot BHs are already simpler than
what we had before, after this patch aio_co_wake and aio_co_schedule
save you from having to do aio_context_acquire/release explicitly.

After patch 2 to 4, which are just small preparatory changes, patches
5 to 8 provide an example of how to use the new API.  In particular patch
5 to 7 implement a new organization of coroutines in the NBD client,
which allows not blocking on partial reply header reads.

Patch 9 introduces helpers for AioContext locking in QED, which is
the most complex AIO-based driver left.  Then the actual meat of the
series runs from patch 10 to patch 14, followed by small optimizations
in patches 15 and 16.

The patches do some back and forth in adding/removing
aio_context_acquire/release calls in block/*.c but ultimately a small
number of aio_context_acquire/release pairs are added after the pushdown.
These are mostly in drivers that use external libraries (and actually
they could already be replaced by QemuMutex) and in device models
that support multithreaded operation (aka iothread aka dataplane).

Notably, coroutines need not care about aio_context_acquire/release.
The device models ensure that the first creation of the coroutine has
the AioContext, while aio_co_wake/aio_co_schedule do the same after
they yield.  Therefore, most of the files only need to use those two
functions instead of, respectively, qemu_coroutine_enter and
aio_bh_schedule_oneshot.

However, this is only an intermediate step which is needed because the
block layer and qemu-coroutine locks are thread-unsafe.  So the next
part will add separate locking, independent of AioContext, to block.c and
mostly block/io.c---this includes making CoMutex thread-safe.  Patch 17
therefore already documents the current locking policies block.h to
prepare for the next series.

Paolo

v1->v2:
        new patch (4) removing block-obj-y -> io-obj-y dependency
        removed QIOChannelRestart from
                "io: add methods to set I/O handlers on AioContext"
        improved qio_channel_set_aio_context docs, renamed to
                qio_channel_attach_aio_context
        fixed pasto in "io: make qio_channel_yield aware of AioContexts"
        document restrictions in bdrv_aio_cancel due to qemu_aio_ref
        converted NBD server to qio_channel_yield too
        allow NULL s->read_reply_co
        
*** BLURB HERE ***

Paolo Bonzini (17):
  aio: introduce aio_co_schedule and aio_co_wake
  block-backend: allow blk_prw from coroutine context
  test-thread-pool: use generic AioContext infrastructure
  block: move AioContext and QEMUTimer to libqemuutil
  io: add methods to set I/O handlers on AioContext
  io: make qio_channel_yield aware of AioContexts
  nbd: convert to use qio_channel_yield
  coroutine-lock: reschedule coroutine on the AioContext it was running
    on
  qed: introduce qed_aio_start_io and qed_aio_next_io_cb
  aio: push aio_context_acquire/release down to dispatching
  block: explicitly acquire aiocontext in timers that need it
  block: explicitly acquire aiocontext in callbacks that need it
  block: explicitly acquire aiocontext in bottom halves that need it
  block: explicitly acquire aiocontext in aio callbacks that need it
  aio-posix: partially inline aio_dispatch into aio_poll
  async: remove unnecessary inc/dec pairs
  block: document fields protected by AioContext lock

 Makefile.objs                       |   5 +-
 block/blkdebug.c                    |   9 +-
 block/blkreplay.c                   |   2 +-
 block/block-backend.c               |  13 ++-
 block/curl.c                        |  44 ++++++--
 block/gluster.c                     |   9 +-
 block/io.c                          |  38 ++-----
 block/iscsi.c                       |  15 ++-
 block/linux-aio.c                   |  10 +-
 block/mirror.c                      |  12 +-
 block/nbd-client.c                  | 108 ++++++++----------
 block/nbd-client.h                  |   2 +-
 block/nfs.c                         |   9 +-
 block/qed-cluster.c                 |   2 +
 block/qed-table.c                   |  12 +-
 block/qed.c                         |  58 +++++++---
 block/qed.h                         |   3 +
 block/sheepdog.c                    |  29 ++---
 block/ssh.c                         |  29 ++---
 block/throttle-groups.c             |   2 +
 block/win32-aio.c                   |   9 +-
 dma-helpers.c                       |   2 +
 hw/block/virtio-blk.c               |  19 +++-
 hw/scsi/scsi-bus.c                  |   2 +
 hw/scsi/scsi-disk.c                 |  15 +++
 hw/scsi/scsi-generic.c              |  20 +++-
 hw/scsi/virtio-scsi.c               |   6 +
 include/block/aio.h                 |  38 ++++++-
 include/block/block_int.h           |  64 ++++++-----
 include/io/channel.h                |  72 +++++++++++-
 include/qemu/coroutine_int.h        |  10 +-
 include/sysemu/block-backend.h      |  14 ++-
 io/channel-command.c                |  13 +++
 io/channel-file.c                   |  11 ++
 io/channel-socket.c                 |  16 ++-
 io/channel-tls.c                    |  12 ++
 io/channel-watch.c                  |   6 +
 io/channel.c                        |  97 ++++++++++++----
 nbd/client.c                        |   2 +-
 nbd/common.c                        |   9 +-
 nbd/server.c                        |  94 +++++-----------
 stubs/Makefile.objs                 |   2 +
 stubs/linux-aio.c                   |  32 ++++++
 stubs/main-loop.c                   |   8 ++
 stubs/set-fd-handler.c              |  11 --
 tests/Makefile.include              |  18 +--
 tests/iothread.c                    |  91 +++++++++++++++
 tests/iothread.h                    |  25 +++++
 tests/test-aio-multithread.c        | 213 ++++++++++++++++++++++++++++++++++++
 tests/test-thread-pool.c            |  12 +-
 tests/test-vmstate.c                |  11 --
 trace-events                        |   4 +
 util/Makefile.objs                  |   5 +-
 aio-posix.c => util/aio-posix.c     |  60 +++-------
 aio-win32.c => util/aio-win32.c     |  30 ++---
 util/aiocb.c                        |  55 ++++++++++
 async.c => util/async.c             |  84 ++++++++++++--
 util/qemu-coroutine-lock.c          |   5 +-
 util/qemu-coroutine-sleep.c         |   2 +-
 util/qemu-coroutine.c               |   8 ++
 qemu-timer.c => util/qemu-timer.c   |   0
 thread-pool.c => util/thread-pool.c |   6 +-
 util/trace-events                   |   1 -
 63 files changed, 1155 insertions(+), 465 deletions(-)
 create mode 100644 stubs/linux-aio.c
 create mode 100644 stubs/main-loop.c
 create mode 100644 tests/iothread.c
 create mode 100644 tests/iothread.h
 create mode 100644 tests/test-aio-multithread.c
 rename aio-posix.c => util/aio-posix.c (94%)
 rename aio-win32.c => util/aio-win32.c (95%)
 create mode 100644 util/aiocb.c
 rename async.c => util/async.c (82%)
 rename qemu-timer.c => util/qemu-timer.c (100%)
 rename thread-pool.c => util/thread-pool.c (98%)

-- 
2.9.3

^ permalink raw reply	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-30 15:18   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 02/17] block-backend: allow blk_prw from coroutine context Paolo Bonzini
                   ` (17 subsequent siblings)
  18 siblings, 1 reply; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

aio_co_wake provides the infrastructure to start a coroutine on a "home"
AioContext.  It will be used by CoMutex and CoQueue, so that coroutines
don't jump from one context to another when they go to sleep on a
mutex or waitqueue.  However, it can also be used as a more efficient
alternative to one-shot bottom halves, and saves the effort of tracking
which AioContext a coroutine is running on.

aio_co_schedule is the part of aio_co_wake that starts a coroutine
on a remove AioContext, but it is also useful to implement e.g.
bdrv_set_aio_context callbacks.

The implementation of aio_co_schedule is based on a lock-free
multiple-producer, single-consumer queue.  The multiple producers use
cmpxchg to add to a LIFO stack.  The consumer (a per-AioContext bottom
half) grabs all items added so far, inverts the list to make it FIFO,
and goes through it one item at a time until it's empty.  The data
structure was inspired by OSv, which uses it in the very code we'll
"port" to QEMU for the thread-safe CoMutex.

Most of the new code is really tests.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
        v1->v2: added tests/iothread.o to test-block-obj-y to
        account for the new patch 4

 async.c                      |  65 +++++++++++++
 include/block/aio.h          |  32 +++++++
 include/qemu/coroutine_int.h |  10 +-
 tests/Makefile.include       |  11 ++-
 tests/iothread.c             |  91 ++++++++++++++++++
 tests/iothread.h             |  25 +++++
 tests/test-aio-multithread.c | 213 +++++++++++++++++++++++++++++++++++++++++++
 tests/test-vmstate.c         |  11 ---
 trace-events                 |   4 +
 util/qemu-coroutine.c        |   8 ++
 10 files changed, 455 insertions(+), 15 deletions(-)
 create mode 100644 tests/iothread.c
 create mode 100644 tests/iothread.h
 create mode 100644 tests/test-aio-multithread.c

diff --git a/async.c b/async.c
index 0d218ab..1338682 100644
--- a/async.c
+++ b/async.c
@@ -30,6 +30,8 @@
 #include "qemu/main-loop.h"
 #include "qemu/atomic.h"
 #include "block/raw-aio.h"
+#include "trace/generated-tracers.h"
+#include "qemu/coroutine_int.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -274,6 +276,9 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
+    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
+    qemu_bh_delete(ctx->co_schedule_bh);
+
     qemu_lockcnt_lock(&ctx->list_lock);
     assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
@@ -363,6 +368,28 @@ static bool event_notifier_poll(void *opaque)
     return atomic_read(&ctx->notified);
 }
 
+static void co_schedule_bh_cb(void *opaque)
+{
+    AioContext *ctx = opaque;
+    QSLIST_HEAD(, Coroutine) straight, reversed;
+
+    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
+    QSLIST_INIT(&straight);
+
+    while (!QSLIST_EMPTY(&reversed)) {
+        Coroutine *co = QSLIST_FIRST(&reversed);
+        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
+        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
+    }
+
+    while (!QSLIST_EMPTY(&straight)) {
+        Coroutine *co = QSLIST_FIRST(&straight);
+        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
+        trace_aio_co_schedule_bh_cb(ctx, co);
+        qemu_coroutine_enter(co);
+    }
+}
+
 AioContext *aio_context_new(Error **errp)
 {
     int ret;
@@ -378,6 +405,10 @@ AioContext *aio_context_new(Error **errp)
     }
     g_source_set_can_recurse(&ctx->source, true);
     qemu_lockcnt_init(&ctx->list_lock);
+
+    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
+    QSLIST_INIT(&ctx->scheduled_coroutines);
+
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -401,6 +432,40 @@ fail:
     return NULL;
 }
 
+void aio_co_schedule(AioContext *ctx, Coroutine *co)
+{
+    trace_aio_co_schedule(ctx, co);
+    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
+                              co, co_scheduled_next);
+    qemu_bh_schedule(ctx->co_schedule_bh);
+}
+
+void aio_co_wake(struct Coroutine *co)
+{
+    AioContext *ctx;
+
+    /* Read coroutine before co->ctx.  Matches smp_wmb in
+     * qemu_coroutine_enter.
+     */
+    smp_read_barrier_depends();
+    ctx = atomic_read(&co->ctx);
+
+    if (ctx != qemu_get_current_aio_context()) {
+        aio_co_schedule(ctx, co);
+        return;
+    }
+
+    if (qemu_in_coroutine()) {
+        Coroutine *self = qemu_coroutine_self();
+        assert(self != co);
+        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
+    } else {
+        aio_context_acquire(ctx);
+        qemu_coroutine_enter(co);
+        aio_context_release(ctx);
+    }
+}
+
 void aio_context_ref(AioContext *ctx)
 {
     g_source_ref(&ctx->source);
diff --git a/include/block/aio.h b/include/block/aio.h
index 7df271d..614cbc6 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
 typedef bool AioPollFn(void *opaque);
 typedef void IOHandler(void *opaque);
 
+struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
 
@@ -108,6 +109,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
+    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
+    QEMUBH *co_schedule_bh;
+
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -483,6 +487,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
 }
 
 /**
+ * aio_co_schedule:
+ * @ctx: the aio context
+ * @co: the coroutine
+ *
+ * Start a coroutine on a remote AioContext.
+ *
+ * The coroutine must not be entered by anyone else while aio_co_schedule()
+ * is active.  In addition the coroutine must have yielded unless ctx
+ * is the context in which the coroutine is running (i.e. the value of
+ * qemu_get_current_aio_context() from the coroutine itself).
+ */
+void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
+
+/**
+ * aio_co_wake:
+ * @co: the coroutine
+ *
+ * Restart a coroutine on the AioContext where it was running last, thus
+ * preventing coroutines from jumping from one context to another when they
+ * go to sleep.
+ *
+ * aio_co_wake may be executed either in coroutine or non-coroutine
+ * context.  The coroutine must not be entered by anyone else while
+ * aio_co_wake() is active.
+ */
+void aio_co_wake(struct Coroutine *co);
+
+/**
  * Return the AioContext whose event loop runs in the current thread.
  *
  * If called from an IOThread this will be the IOThread's AioContext.  If
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 14d4f1d..1efa356 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -40,12 +40,20 @@ struct Coroutine {
     CoroutineEntry *entry;
     void *entry_arg;
     Coroutine *caller;
+
+    /* Only used when the coroutine has terminated.  */
     QSLIST_ENTRY(Coroutine) pool_next;
     size_t locks_held;
 
-    /* Coroutines that should be woken up when we yield or terminate */
+    /* Coroutines that should be woken up when we yield or terminate.
+     * Only used when the coroutine is running.
+     */
     QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+
+    /* Only used when the coroutine is sleeping.  */
+    AioContext *ctx;
     QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
+    QSLIST_ENTRY(Coroutine) co_scheduled_next;
 };
 
 Coroutine *qemu_coroutine_new(void);
diff --git a/tests/Makefile.include b/tests/Makefile.include
index f776404..4478616 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -45,9 +45,13 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
+gcov-files-test-aio-y = async.c
+gcov-files-test-aio-$(CONFIG_WIN32) += aio-win32.c
+gcov-files-test-aio-$(CONFIG_POSIX) += aio-posix.c
+check-unit-y += tests/test-aio-multithread$(EXESUF)
+gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
+gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
 check-unit-y += tests/test-throttle$(EXESUF)
-gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
-gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
 check-unit-y += tests/test-thread-pool$(EXESUF)
 gcov-files-test-thread-pool-y = thread-pool.c
 gcov-files-test-hbitmap-y = util/hbitmap.c
@@ -477,7 +481,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
 	$(test-qom-obj-y)
 test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
 test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
-test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
+test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
 
 tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
 tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@@ -492,6 +496,7 @@ tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-char$(EXESUF): tests/test-char.o qemu-char.o qemu-timer.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
+tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
diff --git a/tests/iothread.c b/tests/iothread.c
new file mode 100644
index 0000000..777d9ee
--- /dev/null
+++ b/tests/iothread.c
@@ -0,0 +1,91 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "block/aio.h"
+#include "qemu/main-loop.h"
+#include "qemu/rcu.h"
+#include "iothread.h"
+
+struct IOThread {
+    AioContext *ctx;
+
+    QemuThread thread;
+    QemuMutex init_done_lock;
+    QemuCond init_done_cond;    /* is thread initialization done? */
+    bool stopping;
+};
+
+static __thread IOThread *my_iothread;
+
+AioContext *qemu_get_current_aio_context(void)
+{
+    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+}
+
+static void *iothread_run(void *opaque)
+{
+    IOThread *iothread = opaque;
+
+    rcu_register_thread();
+
+    my_iothread = iothread;
+    qemu_mutex_lock(&iothread->init_done_lock);
+    iothread->ctx = aio_context_new(&error_abort);
+    qemu_cond_signal(&iothread->init_done_cond);
+    qemu_mutex_unlock(&iothread->init_done_lock);
+
+    while (!atomic_read(&iothread->stopping)) {
+        aio_poll(iothread->ctx, true);
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+void iothread_join(IOThread *iothread)
+{
+    iothread->stopping = true;
+    aio_notify(iothread->ctx);
+    qemu_thread_join(&iothread->thread);
+    qemu_cond_destroy(&iothread->init_done_cond);
+    qemu_mutex_destroy(&iothread->init_done_lock);
+    aio_context_unref(iothread->ctx);
+    g_free(iothread);
+}
+
+IOThread *iothread_new(void)
+{
+    IOThread *iothread = g_new0(IOThread, 1);
+
+    qemu_mutex_init(&iothread->init_done_lock);
+    qemu_cond_init(&iothread->init_done_cond);
+    qemu_thread_create(&iothread->thread, NULL, iothread_run,
+                       iothread, QEMU_THREAD_JOINABLE);
+
+    /* Wait for initialization to complete */
+    qemu_mutex_lock(&iothread->init_done_lock);
+    while (iothread->ctx == NULL) {
+        qemu_cond_wait(&iothread->init_done_cond,
+                       &iothread->init_done_lock);
+    }
+    qemu_mutex_unlock(&iothread->init_done_lock);
+    return iothread;
+}
+
+AioContext *iothread_get_aio_context(IOThread *iothread)
+{
+    return iothread->ctx;
+}
diff --git a/tests/iothread.h b/tests/iothread.h
new file mode 100644
index 0000000..4877cea
--- /dev/null
+++ b/tests/iothread.h
@@ -0,0 +1,25 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef TEST_IOTHREAD_H
+#define TEST_IOTHREAD_H
+
+#include "block/aio.h"
+#include "qemu/thread.h"
+
+typedef struct IOThread IOThread;
+
+IOThread *iothread_new(void);
+void iothread_join(IOThread *iothread);
+AioContext *iothread_get_aio_context(IOThread *iothread);
+
+#endif
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
new file mode 100644
index 0000000..17e81f9
--- /dev/null
+++ b/tests/test-aio-multithread.c
@@ -0,0 +1,213 @@
+/*
+ * AioContext multithreading tests
+ *
+ * Copyright Red Hat, Inc. 2016
+ *
+ * Authors:
+ *  Paolo Bonzini    <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include <glib.h>
+#include "block/aio.h"
+#include "qapi/error.h"
+#include "qemu/coroutine.h"
+#include "qemu/thread.h"
+#include "qemu/error-report.h"
+#include "iothread.h"
+
+/* AioContext management */
+
+#define NUM_CONTEXTS 5
+
+static IOThread *threads[NUM_CONTEXTS];
+static AioContext *ctx[NUM_CONTEXTS];
+static __thread int id = -1;
+
+static QemuEvent done_event;
+
+/* Run a function synchronously on a remote iothread. */
+
+typedef struct CtxRunData {
+    QEMUBHFunc *cb;
+    void *arg;
+} CtxRunData;
+
+static void ctx_run_bh_cb(void *opaque)
+{
+    CtxRunData *data = opaque;
+
+    data->cb(data->arg);
+    qemu_event_set(&done_event);
+}
+
+static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
+{
+    CtxRunData data = {
+        .cb = cb,
+        .arg = opaque
+    };
+
+    qemu_event_reset(&done_event);
+    aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
+    qemu_event_wait(&done_event);
+}
+
+/* Starting the iothreads. */
+
+static void set_id_cb(void *opaque)
+{
+    int *i = opaque;
+
+    id = *i;
+}
+
+static void create_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        threads[i] = iothread_new();
+        ctx[i] = iothread_get_aio_context(threads[i]);
+    }
+
+    qemu_event_init(&done_event, false);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, set_id_cb, &i);
+    }
+}
+
+/* Stopping the iothreads. */
+
+static void join_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_ref(ctx[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        iothread_join(threads[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_unref(ctx[i]);
+    }
+    qemu_event_destroy(&done_event);
+}
+
+/* Basic test for the stuff above. */
+
+static void test_lifecycle(void)
+{
+    create_aio_contexts();
+    join_aio_contexts();
+}
+
+/* aio_co_schedule test.  */
+
+static Coroutine *to_schedule[NUM_CONTEXTS];
+
+static bool now_stopping;
+
+static int count_retry;
+static int count_here;
+static int count_other;
+
+static bool schedule_next(int n)
+{
+    Coroutine *co;
+
+    co = atomic_xchg(&to_schedule[n], NULL);
+    if (!co) {
+        atomic_inc(&count_retry);
+        return false;
+    }
+
+    if (n == id) {
+        atomic_inc(&count_here);
+    } else {
+        atomic_inc(&count_other);
+    }
+
+    aio_co_schedule(ctx[n], co);
+    return true;
+}
+
+static void finish_cb(void *opaque)
+{
+    schedule_next(id);
+}
+
+static void test_multi_co_schedule_entry(void *opaque)
+{
+    g_assert(to_schedule[id] == NULL);
+    atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+
+    while (!atomic_mb_read(&now_stopping)) {
+        int n;
+
+        n = g_test_rand_int_range(0, NUM_CONTEXTS);
+        schedule_next(n);
+        qemu_coroutine_yield();
+
+        g_assert(to_schedule[id] == NULL);
+        atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+    }
+}
+
+
+static void test_multi_co_schedule(int seconds)
+{
+    int i;
+
+    count_here = count_other = count_retry = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, finish_cb, NULL);
+        to_schedule[i] = NULL;
+    }
+
+    join_aio_contexts();
+    g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
+                  count_other, count_here, count_retry,
+                  count_here + count_other + count_retry);
+}
+
+static void test_multi_co_schedule_1(void)
+{
+    test_multi_co_schedule(1);
+}
+
+static void test_multi_co_schedule_10(void)
+{
+    test_multi_co_schedule(10);
+}
+
+/* End of tests.  */
+
+int main(int argc, char **argv)
+{
+    init_clocks();
+
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
+    if (g_test_quick()) {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+    } else {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+    }
+    return g_test_run();
+}
diff --git a/tests/test-vmstate.c b/tests/test-vmstate.c
index d2f529b..cb87997 100644
--- a/tests/test-vmstate.c
+++ b/tests/test-vmstate.c
@@ -33,17 +33,6 @@
 static char temp_file[] = "/tmp/vmst.test.XXXXXX";
 static int temp_fd;
 
-/* Fake yield_until_fd_readable() implementation so we don't have to pull the
- * coroutine code as dependency.
- */
-void yield_until_fd_readable(int fd)
-{
-    fd_set fds;
-    FD_ZERO(&fds);
-    FD_SET(fd, &fds);
-    select(fd + 1, &fds, NULL, NULL, NULL);
-}
-
 
 /* Duplicate temp_fd and seek to the beginning of the file */
 static QEMUFile *open_test_file(bool write)
diff --git a/trace-events b/trace-events
index 1181486..e9e31e9 100644
--- a/trace-events
+++ b/trace-events
@@ -85,6 +85,10 @@ xen_map_cache(uint64_t phys_addr) "want %#"PRIx64
 xen_remap_bucket(uint64_t index) "index %#"PRIx64
 xen_map_cache_return(void* ptr) "%p"
 
+# async.c
+aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
+aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
+
 # monitor.c
 handle_qmp_command(void *mon, const char *cmd_name) "mon %p cmd_name \"%s\""
 monitor_protocol_event_handler(uint32_t event, void *qdict) "event=%d data=%p"
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index a5d2f6c..415600d 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -19,6 +19,7 @@
 #include "qemu/atomic.h"
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
+#include "block/aio.h"
 
 enum {
     POOL_BATCH_SIZE = 64,
@@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
     }
 
     co->caller = self;
+    co->ctx = qemu_get_current_aio_context();
+
+    /* Store co->ctx before anything that stores co.  Matches
+     * barrier in aio_co_wake.
+     */
+    smp_wmb();
+
     ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
 
     qemu_co_queue_run_restart(co);
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 02/17] block-backend: allow blk_prw from coroutine context
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 03/17] test-thread-pool: use generic AioContext infrastructure Paolo Bonzini
                   ` (16 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

qcow2_create2 calls this.  Do not run a nested event loop, as that
breaks when aio_co_wake tries to queue the coroutine on the co_queue_wakeup
list of the currently running one.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/block-backend.c | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/block/block-backend.c b/block/block-backend.c
index efbf398..1177598 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
 {
     QEMUIOVector qiov;
     struct iovec iov;
-    Coroutine *co;
     BlkRwCo rwco;
 
     iov = (struct iovec) {
@@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
         .ret    = NOT_DONE,
     };
 
-    co = qemu_coroutine_create(co_entry, &rwco);
-    qemu_coroutine_enter(co);
-    BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+    if (qemu_in_coroutine()) {
+        /* Fast-path if already in coroutine context */
+        co_entry(&rwco);
+    } else {
+        Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
+        qemu_coroutine_enter(co);
+        BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+    }
 
     return rwco.ret;
 }
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 03/17] test-thread-pool: use generic AioContext infrastructure
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 02/17] block-backend: allow blk_prw from coroutine context Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil Paolo Bonzini
                   ` (15 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Once the thread pool starts using aio_co_wake, it will also need
qemu_get_current_aio_context().  Make test-thread-pool create
an AioContext with qemu_init_main_loop, so that stubs/iothread.c
and tests/iothread.c can provide the rest.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 tests/test-thread-pool.c | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 8dbf66a..91b4ec5 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -6,6 +6,7 @@
 #include "qapi/error.h"
 #include "qemu/timer.h"
 #include "qemu/error-report.h"
+#include "qemu/main-loop.h"
 
 static AioContext *ctx;
 static ThreadPool *pool;
@@ -224,15 +225,9 @@ static void test_cancel_async(void)
 int main(int argc, char **argv)
 {
     int ret;
-    Error *local_error = NULL;
 
-    init_clocks();
-
-    ctx = aio_context_new(&local_error);
-    if (!ctx) {
-        error_reportf_err(local_error, "Failed to create AIO Context: ");
-        exit(1);
-    }
+    qemu_init_main_loop(&error_abort);
+    ctx = qemu_get_current_aio_context();
     pool = aio_get_thread_pool(ctx);
 
     g_test_init(&argc, &argv, NULL);
@@ -245,6 +240,5 @@ int main(int argc, char **argv)
 
     ret = g_test_run();
 
-    aio_context_unref(ctx);
     return ret;
 }
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (2 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 03/17] test-thread-pool: use generic AioContext infrastructure Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-24  9:34   ` Daniel P. Berrange
  2017-01-30 15:24   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext Paolo Bonzini
                   ` (14 subsequent siblings)
  18 siblings, 2 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

AioContext is fairly self contained, the only dependency is QEMUTimer but
that in turn doesn't need anything else.  So move them out of block-obj-y
to avoid introducing a dependency from io/ to block-obj-y.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
        v1->v2: new patch [Daniel]

 Makefile.objs                       |  5 +---
 block/io.c                          | 29 -------------------
 stubs/Makefile.objs                 |  2 ++
 stubs/linux-aio.c                   | 32 +++++++++++++++++++++
 stubs/main-loop.c                   |  8 ++++++
 stubs/set-fd-handler.c              | 11 --------
 tests/Makefile.include              | 13 ++++-----
 util/Makefile.objs                  |  5 +++-
 aio-posix.c => util/aio-posix.c     |  0
 aio-win32.c => util/aio-win32.c     |  0
 util/aiocb.c                        | 55 +++++++++++++++++++++++++++++++++++++
 async.c => util/async.c             |  3 +-
 qemu-timer.c => util/qemu-timer.c   |  0
 thread-pool.c => util/thread-pool.c |  0
 14 files changed, 110 insertions(+), 53 deletions(-)
 create mode 100644 stubs/linux-aio.c
 create mode 100644 stubs/main-loop.c
 rename aio-posix.c => util/aio-posix.c (100%)
 rename aio-win32.c => util/aio-win32.c (100%)
 create mode 100644 util/aiocb.c
 rename async.c => util/async.c (99%)
 rename qemu-timer.c => util/qemu-timer.c (100%)
 rename thread-pool.c => util/thread-pool.c (100%)

diff --git a/Makefile.objs b/Makefile.objs
index 01cef86..432edf8 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -7,12 +7,9 @@ util-obj-y += qmp-introspect.o qapi-types.o qapi-visit.o qapi-event.o
 #######################################################################
 # block-obj-y is code used by both qemu system emulation and qemu-img
 
-block-obj-y = async.o thread-pool.o
 block-obj-y += nbd/
 block-obj-y += block.o blockjob.o
-block-obj-y += main-loop.o iohandler.o qemu-timer.o
-block-obj-$(CONFIG_POSIX) += aio-posix.o
-block-obj-$(CONFIG_WIN32) += aio-win32.o
+block-obj-y += main-loop.o iohandler.o
 block-obj-y += block/
 block-obj-y += qemu-io-cmds.o
 block-obj-$(CONFIG_REPLICATION) += replication.o
diff --git a/block/io.c b/block/io.c
index c42b34a..76dfaf4 100644
--- a/block/io.c
+++ b/block/io.c
@@ -2239,35 +2239,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
     return &acb->common;
 }
 
-void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
-                   BlockCompletionFunc *cb, void *opaque)
-{
-    BlockAIOCB *acb;
-
-    acb = g_malloc(aiocb_info->aiocb_size);
-    acb->aiocb_info = aiocb_info;
-    acb->bs = bs;
-    acb->cb = cb;
-    acb->opaque = opaque;
-    acb->refcnt = 1;
-    return acb;
-}
-
-void qemu_aio_ref(void *p)
-{
-    BlockAIOCB *acb = p;
-    acb->refcnt++;
-}
-
-void qemu_aio_unref(void *p)
-{
-    BlockAIOCB *acb = p;
-    assert(acb->refcnt > 0);
-    if (--acb->refcnt == 0) {
-        g_free(acb);
-    }
-}
-
 /**************************************************************/
 /* Coroutine block device emulation */
 
diff --git a/stubs/Makefile.objs b/stubs/Makefile.objs
index 2b5bb74..2329be7 100644
--- a/stubs/Makefile.objs
+++ b/stubs/Makefile.objs
@@ -21,7 +21,9 @@ stub-obj-y += get-vm-name.o
 stub-obj-y += iothread.o
 stub-obj-y += iothread-lock.o
 stub-obj-y += is-daemonized.o
+stub-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 stub-obj-y += machine-init-done.o
+stub-obj-y += main-loop.o
 stub-obj-y += migr-blocker.o
 stub-obj-y += mon-is-qmp.o
 stub-obj-y += monitor-init.o
diff --git a/stubs/linux-aio.c b/stubs/linux-aio.c
new file mode 100644
index 0000000..ed47bd4
--- /dev/null
+++ b/stubs/linux-aio.c
@@ -0,0 +1,32 @@
+/*
+ * Linux native AIO support.
+ *
+ * Copyright (C) 2009 IBM, Corp.
+ * Copyright (C) 2009 Red Hat, Inc.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "block/aio.h"
+#include "block/raw-aio.h"
+
+void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
+{
+    abort();
+}
+
+void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
+{
+    abort();
+}
+
+LinuxAioState *laio_init(void)
+{
+    abort();
+}
+
+void laio_cleanup(LinuxAioState *s)
+{
+    abort();
+}
diff --git a/stubs/main-loop.c b/stubs/main-loop.c
new file mode 100644
index 0000000..a457a02
--- /dev/null
+++ b/stubs/main-loop.c
@@ -0,0 +1,8 @@
+#include "qemu/osdep.h"
+#include "qemu/main-loop.h"
+
+/* Stubs for main-loop.c */
+AioContext *qemu_get_aio_context(void)
+{
+    abort();
+}
diff --git a/stubs/set-fd-handler.c b/stubs/set-fd-handler.c
index acbe65c..26965de 100644
--- a/stubs/set-fd-handler.c
+++ b/stubs/set-fd-handler.c
@@ -9,14 +9,3 @@ void qemu_set_fd_handler(int fd,
 {
     abort();
 }
-
-void aio_set_fd_handler(AioContext *ctx,
-                        int fd,
-                        bool is_external,
-                        IOHandler *io_read,
-                        IOHandler *io_write,
-                        AioPollFn *io_poll,
-                        void *opaque)
-{
-    abort();
-}
diff --git a/tests/Makefile.include b/tests/Makefile.include
index 4478616..91dcd5c 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -45,9 +45,9 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
-gcov-files-test-aio-y = async.c
-gcov-files-test-aio-$(CONFIG_WIN32) += aio-win32.c
-gcov-files-test-aio-$(CONFIG_POSIX) += aio-posix.c
+gcov-files-test-aio-y = util/async.c util/qemu-timer.o
+gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
+gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
 check-unit-y += tests/test-aio-multithread$(EXESUF)
 gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
 gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
@@ -493,7 +493,7 @@ tests/check-qjson$(EXESUF): tests/check-qjson.o $(test-util-obj-y)
 tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y)
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 
-tests/test-char$(EXESUF): tests/test-char.o qemu-char.o qemu-timer.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y)
+tests/test-char$(EXESUF): tests/test-char.o qemu-char.o $(qtest-obj-y) $(test-io-obj-y) $(test-util-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
 tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
@@ -526,8 +526,7 @@ tests/test-vmstate$(EXESUF): tests/test-vmstate.o \
 	migration/vmstate.o migration/qemu-file.o \
         migration/qemu-file-channel.o migration/qjson.o \
 	$(test-io-obj-y)
-tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \
-	$(test-util-obj-y)
+tests/test-timed-average$(EXESUF): tests/test-timed-average.o $(test-util-obj-y)
 tests/test-base64$(EXESUF): tests/test-base64.o \
 	libqemuutil.a libqemustub.a
 tests/ptimer-test$(EXESUF): tests/ptimer-test.o tests/ptimer-test-stubs.o hw/core/ptimer.o libqemustub.a
@@ -685,7 +684,7 @@ tests/usb-hcd-ehci-test$(EXESUF): tests/usb-hcd-ehci-test.o $(libqos-usb-obj-y)
 tests/usb-hcd-xhci-test$(EXESUF): tests/usb-hcd-xhci-test.o $(libqos-usb-obj-y)
 tests/pc-cpu-test$(EXESUF): tests/pc-cpu-test.o
 tests/postcopy-test$(EXESUF): tests/postcopy-test.o
-tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o qemu-char.o qemu-timer.o $(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y)
+tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o qemu-char.o $(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y)
 tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_helper.o
 tests/test-qemu-opts$(EXESUF): tests/test-qemu-opts.o $(test-util-obj-y)
 tests/test-write-threshold$(EXESUF): tests/test-write-threshold.o $(test-block-obj-y)
diff --git a/util/Makefile.objs b/util/Makefile.objs
index c1f247d..8db62bf 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,14 +1,17 @@
 util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
 util-obj-y += bufferiszero.o
 util-obj-y += lockcnt.o
+util-obj-y += aiocb.o async.o thread-pool.o qemu-timer.o
+util-obj-$(CONFIG_POSIX) += aio-posix.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
 util-obj-$(CONFIG_POSIX) += oslib-posix.o
 util-obj-$(CONFIG_POSIX) += qemu-openpty.o
 util-obj-$(CONFIG_POSIX) += qemu-thread-posix.o
-util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
 util-obj-$(CONFIG_POSIX) += memfd.o
+util-obj-$(CONFIG_WIN32) += aio-win32.o
+util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
 util-obj-$(CONFIG_WIN32) += oslib-win32.o
 util-obj-$(CONFIG_WIN32) += qemu-thread-win32.o
 util-obj-y += envlist.o path.o module.o
diff --git a/aio-posix.c b/util/aio-posix.c
similarity index 100%
rename from aio-posix.c
rename to util/aio-posix.c
diff --git a/aio-win32.c b/util/aio-win32.c
similarity index 100%
rename from aio-win32.c
rename to util/aio-win32.c
diff --git a/util/aiocb.c b/util/aiocb.c
new file mode 100644
index 0000000..305a9cf
--- /dev/null
+++ b/util/aiocb.c
@@ -0,0 +1,55 @@
+/*
+ * BlockAIOCB allocation
+ *
+ * Copyright (c) 2003-2017 Fabrice Bellard and the QEMU team
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+#include "block/aio.h"
+
+void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
+                   BlockCompletionFunc *cb, void *opaque)
+{
+    BlockAIOCB *acb;
+
+    acb = g_malloc(aiocb_info->aiocb_size);
+    acb->aiocb_info = aiocb_info;
+    acb->bs = bs;
+    acb->cb = cb;
+    acb->opaque = opaque;
+    acb->refcnt = 1;
+    return acb;
+}
+
+void qemu_aio_ref(void *p)
+{
+    BlockAIOCB *acb = p;
+    acb->refcnt++;
+}
+
+void qemu_aio_unref(void *p)
+{
+    BlockAIOCB *acb = p;
+    assert(acb->refcnt > 0);
+    if (--acb->refcnt == 0) {
+        g_free(acb);
+    }
+}
diff --git a/async.c b/util/async.c
similarity index 99%
rename from async.c
rename to util/async.c
index 1338682..44c9c3b 100644
--- a/async.c
+++ b/util/async.c
@@ -1,7 +1,8 @@
 /*
- * QEMU System Emulator
+ * Data plane event loop
  *
  * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2009-2017 the QEMU team
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
diff --git a/qemu-timer.c b/util/qemu-timer.c
similarity index 100%
rename from qemu-timer.c
rename to util/qemu-timer.c
diff --git a/thread-pool.c b/util/thread-pool.c
similarity index 100%
rename from thread-pool.c
rename to util/thread-pool.c
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (3 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-24  9:35   ` Daniel P. Berrange
  2017-01-30 15:32   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts Paolo Bonzini
                   ` (13 subsequent siblings)
  18 siblings, 2 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, Daniel P . Berrange

This is in preparation for making qio_channel_yield work on
AioContexts other than the main one.

Cc: Daniel P. Berrange <berrange@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
        v1->v2: removed QIOChannelRestart [Daniel]

 include/io/channel.h | 25 +++++++++++++++++++++++++
 io/channel-command.c | 13 +++++++++++++
 io/channel-file.c    | 11 +++++++++++
 io/channel-socket.c  | 16 +++++++++++-----
 io/channel-tls.c     | 12 ++++++++++++
 io/channel-watch.c   |  6 ++++++
 io/channel.c         | 11 +++++++++++
 7 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 32a9470..0bc7c3f 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -23,6 +23,7 @@
 
 #include "qemu-common.h"
 #include "qom/object.h"
+#include "block/aio.h"
 
 #define TYPE_QIO_CHANNEL "qio-channel"
 #define QIO_CHANNEL(obj)                                    \
@@ -132,6 +133,11 @@ struct QIOChannelClass {
                      off_t offset,
                      int whence,
                      Error **errp);
+    void (*io_set_aio_fd_handler)(QIOChannel *ioc,
+                                  AioContext *ctx,
+                                  IOHandler *io_read,
+                                  IOHandler *io_write,
+                                  void *opaque);
 };
 
 /* General I/O handling functions */
@@ -525,4 +531,23 @@ void qio_channel_yield(QIOChannel *ioc,
 void qio_channel_wait(QIOChannel *ioc,
                       GIOCondition condition);
 
+/**
+ * qio_channel_set_aio_fd_handler:
+ * @ioc: the channel object
+ * @ctx: the AioContext to set the handlers on
+ * @io_read: the read handler
+ * @io_write: the write handler
+ * @opaque: the opaque value passed to the handler
+ *
+ * This is used internally by qio_channel_yield().  It can
+ * be used by channel implementations to forward the handlers
+ * to another channel (e.g. from #QIOChannelTLS to the
+ * underlying socket).
+ */
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+                                    AioContext *ctx,
+                                    IOHandler *io_read,
+                                    IOHandler *io_write,
+                                    void *opaque);
+
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel-command.c b/io/channel-command.c
index ad25313..4000b61 100644
--- a/io/channel-command.c
+++ b/io/channel-command.c
@@ -328,6 +328,18 @@ static int qio_channel_command_close(QIOChannel *ioc,
 }
 
 
+static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
+                                                   AioContext *ctx,
+                                                   IOHandler *io_read,
+                                                   IOHandler *io_write,
+                                                   void *opaque)
+{
+    QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+    aio_set_fd_handler(ctx, cioc->readfd, false, io_read, NULL, NULL, opaque);
+    aio_set_fd_handler(ctx, cioc->writefd, false, NULL, io_write, NULL, opaque);
+}
+
+
 static GSource *qio_channel_command_create_watch(QIOChannel *ioc,
                                                  GIOCondition condition)
 {
@@ -349,6 +361,7 @@ static void qio_channel_command_class_init(ObjectClass *klass,
     ioc_klass->io_set_blocking = qio_channel_command_set_blocking;
     ioc_klass->io_close = qio_channel_command_close;
     ioc_klass->io_create_watch = qio_channel_command_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_command_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_command_info = {
diff --git a/io/channel-file.c b/io/channel-file.c
index e1da243..b383273 100644
--- a/io/channel-file.c
+++ b/io/channel-file.c
@@ -186,6 +186,16 @@ static int qio_channel_file_close(QIOChannel *ioc,
 }
 
 
+static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
+                                                AioContext *ctx,
+                                                IOHandler *io_read,
+                                                IOHandler *io_write,
+                                                void *opaque)
+{
+    QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+    aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
 static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
                                               GIOCondition condition)
 {
@@ -206,6 +216,7 @@ static void qio_channel_file_class_init(ObjectClass *klass,
     ioc_klass->io_seek = qio_channel_file_seek;
     ioc_klass->io_close = qio_channel_file_close;
     ioc_klass->io_create_watch = qio_channel_file_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_file_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_file_info = {
diff --git a/io/channel-socket.c b/io/channel-socket.c
index d7e03f6..3909f65 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -661,11 +661,6 @@ qio_channel_socket_set_blocking(QIOChannel *ioc,
         qemu_set_block(sioc->fd);
     } else {
         qemu_set_nonblock(sioc->fd);
-#ifdef WIN32
-        WSAEventSelect(sioc->fd, ioc->event,
-                       FD_READ | FD_ACCEPT | FD_CLOSE |
-                       FD_CONNECT | FD_WRITE | FD_OOB);
-#endif
     }
     return 0;
 }
@@ -745,6 +740,16 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
     return 0;
 }
 
+static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
+                                                  AioContext *ctx,
+                                                  IOHandler *io_read,
+                                                  IOHandler *io_write,
+                                                  void *opaque)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
                                                 GIOCondition condition)
 {
@@ -767,6 +772,7 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
     ioc_klass->io_set_cork = qio_channel_socket_set_cork;
     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_socket_info = {
diff --git a/io/channel-tls.c b/io/channel-tls.c
index d24dc8c..fa3f93e 100644
--- a/io/channel-tls.c
+++ b/io/channel-tls.c
@@ -349,6 +349,17 @@ static int qio_channel_tls_close(QIOChannel *ioc,
     return qio_channel_close(tioc->master, errp);
 }
 
+static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
+                                               AioContext *ctx,
+                                               IOHandler *io_read,
+                                               IOHandler *io_write,
+                                               void *opaque)
+{
+    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+    qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+}
+
 static GSource *qio_channel_tls_create_watch(QIOChannel *ioc,
                                              GIOCondition condition)
 {
@@ -376,6 +387,7 @@ static void qio_channel_tls_class_init(ObjectClass *klass,
     ioc_klass->io_close = qio_channel_tls_close;
     ioc_klass->io_shutdown = qio_channel_tls_shutdown;
     ioc_klass->io_create_watch = qio_channel_tls_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_tls_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_tls_info = {
diff --git a/io/channel-watch.c b/io/channel-watch.c
index cf1cdff..8640d1c 100644
--- a/io/channel-watch.c
+++ b/io/channel-watch.c
@@ -285,6 +285,12 @@ GSource *qio_channel_create_socket_watch(QIOChannel *ioc,
     GSource *source;
     QIOChannelSocketSource *ssource;
 
+#ifdef WIN32
+    WSAEventSelect(socket, ioc->event,
+                   FD_READ | FD_ACCEPT | FD_CLOSE |
+                   FD_CONNECT | FD_WRITE | FD_OOB);
+#endif
+
     source = g_source_new(&qio_channel_socket_source_funcs,
                           sizeof(QIOChannelSocketSource));
     ssource = (QIOChannelSocketSource *)source;
diff --git a/io/channel.c b/io/channel.c
index 80924c1..ce470d7 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -154,6 +154,17 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
 }
 
 
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+                                    AioContext *ctx,
+                                    IOHandler *io_read,
+                                    IOHandler *io_write,
+                                    void *opaque)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+}
+
 guint qio_channel_add_watch(QIOChannel *ioc,
                             GIOCondition condition,
                             QIOChannelFunc func,
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (4 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-24  9:36   ` Daniel P. Berrange
  2017-01-30 15:37   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield Paolo Bonzini
                   ` (12 subsequent siblings)
  18 siblings, 2 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, Daniel P . Berrange

Support separate coroutines for reading and writing, and place the
read/write handlers on the AioContext that the QIOChannel is registered
with.

Cc: Daniel P. Berrange <berrange@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
        v1->v2:
        improved qio_channel_set_aio_context docs, renamed to
                qio_channel_attach_aio_context [Daniel, Stefan]
        fixed pasto in "io: make qio_channel_yield aware of AioContexts"

 include/io/channel.h | 47 ++++++++++++++++++++++++++--
 io/channel.c         | 86 +++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 109 insertions(+), 24 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 0bc7c3f..8e5f721 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -23,6 +23,7 @@
 
 #include "qemu-common.h"
 #include "qom/object.h"
+#include "qemu/coroutine.h"
 #include "block/aio.h"
 
 #define TYPE_QIO_CHANNEL "qio-channel"
@@ -81,6 +82,9 @@ struct QIOChannel {
     Object parent;
     unsigned int features; /* bitmask of QIOChannelFeatures */
     char *name;
+    AioContext *ctx;
+    Coroutine *read_coroutine;
+    Coroutine *write_coroutine;
 #ifdef _WIN32
     HANDLE event; /* For use with GSource on Win32 */
 #endif
@@ -503,13 +507,50 @@ guint qio_channel_add_watch(QIOChannel *ioc,
 
 
 /**
+ * qio_channel_attach_aio_context:
+ * @ioc: the channel object
+ * @ctx: the #AioContext to set the handlers on
+ *
+ * Request that qio_channel_yield() sets I/O handlers on
+ * the given #AioContext.  If @ctx is %NULL, qio_channel_yield()
+ * uses QEMU's main thread event loop.
+ *
+ * You can move a #QIOChannel from one #AioContext to another even if
+ * I/O handlers are set for a coroutine.  However, #QIOChannel provides
+ * no synchronization between the calls to qio_channel_yield() and
+ * qio_channel_attach_aio_context().
+ *
+ * Therefore you should first call qio_channel_detach_aio_context()
+ * to ensure that the coroutine is not entered concurrently.  Then,
+ * while the coroutine has yielded, call qio_channel_attach_aio_context(),
+ * and then aio_co_schedule() to place the coroutine on the new
+ * #AioContext.  The calls to qio_channel_detach_aio_context()
+ * and qio_channel_attach_aio_context() should be protected with
+ * aio_context_acquire() and aio_context_release().
+ */
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+                                    AioContext *ctx);
+
+/**
+ * qio_channel_detach_aio_context:
+ * @ioc: the channel object
+ *
+ * Disable any I/O handlers set by qio_channel_yield().  With the
+ * help of aio_co_schedule(), this allows moving a coroutine that was
+ * paused by qio_channel_yield() to another context.
+ */
+void qio_channel_detach_aio_context(QIOChannel *ioc);
+
+/**
  * qio_channel_yield:
  * @ioc: the channel object
  * @condition: the I/O condition to wait for
  *
- * Yields execution from the current coroutine until
- * the condition indicated by @condition becomes
- * available.
+ * Yields execution from the current coroutine until the condition
+ * indicated by @condition becomes available.  @condition must
+ * be either %G_IO_IN or %G_IO_OUT; it cannot contain both.  In
+ * addition, no two coroutine can be waiting on the same condition
+ * and channel at the same time.
  *
  * This must only be called from coroutine context
  */
diff --git a/io/channel.c b/io/channel.c
index ce470d7..31a6d39 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -21,7 +21,7 @@
 #include "qemu/osdep.h"
 #include "io/channel.h"
 #include "qapi/error.h"
-#include "qemu/coroutine.h"
+#include "qemu/main-loop.h"
 
 bool qio_channel_has_feature(QIOChannel *ioc,
                              QIOChannelFeature feature)
@@ -238,36 +238,80 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
-typedef struct QIOChannelYieldData QIOChannelYieldData;
-struct QIOChannelYieldData {
-    QIOChannel *ioc;
-    Coroutine *co;
-};
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);
+
+static void qio_channel_restart_read(void *opaque)
+{
+    QIOChannel *ioc = opaque;
+    Coroutine *co = ioc->read_coroutine;
 
+    ioc->read_coroutine = NULL;
+    qio_channel_set_aio_fd_handlers(ioc);
+    aio_co_wake(co);
+}
 
-static gboolean qio_channel_yield_enter(QIOChannel *ioc,
-                                        GIOCondition condition,
-                                        gpointer opaque)
+static void qio_channel_restart_write(void *opaque)
 {
-    QIOChannelYieldData *data = opaque;
-    qemu_coroutine_enter(data->co);
-    return FALSE;
+    QIOChannel *ioc = opaque;
+    Coroutine *co = ioc->write_coroutine;
+
+    ioc->write_coroutine = NULL;
+    qio_channel_set_aio_fd_handlers(ioc);
+    aio_co_wake(co);
 }
 
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+{
+    IOHandler *rd_handler = NULL, *wr_handler = NULL;
+    AioContext *ctx;
+
+    if (ioc->read_coroutine) {
+        rd_handler = qio_channel_restart_read;
+    }
+    if (ioc->write_coroutine) {
+        wr_handler = qio_channel_restart_write;
+    }
+
+    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
+}
+
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+                                    AioContext *ctx)
+{
+    AioContext *old_ctx;
+    if (ioc->ctx == ctx) {
+        return;
+    }
+
+    old_ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+    qio_channel_set_aio_fd_handler(ioc, old_ctx, NULL, NULL, NULL);
+    ioc->ctx = ctx;
+    qio_channel_set_aio_fd_handlers(ioc);
+}
+
+void qio_channel_detach_aio_context(QIOChannel *ioc)
+{
+    ioc->read_coroutine = NULL;
+    ioc->write_coroutine = NULL;
+    qio_channel_set_aio_fd_handlers(ioc);
+    ioc->ctx = NULL;
+}
 
 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                     GIOCondition condition)
 {
-    QIOChannelYieldData data;
-
     assert(qemu_in_coroutine());
-    data.ioc = ioc;
-    data.co = qemu_coroutine_self();
-    qio_channel_add_watch(ioc,
-                          condition,
-                          qio_channel_yield_enter,
-                          &data,
-                          NULL);
+    if (condition == G_IO_IN) {
+        assert(!ioc->read_coroutine);
+        ioc->read_coroutine = qemu_coroutine_self();
+    } else if (condition == G_IO_OUT) {
+        assert(!ioc->write_coroutine);
+        ioc->write_coroutine = qemu_coroutine_self();
+    } else {
+        abort();
+    }
+    qio_channel_set_aio_fd_handlers(ioc);
     qemu_coroutine_yield();
 }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (5 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-30 15:50   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 08/17] coroutine-lock: reschedule coroutine on the AioContext it was running on Paolo Bonzini
                   ` (11 subsequent siblings)
  18 siblings, 1 reply; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

In the client, read the reply headers from a coroutine, switching the
read side between the "read header" coroutine and the I/O coroutine that
reads the body of the reply.

In the server, if the server can read more requests it will create a new
"read request" coroutine as soon as a request has been read.  Otherwise,
the new coroutine is created in nbd_request_put.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
        v1->v2:
        allow NULL s->read_reply_co [Fam]
        converted NBD server to qio_channel_yield too [Fam]

 block/nbd-client.c | 108 +++++++++++++++++++++--------------------------------
 block/nbd-client.h |   2 +-
 nbd/client.c       |   2 +-
 nbd/common.c       |   9 +----
 nbd/server.c       |  94 ++++++++++++++--------------------------------
 5 files changed, 74 insertions(+), 141 deletions(-)

diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532..6ea54f9 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
 
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
             qemu_coroutine_enter(s->recv_coroutine[i]);
         }
     }
+    BDRV_POLL_WHILE(bs, s->read_reply_co);
 }
 
 static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     qio_channel_shutdown(client->ioc,
                          QIO_CHANNEL_SHUTDOWN_BOTH,
                          NULL);
-    nbd_recv_coroutines_enter_all(client);
+    nbd_recv_coroutines_enter_all(bs);
 
     nbd_client_detach_aio_context(bs);
     object_unref(OBJECT(client->sioc));
@@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->ioc = NULL;
 }
 
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
 {
-    BlockDriverState *bs = opaque;
-    NBDClientSession *s = nbd_get_client_session(bs);
+    NBDClientSession *s = opaque;
     uint64_t i;
     int ret;
 
-    if (!s->ioc) { /* Already closed */
-        return;
-    }
-
-    if (s->reply.handle == 0) {
-        /* No reply already in flight.  Fetch a header.  It is possible
-         * that another thread has done the same thing in parallel, so
-         * the socket is not readable anymore.
-         */
+    for (;;) {
+        assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->ioc, &s->reply);
-        if (ret == -EAGAIN) {
-            return;
-        }
         if (ret < 0) {
-            s->reply.handle = 0;
-            goto fail;
+            break;
         }
-    }
-
-    /* There's no need for a mutex on the receive side, because the
-     * handler acts as a synchronization point and ensures that only
-     * one coroutine is called until the reply finishes.  */
-    i = HANDLE_TO_INDEX(s, s->reply.handle);
-    if (i >= MAX_NBD_REQUESTS) {
-        goto fail;
-    }
-
-    if (s->recv_coroutine[i]) {
-        qemu_coroutine_enter(s->recv_coroutine[i]);
-        return;
-    }
 
-fail:
-    nbd_teardown_connection(bs);
-}
+        /* There's no need for a mutex on the receive side, because the
+         * handler acts as a synchronization point and ensures that only
+         * one coroutine is called until the reply finishes.
+         */
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+            break;
+        }
 
-static void nbd_restart_write(void *opaque)
-{
-    BlockDriverState *bs = opaque;
+        aio_co_wake(s->recv_coroutine[i]);
 
-    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+        /* We're woken up by the recv_coroutine itself.  */
+        qemu_coroutine_yield();
+    }
+    s->read_reply_co = NULL;
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
                                QEMUIOVector *qiov)
 {
     NBDClientSession *s = nbd_get_client_session(bs);
-    AioContext *aio_context;
     int rc, ret, i;
 
     qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
         return -EPIPE;
     }
 
-    s->send_coroutine = qemu_coroutine_self();
-    aio_context = bdrv_get_aio_context(bs);
-
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, nbd_restart_write, NULL, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
@@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, NULL, NULL, bs);
-    s->send_coroutine = NULL;
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
@@ -174,8 +147,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
 {
     int ret;
 
-    /* Wait until we're woken up by the read handler.  TODO: perhaps
-     * peek at the next reply and avoid yielding if it's ours?  */
+    /* Wait until we're woken up by nbd_read_reply_entry.  */
     qemu_coroutine_yield();
     *reply = s->reply;
     if (reply->handle != request->handle ||
@@ -209,13 +181,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
     /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 }
 
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
                               NBDRequest *request)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i = HANDLE_TO_INDEX(s, request->handle);
+
     s->recv_coroutine[i] = NULL;
-    if (s->in_flight-- == MAX_NBD_REQUESTS) {
-        qemu_co_queue_next(&s->free_sema);
+    s->in_flight--;
+    qemu_co_queue_next(&s->free_sema);
+
+    /* Kick the read_reply_co to get the next reply.  */
+    if (s->read_reply_co) {
+        aio_co_wake(s->read_reply_co);
     }
 }
 
@@ -241,7 +219,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, qiov);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -271,7 +249,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -306,7 +284,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -331,7 +309,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -357,23 +335,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 
 }
 
 void nbd_client_detach_aio_context(BlockDriverState *bs)
 {
-    aio_set_fd_handler(bdrv_get_aio_context(bs),
-                       nbd_get_client_session(bs)->sioc->fd,
-                       false, NULL, NULL, NULL, NULL);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
 }
 
 void nbd_client_attach_aio_context(BlockDriverState *bs,
                                    AioContext *new_context)
 {
-    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
-                       false, nbd_reply_ready, NULL, NULL, bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+    aio_co_schedule(new_context, client->read_reply_co);
 }
 
 void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +412,7 @@ int nbd_client_init(BlockDriverState *bs,
     /* Now that we're connected, set the socket to be non-blocking and
      * kick the reply mechanism.  */
     qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
     nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 
     logout("Established connection with NBD server\n");
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006..8cdfc92 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
 
     CoMutex send_mutex;
     CoQueue free_sema;
-    Coroutine *send_coroutine;
+    Coroutine *read_reply_co;
     int in_flight;
 
     Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/nbd/client.c b/nbd/client.c
index ffb0743..5c9dee3 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply)
     ssize_t ret;
 
     ret = read_sync(ioc, buf, sizeof(buf));
-    if (ret < 0) {
+    if (ret <= 0) {
         return ret;
     }
 
diff --git a/nbd/common.c b/nbd/common.c
index b583a4f..805dbdc 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
         }
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
-                /* XXX figure out if we can create a variant on
-                 * qio_channel_yield() that works with AIO contexts
-                 * and consider using that in this branch */
-                qemu_coroutine_yield();
-            } else if (done) {
-                /* XXX this is needed by nbd_reply_ready.  */
-                qio_channel_wait(ioc,
-                                 do_read ? G_IO_IN : G_IO_OUT);
+                qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT);
             } else {
                 return -EAGAIN;
             }
diff --git a/nbd/server.c b/nbd/server.c
index efe5cb8..1130055 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -95,8 +95,6 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
-    bool can_read;
-
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -104,9 +102,7 @@ struct NBDClient {
 
 /* That's all folks */
 
-static void nbd_set_handlers(NBDClient *client);
-static void nbd_unset_handlers(NBDClient *client);
-static void nbd_update_can_read(NBDClient *client);
+static void nbd_client_receive_next_request(NBDClient *client);
 
 static gboolean nbd_negotiate_continue(QIOChannel *ioc,
                                        GIOCondition condition,
@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
          */
         assert(client->closing);
 
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
         object_unref(OBJECT(client->sioc));
         object_unref(OBJECT(client->ioc));
         if (client->tlscreds) {
@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
 
     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
     client->nb_requests++;
-    nbd_update_can_read(client);
 
     req = g_new0(NBDRequestData, 1);
     nbd_client_get(client);
@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
     g_free(req);
 
     client->nb_requests--;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
+
     nbd_client_put(client);
 }
 
@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
     exp->ctx = ctx;
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_set_handlers(client);
+        qio_channel_attach_aio_context(client->ioc, ctx);
+        if (client->recv_coroutine) {
+            aio_co_schedule(ctx, client->recv_coroutine);
+        }
+        if (client->send_coroutine) {
+            aio_co_schedule(ctx, client->send_coroutine);
+        }
     }
 }
 
@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
     }
 
     exp->ctx = NULL;
@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
-    nbd_set_handlers(client);
 
     if (!len) {
         rc = nbd_send_reply(client->ioc, reply);
@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     }
 
     client->send_coroutine = NULL;
-    nbd_set_handlers(client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
     ssize_t rc;
 
     g_assert(qemu_in_coroutine());
-    client->recv_coroutine = qemu_coroutine_self();
-    nbd_update_can_read(client);
-
+    assert(client->recv_coroutine == qemu_coroutine_self());
     rc = nbd_receive_request(client->ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
 
 out:
     client->recv_coroutine = NULL;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
 
     return rc;
 }
 
-static void nbd_trip(void *opaque)
+/* Owns a reference to the NBDClient passed as opaque.  */
+static coroutine_fn void nbd_trip(void *opaque)
 {
     NBDClient *client = opaque;
     NBDExport *exp = client->exp;
     NBDRequestData *req;
-    NBDRequest request;
+    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
     NBDReply reply;
     ssize_t ret;
     int flags;
 
     TRACE("Reading request.");
     if (client->closing) {
+        nbd_client_put(client);
         return;
     }
 
@@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque)
 
 done:
     nbd_request_put(req);
+    nbd_client_put(client);
     return;
 
 out:
     nbd_request_put(req);
     client_close(client);
+    nbd_client_put(client);
 }
 
-static void nbd_read(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    if (client->recv_coroutine) {
-        qemu_coroutine_enter(client->recv_coroutine);
-    } else {
-        qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
-    }
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    qemu_coroutine_enter(client->send_coroutine);
-}
-
-static void nbd_set_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
-                           client->can_read ? nbd_read : NULL,
-                           client->send_coroutine ? nbd_restart_write : NULL,
-                           NULL, client);
-    }
-}
-
-static void nbd_unset_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
-                           NULL, NULL, NULL);
-    }
-}
-
-static void nbd_update_can_read(NBDClient *client)
+static void nbd_client_receive_next_request(NBDClient *client)
 {
-    bool can_read = client->recv_coroutine ||
-                    client->nb_requests < MAX_NBD_REQUESTS;
-
-    if (can_read != client->can_read) {
-        client->can_read = can_read;
-        nbd_set_handlers(client);
-
-        /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
-         * in nbd_set_handlers() will have taken care of that */
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+        nbd_client_get(client);
+        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
+        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
     }
 }
 
@@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
         goto out;
     }
     qemu_co_mutex_init(&client->send_lock);
-    nbd_set_handlers(client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
     }
+
+    nbd_client_receive_next_request(client);
+
 out:
     g_free(data);
 }
@@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
     object_ref(OBJECT(client->sioc));
     client->ioc = QIO_CHANNEL(sioc);
     object_ref(OBJECT(client->ioc));
-    client->can_read = true;
     client->close = close_fn;
 
     data->client = client;
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 08/17] coroutine-lock: reschedule coroutine on the AioContext it was running on
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (6 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 09/17] qed: introduce qed_aio_start_io and qed_aio_next_io_cb Paolo Bonzini
                   ` (10 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

As a small step towards the introduction of multiqueue, we want
coroutines to remain on the same AioContext that started them,
unless they are moved explicitly with e.g. aio_co_schedule.  This patch
avoids that coroutines switch AioContext when they use a CoMutex.
For now it does not make much of a difference, because the CoMutex
is not thread-safe and the AioContext itself is used to protect the
CoMutex from concurrent access.  However, this is going to change.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/qemu-coroutine-lock.c | 5 ++---
 util/trace-events          | 1 -
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 14cf9ce..e6afd1a 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -27,6 +27,7 @@
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
 #include "qemu/queue.h"
+#include "block/aio.h"
 #include "trace.h"
 
 void qemu_co_queue_init(CoQueue *queue)
@@ -63,7 +64,6 @@ void qemu_co_queue_run_restart(Coroutine *co)
 
 static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
 {
-    Coroutine *self = qemu_coroutine_self();
     Coroutine *next;
 
     if (QSIMPLEQ_EMPTY(&queue->entries)) {
@@ -72,8 +72,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
 
     while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
         QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
-        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
-        trace_qemu_co_queue_next(next);
+        aio_co_wake(next);
         if (single) {
             break;
         }
diff --git a/util/trace-events b/util/trace-events
index 2b8aa30..65705c4 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -13,7 +13,6 @@ qemu_coroutine_terminate(void *co) "self %p"
 
 # util/qemu-coroutine-lock.c
 qemu_co_queue_run_restart(void *co) "co %p"
-qemu_co_queue_next(void *nxt) "next %p"
 qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 09/17] qed: introduce qed_aio_start_io and qed_aio_next_io_cb
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (7 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 08/17] coroutine-lock: reschedule coroutine on the AioContext it was running on Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 10/17] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
                   ` (9 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

qed_aio_start_io and qed_aio_next_io will not have to acquire/release
the AioContext, while qed_aio_next_io_cb will.  Split the functionality
and gain a little type-safety in the process.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/qed.c | 39 +++++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/block/qed.c b/block/qed.c
index 1a7ef0a..7f1c508 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
     return l2_table;
 }
 
-static void qed_aio_next_io(void *opaque, int ret);
+static void qed_aio_next_io(QEDAIOCB *acb, int ret);
+
+static void qed_aio_start_io(QEDAIOCB *acb)
+{
+    qed_aio_next_io(acb, 0);
+}
+
+static void qed_aio_next_io_cb(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+
+    qed_aio_next_io(acb, ret);
+}
 
 static void qed_plug_allocating_write_reqs(BDRVQEDState *s)
 {
@@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
 
     acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
     if (acb) {
-        qed_aio_next_io(acb, 0);
+        qed_aio_start_io(acb);
     }
 }
 
@@ -959,7 +971,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret)
         QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next);
         acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
         if (acb) {
-            qed_aio_next_io(acb, 0);
+            qed_aio_start_io(acb);
         } else if (s->header.features & QED_F_NEED_CHECK) {
             qed_start_need_check_timer(s);
         }
@@ -984,7 +996,7 @@ static void qed_commit_l2_update(void *opaque, int ret)
     acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
     assert(acb->request.l2_table != NULL);
 
-    qed_aio_next_io(opaque, ret);
+    qed_aio_next_io(acb, ret);
 }
 
 /**
@@ -1032,11 +1044,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset)
     if (need_alloc) {
         /* Write out the whole new L2 table */
         qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
-                            qed_aio_write_l1_update, acb);
+                           qed_aio_write_l1_update, acb);
     } else {
         /* Write out only the updated part of the L2 table */
         qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
-                            qed_aio_next_io, acb);
+                           qed_aio_next_io_cb, acb);
     }
     return;
 
@@ -1088,7 +1100,7 @@ static void qed_aio_write_main(void *opaque, int ret)
     }
 
     if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
-        next_fn = qed_aio_next_io;
+        next_fn = qed_aio_next_io_cb;
     } else {
         if (s->bs->backing) {
             next_fn = qed_aio_write_flush_before_l2_update;
@@ -1201,7 +1213,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
     if (acb->flags & QED_AIOCB_ZERO) {
         /* Skip ahead if the clusters are already zero */
         if (acb->find_cluster_ret == QED_CLUSTER_ZERO) {
-            qed_aio_next_io(acb, 0);
+            qed_aio_start_io(acb);
             return;
         }
 
@@ -1321,18 +1333,18 @@ static void qed_aio_read_data(void *opaque, int ret,
     /* Handle zero cluster and backing file reads */
     if (ret == QED_CLUSTER_ZERO) {
         qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size);
-        qed_aio_next_io(acb, 0);
+        qed_aio_start_io(acb);
         return;
     } else if (ret != QED_CLUSTER_FOUND) {
         qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
-                              &acb->backing_qiov, qed_aio_next_io, acb);
+                              &acb->backing_qiov, qed_aio_next_io_cb, acb);
         return;
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
     bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
                    &acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE,
-                   qed_aio_next_io, acb);
+                   qed_aio_next_io_cb, acb);
     return;
 
 err:
@@ -1342,9 +1354,8 @@ err:
 /**
  * Begin next I/O or complete the request
  */
-static void qed_aio_next_io(void *opaque, int ret)
+static void qed_aio_next_io(QEDAIOCB *acb, int ret)
 {
-    QEDAIOCB *acb = opaque;
     BDRVQEDState *s = acb_to_s(acb);
     QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ?
                                 qed_aio_write_data : qed_aio_read_data;
@@ -1400,7 +1411,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs,
     qemu_iovec_init(&acb->cur_qiov, qiov->niov);
 
     /* Start request */
-    qed_aio_next_io(acb, 0);
+    qed_aio_start_io(acb);
     return &acb->common;
 }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 10/17] aio: push aio_context_acquire/release down to dispatching
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (8 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 09/17] qed: introduce qed_aio_start_io and qed_aio_next_io_cb Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it Paolo Bonzini
                   ` (8 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

The AioContext data structures are now protected by list_lock and/or
they are walked with FOREACH_RCU primitives.  There is no need anymore
to acquire the AioContext for the entire duration of aio_dispatch.
Instead, just acquire it before and after invoking the callbacks.
The next step is then to push it further down.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/aio-posix.c | 25 +++++++++++--------------
 util/aio-win32.c | 15 +++++++--------
 util/async.c     |  2 ++
 3 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/util/aio-posix.c b/util/aio-posix.c
index 9453d83..3fd64fb 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -402,7 +402,9 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_read) {
+            aio_context_acquire(ctx);
             node->io_read(node->opaque);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -413,7 +415,9 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_write) {
+            aio_context_acquire(ctx);
             node->io_write(node->opaque);
+            aio_context_release(ctx);
             progress = true;
         }
 
@@ -450,7 +454,9 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
     }
 
     /* Run our timers */
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
+    aio_context_release(ctx);
 
     return progress;
 }
@@ -596,9 +602,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int64_t timeout;
     int64_t start = 0;
 
-    aio_context_acquire(ctx);
-    progress = false;
-
     /* aio_notify can avoid the expensive event_notifier_set if
      * everything (file descriptors, bottom halves, timers) will
      * be re-evaluated before the next blocking poll().  This is
@@ -616,9 +619,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     }
 
-    if (try_poll_mode(ctx, blocking)) {
-        progress = true;
-    } else {
+    aio_context_acquire(ctx);
+    progress = try_poll_mode(ctx, blocking);
+    aio_context_release(ctx);
+
+    if (!progress) {
         assert(npfd == 0);
 
         /* fill pollfds */
@@ -635,9 +640,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         timeout = blocking ? aio_compute_timeout(ctx) : 0;
 
         /* wait until next event */
-        if (timeout) {
-            aio_context_release(ctx);
-        }
         if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
             AioHandler epoll_handler;
 
@@ -649,9 +651,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         } else  {
             ret = qemu_poll_ns(pollfds, npfd, timeout);
         }
-        if (timeout) {
-            aio_context_acquire(ctx);
-        }
     }
 
     if (blocking) {
@@ -716,8 +715,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    aio_context_release(ctx);
-
     return progress;
 }
 
diff --git a/util/aio-win32.c b/util/aio-win32.c
index 900524c..ab6d0e5 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -266,7 +266,9 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
             node->pfd.revents = 0;
+            aio_context_acquire(ctx);
             node->io_notify(node->e);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -278,11 +280,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             (node->io_read || node->io_write)) {
             node->pfd.revents = 0;
             if ((revents & G_IO_IN) && node->io_read) {
+                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
+                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
 
@@ -329,7 +335,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int count;
     int timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -371,17 +376,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
         timeout = blocking && !have_select_revents
             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
-        if (timeout) {
-            aio_context_release(ctx);
-        }
         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
         if (blocking) {
             assert(first);
             atomic_sub(&ctx->notify_me, 2);
         }
-        if (timeout) {
-            aio_context_acquire(ctx);
-        }
 
         if (first) {
             aio_notify_accept(ctx);
@@ -404,8 +403,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-
     aio_context_release(ctx);
     return progress;
 }
diff --git a/util/async.c b/util/async.c
index 44c9c3b..8e65e4b 100644
--- a/util/async.c
+++ b/util/async.c
@@ -114,7 +114,9 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
+            aio_context_acquire(ctx);
             aio_bh_call(bh);
+            aio_context_release(ctx);
         }
         if (bh->deleted) {
             deleted = true;
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (9 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 10/17] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-30 16:01   ` Stefan Hajnoczi
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 12/17] block: explicitly acquire aiocontext in callbacks " Paolo Bonzini
                   ` (7 subsequent siblings)
  18 siblings, 1 reply; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/curl.c                |  2 ++
 block/io.c                  |  5 +++++
 block/iscsi.c               |  8 ++++++--
 block/null.c                |  4 ++++
 block/qed.c                 | 12 ++++++++++++
 block/qed.h                 |  3 +++
 block/throttle-groups.c     |  2 ++
 util/aio-posix.c            |  2 --
 util/aio-win32.c            |  2 --
 util/qemu-coroutine-sleep.c |  2 +-
 10 files changed, 35 insertions(+), 7 deletions(-)

        v1->v2: document restrictions in bdrv_aio_cancel due to qemu_aio_ref
	[Stefan]

diff --git a/block/curl.c b/block/curl.c
index 792fef8..65e6da1 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -424,9 +424,11 @@ static void curl_multi_timeout_do(void *arg)
         return;
     }
 
+    aio_context_acquire(s->aio_context);
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
     curl_multi_check_completion(s);
+    aio_context_release(s->aio_context);
 #else
     abort();
 #endif
diff --git a/block/io.c b/block/io.c
index 4f9178e..4e7834c 100644
--- a/block/io.c
+++ b/block/io.c
@@ -2092,6 +2092,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb)
         if (acb->aiocb_info->get_aio_context) {
             aio_poll(acb->aiocb_info->get_aio_context(acb), true);
         } else if (acb->bs) {
+            /* qemu_aio_ref and qemu_aio_unref are not thread-safe, so
+             * assert that we're not using an I/O thread.  Thread-safe
+             * code should use bdrv_aio_cancel_async exclusively.
+             */
+            assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context());
             aio_poll(bdrv_get_aio_context(acb->bs), true);
         } else {
             abort();
diff --git a/block/iscsi.c b/block/iscsi.c
index 6aeeb9e..e1f10d6 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -174,7 +174,7 @@ static void iscsi_retry_timer_expired(void *opaque)
     struct IscsiTask *iTask = opaque;
     iTask->complete = 1;
     if (iTask->co) {
-        qemu_coroutine_enter(iTask->co);
+        aio_co_wake(iTask->co);
     }
 }
 
@@ -1388,16 +1388,20 @@ static void iscsi_nop_timed_event(void *opaque)
 {
     IscsiLun *iscsilun = opaque;
 
+    aio_context_acquire(iscsilun->aio_context);
     if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
         error_report("iSCSI: NOP timeout. Reconnecting...");
         iscsilun->request_timed_out = true;
     } else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
         error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
-        return;
+        goto out;
     }
 
     timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
     iscsi_set_events(iscsilun);
+
+out:
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
diff --git a/block/null.c b/block/null.c
index b300390..356209a 100644
--- a/block/null.c
+++ b/block/null.c
@@ -141,7 +141,11 @@ static void null_bh_cb(void *opaque)
 static void null_timer_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
+    aio_context_release(ctx);
     timer_deinit(&acb->timer);
     qemu_aio_unref(acb);
 }
diff --git a/block/qed.c b/block/qed.c
index 7f1c508..a21d025 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -345,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque)
 
     trace_qed_need_check_timer_cb(s);
 
+    qed_acquire(s);
     qed_plug_allocating_write_reqs(s);
 
     /* Ensure writes are on disk before clearing flag */
     bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
+    qed_release(s);
+}
+
+void qed_acquire(BDRVQEDState *s)
+{
+    aio_context_acquire(bdrv_get_aio_context(s->bs));
+}
+
+void qed_release(BDRVQEDState *s)
+{
+    aio_context_release(bdrv_get_aio_context(s->bs));
 }
 
 static void qed_start_need_check_timer(BDRVQEDState *s)
diff --git a/block/qed.h b/block/qed.h
index 9676ab9..ce8c314 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -198,6 +198,9 @@ enum {
  */
 typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
 
+void qed_acquire(BDRVQEDState *s);
+void qed_release(BDRVQEDState *s);
+
 /**
  * Generic callback for chaining async callbacks
  */
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 17b2efb..aade5de 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write)
     qemu_mutex_unlock(&tg->lock);
 
     /* Run the request that was waiting for this timer */
+    aio_context_acquire(blk_get_aio_context(blk));
     empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);
+    aio_context_release(blk_get_aio_context(blk));
 
     /* If the request queue was empty then we have to take care of
      * scheduling the next one */
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 3fd64fb..8d79cf3 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -454,9 +454,7 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
     }
 
     /* Run our timers */
-    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-    aio_context_release(ctx);
 
     return progress;
 }
diff --git a/util/aio-win32.c b/util/aio-win32.c
index ab6d0e5..810e1c6 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -403,9 +403,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
-    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-    aio_context_release(ctx);
     return progress;
 }
 
diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c
index 25de3ed..9c56550 100644
--- a/util/qemu-coroutine-sleep.c
+++ b/util/qemu-coroutine-sleep.c
@@ -25,7 +25,7 @@ static void co_sleep_cb(void *opaque)
 {
     CoSleepCB *sleep_cb = opaque;
 
-    qemu_coroutine_enter(sleep_cb->co);
+    aio_co_wake(sleep_cb->co);
 }
 
 void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 12/17] block: explicitly acquire aiocontext in callbacks that need it
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (10 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 13/17] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
                   ` (6 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

This covers both file descriptor callbacks and polling callbacks,
since they execute related code.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/curl.c          | 16 +++++++++++++---
 block/iscsi.c         |  4 ++++
 block/linux-aio.c     |  4 ++++
 block/nfs.c           |  6 ++++++
 block/sheepdog.c      | 29 +++++++++++++++--------------
 block/ssh.c           | 29 +++++++++--------------------
 block/win32-aio.c     | 10 ++++++----
 hw/block/virtio-blk.c |  5 ++++-
 hw/scsi/virtio-scsi.c |  6 ++++++
 util/aio-posix.c      |  7 -------
 util/aio-win32.c      |  6 ------
 11 files changed, 67 insertions(+), 55 deletions(-)

diff --git a/block/curl.c b/block/curl.c
index 65e6da1..05b9ca3 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
     }
 }
 
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
 {
-    CURLState *s = (CURLState *)arg;
     CURLSocket *socket, *next_socket;
     int running;
     int r;
@@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
     }
 }
 
+static void curl_multi_do(void *arg)
+{
+    CURLState *s = (CURLState *)arg;
+
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
+    aio_context_release(s->s->aio_context);
+}
+
 static void curl_multi_read(void *arg)
 {
     CURLState *s = (CURLState *)arg;
 
-    curl_multi_do(arg);
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
     curl_multi_check_completion(s->s);
+    aio_context_release(s->s->aio_context);
 }
 
 static void curl_multi_timeout_do(void *arg)
diff --git a/block/iscsi.c b/block/iscsi.c
index e1f10d6..54d1381 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -394,8 +394,10 @@ iscsi_process_read(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLIN);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void
@@ -404,8 +406,10 @@ iscsi_process_write(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLOUT);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 03ab741..277c016 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -251,7 +251,9 @@ static void qemu_laio_completion_cb(EventNotifier *e)
     LinuxAioState *s = container_of(e, LinuxAioState, e);
 
     if (event_notifier_test_and_clear(&s->e)) {
+        aio_context_acquire(s->aio_context);
         qemu_laio_process_completions_and_submit(s);
+        aio_context_release(s->aio_context);
     }
 }
 
@@ -265,7 +267,9 @@ static bool qemu_laio_poll_cb(void *opaque)
         return false;
     }
 
+    aio_context_acquire(s->aio_context);
     qemu_laio_process_completions_and_submit(s);
+    aio_context_release(s->aio_context);
     return true;
 }
 
diff --git a/block/nfs.c b/block/nfs.c
index a564340..803faf9 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -207,15 +207,21 @@ static void nfs_set_events(NFSClient *client)
 static void nfs_process_read(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLIN);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_process_write(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLOUT);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
diff --git a/block/sheepdog.c b/block/sheepdog.c
index f757157..32c4e4c 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
     return ret;
 }
 
-static void restart_co_req(void *opaque)
-{
-    Coroutine *co = opaque;
-
-    qemu_coroutine_enter(co);
-}
-
 typedef struct SheepdogReqCo {
     int sockfd;
     BlockDriverState *bs;
@@ -592,12 +585,19 @@ typedef struct SheepdogReqCo {
     unsigned int *rlen;
     int ret;
     bool finished;
+    Coroutine *co;
 } SheepdogReqCo;
 
+static void restart_co_req(void *opaque)
+{
+    SheepdogReqCo *srco = opaque;
+
+    aio_co_wake(srco->co);
+}
+
 static coroutine_fn void do_co_req(void *opaque)
 {
     int ret;
-    Coroutine *co;
     SheepdogReqCo *srco = opaque;
     int sockfd = srco->sockfd;
     SheepdogReq *hdr = srco->hdr;
@@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque)
     unsigned int *wlen = srco->wlen;
     unsigned int *rlen = srco->rlen;
 
-    co = qemu_coroutine_self();
+    srco->co = qemu_coroutine_self();
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       NULL, restart_co_req, NULL, co);
+                       NULL, restart_co_req, NULL, srco);
 
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
@@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque)
     }
 
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       restart_co_req, NULL, NULL, co);
+                       restart_co_req, NULL, NULL, srco);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret != sizeof(*hdr)) {
@@ -643,6 +643,7 @@ out:
     aio_set_fd_handler(srco->aio_context, sockfd, false,
                        NULL, NULL, NULL, NULL);
 
+    srco->co = NULL;
     srco->ret = ret;
     srco->finished = true;
     if (srco->bs) {
@@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque)
          * We've finished all requests which belong to the AIOCB, so
          * we can switch back to sd_co_readv/writev now.
          */
-        qemu_coroutine_enter(acb->coroutine);
+        aio_co_wake(acb->coroutine);
     }
 
     return;
@@ -883,14 +884,14 @@ static void co_read_response(void *opaque)
         s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
     }
 
-    qemu_coroutine_enter(s->co_recv);
+    aio_co_wake(s->co_recv);
 }
 
 static void co_write_request(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
-    qemu_coroutine_enter(s->co_send);
+    aio_co_wake(s->co_send);
 }
 
 /*
diff --git a/block/ssh.c b/block/ssh.c
index e0edf20..835932e 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque)
 
     DPRINTF("co=%p", co);
 
-    qemu_coroutine_enter(co);
+    aio_co_wake(co);
 }
 
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
 {
     int r;
     IOHandler *rd_handler = NULL, *wr_handler = NULL;
@@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
 
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
                        false, rd_handler, wr_handler, NULL, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
-                                          BlockDriverState *bs)
-{
-    DPRINTF("s->sock=%d", s->sock);
-    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
-                       false, NULL, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
-    set_fd_handler(s, bs);
     qemu_coroutine_yield();
-    clear_fd_handler(s, bs);
+    DPRINTF("s->sock=%d - back", s->sock);
+    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false,
+                       NULL, NULL, NULL, NULL);
 }
 
 /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 8cdf73b..c3f8f1a 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -41,7 +41,7 @@ struct QEMUWin32AIOState {
     HANDLE hIOCP;
     EventNotifier e;
     int count;
-    bool is_aio_context_attached;
+    AioContext *aio_ctx;
 };
 
 typedef struct QEMUWin32AIOCB {
@@ -88,7 +88,9 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
     }
 
 
+    aio_context_acquire(s->aio_ctx);
     waiocb->common.cb(waiocb->common.opaque, ret);
+    aio_context_release(s->aio_ctx);
     qemu_aio_unref(waiocb);
 }
 
@@ -176,13 +178,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
-    aio->is_aio_context_attached = false;
+    aio->aio_ctx = NULL;
 }
 
 void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *new_context)
 {
-    aio->is_aio_context_attached = true;
+    aio->aio_ctx = new_context;
     aio_set_event_notifier(new_context, &aio->e, false,
                            win32_aio_completion_cb, NULL);
 }
@@ -212,7 +214,7 @@ out_free_state:
 
 void win32_aio_cleanup(QEMUWin32AIOState *aio)
 {
-    assert(!aio->is_aio_context_attached);
+    assert(!aio->aio_ctx);
     CloseHandle(aio->hIOCP);
     event_notifier_cleanup(&aio->e);
     g_free(aio);
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index 702eda8..a00ee38 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -150,7 +150,8 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
 {
     VirtIOBlockIoctlReq *ioctl_req = opaque;
     VirtIOBlockReq *req = ioctl_req->req;
-    VirtIODevice *vdev = VIRTIO_DEVICE(req->dev);
+    VirtIOBlock *s = req->dev;
+    VirtIODevice *vdev = VIRTIO_DEVICE(s);
     struct virtio_scsi_inhdr *scsi;
     struct sg_io_hdr *hdr;
 
@@ -586,6 +587,7 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
     VirtIOBlockReq *req;
     MultiReqBuffer mrb = {};
 
+    aio_context_acquire(blk_get_aio_context(s->blk));
     blk_io_plug(s->blk);
 
     do {
@@ -607,6 +609,7 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
     }
 
     blk_io_unplug(s->blk);
+    aio_context_release(blk_get_aio_context(s->blk));
 }
 
 static void virtio_blk_handle_output(VirtIODevice *vdev, VirtQueue *vq)
diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c
index ce19eff..5d9718a 100644
--- a/hw/scsi/virtio-scsi.c
+++ b/hw/scsi/virtio-scsi.c
@@ -440,9 +440,11 @@ void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
     VirtIOSCSIReq *req;
 
+    virtio_scsi_acquire(s);
     while ((req = virtio_scsi_pop_req(s, vq))) {
         virtio_scsi_handle_ctrl_req(s, req);
     }
+    virtio_scsi_release(s);
 }
 
 static void virtio_scsi_handle_ctrl(VirtIODevice *vdev, VirtQueue *vq)
@@ -598,6 +600,7 @@ void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
 
     QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
 
+    virtio_scsi_acquire(s);
     do {
         virtio_queue_set_notification(vq, 0);
 
@@ -624,6 +627,7 @@ void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
     QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
         virtio_scsi_handle_cmd_req_submit(s, req);
     }
+    virtio_scsi_release(s);
 }
 
 static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq)
@@ -754,9 +758,11 @@ out:
 
 void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
+    virtio_scsi_acquire(s);
     if (s->events_dropped) {
         virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
     }
+    virtio_scsi_release(s);
 }
 
 static void virtio_scsi_handle_event(VirtIODevice *vdev, VirtQueue *vq)
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 8d79cf3..6beebcd 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -402,9 +402,7 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_read) {
-            aio_context_acquire(ctx);
             node->io_read(node->opaque);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -415,9 +413,7 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_write) {
-            aio_context_acquire(ctx);
             node->io_write(node->opaque);
-            aio_context_release(ctx);
             progress = true;
         }
 
@@ -617,10 +613,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     }
 
-    aio_context_acquire(ctx);
     progress = try_poll_mode(ctx, blocking);
-    aio_context_release(ctx);
-
     if (!progress) {
         assert(npfd == 0);
 
diff --git a/util/aio-win32.c b/util/aio-win32.c
index 810e1c6..20b63ce 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -266,9 +266,7 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
             node->pfd.revents = 0;
-            aio_context_acquire(ctx);
             node->io_notify(node->e);
-            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -280,15 +278,11 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             (node->io_read || node->io_write)) {
             node->pfd.revents = 0;
             if ((revents & G_IO_IN) && node->io_read) {
-                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
-                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
-                aio_context_release(ctx);
                 progress = true;
             }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 13/17] block: explicitly acquire aiocontext in bottom halves that need it
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (11 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 12/17] block: explicitly acquire aiocontext in callbacks " Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 14/17] block: explicitly acquire aiocontext in aio callbacks " Paolo Bonzini
                   ` (5 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/archipelago.c   |  3 +++
 block/blkdebug.c      |  9 +--------
 block/blkreplay.c     |  2 +-
 block/block-backend.c |  6 ++++++
 block/curl.c          | 26 ++++++++++++++++++--------
 block/gluster.c       |  9 +--------
 block/io.c            |  6 +++++-
 block/iscsi.c         |  6 +++++-
 block/linux-aio.c     | 15 +++++++++------
 block/nfs.c           |  3 ++-
 block/null.c          |  4 ++++
 block/qed.c           |  3 +++
 block/rbd.c           |  4 ++++
 dma-helpers.c         |  2 ++
 hw/block/virtio-blk.c |  2 ++
 hw/scsi/scsi-bus.c    |  2 ++
 util/async.c          |  4 ++--
 util/thread-pool.c    |  2 ++
 18 files changed, 72 insertions(+), 36 deletions(-)

diff --git a/block/archipelago.c b/block/archipelago.c
index 2449cfc..a624390 100644
--- a/block/archipelago.c
+++ b/block/archipelago.c
@@ -310,8 +310,11 @@ static void qemu_archipelago_complete_aio(void *opaque)
 {
     AIORequestData *reqdata = (AIORequestData *) opaque;
     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
+    AioContext *ctx = bdrv_get_aio_context(aio_cb->common.bs);
 
+    aio_context_acquire(ctx);
     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
+    aio_context_release(ctx);
     aio_cb->status = 0;
 
     qemu_aio_unref(aio_cb);
diff --git a/block/blkdebug.c b/block/blkdebug.c
index acccf85..259369d 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -405,12 +405,6 @@ out:
     return ret;
 }
 
-static void error_callback_bh(void *opaque)
-{
-    Coroutine *co = opaque;
-    qemu_coroutine_enter(co);
-}
-
 static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
 {
     BDRVBlkdebugState *s = bs->opaque;
@@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
     }
 
     if (!immediately) {
-        aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
-                                qemu_coroutine_self());
+        aio_co_schedule(bdrv_get_aio_context(bs), qemu_coroutine_self());
         qemu_coroutine_yield();
     }
 
diff --git a/block/blkreplay.c b/block/blkreplay.c
index a741654..cfc8c5b 100755
--- a/block/blkreplay.c
+++ b/block/blkreplay.c
@@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
 static void blkreplay_bh_cb(void *opaque)
 {
     Request *req = opaque;
-    qemu_coroutine_enter(req->co);
+    aio_co_wake(req->co);
     qemu_bh_delete(req->bh);
     g_free(req);
 }
diff --git a/block/block-backend.c b/block/block-backend.c
index 1177598..bfc0e6b 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -939,9 +939,12 @@ int blk_make_zero(BlockBackend *blk, BdrvRequestFlags flags)
 static void error_callback_bh(void *opaque)
 {
     struct BlockBackendAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     bdrv_dec_in_flight(acb->common.bs);
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
+    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
@@ -983,9 +986,12 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
 static void blk_aio_complete_bh(void *opaque)
 {
     BlkAioEmAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     assert(acb->has_returned);
+    aio_context_acquire(ctx);
     blk_aio_complete(acb);
+    aio_context_release(ctx);
 }
 
 static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset, int bytes,
diff --git a/block/curl.c b/block/curl.c
index 05b9ca3..f3f063b 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -796,13 +796,18 @@ static void curl_readv_bh_cb(void *p)
 {
     CURLState *state;
     int running;
+    int ret = -EINPROGRESS;
 
     CURLAIOCB *acb = p;
-    BDRVCURLState *s = acb->common.bs->opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVCURLState *s = bs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(bs);
 
     size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
     size_t end;
 
+    aio_context_acquire(ctx);
+
     // In case we have the requested data already (e.g. read-ahead),
     // we can just call the callback and be done.
     switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
@@ -810,7 +815,7 @@ static void curl_readv_bh_cb(void *p)
             qemu_aio_unref(acb);
             // fall through
         case FIND_RET_WAIT:
-            return;
+            goto out;
         default:
             break;
     }
@@ -818,9 +823,8 @@ static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(acb->common.bs, s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_unref(acb);
-        return;
+        ret = -EIO;
+        goto out;
     }
 
     acb->start = 0;
@@ -834,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
     state->orig_buf = g_try_malloc(state->buf_len);
     if (state->buf_len && state->orig_buf == NULL) {
         curl_clean_state(state);
-        acb->common.cb(acb->common.opaque, -ENOMEM);
-        qemu_aio_unref(acb);
-        return;
+        ret = -ENOMEM;
+        goto out;
     }
     state->acb[0] = acb;
 
@@ -849,6 +852,13 @@ static void curl_readv_bh_cb(void *p)
 
     /* Tell curl it needs to kick things off */
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+    if (ret != -EINPROGRESS) {
+        acb->common.cb(acb->common.opaque, ret);
+        qemu_aio_unref(acb);
+    }
+    aio_context_release(ctx);
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
diff --git a/block/gluster.c b/block/gluster.c
index 1a22f29..56b4abe 100644
--- a/block/gluster.c
+++ b/block/gluster.c
@@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf,
     return qemu_gluster_glfs_init(gconf, errp);
 }
 
-static void qemu_gluster_complete_aio(void *opaque)
-{
-    GlusterAIOCB *acb = (GlusterAIOCB *)opaque;
-
-    qemu_coroutine_enter(acb->coroutine);
-}
-
 /*
  * AIO callback routine called from GlusterFS thread.
  */
@@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
         acb->ret = -EIO; /* Partial read/write - fail it */
     }
 
-    aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb);
+    aio_co_schedule(acb->aio_context, acb->coroutine);
 }
 
 static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags)
diff --git a/block/io.c b/block/io.c
index 76dfaf4..b7dc4f8 100644
--- a/block/io.c
+++ b/block/io.c
@@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque)
     bdrv_dec_in_flight(bs);
     bdrv_drained_begin(bs);
     data->done = true;
-    qemu_coroutine_enter(co);
+    aio_co_wake(co);
 }
 
 static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
@@ -2147,9 +2147,13 @@ static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
 static void bdrv_co_em_bh(void *opaque)
 {
     BlockAIOCBCoroutine *acb = opaque;
+    BlockDriverState *bs = acb->common.bs;
+    AioContext *ctx = bdrv_get_aio_context(bs);
 
     assert(!acb->need_bh);
+    aio_context_acquire(ctx);
     bdrv_co_complete(acb);
+    aio_context_release(ctx);
 }
 
 static void bdrv_co_maybe_schedule_bh(BlockAIOCBCoroutine *acb)
diff --git a/block/iscsi.c b/block/iscsi.c
index 54d1381..5de5d93 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -136,13 +136,16 @@ static void
 iscsi_bh_cb(void *p)
 {
     IscsiAIOCB *acb = p;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
 
     g_free(acb->buf);
     acb->buf = NULL;
 
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->status);
+    aio_context_release(ctx);
 
     if (acb->task != NULL) {
         scsi_free_scsi_task(acb->task);
@@ -165,8 +168,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
 static void iscsi_co_generic_bh_cb(void *opaque)
 {
     struct IscsiTask *iTask = opaque;
+
     iTask->complete = 1;
-    qemu_coroutine_enter(iTask->co);
+    aio_co_wake(iTask->co);
 }
 
 static void iscsi_retry_timer_expired(void *opaque)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 277c016..f7ae38a 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -54,10 +54,10 @@ struct LinuxAioState {
     io_context_t ctx;
     EventNotifier e;
 
-    /* io queue for submit at batch */
+    /* io queue for submit at batch.  Protected by AioContext lock. */
     LaioQueue io_q;
 
-    /* I/O completion processing */
+    /* I/O completion processing.  Only runs in I/O thread.  */
     QEMUBH *completion_bh;
     int event_idx;
     int event_max;
@@ -75,6 +75,7 @@ static inline ssize_t io_event_ret(struct io_event *ev)
  */
 static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
 {
+    LinuxAioState *s = laiocb->ctx;
     int ret;
 
     ret = laiocb->ret;
@@ -93,6 +94,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
     }
 
     laiocb->ret = ret;
+    aio_context_acquire(s->aio_context);
     if (laiocb->co) {
         /* If the coroutine is already entered it must be in ioq_submit() and
          * will notice laio->ret has been filled in when it eventually runs
@@ -106,6 +108,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
         laiocb->common.cb(laiocb->common.opaque, ret);
         qemu_aio_unref(laiocb);
     }
+    aio_context_release(s->aio_context);
 }
 
 /**
@@ -234,9 +237,12 @@ static void qemu_laio_process_completions(LinuxAioState *s)
 static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
 {
     qemu_laio_process_completions(s);
+
+    aio_context_acquire(s->aio_context);
     if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
         ioq_submit(s);
     }
+    aio_context_release(s->aio_context);
 }
 
 static void qemu_laio_completion_bh(void *opaque)
@@ -251,9 +257,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
     LinuxAioState *s = container_of(e, LinuxAioState, e);
 
     if (event_notifier_test_and_clear(&s->e)) {
-        aio_context_acquire(s->aio_context);
         qemu_laio_process_completions_and_submit(s);
-        aio_context_release(s->aio_context);
     }
 }
 
@@ -267,9 +271,7 @@ static bool qemu_laio_poll_cb(void *opaque)
         return false;
     }
 
-    aio_context_acquire(s->aio_context);
     qemu_laio_process_completions_and_submit(s);
-    aio_context_release(s->aio_context);
     return true;
 }
 
@@ -459,6 +461,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
     qemu_bh_delete(s->completion_bh);
+    s->aio_context = NULL;
 }
 
 void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
diff --git a/block/nfs.c b/block/nfs.c
index 803faf9..32631bb 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -236,8 +236,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
 static void nfs_co_generic_bh_cb(void *opaque)
 {
     NFSRPC *task = opaque;
+
     task->complete = 1;
-    qemu_coroutine_enter(task->co);
+    aio_co_wake(task->co);
 }
 
 static void
diff --git a/block/null.c b/block/null.c
index 356209a..5eb2038 100644
--- a/block/null.c
+++ b/block/null.c
@@ -134,7 +134,11 @@ static const AIOCBInfo null_aiocb_info = {
 static void null_bh_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
+    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
diff --git a/block/qed.c b/block/qed.c
index a21d025..db8295d 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -942,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
 static void qed_aio_complete_bh(void *opaque)
 {
     QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
     BlockCompletionFunc *cb = acb->common.cb;
     void *user_opaque = acb->common.opaque;
     int ret = acb->bh_ret;
@@ -949,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque)
     qemu_aio_unref(acb);
 
     /* Invoke callback */
+    qed_acquire(s);
     cb(user_opaque, ret);
+    qed_release(s);
 }
 
 static void qed_aio_complete(QEDAIOCB *acb, int ret)
diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..2cb2cb4 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -413,6 +413,7 @@ shutdown:
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
+    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
     int64_t r;
 
     r = rcb->ret;
@@ -445,7 +446,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
+
+    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    aio_context_release(ctx);
 
     qemu_aio_unref(acb);
 }
diff --git a/dma-helpers.c b/dma-helpers.c
index 6f9d47c..39d4802 100644
--- a/dma-helpers.c
+++ b/dma-helpers.c
@@ -166,8 +166,10 @@ static void dma_blk_cb(void *opaque, int ret)
                                 QEMU_ALIGN_DOWN(dbs->iov.size, dbs->align));
     }
 
+    aio_context_acquire(dbs->ctx);
     dbs->acb = dbs->io_func(dbs->offset, &dbs->iov,
                             dma_blk_cb, dbs, dbs->io_func_opaque);
+    aio_context_release(dbs->ctx);
     assert(dbs->acb);
 }
 
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index a00ee38..af652f3 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -639,6 +639,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
 
     s->rq = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     while (req) {
         VirtIOBlockReq *next = req->next;
         if (virtio_blk_handle_request(req, &mrb)) {
@@ -659,6 +660,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
     if (mrb.num_reqs) {
         virtio_blk_submit_multireq(s->blk, &mrb);
     }
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 static void virtio_blk_dma_restart_cb(void *opaque, int running,
diff --git a/hw/scsi/scsi-bus.c b/hw/scsi/scsi-bus.c
index 297216d..dddeee3 100644
--- a/hw/scsi/scsi-bus.c
+++ b/hw/scsi/scsi-bus.c
@@ -105,6 +105,7 @@ static void scsi_dma_restart_bh(void *opaque)
     qemu_bh_delete(s->bh);
     s->bh = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
     QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
         scsi_req_ref(req);
         if (req->retry) {
@@ -122,6 +123,7 @@ static void scsi_dma_restart_bh(void *opaque)
         }
         scsi_req_unref(req);
     }
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 void scsi_req_retry(SCSIRequest *req)
diff --git a/util/async.c b/util/async.c
index 8e65e4b..99b9d7e 100644
--- a/util/async.c
+++ b/util/async.c
@@ -114,9 +114,7 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
-            aio_context_acquire(ctx);
             aio_bh_call(bh);
-            aio_context_release(ctx);
         }
         if (bh->deleted) {
             deleted = true;
@@ -389,7 +387,9 @@ static void co_schedule_bh_cb(void *opaque)
         Coroutine *co = QSLIST_FIRST(&straight);
         QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
         trace_aio_co_schedule_bh_cb(ctx, co);
+        aio_context_acquire(ctx);
         qemu_coroutine_enter(co);
+        aio_context_release(ctx);
     }
 }
 
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 6fba913..7c9cec5 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -165,6 +165,7 @@ static void thread_pool_completion_bh(void *opaque)
     ThreadPool *pool = opaque;
     ThreadPoolElement *elem, *next;
 
+    aio_context_acquire(pool->ctx);
 restart:
     QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
         if (elem->state != THREAD_DONE) {
@@ -191,6 +192,7 @@ restart:
             qemu_aio_unref(elem);
         }
     }
+    aio_context_release(pool->ctx);
 }
 
 static void thread_pool_cancel(BlockAIOCB *acb)
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 14/17] block: explicitly acquire aiocontext in aio callbacks that need it
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (12 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 13/17] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 15/17] aio-posix: partially inline aio_dispatch into aio_poll Paolo Bonzini
                   ` (4 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/archipelago.c    |  3 ---
 block/block-backend.c  |  7 -------
 block/curl.c           |  2 +-
 block/io.c             |  6 +-----
 block/iscsi.c          |  3 ---
 block/linux-aio.c      |  5 +----
 block/mirror.c         | 12 +++++++++---
 block/null.c           |  8 --------
 block/qed-cluster.c    |  2 ++
 block/qed-table.c      | 12 ++++++++++--
 block/qed.c            |  4 ++--
 block/rbd.c            |  4 ----
 block/win32-aio.c      |  3 ---
 hw/block/virtio-blk.c  | 12 +++++++++++-
 hw/scsi/scsi-disk.c    | 15 +++++++++++++++
 hw/scsi/scsi-generic.c | 20 +++++++++++++++++---
 util/thread-pool.c     |  4 +++-
 17 files changed, 72 insertions(+), 50 deletions(-)

diff --git a/block/archipelago.c b/block/archipelago.c
index a624390..2449cfc 100644
--- a/block/archipelago.c
+++ b/block/archipelago.c
@@ -310,11 +310,8 @@ static void qemu_archipelago_complete_aio(void *opaque)
 {
     AIORequestData *reqdata = (AIORequestData *) opaque;
     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
-    AioContext *ctx = bdrv_get_aio_context(aio_cb->common.bs);
 
-    aio_context_acquire(ctx);
     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
-    aio_context_release(ctx);
     aio_cb->status = 0;
 
     qemu_aio_unref(aio_cb);
diff --git a/block/block-backend.c b/block/block-backend.c
index bfc0e6b..819f272 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -939,12 +939,9 @@ int blk_make_zero(BlockBackend *blk, BdrvRequestFlags flags)
 static void error_callback_bh(void *opaque)
 {
     struct BlockBackendAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     bdrv_dec_in_flight(acb->common.bs);
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->ret);
-    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
@@ -986,12 +983,8 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
 static void blk_aio_complete_bh(void *opaque)
 {
     BlkAioEmAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
     assert(acb->has_returned);
-    aio_context_acquire(ctx);
     blk_aio_complete(acb);
-    aio_context_release(ctx);
 }
 
 static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset, int bytes,
diff --git a/block/curl.c b/block/curl.c
index f3f063b..2939cc7 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -854,11 +854,11 @@ static void curl_readv_bh_cb(void *p)
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
 out:
+    aio_context_release(ctx);
     if (ret != -EINPROGRESS) {
         acb->common.cb(acb->common.opaque, ret);
         qemu_aio_unref(acb);
     }
-    aio_context_release(ctx);
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
diff --git a/block/io.c b/block/io.c
index b7dc4f8..80c9d41 100644
--- a/block/io.c
+++ b/block/io.c
@@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret)
     CoroutineIOCompletion *co = opaque;
 
     co->ret = ret;
-    qemu_coroutine_enter(co->coroutine);
+    aio_co_wake(co->coroutine);
 }
 
 static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs,
@@ -2147,13 +2147,9 @@ static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
 static void bdrv_co_em_bh(void *opaque)
 {
     BlockAIOCBCoroutine *acb = opaque;
-    BlockDriverState *bs = acb->common.bs;
-    AioContext *ctx = bdrv_get_aio_context(bs);
 
     assert(!acb->need_bh);
-    aio_context_acquire(ctx);
     bdrv_co_complete(acb);
-    aio_context_release(ctx);
 }
 
 static void bdrv_co_maybe_schedule_bh(BlockAIOCBCoroutine *acb)
diff --git a/block/iscsi.c b/block/iscsi.c
index 5de5d93..c19c838 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -136,16 +136,13 @@ static void
 iscsi_bh_cb(void *p)
 {
     IscsiAIOCB *acb = p;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
 
     qemu_bh_delete(acb->bh);
 
     g_free(acb->buf);
     acb->buf = NULL;
 
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, acb->status);
-    aio_context_release(ctx);
 
     if (acb->task != NULL) {
         scsi_free_scsi_task(acb->task);
diff --git a/block/linux-aio.c b/block/linux-aio.c
index f7ae38a..88b8d55 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -75,7 +75,6 @@ static inline ssize_t io_event_ret(struct io_event *ev)
  */
 static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
 {
-    LinuxAioState *s = laiocb->ctx;
     int ret;
 
     ret = laiocb->ret;
@@ -94,7 +93,6 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
     }
 
     laiocb->ret = ret;
-    aio_context_acquire(s->aio_context);
     if (laiocb->co) {
         /* If the coroutine is already entered it must be in ioq_submit() and
          * will notice laio->ret has been filled in when it eventually runs
@@ -102,13 +100,12 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
          * that!
          */
         if (!qemu_coroutine_entered(laiocb->co)) {
-            qemu_coroutine_enter(laiocb->co);
+            aio_co_wake(laiocb->co);
         }
     } else {
         laiocb->common.cb(laiocb->common.opaque, ret);
         qemu_aio_unref(laiocb);
     }
-    aio_context_release(s->aio_context);
 }
 
 /**
diff --git a/block/mirror.c b/block/mirror.c
index 301ba92..698a54e 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(blk_get_aio_context(s->common.blk));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret)
         }
     }
     mirror_iteration_done(op, ret);
+    aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
 static void mirror_read_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(blk_get_aio_context(s->common.blk));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret)
         }
 
         mirror_iteration_done(op, ret);
-        return;
+    } else {
+        blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
+                        0, mirror_write_complete, op);
     }
-    blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
-                    0, mirror_write_complete, op);
+    aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
 static inline void mirror_clip_sectors(MirrorBlockJob *s,
diff --git a/block/null.c b/block/null.c
index 5eb2038..b300390 100644
--- a/block/null.c
+++ b/block/null.c
@@ -134,22 +134,14 @@ static const AIOCBInfo null_aiocb_info = {
 static void null_bh_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
-    aio_context_release(ctx);
     qemu_aio_unref(acb);
 }
 
 static void null_timer_cb(void *opaque)
 {
     NullAIOCB *acb = opaque;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, 0);
-    aio_context_release(ctx);
     timer_deinit(&acb->timer);
     qemu_aio_unref(acb);
 }
diff --git a/block/qed-cluster.c b/block/qed-cluster.c
index c24e756..8f5da74 100644
--- a/block/qed-cluster.c
+++ b/block/qed-cluster.c
@@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
     unsigned int index;
     unsigned int n;
 
+    qed_acquire(s);
     if (ret) {
         goto out;
     }
@@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
 
 out:
     find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
+    qed_release(s);
     g_free(find_cluster_cb);
 }
 
diff --git a/block/qed-table.c b/block/qed-table.c
index ed443e2..b12c298 100644
--- a/block/qed-table.c
+++ b/block/qed-table.c
@@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret)
 {
     QEDReadTableCB *read_table_cb = opaque;
     QEDTable *table = read_table_cb->table;
+    BDRVQEDState *s = read_table_cb->s;
     int noffsets = read_table_cb->qiov.size / sizeof(uint64_t);
     int i;
 
@@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret)
     }
 
     /* Byteswap offsets */
+    qed_acquire(s);
     for (i = 0; i < noffsets; i++) {
         table->offsets[i] = le64_to_cpu(table->offsets[i]);
     }
+    qed_release(s);
 
 out:
     /* Completion */
-    trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
+    trace_qed_read_table_cb(s, read_table_cb->table, ret);
     gencb_complete(&read_table_cb->gencb, ret);
 }
 
@@ -84,8 +87,9 @@ typedef struct {
 static void qed_write_table_cb(void *opaque, int ret)
 {
     QEDWriteTableCB *write_table_cb = opaque;
+    BDRVQEDState *s = write_table_cb->s;
 
-    trace_qed_write_table_cb(write_table_cb->s,
+    trace_qed_write_table_cb(s,
                              write_table_cb->orig_table,
                              write_table_cb->flush,
                              ret);
@@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret)
     if (write_table_cb->flush) {
         /* We still need to flush first */
         write_table_cb->flush = false;
+        qed_acquire(s);
         bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
                        write_table_cb);
+        qed_release(s);
         return;
     }
 
@@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
     CachedL2Table *l2_table = request->l2_table;
     uint64_t l2_offset = read_l2_table_cb->l2_offset;
 
+    qed_acquire(s);
     if (ret) {
         /* can't trust loaded L2 table anymore */
         qed_unref_l2_cache_entry(l2_table);
@@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
         request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
         assert(request->l2_table != NULL);
     }
+    qed_release(s);
 
     gencb_complete(&read_l2_table_cb->gencb, ret);
 }
diff --git a/block/qed.c b/block/qed.c
index db8295d..0b62c77 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -745,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l
     }
 
     if (cb->co) {
-        qemu_coroutine_enter(cb->co);
+        aio_co_wake(cb->co);
     }
 }
 
@@ -1462,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret)
     cb->done = true;
     cb->ret = ret;
     if (cb->co) {
-        qemu_coroutine_enter(cb->co);
+        aio_co_wake(cb->co);
     }
 }
 
diff --git a/block/rbd.c b/block/rbd.c
index 2cb2cb4..a57b3e3 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -413,7 +413,6 @@ shutdown:
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
-    AioContext *ctx = bdrv_get_aio_context(acb->common.bs);
     int64_t r;
 
     r = rcb->ret;
@@ -446,10 +445,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
-
-    aio_context_acquire(ctx);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    aio_context_release(ctx);
 
     qemu_aio_unref(acb);
 }
diff --git a/block/win32-aio.c b/block/win32-aio.c
index c3f8f1a..3be8f45 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -87,10 +87,7 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
         qemu_vfree(waiocb->buf);
     }
 
-
-    aio_context_acquire(s->aio_ctx);
     waiocb->common.cb(waiocb->common.opaque, ret);
-    aio_context_release(s->aio_ctx);
     qemu_aio_unref(waiocb);
 }
 
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index af652f3..39516e8 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -89,7 +89,9 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
 static void virtio_blk_rw_complete(void *opaque, int ret)
 {
     VirtIOBlockReq *next = opaque;
+    VirtIOBlock *s = next->dev;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     while (next) {
         VirtIOBlockReq *req = next;
         next = req->mr_next;
@@ -122,21 +124,27 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
         block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
         virtio_blk_free_request(req);
     }
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 static void virtio_blk_flush_complete(void *opaque, int ret)
 {
     VirtIOBlockReq *req = opaque;
+    VirtIOBlock *s = req->dev;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     if (ret) {
         if (virtio_blk_handle_rw_error(req, -ret, 0)) {
-            return;
+            goto out;
         }
     }
 
     virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
     block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
     virtio_blk_free_request(req);
+
+out:
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
 }
 
 #ifdef __linux__
@@ -183,8 +191,10 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
     virtio_stl_p(vdev, &scsi->data_len, hdr->dxfer_len);
 
 out:
+    aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
     virtio_blk_req_complete(req, status);
     virtio_blk_free_request(req);
+    aio_context_release(blk_get_aio_context(s->conf.conf.blk));
     g_free(ioctl_req);
 }
 
diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c
index bdd1e5f..baf66b5 100644
--- a/hw/scsi/scsi-disk.c
+++ b/hw/scsi/scsi-disk.c
@@ -207,6 +207,7 @@ static void scsi_aio_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (scsi_disk_req_check_error(r, ret, true)) {
         goto done;
     }
@@ -215,6 +216,7 @@ static void scsi_aio_complete(void *opaque, int ret)
     scsi_req_complete(&r->req, GOOD);
 
 done:
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
     scsi_req_unref(&r->req);
 }
 
@@ -290,12 +292,14 @@ static void scsi_dma_complete(void *opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (ret < 0) {
         block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
     } else {
         block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     }
     scsi_dma_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_read_complete(void * opaque, int ret)
@@ -306,6 +310,7 @@ static void scsi_read_complete(void * opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (scsi_disk_req_check_error(r, ret, true)) {
         goto done;
     }
@@ -320,6 +325,7 @@ static void scsi_read_complete(void * opaque, int ret)
 
 done:
     scsi_req_unref(&r->req);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 /* Actually issue a read to the block device.  */
@@ -364,12 +370,14 @@ static void scsi_do_read_cb(void *opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (ret < 0) {
         block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
     } else {
         block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     }
     scsi_do_read(opaque, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 /* Read more data from scsi device into buffer.  */
@@ -489,12 +497,14 @@ static void scsi_write_complete(void * opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (ret < 0) {
         block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
     } else {
         block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     }
     scsi_write_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_write_data(SCSIRequest *req)
@@ -1625,11 +1635,14 @@ static void scsi_unmap_complete(void *opaque, int ret)
 {
     UnmapCBData *data = opaque;
     SCSIDiskReq *r = data->r;
+    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     scsi_unmap_complete_noio(data, ret);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_disk_emulate_unmap(SCSIDiskReq *r, uint8_t *inbuf)
@@ -1696,6 +1709,7 @@ static void scsi_write_same_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+    aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
     if (scsi_disk_req_check_error(r, ret, true)) {
         goto done;
     }
@@ -1724,6 +1738,7 @@ done:
     scsi_req_unref(&r->req);
     qemu_vfree(data->iov.iov_base);
     g_free(data);
+    aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
 }
 
 static void scsi_disk_emulate_write_same(SCSIDiskReq *r, uint8_t *inbuf)
diff --git a/hw/scsi/scsi-generic.c b/hw/scsi/scsi-generic.c
index 7a588a7..fdf9168 100644
--- a/hw/scsi/scsi-generic.c
+++ b/hw/scsi/scsi-generic.c
@@ -143,10 +143,14 @@ done:
 static void scsi_command_complete(void *opaque, int ret)
 {
     SCSIGenericReq *r = (SCSIGenericReq *)opaque;
+    SCSIDevice *s = r->req.dev;
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
+
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
     scsi_command_complete_noio(r, ret);
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 static int execute_command(BlockBackend *blk,
@@ -182,9 +186,11 @@ static void scsi_read_complete(void * opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
     if (ret || r->req.io_canceled) {
         scsi_command_complete_noio(r, ret);
-        return;
+        goto done;
     }
 
     len = r->io_header.dxfer_len - r->io_header.resid;
@@ -193,7 +199,7 @@ static void scsi_read_complete(void * opaque, int ret)
     r->len = -1;
     if (len == 0) {
         scsi_command_complete_noio(r, 0);
-        return;
+        goto done;
     }
 
     /* Snoop READ CAPACITY output to set the blocksize.  */
@@ -237,6 +243,9 @@ static void scsi_read_complete(void * opaque, int ret)
     }
     scsi_req_data(&r->req, len);
     scsi_req_unref(&r->req);
+
+done:
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 /* Read more data from scsi device into buffer.  */
@@ -272,9 +281,11 @@ static void scsi_write_complete(void * opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
+    aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
     if (ret || r->req.io_canceled) {
         scsi_command_complete_noio(r, ret);
-        return;
+        goto done;
     }
 
     if (r->req.cmd.buf[0] == MODE_SELECT && r->req.cmd.buf[4] == 12 &&
@@ -284,6 +295,9 @@ static void scsi_write_complete(void * opaque, int ret)
     }
 
     scsi_command_complete_noio(r, ret);
+
+done:
+    aio_context_release(blk_get_aio_context(s->conf.blk));
 }
 
 /* Write data to a scsi device.  Returns nonzero on failure.
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 7c9cec5..ce6cd30 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -185,7 +185,9 @@ restart:
              */
             qemu_bh_schedule(pool->completion_bh);
 
+            aio_context_release(pool->ctx);
             elem->common.cb(elem->common.opaque, elem->ret);
+            aio_context_acquire(pool->ctx);
             qemu_aio_unref(elem);
             goto restart;
         } else {
@@ -269,7 +271,7 @@ static void thread_pool_co_cb(void *opaque, int ret)
     ThreadPoolCo *co = opaque;
 
     co->ret = ret;
-    qemu_coroutine_enter(co->co);
+    aio_co_wake(co->co);
 }
 
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 15/17] aio-posix: partially inline aio_dispatch into aio_poll
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (13 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 14/17] block: explicitly acquire aiocontext in aio callbacks " Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 16/17] async: remove unnecessary inc/dec pairs Paolo Bonzini
                   ` (3 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

This patch prepares for the removal of unnecessary lockcnt inc/dec pairs.
Extract the dispatching loop for file descriptor handlers into a new
function aio_dispatch_handlers, and then inline aio_dispatch into
aio_poll.

aio_dispatch can now become void.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/block/aio.h |  6 +-----
 util/aio-posix.c    | 44 ++++++++++++++------------------------------
 util/aio-win32.c    | 13 ++++---------
 util/async.c        |  2 +-
 4 files changed, 20 insertions(+), 45 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index 614cbc6..677b6ff 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -310,12 +310,8 @@ bool aio_pending(AioContext *ctx);
 /* Dispatch any pending callbacks from the GSource attached to the AioContext.
  *
  * This is used internally in the implementation of the GSource.
- *
- * @dispatch_fds: true to process fds, false to skip them
- *                (can be used as an optimization by callers that know there
- *                are no fds ready)
  */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds);
+void aio_dispatch(AioContext *ctx);
 
 /* Progress in completing AIO work to occur.  This can issue new pending
  * aio as a result of executing I/O completion or bh callbacks.
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 6beebcd..51e92b8 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -386,12 +386,6 @@ static bool aio_dispatch_handlers(AioContext *ctx)
     AioHandler *node, *tmp;
     bool progress = false;
 
-    /*
-     * We have to walk very carefully in case aio_set_fd_handler is
-     * called while we're walking.
-     */
-    qemu_lockcnt_inc(&ctx->list_lock);
-
     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
@@ -426,33 +420,18 @@ static bool aio_dispatch_handlers(AioContext *ctx)
         }
     }
 
-    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+void aio_dispatch(AioContext *ctx)
 {
-    bool progress;
-
-    /*
-     * If there are callbacks left that have been queued, we need to call them.
-     * Do not call select in this case, because it is possible that the caller
-     * does not need a complete flush (as is the case for aio_poll loops).
-     */
-    progress = aio_bh_poll(ctx);
+    aio_bh_poll(ctx);
 
-    if (dispatch_fds) {
-        progress |= aio_dispatch_handlers(ctx);
-    }
-
-    /* Run our timers */
-    progress |= timerlistgroup_run_timers(&ctx->tlg);
+    qemu_lockcnt_inc(&ctx->list_lock);
+    aio_dispatch_handlers(ctx);
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return progress;
+    timerlistgroup_run_timers(&ctx->tlg);
 }
 
 /* These thread-local variables are used only in a small part of aio_poll
@@ -701,11 +680,16 @@ bool aio_poll(AioContext *ctx, bool blocking)
     npfd = 0;
     qemu_lockcnt_dec(&ctx->list_lock);
 
-    /* Run dispatch even if there were no readable fds to run timers */
-    if (aio_dispatch(ctx, ret > 0)) {
-        progress = true;
+    progress |= aio_bh_poll(ctx);
+
+    if (ret > 0) {
+        qemu_lockcnt_inc(&ctx->list_lock);
+        progress |= aio_dispatch_handlers(ctx);
+        qemu_lockcnt_dec(&ctx->list_lock);
     }
 
+    progress |= timerlistgroup_run_timers(&ctx->tlg);
+
     return progress;
 }
 
diff --git a/util/aio-win32.c b/util/aio-win32.c
index 20b63ce..442a179 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -309,16 +309,11 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
     return progress;
 }
 
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+void aio_dispatch(AioContext *ctx)
 {
-    bool progress;
-
-    progress = aio_bh_poll(ctx);
-    if (dispatch_fds) {
-        progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
-    }
-    progress |= timerlistgroup_run_timers(&ctx->tlg);
-    return progress;
+    aio_bh_poll(ctx);
+    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
+    timerlistgroup_run_timers(&ctx->tlg);
 }
 
 bool aio_poll(AioContext *ctx, bool blocking)
diff --git a/util/async.c b/util/async.c
index 99b9d7e..cc40735 100644
--- a/util/async.c
+++ b/util/async.c
@@ -258,7 +258,7 @@ aio_ctx_dispatch(GSource     *source,
     AioContext *ctx = (AioContext *) source;
 
     assert(callback == NULL);
-    aio_dispatch(ctx, true);
+    aio_dispatch(ctx);
     return true;
 }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 16/17] async: remove unnecessary inc/dec pairs
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (14 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 15/17] aio-posix: partially inline aio_dispatch into aio_poll Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 17/17] block: document fields protected by AioContext lock Paolo Bonzini
                   ` (2 subsequent siblings)
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Pull the increment/decrement pair out of aio_bh_poll and into the
callers.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/aio-posix.c |  8 +++-----
 util/aio-win32.c |  8 ++++----
 util/async.c     | 12 ++++++------
 3 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/util/aio-posix.c b/util/aio-posix.c
index 51e92b8..2537bcd 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -425,9 +425,8 @@ static bool aio_dispatch_handlers(AioContext *ctx)
 
 void aio_dispatch(AioContext *ctx)
 {
-    aio_bh_poll(ctx);
-
     qemu_lockcnt_inc(&ctx->list_lock);
+    aio_bh_poll(ctx);
     aio_dispatch_handlers(ctx);
     qemu_lockcnt_dec(&ctx->list_lock);
 
@@ -678,16 +677,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     npfd = 0;
-    qemu_lockcnt_dec(&ctx->list_lock);
 
     progress |= aio_bh_poll(ctx);
 
     if (ret > 0) {
-        qemu_lockcnt_inc(&ctx->list_lock);
         progress |= aio_dispatch_handlers(ctx);
-        qemu_lockcnt_dec(&ctx->list_lock);
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
+
     progress |= timerlistgroup_run_timers(&ctx->tlg);
 
     return progress;
diff --git a/util/aio-win32.c b/util/aio-win32.c
index 442a179..bca496a 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -253,8 +253,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
     bool progress = false;
     AioHandler *tmp;
 
-    qemu_lockcnt_inc(&ctx->list_lock);
-
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
@@ -305,14 +303,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         }
     }
 
-    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
 void aio_dispatch(AioContext *ctx)
 {
+    qemu_lockcnt_inc(&ctx->list_lock);
     aio_bh_poll(ctx);
     aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
+    qemu_lockcnt_dec(&ctx->list_lock);
     timerlistgroup_run_timers(&ctx->tlg);
 }
 
@@ -349,7 +348,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         }
     }
 
-    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
@@ -392,6 +390,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
+    qemu_lockcnt_dec(&ctx->list_lock);
+
     progress |= timerlistgroup_run_timers(&ctx->tlg);
     return progress;
 }
diff --git a/util/async.c b/util/async.c
index cc40735..9c3ce6a 100644
--- a/util/async.c
+++ b/util/async.c
@@ -90,15 +90,16 @@ void aio_bh_call(QEMUBH *bh)
     bh->cb(bh->opaque);
 }
 
-/* Multiple occurrences of aio_bh_poll cannot be called concurrently */
+/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
+ * The count in ctx->list_lock is incremented before the call, and is
+ * not affected by the call.
+ */
 int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
     bool deleted = false;
 
-    qemu_lockcnt_inc(&ctx->list_lock);
-
     ret = 0;
     for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
         next = atomic_rcu_read(&bh->next);
@@ -123,11 +124,10 @@ int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!deleted) {
-        qemu_lockcnt_dec(&ctx->list_lock);
         return ret;
     }
 
-    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
+    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -138,7 +138,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_lockcnt_unlock(&ctx->list_lock);
+        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
     }
     return ret;
 }
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH 17/17] block: document fields protected by AioContext lock
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (15 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 16/17] async: remove unnecessary inc/dec pairs Paolo Bonzini
@ 2017-01-20 16:43 ` Paolo Bonzini
  2017-01-24  9:38 ` [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Daniel P. Berrange
  2017-01-29 20:48 ` Paolo Bonzini
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-20 16:43 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/block/block_int.h      | 64 +++++++++++++++++++++++++-----------------
 include/sysemu/block-backend.h | 14 ++++++---
 2 files changed, 49 insertions(+), 29 deletions(-)

diff --git a/include/block/block_int.h b/include/block/block_int.h
index 2d92d7e..1670941 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -430,8 +430,9 @@ struct BdrvChild {
  * copied as well.
  */
 struct BlockDriverState {
-    int64_t total_sectors; /* if we are reading a disk image, give its
-                              size in sectors */
+    /* Protected by big QEMU lock or read-only after opening.  No special
+     * locking needed during I/O...
+     */
     int open_flags; /* flags used to open the file, re-used for re-open */
     bool read_only; /* if true, the media is read only */
     bool encrypted; /* if true, the media is encrypted */
@@ -439,14 +440,6 @@ struct BlockDriverState {
     bool sg;        /* if true, the device is a /dev/sg* */
     bool probed;    /* if true, format was probed rather than specified */
 
-    int copy_on_read; /* if nonzero, copy read backing sectors into image.
-                         note this is a reference count */
-
-    CoQueue flush_queue;            /* Serializing flush queue */
-    bool active_flush_req;          /* Flush request in flight? */
-    unsigned int write_gen;         /* Current data generation */
-    unsigned int flushed_gen;       /* Flushed write generation */
-
     BlockDriver *drv; /* NULL means no media */
     void *opaque;
 
@@ -468,18 +461,6 @@ struct BlockDriverState {
     BdrvChild *backing;
     BdrvChild *file;
 
-    /* Callback before write request is processed */
-    NotifierWithReturnList before_write_notifiers;
-
-    /* number of in-flight requests; overall and serialising */
-    unsigned int in_flight;
-    unsigned int serialising_in_flight;
-
-    bool wakeup;
-
-    /* Offset after the highest byte written to */
-    uint64_t wr_highest_offset;
-
     /* I/O Limits */
     BlockLimits bl;
 
@@ -497,11 +478,8 @@ struct BlockDriverState {
     QTAILQ_ENTRY(BlockDriverState) bs_list;
     /* element of the list of monitor-owned BDS */
     QTAILQ_ENTRY(BlockDriverState) monitor_list;
-    QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
     int refcnt;
 
-    QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
-
     /* operation blockers */
     QLIST_HEAD(, BdrvOpBlocker) op_blockers[BLOCK_OP_TYPE_MAX];
 
@@ -522,6 +500,31 @@ struct BlockDriverState {
     /* The error object in use for blocking operations on backing_hd */
     Error *backing_blocker;
 
+    /* Protected by AioContext lock */
+
+    /* If true, copy read backing sectors into image.  Can be >1 if more
+     * than one client has requested copy-on-read.
+     */
+    int copy_on_read;
+
+    /* If we are reading a disk image, give its size in sectors.
+     * Generally read-only; it is written to by load_vmstate and save_vmstate,
+     * but the block layer is quiescent during those.
+     */
+    int64_t total_sectors;
+
+    /* Callback before write request is processed */
+    NotifierWithReturnList before_write_notifiers;
+
+    /* number of in-flight requests; overall and serialising */
+    unsigned int in_flight;
+    unsigned int serialising_in_flight;
+
+    bool wakeup;
+
+    /* Offset after the highest byte written to */
+    uint64_t wr_highest_offset;
+
     /* threshold limit for writes, in bytes. "High water mark". */
     uint64_t write_threshold_offset;
     NotifierWithReturn write_threshold_notifier;
@@ -529,6 +532,17 @@ struct BlockDriverState {
     /* counter for nested bdrv_io_plug */
     unsigned io_plugged;
 
+    QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
+    CoQueue flush_queue;                  /* Serializing flush queue */
+    bool active_flush_req;                /* Flush request in flight? */
+    unsigned int write_gen;               /* Current data generation */
+    unsigned int flushed_gen;             /* Flushed write generation */
+
+    QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
+
+    /* do we need to tell the quest if we have a volatile write cache? */
+    int enable_write_cache;
+
     int quiesce_counter;
 };
 
diff --git a/include/sysemu/block-backend.h b/include/sysemu/block-backend.h
index 6444e41..f365a51 100644
--- a/include/sysemu/block-backend.h
+++ b/include/sysemu/block-backend.h
@@ -64,14 +64,20 @@ typedef struct BlockDevOps {
  * fields that must be public. This is in particular for QLIST_ENTRY() and
  * friends so that BlockBackends can be kept in lists outside block-backend.c */
 typedef struct BlockBackendPublic {
-    /* I/O throttling.
-     * throttle_state tells us if this BlockBackend has I/O limits configured.
-     * io_limits_disabled tells us if they are currently being enforced */
+    /* I/O throttling has its own locking, but also some fields are
+     * protected by the AioContext lock.
+     */
+
+    /* Protected by AioContext lock.  */
     CoQueue      throttled_reqs[2];
+
+    /* Nonzero if the I/O limits are currently being ignored; generally
+     * it is zero.  */
     unsigned int io_limits_disabled;
 
     /* The following fields are protected by the ThrottleGroup lock.
-     * See the ThrottleGroup documentation for details. */
+     * See the ThrottleGroup documentation for details.
+     * throttle_state tells us if I/O limits are configured. */
     ThrottleState *throttle_state;
     ThrottleTimers throttle_timers;
     unsigned       pending_reqs[2];
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil Paolo Bonzini
@ 2017-01-24  9:34   ` Daniel P. Berrange
  2017-01-30 15:24   ` Stefan Hajnoczi
  1 sibling, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-01-24  9:34 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

On Fri, Jan 20, 2017 at 05:43:09PM +0100, Paolo Bonzini wrote:
> AioContext is fairly self contained, the only dependency is QEMUTimer but
> that in turn doesn't need anything else.  So move them out of block-obj-y
> to avoid introducing a dependency from io/ to block-obj-y.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2: new patch [Daniel]
> 
>  Makefile.objs                       |  5 +---
>  block/io.c                          | 29 -------------------
>  stubs/Makefile.objs                 |  2 ++
>  stubs/linux-aio.c                   | 32 +++++++++++++++++++++
>  stubs/main-loop.c                   |  8 ++++++
>  stubs/set-fd-handler.c              | 11 --------
>  tests/Makefile.include              | 13 ++++-----
>  util/Makefile.objs                  |  5 +++-
>  aio-posix.c => util/aio-posix.c     |  0
>  aio-win32.c => util/aio-win32.c     |  0
>  util/aiocb.c                        | 55 +++++++++++++++++++++++++++++++++++++
>  async.c => util/async.c             |  3 +-
>  qemu-timer.c => util/qemu-timer.c   |  0
>  thread-pool.c => util/thread-pool.c |  0
>  14 files changed, 110 insertions(+), 53 deletions(-)
>  create mode 100644 stubs/linux-aio.c
>  create mode 100644 stubs/main-loop.c
>  rename aio-posix.c => util/aio-posix.c (100%)
>  rename aio-win32.c => util/aio-win32.c (100%)
>  create mode 100644 util/aiocb.c
>  rename async.c => util/async.c (99%)
>  rename qemu-timer.c => util/qemu-timer.c (100%)
>  rename thread-pool.c => util/thread-pool.c (100%)

Reviewed-by: Daniel P. Berrange <berrange@redhat.com>


Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext Paolo Bonzini
@ 2017-01-24  9:35   ` Daniel P. Berrange
  2017-01-30 15:32   ` Stefan Hajnoczi
  1 sibling, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-01-24  9:35 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, famz

On Fri, Jan 20, 2017 at 05:43:10PM +0100, Paolo Bonzini wrote:
> This is in preparation for making qio_channel_yield work on
> AioContexts other than the main one.
> 
> Cc: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2: removed QIOChannelRestart [Daniel]
> 
>  include/io/channel.h | 25 +++++++++++++++++++++++++
>  io/channel-command.c | 13 +++++++++++++
>  io/channel-file.c    | 11 +++++++++++
>  io/channel-socket.c  | 16 +++++++++++-----
>  io/channel-tls.c     | 12 ++++++++++++
>  io/channel-watch.c   |  6 ++++++
>  io/channel.c         | 11 +++++++++++
>  7 files changed, 89 insertions(+), 5 deletions(-)

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts Paolo Bonzini
@ 2017-01-24  9:36   ` Daniel P. Berrange
  2017-01-24  9:38     ` Paolo Bonzini
  2017-01-30 15:37   ` Stefan Hajnoczi
  1 sibling, 1 reply; 35+ messages in thread
From: Daniel P. Berrange @ 2017-01-24  9:36 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, famz

On Fri, Jan 20, 2017 at 05:43:11PM +0100, Paolo Bonzini wrote:
> Support separate coroutines for reading and writing, and place the
> read/write handlers on the AioContext that the QIOChannel is registered
> with.
> 
> Cc: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2:
>         improved qio_channel_set_aio_context docs, renamed to
>                 qio_channel_attach_aio_context [Daniel, Stefan]
>         fixed pasto in "io: make qio_channel_yield aware of AioContexts"
> 
>  include/io/channel.h | 47 ++++++++++++++++++++++++++--
>  io/channel.c         | 86 +++++++++++++++++++++++++++++++++++++++-------------
>  2 files changed, 109 insertions(+), 24 deletions(-)

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (16 preceding siblings ...)
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 17/17] block: document fields protected by AioContext lock Paolo Bonzini
@ 2017-01-24  9:38 ` Daniel P. Berrange
  2017-01-29 20:48 ` Paolo Bonzini
  18 siblings, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-01-24  9:38 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

On Fri, Jan 20, 2017 at 05:43:05PM +0100, Paolo Bonzini wrote:
> This series pushes down aio_context_acquire/release to the point
> where we can actually reason on using different fine-grained mutexes.
> 
> The main infrastructure is introduced in patch 1.  The new API aio_co_wake
> starts a coroutine with aio_context_acquire/release protection, which
> requires tracking each coroutine's "home" AioContext.  aio_co_schedule
> instead takes care of moving a sleeping coroutine to a different
> AioContext, also ensuring that it runs under aio_context_acquire/release.
> This is useful to implement bdrv_set_aio_context, as a simpler alternative
> to bottom halves.  Even though one-shot BHs are already simpler than
> what we had before, after this patch aio_co_wake and aio_co_schedule
> save you from having to do aio_context_acquire/release explicitly.
> 
> After patch 2 to 4, which are just small preparatory changes, patches
> 5 to 8 provide an example of how to use the new API.  In particular patch
> 5 to 7 implement a new organization of coroutines in the NBD client,
> which allows not blocking on partial reply header reads.

FYI, I added my S-o-B to the two io: patches, so when this series is ready
just include them in your main pull request - no need for me to take those
io: patches separately.

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts
  2017-01-24  9:36   ` Daniel P. Berrange
@ 2017-01-24  9:38     ` Paolo Bonzini
  2017-01-24  9:40       ` Daniel P. Berrange
  0 siblings, 1 reply; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-24  9:38 UTC (permalink / raw)
  To: Daniel P. Berrange; +Cc: qemu-devel, stefanha, famz



On 24/01/2017 10:36, Daniel P. Berrange wrote:
> On Fri, Jan 20, 2017 at 05:43:11PM +0100, Paolo Bonzini wrote:
>> Support separate coroutines for reading and writing, and place the
>> read/write handlers on the AioContext that the QIOChannel is registered
>> with.
>>
>> Cc: Daniel P. Berrange <berrange@redhat.com>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>         v1->v2:
>>         improved qio_channel_set_aio_context docs, renamed to
>>                 qio_channel_attach_aio_context [Daniel, Stefan]
>>         fixed pasto in "io: make qio_channel_yield aware of AioContexts"
>>
>>  include/io/channel.h | 47 ++++++++++++++++++++++++++--
>>  io/channel.c         | 86 +++++++++++++++++++++++++++++++++++++++-------------
>>  2 files changed, 109 insertions(+), 24 deletions(-)
> 
> Signed-off-by: Daniel P. Berrange <berrange@redhat.com>

I suppose you mean either Reviewed-by or Acked-by? :)

Anyway, thanks for the reviews!

Paolo

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts
  2017-01-24  9:38     ` Paolo Bonzini
@ 2017-01-24  9:40       ` Daniel P. Berrange
  0 siblings, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-01-24  9:40 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, famz

On Tue, Jan 24, 2017 at 10:38:51AM +0100, Paolo Bonzini wrote:
> 
> 
> On 24/01/2017 10:36, Daniel P. Berrange wrote:
> > On Fri, Jan 20, 2017 at 05:43:11PM +0100, Paolo Bonzini wrote:
> >> Support separate coroutines for reading and writing, and place the
> >> read/write handlers on the AioContext that the QIOChannel is registered
> >> with.
> >>
> >> Cc: Daniel P. Berrange <berrange@redhat.com>
> >> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> >> ---
> >>         v1->v2:
> >>         improved qio_channel_set_aio_context docs, renamed to
> >>                 qio_channel_attach_aio_context [Daniel, Stefan]
> >>         fixed pasto in "io: make qio_channel_yield aware of AioContexts"
> >>
> >>  include/io/channel.h | 47 ++++++++++++++++++++++++++--
> >>  io/channel.c         | 86 +++++++++++++++++++++++++++++++++++++++-------------
> >>  2 files changed, 109 insertions(+), 24 deletions(-)
> > 
> > Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
> 
> I suppose you mean either Reviewed-by or Acked-by? :)
> 
> Anyway, thanks for the reviews!

Well whatever is appropriate - I just meant to indicate that you can send
a pull request including these two io: patches without funnelling them
separately via my io-next queue :-)

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2
  2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
                   ` (17 preceding siblings ...)
  2017-01-24  9:38 ` [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Daniel P. Berrange
@ 2017-01-29 20:48 ` Paolo Bonzini
  18 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-29 20:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha



On 20/01/2017 17:43, Paolo Bonzini wrote:
> This series pushes down aio_context_acquire/release to the point
> where we can actually reason on using different fine-grained mutexes.
> 
> The main infrastructure is introduced in patch 1.  The new API aio_co_wake
> starts a coroutine with aio_context_acquire/release protection, which
> requires tracking each coroutine's "home" AioContext.  aio_co_schedule
> instead takes care of moving a sleeping coroutine to a different
> AioContext, also ensuring that it runs under aio_context_acquire/release.
> This is useful to implement bdrv_set_aio_context, as a simpler alternative
> to bottom halves.  Even though one-shot BHs are already simpler than
> what we had before, after this patch aio_co_wake and aio_co_schedule
> save you from having to do aio_context_acquire/release explicitly.
> 
> After patch 2 to 4, which are just small preparatory changes, patches
> 5 to 8 provide an example of how to use the new API.  In particular patch
> 5 to 7 implement a new organization of coroutines in the NBD client,
> which allows not blocking on partial reply header reads.
> 
> Patch 9 introduces helpers for AioContext locking in QED, which is
> the most complex AIO-based driver left.  Then the actual meat of the
> series runs from patch 10 to patch 14, followed by small optimizations
> in patches 15 and 16.
> 
> The patches do some back and forth in adding/removing
> aio_context_acquire/release calls in block/*.c but ultimately a small
> number of aio_context_acquire/release pairs are added after the pushdown.
> These are mostly in drivers that use external libraries (and actually
> they could already be replaced by QemuMutex) and in device models
> that support multithreaded operation (aka iothread aka dataplane).
> 
> Notably, coroutines need not care about aio_context_acquire/release.
> The device models ensure that the first creation of the coroutine has
> the AioContext, while aio_co_wake/aio_co_schedule do the same after
> they yield.  Therefore, most of the files only need to use those two
> functions instead of, respectively, qemu_coroutine_enter and
> aio_bh_schedule_oneshot.
> 
> However, this is only an intermediate step which is needed because the
> block layer and qemu-coroutine locks are thread-unsafe.  So the next
> part will add separate locking, independent of AioContext, to block.c and
> mostly block/io.c---this includes making CoMutex thread-safe.  Patch 17
> therefore already documents the current locking policies block.h to
> prepare for the next series.
> 
> Paolo
> 
> v1->v2:
>         new patch (4) removing block-obj-y -> io-obj-y dependency
>         removed QIOChannelRestart from
>                 "io: add methods to set I/O handlers on AioContext"
>         improved qio_channel_set_aio_context docs, renamed to
>                 qio_channel_attach_aio_context
>         fixed pasto in "io: make qio_channel_yield aware of AioContexts"
>         document restrictions in bdrv_aio_cancel due to qemu_aio_ref
>         converted NBD server to qio_channel_yield too
>         allow NULL s->read_reply_co
>         
> *** BLURB HERE ***
> 
> Paolo Bonzini (17):
>   aio: introduce aio_co_schedule and aio_co_wake
>   block-backend: allow blk_prw from coroutine context
>   test-thread-pool: use generic AioContext infrastructure
>   block: move AioContext and QEMUTimer to libqemuutil
>   io: add methods to set I/O handlers on AioContext
>   io: make qio_channel_yield aware of AioContexts
>   nbd: convert to use qio_channel_yield
>   coroutine-lock: reschedule coroutine on the AioContext it was running
>     on
>   qed: introduce qed_aio_start_io and qed_aio_next_io_cb
>   aio: push aio_context_acquire/release down to dispatching
>   block: explicitly acquire aiocontext in timers that need it
>   block: explicitly acquire aiocontext in callbacks that need it
>   block: explicitly acquire aiocontext in bottom halves that need it
>   block: explicitly acquire aiocontext in aio callbacks that need it
>   aio-posix: partially inline aio_dispatch into aio_poll
>   async: remove unnecessary inc/dec pairs
>   block: document fields protected by AioContext lock
> 
>  Makefile.objs                       |   5 +-
>  block/blkdebug.c                    |   9 +-
>  block/blkreplay.c                   |   2 +-
>  block/block-backend.c               |  13 ++-
>  block/curl.c                        |  44 ++++++--
>  block/gluster.c                     |   9 +-
>  block/io.c                          |  38 ++-----
>  block/iscsi.c                       |  15 ++-
>  block/linux-aio.c                   |  10 +-
>  block/mirror.c                      |  12 +-
>  block/nbd-client.c                  | 108 ++++++++----------
>  block/nbd-client.h                  |   2 +-
>  block/nfs.c                         |   9 +-
>  block/qed-cluster.c                 |   2 +
>  block/qed-table.c                   |  12 +-
>  block/qed.c                         |  58 +++++++---
>  block/qed.h                         |   3 +
>  block/sheepdog.c                    |  29 ++---
>  block/ssh.c                         |  29 ++---
>  block/throttle-groups.c             |   2 +
>  block/win32-aio.c                   |   9 +-
>  dma-helpers.c                       |   2 +
>  hw/block/virtio-blk.c               |  19 +++-
>  hw/scsi/scsi-bus.c                  |   2 +
>  hw/scsi/scsi-disk.c                 |  15 +++
>  hw/scsi/scsi-generic.c              |  20 +++-
>  hw/scsi/virtio-scsi.c               |   6 +
>  include/block/aio.h                 |  38 ++++++-
>  include/block/block_int.h           |  64 ++++++-----
>  include/io/channel.h                |  72 +++++++++++-
>  include/qemu/coroutine_int.h        |  10 +-
>  include/sysemu/block-backend.h      |  14 ++-
>  io/channel-command.c                |  13 +++
>  io/channel-file.c                   |  11 ++
>  io/channel-socket.c                 |  16 ++-
>  io/channel-tls.c                    |  12 ++
>  io/channel-watch.c                  |   6 +
>  io/channel.c                        |  97 ++++++++++++----
>  nbd/client.c                        |   2 +-
>  nbd/common.c                        |   9 +-
>  nbd/server.c                        |  94 +++++-----------
>  stubs/Makefile.objs                 |   2 +
>  stubs/linux-aio.c                   |  32 ++++++
>  stubs/main-loop.c                   |   8 ++
>  stubs/set-fd-handler.c              |  11 --
>  tests/Makefile.include              |  18 +--
>  tests/iothread.c                    |  91 +++++++++++++++
>  tests/iothread.h                    |  25 +++++
>  tests/test-aio-multithread.c        | 213 ++++++++++++++++++++++++++++++++++++
>  tests/test-thread-pool.c            |  12 +-
>  tests/test-vmstate.c                |  11 --
>  trace-events                        |   4 +
>  util/Makefile.objs                  |   5 +-
>  aio-posix.c => util/aio-posix.c     |  60 +++-------
>  aio-win32.c => util/aio-win32.c     |  30 ++---
>  util/aiocb.c                        |  55 ++++++++++
>  async.c => util/async.c             |  84 ++++++++++++--
>  util/qemu-coroutine-lock.c          |   5 +-
>  util/qemu-coroutine-sleep.c         |   2 +-
>  util/qemu-coroutine.c               |   8 ++
>  qemu-timer.c => util/qemu-timer.c   |   0
>  thread-pool.c => util/thread-pool.c |   6 +-
>  util/trace-events                   |   1 -
>  63 files changed, 1155 insertions(+), 465 deletions(-)
>  create mode 100644 stubs/linux-aio.c
>  create mode 100644 stubs/main-loop.c
>  create mode 100644 tests/iothread.c
>  create mode 100644 tests/iothread.h
>  create mode 100644 tests/test-aio-multithread.c
>  rename aio-posix.c => util/aio-posix.c (94%)
>  rename aio-win32.c => util/aio-win32.c (95%)
>  create mode 100644 util/aiocb.c
>  rename async.c => util/async.c (82%)
>  rename qemu-timer.c => util/qemu-timer.c (100%)
>  rename thread-pool.c => util/thread-pool.c (98%)
> 

Ping?

Paolo

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake Paolo Bonzini
@ 2017-01-30 15:18   ` Stefan Hajnoczi
  2017-01-30 20:15     ` Paolo Bonzini
  0 siblings, 1 reply; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 15:18 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 1946 bytes --]

On Fri, Jan 20, 2017 at 05:43:06PM +0100, Paolo Bonzini wrote:
> diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
> index 14d4f1d..1efa356 100644
> --- a/include/qemu/coroutine_int.h
> +++ b/include/qemu/coroutine_int.h
> @@ -40,12 +40,20 @@ struct Coroutine {
>      CoroutineEntry *entry;
>      void *entry_arg;
>      Coroutine *caller;
> +
> +    /* Only used when the coroutine has terminated.  */
>      QSLIST_ENTRY(Coroutine) pool_next;
>      size_t locks_held;

locks_held is used when the coroutine is running.  Please add a newline
to separate it from pool_next.

>  
> -    /* Coroutines that should be woken up when we yield or terminate */
> +    /* Coroutines that should be woken up when we yield or terminate.
> +     * Only used when the coroutine is running.
> +     */
>      QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
> +
> +    /* Only used when the coroutine is sleeping.  */

"running", "yielded", and "terminated" are terms with specific meanings.
Is "sleeping" a subset of "yielded" where the coroutine is waiting for a
mutex or its AioContext to schedule it?

> +static void test_multi_co_schedule_entry(void *opaque)

Missing coroutine_fn.

> diff --git a/tests/test-vmstate.c b/tests/test-vmstate.c
> index d2f529b..cb87997 100644
> --- a/tests/test-vmstate.c
> +++ b/tests/test-vmstate.c
> @@ -33,17 +33,6 @@
>  static char temp_file[] = "/tmp/vmst.test.XXXXXX";
>  static int temp_fd;
>  
> -/* Fake yield_until_fd_readable() implementation so we don't have to pull the
> - * coroutine code as dependency.
> - */
> -void yield_until_fd_readable(int fd)
> -{
> -    fd_set fds;
> -    FD_ZERO(&fds);
> -    FD_SET(fd, &fds);
> -    select(fd + 1, &fds, NULL, NULL, NULL);
> -}
> -
>  
>  /* Duplicate temp_fd and seek to the beginning of the file */
>  static QEMUFile *open_test_file(bool write)

This should be a separate patch.

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil Paolo Bonzini
  2017-01-24  9:34   ` Daniel P. Berrange
@ 2017-01-30 15:24   ` Stefan Hajnoczi
  1 sibling, 0 replies; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 15:24 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 1615 bytes --]

On Fri, Jan 20, 2017 at 05:43:09PM +0100, Paolo Bonzini wrote:
> AioContext is fairly self contained, the only dependency is QEMUTimer but
> that in turn doesn't need anything else.  So move them out of block-obj-y
> to avoid introducing a dependency from io/ to block-obj-y.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2: new patch [Daniel]
> 
>  Makefile.objs                       |  5 +---
>  block/io.c                          | 29 -------------------
>  stubs/Makefile.objs                 |  2 ++
>  stubs/linux-aio.c                   | 32 +++++++++++++++++++++
>  stubs/main-loop.c                   |  8 ++++++
>  stubs/set-fd-handler.c              | 11 --------
>  tests/Makefile.include              | 13 ++++-----
>  util/Makefile.objs                  |  5 +++-
>  aio-posix.c => util/aio-posix.c     |  0
>  aio-win32.c => util/aio-win32.c     |  0
>  util/aiocb.c                        | 55 +++++++++++++++++++++++++++++++++++++
>  async.c => util/async.c             |  3 +-
>  qemu-timer.c => util/qemu-timer.c   |  0
>  thread-pool.c => util/thread-pool.c |  0
>  14 files changed, 110 insertions(+), 53 deletions(-)
>  create mode 100644 stubs/linux-aio.c
>  create mode 100644 stubs/main-loop.c
>  rename aio-posix.c => util/aio-posix.c (100%)
>  rename aio-win32.c => util/aio-win32.c (100%)
>  create mode 100644 util/aiocb.c
>  rename async.c => util/async.c (99%)
>  rename qemu-timer.c => util/qemu-timer.c (100%)
>  rename thread-pool.c => util/thread-pool.c (100%)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext Paolo Bonzini
  2017-01-24  9:35   ` Daniel P. Berrange
@ 2017-01-30 15:32   ` Stefan Hajnoczi
  1 sibling, 0 replies; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 15:32 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 757 bytes --]

On Fri, Jan 20, 2017 at 05:43:10PM +0100, Paolo Bonzini wrote:
> This is in preparation for making qio_channel_yield work on
> AioContexts other than the main one.
> 
> Cc: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2: removed QIOChannelRestart [Daniel]
> 
>  include/io/channel.h | 25 +++++++++++++++++++++++++
>  io/channel-command.c | 13 +++++++++++++
>  io/channel-file.c    | 11 +++++++++++
>  io/channel-socket.c  | 16 +++++++++++-----
>  io/channel-tls.c     | 12 ++++++++++++
>  io/channel-watch.c   |  6 ++++++
>  io/channel.c         | 11 +++++++++++
>  7 files changed, 89 insertions(+), 5 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts Paolo Bonzini
  2017-01-24  9:36   ` Daniel P. Berrange
@ 2017-01-30 15:37   ` Stefan Hajnoczi
  1 sibling, 0 replies; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 15:37 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 814 bytes --]

On Fri, Jan 20, 2017 at 05:43:11PM +0100, Paolo Bonzini wrote:
> Support separate coroutines for reading and writing, and place the
> read/write handlers on the AioContext that the QIOChannel is registered
> with.
> 
> Cc: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>         v1->v2:
>         improved qio_channel_set_aio_context docs, renamed to
>                 qio_channel_attach_aio_context [Daniel, Stefan]
>         fixed pasto in "io: make qio_channel_yield aware of AioContexts"
> 
>  include/io/channel.h | 47 ++++++++++++++++++++++++++--
>  io/channel.c         | 86 +++++++++++++++++++++++++++++++++++++++-------------
>  2 files changed, 109 insertions(+), 24 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield Paolo Bonzini
@ 2017-01-30 15:50   ` Stefan Hajnoczi
  2017-01-30 21:18     ` Paolo Bonzini
  0 siblings, 1 reply; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 15:50 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 473 bytes --]

On Fri, Jan 20, 2017 at 05:43:12PM +0100, Paolo Bonzini wrote:
> +        aio_co_wake(s->recv_coroutine[i]);
>  
> -    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
> +        /* We're woken up by the recv_coroutine itself.  */
> +        qemu_coroutine_yield();

This relies on recv_coroutine() entering us only after we've yielded -
otherwise QEMU will crash.  The code and comments don't make it obvious
why this is guaranteed to be safe.

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it
  2017-01-20 16:43 ` [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it Paolo Bonzini
@ 2017-01-30 16:01   ` Stefan Hajnoczi
  0 siblings, 0 replies; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-30 16:01 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, stefanha

[-- Attachment #1: Type: text/plain, Size: 753 bytes --]

On Fri, Jan 20, 2017 at 05:43:16PM +0100, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/curl.c                |  2 ++
>  block/io.c                  |  5 +++++
>  block/iscsi.c               |  8 ++++++--
>  block/null.c                |  4 ++++
>  block/qed.c                 | 12 ++++++++++++
>  block/qed.h                 |  3 +++
>  block/throttle-groups.c     |  2 ++
>  util/aio-posix.c            |  2 --
>  util/aio-win32.c            |  2 --
>  util/qemu-coroutine-sleep.c |  2 +-
>  10 files changed, 35 insertions(+), 7 deletions(-)
> 
>         v1->v2: document restrictions in bdrv_aio_cancel due to qemu_aio_ref
> 	[Stefan]

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake
  2017-01-30 15:18   ` Stefan Hajnoczi
@ 2017-01-30 20:15     ` Paolo Bonzini
  0 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-30 20:15 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, famz, stefanha



On 30/01/2017 10:18, Stefan Hajnoczi wrote:
> On Fri, Jan 20, 2017 at 05:43:06PM +0100, Paolo Bonzini wrote:
>> diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
>> index 14d4f1d..1efa356 100644
>> --- a/include/qemu/coroutine_int.h
>> +++ b/include/qemu/coroutine_int.h
>> @@ -40,12 +40,20 @@ struct Coroutine {
>>      CoroutineEntry *entry;
>>      void *entry_arg;
>>      Coroutine *caller;
>> +
>> +    /* Only used when the coroutine has terminated.  */
>>      QSLIST_ENTRY(Coroutine) pool_next;
>>      size_t locks_held;
> 
> locks_held is used when the coroutine is running.  Please add a newline
> to separate it from pool_next.
> 
>>  
>> -    /* Coroutines that should be woken up when we yield or terminate */
>> +    /* Coroutines that should be woken up when we yield or terminate.
>> +     * Only used when the coroutine is running.
>> +     */
>>      QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
>> +
>> +    /* Only used when the coroutine is sleeping.  */
> 
> "running", "yielded", and "terminated" are terms with specific meanings.
> Is "sleeping" a subset of "yielded" where the coroutine is waiting for a
> mutex or its AioContext to schedule it?

No, I'll just change it to "has yielded".

>> +static void test_multi_co_schedule_entry(void *opaque)
> 
> Missing coroutine_fn.
> 
>> diff --git a/tests/test-vmstate.c b/tests/test-vmstate.c
>> index d2f529b..cb87997 100644
>> --- a/tests/test-vmstate.c
>> +++ b/tests/test-vmstate.c
>> @@ -33,17 +33,6 @@
>>  static char temp_file[] = "/tmp/vmst.test.XXXXXX";
>>  static int temp_fd;
>>  
>> -/* Fake yield_until_fd_readable() implementation so we don't have to pull the
>> - * coroutine code as dependency.
>> - */
>> -void yield_until_fd_readable(int fd)
>> -{
>> -    fd_set fds;
>> -    FD_ZERO(&fds);
>> -    FD_SET(fd, &fds);
>> -    select(fd + 1, &fds, NULL, NULL, NULL);
>> -}
>> -
>>  
>>  /* Duplicate temp_fd and seek to the beginning of the file */
>>  static QEMUFile *open_test_file(bool write)
> 
> This should be a separate patch.

Yes, it probably doesn't have to be in this series either.

Paolo

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield
  2017-01-30 15:50   ` Stefan Hajnoczi
@ 2017-01-30 21:18     ` Paolo Bonzini
  2017-01-31 13:50       ` Stefan Hajnoczi
  0 siblings, 1 reply; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-30 21:18 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, famz, stefanha



On 30/01/2017 10:50, Stefan Hajnoczi wrote:
> On Fri, Jan 20, 2017 at 05:43:12PM +0100, Paolo Bonzini wrote:
>> +        aio_co_wake(s->recv_coroutine[i]);
>>  
>> -    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
>> +        /* We're woken up by the recv_coroutine itself.  */
>> +        qemu_coroutine_yield();
> 
> This relies on recv_coroutine() entering us only after we've yielded -
> otherwise QEMU will crash.  The code and comments don't make it obvious
> why this is guaranteed to be safe.

It doesn't rely on that (that's the magic hidden behind aio_co_wake),
but you're right there is a documentation problem because I needed 10
minutes to remind myself why it's correct.

It works because neither coroutine is moving context:

- if the two coroutines are in the same context, aio_co_wake queues the
coroutine for execution after the yield, which is obviously okay;

- if they are in different context, the recv_coroutine's aio_co_wake
queues the read_reply_co with aio_co_schedule.  It is then woken up
through a bottom half, which executes only after read_reply has yielded.

It is implied by the aio_co_wake and aio_co_schedule documentation; all
requirements are satisfied:

1) aio_co_wake's comment says:

   * aio_co_wake may be executed either in coroutine or non-coroutine
   * context.  The coroutine must not be entered by anyone else while
   * aio_co_wake() is active.

   read_reply_co is only woken by one recv_coroutine at a time

2) And for the case where read_reply_co and recv_coroutine are in
   different contexts, aio_co_schedule's comment says:

   * In addition the coroutine must have yielded unless ctx
   * is the context in which the coroutine is running (i.e. the value of
   * qemu_get_current_aio_context() from the coroutine itself).

   which is okay because aio_co_wake always uses "the context in
   which the coroutine is running" as the argument to aio_co_schedule.

Any suggestions on how to document this properly?  I'm not sure a
comment in the NBD driver is the best place, because similar logic will
appear soon in other networked block drivers.

Paolo

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield
  2017-01-30 21:18     ` Paolo Bonzini
@ 2017-01-31 13:50       ` Stefan Hajnoczi
  2017-01-31 14:29         ` Paolo Bonzini
  0 siblings, 1 reply; 35+ messages in thread
From: Stefan Hajnoczi @ 2017-01-31 13:50 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Stefan Hajnoczi, qemu-devel, famz

[-- Attachment #1: Type: text/plain, Size: 2436 bytes --]

On Mon, Jan 30, 2017 at 04:18:10PM -0500, Paolo Bonzini wrote:
> On 30/01/2017 10:50, Stefan Hajnoczi wrote:
> > On Fri, Jan 20, 2017 at 05:43:12PM +0100, Paolo Bonzini wrote:
> >> +        aio_co_wake(s->recv_coroutine[i]);
> >>  
> >> -    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
> >> +        /* We're woken up by the recv_coroutine itself.  */
> >> +        qemu_coroutine_yield();
> > 
> > This relies on recv_coroutine() entering us only after we've yielded -
> > otherwise QEMU will crash.  The code and comments don't make it obvious
> > why this is guaranteed to be safe.
> 
> It doesn't rely on that (that's the magic hidden behind aio_co_wake),
> but you're right there is a documentation problem because I needed 10
> minutes to remind myself why it's correct.
>
> It works because neither coroutine is moving context:
> 
> - if the two coroutines are in the same context, aio_co_wake queues the
> coroutine for execution after the yield, which is obviously okay;
> 
> - if they are in different context, the recv_coroutine's aio_co_wake
> queues the read_reply_co with aio_co_schedule.  It is then woken up
> through a bottom half, which executes only after read_reply has yielded.
> 
> It is implied by the aio_co_wake and aio_co_schedule documentation; all
> requirements are satisfied:
> 
> 1) aio_co_wake's comment says:
> 
>    * aio_co_wake may be executed either in coroutine or non-coroutine
>    * context.  The coroutine must not be entered by anyone else while
>    * aio_co_wake() is active.
> 
>    read_reply_co is only woken by one recv_coroutine at a time
> 
> 2) And for the case where read_reply_co and recv_coroutine are in
>    different contexts, aio_co_schedule's comment says:
> 
>    * In addition the coroutine must have yielded unless ctx
>    * is the context in which the coroutine is running (i.e. the value of
>    * qemu_get_current_aio_context() from the coroutine itself).
> 
>    which is okay because aio_co_wake always uses "the context in
>    which the coroutine is running" as the argument to aio_co_schedule.
> 
> Any suggestions on how to document this properly?  I'm not sure a
> comment in the NBD driver is the best place, because similar logic will
> appear soon in other networked block drivers.

Maybe add a new function that just does these two calls.  I don't have a
good name for it.

Stefan

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield
  2017-01-31 13:50       ` Stefan Hajnoczi
@ 2017-01-31 14:29         ` Paolo Bonzini
  0 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-01-31 14:29 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Stefan Hajnoczi, famz, qemu-devel



On 31/01/2017 08:50, Stefan Hajnoczi wrote:
>> I'm not sure a
>> comment in the NBD driver is the best place, because similar logic will
>> appear soon in other networked block drivers.
> Maybe add a new function that just does these two calls.  I don't have a
> good name for it.

For now I'll add a comment here.  I checked my final branch and the
idiom is not as frequent as I thought.

Paolo

^ permalink raw reply	[flat|nested] 35+ messages in thread

end of thread, other threads:[~2017-01-31 14:29 UTC | newest]

Thread overview: 35+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-01-20 16:43 [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 01/17] aio: introduce aio_co_schedule and aio_co_wake Paolo Bonzini
2017-01-30 15:18   ` Stefan Hajnoczi
2017-01-30 20:15     ` Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 02/17] block-backend: allow blk_prw from coroutine context Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 03/17] test-thread-pool: use generic AioContext infrastructure Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 04/17] block: move AioContext and QEMUTimer to libqemuutil Paolo Bonzini
2017-01-24  9:34   ` Daniel P. Berrange
2017-01-30 15:24   ` Stefan Hajnoczi
2017-01-20 16:43 ` [Qemu-devel] [PATCH 05/17] io: add methods to set I/O handlers on AioContext Paolo Bonzini
2017-01-24  9:35   ` Daniel P. Berrange
2017-01-30 15:32   ` Stefan Hajnoczi
2017-01-20 16:43 ` [Qemu-devel] [PATCH 06/17] io: make qio_channel_yield aware of AioContexts Paolo Bonzini
2017-01-24  9:36   ` Daniel P. Berrange
2017-01-24  9:38     ` Paolo Bonzini
2017-01-24  9:40       ` Daniel P. Berrange
2017-01-30 15:37   ` Stefan Hajnoczi
2017-01-20 16:43 ` [Qemu-devel] [PATCH 07/17] nbd: convert to use qio_channel_yield Paolo Bonzini
2017-01-30 15:50   ` Stefan Hajnoczi
2017-01-30 21:18     ` Paolo Bonzini
2017-01-31 13:50       ` Stefan Hajnoczi
2017-01-31 14:29         ` Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 08/17] coroutine-lock: reschedule coroutine on the AioContext it was running on Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 09/17] qed: introduce qed_aio_start_io and qed_aio_next_io_cb Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 10/17] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 11/17] block: explicitly acquire aiocontext in timers that need it Paolo Bonzini
2017-01-30 16:01   ` Stefan Hajnoczi
2017-01-20 16:43 ` [Qemu-devel] [PATCH 12/17] block: explicitly acquire aiocontext in callbacks " Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 13/17] block: explicitly acquire aiocontext in bottom halves " Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 14/17] block: explicitly acquire aiocontext in aio callbacks " Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 15/17] aio-posix: partially inline aio_dispatch into aio_poll Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 16/17] async: remove unnecessary inc/dec pairs Paolo Bonzini
2017-01-20 16:43 ` [Qemu-devel] [PATCH 17/17] block: document fields protected by AioContext lock Paolo Bonzini
2017-01-24  9:38 ` [Qemu-devel] [PATCH 00/17] aio_context_acquire/release pushdown, part 2 Daniel P. Berrange
2017-01-29 20:48 ` Paolo Bonzini

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.