* [PATCH 0/4] migration: improve multithreads
@ 2018-10-16 11:10 ` guangrong.xiao
  0 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

This is the last part of our previous work:
   https://lists.gnu.org/archive/html/qemu-devel/2018-06/msg00526.html

This part finally improves the multithread model used by compression
and decompression, which makes the compression feature really usable
in production.

Compared with the previous version, we:
1. ported ptr_ring from the Linux kernel and use it instead of the
   lockless ring we designed ourselves

   ( Michael, I added myself to the list of authors in that file; if
    you dislike it, I'm fine to drop it. :) )

2. search all threads for one with free room in its local ring to hold
   a request, instead of plain round-robin, to reduce the busy-ratio

Background
----------
The current implementation of compression and decompression is very
hard to enable in production. We noticed that too many wait/wake
operations go to kernel space and CPU usage is very low even when the
system has plenty of idle CPU.

The reasons are:
1) too many locks are used for synchronization: there is a global lock
   and each thread has its own lock; the migration thread and the work
   threads need to go to sleep if these locks are busy

2) the migration thread submits requests to each thread separately,
   but only one request can be pending per thread, which means the
   thread has to go to sleep after finishing that request

Our Ideas
---------
To make it work better, we introduce a lockless multithread model: the
user (currently the migration thread) submits requests to the threads,
each of which has its own ring with a capacity of 4, and the threads
put their results into a global ring, where the user fetches the
results and performs the remaining operations for each request, e.g.
posting the compressed data out for migration on the source QEMU.
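
As an illustration (not part of the series), here is a minimal sketch
of how a user of the model might drive the API introduced in patch 2;
the compress_* callbacks are hypothetical placeholders (sketched in
patch 2 below):

    #include "qemu/lockless-threads.h"

    static Threads *compress_threads;

    static void compress_threads_setup(int nr_threads)
    {
        /* the four callbacks are hypothetical user-supplied hooks */
        compress_threads = threads_create(nr_threads, "compress",
                                          DEFAULT_THREAD_RING_SIZE,
                                          compress_request_init,
                                          compress_request_uninit,
                                          compress_request_handler,
                                          compress_request_done);
    }

    static void compress_one_page(void)
    {
        ThreadRequest *request;

        /* NULL means every thread's local ring is full right now */
        request = threads_submit_request_prepare(compress_threads);
        if (!request) {
            return;   /* e.g. fall back to compressing in the caller */
        }

        /* fill the user data embedding 'request' via container_of() ... */

        threads_submit_request_commit(compress_threads, request);
    }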

Performance Result
-----------------
We tested live migration between two hosts:
   Intel(R) Xeon(R) Gold 6142 CPU @ 2.60GHz * 64 + 256G memory

migrating a VM with 16 vCPUs and 120G memory back and forth; during
the migration, multiple threads repeatedly write the memory in the VM.

We used 16 threads on the destination to decompress the data and, on
the source, tried 4, 8 and 16 threads to compress the data.

1) 4 threads, compress-wait-thread = off

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    66.2              32.4~36.8
after     56.5              59.4~60.9

Migration result

         total time        busy-ratio
--------------------------------------------------
before   247371             0.54
after    138326             0.55

2) 4 threads, compress-wait-thread = on

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    55.1              51.0~63.3
after     99.9              99.9

Migration result

         total time        busy-ratio
--------------------------------------------------
before   CAN'T COMPLETE    0
after    338692            0

3) 8 threads, compress-wait-thread = off

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    43.3              17.5~32.5
after     54.5              54.5~56.8

Migration result

         total time        busy-ratio
--------------------------------------------------
before   427384            0.19
after    125066            0.38

4) 8 threads, compress-wait-thread = on

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    96.3              2.3~46.8
after     90.6              90.6~91.8

Migration result

         total time        busy-ratio
--------------------------------------------------
before   CAN'T COMPLETE    0
after    164426            0

5) 16 threads, compress-wait-thread = off

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    56.2              6.2~56.2
after     37.8              37.8~40.2

Migration result

         total time        busy-ratio
--------------------------------------------------
before   2317123           0.02
after    149006            0.02

6) 16 threads, compress-wait-thread = on

CPU usage (%)

         main thread      compression threads
-----------------------------------------------
before    48.3               1.7~31.0
after     43.9              42.1~45.6

Migration result

         total time        busy-ratio
--------------------------------------------------
before   1792817           0.00
after    161423            0.00

Xiao Guangrong (4):
  ptr_ring: port ptr_ring from linux kernel to QEMU
  migration: introduce lockless multithreads model
  migration: use lockless Multithread model for compression
  migration: use lockless Multithread model for decompression

 include/qemu/lockless-threads.h |  63 +++++
 include/qemu/ptr_ring.h         | 235 ++++++++++++++++++
 migration/ram.c                 | 535 +++++++++++++++-------------------------
 util/Makefile.objs              |   1 +
 util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++
 5 files changed, 865 insertions(+), 342 deletions(-)
 create mode 100644 include/qemu/lockless-threads.h
 create mode 100644 include/qemu/ptr_ring.h
 create mode 100644 util/lockless-threads.c

-- 
2.14.4

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU
  2018-10-16 11:10 ` [Qemu-devel] " guangrong.xiao
@ 2018-10-16 11:10   ` guangrong.xiao
  -1 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

ptr_ring is good at minimizing cache contention and has a simple
memory-barrier model; it will be used by the lockless threads model to
pass requests between the main migration thread and the compression
threads.

Some changes are made:
1) unnecessary APIs are dropped, e.g. the _irq and _bh APIs
2) the resize APIs have not been ported
3) the locks are dropped
4) some comments are adjusted
5) a new API, ptr_ring_disable_batch, has been introduced
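
As an illustration (not part of the patch), a minimal single-producer,
single-consumer usage sketch of the ported API defined in
include/qemu/ptr_ring.h below; ptr_ring_example() and its payload are
hypothetical:

    #include "qemu/ptr_ring.h"

    static void ptr_ring_example(void)
    {
        Ptr_Ring ring;
        void *item;

        if (ptr_ring_init(&ring, 16) < 0) {
            return;                          /* -ENOMEM: allocation failed */
        }

        item = g_malloc0(32);                /* hypothetical payload */
        if (ptr_ring_produce(&ring, item)) {
            g_free(item);                    /* -ENOSPC: the ring is full */
        }

        g_free(ptr_ring_consume(&ring));     /* NULL if the ring is empty */
        ptr_ring_cleanup(&ring, NULL);       /* nothing left to destroy */
    }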

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/ptr_ring.h | 235 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 235 insertions(+)
 create mode 100644 include/qemu/ptr_ring.h

diff --git a/include/qemu/ptr_ring.h b/include/qemu/ptr_ring.h
new file mode 100644
index 0000000000..d8266d45f6
--- /dev/null
+++ b/include/qemu/ptr_ring.h
@@ -0,0 +1,235 @@
+/*
+ *    Definitions for the 'struct ptr_ring' datastructure.
+ *
+ *    Author:
+ *        Michael S. Tsirkin <mst@redhat.com>
+ *        Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ *    Copyright (C) 2018 Red Hat, Inc.
+ *    Copyright (C) 2018 Tencent, Inc.
+ *
+ *    This program is free software; you can redistribute it and/or modify it
+ *    under the terms of the GNU General Public License as published by the
+ *    Free Software Foundation; either version 2 of the License, or (at your
+ *    option) any later version.
+ *
+ *    This is a limited-size FIFO maintaining pointers in FIFO order, with
+ *    one CPU producing entries and another consuming entries from a FIFO.
+ *
+ *    This implementation tries to minimize cache-contention when there is a
+ *    single producer and a single consumer CPU.
+ */
+
+#ifndef _QEMU_PTR_RING_H
+#define _QEMU_PTR_RING_H 1
+
+#include "qemu/compiler.h"
+#include "qemu/atomic.h"
+
+#define SMP_CACHE_BYTES      64
+#define ____cacheline_aligned_in_smp \
+        __attribute__((__aligned__(SMP_CACHE_BYTES)))
+
+#define WRITE_ONCE(ptr, val) \
+    (*((volatile typeof(ptr) *)(&(ptr))) = (val))
+#define READ_ONCE(ptr) (*((volatile typeof(ptr) *)(&(ptr))))
+
+struct ptr_ring {
+    int producer ____cacheline_aligned_in_smp;
+    int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
+    int consumer_tail; /* next entry to invalidate */
+    /* Shared consumer/producer data */
+    /* Read-only by both the producer and the consumer */
+    int size ____cacheline_aligned_in_smp; /* max entries in queue */
+    int batch; /* number of entries to consume in a batch */
+    void **queue;
+};
+typedef struct ptr_ring Ptr_Ring;
+
+/*
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline bool ptr_ring_full(struct ptr_ring *r)
+{
+    return r->queue[r->producer];
+}
+
+/*
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ *
+ * Callers are responsible for making sure the pointer that is being queued
+ * points to valid data.
+ */
+static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+    if (unlikely(!r->size) || r->queue[r->producer])
+        return -ENOSPC;
+
+    /* Make sure the pointer we are storing points to valid data. */
+    /* Pairs with READ_ONCE in ptr_ring_consume. */
+    smp_wmb();
+
+    WRITE_ONCE(r->queue[r->producer++], ptr);
+    if (unlikely(r->producer >= r->size))
+        r->producer = 0;
+    return 0;
+}
+
+static inline void *__ptr_ring_peek(struct ptr_ring *r)
+{
+    if (likely(r->size))
+        return READ_ONCE(r->queue[r->consumer_head]);
+    return NULL;
+}
+
+/*
+ * Test ring empty status.
+ *
+ * However, if some other CPU consumes ring entries at the same time,
+ * the value returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test ptr_ring_empty and/or consume the ring entries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+    if (likely(r->size))
+        return !r->queue[READ_ONCE(r->consumer_head)];
+    return true;
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+{
+    /* Fundamentally, what we want to do is update consumer
+     * index and zero out the entry so producer can reuse it.
+     * Doing it naively at each consume would be as simple as:
+     *       consumer = r->consumer;
+     *       r->queue[consumer++] = NULL;
+     *       if (unlikely(consumer >= r->size))
+     *               consumer = 0;
+     *       r->consumer = consumer;
+     * but that is suboptimal when the ring is full as producer is writing
+     * out new entries in the same cache line.  Defer these updates until a
+     * batch of entries has been consumed.
+     */
+    /* Note: we must keep consumer_head valid at all times for ptr_ring_empty
+     * to work correctly.
+     */
+    int consumer_head = r->consumer_head;
+    int head = consumer_head++;
+
+    /* Once we have processed enough entries invalidate them in
+     * the ring all at once so producer can reuse their space in the ring.
+     * We also do this when we reach end of the ring - not mandatory
+     * but helps keep the implementation simple.
+     */
+    if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+             consumer_head >= r->size)) {
+        /* Zero out entries in the reverse order: this way we touch the
+         * cache line that producer might currently be reading the last;
+         * producer won't make progress and touch other cache lines
+         * besides the first one until we write out all entries.
+         */
+        while (likely(head >= r->consumer_tail))
+            r->queue[head--] = NULL;
+        r->consumer_tail = consumer_head;
+    }
+    if (unlikely(consumer_head >= r->size)) {
+        consumer_head = 0;
+        r->consumer_tail = 0;
+    }
+    /* matching READ_ONCE in ptr_ring_empty for lockless tests */
+    WRITE_ONCE(r->consumer_head, consumer_head);
+}
+
+static inline void *ptr_ring_consume(struct ptr_ring *r)
+{
+    void *ptr;
+
+    /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
+     * accessing data through the pointer is up to date. Pairs
+     * with smp_wmb in ptr_ring_produce.
+     */
+    ptr = __ptr_ring_peek(r);
+    if (ptr)
+        __ptr_ring_discard_one(r);
+
+    return ptr;
+}
+
+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+                         void **array, int n)
+{
+    void *ptr;
+    int i;
+
+    for (i = 0; i < n; i++) {
+        ptr = ptr_ring_consume(r);
+        if (!ptr)
+            break;
+        array[i] = ptr;
+    }
+
+    return i;
+}
+
+static inline void **__ptr_ring_init_queue_alloc(unsigned int size)
+{
+    return g_try_malloc0_n(size, sizeof(void *));
+}
+
+static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
+{
+    r->size = size;
+    r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
+    /* We need to set batch at least to 1 to make logic
+     * in __ptr_ring_discard_one work correctly.
+     * Batching too much (because ring is small) would cause a lot of
+     * burstiness. Needs tuning, for now disable batching.
+     */
+    if (r->batch > r->size / 2 || !r->batch)
+        r->batch = 1;
+}
+
+/*
+ * Disable batching so that there is no consumed entry left in the ring.
+ *
+ * This is convenient when the caller makes sure that the ring is large enough
+ * to contain all requests, i.e. so that ptr_ring_produce() cannot fail.
+ */
+static inline void ptr_ring_disable_batch(struct ptr_ring *r)
+{
+    r->batch = 1;
+}
+
+static inline int ptr_ring_init(struct ptr_ring *r, int size)
+{
+    r->queue = __ptr_ring_init_queue_alloc(size);
+    if (!r->queue)
+        return -ENOMEM;
+
+    __ptr_ring_set_size(r, size);
+    r->producer = r->consumer_head = r->consumer_tail = 0;
+    return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
+{
+    void *ptr;
+
+    if (destroy)
+        while ((ptr = ptr_ring_consume(r)))
+            destroy(ptr);
+    g_free(r->queue);
+}
+#endif
-- 
2.14.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-16 11:10 ` [Qemu-devel] " guangrong.xiao
@ 2018-10-16 11:10   ` guangrong.xiao
  -1 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

The current implementation of compression and decompression is very
hard to enable in production. We noticed that too many wait/wake
operations go to kernel space and CPU usage is very low even when the
system has plenty of idle CPU.

The reasons are:
1) too many locks are used for synchronization: there is a global lock
   and each thread has its own lock; the migration thread and the work
   threads need to go to sleep if these locks are busy

2) the migration thread submits requests to each thread separately,
   but only one request can be pending per thread, which means the
   thread has to go to sleep after finishing that request

To make it work better, we introduce a lockless multithread model: the
user (currently the migration thread) submits requests to the threads,
each of which has its own ring with a capacity of 4, and the threads
put their results into a global ring, where the user fetches the
results and performs the remaining operations for each request, e.g.
posting the compressed data out for migration on the source QEMU.
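
As an illustration (not part of the patch), a sketch of the four
per-request callbacks a user hands to threads_create(); the
CompressData struct and its fields are hypothetical placeholders:

    /* hypothetical user-defined request embedding ThreadRequest */
    typedef struct {
        ThreadRequest request;      /* must be embedded */
        uint8_t *page;              /* data to compress */
    } CompressData;

    static ThreadRequest *compress_request_init(void)
    {
        /* constructor: called once per pre-allocated request */
        return &g_new0(CompressData, 1)->request;
    }

    static void compress_request_uninit(ThreadRequest *request)
    {
        /* destructor: free whatever the constructor allocated */
        g_free(container_of(request, CompressData, request));
    }

    static void compress_request_handler(ThreadRequest *request)
    {
        /* runs in a worker thread: do the actual compression here */
    }

    static void compress_request_done(ThreadRequest *request)
    {
        /* runs in the user's context: e.g. send the compressed data */
    }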

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/lockless-threads.h |  63 +++++++
 util/Makefile.objs              |   1 +
 util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 437 insertions(+)
 create mode 100644 include/qemu/lockless-threads.h
 create mode 100644 util/lockless-threads.c

diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
new file mode 100644
index 0000000000..9340d3a748
--- /dev/null
+++ b/include/qemu/lockless-threads.h
@@ -0,0 +1,63 @@
+/*
+ * Lockless Multithreads Abstraction
+ *
+ * This is the abstraction layer for lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_LOCKLESS_THREAD_H
+#define QEMU_LOCKLESS_THREAD_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "qemu/ptr_ring.h"
+
+/*
+ * the request representation, which contains the internally used metadata;
+ * it can be embedded in the user's self-defined data struct and the user can
+ * use container_of() to get back the self-defined data
+ */
+struct ThreadRequest {
+    QSLIST_ENTRY(ThreadRequest) node;
+    unsigned int thread_index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+typedef struct Threads Threads;
+
+/* the default size of the thread-local request ring */
+#define DEFAULT_THREAD_RING_SIZE 4
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request));
+void threads_destroy(Threads *threads);
+
+/*
+ * find a free request and associate it with a free thread.
+ * If no request or no thread is free, return NULL
+ */
+ThreadRequest *threads_submit_request_prepare(Threads *threads);
+/*
+ * push the request to its thread's local ring and notify the thread
+ */
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request);
+
+/*
+ * wait for all threads to complete the requests filled in their local rings,
+ * to make sure no previous request is still pending.
+ */
+void threads_wait_done(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..deb5c972d5 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += lockless-threads.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/lockless-threads.c b/util/lockless-threads.c
new file mode 100644
index 0000000000..50cf143c03
--- /dev/null
+++ b/util/lockless-threads.c
@@ -0,0 +1,373 @@
+/*
+ * Lockless Multithreads Implementation
+ *
+ * Implement lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/host-utils.h"
+#include "qemu/lockless-threads.h"
+
+struct ThreadLocal {
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+
+    struct Threads *threads;
+
+    /* local request ring which is filled by the user */
+    Ptr_Ring request_ring;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct representing the thread pool, which is shared by
+ * all threads
+ */
+struct Threads {
+    const char *name;
+    unsigned int threads_nr;
+    /* requests are pushed to the threads in a round-robin manner */
+    unsigned int current_thread_index;
+
+    int thread_ring_size;
+    int total_requests;
+
+    /* the request is pre-allocated and linked in the list */
+    int free_requests_nr;
+    QSLIST_HEAD(, ThreadRequest) free_requests;
+
+    /* the constructor of request */
+    ThreadRequest *(*thread_request_init)(void);
+    /* the destructor of request */
+    void (*thread_request_uninit)(ThreadRequest *request);
+    /* the handler of the request which is called in the thread */
+    void (*thread_request_handler)(ThreadRequest *request);
+    /*
+     * the handler to process the result which is called in the
+     * user's context
+     */
+    void (*thread_request_done)(ThreadRequest *request);
+
+    /* the threads push results to this ring, so it has multiple producers */
+    QemuSpin done_ring_lock;
+    Ptr_Ring request_done_ring;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static void put_done_request(Threads *threads, ThreadRequest *request)
+{
+    int ret;
+
+    qemu_spin_lock(&threads->done_ring_lock);
+    ret = ptr_ring_produce(&threads->request_done_ring, request);
+    /* there should be enough room to hold all requests. */
+    assert(!ret);
+    qemu_spin_unlock(&threads->done_ring_lock);
+}
+
+/* retry to see if a request is available before actually going to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    ThreadRequest *request;
+    int count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        request = ptr_ring_consume(&thread->request_ring);
+        if (request) {
+            return request;
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(ThreadRequest *data) = threads->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+        handler(request);
+        put_done_request(threads, request);
+    }
+
+    return NULL;
+}
+
+static void add_free_request(Threads *threads, ThreadRequest *request)
+{
+    QSLIST_INSERT_HEAD(&threads->free_requests, request, node);
+    threads->free_requests_nr++;
+}
+
+static ThreadRequest *get_and_remove_first_free_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    if (QSLIST_EMPTY(&threads->free_requests)) {
+        return NULL;
+    }
+
+    request = QSLIST_FIRST(&threads->free_requests);
+    QSLIST_REMOVE_HEAD(&threads->free_requests, node);
+    threads->free_requests_nr--;
+    return request;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+
+    /*
+     * all requests should have been released back to the list before the
+     * threads are destroyed, i.e. threads_wait_done() should be called first.
+     */
+    assert(threads->free_requests_nr == free_nr);
+
+    while ((request = get_and_remove_first_free_request(threads))) {
+        threads->thread_request_uninit(request);
+    }
+
+    assert(ptr_ring_empty(&threads->request_done_ring));
+    ptr_ring_cleanup(&threads->request_done_ring, NULL);
+}
+
+static int init_requests(Threads *threads, int total_requests)
+{
+    ThreadRequest *request;
+    int i, free_nr = 0;
+
+    if (ptr_ring_init(&threads->request_done_ring, total_requests) < 0) {
+        return -1;
+    }
+    ptr_ring_disable_batch(&threads->request_done_ring);
+
+    QSLIST_INIT(&threads->free_requests);
+    for (i = 0; i < total_requests; i++) {
+        request = threads->thread_request_init();
+        if (!request) {
+            goto cleanup;
+        }
+
+        free_nr++;
+        add_free_request(threads, request);
+    }
+    return 0;
+
+cleanup:
+    uninit_requests(threads, free_nr);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int num)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < num; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        assert(ptr_ring_empty(&thread_local[i].request_ring));
+
+        /* nothing is left in the ring. */
+        ptr_ring_cleanup(&thread_local[i].request_ring, NULL);
+    }
+}
+
+static int init_thread_data(Threads *threads, int threads_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int i;
+
+    for (i = 0; i < threads_nr; i++) {
+        if (ptr_ring_init(&thread_local[i].request_ring,
+                          threads->thread_ring_size) < 0) {
+            goto exit;
+        }
+        ptr_ring_disable_batch(&thread_local[i].request_ring);
+
+        qemu_event_init(&thread_local[i].ev, false);
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+        name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+    return 0;
+
+ exit:
+    uninit_thread_data(threads, i);
+    return -1;
+}
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request))
+{
+    Threads *threads;
+    int total_requests;
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->name = name;
+    threads->thread_request_init = thread_request_init;
+    threads->thread_request_uninit = thread_request_uninit;
+    threads->thread_request_handler = thread_request_handler;
+    threads->thread_request_done = thread_request_done;
+    qemu_spin_init(&threads->done_ring_lock);
+
+    threads->thread_ring_size = thread_ring_size;
+    total_requests = threads->thread_ring_size * threads_nr;
+    if (init_requests(threads, total_requests) < 0) {
+        goto exit;
+    }
+    threads->total_requests = total_requests;
+
+    if (init_thread_data(threads, threads_nr) < 0) {
+        goto exit;
+    }
+    threads->threads_nr = threads_nr;
+    return threads;
+
+exit:
+    threads_destroy(threads);
+    return NULL;
+}
+
+void threads_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    uninit_requests(threads, threads->total_requests);
+    g_free(threads);
+}
+
+static int find_free_thread(Threads *threads, int range)
+{
+    int current_index, index, try = 0;
+
+    current_index = threads->current_thread_index % threads->threads_nr;
+    index = current_index;
+
+    do {
+        index = index % threads->threads_nr;
+        if (!ptr_ring_full(&threads->per_thread_data[index].request_ring)) {
+            threads->current_thread_index = index;
+            return index;
+        }
+
+        if (++try > range) {
+            return -1;
+        }
+    } while (++index != current_index);
+
+    return -1;
+}
+
+ThreadRequest *threads_submit_request_prepare(Threads *threads)
+{
+    ThreadRequest *request;
+    int index;
+
+    /* seek a free one in all threads. */
+    index = find_free_thread(threads, threads->threads_nr);
+    if (index < 0) {
+        return NULL;
+    }
+
+    /* try to get the request from the list */
+    request = get_and_remove_first_free_request(threads);
+    if (request) {
+        goto got_request;
+    }
+
+    /* reclaim a request that has already been handled by the threads */
+    request = ptr_ring_consume(&threads->request_done_ring);
+    if (request) {
+        threads->thread_request_done(request);
+        goto got_request;
+    }
+
+    return NULL;
+
+got_request:
+    request->thread_index = index;
+    return request;
+}
+
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request)
+{
+    int ret, index = request->thread_index;
+    ThreadLocal *thread_local = &threads->per_thread_data[index];
+
+    ret = ptr_ring_produce(&thread_local->request_ring, request);
+
+    /*
+     * we detected that the thread's ring was not full in
+     * threads_submit_request_prepare(), so there should be
+     * free room in the ring
+     */
+    assert(!ret);
+    /* new request arrived, notify the thread */
+    qemu_event_set(&thread_local->ev);
+
+    /* we have used this thread; search from the next one. */
+    threads->current_thread_index = ++index;
+}
+
+void threads_wait_done(Threads *threads)
+{
+    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
+    int nr;
+
+retry:
+    nr = ptr_ring_consume_batched(&threads->request_done_ring,
+                                  (void **)requests, ARRAY_SIZE(requests));
+    while (nr--) {
+        threads->thread_request_done(requests[nr]);
+        add_free_request(threads, requests[nr]);
+    }
+
+    if (threads->free_requests_nr != threads->total_requests) {
+        cpu_relax();
+        goto retry;
+    }
+}
-- 
2.14.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [PATCH 2/4] migration: introduce lockless multithreads model
@ 2018-10-16 11:10   ` guangrong.xiao
  0 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: qemu-devel, kvm, dgilbert, peterx, wei.w.wang, jiang.biao2,
	eblake, quintela, Xiao Guangrong

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Current implementation of compression and decompression are very
hard to be enabled on productions. We noticed that too many wait-wakes
go to kernel space and CPU usages are very low even if the system
is really free

The reasons are:
1) there are two many locks used to do synchronous,there
  is a global lock and each single thread has its own lock,
  migration thread and work threads need to go to sleep if
  these locks are busy

2) migration thread separately submits request to the thread
   however, only one request can be pended, that means, the
   thread has to go to sleep after finishing the request

To make it work better, we introduce a lockless multithread model,
the user, currently it is the migration thread, submits request
to each thread which has its own ring whose capacity is 4 and
puts the result to a global ring where the user fetches result
out and do remaining operations for the request, e.g, posting the
compressed data out for migration on the source QEMU

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/lockless-threads.h |  63 +++++++
 util/Makefile.objs              |   1 +
 util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 437 insertions(+)
 create mode 100644 include/qemu/lockless-threads.h
 create mode 100644 util/lockless-threads.c

diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
new file mode 100644
index 0000000000..9340d3a748
--- /dev/null
+++ b/include/qemu/lockless-threads.h
@@ -0,0 +1,63 @@
+/*
+ * Lockless Multithreads Abstraction
+ *
+ * This is the abstraction layer for lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_LOCKLESS_THREAD_H
+#define QEMU_LOCKLESS_THREAD_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "qemu/ptr_ring.h"
+
+/*
+ * the request representation which contains the internally used mete data,
+ * it can be embedded to user's self-defined data struct and the user can
+ * use container_of() to get the self-defined data
+ */
+struct ThreadRequest {
+    QSLIST_ENTRY(ThreadRequest) node;
+    unsigned int thread_index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+typedef struct Threads Threads;
+
+/* the size of thread local request ring on default */
+#define DEFAULT_THREAD_RING_SIZE 4
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request));
+void threads_destroy(Threads *threads);
+
+/*
+ * find a free request and associate it with a free thread.
+ * If no request or no thread is free, return NULL
+ */
+ThreadRequest *threads_submit_request_prepare(Threads *threads);
+/*
+ * push the request to its thread's local ring and notify the thread
+ */
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request);
+
+/*
+ * wait all threads to complete the request filled in their local rings
+ * to make sure there is no previous request exists.
+ */
+void threads_wait_done(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..deb5c972d5 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += lockless-threads.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/lockless-threads.c b/util/lockless-threads.c
new file mode 100644
index 0000000000..50cf143c03
--- /dev/null
+++ b/util/lockless-threads.c
@@ -0,0 +1,373 @@
+/*
+ * Lockless Multithreads Implementation
+ *
+ * Implement lockless multithreads management.
+ *
+ * Note: currently only one producer is allowed.
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/host-utils.h"
+#include "qemu/lockless-threads.h"
+
+struct ThreadLocal {
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+
+    struct Threads *threads;
+
+    /* local request ring which is filled by the user */
+    Ptr_Ring request_ring;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct represents multithreads which is shared by
+ * all threads
+ */
+struct Threads {
+    const char *name;
+    unsigned int threads_nr;
+    /* the request is pushed to the thread with round-robin manner */
+    unsigned int current_thread_index;
+
+    int thread_ring_size;
+    int total_requests;
+
+    /* the request is pre-allocated and linked in the list */
+    int free_requests_nr;
+    QSLIST_HEAD(, ThreadRequest) free_requests;
+
+    /* the constructor of request */
+    ThreadRequest *(*thread_request_init)(void);
+    /* the destructor of request */
+    void (*thread_request_uninit)(ThreadRequest *request);
+    /* the handler of the request which is called in the thread */
+    void (*thread_request_handler)(ThreadRequest *request);
+    /*
+     * the handler to process the result which is called in the
+     * user's context
+     */
+    void (*thread_request_done)(ThreadRequest *request);
+
+    /* the thread push the result to this ring so it has multiple producers */
+    QemuSpin done_ring_lock;
+    Ptr_Ring request_done_ring;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static void put_done_request(Threads *threads, ThreadRequest *request)
+{
+    int ret;
+
+    qemu_spin_lock(&threads->done_ring_lock);
+    ret = ptr_ring_produce(&threads->request_done_ring, request);
+    /* there should be enough room to save all request. */
+    assert(!ret);
+    qemu_spin_unlock(&threads->done_ring_lock);
+}
+
+/* retry to see if there is avilable request before actually go to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    ThreadRequest *request;
+    int count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        request = ptr_ring_consume(&thread->request_ring);
+        if (request) {
+            return request;
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(ThreadRequest *data) = threads->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+        handler(request);
+        put_done_request(threads, request);
+    }
+
+    return NULL;
+}
+
+static void add_free_request(Threads *threads, ThreadRequest *request)
+{
+    QSLIST_INSERT_HEAD(&threads->free_requests, request, node);
+    threads->free_requests_nr++;
+}
+
+static ThreadRequest *get_and_remove_first_free_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    if (QSLIST_EMPTY(&threads->free_requests)) {
+        return NULL;
+    }
+
+    request = QSLIST_FIRST(&threads->free_requests);
+    QSLIST_REMOVE_HEAD(&threads->free_requests, node);
+    threads->free_requests_nr--;
+    return request;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+
+    /*
+     * all requests should be released to the list if threads are being
+     * destroyed, i,e. should call threads_wait_done() first.
+     */
+    assert(threads->free_requests_nr == free_nr);
+
+    while ((request = get_and_remove_first_free_request(threads))) {
+        threads->thread_request_uninit(request);
+    }
+
+    assert(ptr_ring_empty(&threads->request_done_ring));
+     ptr_ring_cleanup(&threads->request_done_ring, NULL);
+}
+
+static int init_requests(Threads *threads, int total_requests)
+{
+    ThreadRequest *request;
+    int i, free_nr = 0;
+
+    if (ptr_ring_init(&threads->request_done_ring, total_requests) < 0) {
+        return -1;
+    }
+    ptr_ring_disable_batch(&threads->request_done_ring);
+
+    QSLIST_INIT(&threads->free_requests);
+    for (i = 0; i < total_requests; i++) {
+        request = threads->thread_request_init();
+        if (!request) {
+            goto cleanup;
+        }
+
+        free_nr++;
+        add_free_request(threads, request);
+    }
+    return 0;
+
+cleanup:
+    uninit_requests(threads, free_nr);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int num)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < num; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        assert(ptr_ring_empty(&thread_local[i].request_ring));
+
+        /* nothing is left in the ring. */
+        ptr_ring_cleanup(&thread_local[i].request_ring, NULL);
+    }
+}
+
+static int init_thread_data(Threads *threads, int threads_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int i;
+
+    for (i = 0; i < threads_nr; i++) {
+        if (ptr_ring_init(&thread_local[i].request_ring,
+                          threads->thread_ring_size) < 0) {
+            goto exit;
+        }
+        ptr_ring_disable_batch(&thread_local[i].request_ring);
+
+        qemu_event_init(&thread_local[i].ev, false);
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+        name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+    return 0;
+
+ exit:
+    uninit_thread_data(threads, i);
+    return -1;
+}
+
+Threads *threads_create(unsigned int threads_nr, const char *name,
+                        int thread_ring_size,
+                        ThreadRequest *(*thread_request_init)(void),
+                        void (*thread_request_uninit)(ThreadRequest *request),
+                        void (*thread_request_handler)(ThreadRequest *request),
+                        void (*thread_request_done)(ThreadRequest *request))
+{
+    Threads *threads;
+    int total_requests;
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->name = name;
+    threads->thread_request_init = thread_request_init;
+    threads->thread_request_uninit = thread_request_uninit;
+    threads->thread_request_handler = thread_request_handler;
+    threads->thread_request_done = thread_request_done;
+    qemu_spin_init(&threads->done_ring_lock);
+
+    threads->thread_ring_size = thread_ring_size;
+    total_requests = threads->thread_ring_size * threads_nr;
+    if (init_requests(threads, total_requests) < 0) {
+        goto exit;
+    }
+    threads->total_requests = total_requests;
+
+    if (init_thread_data(threads, threads_nr) < 0) {
+        goto exit;
+    }
+    threads->threads_nr = threads_nr;
+    return threads;
+
+exit:
+    threads_destroy(threads);
+    return NULL;
+}
+
+void threads_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    uninit_requests(threads, threads->total_requests);
+    g_free(threads);
+}
+
+static int find_free_thread(Threads *threads, int range)
+{
+    int current_index, index, try = 0;
+
+    current_index = threads->current_thread_index % threads->threads_nr;
+    index = current_index;
+
+    do {
+        index = index % threads->threads_nr;
+        if (!ptr_ring_full(&threads->per_thread_data[index].request_ring)) {
+            threads->current_thread_index = index;
+            return index;
+        }
+
+        if (++try > range) {
+            return -1;
+        }
+    } while (++index != current_index);
+
+    return -1;
+}
+
+ThreadRequest *threads_submit_request_prepare(Threads *threads)
+{
+    ThreadRequest *request;
+    int index;
+
+    /* seek a free one in all threads. */
+    index = find_free_thread(threads, threads->threads_nr);
+    if (index < 0) {
+        return NULL;
+    }
+
+    /* try to get the request from the list */
+    request = get_and_remove_first_free_request(threads);
+    if (request) {
+        goto got_request;
+    }
+
+    /* get a request that has already been handled by the threads */
+    request = ptr_ring_consume(&threads->request_done_ring);
+    if (request) {
+        threads->thread_request_done(request);
+        goto got_request;
+    }
+
+    return NULL;
+
+got_request:
+    request->thread_index = index;
+    return request;
+}
+
+void threads_submit_request_commit(Threads *threads, ThreadRequest *request)
+{
+    int ret, index = request->thread_index;
+    ThreadLocal *thread_local = &threads->per_thread_data[index];
+
+    ret = ptr_ring_produce(&thread_local->request_ring, request);
+
+    /*
+     * we have detected that the thread's ring is not full in
+     * threads_submit_request_prepare(), so there should be free
+     * room in the ring
+     */
+    assert(!ret);
+    /* new request arrived, notify the thread */
+    qemu_event_set(&thread_local->ev);
+
+    /* we have used this entry, search from the next one. */
+    threads->current_thread_index = ++index;
+}
+
+void threads_wait_done(Threads *threads)
+{
+    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
+    int nr;
+
+retry:
+    nr = ptr_ring_consume_batched(&threads->request_done_ring,
+                                  (void **)requests, ARRAY_SIZE(requests));
+    while (nr--) {
+        threads->thread_request_done(requests[nr]);
+        add_free_request(threads, requests[nr]);
+    }
+
+    if (threads->free_requests_nr != threads->total_requests) {
+        cpu_relax();
+        goto retry;
+    }
+}
-- 
2.14.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH 3/4] migration: use lockless Multithread model for compression
  2018-10-16 11:10 ` [Qemu-devel] " guangrong.xiao
@ 2018-10-16 11:10   ` guangrong.xiao
  -1 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the compression code to the lockless multithread model

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 312 +++++++++++++++++++++-----------------------------------
 1 file changed, 115 insertions(+), 197 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index bc38d98cc3..2356bc255c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -57,6 +57,7 @@
 #include "qemu/uuid.h"
 #include "savevm.h"
 #include "qemu/iov.h"
+#include "qemu/lockless-threads.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct CompressParam {
-    bool done;
-    bool quit;
-    bool zero_page;
-    QEMUFile *file;
-    QemuMutex mutex;
-    QemuCond cond;
-    RAMBlock *block;
-    ram_addr_t offset;
-
-    /* internally used fields */
-    z_stream stream;
-    uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
 struct DecompressParam {
     bool done;
     bool quit;
@@ -377,15 +362,6 @@ struct DecompressParam {
 };
 typedef struct DecompressParam DecompressParam;
 
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
@@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
-
-static void *do_data_compress(void *opaque)
-{
-    CompressParam *param = opaque;
-    RAMBlock *block;
-    ram_addr_t offset;
-    bool zero_page;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->block) {
-            block = param->block;
-            offset = param->offset;
-            param->block = NULL;
-            qemu_mutex_unlock(&param->mutex);
-
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
-            qemu_mutex_lock(&comp_done_lock);
-            param->done = true;
-            param->zero_page = zero_page;
-            qemu_cond_signal(&comp_done_cond);
-            qemu_mutex_unlock(&comp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression() || !comp_param) {
-        return;
-    }
-
-    thread_count = migrate_compress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!comp_param[i].file) {
-            break;
-        }
-
-        qemu_mutex_lock(&comp_param[i].mutex);
-        comp_param[i].quit = true;
-        qemu_cond_signal(&comp_param[i].cond);
-        qemu_mutex_unlock(&comp_param[i].mutex);
-
-        qemu_thread_join(compress_threads + i);
-        qemu_mutex_destroy(&comp_param[i].mutex);
-        qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
-        g_free(comp_param[i].originbuf);
-        qemu_fclose(comp_param[i].file);
-        comp_param[i].file = NULL;
-    }
-    qemu_mutex_destroy(&comp_done_lock);
-    qemu_cond_destroy(&comp_done_cond);
-    g_free(compress_threads);
-    g_free(comp_param);
-    compress_threads = NULL;
-    comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-    thread_count = migrate_compress_threads();
-    compress_threads = g_new0(QemuThread, thread_count);
-    comp_param = g_new0(CompressParam, thread_count);
-    qemu_cond_init(&comp_done_cond);
-    qemu_mutex_init(&comp_done_lock);
-    for (i = 0; i < thread_count; i++) {
-        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
-        if (!comp_param[i].originbuf) {
-            goto exit;
-        }
-
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
-            g_free(comp_param[i].originbuf);
-            goto exit;
-        }
-
-        /* comp_param[i].file is just used as a dummy buffer to save data,
-         * set its ops to empty.
-         */
-        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
-        comp_param[i].done = true;
-        comp_param[i].quit = false;
-        qemu_mutex_init(&comp_param[i].mutex);
-        qemu_cond_init(&comp_param[i].cond);
-        qemu_thread_create(compress_threads + i, "compress",
-                           do_data_compress, comp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-
-exit:
-    compress_threads_save_cleanup();
-    return -1;
-}
-
 /* Multiple fd's */
 
 #define MULTIFD_MAGIC 0x11223344U
@@ -1909,12 +1766,27 @@ exit:
     return zero_page;
 }
 
+struct CompressData {
+    /* filled by migration thread. */
+    RAMBlock *block;
+    ram_addr_t offset;
+
+    /* filled by compress thread. */
+    QEMUFile *file;
+    z_stream stream;
+    uint8_t *originbuf;
+    bool zero_page;
+
+    ThreadRequest request;
+};
+typedef struct CompressData CompressData;
+
 static void
-update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+update_compress_thread_counts(CompressData *cd, int bytes_xmit)
 {
     ram_counters.transferred += bytes_xmit;
 
-    if (param->zero_page) {
+    if (cd->zero_page) {
         ram_counters.duplicate++;
         return;
     }
@@ -1924,81 +1796,127 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
     compression_counters.pages++;
 }
 
+static ThreadRequest *compress_thread_data_init(void)
+{
+    CompressData *cd = g_new0(CompressData, 1);
+
+    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+    if (!cd->originbuf) {
+        goto exit;
+    }
+
+    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
+        g_free(cd->originbuf);
+        goto exit;
+    }
+
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return &cd->request;
+
+exit:
+    g_free(cd);
+    return NULL;
+}
+
+static void compress_thread_data_fini(ThreadRequest *request)
+{
+    CompressData *cd = container_of(request, CompressData, request);
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+    g_free(cd->originbuf);
+    g_free(cd);
+}
+
+static void compress_thread_data_handler(ThreadRequest *request)
+{
+    CompressData *cd = container_of(request, CompressData, request);
+
+    /*
+     * if compression fails, it will be indicated by
+     * migrate_get_current()->to_dst_file.
+     */
+    cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
+                                         cd->offset, cd->originbuf);
+}
+
+static void compress_thread_data_done(ThreadRequest *request)
+{
+    CompressData *cd = container_of(request, CompressData, request);
+    RAMState *rs = ram_state;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
+    update_compress_thread_counts(cd, bytes_xmit);
+}
+
+static Threads *compress_threads;
+
 static bool save_page_use_compression(RAMState *rs);
 
 static void flush_compressed_data(RAMState *rs)
 {
-    int idx, len, thread_count;
-
     if (!save_page_use_compression(rs)) {
         return;
     }
-    thread_count = migrate_compress_threads();
 
-    qemu_mutex_lock(&comp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!comp_param[idx].done) {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        }
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+    threads_wait_done(compress_threads);
+}
 
-    for (idx = 0; idx < thread_count; idx++) {
-        qemu_mutex_lock(&comp_param[idx].mutex);
-        if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            /*
-             * it's safe to fetch zero_page without holding comp_done_lock
-             * as there is no further request submitted to the thread,
-             * i.e, the thread should be waiting for a request at this point.
-             */
-            update_compress_thread_counts(&comp_param[idx], len);
-        }
-        qemu_mutex_unlock(&comp_param[idx].mutex);
+static void compress_threads_save_cleanup(void)
+{
+    if (!compress_threads) {
+        return;
     }
+
+    threads_destroy(compress_threads);
+    compress_threads = NULL;
 }
 
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
-                                       ram_addr_t offset)
+static int compress_threads_save_setup(void)
 {
-    param->block = block;
-    param->offset = offset;
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    compress_threads = threads_create(migrate_compress_threads(),
+                                      "compress",
+                                      DEFAULT_THREAD_RING_SIZE,
+                                      compress_thread_data_init,
+                                      compress_thread_data_fini,
+                                      compress_thread_data_handler,
+                                      compress_thread_data_done);
+    return compress_threads ? 0 : -1;
 }
 
 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
+    CompressData *cd;
+    ThreadRequest *request;
     bool wait = migrate_compress_wait_thread();
 
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
 retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
+    request = threads_submit_request_prepare(compress_threads);
+    if (!request) {
+        /*
+         * wait for the free thread if the user specifies
+         * 'compress-wait-thread', otherwise we will post
+         * the page out in the main thread as normal page.
+         */
+        if (wait) {
+            cpu_relax();
+            goto retry;
         }
-    }
 
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+        return -1;
+    }
 
-    return pages;
+    cd = container_of(request, CompressData, request);
+    cd->block = block;
+    cd->offset = offset;
+    threads_submit_request_commit(compress_threads, request);
+    return 1;
 }
 
 /**
-- 
2.14.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH 4/4] migration: use lockless Multithread model for decompression
  2018-10-16 11:10 ` [Qemu-devel] " guangrong.xiao
@ 2018-10-16 11:10   ` guangrong.xiao
  -1 siblings, 0 replies; 28+ messages in thread
From: guangrong.xiao @ 2018-10-16 11:10 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the decompression code to the lockless multithread model

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 223 ++++++++++++++++++++------------------------------------
 1 file changed, 78 insertions(+), 145 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 2356bc255c..99dc9d1911 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct DecompressParam {
-    bool done;
-    bool quit;
-    QemuMutex mutex;
-    QemuCond cond;
-    void *des;
-    uint8_t *compbuf;
-    int len;
-    z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
 
 /* Multiple fd's */
 
@@ -3382,6 +3366,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+
 /* return the size after decompression, or negative value on error */
 static int
 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
@@ -3407,166 +3392,114 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
     return stream->total_out;
 }
 
-static void *do_data_decompress(void *opaque)
-{
-    DecompressParam *param = opaque;
-    unsigned long pagesize;
-    uint8_t *des;
-    int len, ret;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->des) {
-            des = param->des;
-            len = param->len;
-            param->des = 0;
-            qemu_mutex_unlock(&param->mutex);
-
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
-            if (ret < 0 && migrate_get_current()->decompress_error_check) {
-                error_report("decompress data failed");
-                qemu_file_set_error(decomp_file, ret);
-            }
+struct DecompressData {
+    /* filled by migration thread. */
+    void *des;
+    uint8_t *compbuf;
+    size_t len;
 
-            qemu_mutex_lock(&decomp_done_lock);
-            param->done = true;
-            qemu_cond_signal(&decomp_done_cond);
-            qemu_mutex_unlock(&decomp_done_lock);
+    z_stream stream;
+    ThreadRequest request;
+};
+typedef struct DecompressData DecompressData;
 
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
+static Threads *decompress_threads;
+
+static ThreadRequest *decompress_thread_data_init(void)
+{
+    DecompressData *dd = g_new0(DecompressData, 1);
+
+    if (inflateInit(&dd->stream) != Z_OK) {
+        g_free(dd);
+        return NULL;
     }
-    qemu_mutex_unlock(&param->mutex);
 
-    return NULL;
+    dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    return &dd->request;
 }
 
-static int wait_for_decompress_done(void)
+static void decompress_thread_data_fini(ThreadRequest *request)
 {
-    int idx, thread_count;
+    DecompressData *dd = container_of(request, DecompressData, request);
 
-    if (!migrate_use_compression()) {
-        return 0;
-    }
+    inflateEnd(&dd->stream);
+    g_free(dd->compbuf);
+    g_free(dd);
+}
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!decomp_param[idx].done) {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+static void decompress_thread_data_handler(ThreadRequest *request)
+{
+    DecompressData *dd = container_of(request, DecompressData, request);
+    unsigned long pagesize = TARGET_PAGE_SIZE;
+    int ret;
+
+    ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize,
+                               dd->compbuf, dd->len);
+    if (ret < 0 && migrate_get_current()->decompress_error_check) {
+        error_report("decompress data failed");
+        qemu_file_set_error(decomp_file, ret);
     }
-    qemu_mutex_unlock(&decomp_done_lock);
-    return qemu_file_get_error(decomp_file);
 }
 
-static void compress_threads_load_cleanup(void)
+static void decompress_thread_data_done(ThreadRequest *data)
 {
-    int i, thread_count;
+}
 
+static int decompress_init(QEMUFile *f)
+{
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
-    thread_count = migrate_decompress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
 
-        qemu_mutex_lock(&decomp_param[i].mutex);
-        decomp_param[i].quit = true;
-        qemu_cond_signal(&decomp_param[i].cond);
-        qemu_mutex_unlock(&decomp_param[i].mutex);
-    }
-    for (i = 0; i < thread_count; i++) {
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
+    decomp_file = f;
+    decompress_threads = threads_create(migrate_decompress_threads(),
+                             "decompress",
+                             DEFAULT_THREAD_RING_SIZE,
+                             decompress_thread_data_init,
+                             decompress_thread_data_fini,
+                             decompress_thread_data_handler,
+                             decompress_thread_data_done);
+    return decompress_threads ? 0 : -1;
+}
 
-        qemu_thread_join(decompress_threads + i);
-        qemu_mutex_destroy(&decomp_param[i].mutex);
-        qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
-        g_free(decomp_param[i].compbuf);
-        decomp_param[i].compbuf = NULL;
+static void decompress_fini(void)
+{
+    if (!decompress_threads) {
+        return;
     }
-    g_free(decompress_threads);
-    g_free(decomp_param);
+
+    threads_destroy(decompress_threads);
     decompress_threads = NULL;
-    decomp_param = NULL;
     decomp_file = NULL;
 }
 
-static int compress_threads_load_setup(QEMUFile *f)
+static int flush_decompressed_data(void)
 {
-    int i, thread_count;
-
     if (!migrate_use_compression()) {
         return 0;
     }
 
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    decomp_file = f;
-    for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
-            goto exit;
-        }
-
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-exit:
-    compress_threads_load_cleanup();
-    return -1;
+    threads_wait_done(decompress_threads);
+    return qemu_file_get_error(decomp_file);
 }
 
 static void decompress_data_with_multi_threads(QEMUFile *f,
-                                               void *host, int len)
+                                               void *host, size_t len)
 {
-    int idx, thread_count;
+    ThreadRequest *request;
+    DecompressData *dd;
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    while (true) {
-        for (idx = 0; idx < thread_count; idx++) {
-            if (decomp_param[idx].done) {
-                decomp_param[idx].done = false;
-                qemu_mutex_lock(&decomp_param[idx].mutex);
-                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
-                decomp_param[idx].des = host;
-                decomp_param[idx].len = len;
-                qemu_cond_signal(&decomp_param[idx].cond);
-                qemu_mutex_unlock(&decomp_param[idx].mutex);
-                break;
-            }
-        }
-        if (idx < thread_count) {
-            break;
-        } else {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+retry:
+    request = threads_submit_request_prepare(decompress_threads);
+    if (!request) {
+        goto retry;
     }
-    qemu_mutex_unlock(&decomp_done_lock);
+
+    dd = container_of(request, DecompressData, request);
+    dd->des = host;
+    dd->len = len;
+    qemu_get_buffer(f, dd->compbuf, len);
+    threads_submit_request_commit(decompress_threads, request);
 }
 
 /**
@@ -3579,7 +3512,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup(f)) {
+    if (decompress_init(f)) {
         return -1;
     }
 
@@ -3599,7 +3532,7 @@ static int ram_load_cleanup(void *opaque)
     }
 
     xbzrle_load_cleanup();
-    compress_threads_load_cleanup();
+    decompress_fini();
 
     RAMBLOCK_FOREACH_MIGRATABLE(rb) {
         g_free(rb->receivedmap);
@@ -3949,7 +3882,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    ret |= wait_for_decompress_done();
+    ret |= flush_decompressed_data();
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
     return ret;
-- 
2.14.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU
  2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
@ 2018-10-16 16:40     ` Emilio G. Cota
  -1 siblings, 0 replies; 28+ messages in thread
From: Emilio G. Cota @ 2018-10-16 16:40 UTC (permalink / raw)
  To: guangrong.xiao
  Cc: kvm, quintela, mtosatti, Xiao Guangrong, qemu-devel, peterx,
	dgilbert, wei.w.wang, mst, jiang.biao2, pbonzini

On Tue, Oct 16, 2018 at 19:10:03 +0800, guangrong.xiao@gmail.com wrote:
(snip)
> diff --git a/include/qemu/ptr_ring.h b/include/qemu/ptr_ring.h
> new file mode 100644
> index 0000000000..d8266d45f6
> --- /dev/null
> +++ b/include/qemu/ptr_ring.h
> @@ -0,0 +1,235 @@
(snip)
> +#define SMP_CACHE_BYTES      64
> +#define ____cacheline_aligned_in_smp \
> +        __attribute__((__aligned__(SMP_CACHE_BYTES)))

You could use QEMU_ALIGNED() here.

> +
> +#define WRITE_ONCE(ptr, val) \
> +    (*((volatile typeof(ptr) *)(&(ptr))) = (val))
> +#define READ_ONCE(ptr) (*((volatile typeof(ptr) *)(&(ptr))))

Why not atomic_read/set, like in the rest of the QEMU code base?

Furthermore, READ_ONCE in the kernel implies smp_read_barrier_depends,
whereas here you're not bringing that in. That means that your
barrier pairing, e.g.

> +    /* Pairs with READ_ONCE in ptr_ring_consume. */
> +    smp_wmb();

is incorrect for Alpha.

Thanks,

		E.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU
  2018-10-16 16:40     ` [Qemu-devel] " Emilio G. Cota
@ 2018-10-17  8:14       ` Paolo Bonzini
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2018-10-17  8:14 UTC (permalink / raw)
  To: Emilio G. Cota, guangrong.xiao
  Cc: kvm, quintela, mtosatti, Xiao Guangrong, qemu-devel, peterx,
	dgilbert, wei.w.wang, mst, jiang.biao2

On 16/10/2018 18:40, Emilio G. Cota wrote:
>> +#define SMP_CACHE_BYTES      64
>> +#define ____cacheline_aligned_in_smp \
>> +        __attribute__((__aligned__(SMP_CACHE_BYTES)))
> You could use QEMU_ALIGNED() here.
> 
>> +
>> +#define WRITE_ONCE(ptr, val) \
>> +    (*((volatile typeof(ptr) *)(&(ptr))) = (val))
>> +#define READ_ONCE(ptr) (*((volatile typeof(ptr) *)(&(ptr))))
> Why not atomic_read/set, like in the rest of the QEMU code base?

Or even atomic_rcu_read/atomic_rcu_set, which includes the necessary
barriers.

Also, please do not use __ identifiers in QEMU code.
____cacheline_aligned_in_smp can become just QEMU_ALIGNED(SMP_CACHE_BYTES).
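
Something like this, just to sketch the direction (untested; the 'queue'
and 'producer' member names are assumed from the kernel's ptr_ring
layout):

    /* cache-line alignment via the existing helper */
    int producer QEMU_ALIGNED(SMP_CACHE_BYTES);

    /* single-copy atomic accesses; atomic_rcu_read() also provides the
     * dependency barrier that the consumer side needs */
    atomic_rcu_set(&ring->queue[index], ptr);
    entry = atomic_rcu_read(&ring->queue[index]);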

Paolo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
@ 2018-10-17 10:10     ` Paolo Bonzini
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2018-10-17 10:10 UTC (permalink / raw)
  To: guangrong.xiao, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, Emilio G. Cota, jiang.biao2

On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Current implementation of compression and decompression are very
> hard to be enabled on productions. We noticed that too many wait-wakes
> go to kernel space and CPU usages are very low even if the system
> is really free
> 
> The reasons are:
> 1) there are two many locks used to do synchronous,there
>   is a global lock and each single thread has its own lock,
>   migration thread and work threads need to go to sleep if
>   these locks are busy
> 
> 2) migration thread separately submits request to the thread
>    however, only one request can be pended, that means, the
>    thread has to go to sleep after finishing the request
> 
> To make it work better, we introduce a lockless multithread model,
> the user, currently it is the migration thread, submits request
> to each thread which has its own ring whose capacity is 4 and
> puts the result to a global ring where the user fetches result
> out and do remaining operations for the request, e.g, posting the
> compressed data out for migration on the source QEMU
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  include/qemu/lockless-threads.h |  63 +++++++
>  util/Makefile.objs              |   1 +
>  util/lockless-threads.c         | 373 ++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 437 insertions(+)
>  create mode 100644 include/qemu/lockless-threads.h
>  create mode 100644 util/lockless-threads.c
> 
> diff --git a/include/qemu/lockless-threads.h b/include/qemu/lockless-threads.h
> new file mode 100644
> index 0000000000..9340d3a748
> --- /dev/null
> +++ b/include/qemu/lockless-threads.h
> @@ -0,0 +1,63 @@
> +/*
> + * Lockless Multithreads Abstraction
> + *
> + * This is the abstraction layer for lockless multithreads management.
> + *
> + * Note: currently only one producer is allowed.
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_LOCKLESS_THREAD_H
> +#define QEMU_LOCKLESS_THREAD_H
> +
> +#include "qemu/queue.h"
> +#include "qemu/thread.h"
> +#include "qemu/ptr_ring.h"
> +
> +/*
> + * the request representation which contains the internally used mete data,
> + * it can be embedded to user's self-defined data struct and the user can
> + * use container_of() to get the self-defined data
> + */
> +struct ThreadRequest {
> +    QSLIST_ENTRY(ThreadRequest) node;
> +    unsigned int thread_index;
> +};
> +typedef struct ThreadRequest ThreadRequest;
> +
> +typedef struct Threads Threads;

The module is really nice.  I just have a few suggestions on the naming
and the data organization, but it's really small stuff.  The only bigger
suggestion is about the communication of completed requests back to the
submitter.

First, can you rename this module to something like ThreadedWorkqueue?
(So threaded_workqueue_create, threaded_workqueue_destroy, ...)  The
file can also be renamed to {qemu,utils}/threaded-workqueue.[ch].

> +/* the size of thread local request ring on default */
> +#define DEFAULT_THREAD_RING_SIZE 4
> +
> +Threads *threads_create(unsigned int threads_nr, const char *name,
> +                        int thread_ring_size,
> +                        ThreadRequest *(*thread_request_init)(void),
> +                        void (*thread_request_uninit)(ThreadRequest *request),
> +                        void (*thread_request_handler)(ThreadRequest *request),
> +                        void (*thread_request_done)(ThreadRequest *request));

Please put these four members into a ThreadedWorkqueueOps struct.
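
For illustration, such an ops struct might look roughly like the sketch
below (the struct name, the callback names and the request_size field
simply follow the suggestions in this mail, so this is an assumption
rather than an existing API):

typedef struct ThreadedWorkqueueOps {
    /* constructor of a request */
    ThreadRequest *(*thread_request_init)(void);
    /* destructor of a request */
    void (*thread_request_uninit)(ThreadRequest *request);
    /* handler of the request, called in the worker thread */
    void (*thread_request_handler)(ThreadRequest *request);
    /* called in the user's context once the request has been handled */
    void (*thread_request_done)(ThreadRequest *request);
    /* size of one request, see the single-array idea further down */
    size_t request_size;
} ThreadedWorkqueueOps;

Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
                                   int thread_ring_size,
                                   const ThreadedWorkqueueOps *ops);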

> +void threads_destroy(Threads *threads);
> +
> +/*
> + * find a free request and associate it with a free thread.
> + * If no request or no thread is free, return NULL
> + */
> +ThreadRequest *threads_submit_request_prepare(Threads *threads);

threaded_workqueue_get_request

> +/*
> + * push the request to its thread's local ring and notify the thread
> + */
> +void threads_submit_request_commit(Threads *threads, ThreadRequest *request);

threaded_workqueue_submit_request

> +/*
> + * wait for all threads to complete the requests in their local rings,
> + * to make sure no previous request is still pending.
> + */
> +void threads_wait_done(Threads *threads);

threaded_workqueue_wait_for_requests

?

> +struct ThreadLocal {
> +    QemuThread thread;
> +
> +    /* the event used to wake up the thread */
> +    QemuEvent ev;
> +
> +    struct Threads *threads;
> +
> +    /* local request ring which is filled by the user */
> +    Ptr_Ring request_ring;

Put the request ring and ev first, so that they certainly fit a
cacheline together.

> +    /* the index of the thread */
> +    int self;
> +
> +    /* thread is useless and needs to exit */
> +    bool quit;
> +};
> +typedef struct ThreadLocal ThreadLocal;
> +
> +/*
> + * the main data struct that represents the multithreaded model and is
> + * shared by all threads
> + */
> +struct Threads {
> +    const char *name;
> +    unsigned int threads_nr;
> +    /* the request is pushed to the threads in a round-robin manner */
> +    unsigned int current_thread_index;
> +
> +    int thread_ring_size;
> +    int total_requests;
> +
> +    /* the request is pre-allocated and linked in the list */
> +    int free_requests_nr;
> +    QSLIST_HEAD(, ThreadRequest) free_requests;
> +
> +    /* the constructor of request */
> +    ThreadRequest *(*thread_request_init)(void);
> +    /* the destructor of request */
> +    void (*thread_request_uninit)(ThreadRequest *request);
> +    /* the handler of the request which is called in the thread */
> +    void (*thread_request_handler)(ThreadRequest *request);
> +    /*
> +     * the handler to process the result which is called in the
> +     * user's context
> +     */
> +    void (*thread_request_done)(ThreadRequest *request);

You can now store the ops struct pointer here instead of the four
separate fields.

> +    /* the threads push results to this ring, so it has multiple producers */
> +    QemuSpin done_ring_lock;
> +    Ptr_Ring request_done_ring;

Again, putting the request_done_ring first ensures that there's no false
sharing with ops.  Though, see more below about the request_done_ring.
My suggestion below would change these three fields to:

    char *requests;
    unsigned long *completed_requests;
    QemuEvent complete_ev;

> +    ThreadLocal per_thread_data[0];
> +};
> +typedef struct Threads Threads;
> +
> +static void put_done_request(Threads *threads, ThreadRequest *request)
> +{
> +    int ret;
> +
> +    qemu_spin_lock(&threads->done_ring_lock);
> +    ret = ptr_ring_produce(&threads->request_done_ring, request);
> +    /* there should be enough room to save all requests. */
> +    assert(!ret);
> +    qemu_spin_unlock(&threads->done_ring_lock);
> +}

An idea: the total number of requests is going to be very small, and a
PtrRing is not the nicest data structure for multiple producer/single
consumer.  So you could instead:

- add the size of one request to the ops structure.  Move the allocation
in init_requests, so that you can have one single large array that
stores all requests.  thread_request_init gets the pointer to a single
request.

- now that you have a single array (let's call it threads->requests),
the request index can be found with "(req - threads->requests) /
threads->ops->request_size".  The thread index, furthermore, is just
request_index / threads->thread_ring_size and you can remove it from
ThreadRequest.

- now that you have request indices, you can replace the completion
ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
to report completion.  On the writer side you use find_next_bit to find
a completed request and clear_bit_atomic to clear its index.  The index
passed to find_next_bit is threads->current_thread_index *
threads->thread_ring_size.  And you also don't need find_free_thread,
because you can update threads->current_thread_index to

    threads->current_thread_index =
       ((free_request_id / threads->thread_ring_size) + 1)
        % threads->thread_nr;

after you prepare a request, and threads will then naturally receive
requests in round-robin order.  (If find_next_bit fails it means you
have to move the current_thread_index to 0 and retry).

- you don't need the free requests list and free_requests_nr either: you
just initialize the completed request bitmap to all-ones, and
find_next_bit + clear_bit_atomic will do the work of free_requests.
ThreadRequest goes away completely now!

The advantage is that you get rid of the spinlock on the consumer side,
and many auxiliary data structures on the producer side: a bitmap is a
much nicer data structure for multiple producer/single consumer than the
PtrRing.  In addition, with this design the entire Threads structure
becomes read-mostly, which is nice for the cache.  The disadvantage is
that you add an atomic operation (clear_bit_atomic) to
threads_submit_request_prepare(*).

The PtrRing is still useful for the other direction, because there you
have single producer/single consumer.

	(*) It's probably possible to have two separate bitmaps, e.g.
	    where the producer and consumers *flip* bits and the
	    producer looks for mismatched bits between the two bitmaps.
	    I'm not asking you to flesh out and implement that; it's
	    just why I think you can ignore the extra cost of
	    clear_bit_atomic.  In fact, if the maintainers think this
	    is overkill you can go ahead with just the naming fixes and
	    I'll take a look at a rewrite when I have some time for fun
	    stuff. :)
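
To make the free-slot lookup concrete, here is a rough sketch of what
threaded_workqueue_get_request could become; the field and helper names
(requests, completed_requests, request_size, clear_bit_atomic) are the
ones proposed in this mail, so treat this as an assumption, not as
existing code:

void *threaded_workqueue_get_request(Threads *threads)
{
    unsigned long total = threads->threads_nr * threads->thread_ring_size;
    unsigned long index;

retry:
    /* a set bit means "free"; start at the current round-robin thread */
    index = find_next_bit(threads->completed_requests, total,
                          threads->current_thread_index *
                          threads->thread_ring_size);
    if (index >= total) {
        if (threads->current_thread_index) {
            /* wrap around and rescan from the beginning */
            threads->current_thread_index = 0;
            goto retry;
        }
        return NULL;    /* every slot is still in flight */
    }

    /* claim the slot */
    clear_bit_atomic(index, threads->completed_requests);

    /* the next submission naturally goes to the following thread */
    threads->current_thread_index =
        ((index / threads->thread_ring_size) + 1) % threads->threads_nr;

    return threads->requests + index * threads->ops->request_size;
}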

> +void threads_wait_done(Threads *threads)
> +{
> +    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
> +    int nr;
> +
> +retry:
> +    nr = ptr_ring_consume_batched(&threads->request_done_ring,
> +                                  (void **)requests, ARRAY_SIZE(requests));

This is not a fast path, so it should use an event in the
thread->submitter direction too.  qemu_event_set is quite fast
(basically a smp_mb but no cacheline bouncing) if the event is already
set, so it should not be a performance problem to add it in
put_done_request after set_bit_atomic.  (This is more or less unrelated
to the bitmap idea above).
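
As an illustration, the wait could then look something like the sketch
below; complete_ev is the hypothetical QemuEvent mentioned above, set by
put_done_request after publishing a completed request, so this is only a
sketch of the suggestion, not the posted patch:

void threads_wait_done(Threads *threads)
{
    ThreadRequest *requests[DEFAULT_THREAD_RING_SIZE * 2];
    int nr;

    for (;;) {
        /* reset before draining, so a completion set after this is not lost */
        qemu_event_reset(&threads->complete_ev);

        /* drain whatever is already in the done ring */
        while ((nr = ptr_ring_consume_batched(&threads->request_done_ring,
                                              (void **)requests,
                                              ARRAY_SIZE(requests))) > 0) {
            while (nr--) {
                threads->thread_request_done(requests[nr]);
                add_free_request(threads, requests[nr]);
            }
        }

        if (threads->free_requests_nr == threads->total_requests) {
            break;
        }
        qemu_event_wait(&threads->complete_ev);
    }
}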

Emilio, can you review the above ideas?

Paolo

> +    while (nr--) {
> +        threads->thread_request_done(requests[nr]);
> +       add_free_request(threads, requests[nr]);
> +    }
> +
> +    if (threads->free_requests_nr != threads->total_requests) {
> +        cpu_relax();
> +        goto retry;
> +    }
> +}
> 

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU
  2018-10-17  8:14       ` [Qemu-devel] " Paolo Bonzini
@ 2018-10-18  6:52         ` Xiao Guangrong
  -1 siblings, 0 replies; 28+ messages in thread
From: Xiao Guangrong @ 2018-10-18  6:52 UTC (permalink / raw)
  To: Paolo Bonzini, Emilio G. Cota
  Cc: kvm, quintela, mtosatti, Xiao Guangrong, qemu-devel, peterx,
	dgilbert, wei.w.wang, mst, jiang.biao2



On 10/17/2018 04:14 PM, Paolo Bonzini wrote:
> On 16/10/2018 18:40, Emilio G. Cota wrote:
>>> +#define SMP_CACHE_BYTES      64
>>> +#define ____cacheline_aligned_in_smp \
>>> +        __attribute__((__aligned__(SMP_CACHE_BYTES)))
>> You could use QEMU_ALIGNED() here.

Yes, you are right.

>>
>>> +
>>> +#define WRITE_ONCE(ptr, val) \
>>> +    (*((volatile typeof(ptr) *)(&(ptr))) = (val))
>>> +#define READ_ONCE(ptr) (*((volatile typeof(ptr) *)(&(ptr))))
>> Why not atomic_read/set, like in the rest of the QEMU code base?
> 
> Or even atomic_rcu_read/atomic_rcu_set, which includes the necessary
> barriers.
> 

Okay, will fix it, thank you and Emilio for pointing the
issue out.

> Also, please do not use __ identifiers in QEMU code.
> ____cacheline_aligned_in_smp can become just QEMU_ALIGNED(SMP_CACHE_BYTES).
> 

Sure, will keep that in mind. :)

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-17 10:10     ` [Qemu-devel] " Paolo Bonzini
@ 2018-10-18  9:30       ` Xiao Guangrong
  -1 siblings, 0 replies; 28+ messages in thread
From: Xiao Guangrong @ 2018-10-18  9:30 UTC (permalink / raw)
  To: Paolo Bonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, Emilio G. Cota, jiang.biao2



On 10/17/2018 06:10 PM, Paolo Bonzini wrote:

> 
> An idea: the total number of requests is going to be very small, and a
> PtrRing is not the nicest data structure for multiple producer/single
> consumer.  So you could instead:
> 
> - add the size of one request to the ops structure.  Move the allocation
> in init_requests, so that you can have one single large array that
> stores all requests.  thread_request_init gets the pointer to a single
> request.
> 
> - now that you have a single array (let's call it threads->requests),
> the request index can be found with "(req - threads->requests) /
> threads->ops->request_size".  The thread index, furthermore, is just
> request_index / threads->thread_ring_size and you can remove it from
> ThreadRequest.
> 
> - now that you have request indices, you can replace the completion
> ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
> to report completion.  On the writer side you use find_next_bit to find
> a completed request and clear_bit_atomic to clear its index.  The index
> passed to find_next_bit is threads->current_thread_index *
> threads->thread_ring_size,  And you also don't need find_free_thread,
> because you can update threads->current_thread_index to
> 
>      threads->current_thread_index =
>         ((free_request_id / threads->thread_ring_size) + 1)
>          % threads->thread_nr;
> 
> after you prepare a request, and threads will then naturally receive
> requests in round-robin order.  (If find_next_bit fails it means you
> have to move the current_thread_index to 0 and retry).
> 
> - you don't need the free requests list and free_requests_nr either: you
> just initialize the completed request bitmap to all-ones, and
> find_next_bit + clear_bit_atomic will do the work of free_requests.
> ThreadRequest goes away completely now!
> 
> The advantage is that you get rid of the spinlock on the consumer side,
> and many auxiliary data structures on the producer side: a bitmap is a
> much nicer data structure for multiple producer/single consumer than the
> PtrRing.  In addition, with this design the entire Threads structure
> becomes read-mostly, which is nice for the cache.  The disadvantage is
> that you add an atomic operation (clear_bit_atomic) to
> threads_submit_request_prepare(*).
> 

All your comments are good to me and you are a GENIUS; the idea of
making the requests a single array and partitioning it this way
simplifies things a lot.

> The PtrRing is still useful for the other direction, because there you
> have single producer/single consumer.
> 
> 	(*) It's probably possible to have two separate bitmaps, e.g.
> 	    where the producer and consumers *flip* bits and the
> 	    producer looks for mismatched bits between the two bitmaps.
> 	    I'm not asking you to flesh out and implement that; it's
> 	    just why I think you can ignore the extra cost of
> 	    clear_bit_atomic.  In fact, if the maintainers think this
> 	    is overkill you can go ahead with just the naming fixes and
> 	    I'll take a look at a rewrite when I have some time for fun
> 	    stuff. :)
> 

LOL! Great minds think alike, the idea, "flipping bitmaps", was in my
mind too. :)

Besides that... I think we get the chance to remove ptr_ring gracefully,
as the bitmap can indicate the ownership of the request as well. If
the bit is 1 (supposing all bits are 1 by default), only the user can
operate on it; the bit is cleared after the user fills in the info
for the request. After that, the thread sees the bit is cleared, takes
ownership, finishes the request, and then sets the bit in the bitmap.
Ownership is returned to the user again.

One disadvantage may be that it can't differentiate whether a request
is empty or contains a result that needs the user to call
threads_wait_done(); that will slow threads_wait_done() down a little as
it needs to check all requests, but it is not a big deal because
1) at the point a flush is needed, it is highly likely that almost all
   requests have been used.
2) the total number of requests is going to be very small.


This is illustrated by the following code, combining the "flip" bitmaps:

struct Threads {
    ......

    /*
     * the bits in these two bitmaps describe the state of the requests.
     * If a request's two bits are the same, the request is owned by the
     * user, i.e., only the user can use the request. Otherwise, it is
     * owned by the thread.
     */

    /* after the user fills the request, the bit is flipped. */
    unsigned long *request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

    /* after handling the request, the thread flips the bit. */
    unsigned long *request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
}

threads_submit_request_prepare()
{
    request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
    result_bitmap = bitmap_xor(&request_done_bitmap, threads->request_fill_bitmap);

    index = find_first_zero_bit(current-thread-to-request-index, &result_bitmap);

    /* make sure we get the data the thread has written. */
    smp_rmb();

    thread_request_done(requests[index]);
    ...
}

threads_submit_request_commit()
{
    /* make sure the user has filled the request before making it visible to the threads. */
    smp_wmb();

    /* after that, the thread can handle the request. */
    bitmap_change_bit(request-to-index, threads->request_fill_bitmap);
    ......
}

In the thread, it does:

thread_run()
{
    index_start = itself->index * threads->thread_ring_size;
    index_end = index_start + threads->thread_ring_size;

loop:
    request_fill_bitmap = READ_ONCE(threads->request_fill_bitmap);
    request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
    result_bitmap = bitmap_xor(&request_fill_bitmap, &request_done_bitmap);

    index = find_first_bit_set(&result_bitmap, .start = index_start, .end = index_end);

    /*
     * paired with smp_wmb() in threads_submit_request_commit to make sure
     * the thread can see the data filled by the user.
     */
    smp_rmb();

    request = threads->requests[index];
    thread_request_handler(request);

    /*
     * the update of the request must be visible before flipping the bitmap,
     * paired with smp_rmb() in threads_submit_request_prepare().
     */
    smp_wmb();

    bitmap_change_bit_atomic(&threads->request_done_bitmap, index);
    ......
}

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-18  9:30       ` [Qemu-devel] " Xiao Guangrong
@ 2018-10-18 10:39         ` Paolo Bonzini
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2018-10-18 10:39 UTC (permalink / raw)
  To: Xiao Guangrong, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, Emilio G. Cota, jiang.biao2

On 18/10/2018 11:30, Xiao Guangrong wrote:
> Beside that... i think we get the chance to remove ptr_ring gracefully,
> as the bitmap can indicate the ownership of the request as well. If
> the bit is 1 (supposing all bits are 1 on default), only the user can
> operate it, the bit will be cleared after the user fills the info
> to the request. After that, the thread sees the bit is cleared, then
> it gets the ownership and finishes the request, then sets bit in
> the bitmap. The ownership is returned to the user again.

Yes, even better. :)

> One thing may be disadvantage is, it can't differentiate the case if the
> request is empty or contains the result which need the user call
> threads_wait_done(), that will slow threads_wait_done() a little as it
> need check all requests, but it is not a big deal as
> 1) at the point needing flush, it's high possible that all most requests
>    have been used.
> 2) the total number of requests is going to be very small.

threads_wait_done only needs to check bitmap_equal for the two bitmaps,
no?  (I'm not sure if, with the code below, it would be bitmap_equal or
"all bits are different", i.e. xor is all ones.  But it's a trivial change).

> 
> It is illustrated by following code by combining the "flip" bitmaps:
> 
> struct Threads {
>    ......
> 
>    /*
>     * the bit in these two bitmaps indicates the index of the requests
>     * respectively. If it's the same, the request is owned by the user,
>     * i.e, only the use can use the request. Otherwise, it is owned by
>     * the thread.
>     */
> 
>    /* after the user fills the request, the bit is flipped. */
>    unsigned long *request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> 
>    /* after handles the request, the thread flips the bit. */
>    unsigned long *request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> }

Note that the pointers need not be aligned, because they are only read.
 It's the data that should be aligned instead (qemu_memalign to allocate
it).
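
For instance, a sketch of the allocation (assuming total_requests bits per
bitmap):

    size_t bitmap_bytes = BITS_TO_LONGS(threads->total_requests) *
                          sizeof(unsigned long);

    threads->request_fill_bitmap = qemu_memalign(SMP_CACHE_BYTES, bitmap_bytes);
    threads->request_done_bitmap = qemu_memalign(SMP_CACHE_BYTES, bitmap_bytes);

    /* both bitmaps start out equal: every request is owned by the user */
    bitmap_zero(threads->request_fill_bitmap, threads->total_requests);
    bitmap_zero(threads->request_done_bitmap, threads->total_requests);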

> threads_submit_request_prepare()
> {
>     request_done_bitmap = READ_ONCE(threads->request_done_bitmap);
>         result_bitmap = bitmap_xor(&request_done_bitmap,
> threads->request_fill_bitmap);
> 
>     index = find_first_zero_bit(current-thread-to-request-index,
> &result_bitmap);

find_next_zero_bit.

>     /* make sure we get the data the thread written. */
>         smp_rmb();
> 
>         thread_request_done(requests[index]);
>         ...
> }
> 
> threads_submit_request_commit()
> {
>         /* make sure the user have filled the request before we make it
> be viable to the threads. */
>     smp_wmb();
> 
>     /* after that, the thread can handle the request. */
>         bitmap_change_bit(request-to-index, threads->request_fill_bitmap);
>         ......
> }
> 
> In the thread, it does:
> thread_run()
> {
>     index_start = threads->requests + itself->index *
> threads->thread_ring_size;
>     index_end = index_start + threads->thread_ring_size;
> 
> loop:
>     request_fill_bitmap = READ_ONCE(threads->request_fill_bitmap);
>     request_done_bitmap = READ_ONCE(threads->request_done_bitmap);

No need for READ_ONCE (atomic_read in QEMU), as the pointers are never
written.  Technically READ_ONCE _would_ be needed in bitmap_xor.  Either
just ignore the issue or write a find_{equal,different}_bit yourself in
util/threads.c, so that it can use atomic_read.

>     result_bitmap = bitmap_xor(&request_fill_bitmap, &request_done_bitmap);
>     index = find_first_bit_set(&result_bitmap, .start = index_start,
> .end = index_end);
> 
>     /*
>          * paired with smp_wmb() in threads_submit_request_commit to
> make sure the
>          * thread can get data filled by the user.
>          */
>     smp_rmb();
> 
>     request = threads->requests[index];
>     thread_request_handler(request);
> 
>     /*
>          * updating the request is viable before flip the bitmap, paired
>          * with smp_rmb() in threads_submit_request_prepare().
>          */
>     smp_wmb();

No need for smp_wmb before atomic_xor.

>     bitmap_change_bit_atomic(&threads->request_done_bitmap, index);
>         ......
> }

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-17 10:10     ` [Qemu-devel] " Paolo Bonzini
@ 2018-10-26 23:33       ` Emilio G. Cota
  -1 siblings, 0 replies; 28+ messages in thread
From: Emilio G. Cota @ 2018-10-26 23:33 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, guangrong.xiao, jiang.biao2

On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:

> An idea: the total number of requests is going to be very small, and a
> PtrRing is not the nicest data structure for multiple producer/single
> consumer.  So you could instead:
(snip)
> - now that you have request indices, you can replace the completion
> ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
> to report completion.  On the writer side you use find_next_bit to find
(snip)
> Emilio, can you review the above ideas?

Sorry it took me a while to go through this.

I like your suggestions. Just one nit; I'm not sure I understood
the use case very well, but I think using a bitmap to signal
completion might be suboptimal, since we'd have several
threads spinning on the same cacheline yet caring about
different bits.

Xiao: a couple of suggestions

- Since you'll be adding a generic module, make its commit and
  description self-contained. That is, mentioning in the
  log that this will be used for migration is fine, but please
  describe the module (and the assumptions it makes about its
  users) in general, so that someone that doesn't know anything
  about migration can still understand this module (and hopefully
  adopt it for other use cases).

- I'd like to see a simple test program (or rather, benchmark)
  that shows how this works. This benchmark would be completely
  unrelated to migration; it should just be a simple test of
  the performance/scalability of this module.
  Having this benchmark would help (1) discuss and quantitatively
  evaluate modifications to the module, and (2) help others to
  quickly understand what the module does.
  See tests/qht-bench.c for an example.

Thanks,

		Emilio

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-26 23:33       ` [Qemu-devel] " Emilio G. Cota
@ 2018-10-28  7:50         ` Paolo Bonzini
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2018-10-28  7:50 UTC (permalink / raw)
  To: Emilio G. Cota
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, guangrong.xiao, jiang.biao2

On 27/10/2018 01:33, Emilio G. Cota wrote:
> On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
>> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
> 
>> An idea: the total number of requests is going to be very small, and a
>> PtrRing is not the nicest data structure for multiple producer/single
>> consumer.  So you could instead:
> (snip)
>> - now that you have request indices, you can replace the completion
>> ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>> to report completion.  On the writer side you use find_next_bit to find
> (snip)
>> Emilio, can you review the above ideas?
> 
> Sorry it took me a while to go through this.
> 
> I like your suggestions. Just one nit; I'm not sure I understood
> the use case very well, but I think using a bitmap to signal
> completion might be suboptimal, since we'd have several
> thread spinning on the same cacheline yet caring about
> different bits.

Requests are asynchronous; the bitmap is only used to find a free
submission slot.  You're right that the bitmap can bounce across
processors, but I'm not sure how else you would do that because you
don't know in advance how many submitting threads you have.  It wouldn't
be any worse if there was a spinlock.

However, in the migration case there is only one submitting thread, so
it's okay. :)

Paolo

> Xiao: a couple of suggestions
> 
> - Since you'll be adding a generic module, make its commit and
>   description self-contained. That is, mentioning in the
>   log that this will be used for migration is fine, but please
>   describe the module (and the assumptions it makes about its
>   users) in general, so that someone that doesn't know anything
>   about migration can still understand this module (and hopefully
>   adopt it for other use cases).
> 
> - I'd like to see a simple test program (or rather, benchmark)
>   that shows how this works. This benchmark would be completely
>   unrelated to migration; it should just be a simple test of
>   the performance/scalability of this module.
>   Having this benchmark would help (1) discuss and quantitately
>   evaluate modifications to the module, and (2) help others to
>   quickly understand what the module does.
>   See tests/qht-bench.c for an example.
> 
> Thanks,
> 
> 		Emilio
> 

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH 2/4] migration: introduce lockless multithreads model
  2018-10-28  7:50         ` [Qemu-devel] " Paolo Bonzini
@ 2018-10-29  2:52           ` Xiao Guangrong
  -1 siblings, 0 replies; 28+ messages in thread
From: Xiao Guangrong @ 2018-10-29  2:52 UTC (permalink / raw)
  To: Paolo Bonzini, Emilio G. Cota
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, jiang.biao2



On 10/28/2018 03:50 PM, Paolo Bonzini wrote:
> On 27/10/2018 01:33, Emilio G. Cota wrote:
>> On Wed, Oct 17, 2018 at 12:10:15 +0200, Paolo Bonzini wrote:
>>> On 16/10/2018 13:10, guangrong.xiao@gmail.com wrote:
>>
>>> An idea: the total number of requests is going to be very small, and a
>>> PtrRing is not the nicest data structure for multiple producer/single
>>> consumer.  So you could instead:
>> (snip)
>>> - now that you have request indices, you can replace the completion
>>> ptr_ring with a bitmap, and set a bit in the bitmap with set_bit_atomic
>>> to report completion.  On the writer side you use find_next_bit to find
>> (snip)
>>> Emilio, can you review the above ideas?
>>
>> Sorry it took me a while to go through this.
>>
>> I like your suggestions. Just one nit; I'm not sure I understood
>> the use case very well, but I think using a bitmap to signal
>> completion might be suboptimal, since we'd have several
>> thread spinning on the same cacheline yet caring about
>> different bits.
> 
> Requests are asynchronous, the bitmap is only used to find a free
> submission slot.  You're right that the bitmap can bounce across
> processors, but I'm not sure how else you would do that because you
> don't know in advance how many submitting threads you have.  It wouldn't
> be any worse if there was a spinlock.
> 
> However, in the migration case there is only one submitting thread, so
> it's okay. :)
> 

Yup.

The cache contention only exists among the worker threads; the submitter
thread, which is the main migration thread, is totally free. Making the
main thread faster is good.

> Paolo
> 
>> Xiao: a couple of suggestions
>>
>> - Since you'll be adding a generic module, make its commit and
>>    description self-contained. That is, mentioning in the
>>    log that this will be used for migration is fine, but please
>>    describe the module (and the assumptions it makes about its
>>    users) in general, so that someone that doesn't know anything
>>    about migration can still understand this module (and hopefully
>>    adopt it for other use cases).

Good to me, I will add a more detailed description of this module in
the next version.

>>
>> - I'd like to see a simple test program (or rather, benchmark)
>>    that shows how this works. This benchmark would be completely
>>    unrelated to migration; it should just be a simple test of
>>    the performance/scalability of this module.
>>    Having this benchmark would help (1) discuss and quantitatively
>>    evaluate modifications to the module, and (2) help others to
>>    quickly understand what the module does.
>>    See tests/qht-bench.c for an example.
>>

Couldn't agree with you more, will do. :)
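
A rough, purely illustrative skeleton of such a microbenchmark (loosely
modelled on tests/qht-bench.c; do_request() below is only a stand-in for
submitting a no-op request to the threads module and reaping its
completion, and none of the names are real code from tests/):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define NR_THREADS    4       /* submitting threads to stress the module with */
#define DURATION_SEC  5       /* how long to run the measurement */

static atomic_bool stop;
static atomic_ulong completed;

/* Stand-in for "submit a no-op request and wait for its completion". */
static void do_request(void)
{
}

static void *bench_thread(void *arg)
{
    (void)arg;
    while (!atomic_load(&stop)) {
        do_request();
        atomic_fetch_add(&completed, 1);
    }
    return NULL;
}

int main(void)
{
    pthread_t threads[NR_THREADS];

    for (int i = 0; i < NR_THREADS; i++) {
        pthread_create(&threads[i], NULL, bench_thread, NULL);
    }
    sleep(DURATION_SEC);
    atomic_store(&stop, true);
    for (int i = 0; i < NR_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    printf("%lu requests in %d s\n",
           atomic_load(&completed), DURATION_SEC);
    return 0;
}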

Thank you, Emilio and Paolo!

^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2018-10-29  2:52 UTC | newest]

Thread overview: 28+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-10-16 11:10 [PATCH 0/4] migration: improve multithreads guangrong.xiao
2018-10-16 11:10 ` [Qemu-devel] " guangrong.xiao
2018-10-16 11:10 ` [PATCH 1/4] ptr_ring: port ptr_ring from linux kernel to QEMU guangrong.xiao
2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
2018-10-16 16:40   ` Emilio G. Cota
2018-10-16 16:40     ` [Qemu-devel] " Emilio G. Cota
2018-10-17  8:14     ` Paolo Bonzini
2018-10-17  8:14       ` [Qemu-devel] " Paolo Bonzini
2018-10-18  6:52       ` Xiao Guangrong
2018-10-18  6:52         ` [Qemu-devel] " Xiao Guangrong
2018-10-16 11:10 ` [PATCH 2/4] migration: introduce lockless multithreads model guangrong.xiao
2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
2018-10-17 10:10   ` Paolo Bonzini
2018-10-17 10:10     ` [Qemu-devel] " Paolo Bonzini
2018-10-18  9:30     ` Xiao Guangrong
2018-10-18  9:30       ` [Qemu-devel] " Xiao Guangrong
2018-10-18 10:39       ` Paolo Bonzini
2018-10-18 10:39         ` [Qemu-devel] " Paolo Bonzini
2018-10-26 23:33     ` Emilio G. Cota
2018-10-26 23:33       ` [Qemu-devel] " Emilio G. Cota
2018-10-28  7:50       ` Paolo Bonzini
2018-10-28  7:50         ` [Qemu-devel] " Paolo Bonzini
2018-10-29  2:52         ` Xiao Guangrong
2018-10-29  2:52           ` [Qemu-devel] " Xiao Guangrong
2018-10-16 11:10 ` [PATCH 3/4] migration: use lockless Multithread model for compression guangrong.xiao
2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
2018-10-16 11:10 ` [PATCH 4/4] migration: use lockless Multithread model for decompression guangrong.xiao
2018-10-16 11:10   ` [Qemu-devel] " guangrong.xiao
