All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1
@ 2016-11-29 11:46 Paolo Bonzini
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
                   ` (9 more replies)
  0 siblings, 10 replies; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

This is the first step of pushing down the AioContext lock.  Bottom halves
are already protected by their own lock, use it also for walking_bh
and for the handlers list (including walking_handlers).  The (lock,
walking_foo) pair is wrapped into the QemuLockCnt primitive.

Paolo

Paolo Bonzini (10):
  aio: rename bh_lock to list_lock
  qemu-thread: introduce QemuLockCnt
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  qemu-thread: optimize QemuLockCnt with futexes on Linux
  aio: tweak walking in dispatch phase
  aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  aio: document locking
  aio: push aio_context_acquire/release down to dispatching
  async: optimize aio_bh_poll

 aio-posix.c                 |  81 ++++-----
 aio-win32.c                 | 107 ++++++------
 async.c                     |  47 +++---
 docs/lockcnt.txt            | 342 ++++++++++++++++++++++++++++++++++++++
 docs/multiple-iothreads.txt |   5 +-
 include/block/aio.h         |  38 ++---
 include/qemu/futex.h        |  36 ++++
 include/qemu/thread.h       |  19 +++
 util/Makefile.objs          |   1 +
 util/lockcnt.c              | 395 ++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c    |  25 +--
 util/trace-events           |  10 ++
 12 files changed, 955 insertions(+), 151 deletions(-)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 include/qemu/futex.h
 create mode 100644 util/lockcnt.c

-- 
2.9.3

^ permalink raw reply	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2016-11-29 11:46 ` Paolo Bonzini
  2016-11-30 12:53   ` Stefan Hajnoczi
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

This will be used for AioHandlers too.  There is going to be little
or no contention, so it is better to reuse the same lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 20 ++++++++++----------
 include/block/aio.h |  2 +-
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/async.c b/async.c
index b2de360..ef7043f 100644
--- a/async.c
+++ b/async.c
@@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     bh->scheduled = 1;
     bh->deleted = 1;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -120,7 +120,7 @@ int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->bh_lock);
+        qemu_mutex_lock(&ctx->list_lock);
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -131,7 +131,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->bh_lock);
+        qemu_mutex_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -270,7 +270,7 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_mutex_lock(&ctx->list_lock);
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -280,12 +280,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_mutex_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->bh_lock);
+    qemu_mutex_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -371,7 +371,7 @@ AioContext *aio_context_new(Error **errp)
     ctx->linux_aio = NULL;
 #endif
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index c7ae27c..eee3139 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -90,7 +90,7 @@ struct AioContext {
     uint32_t notify_me;
 
     /* lock to protect between bh's adders and deleter */
-    QemuMutex bh_lock;
+    QemuMutex list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2016-11-29 11:46 ` Paolo Bonzini
  2016-11-29 19:34   ` Eric Blake
                     ` (2 more replies)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
                   ` (7 subsequent siblings)
  9 siblings, 3 replies; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

A QemuLockCnt comprises a counter and a mutex, with primitives
to increment and decrement the counter, and to take and release the
mutex.  It can be used to do lock-free visits to a data structure
whenever mutexes would be too heavy-weight and the critical section
is too long for RCU.

This could be implemented simply by protecting the counter with the
mutex, but QemuLockCnt is harder to misuse and more efficient.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt      | 343 ++++++++++++++++++++++++++++++++++++++++++++++++++
 include/qemu/thread.h |  17 +++
 util/Makefile.objs    |   1 +
 util/lockcnt.c        | 113 +++++++++++++++++
 4 files changed, 474 insertions(+)
 create mode 100644 docs/lockcnt.txt
 create mode 100644 util/lockcnt.c

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
new file mode 100644
index 0000000..fc5d240
--- /dev/null
+++ b/docs/lockcnt.txt
@@ -0,0 +1,343 @@
+DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
+===================================================
+
+QEMU often uses reference counts to track data structures that are being
+accessed and should not be freed.  For example, a loop that invokes
+callbacks like this is not safe:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
+by stashing away its "next" pointer.  However, ioh->fd_write could
+actually delete the next node from the list.  The simplest way to
+avoid this is to mark the node as deleted, and remove it from the
+list in the above loop:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+
+If however this loop must also be reentrant, i.e. it is possible that
+ioh->fd_write invokes the loop again, some kind of counting is needed:
+
+    walking_handlers++;
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (walking_handlers == 1) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    walking_handlers--;
+
+One may think of using the RCU primitives, rcu_read_lock() and
+rcu_read_unlock(); effectively, the RCU nesting count would take
+the place of the walking_handlers global variable.  Indeed,
+reference counting and RCU have similar purposes, but their usage in
+general is complementary:
+
+- reference counting is fine-grained and limited to a single data
+  structure; RCU delays reclamation of *all* RCU-protected data
+  structures;
+
+- reference counting works even in the presence of code that keeps
+  a reference for a long time; RCU critical sections in principle
+  should be kept short;
+
+- reference counting is often applied to code that is not thread-safe
+  but is reentrant; in fact, usage of reference counting in QEMU predates
+  the introduction of threads by many years.  RCU is generally used to
+  protect readers from other threads freeing memory after concurrent
+  modifications to a data structure.
+
+- reclaiming data can be done by a separate thread in the case of RCU;
+  this can improve performance, but also delay reclamation undesirably.
+  With reference counting, reclamation is deterministic.
+
+This file documents QemuLockCnt, an abstraction for using reference
+counting in code that has to be both thread-safe and reentrant.
+
+
+QemuLockCnt concepts
+--------------------
+
+A QemuLockCnt comprises both a counter and a mutex; it has primitives
+to increment and decrement the counter, and to take and release the
+mutex.  The counter notes how many visits to the data structures are
+taking place (the visits could be from different threads, or there could
+be multiple reentrant visits from the same thread).  The basic rules
+governing the counter/mutex pair then are the following:
+
+- Data protected by the QemuLockCnt must not be freed unless the
+  counter is zero and the mutex is taken.
+
+- A new visit cannot be started while the counter is zero and the
+  mutex is taken.
+
+Most of the time, the mutex protects all writes to the data structure,
+not just frees, though there could be cases where this is not necessary.
+
+Reads, instead, can be done without taking the mutex, as long as the
+readers and writers use the same macros that are used for RCU, for
+example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc.  This is
+because the reads are done outside a lock and a set or QLIST_INSERT_HEAD
+can happen concurrently with the read.  The RCU API ensures that the
+processor and the compiler see all required memory barriers.
+
+This could be implemented simply by protecting the counter with the
+mutex, for example:
+
+    // (1)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    walking_handlers++;
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+    ...
+
+    // (2)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    if (--walking_handlers == 0) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+    }
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+Here, no frees can happen in the code represented by the ellipsis.
+If another thread is executing critical section (2), that part of
+the code cannot be entered, because the thread will not be able
+to increment the walking_handlers variable.  And of course
+during the visit any other thread will see a nonzero value for
+walking_handlers, as in the single-threaded code.
+
+Note that it is possible for multiple concurrent accesses to delay
+the cleanup arbitrarily; in other words, for the walking_handlers
+counter to never become zero.  For this reason, this technique is
+more easily applicable if concurrent access to the structure is rare.
+
+However, critical sections are easy to forget since you have to do
+them for each modification of the counter.  QemuLockCnt ensures that
+all modifications of the counter take the lock appropriately, and it
+can also be more efficient in two ways:
+
+- it avoids taking the lock for many operations (for example
+  incrementing the counter while it is non-zero);
+
+- on some platforms, one could implement QemuLockCnt to hold the
+  lock and the mutex in a single word, making it no more expensive
+  than simply managing a counter using atomic operations (see
+  docs/atomics.txt).  This is not implemented yet, but can be
+  very helpful if concurrent access to the data structure is
+  expected to be rare.
+
+
+Using the same mutex for frees and writes can still incur some small
+inefficiencies; for example, a visit can never start if the counter is
+zero and the mutex is taken---even if the mutex is taken by a write,
+which in principle need not block a visit of the data structure.
+However, these are usually not a problem if any of the following
+assumptions are valid:
+
+- concurrent access is possible but rare
+
+- writes are rare
+
+- writes are frequent, but this kind of write (e.g. appending to a
+  list) has a very small critical section.
+
+For example, QEMU uses QemuLockCnt to manage an AioContext's list of
+bottom halves and file descriptor handlers.  Modifications to the list
+of file descriptor handlers are rare.  Creation of a new bottom half is
+frequent and can happen on a fast path; however: 1) it is almost never
+concurrent with a visit to the list of bottom halves; 2) it only has
+three instructions in the critical path, two assignments and a smp_wmb().
+
+
+QemuLockCnt API
+---------------
+
+    void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+
+        Initialize lockcnt's counter to zero and prepare its mutex
+        for usage.
+
+    void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+
+        Destroy lockcnt's mutex.
+
+    void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+
+        If the lockcnt's count is zero, wait for critical sections
+        to finish and increment lockcnt's count to 1.  If the count
+        is not zero, just increment it.
+
+        Because this function can wait on the mutex, it must not be
+        called while the lockcnt's mutex is held by the current thread.
+        For the same reason, qemu_lockcnt_inc can also contribute to
+        AB-BA deadlocks.  This is a sample deadlock scenario:
+
+              thread 1                      thread 2
+              -------------------------------------------------------
+              qemu_lockcnt_lock(&lc1);
+                                            qemu_lockcnt_lock(&lc2);
+              qemu_lockcnt_inc(&lc2);
+                                            qemu_lockcnt_inc(&lc1);
+
+    void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+
+        Decrement lockcnt's count.
+
+    bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+
+        Decrement the count.  If the new count is zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+
+        If the count is 1, decrement the count to zero, lock
+        the mutex and return true.  Otherwise, return false.
+
+    void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+
+        Lock the lockcnt's mutex.  Remember that concurrent visits
+        are not blocked unless the count is also zero.  You can
+        use qemu_lockcnt_count to check for this inside a critical
+        section.
+
+    void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+
+        Release the lockcnt's mutex.
+
+    void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+
+        This is the same as
+
+            qemu_lockcnt_unlock(lockcnt);
+            qemu_lockcnt_inc(lockcnt);
+
+        but more efficient.
+
+    unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
+        Return the lockcnt's count.  The count can change at
+        any time; still, while the lockcnt is locked, one can usefully
+        check whether the count is non-zero.
+
+
+QemuLockCnt usage
+-----------------
+
+This section explains the typical usage patterns for QemuLockCnt functions.
+
+Setting a variable to a non-NULL value can be done between
+qemu_lockcnt_lock and qemu_lockcnt_unlock:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!xyz) {
+        new_xyz = g_new(XYZ, 1);
+        ...
+        atomic_rcu_set(&xyz, new_xyz);
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+Accessing the value can be done between qemu_lockcnt_inc and
+qemu_lockcnt_dec:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    qemu_lockcnt_dec(&xyz_lockcnt);
+
+Freeing the object can similarly use qemu_lockcnt_lock and
+qemu_lockcnt_unlock, but you also need to ensure that the count
+is zero (i.e. there is no concurrent visit).  Because qemu_lockcnt_inc
+takes the QemuLockCnt's lock whenever the count is zero, the count cannot
+become non-zero while the object is being freed.  Freeing looks like this:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+If an object has to be freed right after a visit, you can combine
+the decrement, the locking and the check on count as follows:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+        qemu_lockcnt_unlock(&xyz_lockcnt);
+    }
+
+QemuLockCnt can also be used to access a list as follows:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+        qemu_lockcnt_unlock(&io_handlers_lockcnt);
+    }
+
+Again, the RCU primitives are used because new items can be added to the
+list during the walk.  QLIST_FOREACH_RCU ensures that the processor and
+the compiler see the appropriate memory barriers.
+
+An alternative pattern uses qemu_lockcnt_dec_if_lock:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+                qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    qemu_lockcnt_dec(&io_handlers_lockcnt);
+
+Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock,
+because there is no special task to do if the count goes from 1 to 0.
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e8e665f..ce18b17 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuLockCnt QemuLockCnt;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -98,4 +99,20 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
     __sync_lock_release(&spin->value);
 }
 
+struct QemuLockCnt {
+    QemuMutex mutex;
+    unsigned count;
+};
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
 #endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index ad0f9c7..c1f247d 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,5 +1,6 @@
 util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
 util-obj-y += bufferiszero.o
+util-obj-y += lockcnt.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
diff --git a/util/lockcnt.c b/util/lockcnt.c
new file mode 100644
index 0000000..78ed1e4
--- /dev/null
+++ b/util/lockcnt.c
@@ -0,0 +1,113 @@
+/*
+ * QemuLockCnt implementation
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *   Paolo Bonzini <pbonzini@redhat.com>
+ */
+#include "qemu/osdep.h"
+#include "qemu/thread.h"
+#include "qemu/atomic.h"
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_init(&lockcnt->mutex);
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_destroy(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int old;
+    for (;;) {
+        old = atomic_read(&lockcnt->count);
+        if (old == 0) {
+            qemu_lockcnt_lock(lockcnt);
+            qemu_lockcnt_inc_and_unlock(lockcnt);
+            return;
+        } else {
+            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
+                return;
+            }
+        }
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_dec(&lockcnt->count);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    while (val > 1) {
+        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
+        if (old != val) {
+            val = old;
+            continue;
+        }
+
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_unlock(lockcnt);
+    return false;
+}
+
+/* Decrement a counter and return locked if it is decremented to zero.
+ * Otherwise do nothing.
+ *
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_mb_read(&lockcnt->count);
+    if (val > 1) {
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_inc_and_unlock(lockcnt);
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_lock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    atomic_inc(&lockcnt->count);
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return lockcnt->count;
+}
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:06   ` Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

