* [PATCH v2 0/5] migration: improve multithreads
From: guangrong.xiao @ 2018-11-06 12:20 UTC
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Changelog in v2:
These changes are based on Paolo's suggestions:
1) rename the lockless multithreads model to threaded workqueue
2) overhaul the internal design: keep all the requests in one large
   array, partition it properly, assign each thread its own region of
   requests, and use bitmaps to synchronize the threads with the
   submitter; ptr_ring and the spinlock are then dropped
3) introduce an event wait for the submitter

These changes are based on Emilio's review:
4) describe the threaded workqueue in more detail
5) add a benchmark for the threaded workqueue

The previous version can be found at
	https://marc.info/?l=kvm&m=153968821910007&w=2

Here is a simple performance measurement comparing the two versions;
the environment is the same as the one listed in the previous version.

Eight threads are used to compress the data in the source QEMU.

- with compress-wait-thread = off

      total time        busy-ratio
--------------------------------------------------
v1    125066            0.38
v2    120444            0.35

- with compress-wait-thread = on
      total time        busy-ratio
--------------------------------------------------
v1    164426            0
v2    142609            0

v2 wins slightly.

Xiao Guangrong (5):
  bitops: introduce change_bit_atomic
  util: introduce threaded workqueue
  migration: use threaded workqueue for compression
  migration: use threaded workqueue for decompression
  tests: add threaded-workqueue-bench

 include/qemu/bitops.h             |  13 +
 include/qemu/threaded-workqueue.h |  94 +++++++
 migration/ram.c                   | 538 ++++++++++++++------------------------
 tests/Makefile.include            |   5 +-
 tests/threaded-workqueue-bench.c  | 256 ++++++++++++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 466 +++++++++++++++++++++++++++++++++
 7 files changed, 1030 insertions(+), 343 deletions(-)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 tests/threaded-workqueue-bench.c
 create mode 100644 util/threaded-workqueue.c

-- 
2.14.5


* [PATCH v2 1/5] bitops: introduce change_bit_atomic
From: guangrong.xiao @ 2018-11-06 12:20 UTC
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

It will be used by the threaded workqueue.

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/bitops.h | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/include/qemu/bitops.h b/include/qemu/bitops.h
index 3f0926cf40..c522958852 100644
--- a/include/qemu/bitops.h
+++ b/include/qemu/bitops.h
@@ -79,6 +79,19 @@ static inline void change_bit(long nr, unsigned long *addr)
     *p ^= mask;
 }
 
+/**
+ * change_bit_atomic - Toggle a bit in memory atomically
+ * @nr: Bit to change
+ * @addr: Address to start counting from
+ */
+static inline void change_bit_atomic(long nr, unsigned long *addr)
+{
+    unsigned long mask = BIT_MASK(nr);
+    unsigned long *p = addr + BIT_WORD(nr);
+
+    atomic_xor(p, mask);
+}
+
 /**
  * test_and_set_bit - Set a bit and return its old value
  * @nr: Bit to set
-- 
2.14.5
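
As a quick illustration of why the atomic variant is needed (a sketch,
not part of the patch; it assumes QEMU's bitmap helpers from
qemu/bitmap.h and a hypothetical worker() function): plain change_bit()
is a non-atomic read-modify-write, so two threads toggling different
bits that share the same long word can lose an update, while
change_bit_atomic() uses atomic_xor() and cannot.

    static unsigned long bitmap[BITS_TO_LONGS(64)];

    /* called concurrently from several threads, each with a distinct nr */
    static void *worker(void *opaque)
    {
        long nr = (long)opaque;

        change_bit_atomic(nr, bitmap);  /* atomic toggle, no lost update */
        return NULL;
    }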


* [PATCH v2 2/5] util: introduce threaded workqueue
From: guangrong.xiao @ 2018-11-06 12:20 UTC
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

This module implements a lockless and efficient threaded workqueue.

Three abstract objects are used in this module:
- Request.
  It contains the data that the workqueue needs to handle the request
  and also offers the space to save the result once the request has
  been handled.

  It flows between the user and the workqueue. The user fills the
  request with data while the request is owned by the user. After it
  is submitted, the workqueue fetches the data out and saves the
  result back into the request once it has been handled.

  All the requests are pre-allocated and carefully partitioned between
  threads, so there is no contention on a request; this lets the
  threads run in parallel as much as possible.

- User, i.e., the submitter.
  It is the one that fills in a request and submits it to the
  workqueue; the result is collected after the request has been
  handled by the workqueue.

  The user can submit requests consecutively without waiting for the
  previous requests to be handled.
  Only one submitter is supported; if you need more, serialize the
  submissions yourself, e.g., with a lock on your side.

- Workqueue, i.e., thread.
  Each workqueue is represented by a running thread that fetches the
  requests submitted by the user, does the specified work, and saves
  the result back into the request.

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/threaded-workqueue.h |  94 ++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 466 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 561 insertions(+)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 util/threaded-workqueue.c

diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
new file mode 100644
index 0000000000..d7eb66c8d2
--- /dev/null
+++ b/include/qemu/threaded-workqueue.h
@@ -0,0 +1,94 @@
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADED_WORKQUEUE_H
+#define QEMU_THREADED_WORKQUEUE_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+
+/*
+ * This module implements a lockless and efficient threaded workqueue.
+ *
+ * Three abstract objects are used in this module:
+ * - Request.
+ *   It contains the data that the workqueue needs to handle the
+ *   request and also offers the space to save the result once the
+ *   request has been handled.
+ *
+ *   It flows between the user and the workqueue. The user fills the
+ *   request with data while the request is owned by the user. After
+ *   it is submitted, the workqueue fetches the data out and saves the
+ *   result back into the request once it has been handled.
+ *
+ *   All the requests are pre-allocated and carefully partitioned
+ *   between threads, so there is no contention on a request; this
+ *   lets the threads run in parallel as much as possible.
+ *
+ * - User, i.e., the submitter.
+ *   It is the one that fills in a request and submits it to the
+ *   workqueue; the result is collected after the request has been
+ *   handled by the workqueue.
+ *
+ *   The user can submit requests consecutively without waiting for
+ *   the previous requests to be handled.
+ *   Only one submitter is supported; if you need more, serialize the
+ *   submissions yourself, e.g., with a lock on your side.
+ *
+ * - Workqueue, i.e., thread.
+ *   Each workqueue is represented by a running thread that fetches
+ *   the requests submitted by the user, does the specified work, and
+ *   saves the result back into the request.
+ */
+
+typedef struct Threads Threads;
+
+struct ThreadedWorkqueueOps {
+    /* return the size of each request */
+    int (*thread_get_request_size)(void);
+
+    /* constructor of the request */
+    int (*thread_request_init)(void *request);
+    /* destructor of the request */
+    void (*thread_request_uninit)(void *request);
+
+    /* the handler of the request that is called by the thread */
+    void (*thread_request_handler)(void *request);
+    /* called by the user after the request has been handled */
+    void (*thread_request_done)(void *request);
+};
+typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
+
+/* the default number of requests that each thread handles */
+#define DEFAULT_THREAD_REQUEST_NR 4
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   int thread_request_nr,
+                                   ThreadedWorkqueueOps *ops);
+
+void threaded_workqueue_destroy(Threads *threads);
+
+/*
+ * find a free request where the user can store the data that is needed to
+ * finish the request
+ *
+ * If all requests are used up, return NULL
+ */
+void *threaded_workqueue_get_request(Threads *threads);
+/* submit the request and notify the thread */
+void threaded_workqueue_submit_request(Threads *threads, void *request);
+
+/*
+ * wait for all threads to complete their requests, making sure no
+ * previously submitted request is still outstanding
+ */
+void threaded_workqueue_wait_for_requests(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..f26dfe5182 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += threaded-workqueue.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
new file mode 100644
index 0000000000..966479631a
--- /dev/null
+++ b/util/threaded-workqueue.c
@@ -0,0 +1,466 @@
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/bitmap.h"
+#include "qemu/threaded-workqueue.h"
+
+#define SMP_CACHE_BYTES 64
+#define BITS_ALIGNED_TO_CACHE(_bits_)   \
+    QEMU_ALIGN_UP(_bits_, SMP_CACHE_BYTES * BITS_PER_BYTE)
+
+/*
+ * the request representation, which contains the internally used metadata;
+ * it is the header that precedes the user-defined data.
+ *
+ * It should be aligned to the natural word size of the CPU.
+ */
+struct ThreadRequest {
+    /*
+     * the request has been handled by the thread and needs the user
+     * to fetch the result out.
+     */
+    bool done;
+    /*
+     * the index to Threads::requests.
+     * Save it to the padding space although it can be calculated at runtime.
+     */
+    int index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+struct ThreadLocal {
+    struct Threads *threads;
+
+    /*
+     * the request region in Threads::requests that the thread
+     * needs to handle
+     */
+    int start_request_index;
+    int end_request_index;
+
+    /*
+     * the interim bitmap used by the thread to avoid frequent
+     * memory allocation
+     */
+    unsigned long *result_bitmap;
+
+    /* the index of the thread */
+    int self;
+
+    /* the thread is no longer needed and should exit */
+    bool quit;
+
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data structure that represents the multithreaded workqueue;
+ * it is shared by all threads
+ */
+struct Threads {
+    /*
+     * in order to avoid contention, the @requests is partitioned to
+     * @threads_nr pieces, each thread exclusively handles
+     * @thread_request_nr requests in the array.
+     */
+    void *requests;
+
+    /*
+     * each bit in these two bitmaps corresponds to one entry of @requests.
+     * If the two bits are equal, the corresponding request is free and
+     * owned by the user, i.e., the user may fill it. Otherwise, the
+     * request is valid and owned by a thread, i.e., the thread fetches
+     * the request and writes back the result.
+     */
+
+    /* after the user fills the request, the bit is flipped. */
+    unsigned long *request_fill_bitmap;
+    /* after handling the request, the thread flips the bit. */
+    unsigned long *request_done_bitmap;
+
+    /*
+     * the interim bitmap used by the user to avoid frequent
+     * memory allocation
+     */
+    unsigned long *result_bitmap;
+
+    /* the size of each request slot, including the ThreadRequest header */
+    unsigned int request_size;
+
+    /* the number of requests that each thread needs to handle */
+    unsigned int thread_request_nr;
+    unsigned int total_requests;
+
+    unsigned int threads_nr;
+
+    /* requests are pushed to the threads in a round-robin manner */
+    unsigned int current_thread_index;
+
+    ThreadedWorkqueueOps *ops;
+
+    const char *name;
+    QemuEvent ev;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static ThreadRequest *index_to_request(Threads *threads, int request_index)
+{
+    ThreadRequest *request;
+
+    request = threads->requests + request_index * threads->request_size;
+
+    assert(request->index == request_index);
+    return request;
+}
+
+static int request_to_index(ThreadRequest *request)
+{
+    return request->index;
+}
+
+static int thread_to_first_request_index(Threads *threads, int thread_id)
+{
+    thread_id %= threads->threads_nr;
+    return thread_id * threads->thread_request_nr;
+}
+
+static int request_index_to_thread(Threads *threads, int request_index)
+{
+    return request_index / threads->thread_request_nr;
+}
+
+/*
+ * free request: the request is not used by any thread; however, it might
+ *   contain a result that needs the user to call thread_request_done()
+ *
+ * valid request: the request contains the request data and it's committed
+ *   to the thread, i.e., it's owned by the thread.
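+ *
+ * A walk-through of the bitmap protocol: both bits of request i start as
+ * 0, so fill XOR done = 0 and the request is free; the submitter flips
+ * the fill bit (1 XOR 0 = 1), making it valid; the thread then flips the
+ * done bit (1 XOR 1 = 0) and the request is free again.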
+ */
+static unsigned long *get_free_request_bitmap(Threads *threads)
+{
+    bitmap_xor(threads->result_bitmap, threads->request_fill_bitmap,
+               threads->request_done_bitmap, threads->total_requests);
+
+    /*
+     * paired with smp_wmb() in mark_request_free() to make sure that we
+     * read request_done_bitmap before fetching the result out.
+     */
+    smp_rmb();
+
+    return threads->result_bitmap;
+}
+
+static int find_free_request_index(Threads *threads)
+{
+    unsigned long *result_bitmap = get_free_request_bitmap(threads);
+    int index, cur_index;
+
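+    /*
+     * start scanning from the first request of the thread that is next
+     * in round-robin order, so requests spread evenly across the threads
+     */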
+    cur_index = thread_to_first_request_index(threads,
+                                              threads->current_thread_index);
+
+retry:
+    index = find_next_zero_bit(result_bitmap, threads->total_requests,
+                               cur_index);
+    if (index < threads->total_requests) {
+        return index;
+    }
+
+    /* if we found nothing, start over from the beginning. */
+    if (cur_index != 0) {
+        cur_index = 0;
+        goto retry;
+    }
+
+    return -1;
+}
+
+static void mark_request_valid(Threads *threads, int request_index)
+{
+    /*
+     * paired with smp_rmb() in thread_find_first_valid_request_index()
+     * to make sure the request has been filled before the bit is
+     * flipped, which makes the request visible to the thread
+     */
+    smp_wmb();
+
+    change_bit(request_index, threads->request_fill_bitmap);
+}
+
+static int thread_find_first_valid_request_index(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    int index;
+
+    bitmap_xor(thread->result_bitmap, threads->request_fill_bitmap,
+               threads->request_done_bitmap, threads->total_requests);
+    /*
+     * paired with smp_wmb() in mark_request_valid() to make sure that
+     * we read request_fill_bitmap before fetching the request out.
+     */
+    smp_rmb();
+
+    index = find_next_bit(thread->result_bitmap, threads->total_requests,
+                          thread->start_request_index);
+    return index > thread->end_request_index ? -1 : index;
+}
+
+static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
+{
+    int index = request_to_index(request);
+
+    /*
+     * smp_wmb() is implied in change_bit_atomic() that is paired with
+     * smp_rmb() in get_free_request_bitmap() to make sure the result
+     * has been saved before the bit is flipped.
+     */
+    change_bit_atomic(index, thread->threads->request_done_bitmap);
+}
+
+/* retry to see if there is an available request before actually going to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *
+thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    int index, count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        index = thread_find_first_valid_request_index(thread);
+        if (index >= 0) {
+            assert(index >= thread->start_request_index &&
+                   index <= thread->end_request_index);
+            return index_to_request(thread->threads, index);
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(void *request) = threads->ops->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
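+        /*
+         * reset the event before scanning for requests so a wakeup from
+         * the submitter cannot be lost: the submitter flips the fill bit
+         * first and sets the event afterwards.
+         */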
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+
+        assert(!request->done);
+
+        handler(request + 1);
+        request->done = true;
+        mark_request_free(self_data, request);
+        qemu_event_set(&threads->ev);
+    }
+
+    return NULL;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+    int i;
+
+    for (request = threads->requests, i = 0; i < free_nr; i++) {
+        threads->ops->thread_request_uninit(request + 1);
+        request = (void *)request + threads->request_size;
+    }
+
+    g_free(threads->result_bitmap);
+    g_free(threads->request_fill_bitmap);
+    g_free(threads->request_done_bitmap);
+    g_free(threads->requests);
+}
+
+static int init_requests(Threads *threads)
+{
+    ThreadRequest *request;
+    int aligned_requests, free_nr = 0, ret = -1;
+
+    aligned_requests = BITS_ALIGNED_TO_CACHE(threads->total_requests);
+    threads->request_fill_bitmap = bitmap_new(aligned_requests);
+    threads->request_done_bitmap = bitmap_new(aligned_requests);
+    threads->result_bitmap = bitmap_new(threads->total_requests);
+
+    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
+
+    threads->request_size = threads->ops->thread_get_request_size();
+    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
+    threads->request_size += sizeof(ThreadRequest);
+    threads->requests = g_try_malloc0_n(threads->total_requests,
+                                        threads->request_size);
+    if (!threads->requests) {
+        goto exit;
+    }
+
+    for (request = threads->requests; free_nr < threads->total_requests;
+        free_nr++) {
+        ret = threads->ops->thread_request_init(request + 1);
+        if (ret < 0) {
+            goto exit;
+        }
+
+        request->index = free_nr;
+        request = (void *)request + threads->request_size;
+    }
+
+    return 0;
+
+exit:
+    uninit_requests(threads, free_nr);
+    return ret;
+}
+
+static void uninit_thread_data(Threads *threads)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < threads->threads_nr; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        g_free(thread_local[i].result_bitmap);
+    }
+}
+
+static void init_thread_data(Threads *threads)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int start_index, end_index, i;
+
+    for (i = 0; i < threads->threads_nr; i++) {
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+
+        start_index = thread_to_first_request_index(threads, i);
+        end_index = start_index + threads->thread_request_nr - 1;
+        thread_local[i].start_request_index = start_index;
+        thread_local[i].end_request_index = end_index;
+
+        thread_local[i].result_bitmap = bitmap_new(threads->total_requests);
+
+        qemu_event_init(&thread_local[i].ev, false);
+
+        name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+}
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                               int thread_request_nr, ThreadedWorkqueueOps *ops)
+{
+    Threads *threads;
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->name = name;
+    threads->ops = ops;
+
+    threads->threads_nr = threads_nr;
+    threads->thread_request_nr = thread_request_nr;
+
+    threads->total_requests = thread_request_nr * threads_nr;
+    if (init_requests(threads) < 0) {
+        g_free(threads);
+        return NULL;
+    }
+
+    qemu_event_init(&threads->ev, false);
+    init_thread_data(threads);
+    return threads;
+}
+
+void threaded_workqueue_destroy(Threads *threads)
+{
+    uninit_thread_data(threads);
+    uninit_requests(threads, threads->total_requests);
+    qemu_event_destroy(&threads->ev);
+    g_free(threads);
+}
+
+static void request_done(Threads *threads, ThreadRequest *request)
+{
+    if (!request->done) {
+        return;
+    }
+
+    threads->ops->thread_request_done(request + 1);
+    request->done = false;
+}
+
+void *threaded_workqueue_get_request(Threads *threads)
+{
+    ThreadRequest *request;
+    int index;
+
+    index = find_free_request_index(threads);
+    if (index < 0) {
+        return NULL;
+    }
+
+    request = index_to_request(threads, index);
+    request_done(threads, request);
+    return request + 1;
+}
+
+void threaded_workqueue_submit_request(Threads *threads, void *request)
+{
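+    /* step back over the ThreadRequest header that precedes the user data */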
+    ThreadRequest *req = request - sizeof(ThreadRequest);
+    int request_index = request_to_index(req);
+    int thread_index = request_index_to_thread(threads, request_index);
+    ThreadLocal *thread_local = &threads->per_thread_data[thread_index];
+
+    assert(!req->done);
+
+    mark_request_valid(threads, request_index);
+
+    threads->current_thread_index = ++thread_index;
+    qemu_event_set(&thread_local->ev);
+}
+
+void threaded_workqueue_wait_for_requests(Threads *threads)
+{
+    unsigned long *result_bitmap;
+    int index = 0;
+
+retry:
+    qemu_event_reset(&threads->ev);
+    result_bitmap = get_free_request_bitmap(threads);
+    for (; index < threads->total_requests; index++) {
+        if (test_bit(index, result_bitmap)) {
+            qemu_event_wait(&threads->ev);
+            goto retry;
+        }
+
+        request_done(threads, index_to_request(threads, index));
+    }
+}
-- 
2.14.5
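
For reference, a minimal caller-side sketch of the API introduced by
this patch (illustrative only: all my_*() names and the "double the
input" workload are hypothetical, and error handling is trimmed):

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    typedef struct MyRequest {
        int input;      /* filled by the submitter */
        int output;     /* filled by the workqueue thread */
    } MyRequest;

    static int my_request_size(void)
    {
        return sizeof(MyRequest);
    }

    static int my_request_init(void *request)
    {
        return 0;       /* nothing to pre-allocate per request */
    }

    static void my_request_uninit(void *request)
    {
    }

    /* runs in a workqueue thread */
    static void my_request_handler(void *request)
    {
        MyRequest *req = request;

        req->output = req->input * 2;
    }

    /* runs in the submitter once the request has been handled */
    static void my_request_done(void *request)
    {
        MyRequest *req = request;

        printf("%d -> %d\n", req->input, req->output);
    }

    static ThreadedWorkqueueOps my_ops = {
        .thread_get_request_size = my_request_size,
        .thread_request_init = my_request_init,
        .thread_request_uninit = my_request_uninit,
        .thread_request_handler = my_request_handler,
        .thread_request_done = my_request_done,
    };

    static void my_run(int nr)
    {
        Threads *threads;
        MyRequest *req;
        int i;

        threads = threaded_workqueue_create("my-wq", 4,
                                            DEFAULT_THREAD_REQUEST_NR,
                                            &my_ops);
        for (i = 0; i < nr; i++) {
            while (!(req = threaded_workqueue_get_request(threads))) {
                /* all request slots are in flight; spin until one frees up */
            }
            req->input = i;
            threaded_workqueue_submit_request(threads, req);
        }

        /* collect the remaining results, then tear down */
        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }

Note that completed results are delivered via thread_request_done()
both inside threaded_workqueue_get_request(), when a free slot is
reused, and by threaded_workqueue_wait_for_requests().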


* [PATCH v2 3/5] migration: use threaded workqueue for compression
From: guangrong.xiao @ 2018-11-06 12:20 UTC
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the compression code to the threaded workqueue

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 313 +++++++++++++++++++++-----------------------------------
 1 file changed, 115 insertions(+), 198 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 7e7deec4d8..acca842aff 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -57,6 +57,7 @@
 #include "qemu/uuid.h"
 #include "savevm.h"
 #include "qemu/iov.h"
+#include "qemu/threaded-workqueue.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct CompressParam {
-    bool done;
-    bool quit;
-    bool zero_page;
-    QEMUFile *file;
-    QemuMutex mutex;
-    QemuCond cond;
-    RAMBlock *block;
-    ram_addr_t offset;
-
-    /* internally used fields */
-    z_stream stream;
-    uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
 struct DecompressParam {
     bool done;
     bool quit;
@@ -377,15 +362,6 @@ struct DecompressParam {
 };
 typedef struct DecompressParam DecompressParam;
 
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
@@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
-
-static void *do_data_compress(void *opaque)
-{
-    CompressParam *param = opaque;
-    RAMBlock *block;
-    ram_addr_t offset;
-    bool zero_page;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->block) {
-            block = param->block;
-            offset = param->offset;
-            param->block = NULL;
-            qemu_mutex_unlock(&param->mutex);
-
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
-            qemu_mutex_lock(&comp_done_lock);
-            param->done = true;
-            param->zero_page = zero_page;
-            qemu_cond_signal(&comp_done_cond);
-            qemu_mutex_unlock(&comp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression() || !comp_param) {
-        return;
-    }
-
-    thread_count = migrate_compress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!comp_param[i].file) {
-            break;
-        }
-
-        qemu_mutex_lock(&comp_param[i].mutex);
-        comp_param[i].quit = true;
-        qemu_cond_signal(&comp_param[i].cond);
-        qemu_mutex_unlock(&comp_param[i].mutex);
-
-        qemu_thread_join(compress_threads + i);
-        qemu_mutex_destroy(&comp_param[i].mutex);
-        qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
-        g_free(comp_param[i].originbuf);
-        qemu_fclose(comp_param[i].file);
-        comp_param[i].file = NULL;
-    }
-    qemu_mutex_destroy(&comp_done_lock);
-    qemu_cond_destroy(&comp_done_cond);
-    g_free(compress_threads);
-    g_free(comp_param);
-    compress_threads = NULL;
-    comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-    thread_count = migrate_compress_threads();
-    compress_threads = g_new0(QemuThread, thread_count);
-    comp_param = g_new0(CompressParam, thread_count);
-    qemu_cond_init(&comp_done_cond);
-    qemu_mutex_init(&comp_done_lock);
-    for (i = 0; i < thread_count; i++) {
-        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
-        if (!comp_param[i].originbuf) {
-            goto exit;
-        }
-
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
-            g_free(comp_param[i].originbuf);
-            goto exit;
-        }
-
-        /* comp_param[i].file is just used as a dummy buffer to save data,
-         * set its ops to empty.
-         */
-        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
-        comp_param[i].done = true;
-        comp_param[i].quit = false;
-        qemu_mutex_init(&comp_param[i].mutex);
-        qemu_cond_init(&comp_param[i].cond);
-        qemu_thread_create(compress_threads + i, "compress",
-                           do_data_compress, comp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-
-exit:
-    compress_threads_save_cleanup();
-    return -1;
-}
-
 /* Multiple fd's */
 
 #define MULTIFD_MAGIC 0x11223344U
@@ -1909,12 +1766,25 @@ exit:
     return zero_page;
 }
 
+struct CompressData {
+    /* filled by the migration thread. */
+    RAMBlock *block;
+    ram_addr_t offset;
+
+    /* filled by compress thread. */
+    QEMUFile *file;
+    z_stream stream;
+    uint8_t *originbuf;
+    bool zero_page;
+};
+typedef struct CompressData CompressData;
+
 static void
-update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+update_compress_thread_counts(CompressData *cd, int bytes_xmit)
 {
     ram_counters.transferred += bytes_xmit;
 
-    if (param->zero_page) {
+    if (cd->zero_page) {
         ram_counters.duplicate++;
         return;
     }
@@ -1924,81 +1794,128 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
     compression_counters.pages++;
 }
 
+static int compress_thread_get_data_size(void)
+{
+    return sizeof(CompressData);
+}
+
+static int compress_thread_data_init(void *request)
+{
+    CompressData *cd = request;
+
+    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+    if (!cd->originbuf) {
+        return -1;
+    }
+
+    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
+        g_free(cd->originbuf);
+        return -1;
+    }
+
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_thread_data_fini(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+    g_free(cd->originbuf);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+
+    /*
+     * if compression fails, it will be indicated by
+     * migrate_get_current()->to_dst_file.
+     */
+    cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
+                                         cd->offset, cd->originbuf);
+}
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    RAMState *rs = ram_state;
+    int bytes_xmit;
+
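+    /* runs in the migration thread: flush cd->file into the migration stream */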
+    bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
+    update_compress_thread_counts(cd, bytes_xmit);
+}
+
+static ThreadedWorkqueueOps compress_ops = {
+    .thread_get_request_size = compress_thread_get_data_size,
+    .thread_request_init = compress_thread_data_init,
+    .thread_request_uninit = compress_thread_data_fini,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+};
+
+static Threads *compress_threads;
+
 static bool save_page_use_compression(RAMState *rs);
 
 static void flush_compressed_data(RAMState *rs)
 {
-    int idx, len, thread_count;
-
     if (!save_page_use_compression(rs)) {
         return;
     }
-    thread_count = migrate_compress_threads();
 
-    qemu_mutex_lock(&comp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!comp_param[idx].done) {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        }
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+    threaded_workqueue_wait_for_requests(compress_threads);
+}
 
-    for (idx = 0; idx < thread_count; idx++) {
-        qemu_mutex_lock(&comp_param[idx].mutex);
-        if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            /*
-             * it's safe to fetch zero_page without holding comp_done_lock
-             * as there is no further request submitted to the thread,
-             * i.e, the thread should be waiting for a request at this point.
-             */
-            update_compress_thread_counts(&comp_param[idx], len);
-        }
-        qemu_mutex_unlock(&comp_param[idx].mutex);
+static void compress_threads_save_cleanup(void)
+{
+    if (!compress_threads) {
+        return;
     }
+
+    threaded_workqueue_destroy(compress_threads);
+    compress_threads = NULL;
 }
 
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
-                                       ram_addr_t offset)
+static int compress_threads_save_setup(void)
 {
-    param->block = block;
-    param->offset = offset;
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    compress_threads = threaded_workqueue_create("compress",
+                                migrate_compress_threads(),
+                                DEFAULT_THREAD_REQUEST_NR, &compress_ops);
+    return compress_threads ? 0 : -1;
 }
 
 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
+    CompressData *cd;
     bool wait = migrate_compress_wait_thread();
 
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
 retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
+    cd = threaded_workqueue_get_request(compress_threads);
+    if (!cd) {
+        /*
+         * Wait for a free thread if the user specifies
+         * 'compress-wait-thread'; otherwise post the page
+         * out from the main thread as a normal page.
+         */
+        if (wait) {
+            cpu_relax();
+            goto retry;
         }
-    }
 
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
-    }
-    qemu_mutex_unlock(&comp_done_lock);
-
-    return pages;
+        return -1;
+    }
+    cd->block = block;
+    cd->offset = offset;
+    threaded_workqueue_submit_request(compress_threads, cd);
+    return 1;
 }
 
 /**
-- 
2.14.5

^ permalink raw reply related	[flat|nested] 24+ messages in thread
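
The conversion above follows a fixed shape that the later patches repeat. A
minimal, self-contained sketch of that shape, built only from the
threaded-workqueue calls this patch actually uses (MyData, the doubling
handler and the "example" name are hypothetical stand-ins; the ops struct
and DEFAULT_THREAD_REQUEST_NR come from patch 2; error handling omitted):

    struct MyData {
        int input;                  /* filled by the submitter */
        int output;                 /* filled by a worker thread */
    };
    typedef struct MyData MyData;

    static int my_get_request_size(void)
    {
        return sizeof(MyData);
    }

    static int my_request_init(void *request)
    {
        /* allocate per-request resources, cf. originbuf/deflateInit above */
        return 0;
    }

    static void my_request_uninit(void *request)
    {
        /* release whatever my_request_init() allocated */
    }

    static void my_request_handler(void *request)
    {
        MyData *d = request;

        d->output = d->input * 2;   /* the work, run in a worker thread */
    }

    static void my_request_done(void *request)
    {
        /* harvest the result on the submitter side */
    }

    static ThreadedWorkqueueOps my_ops = {
        .thread_get_request_size = my_get_request_size,
        .thread_request_init = my_request_init,
        .thread_request_uninit = my_request_uninit,
        .thread_request_handler = my_request_handler,
        .thread_request_done = my_request_done,
    };

    static void my_example(void)
    {
        Threads *threads;
        MyData *d;

        threads = threaded_workqueue_create("example", 2,
                                            DEFAULT_THREAD_REQUEST_NR,
                                            &my_ops);

        do {
            d = threaded_workqueue_get_request(threads);
        } while (!d);               /* NULL: every request slot is in flight */

        d->input = 21;
        threaded_workqueue_submit_request(threads, d);

        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }

Note that compress_thread_data_done() writes into rs->f without taking a
lock, which only works if thread_request_done runs in the submitter's
context; the sketch's my_request_done() assumes the same.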

* [PATCH v2 4/5] migration: use threaded workqueue for decompression
  2018-11-06 12:20 ` [Qemu-devel] " guangrong.xiao
@ 2018-11-06 12:20   ` guangrong.xiao
  -1 siblings, 0 replies; 24+ messages in thread
From: guangrong.xiao @ 2018-11-06 12:20 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the decompression code to the threaded workqueue

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 225 ++++++++++++++++++++------------------------------------
 1 file changed, 81 insertions(+), 144 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index acca842aff..834198f11c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct DecompressParam {
-    bool done;
-    bool quit;
-    QemuMutex mutex;
-    QemuCond cond;
-    void *des;
-    uint8_t *compbuf;
-    int len;
-    z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
 
 /* Multiple fd's */
 
@@ -3404,6 +3388,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+
 /* return the size after decompression, or negative value on error */
 static int
 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
@@ -3429,166 +3414,118 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
     return stream->total_out;
 }
 
-static void *do_data_decompress(void *opaque)
+struct DecompressData {
+    /* filled by migration thread. */
+    void *des;
+    uint8_t *compbuf;
+    size_t len;
+
+    z_stream stream;
+};
+typedef struct DecompressData DecompressData;
+
+static Threads *decompress_threads;
+
+static int decompress_thread_get_data_size(void)
 {
-    DecompressParam *param = opaque;
-    unsigned long pagesize;
-    uint8_t *des;
-    int len, ret;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->des) {
-            des = param->des;
-            len = param->len;
-            param->des = 0;
-            qemu_mutex_unlock(&param->mutex);
-
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
-            if (ret < 0 && migrate_get_current()->decompress_error_check) {
-                error_report("decompress data failed");
-                qemu_file_set_error(decomp_file, ret);
-            }
+    return sizeof(DecompressData);
+}
 
-            qemu_mutex_lock(&decomp_done_lock);
-            param->done = true;
-            qemu_cond_signal(&decomp_done_cond);
-            qemu_mutex_unlock(&decomp_done_lock);
+static int decompress_thread_data_init(void *request)
+{
+    DecompressData *dd = request;
 
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
+    if (inflateInit(&dd->stream) != Z_OK) {
+        return -1;
     }
-    qemu_mutex_unlock(&param->mutex);
 
-    return NULL;
+    dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    return 0;
 }
 
-static int wait_for_decompress_done(void)
+static void decompress_thread_data_fini(void *request)
 {
-    int idx, thread_count;
+    DecompressData *dd = request;
 
-    if (!migrate_use_compression()) {
-        return 0;
-    }
+    inflateEnd(&dd->stream);
+    g_free(dd->compbuf);
+}
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!decomp_param[idx].done) {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+static void decompress_thread_data_handler(void *request)
+{
+    DecompressData *dd = request;
+    unsigned long pagesize = TARGET_PAGE_SIZE;
+    int ret;
+
+    ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize,
+                               dd->compbuf, dd->len);
+    if (ret < 0 && migrate_get_current()->decompress_error_check) {
+        error_report("decompress data failed");
+        qemu_file_set_error(decomp_file, ret);
     }
-    qemu_mutex_unlock(&decomp_done_lock);
-    return qemu_file_get_error(decomp_file);
 }
 
-static void compress_threads_load_cleanup(void)
+static void decompress_thread_data_done(void *request)
 {
-    int i, thread_count;
+}
+
+static ThreadedWorkqueueOps decompress_ops = {
+    .thread_get_request_size = decompress_thread_get_data_size,
+    .thread_request_init = decompress_thread_data_init,
+    .thread_request_uninit = decompress_thread_data_fini,
+    .thread_request_handler = decompress_thread_data_handler,
+    .thread_request_done = decompress_thread_data_done,
+};
 
+static int decompress_init(QEMUFile *f)
+{
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
-    thread_count = migrate_decompress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
 
-        qemu_mutex_lock(&decomp_param[i].mutex);
-        decomp_param[i].quit = true;
-        qemu_cond_signal(&decomp_param[i].cond);
-        qemu_mutex_unlock(&decomp_param[i].mutex);
-    }
-    for (i = 0; i < thread_count; i++) {
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
+    decomp_file = f;
+    decompress_threads = threaded_workqueue_create("decompress",
+                                migrate_decompress_threads(),
+                                DEFAULT_THREAD_REQUEST_NR, &decompress_ops);
+    return decompress_threads ? 0 : -1;
+}
 
-        qemu_thread_join(decompress_threads + i);
-        qemu_mutex_destroy(&decomp_param[i].mutex);
-        qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
-        g_free(decomp_param[i].compbuf);
-        decomp_param[i].compbuf = NULL;
+static void decompress_fini(void)
+{
+    if (!decompress_threads) {
+        return;
     }
-    g_free(decompress_threads);
-    g_free(decomp_param);
+
+    threaded_workqueue_destroy(decompress_threads);
     decompress_threads = NULL;
-    decomp_param = NULL;
     decomp_file = NULL;
 }
 
-static int compress_threads_load_setup(QEMUFile *f)
+static int flush_decompressed_data(void)
 {
-    int i, thread_count;
-
     if (!migrate_use_compression()) {
         return 0;
     }
 
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    decomp_file = f;
-    for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
-            goto exit;
-        }
-
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-exit:
-    compress_threads_load_cleanup();
-    return -1;
+    threaded_workqueue_wait_for_requests(decompress_threads);
+    return qemu_file_get_error(decomp_file);
 }
 
 static void decompress_data_with_multi_threads(QEMUFile *f,
-                                               void *host, int len)
+                                               void *host, size_t len)
 {
-    int idx, thread_count;
+    DecompressData *dd;
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    while (true) {
-        for (idx = 0; idx < thread_count; idx++) {
-            if (decomp_param[idx].done) {
-                decomp_param[idx].done = false;
-                qemu_mutex_lock(&decomp_param[idx].mutex);
-                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
-                decomp_param[idx].des = host;
-                decomp_param[idx].len = len;
-                qemu_cond_signal(&decomp_param[idx].cond);
-                qemu_mutex_unlock(&decomp_param[idx].mutex);
-                break;
-            }
-        }
-        if (idx < thread_count) {
-            break;
-        } else {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+retry:
+    dd = threaded_workqueue_get_request(decompress_threads);
+    if (!dd) {
+        goto retry;
     }
-    qemu_mutex_unlock(&decomp_done_lock);
+
+    dd->des = host;
+    dd->len = len;
+    qemu_get_buffer(f, dd->compbuf, len);
+    threaded_workqueue_submit_request(decompress_threads, dd);
 }
 
 /*
@@ -3683,7 +3620,7 @@ void colo_release_ram_cache(void)
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup(f)) {
+    if (decompress_init(f)) {
         return -1;
     }
 
@@ -3704,7 +3641,7 @@ static int ram_load_cleanup(void *opaque)
     }
 
     xbzrle_load_cleanup();
-    compress_threads_load_cleanup();
+    decompress_fini();
 
     RAMBLOCK_FOREACH_MIGRATABLE(rb) {
         g_free(rb->receivedmap);
@@ -4106,7 +4043,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    ret |= wait_for_decompress_done();
+    ret |= flush_decompressed_data();
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
 
-- 
2.14.5

^ permalink raw reply related	[flat|nested] 24+ messages in thread
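
One incidental difference from the save side is visible above:
decompress_data_with_multi_threads() spins on
threaded_workqueue_get_request() with a bare goto, while
compress_page_with_multi_thread() in the previous patch calls cpu_relax()
between attempts. A variant that mirrors the save-side spin would be (a
sketch only; whether the relax matters on the load side is not measured
here):

    static void decompress_data_with_multi_threads(QEMUFile *f,
                                                   void *host, size_t len)
    {
        DecompressData *dd;

        /* spin until a request slot frees up, as the save side does */
        while (!(dd = threaded_workqueue_get_request(decompress_threads))) {
            cpu_relax();
        }

        dd->des = host;
        dd->len = len;
        qemu_get_buffer(f, dd->compbuf, len);
        threaded_workqueue_submit_request(decompress_threads, dd);
    }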

* [PATCH v2 5/5] tests: add threaded-workqueue-bench
  2018-11-06 12:20 ` [Qemu-devel] " guangrong.xiao
@ 2018-11-06 12:20   ` guangrong.xiao
  -1 siblings, 0 replies; 24+ messages in thread
From: guangrong.xiao @ 2018-11-06 12:20 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

From: Xiao Guangrong <xiaoguangrong@tencent.com>

It's the benchmark for threaded-workqueue; it also serves as a good
example of how threaded-workqueue is used

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 tests/Makefile.include           |   5 +-
 tests/threaded-workqueue-bench.c | 256 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 260 insertions(+), 1 deletion(-)
 create mode 100644 tests/threaded-workqueue-bench.c

diff --git a/tests/Makefile.include b/tests/Makefile.include
index d2e577eabb..a4deb210ab 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -499,7 +499,8 @@ test-obj-y = tests/check-qnum.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-rcu-tailq.o \
 	tests/test-qdist.o tests/test-shift128.o \
 	tests/test-qht.o tests/qht-bench.o tests/test-qht-par.o \
-	tests/atomic_add-bench.o tests/atomic64-bench.o
+	tests/atomic_add-bench.o tests/atomic64-bench.o \
+	tests/threaded-workqueue-bench.o
 
 $(test-obj-y): QEMU_INCLUDES += -Itests
 QEMU_CFLAGS += -I$(SRC_PATH)/tests
@@ -555,6 +556,8 @@ tests/qht-bench$(EXESUF): tests/qht-bench.o $(test-util-obj-y)
 tests/test-bufferiszero$(EXESUF): tests/test-bufferiszero.o $(test-util-obj-y)
 tests/atomic_add-bench$(EXESUF): tests/atomic_add-bench.o $(test-util-obj-y)
 tests/atomic64-bench$(EXESUF): tests/atomic64-bench.o $(test-util-obj-y)
+tests/threaded-workqueue-bench$(EXESUF): tests/threaded-workqueue-bench.o migration/qemu-file.o \
+	$(test-util-obj-y)
 
 tests/fp/%:
 	$(MAKE) -C $(dir $@) $(notdir $@)
diff --git a/tests/threaded-workqueue-bench.c b/tests/threaded-workqueue-bench.c
new file mode 100644
index 0000000000..88026f1a8f
--- /dev/null
+++ b/tests/threaded-workqueue-bench.c
@@ -0,0 +1,256 @@
+/*
+ * Threaded Workqueue Benchmark
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+#include <zlib.h>
+
+#include "qemu/osdep.h"
+#include "exec/cpu-common.h"
+#include "qemu/error-report.h"
+#include "migration/qemu-file.h"
+#include "qemu/threaded-workqueue.h"
+
+#define PAGE_SHIFT              12
+#define PAGE_SIZE               (1 << PAGE_SHIFT)
+#define DEFAULT_THREAD_NR       2
+#define DEFAULT_MEM_SIZE        1
+#define DEFAULT_REPEATED_COUNT  3
+
+static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
+                                   int64_t pos)
+{
+    int i, size = 0;
+
+    for (i = 0; i < iovcnt; i++) {
+        size += iov[i].iov_len;
+    }
+    return size;
+}
+
+static int test_fclose(void *opaque)
+{
+    return 0;
+}
+
+static const QEMUFileOps test_write_ops = {
+    .writev_buffer  = test_writev_buffer,
+    .close          = test_fclose
+};
+
+static QEMUFile *dest_file;
+
+static const QEMUFileOps empty_ops = { };
+
+struct CompressData {
+    uint8_t *ram_addr;
+    QEMUFile *file;
+    z_stream stream;
+};
+typedef struct CompressData CompressData;
+
+static int compress_request_size(void)
+{
+    return sizeof(CompressData);
+}
+
+static int compress_request_init(void *request)
+{
+    CompressData *cd = request;
+
+    if (deflateInit(&cd->stream, 1) != Z_OK) {
+        return -1;
+    }
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_request_uninit(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+    int blen;
+
+    blen = qemu_put_compression_data(cd->file, &cd->stream, cd->ram_addr,
+                                     PAGE_SIZE);
+    if (blen < 0) {
+        error_report("compressed data failed!");
+        qemu_file_set_error(dest_file, blen);
+    }
+}
+
+struct CompressStats {
+    unsigned long pages;
+    unsigned long compressed_size;
+};
+typedef struct CompressStats CompressStats;
+
+static CompressStats comp_stats;
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
+
+    comp_stats.pages++;
+    comp_stats.compressed_size += bytes_xmit;
+}
+
+static ThreadedWorkqueueOps ops = {
+    .thread_get_request_size = compress_request_size,
+    .thread_request_init = compress_request_init,
+    .thread_request_uninit = compress_request_uninit,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+};
+
+static void compress_threads_save_cleanup(Threads *threads)
+{
+    threaded_workqueue_destroy(threads);
+    qemu_fclose(dest_file);
+}
+
+static Threads *compress_threads_save_setup(int threads_nr, int requests_nr)
+{
+    Threads *compress_threads;
+
+    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
+    compress_threads = threaded_workqueue_create("compress", threads_nr,
+                                                 requests_nr, &ops);
+    assert(compress_threads);
+    return compress_threads;
+}
+
+static void compress_page_with_multi_thread(Threads *threads, uint8_t *addr)
+{
+    CompressData *cd;
+
+retry:
+    cd = threaded_workqueue_get_request(threads);
+    if (!cd) {
+        goto retry;
+    }
+
+    cd->ram_addr = addr;
+    threaded_workqueue_submit_request(threads, cd);
+}
+
+static void run(Threads *threads, uint8_t *mem, unsigned long mem_size,
+                int repeated_count)
+{
+    uint8_t *ptr = mem, *end = mem + mem_size;
+    uint64_t start_ts, spend, total_ts = 0, pages = mem_size >> PAGE_SHIFT;
+    double rate;
+    int i;
+
+    for (i = 0; i < repeated_count; i++) {
+        ptr = mem;
+        memset(&comp_stats, 0, sizeof(comp_stats));
+
+        start_ts = g_get_monotonic_time();
+        for (ptr = mem; ptr < end; ptr += PAGE_SIZE) {
+            *ptr = 0x10;
+            compress_page_with_multi_thread(threads, ptr);
+        }
+        threaded_workqueue_wait_for_requests(threads);
+        spend = g_get_monotonic_time() - start_ts;
+        total_ts += spend;
+
+        if (comp_stats.pages != pages) {
+            printf("ERROR: pages are compressed %ld, expect %ld.\n",
+                   comp_stats.pages, pages);
+            exit(-1);
+        }
+
+        rate = (double)(comp_stats.pages * PAGE_SIZE) /
+                        comp_stats.compressed_size;
+        printf("RUN %d: Request # %ld Cost %ld, Compression Rate %f.\n", i,
+               comp_stats.pages, spend, rate);
+    }
+
+    printf("AVG: Time Cost %ld.\n", total_ts / repeated_count);
+}
+
+static void usage(const char *arg0)
+{
+    printf("\nThreaded Workqueue Benchmark.\n");
+    printf("Usage:\n");
+    printf("  %s [OPTIONS]\n", arg0);
+    printf("Options:\n");
+    printf("   -t        the number of threads (default %d).\n",
+            DEFAULT_THREAD_NR);
+    printf("   -r:       the number of requests handled by each thread (default %d).\n",
+            DEFAULT_THREAD_REQUEST_NR);
+    printf("   -m:       the size of the memory (G) used to test (default %dG).\n",
+            DEFAULT_MEM_SIZE);
+    printf("   -c:       the repeated count (default %d).\n",
+            DEFAULT_REPEATED_COUNT);
+    printf("   -h        show this help info.\n");
+}
+
+int main(int argc, char *argv[])
+{
+    int c, threads_nr, requests_nr, repeated_count;
+    unsigned long mem_size;
+    uint8_t *mem;
+    Threads *threads;
+
+    threads_nr = DEFAULT_THREAD_NR;
+    requests_nr = DEFAULT_THREAD_REQUEST_NR;
+    mem_size = DEFAULT_MEM_SIZE;
+    repeated_count = DEFAULT_REPEATED_COUNT;
+
+    for (;;) {
+        c = getopt(argc, argv, "t:r:m:c:h");
+        if (c < 0) {
+            break;
+        }
+
+        switch (c) {
+        case 't':
+            threads_nr = atoi(optarg);
+            break;
+        case 'r':
+            requests_nr = atoi(optarg);
+            break;
+        case 'm':
+            mem_size = atol(optarg);
+            break;
+        case 'c':
+            repeated_count = atoi(optarg);
+            break;
+        default:
+            printf("Unkown option: %c.\n", c);
+        case 'h':
+            usage(argv[0]);
+            return -1;
+        }
+    }
+
+    printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n",
+            threads_nr, requests_nr, mem_size, repeated_count);
+
+    mem_size = mem_size << 30;
+    mem = qemu_memalign(PAGE_SIZE, mem_size);
+    memset(mem, 0, mem_size);
+
+    threads = compress_threads_save_setup(threads_nr, requests_nr);
+    run(threads, mem, mem_size, repeated_count);
+    compress_threads_save_cleanup(threads);
+    return 0;
+}
-- 
2.14.5

^ permalink raw reply related	[flat|nested] 24+ messages in thread
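
The benchmark is driven entirely by the four options parsed in main(). A
typical invocation (the numbers are illustrative; -r defaults to
DEFAULT_THREAD_REQUEST_NR from the workqueue header):

    ./tests/threaded-workqueue-bench -t 8 -m 2 -c 5

i.e. compress 2G of memory (zeroed, with the first byte of each page
dirtied to 0x10) on 8 worker threads, repeated 5 times, printing each
run's request count, cost and compression rate and then the average cost.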

* Re: [PATCH v2 0/5] migration: improve multithreads
  2018-11-06 12:20 ` [Qemu-devel] " guangrong.xiao
@ 2018-11-07  2:52   ` no-reply
  -1 siblings, 0 replies; 24+ messages in thread
From: no-reply @ 2018-11-07  2:52 UTC (permalink / raw)
  To: guangrong.xiao
  Cc: famz, kvm, quintela, mtosatti, xiaoguangrong, qemu-devel, peterx,
	dgilbert, wei.w.wang, cota, mst, jiang.biao2, pbonzini

Hi,

This series seems to have some coding style problems. See output below for
more information:

Type: series
Message-id: 20181106122025.3487-1-xiaoguangrong@tencent.com
Subject: [Qemu-devel] [PATCH v2 0/5] migration: improve multithreads

=== TEST SCRIPT BEGIN ===
#!/bin/bash

BASE=base
n=1
total=$(git log --oneline $BASE.. | wc -l)
failed=0

git config --local diff.renamelimit 0
git config --local diff.renames True
git config --local diff.algorithm histogram

commits="$(git log --format=%H --reverse $BASE..)"
for c in $commits; do
    echo "Checking PATCH $n/$total: $(git log -n 1 --format=%s $c)..."
    if ! git show $c --format=email | ./scripts/checkpatch.pl --mailback -; then
        failed=1
        echo
    fi
    n=$((n+1))
done

exit $failed
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
Switched to a new branch 'test'
64aabd11be tests: add threaded-workqueue-bench
e9efc70edd migration: use threaded workqueue for decompression
7753a846ad migration: use threaded workqueue for compression
f6933b5132 util: introduce threaded workqueue
810caaa566 bitops: introduce change_bit_atomic

=== OUTPUT BEGIN ===
Checking PATCH 1/5: bitops: introduce change_bit_atomic...
Checking PATCH 2/5: util: introduce threaded workqueue...
WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
#41: 
new file mode 100644

total: 0 errors, 1 warnings, 566 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
Checking PATCH 3/5: migration: use threaded workqueue for compression...
Checking PATCH 4/5: migration: use threaded workqueue for decompression...
Checking PATCH 5/5: tests: add threaded-workqueue-bench...
WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
#36: 
new file mode 100644

WARNING: line over 80 characters
#237: FILE: tests/threaded-workqueue-bench.c:197:
+    printf("   -r:       the number of requests handled by each thread (default %d).\n",

WARNING: line over 80 characters
#239: FILE: tests/threaded-workqueue-bench.c:199:
+    printf("   -m:       the size of the memory (G) used to test (default %dG).\n",

ERROR: line over 90 characters
#285: FILE: tests/threaded-workqueue-bench.c:245:
+    printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n",

total: 1 errors, 3 warnings, 273 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

=== OUTPUT END ===

Test command exited with code: 1


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@redhat.com

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH v2 0/5] migration: improve multithreads
  2018-11-06 12:20 ` [Qemu-devel] " guangrong.xiao
@ 2018-11-12  3:07   ` Xiao Guangrong
  -1 siblings, 0 replies; 24+ messages in thread
From: Xiao Guangrong @ 2018-11-12  3:07 UTC (permalink / raw)
  To: pbonzini, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2


Hi,

Ping...

On 11/6/18 8:20 PM, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Changelog in v2:
> These changes are based on Paolo's suggestion:
> 1) rename the lockless multithreads model to threaded workqueue
> 2) hugely improve the internal design, that make all the request be
>     a large array, properly partition it, assign requests to threads
>     respectively and use bitmaps to sync up threads and the submitter,
>     after that ptr_ring and spinlock are dropped
> 3) introduce event wait for the submitter
> 
> These changes are based on Emilio's review:
> 4) make more detailed description for threaded workqueue
> 5) add a benchmark for threaded workqueue
> 
> The previous version can be found at
> 	https://marc.info/?l=kvm&m=153968821910007&w=2
> 
> There's the simple performance measurement comparing these two versions,
> the environment is the same as we listed in the previous version.
> 
> Use 8 threads to compress the data in the source QEMU
> - with compress-wait-thread = off
> 
> 
>        total time        busy-ratio
> --------------------------------------------------
> v1    125066            0.38
> v2    120444            0.35
> 
> - with compress-wait-thread = on
>           total time    busy-ratio
> --------------------------------------------------
> v1    164426            0
> v2    142609            0
> 
> The v2 win slightly.
> 
> Xiao Guangrong (5):
>    bitops: introduce change_bit_atomic
>    util: introduce threaded workqueue
>    migration: use threaded workqueue for compression
>    migration: use threaded workqueue for decompression
>    tests: add threaded-workqueue-bench
> 
>   include/qemu/bitops.h             |  13 +
>   include/qemu/threaded-workqueue.h |  94 +++++++
>   migration/ram.c                   | 538 ++++++++++++++------------------------
>   tests/Makefile.include            |   5 +-
>   tests/threaded-workqueue-bench.c  | 256 ++++++++++++++++++
>   util/Makefile.objs                |   1 +
>   util/threaded-workqueue.c         | 466 +++++++++++++++++++++++++++++++++
>   7 files changed, 1030 insertions(+), 343 deletions(-)
>   create mode 100644 include/qemu/threaded-workqueue.h
>   create mode 100644 tests/threaded-workqueue-bench.c
>   create mode 100644 util/threaded-workqueue.c
> 

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH v2 2/5] util: introduce threaded workqueue
  2018-11-06 12:20   ` [Qemu-devel] " guangrong.xiao
@ 2018-11-13 18:38     ` Emilio G. Cota
  -1 siblings, 0 replies; 24+ messages in thread
From: Emilio G. Cota @ 2018-11-13 18:38 UTC (permalink / raw)
  To: guangrong.xiao
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, jiang.biao2, pbonzini

On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> This module implements the lockless and efficient threaded workqueue.
(snip)
> +++ b/util/threaded-workqueue.c
> +struct Threads {
> +    /*
> +     * in order to avoid contention, the @requests is partitioned to
> +     * @threads_nr pieces, each thread exclusively handles
> +     * @thread_request_nr requests in the array.
> +     */
> +    void *requests;
(snip)
> +    /*
> +     * the bit in these two bitmaps indicates the index of the @requests
> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    unsigned long *request_fill_bitmap;
> +    /* after handles the request, the thread flips the bit. */
> +    unsigned long *request_done_bitmap;
(snip)
> +    /* the request is pushed to the thread with round-robin manner */
> +    unsigned int current_thread_index;
(snip)
> +    QemuEvent ev;
(snip)
> +};

The fields I'm showing above are all shared by all worker threads.
This can lead to unnecessary contention. For example:
- Adjacent requests can share the same cache line, which might be
  written to by different worker threads (when setting request->done)

- The request_done bitmap is written to by worker threads every time
  a job completes. At high core counts with low numbers of job slots,
  this can result in high contention. For example, imagine we have
  16 threads with 4 jobs each. This only requires 64 bits == 8 bytes, i.e.
  much less than a cache line. Whenever a job completes, the cache line
  will be atomically updated by one of the 16 threads.

- The completion event (Threads.ev above) is written to by every thread.
  Again, this can result in contention at large core counts.
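
To make the contention argument concrete, here is a minimal sketch of
the per-thread layout described above. It is illustrative only, not the
code from the branch mentioned below: the field names are assumptions,
while QemuEvent comes from qemu/thread.h and QEMU_ALIGNED() from
qemu/compiler.h.

    #define WORKER_CACHE_LINE 64

    /* everything a worker writes lives in its own cache line(s) */
    typedef struct ThreadLocal {
        void *requests;                     /* private request slots */
        unsigned long request_fill_bitmap;  /* flipped by the submitter */
        unsigned long request_done_bitmap;  /* flipped by this worker only */
        QemuEvent request_valid_ev;         /* waited on by this worker */
        QemuEvent request_free_ev;          /* waited on by the submitter */
    } ThreadLocal QEMU_ALIGNED(WORKER_CACHE_LINE);

With such a layout the submitter and each worker still communicate
through the bitmaps, but no two workers ever store to the same line.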

An orthogonal issue is the round-robin policy. This can give us fairness,
in that we guarantee that all workers get a similar number of jobs.
But giving one job at a time to each worker is suboptimal when the job
sizes are small-ish, because it misses out on the benefits of batching,
which amortize the cost of communication.
Given that the number of jobs that we have (at least in the benchmark)
are small, filling up a worker's queue before moving on to the next
can yield a significant speedup at high core counts.
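
A sketch of the batching submit path just described, building on the
hypothetical ThreadLocal layout from the sketch above; the helper names
are invented for illustration and are not the actual patch API:

    /* hand slots out of the current worker until it is saturated */
    static void *find_free_request_batched(Threads *threads)
    {
        ThreadLocal *t = &threads->per_thread[threads->current_thread_index];

        if (thread_slots_full(t)) {
            /* only then move on to the next worker, round-robin */
            threads->current_thread_index =
                (threads->current_thread_index + 1) % threads->threads_nr;
            t = &threads->per_thread[threads->current_thread_index];
        }
        /* may return NULL if that worker is also full */
        return thread_get_free_slot(t);
    }

Compared with strict round-robin, a saturated worker wakes up once and
drains several requests per wakeup, which amortizes the sleep+wake cost.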

I implemented the above on top of your series. The results are as follows:

                                         threaded-workqueue-bench -r 4 -m 2 -c 20 -t #N
                                              Host: AMD Opteron(tm) Processor 6376
                                          Thread pinning: #N+1 cores, same-socket first

         12 +-------------------------------------------------------------------------------------------------------+
            |    +   +     +     +     +     +    A+     +     +     +     +     +     +     +     +     +     +    |
            |                                     $                                                  before ***B*** |
         10 |-+                                  $$                                               +batching ###D###-|
            |                                    $$                                       +per-thread-state $$$A$$$ |
            |                                    $$  A        A                                                     |
            |                     $AD     D$A $A $ $ $A  A   $$               A        A  A$       A        A$ A    |
          8 |-+               D$AA  A# D$AA# A  $#D$$  $ $$ A  $   $A $A      $$ A$ A$A $ $ AA   $A $  $A   $ A   +-|
            |                AA  B* B$DA D  DD# A #$$   A  A   $$AA  A  A$A  $  A  A     A    $ A    AA  A$A        |
            |               $DB*B  B $ $ BB    D   $$  #D #D   A           A$A                 A                    |
          6 |-+          $AA*B       *A *  *       $# D  D  D#  #D #D   D#    D#DD#D   D# D#  # ##D      D#       +-|
            |           A             BB   *       A D        DD  D  D#D  DD#D      D#D  D  DD  D  D# D#D  DD#DD    |
            |           $                   B                                                        D              |
            |         $A                     **BB     B                                                             |
          4 |-+      A#                      B   *    **                                                          +-|
            |        $                            B *B  BB* B*                    *BB*B   B*BB*BB*B  B *BB* B*BB    |
            |      $A                              B       B  BB*BB*BB*BB*BB*BB*BB     **B         ** B    B        |
          2 |-+   A                                                                    B           B              +-|
            |     $                                                                                                 |
            |    A                                                                                                  |
            |    +   +     +     +     +     +     +     +     +     +     +     +     +     +     +     +     +    |
          0 +-------------------------------------------------------------------------------------------------------+
                 1   4     8     12    16    20    24    28    32    36    40    44    48    52    56    60    64
                                                             Threads
  png: https://imgur.com/Aj4yfGO

Note: "Threads" in the X axis means "worker threads".

Batching achieves higher performance at high core counts (>8),
since worker threads go through fewer sleep+wake cycles while
waiting for jobs. Performance however diminishes as more threads
are used (>=24) due to cache line contention.

Contention can be avoided by partitioning the request array, bitmaps
and completion event to be entirely local to worker threads
("+per-thread-state"). This avoids the performance decrease at >=24
threads that we observe with batching alone. Note that the overall
speedup does not go beyond ~8; this is explained by the fact that we
use a single requester thread. Thus, as we add more worker threads,
they become increasingly less busy, yet throughput remains roughly
constant. I say roughly because there's quite a bit of noise--this
is a 4-socket machine and I'm letting the scheduler move threads
around, although I'm limiting the cores that can be used with
taskset to maximize locality (this means that for 15 threads we're
using 16 host cores that are all in the same socket; note that
the additional thread is the requester one).

I have pushed the above changes, along with some minor fixes (e.g.
removing the Threads.name field) here:

  https://github.com/cota/qemu/tree/xiao

Note that the 0-len variable goes away, and that Threads becomes
read-only. I also use padding to make sure the events are in
separate cache lines.

Feel free to incorporate whatever you see fit from that branch into
a subsequent iteration.

I have also some minor comments, but we can postpone those for later.
There are some issues I'd like you to consider now, however:

- Make sure all bitmap ops are using atomic_set/read. Add additional
  helpers if necessary.

- Constify everywhere the Ops struct.

- Consider just registering a size_t instead of a function to get the
  job size from the Ops struct.

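As an illustration of the first point, a minimal sketch of one such
helper; the name test_bit_atomic is an assumption (qemu/bitops.h did not
provide it at the time), while BIT_WORD()/BIT_MASK() and atomic_read()
are the existing qemu/bitops.h and qemu/atomic.h primitives:

    static inline bool test_bit_atomic(long nr, const unsigned long *addr)
    {
        const unsigned long *word = addr + BIT_WORD(nr);

        /* atomic_read() forbids torn loads and compiler reordering */
        return (atomic_read(word) & BIT_MASK(nr)) != 0;
    }
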
And then a possible optimization for the actual use case you have:

- Consider using a system-specific number of threads (determined at
  run-time) for compression/decompression. For example, if the host
  system has a single core, there's no point in spawning more than a single
  thread. If the host system has 32 cores, you're probably leaving performance 
  on the table if you just use the default. 
  Ideally determining this number would also take into account the
  size of each job, which should also determine the number of
  job slots per worker thread.
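
A minimal sketch of such a run-time default (illustrative only, not
part of the series); g_get_num_processors() and MAX() are the existing
GLib calls:

    static int default_compress_thread_count(void)
    {
        int cpus = g_get_num_processors();  /* always >= 1 on any host */

        /* leave one core for the requester (migration) thread */
        return MAX(cpus - 1, 1);
    }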

Thanks,

		Emilio

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH v2 2/5] util: introduce threaded workqueue
  2018-11-13 18:38     ` [Qemu-devel] " Emilio G. Cota
@ 2018-11-20 10:25       ` Xiao Guangrong
  -1 siblings, 0 replies; 24+ messages in thread
From: Xiao Guangrong @ 2018-11-20 10:25 UTC (permalink / raw)
  To: Emilio G. Cota
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, jiang.biao2, pbonzini



On 11/14/18 2:38 AM, Emilio G. Cota wrote:
> On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> This module implements the lockless and efficient threaded workqueue.
> (snip)
>> +++ b/util/threaded-workqueue.c
>> +struct Threads {
>> +    /*
>> +     * in order to avoid contention, the @requests is partitioned to
>> +     * @threads_nr pieces, each thread exclusively handles
>> +     * @thread_request_nr requests in the array.
>> +     */
>> +    void *requests;
> (snip)
>> +    /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    unsigned long *request_fill_bitmap;
>> +    /* after handles the request, the thread flips the bit. */
>> +    unsigned long *request_done_bitmap;
> (snip)
>> +    /* the request is pushed to the thread with round-robin manner */
>> +    unsigned int current_thread_index;
> (snip)
>> +    QemuEvent ev;
> (snip)
>> +};
> 
> The fields I'm showing above are all shared by all worker threads.
> This can lead to unnecessary contention. For example:
> - Adjacent requests can share the same cache line, which might be
>    written to by different worker threads (when setting request->done)
> 
> - The request_done bitmap is written to by worker threads every time
>    a job completes. At high core counts with low numbers of job slots,
>    this can result in high contention. For example, imagine we have
>    16 threads with 4 jobs each. This only requires 64 bits == 8 bytes, i.e.
>    much less than a cache line. Whenever a job completes, the cache line
>    will be atomically updated by one of the 16 threads.
> 
> - The completion event (Threads.ev above) is written to by every thread.
>    Again, this can result in contention at large core counts.
> 
> An orthogonal issue is the round-robin policy. This can give us fairness,
> in that we guarantee that all workers get a similar number of jobs.
> But giving one job at a time to each worker is suboptimal when the job
> sizes are small-ish, because it misses out on the benefits of batching,
> which amortize the cost of communication.
> Given that the number of jobs that we have (at least in the benchmark)
> are small, filling up a worker's queue before moving on to the next
> can yield a significant speedup at high core counts.
> 
> I implemented the above on top of your series. The results are as follows:
> 
>                                           threaded-workqueue-bench -r 4 -m 2 -c 20 -t #N
>                                                Host: AMD Opteron(tm) Processor 6376
>                                            Thread pinning: #N+1 cores, same-socket first
> 
>           12 +-------------------------------------------------------------------------------------------------------+
>              |    +   +     +     +     +     +    A+     +     +     +     +     +     +     +     +     +     +    |
>              |                                     $                                                  before ***B*** |
>           10 |-+                                  $$                                               +batching ###D###-|
>              |                                    $$                                       +per-thread-state $$$A$$$ |
>              |                                    $$  A        A                                                     |
>              |                     $AD     D$A $A $ $ $A  A   $$               A        A  A$       A        A$ A    |
>            8 |-+               D$AA  A# D$AA# A  $#D$$  $ $$ A  $   $A $A      $$ A$ A$A $ $ AA   $A $  $A   $ A   +-|
>              |                AA  B* B$DA D  DD# A #$$   A  A   $$AA  A  A$A  $  A  A     A    $ A    AA  A$A        |
>              |               $DB*B  B $ $ BB    D   $$  #D #D   A           A$A                 A                    |
>            6 |-+          $AA*B       *A *  *       $# D  D  D#  #D #D   D#    D#DD#D   D# D#  # ##D      D#       +-|
>              |           A             BB   *       A D        DD  D  D#D  DD#D      D#D  D  DD  D  D# D#D  DD#DD    |
>              |           $                   B                                                        D              |
>              |         $A                     **BB     B                                                             |
>            4 |-+      A#                      B   *    **                                                          +-|
>              |        $                            B *B  BB* B*                    *BB*B   B*BB*BB*B  B *BB* B*BB    |
>              |      $A                              B       B  BB*BB*BB*BB*BB*BB*BB     **B         ** B    B        |
>            2 |-+   A                                                                    B           B              +-|
>              |     $                                                                                                 |
>              |    A                                                                                                  |
>              |    +   +     +     +     +     +     +     +     +     +     +     +     +     +     +     +     +    |
>            0 +-------------------------------------------------------------------------------------------------------+
>                   1   4     8     12    16    20    24    28    32    36    40    44    48    52    56    60    64
>                                                               Threads
>    png: https://imgur.com/Aj4yfGO
> 
> Note: "Threads" in the X axis means "worker threads".
> 
> Batching achieves higher performance at high core counts (>8),
> since worker threads go through fewer sleep+wake cycles while
> waiting for jobs. Performance however diminishes as more threads
> are used (>=24) due to cache line contention.
> 
> Contention can be avoided by partitioning the request array, bitmaps
> and completion event to be entirely local to worker threads
> ("+per-thread-state"). This avoids the performance decrease at >=24
> threads that we observe with batching alone. Note that the overall
> speedup does not go beyond ~8; this is explained by the fact that we
> use a single requester thread. Thus, as we add more worker threads,
> they become increasingly less busy, yet throughput remains roughly
> constant. I say roughly because there's quite a bit of noise--this
> is a 4-socket machine and I'm letting the scheduler move threads
> around, although I'm limiting the cores that can be used with
> taskset to maximize locality (this means that for 15 threads we're
> using 16 host cores that are all in the same socket; note that
> the additional thread is the requester one).

Hmm... I have carefully worked through the changes step by step:
1. separate @requests from Threads into each single thread
2. separate @completion from Threads
3. use batch mode
4. separate the bitmaps from Threads
5. revert batch mode on top of step 4
6. compare all of the above with directly using Emilio's patches.

The big difference between my modifications and Emilio's patches is
that I still keep the per-thread data attached at the end of
@Threads.
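
For illustration, that tail attachment could look like the following
sketch; it is an assumption about the layout, not the series' actual
code, and reuses the hypothetical ThreadLocal type from the sketches
earlier in this thread:

    struct Threads {
        unsigned int threads_nr;
        unsigned int thread_request_nr;
        const ThreadedWorkqueueOps *ops;

        /* one cache-line-aligned element per worker, same allocation */
        ThreadLocal per_thread[] QEMU_ALIGNED(64);
    };

allocated in one shot with something like
g_malloc0(sizeof(struct Threads) + threads_nr * sizeof(ThreadLocal)).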

I got these performance data:
	https://ibb.co/kcLnLL

Indeed, I can almost reproduce your conclusion. The confusing part
is batching: I used 16G of memory for the benchmark and batching
improved nothing; I guess all the threads stay busy in this case anyway.


> 
> I have pushed the above changes, along with some minor fixes (e.g.
> removing the Threads.name field) here:
> 
>    https://github.com/cota/qemu/tree/xiao
> 
> Note that the 0-len variable goes away, and that Threads becomes
> read-only. I also use padding to make sure the events are in
> separate cache lines.
> 
> Feel free to incorporate whatever you see fit from that branch into
> a subsequent iteration.

Nice, I will add you to the author list if you do not object. :)

> 
> I have also some minor comments, but we can postpone those for later.
> There are some issues I'd like you to consider now, however:
> 
> - Make sure all bitmap ops are using atomic_set/read. Add additional
>    helpers if necessary.
> 

Good to me. And since we have moved the requests to each thread, i.e.
requests are per-thread now, I think 64 is big enough, so I am planning
to limit it to a maximum of 64 and use a u64 as the bitmap.

> - Constify everywhere the Ops struct.

Good suggestion.

> 
> - Consider just registering a size_t instead of a function to get the
>    job size from the Ops struct.
> 

Okay, that's good to me.

> And then a possible optimization for the actual use case you have:
> 
> - Consider using a system-specific number of threads (determined at
>    run-time) for compression/decompression. For example, if the host
>    system has a single core, there's no point in spawning more than a single
>    thread. If the host system has 32 cores, you're probably leaving performance
>    on the table if you just use the default.
>    Ideally determining this number would also take into account the
>    size of each job, which should also determine the number of
>    job slots per worker thread.
> 

It cannot work well... It depends on CPU usage rather than the number of CPUs.
Furthermore, we have developed adaptive migration, which dynamically adjusts
the thread count based on resource usage; for more detail please see:
   https://kvmforum2018.sched.com/event/FzuU/adaptive-live-migration-xiao-guangrong-yulei-zhang-tencent-cloud

Thanks!

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH v2 2/5] util: introduce threaded workqueue
  2018-11-20 10:25       ` [Qemu-devel] " Xiao Guangrong
@ 2018-11-20 16:33         ` Emilio G. Cota
  -1 siblings, 0 replies; 24+ messages in thread
From: Emilio G. Cota @ 2018-11-20 16:33 UTC (permalink / raw)
  To: Xiao Guangrong
  Cc: kvm, mst, mtosatti, Xiao Guangrong, dgilbert, peterx, qemu-devel,
	quintela, wei.w.wang, jiang.biao2, pbonzini

On Tue, Nov 20, 2018 at 18:25:25 +0800, Xiao Guangrong wrote:
> On 11/14/18 2:38 AM, Emilio G. Cota wrote:
> > On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
> > > From: Xiao Guangrong <xiaoguangrong@tencent.com>
(snip)
> > Batching achieves higher performance at high core counts (>8),
> > since worker threads go through fewer sleep+wake cycles while
> > waiting for jobs. Performance however diminishes as more threads
> > are used (>=24) due to cache line contention.
> > 
> > Contention can be avoided by partitioning the request array, bitmaps
> > and completion event to be entirely local to worker threads
> > ("+per-thread-state"). This avoids the performance decrease at >=24
> > threads that we observe with batching alone. Note that the overall
> > speedup does not go beyond ~8; this is explained by the fact that we
> > use a single requester thread. Thus, as we add more worker threads,
> > they become increasingly less busy, yet throughput remains roughly
> > constant. I say roughly because there's quite a bit of noise--this
> > is a 4-socket machine and I'm letting the scheduler move threads
> > around, although I'm limiting the cores that can be used with
> > taskset to maximize locality (this means that for 15 threads we're
> > using 16 host cores that are all in the same socket; note that
> > the additional thread is the requester one).
> 
> Hmm... I have carefully worked through the changes step by step:
> 1. separate @requests from Threads into each single thread
> 2. separate @completion from Threads
> 3. use batch mode
> 4. separate the bitmaps from Threads
> 5. revert batch mode on top of step 4
> 6. compare all of the above with directly using Emilio's patches.
> 
> The big difference between my modifications and Emilio's patches is
> that I still keep the per-thread data attached at the end of
> @Threads.
>
> I got these performance data:
> 	https://ibb.co/kcLnLL
> 
> Indeed, I can almost reproduce your conclusion. The confusing part
> is batching: I used 16G of memory for the benchmark and batching
> improved nothing; I guess all the threads stay busy in this case anyway.

I wouldn't worry too much about it, then. The machine I ran my tests
on is rather old, so it's possible that cross-core/socket communication
is much slower there than on your machine, which makes batching
more attractive. At the end of the day, the measurements you take on
the machines you care about are what matter =)

> > I have pushed the above changes, along with some minor fixes (e.g.
> > removing the Threads.name field) here:
> > 
> >    https://github.com/cota/qemu/tree/xiao
> > 
> > Note that the 0-len variable goes away, and that Threads becomes
> > read-only. I also use padding to make sure the events are in
> > separate cache lines.
> > 
> > Feel free to incorporate whatever you see fit from that branch into
> > a subsequent iteration.
> 
> Nice, I will add you to the author list if you do not object. :)

That's not necessary. You deserve the full credit -- I just reviewed
the code ;-)

Thanks,

		Emilio

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH v2 0/5] migration: improve multithreads
  2018-11-12  3:07   ` [Qemu-devel] " Xiao Guangrong
@ 2018-11-20 18:27     ` Paolo Bonzini
  -1 siblings, 0 replies; 24+ messages in thread
From: Paolo Bonzini @ 2018-11-20 18:27 UTC (permalink / raw)
  To: Xiao Guangrong, mst, mtosatti
  Cc: kvm, quintela, Xiao Guangrong, qemu-devel, peterx, dgilbert,
	wei.w.wang, cota, jiang.biao2

On 12/11/18 04:07, Xiao Guangrong wrote:
> 
> Hi,
> 
> Ping...

Hi Guangrong, I think this isn't being reviewed because we're in freeze.

Paolo

> On 11/6/18 8:20 PM, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> Changelog in v2:
>> These changes are based on Paolo's suggestion:
>> 1) rename the lockless multithreads model to threaded workqueue
>> 2) hugely improve the internal design, that make all the request be
>>     a large array, properly partition it, assign requests to threads
>>     respectively and use bitmaps to sync up threads and the submitter,
>>     after that ptr_ring and spinlock are dropped
>> 3) introduce event wait for the submitter
>>
>> These changes are based on Emilio's review:
>> 4) make more detailed description for threaded workqueue
>> 5) add a benchmark for threaded workqueue
>>
>> The previous version can be found at
>>     https://marc.info/?l=kvm&m=153968821910007&w=2
>>
>> There's the simple performance measurement comparing these two versions,
>> the environment is the same as we listed in the previous version.
>>
>> Use 8 threads to compress the data in the source QEMU
>> - with compress-wait-thread = off
>>
>>
>>        total time        busy-ratio
>> --------------------------------------------------
>> v1    125066            0.38
>> v2    120444            0.35
>>
>> - with compress-wait-thread = on
>>           total time    busy-ratio
>> --------------------------------------------------
>> v1    164426            0
>> v2    142609            0
>>
>> The v2 win slightly.
>>
>> Xiao Guangrong (5):
>>    bitops: introduce change_bit_atomic
>>    util: introduce threaded workqueue
>>    migration: use threaded workqueue for compression
>>    migration: use threaded workqueue for decompression
>>    tests: add threaded-workqueue-bench
>>
>>   include/qemu/bitops.h             |  13 +
>>   include/qemu/threaded-workqueue.h |  94 +++++++
>>   migration/ram.c                   | 538
>> ++++++++++++++------------------------
>>   tests/Makefile.include            |   5 +-
>>   tests/threaded-workqueue-bench.c  | 256 ++++++++++++++++++
>>   util/Makefile.objs                |   1 +
>>   util/threaded-workqueue.c         | 466
>> +++++++++++++++++++++++++++++++++
>>   7 files changed, 1030 insertions(+), 343 deletions(-)
>>   create mode 100644 include/qemu/threaded-workqueue.h
>>   create mode 100644 tests/threaded-workqueue-bench.c
>>   create mode 100644 util/threaded-workqueue.c
>>

^ permalink raw reply	[flat|nested] 24+ messages in thread

end of thread, other threads:[~2018-11-20 18:28 UTC | newest]

Thread overview: 12 messages
-- links below jump to the message on this page --
2018-11-06 12:20 [PATCH v2 0/5] migration: improve multithreads guangrong.xiao
2018-11-06 12:20 ` [PATCH v2 1/5] bitops: introduce change_bit_atomic guangrong.xiao
2018-11-06 12:20 ` [PATCH v2 2/5] util: introduce threaded workqueue guangrong.xiao
2018-11-13 18:38   ` Emilio G. Cota
2018-11-20 10:25     ` Xiao Guangrong
2018-11-20 16:33       ` Emilio G. Cota
2018-11-06 12:20 ` [PATCH v2 3/5] migration: use threaded workqueue for compression guangrong.xiao
2018-11-06 12:20 ` [PATCH v2 4/5] migration: use threaded workqueue for decompression guangrong.xiao
2018-11-06 12:20 ` [PATCH v2 5/5] tests: add threaded-workqueue-bench guangrong.xiao
2018-11-07  2:52 ` [PATCH v2 0/5] migration: improve multithreads no-reply
2018-11-12  3:07 ` Xiao Guangrong
2018-11-20 18:27   ` Paolo Bonzini
