* [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
@ 2013-05-15 15:48 Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 01/12] add a header file for atomic operations Paolo Bonzini
                   ` (14 more replies)
  0 siblings, 15 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Here is an RCU implementation based on liburcu.  I had posted
something like this a couple years ago, but there have been
many changes:

- more portable code

- adjusted for all the threads that we have now

- updated atomic operations for GCC 4.8

- lots of documentation

There is plenty of documentation in the patches, so I'm not being
too verbose here...

Giving a shot to rcutorture on a weak memory-model machine (PPC)
would be nice.  It's available on my github repo as branch rcu.
rcutorture works on Linux-x86.  My WINE setup has some problem with glib
right now.

Paolo

Paolo Bonzini (12):
  add a header file for atomic operations
  qemu-thread: add QemuEvent
  rcu: add rcu library
  qemu-thread: register threads with RCU
  rcu: add call_rcu
  rcu: add rcutorture
  rcu: allow nested calls to rcu_thread_offline/rcu_thread_online
  qemu-thread: report RCU quiescent states
  event loop: report RCU quiescent states
  cpus: report RCU quiescent states
  block: report RCU quiescent states
  migration: report RCU quiescent states

 aio-posix.c                 |   7 +
 aio-win32.c                 |  10 +-
 block/raw-posix.c           |   3 +
 block/raw-win32.c           |   3 +
 cpus.c                      |   3 +
 docs/atomics.txt            | 322 ++++++++++++++++++++++++++++++++
 docs/rcu.txt                | 435 ++++++++++++++++++++++++++++++++++++++++++++
 hw/9pfs/virtio-9p-synth.c   |   1 +
 hw/display/qxl.c            |   3 +-
 hw/virtio/vhost.c           |   9 +-
 include/qemu/atomic.h       | 223 +++++++++++++++++++----
 include/qemu/queue.h        |  13 ++
 include/qemu/rcu-pointer.h  | 110 +++++++++++
 include/qemu/rcu.h          | 207 +++++++++++++++++++++
 include/qemu/thread-posix.h |   8 +
 include/qemu/thread-win32.h |   4 +
 include/qemu/thread.h       |  10 +-
 kvm-all.c                   |   3 +
 libcacard/Makefile          |   3 +-
 main-loop.c                 |   5 +
 migration.c                 |   5 +-
 tests/Makefile              |   5 +-
 tests/rcutorture.c          | 387 +++++++++++++++++++++++++++++++++++++++
 tests/test-thread-pool.c    |   8 +-
 util/Makefile.objs          |   1 +
 util/qemu-thread-posix.c    | 174 +++++++++++++++++-
 util/qemu-thread-win32.c    |  44 ++++-
 util/rcu.c                  | 320 ++++++++++++++++++++++++++++++++
 28 files changed, 2270 insertions(+), 56 deletions(-)
 create mode 100644 docs/atomics.txt
 create mode 100644 docs/rcu.txt
 create mode 100644 include/qemu/rcu-pointer.h
 create mode 100644 include/qemu/rcu.h
 create mode 100644 tests/rcutorture.c
 create mode 100644 util/rcu.c

-- 
1.8.1.4


* [Qemu-devel] [PATCH 01/12] add a header file for atomic operations
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 16:11   ` Peter Maydell
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent Paolo Bonzini
                   ` (13 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

We're already using them in several places, but __sync builtins are just
too ugly to type, and do not provide seqcst load/store operations.
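
As a rough illustration of the semantics documented in docs/atomics.txt
in this patch (read-modify-write operations return the old value;
sequentially consistent loads and stores can be built from plain
accesses plus barriers), here is a minimal sketch.  The sk_* names are
hypothetical stand-ins and not the macros this patch adds; the barrier
placement is deliberately conservative and assumes GCC or clang.

```c
#include <stdbool.h>

/* Hypothetical sk_* names: simplified stand-ins, not this patch's macros. */
#define sk_atomic_read(ptr)    (*(volatile __typeof__(*(ptr)) *)(ptr))
#define sk_atomic_set(ptr, v)  ((void)(*(volatile __typeof__(*(ptr)) *)(ptr) = (v)))

/* Read-modify-write wrappers: each returns the value *ptr held before. */
#define sk_atomic_inc(ptr)     __sync_fetch_and_add(ptr, 1)
#define sk_atomic_or(ptr, v)   __sync_fetch_and_or(ptr, v)
#define sk_atomic_cmpxchg(ptr, old, new_) \
    __sync_val_compare_and_swap(ptr, old, new_)

/* __sync_lock_test_and_set() is documented as an acquire barrier only,
 * so a full barrier is issued first. */
#define sk_atomic_xchg(ptr, v) \
    (__sync_synchronize(), __sync_lock_test_and_set(ptr, v))

/* Sequentially consistent store and load built from full barriers.
 * This is stronger than strictly necessary on most hosts. */
#define sk_atomic_mb_set(ptr, v) do {            \
    __sync_synchronize();                        \
    sk_atomic_set(ptr, v);                       \
    __sync_synchronize();                        \
} while (0)

#define sk_atomic_mb_read(ptr) ({                        \
    __typeof__(*(ptr)) sk_v_ = sk_atomic_read(ptr);      \
    __sync_synchronize();                                \
    sk_v_;                                               \
})

/* Claim a slot only if it is still in state 0; true on success.
 * The comparison is against the *old* value that cmpxchg returns. */
static bool sk_try_claim(int *state)
{
    return sk_atomic_cmpxchg(state, 0, 3) == 0;
}
```

Note that a compare-and-swap succeeded exactly when the returned value
equals the expected old value, which is what sk_try_claim() checks.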

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/atomics.txt         | 322 +++++++++++++++++++++++++++++++++++++++++++++++
 hw/display/qxl.c         |   3 +-
 hw/virtio/vhost.c        |   9 +-
 include/qemu/atomic.h    | 223 +++++++++++++++++++++++++++-----
 migration.c              |   3 +-
 tests/test-thread-pool.c |   8 +-
 6 files changed, 524 insertions(+), 44 deletions(-)
 create mode 100644 docs/atomics.txt

diff --git a/docs/atomics.txt b/docs/atomics.txt
new file mode 100644
index 0000000..cb820d6
--- /dev/null
+++ b/docs/atomics.txt
@@ -0,0 +1,322 @@
+CPUs perform independent memory operations effectively in random order,
+but this can be a problem for CPU-CPU interaction (including interactions
+between QEMU and the guest).  Multi-threaded programs use various tools
+to instruct the compiler and the CPU to restrict the order to something
+that is consistent with the expectations of the programmer.
+
+The most basic tool is locking.  Mutexes, condition variables and
+semaphores are used in QEMU, and should be the default approach to
+synchronization.  Anything else is considerably harder, but it's
+also justified more often than one would like.  The two tools that
+are provided by qemu/atomic.h are memory barriers and atomic operations.
+
+Macros defined by qemu/atomic.h fall in three camps:
+
+- compiler barriers: barrier();
+
+- weak atomic access and manual memory barriers: atomic_read(),
+  atomic_set(), smp_rmb(), smp_wmb(), smp_mb(), smp_read_barrier_depends();
+
+- sequentially consistent atomic access: everything else.
+
+
+COMPILER MEMORY BARRIER
+=======================
+
+barrier() prevents the compiler from moving the memory accesses either
+side of it to the other side.  The compiler barrier has no direct effect
+on the CPU, which may then reorder things however it wishes.
+
+barrier() is mostly used within qemu/atomic.h itself.  On some
+architectures, CPU guarantees are strong enough that blocking compiler
+optimizations already ensures the correct order of execution.  In this
+case, qemu/atomic.h will reduce stronger memory barriers to simple
+compiler barriers.
+
+Still, barrier() can be useful when writing code that can be interrupted
+by signal handlers.
+
+
+SEQUENTIALLY CONSISTENT ATOMIC ACCESS
+=====================================
+
+Most of the operations in the qemu/atomic.h header ensure *sequential
+consistency*, where "the result of any execution is the same as if the
+operations of all the processors were executed in some sequential order,
+and the operations of each individual processor appear in this sequence
+in the order specified by its program".
+
+qemu/atomic.h provides the following set of atomic read-modify-write
+operations:
+
+    typeof(*ptr) atomic_inc(ptr)
+    typeof(*ptr) atomic_dec(ptr)
+    typeof(*ptr) atomic_add(ptr, val)
+    typeof(*ptr) atomic_sub(ptr, val)
+    typeof(*ptr) atomic_and(ptr, val)
+    typeof(*ptr) atomic_or(ptr, val)
+    typeof(*ptr) atomic_xchg(ptr, val)
+    typeof(*ptr) atomic_cmpxchg(ptr, old, new)
+
+all of which return the old value of *ptr.  These operations are
+polymorphic; they operate on any type that is as wide as an int.
+
+It also provides the following two operations for sequentially consistent
+loads and stores:
+
+    typeof(*ptr) atomic_mb_read(ptr)
+    void         atomic_mb_set(ptr, val)
+
+These operations operate on any type that is as wide as an int or smaller.
+
+
+WEAK ATOMIC ACCESS AND MANUAL MEMORY BARRIERS
+=============================================
+
+Compared to sequentially consistent atomic access, programming with
+weaker consistency models can be considerably more complicated.
+If the algorithm you are writing includes both writes
+and reads on the same side, it is generally simpler to use sequentially
+consistent primitives.
+
+When using this model, variables are accessed with atomic_read() and
+atomic_set(), and restrictions on the ordering of accesses are enforced
+using the smp_rmb(), smp_wmb(), smp_mb() and smp_read_barrier_depends()
+memory barriers.
+
+atomic_read() and atomic_set() prevent the compiler from using
+optimizations that might otherwise optimize accesses out of existence
+on the one hand, or that might create unsolicited accesses on the other.
+In general this should not have any effect, because the same compiler
+barriers are already implied by memory barriers.  However, it is useful
+to do so, because it tells readers which variables are shared with
+other threads, and which are local to the current thread or protected
+by other, more mundane means.
+
+Memory barriers control the order of references to shared memory.
+They come in four kinds:
+
+- smp_rmb() guarantees that all the LOAD operations specified before
+  the barrier will appear to happen before all the LOAD operations
+  specified after the barrier with respect to the other components of
+  the system.
+
+  In other words, smp_rmb() puts a partial ordering on loads, but is not
+  required to have any effect on stores.
+
+- smp_wmb() guarantees that all the STORE operations specified before
+  the barrier will appear to happen before all the STORE operations
+  specified after the barrier with respect to the other components of
+  the system.
+
+  In other words, smp_wmb() puts a partial ordering on stores, but is not
+  required to have any effect on loads.
+
+- smp_mb() guarantees that all the LOAD and STORE operations specified
+  before the barrier will appear to happen before all the LOAD and
+  STORE operations specified after the barrier with respect to the other
+  components of the system.
+
+  smp_mb() puts a partial ordering on both loads and stores.  It is
+  stronger than both a read and a write memory barrier; it implies both
+  smp_rmb() and smp_wmb(), but it also prevents STOREs coming before the
+  barrier from overtaking LOADs coming after the barrier and vice versa.
+
+- smp_read_barrier_depends() is a weaker kind of read barrier.  On
+  most processors, whenever two loads are performed such that the
+  second depends on the result of the first (e.g., the first load
+  retrieves the address to which the second load will be directed),
+  the processor will guarantee that the first LOAD will appear to happen
+  before the second with respect to the other components of the system.
+  However, this is not always true---for example, it was not true on
+  Alpha processors.  Whenever this kind of access happens to shared
+  memory (that is not protected by a lock), a read barrier is needed,
+  and smp_read_barrier_depends() can be used instead of smp_rmb().
+
+  Note that the first load really has to have a _data_ dependency and not
+  a control dependency.  If the address for the second load is dependent
+  on the first load, but the dependency is through a conditional rather
+  than actually loading the address itself, then it's a _control_
+  dependency and a full read barrier or better is required.
+
+
+This is the set of barriers that is required *between* two atomic_read()
+and atomic_set() operations to achieve sequential consistency:
+
+                    |               2nd operation             |
+                    |-----------------------------------------|
+     1st operation  | (after last) | atomic_read | atomic_set |
+     ---------------+--------------+-------------+------------|
+     (before first) |              | none        | smp_wmb()  |
+     ---------------+--------------+-------------+------------|
+     atomic_read    | smp_rmb()    | smp_rmb()*  | **         |
+     ---------------+--------------+-------------+------------|
+     atomic_set     | none         | smp_mb()*** | smp_wmb()  |
+     ---------------+--------------+-------------+------------|
+
+       * Or smp_read_barrier_depends().
+
+      ** This requires a load-store barrier.  How to achieve this varies
+         depending on the machine, but in practice smp_rmb()+smp_wmb()
+         should have the desired effect.  For example, on PowerPC the
+         lwsync instruction is a combined load-load, load-store and
+         store-store barrier.
+
+     *** This requires a store-load barrier.  On most machines, the only
+         way to achieve this is a full barrier.
+
+
+You can see that the two possible definitions of atomic_mb_read()
+and atomic_mb_set() are the following:
+
+    1) atomic_mb_read(p)   = atomic_read(p); smp_rmb()
+       atomic_mb_set(p, v) = smp_wmb(); atomic_set(p, v); smp_mb()
+
+    2) atomic_mb_read(p)   = smp_mb(); atomic_read(p); smp_rmb()
+       atomic_mb_set(p, v) = smp_wmb(); atomic_set(p, v);
+
+Usually the former is used, because smp_mb() is expensive and a program
+normally has more reads than writes.  Therefore it makes more sense to
+make atomic_mb_set() the more expensive operation.
+
+There are two common cases in which atomic_mb_read and atomic_mb_set
+generate too many memory barriers, and thus it can be useful to manually
+place barriers instead:
+
+- when a data structure has one thread that is always a writer
+  and one thread that is always a reader, manual placement of
+  memory barriers makes the write side faster.  Furthermore,
+  correctness is easy to check for in this case using the "pairing"
+  trick that is explained below:
+
+     thread 1                                thread 1
+     -------------------------               ------------------------
+     (other writes)
+                                             smp_wmb()
+     atomic_mb_set(&a, x)                    atomic_set(&a, x)
+                                             smp_wmb()
+     atomic_mb_set(&b, y)                    atomic_set(&b, y)
+
+                                       =>
+     thread 2                                thread 2
+     -------------------------               ------------------------
+     y = atomic_mb_read(&b)                  y = atomic_read(&b)
+                                             smp_rmb()
+     x = atomic_mb_read(&a)                  x = atomic_read(&a)
+                                             smp_rmb()
+
+- sometimes, a thread is accessing many variables that are otherwise
+  unrelated to each other (for example because, apart from the current
+  thread, exactly one other thread will read or write each of these
+  variables).  In this case, it is possible to "hoist" the implicit
+  barriers provided by atomic_mb_read() and atomic_mb_set() outside
+  a loop.  For example, the above definition of atomic_mb_read() gives
+  the following transformation:
+
+     n = 0;                                  n = 0;
+     for (i = 0; i < 10; i++)          =>    for (i = 0; i < 10; i++)
+       n += atomic_mb_read(&a[i]);             n += atomic_read(&a[i]);
+                                             smp_rmb();
+
+  Similarly, the above definition of atomic_mb_set() gives the
+  following transformation:
+
+                                             smp_wmb();
+     for (i = 0; i < 10; i++)          =>    for (i = 0; i < 10; i++)
+       atomic_mb_set(&a[i], false);            atomic_set(&a[i], false);
+                                             smp_mb();
+
+
+The two tricks can be combined.  In this case, splitting a loop in
+two lets you hoist the barriers out of the loops _and_ eliminate the
+expensive smp_mb():
+
+                                             smp_wmb();
+     for (i = 0; i < 10; i++) {        =>    for (i = 0; i < 10; i++)
+       atomic_mb_set(&a[i], false);            atomic_set(&a[i], false);
+       atomic_mb_set(&b[i], false);          smp_wmb();
+     }                                       for (i = 0; i < 10; i++)
+                                               atomic_set(&b[i], false);
+                                             smp_mb();
+
+  The other thread can still use atomic_mb_read()/atomic_mb_set().
+
+
+Memory barrier pairing
+----------------------
+
+A useful rule of thumb is that memory barriers should always, or almost
+always, be paired with another barrier.  In the case of QEMU, however,
+note that the other barrier may actually be in a driver that runs in
+the guest!
+
+For the purposes of pairing, smp_read_barrier_depends() and smp_rmb()
+both count as read barriers.  A read barrier shall pair with a write
+barrier or a full barrier; a write barrier shall pair with a read
+barrier or a full barrier.  A full barrier can pair with anything.
+For example:
+
+        thread 1             thread 2
+        ===============      ===============
+        a = 1;
+        smp_wmb();
+        b = 2;               x = b;
+                             smp_rmb();
+                             y = a;
+
+Note that the "writing" thread is accessing the variables in the
+opposite order to the "reading" thread.  This is expected: stores
+before the write barrier will normally match the loads after the
+read barrier, and vice versa.  The same is true for more than 2
+accesses and for data dependency barriers:
+
+        thread 1             thread 2
+        ===============      ===============
+        b[2] = 1;
+        smp_wmb();
+        x->i = 2;
+        smp_wmb();
+        a = x;               x = a;
+                             smp_read_barrier_depends();
+                             y = x->i;
+                             smp_read_barrier_depends();
+                             z = b[y];
+
+smp_wmb() also pairs with atomic_mb_read(), and smp_rmb() also pairs
+with atomic_mb_set().
+
+
+COMPARISON WITH LINUX KERNEL MEMORY BARRIERS
+============================================
+
+Here is a list of differences between Linux kernel atomic operations
+and memory barriers, and the equivalents in QEMU:
+
+- atomic operations in Linux are always on a 32-bit int type and
+  use a boxed atomic_t type; atomic operations in QEMU are polymorphic
+  and use normal C types.
+
+- atomic_read and atomic_set in Linux give no guarantee at all;
+  atomic_read and atomic_set in QEMU include a compiler barrier
+  (similar to the ACCESS_ONCE macro in Linux).
+
+- most atomic read-modify-write operations in Linux return void;
+  in QEMU, all of them return the old value of the variable.
+
+- different atomic read-modify-write operations in Linux imply
+  a different set of memory barriers; in QEMU, all of them enforce
+  sequential consistency, which means they imply full memory barriers
+  before and after the operation.
+
+- Linux does not have an equivalent of atomic_mb_read() and
+  atomic_mb_set().  In particular, note that set_mb() is a little
+  weaker than atomic_mb_set().
+
+
+SOURCES
+=======
+
+* Documentation/memory-barriers.txt from the Linux kernel
+
+* "The JSR-133 Cookbook for Compiler Writers", available at
+  http://g.oswego.edu/dl/jmm/cookbook.html
diff --git a/hw/display/qxl.c b/hw/display/qxl.c
index c475cb1..2b10d3d 100644
--- a/hw/display/qxl.c
+++ b/hw/display/qxl.c
@@ -23,6 +23,7 @@
 #include "qemu-common.h"
 #include "qemu/timer.h"
 #include "qemu/queue.h"
+#include "qemu/atomic.h"
 #include "monitor/monitor.h"
 #include "sysemu/sysemu.h"
 #include "trace.h"
@@ -1725,7 +1726,7 @@ static void qxl_send_events(PCIQXLDevice *d, uint32_t events)
         trace_qxl_send_events_vm_stopped(d->id, events);
         return;
     }
-    old_pending = __sync_fetch_and_or(&d->ram->int_pending, le_events);
+    old_pending = atomic_or(&d->ram->int_pending, le_events);
     if ((old_pending & le_events) == le_events) {
         return;
     }
diff --git a/hw/virtio/vhost.c b/hw/virtio/vhost.c
index fbabf99..28abe1e 100644
--- a/hw/virtio/vhost.c
+++ b/hw/virtio/vhost.c
@@ -16,6 +16,7 @@
 #include <sys/ioctl.h>
 #include "hw/virtio/vhost.h"
 #include "hw/hw.h"
+#include "qemu/atomic.h"
 #include "qemu/range.h"
 #include <linux/vhost.h>
 #include "exec/address-spaces.h"
@@ -47,11 +48,9 @@ static void vhost_dev_sync_region(struct vhost_dev *dev,
             addr += VHOST_LOG_CHUNK;
             continue;
         }
-        /* Data must be read atomically. We don't really
-         * need the barrier semantics of __sync
-         * builtins, but it's easier to use them than
-         * roll our own. */
-        log = __sync_fetch_and_and(from, 0);
+        /* Data must be read atomically. We don't really need barrier semantics
+         * but it's easier to use atomic_* than roll our own. */
+        log = atomic_xchg(from, 0);
         while ((bit = sizeof(log) > sizeof(int) ?
                 ffsll(log) : ffs(log))) {
             hwaddr page_addr;
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 10becb6..8e3ab1e 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -1,68 +1,227 @@
-#ifndef __QEMU_BARRIER_H
-#define __QEMU_BARRIER_H 1
+/*
+ * Simple interface for atomic operations.
+ *
+ * Copyright (C) 2013 Red Hat, Inc.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
 
-/* Compiler barrier */
-#define barrier()   asm volatile("" ::: "memory")
+#ifndef __QEMU_ATOMIC_H
+#define __QEMU_ATOMIC_H 1
 
-#if defined(__i386__)
+#include "qemu/compiler.h"
 
-#include "qemu/compiler.h"        /* QEMU_GNUC_PREREQ */
+/* For C11 atomic ops */
 
-/*
- * Because of the strongly ordered x86 storage model, wmb() and rmb() are nops
- * on x86(well, a compiler barrier only).  Well, at least as long as
- * qemu doesn't do accesses to write-combining memory or non-temporal
- * load/stores from C code.
- */
-#define smp_wmb()   barrier()
-#define smp_rmb()   barrier()
+#if QEMU_GNUC_PREREQ(4, 8)
+#ifndef __ATOMIC_RELAXED
+#define __ATOMIC_RELAXED 0
+#endif
+#ifndef __ATOMIC_CONSUME
+#define __ATOMIC_CONSUME 1
+#endif
+#ifndef __ATOMIC_ACQUIRE
+#define __ATOMIC_ACQUIRE 2
+#endif
+#ifndef __ATOMIC_RELEASE
+#define __ATOMIC_RELEASE 3
+#endif
+#ifndef __ATOMIC_ACQ_REL
+#define __ATOMIC_ACQ_REL 4
+#endif
+#ifndef __ATOMIC_SEQ_CST
+#define __ATOMIC_SEQ_CST 5
+#endif
+#endif
+
+
+/* Compiler barrier */
+#define barrier()   ({ asm volatile("" ::: "memory"); (void)0; })
+
+#if !QEMU_GNUC_PREREQ(4, 8)
 
 /*
- * We use GCC builtin if it's available, as that can use
- * mfence on 32 bit as well, e.g. if built with -march=pentium-m.
- * However, on i386, there seem to be known bugs as recently as 4.3.
- * */
-#if QEMU_GNUC_PREREQ(4, 4)
-#define smp_mb() __sync_synchronize()
+ * We use GCC builtin if it's available, as that can use mfence on
+ * 32-bit as well, e.g. if built with -march=pentium-m. However, on
+ * i386 the spec is buggy, and the implementation followed it until
+ * 4.3 (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=36793).
+ */
+#if defined(__i386__) || defined(__x86_64__)
+#if !QEMU_GNUC_PREREQ(4, 4)
+#if defined __x86_64__
+#define smp_mb()    ({ asm volatile("mfence" ::: "memory"); (void)0; })
 #else
-#define smp_mb() asm volatile("lock; addl $0,0(%%esp) " ::: "memory")
+#define smp_mb()    ({ asm volatile("lock; addl $0,0(%%esp) " ::: "memory"); (void)0; })
+#endif
+#endif
+#endif
+
+
+#ifdef __alpha__
+#define smp_read_barrier_depends()   asm volatile("mb":::"memory")
 #endif
 
-#elif defined(__x86_64__)
+#if defined(__i386__) || defined(__x86_64__) || defined(__s390x__)
 
+/*
+ * Because of the strongly ordered storage model, wmb() and rmb() are nops
+ * here (a compiler barrier only).  QEMU doesn't do accesses to write-combining
+ * memory or non-temporal load/stores from C code.
+ */
 #define smp_wmb()   barrier()
 #define smp_rmb()   barrier()
-#define smp_mb() asm volatile("mfence" ::: "memory")
+
+/*
+ * __sync_lock_test_and_set() is documented to be an acquire barrier only,
+ * but it is a full barrier at the hardware level.  Add a compiler barrier
+ * to make it a full barrier also at the compiler level.
+ */
+#define atomic_xchg(ptr, i)    (barrier(), __sync_lock_test_and_set(ptr, i))
+
+/*
+ * Load/store with Java volatile semantics.
+ */
+#define atomic_mb_set(ptr, i)  ((void)atomic_xchg(ptr, i))
 
 #elif defined(_ARCH_PPC)
 
 /*
  * We use an eieio() for wmb() on powerpc.  This assumes we don't
  * need to order cacheable and non-cacheable stores with respect to
- * each other
+ * each other.
+ *
+ * smp_mb has the same problem as on x86 for not-very-new GCC
+ * (http://patchwork.ozlabs.org/patch/126184/, Nov 2011).
  */
-#define smp_wmb()   asm volatile("eieio" ::: "memory")
-
+#define smp_wmb()   ({ asm volatile("eieio" ::: "memory"); (void)0; })
 #if defined(__powerpc64__)
-#define smp_rmb()   asm volatile("lwsync" ::: "memory")
+#define smp_rmb()   ({ asm volatile("lwsync" ::: "memory"); (void)0; })
 #else
-#define smp_rmb()   asm volatile("sync" ::: "memory")
+#define smp_rmb()   ({ asm volatile("sync" ::: "memory"); (void)0; })
 #endif
+#define smp_mb()    ({ asm volatile("sync" ::: "memory"); (void)0; })
 
-#define smp_mb()   asm volatile("sync" ::: "memory")
+#endif /* _ARCH_PPC */
 
-#else
+#endif /* GCC 4.8 */
 
 /*
  * For (host) platforms we don't have explicit barrier definitions
  * for, we use the gcc __sync_synchronize() primitive to generate a
  * full barrier.  This should be safe on all platforms, though it may
- * be overkill for wmb() and rmb().
+ * be overkill for smp_wmb() and smp_rmb().
  */
+#ifndef smp_mb
+#define smp_mb()    __sync_synchronize()
+#endif
+
+#ifndef smp_wmb
+#if QEMU_GNUC_PREREQ(4, 8)
+#define smp_wmb()   __atomic_thread_fence(__ATOMIC_RELEASE)
+#else
 #define smp_wmb()   __sync_synchronize()
-#define smp_mb()   __sync_synchronize()
+#endif
+#endif
+
+#ifndef smp_rmb
+#if QEMU_GNUC_PREREQ(4, 8)
+#define smp_rmb()   __atomic_thread_fence(__ATOMIC_ACQUIRE)
+#else
 #define smp_rmb()   __sync_synchronize()
+#endif
+#endif
+
+#ifndef smp_read_barrier_depends
+#if QEMU_GNUC_PREREQ(4, 8)
+#define smp_read_barrier_depends()   __atomic_thread_fence(__ATOMIC_CONSUME)
+#else
+#define smp_read_barrier_depends()   barrier()
+#endif
+#endif
 
+#ifndef atomic_read
+#define atomic_read(ptr)       (*(__typeof__(*ptr) volatile *) (ptr))
 #endif
 
+#ifndef atomic_set
+#define atomic_set(ptr, i)     ((*(__typeof__(*ptr) volatile *) (ptr)) = (i))
+#endif
+
+/* These have the same semantics as Java volatile variables.
+ * See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
+ * "1. Issue a StoreStore barrier (wmb) before each volatile store.
+ *  2. Issue a StoreLoad barrier after each volatile store.
+ *     Note that you could instead issue one before each volatile load, but
+ *     this would be slower for typical programs using volatiles in which
+ *     reads greatly outnumber writes. Alternatively, if available, you
+ *     can implement volatile store as an atomic instruction (for example
+ *     XCHG on x86) and omit the barrier. This may be more efficient if
+ *     atomic instructions are cheaper than StoreLoad barriers.
+ *  3. Issue LoadLoad and LoadStore barriers after each volatile load."
+ *
+ * If you prefer to think in terms of "pairing" of memory barriers,
+ * an atomic_mb_read pairs with an atomic_mb_set.
+ *
+ * And for the few ia64 lovers that exist, an atomic_mb_read is a ld.acq,
+ * while an atomic_mb_set is a st.rel followed by a memory barrier.
+ */
+#ifndef atomic_mb_read
+#if QEMU_GNUC_PREREQ(4, 8)
+#define atomic_mb_read(ptr)       ({             \
+    typeof(*ptr) _val;                           \
+    __atomic_load(ptr, &_val, __ATOMIC_SEQ_CST); \
+    _val;                                        \
+})
+#else
+#define atomic_mb_read(ptr)    ({           \
+    typeof(*ptr) _val = atomic_read(ptr);   \
+    smp_rmb();                              \
+    _val;                                   \
+})
+#endif
+#endif
+
+#ifndef atomic_mb_set
+#if QEMU_GNUC_PREREQ(4, 8)
+#define atomic_mb_set(ptr, i)    do {             \
+    typeof(*ptr) _val = i;                        \
+    __atomic_store(ptr, &_val, __ATOMIC_SEQ_CST); \
+} while(0)
+#else
+#define atomic_mb_set(ptr, i)  do {         \
+    smp_wmb();                              \
+    atomic_set(ptr, i);                     \
+    smp_mb();                               \
+} while (0)
+#endif
+#endif
+
+#ifndef atomic_xchg
+#ifdef __clang__
+#define atomic_xchg(ptr, i)    __sync_swap(ptr, i)
+#elif QEMU_GNUC_PREREQ(4, 8)
+#define atomic_xchg(ptr, i)    ({                           \
+    typeof(*ptr) _new = (i), _old;                          \
+    __atomic_exchange(ptr, &_new, &_old, __ATOMIC_SEQ_CST); \
+    _old;                                                   \
+})
+#else
+/* __sync_lock_test_and_set() is documented to be an acquire barrier only.  */
+#define atomic_xchg(ptr, i)    (smp_mb(), __sync_lock_test_and_set(ptr, i))
+#endif
+#endif
+
+/* Provide shorter names for GCC atomic builtins.  */
+#define atomic_inc(ptr)        __sync_fetch_and_add(ptr, 1)
+#define atomic_dec(ptr)        __sync_fetch_and_add(ptr, -1)
+#define atomic_add             __sync_fetch_and_add
+#define atomic_sub             __sync_fetch_and_sub
+#define atomic_and             __sync_fetch_and_and
+#define atomic_or              __sync_fetch_and_or
+#define atomic_cmpxchg         __sync_val_compare_and_swap
+
 #endif
diff --git a/migration.c b/migration.c
index 3eb0fad..866d68d 100644
--- a/migration.c
+++ b/migration.c
@@ -290,8 +290,7 @@ static void migrate_fd_cleanup(void *opaque)
 
 static void migrate_finish_set_state(MigrationState *s, int new_state)
 {
-    if (__sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE,
-                                    new_state) == new_state) {
+    if (atomic_cmpxchg(&s->state, MIG_STATE_ACTIVE, new_state) == new_state) {
         trace_migrate_set_state(new_state);
     }
 }
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 22915aa..90d1542 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -17,15 +17,15 @@ typedef struct {
 static int worker_cb(void *opaque)
 {
     WorkerTestData *data = opaque;
-    return __sync_fetch_and_add(&data->n, 1);
+    return atomic_inc(&data->n);
 }
 
 static int long_cb(void *opaque)
 {
     WorkerTestData *data = opaque;
-    __sync_fetch_and_add(&data->n, 1);
+    atomic_inc(&data->n);
     g_usleep(2000000);
-    __sync_fetch_and_add(&data->n, 1);
+    atomic_inc(&data->n);
     return 0;
 }
 
@@ -169,7 +169,7 @@ static void test_cancel(void)
     /* Cancel the jobs that haven't been started yet.  */
     num_canceled = 0;
     for (i = 0; i < 100; i++) {
-        if (__sync_val_compare_and_swap(&data[i].n, 0, 3) == 0) {
+        if (atomic_cmpxchg(&data[i].n, 0, 3) == 0) {
             data[i].ret = -ECANCELED;
             bdrv_aio_cancel(data[i].aiocb);
             active--;
-- 
1.8.1.4


* [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 01/12] add a header file for atomic operations Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-16 10:15   ` Stefan Hajnoczi
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 03/12] rcu: add rcu library Paolo Bonzini
                   ` (12 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

This emulates Win32 manual-reset events using futexes or condition
variables.  Typical ways to use them are with multi-producer,
single-consumer data structures, to test for a complex condition whose
elements come from different threads:

    for (;;) {
        qemu_event_reset(ev);
        ... test complex condition ...
        if (condition is true) {
            break;
        }
        qemu_event_wait(ev);
    }

Alternatively:

    ... compute condition ...
    if (condition) {
        do {
            qemu_event_wait(ev);
            qemu_event_reset(ev);
            ... compute condition ...
        } while(condition);
        qemu_event_set(ev);
    }

QemuEvent provides a very fast userspace path in the common case when
no other thread is waiting, or the event is not changing state.  It
is used to report RCU quiescent states to the thread calling
synchronize_rcu (the latter being the single consumer), and to report
call_rcu invocations to the thread that receives them.
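
As an illustration of the first pattern above, here is a minimal,
self-contained sketch.  It does not use the futex-based code from this
patch: DemoEvent and the demo_* functions are hypothetical stand-ins
built on a plain mutex and condition variable, and the "complex
condition" is assumed to be "both producers have set their flag".

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for QemuEvent (mutex + condvar, no fast path). */
typedef struct DemoEvent {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool set;
} DemoEvent;

static void demo_event_init(DemoEvent *ev, bool init)
{
    pthread_mutex_init(&ev->lock, NULL);
    pthread_cond_init(&ev->cond, NULL);
    ev->set = init;
}

static void demo_event_set(DemoEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    ev->set = true;
    pthread_cond_broadcast(&ev->cond);  /* manual reset: wake all waiters */
    pthread_mutex_unlock(&ev->lock);
}

static void demo_event_reset(DemoEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    ev->set = false;
    pthread_mutex_unlock(&ev->lock);
}

static void demo_event_wait(DemoEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    while (!ev->set) {
        pthread_cond_wait(&ev->cond, &ev->lock);
    }
    pthread_mutex_unlock(&ev->lock);
}

static DemoEvent demo_ev;
static atomic_int demo_done[2];         /* one flag per producer */

static void *demo_producer(void *arg)
{
    atomic_store(&demo_done[*(int *)arg], 1);   /* publish, then signal */
    demo_event_set(&demo_ev);
    return NULL;
}

/* Single consumer: returns the number of producers seen as done. */
int demo_wait_for_all(void)
{
    pthread_t tid[2];
    int id[2] = { 0, 1 };

    demo_event_init(&demo_ev, false);
    for (int i = 0; i < 2; i++) {
        int rc = pthread_create(&tid[i], NULL, demo_producer, &id[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (;;) {
        demo_event_reset(&demo_ev);
        /* Test the condition only after resetting the event. */
        if (atomic_load(&demo_done[0]) && atomic_load(&demo_done[1])) {
            break;
        }
        demo_event_wait(&demo_ev);
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(tid[i], NULL);
    }
    pthread_mutex_destroy(&demo_ev.lock);
    pthread_cond_destroy(&demo_ev.cond);
    return atomic_load(&demo_done[0]) + atomic_load(&demo_done[1]);
}
```

Resetting before testing is what makes the loop safe: a producer that
publishes its flag after the test necessarily calls the set function
after the reset, so the following wait returns immediately.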

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/thread-posix.h |   8 +++
 include/qemu/thread-win32.h |   4 ++
 include/qemu/thread.h       |   7 +++
 util/qemu-thread-posix.c    | 116 ++++++++++++++++++++++++++++++++++++++++++++
 util/qemu-thread-win32.c    |  26 ++++++++++
 5 files changed, 161 insertions(+)

diff --git a/include/qemu/thread-posix.h b/include/qemu/thread-posix.h
index 0f30dcc..916b2a7 100644
--- a/include/qemu/thread-posix.h
+++ b/include/qemu/thread-posix.h
@@ -21,6 +21,14 @@ struct QemuSemaphore {
 #endif
 };
 
+struct QemuEvent {
+#ifndef __linux__
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+#endif
+    unsigned value;
+};
+
 struct QemuThread {
     pthread_t thread;
 };
diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
index 13adb95..3d58081 100644
--- a/include/qemu/thread-win32.h
+++ b/include/qemu/thread-win32.h
@@ -17,6 +17,10 @@ struct QemuSemaphore {
     HANDLE sema;
 };
 
+struct QemuEvent {
+    HANDLE event;
+};
+
 typedef struct QemuThreadData QemuThreadData;
 struct QemuThread {
     QemuThreadData *data;
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index c02404b..3e32c65 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -7,6 +7,7 @@
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
+typedef struct QemuEvent QemuEvent;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -45,6 +46,12 @@ void qemu_sem_wait(QemuSemaphore *sem);
 int qemu_sem_timedwait(QemuSemaphore *sem, int ms);
 void qemu_sem_destroy(QemuSemaphore *sem);
 
+void qemu_event_init(QemuEvent *ev, bool init);
+void qemu_event_set(QemuEvent *ev);
+void qemu_event_reset(QemuEvent *ev);
+void qemu_event_wait(QemuEvent *ev);
+void qemu_event_destroy(QemuEvent *ev);
+
 void qemu_thread_create(QemuThread *thread,
                         void *(*start_routine)(void *),
                         void *arg, int mode);
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 4489abf..8178f9b 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -20,7 +20,12 @@
 #include <limits.h>
 #include <unistd.h>
 #include <sys/time.h>
+#ifdef __linux__
+#include <sys/syscall.h>
+#include <linux/futex.h>
+#endif
 #include "qemu/thread.h"
+#include "qemu/atomic.h"
 
 static void error_exit(int err, const char *msg)
 {
@@ -268,6 +273,117 @@ void qemu_sem_wait(QemuSemaphore *sem)
 #endif
 }
 
+#ifdef __linux__
+#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void futex_wake(QemuEvent *ev, int n)
+{
+    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void futex_wait(QemuEvent *ev, unsigned val)
+{
+    futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0);
+}
+#else
+static inline void futex_wake(QemuEvent *ev, int n)
+{
+    if (n == 1) {
+        pthread_cond_signal(&ev->cond);
+    } else {
+        pthread_cond_broadcast(&ev->cond);
+    }
+}
+
+static inline void futex_wait(QemuEvent *ev, unsigned val)
+{
+    pthread_mutex_lock(&ev->lock);
+    if (ev->value == val) {
+        pthread_cond_wait(&ev->cond, &ev->lock);
+    }
+    pthread_mutex_unlock(&ev->lock);
+}
+#endif
+
+/* Valid transitions:
+ * - free->set, when setting the event
+ * - busy->set, when setting the event, followed by futex_wake
+ * - set->free, when resetting the event
+ * - free->busy, when waiting
+ *
+ * set->busy does not happen (it can be observed from the outside but
+ * it really is set->free->busy).
+ *
+ * busy->free provably cannot happen; to enforce it, the set->free transition
+ * is done with an OR, which becomes a no-op if the event has concurrently
+ * transitioned to free or busy.
+ */
+
+#define EV_SET         0
+#define EV_FREE        1
+#define EV_BUSY       -1
+
+void qemu_event_init(QemuEvent *ev, bool init)
+{
+#ifndef __linux__
+    pthread_mutex_init(&ev->lock, NULL);
+    pthread_cond_init(&ev->cond, NULL);
+#endif
+
+    ev->value = (init ? EV_SET : EV_FREE);
+}
+
+void qemu_event_destroy(QemuEvent *ev)
+{
+#ifndef __linux__
+    pthread_mutex_destroy(&ev->lock);
+    pthread_cond_destroy(&ev->cond);
+#endif
+}
+
+void qemu_event_set(QemuEvent *ev)
+{
+    if (atomic_mb_read(&ev->value) != EV_SET) {
+        if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+            /* There were waiters, wake them up.  */
+            futex_wake(ev, INT_MAX);
+        }
+    }
+}
+
+void qemu_event_reset(QemuEvent *ev)
+{
+    if (atomic_mb_read(&ev->value) == EV_SET) {
+        /*
+         * If there was a concurrent reset (or even reset+wait),
+         * do nothing.  Otherwise change EV_SET->EV_FREE.
+         */
+        atomic_or(&ev->value, EV_FREE);
+    }
+}
+
+void qemu_event_wait(QemuEvent *ev)
+{
+    unsigned value;
+
+    value = atomic_mb_read(&ev->value);
+    if (value != EV_SET) {
+        if (value == EV_FREE) {
+            /*
+             * Leave the event reset and tell qemu_event_set that there
+             * are waiters.  No need to retry, because there cannot be
+             * a concurrent busy->free transition.  After the CAS, the
+             * event will be either set or busy.
+             */
+            if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
+                return;
+            }
+        }
+        futex_wait(ev, EV_BUSY);
+    }
+}
+
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg, int mode)
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 517878d..27a5217 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -227,6 +227,32 @@ void qemu_sem_wait(QemuSemaphore *sem)
     }
 }
 
+void qemu_event_init(QemuEvent *ev, bool init)
+{
+    /* Manual reset.  */
+    ev->event = CreateEvent(NULL, TRUE, init, NULL);
+}
+
+void qemu_event_destroy(QemuEvent *ev)
+{
+    CloseHandle(ev->event);
+}
+
+void qemu_event_set(QemuEvent *ev)
+{
+    SetEvent(ev->event);
+}
+
+void qemu_event_reset(QemuEvent *ev)
+{
+    ResetEvent(ev->event);
+}
+
+void qemu_event_wait(QemuEvent *ev)
+{
+    WaitForSingleObject(ev->event, INFINITE);
+}
+
 struct QemuThreadData {
     /* Passed to win32_start_routine.  */
     void             *(*start_routine)(void *);
-- 
1.8.1.4

* [Qemu-devel] [PATCH 03/12] rcu: add rcu library
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 01/12] add a header file for atomic operations Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-16 11:46   ` Stefan Hajnoczi
  2013-05-17  4:36   ` liu ping fan
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 04/12] qemu-thread: register threads with RCU Paolo Bonzini
                   ` (11 subsequent siblings)
  14 siblings, 2 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

This includes a (mangled) copy of the urcu-qsbr code from liburcu.
The main changes are: 1) removing dependencies on many other header files
in liburcu; 2) removing for simplicity the tentative busy waiting in
synchronize_rcu, which has limited performance effects; 3) replacing
futexes in synchronize_rcu with QemuEvents for Win32 portability.
The API is the same as liburcu, so it should be possible in the future
to require liburcu on POSIX systems for example and use our copy only
on Windows.

Among the various versions available I chose urcu-qsbr, which has the
fastest rcu_read_{lock,unlock} but requires the program to manually
annotate quiescent points or intervals.  QEMU threads usually have easily
identified quiescent periods, so this should not be a problem.
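
To make the QSBR idea concrete, here is a toy sketch of the scheme for
exactly one reader and one updater.  It is not the code in this patch:
all toy_* names are hypothetical, C11 sequentially consistent atomics
replace the barriers used by liburcu, and the updater spins with
sched_yield() where the patch sleeps on a QemuEvent.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Toy QSBR for ONE reader and ONE updater; every name is hypothetical. */
static atomic_ulong toy_gp_ctr = 2;   /* grace-period counter, never 0 */
static atomic_ulong toy_reader_ctr;   /* 0 means "reader is offline" */
static _Atomic(int *) toy_shared;     /* the RCU-protected pointer */
static atomic_bool toy_stop;
static long toy_bad;                  /* stale reads seen by the reader */

static void toy_quiescent_state(void)
{
    /* Announce that all earlier critical sections are finished. */
    atomic_store(&toy_reader_ctr, atomic_load(&toy_gp_ctr));
}

static void toy_thread_offline(void)
{
    atomic_store(&toy_reader_ctr, 0);
}

static void toy_synchronize(void)
{
    unsigned long target = atomic_fetch_add(&toy_gp_ctr, 2) + 2;

    for (;;) {
        unsigned long ctr = atomic_load(&toy_reader_ctr);
        if (ctr == 0 || ctr == target) {
            break;              /* offline, or passed a quiescent state */
        }
        sched_yield();
    }
}

static void *toy_reader(void *arg)
{
    long bad = 0;

    (void)arg;
    toy_quiescent_state();                      /* go online */
    while (!atomic_load(&toy_stop)) {
        int *p = atomic_load(&toy_shared);      /* critical section */
        if (*p < 0) {
            bad++;                              /* saw a reclaimed block */
        }
        toy_quiescent_state();                  /* no reference held now */
        sched_yield();                          /* keeps the demo quick */
    }
    toy_thread_offline();
    toy_bad = bad;
    return NULL;
}

static int *toy_new(int value)
{
    int *p = malloc(sizeof(*p));
    if (!p) {
        abort();
    }
    *p = value;
    return p;
}

/* Returns the last published value, or -1 if the reader saw stale data. */
int toy_qsbr_demo(int updates)
{
    pthread_t tid;
    int last;

    atomic_store(&toy_shared, toy_new(0));
    atomic_store(&toy_stop, false);
    if (pthread_create(&tid, NULL, toy_reader, NULL) != 0) {
        abort();
    }
    for (int i = 1; i <= updates; i++) {
        int *old = atomic_exchange(&toy_shared, toy_new(i)); /* removal */
        toy_synchronize();                                   /* grace period */
        *old = -1;                                           /* poison */
        free(old);                                           /* reclamation */
    }
    atomic_store(&toy_stop, true);
    pthread_join(tid, NULL);
    last = *atomic_load(&toy_shared);
    free(atomic_load(&toy_shared));
    return toy_bad == 0 ? last : -1;
}
```

The read side costs one pointer load here; all the bookkeeping happens
at the quiescent point, which is the trade-off described above.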

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt               | 301 +++++++++++++++++++++++++++++++++++++++++++++
 hw/9pfs/virtio-9p-synth.c  |   1 +
 include/qemu/queue.h       |  13 ++
 include/qemu/rcu-pointer.h | 110 +++++++++++++++++
 include/qemu/rcu.h         | 168 +++++++++++++++++++++++++
 include/qemu/thread.h      |   3 -
 libcacard/Makefile         |   3 +-
 util/Makefile.objs         |   1 +
 util/rcu.c                 | 203 ++++++++++++++++++++++++++++++
 9 files changed, 799 insertions(+), 4 deletions(-)
 create mode 100644 docs/rcu.txt
 create mode 100644 include/qemu/rcu-pointer.h
 create mode 100644 include/qemu/rcu.h
 create mode 100644 util/rcu.c

diff --git a/docs/rcu.txt b/docs/rcu.txt
new file mode 100644
index 0000000..19e4840
--- /dev/null
+++ b/docs/rcu.txt
@@ -0,0 +1,301 @@
+Using RCU (Read-Copy-Update) for synchronization
+================================================
+
+Read-copy update (RCU) is a synchronization mechanism that is used to
+protect read-mostly data structures.  RCU is very efficient and scalable
+on the read side (it is wait-free), and thus can make the read paths
+extremely fast.
+
+RCU supports concurrency between a single writer and multiple readers,
+thus it is not used alone.  Typically, the write-side will use a lock to
+serialize multiple updates, but other approaches are possible (e.g.,
+restricting updates to a single task).  In QEMU, when a lock is used,
+this will often be the "iothread mutex", also known as the "big QEMU
+lock" (BQL).  Also, restricting updates to a single task is done in
+QEMU using the "bottom half" API.
+
+RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
+sections of code as "critical sections", and the update side will wait
+for the execution of all *currently running* critical sections before
+proceeding, or before asynchronously executing a callback.
+
+The key point here is that only the currently running critical sections
+are waited for; critical sections that are started _after_ the beginning
+of the wait do not extend the wait, despite running concurrently with
+the updater.  This is the reason why RCU is more scalable than,
+for example, reader-writer locks.  It is so much more scalable that
+the system will have a single instance of the RCU mechanism; a single
+mechanism can be used for an arbitrary number of "things", without
+having to worry about things such as contention or deadlocks.
+
+How is this possible?  The basic idea is to split updates in two phases,
+"removal" and "reclamation".  During removal, we ensure that subsequent
+readers will not be able to get a reference to the old data.  After
+removal has completed, a critical section will not be able to access
+the old data.  Therefore, critical sections that begin after removal
+do not matter; as soon as all previous critical sections have finished,
+there cannot be any readers who hold references to the data structure,
+which may now be safely reclaimed (e.g., freed or unref'ed).
+
+Here is a picture:
+
+        thread 1                  thread 2                  thread 3
+    -------------------    ------------------------    -------------------
+    enter RCU crit.sec.
+           |                finish removal phase
+           |                begin wait
+           |                      |                    enter RCU crit.sec.
+    exit RCU crit.sec.            |                           |
+                            complete wait                     |
+                            begin reclamation phase           |
+                                                       exit RCU crit.sec.
+
+
+Note how thread 3 is still executing its critical section when thread 2
+starts reclaiming data.  This is possible, because the old version of the
+data structure was not accessible at the time thread 3 began executing
+that critical section.
+
+
+RCU API
+=======
+
+The core RCU API is small:
+
+     void rcu_read_lock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        entering an RCU read-side critical section.
+
+     void rcu_read_unlock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        exiting an RCU read-side critical section.  Note that RCU
+        read-side critical sections may be nested and/or overlapping.
+
+     void synchronize_rcu(void);
+
+        Blocks until all pre-existing RCU read-side critical sections
+        on all threads have completed.  This marks the end of the removal
+        phase and the beginning of the reclamation phase.
+
+        Note that it would be valid for another update to come while
+        synchronize_rcu is running.  Because of this, it is better that
+        the updater releases any locks it may hold before calling
+        synchronize_rcu.
+
+     typeof(*p) rcu_dereference(p);
+     typeof(p) rcu_assign_pointer(p, typeof(p) v);
+
+        These macros are similar to atomic_mb_read() and atomic_mb_set()
+        respectively.  However, they make some assumptions on the code
+        that calls them, which allows a more optimized implementation.
+
+        rcu_assign_pointer assumes that the update side is not going
+        to read from the data structure after "publishing" the new
+        values; that is, it assumes that all assignments happen at
+        the very end of the removal phase.
+
+        rcu_dereference assumes that whenever a single RCU critical
+        section reads multiple shared data, these reads are either
+        data-dependent or need no ordering.  This is almost always the
+        case when using RCU.  If this is not the case, you can use
+        atomic_mb_read() or smp_rmb().
+
+        If you are going to be fetching multiple fields from the
+        RCU-protected structure, repeated rcu_dereference() calls
+        would look ugly and incur unnecessary overhead on Alpha CPUs.
+        You can then do this:
+
+        p = rcu_dereference(&head);
+        foo = p->foo;
+        bar = p->bar;
+
+
+RCU QUIESCENT STATES
+====================
+
+An efficient implementation of rcu_read_lock() and rcu_read_unlock()
+relies on the availability of fast thread-local storage.  Unfortunately,
+this is not possible on all the systems supported by QEMU (in particular
+on many POSIX systems other than Linux and Solaris).
+
+For this reason, QEMU's RCU implementation resorts to manual annotation
+of "quiescent states", i.e. points where no RCU read-side critical
+section can be active.  All threads that participate in the RCU mechanism
+need to annotate such points.
+
+Marking quiescent states is done with the following three APIs:
+
+     void rcu_quiescent_state(void);
+
+        Marks a point in the execution of the current thread where no
+        RCU read-side critical section can be active.
+
+     void rcu_thread_offline(void);
+
+        Marks the beginning of an "extended quiescent state" for the
+        current thread, i.e. an interval of time during which no
+        RCU read-side critical section can be active.
+
+     void rcu_thread_online(void);
+
+        Marks the end of an extended quiescent state for the current
+        thread.
+
+
+Furthermore, threads that participate in the RCU mechanism must communicate
+this fact using the following APIs:
+
+     void rcu_register_thread(void);
+
+        Mark a thread as taking part in the RCU mechanism.  Such a thread
+        will have to report quiescent points regularly, either manually
+        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
+
+     void rcu_unregister_thread(void);
+
+        Mark a thread as not taking part anymore in the RCU mechanism.
+        It is not a problem if such a thread reports quiescent points,
+        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
+        APIs.
+
+Note that these APIs are relatively heavyweight, and should _not_ be
+nested.
+
+
+DIFFERENCES WITH LINUX
+======================
+
+- Sleeping is possible, though discouraged, within an RCU critical section.
+
+- rcu_dereference takes a _pointer_ to the variable being accessed.
+  Wrong usage will be detected by the compiler.
+
+- Quiescent points must be marked explicitly unless the thread uses
+  condvars/semaphores/events for synchronization.
+
+
+RCU PATTERNS
+============
+
+Many patterns using reader-writer locks translate directly to RCU, with
+the advantages of higher scalability and deadlock immunity.
+
+In general, RCU can be used whenever it is possible to create a new
+"version" of a data structure every time the updater runs.  This may
+sound like a very strict restriction, however:
+
+- the updater does not mean "everything that writes to a data structure",
+  but rather "everything that involves a reclamation step".  See the
+  array example below
+
+- in some cases, creating a new version of a data structure may actually
+  be very cheap.  For example, modifying the "next" pointer of a singly
+  linked list is effectively creating a new version of the list.
+
+
+A few patterns, however, are worth noting.
+
+RCU list processing
+-------------------
+
+TBD (not yet used in QEMU)
+
+
+RCU reference counting
+----------------------
+
+Because grace periods are not allowed to complete while there is an RCU
+read-side critical section in progress, the RCU read-side primitives
+may be used as a restricted reference-counting mechanism.  For example,
+consider the following code fragment:
+
+    rcu_read_lock();
+    p = rcu_dereference(&foo);
+    /* do something with p. */
+    rcu_read_unlock();
+
+The RCU read-side critical section ensures that the value of "p" remains
+valid until after the rcu_read_unlock().  In some sense, it is acquiring
+a reference to p that is later released when the critical section ends.
+The write side looks simply like this (with appropriate locking):
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    free(old);
+
+Note that the same idiom would be possible with reader/writer
+locks:
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        p = foo;
+    /* do something with p. */      foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+                                    free(p);
+
+
+RCU resizable arrays
+--------------------
+
+Resizable arrays can be used with RCU.  The expensive RCU synchronization
+only needs to take place when the array is resized.  The two items to
+take care of are:
+
+- ensuring that the old version of the array is available between removal
+  and reclamation;
+
+- avoiding mismatches in the read side between the array data and the
+  array size.
+
+The first problem is avoided simply by not using realloc.  Instead,
+each resize will allocate a new array and copy the old data into it.
+The second problem would arise if the size and the data pointers were
+two members of a larger struct:
+
+    struct mystuff {
+        ...
+        int data_size;
+        int data_alloc;
+        T   *data;
+        ...
+    };
+
+Instead, we store the size of the array with the array itself:
+
+    struct arr {
+        int size;
+        int alloc;
+        T   data[];
+    };
+    struct arr *global_array;
+
+    read side:
+        rcu_read_lock();
+        struct arr *array = rcu_dereference(&global_array);
+        x = i < array->size ? array->data[i] : -1;
+        rcu_read_unlock();
+        return x;
+
+    write side (running under a lock):
+        if (global_array->size == global_array->alloc) {
+            /* Creating a new version.  */
+            new_array = g_malloc(sizeof(struct arr) +
+                                 global_array->alloc * 2 * sizeof(T));
+            new_array->size = global_array->size;
+            new_array->alloc = global_array->alloc * 2;
+            memcpy(new_array->data, global_array->data,
+                   global_array->alloc * sizeof(T));
+
+            /* Removal phase.  */
+            old_array = global_array;
+            rcu_assign_pointer(global_array, new_array);
+            synchronize_rcu();
+
+            /* Reclamation phase.  */
+            g_free(old_array);
+        }
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index 840e4eb..d5f5842 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -17,6 +17,7 @@
 #include "virtio-9p-xattr.h"
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-synth.h"
+#include "qemu/rcu.h"
 
 #include <sys/stat.h>
 
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index d433b90..847ddd1 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -104,6 +104,19 @@ struct {                                                                \
         (head)->lh_first = NULL;                                        \
 } while (/*CONSTCOND*/0)
 
+#define QLIST_SWAP(dstlist, srclist, field) do {                        \
+        void *tmplist;                                                  \
+        tmplist = (srclist)->lh_first;                                  \
+        (srclist)->lh_first = (dstlist)->lh_first;                      \
+        if ((srclist)->lh_first != NULL) {                              \
+            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
+        }                                                               \
+        (dstlist)->lh_first = tmplist;                                  \
+        if ((dstlist)->lh_first != NULL) {                              \
+            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
+        }                                                               \
+} while (/*CONSTCOND*/0)
+
 #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
         if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
                 (listelm)->field.le_next->field.le_prev =               \
diff --git a/include/qemu/rcu-pointer.h b/include/qemu/rcu-pointer.h
new file mode 100644
index 0000000..0e6417c
--- /dev/null
+++ b/include/qemu/rcu-pointer.h
@@ -0,0 +1,110 @@
+#ifndef _URCU_POINTER_STATIC_H
+#define _URCU_POINTER_STATIC_H
+
+/*
+ * urcu-pointer-static.h
+ *
+ * Userspace RCU header. Operations on pointers.
+ *
+ * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
+ * linking dynamically with the userspace rcu library.
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include "compiler.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * rcu_dereference - reads (copy) a RCU-protected pointer to a local variable
+ * into a RCU read-side critical section. The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant throughout the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * The compiler memory barrier in atomic_read() ensures that value-speculative
+ * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
+ * data read before the pointer read by speculating the value of the pointer.
+ * Correct ordering is ensured because the pointer is read as a volatile access.
+ * This acts as a global side-effect operation, which forbids reordering of
+ * dependent memory operations. Note that such concern about dependency-breaking
+ * optimizations will eventually be taken care of by the "memory_order_consume"
+ * addition to forthcoming C++ standard.
+ *
+ * Should match rcu_assign_pointer() or rcu_xchg_pointer().
+ */
+
+#define rcu_dereference(p)                      \
+        ({                                      \
+            typeof(*p) _p1 = atomic_read(p);    \
+            smp_read_barrier_depends();         \
+            _p1;                                \
+        })
+
+/**
+ * rcu_cmpxchg_pointer - same as rcu_set_pointer, but tests if the pointer
+ * is as expected by "old". If succeeds, returns the previous pointer to the
+ * data structure, which can be safely freed after waiting for a quiescent state
+ * using synchronize_rcu(). If fails (unexpected value), returns old (which
+ * should not be freed !).
+ */
+
+#define rcu_cmpxchg_pointer(p, old, _new)       \
+        ({                                      \
+            typeof(*p) _pold = (old);           \
+            typeof(*p) _pnew = (_new);          \
+            atomic_cmpxchg(p, _pold, _pnew);    \
+        })
+
+#define rcu_set_pointer(p, v)                   \
+        ({                                      \
+             typeof(*p) _pv = (v);              \
+             smp_wmb();                         \
+             atomic_set(p, _pv);                \
+        })
+
+/**
+ * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections. Returns the assigned
+ * value.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match rcu_dereference().
+ */
+
+#define rcu_assign_pointer(p, v)    rcu_set_pointer(&(p), v)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_POINTER_STATIC_H */
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
new file mode 100644
index 0000000..b875593
--- /dev/null
+++ b/include/qemu/rcu.h
@@ -0,0 +1,168 @@
+#ifndef _URCU_QSBR_H
+#define _URCU_QSBR_H
+
+/*
+ * urcu-qsbr.h
+ *
+ * Userspace RCU QSBR header.
+ *
+ * LGPL-compatible code should include this header with :
+ *
+ * #define _LGPL_SOURCE
+ * #include <urcu.h>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <glib.h>
+
+#include "qemu/compiler.h"
+#include "qemu/rcu-pointer.h"
+#include "qemu/thread.h"
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...)    assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+#define RCU_GP_ONLINE     (1UL << 0)
+#define RCU_GP_CTR        (1UL << 1)
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using an int rather than a char to eliminate false register dependencies
+ * causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader_data {
+    /* Data used by both reader and synchronize_rcu() */
+    unsigned long ctr;
+    bool waiting;
+
+    /* Data used for registry, protected by rcu_gp_lock */
+    QLIST_ENTRY(rcu_reader_data) node;
+};
+
+#ifdef __linux__
+extern __thread struct rcu_reader_data rcu_reader;
+#define DEFINE_RCU_READER() \
+    __thread struct rcu_reader_data rcu_reader
+
+static inline struct rcu_reader_data *get_rcu_reader(void)
+{
+    return &rcu_reader;
+}
+
+static inline void alloc_rcu_reader(void)
+{
+}
+#else
+extern GPrivate rcu_reader_key;
+#define DEFINE_RCU_READER() \
+     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
+
+static inline struct rcu_reader_data *get_rcu_reader(void)
+{
+    return g_private_get(&rcu_reader_key);
+}
+
+static inline void alloc_rcu_reader(void)
+{
+     g_private_replace(&rcu_reader_key,
+                       g_malloc0(sizeof(struct rcu_reader_data)));
+}
+#endif
+
+static inline void rcu_read_lock(void)
+{
+    rcu_assert(get_rcu_reader()->ctr);
+}
+
+static inline void rcu_read_unlock(void)
+{
+    /* Ensure that the previous reads complete before starting those
+     * in another critical section.
+     */
+    smp_rmb();
+}
+
+static inline void rcu_quiescent_state(void)
+{
+    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+
+    /* Reuses smp_rmb() in the last rcu_read_unlock().  */
+    unsigned long ctr = atomic_read(&rcu_gp_ctr);
+    atomic_xchg(&p_rcu_reader->ctr, ctr);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+static inline void rcu_thread_offline(void)
+{
+    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+
+    atomic_xchg(&p_rcu_reader->ctr, 0);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+static inline void rcu_thread_online(void)
+{
+    rcu_quiescent_state();
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_QSBR_H */
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 3e32c65..5d64a20 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
 int qemu_mutex_trylock(QemuMutex *mutex);
 void qemu_mutex_unlock(QemuMutex *mutex);
 
-#define rcu_read_lock() do { } while (0)
-#define rcu_read_unlock() do { } while (0)
-
 void qemu_cond_init(QemuCond *cond);
 void qemu_cond_destroy(QemuCond *cond);
 
diff --git a/libcacard/Makefile b/libcacard/Makefile
index 47827a0..f7a3b07 100644
--- a/libcacard/Makefile
+++ b/libcacard/Makefile
@@ -4,7 +4,8 @@ TOOLS += vscclient$(EXESUF)
 
 # objects linked into a shared library, built with libtool with -fPIC if required
 libcacard-obj-y = $(stub-obj-y) $(libcacard-y)
-libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o util/error.o
+libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o
+libcacard-obj-y += util/rcu.o util/error.o
 libcacard-obj-$(CONFIG_WIN32) += util/oslib-win32.o util/qemu-thread-win32.o
 libcacard-obj-$(CONFIG_POSIX) += util/oslib-posix.o util/qemu-thread-posix.o
 libcacard-obj-y += $(filter trace/%, $(util-obj-y))
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 4a1bd4e..f05eba1 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
 util-obj-y += qemu-option.o qemu-progress.o
 util-obj-y += hexdump.o
 util-obj-y += crc32c.o
+util-obj-y += rcu.o
diff --git a/util/rcu.c b/util/rcu.c
new file mode 100644
index 0000000..48686a3
--- /dev/null
+++ b/util/rcu.c
@@ -0,0 +1,203 @@
+/*
+ * urcu-qsbr.c
+ *
+ * Userspace RCU QSBR library
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2013 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "qemu/rcu.h"
+#include "qemu/atomic.h"
+
+/*
+ * Global grace period counter.  Bit 0 is one if the thread is online.
+ * Bits 1 and above are defined in synchronize_rcu/update_counter_and_wait.
+ */
+#define RCU_GP_ONLINE           (1UL << 0)
+#define RCU_GP_CTR              (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
+
+QemuEvent rcu_gp_event;
+static QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+    unsigned long v;
+
+    /* See update_counter_and_wait for the discussion of memory barriers.  */
+    v = atomic_read(ctr);
+    return v && (v != rcu_gp_ctr);
+}
+
+/* Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+DEFINE_RCU_READER();
+
+/* Protected by rcu_gp_lock.  */
+typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+/* Wait for previous parity/grace period to be empty of readers.  */
+static void wait_for_readers(void)
+{
+    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+    struct rcu_reader_data *index, *tmp;
+
+    for (;;) {
+        /* We want to be notified of changes made to rcu_gp_ongoing
+         * while we walk the list.
+         */
+        qemu_event_reset(&rcu_gp_event);
+
+        /* Instead of using atomic_mb_set for index->waiting, and
+         * atomic_mb_read for index->ctr, memory barriers are placed
+         * manually since writes to different threads are independent.
+         * atomic_mb_set has a smp_wmb before...
+         */
+        smp_wmb();
+        QLIST_FOREACH(index, &registry, node) {
+            atomic_set(&index->waiting, true);
+        }
+
+        /* ... and a smp_mb after.
+         *
+         * This barrier also blocks stores that free old RCU-protected
+         * pointers.
+         */
+        smp_mb();
+
+        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+            if (!rcu_gp_ongoing(&index->ctr)) {
+                QLIST_REMOVE(index, node);
+                QLIST_INSERT_HEAD(&qsreaders, index, node);
+
+                /* No need for mb_set here; at worst we
+                 * get some extra futex wakeups.
+                 */
+                atomic_set(&index->waiting, false);
+            }
+        }
+
+        /* atomic_mb_read has smp_rmb after.  */
+        smp_rmb();
+
+        if (QLIST_EMPTY(&registry)) {
+            break;
+        }
+
+        /* Wait for one thread to report a quiescent state and
+         * try again.
+         */
+        qemu_event_wait(&rcu_gp_event);
+    }
+
+    /* put back the reader list in the registry */
+    QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+void synchronize_rcu(void)
+{
+    unsigned long was_online;
+
+    was_online = get_rcu_reader()->ctr;
+
+    /* Mark the writer thread offline to make sure we don't wait for
+     * our own quiescent state. This allows using synchronize_rcu()
+     * in threads registered as readers.
+     *
+     * rcu_thread_offline() and rcu_thread_online() include a
+     * memory barrier.
+     */
+    if (was_online) {
+        rcu_thread_offline();
+    } else {
+        smp_mb();
+    }
+
+    qemu_mutex_lock(&rcu_gp_lock);
+
+    if (!QLIST_EMPTY(&registry)) {
+        if (sizeof(rcu_gp_ctr) < 8) {
+            /* For architectures with 32-bit longs, a two-subphases algorithm
+             * ensures we do not encounter overflow bugs.
+             *
+             * Switch parity: 0 -> 1, 1 -> 0.
+             */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            wait_for_readers();
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+        } else {
+            /* Increment current grace period.  */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+        }
+
+        wait_for_readers();
+    }
+
+    qemu_mutex_unlock(&rcu_gp_lock);
+
+    if (was_online) {
+        rcu_thread_online();
+    } else {
+        smp_mb();
+    }
+}
+
+void rcu_register_thread(void)
+{
+    if (!get_rcu_reader()) {
+        alloc_rcu_reader();
+    }
+
+    assert(get_rcu_reader()->ctr == 0);
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+    rcu_quiescent_state();
+}
+
+void rcu_unregister_thread(void)
+{
+    rcu_thread_offline();
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_REMOVE(get_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+    qemu_mutex_init(&rcu_gp_lock);
+    qemu_event_init(&rcu_gp_event, true);
+    rcu_register_thread();
+}
-- 
1.8.1.4
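
As an illustration of the check in rcu_gp_ongoing(), here is a minimal
single-threaded model.  It assumes the same bit layout as the patch
(bit 0 is the online flag, bits 1 and above count grace periods).  The
helper names gp_ongoing and gp_next are made up for this sketch, and the
atomic accesses and memory barriers of the real code are left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Same bit layout as the patch: bit 0 = online, bits 1.. = grace period. */
#define GP_ONLINE (1UL << 0)
#define GP_CTR    (1UL << 1)

/* Hypothetical helper: true if a reader whose last published counter is
 * reader_ctr must still be waited for, given the global counter gp.
 * A value of 0 means the reader is offline and is never waited for. */
static bool gp_ongoing(unsigned long reader_ctr, unsigned long gp)
{
    return reader_ctr != 0 && reader_ctr != gp;
}

/* Hypothetical helper: start a new grace period (the 64-bit path of
 * synchronize_rcu, which simply increments the counter). */
static unsigned long gp_next(unsigned long gp)
{
    return gp + GP_CTR;
}
```

A reader whose counter is 0 or equal to the global counter is not waited
for.  Once the global counter advances, a stale snapshot keeps the
reader on the wait list until it copies the new value, which is what
rcu_quiescent_state() does.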


* [Qemu-devel] [PATCH 04/12] qemu-thread: register threads with RCU
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (2 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 03/12] rcu: add rcu library Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 05/12] rcu: add call_rcu Paolo Bonzini
                   ` (10 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt             | 13 +++++++------
 util/qemu-thread-posix.c | 28 +++++++++++++++++++++++++++-
 util/qemu-thread-win32.c |  2 ++
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/docs/rcu.txt b/docs/rcu.txt
index 19e4840..5736676 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -122,8 +122,8 @@ on many POSIX systems other than Linux and Solaris).
 
 For this reason, QEMU's RCU implementation resorts to manual annotation
 of "quiescent states", i.e. points where no RCU read-side critical
-section can be active.  All threads that participate in the RCU mechanism
-need to annotate such points.
+section can be active.  All threads created with qemu_thread_create
+participate in the RCU mechanism and need to annotate such points.
 
 Marking quiescent states is done with the following three APIs:
 
@@ -144,8 +144,8 @@ Marking quiescent states is done with the following three APIs:
         thread.
 
 
-Furthermore, threads that participate in the RCU mechanism must communicate
-this fact using the following APIs:
+The following APIs make RCU available in a thread that is not
+created with qemu_thread_create():
 
      void rcu_register_thread(void);
 
@@ -160,8 +160,9 @@ this fact using the following APIs:
         either manually or by using the QemuCond/QemuSemaphore/QemuEvent
         APIs.
 
-Note that these APIs are relatively heavyweight, and should _not_ be
-nested.
+Note that these APIs are relatively heavyweight, should _not_ be
+nested, and should not be called in threads that are created with
+qemu_thread_create().
 
 
 DIFFERENCES WITH LINUX
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 8178f9b..2df3382 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -26,6 +26,7 @@
 #endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
+#include "qemu/rcu.h"
 
 static void error_exit(int err, const char *msg)
 {
@@ -384,6 +385,26 @@ void qemu_event_wait(QemuEvent *ev)
 }
 
 
+typedef struct QemuThreadData {
+    /* Passed to thread_start_routine.  */
+    void             *(*start_routine)(void *);
+    void             *arg;
+} QemuThreadData;
+
+static void *thread_start_routine(void *arg)
+{
+    QemuThreadData *data = (QemuThreadData *) arg;
+    void *(*start_routine)(void *) = data->start_routine;
+    void *thread_arg = data->arg;
+    void *ret;
+
+    rcu_register_thread();
+    g_free(data);
+    ret = start_routine(thread_arg);
+    rcu_unregister_thread();
+    return ret;
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg, int mode)
@@ -391,6 +412,11 @@ void qemu_thread_create(QemuThread *thread,
     sigset_t set, oldset;
     int err;
     pthread_attr_t attr;
+    QemuThreadData *data;
+
+    data = g_malloc(sizeof(*data));
+    data->start_routine = start_routine;
+    data->arg = arg;
 
     err = pthread_attr_init(&attr);
     if (err) {
@@ -406,7 +432,7 @@ void qemu_thread_create(QemuThread *thread,
     /* Leave signal handling to the iothread.  */
     sigfillset(&set);
     pthread_sigmask(SIG_SETMASK, &set, &oldset);
-    err = pthread_create(&thread->thread, &attr, start_routine, arg);
+    err = pthread_create(&thread->thread, &attr, thread_start_routine, data);
     if (err)
         error_exit(err, __func__);
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 27a5217..0c4850d 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -278,6 +278,7 @@ static unsigned __stdcall win32_start_routine(void *arg)
         data = NULL;
     }
     qemu_thread_data = data;
+    rcu_register_thread();
     qemu_thread_exit(start_routine(thread_arg));
     abort();
 }
@@ -293,6 +294,7 @@ void qemu_thread_exit(void *arg)
         data->exited = true;
         LeaveCriticalSection(&data->cs);
     }
+    rcu_unregister_thread();
     _endthreadex(0);
 }
 
-- 
1.8.1.4
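
The wrapper added to qemu-thread-posix.c follows a common trampoline
pattern.  A stripped-down sketch is below.  The names start_data,
make_start_data and trampoline are made up, and two plain counters stand
in for rcu_register_thread() and rcu_unregister_thread().  In real use
the pointer returned by make_start_data would be passed to
pthread_create() together with trampoline.

```c
#include <assert.h>
#include <stdlib.h>

/* Counters standing in for the effects of rcu_register_thread() and
 * rcu_unregister_thread(); both names are hypothetical. */
static int n_registered;
static int n_unregistered;

struct start_data {
    void *(*fn)(void *);
    void *arg;
};

/* Package the caller's start routine and argument on the heap so that
 * they outlive the creating function's stack frame. */
static struct start_data *make_start_data(void *(*fn)(void *), void *arg)
{
    struct start_data *d = malloc(sizeof(*d));

    if (d) {
        d->fn = fn;
        d->arg = arg;
    }
    return d;
}

/* Has the signature that pthread_create() expects for a start routine. */
static void *trampoline(void *opaque)
{
    struct start_data *d = opaque;
    void *(*fn)(void *) = d->fn;
    void *arg = d->arg;
    void *ret;

    n_registered++;         /* rcu_register_thread() in the patch */
    free(d);                /* fields were copied out above */
    ret = fn(arg);
    n_unregistered++;       /* rcu_unregister_thread() in the patch */
    return ret;
}

/* Example start routine: hands its argument back as the result. */
static void *echo(void *arg)
{
    return arg;
}
```

In this sketch the unregister step runs only when the start routine
returns normally; a thread that terminates some other way skips it.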


* [Qemu-devel] [PATCH 05/12] rcu: add call_rcu
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (3 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 04/12] qemu-thread: register threads with RCU Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 06/12] rcu: add rcutorture Paolo Bonzini
                   ` (9 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt       | 108 +++++++++++++++++++++++++++++++++++++++++++++--
 include/qemu/rcu.h |  22 ++++++++++
 util/rcu.c         | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 246 insertions(+), 4 deletions(-)

diff --git a/docs/rcu.txt b/docs/rcu.txt
index 5736676..d7c4f0b 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -82,7 +82,50 @@ The core RCU API is small:
         Note that it would be valid for another update to come while
         synchronize_rcu is running.  Because of this, it is better that
         the updater releases any locks it may hold before calling
-        synchronize_rcu.
+        synchronize_rcu.  If this is not possible (for example, because
+        the updater is protected by the BQL), you can use call_rcu.
+
+     void call_rcu1(struct rcu_head * head,
+                    void (*func)(struct rcu_head *head));
+
+        This function invokes func(head) after all pre-existing RCU
+        read-side critical sections on all threads have completed.  This
+        marks the end of the removal phase, with func taking care
+        asynchronously of the reclamation phase.
+
+        The foo struct needs to have an rcu_head structure added,
+        perhaps as follows:
+
+            struct foo {
+                struct rcu_head rcu;
+                int a;
+                char b;
+                long c;
+            };
+
+        so that the reclaimer function can fetch the struct foo address
+        and free it:
+
+            call_rcu1(&foo.rcu, foo_reclaim);
+
+            void foo_reclaim(struct rcu_head *rp)
+            {
+                struct foo *fp = container_of(rp, struct foo, rcu);
+                g_free(fp);
+            }
+
+        For the common case where the rcu_head member is the first field
+        of the struct, you can use the following macro.
+
+     void call_rcu(T *p,
+                   void (*func)(T *p),
+                   field-name);
+
+        call_rcu1 is typically used through this macro, in the common case
+        where the "struct rcu_head" is the first field in the struct.  In
+        the above case, one could have written simply:
+
+            call_rcu(&foo, g_free, rcu);
 
      typeof(*p) rcu_dereference(p);
      typeof(p) rcu_assign_pointer(p, typeof(p) v);
@@ -173,6 +216,11 @@ DIFFERENCES WITH LINUX
 - rcu_dereference takes a _pointer_ to the variable being accessed.
   Wrong usage will be detected by the compiler.
 
+- call_rcu is a macro that has an extra argument (the name of the first
+  field in the struct, which must be a struct rcu_head), and expects the
+  type of the callback's argument to be the type of the first argument.
+  call_rcu1 is the same as Linux's call_rcu.
+
 - Quiescent points must be marked explicitly unless the thread uses
   condvars/semaphores/events for synchronization.
 
@@ -229,7 +277,47 @@ The write side looks simply like this (with appropriate locking):
     synchronize_rcu();
     free(old);
 
-Note that the same idiom would be possible with reader/writer
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+    rcu_read_lock();
+    p = rcu_dereference(&foo);
+    foo_ref(p);
+    rcu_read_unlock();
+    /* do something with p. */
+    foo_unref(p);
+
+The write side can be like this:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    foo_unref(old);
+
+or with call_rcu:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    call_rcu(old, foo_unref, rcu);
+
+In both cases, the write side only performs removal.  Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() is undesirably expensive, because the
+last reference may be dropped on the read side.  Hence you can
+use call_rcu() instead:
+
+    foo_unref(struct foo *p) {
+        if (atomic_fetch_dec(&p->refcount) == 1) {
+            call_rcu(p, foo_destroy, rcu);
+        }
+    }
+
+
+Note that the same idioms would be possible with reader/writer
 locks:
 
     read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -239,13 +327,25 @@ locks:
                                     write_mutex_unlock(&foo_rwlock);
                                     free(p);
 
+    ------------------------------------------------------------------
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        old = foo;
+    foo_ref(p);                     foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+    /* do something with p. */      foo_unref(old);
+    foo_unref(p);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of hot paths.
+
 
 RCU resizable arrays
 --------------------
 
 Resizable arrays can be used with RCU.  The expensive RCU synchronization
-only needs to take place when the array is resized.  The two items to
-take care of are:
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
 
 - ensuring that the old version of the array is available between removal
   and reclamation;
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index b875593..e43b912 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -161,6 +161,28 @@ extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+    struct rcu_head *next;
+    RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field)                                      \
+    call_rcu1(({                                                         \
+         char __attribute__((unused))                                    \
+            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
+            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+         &(head)->field;                                                 \
+      }),                                                                \
+      (RCUCBFunc *)(func))
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/util/rcu.c b/util/rcu.c
index 48686a3..27fda86 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@
  * IBM's contributions to this file may be relicensed under LGPLv2 or later.
  */
 
+#include "qemu-common.h"
 #include <stdio.h>
 #include <assert.h>
 #include <stdlib.h>
@@ -33,6 +34,7 @@
 #include <errno.h>
 #include "qemu/rcu.h"
 #include "qemu/atomic.h"
+#include "qemu/thread.h"
 
 /*
  * Global grace period counter.  Bit 0 is one if the thread is online.
@@ -174,6 +176,119 @@ void synchronize_rcu(void)
     }
 }
 
+
+#define RCU_CALL_MIN_SIZE        30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+    old_tail = atomic_xchg(&tail, &node->next);
+    atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+    struct rcu_head *node, *next;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.
+     */
+    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+        abort();
+    }
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    node = head;
+    next = atomic_mb_read(&head->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /* Since we are the sole consumer, and the check above excludes the
+     * empty case, the queue will always have at least two nodes: the
+     * dummy node, and the one being removed.  So we do not need to update
+     * the tail pointer.
+     */
+    head = next;
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    struct rcu_head *node;
+
+    /* This thread is just a writer.  */
+    rcu_thread_offline();
+
+    for (;;) {
+        int tries = 0;
+        int n = atomic_read(&rcu_call_count);
+
+        /* Heuristically wait for a decent number of callbacks to pile up.
+         * Fetch rcu_call_count now; we must only process elements that were
+         * added before synchronize_rcu() starts.
+         */
+        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+            g_usleep(100000);
+            qemu_event_reset(&rcu_call_ready_event);
+            n = atomic_read(&rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                qemu_event_wait(&rcu_call_ready_event);
+                n = atomic_read(&rcu_call_count);
+            }
+        }
+
+        atomic_sub(&rcu_call_count, n);
+        synchronize_rcu();
+        while (n > 0) {
+            node = try_dequeue();
+            while (!node) {
+                qemu_event_reset(&rcu_call_ready_event);
+                node = try_dequeue();
+                if (!node) {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    node = try_dequeue();
+                }
+            }
+
+            n--;
+            node->func(node);
+        }
+    }
+    abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    node->func = func;
+    enqueue(node);
+    atomic_inc(&rcu_call_count);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
 void rcu_register_thread(void)
 {
     if (!get_rcu_reader()) {
@@ -197,7 +312,12 @@ void rcu_unregister_thread(void)
 
 static void __attribute__((__constructor__)) rcu_init(void)
 {
+    QemuThread thread;
+
     qemu_mutex_init(&rcu_gp_lock);
     qemu_event_init(&rcu_gp_event, true);
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_thread_create(&thread, call_rcu_thread, NULL, QEMU_THREAD_DETACHED);
     rcu_register_thread();
 }
-- 
1.8.1.4
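
To make the call_rcu1 calling convention concrete (head first, then
callback), here is a toy single-threaded model.  toy_call_rcu1 and
toy_run_callbacks are made-up stand-ins: the first only queues the
callback, the second plays the role of the call_rcu thread after
synchronize_rcu() has returned.  The pending list is a plain LIFO list,
not the multi-producer queue used by the patch, and container_of is
defined locally so that the sketch is self-contained.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct rcu_head {
    struct rcu_head *next;
    void (*func)(struct rcu_head *head);
};

#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct foo {
    struct rcu_head rcu;    /* first member, as the call_rcu macro requires */
    int a;
};

static struct rcu_head *pending;    /* toy list, single-threaded only */
static int reclaimed_sum;

/* Toy stand-in for call_rcu1: arguments are (head, func), in that order. */
static void toy_call_rcu1(struct rcu_head *head,
                          void (*func)(struct rcu_head *head))
{
    head->func = func;
    head->next = pending;
    pending = head;
}

/* Toy stand-in for the reclamation phase; returns the number of
 * callbacks that were invoked. */
static int toy_run_callbacks(void)
{
    int n = 0;

    while (pending) {
        struct rcu_head *node = pending;

        pending = node->next;   /* read next before func frees the node */
        node->func(node);
        n++;
    }
    return n;
}

/* Reclaimer: recover the enclosing struct foo and free it. */
static void foo_reclaim(struct rcu_head *rp)
{
    struct foo *fp = container_of(rp, struct foo, rcu);

    reclaimed_sum += fp->a;
    free(fp);
}

static struct foo *foo_new(int a)
{
    struct foo *fp = calloc(1, sizeof(*fp));

    if (fp) {
        fp->a = a;
    }
    return fp;
}
```

Nothing is freed at the time toy_call_rcu1 is called; the objects go
away only when the deferred callbacks run, which mirrors the split
between the removal and reclamation phases described in docs/rcu.txt.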


* [Qemu-devel] [PATCH 06/12] rcu: add rcutorture
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (4 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 05/12] rcu: add call_rcu Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 07/12] rcu: allow nested calls to rcu_thread_offline/rcu_thread_online Paolo Bonzini
                   ` (8 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

A stress test program (works, too :)).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 tests/Makefile     |   5 +-
 tests/rcutorture.c | 387 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 391 insertions(+), 1 deletion(-)
 create mode 100644 tests/rcutorture.c

diff --git a/tests/Makefile b/tests/Makefile
index a307d5a..1aa7572 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -44,6 +44,8 @@ check-unit-y += tests/test-cutils$(EXESUF)
 gcov-files-test-cutils-y += util/cutils.c
 check-unit-y += tests/test-mul64$(EXESUF)
 gcov-files-test-mul64-y = util/host-utils.c
+check-unit-y += tests/rcutorture$(EXESUF)
+gcov-files-rcutorture-y = util/rcu.c
 
 check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
 
@@ -75,7 +77,7 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-string-input-visitor.o tests/test-qmp-output-visitor.o \
 	tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \
 	tests/test-qmp-commands.o tests/test-visitor-serialization.o \
-	tests/test-x86-cpuid.o tests/test-mul64.o
+	tests/test-x86-cpuid.o tests/test-mul64.o tests/rcutorture.o
 
 test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o
 
@@ -98,6 +100,7 @@ tests/test-hbitmap$(EXESUF): tests/test-hbitmap.o libqemuutil.a libqemustub.a
 tests/test-x86-cpuid$(EXESUF): tests/test-x86-cpuid.o
 tests/test-xbzrle$(EXESUF): tests/test-xbzrle.o xbzrle.o page_cache.o libqemuutil.a
 tests/test-cutils$(EXESUF): tests/test-cutils.o util/cutils.o
+tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
 
 tests/test-qapi-types.c tests/test-qapi-types.h :\
 $(SRC_PATH)/qapi-schema-test.json $(SRC_PATH)/scripts/qapi-types.py
diff --git a/tests/rcutorture.c b/tests/rcutorture.c
new file mode 100644
index 0000000..514ed26
--- /dev/null
+++ b/tests/rcutorture.c
@@ -0,0 +1,387 @@
+/*
+ * rcutorture.c: simple user-level performance/stress test of RCU.
+ *
+ * Usage:
+ *     ./rcu <nreaders> rperf [ <seconds> ]
+ *         Run a read-side performance test with the specified
+ *         number of readers for <seconds> seconds.
+ *         Thus "./rcu 16 rperf 2" would run 16 readers on even-numbered
+ *         CPUs from 0 to 30.
+ *     ./rcu <nupdaters> uperf [ <seconds> ]
+ *         Run an update-side performance test with the specified
+ *         number of updaters and specified duration.
+ *     ./rcu <nreaders> perf [ <seconds> ]
+ *         Run a combined read/update performance test with the specified
+ *         number of readers and one updater and specified duration.
+ *
+ * The above tests produce output as follows:
+ *
+ * n_reads: 46008000  n_updates: 146026  nreaders: 2  nupdaters: 1 duration: 1
+ * ns/read: 43.4707  ns/update: 6848.1
+ *
+ * The first line lists the total number of RCU reads and updates executed
+ * during the test, the number of reader threads, the number of updater
+ * threads, and the duration of the test in seconds.  The second line
+ * lists the average duration of each type of operation in nanoseconds,
+ * or "nan" if the corresponding type of operation was not performed.
+ *
+ *     ./rcu <nreaders> stress [ <seconds> ]
+ *         Run a stress test with the specified number of readers and
+ *         one updater.
+ *
+ * This test produces output as follows:
+ *
+ * n_reads: 114633217  n_updates: 3903415  n_mberror: 0
+ * rcu_stress_count: 114618391 14826 0 0 0 0 0 0 0 0 0
+ *
+ * The first line lists the number of RCU read and update operations
+ * executed, followed by the number of memory-ordering violations
+ * (which will be zero in a correct RCU implementation).  The second
+ * line lists the number of readers observing progressively more stale
+ * data.  A correct RCU implementation will have all but the first two
+ * numbers equal to zero.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
+ */
+
+/*
+ * Test variables.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "qemu/atomic.h"
+#include "qemu/rcu.h"
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+
+long long n_reads = 0LL;
+long n_updates = 0L;
+int nthreadsrunning;
+
+char argsbuf[64];
+
+#define GOFLAG_INIT 0
+#define GOFLAG_RUN  1
+#define GOFLAG_STOP 2
+
+static volatile int goflag = GOFLAG_INIT;
+
+#define RCU_READ_RUN 1000
+
+#define NR_THREADS 100
+static QemuThread threads[NR_THREADS];
+static struct rcu_reader_data *data[NR_THREADS];
+static int n_threads;
+
+static void create_thread(void *(*func)(void *))
+{
+    if (n_threads >= NR_THREADS) {
+        fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
+        exit(-1);
+    }
+    qemu_thread_create(&threads[n_threads], func, &data[n_threads],
+                       QEMU_THREAD_JOINABLE);
+    n_threads++;
+}
+
+static void wait_all_threads(void)
+{
+    int i;
+
+    for (i = 0; i < n_threads; i++) {
+        qemu_thread_join(&threads[i]);
+    }
+}
+
+/*
+ * Performance test.
+ */
+
+static void *rcu_read_perf_test(void *arg)
+{
+    int i;
+    long long n_reads_local = 0;
+
+    *(struct rcu_reader_data **)arg = get_rcu_reader();
+    atomic_inc(&nthreadsrunning);
+    rcu_thread_offline();
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    rcu_thread_online();
+    while (goflag == GOFLAG_RUN) {
+        for (i = 0; i < RCU_READ_RUN; i++) {
+            rcu_read_lock();
+            rcu_read_unlock();
+        }
+        n_reads_local += RCU_READ_RUN;
+        rcu_quiescent_state();
+    }
+    atomic_add(&n_reads, n_reads_local);
+
+    return NULL;
+}
+
+static void *rcu_update_perf_test(void *arg)
+{
+    long long n_updates_local = 0;
+
+    *(struct rcu_reader_data **)arg = get_rcu_reader();
+    atomic_inc(&nthreadsrunning);
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        synchronize_rcu();
+        n_updates_local++;
+    }
+    atomic_add(&n_updates, n_updates_local);
+    return NULL;
+}
+
+static void perftestinit(void)
+{
+    nthreadsrunning = 0;
+}
+
+static void perftestrun(int nthreads, int duration, int nreaders, int nupdaters)
+{
+    while (atomic_read(&nthreadsrunning) < nthreads) {
+        g_usleep(1000);
+    }
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+    printf("n_reads: %lld  n_updates: %ld  nreaders: %d  nupdaters: %d duration: %d\n",
+           n_reads, n_updates, nreaders, nupdaters, duration);
+    printf("ns/read: %g  ns/update: %g\n",
+           ((duration * 1000*1000*1000.*(double)nreaders) /
+            (double)n_reads),
+           ((duration * 1000*1000*1000.*(double)nupdaters) /
+            (double)n_updates));
+    exit(0);
+}
+
+static void perftest(int nreaders, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_perf_test);
+    }
+    create_thread(rcu_update_perf_test);
+    perftestrun(i + 1, duration, nreaders, 1);
+}
+
+static void rperftest(int nreaders, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_perf_test);
+    }
+    perftestrun(i, duration, nreaders, 0);
+}
+
+static void uperftest(int nupdaters, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nupdaters; i++) {
+        create_thread(rcu_update_perf_test);
+    }
+    perftestrun(i, duration, 0, nupdaters);
+}
+
+/*
+ * Stress test.
+ */
+
+#define RCU_STRESS_PIPE_LEN 10
+
+struct rcu_stress {
+    int pipe_count;
+    int mbtest;
+};
+
+struct rcu_stress rcu_stress_array[RCU_STRESS_PIPE_LEN] = { { 0 } };
+struct rcu_stress *rcu_stress_current;
+int rcu_stress_idx;
+
+int n_mberror;
+long long rcu_stress_count[RCU_STRESS_PIPE_LEN + 1];
+
+
+static void *rcu_read_stress_test(void *arg)
+{
+    int i;
+    int itercnt = 0;
+    struct rcu_stress *p;
+    int pc;
+    long long n_reads_local = 0;
+    volatile int garbage = 0;
+
+    *(struct rcu_reader_data **)arg = get_rcu_reader();
+    rcu_thread_offline();
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    rcu_thread_online();
+    while (goflag == GOFLAG_RUN) {
+        rcu_read_lock();
+        p = rcu_dereference(&rcu_stress_current);
+        if (p->mbtest == 0) {
+            n_mberror++;
+        }
+        rcu_read_lock();
+        for (i = 0; i < 100; i++)
+            garbage++;
+        rcu_read_unlock();
+        pc = p->pipe_count;
+        rcu_read_unlock();
+        if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
+            pc = RCU_STRESS_PIPE_LEN;
+        }
+        atomic_inc(&rcu_stress_count[pc]);
+        n_reads_local++;
+        rcu_quiescent_state();
+        if ((++itercnt % 0x1000) == 0) {
+            synchronize_rcu();
+        }
+    }
+    atomic_add(&n_reads, n_reads_local);
+
+    return NULL;
+}
+
+static void *rcu_update_stress_test(void *arg)
+{
+    int i;
+    struct rcu_stress *p;
+
+    *(struct rcu_reader_data **)arg = get_rcu_reader();
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        i = rcu_stress_idx + 1;
+        if (i >= RCU_STRESS_PIPE_LEN) {
+            i = 0;
+        }
+        p = &rcu_stress_array[i];
+        p->mbtest = 0;
+        smp_mb();
+        p->pipe_count = 0;
+        p->mbtest = 1;
+        rcu_assign_pointer(rcu_stress_current, p);
+        rcu_stress_idx = i;
+        for (i = 0; i < RCU_STRESS_PIPE_LEN; i++)
+            if (i != rcu_stress_idx) {
+                rcu_stress_array[i].pipe_count++;
+            }
+        synchronize_rcu();
+        n_updates++;
+    }
+    return NULL;
+}
+
+static void *rcu_fake_update_stress_test(void *arg)
+{
+    *(struct rcu_reader_data **)arg = get_rcu_reader();
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        synchronize_rcu();
+        g_usleep(1000);
+    }
+    return NULL;
+}
+
+static void stresstest(int nreaders, int duration)
+{
+    int i;
+
+    rcu_stress_current = &rcu_stress_array[0];
+    rcu_stress_current->pipe_count = 0;
+    rcu_stress_current->mbtest = 1;
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_stress_test);
+    }
+    create_thread(rcu_update_stress_test);
+    for (i = 0; i < 5; i++) {
+        create_thread(rcu_fake_update_stress_test);
+    }
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+    printf("n_reads: %lld  n_updates: %ld  n_mberror: %d\n",
+           n_reads, n_updates, n_mberror);
+    printf("rcu_stress_count:");
+    for (i = 0; i <= RCU_STRESS_PIPE_LEN; i++) {
+        printf(" %lld", rcu_stress_count[i]);
+        if (i > 1) {
+            assert(rcu_stress_count[i] == 0);
+        }
+    }
+    printf("\n");
+    exit(0);
+}
+
+/*
+ * Main program.
+ */
+
+static void usage(int argc, char *argv[])
+{
+    fprintf(stderr, "Usage: %s [nreaders [ rperf | uperf | perf | stress [duration] ] ]\n", argv[0]);
+    exit(-1);
+}
+
+int main(int argc, char *argv[])
+{
+    int nreaders = 1;
+    int duration = 1;
+
+    /* This thread is not part of the test.  */
+    rcu_thread_offline();
+
+    if (argc >= 2) {
+        nreaders = strtoul(argv[1], NULL, 0);
+    }
+    if (argc > 3) {
+        duration = strtoul(argv[3], NULL, 0);
+    }
+    if (argc < 3 || strcmp(argv[2], "stress") == 0) {
+        stresstest(nreaders, duration);
+    } else if (strcmp(argv[2], "rperf") == 0) {
+        rperftest(nreaders, duration);
+    } else if (strcmp(argv[2], "uperf") == 0) {
+        uperftest(nreaders, duration);
+    } else if (strcmp(argv[2], "perf") == 0) {
+        perftest(nreaders, duration);
+    }
+    usage(argc, argv);
+    return 0;
+}
-- 
1.8.1.4

* [Qemu-devel] [PATCH 07/12] rcu: allow nested calls to rcu_thread_offline/rcu_thread_online
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (5 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 06/12] rcu: add rcutorture Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states Paolo Bonzini
                   ` (7 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt       |  5 +++++
 include/qemu/rcu.h | 21 +++++++++++++++++++--
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/docs/rcu.txt b/docs/rcu.txt
index d7c4f0b..4e7cde3 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -187,6 +187,11 @@ Marking quiescent states is done with the following three APIs:
         thread.
 
 
+rcu_thread_offline() and rcu_thread_online() can be nested.  The end of
+the extended quiescent state will coincide with the outermost call to
+rcu_thread_online().
+
+
 The following APIs can be used to use RCU in a thread that is not
 created with qemu_thread_create():
 
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index e43b912..3a55045 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -77,6 +77,9 @@ struct rcu_reader_data {
     unsigned long ctr;
     bool waiting;
 
+    /* Data used by reader only */
+    unsigned offline;
+
     /* Data used for registry, protected by rcu_gp_lock */
     QLIST_ENTRY(rcu_reader_data) node;
 };
@@ -127,9 +130,14 @@ static inline void rcu_read_unlock(void)
 static inline void rcu_quiescent_state(void)
 {
     struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+    unsigned ctr;
+
+    if (p_rcu_reader->offline > 0) {
+        return;
+    }
 
     /* Reuses smp_rmb() in the last rcu_read_unlock().  */
-    unsigned ctr = atomic_read(&rcu_gp_ctr);
+    ctr = atomic_read(&rcu_gp_ctr);
     atomic_xchg(&p_rcu_reader->ctr, ctr);
     if (atomic_read(&p_rcu_reader->waiting)) {
         atomic_set(&p_rcu_reader->waiting, false);
@@ -141,6 +149,10 @@ static inline void rcu_thread_offline(void)
 {
     struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
 
+    if (p_rcu_reader->offline++ > 0) {
+        return;
+    }
+
     atomic_xchg(&p_rcu_reader->ctr, 0);
     if (atomic_read(&p_rcu_reader->waiting)) {
         atomic_set(&p_rcu_reader->waiting, false);
@@ -150,7 +162,12 @@ static inline void rcu_thread_offline(void)
 
 static inline void rcu_thread_online(void)
 {
-    rcu_quiescent_state();
+    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
+
+    assert(p_rcu_reader->offline != 0);
+    if (--p_rcu_reader->offline == 0) {
+        rcu_quiescent_state();
+    }
 }
 
 extern void synchronize_rcu(void);
-- 
1.8.1.4
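
The nesting rule in this patch can be modeled in isolation.  The following
is only a sketch under stated assumptions: struct toy_reader and the toy_*
functions are hypothetical stand-ins, not part of include/qemu/rcu.h, and
the atomic accesses to ctr and waiting are left out so that only the
counter logic remains.

```c
#include <assert.h>

/* Hypothetical stand-in for struct rcu_reader_data; only nesting is modeled. */
struct toy_reader {
    unsigned offline;      /* nesting depth of toy_thread_offline() calls */
    unsigned eqs_entered;  /* times an extended quiescent state began */
    unsigned eqs_left;     /* times an extended quiescent state ended */
};

static void toy_thread_offline(struct toy_reader *r)
{
    if (r->offline++ > 0) {
        return;            /* already offline: only the depth changes */
    }
    r->eqs_entered++;
}

static void toy_thread_online(struct toy_reader *r)
{
    assert(r->offline != 0);
    if (--r->offline == 0) {
        r->eqs_left++;     /* only the outermost call ends the state */
    }
}
```

Calling toy_thread_offline() twice and toy_thread_online() once leaves
offline at 1 and eqs_left at 0; the second toy_thread_online() is the one
that ends the extended quiescent state.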

* [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (6 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 07/12] rcu: allow nested calls to rcu_thread_offline/rcu_thread_online Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-16  8:33   ` liu ping fan
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 09/12] event loop: " Paolo Bonzini
                   ` (6 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Most threads will use mutexes and other sleeping synchronization primitives
(condition variables, semaphores, events) periodically.  For these threads,
the synchronization primitives are natural places to report a quiescent
state (possibly an extended one).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt             | 28 ++++++++++++++++++++++++++++
 util/qemu-thread-posix.c | 30 ++++++++++++++++++++++++++----
 util/qemu-thread-win32.c | 16 +++++++++++++++-
 util/rcu.c               |  3 ---
 4 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/docs/rcu.txt b/docs/rcu.txt
index 4e7cde3..d249ebf 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -168,6 +168,34 @@ of "quiescent states", i.e. points where no RCU read-side critical
 section can be active.  All threads created with qemu_thread_create
 participate in the RCU mechanism and need to annotate such points.
 
+Luckily, in most cases no manual annotation is needed, because waiting
+on condition variables (qemu_cond_wait), semaphores (qemu_sem_wait,
+qemu_sem_timedwait) or events (qemu_event_wait) implicitly marks the thread
+as quiescent for the whole duration of the wait.  (There is an exception
+for semaphore waits with a zero timeout.)
+
+Manual annotation is still needed in the following cases:
+
+- threads that spend their sleeping time in the kernel, for example
+  in a call to select(), poll() or WaitForMultipleObjects().  The QEMU
+  I/O thread is an example of this case.
+
+- threads that perform a lot of I/O.  In QEMU, the workers used for
+  aio=thread are an example of this case (see aio_worker in block/raw-*).
+
+- threads that run continuously until they exit.  The migration thread
+  is an example of this case.
+
+Regarding the second case, note that the workers run in the QEMU thread
+pool.  The thread pool uses semaphores for synchronization, hence it does
+report quiescent states periodically.  However, in some cases (e.g. NFS
+mounted with the "hard" option) the workers can take an arbitrarily long
+amount of time.  When this happens, synchronize_rcu() will not exit and
+call_rcu() callbacks will be delayed arbitrarily.  It is therefore a
+good idea to mark I/O system calls as quiescence points in the worker
+functions.
+
+
 Marking quiescent states is done with the following three APIs:
 
      void rcu_quiescent_state(void);
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 2df3382..f1f325a 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -119,7 +119,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
 {
     int err;
 
+    rcu_thread_offline();
     err = pthread_cond_wait(&cond->cond, &mutex->lock);
+    rcu_thread_online();
     if (err)
         error_exit(err, __func__);
 }
@@ -212,6 +214,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
     int rc;
     struct timespec ts;
 
+    if (ms) {
+        rcu_thread_offline();
+    }
+
 #if defined(__APPLE__) || defined(__NetBSD__)
     compute_abs_deadline(&ts, ms);
     pthread_mutex_lock(&sem->lock);
@@ -227,7 +233,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
         }
     }
     pthread_mutex_unlock(&sem->lock);
-    return (rc == ETIMEDOUT ? -1 : 0);
+    if (rc == ETIMEDOUT) {
+        rc = -1;
+    }
+
 #else
     if (ms <= 0) {
         /* This is cheaper than sem_timedwait.  */
@@ -235,7 +244,7 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
             rc = sem_trywait(&sem->sem);
         } while (rc == -1 && errno == EINTR);
         if (rc == -1 && errno == EAGAIN) {
-            return -1;
+            goto out;
         }
     } else {
         compute_abs_deadline(&ts, ms);
@@ -243,18 +252,25 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
             rc = sem_timedwait(&sem->sem, &ts);
         } while (rc == -1 && errno == EINTR);
         if (rc == -1 && errno == ETIMEDOUT) {
-            return -1;
+            goto out;
         }
     }
     if (rc < 0) {
         error_exit(errno, __func__);
     }
-    return 0;
 #endif
+
+out:
+    if (ms) {
+        rcu_thread_online();
+    }
+    return rc;
 }
 
 void qemu_sem_wait(QemuSemaphore *sem)
 {
+    rcu_thread_offline();
+
 #if defined(__APPLE__) || defined(__NetBSD__)
     pthread_mutex_lock(&sem->lock);
     --sem->count;
@@ -272,6 +288,8 @@ void qemu_sem_wait(QemuSemaphore *sem)
         error_exit(errno, __func__);
     }
 #endif
+
+    rcu_thread_online();
 }
 
 #ifdef __linux__
@@ -380,7 +398,11 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
+        rcu_thread_offline();
         futex_wait(ev, EV_BUSY);
+        rcu_thread_online();
+    } else {
+        rcu_quiescent_state();
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 0c4850d..6fff1a4 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -12,6 +12,7 @@
  */
 #include "qemu-common.h"
 #include "qemu/thread.h"
+#include "qemu/rcu.h"
 #include <process.h>
 #include <assert.h>
 #include <limits.h>
@@ -170,7 +171,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
      * leaving mutex unlocked before we wait on semaphore.
      */
     qemu_mutex_unlock(mutex);
+    rcu_thread_offline();
     WaitForSingleObject(cond->sema, INFINITE);
+    rcu_thread_online();
 
     /* Now waiters must rendez-vous with the signaling thread and
      * let it continue.  For cond_broadcast this has heavy contention
@@ -210,7 +213,16 @@ void qemu_sem_post(QemuSemaphore *sem)
 
 int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
 {
-    int rc = WaitForSingleObject(sem->sema, ms);
+    int rc;
+
+    if (ms) {
+        rcu_thread_offline();
+    }
+    rc = WaitForSingleObject(sem->sema, ms);
+    if (ms) {
+        rcu_thread_online();
+    }
+
     if (rc == WAIT_OBJECT_0) {
         return 0;
     }
@@ -250,7 +262,9 @@ void qemu_event_reset(QemuEvent *ev)
 
 void qemu_event_wait(QemuEvent *ev)
 {
+    rcu_thread_offline();
     WaitForSingleObject(ev->event, INFINITE);
+    rcu_thread_online();
 }
 
 struct QemuThreadData {
diff --git a/util/rcu.c b/util/rcu.c
index 27fda86..91d6ae2 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -240,9 +240,6 @@ static void *call_rcu_thread(void *opaque)
 {
     struct rcu_head *node;
 
-    /* This thread is just a writer.  */
-    rcu_thread_offline();
-
     for (;;) {
         int tries = 0;
         int n = atomic_read(&rcu_call_count);
-- 
1.8.1.4
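
For the timed-wait paths, every rcu_thread_offline() has to be matched by
an rcu_thread_online() no matter how the wait ends.  A minimal sketch of
that pairing, assuming a hypothetical wait callback (sw_wait_fn) and a plain
depth counter in place of the real per-thread reader data:

```c
#include <assert.h>

static int sw_offline_depth;   /* stand-in for p_rcu_reader->offline */

static void sw_thread_offline(void)
{
    sw_offline_depth++;
}

static void sw_thread_online(void)
{
    assert(sw_offline_depth > 0);
    sw_offline_depth--;
}

/* Hypothetical wait primitive: returns 0 on success, -1 on timeout. */
typedef int (*sw_wait_fn)(int ms);

static int sw_timedwait(sw_wait_fn wait, int ms)
{
    int rc;

    /* A zero timeout never sleeps, so it is not an extended quiescent state. */
    if (ms) {
        sw_thread_offline();
    }
    rc = wait(ms);
    /* Single exit: the success and timeout paths both reach this point. */
    if (ms) {
        sw_thread_online();
    }
    return rc;
}

/* Two trivial waits used to exercise both outcomes. */
static int sw_wait_ok(int ms)
{
    (void)ms;
    return 0;
}

static int sw_wait_timeout(int ms)
{
    (void)ms;
    return -1;
}
```

After any call to sw_timedwait(), sw_offline_depth should be back at the
value it had before the call, whether the wait succeeded or timed out.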

* [Qemu-devel] [PATCH 09/12] event loop: report RCU quiescent states
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (7 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 10/12] cpus: " Paolo Bonzini
                   ` (5 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

Threads that run event loops also have places that can sleep for an extended
time.  Place an extended quiescent state there.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c |  7 +++++++
 aio-win32.c | 10 +++++++++-
 main-loop.c |  5 +++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/aio-posix.c b/aio-posix.c
index b68eccd..4ab8f4b 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -17,6 +17,7 @@
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/rcu.h"
 
 struct AioHandler
 {
@@ -232,9 +233,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     /* wait until next event */
+    if (blocking) {
+        rcu_thread_offline();
+    }
     ret = g_poll((GPollFD *)ctx->pollfds->data,
                  ctx->pollfds->len,
                  blocking ? -1 : 0);
+    if (blocking) {
+        rcu_thread_online();
+    }
 
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
diff --git a/aio-win32.c b/aio-win32.c
index 38723bf..f1e3f0c 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -19,6 +19,7 @@
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/rcu.h"
 
 struct AioHandler {
     EventNotifier *e;
@@ -175,7 +176,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
     /* wait until next event */
     while (count > 0) {
         int timeout = blocking ? INFINITE : 0;
-        int ret = WaitForMultipleObjects(count, events, FALSE, timeout);
+
+        if (timeout) {
+            rcu_thread_offline();
+        }
+        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
+        if (timeout) {
+            rcu_thread_online();
+        }
 
         /* if we have any signaled events, dispatch event */
         if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
diff --git a/main-loop.c b/main-loop.c
index f46aece..a048394 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -27,6 +27,7 @@
 #include "slirp/slirp.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "qemu/rcu.h"
 
 #ifndef _WIN32
 
@@ -219,6 +220,7 @@ static int os_host_main_loop_wait(uint32_t timeout)
     if (timeout > 0) {
         spin_counter = 0;
         qemu_mutex_unlock_iothread();
+        rcu_thread_offline();
     } else {
         spin_counter++;
     }
@@ -226,6 +228,7 @@ static int os_host_main_loop_wait(uint32_t timeout)
     ret = g_poll((GPollFD *)gpollfds->data, gpollfds->len, timeout);
 
     if (timeout > 0) {
+        rcu_thread_online();
         qemu_mutex_lock_iothread();
     }
 
@@ -409,7 +412,9 @@ static int os_host_main_loop_wait(uint32_t timeout)
     }
 
     qemu_mutex_unlock_iothread();
+    rcu_thread_offline();
     g_poll_ret = g_poll(poll_fds, n_poll_fds + w->num, poll_timeout);
+    rcu_thread_online();
     qemu_mutex_lock_iothread();
     if (g_poll_ret > 0) {
         for (i = 0; i < w->num; i++) {
-- 
1.8.1.4
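
The pattern used at each poll site can be written as one wrapper.  This is
a sketch only: ev_poll and the ev_* counters are hypothetical names, the
stubs merely count calls instead of touching RCU state, and it assumes a
POSIX host with poll().

```c
#include <poll.h>

static int ev_offline_calls;
static int ev_online_calls;

/* Counting stubs standing in for rcu_thread_offline()/rcu_thread_online(). */
static void ev_thread_offline(void)
{
    ev_offline_calls++;
}

static void ev_thread_online(void)
{
    ev_online_calls++;
}

/* timeout_ms follows poll(): 0 returns immediately, -1 blocks indefinitely. */
static int ev_poll(struct pollfd *fds, nfds_t nfds, int timeout_ms)
{
    int may_sleep = (timeout_ms != 0);
    int ret;

    if (may_sleep) {
        ev_thread_offline();
    }
    ret = poll(fds, nfds, timeout_ms);
    if (may_sleep) {
        ev_thread_online();
    }
    return ret;
}
```

A non-blocking call (timeout 0) leaves both counters untouched, while a
call that may sleep brackets poll() with exactly one offline/online pair.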

* [Qemu-devel] [PATCH 10/12] cpus: report RCU quiescent states
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (8 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 09/12] event loop: " Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 11/12] block: " Paolo Bonzini
                   ` (4 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

CPU threads have extended quiescent states while relinquishing control
to the accelerator (except TCG).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpus.c    | 3 +++
 kvm-all.c | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/cpus.c b/cpus.c
index c232265..66970d6 100644
--- a/cpus.c
+++ b/cpus.c
@@ -37,6 +37,7 @@
 #include "sysemu/qtest.h"
 #include "qemu/main-loop.h"
 #include "qemu/bitmap.h"
+#include "qemu/rcu.h"
 
 #ifndef _WIN32
 #include "qemu/compatfd.h"
@@ -793,6 +794,7 @@ static void *qemu_dummy_cpu_thread_fn(void *arg)
     while (1) {
         cpu_single_env = NULL;
         qemu_mutex_unlock_iothread();
+        rcu_thread_offline();
         do {
             int sig;
             r = sigwait(&waitset, &sig);
@@ -801,6 +803,7 @@ static void *qemu_dummy_cpu_thread_fn(void *arg)
             perror("sigwait");
             exit(1);
         }
+        rcu_thread_online();
         qemu_mutex_lock_iothread();
         cpu_single_env = env;
         qemu_wait_io_event_common(cpu);
diff --git a/kvm-all.c b/kvm-all.c
index 8222729..6f952ac 100644
--- a/kvm-all.c
+++ b/kvm-all.c
@@ -33,6 +33,7 @@
 #include "exec/memory.h"
 #include "exec/address-spaces.h"
 #include "qemu/event_notifier.h"
+#include "qemu/rcu.h"
 #include "trace.h"
 
 /* This check must be after config-host.h is included */
@@ -1611,7 +1612,9 @@ int kvm_cpu_exec(CPUArchState *env)
         }
         qemu_mutex_unlock_iothread();
 
+        rcu_thread_offline();
         run_ret = kvm_vcpu_ioctl(cpu, KVM_RUN, 0);
+        rcu_thread_online();
 
         qemu_mutex_lock_iothread();
         kvm_arch_post_run(cpu, run);
-- 
1.8.1.4

* [Qemu-devel] [PATCH 11/12] block: report RCU quiescent states
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (9 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 10/12] cpus: " Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 12/12] migration: " Paolo Bonzini
                   ` (3 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

The aio workers may spend a long time executing I/O operations;
mark that time as an extended quiescent state.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/raw-posix.c | 3 +++
 block/raw-win32.c | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/block/raw-posix.c b/block/raw-posix.c
index c0ccf27..637fe8a 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -26,6 +26,7 @@
 #include "qemu/log.h"
 #include "block/block_int.h"
 #include "qemu/module.h"
+#include "qemu/rcu.h"
 #include "trace.h"
 #include "block/thread-pool.h"
 #include "qemu/iov.h"
@@ -735,6 +736,7 @@ static int aio_worker(void *arg)
     RawPosixAIOData *aiocb = arg;
     ssize_t ret = 0;
 
+    rcu_thread_offline();
     switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
     case QEMU_AIO_READ:
         ret = handle_aiocb_rw(aiocb);
@@ -774,6 +776,7 @@ static int aio_worker(void *arg)
     }
 
     g_slice_free(RawPosixAIOData, aiocb);
+    rcu_thread_online();
     return ret;
 }
 
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 7c03b6d..fc573b7 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -25,6 +25,7 @@
 #include "qemu/timer.h"
 #include "block/block_int.h"
 #include "qemu/module.h"
+#include "qemu/rcu.h"
 #include "raw-aio.h"
 #include "trace.h"
 #include "block/thread-pool.h"
@@ -99,6 +100,7 @@ static int aio_worker(void *arg)
     ssize_t ret = 0;
     size_t count;
 
+    rcu_thread_offline();
     switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
     case QEMU_AIO_READ:
         count = handle_aiocb_rw(aiocb);
@@ -136,6 +138,7 @@ static int aio_worker(void *arg)
     }
 
     g_slice_free(RawWin32AIOData, aiocb);
+    rcu_thread_online();
     return ret;
 }
 
-- 
1.8.1.4

* [Qemu-devel] [PATCH 12/12] migration: report RCU quiescent states
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (10 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 11/12] block: " Paolo Bonzini
@ 2013-05-15 15:48 ` Paolo Bonzini
  2013-05-15 16:03 ` [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Peter Maydell
                   ` (2 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 15:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Jan Kiszka, qemulist, David Gibson

The migration thread polls s->state periodically; it does not
use a mutex or condition variable, so it has to report quiescent
states manually.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 migration.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/migration.c b/migration.c
index 866d68d..fb8b326 100644
--- a/migration.c
+++ b/migration.c
@@ -22,6 +22,7 @@
 #include "qemu/sockets.h"
 #include "migration/block.h"
 #include "qemu/thread.h"
+#include "qemu/rcu.h"
 #include "qmp-commands.h"
 #include "trace.h"
 
@@ -510,6 +511,7 @@ static void *migration_thread(void *opaque)
         int64_t current_time;
         uint64_t pending_size;
 
+        rcu_quiescent_state();
         if (!qemu_file_rate_limit(s->file)) {
             DPRINTF("iterate\n");
             pending_size = qemu_savevm_state_pending(s->file, max_size);
-- 
1.8.1.4

* Re: [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (11 preceding siblings ...)
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 12/12] migration: " Paolo Bonzini
@ 2013-05-15 16:03 ` Peter Maydell
  2013-05-15 16:17   ` Paolo Bonzini
  2013-05-15 16:16 ` Peter Maydell
  2013-05-15 19:28 ` Peter Maydell
  14 siblings, 1 reply; 27+ messages in thread
From: Peter Maydell @ 2013-05-15 16:03 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Giving a shot to rcutorture on a weak memory-model machine (PPC)
> would be nice.  It's available on my github repo as branch rcu.
> rcutorture works on Linux-x86.  My WINE setup has some problem with glib
> right now.

Doesn't compile (ARM):
root@localhost:~/qemu# make
  CC    hw/9pfs/virtio-9p-synth.o
hw/9pfs/virtio-9p-synth.c:20:22: fatal error: util/rcu.h: No such file
or directory

I assume that should be qemu/rcu.h...

thanks
-- PMM

* Re: [Qemu-devel] [PATCH 01/12] add a header file for atomic operations
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 01/12] add a header file for atomic operations Paolo Bonzini
@ 2013-05-15 16:11   ` Peter Maydell
  0 siblings, 0 replies; 27+ messages in thread
From: Peter Maydell @ 2013-05-15 16:11 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
> We're already using them in several places, but __sync builtins are just
> too ugly to type, and do not provide seqcst load/store operations.

This doesn't compile:

  CC    tests/test-thread-pool.o
tests/test-thread-pool.c: In function ‘worker_cb’:
tests/test-thread-pool.c:20:34: error: macro "atomic_inc" passed 2
arguments, but takes just 1
tests/test-thread-pool.c:20:12: error: ‘atomic_inc’ undeclared (first
use in this function)
tests/test-thread-pool.c:20:12: note: each undeclared identifier is
reported only once for each function it appears in
tests/test-thread-pool.c:19:21: error: unused variable ‘data’
[-Werror=unused-variable]
tests/test-thread-pool.c: In function ‘long_cb’:
tests/test-thread-pool.c:26:27: error: macro "atomic_inc" passed 2
arguments, but takes just 1
tests/test-thread-pool.c:26:5: error: ‘atomic_inc’ undeclared (first
use in this function)
tests/test-thread-pool.c:28:27: error: macro "atomic_inc" passed 2
arguments, but takes just 1
tests/test-thread-pool.c:25:21: error: unused variable ‘data’
[-Werror=unused-variable]
tests/test-thread-pool.c: In function ‘worker_cb’:
tests/test-thread-pool.c:21:1: error: control reaches end of non-void
function [-Werror=return-type]
cc1: all warnings being treated as errors

thanks
-- PMM

* Re: [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (12 preceding siblings ...)
  2013-05-15 16:03 ` [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Peter Maydell
@ 2013-05-15 16:16 ` Peter Maydell
  2013-05-15 16:19   ` Paolo Bonzini
  2013-05-15 19:28 ` Peter Maydell
  14 siblings, 1 reply; 27+ messages in thread
From: Peter Maydell @ 2013-05-15 16:16 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Giving a shot to rcutorture on a weak memory-model machine (PPC)
> would be nice.

# ./tests/rcutorture 16 rperf 2
n_reads: 116103000  n_updates: 0  nreaders: 16  nupdaters: 0 duration: 2
ns/read: 275.617  ns/update: nan

...that "nan" isn't entirely reassuring :-)
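
The "nan" comes from dividing 0 by 0 when a run has no updaters (rperf) or
no readers (uperf).  One possible guard, sketched here with a hypothetical
helper ns_per_op that is not in the posted rcutorture.c, is to report a
sentinel instead:

```c
/*
 * Returns nanoseconds per operation, or -1.0 when no operations were
 * counted, so that the caller can print "n/a" rather than "nan".
 */
static double ns_per_op(int duration_s, int nthreads, long long count)
{
    if (count <= 0) {
        return -1.0;
    }
    return (duration_s * 1e9 * (double)nthreads) / (double)count;
}
```

With the figures from the run above, ns_per_op(2, 16, 116103000) gives
roughly 275.6, matching the printed ns/read, and ns_per_op(2, 0, 0)
returns the sentinel.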

Other than that, I gave it a few runs and it seemed OK, but it
would be helpful if you could give some command lines that would
constitute a usefully thorough test process.

thanks
-- PMM

* Re: [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
  2013-05-15 16:03 ` [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Peter Maydell
@ 2013-05-15 16:17   ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 16:17 UTC (permalink / raw)
  To: Peter Maydell; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15/05/2013 18:03, Peter Maydell wrote:
> On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
>> Giving a shot to rcutorture on a weak memory-model machine (PPC)
>> would be nice.  It's available on my github repo as branch rcu.
>> rcutorture works on Linux-x86.  My WINE setup has some problem with glib
>> right now.
> 
> Doesn't compile (ARM):
> root@localhost:~/qemu# make
>   CC    hw/9pfs/virtio-9p-synth.o
> hw/9pfs/virtio-9p-synth.c:20:22: fatal error: util/rcu.h: No such file
> or directory
> 
> I assume that should be qemu/rcu.h...

Yep.  Note that this series introduces no use of RCU in QEMU, so the
only test you can do is with tests/rcutorture (which compiles fine).
Kevin lent me a ppc64 machine, and I tested it myself.

A very useful thing you could do is proofreading the docs... :)

Paolo

* Re: [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
  2013-05-15 16:16 ` Peter Maydell
@ 2013-05-15 16:19   ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-15 16:19 UTC (permalink / raw)
  To: Peter Maydell; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15/05/2013 18:16, Peter Maydell wrote:
> On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
>> Giving a shot to rcutorture on a weak memory-model machine (PPC)
>> would be nice.
> 
> # ./tests/rcutorture 16 rperf 2
> n_reads: 116103000  n_updates: 0  nreaders: 16  nupdaters: 0 duration: 2
> ns/read: 275.617  ns/update: nan
> 
> ...that "nan" isn't entirely reassuring :-)
> 
> Other than that, I gave it a few runs and it seemed OK, but it
> would be helpful if you could give some command lines that would
> constitute a usefully thorough test process.

"tests/rcutorture 10 stress 10" is a pretty good test.  Here it gives
something like this:

n_reads: 30374101  n_updates: 232950  n_mberror: 0
rcu_stress_count: 30028631 345470 0 0 0 0 0 0 0 0 0

It will abort if something goes wrong.
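
The abort comes from the assertion in stresstest() that every
rcu_stress_count bucket past index 1 is zero.  As a rough sketch, the same
check written as a standalone predicate (stress_counts_ok is a hypothetical
name, not a function in rcutorture.c):

```c
#include <stddef.h>

/* Returns 1 when every bucket past index 1 is zero, 0 otherwise. */
static int stress_counts_ok(const long long *counts, size_t n)
{
    size_t i;

    for (i = 2; i < n; i++) {
        if (counts[i] != 0) {
            return 0;   /* a reader saw an element that was reused too early */
        }
    }
    return 1;
}
```

Fed the eleven counts from the output above, the predicate returns 1; any
nonzero value from the third bucket onward makes it return 0.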

Paolo

* Re: [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU
  2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
                   ` (13 preceding siblings ...)
  2013-05-15 16:16 ` Peter Maydell
@ 2013-05-15 19:28 ` Peter Maydell
  14 siblings, 0 replies; 27+ messages in thread
From: Peter Maydell @ 2013-05-15 19:28 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On 15 May 2013 16:48, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Here is an RCU implementation based on liburcu.

Doesn't compile on MacOSX/clang:

  CC    util/rcu.o
In file included from util/rcu.c:35:
/Users/pm215/src/qemu/include/qemu/rcu.h:112:6: warning: implicit
declaration of function 'g_private_replace' is invalid in C99
      [-Wimplicit-function-declaration]
     g_private_replace(&rcu_reader_key,
     ^
util/rcu.c:67:1: warning: implicit declaration of function
'G_PRIVATE_INIT' is invalid in C99 [-Wimplicit-function-declaration]
DEFINE_RCU_READER();
^
/Users/pm215/src/qemu/include/qemu/rcu.h:103:32: note: expanded from
macro 'DEFINE_RCU_READER'
     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
                               ^
util/rcu.c:67:1: error: variable has incomplete type 'GPrivate' (aka
'struct _GPrivate')
DEFINE_RCU_READER();
^
/Users/pm215/src/qemu/include/qemu/rcu.h:103:15: note: expanded from
macro 'DEFINE_RCU_READER'
     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
              ^
/sw/include/glib-2.0/glib/gthread.h:74:16: note: forward declaration
of 'struct _GPrivate'
typedef struct _GPrivate        GPrivate;
               ^
2 warnings and 1 error generated.
make: *** [util/rcu.o] Error 1

g_private_replace() didn't come in until glib 2.32...

thanks
-- PMM


* Re: [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states Paolo Bonzini
@ 2013-05-16  8:33   ` liu ping fan
  2013-05-16  8:43     ` Paolo Bonzini
  0 siblings, 1 reply; 27+ messages in thread
From: liu ping fan @ 2013-05-16  8:33 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, qemu-devel, David Gibson

On Wed, May 15, 2013 at 11:48 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Most threads will use mutexes and other sleeping synchronization primitives
> (condition variables, semaphores, events) periodically.  For these threads,
> the synchronization primitives are natural places to report a quiescent
> state (possibly an extended one).
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/rcu.txt             | 28 ++++++++++++++++++++++++++++
>  util/qemu-thread-posix.c | 30 ++++++++++++++++++++++++++----
>  util/qemu-thread-win32.c | 16 +++++++++++++++-
>  util/rcu.c               |  3 ---
>  4 files changed, 69 insertions(+), 8 deletions(-)
>
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> index 4e7cde3..d249ebf 100644
> --- a/docs/rcu.txt
> +++ b/docs/rcu.txt
> @@ -168,6 +168,34 @@ of "quiescent states", i.e. points where no RCU read-side critical
>  section can be active.  All threads created with qemu_thread_create
>  participate in the RCU mechanism and need to annotate such points.
>
> +Luckily, in most cases no manual annotation is needed, because waiting
> +on condition variables (qemu_cond_wait), semaphores (qemu_sem_wait,
> +qemu_sem_timedwait) or events (qemu_event_wait) implicitly marks the thread
> +as quiescent for the whole duration of the wait.  (There is an exception
> +for semaphore waits with a zero timeout).
> +
Why not the same rule for zero timeout?

> +Manual annotation is still needed in the following cases:
> +
> +- threads that spend their sleeping time in the kernel, for example
> +  in a call to select(), poll() or WaitForMultipleObjects().  The QEMU
> +  I/O thread is an example of this case.
> +
> +- threads that perform a lot of I/O.  In QEMU, the workers used for
> +  aio=thread are an example of this case (see aio_worker in block/raw-*).
> +
> +- threads that run continuously until they exit.  The migration thread
> +  is an example of this case.
> +
> +Regarding the second case, note that the workers run in the QEMU thread
> +pool.  The thread pool uses semaphores for synchronization, hence it does
> +report quiescent states periodically.  However, in some cases (e.g. NFS
> +mounted with the "hard" option) the workers can take an arbitrarily long
> +amount of time.  When this happens, synchronize_rcu() will not exit and
> +call_rcu() callbacks will be delayed arbitrarily.  It is therefore a
> +good idea to mark I/O system calls as quiescence points in the worker
> +functions.
> +
> +
>  Marking quiescent states is done with the following three APIs:
>
>       void rcu_quiescent_state(void);
> diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
> index 2df3382..f1f325a 100644
> --- a/util/qemu-thread-posix.c
> +++ b/util/qemu-thread-posix.c
> @@ -119,7 +119,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
>  {
>      int err;
>
> +    rcu_thread_offline();
>      err = pthread_cond_wait(&cond->cond, &mutex->lock);
> +    rcu_thread_online();
>      if (err)
>          error_exit(err, __func__);
>  }
> @@ -212,6 +214,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
>      int rc;
>      struct timespec ts;
>
> +    if (ms) {
> +        rcu_thread_offline();
> +    }
> +
>  #if defined(__APPLE__) || defined(__NetBSD__)
>      compute_abs_deadline(&ts, ms);
>      pthread_mutex_lock(&sem->lock);
> @@ -227,7 +233,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
>          }
>      }
>      pthread_mutex_unlock(&sem->lock);
> -    return (rc == ETIMEDOUT ? -1 : 0);
> +    if (rc == ETIMEDOUT) {
> +        rc = -1;
> +    }
> +
>  #else
>      if (ms <= 0) {
>          /* This is cheaper than sem_timedwait.  */
> @@ -235,7 +244,7 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
>              rc = sem_trywait(&sem->sem);
>          } while (rc == -1 && errno == EINTR);
>          if (rc == -1 && errno == EAGAIN) {
> -            return -1;
> +            goto out;
>          }
>      } else {
>          compute_abs_deadline(&ts, ms);
> @@ -243,18 +252,25 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
>              rc = sem_timedwait(&sem->sem, &ts);
>          } while (rc == -1 && errno == EINTR);
>          if (rc == -1 && errno == ETIMEDOUT) {
> -            return -1;
> +            goto out;
>          }
>      }
>      if (rc < 0) {
>          error_exit(errno, __func__);
>      }
> -    return 0;
>  #endif
> +
> +out:
> +    if (ms) {
> +        rcu_thread_offline();

s/offline/online/
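
The substitution matters because every rcu_thread_offline() issued
before a wait needs a matching rcu_thread_online() on the way out, on
the success path and on the timeout path alike.  A minimal sketch of
that pairing, using toy stand-ins (toy_thread_offline, toy_thread_online
and toy_timedwait are made-up names, not functions from the patch):

```c
#include <stdbool.h>

/* Toy stand-ins for rcu_thread_offline()/rcu_thread_online(). */
static bool toy_online_flag = true;

void toy_thread_offline(void) { toy_online_flag = false; }
void toy_thread_online(void)  { toy_online_flag = true; }
bool toy_thread_is_online(void) { return toy_online_flag; }

/*
 * Hypothetical timed wait.  'available' stands in for the outcome of
 * the real blocking call.  Returns 0 on success and -1 on timeout.
 */
int toy_timedwait(bool available, int ms)
{
    int rc;

    if (ms) {
        toy_thread_offline();   /* about to sleep: extended quiescent state */
    }

    rc = available ? 0 : -1;    /* placeholder for sem_timedwait() etc. */

    /* Single exit path: the offline above is always undone here. */
    if (ms) {
        toy_thread_online();
    }
    return rc;
}
```

With the call on the exit path left as "offline", the thread would stay
marked quiescent after the first timed wait even while it goes on to
run read-side critical sections.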

Regards,
Pingfan
> +    }
> +    return rc;
>  }
>
>  void qemu_sem_wait(QemuSemaphore *sem)
>  {
> +    rcu_thread_offline();
> +
>  #if defined(__APPLE__) || defined(__NetBSD__)
>      pthread_mutex_lock(&sem->lock);
>      --sem->count;
> @@ -272,6 +288,8 @@ void qemu_sem_wait(QemuSemaphore *sem)
>          error_exit(errno, __func__);
>      }
>  #endif
> +
> +    rcu_thread_online();
>  }
>
>  #ifdef __linux__
> @@ -380,7 +398,11 @@ void qemu_event_wait(QemuEvent *ev)
>                  return;
>              }
>          }
> +        rcu_thread_offline();
>          futex_wait(ev, EV_BUSY);
> +        rcu_thread_online();
> +    } else {
> +        rcu_quiescent_state();
>      }
>  }
>
> diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
> index 0c4850d..6fff1a4 100644
> --- a/util/qemu-thread-win32.c
> +++ b/util/qemu-thread-win32.c
> @@ -12,6 +12,7 @@
>   */
>  #include "qemu-common.h"
>  #include "qemu/thread.h"
> +#include "qemu/rcu.h"
>  #include <process.h>
>  #include <assert.h>
>  #include <limits.h>
> @@ -170,7 +171,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
>       * leaving mutex unlocked before we wait on semaphore.
>       */
>      qemu_mutex_unlock(mutex);
> +    rcu_thread_offline();
>      WaitForSingleObject(cond->sema, INFINITE);
> +    rcu_thread_online();
>
>      /* Now waiters must rendez-vous with the signaling thread and
>       * let it continue.  For cond_broadcast this has heavy contention
> @@ -210,7 +213,16 @@ void qemu_sem_post(QemuSemaphore *sem)
>
>  int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
>  {
> -    int rc = WaitForSingleObject(sem->sema, ms);
> +    int rc;
> +
> +    if (ms) {
> +        rcu_thread_offline();
> +    }
> +    rc = WaitForSingleObject(sem->sema, ms);
> +    if (ms) {
> +        rcu_thread_online();
> +    }
> +
>      if (rc == WAIT_OBJECT_0) {
>          return 0;
>      }
> @@ -250,7 +262,9 @@ void qemu_event_reset(QemuEvent *ev)
>
>  void qemu_event_wait(QemuEvent *ev)
>  {
> +    rcu_thread_offline();
>      WaitForSingleObject(ev->event, INFINITE);
> +    rcu_thread_online();
>  }
>
>  struct QemuThreadData {
> diff --git a/util/rcu.c b/util/rcu.c
> index 27fda86..91d6ae2 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -240,9 +240,6 @@ static void *call_rcu_thread(void *opaque)
>  {
>      struct rcu_head *node;
>
> -    /* This thread is just a writer.  */
> -    rcu_thread_offline();
> -
>      for (;;) {
>          int tries = 0;
>          int n = atomic_read(&rcu_call_count);
> --
> 1.8.1.4
>
>
>


* Re: [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states
  2013-05-16  8:33   ` liu ping fan
@ 2013-05-16  8:43     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-16  8:43 UTC (permalink / raw)
  To: liu ping fan; +Cc: Jan Kiszka, qemu-devel, David Gibson

On 16/05/2013 10:33, liu ping fan wrote:
>> > +Luckily, in most cases no manual annotation is needed, because waiting
>> > +on condition variables (qemu_cond_wait), semaphores (qemu_sem_wait,
>> > +qemu_sem_timedwait) or events (qemu_event_wait) implicitly marks the thread
>> > +as quiescent for the whole duration of the wait.  (There is an exception
>> > +for semaphore waits with a zero timeout).
>> > +
> Why not the same rule for zero timeout?
> 

Because they are not really doing synchronization, they are basically an
"atomic_dec_if_not_zero" on the semaphore count.

Paolo
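
To make the "decrement if not zero" description concrete, here is a
minimal sketch of such an operation, assuming C11 <stdatomic.h>.  The
name toy_dec_if_not_zero is invented for illustration; it is not a
helper from the patch or from QEMU.

```c
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Hypothetical helper: try to take one unit from a semaphore-like
 * counter without ever blocking.  Returns true if the count was
 * decremented, false if it was already zero.
 */
bool toy_dec_if_not_zero(atomic_int *count)
{
    int old = atomic_load(count);

    while (old > 0) {
        /* On failure, 'old' is refreshed with the current value. */
        if (atomic_compare_exchange_weak(count, &old, old - 1)) {
            return true;
        }
    }
    return false;
}
```

Since the caller never sleeps in this operation, there is no wait
interval that could be reported as an extended quiescent state.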


* Re: [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent Paolo Bonzini
@ 2013-05-16 10:15   ` Stefan Hajnoczi
  2013-05-16 10:49     ` Paolo Bonzini
  0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2013-05-16 10:15 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On Wed, May 15, 2013 at 05:48:47PM +0200, Paolo Bonzini wrote:
> This emulates Win32 manual-reset events using futexes or condition
> variables.  Typical ways to use them are with multi-producer,
> single-consumer data structures, to test for a complex condition whose
> elements come from different threads:
> 
>     for (;;) {
>         qemu_event_reset(ev);
>         ... test complex condition ...
>         if (condition is true) {
>             break;
>         }
>         qemu_event_wait(ev);
>     }
> 
> Alternatively:
> 
>     ... compute condition ...
>     if (condition) {
>         do {
>             qemu_event_wait(ev);
>             qemu_event_reset(ev);
>             ... compute condition ...
>         } while(condition);
>         qemu_event_set(ev);
>     }
> 
> QemuEvent provides a very fast userspace path in the common case when
> no other thread is waiting, or the event is not changing state.  It
> is used to report RCU quiescent states to the thread calling
> synchronize_rcu (the latter being the single consumer), and to report
> call_rcu invocations to the thread that receives them.

It would be nice to describe the need for the Linux futex code.  pthread
mutex/condvars are implemented in terms of futexes already, so how much
benefit is there - I thought they stay in userspace in the non-contended
case too?

Stefan


* Re: [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent
  2013-05-16 10:15   ` Stefan Hajnoczi
@ 2013-05-16 10:49     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-16 10:49 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

[-- Attachment #1: Type: text/plain, Size: 2301 bytes --]

On 16/05/2013 12:15, Stefan Hajnoczi wrote:
> On Wed, May 15, 2013 at 05:48:47PM +0200, Paolo Bonzini wrote:
>> This emulates Win32 manual-reset events using futexes or condition
>> variables.  Typical ways to use them are with multi-producer,
>> single-consumer data structures, to test for a complex condition whose
>> elements come from different threads:
>>
>>     for (;;) {
>>         qemu_event_reset(ev);
>>         ... test complex condition ...
>>         if (condition is true) {
>>             break;
>>         }
>>         qemu_event_wait(ev);
>>     }
>>
>> Alternatively:
>>
>>     ... compute condition ...
>>     if (condition) {
>>         do {
>>             qemu_event_wait(ev);
>>             qemu_event_reset(ev);
>>             ... compute condition ...
>>         } while(condition);
>>         qemu_event_set(ev);
>>     }
>>
>> QemuEvent provides a very fast userspace path in the common case when
>> no other thread is waiting, or the event is not changing state.  It
>> is used to report RCU quiescent states to the thread calling
>> synchronize_rcu (the latter being the single consumer), and to report
>> call_rcu invocations to the thread that receives them.
> 
> It would be nice to describe the need for the Linux futex code.  pthread
> mutex/condvars are implemented in terms of futexes already, so how much
> benefit is there - I thought they stay in userspace in the non-contended
> case too?

Yes, but they are still measurably slower, by around 20%.  I no longer
have the program I wrote for QemuEvent at hand, because I did the
measurement ~2 years ago.  However, here is one that tests a similar
synchronization primitive (not exactly the same as QemuEvent).  You can
run it like this:

$ ./a.out -c 100 10 4 40 # with mutex/condvar
4 child processes, avg think time 100 msec
Avg event distance 10 msec. Running for 40 sec
........................................
waits		 1404	0.663 us (fast)
  slow waits	 152
signal		 3890	0.784 us
  fast path	 0                             <<<< (this is bogus)

$ ./a.out 100 10 4 40 # with futex
4 child processes, avg think time 100 msec
Avg event distance 10 msec. Running for 40 sec
........................................
waits		 1383	0.635 us (fast)
  slow waits	 147
signal		 3924	0.640 us
  fast path	 3791
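
For comparison, the portable baseline that a futex path competes with
can be sketched with a plain mutex and condition variable.  This is
only an illustration: ToyEvent and the toy_event_* functions are
hypothetical names, not the QemuEvent code from the patch.  Note that
the sketch takes the lock on every operation, including a set with no
waiters.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical manual-reset event built from a mutex and a condvar. */
typedef struct ToyEvent {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool set;
} ToyEvent;

void toy_event_init(ToyEvent *ev, bool set)
{
    pthread_mutex_init(&ev->lock, NULL);
    pthread_cond_init(&ev->cond, NULL);
    ev->set = set;
}

/* Wake every waiter; the event stays set until explicitly reset. */
void toy_event_set(ToyEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    ev->set = true;
    pthread_cond_broadcast(&ev->cond);
    pthread_mutex_unlock(&ev->lock);
}

/* Clear the event so that later waits block again. */
void toy_event_reset(ToyEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    ev->set = false;
    pthread_mutex_unlock(&ev->lock);
}

/* Block until the event is set; returns at once if it already is. */
void toy_event_wait(ToyEvent *ev)
{
    pthread_mutex_lock(&ev->lock);
    while (!ev->set) {
        pthread_cond_wait(&ev->cond, &ev->lock);
    }
    pthread_mutex_unlock(&ev->lock);
}
```

A consumer would use it in the reset / test condition / wait loop shown
in the commit message quoted above.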


Paolo

[-- Attachment #2: evc.c --]
[-- Type: text/x-c, Size: 6107 bytes --]

#define _GNU_SOURCE 1
#include "pthread.h"
#include <limits.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

#ifdef __linux__
#include <sys/syscall.h>
#include <linux/futex.h>
#define HAVE_FUTEX 1
#endif

#ifdef _WIN32
#include <windows.h>
#include <mmsystem.h>
#else
#define HAVE_AFFINITY 1
#endif

typedef struct EvCounter {
    int ctr;
    int waiters;
    int fast_signal;
    pthread_mutex_t lock;
    pthread_cond_t cond;
} EvCounter;

typedef int EvCounterState;

void evcounter_get(EvCounterState *state, EvCounter *evcounter)
{
    *state = evcounter->ctr;
}

#ifdef HAVE_FUTEX
#define futex(...)      syscall(__NR_futex, __VA_ARGS__)

void _evcounter_init(EvCounter *evcounter)
{
    evcounter->ctr = evcounter->waiters = 0;
}

int _evcounter_wait(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;
    __sync_fetch_and_add(&evcounter->waiters, 1);
    while (*state == evcounter->ctr) {
        futex(&evcounter->ctr, FUTEX_WAIT_PRIVATE, *state, NULL);
        fast = 0;
    }
    __sync_fetch_and_add(&evcounter->waiters, -1);
    *state = evcounter->ctr;
    return fast;
}

void _evcounter_signal(EvCounter *evcounter)
{
    __sync_fetch_and_add(&evcounter->ctr, 1);
    if (evcounter->waiters != 0) {
        futex(&evcounter->ctr, FUTEX_WAKE_PRIVATE, INT_MAX);
    } else {
	evcounter->fast_signal++;
    }
}
#endif

void _evcounter_init_cond(EvCounter *evcounter)
{
    evcounter->ctr = 0;
    pthread_mutex_init (&evcounter->lock, NULL);
    pthread_cond_init (&evcounter->cond, NULL);
}

int _evcounter_wait_cond(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;
    pthread_mutex_lock(&evcounter->lock);
    while (*state == evcounter->ctr) {
        pthread_cond_wait(&evcounter->cond, &evcounter->lock);
        fast = 0;
    }
    pthread_mutex_unlock(&evcounter->lock);
    *state = evcounter->ctr;
    return fast;
}

void _evcounter_signal_cond(EvCounter *evcounter)
{
    pthread_mutex_lock(&evcounter->lock);
    evcounter->ctr++;
    pthread_cond_broadcast(&evcounter->cond);
    pthread_mutex_unlock(&evcounter->lock);
}


void (*evcounter_init)(EvCounter *);
int (*evcounter_wait)(EvCounterState *, EvCounter *);
void (*evcounter_signal)(EvCounter *);

EvCounter evc;
volatile int stop = 0;
int affinity = 0;

long t_wait;
long n_slow_wait;
long n_wait;
long t_signal;
long n_signal;

#define CLOCK_RES (sizeof(long) == 8 ? 1000000000LL : 1000000LL)

static inline long long getclk()
{
#ifndef _WIN32
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (ts.tv_sec * 1000000000LL + ts.tv_nsec) / (1000000000LL / CLOCK_RES);
#else
    static LARGE_INTEGER freq, init;
    LARGE_INTEGER counter;
    if (!init.QuadPart) {
        QueryPerformanceFrequency(&freq);
        QueryPerformanceCounter(&init);
    }
    QueryPerformanceCounter(&counter);
    return (counter.QuadPart - init.QuadPart) * CLOCK_RES / freq.QuadPart;
#endif
}

#ifdef _WIN32
void sleep(int secs)
{
    Sleep (secs * 1000);
}
#endif

void exp_usleep(int avg)
{
    if (avg) {
	double x = rand() / ((double)RAND_MAX + 1.0);
	int usecs = (int) -avg * log(x);
#ifdef _WIN32
	Sleep(usecs / 1000);
#else
	usleep(usecs);
#endif
    }
}

void *generator(void *pavg)
{
    int avg = *(int *)pavg;
    while (stop != 2) {
	exp_usleep(avg);
	long long t1 = getclk();
	evcounter_signal(&evc);
	t_signal += getclk() - t1;
	n_signal++;
    }
    return NULL;
}

void *consumer(void *pavg)
{
    EvCounterState s;
    int avg = *(int *)pavg;
    evcounter_get(&s, &evc);
    while (stop == 0) {
	long long t1 = getclk();
	if (evcounter_wait(&s, &evc)) {
	    __sync_fetch_and_add (&t_wait, getclk() - t1);
	    __sync_fetch_and_add (&n_wait, 1);
        } else {
            __sync_fetch_and_add(&n_slow_wait, 1);
        }
	exp_usleep(avg);
    }
    return NULL;
}

void
create_thread(pthread_t *p, void *(*f)(void *), void *arg)
{
    pthread_create(p, NULL, f, arg);
#ifdef HAVE_AFFINITY
    if (affinity) {
        static int nproc, i;
        cpu_set_t cpu_set;
        if (nproc == 0)
	    nproc = sysconf (_SC_NPROCESSORS_ONLN);

        CPU_ZERO(&cpu_set);
        CPU_SET(i % nproc, &cpu_set);
        i++;

        pthread_setaffinity_np(*p, sizeof(cpu_set), &cpu_set);
    }
#endif
}

int main(int argc, char **argv)
{
#ifdef HAVE_FUTEX
    evcounter_init = _evcounter_init;
    evcounter_wait = _evcounter_wait;
    evcounter_signal = _evcounter_signal;
#else
    evcounter_init = _evcounter_init_cond;
    evcounter_wait = _evcounter_wait_cond;
    evcounter_signal = _evcounter_signal_cond;
#endif
    while (argv[1] && argv[1][0] == '-') { 
        if (strchr(argv[1], 'c')) {
	    evcounter_init = _evcounter_init_cond;
	    evcounter_wait = _evcounter_wait_cond;
	    evcounter_signal = _evcounter_signal_cond;
        }
        if (!strcmp(argv[1], "-a")) {
            affinity = 1;
        }
	argc--, argv++;
    }

    evcounter_init (&evc);

    srand(time(NULL));

    int think_avg = argc > 1 ? atoi(argv[1]) * 1000 : 100000;
    int throughput_avg = argc > 2 ? atoi(argv[2]) * 1000 : 10000;
    int n = argc > 3 ? atoi(argv[3]) : 10;
    int len = argc > 4 ? atoi(argv[4]) : 10;

    printf ("%d child processes, avg think time %d msec\n", n, think_avg/1000);
    printf ("Avg event distance %d msec. Running for %d sec\n",
	    throughput_avg/1000, len);

    pthread_t g, c[n];
    create_thread(&g, generator, &throughput_avg);

    int i;
    for (i = 0; i < n; i++)
	create_thread(&c[i], consumer, &think_avg);

    for (i = 0; i < len; i++) {
	write(1, ".", 1);
	sleep (1);
    }

    write (1, "\n", 1);

    stop = 1;
    for (i = 0; i < n; i++)
	pthread_join(c[i], NULL);
    stop = 2;
    pthread_join(g, NULL);

    printf ("waits\t\t %ld	%.3f us (fast case)\n", n_wait,
            (double)t_wait/(CLOCK_RES/1000000.0)/n_wait);
    printf ("  slow waits\t %ld\n", n_slow_wait);
    printf ("signal\t\t %ld	%.3f us\n", n_signal,
            (double)t_signal/(CLOCK_RES/1000000.0)/n_signal);
    printf ("  fast path\t %d\n", evc.fast_signal);
    exit (0);
}


* Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 03/12] rcu: add rcu library Paolo Bonzini
@ 2013-05-16 11:46   ` Stefan Hajnoczi
  2013-05-17  4:36   ` liu ping fan
  1 sibling, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2013-05-16 11:46 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, David Gibson, qemu-devel, qemulist

On Wed, May 15, 2013 at 05:48:48PM +0200, Paolo Bonzini wrote:
> +RCU PATTERNS
> +============
> +
> +Many patterns using reader-writer locks translate directly to RCU, with
> +the advantages of higher scalability and deadlock immunity.
> +
> +In general, RCU can be used whenever it is possible to create a new
> +"version" of a data structure every time the updater runs.  This may
> +sound like a very strict restriction, however:
> +
> +- the updater does not mean "everything that writes to a data structure",
> +  but rather "everything that involves a reclamation step".  See the
> +  array example below
> +
> +- in some cases, creating a new version of a data structure may actually
> +  be very cheap.  For example, modifying the "next" pointer of a singly
> +  linked list is effectively creating a new version of the list.
> +
> +
> +them however are worth noting.

?


* Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library
  2013-05-15 15:48 ` [Qemu-devel] [PATCH 03/12] rcu: add rcu library Paolo Bonzini
  2013-05-16 11:46   ` Stefan Hajnoczi
@ 2013-05-17  4:36   ` liu ping fan
  2013-05-17  7:08     ` liu ping fan
  2013-05-17  7:35     ` Paolo Bonzini
  1 sibling, 2 replies; 27+ messages in thread
From: liu ping fan @ 2013-05-17  4:36 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, qemu-devel, David Gibson

On Wed, May 15, 2013 at 11:48 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> This includes a (mangled) copy of the urcu-qsbr code from liburcu.
> The main changes are: 1) removing dependencies on many other header files
> in liburcu; 2) removing for simplicity the tentative busy waiting in
> synchronize_rcu, which has limited performance effects; 3) replacing
> futexes in synchronize_rcu with QemuEvents for Win32 portability.
> The API is the same as liburcu, so it should be possible in the future
> to require liburcu on POSIX systems for example and use our copy only
> on Windows.
>
> Among the various versions available I chose urcu-qsbr, which has the
> fastest rcu_read_{lock,unlock} but requires the program to manually
> annotate quiescent points or intervals.  QEMU threads usually have easily
> identified quiescent periods, so this should not be a problem.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/rcu.txt               | 301 +++++++++++++++++++++++++++++++++++++++++++++
>  hw/9pfs/virtio-9p-synth.c  |   1 +
>  include/qemu/queue.h       |  13 ++
>  include/qemu/rcu-pointer.h | 110 +++++++++++++++++
>  include/qemu/rcu.h         | 168 +++++++++++++++++++++++++
>  include/qemu/thread.h      |   3 -
>  libcacard/Makefile         |   3 +-
>  util/Makefile.objs         |   1 +
>  util/rcu.c                 | 203 ++++++++++++++++++++++++++++++
>  9 files changed, 799 insertions(+), 4 deletions(-)
>  create mode 100644 docs/rcu.txt
>  create mode 100644 include/qemu/rcu-pointer.h
>  create mode 100644 include/qemu/rcu.h
>  create mode 100644 util/rcu.c
>
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> new file mode 100644
> index 0000000..19e4840
> --- /dev/null
> +++ b/docs/rcu.txt
> @@ -0,0 +1,301 @@
> +Using RCU (Read-Copy-Update) for synchronization
> +================================================
> +
> +Read-copy update (RCU) is a synchronization mechanism that is used to
> +protect read-mostly data structures.  RCU is very efficient and scalable
> +on the read side (it is wait-free), and thus can make the read paths
> +extremely fast.
> +
> +RCU supports concurrency between a single writer and multiple readers,
> +thus it is not used alone.  Typically, the write-side will use a lock to
> +serialize multiple updates, but other approaches are possible (e.g.,
> +restricting updates to a single task).  In QEMU, when a lock is used,
> +this will often be the "iothread mutex", also known as the "big QEMU
> +lock" (BQL).  Also, restricting updates to a single task is done in
> +QEMU using the "bottom half" API.
> +
> +RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
> +sections of code as "critical sections", and the update side will wait
> +for the execution of all *currently running* critical sections before
> +proceeding, or before asynchronously executing a callback.
> +
> +The key point here is that only the currently running critical sections
> +are waited for; critical sections that are started _after_ the beginning
> +of the wait do not extend the wait, despite running concurrently with
> +the updater.  This is the reason why RCU is more scalable than,
> +for example, reader-writer locks.  It is so much more scalable that
> +the system will have a single instance of the RCU mechanism; a single
> +mechanism can be used for an arbitrary number of "things", without
> +having to worry about things such as contention or deadlocks.
> +
> +How is this possible?  The basic idea is to split updates into two phases,
> +"removal" and "reclamation".  During removal, we ensure that subsequent
> +readers will not be able to get a reference to the old data.  After
> +removal has completed, a critical section will not be able to access
> +the old data.  Therefore, critical sections that begin after removal
> +do not matter; as soon as all previous critical sections have finished,
> +there cannot be any readers who hold references to the data structure,
> +which may now be safely reclaimed (e.g., freed or unref'ed).
> +
> +Here is a picture:
> +
> +        thread 1                  thread 2                  thread 3
> +    -------------------    ------------------------    -------------------
> +    enter RCU crit.sec.
> +           |                finish removal phase
> +           |                begin wait
> +           |                      |                    enter RCU crit.sec.
> +    exit RCU crit.sec             |                           |
> +                            complete wait                     |
> +                            begin reclamation phase           |
> +                                                       exit RCU crit.sec.
> +
> +
> +Note how thread 3 is still executing its critical section when thread 2
> +starts reclaiming data.  This is possible, because the old version of the
> +data structure was not accessible at the time thread 3 began executing
> +that critical section.
> +
> +
> +RCU API
> +=======
> +
> +The core RCU API is small:
> +
> +     void rcu_read_lock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        entering an RCU read-side critical section.
> +
> +     void rcu_read_unlock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        exiting an RCU read-side critical section.  Note that RCU
> +        read-side critical sections may be nested and/or overlapping.
> +
> +     void synchronize_rcu(void);
> +
> +        Blocks until all pre-existing RCU read-side critical sections
> +        on all threads have completed.  This marks the end of the removal
> +        phase and the beginning of reclamation phase.
> +
> +        Note that it would be valid for another update to come while
> +        synchronize_rcu is running.  Because of this, it is better that
> +        the updater releases any locks it may hold before calling
> +        synchronize_rcu.
> +
> +     typeof(*p) rcu_dereference(p);
> +     typeof(p) rcu_assign_pointer(p, typeof(p) v);
> +
> +        These macros are similar to atomic_mb_read() and atomic_mb_set()
> +        respectively.  However, they make some assumptions on the code
> +        that calls them, which allows a more optimized implementation.
> +
> +        rcu_assign_pointer assumes that the update side is not going
> +        to read from the data structure after "publishing" the new
> +        values; that is, it assumes that all assignments happen at
> +        the very end of the removal phase.
> +
> +        rcu_dereference assumes that whenever a single RCU critical
> +        section reads multiple shared data, these reads are either
> +        data-dependent or need no ordering.  This is almost always the
> +        case when using RCU.  If this is not the case, you can use
> +        atomic_mb_read() or smp_rmb().
> +
> +        If you are going to be fetching multiple fields from the
> +        RCU-protected structure, repeated rcu_dereference() calls
> +        would look ugly and incur unnecessary overhead on Alpha CPUs.
> +        You can then do this:
> +
> +        p = rcu_dereference(&head);
> +        foo = p->foo;
> +        bar = p->bar;
> +
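
The publish/read pairing described in this section can be approximated
in portable C11 as below.  This is a sketch under assumptions, not the
macros from include/qemu/rcu-pointer.h: the toy_* names are invented,
and an acquire load stands in for rcu_dereference, which can rely on
the weaker data-dependency ordering.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_config {
    int foo;
    int bar;
};

/* The shared pointer; NULL until something has been published. */
static _Atomic(struct toy_config *) toy_head;

/* Updater: fill in the new version completely, then publish it. */
void toy_publish(struct toy_config *new_cfg)
{
    /* Release ordering: the fields become visible before the pointer. */
    atomic_store_explicit(&toy_head, new_cfg, memory_order_release);
}

/* Reader: fetch the pointer once, then read every field through it. */
bool toy_read(int *foo, int *bar)
{
    struct toy_config *p =
        atomic_load_explicit(&toy_head, memory_order_acquire);

    if (p == NULL) {
        return false;
    }
    *foo = p->foo;
    *bar = p->bar;
    return true;
}
```

Fetching the pointer once into a local keeps both field reads on the
same version even if the updater publishes a new one in between.
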
> +
> +RCU QUIESCENT STATES
> +====================
> +
> +An efficient implementation of rcu_read_lock() and rcu_read_unlock()
> +relies on the availability of fast thread-local storage.  Unfortunately,
> +this is not possible on all the systems supported by QEMU (in particular
> +on many POSIX systems other than Linux and Solaris).
> +
> +For this reason, QEMU's RCU implementation resorts to manual annotation
> +of "quiescent states", i.e. points where no RCU read-side critical
> +section can be active.  All threads that participate in the RCU mechanism
> +need to annotate such points.
> +
> +Marking quiescent states is done with the following three APIs:
> +
> +     void rcu_quiescent_state(void);
> +
> +        Marks a point in the execution of the current thread where no
> +        RCU read-side critical section can be active.
> +
> +     void rcu_thread_offline(void);
> +
> +        Marks the beginning of an "extended quiescent state" for the
> +        current thread, i.e. an interval of time during which no
> +        RCU read-side critical section can be active.
> +
> +     void rcu_thread_online(void);
> +
> +        Marks the end of an extended quiescent state for the current
> +        thread.
> +
> +
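
One way to picture how these three calls could feed a grace-period
check is a per-thread counter compared against a global one.  The code
below is a toy model under that assumption, not the code in util/rcu.c:
all qsbr_* names and the fixed-size thread table are invented for the
example, and thread registration and the actual waiting are left out.

```c
#include <stdatomic.h>
#include <stdbool.h>

#define QSBR_MAX_THREADS 4

/* Global grace-period counter; starts at 1 so that 0 can mean "offline". */
static atomic_ulong qsbr_gp_ctr = 1;
/* Last grace period each thread has observed, or 0 while offline. */
static atomic_ulong qsbr_thread_ctr[QSBR_MAX_THREADS];

/* A single point at which the thread holds no RCU references. */
void qsbr_quiescent_state(int tid)
{
    atomic_store(&qsbr_thread_ctr[tid], atomic_load(&qsbr_gp_ctr));
}

/* Start of an extended quiescent state, e.g. before blocking. */
void qsbr_offline(int tid)
{
    atomic_store(&qsbr_thread_ctr[tid], 0);
}

/* End of an extended quiescent state. */
void qsbr_online(int tid)
{
    atomic_store(&qsbr_thread_ctr[tid], atomic_load(&qsbr_gp_ctr));
}

/* Updater side: open a new grace period and return its number. */
unsigned long qsbr_start_grace_period(void)
{
    return atomic_fetch_add(&qsbr_gp_ctr, 1) + 1;
}

/* True once every online thread has reported a quiescent state for 'gp'. */
bool qsbr_grace_period_done(unsigned long gp)
{
    for (int i = 0; i < QSBR_MAX_THREADS; i++) {
        unsigned long seen = atomic_load(&qsbr_thread_ctr[i]);

        if (seen != 0 && seen < gp) {
            return false;
        }
    }
    return true;
}
```

In this model, a synchronize_rcu() equivalent would call
qsbr_start_grace_period() and then wait until qsbr_grace_period_done()
returns true.  Offline threads never hold the updater back.
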
> +Furthermore, threads that participate in the RCU mechanism must communicate
> +this fact using the following APIs:
> +
> +     void rcu_register_thread(void);
> +
> +        Mark a thread as taking part in the RCU mechanism.  Such a thread
> +        will have to report quiescent points regularly, either manually
> +        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
> +
> +     void rcu_unregister_thread(void);
> +
> +        Mark a thread as not taking part anymore in the RCU mechanism.
> +        It is not a problem if such a thread reports quiescent points,
> +        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
> +        APIs.
> +
> +Note that these APIs are relatively heavyweight, and should _not_ be
> +nested.
> +
> +
> +DIFFERENCES WITH LINUX
> +======================
> +
> +- Sleeping is possible, though discouraged, within an RCU critical section.
> +
> +- rcu_dereference takes a _pointer_ to the variable being accessed.
> +  Wrong usage will be detected by the compiler.
> +
> +- Quiescent points must be marked explicitly unless the thread uses
> +  condvars/semaphores/events for synchronization.
> +
> +
> +RCU PATTERNS
> +============
> +
> +Many patterns using reader-writer locks translate directly to RCU, with
> +the advantages of higher scalability and deadlock immunity.
> +
> +In general, RCU can be used whenever it is possible to create a new
> +"version" of a data structure every time the updater runs.  This may
> +sound like a very strict restriction, however:
> +
> +- the updater does not mean "everything that writes to a data structure",
> +  but rather "everything that involves a reclamation step".  See the
> +  array example below
> +
> +- in some cases, creating a new version of a data structure may actually
> +  be very cheap.  For example, modifying the "next" pointer of a singly
> +  linked list is effectively creating a new version of the list.
> +
> +
> +Some frequently used RCU idioms are worth noting.
> +
> +RCU list processing
> +-------------------
> +
> +TBD (not yet used in QEMU)
> +
> +
> +RCU reference counting
> +----------------------
> +
> +Because grace periods are not allowed to complete while there is an RCU
> +read-side critical section in progress, the RCU read-side primitives
> +may be used as a restricted reference-counting mechanism.  For example,
> +consider the following code fragment:
> +
> +    rcu_read_lock();
> +    p = rcu_dereference(&foo);
> +    /* do something with p. */
> +    rcu_read_unlock();
> +
> +The RCU read-side critical section ensures that the value of "p" remains
> +valid until after the rcu_read_unlock().  In some sense, it is acquiring
> +a reference to p that is later released when the critical section ends.
> +The write side looks simply like this (with appropriate locking):
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    rcu_assign_pointer(foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    free(old);
> +
> +Note that the same idiom would be possible with reader/writer
> +locks:
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        p = foo;
> +    /* do something with p. */      foo = new;
> +    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
> +                                    free(p);
> +
> +
> +RCU resizable arrays
> +--------------------
> +
> +Resizable arrays can be used with RCU.  The expensive RCU synchronization
> +only needs to take place when the array is resized.  The two items to
> +take care of are:
> +
> +- ensuring that the old version of the array is available between removal
> +  and reclamation;
> +
> +- avoiding mismatches in the read side between the array data and the
> +  array size.
> +
> +The first problem is avoided simply by not using realloc.  Instead,
> +each resize will allocate a new array and copy the old data into it.
> +The second problem would arise if the size and the data pointers were
> +two members of a larger struct:
> +
> +    struct mystuff {
> +        ...
> +        int data_size;
> +        int data_alloc;
> +        T   *data;
> +        ...
> +    };
> +
> +Instead, we store the size of the array with the array itself:
> +
> +    struct arr {
> +        int size;
> +        int alloc;
> +        T   data[];
> +    };
> +    struct arr *global_array;
> +
> +    read side:
> +        rcu_read_lock();
> +        struct arr *array = rcu_dereference(&global_array);
> +        x = i < array->size ? array->data[i] : -1;
> +        rcu_read_unlock();
> +        return x;
> +
> +    write side (running under a lock):
> +        if (global_array->size == global_array->alloc) {
> +            /* Creating a new version.  */
> +            new_array = g_malloc(sizeof(struct arr) +
> +                                 global_array->alloc * 2 * sizeof(T));
> +            new_array->size = global_array->size;
> +            new_array->alloc = global_array->alloc * 2;
> +            memcpy(new_array->data, global_array->data,
> +                   global_array->alloc * sizeof(T));
> +
> +            /* Removal phase.  */
> +            old_array = global_array;
> +            rcu_assign_pointer(global_array, new_array);
> +            synchronize_rcu();
> +
> +            /* Reclamation phase.  */
> +            g_free(old_array);
> +        }
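
A minimal, single-threaded sketch of the resizable-array idiom described above, written with C11 atomics rather than the QEMU macros. The names arr_new, arr_get, arr_append and grace_period_stub are hypothetical, the acquire load stands in for rcu_dereference, and the stub replaces synchronize_rcu only because this sketch has no concurrent readers:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

/* Size and storage share one allocation, so a reader that loads the
 * pointer once always sees a size that matches the data. */
struct arr {
    atomic_int size;   /* number of valid elements */
    int alloc;         /* capacity, fixed for the lifetime of this version */
    int data[];
};

static _Atomic(struct arr *) global_array;

/* Hypothetical stand-in for synchronize_rcu(); nothing to wait for here. */
static void grace_period_stub(void)
{
}

static struct arr *arr_new(int alloc, int size)
{
    struct arr *a = malloc(sizeof(*a) + (size_t)alloc * sizeof(a->data[0]));
    assert(a != NULL);
    atomic_init(&a->size, size);
    a->alloc = alloc;
    return a;
}

/* Read side: one pointer load, then a bounds check against the size
 * stored in the same object. */
static int arr_get(int i)
{
    struct arr *a = atomic_load_explicit(&global_array, memory_order_acquire);
    if (a == NULL || i < 0) {
        return -1;
    }
    return i < atomic_load_explicit(&a->size, memory_order_acquire)
           ? a->data[i] : -1;
}

/* Write side; the caller is assumed to hold the updater lock. */
static void arr_append(int value)
{
    struct arr *cur = atomic_load_explicit(&global_array, memory_order_relaxed);
    int n = cur ? atomic_load_explicit(&cur->size, memory_order_relaxed) : 0;

    if (cur == NULL || n == cur->alloc) {
        /* Creating a new version. */
        struct arr *bigger = arr_new(cur ? cur->alloc * 2 : 4, n);
        if (cur != NULL) {
            memcpy(bigger->data, cur->data, (size_t)n * sizeof(cur->data[0]));
        }
        /* Removal phase: publish the new version. */
        atomic_store_explicit(&global_array, bigger, memory_order_release);
        grace_period_stub();
        /* Reclamation phase. */
        free(cur);
        cur = bigger;
    }
    /* Write the element first, then make it visible through size. */
    cur->data[n] = value;
    atomic_store_explicit(&cur->size, n + 1, memory_order_release);
}
```

Only the resize path pays for a grace period; a plain append touches just the current version. The atomic size field is an addition of this sketch so that in-place appends are also well defined for concurrent readers.
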
> diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
> index 840e4eb..d5f5842 100644
> --- a/hw/9pfs/virtio-9p-synth.c
> +++ b/hw/9pfs/virtio-9p-synth.c
> @@ -17,6 +17,7 @@
>  #include "virtio-9p-xattr.h"
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-synth.h"
> +#include "util/rcu.h"
>
>  #include <sys/stat.h>
>
> diff --git a/include/qemu/queue.h b/include/qemu/queue.h
> index d433b90..847ddd1 100644
> --- a/include/qemu/queue.h
> +++ b/include/qemu/queue.h
> @@ -104,6 +104,19 @@ struct {                                                                \
>          (head)->lh_first = NULL;                                        \
>  } while (/*CONSTCOND*/0)
>
> +#define QLIST_SWAP(dstlist, srclist, field) do {                        \
> +        void *tmplist;                                                  \
> +        tmplist = (srclist)->lh_first;                                  \
> +        (srclist)->lh_first = (dstlist)->lh_first;                      \
> +        if ((srclist)->lh_first != NULL) {                              \
> +            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
> +        }                                                               \
> +        (dstlist)->lh_first = tmplist;                                  \
> +        if ((dstlist)->lh_first != NULL) {                              \
> +            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
> +        }                                                               \
> +} while (/*CONSTCOND*/0)
> +
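
To make the le_prev fix-up in QLIST_SWAP easier to follow, here is a small standalone sketch of the same steps on a hand-written list type. The struct and function names are hypothetical and only mirror the lh_first/le_next/le_prev fields of queue.h:

```c
#include <assert.h>
#include <stddef.h>

struct node {
    int value;
    struct node *le_next;    /* next element */
    struct node **le_prev;   /* address of the pointer that points at us */
};

struct list_head {
    struct node *lh_first;
};

static void list_insert_head(struct list_head *head, struct node *elm)
{
    assert(elm != NULL);
    elm->le_next = head->lh_first;
    if (elm->le_next != NULL) {
        elm->le_next->le_prev = &elm->le_next;
    }
    head->lh_first = elm;
    elm->le_prev = &head->lh_first;
}

/* Same steps as QLIST_SWAP: exchange the heads, then repoint the first
 * element's le_prev at its new head so later removals update the right
 * list. */
static void list_swap(struct list_head *dst, struct list_head *src)
{
    struct node *tmp = src->lh_first;

    src->lh_first = dst->lh_first;
    if (src->lh_first != NULL) {
        src->lh_first->le_prev = &src->lh_first;
    }
    dst->lh_first = tmp;
    if (dst->lh_first != NULL) {
        dst->lh_first->le_prev = &dst->lh_first;
    }
}

/* Unlink through le_prev, in the style of QLIST_REMOVE. */
static void list_remove(struct node *elm)
{
    if (elm->le_next != NULL) {
        elm->le_next->le_prev = elm->le_prev;
    }
    *elm->le_prev = elm->le_next;
}
```

Without the le_prev update, removing the first element after a swap would write to the old head and leave the new head pointing at an unlinked node.
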
>  #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
>          if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
>                  (listelm)->field.le_next->field.le_prev =               \
> diff --git a/include/qemu/rcu-pointer.h b/include/qemu/rcu-pointer.h
> new file mode 100644
> index 0000000..0e6417c
> --- /dev/null
> +++ b/include/qemu/rcu-pointer.h
> @@ -0,0 +1,110 @@
> +#ifndef _URCU_POINTER_STATIC_H
> +#define _URCU_POINTER_STATIC_H
> +
> +/*
> + * urcu-pointer-static.h
> + *
> + * Userspace RCU header. Operations on pointers.
> + *
> + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
> + * linking dynamically with the userspace rcu library.
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include "compiler.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/**
> + * rcu_dereference - reads (copies) an RCU-protected pointer into a local
> + * variable within an RCU read-side critical section. The pointer can later
> + * be safely dereferenced within the critical section.
> + *
> + * This ensures that the pointer copy is invariant throughout the whole
> + * critical section.
> + *
> + * Inserts memory barriers on architectures that require them (currently only
> + * Alpha) and documents which pointers are protected by RCU.
> + *
> + * The compiler memory barrier in atomic_read() ensures that value-speculative
> + * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
> + * data read before the pointer read by speculating the value of the pointer.
> + * Correct ordering is ensured because the pointer is read as a volatile access.
> + * This acts as a global side-effect operation, which forbids reordering of
> + * dependent memory operations. Note that such concern about dependency-breaking
> + * optimizations will eventually be taken care of by the "memory_order_consume"
> + * addition to forthcoming C++ standard.
> + *
> + * Should match rcu_assign_pointer() or rcu_xchg_pointer().
> + */
> +
> +#define rcu_dereference(p)                      \
> +        ({                                      \
> +            typeof(p) _p1 = (p);                \
> +            smp_read_barrier_depends();         \
> +            *(_p1);                             \
> +        })
> +
> +/**
> + * rcu_cmpxchg_pointer - same as rcu_set_pointer, but tests if the pointer
> + * is as expected by "old". If it succeeds, returns the previous pointer to
> + * the data structure, which can be safely freed after waiting for a quiescent
> + * state using synchronize_rcu(). If it fails (unexpected value), returns old
> + * (which should not be freed!).
> + */
> +
> +#define rcu_cmpxchg_pointer(p, old, _new)       \
> +        ({                                      \
> +            typeof(*p) _pold = (old);           \
> +            typeof(*p) _pnew = (_new);          \
> +            atomic_cmpxchg(p, _pold, _pnew);    \
> +        })
> +
> +#define rcu_set_pointer(p, v)                   \
> +        ({                                      \
> +             typeof(*p) _pv = (v);              \
> +             smp_wmb();                         \
> +             atomic_set(p, _pv);                \
> +        })
> +
> +/**
> + * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
> + * meant to be read by RCU read-side critical sections. Returns the assigned
> + * value.
> + *
> + * Documents which pointers will be dereferenced by RCU read-side critical
> + * sections and adds the required memory barriers on architectures requiring
> + * them. It also makes sure the compiler does not reorder code initializing the
> + * data structure before its publication.
> + *
> + * Should match rcu_dereference().
> + */
> +
> +#define rcu_assign_pointer(p, v)    rcu_set_pointer(&(p), v)
> +
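
The pairing between rcu_assign_pointer and rcu_dereference can be approximated in portable C11 as a release store matched by an acquire load. This is a sketch under that assumption, not the macros from the patch: the patch uses smp_wmb() and smp_read_barrier_depends(), and acquire is stronger than the dependency ordering the real macro relies on. The struct and function names are hypothetical:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct config {
    int a;
    int b;
};

static _Atomic(struct config *) current_config;

/* Counterpart of rcu_assign_pointer(): the release store keeps the
 * initialization of *cfg ordered before the pointer becomes visible. */
static void publish_config(struct config *cfg)
{
    assert(cfg != NULL);
    atomic_store_explicit(&current_config, cfg, memory_order_release);
}

/* Counterpart of rcu_dereference(): load the pointer once and use only
 * that copy for the rest of the critical section. */
static struct config *read_config(void)
{
    return atomic_load_explicit(&current_config, memory_order_acquire);
}

static int config_sum(void)
{
    struct config *cfg = read_config();

    return cfg != NULL ? cfg->a + cfg->b : 0;
}
```

Reading the pointer once matters: a second load could return a newer version and mix fields from two different objects.
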
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_POINTER_STATIC_H */
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> new file mode 100644
> index 0000000..b875593
> --- /dev/null
> +++ b/include/qemu/rcu.h
> @@ -0,0 +1,168 @@
> +#ifndef _URCU_QSBR_H
> +#define _URCU_QSBR_H
> +
> +/*
> + * urcu-qsbr.h
> + *
> + * Userspace RCU QSBR header.
> + *
> + * LGPL-compatible code should include this header with :
> + *
> + * #define _LGPL_SOURCE
> + * #include <urcu.h>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdlib.h>
> +#include <assert.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +#include <glib.h>
> +
> +#include "qemu/compiler.h"
> +#include "qemu/rcu-pointer.h"
> +#include "qemu/thread.h"
> +#include "qemu/queue.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/*
> + * Important !
> + *
> + * Each thread containing read-side critical sections must be registered
> + * with rcu_register_thread() before calling rcu_read_lock().
> + * rcu_unregister_thread() should be called before the thread exits.
> + */
> +
> +#ifdef DEBUG_RCU
> +#define rcu_assert(args...)    assert(args)
> +#else
> +#define rcu_assert(args...)
> +#endif
> +
> +#define RCU_GP_ONLINE     (1UL << 0)
> +#define RCU_GP_CTR        (1UL << 1)
> +
> +/*
> + * Global quiescent period counter with low-order bits unused.
> + * Using an unsigned long rather than a char to eliminate false register dependencies
> + * causing stalls on some architectures.
> + */
> +extern unsigned long rcu_gp_ctr;
> +
> +extern QemuEvent rcu_gp_event;
> +
> +struct rcu_reader_data {
> +    /* Data used by both reader and synchronize_rcu() */
> +    unsigned long ctr;
> +    bool waiting;
> +
> +    /* Data used for registry, protected by rcu_gp_lock */
> +    QLIST_ENTRY(rcu_reader_data) node;
> +};
> +
> +#ifdef __linux__
> +extern __thread struct rcu_reader_data rcu_reader;
> +#define DEFINE_RCU_READER() \
> +    __thread struct rcu_reader_data rcu_reader
> +
> +static inline struct rcu_reader_data *get_rcu_reader(void)
> +{
> +    return &rcu_reader;
> +}
> +
> +static inline void alloc_rcu_reader(void)
> +{
> +}
> +#else
> +extern GPrivate rcu_reader_key;
> +#define DEFINE_RCU_READER() \
> +     GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free)
> +
> +static inline struct rcu_reader_data *get_rcu_reader(void)
> +{
> +    return g_private_get(&rcu_reader_key);
> +}
> +
> +static inline void alloc_rcu_reader(void)
> +{
> +     g_private_replace(&rcu_reader_key,
> +                       g_malloc0(sizeof(struct rcu_reader_data)));
> +}
> +#endif
> +
> +static inline void rcu_read_lock(void)
> +{
> +    rcu_assert(get_rcu_reader()->ctr);
> +}
> +
> +static inline void rcu_read_unlock(void)
> +{
> +    /* Ensure that the previous reads complete before starting those
> +     * in another critical section.
> +     */
> +    smp_rmb();
> +}
> +
> +static inline void rcu_quiescent_state(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
> +
> +    /* Reuses smp_rmb() in the last rcu_read_unlock().  */
> +    unsigned long ctr = atomic_read(&rcu_gp_ctr);
> +    atomic_xchg(&p_rcu_reader->ctr, ctr);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +static inline void rcu_thread_offline(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = get_rcu_reader();
> +
> +    atomic_xchg(&p_rcu_reader->ctr, 0);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +static inline void rcu_thread_online(void)
> +{
> +    rcu_quiescent_state();
> +}
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * Reader thread registration.
> + */
> +extern void rcu_register_thread(void);
> +extern void rcu_unregister_thread(void);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_QSBR_H */
> diff --git a/include/qemu/thread.h b/include/qemu/thread.h
> index 3e32c65..5d64a20 100644
> --- a/include/qemu/thread.h
> +++ b/include/qemu/thread.h
> @@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
>  int qemu_mutex_trylock(QemuMutex *mutex);
>  void qemu_mutex_unlock(QemuMutex *mutex);
>
> -#define rcu_read_lock() do { } while (0)
> -#define rcu_read_unlock() do { } while (0)
> -
>  void qemu_cond_init(QemuCond *cond);
>  void qemu_cond_destroy(QemuCond *cond);
>
> diff --git a/libcacard/Makefile b/libcacard/Makefile
> index 47827a0..f7a3b07 100644
> --- a/libcacard/Makefile
> +++ b/libcacard/Makefile
> @@ -4,7 +4,8 @@ TOOLS += vscclient$(EXESUF)
>
>  # objects linked into a shared library, built with libtool with -fPIC if required
>  libcacard-obj-y = $(stub-obj-y) $(libcacard-y)
> -libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o util/error.o
> +libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o
> +libcacard-obj-y += util/rcu.o util/error.o
>  libcacard-obj-$(CONFIG_WIN32) += util/oslib-win32.o util/qemu-thread-win32.o
>  libcacard-obj-$(CONFIG_POSIX) += util/oslib-posix.o util/qemu-thread-posix.o
>  libcacard-obj-y += $(filter trace/%, $(util-obj-y))
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 4a1bd4e..f05eba1 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
>  util-obj-y += qemu-option.o qemu-progress.o
>  util-obj-y += hexdump.o
>  util-obj-y += crc32c.o
> +util-obj-y += rcu.o
> diff --git a/util/rcu.c b/util/rcu.c
> new file mode 100644
> index 0000000..48686a3
> --- /dev/null
> +++ b/util/rcu.c
> @@ -0,0 +1,203 @@
> +/*
> + * urcu-qsbr.c
> + *
> + * Userspace RCU QSBR library
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + * Copyright 2013 Red Hat, Inc.
> + *
> + * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdio.h>
> +#include <assert.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include "qemu/rcu.h"
> +#include "qemu/atomic.h"
> +
> +/*
> + * Global grace period counter.  Bit 0 is one if the thread is online.
> + * Bits 1 and above are defined in synchronize_rcu/update_counter_and_wait.
> + */
> +#define RCU_GP_ONLINE           (1UL << 0)
> +#define RCU_GP_CTR              (1UL << 1)
> +
> +unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
> +
> +QemuEvent rcu_gp_event;
> +static QemuMutex rcu_gp_lock;
> +
> +/*
> + * Check whether a quiescent state was crossed between the beginning of
> + * update_counter_and_wait and now.
> + */
> +static inline int rcu_gp_ongoing(unsigned long *ctr)
> +{
> +    unsigned long v;
> +
> +    /* See update_counter_and_wait for the discussion of memory barriers.  */
> +    v = atomic_read(ctr);
> +    return v && (v != rcu_gp_ctr);
> +}
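
The test in rcu_gp_ongoing can be read as a small pure function over two counters. The sketch below models it without atomics or threads; GP_ONLINE, GP_CTR, gp_ongoing and next_grace_period are hypothetical names that mirror RCU_GP_ONLINE, RCU_GP_CTR and the 64-bit branch of synchronize_rcu:

```c
#include <assert.h>
#include <stdbool.h>

#define GP_ONLINE (1UL << 0)
#define GP_CTR    (1UL << 1)

/* reader_ctr is 0 while the reader is offline; otherwise it is the
 * snapshot of the global counter taken at its last quiescent state.
 * A reader holds up the grace period only if it is online and has not
 * yet observed the current counter value. */
static bool gp_ongoing(unsigned long reader_ctr, unsigned long gp_ctr)
{
    return reader_ctr != 0 && reader_ctr != gp_ctr;
}

/* Start a new grace period as the 64-bit path does: add GP_CTR, which
 * leaves the online bit set so the counter is never 0. */
static unsigned long next_grace_period(unsigned long gp_ctr)
{
    assert(gp_ctr & GP_ONLINE);
    return gp_ctr + GP_CTR;
}
```

Because bit 0 is always set in the global counter, a snapshot of it can never be mistaken for the offline value 0.
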
> +
> +/* Written to only by each individual reader. Read by both the reader and the
> + * writers.
> + */
> +DEFINE_RCU_READER();
> +
> +/* Protected by rcu_gp_lock.  */
> +typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> +static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> +
> +/* Wait for previous parity/grace period to be empty of readers.  */
> +static void wait_for_readers(void)
> +{
> +    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> +    struct rcu_reader_data *index, *tmp;
> +
> +    for (;;) {
> +        /* We want to be notified of changes made to rcu_gp_ongoing
> +         * while we walk the list.
> +         */
> +        qemu_event_reset(&rcu_gp_event);
> +
> +        /* Instead of using atomic_mb_set for index->waiting, and
> +         * atomic_mb_read for index->ctr, memory barriers are placed
> +         * manually since writes to different threads are independent.
> +         * atomic_mb_set has a smp_wmb before...
> +         */
> +        smp_wmb();
> +        QLIST_FOREACH(index, &registry, node) {
> +            atomic_set(&index->waiting, true);
> +        }
> +
> +        /* ... and a smp_mb after.
> +         *
> +         * This barrier also blocks stores that free old RCU-protected
> +         * pointers.
> +         */
> +        smp_mb();
> +
> +        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
> +            if (!rcu_gp_ongoing(&index->ctr)) {
> +                QLIST_REMOVE(index, node);
> +                QLIST_INSERT_HEAD(&qsreaders, index, node);
> +
> +                /* No need for mb_set here; at worst we
> +                 * get some extra futex wakeups.
> +                 */
> +                atomic_set(&index->waiting, false);
> +            }
> +        }
> +
> +        /* atomic_mb_read has smp_rmb after.  */
> +        smp_rmb();
> +
> +        if (QLIST_EMPTY(&registry)) {
> +            break;
> +        }
> +
> +        /* Wait for one thread to report a quiescent state and
> +         * try again.
> +         */
> +        qemu_event_wait(&rcu_gp_event);
> +    }
> +
> +    /* put back the reader list in the registry */
> +    QLIST_SWAP(&registry, &qsreaders, node);
> +}
> +
> +void synchronize_rcu(void)
> +{
> +    unsigned long was_online;
> +
> +    was_online = get_rcu_reader()->ctr;
> +
> +    /* Mark the writer thread offline to make sure we don't wait for
> +     * our own quiescent state. This allows using synchronize_rcu()
> +     * in threads registered as readers.
> +     *
> +     * rcu_thread_offline() and rcu_thread_online() include a
> +     * memory barrier.
> +     */
> +    if (was_online) {
> +        rcu_thread_offline();

Encourage the user to call synchronize_rcu() in reader? I think the
caller should ensure it is outside read-section. Also online can be
nested, which makes the situation even worse.

Regards,
Pingfan
> +    } else {
> +        smp_mb();
> +    }
> +
> +    qemu_mutex_lock(&rcu_gp_lock);
> +
> +    if (!QLIST_EMPTY(&registry)) {
> +        if (sizeof(rcu_gp_ctr) < 8) {
> +            /* For architectures with 32-bit longs, a two-subphases algorithm
> +             * ensures we do not encounter overflow bugs.
> +             *
> +             * Switch parity: 0 -> 1, 1 -> 0.
> +             */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +            wait_for_readers();
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +        } else {
> +            /* Increment current grace period.  */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> +        }
> +
> +        wait_for_readers();
> +    }
> +
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +
> +    if (was_online) {
> +        rcu_thread_online();
> +    } else {
> +        smp_mb();
> +    }
> +}
> +
> +void rcu_register_thread(void)
> +{
> +    if (!get_rcu_reader()) {
> +        alloc_rcu_reader();
> +    }
> +
> +    assert(get_rcu_reader()->ctr == 0);
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +    rcu_quiescent_state();
> +}
> +
> +void rcu_unregister_thread(void)
> +{
> +    rcu_thread_offline();
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_REMOVE(get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +static void __attribute__((__constructor__)) rcu_init(void)
> +{
> +    qemu_mutex_init(&rcu_gp_lock);
> +    qemu_event_init(&rcu_gp_event, true);
> +    rcu_register_thread();
> +}
> --
> 1.8.1.4
>
>


* Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library
  2013-05-17  4:36   ` liu ping fan
@ 2013-05-17  7:08     ` liu ping fan
  2013-05-17  7:35     ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: liu ping fan @ 2013-05-17  7:08 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Jan Kiszka, qemu-devel, David Gibson

[...]
>> +
>> +void synchronize_rcu(void)
>> +{
>> +    unsigned long was_online;
>> +
>> +    was_online = get_rcu_reader()->ctr;
>> +
>> +    /* Mark the writer thread offline to make sure we don't wait for
>> +     * our own quiescent state. This allows using synchronize_rcu()
>> +     * in threads registered as readers.
>> +     *
>> +     * rcu_thread_offline() and rcu_thread_online() include a
>> +     * memory barrier.
>> +     */
>> +    if (was_online) {
>> +        rcu_thread_offline();
>
> Encourage the user to call synchronize_rcu() in reader? I think the
> caller should ensure it is outside read-section. Also online can be
> nested, which makes the situation even worse.
>
What about removing call_rcu_thread from @registry? Then we could avoid
this, with some small changes in _offline()/_online().

> Regards,
> Pingfan
>> +    } else {
>> +        smp_mb();
>> +    }
>> +
>> +    qemu_mutex_lock(&rcu_gp_lock);
>> +
>> +    if (!QLIST_EMPTY(&registry)) {
>> +        if (sizeof(rcu_gp_ctr) < 8) {
>> +            /* For architectures with 32-bit longs, a two-subphases algorithm
>> +             * ensures we do not encounter overflow bugs.
>> +             *
>> +             * Switch parity: 0 -> 1, 1 -> 0.
>> +             */
>> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
>> +            wait_for_readers();
>> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
>> +        } else {
>> +            /* Increment current grace period.  */
>> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
>> +        }
>> +
>> +        wait_for_readers();
>> +    }
>> +
>> +    qemu_mutex_unlock(&rcu_gp_lock);
>> +
>> +    if (was_online) {
>> +        rcu_thread_online();
>> +    } else {
>> +        smp_mb();
>> +    }
>> +}
>> +
>> +void rcu_register_thread(void)
>> +{
>> +    if (!get_rcu_reader()) {
>> +        alloc_rcu_reader();
>> +    }
>> +
>> +    assert(get_rcu_reader()->ctr == 0);
>> +    qemu_mutex_lock(&rcu_gp_lock);
>> +    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
>> +    qemu_mutex_unlock(&rcu_gp_lock);
>> +    rcu_quiescent_state();
>> +}
>> +
>> +void rcu_unregister_thread(void)
>> +{
>> +    rcu_thread_offline();
>> +    qemu_mutex_lock(&rcu_gp_lock);
>> +    QLIST_REMOVE(get_rcu_reader(), node);
>> +    qemu_mutex_unlock(&rcu_gp_lock);
>> +}
>> +
>> +static void __attribute__((__constructor__)) rcu_init(void)
>> +{
>> +    qemu_mutex_init(&rcu_gp_lock);
>> +    qemu_event_init(&rcu_gp_event, true);
>> +    rcu_register_thread();
>> +}
>> --
>> 1.8.1.4
>>
>>


* Re: [Qemu-devel] [PATCH 03/12] rcu: add rcu library
  2013-05-17  4:36   ` liu ping fan
  2013-05-17  7:08     ` liu ping fan
@ 2013-05-17  7:35     ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2013-05-17  7:35 UTC (permalink / raw)
  To: liu ping fan; +Cc: Jan Kiszka, qemu-devel, David Gibson

On 17/05/2013 06:36, liu ping fan wrote:
>> > +    /* Mark the writer thread offline to make sure we don't wait for
>> > +     * our own quiescent state. This allows using synchronize_rcu()
>> > +     * in threads registered as readers.
>> > +     *
>> > +     * rcu_thread_offline() and rcu_thread_online() include a
>> > +     * memory barrier.
>> > +     */
>> > +    if (was_online) {
>> > +        rcu_thread_offline();
> Encourage the user to call synchronize_rcu() in reader?

Not in a read-side critical section, but in a *thread registered as
reader*.  And in QEMU, all threads actually are registered as readers.

> I think the caller should ensure it is outside read-section.

That would be possible by adding a small overhead to rcu_read_lock/unlock.

> Also online can be
> nested, which makes the situation even worse.

It's not online that can be nested, only offline.  So:

- if the thread is already marked as offline, there will be no effect.

- if the thread is not marked as offline, it will be.
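
A single-threaded model of the was_online handling in synchronize_rcu() may make the two cases concrete. It is only a sketch: the names are hypothetical, there are no atomics or barriers, and waiting for other readers is elided.

```c
#include <assert.h>
#include <stdbool.h>

struct reader_model {
    unsigned long ctr;   /* 0 means offline, as in struct rcu_reader_data */
};

static unsigned long model_gp_ctr = 1UL;   /* bit 0 set, like RCU_GP_ONLINE */

static void model_thread_offline(struct reader_model *r)
{
    r->ctr = 0;
}

static void model_thread_online(struct reader_model *r)
{
    r->ctr = model_gp_ctr;
}

/* Mirrors the shape of synchronize_rcu(): remember whether the caller
 * was online, go offline for the duration, and restore only that state. */
static void model_synchronize(struct reader_model *self)
{
    bool was_online = self->ctr != 0;

    if (was_online) {
        model_thread_offline(self);
    }
    model_gp_ctr += 2UL;       /* new grace period; waiting is elided */
    assert(self->ctr == 0);    /* the writer never waits for itself */
    if (was_online) {
        model_thread_online(self);
    }
}
```

A caller that was already offline stays offline afterwards, and a caller that was online comes back online with the new counter value, which matches the two cases listed above.
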

Paolo


Thread overview: 27+ messages
2013-05-15 15:48 [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 01/12] add a header file for atomic operations Paolo Bonzini
2013-05-15 16:11   ` Peter Maydell
2013-05-15 15:48 ` [Qemu-devel] [PATCH 02/12] qemu-thread: add QemuEvent Paolo Bonzini
2013-05-16 10:15   ` Stefan Hajnoczi
2013-05-16 10:49     ` Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 03/12] rcu: add rcu library Paolo Bonzini
2013-05-16 11:46   ` Stefan Hajnoczi
2013-05-17  4:36   ` liu ping fan
2013-05-17  7:08     ` liu ping fan
2013-05-17  7:35     ` Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 04/12] qemu-thread: register threads with RCU Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 05/12] rcu: add call_rcu Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 06/12] rcu: add rcutorture Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 07/12] rcu: allow nested calls to rcu_thread_offline/rcu_thread_online Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 08/12] qemu-thread: report RCU quiescent states Paolo Bonzini
2013-05-16  8:33   ` liu ping fan
2013-05-16  8:43     ` Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 09/12] event loop: " Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 10/12] cpus: " Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 11/12] block: " Paolo Bonzini
2013-05-15 15:48 ` [Qemu-devel] [PATCH 12/12] migration: " Paolo Bonzini
2013-05-15 16:03 ` [Qemu-devel] [RFC PATCH 00/12] RCU implementation for QEMU Peter Maydell
2013-05-15 16:17   ` Paolo Bonzini
2013-05-15 16:16 ` Peter Maydell
2013-05-15 16:19   ` Paolo Bonzini
2013-05-15 19:28 ` Peter Maydell
