* [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1
@ 2017-01-12 16:55 Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
                   ` (10 more replies)
  0 siblings, 11 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC
  To: qemu-devel; +Cc: famz, stefanha

This is the first step of pushing down the AioContext lock.  Bottom halves
are already protected by their own lock; this series uses that lock also
for walking_bh and for the handlers list (including walking_handlers).
The (lock, walking_foo) pair is wrapped into the QemuLockCnt primitive.

The only difference from v3 is a handful of small improvements
to QemuLockCnt.

Paolo

v3->v4:
        Avoid useless atomic_mb_read in non-futex lockcnt [Stefan]
        Use atomic_read in qemu_lockcnt_count [Stefan]
        Tweak comment for qemu_lockcnt_cmpxchg_or_wait [Fam]
        Use if/else in qemu_lockcnt_dec_and_lock [Fam]
        Comment QEMU_LOCKCNT_STATE_* definitions [Fam]

v2->v3:
        fix missing unlock

v1->v2:
        rebased on top of polling patches
        moved documentation to include/
        new patch "aio-posix: split aio_dispatch_handlers out of aio_dispatch"
        replaced atomic_read with atomic_mb_read in qemu_lockcnt_inc
        replaced futex* function and macro names with qemu_futex*
        removed atomic access to node->pfd.revents (because there is no
          concurrent aio_poll anymore after the BDRV_POLL_WHILE series)
        left "aio: push aio_context_acquire/release down to dispatching"
          to a later series

Paolo Bonzini (10):
  aio: rename bh_lock to list_lock
  qemu-thread: introduce QemuLockCnt
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  qemu-thread: optimize QemuLockCnt with futexes on Linux
  aio-posix: split aio_dispatch_handlers out of aio_dispatch
  aio: tweak walking in dispatch phase
  aio-posix: remove walking_handlers, protecting AioHandler list with
    list_lock
  aio-win32: remove walking_handlers, protecting AioHandler list with
    list_lock
  aio: document locking
  async: optimize aio_bh_poll

 aio-posix.c                 | 116 +++++++------
 aio-win32.c                 |  83 +++++----
 async.c                     |  45 ++---
 docs/lockcnt.txt            | 277 +++++++++++++++++++++++++++++++
 docs/multiple-iothreads.txt |  13 +-
 include/block/aio.h         |  38 ++---
 include/qemu/futex.h        |  36 ++++
 include/qemu/thread.h       | 112 +++++++++++++
 util/Makefile.objs          |   1 +
 util/lockcnt.c              | 397 ++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c    |  35 +---
 util/qemu-thread-win32.c    |   2 +-
 util/trace-events           |  10 ++
 13 files changed, 1006 insertions(+), 159 deletions(-)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 include/qemu/futex.h
 create mode 100644 util/lockcnt.c

-- 
2.9.3


* [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
                   ` (9 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC
  To: qemu-devel; +Cc: famz, stefanha

This will be used for AioHandlers too.  There is going to be little
or no contention, so it is better to reuse the same lock.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 20 ++++++++++----------
 include/block/aio.h |  2 +-
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/async.c b/async.c
index 2960171..69292fa 100644
--- a/async.c
+++ b/async.c
@@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     bh->scheduled = 1;
     bh->deleted = 1;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -120,7 +120,7 @@ int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->bh_lock);
+        qemu_mutex_lock(&ctx->list_lock);
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -131,7 +131,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->bh_lock);
+        qemu_mutex_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -270,7 +270,7 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -280,12 +280,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->bh_lock);
+    qemu_mutex_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -381,7 +381,7 @@ AioContext *aio_context_new(Error **errp)
     ctx->linux_aio = NULL;
 #endif
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 4dca54d..013d400 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -91,7 +91,7 @@ struct AioContext {
     uint32_t notify_me;
 
     /* lock to protect between bh's adders and deleter */
-    QemuMutex bh_lock;
+    QemuMutex list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
-- 
2.9.3


* [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
                   ` (8 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC
  To: qemu-devel; +Cc: famz, stefanha

A QemuLockCnt comprises a counter and a mutex, with primitives
to increment and decrement the counter, and to take and release the
mutex.  It can be used to do lock-free visits to a data structure
whenever mutexes would be too heavy-weight and the critical section
is too long for RCU.

This could be implemented simply by protecting the counter with the
mutex, but QemuLockCnt is harder to misuse and more efficient.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
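As a rough illustration of the counter/mutex pairing described in the
commit message above, here is a minimal standalone sketch.  It is not
the code in this patch: it assumes C11 atomics and pthreads instead of
QEMU's atomic_* and QemuMutex wrappers, and every sketch_* name is
hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for QemuLockCnt: a mutex plus an atomic counter. */
typedef struct SketchLockCnt {
    pthread_mutex_t mutex;
    atomic_uint count;
} SketchLockCnt;

static void sketch_lockcnt_init(SketchLockCnt *lc)
{
    pthread_mutex_init(&lc->mutex, NULL);
    atomic_init(&lc->count, 0);
}

static void sketch_lockcnt_destroy(SketchLockCnt *lc)
{
    pthread_mutex_destroy(&lc->mutex);
}

static void sketch_lockcnt_unlock(SketchLockCnt *lc)
{
    pthread_mutex_unlock(&lc->mutex);
}

/* Start a visit.  The 0 -> 1 transition goes through the mutex, so it
 * cannot happen while another thread holds the mutex with count == 0. */
static void sketch_lockcnt_inc(SketchLockCnt *lc)
{
    unsigned old = atomic_load(&lc->count);
    for (;;) {
        if (old == 0) {
            pthread_mutex_lock(&lc->mutex);
            atomic_fetch_add(&lc->count, 1);
            pthread_mutex_unlock(&lc->mutex);
            return;
        }
        /* Fast path: bump a nonzero count without the mutex.  On
         * failure 'old' is refreshed with the current value; retry. */
        if (atomic_compare_exchange_weak(&lc->count, &old, old + 1)) {
            return;
        }
    }
}

/* End a visit.  Returns true with the mutex held if this was the last
 * visit; the caller may then reclaim and must unlock afterwards. */
static bool sketch_lockcnt_dec_and_lock(SketchLockCnt *lc)
{
    unsigned val = atomic_load(&lc->count);
    while (val > 1) {
        if (atomic_compare_exchange_weak(&lc->count, &val, val - 1)) {
            return false;
        }
    }
    pthread_mutex_lock(&lc->mutex);
    if (atomic_fetch_sub(&lc->count, 1) == 1) {
        return true;
    }
    pthread_mutex_unlock(&lc->mutex);
    return false;
}

/* Two nested visits: only the outermost one sees the count reach zero.
 * Returns how many times the "reclaim" branch was taken. */
int sketch_lockcnt_demo(void)
{
    SketchLockCnt lc;
    int reclaims = 0;

    sketch_lockcnt_init(&lc);
    sketch_lockcnt_inc(&lc);                 /* 0 -> 1, via the mutex */
    sketch_lockcnt_inc(&lc);                 /* 1 -> 2, lock-free */
    if (sketch_lockcnt_dec_and_lock(&lc)) {  /* 2 -> 1, returns false */
        reclaims++;
        sketch_lockcnt_unlock(&lc);
    }
    if (sketch_lockcnt_dec_and_lock(&lc)) {  /* 1 -> 0, returns locked */
        reclaims++;
        sketch_lockcnt_unlock(&lc);
    }
    sketch_lockcnt_destroy(&lc);
    return reclaims;
}
```

In the sketch, the mutex is touched only on the 0 -> 1 and 1 -> 0
transitions; every other increment or decrement is a single
compare-and-swap.  That is the efficiency argument made above, under
the stated assumptions.
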
 docs/lockcnt.txt      | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++
 include/qemu/thread.h | 110 ++++++++++++++++++++
 util/Makefile.objs    |   1 +
 util/lockcnt.c        | 114 +++++++++++++++++++++
 4 files changed, 503 insertions(+)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 util/lockcnt.c

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
new file mode 100644
index 0000000..25a8091
--- /dev/null
+++ b/docs/lockcnt.txt
@@ -0,0 +1,278 @@
+DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
+===================================================
+
+QEMU often uses reference counts to track data structures that are being
+accessed and should not be freed.  For example, a loop that invokes
+callbacks like this is not safe:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
+by stashing away its "next" pointer.  However, ioh->fd_write could
+actually delete the next node from the list.  The simplest way to
+avoid this is to mark the node as deleted, and remove it from the
+list in the above loop:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+
+If however this loop must also be reentrant, i.e. it is possible that
+ioh->fd_write invokes the loop again, some kind of counting is needed:
+
+    walking_handlers++;
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (walking_handlers == 1) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    walking_handlers--;
+
+One may think of using the RCU primitives, rcu_read_lock() and
+rcu_read_unlock(); effectively, the RCU nesting count would take
+the place of the walking_handlers global variable.  Indeed,
+reference counting and RCU have similar purposes, but their usage in
+general is complementary:
+
+- reference counting is fine-grained and limited to a single data
+  structure; RCU delays reclamation of *all* RCU-protected data
+  structures;
+
+- reference counting works even in the presence of code that keeps
+  a reference for a long time; RCU critical sections in principle
+  should be kept short;
+
+- reference counting is often applied to code that is not thread-safe
+  but is reentrant; in fact, usage of reference counting in QEMU predates
+  the introduction of threads by many years.  RCU is generally used to
+  protect readers from other threads freeing memory after concurrent
+  modifications to a data structure.
+
+- reclaiming data can be done by a separate thread in the case of RCU;
+  this can improve performance, but also delay reclamation undesirably.
+  With reference counting, reclamation is deterministic.
+
+This file documents QemuLockCnt, an abstraction for using reference
+counting in code that has to be both thread-safe and reentrant.
+
+
+QemuLockCnt concepts
+--------------------
+
+A QemuLockCnt comprises both a counter and a mutex; it has primitives
+to increment and decrement the counter, and to take and release the
+mutex.  The counter notes how many visits to the data structures are
+taking place (the visits could be from different threads, or there could
+be multiple reentrant visits from the same thread).  The basic rules
+governing the counter/mutex pair then are the following:
+
+- Data protected by the QemuLockCnt must not be freed unless the
+  counter is zero and the mutex is taken.
+
+- A new visit cannot be started while the counter is zero and the
+  mutex is taken.
+
+Most of the time, the mutex protects all writes to the data structure,
+not just frees, though there could be cases where this is not necessary.
+
+Reads, instead, can be done without taking the mutex, as long as the
+readers and writers use the same macros that are used for RCU, for
+example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc.  This is
+because the reads are done outside a lock and a set or QLIST_INSERT_HEAD
+can happen concurrently with the read.  The RCU API ensures that the
+processor and the compiler see all required memory barriers.
+
+This could be implemented simply by protecting the counter with the
+mutex, for example:
+
+    // (1)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    walking_handlers++;
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+    ...
+
+    // (2)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    if (--walking_handlers == 0) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+    }
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+Here, no frees can happen in the code represented by the ellipsis.
+If another thread is executing critical section (2), that part of
+the code cannot be entered, because the thread will not be able
+to increment the walking_handlers variable.  And of course
+during the visit any other thread will see a nonzero value for
+walking_handlers, as in the single-threaded code.
+
+Note that it is possible for multiple concurrent accesses to delay
+the cleanup arbitrarily; in other words, for the walking_handlers
+counter to never become zero.  For this reason, this technique is
+more easily applicable if concurrent access to the structure is rare.
+
+However, critical sections are easy to forget since you have to do
+them for each modification of the counter.  QemuLockCnt ensures that
+all modifications of the counter take the lock appropriately, and it
+can also be more efficient in two ways:
+
+- it avoids taking the lock for many operations (for example
+  incrementing the counter while it is non-zero);
+
+- on some platforms, one could implement QemuLockCnt to hold the
+  lock and the mutex in a single word, making it no more expensive
+  than simply managing a counter using atomic operations (see
+  docs/atomics.txt).  This is not implemented yet, but can be
+  very helpful if concurrent access to the data structure is
+  expected to be rare.
+
+
+Using the same mutex for frees and writes can still incur some small
+inefficiencies; for example, a visit can never start if the counter is
+zero and the mutex is taken---even if the mutex is taken by a write,
+which in principle need not block a visit of the data structure.
+However, these are usually not a problem if any of the following
+assumptions are valid:
+
+- concurrent access is possible but rare
+
+- writes are rare
+
+- writes are frequent, but this kind of write (e.g. appending to a
+  list) has a very small critical section.
+
+For example, QEMU uses QemuLockCnt to manage an AioContext's list of
+bottom halves and file descriptor handlers.  Modifications to the list
+of file descriptor handlers are rare.  Creation of a new bottom half is
+frequent and can happen on a fast path; however: 1) it is almost never
+concurrent with a visit to the list of bottom halves; 2) it only has
+three instructions in the critical path, two assignments and a smp_wmb().
+
+
+QemuLockCnt API
+---------------
+
+The QemuLockCnt API is described in include/qemu/thread.h.
+
+
+QemuLockCnt usage
+-----------------
+
+This section explains the typical usage patterns for QemuLockCnt functions.
+
+Setting a variable to a non-NULL value can be done between
+qemu_lockcnt_lock and qemu_lockcnt_unlock:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!xyz) {
+        new_xyz = g_new(XYZ, 1);
+        ...
+        atomic_rcu_set(&xyz, new_xyz);
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+Accessing the value can be done between qemu_lockcnt_inc and
+qemu_lockcnt_dec:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    qemu_lockcnt_dec(&xyz_lockcnt);
+
+Freeing the object can similarly use qemu_lockcnt_lock and
+qemu_lockcnt_unlock, but you also need to ensure that the count
+is zero (i.e. there is no concurrent visit).  Because qemu_lockcnt_inc
+takes the lock when the count is zero, the count cannot become non-zero
+while the object is being freed.  Freeing an object looks like this:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+If an object has to be freed right after a visit, you can combine
+the decrement, the locking and the check on count as follows:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+        qemu_lockcnt_unlock(&xyz_lockcnt);
+    }
+
+QemuLockCnt can also be used to access a list as follows:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+        qemu_lockcnt_unlock(&io_handlers_lockcnt);
+    }
+
+Again, the RCU primitives are used because new items can be added to the
+list during the walk.  QLIST_FOREACH_RCU ensures that the processor and
+the compiler see the appropriate memory barriers.
+
+An alternative pattern uses qemu_lockcnt_dec_if_lock:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+                qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    qemu_lockcnt_dec(&io_handlers_lockcnt);
+
+Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock,
+because there is no special task to do if the count goes from 1 to 0.
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e8e665f..5f7de7b 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuLockCnt QemuLockCnt;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -98,4 +99,113 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
     __sync_lock_release(&spin->value);
 }
 
+struct QemuLockCnt {
+    QemuMutex mutex;
+    unsigned count;
+};
+
+/**
+ * qemu_lockcnt_init: initialize a QemuLockCnt
+ * @lockcnt: the lockcnt to initialize
+ *
+ * Initialize lockcnt's counter to zero and prepare its mutex
+ * for usage.
+ */
+void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_destroy: destroy a QemuLockCnt
+ * @lockcnt: the lockcnt to destroy
+ *
+ * Destroy lockcnt's mutex.
+ */
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_inc: increment a QemuLockCnt's counter
+ * @lockcnt: the lockcnt to operate on
+ *
+ * If the lockcnt's count is zero, wait for critical sections
+ * to finish and increment lockcnt's count to 1.  If the count
+ * is not zero, just increment it.
+ *
+ * Because this function can wait on the mutex, it must not be
+ * called while the lockcnt's mutex is held by the current thread.
+ * For the same reason, qemu_lockcnt_inc can also contribute to
+ * AB-BA deadlocks.  This is a sample deadlock scenario:
+ *
+ *            thread 1                      thread 2
+ *            -------------------------------------------------------
+ *            qemu_lockcnt_lock(&lc1);
+ *                                          qemu_lockcnt_lock(&lc2);
+ *            qemu_lockcnt_inc(&lc2);
+ *                                          qemu_lockcnt_inc(&lc1);
+ */
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec: decrement a QemuLockCnt's counter
+ * @lockcnt: the lockcnt to operate on
+ */
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec_and_lock: decrement a QemuLockCnt's counter and
+ * possibly lock it.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * Decrement lockcnt's count.  If the new count is zero, lock
+ * the mutex and return true.  Otherwise, return false.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec_if_lock: possibly decrement a QemuLockCnt's counter and
+ * lock it.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * If the count is 1, decrement the count to zero, lock
+ * the mutex and return true.  Otherwise, return false.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_lock: lock a QemuLockCnt's mutex.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * Remember that concurrent visits are not blocked unless the count is
+ * also zero.  You can use qemu_lockcnt_count to check for this inside a
+ * critical section.
+ */
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_unlock: release a QemuLockCnt's mutex.
+ * @lockcnt: the lockcnt to operate on.
+ */
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_inc_and_unlock: combined unlock/increment on a QemuLockCnt.
+ * @lockcnt: the lockcnt to operate on.
+ *
+ * This is the same as
+ *
+ *     qemu_lockcnt_unlock(lockcnt);
+ *     qemu_lockcnt_inc(lockcnt);
+ *
+ * but more efficient.
+ */
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_count: query a QemuLockCnt's count.
+ * @lockcnt: the lockcnt to query.
+ *
+ * Note that the count can change at any time.  Still, while the
+ * lockcnt is locked, one can usefully check whether the count
+ * is non-zero.
+ */
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
 #endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index ad0f9c7..c1f247d 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,5 +1,6 @@
 util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
 util-obj-y += bufferiszero.o
+util-obj-y += lockcnt.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
diff --git a/util/lockcnt.c b/util/lockcnt.c
new file mode 100644
index 0000000..da1de77
--- /dev/null
+++ b/util/lockcnt.c
@@ -0,0 +1,114 @@
+/*
+ * QemuLockCnt implementation
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *   Paolo Bonzini <pbonzini@redhat.com>
+ */
+#include "qemu/osdep.h"
+#include "qemu/thread.h"
+#include "qemu/atomic.h"
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_init(&lockcnt->mutex);
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_destroy(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int old;
+    for (;;) {
+        old = atomic_read(&lockcnt->count);
+        if (old == 0) {
+            qemu_lockcnt_lock(lockcnt);
+            qemu_lockcnt_inc_and_unlock(lockcnt);
+            return;
+        } else {
+            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
+                return;
+            }
+        }
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_dec(&lockcnt->count);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    while (val > 1) {
+        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
+        if (old != val) {
+            val = old;
+            continue;
+        }
+
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_unlock(lockcnt);
+    return false;
+}
+
+/* Decrement a counter and return locked if it is decremented to zero.
+ * Otherwise do nothing.
+ *
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    /* No need for acquire semantics if we return false.  */
+    int val = atomic_read(&lockcnt->count);
+    if (val > 1) {
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_inc_and_unlock(lockcnt);
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_lock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    atomic_inc(&lockcnt->count);
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count);
+}
-- 
2.9.3


* [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
                   ` (7 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC
  To: qemu-devel; +Cc: famz, stefanha

This will make it possible to walk the list of bottom halves without
holding the AioContext lock---and in turn to call bottom half
handlers without holding the lock.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
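For readers unfamiliar with the rule that walking_bh enforced and that
the QemuLockCnt now takes over, here is a minimal single-threaded
sketch: nodes marked as deleted are unlinked only by the outermost
walk, never by a re-entrant one.  It uses a plain counter and
hypothetical Sketch* types, not QEMUBH or AioContext, and leaves out
the locking and RCU reads that the real code needs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical miniature of a bottom-half list; not QEMU's QEMUBH. */
typedef struct SketchBH {
    struct SketchBH *next;
    bool deleted;
} SketchBH;

typedef struct SketchCtx {
    SketchBH *first_bh;
    int walking;            /* walks in progress (cf. walking_bh) */
} SketchCtx;

static SketchBH *sketch_bh_new(SketchCtx *ctx)
{
    SketchBH *bh = calloc(1, sizeof(*bh));
    if (!bh) {
        abort();
    }
    bh->next = ctx->first_bh;
    ctx->first_bh = bh;
    return bh;
}

/* Walk the list.  A depth > 0 re-enters the walk from inside a visit,
 * the way a callback may re-enter the dispatch loop.  Returns the
 * number of nodes freed by this call, including nested calls. */
static int sketch_bh_walk(SketchCtx *ctx, int depth)
{
    int freed = 0;

    ctx->walking++;
    for (SketchBH *bh = ctx->first_bh; bh; bh = bh->next) {
        if (!bh->deleted && depth > 0) {
            /* Stand-in for a callback that re-enters the walk. */
            freed += sketch_bh_walk(ctx, depth - 1);
        }
    }
    ctx->walking--;

    /* Only the outermost walk may unlink and free deleted nodes. */
    if (ctx->walking == 0) {
        SketchBH **bhp = &ctx->first_bh;
        while (*bhp) {
            SketchBH *bh = *bhp;
            if (bh->deleted) {
                *bhp = bh->next;
                free(bh);
                freed++;
            } else {
                bhp = &bh->next;
            }
        }
    }
    return freed;
}

/* Three nodes, one marked deleted, walked with one level of re-entry.
 * Stores the number of surviving nodes in *left and returns the number
 * of nodes freed by the walk. */
int sketch_bh_demo(int *left)
{
    SketchCtx ctx = { NULL, 0 };
    SketchBH *victim;
    int freed;

    sketch_bh_new(&ctx);
    victim = sketch_bh_new(&ctx);
    sketch_bh_new(&ctx);
    victim->deleted = true;

    freed = sketch_bh_walk(&ctx, 1);

    *left = 0;
    while (ctx.first_bh) {
        SketchBH *next = ctx.first_bh->next;
        free(ctx.first_bh);
        ctx.first_bh = next;
        (*left)++;
    }
    return freed;
}
```

In the sketch the nested walks leave the deleted node in place because
the counter is still nonzero when they finish; only the outer walk,
which brings the counter back to zero, frees it.
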
 async.c             | 35 ++++++++++++++++-------------------
 include/block/aio.h | 12 +++++-------
 2 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/async.c b/async.c
index 69292fa..2305e11 100644
--- a/async.c
+++ b/async.c
@@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     bh->scheduled = 1;
     bh->deleted = 1;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -93,13 +93,11 @@ int aio_bh_poll(AioContext *ctx)
     QEMUBH *bh, **bhp, *next;
     int ret;
 
-    ctx->walking_bh++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
-        /* Make sure that fetching bh happens before accessing its members */
-        smp_read_barrier_depends();
-        next = bh->next;
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+        next = atomic_rcu_read(&bh->next);
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -116,11 +114,8 @@ int aio_bh_poll(AioContext *ctx)
         }
     }
 
-    ctx->walking_bh--;
-
     /* remove deleted bhs */
-    if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->list_lock);
+    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -131,7 +126,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->list_lock);
+        qemu_lockcnt_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -187,7 +182,8 @@ aio_compute_timeout(AioContext *ctx)
     int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
+         bh = atomic_rcu_read(&bh->next)) {
         if (bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -270,7 +266,8 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
+    assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -280,12 +277,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->list_lock);
+    qemu_lockcnt_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -372,6 +369,7 @@ AioContext *aio_context_new(Error **errp)
         goto fail;
     }
     g_source_set_can_recurse(&ctx->source, true);
+    qemu_lockcnt_init(&ctx->list_lock);
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -381,7 +379,6 @@ AioContext *aio_context_new(Error **errp)
     ctx->linux_aio = NULL;
 #endif
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 013d400..be3adfe 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -90,17 +90,15 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* lock to protect between bh's adders and deleter */
-    QemuMutex list_lock;
+    /* A lock to protect between bh's adders and deleter, and to ensure
+     * that no callbacks are removed while we're walking and dispatching
+     * them.
+     */
+    QemuLockCnt list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
-    /* A simple lock used to protect the first_bh list, and ensure that
-     * no callbacks are removed while we're walking and dispatching callbacks.
-     */
-    int walking_bh;
-
     /* Used by aio_notify.
      *
      * "notified" is used to avoid expensive event_notifier_test_and_clear
-- 
2.9.3

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (2 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 05/10] aio-posix: split aio_dispatch_handlers out of aio_dispatch Paolo Bonzini
                   ` (6 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
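Reader's note: the packed-word layout used on Linux (bits 0-1 hold the
lock state, bits 2-31 hold the counter) can be sketched in isolation as
below.  This is only an illustration of the encoding and of the unlock
transition; the sketch_* names are hypothetical and do not appear in the
patch, and the real code performs these steps with atomic_cmpxchg on
lockcnt->count and a futex wake rather than on a plain integer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical names mirroring the QEMU_LOCKCNT_* constants in the patch. */
enum {
    SKETCH_STATE_MASK    = 3,
    SKETCH_STATE_FREE    = 0,   /* free, uncontended */
    SKETCH_STATE_LOCKED  = 1,   /* locked, uncontended */
    SKETCH_STATE_WAITING = 2,   /* locked, contended */
    SKETCH_COUNT_STEP    = 4,
    SKETCH_COUNT_SHIFT   = 2,
};

/* Build a word from a counter value and a lock state. */
static inline uint32_t sketch_pack(uint32_t count, uint32_t state)
{
    assert(state <= SKETCH_STATE_WAITING);
    return (count << SKETCH_COUNT_SHIFT) | state;
}

/* Counter part of the word (bits 2-31). */
static inline uint32_t sketch_count(uint32_t word)
{
    return word >> SKETCH_COUNT_SHIFT;
}

/* Lock-state part of the word (bits 0-1). */
static inline uint32_t sketch_state(uint32_t word)
{
    return word & SKETCH_STATE_MASK;
}

/* Word produced by an unlock: keep the counter, clear the state bits.
 * *need_wake is set when the old state was WAITING, which is the case
 * where the real code would wake one futex waiter.
 */
static inline uint32_t sketch_unlock(uint32_t word, bool *need_wake)
{
    assert(sketch_state(word) != SKETCH_STATE_FREE);
    *need_wake = sketch_state(word) == SKETCH_STATE_WAITING;
    return word & ~(uint32_t)SKETCH_STATE_MASK;
}
```

Because the counter lives above the state bits, adding or subtracting
SKETCH_COUNT_STEP changes the counter without disturbing the lock state,
which is why an uncontended increment needs only one atomic operation.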
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 283 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  35 +-----
 util/qemu-thread-win32.c |   2 +-
 util/trace-events        |  10 ++
 7 files changed, 342 insertions(+), 35 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the lock
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 5f7de7b..9910f49 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index da1de77..c0acdfe 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,289 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0   /* free, uncontended */
+#define QEMU_LOCKCNT_STATE_LOCKED  1   /* locked, uncontended */
+#define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free.  On
+ * success, return true with *val set to new_if_free, the value now
+ * stored in lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * If *waited is true on return, new_if_free's bottom two bits must not
+ * be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller
+ * does not know if there are other waiters.  Furthermore, after *waited
+ * is set the caller has effectively acquired the lock.  If it returns
+ * with the lock not taken, it must wake another futex waiter.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.  Assume
+                 * there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -112,3 +394,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return atomic_read(&lockcnt->count);
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

* [Qemu-devel] [PATCH 05/10] aio-posix: split aio_dispatch_handlers out of aio_dispatch
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (3 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini
                   ` (5 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

This simplifies the handling of dispatch_fds.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 43 +++++++++++++++++++++++++------------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 1585571..25198d9 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -367,31 +367,16 @@ bool aio_pending(AioContext *ctx)
     return false;
 }
 
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node = NULL;
+    AioHandler *node;
     bool progress = false;
 
     /*
-     * If there are callbacks left that have been queued, we need to call them.
-     * Do not call select in this case, because it is possible that the caller
-     * does not need a complete flush (as is the case for aio_poll loops).
-     */
-    if (aio_bh_poll(ctx)) {
-        progress = true;
-    }
-
-    /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    if (dispatch_fds) {
-        node = QLIST_FIRST(&ctx->aio_handlers);
-    }
+    node = QLIST_FIRST(&ctx->aio_handlers);
     while (node) {
         AioHandler *tmp;
         int revents;
@@ -431,6 +416,28 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
         }
     }
 
+    return progress;
+}
+
+/*
+ * Note that dispatch_fds == false has the side effect of postponing the
+ * freeing of deleted handlers.
+ */
+bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+{
+    bool progress;
+
+    /*
+     * If there are callbacks left that have been queued, we need to call them.
+     * Do not call select in this case, because it is possible that the caller
+     * does not need a complete flush (as is the case for aio_poll loops).
+     */
+    progress = aio_bh_poll(ctx);
+
+    if (dispatch_fds) {
+        progress |= aio_dispatch_handlers(ctx);
+    }
+
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
 
-- 
2.9.3

* [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (4 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 05/10] aio-posix: split aio_dispatch_handlers out of aio_dispatch Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 07/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
                   ` (4 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

Preparing for the following patch, use QLIST_FOREACH_SAFE and
modify the placement of walking_handlers increment/decrement.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
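Reader's note: the walking pattern this patch moves to (raise the counter
once around the whole loop, save the next pointer before touching the
current node, and free a deleted node only when no other walker is
active) can be sketched on a plain singly linked list.  Node, List and
the list_* helpers are hypothetical stand-ins for AioHandler, AioContext
and the QLIST macros; the sketch is single-threaded and omits the handler
callbacks that run inside the real loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for AioHandler and AioContext. */
typedef struct Node {
    struct Node *next;
    bool deleted;
    int id;
} Node;

typedef struct {
    Node *head;
    int walking;            /* plays the role of walking_handlers */
} List;

static void list_push(List *l, int id)
{
    Node *n = calloc(1, sizeof(*n));
    assert(n);
    n->id = id;
    n->next = l->head;
    l->head = n;
}

/* Remove immediately if nobody is walking, otherwise only mark the node. */
static void list_delete(List *l, Node *victim)
{
    if (l->walking) {
        victim->deleted = true;
        return;
    }
    for (Node **p = &l->head; *p; p = &(*p)->next) {
        if (*p == victim) {
            *p = victim->next;
            free(victim);
            return;
        }
    }
}

/* Walk the list and reap deleted nodes.  Returns how many were freed. */
static int list_walk_and_reap(List *l)
{
    int freed = 0;
    Node **link = &l->head;
    Node *node, *next;

    l->walking++;
    for (node = l->head; node; node = next) {
        next = node->next;          /* saved first, so freeing node is safe */
        if (node->deleted) {
            l->walking--;
            if (!l->walking) {      /* we are the only walker */
                *link = next;
                free(node);
                freed++;
                l->walking++;
                continue;
            }
            l->walking++;
        }
        link = &node->next;
    }
    l->walking--;
    return freed;
}
```

The temporary decrement around the check is what the following patch
turns into a qemu_lockcnt_dec_if_lock / qemu_lockcnt_inc_and_unlock pair.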
 aio-posix.c | 26 ++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 25198d9..f83b7af 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -369,19 +369,17 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -405,17 +403,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index d19dc42..1ad459d 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -227,20 +227,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -275,17 +273,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.9.3

* [Qemu-devel] [PATCH 07/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (5 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 08/10] aio-win32: " Paolo Bonzini
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
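Reader's note: this patch has no description, so here is a small model of
the QemuLockCnt calls it relies on.  It is a single-threaded toy that
only tracks the (count, locked) pair so the state transitions can be
followed by hand; it is not thread-safe and is not how QEMU implements
the primitive.  ToyLockCnt and the toy_* names are hypothetical.  Where
the real functions would block, the toy asserts instead.

```c
#include <assert.h>
#include <stdbool.h>

/* Single-threaded toy model of the (count, lock) pair. */
typedef struct {
    unsigned count;     /* number of active walkers */
    bool locked;        /* writer-side lock */
} ToyLockCnt;

/* Walker enters.  Going 0->1 needs the lock to be free. */
static void toy_inc(ToyLockCnt *lc)
{
    assert(lc->count > 0 || !lc->locked);
    lc->count++;
}

/* Walker leaves. */
static void toy_dec(ToyLockCnt *lc)
{
    assert(lc->count > 0);
    lc->count--;
}

/* If we are the last walker, drop the count to zero and take the lock. */
static bool toy_dec_if_lock(ToyLockCnt *lc)
{
    if (lc->count != 1) {
        return false;
    }
    assert(!lc->locked);    /* the real function would wait here */
    lc->count = 0;
    lc->locked = true;
    return true;
}

/* Become a walker again and release the lock in one step. */
static void toy_inc_and_unlock(ToyLockCnt *lc)
{
    assert(lc->locked);
    lc->count++;
    lc->locked = false;
}
```

In aio_dispatch_handlers below, a deleted node is unlinked and freed only
between a successful dec_if_lock and the matching inc_and_unlock, that
is, while the count is zero and the lock is held.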
 aio-posix.c | 65 ++++++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index f83b7af..9453d83 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -16,7 +16,7 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #include "qemu/cutils.h"
 #include "trace.h"
@@ -66,7 +66,7 @@ static bool aio_epoll_try_enable(AioContext *ctx)
     AioHandler *node;
     struct epoll_event event;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int r;
         if (node->deleted || !node->pfd.events) {
             continue;
@@ -212,24 +212,27 @@ void aio_set_fd_handler(AioContext *ctx,
     bool is_new = false;
     bool deleted = false;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write && !io_poll) {
         if (node == NULL) {
+            qemu_lockcnt_unlock(&ctx->list_lock);
             return;
         }
 
         g_source_remove_poll(&ctx->source, &node->pfd);
 
         /* If the lock is held, just mark the node as deleted */
-        if (ctx->walking_handlers) {
+        if (qemu_lockcnt_count(&ctx->list_lock)) {
             node->deleted = 1;
             node->pfd.revents = 0;
         } else {
             /* Otherwise, delete it for real.  We can't just mark it as
-             * deleted because deleted nodes are only cleaned up after
-             * releasing the walking_handlers lock.
+             * deleted because deleted nodes are only cleaned up while
+             * no one is walking the handlers list.
              */
             QLIST_REMOVE(node, node);
             deleted = true;
@@ -243,7 +246,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
@@ -265,6 +268,7 @@ void aio_set_fd_handler(AioContext *ctx,
     }
 
     aio_epoll_update(ctx, node, is_new);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 
     if (deleted) {
@@ -316,8 +320,8 @@ static void poll_set_started(AioContext *ctx, bool started)
 
     ctx->poll_started = started;
 
-    ctx->walking_handlers++;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         IOHandler *fn;
 
         if (node->deleted) {
@@ -334,7 +338,7 @@ static void poll_set_started(AioContext *ctx, bool started)
             fn(node->opaque);
         }
     }
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 }
 
 
@@ -349,22 +353,32 @@ bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx)
@@ -376,9 +390,9 @@ static bool aio_dispatch_handlers(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
@@ -404,16 +418,15 @@ static bool aio_dispatch_handlers(AioContext *ctx)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -493,7 +506,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
     bool progress = false;
     AioHandler *node;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_poll &&
                 node->io_poll(node->opaque)) {
             progress = true;
@@ -514,7 +527,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
  * Note that ctx->notify_me must be non-zero so this function can detect
  * aio_notify().
  *
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
  *
  * Returns: true if progress was made, false otherwise
  */
@@ -524,7 +537,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
     int64_t end_time;
 
     assert(ctx->notify_me);
-    assert(ctx->walking_handlers > 0);
+    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
     assert(ctx->poll_disable_cnt == 0);
 
     trace_run_poll_handlers_begin(ctx, max_ns);
@@ -546,7 +559,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
  *
  * ctx->notify_me must be non-zero so this function can detect aio_notify().
  *
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
  *
  * Returns: true if progress was made, false otherwise
  */
@@ -597,7 +610,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     if (ctx->poll_max_ns) {
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
@@ -611,7 +624,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         /* fill pollfds */
 
         if (!aio_epoll_enabled(ctx)) {
-            QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+            QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
                 if (!node->deleted && node->pfd.events
                     && aio_node_check(ctx, node->is_external)) {
                     add_pollfd(node);
@@ -696,7 +709,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx, ret > 0)) {
-- 
2.9.3
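
The reader-side pattern this patch introduces in aio_dispatch_handlers can be sketched in isolation. What follows is a minimal single-threaded model, not QEMU code: every Toy*/toy_* name is hypothetical, and the "lock" is a plain flag, so only the counter and lock state transitions are shown, not the real synchronization done by QemuLockCnt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical single-threaded model of a counter/lock pair. */
typedef struct ToyLockCnt {
    int count;    /* number of active list walkers */
    bool locked;  /* stands in for the mutex */
} ToyLockCnt;

typedef struct ToyNode {
    struct ToyNode *next;
    bool deleted;
} ToyNode;

typedef struct ToyCtx {
    ToyLockCnt list_lock;
    ToyNode *head;
} ToyCtx;

void toy_inc(ToyLockCnt *lc) { lc->count++; }
void toy_dec(ToyLockCnt *lc) { assert(lc->count > 0); lc->count--; }

/* If we are the only walker, drop the count to zero and take the lock. */
bool toy_dec_if_lock(ToyLockCnt *lc)
{
    if (lc->count != 1) {
        return false;
    }
    assert(!lc->locked);
    lc->count = 0;
    lc->locked = true;
    return true;
}

/* Become a walker again, then release the lock. */
void toy_inc_and_unlock(ToyLockCnt *lc)
{
    assert(lc->locked);
    lc->count++;
    lc->locked = false;
}

void toy_add(ToyCtx *ctx, bool deleted)
{
    ToyNode *n = calloc(1, sizeof(*n));
    assert(n);
    n->deleted = deleted;
    n->next = ctx->head;
    ctx->head = n;
}

int toy_length(const ToyCtx *ctx)
{
    int len = 0;
    for (const ToyNode *n = ctx->head; n; n = n->next) {
        len++;
    }
    return len;
}

/* Walk the list; free deleted nodes only while we are the sole walker.
 * Returns the number of nodes freed. */
int toy_dispatch(ToyCtx *ctx)
{
    int freed = 0;
    ToyNode **prev = &ctx->head;
    ToyNode *node, *tmp;

    toy_inc(&ctx->list_lock);
    for (node = ctx->head; node; node = tmp) {
        tmp = node->next;   /* fetch the successor first, as *_FOREACH_SAFE does */
        if (node->deleted && toy_dec_if_lock(&ctx->list_lock)) {
            *prev = tmp;    /* unlink while holding the lock */
            free(node);
            freed++;
            toy_inc_and_unlock(&ctx->list_lock);
        } else {
            prev = &node->next;
        }
    }
    toy_dec(&ctx->list_lock);
    return freed;
}
```

In this model a nested walker (count of 2 inside toy_dispatch) leaves deleted nodes in place, and a later walk by a sole walker reclaims them, which is the behaviour the old walking_handlers counter provided.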


* [Qemu-devel] [PATCH 08/10] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (6 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 07/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 09/10] aio: document locking Paolo Bonzini
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-win32.c | 73 +++++++++++++++++++++++++++++++++++++++----------------------
 1 file changed, 47 insertions(+), 26 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index 1ad459d..900524c 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -21,6 +21,7 @@
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
 #include "qapi/error.h"
+#include "qemu/rcu_queue.h"
 
 struct AioHandler {
     EventNotifier *e;
@@ -45,6 +46,7 @@ void aio_set_fd_handler(AioContext *ctx,
     /* fd is a SOCKET in our case */
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->pfd.fd == fd && !node->deleted) {
             break;
@@ -54,14 +56,14 @@ void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -74,7 +76,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
         }
 
         node->pfd.events = 0;
@@ -99,6 +101,7 @@ void aio_set_fd_handler(AioContext *ctx,
                        FD_CONNECT | FD_WRITE | FD_OOB);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -117,6 +120,7 @@ void aio_set_event_notifier(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->e == e && !node->deleted) {
             break;
@@ -128,14 +132,14 @@ void aio_set_event_notifier(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -149,7 +153,7 @@ void aio_set_event_notifier(AioContext *ctx,
             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
             node->pfd.events = G_IO_IN;
             node->is_external = is_external;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -157,6 +161,7 @@ void aio_set_event_notifier(AioContext *ctx,
         node->io_notify = io_notify;
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -175,10 +180,16 @@ bool aio_prepare(AioContext *ctx)
     bool have_select_revents = false;
     fd_set rfds, wfds;
 
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
     /* fill fd sets */
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->io_read) {
             FD_SET ((SOCKET)node->pfd.fd, &rfds);
         }
@@ -188,7 +199,7 @@ bool aio_prepare(AioContext *ctx)
     }
 
     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             node->pfd.revents = 0;
             if (FD_ISSET(node->pfd.fd, &rfds)) {
                 node->pfd.revents |= G_IO_IN;
@@ -202,41 +213,53 @@ bool aio_prepare(AioContext *ctx)
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return have_select_revents;
 }
 
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->pfd.revents && node->io_notify) {
-            return true;
+            result = true;
+            break;
         }
 
         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
 
-    return false;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node, *tmp;
+    AioHandler *node;
     bool progress = false;
+    AioHandler *tmp;
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
         if (!node->deleted &&
@@ -274,16 +297,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -321,20 +343,19 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
+    qemu_lockcnt_inc(&ctx->list_lock);
     have_select_revents = aio_prepare(ctx);
 
-    ctx->walking_handlers++;
-
     /* fill fd sets */
     count = 0;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_notify
             && aio_node_check(ctx, node->is_external)) {
             events[count++] = event_notifier_get_handle(node->e);
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
-- 
2.9.3
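
The writer side of this patch (aio_set_fd_handler and aio_set_event_notifier) takes the lock and then decides between deferred and immediate deletion based on the walker count. A minimal single-threaded sketch of that decision follows, assuming a toy context in which the lock is a plain flag; all Toy*/toy_* names are hypothetical and none of this is QEMU's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical single-threaded model; the names are not QEMU's. */
typedef struct ToyHandler {
    struct ToyHandler *next;
    int fd;
    bool deleted;
} ToyHandler;

typedef struct ToyCtx {
    ToyHandler *handlers;
    int count;     /* active walkers, as in the lockcnt counter */
    bool locked;   /* stands in for the lockcnt mutex */
} ToyCtx;

void toy_add_handler(ToyCtx *ctx, int fd)
{
    ToyHandler *h = calloc(1, sizeof(*h));
    assert(h);
    h->fd = fd;
    h->next = ctx->handlers;
    ctx->handlers = h;
}

int toy_list_length(const ToyCtx *ctx)
{
    int len = 0;
    for (const ToyHandler *h = ctx->handlers; h; h = h->next) {
        len++;
    }
    return len;
}

/* Remove the handler for fd.  Returns true if the node was freed
 * immediately, false if it was only marked deleted or not found. */
bool toy_remove_handler(ToyCtx *ctx, int fd)
{
    ToyHandler **prev, *node;
    bool freed = false;

    assert(!ctx->locked);
    ctx->locked = true;                      /* lock */

    for (prev = &ctx->handlers; (node = *prev) != NULL; prev = &node->next) {
        if (node->fd == fd && !node->deleted) {
            break;
        }
    }

    if (node) {
        if (ctx->count) {
            /* A walk is in progress: leave the node for the walker to reap. */
            node->deleted = true;
        } else {
            /* Nobody is walking: unlink and free right away. */
            *prev = node->next;
            free(node);
            freed = true;
        }
    }

    ctx->locked = false;                     /* unlock */
    return freed;
}
```

The point of the split is that a node marked deleted stays reachable, so a concurrent walker never follows a pointer into freed memory; in the model, only a later walk with no other walkers would reclaim it.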


* [Qemu-devel] [PATCH 09/10] aio: document locking
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (7 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 08/10] aio-win32: " Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
  2017-01-12 17:30 ` [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 no-reply
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/multiple-iothreads.txt | 13 +++++--------
 include/block/aio.h         | 32 ++++++++++++++++----------------
 2 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 0e7cdb2..e4d340b 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -84,9 +84,8 @@ How to synchronize with an IOThread
 AioContext is not thread-safe so some rules must be followed when using file
 descriptors, event notifiers, timers, or BHs across threads:
 
-1. AioContext functions can be called safely from file descriptor, event
-notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
-necessary.
+1. AioContext functions can always be called safely.  They handle their
+own locking internally.
 
 2. Other threads wishing to access the AioContext must use
 aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
@@ -94,16 +93,14 @@ context is acquired no other thread can access it or run event loop iterations
 in this AioContext.
 
 aio_context_acquire()/aio_context_release() calls may be nested.  This
-means you can call them if you're not sure whether #1 applies.
+means you can call them if you're not sure whether #2 applies.
 
 There is currently no lock ordering rule if a thread needs to acquire multiple
 AioContexts simultaneously.  Therefore, it is only safe for code holding the
 QEMU global mutex to acquire other AioContexts.
 
-Side note: the best way to schedule a function call across threads is to create
-a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
-acquire/release or locking is needed for the qemu_bh_schedule() call.  But be
-sure to acquire the AioContext for aio_bh_new() if necessary.
+Side note: the best way to schedule a function call across threads is to call
+aio_bh_schedule_oneshot().  No acquire/release or locking is needed.
 
 AioContext and the block layer
 ------------------------------
diff --git a/include/block/aio.h b/include/block/aio.h
index be3adfe..7df271d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -53,18 +53,12 @@ struct LinuxAioState;
 struct AioContext {
     GSource source;
 
-    /* Protects all fields from multi-threaded access */
+    /* Used by AioContext users to protect from multi-threaded access.  */
     QemuRecMutex lock;
 
-    /* The list of registered AIO handlers */
+    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
-    /* This is a simple lock used to protect the aio_handlers list.
-     * Specifically, it's used to ensure that no callbacks are removed while
-     * we're walking and dispatching callbacks.
-     */
-    int walking_handlers;
-
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -90,9 +84,9 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* A lock to protect between bh's adders and deleter, and to ensure
-     * that no callbacks are removed while we're walking and dispatching
-     * them.
+    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
+     * and to ensure that no callbacks are removed while we're walking and
+     * dispatching them.
      */
     QemuLockCnt list_lock;
 
@@ -114,7 +108,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Thread pool for performing work and receiving completion callbacks */
+    /* Thread pool for performing work and receiving completion callbacks.
+     * Has its own locking.
+     */
     struct ThreadPool *thread_pool;
 
 #ifdef CONFIG_LINUX_AIO
@@ -124,7 +120,9 @@ struct AioContext {
     struct LinuxAioState *linux_aio;
 #endif
 
-    /* TimerLists for calling timers - one per clock type */
+    /* TimerLists for calling timers - one per clock type.  Has its own
+     * locking.
+     */
     QEMUTimerListGroup tlg;
 
     int external_disable_cnt;
@@ -178,9 +176,11 @@ void aio_context_unref(AioContext *ctx);
  * automatically takes care of calling aio_context_acquire and
  * aio_context_release.
  *
- * Access to timers and BHs from a thread that has not acquired AioContext
- * is possible.  Access to callbacks for now must be done while the AioContext
- * is owned by the thread (FIXME).
+ * Note that this is separate from bdrv_drained_begin/bdrv_drained_end.  A
+ * thread still has to call those to avoid being interrupted by the guest.
+ *
+ * Bottom halves, timers and callbacks can be created or removed without
+ * acquiring the AioContext.
  */
 void aio_context_acquire(AioContext *ctx);
 
-- 
2.9.3


* [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (8 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 09/10] aio: document locking Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  2017-01-12 17:30 ` [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 no-reply
  10 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

Avoid entering the slow path of qemu_lockcnt_dec_and_lock if
no bottom half has to be deleted.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/async.c b/async.c
index 2305e11..0d218ab 100644
--- a/async.c
+++ b/async.c
@@ -92,6 +92,7 @@ int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
+    bool deleted = false;
 
     qemu_lockcnt_inc(&ctx->list_lock);
 
@@ -112,9 +113,17 @@ int aio_bh_poll(AioContext *ctx)
             bh->idle = 0;
             aio_bh_call(bh);
         }
+        if (bh->deleted) {
+            deleted = true;
+        }
     }
 
     /* remove deleted bhs */
+    if (!deleted) {
+        qemu_lockcnt_dec(&ctx->list_lock);
+        return ret;
+    }
+
     if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
@@ -128,7 +137,6 @@ int aio_bh_poll(AioContext *ctx)
         }
         qemu_lockcnt_unlock(&ctx->list_lock);
     }
-
     return ret;
 }
 
-- 
2.9.3
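
The fast path added here can be illustrated with a small single-threaded model. This is a sketch under stated assumptions rather than the code in async.c: the Toy*/toy_* names and the slow_paths counter are hypothetical, "running" a bottom half just clears its scheduled flag, and the lock is a plain flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical single-threaded model; the names are not QEMU's. */
typedef struct ToyBH {
    struct ToyBH *next;
    bool scheduled;
    bool deleted;
} ToyBH;

typedef struct ToyCtx {
    ToyBH *first_bh;
    int count;        /* active walkers, as in the lockcnt counter */
    bool locked;      /* stands in for the lockcnt mutex */
    int slow_paths;   /* how often the dec-and-lock path was entered */
} ToyCtx;

ToyBH *toy_bh_new(ToyCtx *ctx)
{
    ToyBH *bh = calloc(1, sizeof(*bh));
    assert(bh);
    bh->next = ctx->first_bh;
    ctx->first_bh = bh;
    return bh;
}

/* Drop one reference; take the lock only if that was the last one. */
bool toy_dec_and_lock(ToyCtx *ctx)
{
    assert(ctx->count > 0);
    if (--ctx->count > 0) {
        return false;
    }
    ctx->locked = true;
    return true;
}

/* Run scheduled bottom halves; returns how many were run. */
int toy_bh_poll(ToyCtx *ctx)
{
    ToyBH *bh, **bhp;
    bool deleted = false;
    int ran = 0;

    ctx->count++;
    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled && !bh->deleted) {
            bh->scheduled = false;   /* "run" the bottom half */
            ran++;
        }
        if (bh->deleted) {
            deleted = true;          /* remember that reaping is needed */
        }
    }

    if (!deleted) {
        ctx->count--;                /* fast path: plain decrement, no lock */
        return ran;
    }

    ctx->slow_paths++;
    if (toy_dec_and_lock(ctx)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted) {
                *bhp = bh->next;
                free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        ctx->locked = false;         /* unlock */
    }
    return ran;
}
```

With no deleted bottom halves, toy_bh_poll never touches the lock; the dec-and-lock path is entered only once a deleted entry has been seen during the walk.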


* Re: [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (9 preceding siblings ...)
  2017-01-12 16:55 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
@ 2017-01-12 17:30 ` no-reply
  10 siblings, 0 replies; 15+ messages in thread
From: no-reply @ 2017-01-12 17:30 UTC (permalink / raw)
  To: pbonzini; +Cc: famz, qemu-devel

Hi,

Your series seems to have some coding style problems. See output below for
more information:

Message-id: 20170112165531.11882-1-pbonzini@redhat.com
Subject: [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1
Type: series

=== TEST SCRIPT BEGIN ===
#!/bin/bash

BASE=base
n=1
total=$(git log --oneline $BASE.. | wc -l)
failed=0

# Useful git options
git config --local diff.renamelimit 0
git config --local diff.renames True

commits="$(git log --format=%H --reverse $BASE..)"
for c in $commits; do
    echo "Checking PATCH $n/$total: $(git log -n 1 --format=%s $c)..."
    if ! git show $c --format=email | ./scripts/checkpatch.pl --mailback -; then
        failed=1
        echo
    fi
    n=$((n+1))
done

exit $failed
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
From https://github.com/patchew-project/qemu
 * [new tag]         patchew/20170112165531.11882-1-pbonzini@redhat.com -> patchew/20170112165531.11882-1-pbonzini@redhat.com
Switched to a new branch 'test'
c3b6ff3 async: optimize aio_bh_poll
10daea4 aio: document locking
c87dd80 aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
ebb7fc8 aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
a57c5b9 aio: tweak walking in dispatch phase
ec80912 aio-posix: split aio_dispatch_handlers out of aio_dispatch
3477fee qemu-thread: optimize QemuLockCnt with futexes on Linux
5e9110c aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
8babf64 qemu-thread: introduce QemuLockCnt
26a92ef aio: rename bh_lock to list_lock

=== OUTPUT BEGIN ===
Checking PATCH 1/10: aio: rename bh_lock to list_lock...
Checking PATCH 2/10: qemu-thread: introduce QemuLockCnt...
Checking PATCH 3/10: aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh...
Checking PATCH 4/10: qemu-thread: optimize QemuLockCnt with futexes on Linux...
ERROR: code indent should never use tabs
#108: FILE: util/lockcnt.c:24:
+#define QEMU_LOCKCNT_STATE_FREE    0^I/* free, uncontended */$

ERROR: code indent should never use tabs
#109: FILE: util/lockcnt.c:25:
+#define QEMU_LOCKCNT_STATE_LOCKED  1^I/* locked, uncontended */$

WARNING: line over 80 characters
#165: FILE: util/lockcnt.c:81:
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;

WARNING: line over 80 characters
#203: FILE: util/lockcnt.c:119:
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);

WARNING: line over 80 characters
#209: FILE: util/lockcnt.c:125:
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,

WARNING: line over 80 characters
#245: FILE: util/lockcnt.c:161:
+            val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);

WARNING: line over 80 characters
#253: FILE: util/lockcnt.c:169:
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {

WARNING: line over 80 characters
#258: FILE: util/lockcnt.c:174:
+                /* At this point we do not know if there are more waiters.  Assume

WARNING: line over 80 characters
#294: FILE: util/lockcnt.c:210:
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {

total: 2 errors, 7 warnings, 447 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 5/10: aio-posix: split aio_dispatch_handlers out of aio_dispatch...
Checking PATCH 6/10: aio: tweak walking in dispatch phase...
Checking PATCH 7/10: aio-posix: remove walking_handlers, protecting AioHandler list with list_lock...
Checking PATCH 8/10: aio-win32: remove walking_handlers, protecting AioHandler list with list_lock...
Checking PATCH 9/10: aio: document locking...
Checking PATCH 10/10: async: optimize aio_bh_poll...
=== OUTPUT END ===

Test command exited with code: 1


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@freelists.org


* [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase
  2017-01-12 18:07 [Qemu-devel] [PATCH v5 " Paolo Bonzini
@ 2017-01-12 18:07 ` Paolo Bonzini
  0 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-12 18:07 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

In preparation for the following patch, walk the handler list with
QLIST_FOREACH_SAFE and increment walking_handlers once around the whole
loop instead of once per node.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 26 ++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 25198d9..f83b7af 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -369,19 +369,17 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -405,17 +403,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index d19dc42..1ad459d 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -227,20 +227,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -275,17 +273,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.9.3


* [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase
  2017-01-04 13:26 [Qemu-devel] [PATCH v3 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2017-01-04 13:26 ` Paolo Bonzini
  0 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2017-01-04 13:26 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

In preparation for the following patch, walk the handler list with
QLIST_FOREACH_SAFE and increment walking_handlers once around the whole
loop instead of once per node.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 26 ++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 25198d9..f83b7af 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -369,19 +369,17 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -405,17 +403,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index d19dc42..1ad459d 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -227,20 +227,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -275,17 +273,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.9.3


* [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase
  2016-12-21 14:03 [Qemu-devel] [PATCH v2 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2016-12-21 14:03 ` Paolo Bonzini
  0 siblings, 0 replies; 15+ messages in thread
From: Paolo Bonzini @ 2016-12-21 14:03 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

In preparation for the following patch, walk the handler list with
QLIST_FOREACH_SAFE and increment walking_handlers once around the whole
loop instead of once per node.

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 26 ++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 25198d9..f83b7af 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -369,19 +369,17 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -405,17 +403,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index d19dc42..1ad459d 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -227,20 +227,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -275,17 +273,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.9.3
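
For readers following the diff above, the walking pattern it moves to can be
sketched outside QEMU. This is a minimal illustration only, not the code from
the patch: Handler, Ctx, handler_add and dispatch are hypothetical stand-ins
for AioHandler, AioContext and aio_dispatch_handlers, and the freed counter
exists purely so the effect can be observed. The walker count is raised once
around the whole loop, the next pointer is saved before each callback runs
(as QLIST_FOREACH_SAFE does), and a node marked deleted is unlinked only when
no other walker is active.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for AioHandler; not QEMU's type. */
typedef struct Handler Handler;
struct Handler {
    Handler *next;
    Handler **pprev;            /* address of the pointer that points at us */
    bool deleted;               /* marked for removal, not yet unlinked */
    void (*cb)(Handler *self);  /* callback; may mark handlers as deleted */
};

/* Hypothetical stand-in for AioContext. */
typedef struct {
    Handler *head;
    int walking;                /* active walkers, like walking_handlers */
    int freed;                  /* illustration only: nodes actually freed */
} Ctx;

/* Insert a new handler at the head of the list. */
static Handler *handler_add(Ctx *ctx, void (*cb)(Handler *))
{
    Handler *h = calloc(1, sizeof(*h));
    if (!h) {
        abort();
    }
    h->cb = cb;
    h->next = ctx->head;
    h->pprev = &ctx->head;
    if (ctx->head) {
        ctx->head->pprev = &h->next;
    }
    ctx->head = h;
    return h;
}

static void noop_cb(Handler *self) { (void)self; }
static void delete_self_cb(Handler *self) { self->deleted = true; }

/* Walk the list once; free deleted nodes only if we are the sole walker. */
static bool dispatch(Ctx *ctx)
{
    Handler *node, *next;
    bool progress = false;

    ctx->walking++;
    for (node = ctx->head; node; node = next) {
        next = node->next;      /* saved before the callback can delete node */

        if (!node->deleted && node->cb) {
            node->cb(node);
            progress = true;
        }

        if (node->deleted) {
            ctx->walking--;     /* briefly stop counting ourselves */
            if (!ctx->walking) {
                *node->pprev = node->next;
                if (node->next) {
                    node->next->pprev = node->pprev;
                }
                free(node);
                ctx->freed++;
            }
            ctx->walking++;
        }
    }
    ctx->walking--;
    assert(ctx->walking >= 0);
    return progress;
}
```

In this sketch, a node that is marked deleted while another walker is active
stays on the list and is reclaimed by a later walk that finds itself alone.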

Thread overview: 15+ messages
2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 05/10] aio-posix: split aio_dispatch_handlers out of aio_dispatch Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 07/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 08/10] aio-win32: " Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 09/10] aio: document locking Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
2017-01-12 17:30 ` [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 no-reply
  -- strict thread matches above, loose matches on Subject: below --
2017-01-12 18:07 [Qemu-devel] [PATCH v5 " Paolo Bonzini
2017-01-12 18:07 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini
2017-01-04 13:26 [Qemu-devel] [PATCH v3 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2017-01-04 13:26 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini
2016-12-21 14:03 [Qemu-devel] [PATCH v2 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2016-12-21 14:03 ` [Qemu-devel] [PATCH 06/10] aio: tweak walking in dispatch phase Paolo Bonzini