This will make it possible to walk the list of bottom halves without
holding the AioContext lock---and in turn to call bottom half
handlers without holding the lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 35 ++++++++++++++++-------------------
 include/block/aio.h | 12 +++++-------
 2 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/async.c b/async.c
index ef7043f..f606785 100644
--- a/async.c
+++ b/async.c
@@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     bh->scheduled = 1;
     bh->deleted = 1;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -93,13 +93,11 @@ int aio_bh_poll(AioContext *ctx)
     QEMUBH *bh, **bhp, *next;
     int ret;
 
-    ctx->walking_bh++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
-        /* Make sure that fetching bh happens before accessing its members */
-        smp_read_barrier_depends();
-        next = bh->next;
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+        next = atomic_rcu_read(&bh->next);
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -116,11 +114,8 @@ int aio_bh_poll(AioContext *ctx)
         }
     }
 
-    ctx->walking_bh--;
-
     /* remove deleted bhs */
-    if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->list_lock);
+    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -131,7 +126,7 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->list_lock);
+        qemu_lockcnt_unlock(&ctx->list_lock);
     }
 
     return ret;
@@ -187,7 +182,8 @@ aio_compute_timeout(AioContext *ctx)
     int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
+         bh = atomic_rcu_read(&bh->next)) {
         if (bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -270,7 +266,8 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
-    qemu_mutex_lock(&ctx->list_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
+    assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -280,12 +277,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->list_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->list_lock);
+    qemu_lockcnt_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -363,6 +360,7 @@ AioContext *aio_context_new(Error **errp)
         goto fail;
     }
     g_source_set_can_recurse(&ctx->source, true);
+    qemu_lockcnt_init(&ctx->list_lock);
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -371,7 +369,6 @@ AioContext *aio_context_new(Error **errp)
     ctx->linux_aio = NULL;
 #endif
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->list_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/include/block/aio.h b/include/block/aio.h
index eee3139..b833fe8 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -89,17 +89,15 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* lock to protect between bh's adders and deleter */
-    QemuMutex list_lock;
+    /* A lock to protect between bh's adders and deleter, and to ensure
+     * that no callbacks are removed while we're walking and dispatching
+     * them.
+     */
+    QemuLockCnt list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
-    /* A simple lock used to protect the first_bh list, and ensure that
-     * no callbacks are removed while we're walking and dispatching callbacks.
-     */
-    int walking_bh;
-
     /* Used by aio_notify.
      *
      * "notified" is used to avoid expensive event_notifier_test_and_clear
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (2 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:19   ` Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase Paolo Bonzini
                   ` (5 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 282 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  25 +----
 util/trace-events        |  10 ++
 6 files changed, 335 insertions(+), 29 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index fc5d240..594764b 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the counter
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..c3d1089
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+/* Issue a FUTEX_WAKE on the word at @f, asking the kernel to wake up at
+ * most @n threads sleeping on it.  The result of the syscall is ignored.
+ */
+static inline void futex_wake(void *f, int n)
+{
+    futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+/* Issue a FUTEX_WAIT on the word at @f, with @val as the expected value.
+ *
+ * Returns once the syscall succeeds (it returns zero) or fails with
+ * EWOULDBLOCK, which per futex(2) means the word did not contain @val.
+ * The call is retried on EINTR; any other error aborts the process.
+ * Callers must re-read the word after this returns.
+ */
+static inline void futex_wait(void *f, unsigned val)
+{
+    while (futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index ce18b17..6b9cbd0 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index 78ed1e4..71e8f8f 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,288 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0
+#define QEMU_LOCKCNT_STATE_LOCKED  1
+#define QEMU_LOCKCNT_STATE_WAITING 2
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+/* Initialize @lockcnt with a counter of zero and the lock free (all bits
+ * clear, i.e. QEMU_LOCKCNT_STATE_FREE in the low two bits).
+ */
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+/* Nothing to do: in the futex-based implementation the only state is the
+ * lockcnt->count word, and there is no mutex to destroy.
+ */
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; if
+ * successful return true and set *val to the new value of
+ * lockcnt->count, i.e. new_if_free.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.  *waited is set to true before sleeping.
+ *
+ * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
+ * if calling this function a second time after it has returned
+ * false.
+ *
+ * Note that false is also returned, without sleeping, when the lock was
+ * free but the cmpxchg lost a race and the value it found still has the
+ * lock free; the caller is expected to loop and recompute new_if_free
+ * from the updated *val.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+        /* The cmpxchg failed; *val now holds the value that was found.  */
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            /* Record that there is a waiter before going to sleep; only
+             * the state bits change, the counter bits are preserved.
+             */
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            /* Either way *val is up to date: re-examine the state.  */
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            /* futex_wait may return without the lock having been released
+             * (for example on EWOULDBLOCK), hence the reload and the loop.
+             */
+            futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        /* The state bits were 3, which is not a defined state.  */
+        abort();
+    }
+    return false;
+}
+
+/* Emit a trace event, then wake at most one thread that is sleeping in
+ * futex_wait on lockcnt->count.
+ */
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    futex_wake(&lockcnt->count, 1);
+}
+
+/* Increment the counter.
+ *
+ * While the counter is nonzero it is bumped with a plain cmpxchg that
+ * preserves the lock-state bits.  A 0->1 transition instead goes through
+ * qemu_lockcnt_cmpxchg_or_wait, so it is only performed when the lock is
+ * free; if the lock is taken, this function sleeps until it can retry.
+ */
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+            /* Lost a race: val was reloaded by the cmpxchg, try again.  */
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+/* Decrement the counter by one step without looking at the lock.  The
+ * subtraction leaves the two lock-state bits untouched provided the
+ * counter is nonzero -- presumably the caller holds a reference obtained
+ * from qemu_lockcnt_inc; nothing here checks it.
+ */
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ *
+ * If the counter is decremented to a nonzero value, return false without
+ * taking the lock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            /* The count stays nonzero: plain decrement, no lock needed.
+             * If the cmpxchg loses a race, val is reloaded and the loop
+             * picks the right path again for the new value.
+             */
+            int expected = val;
+            int new = val - QEMU_LOCKCNT_COUNT_STEP;
+            val = atomic_cmpxchg(&lockcnt->count, val, new);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             *
+             * This must only run when the count is one: the value stored
+             * is just the lock state, so using it after a failed decrement
+             * of a larger count would throw away the other references.
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state,
+                                             &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.
+                 * Assume there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ *
+ * If the function returns false, the counter has not been modified and
+ * the lock has not been taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* Loop for as long as the counter is below two; val is refreshed by
+     * each call to qemu_lockcnt_cmpxchg_or_wait.
+     */
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* Take the lock, leaving the counter bits unchanged.  Sleeps (inside
+ * qemu_lockcnt_cmpxchg_or_wait) while another thread holds the lock, and
+ * keeps retrying until the cmpxchg that takes it succeeds.
+ */
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+/* Increment the counter and release the lock with a single cmpxchg: the
+ * new value is the old one plus one step, with the lock-state bits
+ * cleared.  If the old state had QEMU_LOCKCNT_STATE_WAITING set, wake one
+ * waiter afterwards.
+ */
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    /* Here val is the value that was replaced, i.e. the pre-unlock state.  */
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+/* Release the lock, leaving the counter bits unchanged: the lock-state
+ * bits are cleared with a cmpxchg loop.  If the old state had
+ * QEMU_LOCKCNT_STATE_WAITING set, wake one waiter afterwards.
+ */
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    /* Here val is the value that was replaced, i.e. the pre-unlock state.  */
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+/* Return the current value of the counter, i.e. lockcnt->count with the
+ * two lock-state bits shifted out.
+ *
+ * The word is modified concurrently by other threads using atomic
+ * operations, so read it with atomic_read() like the rest of this file
+ * does rather than with a plain load.
+ */
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return lockcnt->count;
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..a3df214 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,26 +290,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
 static inline void futex_wake(QemuEvent *ev, int n)
 {
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (3 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:38   ` Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
                   ` (4 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

Preparing for the following patch, use QLIST_FOREACH_SAFE and
modify the placement of walking_handlers increment/decrement.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 27 +++++++++++++--------------
 aio-win32.c | 26 ++++++++++++--------------
 2 files changed, 25 insertions(+), 28 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index e13b9ab..93a50ad 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -292,7 +292,7 @@ bool aio_pending(AioContext *ctx)
 
 bool aio_dispatch(AioContext *ctx)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
@@ -308,12 +308,10 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    ctx->walking_handlers++;
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -337,17 +335,18 @@ bool aio_dispatch(AioContext *ctx)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
+
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
 
diff --git a/aio-win32.c b/aio-win32.c
index c8c249e..f27b56b 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -209,20 +209,18 @@ bool aio_pending(AioContext *ctx)
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node;
+    AioHandler *node, *tmp;
     bool progress = false;
 
+    ctx->walking_handlers++;
+
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -257,17 +255,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            ctx->walking_handlers--;
+            if (!ctx->walking_handlers) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+            ctx->walking_handlers++;
         }
     }
 
+    ctx->walking_handlers--;
     return progress;
 }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (4 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:31   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 07/10] aio-win32: " Paolo Bonzini
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 51 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 30 insertions(+), 21 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 93a50ad..c64d36d 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -16,7 +16,7 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #ifdef CONFIG_EPOLL_CREATE1
 #include <sys/epoll.h>
@@ -206,6 +206,8 @@ void aio_set_fd_handler(AioContext *ctx,
     bool is_new = false;
     bool deleted = false;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
@@ -217,7 +219,7 @@ void aio_set_fd_handler(AioContext *ctx,
         g_source_remove_poll(&ctx->source, &node->pfd);
 
         /* If the lock is held, just mark the node as deleted */
-        if (ctx->walking_handlers) {
+        if (qemu_lockcnt_count(&ctx->list_lock)) {
             node->deleted = 1;
             node->pfd.revents = 0;
         } else {
@@ -233,7 +235,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
@@ -249,6 +251,7 @@ void aio_set_fd_handler(AioContext *ctx,
     }
 
     aio_epoll_update(ctx, node, is_new);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
     if (deleted) {
         g_free(node);
@@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
 bool aio_dispatch(AioContext *ctx)
@@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
-        revents = node->pfd.revents & node->pfd.events;
-        node->pfd.revents = 0;
+        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
 
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
@@ -336,16 +348,15 @@ bool aio_dispatch(AioContext *ctx)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
@@ -420,14 +431,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
-
+    qemu_lockcnt_inc(&ctx->list_lock);
     assert(npfd == 0);
 
     /* fill pollfds */
-
     if (!aio_epoll_enabled(ctx)) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             if (!node->deleted && node->pfd.events
                 && aio_node_check(ctx, node->is_external)) {
                 add_pollfd(node);
@@ -464,12 +473,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
-            nodes[i]->pfd.revents = pollfds[i].revents;
+            atomic_or(&nodes[i]->pfd.revents, pollfds[i].revents);
         }
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx)) {
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 07/10] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (5 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:34   ` Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 08/10] aio: document locking Paolo Bonzini
                   ` (2 subsequent siblings)
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-win32.c | 82 +++++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 50 insertions(+), 32 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index f27b56b..7ae2c14 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -20,6 +20,7 @@
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/rcu_queue.h"
 
 struct AioHandler {
     EventNotifier *e;
@@ -43,6 +44,7 @@ void aio_set_fd_handler(AioContext *ctx,
     /* fd is a SOCKET in our case */
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->pfd.fd == fd && !node->deleted) {
             break;
@@ -52,14 +54,14 @@ void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -72,7 +74,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
         }
 
         node->pfd.events = 0;
@@ -97,6 +99,7 @@ void aio_set_fd_handler(AioContext *ctx,
                        FD_CONNECT | FD_WRITE | FD_OOB);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -107,6 +110,7 @@ void aio_set_event_notifier(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->e == e && !node->deleted) {
             break;
@@ -118,14 +122,14 @@ void aio_set_event_notifier(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -139,7 +143,7 @@ void aio_set_event_notifier(AioContext *ctx,
             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
             node->pfd.events = G_IO_IN;
             node->is_external = is_external;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -147,6 +151,7 @@ void aio_set_event_notifier(AioContext *ctx,
         node->io_notify = io_notify;
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -157,10 +162,16 @@ bool aio_prepare(AioContext *ctx)
     bool have_select_revents = false;
     fd_set rfds, wfds;
 
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
     /* fill fd sets */
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->io_read) {
             FD_SET ((SOCKET)node->pfd.fd, &rfds);
         }
@@ -170,61 +181,71 @@ bool aio_prepare(AioContext *ctx)
     }
 
     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-            node->pfd.revents = 0;
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             if (FD_ISSET(node->pfd.fd, &rfds)) {
-                node->pfd.revents |= G_IO_IN;
+                atomic_or(&node->pfd.revents, G_IO_IN);
                 have_select_revents = true;
             }
 
             if (FD_ISSET(node->pfd.fd, &wfds)) {
-                node->pfd.revents |= G_IO_OUT;
+                atomic_or(&node->pfd.revents, G_IO_OUT);
                 have_select_revents = true;
             }
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return have_select_revents;
 }
 
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->pfd.revents && node->io_notify) {
-            return true;
+            result = true;
+            break;
         }
 
         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
 
-    return false;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
-    AioHandler *node, *tmp;
+    AioHandler *node;
     bool progress = false;
+    AioHandler *tmp;
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
-        int revents = node->pfd.revents;
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
+        int revents = atomic_xchg(&node->pfd.revents, 0);
 
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
-            node->pfd.revents = 0;
             node->io_notify(node->e);
 
             /* aio_notify() does not count as progress */
@@ -235,7 +256,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
-            node->pfd.revents = 0;
             if ((revents & G_IO_IN) && node->io_read) {
                 node->io_read(node->opaque);
                 progress = true;
@@ -256,16 +276,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -301,20 +320,19 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
+    qemu_lockcnt_inc(&ctx->list_lock);
     have_select_revents = aio_prepare(ctx);
 
-    ctx->walking_handlers++;
-
     /* fill fd sets */
     count = 0;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_notify
             && aio_node_check(ctx, node->is_external)) {
             events[count++] = event_notifier_get_handle(node->e);
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 08/10] aio: document locking
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (6 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 07/10] aio-win32: " Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:35   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/multiple-iothreads.txt |  5 ++---
 include/block/aio.h         | 32 ++++++++++++++++----------------
 2 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 0e7cdb2..a03f887 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -84,9 +84,8 @@ How to synchronize with an IOThread
 AioContext is not thread-safe so some rules must be followed when using file
 descriptors, event notifiers, timers, or BHs across threads:
 
-1. AioContext functions can be called safely from file descriptor, event
-notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
-necessary.
+1. AioContext functions can always be called safely.  They handle their
+own locking internally.
 
 2. Other threads wishing to access the AioContext must use
 aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
diff --git a/include/block/aio.h b/include/block/aio.h
index b833fe8..5c2f53b 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -52,18 +52,12 @@ struct LinuxAioState;
 struct AioContext {
     GSource source;
 
-    /* Protects all fields from multi-threaded access */
+    /* Used by AioContext users to protect from multi-threaded access.  */
     QemuRecMutex lock;
 
-    /* The list of registered AIO handlers */
+    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
-    /* This is a simple lock used to protect the aio_handlers list.
-     * Specifically, it's used to ensure that no callbacks are removed while
-     * we're walking and dispatching callbacks.
-     */
-    int walking_handlers;
-
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -89,9 +83,9 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* A lock to protect between bh's adders and deleter, and to ensure
-     * that no callbacks are removed while we're walking and dispatching
-     * them.
+    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
+     * and to ensure that no callbacks are removed while we're walking and
+     * dispatching them.
      */
     QemuLockCnt list_lock;
 
@@ -113,7 +107,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Thread pool for performing work and receiving completion callbacks */
+    /* Thread pool for performing work and receiving completion callbacks.
+     * Has its own locking.
+     */
     struct ThreadPool *thread_pool;
 
 #ifdef CONFIG_LINUX_AIO
@@ -123,7 +119,9 @@ struct AioContext {
     struct LinuxAioState *linux_aio;
 #endif
 
-    /* TimerLists for calling timers - one per clock type */
+    /* TimerLists for calling timers - one per clock type.  Has its own
+     * locking.
+     */
     QEMUTimerListGroup tlg;
 
     int external_disable_cnt;
@@ -165,9 +163,11 @@ void aio_context_unref(AioContext *ctx);
  * automatically takes care of calling aio_context_acquire and
  * aio_context_release.
  *
- * Access to timers and BHs from a thread that has not acquired AioContext
- * is possible.  Access to callbacks for now must be done while the AioContext
- * is owned by the thread (FIXME).
+ * Note that this is separate from bdrv_drained_begin/bdrv_drained_end.  A
+ * thread still has to call those to avoid being interrupted by the guest.
+ *
+ * Bottom halves, timers and callbacks can be created or removed without
+ * acquiring the AioContext.
  */
 void aio_context_acquire(AioContext *ctx);
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (7 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 08/10] aio: document locking Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:37   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

The AioContext data structures are now protected by list_lock and/or
they are walked with FOREACH_RCU primitives.  There is no need anymore
to acquire the AioContext for the entire duration of aio_dispatch.
Instead, just acquire it before invoking each callback and release it afterwards.
The next step is then to push it further down.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 15 ++++++---------
 aio-win32.c | 15 +++++++--------
 async.c     |  2 ++
 3 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index c64d36d..32e493f 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -332,7 +332,9 @@ bool aio_dispatch(AioContext *ctx)
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_read) {
+            aio_context_acquire(ctx);
             node->io_read(node->opaque);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->opaque != &ctx->notifier) {
@@ -343,7 +345,9 @@ bool aio_dispatch(AioContext *ctx)
             (revents & (G_IO_OUT | G_IO_ERR)) &&
             aio_node_check(ctx, node->is_external) &&
             node->io_write) {
+            aio_context_acquire(ctx);
             node->io_write(node->opaque);
+            aio_context_release(ctx);
             progress = true;
         }
 
@@ -359,7 +363,9 @@ bool aio_dispatch(AioContext *ctx)
     qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
+    aio_context_release(ctx);
 
     return progress;
 }
@@ -417,7 +423,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     bool progress;
     int64_t timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -447,9 +452,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     timeout = blocking ? aio_compute_timeout(ctx) : 0;
 
     /* wait until next event */
-    if (timeout) {
-        aio_context_release(ctx);
-    }
     if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
         AioHandler epoll_handler;
 
@@ -464,9 +466,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     if (blocking) {
         atomic_sub(&ctx->notify_me, 2);
     }
-    if (timeout) {
-        aio_context_acquire(ctx);
-    }
 
     aio_notify_accept(ctx);
 
@@ -485,8 +484,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    aio_context_release(ctx);
-
     return progress;
 }
 
diff --git a/aio-win32.c b/aio-win32.c
index 7ae2c14..374d28e 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -246,7 +246,9 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
+            aio_context_acquire(ctx);
             node->io_notify(node->e);
+            aio_context_release(ctx);
 
             /* aio_notify() does not count as progress */
             if (node->e != &ctx->notifier) {
@@ -257,11 +259,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
         if (!node->deleted &&
             (node->io_read || node->io_write)) {
             if ((revents & G_IO_IN) && node->io_read) {
+                aio_context_acquire(ctx);
                 node->io_read(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
             if ((revents & G_IO_OUT) && node->io_write) {
+                aio_context_acquire(ctx);
                 node->io_write(node->opaque);
+                aio_context_release(ctx);
                 progress = true;
             }
 
@@ -306,7 +312,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int count;
     int timeout;
 
-    aio_context_acquire(ctx);
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -348,17 +353,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
         timeout = blocking && !have_select_revents
             ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
-        if (timeout) {
-            aio_context_release(ctx);
-        }
         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
         if (blocking) {
             assert(first);
             atomic_sub(&ctx->notify_me, 2);
         }
-        if (timeout) {
-            aio_context_acquire(ctx);
-        }
 
         if (first) {
             aio_notify_accept(ctx);
@@ -381,8 +380,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress |= aio_dispatch_handlers(ctx, event);
     } while (count > 0);
 
+    aio_context_acquire(ctx);
     progress |= timerlistgroup_run_timers(&ctx->tlg);
-
     aio_context_release(ctx);
     return progress;
 }
diff --git a/async.c b/async.c
index f606785..95927fc 100644
--- a/async.c
+++ b/async.c
@@ -110,7 +110,9 @@ int aio_bh_poll(AioContext *ctx)
                 ret = 1;
             }
             bh->idle = 0;
+            aio_context_acquire(ctx);
             aio_bh_call(bh);
+            aio_context_release(ctx);
         }
     }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll
  2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
                   ` (8 preceding siblings ...)
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
@ 2016-11-29 11:47 ` Paolo Bonzini
  2016-11-30 13:38   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
  9 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-29 11:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

Avoid entering the slow path of qemu_lockcnt_dec_and_lock if
no bottom half has to be deleted.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/async.c b/async.c
index 95927fc..6f8184b 100644
--- a/async.c
+++ b/async.c
@@ -92,6 +92,7 @@ int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
+    bool deleted = false;
 
     qemu_lockcnt_inc(&ctx->list_lock);
 
@@ -114,9 +115,17 @@ int aio_bh_poll(AioContext *ctx)
             aio_bh_call(bh);
             aio_context_release(ctx);
         }
+        if (bh->deleted) {
+            deleted = true;
+        }
     }
 
     /* remove deleted bhs */
+    if (!deleted) {
+        qemu_lockcnt_dec(&ctx->list_lock);
+        return ret;
+    }
+
     if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
@@ -130,7 +139,6 @@ int aio_bh_poll(AioContext *ctx)
         }
         qemu_lockcnt_unlock(&ctx->list_lock);
     }
-
     return ret;
 }
 
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
@ 2016-11-29 19:34   ` Eric Blake
  2016-11-30 13:05   ` Stefan Hajnoczi
  2017-02-02 19:06   ` Emilio G. Cota
  2 siblings, 0 replies; 34+ messages in thread
From: Eric Blake @ 2016-11-29 19:34 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: qemu-block

[-- Attachment #1: Type: text/plain, Size: 1189 bytes --]

On 11/29/2016 05:46 AM, Paolo Bonzini wrote:
> A QemuLockCnt comprises a counter and a mutex, with primitives
> to increment and decrement the counter, and to take and release the
> mutex.  It can be used to do lock-free visits to a data structure
> whenever mutexes would be too heavy-weight and the critical section
> is too long for RCU.
> 
> This could be implemented simply by protecting the counter with the
> mutex, but QemuLockCnt is harder to misuse and more efficient.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---


> +    int qemu_lockcnt_count(QemuLockCnt *lockcnt);
> +
> +        Return the lockcnt's count.  The count can change at any time
> +        any time; still, while the lockcnt is locked, one can usefully

duplicate 'any time'


> +++ b/util/lockcnt.c
> @@ -0,0 +1,113 @@
> +/*
> + * QemuLockCnt implementation
> + *
> + * Copyright Red Hat, Inc. 2015

You've been sitting on this a while :)  Want to add 2016?

The documentation is a huge help to understanding the code; overall it
looks pretty clean.

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
@ 2016-11-30 12:53   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 12:53 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 512 bytes --]

On Tue, Nov 29, 2016 at 12:46:58PM +0100, Paolo Bonzini wrote:
> diff --git a/include/block/aio.h b/include/block/aio.h
> index c7ae27c..eee3139 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -90,7 +90,7 @@ struct AioContext {
>      uint32_t notify_me;
>  
>      /* lock to protect between bh's adders and deleter */
> -    QemuMutex bh_lock;
> +    QemuMutex list_lock;

Please update the comment.  Looks good otherwise:

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
  2016-11-29 19:34   ` Eric Blake
