* [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2
@ 2015-01-22 14:47 Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
                   ` (14 more replies)
  0 siblings, 15 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Part 1 is patches 1-7, part 2 is patches 8-15.

The part 1 changes requested by reviewers were minimal, but no one
provided a Reviewed-by, so I'm reposting.  The plan is to send a pull
request for part 1 soon.

These two parts are enough of a base for Stefan's work on thread-safe
migration bitmaps.

Part 3 (in the works) will separate removal and reclamation of device
data.  Part 4 (already done) will switch KVM MMIO and PIO to unlocked
lookup and convert a single device to unlocked access.

Changes in part 1:

- patch 1: left aside for later (Peter)

- patch 2 (now patch 1): two small doc fixes (Fam)

- patch 3 (now patch 2): do not do both short and long tests (Fam)
                         do not drop rcu_register_thread in rcu_init (Fam)

TCG folks, please review patches 8 and 9.  Note that they are not
yet ready for multiple TCG threads.

Paolo


Jan Kiszka (1):
  memory: remove assertion on memory_region_destroy

Mike Day (4):
  rcu: introduce RCU-enabled QLIST
  cosmetic changes preparing for the following patches
  exec: convert ram_list to QLIST
  Convert ram_list to RCU

Paolo Bonzini (10):
  rcu: add rcu library
  rcu: add rcutorture
  rcu: allow nesting of rcu_read_lock/rcu_read_unlock
  rcu: add call_rcu
  memory: protect current_map by RCU
  memory: avoid ref/unref in memory_region_find
  exec: introduce cpu_reload_memory_map
  exec: make iotlb RCU-friendly
  exec: RCUify AddressSpaceDispatch
  exec: protect mru_block with RCU

 arch_init.c                  |  66 ++++---
 cpu-exec.c                   |  21 ++
 cpus.c                       |   2 +-
 cputlb.c                     |  13 +-
 docs/rcu.txt                 | 387 ++++++++++++++++++++++++++++++++++++
 exec.c                       | 304 +++++++++++++++++++----------
 hw/9pfs/virtio-9p-synth.c    |   3 +-
 hw/i386/intel_iommu.c        |   3 +
 hw/pci-host/apb.c            |   1 +
 hw/ppc/spapr_iommu.c         |   1 +
 include/exec/cpu-all.h       |  13 +-
 include/exec/cputlb.h        |   2 +-
 include/exec/exec-all.h      |   5 +-
 include/exec/memory.h        |   5 +
 include/qemu/atomic.h        |  61 ++++++
 include/qemu/queue.h         |  24 +--
 include/qemu/rcu.h           | 147 ++++++++++++++
 include/qemu/rcu_queue.h     | 134 +++++++++++++
 include/qemu/thread.h        |   3 -
 include/qom/cpu.h            |   1 +
 memory.c                     |  60 +++---
 scripts/dump-guest-memory.py |   8 +-
 softmmu_template.h           |   4 +-
 tests/Makefile               |  10 +-
 tests/rcutorture.c           | 452 +++++++++++++++++++++++++++++++++++++++++++
 tests/test-rcu-list.c        | 326 +++++++++++++++++++++++++++++++
 util/Makefile.objs           |   1 +
 util/rcu.c                   | 291 ++++++++++++++++++++++++++++
 28 files changed, 2148 insertions(+), 200 deletions(-)
 create mode 100644 docs/rcu.txt
 create mode 100644 include/qemu/rcu.h
 create mode 100644 include/qemu/rcu_queue.h
 create mode 100644 tests/rcutorture.c
 create mode 100644 tests/test-rcu-list.c
 create mode 100644 util/rcu.c

-- 
1.8.3.1


* [Qemu-devel] [PATCH 01/15] rcu: add rcu library
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  3:13   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 02/15] rcu: add rcutorture Paolo Bonzini
                   ` (13 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

This includes a (mangled) copy of the liburcu code.  The main changes
are: 1) removing dependencies on many other header files in liburcu; 2)
removing for simplicity the tentative busy waiting in synchronize_rcu,
which has limited performance effects; 3) replacing futexes in
synchronize_rcu with QemuEvents for Win32 portability.  The API is
the same as liburcu's, so in the future it should be possible, for
example, to require liburcu on POSIX systems and use our copy only on
Windows.

Among the various versions available I chose urcu-mb, which is the
least invasive implementation even though it does not have the
fastest rcu_read_{lock,unlock} implementation.  The urcu flavor can
be changed later, after benchmarking.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt              | 285 ++++++++++++++++++++++++++++++++++++++++++++++
 hw/9pfs/virtio-9p-synth.c |   1 +
 include/qemu/atomic.h     |  61 ++++++++++
 include/qemu/queue.h      |  13 +++
 include/qemu/rcu.h        | 112 ++++++++++++++++++
 include/qemu/thread.h     |   3 -
 util/Makefile.objs        |   1 +
 util/rcu.c                | 172 ++++++++++++++++++++++++++++
 8 files changed, 645 insertions(+), 3 deletions(-)
 create mode 100644 docs/rcu.txt
 create mode 100644 include/qemu/rcu.h
 create mode 100644 util/rcu.c

diff --git a/docs/rcu.txt b/docs/rcu.txt
new file mode 100644
index 0000000..9938ad3
--- /dev/null
+++ b/docs/rcu.txt
@@ -0,0 +1,285 @@
+Using RCU (Read-Copy-Update) for synchronization
+================================================
+
+Read-copy update (RCU) is a synchronization mechanism that is used to
+protect read-mostly data structures.  RCU is very efficient and scalable
+on the read side (it is wait-free), and thus can make the read paths
+extremely fast.
+
+RCU supports concurrency between a single writer and multiple readers;
+thus it is not used alone.  Typically, the write side will use a lock to
+serialize multiple updates, but other approaches are possible (e.g.,
+restricting updates to a single task).  In QEMU, when a lock is used,
+this will often be the "iothread mutex", also known as the "big QEMU
+lock" (BQL).  Also, restricting updates to a single task is done in
+QEMU using the "bottom half" API.
+
+RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
+sections of code with "critical sections", and the update side will wait
+for the execution of all *currently running* critical sections before
+proceeding, or before asynchronously executing a callback.
+
+The key point here is that only the currently running critical sections
+are waited for; critical sections that are started _after_ the beginning
+of the wait do not extend the wait, despite running concurrently with
+the updater.  This is the reason why RCU is more scalable than,
+for example, reader-writer locks.  It is so much more scalable that
+the system will have a single instance of the RCU mechanism; a single
+mechanism can be used for an arbitrary number of "things", without
+having to worry about things such as contention or deadlocks.
+
+How is this possible?  The basic idea is to split updates into two phases,
+"removal" and "reclamation".  During removal, we ensure that subsequent
+readers will not be able to get a reference to the old data.  After
+removal has completed, a critical section will not be able to access
+the old data.  Therefore, critical sections that begin after removal
+do not matter; as soon as all previous critical sections have finished,
+there cannot be any readers who hold references to the data structure,
+and it can now be safely reclaimed (e.g., freed or unref'ed).
+
+Here is a picture:
+
+        thread 1                  thread 2                  thread 3
+    -------------------    ------------------------    -------------------
+    enter RCU crit.sec.
+           |                finish removal phase
+           |                begin wait
+           |                      |                    enter RCU crit.sec.
+    exit RCU crit.sec             |                           |
+                            complete wait                     |
+                            begin reclamation phase           |
+                                                       exit RCU crit.sec.
+
+
+Note how thread 3 is still executing its critical section when thread 2
+starts reclaiming data.  This is possible, because the old version of the
+data structure was not accessible at the time thread 3 began executing
+that critical section.
+
+
+RCU API
+=======
+
+The core RCU API is small:
+
+     void rcu_read_lock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        entering an RCU read-side critical section.
+
+     void rcu_read_unlock(void);
+
+        Used by a reader to inform the reclaimer that the reader is
+        exiting an RCU read-side critical section.  Note that RCU
+        read-side critical sections may be nested and/or overlapping.
+
+     void synchronize_rcu(void);
+
+        Blocks until all pre-existing RCU read-side critical sections
+        on all threads have completed.  This marks the end of the removal
+        phase and the beginning of reclamation phase.
+
+        Note that it would be valid for another update to come while
+        synchronize_rcu is running.  Because of this, it is better that
+        the updater releases any locks it may hold before calling
+        synchronize_rcu.
+
+     typeof(*p) atomic_rcu_read(p);
+
+        atomic_rcu_read() is similar to atomic_mb_read(), but it makes
+        some assumptions on the code that calls it.  This allows a more
+        optimized implementation.
+
+        atomic_rcu_read assumes that whenever a single RCU critical
+        section reads multiple shared data, these reads are either
+        data-dependent or need no ordering.  This is almost always the
+        case when using RCU, because read-side critical sections typically
+        navigate one or more pointers (the pointers that are changed on
+        every update) until reaching a data structure of interest,
+        and then read from there.
+
+        RCU read-side critical sections must use atomic_rcu_read() to
+        read data, unless concurrent writes are prevented by another
+        synchronization mechanism.
+
+        Furthermore, RCU read-side critical sections should traverse the
+        data structure in a single direction, opposite to the direction
+        in which the updater initializes it.
+
+     void atomic_rcu_set(p, typeof(*p) v);
+
+        atomic_rcu_set() is also similar to atomic_mb_set(), and it also
+        makes assumptions on the code that calls it in order to allow a more
+        optimized implementation.
+
+        In particular, atomic_rcu_set() suffices for synchronization
+        with readers, if the updater never mutates a field within a
+        data item that is already accessible to readers.  This is the
+        case when initializing a new copy of the RCU-protected data
+        structure; just ensure that initialization of *p is carried out
+        before atomic_rcu_set() makes the data item visible to readers.
+        If this rule is observed, writes will happen in the opposite
+        order as reads in the RCU read-side critical sections (or if
+        there is just one update), and there will be no need for any
+        other synchronization mechanism to coordinate the accesses.
+
+The following APIs must be used before RCU is used in a thread:
+
+     void rcu_register_thread(void);
+
+        Mark a thread as taking part in the RCU mechanism.  Such a thread
+        will have to report quiescent points regularly, either manually
+        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
+
+     void rcu_unregister_thread(void);
+
+        Mark a thread as not taking part anymore in the RCU mechanism.
+        It is not a problem if such a thread reports quiescent points,
+        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
+        APIs.
+
+Note that these APIs are relatively heavyweight, and should _not_ be
+nested.
+
+
+DIFFERENCES WITH LINUX
+======================
+
+- Waiting on a mutex is possible, though discouraged, within an RCU critical
+  section.  This is because spinlocks are rarely (if ever) used in userspace
+  programming; not allowing this would prevent upgrading an RCU read-side
+  critical section to become an updater.
+
+- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
+  rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
+
+
+RCU PATTERNS
+============
+
+Many patterns using reader-writer locks translate directly to RCU, with
+the advantages of higher scalability and deadlock immunity.
+
+In general, RCU can be used whenever it is possible to create a new
+"version" of a data structure every time the updater runs.  This may
+sound like a very strict restriction, however:
+
+- the updater does not mean "everything that writes to a data structure",
+  but rather "everything that involves a reclamation step".  See the
+  array example below.
+
+- in some cases, creating a new version of a data structure may actually
+  be very cheap.  For example, modifying the "next" pointer of a singly
+  linked list is effectively creating a new version of the list.
+
+Here are some frequently-used RCU idioms that are worth noting.
+
+
+RCU list processing
+-------------------
+
+TBD (not yet used in QEMU)
+
+
+RCU reference counting
+----------------------
+
+Because grace periods are not allowed to complete while there is an RCU
+read-side critical section in progress, the RCU read-side primitives
+may be used as a restricted reference-counting mechanism.  For example,
+consider the following code fragment:
+
+    rcu_read_lock();
+    p = atomic_rcu_read(&foo);
+    /* do something with p. */
+    rcu_read_unlock();
+
+The RCU read-side critical section ensures that the value of "p" remains
+valid until after the rcu_read_unlock().  In some sense, it is acquiring
+a reference to p that is later released when the critical section ends.
+The write side looks simply like this (with appropriate locking):
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    atomic_rcu_set(&foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    free(old);
+
+Note that the same idiom would be possible with reader/writer
+locks:
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        p = foo;
+    /* do something with p. */      foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+                                    /* old foo is now unreachable */
+                                    free(p);
+
+
+RCU resizable arrays
+--------------------
+
+Resizable arrays can be used with RCU.  The expensive RCU synchronization
+only needs to take place when the array is resized.  The two items to
+take care of are:
+
+- ensuring that the old version of the array is available between removal
+  and reclamation;
+
+- avoiding mismatches in the read side between the array data and the
+  array size.
+
+The first problem is avoided simply by not using realloc.  Instead,
+each resize will allocate a new array and copy the old data into it.
+The second problem would arise if the size and the data pointers were
+two members of a larger struct:
+
+    struct mystuff {
+        ...
+        int data_size;
+        int data_alloc;
+        T   *data;
+        ...
+    };
+
+Instead, we store the size of the array with the array itself:
+
+    struct arr {
+        int size;
+        int alloc;
+        T   data[];
+    };
+    struct arr *global_array;
+
+    read side:
+        rcu_read_lock();
+        struct arr *array = atomic_rcu_read(&global_array);
+        x = i < array->size ? array->data[i] : -1;
+        rcu_read_unlock();
+        return x;
+
+    write side (running under a lock):
+        if (global_array->size == global_array->alloc) {
+            /* Creating a new version.  */
+            new_array = g_malloc(sizeof(struct arr) +
+                                 global_array->alloc * 2 * sizeof(T));
+            new_array->size = global_array->size;
+            new_array->alloc = global_array->alloc * 2;
+            memcpy(new_array->data, global_array->data,
+                   global_array->alloc * sizeof(T));
+
+            /* Removal phase.  */
+            old_array = global_array;
+            atomic_rcu_set(&global_array, new_array);
+            synchronize_rcu();
+
+            /* Reclamation phase.  */
+            free(old_array);
+        }
+
+
+SOURCES
+=======
+
+* Documentation/RCU/ from the Linux kernel
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index 71262bc..e75aa87 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -17,6 +17,7 @@
 #include "virtio-9p-xattr.h"
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-synth.h"
+#include "qemu/rcu.h"
 
 #include <sys/stat.h>
 
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 93c2ae2..98e05ca 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -129,6 +129,67 @@
 #define atomic_set(ptr, i)     ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
 #endif
 
+/**
+ * atomic_rcu_read - reads an RCU-protected pointer into a local variable
+ * inside an RCU read-side critical section.  The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant throughout the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
+ * includes a compiler barrier to ensure that value-speculative optimizations
+ * (e.g. VSS: Value Speculation Scheduling) do not perform the data read
+ * before the pointer read by speculating the value of the pointer.  On new
+ * enough compilers, atomic_load takes care of such concern about
+ * dependency-breaking optimizations.
+ *
+ * Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
+ */
+#ifndef atomic_rcu_read
+#ifdef __ATOMIC_CONSUME
+#define atomic_rcu_read(ptr)    ({                \
+    typeof(*ptr) _val;                            \
+    __atomic_load(ptr, &_val, __ATOMIC_CONSUME);  \
+    _val;                                         \
+})
+#else
+#define atomic_rcu_read(ptr)    ({                \
+    typeof(*ptr) _val = atomic_read(ptr);         \
+    smp_read_barrier_depends();                   \
+    _val;                                         \
+})
+#endif
+#endif
+
+/**
+ * atomic_rcu_set - assigns (publicizes) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match atomic_rcu_read().
+ */
+#ifndef atomic_rcu_set
+#ifdef __ATOMIC_RELEASE
+#define atomic_rcu_set(ptr, i)  do {              \
+    typeof(*ptr) _val = (i);                      \
+    __atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
+} while (0)
+#else
+#define atomic_rcu_set(ptr, i)  do {              \
+    smp_wmb();                                    \
+    atomic_set(ptr, i);                           \
+} while (0)
+#endif
+#endif
+
 /* These have the same semantics as Java volatile variables.
  * See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
  * "1. Issue a StoreStore barrier (wmb) before each volatile store."
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index a98eb3a..c602797 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -104,6 +104,19 @@ struct {                                                                \
         (head)->lh_first = NULL;                                        \
 } while (/*CONSTCOND*/0)
 
+#define QLIST_SWAP(dstlist, srclist, field) do {                        \
+        void *tmplist;                                                  \
+        tmplist = (srclist)->lh_first;                                  \
+        (srclist)->lh_first = (dstlist)->lh_first;                      \
+        if ((srclist)->lh_first != NULL) {                              \
+            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
+        }                                                               \
+        (dstlist)->lh_first = tmplist;                                  \
+        if ((dstlist)->lh_first != NULL) {                              \
+            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
+        }                                                               \
+} while (/*CONSTCOND*/0)
+
 #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
         if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
                 (listelm)->field.le_next->field.le_prev =               \
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
new file mode 100644
index 0000000..cfef36e
--- /dev/null
+++ b/include/qemu/rcu.h
@@ -0,0 +1,112 @@
+#ifndef QEMU_RCU_H
+#define QEMU_RCU_H
+
+/*
+ * urcu-mb.h
+ *
+ * Userspace RCU header with explicit memory barrier.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <glib.h>
+
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...)    assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using an int rather than a char to eliminate false register dependencies
+ * causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader_data {
+    /* Data used by both reader and synchronize_rcu() */
+    unsigned long ctr;
+    bool waiting;
+
+    /* Data used for registry, protected by rcu_gp_lock */
+    QLIST_ENTRY(rcu_reader_data) node;
+};
+
+extern __thread struct rcu_reader_data rcu_reader;
+
+static inline void rcu_read_lock(void)
+{
+    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+
+    unsigned ctr = atomic_read(&rcu_gp_ctr);
+    atomic_xchg(&p_rcu_reader->ctr, ctr);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+static inline void rcu_read_unlock(void)
+{
+    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+
+    atomic_xchg(&p_rcu_reader->ctr, 0);
+    if (atomic_read(&p_rcu_reader->waiting)) {
+        atomic_set(&p_rcu_reader->waiting, false);
+        qemu_event_set(&rcu_gp_event);
+    }
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* QEMU_RCU_H */
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e89fdc9..5114ec8 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
 int qemu_mutex_trylock(QemuMutex *mutex);
 void qemu_mutex_unlock(QemuMutex *mutex);
 
-#define rcu_read_lock() do { } while (0)
-#define rcu_read_unlock() do { } while (0)
-
 void qemu_cond_init(QemuCond *cond);
 void qemu_cond_destroy(QemuCond *cond);
 
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 93007e2..ceaba30 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -17,3 +17,4 @@ util-obj-y += throttle.o
 util-obj-y += getauxval.o
 util-obj-y += readline.o
 util-obj-y += rfifolock.o
+util-obj-y += rcu.o
diff --git a/util/rcu.c b/util/rcu.c
new file mode 100644
index 0000000..1f737d5
--- /dev/null
+++ b/util/rcu.c
@@ -0,0 +1,172 @@
+/*
+ * urcu-mb.c
+ *
+ * Userspace RCU library with explicit memory barriers
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "qemu/rcu.h"
+#include "qemu/atomic.h"
+
+/*
+ * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
+ * Bits 1 and above are defined in synchronize_rcu.
+ */
+#define RCU_GP_LOCKED           (1UL << 0)
+#define RCU_GP_CTR              (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
+
+QemuEvent rcu_gp_event;
+static QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+    unsigned long v;
+
+    v = atomic_read(ctr);
+    return v && (v != rcu_gp_ctr);
+}
+
+/* Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+__thread struct rcu_reader_data rcu_reader;
+
+/* Protected by rcu_gp_lock.  */
+typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+/* Wait for previous parity/grace period to be empty of readers.  */
+static void wait_for_readers(void)
+{
+    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+    struct rcu_reader_data *index, *tmp;
+
+    for (;;) {
+        /* We want to be notified of changes made to rcu_gp_ongoing
+         * while we walk the list.
+         */
+        qemu_event_reset(&rcu_gp_event);
+
+        /* Instead of using atomic_mb_set for index->waiting, and
+         * atomic_mb_read for index->ctr, memory barriers are placed
+         * manually since writes to different threads are independent.
+         * atomic_mb_set has a smp_wmb before...
+         */
+        smp_wmb();
+        QLIST_FOREACH(index, &registry, node) {
+            atomic_set(&index->waiting, true);
+        }
+
+        /* ... and a smp_mb after.  */
+        smp_mb();
+
+        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+            if (!rcu_gp_ongoing(&index->ctr)) {
+                QLIST_REMOVE(index, node);
+                QLIST_INSERT_HEAD(&qsreaders, index, node);
+
+                /* No need for mb_set here, worst of all we
+                 * get some extra futex wakeups.
+                 */
+                atomic_set(&index->waiting, false);
+            }
+        }
+
+        /* atomic_mb_read has smp_rmb after.  */
+        smp_rmb();
+
+        if (QLIST_EMPTY(&registry)) {
+            break;
+        }
+
+        /* Wait for one thread to report a quiescent state and
+         * try again.
+         */
+        qemu_event_wait(&rcu_gp_event);
+    }
+
+    /* put back the reader list in the registry */
+    QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+void synchronize_rcu(void)
+{
+    qemu_mutex_lock(&rcu_gp_lock);
+
+    if (!QLIST_EMPTY(&registry)) {
+        /* In either case, the atomic_mb_set below blocks stores that free
+         * old RCU-protected pointers.
+         */
+        if (sizeof(rcu_gp_ctr) < 8) {
+            /* For architectures with 32-bit longs, a two-subphases algorithm
+             * ensures we do not encounter overflow bugs.
+             *
+             * Switch parity: 0 -> 1, 1 -> 0.
+             */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            wait_for_readers();
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+        } else {
+            /* Increment current grace period.  */
+            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+        }
+
+        wait_for_readers();
+    }
+
+    qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+void rcu_register_thread(void)
+{
+    assert(rcu_reader.ctr == 0);
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+void rcu_unregister_thread(void)
+{
+    qemu_mutex_lock(&rcu_gp_lock);
+    QLIST_REMOVE(&rcu_reader, node);
+    qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+    qemu_mutex_init(&rcu_gp_lock);
+    qemu_event_init(&rcu_gp_event, true);
+    rcu_register_thread();
+}
-- 
1.8.3.1


* [Qemu-devel] [PATCH 02/15] rcu: add rcutorture
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  3:31   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock Paolo Bonzini
                   ` (12 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

rcutorture is the unit test for rcu.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 tests/Makefile     |   7 +-
 tests/rcutorture.c | 450 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 456 insertions(+), 1 deletion(-)
 create mode 100644 tests/rcutorture.c

diff --git a/tests/Makefile b/tests/Makefile
index c2e2e52..db5b3c3 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -60,6 +60,8 @@ gcov-files-test-mul64-y = util/host-utils.c
 check-unit-y += tests/test-int128$(EXESUF)
 # all code tested by test-int128 is inside int128.h
 gcov-files-test-int128-y =
+check-unit-y += tests/rcutorture$(EXESUF)
+gcov-files-rcutorture-y = util/rcu.c
 check-unit-y += tests/test-bitops$(EXESUF)
 check-unit-$(CONFIG_HAS_GLIB_SUBPROCESS_TESTS) += tests/test-qdev-global-props$(EXESUF)
 check-unit-y += tests/check-qom-interface$(EXESUF)
@@ -223,7 +225,8 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \
 	tests/test-qmp-commands.o tests/test-visitor-serialization.o \
 	tests/test-x86-cpuid.o tests/test-mul64.o tests/test-int128.o \
-	tests/test-opts-visitor.o tests/test-qmp-event.o
+	tests/test-opts-visitor.o tests/test-qmp-event.o \
+	tests/rcutorture.o
 
 test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
 		  tests/test-qapi-event.o
@@ -252,6 +255,8 @@ tests/test-x86-cpuid$(EXESUF): tests/test-x86-cpuid.o
 tests/test-xbzrle$(EXESUF): tests/test-xbzrle.o migration/xbzrle.o page_cache.o libqemuutil.a
 tests/test-cutils$(EXESUF): tests/test-cutils.o util/cutils.o
 tests/test-int128$(EXESUF): tests/test-int128.o
+tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
+
 tests/test-qdev-global-props$(EXESUF): tests/test-qdev-global-props.o \
 	hw/core/qdev.o hw/core/qdev-properties.o hw/core/hotplug.o\
 	hw/core/irq.o \
diff --git a/tests/rcutorture.c b/tests/rcutorture.c
new file mode 100644
index 0000000..214985f
--- /dev/null
+++ b/tests/rcutorture.c
@@ -0,0 +1,450 @@
+/*
+ * rcutorture.c: simple user-level performance/stress test of RCU.
+ *
+ * Usage:
+ *     ./rcu <nreaders> rperf [ <seconds> ]
+ *         Run a read-side performance test with the specified
+ *         number of readers for <seconds> seconds.
+ *     ./rcu <nupdaters> uperf [ <seconds> ]
+ *         Run an update-side performance test with the specified
+ *         number of updaters and specified duration.
+ *     ./rcu <nreaders> perf [ <seconds> ]
+ *         Run a combined read/update performance test with the specified
+ *         number of readers and one updater and specified duration.
+ *
+ * The above tests produce output as follows:
+ *
+ * n_reads: 46008000  n_updates: 146026  nreaders: 2  nupdaters: 1 duration: 1
+ * ns/read: 43.4707  ns/update: 6848.1
+ *
+ * The first line lists the total number of RCU reads and updates executed
+ * during the test, the number of reader threads, the number of updater
+ * threads, and the duration of the test in seconds.  The second line
+ * lists the average duration of each type of operation in nanoseconds,
+ * or "nan" if the corresponding type of operation was not performed.
+ *
+ *     ./rcu <nreaders> stress [ <seconds> ]
+ *         Run a stress test with the specified number of readers and
+ *         one updater.
+ *
+ * This test produces output as follows:
+ *
+ * n_reads: 114633217  n_updates: 3903415  n_mberror: 0
+ * rcu_stress_count: 114618391 14826 0 0 0 0 0 0 0 0 0
+ *
+ * The first line lists the number of RCU read and update operations
+ * executed, followed by the number of memory-ordering violations
+ * (which will be zero in a correct RCU implementation).  The second
+ * line lists the number of readers observing progressively more stale
+ * data.  A correct RCU implementation will have all but the first two
+ * numbers zero.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
+ */
+
+/*
+ * Test variables.
+ */
+
+#include <glib.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "qemu/atomic.h"
+#include "qemu/rcu.h"
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+
+long long n_reads = 0LL;
+long n_updates = 0L;
+int nthreadsrunning;
+
+char argsbuf[64];
+
+#define GOFLAG_INIT 0
+#define GOFLAG_RUN  1
+#define GOFLAG_STOP 2
+
+static volatile int goflag = GOFLAG_INIT;
+
+#define RCU_READ_RUN 1000
+
+#define NR_THREADS 100
+static QemuThread threads[NR_THREADS];
+static struct rcu_reader_data *data[NR_THREADS];
+static int n_threads;
+
+static void create_thread(void *(*func)(void *))
+{
+    if (n_threads >= NR_THREADS) {
+        fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
+        exit(-1);
+    }
+    qemu_thread_create(&threads[n_threads], "test", func, &data[n_threads],
+                       QEMU_THREAD_JOINABLE);
+    n_threads++;
+}
+
+static void wait_all_threads(void)
+{
+    int i;
+
+    for (i = 0; i < n_threads; i++) {
+        qemu_thread_join(&threads[i]);
+    }
+    n_threads = 0;
+}
+
+/*
+ * Performance test.
+ */
+
+static void *rcu_read_perf_test(void *arg)
+{
+    int i;
+    long long n_reads_local = 0;
+
+    rcu_register_thread();
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    atomic_inc(&nthreadsrunning);
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        for (i = 0; i < RCU_READ_RUN; i++) {
+            rcu_read_lock();
+            rcu_read_unlock();
+        }
+        n_reads_local += RCU_READ_RUN;
+    }
+    atomic_add(&n_reads, n_reads_local);
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void *rcu_update_perf_test(void *arg)
+{
+    long long n_updates_local = 0;
+
+    rcu_register_thread();
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    atomic_inc(&nthreadsrunning);
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        synchronize_rcu();
+        n_updates_local++;
+    }
+    atomic_add(&n_updates, n_updates_local);
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void perftestinit(void)
+{
+    nthreadsrunning = 0;
+}
+
+static void perftestrun(int nthreads, int duration, int nreaders, int nupdaters)
+{
+    while (atomic_read(&nthreadsrunning) < nthreads) {
+        g_usleep(1000);
+    }
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+    printf("n_reads: %lld  n_updates: %ld  nreaders: %d  nupdaters: %d duration: %d\n",
+           n_reads, n_updates, nreaders, nupdaters, duration);
+    printf("ns/read: %g  ns/update: %g\n",
+           ((duration * 1000*1000*1000.*(double)nreaders) /
+        (double)n_reads),
+           ((duration * 1000*1000*1000.*(double)nupdaters) /
+        (double)n_updates));
+    exit(0);
+}
+
+static void perftest(int nreaders, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_perf_test);
+    }
+    create_thread(rcu_update_perf_test);
+    perftestrun(i + 1, duration, nreaders, 1);
+}
+
+static void rperftest(int nreaders, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_perf_test);
+    }
+    perftestrun(i, duration, nreaders, 0);
+}
+
+static void uperftest(int nupdaters, int duration)
+{
+    int i;
+
+    perftestinit();
+    for (i = 0; i < nupdaters; i++) {
+        create_thread(rcu_update_perf_test);
+    }
+    perftestrun(i, duration, 0, nupdaters);
+}
+
+/*
+ * Stress test.
+ */
+
+#define RCU_STRESS_PIPE_LEN 10
+
+struct rcu_stress {
+    int pipe_count;
+    int mbtest;
+};
+
+struct rcu_stress rcu_stress_array[RCU_STRESS_PIPE_LEN] = { { 0 } };
+struct rcu_stress *rcu_stress_current;
+int rcu_stress_idx;
+
+int n_mberror;
+long long rcu_stress_count[RCU_STRESS_PIPE_LEN + 1];
+
+
+static void *rcu_read_stress_test(void *arg)
+{
+    int i;
+    int itercnt = 0;
+    struct rcu_stress *p;
+    int pc;
+    long long n_reads_local = 0;
+    volatile int garbage;
+
+    rcu_register_thread();
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        rcu_read_lock();
+        p = atomic_rcu_read(&rcu_stress_current);
+        if (p->mbtest == 0) {
+            n_mberror++;
+        }
+        for (i = 0; i < 100; i++) {
+            garbage++;
+        }
+        pc = p->pipe_count;
+        rcu_read_unlock();
+        if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
+            pc = RCU_STRESS_PIPE_LEN;
+        }
+        atomic_inc(&rcu_stress_count[pc]);
+        n_reads_local++;
+        if ((++itercnt % 0x1000) == 0) {
+            synchronize_rcu();
+        }
+    }
+    atomic_add(&n_reads, n_reads_local);
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void *rcu_update_stress_test(void *arg)
+{
+    int i;
+    struct rcu_stress *p;
+
+    rcu_register_thread();
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        i = rcu_stress_idx + 1;
+        if (i >= RCU_STRESS_PIPE_LEN) {
+            i = 0;
+        }
+        p = &rcu_stress_array[i];
+        p->mbtest = 0;
+        smp_mb();
+        p->pipe_count = 0;
+        p->mbtest = 1;
+        atomic_rcu_set(&rcu_stress_current, p);
+        rcu_stress_idx = i;
+        for (i = 0; i < RCU_STRESS_PIPE_LEN; i++)
+            if (i != rcu_stress_idx) {
+                rcu_stress_array[i].pipe_count++;
+            }
+        synchronize_rcu();
+        n_updates++;
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void *rcu_fake_update_stress_test(void *arg)
+{
+    rcu_register_thread();
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+    while (goflag == GOFLAG_RUN) {
+        synchronize_rcu();
+        g_usleep(1000);
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void stresstest(int nreaders, int duration)
+{
+    int i;
+
+    rcu_stress_current = &rcu_stress_array[0];
+    rcu_stress_current->pipe_count = 0;
+    rcu_stress_current->mbtest = 1;
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_stress_test);
+    }
+    create_thread(rcu_update_stress_test);
+    for (i = 0; i < 5; i++) {
+        create_thread(rcu_fake_update_stress_test);
+    }
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+    printf("n_reads: %lld  n_updates: %ld  n_mberror: %d\n",
+           n_reads, n_updates, n_mberror);
+    printf("rcu_stress_count:");
+    for (i = 0; i <= RCU_STRESS_PIPE_LEN; i++) {
+        printf(" %lld", rcu_stress_count[i]);
+    }
+    printf("\n");
+    exit(0);
+}
+
+/* GTest interface */
+
+static void gtest_stress(int nreaders, int duration)
+{
+    int i;
+
+    rcu_stress_current = &rcu_stress_array[0];
+    rcu_stress_current->pipe_count = 0;
+    rcu_stress_current->mbtest = 1;
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_read_stress_test);
+    }
+    create_thread(rcu_update_stress_test);
+    for (i = 0; i < 5; i++) {
+        create_thread(rcu_fake_update_stress_test);
+    }
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+    g_assert_cmpint(n_mberror, ==, 0);
+    for (i = 2; i <= RCU_STRESS_PIPE_LEN; i++) {
+        g_assert_cmpint(rcu_stress_count[i], ==, 0);
+    }
+}
+
+static void gtest_stress_1_1(void)
+{
+    gtest_stress(1, 1);
+}
+
+static void gtest_stress_10_1(void)
+{
+    gtest_stress(10, 1);
+}
+
+static void gtest_stress_1_5(void)
+{
+    gtest_stress(1, 5);
+}
+
+static void gtest_stress_10_5(void)
+{
+    gtest_stress(10, 5);
+}
+
+/*
+ * Main program.
+ */
+
+static void usage(int argc, char *argv[])
+{
+    fprintf(stderr, "Usage: %s [nreaders [ rperf | uperf | perf | stress [ duration ] ] ]\n", argv[0]);
+    exit(-1);
+}
+
+int main(int argc, char *argv[])
+{
+    int nreaders = 1;
+    int duration = 1;
+
+    if (argc >= 2 && argv[1][0] == '-') {
+        g_test_init(&argc, &argv, NULL);
+        if (g_test_quick()) {
+            g_test_add_func("/rcu/torture/1reader", gtest_stress_1_1);
+            g_test_add_func("/rcu/torture/10readers", gtest_stress_10_1);
+        } else {
+            g_test_add_func("/rcu/torture/1reader", gtest_stress_1_5);
+            g_test_add_func("/rcu/torture/10readers", gtest_stress_10_5);
+        }
+        return g_test_run();
+    }
+
+    if (argc >= 2) {
+        nreaders = strtoul(argv[1], NULL, 0);
+    }
+    if (argc > 3) {
+        duration = strtoul(argv[3], NULL, 0);
+    }
+    if (argc < 3 || strcmp(argv[2], "stress") == 0) {
+        stresstest(nreaders, duration);
+    } else if (strcmp(argv[2], "rperf") == 0) {
+        rperftest(nreaders, duration);
+    } else if (strcmp(argv[2], "uperf") == 0) {
+        uperftest(nreaders, duration);
+    } else if (strcmp(argv[2], "perf") == 0) {
+        perftest(nreaders, duration);
+    }
+    usage(argc, argv);
+    return 0;
+}
-- 
1.8.3.1


* [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 02/15] rcu: add rcutorture Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  3:32   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 04/15] rcu: add call_rcu Paolo Bonzini
                   ` (11 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
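
This patch makes the read-side critical sections nestable via a per-thread
depth counter.  A condensed, single-threaded model of the rule it
introduces (names here are illustrative; the waiting/wakeup logic and the
real rcu_gp_ctr snapshot are elided):

```c
#include <assert.h>

/* Only the outermost lock publishes the counter and only the outermost
 * unlock clears it; ctr stands in for the per-thread copy of rcu_gp_ctr. */
static unsigned depth;
static unsigned long ctr;

static void model_read_lock(void)
{
    if (depth++ > 0) {
        return;              /* nested: no counter traffic */
    }
    ctr = 1;                 /* outermost: snapshot rcu_gp_ctr */
}

static void model_read_unlock(void)
{
    assert(depth != 0);
    if (--depth > 0) {
        return;              /* still inside an outer critical section */
    }
    ctr = 0;                 /* outermost unlock: quiescent again */
}

static int nest_demo(void)
{
    model_read_lock();
    model_read_lock();             /* nested pair, as rcutorture now does */
    model_read_unlock();
    int still_locked = (ctr != 0); /* inner unlock must not end the CS */
    model_read_unlock();
    return still_locked && ctr == 0;
}
```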
---
 include/qemu/rcu.h | 15 ++++++++++++++-
 tests/rcutorture.c |  2 ++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index cfef36e..da043f2 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -68,6 +68,9 @@ struct rcu_reader_data {
     unsigned long ctr;
     bool waiting;
 
+    /* Data used by reader only */
+    unsigned depth;
+
     /* Data used for registry, protected by rcu_gp_lock */
     QLIST_ENTRY(rcu_reader_data) node;
 };
@@ -77,8 +80,13 @@ extern __thread struct rcu_reader_data rcu_reader;
 static inline void rcu_read_lock(void)
 {
     struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+    unsigned ctr;
+
+    if (p_rcu_reader->depth++ > 0) {
+        return;
+    }
 
-    unsigned ctr = atomic_read(&rcu_gp_ctr);
+    ctr = atomic_read(&rcu_gp_ctr);
     atomic_xchg(&p_rcu_reader->ctr, ctr);
     if (atomic_read(&p_rcu_reader->waiting)) {
         atomic_set(&p_rcu_reader->waiting, false);
@@ -90,6 +98,11 @@ static inline void rcu_read_unlock(void)
 {
     struct rcu_reader_data *p_rcu_reader = &rcu_reader;
 
+    assert(p_rcu_reader->depth != 0);
+    if (--p_rcu_reader->depth > 0) {
+        return;
+    }
+
     atomic_xchg(&p_rcu_reader->ctr, 0);
     if (atomic_read(&p_rcu_reader->waiting)) {
         atomic_set(&p_rcu_reader->waiting, false);
diff --git a/tests/rcutorture.c b/tests/rcutorture.c
index 214985f..31461c3 100644
--- a/tests/rcutorture.c
+++ b/tests/rcutorture.c
@@ -257,9 +257,11 @@ static void *rcu_read_stress_test(void *arg)
         if (p->mbtest == 0) {
             n_mberror++;
         }
+        rcu_read_lock();
         for (i = 0; i < 100; i++) {
             garbage++;
         }
+        rcu_read_unlock();
         pc = p->pipe_count;
         rcu_read_unlock();
         if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
-- 
1.8.3.1


* [Qemu-devel] [PATCH 04/15] rcu: add call_rcu
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (2 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  6:04   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy Paolo Bonzini
                   ` (10 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Asynchronous callbacks provided by call_rcu are particularly important
for QEMU, because the BQL makes it hard to use synchronize_rcu.

In addition, the current RCU implementation is not particularly friendly
to multiple concurrent synchronize_rcu callers, making call_rcu even
more important.
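
As a rough illustration of the deferred-reclamation flow, here is a toy
model in which `fake_call_rcu1` runs the callback synchronously instead of
after a grace period (in the patch, the rcu_head is pushed on a wait-free
queue and a dedicated thread invokes func after synchronize_rcu();
`fake_call_rcu1`, `reclaim_demo` and `last_reclaimed_a` are invented names):

```c
#include <stddef.h>
#include <stdlib.h>

struct rcu_head {
    struct rcu_head *next;
    void (*func)(struct rcu_head *head);
};

static void fake_call_rcu1(struct rcu_head *head,
                           void (*func)(struct rcu_head *head))
{
    head->func = func;
    func(head);      /* the real code defers this past a grace period */
}

struct foo {
    struct rcu_head rcu;   /* first field, as the call_rcu macro expects */
    int a;
};

static int last_reclaimed_a;

static void foo_reclaim(struct rcu_head *rp)
{
    /* container_of in QEMU, spelled out here with offsetof */
    struct foo *fp = (struct foo *)((char *)rp - offsetof(struct foo, rcu));
    last_reclaimed_a = fp->a;
    free(fp);
}

static int reclaim_demo(void)
{
    struct foo *fp = malloc(sizeof(*fp));
    fp->a = 42;
    fake_call_rcu1(&fp->rcu, foo_reclaim);
    return last_reclaimed_a;   /* set by the callback */
}
```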

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
 include/qemu/rcu.h |  22 ++++++++++
 util/rcu.c         | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 247 insertions(+), 4 deletions(-)

diff --git a/docs/rcu.txt b/docs/rcu.txt
index 9938ad3..61752b9 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -82,7 +82,50 @@ The core RCU API is small:
         Note that it would be valid for another update to come while
         synchronize_rcu is running.  Because of this, it is better that
         the updater releases any locks it may hold before calling
-        synchronize_rcu.
+        synchronize_rcu.  If this is not possible (for example, because
+        the updater is protected by the BQL), you can use call_rcu.
+
+     void call_rcu1(struct rcu_head * head,
+                    void (*func)(struct rcu_head *head));
+
+        This function invokes func(head) after all pre-existing RCU
+        read-side critical sections on all threads have completed.  This
+        marks the end of the removal phase; func then takes care of the
+        reclamation phase asynchronously.
+
+        The foo struct needs to have an rcu_head structure added,
+        perhaps as follows:
+
+            struct foo {
+                struct rcu_head rcu;
+                int a;
+                char b;
+                long c;
+            };
+
+        so that the reclaimer function can fetch the struct foo address
+        and free it:
+
+            call_rcu1(&foo.rcu, foo_reclaim);
+
+            void foo_reclaim(struct rcu_head *rp)
+            {
+                struct foo *fp = container_of(rp, struct foo, rcu);
+                g_free(fp);
+            }
+
+        For the common case where the rcu_head member is the first of the
+        struct, you can use the following macro.
+
+     void call_rcu(T *p,
+                   void (*func)(T *p),
+                   field-name);
+
+        call_rcu1 is typically used through this macro, in the common case
+        where the "struct rcu_head" is the first field in the struct.  With
+        foo_reclaim changed to take a struct foo *, the above becomes:
+
+            call_rcu(&foo, foo_reclaim, rcu);
 
      typeof(*p) atomic_rcu_read(p);
 
@@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
 - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
   rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
 
+- call_rcu is a macro that has an extra argument (the name of the first
+  field in the struct, which must be a struct rcu_head), and expects the
+  type of the callback's argument to be the type of the first argument.
+  call_rcu1 is the same as Linux's call_rcu.
+
 
 RCU PATTERNS
 ============
@@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate locking):
     synchronize_rcu();
     free(old);
 
-Note that the same idiom would be possible with reader/writer
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+    rcu_read_lock();
+    p = atomic_rcu_read(&foo);
+    foo_ref(p);
+    rcu_read_unlock();
+    /* do something with p. */
+    foo_unref(p);
+
+The write side can be like this:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    atomic_rcu_set(&foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    foo_unref(old);
+
+or with call_rcu:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    atomic_rcu_set(&foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    call_rcu(old, foo_unref, rcu);
+
+In both cases, the write side only performs removal.  Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() is undesirably expensive, because the
+last reference may be dropped on the read side.  Hence you can
+use call_rcu() instead:
+
+    foo_unref(struct foo *p) {
+        if (atomic_fetch_dec(&p->refcount) == 1) {
+            call_rcu(p, foo_destroy, rcu);
+        }
+    }
+
+
+Note that the same idioms would be possible with reader/writer
 locks:
 
     read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -216,13 +304,27 @@ locks:
                                     write_mutex_unlock(&foo_rwlock);
                                     free(p);
 
+    ------------------------------------------------------------------
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        old = foo;
+    foo_ref(p);                     foo = new;
+    read_unlock(&foo_rwlock);       foo_unref(old);
+    /* do something with p. */      write_mutex_unlock(&foo_rwlock);
+    read_lock(&foo_rwlock);
+    foo_unref(p);
+    read_unlock(&foo_rwlock);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of the write-side critical section.
+
 
 RCU resizable arrays
 --------------------
 
 Resizable arrays can be used with RCU.  The expensive RCU synchronization
-only needs to take place when the array is resized.  The two items to
-take care of are:
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
 
 - ensuring that the old version of the array is available between removal
   and reclamation;
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index da043f2..068a279 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+    struct rcu_head *next;
+    RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field)                                      \
+    call_rcu1(({                                                         \
+         char __attribute__((unused))                                    \
+            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
+            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+         &(head)->field;                                                 \
+      }),                                                                \
+      (RCUCBFunc *)(func))
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/util/rcu.c b/util/rcu.c
index 1f737d5..c9c3e6e 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@
  * IBM's contributions to this file may be relicensed under LGPLv2 or later.
  */
 
+#include "qemu-common.h"
 #include <stdio.h>
 #include <assert.h>
 #include <stdlib.h>
@@ -33,6 +34,7 @@
 #include <errno.h>
 #include "qemu/rcu.h"
 #include "qemu/atomic.h"
+#include "qemu/thread.h"
 
 /*
  * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
@@ -149,6 +151,116 @@ void synchronize_rcu(void)
     qemu_mutex_unlock(&rcu_gp_lock);
 }
 
+
+#define RCU_CALL_MIN_SIZE        30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+    old_tail = atomic_xchg(&tail, &node->next);
+    atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+    struct rcu_head *node, *next;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer, head and tail are always consistent.  The head is
+     * consistent because only the consumer reads/writes it; the tail,
+     * because updating it is the first step of an enqueue.  Only the
+     * next pointers might be transiently inconsistent.
+     */
+    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+        abort();
+    }
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    node = head;
+    next = atomic_mb_read(&head->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /* Since we are the sole consumer, and we excluded the empty case
+     * above, the queue will always have at least two nodes: the
+     * dummy node, and the one being removed.  So we do not need to update
+     * the tail pointer.
+     */
+    head = next;
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    struct rcu_head *node;
+
+    for (;;) {
+        int tries = 0;
+        int n = atomic_read(&rcu_call_count);
+
+        /* Heuristically wait for a decent number of callbacks to pile up.
+         * Fetch rcu_call_count now; we only need to process elements that
+         * were added before synchronize_rcu() starts.
+         */
+        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+            g_usleep(100000);
+            qemu_event_reset(&rcu_call_ready_event);
+            n = atomic_read(&rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                qemu_event_wait(&rcu_call_ready_event);
+                n = atomic_read(&rcu_call_count);
+            }
+        }
+
+        atomic_sub(&rcu_call_count, n);
+        synchronize_rcu();
+        while (n > 0) {
+            node = try_dequeue();
+            while (!node) {
+                qemu_event_reset(&rcu_call_ready_event);
+                node = try_dequeue();
+                if (!node) {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    node = try_dequeue();
+                }
+            }
+
+            n--;
+            node->func(node);
+        }
+    }
+    abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    node->func = func;
+    enqueue(node);
+    atomic_inc(&rcu_call_count);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
 void rcu_register_thread(void)
 {
     assert(rcu_reader.ctr == 0);
@@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
 
 static void __attribute__((__constructor__)) rcu_init(void)
 {
+    QemuThread thread;
+
     qemu_mutex_init(&rcu_gp_lock);
     qemu_event_init(&rcu_gp_event, true);
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
+                       NULL, QEMU_THREAD_DETACHED);
+
     rcu_register_thread();
 }
-- 
1.8.3.1


* [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (3 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 04/15] rcu: add call_rcu Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  6:04   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU Paolo Bonzini
                   ` (9 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha, Jan Kiszka

From: Jan Kiszka <jan.kiszka@siemens.com>

Now that memory_region_destroy can be called from an RCU callback,
checking the BQL-protected global memory_region_transaction_depth
does not make much sense.

Signed-off-by: Jan Kiszka <jan.kiszka@siemens.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 memory.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/memory.c b/memory.c
index c343bf3..8c3d8c0 100644
--- a/memory.c
+++ b/memory.c
@@ -1263,7 +1263,6 @@ static void memory_region_finalize(Object *obj)
     MemoryRegion *mr = MEMORY_REGION(obj);
 
     assert(QTAILQ_EMPTY(&mr->subregions));
-    assert(memory_region_transaction_depth == 0);
     mr->destructor(mr);
     memory_region_clear_coalescing(mr);
     g_free((char *)mr->name);
-- 
1.8.3.1


* [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (4 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  6:16   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find Paolo Bonzini
                   ` (8 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Replace flat_view_mutex with RCU, avoiding futex contention for
dataplane on large systems with many iothreads.
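
A single-threaded sketch of what the read side becomes, with C11 atomics
standing in for QEMU's atomic_rcu_read/atomic_rcu_set and the RCU critical
section elided (FlatView is reduced to its reference count; the demo
function and the argument-free lookup are simplifications, not the patch's
actual signatures):

```c
#include <stdatomic.h>

typedef struct FlatView {
    atomic_uint ref;
} FlatView;

static _Atomic(FlatView *) current_map;

static FlatView *get_flatview(void)
{
    /* rcu_read_lock() would bracket these two steps */
    FlatView *view = atomic_load_explicit(&current_map, memory_order_acquire);
    atomic_fetch_add(&view->ref, 1);   /* flatview_ref() */
    /* rcu_read_unlock() */
    return view;
}

static unsigned flatview_demo(void)
{
    FlatView a = { 1 };                /* one reference held by the AS */
    atomic_store(&current_map, &a);    /* atomic_rcu_set() in the patch */
    get_flatview();
    get_flatview();
    return atomic_load(&a.ref);        /* 1 + 2 reader references */
}
```

The old view's flatview_unref() is then deferred with call_rcu, so no lock
is needed on either side of the pointer swap.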

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/exec/memory.h |  5 +++++
 memory.c              | 54 ++++++++++++++++++++++-----------------------------
 2 files changed, 28 insertions(+), 31 deletions(-)

diff --git a/include/exec/memory.h b/include/exec/memory.h
index 0cd96b1..06ffa1d 100644
--- a/include/exec/memory.h
+++ b/include/exec/memory.h
@@ -33,6 +33,7 @@
 #include "qemu/notify.h"
 #include "qapi/error.h"
 #include "qom/object.h"
+#include "qemu/rcu.h"
 
 #define MAX_PHYS_ADDR_SPACE_BITS 62
 #define MAX_PHYS_ADDR            (((hwaddr)1 << MAX_PHYS_ADDR_SPACE_BITS) - 1)
@@ -207,9 +208,13 @@ struct MemoryListener {
  */
 struct AddressSpace {
     /* All fields are private. */
+    struct rcu_head rcu;
     char *name;
     MemoryRegion *root;
+
+    /* Accessed via RCU.  */
     struct FlatView *current_map;
+
     int ioeventfd_nb;
     struct MemoryRegionIoeventfd *ioeventfds;
     struct AddressSpaceDispatch *dispatch;
diff --git a/memory.c b/memory.c
index 8c3d8c0..a844ced 100644
--- a/memory.c
+++ b/memory.c
@@ -33,26 +33,12 @@ static bool memory_region_update_pending;
 static bool ioeventfd_update_pending;
 static bool global_dirty_log = false;
 
-/* flat_view_mutex is taken around reading as->current_map; the critical
- * section is extremely short, so I'm using a single mutex for every AS.
- * We could also RCU for the read-side.
- *
- * The BQL is taken around transaction commits, hence both locks are taken
- * while writing to as->current_map (with the BQL taken outside).
- */
-static QemuMutex flat_view_mutex;
-
 static QTAILQ_HEAD(memory_listeners, MemoryListener) memory_listeners
     = QTAILQ_HEAD_INITIALIZER(memory_listeners);
 
 static QTAILQ_HEAD(, AddressSpace) address_spaces
     = QTAILQ_HEAD_INITIALIZER(address_spaces);
 
-static void memory_init(void)
-{
-    qemu_mutex_init(&flat_view_mutex);
-}
-
 typedef struct AddrRange AddrRange;
 
 /*
@@ -242,6 +228,7 @@ struct FlatRange {
  * order.
  */
 struct FlatView {
+    struct rcu_head rcu;
     unsigned ref;
     FlatRange *ranges;
     unsigned nr;
@@ -654,10 +641,10 @@ static FlatView *address_space_get_flatview(AddressSpace *as)
 {
     FlatView *view;
 
-    qemu_mutex_lock(&flat_view_mutex);
-    view = as->current_map;
+    rcu_read_lock();
+    view = atomic_rcu_read(&as->current_map);
     flatview_ref(view);
-    qemu_mutex_unlock(&flat_view_mutex);
+    rcu_read_unlock();
     return view;
 }
 
@@ -766,10 +753,9 @@ static void address_space_update_topology(AddressSpace *as)
     address_space_update_topology_pass(as, old_view, new_view, false);
     address_space_update_topology_pass(as, old_view, new_view, true);
 
-    qemu_mutex_lock(&flat_view_mutex);
-    flatview_unref(as->current_map);
-    as->current_map = new_view;
-    qemu_mutex_unlock(&flat_view_mutex);
+    /* Writes are protected by the BQL.  */
+    atomic_rcu_set(&as->current_map, new_view);
+    call_rcu(old_view, flatview_unref, rcu);
 
     /* Note that all the old MemoryRegions are still alive up to this
      * point.  This relieves most MemoryListeners from the need to
@@ -1957,10 +1943,6 @@ void memory_listener_unregister(MemoryListener *listener)
 
 void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
 {
-    if (QTAILQ_EMPTY(&address_spaces)) {
-        memory_init();
-    }
-
     memory_region_transaction_begin();
     as->root = root;
     as->current_map = g_new(FlatView, 1);
@@ -1974,15 +1956,10 @@ void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
     memory_region_transaction_commit();
 }
 
-void address_space_destroy(AddressSpace *as)
+static void do_address_space_destroy(AddressSpace *as)
 {
     MemoryListener *listener;
 
-    /* Flush out anything from MemoryListeners listening in on this */
-    memory_region_transaction_begin();
-    as->root = NULL;
-    memory_region_transaction_commit();
-    QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
     address_space_destroy_dispatch(as);
 
     QTAILQ_FOREACH(listener, &memory_listeners, link) {
@@ -1994,6 +1971,21 @@ void address_space_destroy(AddressSpace *as)
     g_free(as->ioeventfds);
 }
 
+void address_space_destroy(AddressSpace *as)
+{
+    /* Flush out anything from MemoryListeners listening in on this */
+    memory_region_transaction_begin();
+    as->root = NULL;
+    memory_region_transaction_commit();
+    QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
+
+    /* At this point, as->dispatch and as->current_map are dummy
+     * entries that the guest should never use.  Wait for the old
+     * values to expire before freeing the data.
+     */
+    call_rcu(as, do_address_space_destroy, rcu);
+}
+
 bool io_mem_read(MemoryRegion *mr, hwaddr addr, uint64_t *pval, unsigned size)
 {
     return memory_region_dispatch_read(mr, addr, pval, size);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (5 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-26  6:24   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map Paolo Bonzini
                   ` (7 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Do the entire lookup under RCU, which avoids the atomic ref/unref
operations on the FlatView.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 memory.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/memory.c b/memory.c
index a844ced..577e87c 100644
--- a/memory.c
+++ b/memory.c
@@ -1828,7 +1828,8 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
     }
     range = addrrange_make(int128_make64(addr), int128_make64(size));
 
-    view = address_space_get_flatview(as);
+    rcu_read_lock();
+    view = atomic_rcu_read(&as->current_map);
     fr = flatview_lookup(view, range);
     if (!fr) {
         flatview_unref(view);
@@ -1850,7 +1851,7 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
     ret.readonly = fr->readonly;
     memory_region_ref(ret.mr);
 
-    flatview_unref(view);
+    rcu_read_unlock();
     return ret;
 }
 
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (6 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly Paolo Bonzini
                   ` (6 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

For now this is a simple TLB flush.  It can change later for two
reasons:

1) an AddressSpaceDispatch will be cached in the CPUState object

2) it will not be possible to do tlb_flush once the TCG-generated code
runs outside the BQL.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpu-exec.c              | 6 ++++++
 exec.c                  | 2 +-
 include/exec/exec-all.h | 1 +
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/cpu-exec.c b/cpu-exec.c
index a4f0eff..5a9e23f 100644
--- a/cpu-exec.c
+++ b/cpu-exec.c
@@ -144,6 +144,12 @@ void cpu_resume_from_signal(CPUState *cpu, void *puc)
     cpu->exception_index = -1;
     siglongjmp(cpu->jmp_env, 1);
 }
+
+void cpu_reload_memory_map(CPUState *cpu)
+{
+    /* The TLB is protected by the iothread lock.  */
+    tlb_flush(cpu, 1);
+}
 #endif
 
 /* Execute a TB, and fix up the CPU state afterwards if necessary */
diff --git a/exec.c b/exec.c
index 410371d..d79a1bc 100644
--- a/exec.c
+++ b/exec.c
@@ -2025,7 +2025,7 @@ static void tcg_commit(MemoryListener *listener)
         if (cpu->tcg_as_listener != listener) {
             continue;
         }
-        tlb_flush(cpu, 1);
+        cpu_reload_memory_map(cpu);
     }
 }
 
diff --git a/include/exec/exec-all.h b/include/exec/exec-all.h
index 6a15448..1b30813 100644
--- a/include/exec/exec-all.h
+++ b/include/exec/exec-all.h
@@ -96,6 +96,7 @@ void tb_invalidate_phys_page_range(tb_page_addr_t start, tb_page_addr_t end,
 void tb_invalidate_phys_range(tb_page_addr_t start, tb_page_addr_t end,
                               int is_cpu_write_access);
 #if !defined(CONFIG_USER_ONLY)
+void cpu_reload_memory_map(CPUState *cpu);
 void tcg_cpu_address_space_init(CPUState *cpu, AddressSpace *as);
 /* cputlb.c */
 void tlb_flush_page(CPUState *cpu, target_ulong addr);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (7 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch Paolo Bonzini
                   ` (5 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

After the previous patch, TLBs will be flushed on every change to
the memory mapping.  This patch augments that with synchronization
of the MemoryRegionSections referred to in the iotlb array.

With this change, it is guaranteed that iotlb_to_region will access
the correct memory map, even once the TLB is accessed outside
the BQL.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpu-exec.c              |  6 +++++-
 cputlb.c                |  5 ++---
 exec.c                  | 13 ++++++++-----
 include/exec/cputlb.h   |  2 +-
 include/exec/exec-all.h |  3 ++-
 include/qom/cpu.h       |  1 +
 softmmu_template.h      |  4 ++--
 7 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/cpu-exec.c b/cpu-exec.c
index 5a9e23f..12473ff 100644
--- a/cpu-exec.c
+++ b/cpu-exec.c
@@ -24,6 +24,8 @@
 #include "qemu/atomic.h"
 #include "sysemu/qtest.h"
 #include "qemu/timer.h"
+#include "exec/address-spaces.h"
+#include "exec/memory-internal.h"
 
 /* -icount align implementation. */
 
@@ -147,7 +149,9 @@ void cpu_resume_from_signal(CPUState *cpu, void *puc)
 
 void cpu_reload_memory_map(CPUState *cpu)
 {
-    /* The TLB is protected by the iothread lock.  */
+    /* The CPU and TLB are protected by the iothread lock.  */
+    AddressSpaceDispatch *d = cpu->as->dispatch;
+    cpu->memory_dispatch = d;
     tlb_flush(cpu, 1);
 }
 #endif
diff --git a/cputlb.c b/cputlb.c
index 3b271d4..f92db5e 100644
--- a/cputlb.c
+++ b/cputlb.c
@@ -265,8 +265,7 @@ void tlb_set_page(CPUState *cpu, target_ulong vaddr,
     }
 
     sz = size;
-    section = address_space_translate_for_iotlb(cpu->as, paddr,
-                                                &xlat, &sz);
+    section = address_space_translate_for_iotlb(cpu, paddr, &xlat, &sz);
     assert(sz >= TARGET_PAGE_SIZE);
 
 #if defined(DEBUG_TLB)
@@ -347,7 +346,7 @@ tb_page_addr_t get_page_addr_code(CPUArchState *env1, target_ulong addr)
         cpu_ldub_code(env1, addr);
     }
     pd = env1->iotlb[mmu_idx][page_index] & ~TARGET_PAGE_MASK;
-    mr = iotlb_to_region(cpu->as, pd);
+    mr = iotlb_to_region(cpu, pd);
     if (memory_region_is_unassigned(mr)) {
         CPUClass *cc = CPU_GET_CLASS(cpu);
 
diff --git a/exec.c b/exec.c
index d79a1bc..762ec76 100644
--- a/exec.c
+++ b/exec.c
@@ -401,11 +401,12 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
 }
 
 MemoryRegionSection *
-address_space_translate_for_iotlb(AddressSpace *as, hwaddr addr, hwaddr *xlat,
-                                  hwaddr *plen)
+address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr,
+                                  hwaddr *xlat, hwaddr *plen)
 {
     MemoryRegionSection *section;
-    section = address_space_translate_internal(as->dispatch, addr, xlat, plen, false);
+    section = address_space_translate_internal(cpu->memory_dispatch,
+                                               addr, xlat, plen, false);
 
     assert(!section->mr->iommu_ops);
     return section;
@@ -1960,9 +1961,11 @@ static uint16_t dummy_section(PhysPageMap *map, AddressSpace *as,
     return phys_section_add(map, &section);
 }
 
-MemoryRegion *iotlb_to_region(AddressSpace *as, hwaddr index)
+MemoryRegion *iotlb_to_region(CPUState *cpu, hwaddr index)
 {
-    return as->dispatch->map.sections[index & ~TARGET_PAGE_MASK].mr;
+    MemoryRegionSection *sections = cpu->memory_dispatch->map.sections;
+
+    return sections[index & ~TARGET_PAGE_MASK].mr;
 }
 
 static void io_mem_init(void)
diff --git a/include/exec/cputlb.h b/include/exec/cputlb.h
index b8ecd6f..e0da9d7 100644
--- a/include/exec/cputlb.h
+++ b/include/exec/cputlb.h
@@ -34,7 +34,7 @@ extern int tlb_flush_count;
 void tb_flush_jmp_cache(CPUState *cpu, target_ulong addr);
 
 MemoryRegionSection *
-address_space_translate_for_iotlb(AddressSpace *as, hwaddr addr, hwaddr *xlat,
+address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr, hwaddr *xlat,
                                   hwaddr *plen);
 hwaddr memory_region_section_get_iotlb(CPUState *cpu,
                                        MemoryRegionSection *section,
diff --git a/include/exec/exec-all.h b/include/exec/exec-all.h
index 1b30813..bb3fd37 100644
--- a/include/exec/exec-all.h
+++ b/include/exec/exec-all.h
@@ -338,7 +338,8 @@ extern uintptr_t tci_tb_ptr;
 
 void phys_mem_set_alloc(void *(*alloc)(size_t, uint64_t *align));
 
-struct MemoryRegion *iotlb_to_region(AddressSpace *as, hwaddr index);
+struct MemoryRegion *iotlb_to_region(CPUState *cpu,
+                                     hwaddr index);
 bool io_mem_read(struct MemoryRegion *mr, hwaddr addr,
                  uint64_t *pvalue, unsigned size);
 bool io_mem_write(struct MemoryRegion *mr, hwaddr addr,
diff --git a/include/qom/cpu.h b/include/qom/cpu.h
index 2098f1c..48fd6fb 100644
--- a/include/qom/cpu.h
+++ b/include/qom/cpu.h
@@ -256,6 +256,7 @@ struct CPUState {
     sigjmp_buf jmp_env;
 
     AddressSpace *as;
+    struct AddressSpaceDispatch *memory_dispatch;
     MemoryListener *tcg_as_listener;
 
     void *env_ptr; /* CPUArchState */
diff --git a/softmmu_template.h b/softmmu_template.h
index 6b4e615..0e3dd35 100644
--- a/softmmu_template.h
+++ b/softmmu_template.h
@@ -149,7 +149,7 @@ static inline DATA_TYPE glue(io_read, SUFFIX)(CPUArchState *env,
 {
     uint64_t val;
     CPUState *cpu = ENV_GET_CPU(env);
-    MemoryRegion *mr = iotlb_to_region(cpu->as, physaddr);
+    MemoryRegion *mr = iotlb_to_region(cpu, physaddr);
 
     physaddr = (physaddr & TARGET_PAGE_MASK) + addr;
     cpu->mem_io_pc = retaddr;
@@ -369,7 +369,7 @@ static inline void glue(io_write, SUFFIX)(CPUArchState *env,
                                           uintptr_t retaddr)
 {
     CPUState *cpu = ENV_GET_CPU(env);
-    MemoryRegion *mr = iotlb_to_region(cpu->as, physaddr);
+    MemoryRegion *mr = iotlb_to_region(cpu, physaddr);
 
     physaddr = (physaddr & TARGET_PAGE_MASK) + addr;
     if (mr != &io_mem_rom && mr != &io_mem_notdirty && !cpu_can_do_io(cpu)) {
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (8 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-28  5:45   ` Fam Zheng
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST Paolo Bonzini
                   ` (4 subsequent siblings)
  14 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Note that even after this patch, most callers of address_space_*
functions must still be under the big QEMU lock, otherwise the memory
region returned by address_space_translate can disappear as soon as
address_space_translate returns.  This will be fixed in the next part
of this series.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpu-exec.c              | 13 ++++++++++++-
 cpus.c                  |  2 +-
 cputlb.c                |  8 ++++++--
 exec.c                  | 34 ++++++++++++++++++++++++++--------
 hw/i386/intel_iommu.c   |  3 +++
 hw/pci-host/apb.c       |  1 +
 hw/ppc/spapr_iommu.c    |  1 +
 include/exec/exec-all.h |  1 +
 8 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/cpu-exec.c b/cpu-exec.c
index 12473ff..edc5eb9 100644
--- a/cpu-exec.c
+++ b/cpu-exec.c
@@ -26,6 +26,7 @@
 #include "qemu/timer.h"
 #include "exec/address-spaces.h"
 #include "exec/memory-internal.h"
+#include "qemu/rcu.h"
 
 /* -icount align implementation. */
 
@@ -149,8 +150,15 @@ void cpu_resume_from_signal(CPUState *cpu, void *puc)
 
 void cpu_reload_memory_map(CPUState *cpu)
 {
+    AddressSpaceDispatch *d;
+
+    if (qemu_in_vcpu_thread()) {
+        rcu_read_unlock();
+        rcu_read_lock();
+    }
+
     /* The CPU and TLB are protected by the iothread lock.  */
-    AddressSpaceDispatch *d = cpu->as->dispatch;
+    d = atomic_rcu_read(&cpu->as->dispatch);
     cpu->memory_dispatch = d;
     tlb_flush(cpu, 1);
 }
@@ -365,6 +373,8 @@ int cpu_exec(CPUArchState *env)
      * an instruction scheduling constraint on modern architectures.  */
     smp_mb();
 
+    rcu_read_lock();
+
     if (unlikely(exit_request)) {
         cpu->exit_request = 1;
     }
@@ -567,6 +577,7 @@ int cpu_exec(CPUArchState *env)
     } /* for(;;) */
 
     cc->cpu_exec_exit(cpu);
+    rcu_read_unlock();
 
     /* fail safe : never use current_cpu outside cpu_exec() */
     current_cpu = NULL;
diff --git a/cpus.c b/cpus.c
index 3a5323b..b02c793 100644
--- a/cpus.c
+++ b/cpus.c
@@ -1121,7 +1121,7 @@ bool qemu_cpu_is_self(CPUState *cpu)
     return qemu_thread_is_self(cpu->thread);
 }
 
-static bool qemu_in_vcpu_thread(void)
+bool qemu_in_vcpu_thread(void)
 {
     return current_cpu && qemu_cpu_is_self(current_cpu);
 }
diff --git a/cputlb.c b/cputlb.c
index f92db5e..38f2151 100644
--- a/cputlb.c
+++ b/cputlb.c
@@ -243,8 +243,12 @@ static void tlb_add_large_page(CPUArchState *env, target_ulong vaddr,
 }
 
 /* Add a new TLB entry. At most one entry for a given virtual address
-   is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
-   supplied size is only used by tlb_flush_page.  */
+ * is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
+ * supplied size is only used by tlb_flush_page.
+ *
+ * Called from TCG-generated code, which is under an RCU read-side
+ * critical section.
+ */
 void tlb_set_page(CPUState *cpu, target_ulong vaddr,
                   hwaddr paddr, int prot,
                   int mmu_idx, target_ulong size)
diff --git a/exec.c b/exec.c
index 762ec76..262e8bc 100644
--- a/exec.c
+++ b/exec.c
@@ -115,6 +115,8 @@ struct PhysPageEntry {
 typedef PhysPageEntry Node[P_L2_SIZE];
 
 typedef struct PhysPageMap {
+    struct rcu_head rcu;
+
     unsigned sections_nb;
     unsigned sections_nb_alloc;
     unsigned nodes_nb;
@@ -124,6 +126,8 @@ typedef struct PhysPageMap {
 } PhysPageMap;
 
 struct AddressSpaceDispatch {
+    struct rcu_head rcu;
+
     /* This is a multi-level map on the physical address space.
      * The bottom level has pointers to MemoryRegionSections.
      */
@@ -315,6 +319,7 @@ bool memory_region_is_unassigned(MemoryRegion *mr)
         && mr != &io_mem_watch;
 }
 
+/* Called from RCU critical section */
 static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
                                                         hwaddr addr,
                                                         bool resolve_subpage)
@@ -330,6 +335,7 @@ static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
     return section;
 }
 
+/* Called from RCU critical section */
 static MemoryRegionSection *
 address_space_translate_internal(AddressSpaceDispatch *d, hwaddr addr, hwaddr *xlat,
                                  hwaddr *plen, bool resolve_subpage)
@@ -370,8 +376,10 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
     MemoryRegion *mr;
     hwaddr len = *plen;
 
+    rcu_read_lock();
     for (;;) {
-        section = address_space_translate_internal(as->dispatch, addr, &addr, plen, true);
+        AddressSpaceDispatch *d = atomic_rcu_read(&as->dispatch);
+        section = address_space_translate_internal(d, addr, &addr, plen, true);
         mr = section->mr;
 
         if (!mr->iommu_ops) {
@@ -397,9 +405,11 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
 
     *plen = len;
     *xlat = addr;
+    rcu_read_unlock();
     return mr;
 }
 
+/* Called from RCU critical section */
 MemoryRegionSection *
 address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr,
                                   hwaddr *xlat, hwaddr *plen)
@@ -852,6 +862,7 @@ static void cpu_physical_memory_set_dirty_tracking(bool enable)
     in_migration = enable;
 }
 
+/* Called from RCU critical section */
 hwaddr memory_region_section_get_iotlb(CPUState *cpu,
                                        MemoryRegionSection *section,
                                        target_ulong vaddr,
@@ -1963,7 +1974,8 @@ static uint16_t dummy_section(PhysPageMap *map, AddressSpace *as,
 
 MemoryRegion *iotlb_to_region(CPUState *cpu, hwaddr index)
 {
-    MemoryRegionSection *sections = cpu->memory_dispatch->map.sections;
+    AddressSpaceDispatch *d = atomic_rcu_read(&cpu->memory_dispatch);
+    MemoryRegionSection *sections = d->map.sections;
 
     return sections[index & ~TARGET_PAGE_MASK].mr;
 }
@@ -1999,6 +2011,12 @@ static void mem_begin(MemoryListener *listener)
     as->next_dispatch = d;
 }
 
+static void address_space_dispatch_free(AddressSpaceDispatch *d)
+{
+    phys_sections_free(&d->map);
+    g_free(d);
+}
+
 static void mem_commit(MemoryListener *listener)
 {
     AddressSpace *as = container_of(listener, AddressSpace, dispatch_listener);
@@ -2007,11 +2025,9 @@ static void mem_commit(MemoryListener *listener)
 
     phys_page_compact_all(next, next->map.nodes_nb);
 
-    as->dispatch = next;
-
+    atomic_rcu_set(&as->dispatch, next);
     if (cur) {
-        phys_sections_free(&cur->map);
-        g_free(cur);
+        call_rcu(cur, address_space_dispatch_free, rcu);
     }
 }
 
@@ -2066,8 +2082,10 @@ void address_space_destroy_dispatch(AddressSpace *as)
     AddressSpaceDispatch *d = as->dispatch;
 
     memory_listener_unregister(&as->dispatch_listener);
-    g_free(d);
-    as->dispatch = NULL;
+    atomic_rcu_set(&as->dispatch, NULL);
+    if (d) {
+        call_rcu(d, address_space_dispatch_free, rcu);
+    }
 }
 
 static void memory_map_init(void)
diff --git a/hw/i386/intel_iommu.c b/hw/i386/intel_iommu.c
index 0a4282a..7da70ff 100644
--- a/hw/i386/intel_iommu.c
+++ b/hw/i386/intel_iommu.c
@@ -745,6 +745,9 @@ static inline bool vtd_is_interrupt_addr(hwaddr addr)
 
 /* Map dev to context-entry then do a paging-structures walk to do a iommu
  * translation.
+ *
+ * Called from RCU critical section.
+ *
  * @bus_num: The bus number
  * @devfn: The devfn, which is the  combined of device and function number
  * @is_write: The access is a write operation
diff --git a/hw/pci-host/apb.c b/hw/pci-host/apb.c
index f573875..832b6c7 100644
--- a/hw/pci-host/apb.c
+++ b/hw/pci-host/apb.c
@@ -205,6 +205,7 @@ static AddressSpace *pbm_pci_dma_iommu(PCIBus *bus, void *opaque, int devfn)
     return &is->iommu_as;
 }
 
+/* Called from RCU critical section */
 static IOMMUTLBEntry pbm_translate_iommu(MemoryRegion *iommu, hwaddr addr,
                                          bool is_write)
 {
diff --git a/hw/ppc/spapr_iommu.c b/hw/ppc/spapr_iommu.c
index da47474..ba003da 100644
--- a/hw/ppc/spapr_iommu.c
+++ b/hw/ppc/spapr_iommu.c
@@ -59,6 +59,7 @@ static sPAPRTCETable *spapr_tce_find_by_liobn(uint32_t liobn)
     return NULL;
 }
 
+/* Called from RCU critical section */
 static IOMMUTLBEntry spapr_tce_translate_iommu(MemoryRegion *iommu, hwaddr addr,
                                                bool is_write)
 {
diff --git a/include/exec/exec-all.h b/include/exec/exec-all.h
index bb3fd37..8eb0db3 100644
--- a/include/exec/exec-all.h
+++ b/include/exec/exec-all.h
@@ -96,6 +96,7 @@ void tb_invalidate_phys_page_range(tb_page_addr_t start, tb_page_addr_t end,
 void tb_invalidate_phys_range(tb_page_addr_t start, tb_page_addr_t end,
                               int is_cpu_write_access);
 #if !defined(CONFIG_USER_ONLY)
+bool qemu_in_vcpu_thread(void);
 void cpu_reload_memory_map(CPUState *cpu);
 void tcg_cpu_address_space_init(CPUState *cpu, AddressSpace *as);
 /* cputlb.c */
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (9 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU Paolo Bonzini
                   ` (3 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: Mike Day, borntraeger, famz, stefanha

From: Mike Day <ncmike@ncultra.org>

Add RCU-enabled variants of the existing BSD queue (QLIST) facility.
Each operation has the same interface as the existing (non-RCU)
version, and each is implemented as a macro.

Using the RCU-enabled QLIST, existing QLIST users can convert to
RCU without switching to a different list interface.

Signed-off-by: Mike Day <ncmike@ncultra.org>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 hw/9pfs/virtio-9p-synth.c |   2 +-
 include/qemu/queue.h      |  11 --
 include/qemu/rcu_queue.h  | 134 +++++++++++++++++++
 tests/Makefile            |   5 +-
 tests/test-rcu-list.c     | 326 ++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 465 insertions(+), 13 deletions(-)
 create mode 100644 include/qemu/rcu_queue.h
 create mode 100644 tests/test-rcu-list.c

diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index e75aa87..a0ab9a8 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -18,7 +18,7 @@
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-synth.h"
 #include "qemu/rcu.h"
-
+#include "qemu/rcu_queue.h"
 #include <sys/stat.h>
 
 /* Root node for synth file system */
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index c602797..8094150 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -139,17 +139,6 @@ struct {                                                                \
         (elm)->field.le_prev = &(head)->lh_first;                       \
 } while (/*CONSTCOND*/0)
 
-#define QLIST_INSERT_HEAD_RCU(head, elm, field) do {                    \
-        (elm)->field.le_prev = &(head)->lh_first;                       \
-        (elm)->field.le_next = (head)->lh_first;                        \
-        smp_wmb(); /* fill elm before linking it */                     \
-        if ((head)->lh_first != NULL)  {                                \
-            (head)->lh_first->field.le_prev = &(elm)->field.le_next;    \
-        }                                                               \
-        (head)->lh_first = (elm);                                       \
-        smp_wmb();                                                      \
-} while (/* CONSTCOND*/0)
-
 #define QLIST_REMOVE(elm, field) do {                                   \
         if ((elm)->field.le_next != NULL)                               \
                 (elm)->field.le_next->field.le_prev =                   \
diff --git a/include/qemu/rcu_queue.h b/include/qemu/rcu_queue.h
new file mode 100644
index 0000000..3aca7a5
--- /dev/null
+++ b/include/qemu/rcu_queue.h
@@ -0,0 +1,134 @@
+#ifndef QEMU_RCU_QUEUE_H
+#define QEMU_RCU_QUEUE_H
+
+/*
+ * rcu_queue.h
+ *
+ * RCU-friendly versions of the queue.h primitives.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Copyright (c) 2013 Mike D. Day, IBM Corporation.
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+/*
+ * List access methods.
+ */
+#define QLIST_EMPTY_RCU(head) (atomic_rcu_read(&(head)->lh_first) == NULL)
+#define QLIST_FIRST_RCU(head) (atomic_rcu_read(&(head)->lh_first))
+#define QLIST_NEXT_RCU(elm, field) (atomic_rcu_read(&(elm)->field.le_next))
+
+/*
+ * List functions.
+ */
+
+
+/*
+ *  The difference between atomic_read/set and atomic_rcu_read/set
+ *  is in the including of a read/write memory barrier to the volatile
+ *  access. atomic_rcu_* macros include the memory barrier, the
+ *  plain atomic macros do not. Therefore, it should be correct to
+ *  issue a series of reads or writes to the same element using only
+ *  the atomic_* macro, until the last read or write, which should be
+ *  atomic_rcu_* to introduce a read or write memory barrier as
+ *  appropriate.
+ */
+
+/* Upon publication of the listelm->next value, list readers
+ * will see the new node when following next pointers from
+ * antecedent nodes, but may not see the new node when following
+ * prev pointers from subsequent nodes until after the RCU grace
+ * period expires.
+ * see linux/include/rculist.h __list_add_rcu(new, prev, next)
+ */
+#define QLIST_INSERT_AFTER_RCU(listelm, elm, field) do {    \
+    (elm)->field.le_next = (listelm)->field.le_next;        \
+    (elm)->field.le_prev = &(listelm)->field.le_next;       \
+    atomic_rcu_set(&(listelm)->field.le_next, (elm));       \
+    if ((elm)->field.le_next != NULL) {                     \
+       (elm)->field.le_next->field.le_prev =                \
+        &(elm)->field.le_next;                              \
+    }                                                       \
+} while (/*CONSTCOND*/0)
+
+/* Upon publication of the listelm->prev->next value, list
+ * readers will see the new element when following prev pointers
+ * from subsequent elements, but may not see the new element
+ * when following next pointers from antecedent elements
+ * until after the RCU grace period expires.
+ */
+#define QLIST_INSERT_BEFORE_RCU(listelm, elm, field) do {   \
+    (elm)->field.le_prev = (listelm)->field.le_prev;        \
+    (elm)->field.le_next = (listelm);                       \
+    atomic_rcu_set((listelm)->field.le_prev, (elm));        \
+    (listelm)->field.le_prev = &(elm)->field.le_next;       \
+} while (/*CONSTCOND*/0)
+
+/* Upon publication of the head->first value, list readers
+ * will see the new element when following the head, but may
+ * not see the new element when following prev pointers from
+ * subsequent elements until after the RCU grace period has
+ * expired.
+ */
+#define QLIST_INSERT_HEAD_RCU(head, elm, field) do {    \
+    (elm)->field.le_prev = &(head)->lh_first;           \
+    (elm)->field.le_next = (head)->lh_first;            \
+    atomic_rcu_set((&(head)->lh_first), (elm));         \
+    if ((elm)->field.le_next != NULL) {                 \
+       (elm)->field.le_next->field.le_prev =            \
+        &(elm)->field.le_next;                          \
+    }                                                   \
+} while (/*CONSTCOND*/0)
+
+
+/* prior to publication of the elm->prev->next value, some list
+ * readers may still see the removed element when following
+ * the antecedent's next pointer.
+ */
+#define QLIST_REMOVE_RCU(elm, field) do {           \
+    if ((elm)->field.le_next != NULL) {             \
+       (elm)->field.le_next->field.le_prev =        \
+        (elm)->field.le_prev;                       \
+    }                                               \
+    *(elm)->field.le_prev =  (elm)->field.le_next;  \
+} while (/*CONSTCOND*/0)
+
+/* List traversal must occur within an RCU critical section.  */
+#define QLIST_FOREACH_RCU(var, head, field)                 \
+        for ((var) = atomic_rcu_read(&(head)->lh_first);    \
+                (var);                                      \
+                (var) = atomic_rcu_read(&(var)->field.le_next))
+
+/* List traversal must occur within an RCU critical section.  */
+#define QLIST_FOREACH_SAFE_RCU(var, head, field, next_var)           \
+    for ((var) = (atomic_rcu_read(&(head)->lh_first));               \
+      (var) &&                                                       \
+          ((next_var) = atomic_rcu_read(&(var)->field.le_next), 1);  \
+           (var) = (next_var))
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* QEMU_RCU_QUEUE_H */
diff --git a/tests/Makefile b/tests/Makefile
index db5b3c3..ef8d589 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -62,6 +62,8 @@ check-unit-y += tests/test-int128$(EXESUF)
 gcov-files-test-int128-y =
 check-unit-y += tests/rcutorture$(EXESUF)
 gcov-files-rcutorture-y = util/rcu.c
+check-unit-y += tests/test-rcu-list$(EXESUF)
+gcov-files-test-rcu-list-y = util/rcu.c
 check-unit-y += tests/test-bitops$(EXESUF)
 check-unit-$(CONFIG_HAS_GLIB_SUBPROCESS_TESTS) += tests/test-qdev-global-props$(EXESUF)
 check-unit-y += tests/check-qom-interface$(EXESUF)
@@ -226,7 +228,7 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-qmp-commands.o tests/test-visitor-serialization.o \
 	tests/test-x86-cpuid.o tests/test-mul64.o tests/test-int128.o \
 	tests/test-opts-visitor.o tests/test-qmp-event.o \
-	tests/rcutorture.o
+	tests/rcutorture.o tests/test-rcu-list.o
 
 test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
 		  tests/test-qapi-event.o
@@ -256,6 +258,7 @@ tests/test-xbzrle$(EXESUF): tests/test-xbzrle.o migration/xbzrle.o page_cache.o
 tests/test-cutils$(EXESUF): tests/test-cutils.o util/cutils.o
 tests/test-int128$(EXESUF): tests/test-int128.o
 tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
+tests/test-rcu-list$(EXESUF): tests/test-rcu-list.o libqemuutil.a
 
 tests/test-qdev-global-props$(EXESUF): tests/test-qdev-global-props.o \
 	hw/core/qdev.o hw/core/qdev-properties.o hw/core/hotplug.o\
diff --git a/tests/test-rcu-list.c b/tests/test-rcu-list.c
new file mode 100644
index 0000000..082d33a
--- /dev/null
+++ b/tests/test-rcu-list.c
@@ -0,0 +1,326 @@
+/*
+ * test-rcu-list.c
+ *
+ * usage: test-rcu-list <duration> <nreaders>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (c) 2013 Mike D. Day, IBM Corporation.
+ */
+
+/*
+ * Test variables.
+ */
+
+#include <glib.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "qemu/atomic.h"
+#include "qemu/rcu.h"
+#include "qemu/compiler.h"
+#include "qemu/osdep.h"
+#include "qemu/thread.h"
+#include "qemu/rcu_queue.h"
+
+/* This reverse-traversal macro casts le_prev back to an element
+ * pointer, so QLIST_ENTRY must be the first field of (var). */
+#define QLIST_FOREACH_REVERSE_RCU(var, head, field)                     \
+    for ( ;                                                             \
+          (var) && (var) != (head)->lh_first;                           \
+          (var) = (typeof(var))atomic_rcu_read(&(var)->field.le_prev))
+
+
+long long n_reads = 0LL;
+long long n_updates = 0LL;
+long long n_reclaims = 0LL;
+long long n_nodes_removed = 0LL;
+long long n_nodes = 0LL;
+int g_test_in_charge = 0;
+
+int nthreadsrunning;
+
+char argsbuf[64];
+
+#define GOFLAG_INIT 0
+#define GOFLAG_RUN  1
+#define GOFLAG_STOP 2
+
+static volatile int goflag = GOFLAG_INIT;
+
+#define RCU_READ_RUN 1000
+#define RCU_UPDATE_RUN 10
+#define NR_THREADS 100
+#define RCU_Q_LEN 100
+
+static QemuThread threads[NR_THREADS];
+static struct rcu_reader_data *data[NR_THREADS];
+static int n_threads;
+
+static int select_random_el(int max)
+{
+    return (rand() % max);
+}
+
+
+static void create_thread(void *(*func)(void *))
+{
+    if (n_threads >= NR_THREADS) {
+        fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
+        exit(-1);
+    }
+    qemu_thread_create(&threads[n_threads], "test", func, &data[n_threads],
+                       QEMU_THREAD_JOINABLE);
+    n_threads++;
+}
+
+static void wait_all_threads(void)
+{
+    int i;
+
+    for (i = 0; i < n_threads; i++) {
+        qemu_thread_join(&threads[i]);
+    }
+    n_threads = 0;
+}
+
+
+struct list_element {
+    QLIST_ENTRY(list_element) entry;
+    struct rcu_head rcu;
+    long long val;
+};
+
+static void reclaim_list_el(struct rcu_head *prcu)
+{
+    struct list_element *el = container_of(prcu, struct list_element, rcu);
+    g_free(el);
+    atomic_add(&n_reclaims, 1);
+}
+
+static QLIST_HEAD(q_list_head, list_element) Q_list_head;
+
+static void *rcu_q_reader(void *arg)
+{
+    long long j, n_reads_local = 0;
+    struct list_element *el, *prev_el = NULL;
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    atomic_inc(&nthreadsrunning);
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+
+    while (goflag == GOFLAG_RUN) {
+        rcu_read_lock();
+        QLIST_FOREACH_RCU(el, &Q_list_head, entry) {
+            prev_el = el;
+            j = el->val;
+            if (j) {
+                ;
+            }
+            n_reads_local++;
+            if (goflag == GOFLAG_STOP) {
+                break;
+            }
+        }
+        rcu_read_unlock();
+
+        g_usleep(100);
+        rcu_read_lock();
+        QLIST_FOREACH_REVERSE_RCU(prev_el, &Q_list_head, entry) {
+            el = prev_el;
+            j = el->val;
+            if (j) {
+                ;
+            }
+            n_reads_local++;
+            if (goflag != GOFLAG_RUN) {
+                break;
+            }
+        }
+        rcu_read_unlock();
+
+        g_usleep(100);
+    }
+    atomic_add(&n_reads, n_reads_local);
+    return NULL;
+}
+
+
+static void *rcu_q_updater(void *arg)
+{
+    int j, target_el;
+    long long n_updates_local = 0;
+    long long n_removed_local = 0;
+    struct list_element *el, *prev_el;
+
+    *(struct rcu_reader_data **)arg = &rcu_reader;
+    atomic_inc(&nthreadsrunning);
+    while (goflag == GOFLAG_INIT) {
+        g_usleep(1000);
+    }
+
+    while (goflag == GOFLAG_RUN) {
+        target_el = select_random_el(RCU_Q_LEN);
+        j = 0;
+        /* FOREACH_RCU could work here but let's use both macros */
+        QLIST_FOREACH_SAFE_RCU(prev_el, &Q_list_head, entry, el) {
+            j++;
+            if (target_el == j) {
+                QLIST_REMOVE_RCU(prev_el, entry);
+                /* may be more than one updater in the future */
+                call_rcu1(&prev_el->rcu, reclaim_list_el);
+                n_removed_local++;
+                break;
+            }
+        }
+        if (goflag == GOFLAG_STOP) {
+            break;
+        }
+        target_el = select_random_el(RCU_Q_LEN);
+        j = 0;
+        QLIST_FOREACH_RCU(el, &Q_list_head, entry) {
+            j++;
+            if (target_el == j) {
+                prev_el = g_new(struct list_element, 1);
+                atomic_add(&n_nodes, 1);
+                prev_el->val = atomic_read(&n_nodes);
+                QLIST_INSERT_BEFORE_RCU(el, prev_el, entry);
+                break;
+            }
+        }
+
+        n_updates_local += 2;
+        synchronize_rcu();
+    }
+    synchronize_rcu();
+    atomic_add(&n_updates, n_updates_local);
+    atomic_add(&n_nodes_removed, n_removed_local);
+    return NULL;
+}
+
+static void rcu_qtest_init(void)
+{
+    struct list_element *new_el;
+    int i;
+    nthreadsrunning = 0;
+    srand(time(0));
+    for (i = 0; i < RCU_Q_LEN; i++) {
+        new_el = g_new(struct list_element, 1);
+        new_el->val = i;
+        QLIST_INSERT_HEAD_RCU(&Q_list_head, new_el, entry);
+    }
+    atomic_add(&n_nodes, RCU_Q_LEN);
+}
+
+static void rcu_qtest_run(int duration, int nreaders)
+{
+    int nthreads = nreaders + 1;
+    while (atomic_read(&nthreadsrunning) < nthreads) {
+        g_usleep(1000);
+    }
+
+    goflag = GOFLAG_RUN;
+    sleep(duration);
+    goflag = GOFLAG_STOP;
+    wait_all_threads();
+}
+
+
+static void rcu_qtest(const char *test, int duration, int nreaders)
+{
+    int i;
+    long long n_removed_local = 0;
+
+    struct list_element *el, *prev_el;
+
+    rcu_qtest_init();
+    for (i = 0; i < nreaders; i++) {
+        create_thread(rcu_q_reader);
+    }
+    create_thread(rcu_q_updater);
+    rcu_qtest_run(duration, nreaders);
+
+    QLIST_FOREACH_SAFE_RCU(prev_el, &Q_list_head, entry, el) {
+        QLIST_REMOVE_RCU(prev_el, entry);
+        call_rcu1(&prev_el->rcu, reclaim_list_el);
+        n_removed_local++;
+    }
+    atomic_add(&n_nodes_removed, n_removed_local);
+    synchronize_rcu();
+    while (n_nodes_removed > n_reclaims) {
+        g_usleep(100);
+        synchronize_rcu();
+    }
+    if (g_test_in_charge) {
+        g_assert_cmpint(n_nodes_removed, ==, n_reclaims);
+    } else {
+        printf("%s: %d readers; 1 updater; nodes read: "
+               "%lld, nodes removed: %lld; nodes reclaimed: %lld\n",
+               test, nthreadsrunning - 1, n_reads, n_nodes_removed, n_reclaims);
+        exit(0);
+    }
+}
+
+static void usage(int argc, char *argv[])
+{
+    fprintf(stderr, "Usage: %s duration nreaders\n", argv[0]);
+    exit(-1);
+}
+
+static void gtest_rcuq_one(void)
+{
+    rcu_qtest("rcuqtest", 1, 1);
+}
+
+static void gtest_rcuq_short_few(void)
+{
+    rcu_qtest("rcuqtest", 2, 5);
+}
+
+
+static void gtest_rcuq_long_many(void)
+{
+    rcu_qtest("rcuqtest", 5, 20);
+}
+
+
+int main(int argc, char *argv[])
+{
+    int duration = 0, readers = 0;
+
+    if (argc >= 2) {
+        if (argv[1][0] == '-') {
+            g_test_init(&argc, &argv, NULL);
+            g_test_add_func("/rcu/qlist/single-threaded", gtest_rcuq_one);
+            g_test_add_func("/rcu/qlist/short-few", gtest_rcuq_short_few);
+            g_test_add_func("/rcu/qlist/long-many", gtest_rcuq_long_many);
+            g_test_in_charge = 1;
+            return g_test_run();
+        }
+        duration = strtoul(argv[1], NULL, 0);
+    }
+    if (argc >= 3) {
+        readers = strtoul(argv[2], NULL, 0);
+    }
+    if (duration && readers) {
+        rcu_qtest(argv[0], duration, readers);
+        return 0;
+    }
+
+    usage(argc, argv);
+    return -1;
+}
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (10 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches Paolo Bonzini
                   ` (2 subsequent siblings)
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: borntraeger, famz, stefanha

Readers may now access ram_list.mru_block outside the iothread lock, so
a block can still be in use after it has been removed from the list.
Hence, freeing a RAMBlock has to be switched to call_rcu.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 exec.c                 | 61 ++++++++++++++++++++++++++++++++++----------------
 include/exec/cpu-all.h |  2 ++
 2 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/exec.c b/exec.c
index 262e8bc..2a142fe 100644
--- a/exec.c
+++ b/exec.c
@@ -811,7 +811,7 @@ static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
     RAMBlock *block;
 
     /* The list is protected by the iothread lock here.  */
-    block = ram_list.mru_block;
+    block = atomic_rcu_read(&ram_list.mru_block);
     if (block && addr - block->offset < block->max_length) {
         goto found;
     }
@@ -825,6 +825,22 @@ static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
     abort();
 
 found:
+    /* It is safe to write mru_block outside the iothread lock.  This
+     * is what happens:
+     *
+     *     mru_block = xxx
+     *     rcu_read_unlock()
+     *                                        xxx removed from list
+     *                  rcu_read_lock()
+     *                  read mru_block
+     *                                        mru_block = NULL;
+     *                                        call_rcu(reclaim_ramblock, xxx);
+     *                  rcu_read_unlock()
+     *
+     * atomic_rcu_set is not needed here.  The block was already published
+     * when it was placed into the list.  Here we're just making an extra
+     * copy of the pointer.
+     */
     ram_list.mru_block = block;
     return block;
 }
@@ -1381,14 +1397,16 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
         QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
     }
     ram_list.mru_block = NULL;
+    atomic_rcu_set(&ram_list.version, ram_list.version + 1);
 
-    ram_list.version++;
     qemu_mutex_unlock_ramlist();
 
     new_ram_size = last_ram_offset() >> TARGET_PAGE_BITS;
 
     if (new_ram_size > old_ram_size) {
         int i;
+
+        /* ram_list.dirty_memory[] is protected by the iothread lock.  */
         for (i = 0; i < DIRTY_MEMORY_NUM; i++) {
             ram_list.dirty_memory[i] =
                 bitmap_zero_extend(ram_list.dirty_memory[i],
@@ -1524,14 +1542,31 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
         if (addr == block->offset) {
             QTAILQ_REMOVE(&ram_list.blocks, block, next);
             ram_list.mru_block = NULL;
-            ram_list.version++;
-            g_free(block);
+            atomic_rcu_set(&ram_list.version, ram_list.version + 1);
+            call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
+static void reclaim_ramblock(RAMBlock *block)
+{
+    if (block->flags & RAM_PREALLOC) {
+        ;
+    } else if (xen_enabled()) {
+        xen_invalidate_map_cache_entry(block->host);
+#ifndef _WIN32
+    } else if (block->fd >= 0) {
+        munmap(block->host, block->max_length);
+        close(block->fd);
+#endif
+    } else {
+        qemu_anon_ram_free(block->host, block->max_length);
+    }
+    g_free(block);
+}
+
+/* Called with the iothread lock held */
 void qemu_ram_free(ram_addr_t addr)
 {
     RAMBlock *block;
@@ -1542,20 +1577,8 @@ void qemu_ram_free(ram_addr_t addr)
         if (addr == block->offset) {
             QTAILQ_REMOVE(&ram_list.blocks, block, next);
             ram_list.mru_block = NULL;
-            ram_list.version++;
-            if (block->flags & RAM_PREALLOC) {
-                ;
-            } else if (xen_enabled()) {
-                xen_invalidate_map_cache_entry(block->host);
-#ifndef _WIN32
-            } else if (block->fd >= 0) {
-                munmap(block->host, block->max_length);
-                close(block->fd);
-#endif
-            } else {
-                qemu_anon_ram_free(block->host, block->max_length);
-            }
-            g_free(block);
+            atomic_rcu_set(&ram_list.version, ram_list.version + 1);
+            call_rcu(block, reclaim_ramblock, rcu);
             break;
         }
     }
diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
index 2c48286..b8781d1 100644
--- a/include/exec/cpu-all.h
+++ b/include/exec/cpu-all.h
@@ -24,6 +24,7 @@
 #include "exec/memory.h"
 #include "qemu/thread.h"
 #include "qom/cpu.h"
+#include "qemu/rcu.h"
 
 /* some important defines:
  *
@@ -268,6 +269,7 @@ CPUArchState *cpu_copy(CPUArchState *env);
 typedef struct RAMBlock RAMBlock;
 
 struct RAMBlock {
+    struct rcu_head rcu;
     struct MemoryRegion *mr;
     uint8_t *host;
     ram_addr_t offset;
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (11 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU Paolo Bonzini
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: Mike Day, borntraeger, famz, stefanha

From: Mike Day <ncmike@ncultra.org>

Signed-off-by: Mike Day <ncmike@ncultra.org>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 arch_init.c |  5 +----
 exec.c      | 68 +++++++++++++++++++++++++++++++++++++++----------------------
 2 files changed, 45 insertions(+), 28 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 89c8fa4..b13f74b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -688,9 +688,9 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
             }
         }
     }
+
     last_seen_block = block;
     last_offset = offset;
-
     return bytes_sent;
 }
 
@@ -1117,7 +1117,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 ret = -EINVAL;
                 break;
             }
-
             ch = qemu_get_byte(f);
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
             break;
@@ -1128,7 +1127,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 ret = -EINVAL;
                 break;
             }
-
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
         case RAM_SAVE_FLAG_XBZRLE:
@@ -1138,7 +1136,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 ret = -EINVAL;
                 break;
             }
-
             if (load_xbzrle(f, addr, host) < 0) {
                 error_report("Failed to decompress XBZRLE page at "
                              RAM_ADDR_FMT, addr);
diff --git a/exec.c b/exec.c
index 2a142fe..e01458d 100644
--- a/exec.c
+++ b/exec.c
@@ -1631,7 +1631,6 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
                 memory_try_enable_merging(vaddr, length);
                 qemu_ram_setup_dump(vaddr, length);
             }
-            return;
         }
     }
 }
@@ -1639,49 +1638,60 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
 
 int qemu_get_ram_fd(ram_addr_t addr)
 {
-    RAMBlock *block = qemu_get_ram_block(addr);
+    RAMBlock *block;
+    int fd;
 
-    return block->fd;
+    block = qemu_get_ram_block(addr);
+    fd = atomic_rcu_read(&block->fd);
+    return fd;
 }
 
 void *qemu_get_ram_block_host_ptr(ram_addr_t addr)
 {
-    RAMBlock *block = qemu_get_ram_block(addr);
+    RAMBlock *block;
+    void *ptr;
 
-    return ramblock_ptr(block, 0);
+    block = qemu_get_ram_block(addr);
+    ptr = ramblock_ptr(block, 0);
+    return ptr;
 }
 
 /* Return a host pointer to ram allocated with qemu_ram_alloc.
-   With the exception of the softmmu code in this file, this should
-   only be used for local memory (e.g. video ram) that the device owns,
-   and knows it isn't going to access beyond the end of the block.
-
-   It should not be used for general purpose DMA.
-   Use cpu_physical_memory_map/cpu_physical_memory_rw instead.
+ * This should not be used for general purpose DMA.  Use address_space_map
+ * or address_space_rw instead. For local memory (e.g. video ram) that the
+ * device owns, use memory_region_get_ram_ptr.
  */
 void *qemu_get_ram_ptr(ram_addr_t addr)
 {
-    RAMBlock *block = qemu_get_ram_block(addr);
+    RAMBlock *block;
+    void *ptr;
 
-    if (xen_enabled()) {
+    block = qemu_get_ram_block(addr);
+
+    if (xen_enabled() && block->host == NULL) {
         /* We need to check if the requested address is in the RAM
          * because we don't want to map the entire memory in QEMU.
          * In that case just map until the end of the page.
          */
         if (block->offset == 0) {
-            return xen_map_cache(addr, 0, 0);
-        } else if (block->host == NULL) {
-            block->host =
-                xen_map_cache(block->offset, block->max_length, 1);
+            ptr = xen_map_cache(addr, 0, 0);
+            goto done;
         }
+
+        block->host = xen_map_cache(block->offset, block->max_length, 1);
     }
-    return ramblock_ptr(block, addr - block->offset);
+    ptr = ramblock_ptr(block, addr - block->offset);
+
+done:
+    return ptr;
 }
 
 /* Return a host pointer to guest's ram. Similar to qemu_get_ram_ptr
- * but takes a size argument */
+ * but takes a size argument.
+ */
 static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 {
+    void *ptr;
     if (*size == 0) {
         return NULL;
     }
@@ -1689,12 +1699,12 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
         return xen_map_cache(addr, *size, 1);
     } else {
         RAMBlock *block;
-
         QTAILQ_FOREACH(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->max_length) {
                 if (addr - block->offset + *size > block->max_length)
                     *size = block->max_length - addr + block->offset;
-                return ramblock_ptr(block, addr - block->offset);
+                ptr = ramblock_ptr(block, addr - block->offset);
+                return ptr;
             }
         }
 
@@ -1704,15 +1714,24 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 }
 
 /* Some of the softmmu routines need to translate from a host pointer
-   (typically a TLB entry) back to a ram offset.  */
+ * (typically a TLB entry) back to a ram offset.
+ *
+ * By the time this function returns, the returned pointer is not protected
+ * by RCU anymore.  If the caller is not within an RCU critical section and
+ * does not hold the iothread lock, it must have other means of protecting the
+ * pointer, such as a reference to the region that includes the incoming
+ * ram_addr_t.
+ */
 MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
 {
     RAMBlock *block;
     uint8_t *host = ptr;
+    MemoryRegion *mr;
 
     if (xen_enabled()) {
         *ram_addr = xen_ram_addr_from_mapcache(ptr);
-        return qemu_get_ram_block(*ram_addr)->mr;
+        mr = qemu_get_ram_block(*ram_addr)->mr;
+        return mr;
     }
 
     block = ram_list.mru_block;
@@ -1734,7 +1753,8 @@ MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
 
 found:
     *ram_addr = block->offset + (host - block->host);
-    return block->mr;
+    mr = block->mr;
+    return mr;
 }
 
 static void notdirty_mem_write(void *opaque, hwaddr ram_addr,
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 27+ messages in thread

* [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (12 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU Paolo Bonzini
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: Mike Day, borntraeger, famz, stefanha

From: Mike Day <ncmike@ncultra.org>

QLIST has RCU-friendly primitives, so switch to it.

Signed-off-by: Mike Day <ncmike@ncultra.org>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

---
 arch_init.c                  | 19 ++++++++--------
 exec.c                       | 52 +++++++++++++++++++++++++-------------------
 include/exec/cpu-all.h       |  4 ++--
 scripts/dump-guest-memory.py |  8 +++----
 4 files changed, 46 insertions(+), 37 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index b13f74b..757632b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -523,7 +523,7 @@ static void migration_bitmap_sync(void)
     trace_migration_bitmap_sync_start();
     address_space_sync_dirty_bitmap(&address_space_memory);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         migration_bitmap_sync_range(block->mr->ram_addr, block->used_length);
     }
     trace_migration_bitmap_sync_end(migration_dirty_pages
@@ -661,7 +661,7 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     MemoryRegion *mr;
 
     if (!block)
-        block = QTAILQ_FIRST(&ram_list.blocks);
+        block = QLIST_FIRST(&ram_list.blocks);
 
     while (true) {
         mr = block->mr;
@@ -672,9 +672,9 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
         }
         if (offset >= block->used_length) {
             offset = 0;
-            block = QTAILQ_NEXT(block, next);
+            block = QLIST_NEXT(block, next);
             if (!block) {
-                block = QTAILQ_FIRST(&ram_list.blocks);
+                block = QLIST_FIRST(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
             }
@@ -728,8 +728,9 @@ uint64_t ram_bytes_total(void)
     RAMBlock *block;
     uint64_t total = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         total += block->used_length;
+    }
 
     return total;
 }
@@ -830,7 +831,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
      * gaps due to alignment or unplugs.
      */
     migration_dirty_pages = 0;
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         uint64_t block_pages;
 
         block_pages = block->used_length >> TARGET_PAGE_BITS;
@@ -843,7 +844,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
 
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         qemu_put_byte(f, strlen(block->idstr));
         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
         qemu_put_be64(f, block->used_length);
@@ -1029,7 +1030,7 @@ static inline void *host_from_stream_offset(QEMUFile *f,
     qemu_get_buffer(f, (uint8_t *)id, len);
     id[len] = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (!strncmp(id, block->idstr, sizeof(id)) &&
             block->max_length > offset) {
             return memory_region_get_ram_ptr(block->mr) + offset;
@@ -1086,7 +1087,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 id[len] = 0;
                 length = qemu_get_be64(f);
 
-                QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+                QLIST_FOREACH(block, &ram_list.blocks, next) {
                     if (!strncmp(id, block->idstr, sizeof(id))) {
                         if (length != block->used_length) {
                             Error *local_err = NULL;
diff --git a/exec.c b/exec.c
index e01458d..6f88a86 100644
--- a/exec.c
+++ b/exec.c
@@ -58,7 +58,7 @@
 #if !defined(CONFIG_USER_ONLY)
 static bool in_migration;
 
-RAMList ram_list = { .blocks = QTAILQ_HEAD_INITIALIZER(ram_list.blocks) };
+RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
 
 static MemoryRegion *system_memory;
 static MemoryRegion *system_io;
@@ -815,7 +815,7 @@ static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
     if (block && addr - block->offset < block->max_length) {
         goto found;
     }
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->max_length) {
             goto found;
         }
@@ -1197,15 +1197,16 @@ static ram_addr_t find_ram_offset(ram_addr_t size)
 
     assert(size != 0); /* it would hand out same offset multiple times */
 
-    if (QTAILQ_EMPTY(&ram_list.blocks))
+    if (QLIST_EMPTY(&ram_list.blocks)) {
         return 0;
+    }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         ram_addr_t end, next = RAM_ADDR_MAX;
 
         end = block->offset + block->max_length;
 
-        QTAILQ_FOREACH(next_block, &ram_list.blocks, next) {
+        QLIST_FOREACH(next_block, &ram_list.blocks, next) {
             if (next_block->offset >= end) {
                 next = MIN(next, next_block->offset);
             }
@@ -1230,9 +1231,9 @@ ram_addr_t last_ram_offset(void)
     RAMBlock *block;
     ram_addr_t last = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         last = MAX(last, block->offset + block->max_length);
-
+    }
     return last;
 }
 
@@ -1256,7 +1257,7 @@ static RAMBlock *find_ram_block(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (block->offset == addr) {
             return block;
         }
@@ -1284,7 +1285,7 @@ void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
 
     /* This assumes the iothread lock is taken here too.  */
     qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (block != new_block && !strcmp(block->idstr, new_block->idstr)) {
             fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
                     new_block->idstr);
@@ -1359,6 +1360,7 @@ int qemu_ram_resize(ram_addr_t base, ram_addr_t newsize, Error **errp)
 static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
 {
     RAMBlock *block;
+    RAMBlock *last_block = NULL;
     ram_addr_t old_ram_size, new_ram_size;
 
     old_ram_size = last_ram_offset() >> TARGET_PAGE_BITS;
@@ -1385,16 +1387,22 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
         }
     }
 
-    /* Keep the list sorted from biggest to smallest block.  */
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* Keep the list sorted from biggest to smallest block.  Unlike QTAILQ,
+     * QLIST (which has an RCU-friendly variant) does not have insertion at
+     * tail, so save the last element in last_block.
+     */
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
+        last_block = block;
         if (block->max_length < new_block->max_length) {
             break;
         }
     }
     if (block) {
-        QTAILQ_INSERT_BEFORE(block, new_block, next);
-    } else {
-        QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
+        QLIST_INSERT_BEFORE(block, new_block, next);
+    } else if (last_block) {
+        QLIST_INSERT_AFTER(last_block, new_block, next);
+    } else { /* list is empty */
+        QLIST_INSERT_HEAD(&ram_list.blocks, new_block, next);
     }
     ram_list.mru_block = NULL;
     atomic_rcu_set(&ram_list.version, ram_list.version + 1);
@@ -1538,9 +1546,9 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
 
     /* This assumes the iothread lock is taken here too.  */
     qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE(block, next);
             ram_list.mru_block = NULL;
             atomic_rcu_set(&ram_list.version, ram_list.version + 1);
             call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
@@ -1573,9 +1581,9 @@ void qemu_ram_free(ram_addr_t addr)
 
     /* This assumes the iothread lock is taken here too.  */
     qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE(block, next);
             ram_list.mru_block = NULL;
             atomic_rcu_set(&ram_list.version, ram_list.version + 1);
             call_rcu(block, reclaim_ramblock, rcu);
@@ -1594,7 +1602,7 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
     int flags;
     void *area, *vaddr;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         offset = addr - block->offset;
         if (offset < block->max_length) {
             vaddr = ramblock_ptr(block, offset);
@@ -1699,7 +1707,7 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
         return xen_map_cache(addr, *size, 1);
     } else {
         RAMBlock *block;
-        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+        QLIST_FOREACH(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->max_length) {
                 if (addr - block->offset + *size > block->max_length)
                     *size = block->max_length - addr + block->offset;
@@ -1739,7 +1747,7 @@ MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
         goto found;
     }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
             continue;
@@ -3007,7 +3015,7 @@ void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
 {
     RAMBlock *block;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks, next) {
         func(block->host, block->offset, block->used_length, opaque);
     }
 }
diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
index b8781d1..b38e03f 100644
--- a/include/exec/cpu-all.h
+++ b/include/exec/cpu-all.h
@@ -281,7 +281,7 @@ struct RAMBlock {
     /* Reads can take either the iothread or the ramlist lock.
      * Writes must take both locks.
      */
-    QTAILQ_ENTRY(RAMBlock) next;
+    QLIST_ENTRY(RAMBlock) next;
     int fd;
 };
 
@@ -298,7 +298,7 @@ typedef struct RAMList {
     unsigned long *dirty_memory[DIRTY_MEMORY_NUM];
     RAMBlock *mru_block;
     /* Protected by the ramlist lock.  */
-    QTAILQ_HEAD(, RAMBlock) blocks;
+    QLIST_HEAD(, RAMBlock) blocks;
     uint32_t version;
 } RAMList;
 extern RAMList ram_list;
diff --git a/scripts/dump-guest-memory.py b/scripts/dump-guest-memory.py
index 1ed8b67..dc8e44a 100644
--- a/scripts/dump-guest-memory.py
+++ b/scripts/dump-guest-memory.py
@@ -108,16 +108,16 @@ shape and this command should mostly work."""
         assert (val["hi"] == 0)
         return val["lo"]
 
-    def qtailq_foreach(self, head, field_str):
-        var_p = head["tqh_first"]
+    def qlist_foreach(self, head, field_str):
+        var_p = head["lh_first"]
         while (var_p != 0):
             var = var_p.dereference()
             yield var
-            var_p = var[field_str]["tqe_next"]
+            var_p = var[field_str]["le_next"]
 
     def qemu_get_ram_block(self, ram_addr):
         ram_blocks = gdb.parse_and_eval("ram_list.blocks")
-        for block in self.qtailq_foreach(ram_blocks, "next"):
+        for block in self.qlist_foreach(ram_blocks, "next"):
             if (ram_addr - block["offset"] < block["length"]):
                 return block
         raise gdb.GdbError("Bad ram offset %x" % ram_addr)
-- 
1.8.3.1


* [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU
  2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
                   ` (13 preceding siblings ...)
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST Paolo Bonzini
@ 2015-01-22 14:47 ` Paolo Bonzini
  14 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-22 14:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: Mike Day, borntraeger, famz, stefanha

From: Mike Day <ncmike@ncultra.org>

Allow "unlocked" reads of the ram_list by using an RCU-enabled
QLIST. Most readers of the list no longer require holding the list mutex.

The ram_list now uses a QLIST instead of a QTAILQ. The difference is
minimal.

Signed-off-by: Mike Day <ncmike@ncultra.org>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 arch_init.c            |  62 +++++++++++++++++---------
 exec.c                 | 118 +++++++++++++++++++++++++++++--------------------
 include/exec/cpu-all.h |   7 +--
 3 files changed, 113 insertions(+), 74 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 757632b..2051673 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -52,6 +52,7 @@
 #include "exec/ram_addr.h"
 #include "hw/acpi/acpi.h"
 #include "qemu/host-utils.h"
+#include "qemu/rcu_queue.h"
 
 #ifdef DEBUG_ARCH_INIT
 #define DPRINTF(fmt, ...) \
@@ -523,9 +524,12 @@ static void migration_bitmap_sync(void)
     trace_migration_bitmap_sync_start();
     address_space_sync_dirty_bitmap(&address_space_memory);
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         migration_bitmap_sync_range(block->mr->ram_addr, block->used_length);
     }
+    rcu_read_unlock();
+
     trace_migration_bitmap_sync_end(migration_dirty_pages
                                     - num_dirty_pages_init);
     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
@@ -648,6 +652,8 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
 /*
  * ram_find_and_save_block: Finds a page to send and sends it to f
  *
+ * Called within an RCU critical section.
+ *
  * Returns:  The number of bytes written.
  *           0 means no dirty pages
  */
@@ -661,7 +667,7 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     MemoryRegion *mr;
 
     if (!block)
-        block = QLIST_FIRST(&ram_list.blocks);
+        block = QLIST_FIRST_RCU(&ram_list.blocks);
 
     while (true) {
         mr = block->mr;
@@ -672,9 +678,9 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
         }
         if (offset >= block->used_length) {
             offset = 0;
-            block = QLIST_NEXT(block, next);
+            block = QLIST_NEXT_RCU(block, next);
             if (!block) {
-                block = QLIST_FIRST(&ram_list.blocks);
+                block = QLIST_FIRST_RCU(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
             }
@@ -728,10 +734,10 @@ uint64_t ram_bytes_total(void)
     RAMBlock *block;
     uint64_t total = 0;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
         total += block->used_length;
-    }
-
+    rcu_read_unlock();
     return total;
 }
 
@@ -777,6 +783,13 @@ static void reset_ram_globals(void)
 
 #define MAX_WAIT 50 /* ms, half buffered_file limit */
 
+
+/* Each of ram_save_setup, ram_save_iterate and ram_save_complete has
+ * a long-running RCU critical section.  When RCU reclaims in the code
+ * start to become numerous, it will be necessary to reduce the
+ * granularity of these critical sections.
+ */
+
 static int ram_save_setup(QEMUFile *f, void *opaque)
 {
     RAMBlock *block;
@@ -818,7 +831,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     qemu_mutex_lock_iothread();
-    qemu_mutex_lock_ramlist();
+    rcu_read_lock();
     bytes_transferred = 0;
     reset_ram_globals();
 
@@ -831,7 +844,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
      * gaps due to alignment or unplugs.
      */
     migration_dirty_pages = 0;
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         uint64_t block_pages;
 
         block_pages = block->used_length >> TARGET_PAGE_BITS;
@@ -844,13 +857,13 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
 
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         qemu_put_byte(f, strlen(block->idstr));
         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
         qemu_put_be64(f, block->used_length);
     }
 
-    qemu_mutex_unlock_ramlist();
+    rcu_read_unlock();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -867,9 +880,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     int64_t t0;
     int total_sent = 0;
 
-    qemu_mutex_lock_ramlist();
-
-    if (ram_list.version != last_version) {
+    rcu_read_lock();
+    if (atomic_rcu_read(&ram_list.version) != last_version) {
         reset_ram_globals();
     }
 
@@ -903,8 +915,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
-
-    qemu_mutex_unlock_ramlist();
+    rcu_read_unlock();
 
     /*
      * Must occur before EOS (or any QEMUFile operation)
@@ -931,7 +942,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
 
 static int ram_save_complete(QEMUFile *f, void *opaque)
 {
-    qemu_mutex_lock_ramlist();
+    rcu_read_lock();
+
     migration_bitmap_sync();
 
     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -953,7 +965,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-    qemu_mutex_unlock_ramlist();
+    rcu_read_unlock();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -1009,6 +1021,9 @@ static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
     return 0;
 }
 
+/* Must be called from within an RCU critical section.
+ * Returns a pointer from within the RCU-protected ram_list.
+ */
 static inline void *host_from_stream_offset(QEMUFile *f,
                                             ram_addr_t offset,
                                             int flags)
@@ -1030,7 +1045,7 @@ static inline void *host_from_stream_offset(QEMUFile *f,
     qemu_get_buffer(f, (uint8_t *)id, len);
     id[len] = 0;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (!strncmp(id, block->idstr, sizeof(id)) &&
             block->max_length > offset) {
             return memory_region_get_ram_ptr(block->mr) + offset;
@@ -1063,6 +1078,12 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         ret = -EINVAL;
     }
 
+    /* This RCU critical section can be very long running.
+     * When RCU reclaims in the code start to become numerous,
+     * it will be necessary to reduce the granularity of this
+     * critical section.
+     */
+    rcu_read_lock();
     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host;
@@ -1087,7 +1108,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 id[len] = 0;
                 length = qemu_get_be64(f);
 
-                QLIST_FOREACH(block, &ram_list.blocks, next) {
+                QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
                     if (!strncmp(id, block->idstr, sizeof(id))) {
                         if (length != block->used_length) {
                             Error *local_err = NULL;
@@ -1161,6 +1182,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
+    rcu_read_unlock();
     DPRINTF("Completed load of VM with exit code %d seq iteration "
             "%" PRIu64 "\n", ret, seq_iter);
     return ret;
diff --git a/exec.c b/exec.c
index 6f88a86..1c70662 100644
--- a/exec.c
+++ b/exec.c
@@ -44,7 +44,7 @@
 #include "trace.h"
 #endif
 #include "exec/cpu-all.h"
-
+#include "qemu/rcu_queue.h"
 #include "exec/cputlb.h"
 #include "translate-all.h"
 
@@ -58,6 +58,9 @@
 #if !defined(CONFIG_USER_ONLY)
 static bool in_migration;
 
+/* ram_list is read under rcu_read_lock()/rcu_read_unlock().  Writes
+ * are protected by a lock, currently the iothread lock.
+ */
 RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
 
 static MemoryRegion *system_memory;
@@ -426,7 +429,6 @@ address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr,
 void cpu_exec_init_all(void)
 {
 #if !defined(CONFIG_USER_ONLY)
-    qemu_mutex_init(&ram_list.mutex);
     memory_map_init();
     io_mem_init();
 #endif
@@ -806,16 +808,16 @@ void cpu_abort(CPUState *cpu, const char *fmt, ...)
 }
 
 #if !defined(CONFIG_USER_ONLY)
+/* Called from RCU critical section */
 static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* The list is protected by the iothread lock here.  */
     block = atomic_rcu_read(&ram_list.mru_block);
     if (block && addr - block->offset < block->max_length) {
         goto found;
     }
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->max_length) {
             goto found;
         }
@@ -854,10 +856,12 @@ static void tlb_reset_dirty_range_all(ram_addr_t start, ram_addr_t length)
     end = TARGET_PAGE_ALIGN(start + length);
     start &= TARGET_PAGE_MASK;
 
+    rcu_read_lock();
     block = qemu_get_ram_block(start);
     assert(block == qemu_get_ram_block(end - 1));
     start1 = (uintptr_t)ramblock_ptr(block, start - block->offset);
     cpu_tlb_reset_dirty_all(start1, length);
+    rcu_read_unlock();
 }
 
 /* Note: start and end must be within the same ram block.  */
@@ -1061,16 +1065,6 @@ void qemu_flush_coalesced_mmio_buffer(void)
         kvm_flush_coalesced_mmio_buffer();
 }
 
-void qemu_mutex_lock_ramlist(void)
-{
-    qemu_mutex_lock(&ram_list.mutex);
-}
-
-void qemu_mutex_unlock_ramlist(void)
-{
-    qemu_mutex_unlock(&ram_list.mutex);
-}
-
 #ifdef __linux__
 
 #include <sys/vfs.h>
@@ -1190,6 +1184,7 @@ error:
 }
 #endif
 
+/* Called with the iothread lock held, to protect ram_list.  */
 static ram_addr_t find_ram_offset(ram_addr_t size)
 {
     RAMBlock *block, *next_block;
@@ -1197,16 +1192,16 @@ static ram_addr_t find_ram_offset(ram_addr_t size)
 
     assert(size != 0); /* it would hand out same offset multiple times */
 
-    if (QLIST_EMPTY(&ram_list.blocks)) {
+    if (QLIST_EMPTY_RCU(&ram_list.blocks)) {
         return 0;
     }
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         ram_addr_t end, next = RAM_ADDR_MAX;
 
         end = block->offset + block->max_length;
 
-        QLIST_FOREACH(next_block, &ram_list.blocks, next) {
+        QLIST_FOREACH_RCU(next_block, &ram_list.blocks, next) {
             if (next_block->offset >= end) {
                 next = MIN(next, next_block->offset);
             }
@@ -1231,9 +1226,11 @@ ram_addr_t last_ram_offset(void)
     RAMBlock *block;
     ram_addr_t last = 0;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         last = MAX(last, block->offset + block->max_length);
     }
+    rcu_read_unlock();
     return last;
 }
 
@@ -1253,11 +1250,14 @@ static void qemu_ram_setup_dump(void *addr, ram_addr_t size)
     }
 }
 
+/* Called within an RCU critical section, or while the iothread lock
+ * is held.
+ */
 static RAMBlock *find_ram_block(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block->offset == addr) {
             return block;
         }
@@ -1266,6 +1266,7 @@ static RAMBlock *find_ram_block(ram_addr_t addr)
     return NULL;
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
 {
     RAMBlock *new_block = find_ram_block(addr);
@@ -1283,18 +1284,16 @@ void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
     }
     pstrcat(new_block->idstr, sizeof(new_block->idstr), name);
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block != new_block && !strcmp(block->idstr, new_block->idstr)) {
             fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
                     new_block->idstr);
             abort();
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 void qemu_ram_unset_idstr(ram_addr_t addr)
 {
     RAMBlock *block = find_ram_block(addr);
@@ -1357,6 +1356,7 @@ int qemu_ram_resize(ram_addr_t base, ram_addr_t newsize, Error **errp)
     return 0;
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
 {
     RAMBlock *block;
@@ -1365,8 +1365,6 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
 
     old_ram_size = last_ram_offset() >> TARGET_PAGE_BITS;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
     new_block->offset = find_ram_offset(new_block->max_length);
 
     if (!new_block->host) {
@@ -1380,7 +1378,6 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
                 error_setg_errno(errp, errno,
                                  "cannot set up guest memory '%s'",
                                  memory_region_name(new_block->mr));
-                qemu_mutex_unlock_ramlist();
                 return -1;
             }
             memory_try_enable_merging(new_block->host, new_block->max_length);
@@ -1391,24 +1388,22 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
      * QLIST (which has an RCU-friendly variant) does not have insertion at
      * tail, so save the last element in last_block.
      */
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         last_block = block;
         if (block->max_length < new_block->max_length) {
             break;
         }
     }
     if (block) {
-        QLIST_INSERT_BEFORE(block, new_block, next);
+        QLIST_INSERT_BEFORE_RCU(block, new_block, next);
     } else if (last_block) {
-        QLIST_INSERT_AFTER(last_block, new_block, next);
+        QLIST_INSERT_AFTER_RCU(last_block, new_block, next);
     } else { /* list is empty */
-        QLIST_INSERT_HEAD(&ram_list.blocks, new_block, next);
+        QLIST_INSERT_HEAD_RCU(&ram_list.blocks, new_block, next);
     }
     ram_list.mru_block = NULL;
     atomic_rcu_set(&ram_list.version, ram_list.version + 1);
 
-    qemu_mutex_unlock_ramlist();
-
     new_ram_size = last_ram_offset() >> TARGET_PAGE_BITS;
 
     if (new_ram_size > old_ram_size) {
@@ -1436,6 +1431,7 @@ static ram_addr_t ram_block_add(RAMBlock *new_block, Error **errp)
 }
 
 #ifdef __linux__
+/* Called with the iothread lock held, to protect ram_list.  */
 ram_addr_t qemu_ram_alloc_from_file(ram_addr_t size, MemoryRegion *mr,
                                     bool share, const char *mem_path,
                                     Error **errp)
@@ -1483,6 +1479,7 @@ ram_addr_t qemu_ram_alloc_from_file(ram_addr_t size, MemoryRegion *mr,
 }
 #endif
 
+/* Called with the iothread lock held, to protect ram_list.  */
 static
 ram_addr_t qemu_ram_alloc_internal(ram_addr_t size, ram_addr_t max_size,
                                    void (*resized)(const char*,
@@ -1520,12 +1517,14 @@ ram_addr_t qemu_ram_alloc_internal(ram_addr_t size, ram_addr_t max_size,
     return addr;
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
                                    MemoryRegion *mr, Error **errp)
 {
     return qemu_ram_alloc_internal(size, size, NULL, host, false, mr, errp);
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 ram_addr_t qemu_ram_alloc(ram_addr_t size, MemoryRegion *mr, Error **errp)
 {
     return qemu_ram_alloc_internal(size, size, NULL, NULL, false, mr, errp);
@@ -1540,15 +1539,14 @@ ram_addr_t qemu_ram_alloc_resizeable(ram_addr_t size, ram_addr_t maxsz,
     return qemu_ram_alloc_internal(size, maxsz, resized, NULL, true, mr, errp);
 }
 
+/* Called with the iothread lock held, to protect ram_list.  */
 void qemu_ram_free_from_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QLIST_REMOVE(block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
             atomic_rcu_set(&ram_list.version, ram_list.version + 1);
             call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
@@ -1579,19 +1577,15 @@ void qemu_ram_free(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QLIST_REMOVE(block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
             atomic_rcu_set(&ram_list.version, ram_list.version + 1);
             call_rcu(block, reclaim_ramblock, rcu);
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
-
 }
 
 #ifndef _WIN32
@@ -1602,7 +1596,7 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
     int flags;
     void *area, *vaddr;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         offset = addr - block->offset;
         if (offset < block->max_length) {
             vaddr = ramblock_ptr(block, offset);
@@ -1649,8 +1643,10 @@ int qemu_get_ram_fd(ram_addr_t addr)
     RAMBlock *block;
     int fd;
 
+    rcu_read_lock();
     block = qemu_get_ram_block(addr);
     fd = atomic_rcu_read(&block->fd);
+    rcu_read_unlock();
     return fd;
 }
 
@@ -1659,8 +1655,10 @@ void *qemu_get_ram_block_host_ptr(ram_addr_t addr)
     RAMBlock *block;
     void *ptr;
 
+    rcu_read_lock();
     block = qemu_get_ram_block(addr);
     ptr = ramblock_ptr(block, 0);
+    rcu_read_unlock();
     return ptr;
 }
 
@@ -1668,12 +1666,19 @@ void *qemu_get_ram_block_host_ptr(ram_addr_t addr)
  * This should not be used for general purpose DMA.  Use address_space_map
  * or address_space_rw instead. For local memory (e.g. video ram) that the
  * device owns, use memory_region_get_ram_ptr.
+ *
+ * By the time this function returns, the returned pointer is not protected
+ * by RCU anymore.  If the caller is not within an RCU critical section and
+ * does not hold the iothread lock, it must have other means of protecting the
+ * pointer, such as a reference to the region that includes the incoming
+ * ram_addr_t.
  */
 void *qemu_get_ram_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
     void *ptr;
 
+    rcu_read_lock();
     block = qemu_get_ram_block(addr);
 
     if (xen_enabled() && block->host == NULL) {
@@ -1683,19 +1688,26 @@ void *qemu_get_ram_ptr(ram_addr_t addr)
          */
         if (block->offset == 0) {
             ptr = xen_map_cache(addr, 0, 0);
-            goto done;
+            goto unlock;
         }
 
         block->host = xen_map_cache(block->offset, block->max_length, 1);
     }
     ptr = ramblock_ptr(block, addr - block->offset);
 
-done:
+unlock:
+    rcu_read_unlock();
     return ptr;
 }
 
 /* Return a host pointer to guest's ram. Similar to qemu_get_ram_ptr
  * but takes a size argument.
+ *
+ * By the time this function returns, the returned pointer is not protected
+ * by RCU anymore.  If the caller is not within an RCU critical section and
+ * does not hold the iothread lock, it must have other means of protecting the
+ * pointer, such as a reference to the region that includes the incoming
+ * ram_addr_t.
  */
 static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 {
@@ -1707,11 +1719,13 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
         return xen_map_cache(addr, *size, 1);
     } else {
         RAMBlock *block;
-        QLIST_FOREACH(block, &ram_list.blocks, next) {
+        rcu_read_lock();
+        QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->max_length) {
                 if (addr - block->offset + *size > block->max_length)
                     *size = block->max_length - addr + block->offset;
                 ptr = ramblock_ptr(block, addr - block->offset);
+                rcu_read_unlock();
                 return ptr;
             }
         }
@@ -1737,17 +1751,20 @@ MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
     MemoryRegion *mr;
 
     if (xen_enabled()) {
+        rcu_read_lock();
         *ram_addr = xen_ram_addr_from_mapcache(ptr);
         mr = qemu_get_ram_block(*ram_addr)->mr;
+        rcu_read_unlock();
         return mr;
     }
 
-    block = ram_list.mru_block;
+    rcu_read_lock();
+    block = atomic_rcu_read(&ram_list.mru_block);
     if (block && block->host && host - block->host < block->max_length) {
         goto found;
     }
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
             continue;
@@ -1762,6 +1779,7 @@ MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
 found:
     *ram_addr = block->offset + (host - block->host);
     mr = block->mr;
+    rcu_read_unlock();
     return mr;
 }
 
@@ -3015,8 +3033,10 @@ void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
 {
     RAMBlock *block;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         func(block->host, block->offset, block->used_length, opaque);
     }
+    rcu_read_unlock();
 }
 #endif
diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
index b38e03f..70f0f20 100644
--- a/include/exec/cpu-all.h
+++ b/include/exec/cpu-all.h
@@ -278,9 +278,7 @@ struct RAMBlock {
     void (*resized)(const char*, uint64_t length, void *host);
     uint32_t flags;
     char idstr[256];
-    /* Reads can take either the iothread or the ramlist lock.
-     * Writes must take both locks.
-     */
+    /* Writes must hold the iothread lock */
     QLIST_ENTRY(RAMBlock) next;
     int fd;
 };
@@ -293,11 +291,10 @@ static inline void *ramblock_ptr(RAMBlock *block, ram_addr_t offset)
 }
 
 typedef struct RAMList {
-    QemuMutex mutex;
     /* Protected by the iothread lock.  */
     unsigned long *dirty_memory[DIRTY_MEMORY_NUM];
     RAMBlock *mru_block;
-    /* Protected by the ramlist lock.  */
+    /* RCU-enabled, writes protected by the iothread lock. */
     QLIST_HEAD(, RAMBlock) blocks;
     uint32_t version;
 } RAMList;
-- 
1.8.3.1


* Re: [Qemu-devel] [PATCH 01/15] rcu: add rcu library
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
@ 2015-01-26  3:13   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  3:13 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> This includes a (mangled) copy of the liburcu code.  The main changes
> are: 1) removing dependencies on many other header files in liburcu; 2)
> removing for simplicity the tentative busy waiting in synchronize_rcu,
> which has limited performance effects; 3) replacing futexes in
> synchronize_rcu with QemuEvents for Win32 portability.  The API is
> the same as liburcu, so it should be possible in the future to require
> liburcu on POSIX systems for example and use our copy only on Windows.
> 
> Among the various versions available I chose urcu-mb, which is the
> least invasive implementation even though it does not have the
> fastest rcu_read_{lock,unlock} implementation.  The urcu flavor can
> be changed later, after benchmarking.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Reviewed-by: Fam Zheng <famz@redhat.com>

> ---
>  docs/rcu.txt              | 285 ++++++++++++++++++++++++++++++++++++++++++++++
>  hw/9pfs/virtio-9p-synth.c |   1 +
>  include/qemu/atomic.h     |  61 ++++++++++
>  include/qemu/queue.h      |  13 +++
>  include/qemu/rcu.h        | 112 ++++++++++++++++++
>  include/qemu/thread.h     |   3 -
>  util/Makefile.objs        |   1 +
>  util/rcu.c                | 172 ++++++++++++++++++++++++++++
>  8 files changed, 645 insertions(+), 3 deletions(-)
>  create mode 100644 docs/rcu.txt
>  create mode 100644 include/qemu/rcu.h
>  create mode 100644 util/rcu.c
> 
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> new file mode 100644
> index 0000000..9938ad3
> --- /dev/null
> +++ b/docs/rcu.txt
> @@ -0,0 +1,285 @@
> +Using RCU (Read-Copy-Update) for synchronization
> +================================================
> +
> +Read-copy update (RCU) is a synchronization mechanism that is used to
> +protect read-mostly data structures.  RCU is very efficient and scalable
> +on the read side (it is wait-free), and thus can make the read paths
> +extremely fast.
> +
> +RCU supports concurrency between a single writer and multiple readers,
> +thus it is not used alone.  Typically, the write-side will use a lock to
> +serialize multiple updates, but other approaches are possible (e.g.,
> +restricting updates to a single task).  In QEMU, when a lock is used,
> +this will often be the "iothread mutex", also known as the "big QEMU
> +lock" (BQL).  Also, restricting updates to a single task is done in
> +QEMU using the "bottom half" API.
> +
> +RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
> +sections of code with "critical sections", and the update side will wait
> +for the execution of all *currently running* critical sections before
> +proceeding, or before asynchronously executing a callback.
> +
> +The key point here is that only the currently running critical sections
> +are waited for; critical sections that are started _after_ the beginning
> +of the wait do not extend the wait, despite running concurrently with
> +the updater.  This is the reason why RCU is more scalable than,
> +for example, reader-writer locks.  It is so much more scalable that
> +the system will have a single instance of the RCU mechanism; a single
> +mechanism can be used for an arbitrary number of "things", without
> +having to worry about things such as contention or deadlocks.
> +
> +How is this possible?  The basic idea is to split updates in two phases,
> +"removal" and "reclamation".  During removal, we ensure that subsequent
> +readers will not be able to get a reference to the old data.  After
> +removal has completed, a critical section will not be able to access
> +the old data.  Therefore, critical sections that begin after removal
> +do not matter; as soon as all previous critical sections have finished,
> +there cannot be any readers who hold references to the old data, which
> +can now be safely reclaimed (e.g., freed or unref'ed).
> +
> +Here is a picture:
> +
> +        thread 1                  thread 2                  thread 3
> +    -------------------    ------------------------    -------------------
> +    enter RCU crit.sec.
> +           |                finish removal phase
> +           |                begin wait
> +           |                      |                    enter RCU crit.sec.
> +    exit RCU crit.sec             |                           |
> +                            complete wait                     |
> +                            begin reclamation phase           |
> +                                                       exit RCU crit.sec.
> +
> +
> +Note how thread 3 is still executing its critical section when thread 2
> +starts reclaiming data.  This is possible, because the old version of the
> +data structure was not accessible at the time thread 3 began executing
> +that critical section.
> +
> +
> +RCU API
> +=======
> +
> +The core RCU API is small:
> +
> +     void rcu_read_lock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        entering an RCU read-side critical section.
> +
> +     void rcu_read_unlock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        exiting an RCU read-side critical section.  Note that RCU
> +        read-side critical sections may be nested and/or overlapping.
> +
> +     void synchronize_rcu(void);
> +
> +        Blocks until all pre-existing RCU read-side critical sections
> +        on all threads have completed.  This marks the end of the removal
> +        phase and the beginning of reclamation phase.
> +
> +        Note that it would be valid for another update to come while
> +        synchronize_rcu is running.  Because of this, it is better that
> +        the updater releases any locks it may hold before calling
> +        synchronize_rcu.
> +
> +     typeof(*p) atomic_rcu_read(p);
> +
> +        atomic_rcu_read() is similar to atomic_mb_read(), but it makes
> +        some assumptions on the code that calls it.  This allows a more
> +        optimized implementation.
> +
> +        atomic_rcu_read assumes that whenever a single RCU critical
> +        section reads multiple shared data, these reads are either
> +        data-dependent or need no ordering.  This is almost always the
> +        case when using RCU, because read-side critical sections typically
> +        navigate one or more pointers (the pointers that are changed on
> +        every update) until reaching a data structure of interest,
> +        and then read from there.
> +
> +        RCU read-side critical sections must use atomic_rcu_read() to
> +        read data, unless concurrent writes are prevented by another
> +        synchronization mechanism.
> +
> +        Furthermore, RCU read-side critical sections should traverse the
> +        data structure in a single direction, opposite to the direction
> +        in which the updater initializes it.
> +
> +     void atomic_rcu_set(p, typeof(*p) v);
> +
> +        atomic_rcu_set() is also similar to atomic_mb_set(), and it also
> +        makes assumptions on the code that calls it in order to allow a more
> +        optimized implementation.
> +
> +        In particular, atomic_rcu_set() suffices for synchronization
> +        with readers, if the updater never mutates a field within a
> +        data item that is already accessible to readers.  This is the
> +        case when initializing a new copy of the RCU-protected data
> +        structure; just ensure that initialization of *p is carried out
> +        before atomic_rcu_set() makes the data item visible to readers.
> +        If this rule is observed, writes happen in the opposite order
> +        to the reads in the RCU read-side critical sections (or there is
> +        just a single update), and there will be no need for another
> +        synchronization mechanism to coordinate the accesses.
> +
> +The following APIs must be used before RCU is used in a thread:
> +
> +     void rcu_register_thread(void);
> +
> +        Mark a thread as taking part in the RCU mechanism.  Such a thread
> +        will have to report quiescent points regularly, either manually
> +        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
> +
> +     void rcu_unregister_thread(void);
> +
> +        Mark a thread as not taking part anymore in the RCU mechanism.
> +        It is not a problem if such a thread reports quiescent points,
> +        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
> +        APIs.
> +
> +Note that these APIs are relatively heavyweight, and should _not_ be
> +nested.
> +
> +
> +DIFFERENCES WITH LINUX
> +======================
> +
> +- Waiting on a mutex is possible, though discouraged, within an RCU critical
> +  section.  This is because spinlocks are rarely (if ever) used in userspace
> +  programming; not allowing this would prevent upgrading an RCU read-side
> +  critical section to become an updater.
> +
> +- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
> +  rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
> +
> +
> +RCU PATTERNS
> +============
> +
> +Many patterns using reader-writer locks translate directly to RCU, with
> +the advantages of higher scalability and deadlock immunity.
> +
> +In general, RCU can be used whenever it is possible to create a new
> +"version" of a data structure every time the updater runs.  This may
> +sound like a very strict restriction, however:
> +
> +- the updater does not mean "everything that writes to a data structure",
> +  but rather "everything that involves a reclamation step".  See the
> +  array example below
> +
> +- in some cases, creating a new version of a data structure may actually
> +  be very cheap.  For example, modifying the "next" pointer of a singly
> +  linked list is effectively creating a new version of the list.
> +
> +Here are some frequently-used RCU idioms that are worth noting.
> +
> +
> +RCU list processing
> +-------------------
> +
> +TBD (not yet used in QEMU)
> +
> +
> +RCU reference counting
> +----------------------
> +
> +Because grace periods are not allowed to complete while there is an RCU
> +read-side critical section in progress, the RCU read-side primitives
> +may be used as a restricted reference-counting mechanism.  For example,
> +consider the following code fragment:
> +
> +    rcu_read_lock();
> +    p = atomic_rcu_read(&foo);
> +    /* do something with p. */
> +    rcu_read_unlock();
> +
> +The RCU read-side critical section ensures that the value of "p" remains
> +valid until after the rcu_read_unlock().  In some sense, it is acquiring
> +a reference to p that is later released when the critical section ends.
> +The write side looks simply like this (with appropriate locking):
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    atomic_rcu_set(&foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    free(old);
> +
> +Note that the same idiom would be possible with reader/writer
> +locks:
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        p = foo;
> +    /* do something with p. */      foo = new;
> +    read_unlock(&foo_rwlock);       free(p);
> +                                    write_mutex_unlock(&foo_rwlock);
> +
> +
> +RCU resizable arrays
> +--------------------
> +
> +Resizable arrays can be used with RCU.  The expensive RCU synchronization
> +only needs to take place when the array is resized.  The two items to
> +take care of are:
> +
> +- ensuring that the old version of the array is available between removal
> +  and reclamation;
> +
> +- avoiding mismatches in the read side between the array data and the
> +  array size.
> +
> +The first problem is avoided simply by not using realloc.  Instead,
> +each resize will allocate a new array and copy the old data into it.
> +The second problem would arise if the size and the data pointers were
> +two members of a larger struct:
> +
> +    struct mystuff {
> +        ...
> +        int data_size;
> +        int data_alloc;
> +        T   *data;
> +        ...
> +    };
> +
> +Instead, we store the size of the array with the array itself:
> +
> +    struct arr {
> +        int size;
> +        int alloc;
> +        T   data[];
> +    };
> +    struct arr *global_array;
> +
> +    read side:
> +        rcu_read_lock();
> +        struct arr *array = atomic_rcu_read(&global_array);
> +        x = i < array->size ? array->data[i] : -1;
> +        rcu_read_unlock();
> +        return x;
> +
> +    write side (running under a lock):
> +        if (global_array->size == global_array->alloc) {
> +            /* Creating a new version.  */
> +            new_array = g_malloc(sizeof(struct arr) +
> +                                 global_array->alloc * 2 * sizeof(T));
> +            new_array->size = global_array->size;
> +            new_array->alloc = global_array->alloc * 2;
> +            memcpy(new_array->data, global_array->data,
> +                   global_array->alloc * sizeof(T));
> +
> +            /* Removal phase.  */
> +            old_array = global_array;
> +            atomic_rcu_set(&global_array, new_array);
> +            synchronize_rcu();
> +
> +            /* Reclamation phase.  */
> +            g_free(old_array);
> +        }
> +
> +
> +SOURCES
> +=======
> +
> +* Documentation/RCU/ from the Linux kernel
> diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
> index 71262bc..e75aa87 100644
> --- a/hw/9pfs/virtio-9p-synth.c
> +++ b/hw/9pfs/virtio-9p-synth.c
> @@ -17,6 +17,7 @@
>  #include "virtio-9p-xattr.h"
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-synth.h"
> +#include "qemu/rcu.h"
>  
>  #include <sys/stat.h>
>  
> diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
> index 93c2ae2..98e05ca 100644
> --- a/include/qemu/atomic.h
> +++ b/include/qemu/atomic.h
> @@ -129,6 +129,67 @@
>  #define atomic_set(ptr, i)     ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
>  #endif
>  
> +/**
> + * atomic_rcu_read - reads an RCU-protected pointer into a local variable
> + * inside an RCU read-side critical section. The pointer can later be safely
> + * dereferenced within the critical section.
> + *
> + * This ensures that the pointer copy is invariant throughout the whole
> + * critical section.
> + *
> + * Inserts memory barriers on architectures that require them (currently only
> + * Alpha) and documents which pointers are protected by RCU.
> + *
> + * Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
> + * includes a compiler barrier to ensure that value-speculative optimizations
> + * (e.g. VSS: Value Speculation Scheduling) do not perform the data read
> + * before the pointer read by speculating the value of the pointer.  On new
> + * enough compilers, atomic_load takes care of such concerns about
> + * dependency-breaking optimizations.
> + *
> + * Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
> + */
> +#ifndef atomic_rcu_read
> +#ifdef __ATOMIC_CONSUME
> +#define atomic_rcu_read(ptr)    ({                \
> +    typeof(*ptr) _val;                            \
> +    __atomic_load(ptr, &_val, __ATOMIC_CONSUME);  \
> +    _val;                                         \
> +})
> +#else
> +#define atomic_rcu_read(ptr)    ({                \
> +    typeof(*ptr) _val = atomic_read(ptr);         \
> +    smp_read_barrier_depends();                   \
> +    _val;                                         \
> +})
> +#endif
> +#endif
> +
> +/**
> + * atomic_rcu_set - assigns (publishes) a pointer to a new data structure
> + * meant to be read by RCU read-side critical sections.
> + *
> + * Documents which pointers will be dereferenced by RCU read-side critical
> + * sections and adds the required memory barriers on architectures requiring
> + * them. It also makes sure the compiler does not reorder code initializing the
> + * data structure before its publication.
> + *
> + * Should match atomic_rcu_read().
> + */
> +#ifndef atomic_rcu_set
> +#ifdef __ATOMIC_RELEASE
> +#define atomic_rcu_set(ptr, i)  do {              \
> +    typeof(*ptr) _val = (i);                      \
> +    __atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
> +} while (0)
> +#else
> +#define atomic_rcu_set(ptr, i)  do {              \
> +    smp_wmb();                                    \
> +    atomic_set(ptr, i);                           \
> +} while (0)
> +#endif
> +#endif
> +
>  /* These have the same semantics as Java volatile variables.
>   * See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
>   * "1. Issue a StoreStore barrier (wmb) before each volatile store."
> diff --git a/include/qemu/queue.h b/include/qemu/queue.h
> index a98eb3a..c602797 100644
> --- a/include/qemu/queue.h
> +++ b/include/qemu/queue.h
> @@ -104,6 +104,19 @@ struct {                                                                \
>          (head)->lh_first = NULL;                                        \
>  } while (/*CONSTCOND*/0)
>  
> +#define QLIST_SWAP(dstlist, srclist, field) do {                        \
> +        void *tmplist;                                                  \
> +        tmplist = (srclist)->lh_first;                                  \
> +        (srclist)->lh_first = (dstlist)->lh_first;                      \
> +        if ((srclist)->lh_first != NULL) {                              \
> +            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
> +        }                                                               \
> +        (dstlist)->lh_first = tmplist;                                  \
> +        if ((dstlist)->lh_first != NULL) {                              \
> +            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
> +        }                                                               \
> +} while (/*CONSTCOND*/0)
> +
>  #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
>          if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
>                  (listelm)->field.le_next->field.le_prev =               \
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> new file mode 100644
> index 0000000..cfef36e
> --- /dev/null
> +++ b/include/qemu/rcu.h
> @@ -0,0 +1,112 @@
> +#ifndef QEMU_RCU_H
> +#define QEMU_RCU_H
> +
> +/*
> + * urcu-mb.h
> + *
> + * Userspace RCU header with explicit memory barrier.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdlib.h>
> +#include <assert.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +#include <glib.h>
> +
> +#include "qemu/compiler.h"
> +#include "qemu/thread.h"
> +#include "qemu/queue.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/*
> + * Important!
> + *
> + * Each thread containing read-side critical sections must be registered
> + * with rcu_register_thread() before calling rcu_read_lock().
> + * rcu_unregister_thread() should be called before the thread exits.
> + */
> +
> +#ifdef DEBUG_RCU
> +#define rcu_assert(args...)    assert(args)
> +#else
> +#define rcu_assert(args...)
> +#endif
> +
> +/*
> + * Global grace period counter with low-order bits unused.
> + * Using an unsigned long rather than a char eliminates false register
> + * dependencies that cause stalls on some architectures.
> + */
> +extern unsigned long rcu_gp_ctr;
> +
> +extern QemuEvent rcu_gp_event;
> +
> +struct rcu_reader_data {
> +    /* Data used by both reader and synchronize_rcu() */
> +    unsigned long ctr;
> +    bool waiting;
> +
> +    /* Data used for registry, protected by rcu_gp_lock */
> +    QLIST_ENTRY(rcu_reader_data) node;
> +};
> +
> +extern __thread struct rcu_reader_data rcu_reader;
> +
> +static inline void rcu_read_lock(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
> +
> +    unsigned ctr = atomic_read(&rcu_gp_ctr);
> +    atomic_xchg(&p_rcu_reader->ctr, ctr);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +static inline void rcu_read_unlock(void)
> +{
> +    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
> +
> +    atomic_xchg(&p_rcu_reader->ctr, 0);
> +    if (atomic_read(&p_rcu_reader->waiting)) {
> +        atomic_set(&p_rcu_reader->waiting, false);
> +        qemu_event_set(&rcu_gp_event);
> +    }
> +}
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * Reader thread registration.
> + */
> +extern void rcu_register_thread(void);
> +extern void rcu_unregister_thread(void);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* QEMU_RCU_H */
> diff --git a/include/qemu/thread.h b/include/qemu/thread.h
> index e89fdc9..5114ec8 100644
> --- a/include/qemu/thread.h
> +++ b/include/qemu/thread.h
> @@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
>  int qemu_mutex_trylock(QemuMutex *mutex);
>  void qemu_mutex_unlock(QemuMutex *mutex);
>  
> -#define rcu_read_lock() do { } while (0)
> -#define rcu_read_unlock() do { } while (0)
> -
>  void qemu_cond_init(QemuCond *cond);
>  void qemu_cond_destroy(QemuCond *cond);
>  
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 93007e2..ceaba30 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -17,3 +17,4 @@ util-obj-y += throttle.o
>  util-obj-y += getauxval.o
>  util-obj-y += readline.o
>  util-obj-y += rfifolock.o
> +util-obj-y += rcu.o
> diff --git a/util/rcu.c b/util/rcu.c
> new file mode 100644
> index 0000000..1f737d5
> --- /dev/null
> +++ b/util/rcu.c
> @@ -0,0 +1,172 @@
> +/*
> + * urcu-mb.c
> + *
> + * Userspace RCU library with explicit memory barriers
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + * Copyright 2015 Red Hat, Inc.
> + *
> + * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdio.h>
> +#include <assert.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include "qemu/rcu.h"
> +#include "qemu/atomic.h"
> +
> +/*
> + * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
> + * Bits 1 and above are defined in synchronize_rcu.
> + */
> +#define RCU_GP_LOCKED           (1UL << 0)
> +#define RCU_GP_CTR              (1UL << 1)
> +
> +unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
> +
> +QemuEvent rcu_gp_event;
> +static QemuMutex rcu_gp_lock;
> +
> +/*
> + * Check whether a quiescent state was crossed between the beginning of
> + * update_counter_and_wait and now.
> + */
> +static inline int rcu_gp_ongoing(unsigned long *ctr)
> +{
> +    unsigned long v;
> +
> +    v = atomic_read(ctr);
> +    return v && (v != rcu_gp_ctr);
> +}
> +
> +/* Written to only by each individual reader. Read by both the reader and the
> + * writers.
> + */
> +__thread struct rcu_reader_data rcu_reader;
> +
> +/* Protected by rcu_gp_lock.  */
> +typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> +static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> +
> +/* Wait for previous parity/grace period to be empty of readers.  */
> +static void wait_for_readers(void)
> +{
> +    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> +    struct rcu_reader_data *index, *tmp;
> +
> +    for (;;) {
> +        /* We want to be notified of changes made to rcu_gp_ongoing
> +         * while we walk the list.
> +         */
> +        qemu_event_reset(&rcu_gp_event);
> +
> +        /* Instead of using atomic_mb_set for index->waiting, and
> +         * atomic_mb_read for index->ctr, memory barriers are placed
> +         * manually since writes to different threads are independent.
> +         * atomic_mb_set has a smp_wmb before...
> +         */
> +        smp_wmb();
> +        QLIST_FOREACH(index, &registry, node) {
> +            atomic_set(&index->waiting, true);
> +        }
> +
> +        /* ... and a smp_mb after.  */
> +        smp_mb();
> +
> +        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
> +            if (!rcu_gp_ongoing(&index->ctr)) {
> +                QLIST_REMOVE(index, node);
> +                QLIST_INSERT_HEAD(&qsreaders, index, node);
> +
> +                /* No need for mb_set here, worst of all we
> +                 * get some extra futex wakeups.
> +                 */
> +                atomic_set(&index->waiting, false);
> +            }
> +        }
> +
> +        /* atomic_mb_read has smp_rmb after.  */
> +        smp_rmb();
> +
> +        if (QLIST_EMPTY(&registry)) {
> +            break;
> +        }
> +
> +        /* Wait for one thread to report a quiescent state and
> +         * try again.
> +         */
> +        qemu_event_wait(&rcu_gp_event);
> +    }
> +
> +    /* put back the reader list in the registry */
> +    QLIST_SWAP(&registry, &qsreaders, node);
> +}
> +
> +void synchronize_rcu(void)
> +{
> +    qemu_mutex_lock(&rcu_gp_lock);
> +
> +    if (!QLIST_EMPTY(&registry)) {
> +        /* In either case, the atomic_mb_set below blocks stores that free
> +         * old RCU-protected pointers.
> +         */
> +        if (sizeof(rcu_gp_ctr) < 8) {
> +            /* For architectures with 32-bit longs, a two-subphases algorithm
> +             * ensures we do not encounter overflow bugs.
> +             *
> +             * Switch parity: 0 -> 1, 1 -> 0.
> +             */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +            wait_for_readers();
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +        } else {
> +            /* Increment current grace period.  */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> +        }
> +
> +        wait_for_readers();
> +    }
> +
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +void rcu_register_thread(void)
> +{
> +    assert(rcu_reader.ctr == 0);
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +void rcu_unregister_thread(void)
> +{
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_REMOVE(&rcu_reader, node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +static void __attribute__((__constructor__)) rcu_init(void)
> +{
> +    qemu_mutex_init(&rcu_gp_lock);
> +    qemu_event_init(&rcu_gp_event, true);
> +    rcu_register_thread();
> +}
> -- 
> 1.8.3.1
> 
> 

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH 02/15] rcu: add rcutorture
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 02/15] rcu: add rcutorture Paolo Bonzini
@ 2015-01-26  3:31   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  3:31 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> +
> +long long n_reads = 0LL;
> +long n_updates = 0L;
> +int nthreadsrunning;
> +
> +char argsbuf[64];

This seems unused. Overall the gtest enabled part looks good to me.

Reviewed-by: Fam Zheng <famz@redhat.com>

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock Paolo Bonzini
@ 2015-01-26  3:32   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  3:32 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/qemu/rcu.h | 15 ++++++++++++++-
>  tests/rcutorture.c |  2 ++
>  2 files changed, 16 insertions(+), 1 deletion(-)
> 
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> index cfef36e..da043f2 100644
> --- a/include/qemu/rcu.h
> +++ b/include/qemu/rcu.h
> @@ -68,6 +68,9 @@ struct rcu_reader_data {
>      unsigned long ctr;
>      bool waiting;
>  
> +    /* Data used by reader only */
> +    unsigned depth;
> +
>      /* Data used for registry, protected by rcu_gp_lock */
>      QLIST_ENTRY(rcu_reader_data) node;
>  };
> @@ -77,8 +80,13 @@ extern __thread struct rcu_reader_data rcu_reader;
>  static inline void rcu_read_lock(void)
>  {
>      struct rcu_reader_data *p_rcu_reader = &rcu_reader;
> +    unsigned ctr;
> +
> +    if (p_rcu_reader->depth++ > 0) {
> +        return;
> +    }
>  
> -    unsigned ctr = atomic_read(&rcu_gp_ctr);
> +    ctr = atomic_read(&rcu_gp_ctr);
>      atomic_xchg(&p_rcu_reader->ctr, ctr);
>      if (atomic_read(&p_rcu_reader->waiting)) {
>          atomic_set(&p_rcu_reader->waiting, false);
> @@ -90,6 +98,11 @@ static inline void rcu_read_unlock(void)
>  {
>      struct rcu_reader_data *p_rcu_reader = &rcu_reader;
>  
> +    assert(p_rcu_reader->depth != 0);
> +    if (--p_rcu_reader->depth > 0) {
> +        return;
> +    }
> +
>      atomic_xchg(&p_rcu_reader->ctr, 0);
>      if (atomic_read(&p_rcu_reader->waiting)) {
>          atomic_set(&p_rcu_reader->waiting, false);
> diff --git a/tests/rcutorture.c b/tests/rcutorture.c
> index 214985f..31461c3 100644
> --- a/tests/rcutorture.c
> +++ b/tests/rcutorture.c
> @@ -257,9 +257,11 @@ static void *rcu_read_stress_test(void *arg)
>          if (p->mbtest == 0) {
>              n_mberror++;
>          }
> +        rcu_read_lock();
>          for (i = 0; i < 100; i++) {
>              garbage++;
>          }
> +        rcu_read_unlock();
>          pc = p->pipe_count;
>          rcu_read_unlock();
>          if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
> -- 
> 1.8.3.1
> 
> 

Reviewed-by: Fam Zheng <famz@redhat.com>

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH 04/15] rcu: add call_rcu
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 04/15] rcu: add call_rcu Paolo Bonzini
@ 2015-01-26  6:04   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  6:04 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Asynchronous callbacks provided by call_rcu are particularly important
> for QEMU, because the BQL makes it hard to use synchronize_rcu.
> 
> In addition, the current RCU implementation is not particularly friendly
> to multiple concurrent synchronize_rcu callers, making call_rcu even
> more important.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/rcu.txt       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
>  include/qemu/rcu.h |  22 ++++++++++
>  util/rcu.c         | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 247 insertions(+), 4 deletions(-)
> 
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> index 9938ad3..61752b9 100644
> --- a/docs/rcu.txt
> +++ b/docs/rcu.txt
> @@ -82,7 +82,50 @@ The core RCU API is small:
>          Note that it would be valid for another update to come while
>          synchronize_rcu is running.  Because of this, it is better that
>          the updater releases any locks it may hold before calling
> -        synchronize_rcu.
> +        synchronize_rcu.  If this is not possible (for example, because
> +        the updater is protected by the BQL), you can use call_rcu.
> +
> +     void call_rcu1(struct rcu_head * head,
> +                    void (*func)(struct rcu_head *head));
> +
> +        This function invokes func(head) after all pre-existing RCU
> +        read-side critical sections on all threads have completed.  This
> +        marks the end of the removal phase, with func taking care
> +        asynchronously of the reclamation phase.
> +
> +        The foo struct needs to have an rcu_head structure added,
> +        perhaps as follows:
> +
> +            struct foo {
> +                struct rcu_head rcu;
> +                int a;
> +                char b;
> +                long c;
> +            };
> +
> +        so that the reclaimer function can fetch the struct foo address
> +        and free it:
> +
> +            call_rcu1(&foo.rcu, foo_reclaim);
> +
> +            void foo_reclaim(struct rcu_head *rp)
> +            {
> +                struct foo *fp = container_of(rp, struct foo, rcu);
> +                g_free(fp);
> +            }
> +
> +        For the common case where the rcu_head member is the first of the
> +        struct, you can use the following macro.
> +
> +     void call_rcu(T *p,
> +                   void (*func)(T *p),
> +                   field-name);
> +
> +        call_rcu1 is typically used through this macro, in the common case
> +        where the "struct rcu_head" is the first field in the struct.  In
> +        the above case, one could have written simply:
> +
> +            call_rcu(&foo, g_free, rcu);
>  
>       typeof(*p) atomic_rcu_read(p);
>  
> @@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
>  - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
>    rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
>  
> +- call_rcu is a macro that has an extra argument (the name of the first
> +  field in the struct, which must be a struct rcu_head), and expects the
> +  type of the callback's argument to be the type of the first argument.
> +  call_rcu1 is the same as Linux's call_rcu.
> +
>  
>  RCU PATTERNS
>  ============
> @@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate locking):
>      synchronize_rcu();
>      free(old);
>  
> -Note that the same idiom would be possible with reader/writer
> +If the processing cannot be done purely within the critical section, it
> +is possible to combine this idiom with a "real" reference count:
> +
> +    rcu_read_lock();
> +    p = atomic_rcu_read(&foo);
> +    foo_ref(p);
> +    rcu_read_unlock();
> +    /* do something with p. */
> +    foo_unref(p);
> +
> +The write side can be like this:
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    atomic_rcu_set(&foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    foo_unref(old);
> +
> +or with call_rcu:
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    atomic_rcu_set(&foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    call_rcu(foo_unref, old, rcu);
> +
> +In both cases, the write side only performs removal.  Reclamation
> +happens when the last reference to a "foo" object is dropped.
> +Using synchronize_rcu() is undesirably expensive, because the
> +last reference may be dropped on the read side.  Hence you can
> +use call_rcu() instead:
> +
> +     foo_unref(struct foo *p) {
> +        if (atomic_fetch_dec(&p->refcount) == 1) {
> +            call_rcu(foo_destroy, p, rcu);
> +        }
> +    }
> +
> +
> +Note that the same idioms would be possible with reader/writer
>  locks:
>  
>      read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> @@ -216,13 +304,27 @@ locks:
>                                      write_mutex_unlock(&foo_rwlock);
>                                      free(p);
>  
> +    ------------------------------------------------------------------
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        old = foo;
> +    foo_ref(p);                     foo = new;
> +    read_unlock(&foo_rwlock);       foo_unref(old);
> +    /* do something with p. */      write_mutex_unlock(&foo_rwlock);
> +    read_lock(&foo_rwlock);
> +    foo_unref(p);
> +    read_unlock(&foo_rwlock);
> +
> +foo_unref could use a mechanism such as bottom halves to move deallocation
> +out of the write-side critical section.
> +
>  
>  RCU resizable arrays
>  --------------------
>  
>  Resizable arrays can be used with RCU.  The expensive RCU synchronization
> -only needs to take place when the array is resized.  The two items to
> -take care of are:
> +(or call_rcu) only needs to take place when the array is resized.
> +The two items to take care of are:
>  
>  - ensuring that the old version of the array is available between removal
>    and reclamation;
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> index da043f2..068a279 100644
> --- a/include/qemu/rcu.h
> +++ b/include/qemu/rcu.h
> @@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
>  extern void rcu_register_thread(void);
>  extern void rcu_unregister_thread(void);
>  
> +struct rcu_head;
> +typedef void RCUCBFunc(struct rcu_head *head);
> +
> +struct rcu_head {
> +    struct rcu_head *next;
> +    RCUCBFunc *func;
> +};
> +
> +extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
> +
> +/* The operands of the minus operator must have the same type,
> + * which must be the one that we specify in the cast.
> + */
> +#define call_rcu(head, func, field)                                      \
> +    call_rcu1(({                                                         \
> +         char __attribute__((unused))                                    \
> +            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
> +            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
> +         &(head)->field;                                                 \
> +      }),                                                                \
> +      (RCUCBFunc *)(func))
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/util/rcu.c b/util/rcu.c
> index 1f737d5..c9c3e6e 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -26,6 +26,7 @@
>   * IBM's contributions to this file may be relicensed under LGPLv2 or later.
>   */
>  
> +#include "qemu-common.h"
>  #include <stdio.h>
>  #include <assert.h>
>  #include <stdlib.h>
> @@ -33,6 +34,7 @@
>  #include <errno.h>
>  #include "qemu/rcu.h"
>  #include "qemu/atomic.h"
> +#include "qemu/thread.h"
>  
>  /*
>   * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
> @@ -149,6 +151,116 @@ void synchronize_rcu(void)
>      qemu_mutex_unlock(&rcu_gp_lock);
>  }
>  
> +
> +#define RCU_CALL_MIN_SIZE        30
> +
> +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
> + * from liburcu.  Note that head is only used by the consumer.
> + */
> +static struct rcu_head dummy;
> +static struct rcu_head *head = &dummy, **tail = &dummy.next;
> +static int rcu_call_count;
> +static QemuEvent rcu_call_ready_event;
> +
> +static void enqueue(struct rcu_head *node)
> +{
> +    struct rcu_head **old_tail;
> +
> +    node->next = NULL;
> +    old_tail = atomic_xchg(&tail, &node->next);
> +    atomic_mb_set(old_tail, node);
> +}
> +
> +static struct rcu_head *try_dequeue(void)
> +{
> +    struct rcu_head *node, *next;
> +
> +retry:
> +    /* Test for an empty list, which we do not expect.  Note that for
> +     * the consumer head and tail are always consistent.  The head
> +     * is consistent because only the consumer reads/writes it.
> +     * The tail, because it is the first step in the enqueuing.
> +     * It is only the next pointers that might be inconsistent.
> +     */
> +    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
> +        abort();
> +    }
> +
> +    /* If the head node has NULL in its next pointer, the value is
> +     * wrong and we need to wait until its enqueuer finishes the update.
> +     */
> +    node = head;
> +    next = atomic_mb_read(&head->next);
> +    if (!next) {
> +        return NULL;
> +    }
> +
> +    /* Since we are the sole consumer, and we excluded the empty case
> +     * above, the queue will always have at least two nodes: the
> +     * dummy node, and the one being removed.  So we do not need to update
> +     * the tail pointer.
> +     */
> +    head = next;
> +
> +    /* If we dequeued the dummy node, add it back at the end and retry.  */
> +    if (node == &dummy) {
> +        enqueue(node);
> +        goto retry;
> +    }
> +
> +    return node;
> +}
> +
> +static void *call_rcu_thread(void *opaque)
> +{
> +    struct rcu_head *node;
> +
> +    for (;;) {
> +        int tries = 0;
> +        int n = atomic_read(&rcu_call_count);
> +
> +        /* Heuristically wait for a decent number of callbacks to pile up.
> +         * Fetch rcu_call_count now, we only must process elements that were
> +         * added before synchronize_rcu() starts.
> +         */
> +        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
> +            g_usleep(100000);
> +            qemu_event_reset(&rcu_call_ready_event);
> +            n = atomic_read(&rcu_call_count);
> +            if (n < RCU_CALL_MIN_SIZE) {
> +                qemu_event_wait(&rcu_call_ready_event);
> +                n = atomic_read(&rcu_call_count);
> +            }
> +        }
> +
> +        atomic_sub(&rcu_call_count, n);
> +        synchronize_rcu();
> +        while (n > 0) {
> +            node = try_dequeue();
> +            while (!node) {
> +                qemu_event_reset(&rcu_call_ready_event);
> +                node = try_dequeue();
> +                if (!node) {
> +                    qemu_event_wait(&rcu_call_ready_event);
> +                    node = try_dequeue();
> +                }
> +            }
> +
> +            n--;
> +            node->func(node);
> +        }
> +    }
> +    abort();
> +}
> +
> +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> +{
> +    node->func = func;
> +    enqueue(node);
> +    atomic_inc(&rcu_call_count);
> +    qemu_event_set(&rcu_call_ready_event);
> +}
> +
>  void rcu_register_thread(void)
>  {
>      assert(rcu_reader.ctr == 0);
> @@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
>  
>  static void __attribute__((__constructor__)) rcu_init(void)
>  {
> +    QemuThread thread;
> +
>      qemu_mutex_init(&rcu_gp_lock);
>      qemu_event_init(&rcu_gp_event, true);
> +
> +    qemu_event_init(&rcu_call_ready_event, false);
> +    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
> +                       NULL, QEMU_THREAD_DETACHED);
> +
>      rcu_register_thread();
>  }
> -- 
> 1.8.3.1
> 
> 
> 

Reviewed-by: Fam Zheng <famz@redhat.com>

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy Paolo Bonzini
@ 2015-01-26  6:04   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  6:04 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha, Jan Kiszka

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> From: Jan Kiszka <jan.kiszka@siemens.com>
> 
> Now that memory_region_destroy can be called from an RCU callback,
> checking the BQL-protected global memory_region_transaction_depth
> does not make much sense.
> 
> Signed-off-by: Jan Kiszka <jan.kiszka@siemens.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  memory.c | 1 -
>  1 file changed, 1 deletion(-)
> 
> diff --git a/memory.c b/memory.c
> index c343bf3..8c3d8c0 100644
> --- a/memory.c
> +++ b/memory.c
> @@ -1263,7 +1263,6 @@ static void memory_region_finalize(Object *obj)
>      MemoryRegion *mr = MEMORY_REGION(obj);
>  
>      assert(QTAILQ_EMPTY(&mr->subregions));
> -    assert(memory_region_transaction_depth == 0);
>      mr->destructor(mr);
>      memory_region_clear_coalescing(mr);
>      g_free((char *)mr->name);
> -- 
> 1.8.3.1
> 
> 

Reviewed-by: Fam Zheng <famz@redhat.com>


* Re: [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU Paolo Bonzini
@ 2015-01-26  6:16   ` Fam Zheng
  0 siblings, 0 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  6:16 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Replace the flat_view_mutex by RCU, avoiding futex contention for
> dataplane on large systems and many iothreads.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/exec/memory.h |  5 +++++
>  memory.c              | 54 ++++++++++++++++++++++-----------------------------
>  2 files changed, 28 insertions(+), 31 deletions(-)
> 
> diff --git a/include/exec/memory.h b/include/exec/memory.h
> index 0cd96b1..06ffa1d 100644
> --- a/include/exec/memory.h
> +++ b/include/exec/memory.h
> @@ -33,6 +33,7 @@
>  #include "qemu/notify.h"
>  #include "qapi/error.h"
>  #include "qom/object.h"
> +#include "qemu/rcu.h"
>  
>  #define MAX_PHYS_ADDR_SPACE_BITS 62
>  #define MAX_PHYS_ADDR            (((hwaddr)1 << MAX_PHYS_ADDR_SPACE_BITS) - 1)
> @@ -207,9 +208,13 @@ struct MemoryListener {
>   */
>  struct AddressSpace {
>      /* All fields are private. */
> +    struct rcu_head rcu;
>      char *name;
>      MemoryRegion *root;
> +
> +    /* Accessed via RCU.  */
>      struct FlatView *current_map;
> +
>      int ioeventfd_nb;
>      struct MemoryRegionIoeventfd *ioeventfds;
>      struct AddressSpaceDispatch *dispatch;
> diff --git a/memory.c b/memory.c
> index 8c3d8c0..a844ced 100644
> --- a/memory.c
> +++ b/memory.c
> @@ -33,26 +33,12 @@ static bool memory_region_update_pending;
>  static bool ioeventfd_update_pending;
>  static bool global_dirty_log = false;
>  
> -/* flat_view_mutex is taken around reading as->current_map; the critical
> - * section is extremely short, so I'm using a single mutex for every AS.
> - * We could also RCU for the read-side.
> - *
> - * The BQL is taken around transaction commits, hence both locks are taken
> - * while writing to as->current_map (with the BQL taken outside).
> - */
> -static QemuMutex flat_view_mutex;
> -
>  static QTAILQ_HEAD(memory_listeners, MemoryListener) memory_listeners
>      = QTAILQ_HEAD_INITIALIZER(memory_listeners);
>  
>  static QTAILQ_HEAD(, AddressSpace) address_spaces
>      = QTAILQ_HEAD_INITIALIZER(address_spaces);
>  
> -static void memory_init(void)
> -{
> -    qemu_mutex_init(&flat_view_mutex);
> -}
> -
>  typedef struct AddrRange AddrRange;
>  
>  /*
> @@ -242,6 +228,7 @@ struct FlatRange {
>   * order.
>   */
>  struct FlatView {
> +    struct rcu_head rcu;
>      unsigned ref;
>      FlatRange *ranges;
>      unsigned nr;
> @@ -654,10 +641,10 @@ static FlatView *address_space_get_flatview(AddressSpace *as)
>  {
>      FlatView *view;
>  
> -    qemu_mutex_lock(&flat_view_mutex);
> -    view = as->current_map;
> +    rcu_read_lock();
> +    view = atomic_rcu_read(&as->current_map);
>      flatview_ref(view);
> -    qemu_mutex_unlock(&flat_view_mutex);
> +    rcu_read_unlock();
>      return view;
>  }
>  
> @@ -766,10 +753,9 @@ static void address_space_update_topology(AddressSpace *as)
>      address_space_update_topology_pass(as, old_view, new_view, false);
>      address_space_update_topology_pass(as, old_view, new_view, true);
>  
> -    qemu_mutex_lock(&flat_view_mutex);
> -    flatview_unref(as->current_map);
> -    as->current_map = new_view;
> -    qemu_mutex_unlock(&flat_view_mutex);
> +    /* Writes are protected by the BQL.  */
> +    atomic_rcu_set(&as->current_map, new_view);
> +    call_rcu(old_view, flatview_unref, rcu);
>  
>      /* Note that all the old MemoryRegions are still alive up to this
>       * point.  This relieves most MemoryListeners from the need to
> @@ -1957,10 +1943,6 @@ void memory_listener_unregister(MemoryListener *listener)
>  
>  void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
>  {
> -    if (QTAILQ_EMPTY(&address_spaces)) {
> -        memory_init();
> -    }
> -
>      memory_region_transaction_begin();
>      as->root = root;
>      as->current_map = g_new(FlatView, 1);
> @@ -1974,15 +1956,10 @@ void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
>      memory_region_transaction_commit();
>  }
>  
> -void address_space_destroy(AddressSpace *as)
> +static void do_address_space_destroy(AddressSpace *as)
>  {
>      MemoryListener *listener;
>  
> -    /* Flush out anything from MemoryListeners listening in on this */
> -    memory_region_transaction_begin();
> -    as->root = NULL;
> -    memory_region_transaction_commit();
> -    QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
>      address_space_destroy_dispatch(as);
>  
>      QTAILQ_FOREACH(listener, &memory_listeners, link) {
> @@ -1994,6 +1971,21 @@ void address_space_destroy(AddressSpace *as)
>      g_free(as->ioeventfds);
>  }
>  
> +void address_space_destroy(AddressSpace *as)
> +{
> +    /* Flush out anything from MemoryListeners listening in on this */
> +    memory_region_transaction_begin();
> +    as->root = NULL;
> +    memory_region_transaction_commit();
> +    QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
> +
> +    /* At this point, as->dispatch and as->current_map are dummy
> +     * entries that the guest should never use.  Wait for the old
> +     * values to expire before freeing the data.
> +     */
> +    call_rcu(as, do_address_space_destroy, rcu);
> +}
> +
>  bool io_mem_read(MemoryRegion *mr, hwaddr addr, uint64_t *pval, unsigned size)
>  {
>      return memory_region_dispatch_read(mr, addr, pval, size);
> -- 
> 1.8.3.1
> 
> 
Reviewed-by: Fam Zheng <famz@redhat.com>


* Re: [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find Paolo Bonzini
@ 2015-01-26  6:24   ` Fam Zheng
  2015-01-26  9:19     ` Paolo Bonzini
  0 siblings, 1 reply; 27+ messages in thread
From: Fam Zheng @ 2015-01-26  6:24 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Do the entire lookup under RCU, which avoids atomic operations.

address_space_get_flatview() is already RCU-protected, so I don't see why this
patch is necessary. Could you explain?

And there is one question below:

> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  memory.c | 5 +++--
>  1 file changed, 3 insertions(+), 2 deletions(-)
> 
> diff --git a/memory.c b/memory.c
> index a844ced..577e87c 100644
> --- a/memory.c
> +++ b/memory.c
> @@ -1828,7 +1828,8 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
>      }
>      range = addrrange_make(int128_make64(addr), int128_make64(size));
>  
> -    view = address_space_get_flatview(as);
> +    rcu_read_lock();
> +    view = atomic_rcu_read(&as->current_map);
>      fr = flatview_lookup(view, range);
>      if (!fr) {
>          flatview_unref(view);

The following lines are:

           return ret;
       }

which require an rcu_read_unlock before returning.

Fam

> @@ -1850,7 +1851,7 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
>      ret.readonly = fr->readonly;
>      memory_region_ref(ret.mr);
>  
> -    flatview_unref(view);
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> -- 
> 1.8.3.1
> 
> 


* Re: [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find
  2015-01-26  6:24   ` Fam Zheng
@ 2015-01-26  9:19     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-26  9:19 UTC (permalink / raw)
  To: Fam Zheng; +Cc: borntraeger, qemu-devel, stefanha



On 26/01/2015 07:24, Fam Zheng wrote:
>> > Do the entire lookup under RCU, which avoids atomic operations.
> address_space_get_flatview() already is RCU protected, I don't see why this
> patch is necessary. Could you explain?

It's just an optimization; flatview_ref/flatview_unref do
atomic_inc/atomic_dec and this patch avoids that.

Paolo


* Re: [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch
  2015-01-22 14:47 ` [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch Paolo Bonzini
@ 2015-01-28  5:45   ` Fam Zheng
  2015-01-28  9:44     ` Paolo Bonzini
  2015-02-03 10:07     ` Paolo Bonzini
  0 siblings, 2 replies; 27+ messages in thread
From: Fam Zheng @ 2015-01-28  5:45 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: borntraeger, qemu-devel, stefanha

On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Note that even after this patch, most callers of address_space_*
> functions must still be under the big QEMU lock, otherwise the memory
> region returned by address_space_translate can disappear as soon as
> address_space_translate returns.  This will be fixed in the next part
> of this series.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  cpu-exec.c              | 13 ++++++++++++-
>  cpus.c                  |  2 +-
>  cputlb.c                |  8 ++++++--
>  exec.c                  | 34 ++++++++++++++++++++++++++--------
>  hw/i386/intel_iommu.c   |  3 +++
>  hw/pci-host/apb.c       |  1 +
>  hw/ppc/spapr_iommu.c    |  1 +
>  include/exec/exec-all.h |  1 +
>  8 files changed, 51 insertions(+), 12 deletions(-)
> 
> diff --git a/cpu-exec.c b/cpu-exec.c
> index 12473ff..edc5eb9 100644
> --- a/cpu-exec.c
> +++ b/cpu-exec.c
> @@ -26,6 +26,7 @@
>  #include "qemu/timer.h"
>  #include "exec/address-spaces.h"
>  #include "exec/memory-internal.h"
> +#include "qemu/rcu.h"
>  
>  /* -icount align implementation. */
>  
> @@ -149,8 +150,15 @@ void cpu_resume_from_signal(CPUState *cpu, void *puc)
>  
>  void cpu_reload_memory_map(CPUState *cpu)
>  {
> +    AddressSpaceDispatch *d;
> +
> +    if (qemu_in_vcpu_thread()) {
> +        rcu_read_unlock();

Could you explain why we need to split the critical section here? A line of
comment would help.

> +        rcu_read_lock();
> +    }
> +
>      /* The CPU and TLB are protected by the iothread lock.  */
> -    AddressSpaceDispatch *d = cpu->as->dispatch;
> +    d = atomic_rcu_read(&cpu->as->dispatch);
>      cpu->memory_dispatch = d;
>      tlb_flush(cpu, 1);
>  }
> @@ -365,6 +373,8 @@ int cpu_exec(CPUArchState *env)
>       * an instruction scheduling constraint on modern architectures.  */
>      smp_mb();
>  
> +    rcu_read_lock();
> +
>      if (unlikely(exit_request)) {
>          cpu->exit_request = 1;
>      }
> @@ -567,6 +577,7 @@ int cpu_exec(CPUArchState *env)
>      } /* for(;;) */
>  
>      cc->cpu_exec_exit(cpu);
> +    rcu_read_unlock();
>  
>      /* fail safe : never use current_cpu outside cpu_exec() */
>      current_cpu = NULL;
> diff --git a/cpus.c b/cpus.c
> index 3a5323b..b02c793 100644
> --- a/cpus.c
> +++ b/cpus.c
> @@ -1121,7 +1121,7 @@ bool qemu_cpu_is_self(CPUState *cpu)
>      return qemu_thread_is_self(cpu->thread);
>  }
>  
> -static bool qemu_in_vcpu_thread(void)
> +bool qemu_in_vcpu_thread(void)
>  {
>      return current_cpu && qemu_cpu_is_self(current_cpu);
>  }
> diff --git a/cputlb.c b/cputlb.c
> index f92db5e..38f2151 100644
> --- a/cputlb.c
> +++ b/cputlb.c
> @@ -243,8 +243,12 @@ static void tlb_add_large_page(CPUArchState *env, target_ulong vaddr,
>  }
>  
>  /* Add a new TLB entry. At most one entry for a given virtual address
> -   is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
> -   supplied size is only used by tlb_flush_page.  */
> + * is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
> + * supplied size is only used by tlb_flush_page.
> + *
> + * Called from TCG-generated code, which is under an RCU read-side
> + * critical section.
> + */
>  void tlb_set_page(CPUState *cpu, target_ulong vaddr,
>                    hwaddr paddr, int prot,
>                    int mmu_idx, target_ulong size)
> diff --git a/exec.c b/exec.c
> index 762ec76..262e8bc 100644
> --- a/exec.c
> +++ b/exec.c
> @@ -115,6 +115,8 @@ struct PhysPageEntry {
>  typedef PhysPageEntry Node[P_L2_SIZE];
>  
>  typedef struct PhysPageMap {
> +    struct rcu_head rcu;
> +
>      unsigned sections_nb;
>      unsigned sections_nb_alloc;
>      unsigned nodes_nb;
> @@ -124,6 +126,8 @@ typedef struct PhysPageMap {
>  } PhysPageMap;
>  
>  struct AddressSpaceDispatch {
> +    struct rcu_head rcu;
> +
>      /* This is a multi-level map on the physical address space.
>       * The bottom level has pointers to MemoryRegionSections.
>       */
> @@ -315,6 +319,7 @@ bool memory_region_is_unassigned(MemoryRegion *mr)
>          && mr != &io_mem_watch;
>  }
>  
> +/* Called from RCU critical section */
>  static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
>                                                          hwaddr addr,
>                                                          bool resolve_subpage)
> @@ -330,6 +335,7 @@ static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
>      return section;
>  }
>  
> +/* Called from RCU critical section */
>  static MemoryRegionSection *
>  address_space_translate_internal(AddressSpaceDispatch *d, hwaddr addr, hwaddr *xlat,
>                                   hwaddr *plen, bool resolve_subpage)
> @@ -370,8 +376,10 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
>      MemoryRegion *mr;
>      hwaddr len = *plen;
>  
> +    rcu_read_lock();
>      for (;;) {
> -        section = address_space_translate_internal(as->dispatch, addr, &addr, plen, true);
> +        AddressSpaceDispatch *d = atomic_rcu_read(&as->dispatch);
> +        section = address_space_translate_internal(d, addr, &addr, plen, true);
>          mr = section->mr;
>  
>          if (!mr->iommu_ops) {
> @@ -397,9 +405,11 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
>  
>      *plen = len;
>      *xlat = addr;
> +    rcu_read_unlock();
>      return mr;
>  }
>  
> +/* Called from RCU critical section */
>  MemoryRegionSection *
>  address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr,
>                                    hwaddr *xlat, hwaddr *plen)
> @@ -852,6 +862,7 @@ static void cpu_physical_memory_set_dirty_tracking(bool enable)
>      in_migration = enable;
>  }
>  
> +/* Called from RCU critical section */
>  hwaddr memory_region_section_get_iotlb(CPUState *cpu,
>                                         MemoryRegionSection *section,
>                                         target_ulong vaddr,
> @@ -1963,7 +1974,8 @@ static uint16_t dummy_section(PhysPageMap *map, AddressSpace *as,
>  
>  MemoryRegion *iotlb_to_region(CPUState *cpu, hwaddr index)
>  {
> -    MemoryRegionSection *sections = cpu->memory_dispatch->map.sections;
> +    AddressSpaceDispatch *d = atomic_rcu_read(&cpu->memory_dispatch);
> +    MemoryRegionSection *sections = d->map.sections;
>  
>      return sections[index & ~TARGET_PAGE_MASK].mr;
>  }
> @@ -1999,6 +2011,12 @@ static void mem_begin(MemoryListener *listener)
>      as->next_dispatch = d;
>  }
>  
> +static void address_space_dispatch_free(AddressSpaceDispatch *d)
> +{
> +    phys_sections_free(&d->map);


If I understand it correctly, this code doesn't hold the iothread lock when
releasing the memory region, but one of the memory region destructors,
memory_region_destructor_ram_from_ptr, assumes it does:

    void qemu_ram_free_from_ptr(ram_addr_t addr)
    {
        RAMBlock *block;

        /* This assumes the iothread lock is taken here too.  */
        qemu_mutex_lock_ramlist();
        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
...

Is the comment stale, or am I missing something?

Fam

> +    g_free(d);
> +}
> +
>  static void mem_commit(MemoryListener *listener)
>  {
>      AddressSpace *as = container_of(listener, AddressSpace, dispatch_listener);
> @@ -2007,11 +2025,9 @@ static void mem_commit(MemoryListener *listener)
>  
>      phys_page_compact_all(next, next->map.nodes_nb);
>  
> -    as->dispatch = next;
> -
> +    atomic_rcu_set(&as->dispatch, next);
>      if (cur) {
> -        phys_sections_free(&cur->map);
> -        g_free(cur);
> +        call_rcu(cur, address_space_dispatch_free, rcu);
>      }
>  }
>  
> @@ -2066,8 +2082,10 @@ void address_space_destroy_dispatch(AddressSpace *as)
>      AddressSpaceDispatch *d = as->dispatch;
>  
>      memory_listener_unregister(&as->dispatch_listener);
> -    g_free(d);
> -    as->dispatch = NULL;
> +    atomic_rcu_set(&as->dispatch, NULL);
> +    if (d) {
> +        call_rcu(d, address_space_dispatch_free, rcu);
> +    }
>  }
>  
>  static void memory_map_init(void)
> diff --git a/hw/i386/intel_iommu.c b/hw/i386/intel_iommu.c
> index 0a4282a..7da70ff 100644
> --- a/hw/i386/intel_iommu.c
> +++ b/hw/i386/intel_iommu.c
> @@ -745,6 +745,9 @@ static inline bool vtd_is_interrupt_addr(hwaddr addr)
>  
>  /* Map dev to context-entry then do a paging-structures walk to do a iommu
>   * translation.
> + *
> + * Called from RCU critical section.
> + *
>   * @bus_num: The bus number
>   * @devfn: The devfn, which is the  combined of device and function number
>   * @is_write: The access is a write operation
> diff --git a/hw/pci-host/apb.c b/hw/pci-host/apb.c
> index f573875..832b6c7 100644
> --- a/hw/pci-host/apb.c
> +++ b/hw/pci-host/apb.c
> @@ -205,6 +205,7 @@ static AddressSpace *pbm_pci_dma_iommu(PCIBus *bus, void *opaque, int devfn)
>      return &is->iommu_as;
>  }
>  
> +/* Called from RCU critical section */
>  static IOMMUTLBEntry pbm_translate_iommu(MemoryRegion *iommu, hwaddr addr,
>                                           bool is_write)
>  {
> diff --git a/hw/ppc/spapr_iommu.c b/hw/ppc/spapr_iommu.c
> index da47474..ba003da 100644
> --- a/hw/ppc/spapr_iommu.c
> +++ b/hw/ppc/spapr_iommu.c
> @@ -59,6 +59,7 @@ static sPAPRTCETable *spapr_tce_find_by_liobn(uint32_t liobn)
>      return NULL;
>  }
>  
> +/* Called from RCU critical section */
>  static IOMMUTLBEntry spapr_tce_translate_iommu(MemoryRegion *iommu, hwaddr addr,
>                                                 bool is_write)
>  {
> diff --git a/include/exec/exec-all.h b/include/exec/exec-all.h
> index bb3fd37..8eb0db3 100644
> --- a/include/exec/exec-all.h
> +++ b/include/exec/exec-all.h
> @@ -96,6 +96,7 @@ void tb_invalidate_phys_page_range(tb_page_addr_t start, tb_page_addr_t end,
>  void tb_invalidate_phys_range(tb_page_addr_t start, tb_page_addr_t end,
>                                int is_cpu_write_access);
>  #if !defined(CONFIG_USER_ONLY)
> +bool qemu_in_vcpu_thread(void);
>  void cpu_reload_memory_map(CPUState *cpu);
>  void tcg_cpu_address_space_init(CPUState *cpu, AddressSpace *as);
>  /* cputlb.c */
> -- 
> 1.8.3.1
> 
> 


* Re: [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch
  2015-01-28  5:45   ` Fam Zheng
@ 2015-01-28  9:44     ` Paolo Bonzini
  2015-02-03 10:07     ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-01-28  9:44 UTC (permalink / raw)
  To: Fam Zheng; +Cc: borntraeger, qemu-devel, stefanha



On 28/01/2015 06:45, Fam Zheng wrote:
> On Thu, 01/22 15:47, Paolo Bonzini wrote:
>> Note that even after this patch, most callers of address_space_*
>> functions must still be under the big QEMU lock, otherwise the memory
>> region returned by address_space_translate can disappear as soon as
>> address_space_translate returns.  This will be fixed in the next part
>> of this series.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>  cpu-exec.c              | 13 ++++++++++++-
>>  cpus.c                  |  2 +-
>>  cputlb.c                |  8 ++++++--
>>  exec.c                  | 34 ++++++++++++++++++++++++++--------
>>  hw/i386/intel_iommu.c   |  3 +++
>>  hw/pci-host/apb.c       |  1 +
>>  hw/ppc/spapr_iommu.c    |  1 +
>>  include/exec/exec-all.h |  1 +
>>  8 files changed, 51 insertions(+), 12 deletions(-)
>>
>> diff --git a/cpu-exec.c b/cpu-exec.c
>> index 12473ff..edc5eb9 100644
>> --- a/cpu-exec.c
>> +++ b/cpu-exec.c
>> @@ -26,6 +26,7 @@
>>  #include "qemu/timer.h"
>>  #include "exec/address-spaces.h"
>>  #include "exec/memory-internal.h"
>> +#include "qemu/rcu.h"
>>  
>>  /* -icount align implementation. */
>>  
>> @@ -149,8 +150,15 @@ void cpu_resume_from_signal(CPUState *cpu, void *puc)
>>  
>>  void cpu_reload_memory_map(CPUState *cpu)
>>  {
>> +    AddressSpaceDispatch *d;
>> +
>> +    if (qemu_in_vcpu_thread()) {
>> +        rcu_read_unlock();
> 
> Could you explain why we need to split the critical section? Maybe a line of
> comment helps here.

It is needed because otherwise the guest could prolong the critical
section as much as it desires.

Currently, this is prevented by the I/O thread's kicking of the VCPU
thread (iothread_requesting_mutex, qemu_cpu_kick_thread) but this will
go away once TCG's execution moves out of the global mutex.

This pair matches cpu_exec's rcu_read_lock()/rcu_read_unlock(), which
only protects cpu->as->dispatch.  Since we reload it below, we can split
the critical section.

Paolo

>> +        rcu_read_lock();
>> +    }
>> +
>>      /* The CPU and TLB are protected by the iothread lock.  */
>> -    AddressSpaceDispatch *d = cpu->as->dispatch;
>> +    d = atomic_rcu_read(&cpu->as->dispatch);
>>      cpu->memory_dispatch = d;
>>      tlb_flush(cpu, 1);
>>  }
>> @@ -365,6 +373,8 @@ int cpu_exec(CPUArchState *env)
>>       * an instruction scheduling constraint on modern architectures.  */
>>      smp_mb();
>>  
>> +    rcu_read_lock();
>> +
>>      if (unlikely(exit_request)) {
>>          cpu->exit_request = 1;
>>      }
>> @@ -567,6 +577,7 @@ int cpu_exec(CPUArchState *env)
>>      } /* for(;;) */
>>  
>>      cc->cpu_exec_exit(cpu);
>> +    rcu_read_unlock();
>>  
>>      /* fail safe : never use current_cpu outside cpu_exec() */
>>      current_cpu = NULL;
>> diff --git a/cpus.c b/cpus.c
>> index 3a5323b..b02c793 100644
>> --- a/cpus.c
>> +++ b/cpus.c
>> @@ -1121,7 +1121,7 @@ bool qemu_cpu_is_self(CPUState *cpu)
>>      return qemu_thread_is_self(cpu->thread);
>>  }
>>  
>> -static bool qemu_in_vcpu_thread(void)
>> +bool qemu_in_vcpu_thread(void)
>>  {
>>      return current_cpu && qemu_cpu_is_self(current_cpu);
>>  }
>> diff --git a/cputlb.c b/cputlb.c
>> index f92db5e..38f2151 100644
>> --- a/cputlb.c
>> +++ b/cputlb.c
>> @@ -243,8 +243,12 @@ static void tlb_add_large_page(CPUArchState *env, target_ulong vaddr,
>>  }
>>  
>>  /* Add a new TLB entry. At most one entry for a given virtual address
>> -   is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
>> -   supplied size is only used by tlb_flush_page.  */
>> + * is permitted. Only a single TARGET_PAGE_SIZE region is mapped, the
>> + * supplied size is only used by tlb_flush_page.
>> + *
>> + * Called from TCG-generated code, which is under an RCU read-side
>> + * critical section.
>> + */
>>  void tlb_set_page(CPUState *cpu, target_ulong vaddr,
>>                    hwaddr paddr, int prot,
>>                    int mmu_idx, target_ulong size)
>> diff --git a/exec.c b/exec.c
>> index 762ec76..262e8bc 100644
>> --- a/exec.c
>> +++ b/exec.c
>> @@ -115,6 +115,8 @@ struct PhysPageEntry {
>>  typedef PhysPageEntry Node[P_L2_SIZE];
>>  
>>  typedef struct PhysPageMap {
>> +    struct rcu_head rcu;
>> +
>>      unsigned sections_nb;
>>      unsigned sections_nb_alloc;
>>      unsigned nodes_nb;
>> @@ -124,6 +126,8 @@ typedef struct PhysPageMap {
>>  } PhysPageMap;
>>  
>>  struct AddressSpaceDispatch {
>> +    struct rcu_head rcu;
>> +
>>      /* This is a multi-level map on the physical address space.
>>       * The bottom level has pointers to MemoryRegionSections.
>>       */
>> @@ -315,6 +319,7 @@ bool memory_region_is_unassigned(MemoryRegion *mr)
>>          && mr != &io_mem_watch;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
>>                                                          hwaddr addr,
>>                                                          bool resolve_subpage)
>> @@ -330,6 +335,7 @@ static MemoryRegionSection *address_space_lookup_region(AddressSpaceDispatch *d,
>>      return section;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  static MemoryRegionSection *
>>  address_space_translate_internal(AddressSpaceDispatch *d, hwaddr addr, hwaddr *xlat,
>>                                   hwaddr *plen, bool resolve_subpage)
>> @@ -370,8 +376,10 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
>>      MemoryRegion *mr;
>>      hwaddr len = *plen;
>>  
>> +    rcu_read_lock();
>>      for (;;) {
>> -        section = address_space_translate_internal(as->dispatch, addr, &addr, plen, true);
>> +        AddressSpaceDispatch *d = atomic_rcu_read(&as->dispatch);
>> +        section = address_space_translate_internal(d, addr, &addr, plen, true);
>>          mr = section->mr;
>>  
>>          if (!mr->iommu_ops) {
>> @@ -397,9 +405,11 @@ MemoryRegion *address_space_translate(AddressSpace *as, hwaddr addr,
>>  
>>      *plen = len;
>>      *xlat = addr;
>> +    rcu_read_unlock();
>>      return mr;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  MemoryRegionSection *
>>  address_space_translate_for_iotlb(CPUState *cpu, hwaddr addr,
>>                                    hwaddr *xlat, hwaddr *plen)
>> @@ -852,6 +862,7 @@ static void cpu_physical_memory_set_dirty_tracking(bool enable)
>>      in_migration = enable;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  hwaddr memory_region_section_get_iotlb(CPUState *cpu,
>>                                         MemoryRegionSection *section,
>>                                         target_ulong vaddr,
>> @@ -1963,7 +1974,8 @@ static uint16_t dummy_section(PhysPageMap *map, AddressSpace *as,
>>  
>>  MemoryRegion *iotlb_to_region(CPUState *cpu, hwaddr index)
>>  {
>> -    MemoryRegionSection *sections = cpu->memory_dispatch->map.sections;
>> +    AddressSpaceDispatch *d = atomic_rcu_read(&cpu->memory_dispatch);
>> +    MemoryRegionSection *sections = d->map.sections;
>>  
>>      return sections[index & ~TARGET_PAGE_MASK].mr;
>>  }
>> @@ -1999,6 +2011,12 @@ static void mem_begin(MemoryListener *listener)
>>      as->next_dispatch = d;
>>  }
>>  
>> +static void address_space_dispatch_free(AddressSpaceDispatch *d)
>> +{
>> +    phys_sections_free(&d->map);
> 
> 
> If I understand it, this code doesn't hold the iothread lock when releasing the
> memory region, but in one of the memory region destructors,
> memory_region_destructor_ram_from_ptr:
> 
>     void qemu_ram_free_from_ptr(ram_addr_t addr)
>     {
>         RAMBlock *block;
> 
>         /* This assumes the iothread lock is taken here too.  */
>         qemu_mutex_lock_ramlist();
>         QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> ...
> 
> Is the comment stale or I missed something?
> 
> Fam
> 
>> +    g_free(d);
>> +}
>> +
>>  static void mem_commit(MemoryListener *listener)
>>  {
>>      AddressSpace *as = container_of(listener, AddressSpace, dispatch_listener);
>> @@ -2007,11 +2025,9 @@ static void mem_commit(MemoryListener *listener)
>>  
>>      phys_page_compact_all(next, next->map.nodes_nb);
>>  
>> -    as->dispatch = next;
>> -
>> +    atomic_rcu_set(&as->dispatch, next);
>>      if (cur) {
>> -        phys_sections_free(&cur->map);
>> -        g_free(cur);
>> +        call_rcu(cur, address_space_dispatch_free, rcu);
>>      }
>>  }
>>  
>> @@ -2066,8 +2082,10 @@ void address_space_destroy_dispatch(AddressSpace *as)
>>      AddressSpaceDispatch *d = as->dispatch;
>>  
>>      memory_listener_unregister(&as->dispatch_listener);
>> -    g_free(d);
>> -    as->dispatch = NULL;
>> +    atomic_rcu_set(&as->dispatch, NULL);
>> +    if (d) {
>> +        call_rcu(d, address_space_dispatch_free, rcu);
>> +    }
>>  }
>>  
>>  static void memory_map_init(void)
>> diff --git a/hw/i386/intel_iommu.c b/hw/i386/intel_iommu.c
>> index 0a4282a..7da70ff 100644
>> --- a/hw/i386/intel_iommu.c
>> +++ b/hw/i386/intel_iommu.c
>> @@ -745,6 +745,9 @@ static inline bool vtd_is_interrupt_addr(hwaddr addr)
>>  
>>  /* Map dev to context-entry then do a paging-structures walk to do a iommu
>>   * translation.
>> + *
>> + * Called from RCU critical section.
>> + *
>>   * @bus_num: The bus number
>>   * @devfn: The devfn, which is the combination of the device and function number
>>   * @is_write: The access is a write operation
>> diff --git a/hw/pci-host/apb.c b/hw/pci-host/apb.c
>> index f573875..832b6c7 100644
>> --- a/hw/pci-host/apb.c
>> +++ b/hw/pci-host/apb.c
>> @@ -205,6 +205,7 @@ static AddressSpace *pbm_pci_dma_iommu(PCIBus *bus, void *opaque, int devfn)
>>      return &is->iommu_as;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  static IOMMUTLBEntry pbm_translate_iommu(MemoryRegion *iommu, hwaddr addr,
>>                                           bool is_write)
>>  {
>> diff --git a/hw/ppc/spapr_iommu.c b/hw/ppc/spapr_iommu.c
>> index da47474..ba003da 100644
>> --- a/hw/ppc/spapr_iommu.c
>> +++ b/hw/ppc/spapr_iommu.c
>> @@ -59,6 +59,7 @@ static sPAPRTCETable *spapr_tce_find_by_liobn(uint32_t liobn)
>>      return NULL;
>>  }
>>  
>> +/* Called from RCU critical section */
>>  static IOMMUTLBEntry spapr_tce_translate_iommu(MemoryRegion *iommu, hwaddr addr,
>>                                                 bool is_write)
>>  {
>> diff --git a/include/exec/exec-all.h b/include/exec/exec-all.h
>> index bb3fd37..8eb0db3 100644
>> --- a/include/exec/exec-all.h
>> +++ b/include/exec/exec-all.h
>> @@ -96,6 +96,7 @@ void tb_invalidate_phys_page_range(tb_page_addr_t start, tb_page_addr_t end,
>>  void tb_invalidate_phys_range(tb_page_addr_t start, tb_page_addr_t end,
>>                                int is_cpu_write_access);
>>  #if !defined(CONFIG_USER_ONLY)
>> +bool qemu_in_vcpu_thread(void);
>>  void cpu_reload_memory_map(CPUState *cpu);
>>  void tcg_cpu_address_space_init(CPUState *cpu, AddressSpace *as);
>>  /* cputlb.c */
>> -- 
>> 1.8.3.1
>>
>>


* Re: [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch
  2015-01-28  5:45   ` Fam Zheng
  2015-01-28  9:44     ` Paolo Bonzini
@ 2015-02-03 10:07     ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2015-02-03 10:07 UTC (permalink / raw)
  To: Fam Zheng; +Cc: borntraeger, qemu-devel, stefanha



On 28/01/2015 06:45, Fam Zheng wrote:
> If I understand it, this code doesn't hold the iothread lock when releasing the
> memory region, but in one of the memory region destructors,
> memory_region_destructor_ram_from_ptr:
> 
>     void qemu_ram_free_from_ptr(ram_addr_t addr)
>     {
>         RAMBlock *block;
> 
>         /* This assumes the iothread lock is taken here too.  */
>         qemu_mutex_lock_ramlist();
>         QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> ...
> 
> Is the comment stale or I missed something?

No, you're right.

Paolo
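
For reference, the publish-and-defer pattern that mem_commit switches to
in this patch can be sketched as follows (a simplified single-threaded
model: the GCC __atomic builtins stand in for atomic_rcu_set() and
atomic_rcu_read(), the deferred pointer stands in for the call_rcu
queue, and the real grace-period machinery is omitted):

```c
#include <assert.h>
#include <stdlib.h>

typedef struct Dispatch {
    int version;
} Dispatch;

static Dispatch *dispatch;   /* the RCU-protected pointer */
static Dispatch *deferred;   /* toy stand-in for the call_rcu queue */

/* mem_commit-like update: publish the new map, then defer freeing the
 * old one instead of freeing it immediately. */
static void commit(Dispatch *next)
{
    Dispatch *cur = dispatch;
    __atomic_store_n(&dispatch, next, __ATOMIC_RELEASE); /* atomic_rcu_set() */
    if (cur) {
        /* call_rcu(cur, address_space_dispatch_free, rcu) in the patch */
        deferred = cur;
    }
}

/* Runs only after all pre-existing readers have left their critical
 * sections; only then is freeing the old dispatch safe. */
static void reclaim(void)
{
    free(deferred);
    deferred = NULL;
}

/* Reader side: sees either the old or the new dispatch, never a freed
 * one, because reclamation is deferred past the reader's section. */
static int read_version(void)
{
    Dispatch *d = __atomic_load_n(&dispatch, __ATOMIC_CONSUME); /* atomic_rcu_read() */
    return d->version;
}
```

This separation of removal (the pointer swap) from reclamation (the
deferred free) is exactly what makes the stale "iothread lock is taken
here too" assumption in the RAMBlock destructor a problem: the deferred
callback no longer runs under the lock the comment promises.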


Thread overview: 27+ messages
2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
2015-01-26  3:13   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 02/15] rcu: add rcutorture Paolo Bonzini
2015-01-26  3:31   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock Paolo Bonzini
2015-01-26  3:32   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 04/15] rcu: add call_rcu Paolo Bonzini
2015-01-26  6:04   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy Paolo Bonzini
2015-01-26  6:04   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU Paolo Bonzini
2015-01-26  6:16   ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find Paolo Bonzini
2015-01-26  6:24   ` Fam Zheng
2015-01-26  9:19     ` Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch Paolo Bonzini
2015-01-28  5:45   ` Fam Zheng
2015-01-28  9:44     ` Paolo Bonzini
2015-02-03 10:07     ` Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU Paolo Bonzini
