All of lore.kernel.org
 help / color / mirror / Atom feed
From: Liang Li <liang.z.li@intel.com>
To: qemu-devel@nongnu.org
Cc: quintela@redhat.com, Liang Li <liang.z.li@intel.com>,
	armbru@redhat.com, lcapitulino@redhat.com,
	Yang Zhang <yang.z.zhang@intel.com>,
	amit.shah@redhat.com, dgilbert@redhat.com
Subject: [Qemu-devel] [v7 08/14] migration: Add the core code of multi-thread compression
Date: Wed,  8 Apr 2015 14:20:05 +0800	[thread overview]
Message-ID: <1428474011-30797-9-git-send-email-liang.z.li@intel.com> (raw)
In-Reply-To: <1428474011-30797-1-git-send-email-liang.z.li@intel.com>

Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 183 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 177 insertions(+), 6 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 44ac002..d657a6c 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -355,12 +355,35 @@ static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
+static inline void notify_compression_done(CompressParam *param)
+{
+    qemu_mutex_lock(comp_done_lock);
+    param->done = true;
+    qemu_cond_signal(comp_done_cond);
+    qemu_mutex_unlock(comp_done_lock);
+}
+
 static void *do_data_compress(void *opaque)
 {
-    while (true) {
+    CompressParam *param = opaque;
 
-    /* To be done */
+    while (true) {
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !param->quit) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (param->quit) {
+            qemu_mutex_unlock(&param->mutex);
+            notify_compression_done(param);
+            break;
+        }
+        do_compress_ram_page(param);
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
 
+        notify_compression_done(param);
     }
 
     return NULL;
@@ -368,7 +391,15 @@ static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        comp_param[idx].quit = true;
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
 }
 
 void migrate_compress_threads_join(void)
@@ -827,6 +858,95 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return pages;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+    bytes_sent = save_page_header(param->file, block, offset |
+                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    param->done = false;
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(comp_done_lock);
+        while (!comp_param[idx].done && !comp_param[idx].quit) {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+        qemu_mutex_unlock(comp_done_lock);
+        if (!comp_param[idx].quit) {
+            len = qemu_put_qemu_file(f, comp_param[idx].file);
+            bytes_transferred += len;
+        }
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset,
+                                           uint64_t *bytes_transferred)
+{
+    int idx, thread_count, bytes_xmit = -1, pages = -1;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (comp_param[idx].done) {
+                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                pages = 1;
+                *bytes_transferred += bytes_xmit;
+                break;
+            }
+        }
+        if (pages > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return pages;
+}
+
 /**
  * ram_save_compressed_page: compress the given page and send it to the stream
  *
@@ -843,8 +963,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     uint64_t *bytes_transferred)
 {
     int pages = -1;
+    uint64_t bytes_xmit;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+
+    bytes_xmit = 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
+    if (bytes_xmit) {
+        *bytes_transferred += bytes_xmit;
+        pages = 1;
+    }
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_xmit > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_xmit == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in last block should have been sent
+         * out, keeping this order is important, because the 'cont' flag
+         * is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Use the qemu thread to compress the data to make sure the
+                 * first page is sent out before other pages
+                 */
+                bytes_xmit = do_compress_ram_page(&comp_param[0]);
+                qemu_put_qemu_file(f, comp_param[0].file);
+                *bytes_transferred += bytes_xmit;
+                pages = 1;
+            }
+        } else {
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                pages = compress_page_with_multi_thread(f, block, offset,
+                                                        bytes_transferred);
+            }
+        }
+    }
 
     return pages;
 }
@@ -912,8 +1083,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
     return pages;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1127,6 +1296,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
+    flush_compressed_data(f);
     rcu_read_unlock();
 
     /*
@@ -1168,6 +1338,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         }
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-- 
1.9.1

  parent reply	other threads:[~2015-04-08  6:30 UTC|newest]

Thread overview: 34+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-04-08  6:19 [Qemu-devel] [PATCH v7 0/14] migration: Add a new feature to do live migration Liang Li
2015-04-08  6:19 ` [Qemu-devel] [v7 01/14] docs: Add a doc about multiple thread compression Liang Li
2015-04-08  6:19 ` [Qemu-devel] [v7 02/14] migration: Add the framework of multi-thread compression Liang Li
2015-04-08  6:20 ` [Qemu-devel] [v7 03/14] migration: Add the framework of multi-thread decompression Liang Li
2015-04-08  6:20 ` [Qemu-devel] [v7 04/14] qemu-file: Add compression functions to QEMUFile Liang Li
2015-04-08  6:20 ` [Qemu-devel] [v7 05/14] arch_init: Alloc and free data struct for compression Liang Li
2015-04-08  6:20 ` [Qemu-devel] [v7 06/14] arch_init: Add and free data struct for decompression Liang Li
2015-04-08  6:20 ` [Qemu-devel] [v7 07/14] migration: Split save_zero_page from ram_save_page Liang Li
2015-04-08  6:20 ` Liang Li [this message]
2015-04-08 11:14   ` [Qemu-devel] [v7 08/14] migration: Add the core code of multi-thread compression Juan Quintela
2015-04-08  6:20 ` [Qemu-devel] [v7 09/14] migration: Make compression co-work with xbzrle Liang Li
2015-04-08 11:26   ` Juan Quintela
2015-04-08 13:52     ` Li, Liang Z
2015-04-08  6:20 ` [Qemu-devel] [v7 10/14] migration: Add the core code for decompression Liang Li
2015-04-08 11:34   ` Juan Quintela
2015-04-08 14:00     ` Li, Liang Z
2015-04-08 14:48       ` Juan Quintela
2015-04-08  6:20 ` [Qemu-devel] [v7 11/14] migration: Add interface to control compression Liang Li
2015-04-08 11:34   ` Juan Quintela
2015-04-14 12:00   ` Eric Blake
2015-04-08  6:20 ` [Qemu-devel] [v7 12/14] migration: Use an array instead of 3 parameters Liang Li
2015-04-08 11:37   ` Juan Quintela
2015-04-14 12:01     ` Eric Blake
2015-04-14 12:03   ` Eric Blake
2015-04-15  8:13     ` Juan Quintela
2015-04-15  8:35       ` Li, Liang Z
2015-04-08  6:20 ` [Qemu-devel] [v7 13/14] migration: Add qmp commands to set and query parameters Liang Li
2015-04-08 11:39   ` Juan Quintela
2015-04-14  2:07     ` Li, Liang Z
2015-04-14  9:24       ` Juan Quintela
2015-04-14 12:09   ` Eric Blake
2015-04-14 12:16     ` Eric Blake
2015-04-08  6:20 ` [Qemu-devel] [v7 14/14] migration: Add hmp interface " Liang Li
2015-04-14 12:20   ` Eric Blake

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1428474011-30797-9-git-send-email-liang.z.li@intel.com \
    --to=liang.z.li@intel.com \
    --cc=amit.shah@redhat.com \
    --cc=armbru@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=lcapitulino@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=yang.z.zhang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.