All of lore.kernel.org
 help / color / mirror / Atom feed
From: Zeyu Jin <jinzeyu@huawei.com>
To: <quintela@redhat.com>, <dgilbert@redhat.com>
Cc: Ying Fang <fangying1@huawei.com>,
	qemu-devel@nongnu.org, Zeyu Jin <jinzeyu@huawei.com>,
	zhang.zhanghailiang@huawei.com
Subject: [RFC PATCH 2/6] migration: Refactoring multi-thread compress migration
Date: Fri, 27 Nov 2020 15:41:41 +0800	[thread overview]
Message-ID: <20201127074142.2113-1-jinzeyu@huawei.com> (raw)

Code refactor for the compression procedure which includes:

1. Move qemu_compress_data and qemu_put_compression_data from qemu-file.c to
ram.c, for the reason that most part of the code logical has nothing to do
with qemu-file. Besides, the decompression code is located at ram.c only.

2. Simplify the function input arguments for compression and decompression.
Wrap the input into the param structure which already exists. This change also
makes the function much more flexible for other compression methods.

Signed-off-by: Zeyu Jin <jinzeyu@huawei.com>
Signed-off-by: Ying Fang <fangying1@huawei.com>
---
 migration/qemu-file.c | 62 ++++++-------------------------
 migration/qemu-file.h |  4 +-
 migration/ram.c       | 86 ++++++++++++++++++++++++++++++-------------
 3 files changed, 75 insertions(+), 77 deletions(-)

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index be21518c57..1efb667aa1 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -737,56 +737,6 @@ uint64_t qemu_get_be64(QEMUFile *f)
     return v;
 }
 
-/* return the size after compression, or negative value on error */
-static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
-                              const uint8_t *source, size_t source_len)
-{
-    int err;
-
-    err = deflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = source_len;
-    stream->next_in = (uint8_t *)source;
-    stream->avail_out = dest_len;
-    stream->next_out = dest;
-
-    err = deflate(stream, Z_FINISH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    return stream->next_out - dest;
-}
-
-/* Compress size bytes of data start at p and store the compressed
- * data to the buffer of f.
- *
- * Since the file is dummy file with empty_ops, return -1 if f has no space to
- * save the compressed data.
- */
-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
-                                  const uint8_t *p, size_t size)
-{
-    ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
-
-    if (blen < compressBound(size)) {
-        return -1;
-    }
-
-    blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
-                              blen, p, size);
-    if (blen < 0) {
-        return -1;
-    }
-
-    qemu_put_be32(f, blen);
-    add_buf_to_iovec(f, blen);
-    return blen + sizeof(int32_t);
-}
-
 /* Put the data in the buffer of f_src to the buffer of f_des, and
  * then reset the buf_index of f_src to 0.
  */
@@ -846,3 +796,15 @@ void qemu_file_set_blocking(QEMUFile *f, bool block)
         f->ops->set_blocking(f->opaque, block, NULL);
     }
 }
+
+ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr)
+{
+    *dest_ptr = f->buf + f->buf_index + sizeof(int32_t);
+    return IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+}
+
+void qemu_put_compress_end(QEMUFile *f, unsigned int v)
+{
+    qemu_put_be32(f, v);
+    add_buf_to_iovec(f, v);
+}
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6ccb7..1ac1566460 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -138,8 +138,6 @@ bool qemu_file_is_writable(QEMUFile *f);
 
 size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
 size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
