All of lore.kernel.org
 help / color / mirror / Atom feed
From: Zeyu Jin <jinzeyu@huawei.com>
To: <quintela@redhat.com>, <dgilbert@redhat.com>
Cc: qemu-devel@nongnu.org, Zeyu Jin <jinzeyu@huawei.com>,
	zhang.zhanghailiang@huawei.com
Subject: [RFC PATCH 3/6] migration: Add multi-thread compress ops
Date: Fri, 27 Nov 2020 15:41:55 +0800	[thread overview]
Message-ID: <20201127074155.2165-1-jinzeyu@huawei.com> (raw)

Add the MigrationCompressOps and MigrationDecompressOps structures to make
the compression method configurable for multi-thread compression migration.

Signed-off-by: Zeyu Jin <jinzeyu@huawei.com>
Signed-off-by: Ying Fang <fangying1@huawei.com>
---
 migration/migration.c |   9 ++
 migration/migration.h |   1 +
 migration/ram.c       | 273 +++++++++++++++++++++++++++++-------------
 3 files changed, 203 insertions(+), 80 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index d0da95fc0d..2c68012029 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2356,6 +2356,15 @@ int migrate_decompress_threads(void)
     return s->parameters.decompress_threads;
 }
 
+CompressMethod migrate_compress_method(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.compress_method;
+}
+
 bool migrate_dirty_bitmaps(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index d096b77f74..e22b2ef840 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -339,6 +339,7 @@ int migrate_compress_level(void);
 int migrate_compress_threads(void);
 int migrate_compress_wait_thread(void);
 int migrate_decompress_threads(void);
+CompressMethod migrate_compress_method(void);
 bool migrate_use_events(void);
 bool migrate_postcopy_blocktime(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 75504540c9..94a7422204 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -419,8 +419,11 @@ struct CompressParam {
     ram_addr_t offset;
 
     /* internally used fields */
-    z_stream stream;
     uint8_t *originbuf;
+
+    /* for zlib compression */
+    z_stream stream;
+
 };
 typedef struct CompressParam CompressParam;
 
@@ -432,12 +435,29 @@ struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+
+    /* for zlib compression */
     z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;
 
+typedef struct {
+    int (*save_setup)(CompressParam *param);
+    void (*save_cleanup)(CompressParam *param);
+    ssize_t (*compress_data)(CompressParam *param, size_t size);
+} MigrationCompressOps;
+
+typedef struct {
+    int (*load_setup)(DecompressParam *param);
+    void (*load_cleanup)(DecompressParam *param);
+    int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size);
+    int (*check_len)(int len);
+} MigrationDecompressOps;
+
 static CompressParam *comp_param;
 static QemuThread *compress_threads;
+static MigrationCompressOps *compress_ops;
+static MigrationDecompressOps *decompress_ops;
 /* comp_done_cond is used to wake up the migration thread when
  * one of the compression threads has finished the compression.
  * comp_done_lock is used to co-work with comp_done_cond.
@@ -455,6 +475,157 @@ static QemuCond decomp_done_cond;
 
 static bool do_compress_ram_page(CompressParam *param, RAMBlock *block);
 
+static int zlib_save_setup(CompressParam *param)
+{
+    if (deflateInit(&param->stream,
+                    migrate_compress_level()) != Z_OK) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static ssize_t zlib_compress_data(CompressParam *param, size_t size)
+{
+    int err;
+    uint8_t *dest = NULL;
+    z_stream *stream = &param->stream;
+    uint8_t *p = param->originbuf;
+    QEMUFile *f = f = param->file;
+    ssize_t blen = qemu_put_compress_start(f, &dest);
+
+    if (blen < compressBound(size)) {
+        return -1;
+    }
+
+    err = deflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = size;
+    stream->next_in = p;
+    stream->avail_out = blen;
+    stream->next_out = dest;
+
+    err = deflate(stream, Z_FINISH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    blen = stream->next_out - dest;
+    if (blen < 0) {
+        return -1;
+    }
+
+    qemu_put_compress_end(f, blen);
+    return blen + sizeof(int32_t);
+}
+
+static void zlib_save_cleanup(CompressParam *param)
+{
+    deflateEnd(&param->stream);
+}
+
+static int zlib_load_setup(DecompressParam *param)
+{
+    if (inflateInit(&param->stream) != Z_OK) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static int
+zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size)
+{
+    int err;
+
+    z_stream *stream = &param->stream;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = param->len;
+    stream->next_in = param->compbuf;
+    stream->avail_out = size;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
+static void zlib_load_cleanup(DecompressParam *param)
+{
+    inflateEnd(&param->stream);
+}
+
+static int zlib_check_len(int len)
+{
+    return len < 0 || len > compressBound(TARGET_PAGE_SIZE);
+}
+
+static int set_compress_ops(void)
+{
+   compress_ops = g_new0(MigrationCompressOps, 1);
+
+    switch (migrate_compress_method()) {
+    case COMPRESS_METHOD_ZLIB:
+        compress_ops->save_setup = zlib_save_setup;
+        compress_ops->save_cleanup = zlib_save_cleanup;
+        compress_ops->compress_data = zlib_compress_data;
+        break;
+    default:
+        return -1;
+    }
+
+    return 0;
+}
+
+static int set_decompress_ops(void)
+{
+   decompress_ops = g_new0(MigrationDecompressOps, 1);
+
+    switch (migrate_compress_method()) {
+    case COMPRESS_METHOD_ZLIB:
+        decompress_ops->load_setup = zlib_load_setup;
+        decompress_ops->load_cleanup = zlib_load_cleanup;
+        decompress_ops->decompress_data = zlib_decompress_data;
+        decompress_ops->check_len = zlib_check_len;
+        break;
+    default:
+        return -1;
+   }
+
+   return 0;
+}
+
+static void clean_compress_ops(void)
+{
+    compress_ops->save_setup = NULL;
+    compress_ops->save_cleanup = NULL;
+    compress_ops->compress_data = NULL;
+
+    g_free(compress_ops);
+    compress_ops = NULL;
+}
+
+static void clean_decompress_ops(void)
+{
+    decompress_ops->load_setup = NULL;
+    decompress_ops->load_cleanup = NULL;
+    decompress_ops->decompress_data = NULL;
+
+    g_free(decompress_ops);
+    decompress_ops = NULL;
+}
+
 static void *do_data_compress(void *opaque)
 {
     CompressParam *param = opaque;
@@ -511,7 +682,7 @@ static void compress_threads_save_cleanup(void)
         qemu_thread_join(compress_threads + i);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
+        compress_ops->save_cleanup(&comp_param[i]);
         g_free(comp_param[i].originbuf);
         qemu_fclose(comp_param[i].file);
         comp_param[i].file = NULL;
@@ -522,6 +693,7 @@ static void compress_threads_save_cleanup(void)
     g_free(comp_param);
     compress_threads = NULL;
     comp_param = NULL;
+    clean_compress_ops();
 }
 
 static int compress_threads_save_setup(void)
@@ -531,6 +703,12 @@ static int compress_threads_save_setup(void)
     if (!migrate_use_compression()) {
         return 0;
     }
+
+    if (set_compress_ops() < 0) {
+        clean_compress_ops();
+        return -1;
+    }
+
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
     comp_param = g_new0(CompressParam, thread_count);
@@ -542,8 +720,7 @@ static int compress_threads_save_setup(void)
             goto exit;
         }
 
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
+        if (compress_ops->save_setup(&comp_param[i]) < 0) {
             g_free(comp_param[i].originbuf);
             goto exit;
         }
@@ -1209,50 +1386,6 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
     return 1;
 }
 
-/*
- * Compress size bytes of data start at p and store the compressed
- * data to the buffer of f.
- *
- * Since the file is dummy file with empty_ops, return -1 if f has no space to
- * save the compressed data.
- */
-static ssize_t qemu_put_compression_data(CompressParam *param, size_t size)
-{
-    int err;
-    uint8_t *dest = NULL;
-    z_stream *stream = &param->stream;
-    uint8_t *p = param->originbuf;
-    QEMUFile *f = f = param->file;
-    ssize_t blen = qemu_put_compress_start(f, &dest);
-
-    if (blen < compressBound(size)) {
-        return -1;
-    }
-
-    err = deflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = size;
-    stream->next_in = p;
-    stream->avail_out = blen;
-    stream->next_out = dest;
-
-    err = deflate(stream, Z_FINISH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    blen = stream->next_out - dest;
-    if (blen < 0) {
-        return -1;
-    }
-
-    qemu_put_compress_end(f, blen);
-    return blen + sizeof(int32_t);
-}
-
 static bool do_compress_ram_page(CompressParam *param, RAMBlock *block)
 {
     RAMState *rs = ram_state;
@@ -1275,7 +1408,7 @@ static bool do_compress_ram_page(CompressParam *param, RAMBlock *block)
      * decompression
      */
     memcpy(param->originbuf, p, TARGET_PAGE_SIZE);
-    ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE);
+    ret = compress_ops->compress_data(param, TARGET_PAGE_SIZE);
     if (ret < 0) {
         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
         error_report("compressed data failed!");
@@ -2864,32 +2997,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize)
-{
-    int err;
-
-    z_stream *stream = &param->stream;
-
-    err = inflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = param->len;
-    stream->next_in = param->compbuf;
-    stream->avail_out = pagesize;
-    stream->next_out = dest;
-
-    err = inflate(stream, Z_NO_FLUSH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    return stream->total_out;
-}
-
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
@@ -2903,7 +3010,7 @@ static void *do_data_decompress(void *opaque)
             param->des = 0;
             qemu_mutex_unlock(&param->mutex);
 
-            ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE);
+            ret = decompress_ops->decompress_data(param, des, TARGET_PAGE_SIZE);
             if (ret < 0 && migrate_get_current()->decompress_error_check) {
                 error_report("decompress data failed");
                 qemu_file_set_error(decomp_file, ret);
@@ -2973,7 +3080,7 @@ static void compress_threads_load_cleanup(void)
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
+        decompress_ops->load_cleanup(&decomp_param[i]);
         g_free(decomp_param[i].compbuf);
         decomp_param[i].compbuf = NULL;
     }
@@ -2982,6 +3089,7 @@ static void compress_threads_load_cleanup(void)
     decompress_threads = NULL;
     decomp_param = NULL;
     decomp_file = NULL;
+    clean_decompress_ops();
 }
 
 static int compress_threads_load_setup(QEMUFile *f)
@@ -2992,6 +3100,11 @@ static int compress_threads_load_setup(QEMUFile *f)
         return 0;
     }
 