@ 2016-11-30 13:05   ` Stefan Hajnoczi
  2017-02-02 19:06   ` Emilio G. Cota
  2 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:05 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 1337 bytes --]

On Tue, Nov 29, 2016 at 12:46:59PM +0100, Paolo Bonzini wrote:
> A QemuLockCnt comprises a counter and a mutex, with primitives
> to increment and decrement the counter, and to take and release the
> mutex.  It can be used to do lock-free visits to a data structure
> whenever mutexes would be too heavy-weight and the critical section
> is too long for RCU.
> 
> This could be implemented simply by protecting the counter with the
> mutex, but QemuLockCnt is harder to misuse and more efficient.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/lockcnt.txt      | 343 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  include/qemu/thread.h |  17 +++
>  util/Makefile.objs    |   1 +
>  util/lockcnt.c        | 113 +++++++++++++++++
>  4 files changed, 474 insertions(+)
>  create mode 100644 docs/lockcnt.txt
>  create mode 100644 util/lockcnt.c
> 
> diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
> new file mode 100644
> index 0000000..fc5d240
> --- /dev/null
> +++ b/docs/lockcnt.txt
> @@ -0,0 +1,343 @@
> +DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
> +===================================================

This file contains all the documentation but the header file has no doc
comments.  Could you move everything into the header file (like
include/qom/object.h)?

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
@ 2016-11-30 13:06   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:06 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 520 bytes --]