-                                  const uint8_t *p, size_t size);
 int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
 
 /*
@@ -166,6 +164,8 @@ void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data);
 
+ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr);
+void qemu_put_compress_end(QEMUFile *f, unsigned int v);
 /* Whenever this is found in the data stream, the flags
  * will be passed to ram_control_load_hook in the incoming-migration
  * side. This lets before_ram_iterate/after_ram_iterate add
diff --git a/migration/ram.c b/migration/ram.c
index 2da2b622ab..75504540c9 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -453,27 +453,22 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
+static bool do_compress_ram_page(CompressParam *param, RAMBlock *block);
 
 static void *do_data_compress(void *opaque)
 {
     CompressParam *param = opaque;
     RAMBlock *block;
-    ram_addr_t offset;
     bool zero_page;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
         if (param->block) {
             block = param->block;
-            offset = param->offset;
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
+            zero_page = do_compress_ram_page(param, block);
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
             param->zero_page = zero_page;
@@ -1214,28 +1209,73 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
     return 1;
 }
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf)
+/*
+ * Compress size bytes of data start at p and store the compressed
+ * data to the buffer of f.
+ *
+ * Since the file is dummy file with empty_ops, return -1 if f has no space to
+ * save the compressed data.
+ */
+static ssize_t qemu_put_compression_data(CompressParam *param, size_t size)
+{
+    int err;
+    uint8_t *dest = NULL;
+    z_stream *stream = &param->stream;
+    uint8_t *p = param->originbuf;
+    QEMUFile *f = f = param->file;
+    ssize_t blen = qemu_put_compress_start(f, &dest);
+
+    if (blen < compressBound(size)) {
+        return -1;
+    }
+
+    err = deflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = size;
+    stream->next_in = p;
+    stream->avail_out = blen;
+    stream->next_out = dest;
+
+    err = deflate(stream, Z_FINISH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    blen = stream->next_out - dest;
+    if (blen < 0) {
+        return -1;
+    }
+
+    qemu_put_compress_end(f, blen);
+    return blen + sizeof(int32_t);
+}
+
+static bool do_compress_ram_page(CompressParam *param, RAMBlock *block)
 {
     RAMState *rs = ram_state;
+    ram_addr_t offset = param->offset;
     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
     bool zero_page = false;
     int ret;
 
-    if (save_zero_page_to_file(rs, f, block, offset)) {
+    if (save_zero_page_to_file(rs, param->file, block, offset)) {
         zero_page = true;
         goto exit;
     }
 
-    save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
+    save_page_header(rs, param->file, block,
+                         offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
 
     /*
      * copy it to a internal buffer to avoid it being modified by VM
      * so that we can catch up the error during compression and
      * decompression
      */
-    memcpy(source_buf, p, TARGET_PAGE_SIZE);
-    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+    memcpy(param->originbuf, p, TARGET_PAGE_SIZE);
+    ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE);
     if (ret < 0) {
         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
         error_report("compressed data failed!");
@@ -2826,19 +2866,20 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 /* return the size after decompression, or negative value on error */
 static int
-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
-                     const uint8_t *source, size_t source_len)
+qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize)
 {
     int err;
 
+    z_stream *stream = &param->stream;
+
     err = inflateReset(stream);
     if (err != Z_OK) {
         return -1;
     }
 
-    stream->avail_in = source_len;
-    stream->next_in = (uint8_t *)source;
-    stream->avail_out = dest_len;
+    stream->avail_in = param->len;
+    stream->next_in = param->compbuf;
+    stream->avail_out = pagesize;
     stream->next_out = dest;
 
     err = inflate(stream, Z_NO_FLUSH);
@@ -2852,22 +2893,17 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
-    unsigned long pagesize;
     uint8_t *des;
-    int len, ret;
+    int ret;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
         if (param->des) {
             des = param->des;
-            len = param->len;
             param->des = 0;
             qemu_mutex_unlock(&param->mutex);
 
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
+            ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE);
             if (ret < 0 && migrate_get_current()->decompress_error_check) {
                 error_report("decompress data failed");
                 qemu_file_set_error(decomp_file, ret);
-- 
2.23.0



             reply	other threads:[~2020-11-27  7:43 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-11-27  7:41 Zeyu Jin [this message]
  -- strict thread matches above, loose matches on Subject: below --
2020-11-09  9:08 [RFC PATCH 0/6] migration: Multi-thread compression with zstd method Zeyu Jin
2020-11-09  9:08 ` [RFC PATCH 2/6] migration: Refactoring multi-thread compress migration Zeyu Jin

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201127074142.2113-1-jinzeyu@huawei.com \
    --to=jinzeyu@huawei.com \
    --cc=dgilbert@redhat.com \
    --cc=fangying1@huawei.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=zhang.zhanghailiang@huawei.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.