+    if (set_decompress_ops() < 0) {
+        clean_decompress_ops();
+        return -1;
+    }
+
     thread_count = migrate_decompress_threads();
     decompress_threads = g_new0(QemuThread, thread_count);
     decomp_param = g_new0(DecompressParam, thread_count);
@@ -2999,7 +3112,7 @@ static int compress_threads_load_setup(QEMUFile *f)
     qemu_cond_init(&decomp_done_cond);
     decomp_file = f;
     for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+        if (decompress_ops->load_setup(&decomp_param[i]) < 0) {
             goto exit;
         }
 
@@ -3323,7 +3436,7 @@ static int ram_load_postcopy(QEMUFile *f)
         case RAM_SAVE_FLAG_COMPRESS_PAGE:
             all_zero = false;
             len = qemu_get_be32(f);
-            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+            if (decompress_ops->check_len(len)) {
                 error_report("Invalid compressed data length: %d", len);
                 ret = -EINVAL;
                 break;
@@ -3590,7 +3703,7 @@ static int ram_load_precopy(QEMUFile *f)
 
         case RAM_SAVE_FLAG_COMPRESS_PAGE:
             len = qemu_get_be32(f);
-            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+            if (decompress_ops->check_len(len)) {
                 error_report("Invalid compressed data length: %d", len);
                 ret = -EINVAL;
                 break;
-- 
2.23.0



             reply	other threads:[~2020-11-27  7:45 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-11-27  7:41 Zeyu Jin [this message]
  -- strict thread matches above, loose matches on Subject: below --
2020-11-09  9:08 [RFC PATCH 0/6] migration: Multi-thread compression with zstd method Zeyu Jin
2020-11-09  9:08 ` [RFC PATCH 3/6] migration: Add multi-thread compress ops Zeyu Jin

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201127074155.2165-1-jinzeyu@huawei.com \
    --to=jinzeyu@huawei.com \
    --cc=dgilbert@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=zhang.zhanghailiang@huawei.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.