On Tue, Nov 29, 2016 at 12:47:00PM +0100, Paolo Bonzini wrote:
> This will make it possible to walk the list of bottom halves without
> holding the AioContext lock---and in turn to call bottom half
> handlers without holding the lock.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c             | 35 ++++++++++++++++-------------------
>  include/block/aio.h | 12 +++++-------
>  2 files changed, 21 insertions(+), 26 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
@ 2016-11-30 13:19   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:19 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 1203 bytes --]

On Tue, Nov 29, 2016 at 12:47:01PM +0100, Paolo Bonzini wrote:
> diff --git a/include/qemu/futex.h b/include/qemu/futex.h
> new file mode 100644
> index 0000000..c3d1089
> --- /dev/null
> +++ b/include/qemu/futex.h
> @@ -0,0 +1,36 @@
> +/*
> + * Wrappers around Linux futex syscall
> + *
> + * Copyright Red Hat, Inc. 2015
> + *
> + * Author:
> + *  Paolo Bonzini <pbonzini@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include <sys/syscall.h>
> +#include <linux/futex.h>
> +
> +#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
> +
> +static inline void futex_wake(void *f, int n)
> +{
> +    futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
> +}
> +
> +static inline void futex_wait(void *f, unsigned val)

Now that this is being promoted to an include/ API please use
qemu_futex(), qemu_futex_wake(), and qemu_futex_wait() names.  It's a
bit bold to use futex(), futex_wake(), and futex_wait().  We're relying
on the fact that no system headers will ever use those names.

I haven't reviewed this patch in detail but:
Acked-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [Qemu-block] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
@ 2016-11-30 13:31   ` Stefan Hajnoczi
  2016-11-30 13:36     ` Paolo Bonzini
  0 siblings, 1 reply; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:31 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 1930 bytes --]

On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>  bool aio_pending(AioContext *ctx)
>  {
>      AioHandler *node;
> +    bool result = false;
>  
> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> +    /*
> +     * We have to walk very carefully in case aio_set_fd_handler is
> +     * called while we're walking.
> +     */
> +    qemu_lockcnt_inc(&ctx->list_lock);
> +
> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>          int revents;
>  
>          revents = node->pfd.revents & node->pfd.events;
>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>              aio_node_check(ctx, node->is_external)) {
> -            return true;
> +            result = true;
> +            break;
>          }
>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>              aio_node_check(ctx, node->is_external)) {
> -            return true;
> +            result = true;
> +            break;
>          }
>      }
> +    qemu_lockcnt_dec(&ctx->list_lock);
>  
> -    return false;
> +    return result;
>  }
>  
>  bool aio_dispatch(AioContext *ctx)
> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>       * We have to walk very carefully in case aio_set_fd_handler is
>       * called while we're walking.
>       */
> -    ctx->walking_handlers++;
> +    qemu_lockcnt_inc(&ctx->list_lock);
>  
> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>          int revents;
>  
> -        revents = node->pfd.revents & node->pfd.events;
> -        node->pfd.revents = 0;
> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;

Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
but not in aio_pending()?

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 07/10] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 07/10] aio-win32: " Paolo Bonzini
@ 2016-11-30 13:34   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:34 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 309 bytes --]

