qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH v4 0/6] Multifd compression support
@ 2019-06-12 10:53 Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
                   ` (5 more replies)
  0 siblings, 6 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

v4:
- improve the code left and right
- use the MIGRATION_FLAG_SYNC
- use qerrors properly
- pass errors everywhere (no more printfs)
- create cleanup/save operations
- merged zlib patches into one
- general patches already on the migration pull request.
- commented all the methods.

ToDo: Didn't add the zstd compression because I changed the zlib
      code/methods quite a bit.
      My understanding is that all the issues are gone.

v3:
- improve the code
- address David and Markus comments
- make compression code into methods
  so we can add any other method by adding just three functions

Please review, as far as I know everything is ok now.

Todo: Add zstd support
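
To illustrate the "methods" idea from the v3 notes, here is a minimal,
standalone sketch of an operations table with three functions per method.
It is not the code from this series: Channel, MethodOps, METHOD_TOY and
roundtrip() are hypothetical names, and the XOR transform is only a toy
stand-in for a real compressor such as zlib.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define BUF_SIZE 64

/* Hypothetical per-channel state; not QEMU's MultiFDSendParams. */
typedef struct {
    uint8_t buf[BUF_SIZE];      /* staging buffer filled by send_prepare */
    size_t next_packet_size;    /* number of bytes send_write will emit */
} Channel;

/* The three operations every method has to provide. */
typedef struct {
    int (*send_prepare)(Channel *c, const uint8_t *page, size_t len);
    int (*send_write)(Channel *c, uint8_t *wire, size_t wire_len);
    int (*recv_pages)(const uint8_t *wire, size_t len, uint8_t *page);
} MethodOps;

typedef enum { METHOD_NONE, METHOD_TOY, METHOD__MAX } Method;

/* No compression: stage the page unchanged. */
static int none_send_prepare(Channel *c, const uint8_t *page, size_t len)
{
    if (len > BUF_SIZE) {
        return -1;
    }
    memcpy(c->buf, page, len);
    c->next_packet_size = len;
    return 0;
}

/* Toy method: XOR every byte (placeholder for a real compressor). */
static int toy_send_prepare(Channel *c, const uint8_t *page, size_t len)
{
    if (len > BUF_SIZE) {
        return -1;
    }
    for (size_t i = 0; i < len; i++) {
        c->buf[i] = page[i] ^ 0x5a;
    }
    c->next_packet_size = len;
    return 0;
}

/* Both methods write the staged buffer in the same way. */
static int common_send_write(Channel *c, uint8_t *wire, size_t wire_len)
{
    if (c->next_packet_size > wire_len) {
        return -1;
    }
    memcpy(wire, c->buf, c->next_packet_size);
    return 0;
}

static int none_recv_pages(const uint8_t *wire, size_t len, uint8_t *page)
{
    memcpy(page, wire, len);
    return 0;
}

static int toy_recv_pages(const uint8_t *wire, size_t len, uint8_t *page)
{
    for (size_t i = 0; i < len; i++) {
        page[i] = wire[i] ^ 0x5a;
    }
    return 0;
}

/* One entry per method; callers only ever go through this table. */
static const MethodOps method_ops[METHOD__MAX] = {
    [METHOD_NONE] = { none_send_prepare, common_send_write, none_recv_pages },
    [METHOD_TOY]  = { toy_send_prepare,  common_send_write, toy_recv_pages },
};

/* Send one page and receive it back; returns 0 on success, -1 on error. */
static int roundtrip(Method m, const uint8_t *in, uint8_t *out, size_t len)
{
    const MethodOps *ops = &method_ops[m];
    Channel c = { .next_packet_size = 0 };
    uint8_t wire[BUF_SIZE];

    if (ops->send_prepare(&c, in, len) != 0 ||
        ops->send_write(&c, wire, sizeof(wire)) != 0) {
        return -1;
    }
    return ops->recv_pages(wire, c.next_packet_size, out);
}
```

With this layout the send and receive loops never test for a specific
method; supporting another one means writing its functions and adding
one entry to the table.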

v2:
- improve the code left and right
- Split better the zlib code
- rename everything to v4.1
- Add tests for multifd-compress zlib
- Parameter is now an enum (soon will see zstd)

ToDo:
- Make operations for different methods:
  * multifd_prepare_send_none/zlib
  * multifd_send_none/zlib
  * multifd_recv_none/zlib
- Use the MULTIFD_FLAG_ZLIB (it is unused so far).

Please review and comment.

v1:

This series creates compression code on top of multifd.  It is still
WIP, but already:
- it is faster than the current compression code
- it does the minimum number of copies possible
- it allows support for other compression methods
- it passes the multifd test sent in my previous series

The test for the existing compression code didn't work because that code
is too slow: I need to make the downtime 10 times bigger for it to converge
on my test machine.  This code works with the same limits as multifd no-

ToDo:
- move printf's  to traces
- move code to a struct instead of if (zlib) inside the main threads.
- improve error handling.

Please review and comment.

Juan Quintela (6):
  migration-test: introduce functions to handle string parameters
  migration: Make multifd_save_setup() get an Error parameter
  migration: Make multifd_load_setup() get an Error parameter
  migration: Add multifd-compress parameter
  migration: Make no compression operations into its own structure
  migration: Add zlib compression multifd support

 hmp.c                        |  13 +
 hw/core/qdev-properties.c    |  13 +
 include/hw/qdev-properties.h |   3 +
 migration/migration.c        |  34 ++-
 migration/migration.h        |   3 +-
 migration/ram.c              | 454 ++++++++++++++++++++++++++++++++++-
 migration/ram.h              |   4 +-
 migration/rdma.c             |   2 +-
 qapi/migration.json          |  30 ++-
 tests/migration-test.c       |  54 ++++-
 10 files changed, 585 insertions(+), 25 deletions(-)

-- 
2.21.0




* [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 tests/migration-test.c | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/tests/migration-test.c b/tests/migration-test.c
index 36d4910192..c7c311e02c 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -442,6 +442,43 @@ static void migrate_set_parameter_int(QTestState *who, const char *parameter,
     migrate_check_parameter_int(who, parameter, value);
 }
 
+static char *migrate_get_parameter_str(QTestState *who,
+                                       const char *parameter)
+{
+    QDict *rsp;
+    char *result;
+
+    rsp = wait_command(who, "{ 'execute': 'query-migrate-parameters' }");
+    result = g_strdup(qdict_get_str(rsp, parameter));
+    qobject_unref(rsp);
+    return result;
+}
+
+static void migrate_check_parameter_str(QTestState *who, const char *parameter,
+                                        const char *value)
+{
+    char *result;
+
+    result = migrate_get_parameter_str(who, parameter);
+    g_assert_cmpstr(result, ==, value);
+    g_free(result);
+}
+
+__attribute__((unused))
+static void migrate_set_parameter_str(QTestState *who, const char *parameter,
+                                      const char *value)
+{
+    QDict *rsp;
+
+    rsp = qtest_qmp(who,
+                    "{ 'execute': 'migrate-set-parameters',"
+                    "'arguments': { %s: %s } }",
+                    parameter, value);
+    g_assert(qdict_haskey(rsp, "return"));
+    qobject_unref(rsp);
+    migrate_check_parameter_str(who, parameter, value);
+}
+
 static void migrate_pause(QTestState *who)
 {
     QDict *rsp;
-- 
2.21.0




* [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() " Juan Quintela
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 2 +-
 migration/ram.c       | 2 +-
 migration/ram.h       | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 2865ae3fa9..0ac504be3c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -3336,7 +3336,7 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
         return;
     }
 
-    if (multifd_save_setup() != 0) {
+    if (multifd_save_setup(&error_in) != 0) {
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
         migrate_fd_cleanup(s);
diff --git a/migration/ram.c b/migration/ram.c
index 89eec7ee9d..4b65d22cb1 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1172,7 +1172,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
     }
 }
 
-int multifd_save_setup(void)
+int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index 936177b3e9..09feaad55b 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -42,7 +42,7 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_total(void);
 
-int multifd_save_setup(void);
+int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
-- 
2.21.0




* [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() get an Error parameter
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter Juan Quintela
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

We need to change the full chain to pass the Error parameter.
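
As a rough illustration of what "passing the Error parameter through the
full chain" means, the standalone sketch below threads an Error ** from the
outermost caller down to the function that can fail.  The Error struct,
set_error() and the three functions are hypothetical stand-ins, not QEMU's
API from qapi/error.h, and unlike the real functions they return int so the
result is easy to check.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for QEMU's Error object. */
typedef struct Error {
    char msg[128];
} Error;

/* Store an error for the caller, unless the caller passed NULL. */
static void set_error(Error **errp, const char *msg)
{
    Error *err;

    if (errp == NULL) {
        return;                 /* caller is not interested in details */
    }
    err = calloc(1, sizeof(*err));
    if (err == NULL) {
        return;
    }
    snprintf(err->msg, sizeof(err->msg), "%s", msg);
    *errp = err;
}

/* Innermost function: the only place where the error is created. */
static int load_setup(int fail, Error **errp)
{
    if (fail) {
        set_error(errp, "cannot create multifd threads");
        return -1;
    }
    return 0;
}

/* Middle of the chain: just forwards errp. */
static int incoming_setup(int fail, Error **errp)
{
    return load_setup(fail, errp);
}

/* Outermost function: forwards errp and reports failure upwards. */
static int process_incoming(int fail, Error **errp)
{
    if (incoming_setup(fail, errp) != 0) {
        return -1;
    }
    return 0;
}
```

Every function in the chain gains the extra parameter even if it only
forwards it, which is why the change touches all the callers.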

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 10 +++++-----
 migration/migration.h |  2 +-
 migration/ram.c       |  2 +-
 migration/ram.h       |  2 +-
 migration/rdma.c      |  2 +-
 5 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 0ac504be3c..4246bdd661 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -513,11 +513,11 @@ fail:
     exit(EXIT_FAILURE);
 }
 
-static void migration_incoming_setup(QEMUFile *f)
+static void migration_incoming_setup(QEMUFile *f, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
 
-    if (multifd_load_setup() != 0) {
+    if (multifd_load_setup(errp) != 0) {
         /* We haven't been able to create multifd threads
            nothing better to do */
         exit(EXIT_FAILURE);
@@ -567,13 +567,13 @@ static bool postcopy_try_recover(QEMUFile *f)
     return false;
 }
 
-void migration_fd_process_incoming(QEMUFile *f)
+void migration_fd_process_incoming(QEMUFile *f, Error **errp)
 {
     if (postcopy_try_recover(f)) {
         return;
     }
 
-    migration_incoming_setup(f);
+    migration_incoming_setup(f, errp);
     migration_incoming_process();
 }
 
@@ -591,7 +591,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
             return;
         }
 
-        migration_incoming_setup(f);
+        migration_incoming_setup(f, errp);
 
         /*
          * Common migration only needs one channel, so we can start
diff --git a/migration/migration.h b/migration/migration.h
index 780a096857..71c03353c3 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -237,7 +237,7 @@ struct MigrationState
 
 void migrate_set_state(int *state, int old_state, int new_state);
 
-void migration_fd_process_incoming(QEMUFile *f);
+void migration_fd_process_incoming(QEMUFile *f, Error **errp);
 void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp);
 void migration_incoming_process(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 4b65d22cb1..b0ca989160 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1370,7 +1370,7 @@ static void *multifd_recv_thread(void *opaque)
     return NULL;
 }
 
-int multifd_load_setup(void)
+int multifd_load_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index 09feaad55b..dd1a736417 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -44,7 +44,7 @@ uint64_t ram_bytes_total(void);
 
 int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
-int multifd_load_setup(void);
+int multifd_load_setup(Error **errp);
 int multifd_load_cleanup(Error **errp);
 bool multifd_recv_all_channels_created(void);
 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
diff --git a/migration/rdma.c b/migration/rdma.c
index c1bcece53b..69389a8662 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -4019,7 +4019,7 @@ static void rdma_accept_incoming_migration(void *opaque)
     }
 
     rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f);
+    migration_fd_process_incoming(f, errp);
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
-- 
2.21.0




* [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
                   ` (2 preceding siblings ...)
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() " Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
  5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

Signed-off-by: Juan Quintela <quintela@redhat.com>

---
Rename it to NONE
Fix typos (dave)
We don't need to check values returned by visit_type_MultifdCompress (markus)
Fix yet more typos (wei)
---
 hmp.c                        | 13 +++++++++++++
 hw/core/qdev-properties.c    | 13 +++++++++++++
 include/hw/qdev-properties.h |  3 +++
 migration/migration.c        | 13 +++++++++++++
 qapi/migration.json          | 30 +++++++++++++++++++++++++++---
 tests/migration-test.c       | 13 ++++++++++---
 6 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/hmp.c b/hmp.c
index be5e345c6f..b011f139ca 100644
--- a/hmp.c
+++ b/hmp.c
@@ -38,6 +38,7 @@
 #include "qapi/qapi-commands-run-state.h"
 #include "qapi/qapi-commands-tpm.h"
 #include "qapi/qapi-commands-ui.h"
+#include "qapi/qapi-visit-migration.h"
 #include "qapi/qmp/qdict.h"
 #include "qapi/qmp/qerror.h"
 #include "qapi/string-input-visitor.h"
@@ -435,6 +436,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, "%s: %u\n",
             MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_CHANNELS),
             params->multifd_channels);
+        monitor_printf(mon, "%s: %s\n",
+            MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_COMPRESS),
+            MultifdCompress_str(params->multifd_compress));
         monitor_printf(mon, "%s: %" PRIu64 "\n",
             MigrationParameter_str(MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE),
             params->xbzrle_cache_size);
@@ -1736,6 +1740,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
     MigrateSetParameters *p = g_new0(MigrateSetParameters, 1);
     uint64_t valuebw = 0;
     uint64_t cache_size;
+    MultifdCompress compress_type;
     Error *err = NULL;
     int val, ret;
 
@@ -1821,6 +1826,14 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
         p->has_multifd_channels = true;
         visit_type_int(v, param, &p->multifd_channels, &err);
         break;
+    case MIGRATION_PARAMETER_MULTIFD_COMPRESS:
+        p->has_multifd_compress = true;
+        visit_type_MultifdCompress(v, param, &compress_type, &err);
+        if (err) {
+            break;
+        }
+        p->multifd_compress = compress_type;
+        break;
     case MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE:
         p->has_xbzrle_cache_size = true;
         visit_type_size(v, param, &cache_size, &err);
diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
index 5da1439a8b..ebeeb5c88d 100644
--- a/hw/core/qdev-properties.c
+++ b/hw/core/qdev-properties.c
@@ -5,6 +5,7 @@
 #include "hw/pci/pci.h"
 #include "qapi/qmp/qerror.h"
 #include "qemu/error-report.h"
+#include "qapi/qapi-types-migration.h"
 #include "hw/block/block.h"
 #include "net/hub.h"
 #include "qapi/visitor.h"
@@ -645,6 +646,18 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
     .set_default_value = set_default_value_enum,
 };
 
+/* --- MultifdCompress --- */
+
+const PropertyInfo qdev_prop_multifd_compress = {
+    .name = "MultifdCompress",
+    .description = "multifd_compress values, "
+                   "none",
+    .enum_table = &MultifdCompress_lookup,
+    .get = get_enum,
+    .set = set_enum,
+    .set_default_value = set_default_value_enum,
+};
+
 /* --- pci address --- */
 
 /*
diff --git a/include/hw/qdev-properties.h b/include/hw/qdev-properties.h
index 1eae5ab056..34d906593b 100644
--- a/include/hw/qdev-properties.h
+++ b/include/hw/qdev-properties.h
@@ -23,6 +23,7 @@ extern const PropertyInfo qdev_prop_tpm;
 extern const PropertyInfo qdev_prop_ptr;
 extern const PropertyInfo qdev_prop_macaddr;
 extern const PropertyInfo qdev_prop_on_off_auto;
+extern const PropertyInfo qdev_prop_multifd_compress;
 extern const PropertyInfo qdev_prop_losttickpolicy;
 extern const PropertyInfo qdev_prop_blockdev_on_error;
 extern const PropertyInfo qdev_prop_bios_chs_trans;
@@ -205,6 +206,8 @@ extern const PropertyInfo qdev_prop_pcie_link_width;
     DEFINE_PROP(_n, _s, _f, qdev_prop_macaddr, MACAddr)
 #define DEFINE_PROP_ON_OFF_AUTO(_n, _s, _f, _d) \
     DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_on_off_auto, OnOffAuto)
+#define DEFINE_PROP_MULTIFD_COMPRESS(_n, _s, _f, _d) \
+    DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compress, MultifdCompress)
 #define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \
     DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_losttickpolicy, \
                         LostTickPolicy)
diff --git a/migration/migration.c b/migration/migration.c
index 4246bdd661..3f17d8f2f8 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -82,6 +82,7 @@
 /* The delay time (in ms) between two COLO checkpoints */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
 #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
+#define DEFAULT_MIGRATE_MULTIFD_COMPRESS MULTIFD_COMPRESS_NONE
 
 /* Background transfer rate for postcopy, 0 means unlimited, note
  * that page requests can still exceed this limit.
@@ -769,6 +770,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->block_incremental = s->parameters.block_incremental;
     params->has_multifd_channels = true;
     params->multifd_channels = s->parameters.multifd_channels;
+    params->has_multifd_compress = true;
+    params->multifd_compress = s->parameters.multifd_compress;
     params->has_xbzrle_cache_size = true;
     params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
     params->has_max_postcopy_bandwidth = true;
@@ -1268,6 +1271,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
     if (params->has_multifd_channels) {
         dest->multifd_channels = params->multifd_channels;
     }
+    if (params->has_multifd_compress) {
+        dest->multifd_compress = params->multifd_compress;
+    }
     if (params->has_xbzrle_cache_size) {
         dest->xbzrle_cache_size = params->xbzrle_cache_size;
     }
@@ -1364,6 +1370,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     if (params->has_multifd_channels) {
         s->parameters.multifd_channels = params->multifd_channels;
     }
+    if (params->has_multifd_compress) {
+        s->parameters.multifd_compress = params->multifd_compress;
+    }
     if (params->has_xbzrle_cache_size) {
         s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
         xbzrle_cache_resize(params->xbzrle_cache_size, errp);
@@ -3406,6 +3415,9 @@ static Property migration_properties[] = {
     DEFINE_PROP_UINT8("multifd-channels", MigrationState,
                       parameters.multifd_channels,
                       DEFAULT_MIGRATE_MULTIFD_CHANNELS),
+    DEFINE_PROP_MULTIFD_COMPRESS("multifd-compress", MigrationState,
+                      parameters.multifd_compress,
+                      DEFAULT_MIGRATE_MULTIFD_COMPRESS),
     DEFINE_PROP_SIZE("xbzrle-cache-size", MigrationState,
                       parameters.xbzrle_cache_size,
                       DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE),
@@ -3495,6 +3507,7 @@ static void migration_instance_init(Object *obj)
     params->has_x_checkpoint_delay = true;
     params->has_block_incremental = true;
     params->has_multifd_channels = true;
+    params->has_multifd_compress = true;
     params->has_xbzrle_cache_size = true;
     params->has_max_postcopy_bandwidth = true;
     params->has_max_cpu_throttle = true;
diff --git a/qapi/migration.json b/qapi/migration.json
index 9cfbaf8c6c..153527e120 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -482,6 +482,19 @@
 ##
 { 'command': 'query-migrate-capabilities', 'returns':   ['MigrationCapabilityStatus']}
 
+##
+# @MultifdCompress:
+#
+# An enumeration of multifd compression.
+#
+# @none: no compression.
+#
+# Since: 4.1
+#
+##
+{ 'enum': 'MultifdCompress',
+  'data': [ 'none' ] }
+
 ##
 # @MigrationParameter:
 #
@@ -580,6 +593,9 @@
 # @max-cpu-throttle: maximum cpu throttle percentage.
 #                    Defaults to 99. (Since 3.1)
 #
+# @multifd-compress: Which compression method to use.
+#                    Defaults to none. (Since 4.1)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
@@ -592,7 +608,7 @@
            'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
            'multifd-channels',
            'xbzrle-cache-size', 'max-postcopy-bandwidth',
-           'max-cpu-throttle' ] }
+           'max-cpu-throttle', 'multifd-compress' ] }
 
 ##
 # @MigrateSetParameters:
@@ -682,6 +698,9 @@
 # @max-cpu-throttle: maximum cpu throttle percentage.
 #                    The default value is 99. (Since 3.1)
 #
+# @multifd-compress: Which compression method to use.
+#                    Defaults to none. (Since 4.1)
+#
 # Since: 2.4
 ##
 # TODO either fuse back into MigrationParameters, or make
@@ -707,7 +726,8 @@
             '*multifd-channels': 'int',
             '*xbzrle-cache-size': 'size',
             '*max-postcopy-bandwidth': 'size',
-	    '*max-cpu-throttle': 'int' } }
+            '*max-cpu-throttle': 'int',
+            '*multifd-compress': 'MultifdCompress' } }
 
 ##
 # @migrate-set-parameters:
@@ -817,6 +837,9 @@
 #                    Defaults to 99.
 #                     (Since 3.1)
 #
+# @multifd-compress: Which compression method to use.
+#                    Defaults to none. (Since 4.1)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -840,7 +863,8 @@
             '*multifd-channels': 'uint8',
             '*xbzrle-cache-size': 'size',
 	    '*max-postcopy-bandwidth': 'size',
-            '*max-cpu-throttle':'uint8'} }
+            '*max-cpu-throttle': 'uint8',
+            '*multifd-compress': 'MultifdCompress' } }
 
 ##
 # @query-migrate-parameters:
diff --git a/tests/migration-test.c b/tests/migration-test.c
index c7c311e02c..e5b8125e1c 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -464,7 +464,6 @@ static void migrate_check_parameter_str(QTestState *who, const char *parameter,
     g_free(result);
 }
 
-__attribute__((unused))
 static void migrate_set_parameter_str(QTestState *who, const char *parameter,
                                       const char *value)
 {
@@ -1165,7 +1164,7 @@ static void test_migrate_fd_proto(void)
     test_migrate_end(from, to, true);
 }
 
-static void test_multifd_tcp(void)
+static void test_multifd_tcp(const char *method)
 {
     char *uri;
     QTestState *from, *to;
@@ -1187,6 +1186,9 @@ static void test_multifd_tcp(void)
     migrate_set_parameter_int(from, "multifd-channels", 2);
     migrate_set_parameter_int(to, "multifd-channels", 2);
 
+    migrate_set_parameter_str(from, "multifd-compress", method);
+    migrate_set_parameter_str(to, "multifd-compress", method);
+
     migrate_set_capability(from, "multifd", "true");
     migrate_set_capability(to, "multifd", "true");
     /* Wait for the first serial output from the source */
@@ -1212,6 +1214,11 @@ static void test_multifd_tcp(void)
     free(uri);
 }
 
+static void test_multifd_tcp_none(void)
+{
+    test_multifd_tcp("none");
+}
+
 int main(int argc, char **argv)
 {
     char template[] = "/tmp/migration-test-XXXXXX";
@@ -1267,7 +1274,7 @@ int main(int argc, char **argv)
     /* qtest_add_func("/migration/ignore_shared", test_ignore_shared); */
     qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
     qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
-    qtest_add_func("/migration/multifd/tcp", test_multifd_tcp);
+    qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
 
     ret = g_test_run();
 
-- 
2.21.0




* [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
                   ` (3 preceding siblings ...)
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-14 11:26   ` Dr. David Alan Gilbert
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
  5 siblings, 1 reply; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

It will be used later.

Signed-off-by: Juan Quintela <quintela@redhat.com>

---
Move setup of ->ops helper to proper place (wei)
Rename s/none/nocomp/ (dave)
---
 migration/migration.c |   9 ++
 migration/migration.h |   1 +
 migration/ram.c       | 188 ++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 190 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 3f17d8f2f8..a3526d395b 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2174,6 +2174,15 @@ int migrate_multifd_channels(void)
     return s->parameters.multifd_channels;
 }
 
+int migrate_multifd_method(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_compress;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index 71c03353c3..437abf3405 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -270,6 +270,7 @@ bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 bool migrate_pause_before_switchover(void);
 int migrate_multifd_channels(void);
+int migrate_multifd_method(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/migration/ram.c b/migration/ram.c
index b0ca989160..3b0002ddba 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -45,6 +45,7 @@
 #include "page_cache.h"
 #include "qemu/error-report.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
 #include "qapi/qapi-events-migration.h"
 #include "qapi/qmp/qerror.h"
 #include "trace.h"
@@ -661,6 +662,8 @@ typedef struct {
     uint64_t num_packets;
     /* pages sent through this channel */
     uint64_t num_pages;
+    /* used for compression methods */
+    void *data;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -696,8 +699,152 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* used for de-compression methods */
+    void *data;
 } MultiFDRecvParams;
 
+typedef struct {
+    /* Setup for sending side */
+    int (*send_setup)(MultiFDSendParams *p, Error **errp);
+    /* Cleanup for sending side */
+    void (*send_cleanup)(MultiFDSendParams *p);
+    /* Prepare the send packet */
+    int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Write the send packet */
+    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Setup for receiving side */
+    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
+    /* Cleanup for receiving side */
+    void (*recv_cleanup)(MultiFDRecvParams *p);
+    /* Read all pages */
+    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
+} MultiFDMethods;
+
+/* Multifd without compression */
+
+/**
+ * nocomp_send_setup: setup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_send_cleanup: cleanup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_send_cleanup(MultiFDSendParams *p)
+{
+    return;
+}
+
+/**
+ * nocomp_send_prepare: prepare data to be able to send
+ *
+ * For no compression we just have to calculate the size of the
+ * packet.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
+                               Error **errp)
+{
+    p->next_packet_size = used * qemu_target_page_size();
+    return 0;
+}
+
+/**
+ * nocomp_send_write: do the actual write of the data
+ *
+ * For no compression we just have to write the data.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+}
+
+/**
+ * nocomp_recv_setup: setup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_recv_cleanup: cleanup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_recv_cleanup(MultiFDRecvParams *p)
+{
+}
+
+/**
+ * nocomp_recv_pages: read the data from the channel into actual pages
+ *
+ * For no compression we just need to read things into the correct place.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+    if (p->flags != 0) {
+        error_setg(errp, "multifd %d: flags received %x flags expected %x",
+                   p->id, p->flags, 0);
+        return -1;
+    }
+    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
+}
+
+static MultiFDMethods multifd_nocomp_ops = {
+    .send_setup = nocomp_send_setup,
+    .send_cleanup = nocomp_send_cleanup,
+    .send_prepare = nocomp_send_prepare,
+    .send_write = nocomp_send_write,
+    .recv_setup = nocomp_recv_setup,
+    .recv_cleanup = nocomp_recv_cleanup,
+    .recv_pages = nocomp_recv_pages
+};
+
+static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
+    [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
+};
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg;
@@ -900,6 +1047,8 @@ struct {
     uint64_t packet_num;
     /* send channels ready */
     QemuSemaphore channels_ready;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_send_state;
 
 /*
@@ -1030,6 +1179,7 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_send_state->ops->send_cleanup(p);
     }
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     qemu_sem_destroy(&multifd_send_state->sem_sync);
@@ -1097,7 +1247,14 @@ static void *multifd_send_thread(void *opaque)
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
 
-            p->next_packet_size = used * qemu_target_page_size();
+            if (used) {
+                ret = multifd_send_state->ops->send_prepare(p, used,
+                                                            &local_err);
+                if (ret != 0) {
+                    qemu_mutex_unlock(&p->mutex);
+                    break;
+                }
+            }
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -1115,8 +1272,7 @@ static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                 if (ret != 0) {
                     break;
                 }
@@ -1176,6 +1332,7 @@ int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    int ret = 0;
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -1187,9 +1344,11 @@ int multifd_save_setup(Error **errp)
     multifd_send_state->pages = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->sem_sync, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
+    multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
+        int res;
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
@@ -1202,8 +1361,12 @@ int multifd_save_setup(Error **errp)
         p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
+        res = multifd_send_state->ops->send_setup(p, errp);
+        if (ret == 0) {
+            ret = res;
+        }
     }
-    return 0;
+    return ret;
 }
 
 struct {
@@ -1214,6 +1377,8 @@ struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -1246,7 +1411,6 @@ static void multifd_recv_terminate_threads(Error *err)
 int multifd_load_cleanup(Error **errp)
 {
     int i;
-    int ret = 0;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -1269,6 +1433,7 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_recv_state->ops->recv_cleanup(p);
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
@@ -1276,7 +1441,7 @@ int multifd_load_cleanup(Error **errp)
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
-    return ret;
+    return 0;
 }
 
 static void multifd_recv_sync_main(void)
@@ -1337,6 +1502,8 @@ static void *multifd_recv_thread(void *opaque)
 
         used = p->pages->used;
         flags = p->flags;
+        /* recv methods don't know how to handle the SYNC flag */
+        p->flags &= ~MULTIFD_FLAG_SYNC;
         trace_multifd_recv(p->id, p->packet_num, used, flags,
                            p->next_packet_size);
         p->num_packets++;
@@ -1344,8 +1511,7 @@ static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
             if (ret != 0) {
                 break;
             }
@@ -1384,9 +1550,11 @@ int multifd_load_setup(Error **errp)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+    multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        int ret;
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem_sync, 0);
@@ -1396,6 +1564,10 @@ int multifd_load_setup(Error **errp)
                       + sizeof(ram_addr_t) * page_count;
         p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
+        ret = multifd_recv_state->ops->recv_setup(p, errp);
+        if (ret != 0) {
+            return ret;
+        }
     }
     return 0;
 }
-- 
2.21.0
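
Note on the patch above: it routes every per-channel operation through a MultiFDMethods table selected once at setup time. A minimal sketch of that dispatch pattern follows. All Demo*/demo_* names and the fixed 4096-byte page size are assumptions made for illustration; the series itself uses MultiFDSendParams, Error ** and qemu_target_page_size().

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the QEMU types used by the series. */
typedef struct {
    int id;
    uint32_t next_packet_size;
} DemoParams;

typedef struct {
    /* Prepare the send packet; returns 0 on success, -1 on error */
    int (*send_prepare)(DemoParams *p, uint32_t used);
} DemoMethods;

enum { DEMO_COMPRESS_NONE, DEMO_COMPRESS__MAX };

/* Assumed page size, only for this sketch */
#define DEMO_PAGE_SIZE 4096u

/* Without compression the packet size is just pages * page size. */
static int demo_nocomp_send_prepare(DemoParams *p, uint32_t used)
{
    p->next_packet_size = used * DEMO_PAGE_SIZE;
    return 0;
}

static DemoMethods demo_nocomp_ops = {
    .send_prepare = demo_nocomp_send_prepare,
};

/* One slot per method, indexed by the enum value. */
static DemoMethods *demo_ops[DEMO_COMPRESS__MAX] = {
    [DEMO_COMPRESS_NONE] = &demo_nocomp_ops,
};

/* Look the method up and dispatch; unknown methods return -1. */
static int demo_send_prepare(int method, DemoParams *p, uint32_t used)
{
    if (method < 0 || method >= DEMO_COMPRESS__MAX || !demo_ops[method]) {
        return -1;
    }
    return demo_ops[method]->send_prepare(p, used);
}
```

Adding another method in this scheme means adding one enum value, one table of functions, and one entry in the array, which is how the zlib patch later in the thread extends multifd_ops.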



^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support
  2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
                   ` (4 preceding siblings ...)
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
  2019-06-14 12:56   ` Dr. David Alan Gilbert
  5 siblings, 1 reply; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
	Dr. David Alan Gilbert, Paolo Bonzini

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hw/core/qdev-properties.c |   2 +-
 migration/ram.c           | 262 ++++++++++++++++++++++++++++++++++++++
 qapi/migration.json       |   2 +-
 tests/migration-test.c    |   6 +
 4 files changed, 270 insertions(+), 2 deletions(-)

diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
index ebeeb5c88d..e40aa806e2 100644
--- a/hw/core/qdev-properties.c
+++ b/hw/core/qdev-properties.c
@@ -651,7 +651,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compress = {
     .name = "MultifdCompress",
     .description = "multifd_compress values, "
-                   "none",
+                   "none/zlib",
     .enum_table = &MultifdCompress_lookup,
     .get = get_enum,
     .set = set_enum,
diff --git a/migration/ram.c b/migration/ram.c
index 3b0002ddba..691ebd9108 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -583,6 +583,7 @@ exit:
 #define MULTIFD_VERSION 1
 
 #define MULTIFD_FLAG_SYNC (1 << 0)
+#define MULTIFD_FLAG_ZLIB (1 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
@@ -625,6 +626,15 @@ typedef struct {
     RAMBlock *block;
 } MultiFDPages_t;
 
+struct zlib_data {
+    /* stream for compression */
+    z_stream zs;
+    /* compressed buffer */
+    uint8_t *zbuff;
+    /* size of compressed buffer */
+    uint32_t zbuff_len;
+};
+
 typedef struct {
     /* this fields are not changed once the thread is created */
     /* channel number */
@@ -841,8 +851,260 @@ static MultiFDMethods multifd_nocomp_ops = {
     .recv_pages = nocomp_recv_pages
 };
 
+/* Multifd zlib compression */
+
+/**
+ * zlib_send_setup: setup send side
+ *
+ * Setup each channel with zlib compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
+    z_stream *zs = &z->zs;
+
+    p->data = z;
+    zs->zalloc = Z_NULL;
+    zs->zfree = Z_NULL;
+    zs->opaque = Z_NULL;
+    if (deflateInit(zs, migrate_compress_level()) != Z_OK) {
+        error_setg(errp, "multifd %d: deflate init failed", p->id);
+        return -1;
+    }
+    /* We will never have more than page_count pages */
+    z->zbuff_len = page_count * qemu_target_page_size();
+    z->zbuff_len *= 2;
+    z->zbuff = g_try_malloc(z->zbuff_len);
+    if (!z->zbuff) {
+        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * zlib_send_cleanup: cleanup send side
+ *
+ * End the deflate stream and free the buffers.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void zlib_send_cleanup(MultiFDSendParams *p)
+{
+    struct zlib_data *z = p->data;
+
+    deflateEnd(&z->zs);
+    g_free(z->zbuff);
+    z->zbuff = NULL;
+    g_free(p->data);
+    p->data = NULL;
+}
+
+/**
+ * zlib_send_prepare: prepare data to be able to send
+ *
+ * Create a compressed buffer with all the pages that we are going to
+ * send.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ */
+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+    struct iovec *iov = p->pages->iov;
+    struct zlib_data *z = p->data;
+    z_stream *zs = &z->zs;
+    uint32_t out_size = 0;
+    int ret;
+    uint32_t i;
+
+    for (i = 0; i < used; i++) {
+        uint32_t available = z->zbuff_len - out_size;
+        int flush = Z_NO_FLUSH;
+
+        if (i == used - 1) {
+            flush = Z_SYNC_FLUSH;
+        }
+
+        zs->avail_in = iov[i].iov_len;
+        zs->next_in = iov[i].iov_base;
+
+        zs->avail_out = available;
+        zs->next_out = z->zbuff + out_size;
+
+        ret = deflate(zs, flush);
+        if (ret != Z_OK) {
+            error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
+                       p->id, ret);
+            return -1;
+        }
+        out_size += available - zs->avail_out;
+    }
+    p->next_packet_size = out_size;
+    p->flags |= MULTIFD_FLAG_ZLIB;
+
+    return 0;
+}
+
+/**
+ * zlib_send_write: do the actual write of the data
+ *
+ * Do the actual write of the compressed buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+    struct zlib_data *z = p->data;
+
+    return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
+                                 errp);
+}
+
+/**
+ * zlib_recv_setup: setup receive side
+ *
+ * Create the compressed channel and buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
+    z_stream *zs = &z->zs;
+
+    p->data = z;
+    zs->zalloc = Z_NULL;
+    zs->zfree = Z_NULL;
+    zs->opaque = Z_NULL;
+    zs->avail_in = 0;
+    zs->next_in = Z_NULL;
+    if (inflateInit(zs) != Z_OK) {
+        error_setg(errp, "multifd %d: inflate init failed", p->id);
+        return -1;
+    }
+    /* We will never have more than page_count pages */
+    z->zbuff_len = page_count * qemu_target_page_size();
+    /* We know compression "could" use more space */
+    z->zbuff_len *= 2;
+    z->zbuff = g_try_malloc(z->zbuff_len);
+    if (!z->zbuff) {
+        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * zlib_recv_cleanup: cleanup receive side
+ *
+ * End the inflate stream and free the compressed buffer.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void zlib_recv_cleanup(MultiFDRecvParams *p)
+{
+    struct zlib_data *z = p->data;
+
+    inflateEnd(&z->zs);
+    g_free(z->zbuff);
+    z->zbuff = NULL;
+    g_free(p->data);
+    p->data = NULL;
+}
+
+/**
+ * zlib_recv_pages: read the data from the channel into actual pages
+ *
+ * Read the compressed buffer, and uncompress it into the actual
+ * pages.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+    uint32_t in_size = p->next_packet_size;
+    uint32_t out_size = 0;
+    uint32_t expected_size = used * qemu_target_page_size();
+    struct zlib_data *z = p->data;
+    z_stream *zs = &z->zs;
+    int ret;
+    int i;
+
+    if (p->flags != MULTIFD_FLAG_ZLIB) {
+        error_setg(errp, "multifd %d: flags received %x flags expected %x",
+                   p->id, p->flags, MULTIFD_FLAG_ZLIB);
+        return -1;
+    }
+    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
+
+    if (ret != 0) {
+        return ret;
+    }
+
+    zs->avail_in = in_size;
+    zs->next_in = z->zbuff;
+
+    for (i = 0; i < used; i++) {
+        struct iovec *iov = &p->pages->iov[i];
+        int flush = Z_NO_FLUSH;
+
+        if (i == used - 1) {
+            flush = Z_SYNC_FLUSH;
+        }
+
+        zs->avail_out = iov->iov_len;
+        zs->next_out = iov->iov_base;
+
+        ret = inflate(zs, flush);
+        if (ret != Z_OK) {
+            error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
+                       p->id, ret);
+            return -1;
+        }
+        out_size += iov->iov_len;
+    }
+    if (out_size != expected_size) {
+        error_setg(errp, "multifd %d: packet size received %d size expected %d",
+                   p->id, out_size, expected_size);
+        return -1;
+    }
+    return 0;
+}
+
+static MultiFDMethods multifd_zlib_ops = {
+    .send_setup = zlib_send_setup,
+    .send_cleanup = zlib_send_cleanup,
+    .send_prepare = zlib_send_prepare,
+    .send_write = zlib_send_write,
+    .recv_setup = zlib_recv_setup,
+    .recv_cleanup = zlib_recv_cleanup,
+    .recv_pages = zlib_recv_pages
+};
+
 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
     [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
+    [MULTIFD_COMPRESS_ZLIB] = &multifd_zlib_ops,
 };
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
diff --git a/qapi/migration.json b/qapi/migration.json
index 153527e120..085eba8f07 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -493,7 +493,7 @@
 #
 ##
 { 'enum': 'MultifdCompress',
-  'data': [ 'none' ] }
+  'data': [ 'none', 'zlib' ] }
 
 ##
 # @MigrationParameter:
diff --git a/tests/migration-test.c b/tests/migration-test.c
index e5b8125e1c..e6995ae4e7 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -1219,6 +1219,11 @@ static void test_multifd_tcp_none(void)
     test_multifd_tcp("none");
 }
 
+static void test_multifd_tcp_zlib(void)
+{
+    test_multifd_tcp("zlib");
+}
+
 int main(int argc, char **argv)
 {
     char template[] = "/tmp/migration-test-XXXXXX";
@@ -1275,6 +1280,7 @@ int main(int argc, char **argv)
     qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
     qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
     qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
+    qtest_add_func("/migration/multifd/tcp/zlib", test_multifd_tcp_zlib);
 
     ret = g_test_run();
 
-- 
2.21.0



^ permalink raw reply related	[flat|nested] 10+ messages in thread

* Re: [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
@ 2019-06-14 11:26   ` Dr. David Alan Gilbert
  2019-06-14 17:33     ` Juan Quintela
  0 siblings, 1 reply; 10+ messages in thread
From: Dr. David Alan Gilbert @ 2019-06-14 11:26 UTC (permalink / raw)
  To: Juan Quintela
  Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
	Paolo Bonzini

* Juan Quintela (quintela@redhat.com) wrote:
> It will be used later.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> ---
> Move setup of ->ops helper to proper place (wei)
> Rename s/none/nocomp/ (dave)
> ---
>  migration/migration.c |   9 ++
>  migration/migration.h |   1 +
>  migration/ram.c       | 188 ++++++++++++++++++++++++++++++++++++++++--
>  3 files changed, 190 insertions(+), 8 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 3f17d8f2f8..a3526d395b 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2174,6 +2174,15 @@ int migrate_multifd_channels(void)
>      return s->parameters.multifd_channels;
>  }
>  
> +int migrate_multifd_method(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->parameters.multifd_compress;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> diff --git a/migration/migration.h b/migration/migration.h
> index 71c03353c3..437abf3405 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -270,6 +270,7 @@ bool migrate_auto_converge(void);
>  bool migrate_use_multifd(void);
>  bool migrate_pause_before_switchover(void);
>  int migrate_multifd_channels(void);
> +int migrate_multifd_method(void);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index b0ca989160..3b0002ddba 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -45,6 +45,7 @@
>  #include "page_cache.h"
>  #include "qemu/error-report.h"
>  #include "qapi/error.h"
> +#include "qapi/qapi-types-migration.h"
>  #include "qapi/qapi-events-migration.h"
>  #include "qapi/qmp/qerror.h"
>  #include "trace.h"
> @@ -661,6 +662,8 @@ typedef struct {
>      uint64_t num_packets;
>      /* pages sent through this channel */
>      uint64_t num_pages;
> +    /* used for compression methods */
> +    void *data;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -696,8 +699,152 @@ typedef struct {
>      uint64_t num_pages;
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* used for de-compression methods */
> +    void *data;
>  } MultiFDRecvParams;
>  
> +typedef struct {
> +    /* Setup for sending side */
> +    int (*send_setup)(MultiFDSendParams *p, Error **errp);
> +    /* Cleanup for sending side */
> +    void (*send_cleanup)(MultiFDSendParams *p);
> +    /* Prepare the send packet */
> +    int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
> +    /* Write the send packet */
> +    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
> +    /* Setup for receiving side */
> +    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
> +    /* Cleanup for receiving side */
> +    void (*recv_cleanup)(MultiFDRecvParams *p);
> +    /* Read all pages */
> +    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
> +} MultiFDMethods;
> +
> +/* Multifd without compression */
> +
> +/**
> + * nocomp_send_setup: setup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    return 0;
> +}
> +
> +/**
> + * nocomp_send_cleanup: cleanup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_send_cleanup(MultiFDSendParams *p)
> +{
> +    return;
> +}
> +
> +/**
> + * nocomp_send_prepare: prepare date to be able to send
> + *
> + * For no compression we just have to calculate the size of the
> + * packet.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
> +                               Error **errp)
> +{
> +    p->next_packet_size = used * qemu_target_page_size();
> +    return 0;
> +}
> +
> +/**
> + * nocomp_send_write: do the actual write of the data
> + *
> + * For no compression we just have to write the data.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> +    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +/**
> + * nocomp_recv_setup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    return 0;
> +}
> +
> +/**
> + * nocomp_recv_cleanup: cleanup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> +{
> +}
> +
> +/**
> + * nocomp_recv_pages: read the data from the channel into actual pages
> + *
> + * For no compression we just need to read things into the correct place.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> +    if (p->flags != 0) {
> +        error_setg(errp, "multifd %d: flags received %x flags expected %x",
> +                   p->id, MULTIFD_FLAG_ZLIB, p->flags);

Can you just explain that a bit - the 'received' seems to be constant
while the expected is p->flags - is that the right way around?
Why would you expect FLAG_ZLIB in nocomp?
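
For reference, a minimal sketch of the check with the arguments in the order the message text suggests (received first, expected second), and with the SYNC bit masked off beforehand as multifd_recv_thread does later in this patch. demo_check_flags, the DEMO_FLAG_* macros and the plain char buffer are assumptions for illustration; the real code reports through error_setg() and an Error **.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical copies of the flag bits, for this sketch only */
#define DEMO_FLAG_SYNC (1u << 0)
#define DEMO_FLAG_ZLIB (1u << 1)

/*
 * Compare the flags that arrived in a packet with the flags the
 * selected method expects.  Returns 0 on match; on mismatch writes a
 * message into msg and returns -1.
 */
static int demo_check_flags(int id, uint32_t received, uint32_t expected,
                            char *msg, size_t msg_len)
{
    /* The per-method code does not handle SYNC, so drop it first */
    received &= ~DEMO_FLAG_SYNC;

    if (received != expected) {
        snprintf(msg, msg_len,
                 "multifd %d: flags received %x flags expected %x",
                 id, (unsigned)received, (unsigned)expected);
        return -1;
    }
    return 0;
}
```

Under this reading the no-compression method would pass 0 as the expected value and the zlib method would pass the ZLIB bit; whether that matches the author's intent is exactly the question above.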

> +        return -1;
> +    }
> +    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +static MultiFDMethods multifd_nocomp_ops = {
> +    .send_setup = nocomp_send_setup,
> +    .send_cleanup = nocomp_send_cleanup,
> +    .send_prepare = nocomp_send_prepare,
> +    .send_write = nocomp_send_write,
> +    .recv_setup = nocomp_recv_setup,
> +    .recv_cleanup = nocomp_recv_cleanup,
> +    .recv_pages = nocomp_recv_pages
> +};
> +
> +static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
> +    [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
> +};
> +
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
>      MultiFDInit_t msg;
> @@ -900,6 +1047,8 @@ struct {
>      uint64_t packet_num;
>      /* send channels ready */
>      QemuSemaphore channels_ready;
> +    /* multifd ops */
> +    MultiFDMethods *ops;
>  } *multifd_send_state;
>  
>  /*
> @@ -1030,6 +1179,7 @@ void multifd_save_cleanup(void)
>          p->packet_len = 0;
>          g_free(p->packet);
>          p->packet = NULL;
> +        multifd_send_state->ops->send_cleanup(p);
>      }
>      qemu_sem_destroy(&multifd_send_state->channels_ready);
>      qemu_sem_destroy(&multifd_send_state->sem_sync);
> @@ -1097,7 +1247,14 @@ static void *multifd_send_thread(void *opaque)
>              uint64_t packet_num = p->packet_num;
>              uint32_t flags = p->flags;
>  
> -            p->next_packet_size = used * qemu_target_page_size();
> +            if (used) {
> +                ret = multifd_send_state->ops->send_prepare(p, used,
> +                                                            &local_err);
> +                if (ret != 0) {
> +                    qemu_mutex_unlock(&p->mutex);
> +                    break;
> +                }
> +            }
>              multifd_send_fill_packet(p);
>              p->flags = 0;
>              p->num_packets++;
> @@ -1115,8 +1272,7 @@ static void *multifd_send_thread(void *opaque)
>              }
>  
>              if (used) {
> -                ret = qio_channel_writev_all(p->c, p->pages->iov,
> -                                             used, &local_err);
> +                ret = multifd_send_state->ops->send_write(p, used, &local_err);
>                  if (ret != 0) {
>                      break;
>                  }
> @@ -1176,6 +1332,7 @@ int multifd_save_setup(Error **errp)
>  {
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    int ret = 0;
>      uint8_t i;
>  
>      if (!migrate_use_multifd()) {
> @@ -1187,9 +1344,11 @@ int multifd_save_setup(Error **errp)
>      multifd_send_state->pages = multifd_pages_init(page_count);
>      qemu_sem_init(&multifd_send_state->sem_sync, 0);
>      qemu_sem_init(&multifd_send_state->channels_ready, 0);
> +    multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> +        int res;
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> @@ -1202,8 +1361,12 @@ int multifd_save_setup(Error **errp)
>          p->packet = g_malloc0(p->packet_len);
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_send_channel_async, p);
> +        res = multifd_send_state->ops->send_setup(p, errp);
> +        if (ret == 0) {
> +            ret = res;
> +        }
>      }
> -    return 0;
> +    return ret;
>  }
>  
>  struct {
> @@ -1214,6 +1377,8 @@ struct {
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    /* multifd ops */
> +    MultiFDMethods *ops;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -1246,7 +1411,6 @@ static void multifd_recv_terminate_threads(Error *err)
>  int multifd_load_cleanup(Error **errp)
>  {
>      int i;
> -    int ret = 0;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -1269,6 +1433,7 @@ int multifd_load_cleanup(Error **errp)
>          p->packet_len = 0;
>          g_free(p->packet);
>          p->packet = NULL;
> +        multifd_recv_state->ops->recv_cleanup(p);
>      }
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
> @@ -1276,7 +1441,7 @@ int multifd_load_cleanup(Error **errp)
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  
> -    return ret;
> +    return 0;
>  }
>  
>  static void multifd_recv_sync_main(void)
> @@ -1337,6 +1502,8 @@ static void *multifd_recv_thread(void *opaque)
>  
>          used = p->pages->used;
>          flags = p->flags;
> +        /* recv methods don't know how to handle the SYNC flag */
> +        p->flags &= ~MULTIFD_FLAG_SYNC;
>          trace_multifd_recv(p->id, p->packet_num, used, flags,
>                             p->next_packet_size);
>          p->num_packets++;
> @@ -1344,8 +1511,7 @@ static void *multifd_recv_thread(void *opaque)
>          qemu_mutex_unlock(&p->mutex);
>  
>          if (used) {
> -            ret = qio_channel_readv_all(p->c, p->pages->iov,
> -                                        used, &local_err);
> +            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
>              if (ret != 0) {
>                  break;
>              }
> @@ -1384,9 +1550,11 @@ int multifd_load_setup(Error **errp)
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> +    multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +        int ret;
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> @@ -1396,6 +1564,10 @@ int multifd_load_setup(Error **errp)
>                        + sizeof(ram_addr_t) * page_count;
>          p->packet = g_malloc0(p->packet_len);
>          p->name = g_strdup_printf("multifdrecv_%d", i);
> +        ret = multifd_recv_state->ops->recv_setup(p, errp);
> +        if (ret != 0) {
> +            return ret;
> +        }
>      }
>      return 0;
>  }
> -- 
> 2.21.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support
  2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
@ 2019-06-14 12:56   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 10+ messages in thread
From: Dr. David Alan Gilbert @ 2019-06-14 12:56 UTC (permalink / raw)
  To: Juan Quintela
  Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
	Paolo Bonzini

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  hw/core/qdev-properties.c |   2 +-
>  migration/ram.c           | 262 ++++++++++++++++++++++++++++++++++++++
>  qapi/migration.json       |   2 +-
>  tests/migration-test.c    |   6 +
>  4 files changed, 270 insertions(+), 2 deletions(-)
> 
> diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
> index ebeeb5c88d..e40aa806e2 100644
> --- a/hw/core/qdev-properties.c
> +++ b/hw/core/qdev-properties.c
> @@ -651,7 +651,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
>  const PropertyInfo qdev_prop_multifd_compress = {
>      .name = "MultifdCompress",
>      .description = "multifd_compress values, "
> -                   "none",
> +                   "none/zlib",
>      .enum_table = &MultifdCompress_lookup,
>      .get = get_enum,
>      .set = set_enum,
> diff --git a/migration/ram.c b/migration/ram.c
> index 3b0002ddba..691ebd9108 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -583,6 +583,7 @@ exit:
>  #define MULTIFD_VERSION 1
>  
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> +#define MULTIFD_FLAG_ZLIB (1 << 1)
>  
>  /* This value needs to be a multiple of qemu_target_page_size() */
>  #define MULTIFD_PACKET_SIZE (512 * 1024)
> @@ -625,6 +626,15 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>  
> +struct zlib_data {
> +    /* stream for compression */
> +    z_stream zs;
> +    /* compressed buffer */
> +    uint8_t *zbuff;
> +    /* size of compressed buffer */
> +    uint32_t zbuff_len;
> +};
> +
>  typedef struct {
>      /* this fields are not changed once the thread is created */
>      /* channel number */
> @@ -841,8 +851,260 @@ static MultiFDMethods multifd_nocomp_ops = {
>      .recv_pages = nocomp_recv_pages
>  };
>  
> +/* Multifd zlib compression */
> +
> +/**
> + * zlib_send_setup: setup send side
> + *
> + * Setup each channel with zlib compression.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
> +    z_stream *zs = &z->zs;
> +
> +    p->data = z;
> +    zs->zalloc = Z_NULL;
> +    zs->zfree = Z_NULL;
> +    zs->opaque = Z_NULL;
> +    if (deflateInit(zs, migrate_compress_level()) != Z_OK) {
> +        error_setg(errp, "multifd %d: deflate init failed", p->id);
> +        return -1;

Are we leaking 'z' here? Or is zlib_send_cleanup happy with that?

Other than that I think we're OK.

Dave
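
One way to make the failure path independent of what the cleanup hook does is to release everything inside setup before returning the error, and to have cleanup tolerate a channel with no data. The sketch below shows that pattern under stated assumptions: DemoChannel, DemoZlibData, demo_stream_init() and the fail_init switch are hypothetical stand-ins (the stub replaces deflateInit()), and plain calloc/free replace the GLib allocators. In the patch as posted, p->data is assigned before deflateInit(), so whether 'z' leaks depends on whether zlib_send_cleanup() runs after a failed setup, which this hunk does not show.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for struct zlib_data and MultiFDSendParams */
typedef struct {
    uint8_t *zbuff;
    uint32_t zbuff_len;
} DemoZlibData;

typedef struct {
    void *data;
} DemoChannel;

/* Stub in place of deflateInit(); returns 0 on success */
static int demo_stream_init(int fail)
{
    return fail ? -1 : 0;
}

/* Returns 0 on success; on error nothing is left allocated. */
static int demo_send_setup(DemoChannel *p, uint32_t buf_len, int fail_init)
{
    DemoZlibData *z = calloc(1, sizeof(*z));

    if (!z) {
        return -1;
    }
    if (demo_stream_init(fail_init) != 0) {
        goto err_free;
    }
    z->zbuff = malloc(buf_len);
    if (!z->zbuff) {
        /* real code would also end the stream here */
        goto err_free;
    }
    z->zbuff_len = buf_len;
    /* Publish the state only once it is fully set up */
    p->data = z;
    return 0;

err_free:
    free(z);
    p->data = NULL;
    return -1;
}

static void demo_send_cleanup(DemoChannel *p)
{
    DemoZlibData *z = p->data;

    if (!z) {
        /* setup failed or never ran for this channel */
        return;
    }
    free(z->zbuff);
    free(z);
    p->data = NULL;
}
```

With this shape, cleanup can be called for every channel regardless of how far setup got.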

> +    }
> +    /* We will never have more than page_count pages */
> +    z->zbuff_len = page_count * qemu_target_page_size();
> +    z->zbuff_len *= 2;
> +    z->zbuff = g_try_malloc(z->zbuff_len);
> +    if (!z->zbuff) {
> +        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * zlib_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void zlib_send_cleanup(MultiFDSendParams *p)
> +{
> +    struct zlib_data *z = p->data;
> +
> +    deflateEnd(&z->zs);
> +    g_free(z->zbuff);
> +    z->zbuff = NULL;
> +    g_free(p->data);
> +    p->data = NULL;
> +}
> +
> +/**
> + * zlib_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> +    struct iovec *iov = p->pages->iov;
> +    struct zlib_data *z = p->data;
> +    z_stream *zs = &z->zs;
> +    uint32_t out_size = 0;
> +    int ret;
> +    uint32_t i;
> +
> +    for (i = 0; i < used; i++) {
> +        uint32_t available = z->zbuff_len - out_size;
> +        int flush = Z_NO_FLUSH;
> +
> +        if (i == used - 1) {
> +            flush = Z_SYNC_FLUSH;
> +        }
> +
> +        zs->avail_in = iov[i].iov_len;
> +        zs->next_in = iov[i].iov_base;
> +
> +        zs->avail_out = available;
> +        zs->next_out = z->zbuff + out_size;
> +
> +        ret = deflate(zs, flush);
> +        if (ret != Z_OK) {
> +            error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
> +                       p->id, ret);
> +            return -1;
> +        }
> +        out_size += available - zs->avail_out;
> +    }
> +    p->next_packet_size = out_size;
> +    p->flags |= MULTIFD_FLAG_ZLIB;
> +
> +    return 0;
> +}
> +
> +/**
> + * zlib_send_write: do the actual write of the data
> + *
> + * Do the actual write of the compressed buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> +    struct zlib_data *z = p->data;
> +
> +    return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
> +                                 errp);
> +}
> +
> +/**
> + * zlib_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
> +    z_stream *zs = &z->zs;
> +
> +    p->data = z;
> +    zs->zalloc = Z_NULL;
> +    zs->zfree = Z_NULL;
> +    zs->opaque = Z_NULL;
> +    zs->avail_in = 0;
> +    zs->next_in = Z_NULL;
> +    if (inflateInit(zs) != Z_OK) {
> +        error_setg(errp, "multifd %d: inflate init failed", p->id);
> +        return -1;
> +    }
> +    /* We will never have more than page_count pages */
> +    z->zbuff_len = page_count * qemu_target_page_size();
> +    /* We know compression "could" use more space */
> +    z->zbuff_len *= 2;
> +    z->zbuff = g_try_malloc(z->zbuff_len);
> +    if (!z->zbuff) {
> +        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * zlib_recv_cleanup: cleanup receive side
> + *
> + * Release the inflate stream and free the buffers.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void zlib_recv_cleanup(MultiFDRecvParams *p)
> +{
> +    struct zlib_data *z = p->data;
> +
> +    inflateEnd(&z->zs);
> +    g_free(z->zbuff);
> +    z->zbuff = NULL;
> +    g_free(p->data);
> +    p->data = NULL;
> +}
> +
> +/**
> + * zlib_recv_pages: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> +    uint32_t in_size = p->next_packet_size;
> +    uint32_t out_size = 0;
> +    uint32_t expected_size = used * qemu_target_page_size();
> +    struct zlib_data *z = p->data;
> +    z_stream *zs = &z->zs;
> +    int ret;
> +    int i;
> +
> +    if (p->flags != MULTIFD_FLAG_ZLIB) {
> +        error_setg(errp, "multifd %d: flags received %x flags expected %x",
> +                   p->id, p->flags, MULTIFD_FLAG_ZLIB);
> +        return -1;
> +    }
> +    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
> +
> +    if (ret != 0) {
> +        return ret;
> +    }
> +
> +    zs->avail_in = in_size;
> +    zs->next_in = z->zbuff;
> +
> +    for (i = 0; i < used; i++) {
> +        struct iovec *iov = &p->pages->iov[i];
> +        int flush = Z_NO_FLUSH;
> +
> +        if (i == used - 1) {
> +            flush = Z_SYNC_FLUSH;
> +        }
> +
> +        zs->avail_out = iov->iov_len;
> +        zs->next_out = iov->iov_base;
> +
> +        ret = inflate(zs, flush);
> +        if (ret != Z_OK) {
> +            error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
> +                       p->id, ret);
> +            return ret;
> +        }
> +        out_size += iov->iov_len;
> +    }
> +    if (out_size != expected_size) {
> +        error_setg(errp, "multifd %d: packet size received %d size expected %d",
> +                   p->id, out_size, expected_size);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
> +static MultiFDMethods multifd_zlib_ops = {
> +    .send_setup = zlib_send_setup,
> +    .send_cleanup = zlib_send_cleanup,
> +    .send_prepare = zlib_send_prepare,
> +    .send_write = zlib_send_write,
> +    .recv_setup = zlib_recv_setup,
> +    .recv_cleanup = zlib_recv_cleanup,
> +    .recv_pages = zlib_recv_pages
> +};
> +
>  static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
>      [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
> +    [MULTIFD_COMPRESS_ZLIB] = &multifd_zlib_ops,
>  };
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> diff --git a/qapi/migration.json b/qapi/migration.json
> index 153527e120..085eba8f07 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -493,7 +493,7 @@
>  #
>  ##
>  { 'enum': 'MultifdCompress',
> -  'data': [ 'none' ] }
> +  'data': [ 'none', 'zlib' ] }
>  
>  ##
>  # @MigrationParameter:
> diff --git a/tests/migration-test.c b/tests/migration-test.c
> index e5b8125e1c..e6995ae4e7 100644
> --- a/tests/migration-test.c
> +++ b/tests/migration-test.c
> @@ -1219,6 +1219,11 @@ static void test_multifd_tcp_none(void)
>      test_multifd_tcp("none");
>  }
>  
> +static void test_multifd_tcp_zlib(void)
> +{
> +    test_multifd_tcp("zlib");
> +}
> +
>  int main(int argc, char **argv)
>  {
>      char template[] = "/tmp/migration-test-XXXXXX";
> @@ -1275,6 +1280,7 @@ int main(int argc, char **argv)
>      qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
>      qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
>      qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
> +    qtest_add_func("/migration/multifd/tcp/zlib", test_multifd_tcp_zlib);
>  
>      ret = g_test_run();
>  
> -- 
> 2.21.0
> 
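On the leak question in zlib_send_setup: one way to make the error paths
safe, whether or not the caller later runs zlib_send_cleanup, is to have
setup release what it allocated and publish p->data only on success.
A minimal sketch follows, using plain calloc/malloc stand-ins rather than
the glib and zlib calls.  demo_zdata, demo_params, demo_send_setup,
demo_send_cleanup and fail_init are hypothetical names for illustration,
not QEMU code.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the patch's types; not QEMU definitions. */
struct demo_zdata {
    uint8_t *zbuff;      /* compressed buffer */
    uint32_t zbuff_len;  /* size of compressed buffer */
};

struct demo_params {
    void *data;          /* per-channel private data, like p->data */
};

/*
 * Setup that frees everything it allocated before reporting failure.
 * fail_init simulates deflateInit() returning an error.
 */
static int demo_send_setup(struct demo_params *p, uint32_t len, int fail_init)
{
    struct demo_zdata *z;

    assert(p != NULL);
    z = calloc(1, sizeof(*z));
    if (!z) {
        return -1;
    }
    if (fail_init) {
        goto err_free_z;
    }
    z->zbuff_len = len;
    z->zbuff = malloc(len);
    if (!z->zbuff) {
        /* real code would also have to call deflateEnd() here */
        goto err_free_z;
    }
    p->data = z;         /* publish only after setup fully succeeded */
    return 0;

err_free_z:
    free(z);
    p->data = NULL;
    return -1;
}

/* Cleanup that tolerates a channel whose setup failed. */
static void demo_send_cleanup(struct demo_params *p)
{
    struct demo_zdata *z = p->data;

    if (!z) {
        return;
    }
    free(z->zbuff);
    free(z);
    p->data = NULL;
}
```

With this shape a failed setup leaves p->data NULL, so calling the
cleanup afterwards does nothing.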
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



* Re: [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
  2019-06-14 11:26   ` Dr. David Alan Gilbert
@ 2019-06-14 17:33     ` Juan Quintela
  0 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-14 17:33 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
	Paolo Bonzini

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> It will be used later.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> + */
>> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
>> +{
>> +    if (p->flags != 0) {
>> +        error_setg(errp, "multifd %d: flags received %x flags expected %x",
>> +                   p->id, MULTIFD_FLAG_ZLIB, p->flags);
>
> Can you just explain that a bit - the 'received' seems to be constant
> while the expected is p->flags - is that the right way around?
> Why would you expect FLAG_ZLIB in nocomp?

When I changed the printf's to error_setg I did a bit *too much* copy
and paste.  Then I decided to use received/expected consistently in all
messages, and clearly I failed.

Fixing, Thanks.
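
For reference, the argument order under discussion can be sketched as
below: the value that arrived in the packet goes into the "received"
slot and the value the method expects goes into the "expected" slot.
demo_check_flags is a hypothetical helper that formats into a caller
buffer with snprintf.  It is not the error_setg call used in the patch.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>

/*
 * Hypothetical helper, not part of the patch: compare the flags that
 * arrived in a packet against what the method expects and, on
 * mismatch, format the message with the received value first.
 */
static int demo_check_flags(char *msg, size_t len, int id,
                            uint32_t received, uint32_t expected)
{
    assert(msg != NULL && len > 0);

    if (received == expected) {
        msg[0] = '\0';
        return 0;
    }
    snprintf(msg, len,
             "multifd %d: flags received %" PRIx32
             " flags expected %" PRIx32,
             id, received, expected);
    return -1;
}
```

For nocomp the expected value would be 0, and for zlib it would be
MULTIFD_FLAG_ZLIB.  In both cases p->flags is the received value.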