On Tue, Nov 29, 2016 at 12:47:04PM +0100, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  aio-win32.c | 82 +++++++++++++++++++++++++++++++++++++------------------------
>  1 file changed, 50 insertions(+), 32 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [Qemu-block] [PATCH 08/10] aio: document locking
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 08/10] aio: document locking Paolo Bonzini
@ 2016-11-30 13:35   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:35 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 339 bytes --]

On Tue, Nov 29, 2016 at 12:47:05PM +0100, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/multiple-iothreads.txt |  5 ++---
>  include/block/aio.h         | 32 ++++++++++++++++----------------
>  2 files changed, 18 insertions(+), 19 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [Qemu-block] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-30 13:31   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
@ 2016-11-30 13:36     ` Paolo Bonzini
  2016-12-01 15:32       ` [Qemu-devel] " Paolo Bonzini
  0 siblings, 1 reply; 34+ messages in thread
From: Paolo Bonzini @ 2016-11-30 13:36 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 2092 bytes --]



On 30/11/2016 14:31, Stefan Hajnoczi wrote:
> On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
>> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>>  bool aio_pending(AioContext *ctx)
>>  {
>>      AioHandler *node;
>> +    bool result = false;
>>  
>> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
>> +    /*
>> +     * We have to walk very carefully in case aio_set_fd_handler is
>> +     * called while we're walking.
>> +     */
>> +    qemu_lockcnt_inc(&ctx->list_lock);
>> +
>> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>>          int revents;
>>  
>>          revents = node->pfd.revents & node->pfd.events;
>>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>>              aio_node_check(ctx, node->is_external)) {
>> -            return true;
>> +            result = true;
>> +            break;
>>          }
>>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>>              aio_node_check(ctx, node->is_external)) {
>> -            return true;
>> +            result = true;
>> +            break;
>>          }
>>      }
>> +    qemu_lockcnt_dec(&ctx->list_lock);
>>  
>> -    return false;
>> +    return result;
>>  }
>>  
>>  bool aio_dispatch(AioContext *ctx)
>> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>>       * We have to walk very carefully in case aio_set_fd_handler is
>>       * called while we're walking.
>>       */
>> -    ctx->walking_handlers++;
>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>  
>> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
>> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>>          int revents;
>>  
>> -        revents = node->pfd.revents & node->pfd.events;
>> -        node->pfd.revents = 0;
>> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
> 
> Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
> but not in aio_pending()?

It could use atomic_read there, indeed.

Paolo


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 473 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [Qemu-block] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
@ 2016-11-30 13:37   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:37 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 671 bytes --]

On Tue, Nov 29, 2016 at 12:47:06PM +0100, Paolo Bonzini wrote:
> The AioContext data structures are now protected by list_lock and/or
> they are walked with FOREACH_RCU primitives.  There is no need anymore
> to acquire the AioContext for the entire duration of aio_dispatch.
> Instead, just acquire it before and after invoking the callbacks.
> The next step is then to push it further down.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  aio-posix.c | 15 ++++++---------
>  aio-win32.c | 15 +++++++--------
>  async.c     |  2 ++
>  3 files changed, 15 insertions(+), 17 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [Qemu-block] [PATCH 10/10] async: optimize aio_bh_poll
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
@ 2016-11-30 13:38   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:38 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 363 bytes --]

On Tue, Nov 29, 2016 at 12:47:07PM +0100, Paolo Bonzini wrote:
> Avoid entering the slow path of qemu_lockcnt_dec_and_lock if
> no bottom half has to be deleted.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c | 10 +++++++++-
>  1 file changed, 9 insertions(+), 1 deletion(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase
  2016-11-29 11:47 ` [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase Paolo Bonzini
@ 2016-11-30 13:38   ` Stefan Hajnoczi
  0 siblings, 0 replies; 34+ messages in thread
From: Stefan Hajnoczi @ 2016-11-30 13:38 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

[-- Attachment #1: Type: text/plain, Size: 465 bytes --]

On Tue, Nov 29, 2016 at 12:47:02PM +0100, Paolo Bonzini wrote:
> Preparing for the following patch, use QLIST_FOREACH_SAFE and
> modify the placement of walking_handlers increment/decrement.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  aio-posix.c | 27 +++++++++++++--------------
>  aio-win32.c | 26 ++++++++++++--------------
>  2 files changed, 25 insertions(+), 28 deletions(-)

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  2016-11-30 13:36     ` Paolo Bonzini
@ 2016-12-01 15:32       ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2016-12-01 15:32 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, qemu-block



On 30/11/2016 14:36, Paolo Bonzini wrote:
> 
> 
> On 30/11/2016 14:31, Stefan Hajnoczi wrote:
>> On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
>>> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>>>  bool aio_pending(AioContext *ctx)
>>>  {
>>>      AioHandler *node;
>>> +    bool result = false;
>>>  
>>> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
>>> +    /*
>>> +     * We have to walk very carefully in case aio_set_fd_handler is
>>> +     * called while we're walking.
>>> +     */
>>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>> +
>>> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>>>          int revents;
>>>  
>>>          revents = node->pfd.revents & node->pfd.events;
>>>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>>>              aio_node_check(ctx, node->is_external)) {
>>> -            return true;
>>> +            result = true;
>>> +            break;
>>>          }
>>>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>>>              aio_node_check(ctx, node->is_external)) {
>>> -            return true;
>>> +            result = true;
>>> +            break;
>>>          }
>>>      }
>>> +    qemu_lockcnt_dec(&ctx->list_lock);
>>>  
>>> -    return false;
>>> +    return result;
>>>  }
>>>  
>>>  bool aio_dispatch(AioContext *ctx)
>>> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>>>       * We have to walk very carefully in case aio_set_fd_handler is
>>>       * called while we're walking.
>>>       */
>>> -    ctx->walking_handlers++;
>>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>>  
>>> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
>>> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>>>          int revents;
>>>  
>>> -        revents = node->pfd.revents & node->pfd.events;
>>> -        node->pfd.revents = 0;
>>> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
>>
>> Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
>> but not in aio_pending()?
> 
> It could use atomic_read there, indeed.

Actually, thanks to the (already committed) patches that limit aio_poll
to the I/O thread, these atomic accesses are not needed anymore.

Paolo

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
  2016-11-29 19:34   ` Eric Blake
  2016-11-30 13:05   ` Stefan Hajnoczi
@ 2017-02-02 19:06   ` Emilio G. Cota
  2017-02-02 19:20     ` Emilio G. Cota
  2 siblings, 1 reply; 34+ messages in thread
From: Emilio G. Cota @ 2017-02-02 19:06 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

On Tue, Nov 29, 2016 at 12:46:59 +0100, Paolo Bonzini wrote:
> A QemuLockCnt comprises a counter and a mutex, with primitives
> to increment and decrement the counter, and to take and release the
> mutex.  It can be used to do lock-free visits to a data structure
> whenever mutexes would be too heavy-weight and the critical section
> is too long for RCU.
> 
> This could be implemented simply by protecting the counter with the
> mutex, but QemuLockCnt is harder to misuse and more efficient.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
(snip)
> +++ b/docs/lockcnt.txt
> @@ -0,0 +1,343 @@
> +DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
> +===================================================
(snip)
> +    bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
> +
> +        If the count is 1, decrement the count to zero, lock
> +        the mutex and return true.  Otherwise, return false.
> +
(snip)
> +++ b/util/lockcnt.c
(snip)
> +void qemu_lockcnt_init(QemuLockCnt *lockcnt)
> +{
> +    qemu_mutex_init(&lockcnt->mutex);
> +    lockcnt->count = 0;

Minor nit: a release barrier here wouldn't harm.

> +}
> +
> +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
> +{
> +    qemu_mutex_destroy(&lockcnt->mutex);
> +}
> +
> +void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
> +{
> +    int old;
> +    for (;;) {
> +        old = atomic_read(&lockcnt->count);
> +        if (old == 0) {
> +            qemu_lockcnt_lock(lockcnt);
> +            qemu_lockcnt_inc_and_unlock(lockcnt);
> +            return;
> +        } else {
> +            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
> +                return;
> +            }
> +        }
> +    }
> +}
> +
> +void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
> +{
> +    atomic_dec(&lockcnt->count);
> +}
> +
> +/* Decrement a counter, and return locked if it is decremented to zero.
> + * It is impossible for the counter to become nonzero while the mutex
> + * is taken.
> + */
> +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    while (val > 1) {
> +        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
> +        if (old != val) {
> +            val = old;
> +            continue;
> +        }
> +
> +        return false;
> +    }

Minor nit:
	while (val > 1) {
		int old = cmpxchg();
		if (old == val) {
			return false;
		}
		val = old;
	}
seems to me a little easier to read.

> +    qemu_lockcnt_lock(lockcnt);
> +    if (atomic_fetch_dec(&lockcnt->count) == 1) {
> +        return true;
> +    }
> +
> +    qemu_lockcnt_unlock(lockcnt);
> +    return false;
> +}
> +
> +/* Decrement a counter and return locked if it is decremented to zero.
> + * Otherwise do nothing.

This comment doesn't match the code below nor the description in the
.txt file (quoted above) [we might not decrement the counter at all!]

> + *
> + * It is impossible for the counter to become nonzero while the mutex
> + * is taken.
> + */
> +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_mb_read(&lockcnt->count);
> +    if (val > 1) {
> +        return false;
> +    }
> +
> +    qemu_lockcnt_lock(lockcnt);
> +    if (atomic_fetch_dec(&lockcnt->count) == 1) {
> +        return true;
> +    }
> +
> +    qemu_lockcnt_inc_and_unlock(lockcnt);

The choice of dec+(maybe)inc over cmpxchg seems a little odd to me.

		Emilio

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt
  2017-02-02 19:06   ` Emilio G. Cota
@ 2017-02-02 19:20     ` Emilio G. Cota
  0 siblings, 0 replies; 34+ messages in thread
From: Emilio G. Cota @ 2017-02-02 19:20 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, qemu-block

Just noticed the message above mistakenly sat in my outbox for
nearly 2 months. Just flushed it, so do not be surprised by
its original date.

Sorry for the noise,

		Emilio

^ permalink raw reply	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-12 18:07 [Qemu-devel] [PATCH v5 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2017-01-12 18:07 ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2017-01-12 18:07 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 283 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  35 +-----
 util/qemu-thread-win32.c |   2 +-
 util/trace-events        |  10 ++
 7 files changed, 342 insertions(+), 35 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the counter
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 5f7de7b..9910f49 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index da1de77..c0acdfe 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,289 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0   /* free, uncontended */
+#define QEMU_LOCKCNT_STATE_LOCKED  1   /* locked, uncontended */
+#define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free.  On success,
+ * return true with *val set to new_if_free.  On failure, set *val to the
+ * value found in lockcnt->count and proceed as if the lock were taken.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * If *waited is true on return, new_if_free's bottom two bits must not
+ * be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller
+ * does not know if there are other waiters.  Furthermore, after *waited
+ * is set the caller has effectively acquired the lock.  If it returns
+ * with the lock not taken, it must wake another futex waiter.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.  Assume
+                 * there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -112,3 +394,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return atomic_read(&lockcnt->count);
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2017-01-12 16:55 ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2017-01-12 16:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: famz, stefanha

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 283 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  35 +-----
 util/qemu-thread-win32.c |   2 +-
 util/trace-events        |  10 ++
 7 files changed, 342 insertions(+), 35 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the counter
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 5f7de7b..9910f49 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index da1de77..c0acdfe 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,289 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0	/* free, uncontended */
+#define QEMU_LOCKCNT_STATE_LOCKED  1	/* locked, uncontended */
+#define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; if it
+ * succeeds, return true and set *val to the new value of
+ * lockcnt->count (that is, new_if_free).
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * If *waited is true on return, new_if_free's bottom two bits must not
+ * be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller
+ * does not know if there are other waiters.  Furthermore, after *waited
+ * is set the caller has effectively acquired the lock.  If it returns
+ * with the lock not taken, it must wake another futex waiter.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.  Assume
+                 * there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -112,3 +394,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return atomic_read(&lockcnt->count);
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-12 13:34   ` Fam Zheng
@ 2017-01-12 15:40     ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2017-01-12 15:40 UTC (permalink / raw)
  To: Fam Zheng; +Cc: qemu-devel, stefanha



On 12/01/2017 14:34, Fam Zheng wrote:
>> +     */
>> +    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
>> +        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
>> +            int expected = *val;
>> +            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
>> +
>> +            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
> ... the holder thread releases the lock at this point. In this case a second
> call to this function in qemu_lockcnt_dec_and_lock does pass
> QEMU_LOCKCNT_STATE_LOCKED in new_if_free, because 'waited' is left false there.
> The code is okay, but the comment above is too strict.

Right.

>> +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
>> +{
>> +    int val = atomic_read(&lockcnt->count);
>> +    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
>> +    bool waited = false;
>> +
>> +    for (;;) {
>> +        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
>> +            int expected = val;
>> +            int new = val - QEMU_LOCKCNT_COUNT_STEP;
>> +            val = atomic_cmpxchg(&lockcnt->count, val, new);
>> +            if (val == expected) {
>> +                break;
>> +            }
> If (val != expected && val >= 2 * QEMU_LOCKCNT_COUNT_STEP), should this
> atomic_cmpxchg be retried before trying qemu_lockcnt_cmpxchg_or_wait?
> 

Yeah, the code below can be moved entirely into an "else" branch.

Paolo

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-04 13:26 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
  2017-01-11 16:50   ` Stefan Hajnoczi
@ 2017-01-12 13:34   ` Fam Zheng
  2017-01-12 15:40     ` Paolo Bonzini
  1 sibling, 1 reply; 34+ messages in thread
From: Fam Zheng @ 2017-01-12 13:34 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha

On Wed, 01/04 14:26, Paolo Bonzini wrote:
> diff --git a/include/qemu/futex.h b/include/qemu/futex.h
> new file mode 100644
> index 0000000..852d612
> --- /dev/null
> +++ b/include/qemu/futex.h
> @@ -0,0 +1,36 @@
> +/*
> + * Wrappers around Linux futex syscall
> + *
> + * Copyright Red Hat, Inc. 2015

2015 - 2017, too?

> + *
> + * Author:
> + *  Paolo Bonzini <pbonzini@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include <sys/syscall.h>
> +#include <linux/futex.h>
> +
> +#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
> +
> +static inline void qemu_futex_wake(void *f, int n)
> +{
> +    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
> +}
> +
> +static inline void qemu_futex_wait(void *f, unsigned val)
> +{
> +    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
> +        switch (errno) {
> +        case EWOULDBLOCK:
> +            return;
> +        case EINTR:
> +            break; /* get out of switch and retry */
> +        default:
> +            abort();
> +        }
> +    }
> +}
> diff --git a/util/lockcnt.c b/util/lockcnt.c
> index 78ed1e4..40cc02a 100644
> --- a/util/lockcnt.c
> +++ b/util/lockcnt.c
> @@ -9,7 +9,288 @@
>  #include "qemu/osdep.h"
>  #include "qemu/thread.h"
>  #include "qemu/atomic.h"
> +#include "trace.h"
>  
> +#ifdef CONFIG_LINUX
> +#include "qemu/futex.h"
> +
> +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
> + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
> + * this is not the most relaxing citation I could make...).  It is similar
> + * to mutex2 in the paper.
> + */
> +
> +#define QEMU_LOCKCNT_STATE_MASK    3
> +#define QEMU_LOCKCNT_STATE_FREE    0
> +#define QEMU_LOCKCNT_STATE_LOCKED  1

I find the macro names a bit incomplete in describing the semantics (maybe
you want to limit their length), which makes it harder to understand the mutex
implementation without reading the paper. How about adding a comment saying
that "locked" means "locked but _not waited_" and "waiting" means "_locked_
and waited"? It's up to you, because this is trivial compared to the real
complexity of this patch. :)

> +#define QEMU_LOCKCNT_STATE_WAITING 2
> +
> +#define QEMU_LOCKCNT_COUNT_STEP    4
> +#define QEMU_LOCKCNT_COUNT_SHIFT   2
> +
> +void qemu_lockcnt_init(QemuLockCnt *lockcnt)
> +{
> +    lockcnt->count = 0;
> +}
> +
> +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
> +{
> +}
> +
> +/* *val is the current value of lockcnt->count.
> + *
> + * If the lock is free, try a cmpxchg from *val to new_if_free; return
> + * true and set *val to the old value found by the cmpxchg in
> + * lockcnt->count.
> + *
> + * If the lock is taken, wait for it to be released and return false
> + * *without trying again to take the lock*.  Again, set *val to the
> + * new value of lockcnt->count.
> + *
> + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
> + * if calling this function a second time after it has returned
> + * false.

"and waited"? I think it is possible this function return false with the lock
actually being free, when ...

> + */
> +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
> +                                         int new_if_free, bool *waited)
> +{
> +    /* Fast path for when the lock is free.  */
> +    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
> +        int expected = *val;
> +
> +        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
> +        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
> +        if (*val == expected) {
> +            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
> +            *val = new_if_free;
> +            return true;
> +        }
> +    }
> +
> +    /* The slow path moves from locked to waiting if necessary, then
> +     * does a futex wait.  Both steps can be repeated ad nauseam,
> +     * only getting out of the loop if we can have another shot at the
> +     * fast path.  Once we can, get out to compute the new destination
> +     * value for the fast path.
> +     */
> +    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
> +        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
> +            int expected = *val;
> +            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
> +
> +            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);

... the holder thread releases the lock at this point. In this case a second
call to this function in qemu_lockcnt_dec_and_lock does pass
QEMU_LOCKCNT_STATE_LOCKED in new_if_free, because 'waited' is left false there.
The code is okay, but the comment above is too strict.

> +            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
> +            if (*val == expected) {
> +                *val = new;
> +            }
> +            continue;
> +        }
> +
> +        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
> +            *waited = true;
> +            trace_lockcnt_futex_wait(lockcnt, *val);
> +            qemu_futex_wait(&lockcnt->count, *val);
> +            *val = atomic_read(&lockcnt->count);
> +            trace_lockcnt_futex_wait_resume(lockcnt, *val);
> +            continue;
> +        }
> +
> +        abort();
> +    }
> +    return false;
> +}
> +
> +void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    bool waited = false;
> +
> +    for (;;) {
> +        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
> +            int expected = val;
> +            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
> +            if (val == expected) {
> +                break;
> +            }
> +        } else {
> +            /* The fast path is (0, unlocked)->(1, unlocked).  */
> +            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
> +                                             &waited)) {
> +                break;
> +            }
> +        }
> +    }
> +
> +    /* If we were woken by another thread, we should also wake one because
> +     * we are effectively releasing the lock that was given to us.  This is
> +     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
> +     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
> +     * wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +}
> +
> +/* Decrement a counter, and return locked if it is decremented to zero.
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    for (;;) {
> +        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
> +            int expected = val;
> +            int new = val - QEMU_LOCKCNT_COUNT_STEP;
> +            val = atomic_cmpxchg(&lockcnt->count, val, new);
> +            if (val == expected) {
> +                break;
> +            }

If (val != expected && val >= 2 * QEMU_LOCKCNT_COUNT_STEP), should this
atomic_cmpxchg be retried before trying qemu_lockcnt_cmpxchg_or_wait?

> +        }
> +
> +        /* If count is going 1->0, take the lock. The fast path is
> +         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> +         */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
> +            return true;
> +        }
> +
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            locked_state = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +
> +    /* If we were woken by another thread, but we're returning in unlocked
> +     * state, we should also wake a thread because we are effectively
> +     * releasing the lock that was given to us.  This is the case where
> +     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> +     * bits, and qemu_lockcnt_unlock would find it and wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +    return false;
> +}
> +
> +/* If the counter is one, decrement it and return locked.  Otherwise do
> + * nothing.
> + *
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
> +        /* If count is going 1->0, take the lock. The fast path is
> +         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> +         */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
> +            return true;
> +        }
> +
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            locked_state = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +
> +    /* If we were woken by another thread, but we're returning in unlocked
> +     * state, we should also wake a thread because we are effectively
> +     * releasing the lock that was given to us.  This is the case where
> +     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> +     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +    return false;
> +}
> +
> +void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int step = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    /* The third argument is only used if the low bits of val are 0
> +     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
> +     * state.
> +     */
> +    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            step = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +}
> +
> +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
> +{
> +    int expected, new, val;
> +
> +    val = atomic_read(&lockcnt->count);
> +    do {
> +        expected = val;
> +        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
> +        trace_lockcnt_unlock_attempt(lockcnt, val, new);
> +        val = atomic_cmpxchg(&lockcnt->count, val, new);
> +    } while (val != expected);
> +
> +    trace_lockcnt_unlock_success(lockcnt, val, new);
> +    if (val & QEMU_LOCKCNT_STATE_WAITING) {
> +        lockcnt_wake(lockcnt);
> +    }
> +}

Fam

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-11 16:50   ` Stefan Hajnoczi
@ 2017-01-11 16:52     ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2017-01-11 16:52 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, famz

[-- Attachment #1: Type: text/plain, Size: 401 bytes --]



On 11/01/2017 17:50, Stefan Hajnoczi wrote:
> On Wed, Jan 04, 2017 at 02:26:19PM +0100, Paolo Bonzini wrote:
>> +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
>> +{
>> +    return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;
> 
> According to docs/atomics.txt at least atomic_read() should be used here
> otherwise sanitizers could flag up this memory access.

Good point.

Paolo


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 473 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-04 13:26 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
@ 2017-01-11 16:50   ` Stefan Hajnoczi
  2017-01-11 16:52     ` Paolo Bonzini
  2017-01-12 13:34   ` Fam Zheng
  1 sibling, 1 reply; 34+ messages in thread
From: Stefan Hajnoczi @ 2017-01-11 16:50 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz

[-- Attachment #1: Type: text/plain, Size: 308 bytes --]

On Wed, Jan 04, 2017 at 02:26:19PM +0100, Paolo Bonzini wrote:
> +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
> +{
> +    return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;

According to docs/atomics.txt, at least atomic_read() should be used here;
otherwise sanitizers could flag this memory access.

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 455 bytes --]

^ permalink raw reply	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2017-01-04 13:26 [Qemu-devel] [PATCH v3 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2017-01-04 13:26 ` Paolo Bonzini
  2017-01-11 16:50   ` Stefan Hajnoczi
  2017-01-12 13:34   ` Fam Zheng
  0 siblings, 2 replies; 34+ messages in thread
From: Paolo Bonzini @ 2017-01-04 13:26 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 282 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  35 +-----
 util/qemu-thread-win32.c |   2 +-
 util/trace-events        |  10 ++
 7 files changed, 341 insertions(+), 35 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the counter
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 7944f79..93337c4 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index 78ed1e4..40cc02a 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,288 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0
+#define QEMU_LOCKCNT_STATE_LOCKED  1
+#define QEMU_LOCKCNT_STATE_WAITING 2
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; on success
+ * return true and set *val to new_if_free, otherwise set *val to the
+ * value found by the cmpxchg in lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
+ * if calling this function a second time after it has returned
+ * false.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            int new = val - QEMU_LOCKCNT_COUNT_STEP;
+            val = atomic_cmpxchg(&lockcnt->count, val, new);
+            if (val == expected) {
+                break;
+            }
+        }
+
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return lockcnt->count;
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

* [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
  2016-12-21 14:03 [Qemu-devel] [PATCH v2 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
@ 2016-12-21 14:03 ` Paolo Bonzini
  0 siblings, 0 replies; 34+ messages in thread
From: Paolo Bonzini @ 2016-12-21 14:03 UTC (permalink / raw)
  To: qemu-devel; +Cc: qemu-block

This is complex, but I think it is reasonably documented in the source.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/lockcnt.txt         |   9 +-
 include/qemu/futex.h     |  36 ++++++
 include/qemu/thread.h    |   2 +
 util/lockcnt.c           | 282 +++++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-posix.c |  35 +-----
 util/qemu-thread-win32.c |   2 +-
 util/trace-events        |  10 ++
 7 files changed, 341 insertions(+), 35 deletions(-)
 create mode 100644 include/qemu/futex.h

diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
index 25a8091..2a79b32 100644
--- a/docs/lockcnt.txt
+++ b/docs/lockcnt.txt
@@ -142,12 +142,11 @@ can also be more efficient in two ways:
 - it avoids taking the lock for many operations (for example
   incrementing the counter while it is non-zero);
 
-- on some platforms, one could implement QemuLockCnt to hold the
-  lock and the mutex in a single word, making it no more expensive
+- on some platforms, one can implement QemuLockCnt to hold the counter
+  and the mutex in a single word, making the fast path no more expensive
   than simply managing a counter using atomic operations (see
-  docs/atomics.txt).  This is not implemented yet, but can be
-  very helpful if concurrent access to the data structure is
-  expected to be rare.
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
 
 
 Using the same mutex for frees and writes can still incur some small
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000..852d612
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 7944f79..93337c4 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
 }
 
 struct QemuLockCnt {
+#ifndef CONFIG_LINUX
     QemuMutex mutex;
+#endif
     unsigned count;
 };
 
diff --git a/util/lockcnt.c b/util/lockcnt.c
index 78ed1e4..40cc02a 100644
--- a/util/lockcnt.c
+++ b/util/lockcnt.c
@@ -9,7 +9,288 @@
 #include "qemu/osdep.h"
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "trace.h"
 
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0
+#define QEMU_LOCKCNT_STATE_LOCKED  1
+#define QEMU_LOCKCNT_STATE_WAITING 2
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; on success
+ * return true and set *val to new_if_free, otherwise set *val to the
+ * value found by the cmpxchg in lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
+ * if calling this function a second time after it has returned
+ * false.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            int new = val - QEMU_LOCKCNT_COUNT_STEP;
+            val = atomic_cmpxchg(&lockcnt->count, val, new);
+            if (val == expected) {
+                break;
+            }
+        }
+
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
 void qemu_lockcnt_init(QemuLockCnt *lockcnt)
 {
     qemu_mutex_init(&lockcnt->mutex);
@@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
 {
     return lockcnt->count;
 }
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cdde..37cd8ba 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b..178e016 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee..2b8aa30 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
-- 
2.9.3

^ permalink raw reply related	[flat|nested] 34+ messages in thread

end of thread, other threads:[~2017-02-02 19:20 UTC | newest]

Thread overview: 34+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-11-29 11:46 [Qemu-devel] [PATCH for-2.9 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2016-11-29 11:46 ` [Qemu-devel] [PATCH 01/10] aio: rename bh_lock to list_lock Paolo Bonzini
2016-11-30 12:53   ` Stefan Hajnoczi
2016-11-29 11:46 ` [Qemu-devel] [PATCH 02/10] qemu-thread: introduce QemuLockCnt Paolo Bonzini
2016-11-29 19:34   ` Eric Blake
2016-11-30 13:05   ` Stefan Hajnoczi
2017-02-02 19:06   ` Emilio G. Cota
2017-02-02 19:20     ` Emilio G. Cota
2016-11-29 11:47 ` [Qemu-devel] [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh Paolo Bonzini
2016-11-30 13:06   ` Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2016-11-30 13:19   ` Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 05/10] aio: tweak walking in dispatch phase Paolo Bonzini
2016-11-30 13:38   ` Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Paolo Bonzini
2016-11-30 13:31   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
2016-11-30 13:36     ` Paolo Bonzini
2016-12-01 15:32       ` [Qemu-devel] " Paolo Bonzini
2016-11-29 11:47 ` [Qemu-devel] [PATCH 07/10] aio-win32: " Paolo Bonzini
2016-11-30 13:34   ` Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 08/10] aio: document locking Paolo Bonzini
2016-11-30 13:35   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching Paolo Bonzini
2016-11-30 13:37   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
2016-11-29 11:47 ` [Qemu-devel] [PATCH 10/10] async: optimize aio_bh_poll Paolo Bonzini
2016-11-30 13:38   ` [Qemu-devel] [Qemu-block] " Stefan Hajnoczi
2016-12-21 14:03 [Qemu-devel] [PATCH v2 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2016-12-21 14:03 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2017-01-04 13:26 [Qemu-devel] [PATCH v3 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2017-01-04 13:26 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2017-01-11 16:50   ` Stefan Hajnoczi
2017-01-11 16:52     ` Paolo Bonzini
2017-01-12 13:34   ` Fam Zheng
2017-01-12 15:40     ` Paolo Bonzini
2017-01-12 16:55 [Qemu-devel] [PATCH v4 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2017-01-12 16:55 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini
2017-01-12 18:07 [Qemu-devel] [PATCH v5 00/10] aio_context_acquire/release pushdown, part 1 Paolo Bonzini
2017-01-12 18:07 ` [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux Paolo Bonzini

This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.