* [Qemu-devel] [PATCH v4 0/6] Multifd compression support
@ 2019-06-12 10:53 Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
` (5 more replies)
0 siblings, 6 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
v4:
- improve the code left and right
- use the MULTIFD_FLAG_SYNC
- use qerrors properly
- pass errors everywhere (no more printfs)
- create cleanup/save operations
- merged zlib patches into one
- general patches are already in the migration pull request.
- commented all the methods.
ToDo: Didn't add the zstd compression because I changed the zlib
code/methods quite a bit.
My understanding is that all the issues are gone.
v3:
- improve the code
- address David and Markus comments
- make compression code into methods
so we can add any other method by adding just three functions
Please review, as far as I know everything is ok now.
Todo: Add zstd support
v2:
- improve the code left and right
- Split the zlib code better
- rename everything to v4.1
- Add tests for multifd-compress zlib
- Parameter is now an enum (zstd will follow soon)
ToDo:
- Make operations for different methods:
* multifd_prepare_send_none/zlib
* multifd_send_none/zlib
* multifd_recv_none/zlib
- Use the MULTIFD_FLAG_ZLIB (it is unused so far).
Please review and comment.
v1:
This series creates compression code on top of multifd. It is still
WIP, but it is already:
- faster than the current compression code
- it does the minimum amount of copies possible
- it allows support for other compression methods
- it passes the multifd test sent in my previous series
The test for the existing compression code didn't work because that code
is too slow: I need to make the downtime 10 times bigger for it to
converge on my test machine. This code works with the same limits as multifd no-
ToDo:
- move printf's to traces
- move code to a struct instead of if (zlib) inside the main threads.
- improve error handling.
Please review and comment.
Juan Quintela (6):
migration-test: introduce functions to handle string parameters
migration: Make multifd_save_setup() get an Error parameter
migration: Make multifd_load_setup() get an Error parameter
migration: Add multifd-compress parameter
migration: Make no compression operations into its own structure
migration: Add zlib compression multifd support
hmp.c | 13 +
hw/core/qdev-properties.c | 13 +
include/hw/qdev-properties.h | 3 +
migration/migration.c | 34 ++-
migration/migration.h | 3 +-
migration/ram.c | 454 ++++++++++++++++++++++++++++++++++-
migration/ram.h | 4 +-
migration/rdma.c | 2 +-
qapi/migration.json | 30 ++-
tests/migration-test.c | 54 ++++-
10 files changed, 585 insertions(+), 25 deletions(-)
--
2.21.0
^ permalink raw reply [flat|nested] 10+ messages in thread
* [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
` (4 subsequent siblings)
5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
tests/migration-test.c | 37 +++++++++++++++++++++++++++++++++++++
1 file changed, 37 insertions(+)
diff --git a/tests/migration-test.c b/tests/migration-test.c
index 36d4910192..c7c311e02c 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -442,6 +442,43 @@ static void migrate_set_parameter_int(QTestState *who, const char *parameter,
migrate_check_parameter_int(who, parameter, value);
}
+static char *migrate_get_parameter_str(QTestState *who,
+ const char *parameter)
+{
+ QDict *rsp;
+ char *result;
+
+ rsp = wait_command(who, "{ 'execute': 'query-migrate-parameters' }");
+ result = g_strdup(qdict_get_str(rsp, parameter));
+ qobject_unref(rsp);
+ return result;
+}
+
+static void migrate_check_parameter_str(QTestState *who, const char *parameter,
+ const char *value)
+{
+ char *result;
+
+ result = migrate_get_parameter_str(who, parameter);
+ g_assert_cmpstr(result, ==, value);
+ g_free(result);
+}
+
+__attribute__((unused))
+static void migrate_set_parameter_str(QTestState *who, const char *parameter,
+ const char *value)
+{
+ QDict *rsp;
+
+ rsp = qtest_qmp(who,
+ "{ 'execute': 'migrate-set-parameters',"
+ "'arguments': { %s: %s } }",
+ parameter, value);
+ g_assert(qdict_haskey(rsp, "return"));
+ qobject_unref(rsp);
+ migrate_check_parameter_str(who, parameter, value);
+}
+
static void migrate_pause(QTestState *who)
{
QDict *rsp;
--
2.21.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
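The string helpers in patch 1/6 pass `parameter` and `value` to `qtest_qmp()` through `%s`. A minimal sketch of the command text this is expected to produce, assuming that `%s` is expanded to a quoted JSON string by the qtest layer and that QEMU's parser accepts single-quoted strings. `format_set_parameter_str()` is a hypothetical helper written for illustration; it is not part of the patch and quotes both strings by hand:

```c
#include <stdio.h>

/*
 * Hypothetical helper, for illustration only: build the QMP command
 * text that migrate_set_parameter_str() is assumed to end up sending.
 * Returns the snprintf() result (length that was or would be written).
 */
static int format_set_parameter_str(char *buf, size_t len,
                                    const char *parameter,
                                    const char *value)
{
    return snprintf(buf, len,
                    "{ 'execute': 'migrate-set-parameters',"
                    " 'arguments': { '%s': '%s' } }",
                    parameter, value);
}
```

With `parameter = "multifd-compress"` and `value = "none"` this yields the same shape of command that the test in patch 4/6 issues on both source and destination before enabling the multifd capability.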
* [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() " Juan Quintela
` (3 subsequent siblings)
5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
migration/migration.c | 2 +-
migration/ram.c | 2 +-
migration/ram.h | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/migration/migration.c b/migration/migration.c
index 2865ae3fa9..0ac504be3c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -3336,7 +3336,7 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
return;
}
- if (multifd_save_setup() != 0) {
+ if (multifd_save_setup(&error_in) != 0) {
migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
MIGRATION_STATUS_FAILED);
migrate_fd_cleanup(s);
diff --git a/migration/ram.c b/migration/ram.c
index 89eec7ee9d..4b65d22cb1 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1172,7 +1172,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
}
}
-int multifd_save_setup(void)
+int multifd_save_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index 936177b3e9..09feaad55b 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -42,7 +42,7 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_total(void);
-int multifd_save_setup(void);
+int multifd_save_setup(Error **errp);
void multifd_save_cleanup(void);
int multifd_load_setup(void);
int multifd_load_cleanup(Error **errp);
--
2.21.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() get an Error parameter
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 1/6] migration-test: introduce functions to handle string parameters Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 2/6] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter Juan Quintela
` (2 subsequent siblings)
5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
We need to change the full chain to pass the Error parameter.
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
migration/migration.c | 10 +++++-----
migration/migration.h | 2 +-
migration/ram.c | 2 +-
migration/ram.h | 2 +-
migration/rdma.c | 2 +-
5 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/migration/migration.c b/migration/migration.c
index 0ac504be3c..4246bdd661 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -513,11 +513,11 @@ fail:
exit(EXIT_FAILURE);
}
-static void migration_incoming_setup(QEMUFile *f)
+static void migration_incoming_setup(QEMUFile *f, Error **errp)
{
MigrationIncomingState *mis = migration_incoming_get_current();
- if (multifd_load_setup() != 0) {
+ if (multifd_load_setup(errp) != 0) {
/* We haven't been able to create multifd threads
nothing better to do */
exit(EXIT_FAILURE);
@@ -567,13 +567,13 @@ static bool postcopy_try_recover(QEMUFile *f)
return false;
}
-void migration_fd_process_incoming(QEMUFile *f)
+void migration_fd_process_incoming(QEMUFile *f, Error **errp)
{
if (postcopy_try_recover(f)) {
return;
}
- migration_incoming_setup(f);
+ migration_incoming_setup(f, errp);
migration_incoming_process();
}
@@ -591,7 +591,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
return;
}
- migration_incoming_setup(f);
+ migration_incoming_setup(f, errp);
/*
* Common migration only needs one channel, so we can start
diff --git a/migration/migration.h b/migration/migration.h
index 780a096857..71c03353c3 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -237,7 +237,7 @@ struct MigrationState
void migrate_set_state(int *state, int old_state, int new_state);
-void migration_fd_process_incoming(QEMUFile *f);
+void migration_fd_process_incoming(QEMUFile *f, Error **errp);
void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp);
void migration_incoming_process(void);
diff --git a/migration/ram.c b/migration/ram.c
index 4b65d22cb1..b0ca989160 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1370,7 +1370,7 @@ static void *multifd_recv_thread(void *opaque)
return NULL;
}
-int multifd_load_setup(void)
+int multifd_load_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index 09feaad55b..dd1a736417 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -44,7 +44,7 @@ uint64_t ram_bytes_total(void);
int multifd_save_setup(Error **errp);
void multifd_save_cleanup(void);
-int multifd_load_setup(void);
+int multifd_load_setup(Error **errp);
int multifd_load_cleanup(Error **errp);
bool multifd_recv_all_channels_created(void);
bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
diff --git a/migration/rdma.c b/migration/rdma.c
index c1bcece53b..69389a8662 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -4019,7 +4019,7 @@ static void rdma_accept_incoming_migration(void *opaque)
}
rdma->migration_started_on_destination = 1;
- migration_fd_process_incoming(f);
+ migration_fd_process_incoming(f, errp);
}
void rdma_start_incoming_migration(const char *host_port, Error **errp)
--
2.21.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
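Patch 3/6 threads an `Error **errp` argument through every function between the transport code and `multifd_load_setup()`. A minimal sketch of that propagation pattern, using a toy error type rather than QEMU's `Error`. All `toy_*` names are hypothetical, and unlike the patch (where `migration_incoming_setup()` returns void and exits on failure) the intermediate functions here return a status so the chain can be exercised:

```c
#include <stdio.h>
#include <stdlib.h>

/* Toy stand-in for QEMU's Error; only the calling convention matters. */
typedef struct ToyError {
    char msg[64];
} ToyError;

/* Record an error unless the caller passed NULL or one is already set. */
static void toy_error_set(ToyError **errp, const char *msg)
{
    if (errp && !*errp) {
        ToyError *err = calloc(1, sizeof(*err));
        if (err) {
            snprintf(err->msg, sizeof(err->msg), "%s", msg);
            *errp = err;
        }
    }
}

/* Innermost step: reports failure via errp and a non-zero return. */
static int toy_load_setup(int fail, ToyError **errp)
{
    if (fail) {
        toy_error_set(errp, "cannot create multifd threads");
        return -1;
    }
    return 0;
}

/* Middle of the chain: forwards errp without inspecting it. */
static int toy_incoming_setup(int fail, ToyError **errp)
{
    return toy_load_setup(fail, errp);
}

/* Outermost entry point: the caller owns and frees the error. */
static int toy_process_incoming(int fail, ToyError **errp)
{
    if (toy_incoming_setup(fail, errp) != 0) {
        return -1;
    }
    return 0;
}
```

The point of the pattern is that only the outermost caller decides what to do with the error; every function in between just needs the extra parameter, which is why the patch touches five files for a one-line behavioural change.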
* [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
` (2 preceding siblings ...)
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 3/6] migration: Make multifd_load_setup() " Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
5 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
Rename it to NONE
Fix typos (dave)
We don't need to check values returned by visit_type_MultifdCompress (markus)
Fix yet more typos (wei)
---
hmp.c | 13 +++++++++++++
hw/core/qdev-properties.c | 13 +++++++++++++
include/hw/qdev-properties.h | 3 +++
migration/migration.c | 13 +++++++++++++
qapi/migration.json | 30 +++++++++++++++++++++++++++---
tests/migration-test.c | 13 ++++++++++---
6 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/hmp.c b/hmp.c
index be5e345c6f..b011f139ca 100644
--- a/hmp.c
+++ b/hmp.c
@@ -38,6 +38,7 @@
#include "qapi/qapi-commands-run-state.h"
#include "qapi/qapi-commands-tpm.h"
#include "qapi/qapi-commands-ui.h"
+#include "qapi/qapi-visit-migration.h"
#include "qapi/qmp/qdict.h"
#include "qapi/qmp/qerror.h"
#include "qapi/string-input-visitor.h"
@@ -435,6 +436,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_CHANNELS),
params->multifd_channels);
+ monitor_printf(mon, "%s: %s\n",
+ MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_COMPRESS),
+ MultifdCompress_str(params->multifd_compress));
monitor_printf(mon, "%s: %" PRIu64 "\n",
MigrationParameter_str(MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE),
params->xbzrle_cache_size);
@@ -1736,6 +1740,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
MigrateSetParameters *p = g_new0(MigrateSetParameters, 1);
uint64_t valuebw = 0;
uint64_t cache_size;
+ MultifdCompress compress_type;
Error *err = NULL;
int val, ret;
@@ -1821,6 +1826,14 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_multifd_channels = true;
visit_type_int(v, param, &p->multifd_channels, &err);
break;
+ case MIGRATION_PARAMETER_MULTIFD_COMPRESS:
+ p->has_multifd_compress = true;
+ visit_type_MultifdCompress(v, param, &compress_type, &err);
+ if (err) {
+ break;
+ }
+ p->multifd_compress = compress_type;
+ break;
case MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE:
p->has_xbzrle_cache_size = true;
visit_type_size(v, param, &cache_size, &err);
diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
index 5da1439a8b..ebeeb5c88d 100644
--- a/hw/core/qdev-properties.c
+++ b/hw/core/qdev-properties.c
@@ -5,6 +5,7 @@
#include "hw/pci/pci.h"
#include "qapi/qmp/qerror.h"
#include "qemu/error-report.h"
+#include "qapi/qapi-types-migration.h"
#include "hw/block/block.h"
#include "net/hub.h"
#include "qapi/visitor.h"
@@ -645,6 +646,18 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
.set_default_value = set_default_value_enum,
};
+/* --- MultifdCompress --- */
+
+const PropertyInfo qdev_prop_multifd_compress = {
+ .name = "MultifdCompress",
+ .description = "multifd_compress values, "
+ "none",
+ .enum_table = &MultifdCompress_lookup,
+ .get = get_enum,
+ .set = set_enum,
+ .set_default_value = set_default_value_enum,
+};
+
/* --- pci address --- */
/*
diff --git a/include/hw/qdev-properties.h b/include/hw/qdev-properties.h
index 1eae5ab056..34d906593b 100644
--- a/include/hw/qdev-properties.h
+++ b/include/hw/qdev-properties.h
@@ -23,6 +23,7 @@ extern const PropertyInfo qdev_prop_tpm;
extern const PropertyInfo qdev_prop_ptr;
extern const PropertyInfo qdev_prop_macaddr;
extern const PropertyInfo qdev_prop_on_off_auto;
+extern const PropertyInfo qdev_prop_multifd_compress;
extern const PropertyInfo qdev_prop_losttickpolicy;
extern const PropertyInfo qdev_prop_blockdev_on_error;
extern const PropertyInfo qdev_prop_bios_chs_trans;
@@ -205,6 +206,8 @@ extern const PropertyInfo qdev_prop_pcie_link_width;
DEFINE_PROP(_n, _s, _f, qdev_prop_macaddr, MACAddr)
#define DEFINE_PROP_ON_OFF_AUTO(_n, _s, _f, _d) \
DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_on_off_auto, OnOffAuto)
+#define DEFINE_PROP_MULTIFD_COMPRESS(_n, _s, _f, _d) \
+ DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_multifd_compress, MultifdCompress)
#define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \
DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_losttickpolicy, \
LostTickPolicy)
diff --git a/migration/migration.c b/migration/migration.c
index 4246bdd661..3f17d8f2f8 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -82,6 +82,7 @@
/* The delay time (in ms) between two COLO checkpoints */
#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
#define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
+#define DEFAULT_MIGRATE_MULTIFD_COMPRESS MULTIFD_COMPRESS_NONE
/* Background transfer rate for postcopy, 0 means unlimited, note
* that page requests can still exceed this limit.
@@ -769,6 +770,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->block_incremental = s->parameters.block_incremental;
params->has_multifd_channels = true;
params->multifd_channels = s->parameters.multifd_channels;
+ params->has_multifd_compress = true;
+ params->multifd_compress = s->parameters.multifd_compress;
params->has_xbzrle_cache_size = true;
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
@@ -1268,6 +1271,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_multifd_channels) {
dest->multifd_channels = params->multifd_channels;
}
+ if (params->has_multifd_compress) {
+ dest->multifd_compress = params->multifd_compress;
+ }
if (params->has_xbzrle_cache_size) {
dest->xbzrle_cache_size = params->xbzrle_cache_size;
}
@@ -1364,6 +1370,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_multifd_channels) {
s->parameters.multifd_channels = params->multifd_channels;
}
+ if (params->has_multifd_compress) {
+ s->parameters.multifd_compress = params->multifd_compress;
+ }
if (params->has_xbzrle_cache_size) {
s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
xbzrle_cache_resize(params->xbzrle_cache_size, errp);
@@ -3406,6 +3415,9 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("multifd-channels", MigrationState,
parameters.multifd_channels,
DEFAULT_MIGRATE_MULTIFD_CHANNELS),
+ DEFINE_PROP_MULTIFD_COMPRESS("multifd-compress", MigrationState,
+ parameters.multifd_compress,
+ DEFAULT_MIGRATE_MULTIFD_COMPRESS),
DEFINE_PROP_SIZE("xbzrle-cache-size", MigrationState,
parameters.xbzrle_cache_size,
DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE),
@@ -3495,6 +3507,7 @@ static void migration_instance_init(Object *obj)
params->has_x_checkpoint_delay = true;
params->has_block_incremental = true;
params->has_multifd_channels = true;
+ params->has_multifd_compress = true;
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
params->has_max_cpu_throttle = true;
diff --git a/qapi/migration.json b/qapi/migration.json
index 9cfbaf8c6c..153527e120 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -482,6 +482,19 @@
##
{ 'command': 'query-migrate-capabilities', 'returns': ['MigrationCapabilityStatus']}
+##
+# @MultifdCompress:
+#
+# An enumeration of multifd compression.
+#
+# @none: no compression.
+#
+# Since: 4.1
+#
+##
+{ 'enum': 'MultifdCompress',
+ 'data': [ 'none' ] }
+
##
# @MigrationParameter:
#
@@ -580,6 +593,9 @@
# @max-cpu-throttle: maximum cpu throttle percentage.
# Defaults to 99. (Since 3.1)
#
+# @multifd-compress: Which compression method to use.
+# Defaults to none. (Since 4.1)
+#
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
@@ -592,7 +608,7 @@
'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
'multifd-channels',
'xbzrle-cache-size', 'max-postcopy-bandwidth',
- 'max-cpu-throttle' ] }
+ 'max-cpu-throttle', 'multifd-compress' ] }
##
# @MigrateSetParameters:
@@ -682,6 +698,9 @@
# @max-cpu-throttle: maximum cpu throttle percentage.
# The default value is 99. (Since 3.1)
#
+# @multifd-compress: Which compression method to use.
+# Defaults to none. (Since 4.1)
+#
# Since: 2.4
##
# TODO either fuse back into MigrationParameters, or make
@@ -707,7 +726,8 @@
'*multifd-channels': 'int',
'*xbzrle-cache-size': 'size',
'*max-postcopy-bandwidth': 'size',
- '*max-cpu-throttle': 'int' } }
+ '*max-cpu-throttle': 'int',
+ '*multifd-compress': 'MultifdCompress' } }
##
# @migrate-set-parameters:
@@ -817,6 +837,9 @@
# Defaults to 99.
# (Since 3.1)
#
+# @multifd-compress: Which compression method to use.
+# Defaults to none. (Since 4.1)
+#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
@@ -840,7 +863,8 @@
'*multifd-channels': 'uint8',
'*xbzrle-cache-size': 'size',
'*max-postcopy-bandwidth': 'size',
- '*max-cpu-throttle':'uint8'} }
+ '*max-cpu-throttle': 'uint8',
+ '*multifd-compress': 'MultifdCompress' } }
##
# @query-migrate-parameters:
diff --git a/tests/migration-test.c b/tests/migration-test.c
index c7c311e02c..e5b8125e1c 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -464,7 +464,6 @@ static void migrate_check_parameter_str(QTestState *who, const char *parameter,
g_free(result);
}
-__attribute__((unused))
static void migrate_set_parameter_str(QTestState *who, const char *parameter,
const char *value)
{
@@ -1165,7 +1164,7 @@ static void test_migrate_fd_proto(void)
test_migrate_end(from, to, true);
}
-static void test_multifd_tcp(void)
+static void test_multifd_tcp(const char *method)
{
char *uri;
QTestState *from, *to;
@@ -1187,6 +1186,9 @@ static void test_multifd_tcp(void)
migrate_set_parameter_int(from, "multifd-channels", 2);
migrate_set_parameter_int(to, "multifd-channels", 2);
+ migrate_set_parameter_str(from, "multifd-compress", method);
+ migrate_set_parameter_str(to, "multifd-compress", method);
+
migrate_set_capability(from, "multifd", "true");
migrate_set_capability(to, "multifd", "true");
/* Wait for the first serial output from the source */
@@ -1212,6 +1214,11 @@ static void test_multifd_tcp(void)
free(uri);
}
+static void test_multifd_tcp_none(void)
+{
+ test_multifd_tcp("none");
+}
+
int main(int argc, char **argv)
{
char template[] = "/tmp/migration-test-XXXXXX";
@@ -1267,7 +1274,7 @@ int main(int argc, char **argv)
/* qtest_add_func("/migration/ignore_shared", test_ignore_shared); */
qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
- qtest_add_func("/migration/multifd/tcp", test_multifd_tcp);
+ qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
ret = g_test_run();
--
2.21.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
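Patch 4/6 declares `MultifdCompress` as a QAPI enum, so the generated code supplies the enum type, a lookup table and string conversion. A rough, hand-written sketch of what such an enum/lookup pair looks like; the `toy_*` names and both functions are hypothetical stand-ins, not the generated QAPI code:

```c
#include <string.h>

/* Mirrors the single-value enum added to qapi/migration.json. */
typedef enum {
    TOY_MULTIFD_COMPRESS_NONE,
    TOY_MULTIFD_COMPRESS__MAX,
} ToyMultifdCompress;

/* One name per enum value, indexed by the value itself. */
static const char *const toy_multifd_compress_lookup[TOY_MULTIFD_COMPRESS__MAX] = {
    [TOY_MULTIFD_COMPRESS_NONE] = "none",
};

/* Enum value to name; returns NULL for an out-of-range value. */
static const char *toy_multifd_compress_str(int value)
{
    if (value < 0 || value >= TOY_MULTIFD_COMPRESS__MAX) {
        return NULL;
    }
    return toy_multifd_compress_lookup[value];
}

/* Name to enum value; returns -1 when the name is not in the table. */
static int toy_multifd_compress_parse(const char *name)
{
    for (int i = 0; i < TOY_MULTIFD_COMPRESS__MAX; i++) {
        if (strcmp(name, toy_multifd_compress_lookup[i]) == 0) {
            return i;
        }
    }
    return -1;
}
```

Under this scheme a later method only needs a new enum value and a new table entry; a name such as "zlib" is rejected by the parse step until that entry exists.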
* [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
` (3 preceding siblings ...)
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 4/6] migration: Add multifd-compress parameter Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-14 11:26 ` Dr. David Alan Gilbert
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
5 siblings, 1 reply; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
It will be used later.
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
Move setup of ->ops helper to proper place (wei)
Rename s/none/nocomp/ (dave)
---
migration/migration.c | 9 ++
migration/migration.h | 1 +
migration/ram.c | 188 ++++++++++++++++++++++++++++++++++++++++--
3 files changed, 190 insertions(+), 8 deletions(-)
diff --git a/migration/migration.c b/migration/migration.c
index 3f17d8f2f8..a3526d395b 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2174,6 +2174,15 @@ int migrate_multifd_channels(void)
return s->parameters.multifd_channels;
}
+int migrate_multifd_method(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.multifd_compress;
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index 71c03353c3..437abf3405 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -270,6 +270,7 @@ bool migrate_auto_converge(void);
bool migrate_use_multifd(void);
bool migrate_pause_before_switchover(void);
int migrate_multifd_channels(void);
+int migrate_multifd_method(void);
int migrate_use_xbzrle(void);
int64_t migrate_xbzrle_cache_size(void);
diff --git a/migration/ram.c b/migration/ram.c
index b0ca989160..3b0002ddba 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -45,6 +45,7 @@
#include "page_cache.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "trace.h"
@@ -661,6 +662,8 @@ typedef struct {
uint64_t num_packets;
/* pages sent through this channel */
uint64_t num_pages;
+ /* used for compression methods */
+ void *data;
} MultiFDSendParams;
typedef struct {
@@ -696,8 +699,152 @@ typedef struct {
uint64_t num_pages;
/* syncs main thread and channels */
QemuSemaphore sem_sync;
+ /* used for de-compression methods */
+ void *data;
} MultiFDRecvParams;
+typedef struct {
+ /* Setup for sending side */
+ int (*send_setup)(MultiFDSendParams *p, Error **errp);
+ /* Cleanup for sending side */
+ void (*send_cleanup)(MultiFDSendParams *p);
+ /* Prepare the send packet */
+ int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
+ /* Write the send packet */
+ int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
+ /* Setup for receiving side */
+ int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
+ /* Cleanup for receiving side */
+ void (*recv_cleanup)(MultiFDRecvParams *p);
+ /* Read all pages */
+ int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
+} MultiFDMethods;
+
+/* Multifd without compression */
+
+/**
+ * nocomp_send_setup: setup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
+{
+ return 0;
+}
+
+/**
+ * nocomp_send_cleanup: cleanup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_send_cleanup(MultiFDSendParams *p)
+{
+ return;
+}
+
+/**
+ * nocomp_send_prepare: prepare data to be able to send
+ *
+ * For no compression we just have to calculate the size of the
+ * packet.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
+ Error **errp)
+{
+ p->next_packet_size = used * qemu_target_page_size();
+ return 0;
+}
+
+/**
+ * nocomp_send_write: do the actual write of the data
+ *
+ * For no compression we just have to write the data.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+ return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+}
+
+/**
+ * nocomp_recv_setup: setup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+ return 0;
+}
+
+/**
+ * nocomp_recv_cleanup: cleanup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_recv_cleanup(MultiFDRecvParams *p)
+{
+}
+
+/**
+ * nocomp_recv_pages: read the data from the channel into actual pages
+ *
+ * For no compression we just need to read things into the correct place.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+ if (p->flags != 0) {
+ error_setg(errp, "multifd %d: flags received %x flags expected %x",
+ p->id, p->flags, 0);
+ return -1;
+ }
+ return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
+}
+
+static MultiFDMethods multifd_nocomp_ops = {
+ .send_setup = nocomp_send_setup,
+ .send_cleanup = nocomp_send_cleanup,
+ .send_prepare = nocomp_send_prepare,
+ .send_write = nocomp_send_write,
+ .recv_setup = nocomp_recv_setup,
+ .recv_cleanup = nocomp_recv_cleanup,
+ .recv_pages = nocomp_recv_pages
+};
+
+static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
+ [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
+};
+
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg;
@@ -900,6 +1047,8 @@ struct {
uint64_t packet_num;
/* send channels ready */
QemuSemaphore channels_ready;
+ /* multifd ops */
+ MultiFDMethods *ops;
} *multifd_send_state;
/*
@@ -1030,6 +1179,7 @@ void multifd_save_cleanup(void)
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
+ multifd_send_state->ops->send_cleanup(p);
}
qemu_sem_destroy(&multifd_send_state->channels_ready);
qemu_sem_destroy(&multifd_send_state->sem_sync);
@@ -1097,7 +1247,14 @@ static void *multifd_send_thread(void *opaque)
uint64_t packet_num = p->packet_num;
uint32_t flags = p->flags;
- p->next_packet_size = used * qemu_target_page_size();
+ if (used) {
+ ret = multifd_send_state->ops->send_prepare(p, used,
+ &local_err);
+ if (ret != 0) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ }
multifd_send_fill_packet(p);
p->flags = 0;
p->num_packets++;
@@ -1115,8 +1272,7 @@ static void *multifd_send_thread(void *opaque)
}
if (used) {
- ret = qio_channel_writev_all(p->c, p->pages->iov,
- used, &local_err);
+ ret = multifd_send_state->ops->send_write(p, used, &local_err);
if (ret != 0) {
break;
}
@@ -1176,6 +1332,7 @@ int multifd_save_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ int ret = 0;
uint8_t i;
if (!migrate_use_multifd()) {
@@ -1187,9 +1344,11 @@ int multifd_save_setup(Error **errp)
multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->sem_sync, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
+ multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
+ int res;
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
@@ -1202,8 +1361,12 @@ int multifd_save_setup(Error **errp)
p->packet = g_malloc0(p->packet_len);
p->name = g_strdup_printf("multifdsend_%d", i);
socket_send_channel_create(multifd_new_send_channel_async, p);
+ res = multifd_send_state->ops->send_setup(p, errp);
+ if (ret == 0) {
+ ret = res;
+ }
}
- return 0;
+ return ret;
}
struct {
@@ -1214,6 +1377,8 @@ struct {
QemuSemaphore sem_sync;
/* global number of generated multifd packets */
uint64_t packet_num;
+ /* multifd ops */
+ MultiFDMethods *ops;
} *multifd_recv_state;
static void multifd_recv_terminate_threads(Error *err)
@@ -1246,7 +1411,6 @@ static void multifd_recv_terminate_threads(Error *err)
int multifd_load_cleanup(Error **errp)
{
int i;
- int ret = 0;
if (!migrate_use_multifd()) {
return 0;
@@ -1269,6 +1433,7 @@ int multifd_load_cleanup(Error **errp)
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
+ multifd_recv_state->ops->recv_cleanup(p);
}
qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
@@ -1276,7 +1441,7 @@ int multifd_load_cleanup(Error **errp)
g_free(multifd_recv_state);
multifd_recv_state = NULL;
- return ret;
+ return 0;
}
static void multifd_recv_sync_main(void)
@@ -1337,6 +1502,8 @@ static void *multifd_recv_thread(void *opaque)
used = p->pages->used;
flags = p->flags;
+ /* recv methods don't know how to handle the SYNC flag */
+ p->flags &= ~MULTIFD_FLAG_SYNC;
trace_multifd_recv(p->id, p->packet_num, used, flags,
p->next_packet_size);
p->num_packets++;
@@ -1344,8 +1511,7 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_unlock(&p->mutex);
if (used) {
- ret = qio_channel_readv_all(p->c, p->pages->iov,
- used, &local_err);
+ ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
if (ret != 0) {
break;
}
@@ -1384,9 +1550,11 @@ int multifd_load_setup(Error **errp)
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
atomic_set(&multifd_recv_state->count, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+ multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
+ int ret;
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0);
@@ -1396,6 +1564,10 @@ int multifd_load_setup(Error **errp)
+ sizeof(ram_addr_t) * page_count;
p->packet = g_malloc0(p->packet_len);
p->name = g_strdup_printf("multifdrecv_%d", i);
+ ret = multifd_recv_state->ops->recv_setup(p, errp);
+ if (ret != 0) {
+ return ret;
+ }
}
return 0;
}
--
2.21.0
* [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support
2019-06-12 10:53 [Qemu-devel] [PATCH v4 0/6] Multifd compression support Juan Quintela
` (4 preceding siblings ...)
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
@ 2019-06-12 10:53 ` Juan Quintela
2019-06-14 12:56 ` Dr. David Alan Gilbert
5 siblings, 1 reply; 10+ messages in thread
From: Juan Quintela @ 2019-06-12 10:53 UTC (permalink / raw)
To: qemu-devel
Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Markus Armbruster,
Dr. David Alan Gilbert, Paolo Bonzini
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
hw/core/qdev-properties.c | 2 +-
migration/ram.c | 262 ++++++++++++++++++++++++++++++++++++++
qapi/migration.json | 2 +-
tests/migration-test.c | 6 +
4 files changed, 270 insertions(+), 2 deletions(-)
diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
index ebeeb5c88d..e40aa806e2 100644
--- a/hw/core/qdev-properties.c
+++ b/hw/core/qdev-properties.c
@@ -651,7 +651,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
const PropertyInfo qdev_prop_multifd_compress = {
.name = "MultifdCompress",
.description = "multifd_compress values, "
- "none",
+ "none/zlib",
.enum_table = &MultifdCompress_lookup,
.get = get_enum,
.set = set_enum,
diff --git a/migration/ram.c b/migration/ram.c
index 3b0002ddba..691ebd9108 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -583,6 +583,7 @@ exit:
#define MULTIFD_VERSION 1
#define MULTIFD_FLAG_SYNC (1 << 0)
+#define MULTIFD_FLAG_ZLIB (1 << 1)
/* This value needs to be a multiple of qemu_target_page_size() */
#define MULTIFD_PACKET_SIZE (512 * 1024)
@@ -625,6 +626,15 @@ typedef struct {
RAMBlock *block;
} MultiFDPages_t;
+struct zlib_data {
+ /* stream for compression */
+ z_stream zs;
+ /* compressed buffer */
+ uint8_t *zbuff;
+ /* size of compressed buffer */
+ uint32_t zbuff_len;
+};
+
typedef struct {
/* this fields are not changed once the thread is created */
/* channel number */
@@ -841,8 +851,260 @@ static MultiFDMethods multifd_nocomp_ops = {
.recv_pages = nocomp_recv_pages
};
+/* Multifd zlib compression */
+
+/**
+ * zlib_send_setup: setup send side
+ *
+ * Setup each channel with zlib compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
+{
+ uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
+ z_stream *zs = &z->zs;
+
+ p->data = z;
+ zs->zalloc = Z_NULL;
+ zs->zfree = Z_NULL;
+ zs->opaque = Z_NULL;
+ if (deflateInit(zs, migrate_compress_level()) != Z_OK) {
+ error_setg(errp, "multifd %d: deflate init failed", p->id);
+ return -1;
+ }
+ /* We will never have more than page_count pages */
+ z->zbuff_len = page_count * qemu_target_page_size();
+ z->zbuff_len *= 2;
+ z->zbuff = g_try_malloc(z->zbuff_len);
+ if (!z->zbuff) {
+ error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
+ return -1;
+ }
+ return 0;
+}
+
+/**
+ * zlib_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void zlib_send_cleanup(MultiFDSendParams *p)
+{
+ struct zlib_data *z = p->data;
+
+ deflateEnd(&z->zs);
+ g_free(z->zbuff);
+ z->zbuff = NULL;
+ g_free(p->data);
+ p->data = NULL;
+}
+
+/**
+ * zlib_send_prepare: prepare data to be able to send
+ *
+ * Create a compressed buffer with all the pages that we are going to
+ * send.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ */
+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+ struct iovec *iov = p->pages->iov;
+ struct zlib_data *z = p->data;
+ z_stream *zs = &z->zs;
+ uint32_t out_size = 0;
+ int ret;
+ uint32_t i;
+
+ for (i = 0; i < used; i++) {
+ uint32_t available = z->zbuff_len - out_size;
+ int flush = Z_NO_FLUSH;
+
+ if (i == used - 1) {
+ flush = Z_SYNC_FLUSH;
+ }
+
+ zs->avail_in = iov[i].iov_len;
+ zs->next_in = iov[i].iov_base;
+
+ zs->avail_out = available;
+ zs->next_out = z->zbuff + out_size;
+
+ ret = deflate(zs, flush);
+ if (ret != Z_OK) {
+ error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
+ p->id, ret);
+ return -1;
+ }
+ out_size += available - zs->avail_out;
+ }
+ p->next_packet_size = out_size;
+ p->flags |= MULTIFD_FLAG_ZLIB;
+
+ return 0;
+}
+
+/**
+ * zlib_send_write: do the actual write of the data
+ *
+ * Do the actual write of the compressed buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+ struct zlib_data *z = p->data;
+
+ return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
+ errp);
+}
+
+/**
+ * zlib_recv_setup: setup receive side
+ *
+ * Create the compressed channel and buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+ uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
+ z_stream *zs = &z->zs;
+
+ p->data = z;
+ zs->zalloc = Z_NULL;
+ zs->zfree = Z_NULL;
+ zs->opaque = Z_NULL;
+ zs->avail_in = 0;
+ zs->next_in = Z_NULL;
+ if (inflateInit(zs) != Z_OK) {
+ error_setg(errp, "multifd %d: inflate init failed", p->id);
+ return -1;
+ }
+ /* We will never have more than page_count pages */
+ z->zbuff_len = page_count * qemu_target_page_size();
+ /* We know compression "could" use more space */
+ z->zbuff_len *= 2;
+ z->zbuff = g_try_malloc(z->zbuff_len);
+ if (!z->zbuff) {
+ error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
+ return -1;
+ }
+ return 0;
+}
+
+/**
+ * zlib_recv_cleanup: cleanup receive side
+ *
+ * End the inflate stream and free the buffers.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void zlib_recv_cleanup(MultiFDRecvParams *p)
+{
+ struct zlib_data *z = p->data;
+
+ inflateEnd(&z->zs);
+ g_free(z->zbuff);
+ z->zbuff = NULL;
+ g_free(p->data);
+ p->data = NULL;
+}
+
+/**
+ * zlib_recv_pages: read the data from the channel into actual pages
+ *
+ * Read the compressed buffer, and uncompress it into the actual
+ * pages.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+ uint32_t in_size = p->next_packet_size;
+ uint32_t out_size = 0;
+ uint32_t expected_size = used * qemu_target_page_size();
+ struct zlib_data *z = p->data;
+ z_stream *zs = &z->zs;
+ int ret;
+ int i;
+
+ if (p->flags != MULTIFD_FLAG_ZLIB) {
+ error_setg(errp, "multifd %d: flags received %x flags expected %x",
+ p->id, p->flags, MULTIFD_FLAG_ZLIB);
+ return -1;
+ }
+ ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ zs->avail_in = in_size;
+ zs->next_in = z->zbuff;
+
+ for (i = 0; i < used; i++) {
+ struct iovec *iov = &p->pages->iov[i];
+ int flush = Z_NO_FLUSH;
+
+ if (i == used - 1) {
+ flush = Z_SYNC_FLUSH;
+ }
+
+ zs->avail_out = iov->iov_len;
+ zs->next_out = iov->iov_base;
+
+ ret = inflate(zs, flush);
+ if (ret != Z_OK) {
+ error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
+ p->id, ret);
+ return ret;
+ }
+ out_size += iov->iov_len;
+ }
+ if (out_size != expected_size) {
+ error_setg(errp, "multifd %d: packet size received %d size expected %d",
+ p->id, out_size, expected_size);
+ return -1;
+ }
+ return 0;
+}
+
+static MultiFDMethods multifd_zlib_ops = {
+ .send_setup = zlib_send_setup,
+ .send_cleanup = zlib_send_cleanup,
+ .send_prepare = zlib_send_prepare,
+ .send_write = zlib_send_write,
+ .recv_setup = zlib_recv_setup,
+ .recv_cleanup = zlib_recv_cleanup,
+ .recv_pages = zlib_recv_pages
+};
+
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
[MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
+ [MULTIFD_COMPRESS_ZLIB] = &multifd_zlib_ops,
};
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
diff --git a/qapi/migration.json b/qapi/migration.json
index 153527e120..085eba8f07 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -493,7 +493,7 @@
#
##
{ 'enum': 'MultifdCompress',
- 'data': [ 'none' ] }
+ 'data': [ 'none', 'zlib' ] }
##
# @MigrationParameter:
diff --git a/tests/migration-test.c b/tests/migration-test.c
index e5b8125e1c..e6995ae4e7 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -1219,6 +1219,11 @@ static void test_multifd_tcp_none(void)
test_multifd_tcp("none");
}
+static void test_multifd_tcp_zlib(void)
+{
+ test_multifd_tcp("zlib");
+}
+
int main(int argc, char **argv)
{
char template[] = "/tmp/migration-test-XXXXXX";
@@ -1275,6 +1280,7 @@ int main(int argc, char **argv)
qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
+ qtest_add_func("/migration/multifd/tcp/zlib", test_multifd_tcp_zlib);
ret = g_test_run();
--
2.21.0
* Re: [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure Juan Quintela
@ 2019-06-14 11:26 ` Dr. David Alan Gilbert
2019-06-14 17:33 ` Juan Quintela
0 siblings, 1 reply; 10+ messages in thread
From: Dr. David Alan Gilbert @ 2019-06-14 11:26 UTC (permalink / raw)
To: Juan Quintela
Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
Paolo Bonzini
* Juan Quintela (quintela@redhat.com) wrote:
> It will be used later.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> ---
> Move setup of ->ops helper to proper place (wei)
> Rename s/none/nocomp/ (dave)
> ---
> migration/migration.c | 9 ++
> migration/migration.h | 1 +
> migration/ram.c | 188 ++++++++++++++++++++++++++++++++++++++++--
> 3 files changed, 190 insertions(+), 8 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index 3f17d8f2f8..a3526d395b 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2174,6 +2174,15 @@ int migrate_multifd_channels(void)
> return s->parameters.multifd_channels;
> }
>
> +int migrate_multifd_method(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->parameters.multifd_compress;
> +}
> +
> int migrate_use_xbzrle(void)
> {
> MigrationState *s;
> diff --git a/migration/migration.h b/migration/migration.h
> index 71c03353c3..437abf3405 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -270,6 +270,7 @@ bool migrate_auto_converge(void);
> bool migrate_use_multifd(void);
> bool migrate_pause_before_switchover(void);
> int migrate_multifd_channels(void);
> +int migrate_multifd_method(void);
>
> int migrate_use_xbzrle(void);
> int64_t migrate_xbzrle_cache_size(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index b0ca989160..3b0002ddba 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -45,6 +45,7 @@
> #include "page_cache.h"
> #include "qemu/error-report.h"
> #include "qapi/error.h"
> +#include "qapi/qapi-types-migration.h"
> #include "qapi/qapi-events-migration.h"
> #include "qapi/qmp/qerror.h"
> #include "trace.h"
> @@ -661,6 +662,8 @@ typedef struct {
> uint64_t num_packets;
> /* pages sent through this channel */
> uint64_t num_pages;
> + /* used for compression methods */
> + void *data;
> } MultiFDSendParams;
>
> typedef struct {
> @@ -696,8 +699,152 @@ typedef struct {
> uint64_t num_pages;
> /* syncs main thread and channels */
> QemuSemaphore sem_sync;
> + /* used for de-compression methods */
> + void *data;
> } MultiFDRecvParams;
>
> +typedef struct {
> + /* Setup for sending side */
> + int (*send_setup)(MultiFDSendParams *p, Error **errp);
> + /* Cleanup for sending side */
> + void (*send_cleanup)(MultiFDSendParams *p);
> + /* Prepare the send packet */
> + int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
> + /* Write the send packet */
> + int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
> + /* Setup for receiving side */
> + int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
> + /* Cleanup for receiving side */
> + void (*recv_cleanup)(MultiFDRecvParams *p);
> + /* Read all pages */
> + int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
> +} MultiFDMethods;
> +
> +/* Multifd without compression */
> +
> +/**
> + * nocomp_send_setup: setup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> + return 0;
> +}
> +
> +/**
> + * nocomp_send_cleanup: cleanup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_send_cleanup(MultiFDSendParams *p)
> +{
> + return;
> +}
> +
> +/**
> + * nocomp_send_prepare: prepare data to be able to send
> + *
> + * For no compression we just have to calculate the size of the
> + * packet.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
> + Error **errp)
> +{
> + p->next_packet_size = used * qemu_target_page_size();
> + return 0;
> +}
> +
> +/**
> + * nocomp_send_write: do the actual write of the data
> + *
> + * For no compression we just have to write the data.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> + return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +/**
> + * nocomp_recv_setup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> + return 0;
> +}
> +
> +/**
> + * nocomp_recv_cleanup: cleanup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> +{
> +}
> +
> +/**
> + * nocomp_recv_pages: read the data from the channel into actual pages
> + *
> + * For no compression we just need to read things into the correct place.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> + if (p->flags != 0) {
> + error_setg(errp, "multifd %d: flags received %x flags expected %x",
> + p->id, MULTIFD_FLAG_ZLIB, p->flags);
Can you just explain that a bit - the 'received' seems to be constant
while the expected is p->flags - is that the right way around?
Why would you expect FLAG_ZLIB in nocomp?
> + return -1;
> + }
> + return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +static MultiFDMethods multifd_nocomp_ops = {
> + .send_setup = nocomp_send_setup,
> + .send_cleanup = nocomp_send_cleanup,
> + .send_prepare = nocomp_send_prepare,
> + .send_write = nocomp_send_write,
> + .recv_setup = nocomp_recv_setup,
> + .recv_cleanup = nocomp_recv_cleanup,
> + .recv_pages = nocomp_recv_pages
> +};
> +
> +static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
> + [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
> +};
> +
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
> MultiFDInit_t msg;
> @@ -900,6 +1047,8 @@ struct {
> uint64_t packet_num;
> /* send channels ready */
> QemuSemaphore channels_ready;
> + /* multifd ops */
> + MultiFDMethods *ops;
> } *multifd_send_state;
>
> /*
> @@ -1030,6 +1179,7 @@ void multifd_save_cleanup(void)
> p->packet_len = 0;
> g_free(p->packet);
> p->packet = NULL;
> + multifd_send_state->ops->send_cleanup(p);
> }
> qemu_sem_destroy(&multifd_send_state->channels_ready);
> qemu_sem_destroy(&multifd_send_state->sem_sync);
> @@ -1097,7 +1247,14 @@ static void *multifd_send_thread(void *opaque)
> uint64_t packet_num = p->packet_num;
> uint32_t flags = p->flags;
>
> - p->next_packet_size = used * qemu_target_page_size();
> + if (used) {
> + ret = multifd_send_state->ops->send_prepare(p, used,
> + &local_err);
> + if (ret != 0) {
> + qemu_mutex_unlock(&p->mutex);
> + break;
> + }
> + }
> multifd_send_fill_packet(p);
> p->flags = 0;
> p->num_packets++;
> @@ -1115,8 +1272,7 @@ static void *multifd_send_thread(void *opaque)
> }
>
> if (used) {
> - ret = qio_channel_writev_all(p->c, p->pages->iov,
> - used, &local_err);
> + ret = multifd_send_state->ops->send_write(p, used, &local_err);
> if (ret != 0) {
> break;
> }
> @@ -1176,6 +1332,7 @@ int multifd_save_setup(Error **errp)
> {
> int thread_count;
> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> + int ret = 0;
> uint8_t i;
>
> if (!migrate_use_multifd()) {
> @@ -1187,9 +1344,11 @@ int multifd_save_setup(Error **errp)
> multifd_send_state->pages = multifd_pages_init(page_count);
> qemu_sem_init(&multifd_send_state->sem_sync, 0);
> qemu_sem_init(&multifd_send_state->channels_ready, 0);
> + multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
>
> for (i = 0; i < thread_count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
> + int res;
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> @@ -1202,8 +1361,12 @@ int multifd_save_setup(Error **errp)
> p->packet = g_malloc0(p->packet_len);
> p->name = g_strdup_printf("multifdsend_%d", i);
> socket_send_channel_create(multifd_new_send_channel_async, p);
> + res = multifd_send_state->ops->send_setup(p, errp);
> + if (ret == 0) {
> + ret = res;
> + }
> }
> - return 0;
> + return ret;
> }
>
> struct {
> @@ -1214,6 +1377,8 @@ struct {
> QemuSemaphore sem_sync;
> /* global number of generated multifd packets */
> uint64_t packet_num;
> + /* multifd ops */
> + MultiFDMethods *ops;
> } *multifd_recv_state;
>
> static void multifd_recv_terminate_threads(Error *err)
> @@ -1246,7 +1411,6 @@ static void multifd_recv_terminate_threads(Error *err)
> int multifd_load_cleanup(Error **errp)
> {
> int i;
> - int ret = 0;
>
> if (!migrate_use_multifd()) {
> return 0;
> @@ -1269,6 +1433,7 @@ int multifd_load_cleanup(Error **errp)
> p->packet_len = 0;
> g_free(p->packet);
> p->packet = NULL;
> + multifd_recv_state->ops->recv_cleanup(p);
> }
> qemu_sem_destroy(&multifd_recv_state->sem_sync);
> g_free(multifd_recv_state->params);
> @@ -1276,7 +1441,7 @@ int multifd_load_cleanup(Error **errp)
> g_free(multifd_recv_state);
> multifd_recv_state = NULL;
>
> - return ret;
> + return 0;
> }
>
> static void multifd_recv_sync_main(void)
> @@ -1337,6 +1502,8 @@ static void *multifd_recv_thread(void *opaque)
>
> used = p->pages->used;
> flags = p->flags;
> + /* recv methods don't know how to handle the SYNC flag */
> + p->flags &= ~MULTIFD_FLAG_SYNC;
> trace_multifd_recv(p->id, p->packet_num, used, flags,
> p->next_packet_size);
> p->num_packets++;
> @@ -1344,8 +1511,7 @@ static void *multifd_recv_thread(void *opaque)
> qemu_mutex_unlock(&p->mutex);
>
> if (used) {
> - ret = qio_channel_readv_all(p->c, p->pages->iov,
> - used, &local_err);
> + ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
> if (ret != 0) {
> break;
> }
> @@ -1384,9 +1550,11 @@ int multifd_load_setup(Error **errp)
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> atomic_set(&multifd_recv_state->count, 0);
> qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> + multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
>
> for (i = 0; i < thread_count; i++) {
> MultiFDRecvParams *p = &multifd_recv_state->params[i];
> + int ret;
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem_sync, 0);
> @@ -1396,6 +1564,10 @@ int multifd_load_setup(Error **errp)
> + sizeof(ram_addr_t) * page_count;
> p->packet = g_malloc0(p->packet_len);
> p->name = g_strdup_printf("multifdrecv_%d", i);
> + ret = multifd_recv_state->ops->recv_setup(p, errp);
> + if (ret != 0) {
> + return ret;
> + }
> }
> return 0;
> }
> --
> 2.21.0
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
* Re: [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support
2019-06-12 10:53 ` [Qemu-devel] [PATCH v4 6/6] migration: Add zlib compression multifd support Juan Quintela
@ 2019-06-14 12:56 ` Dr. David Alan Gilbert
0 siblings, 0 replies; 10+ messages in thread
From: Dr. David Alan Gilbert @ 2019-06-14 12:56 UTC (permalink / raw)
To: Juan Quintela
Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
Paolo Bonzini
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
> hw/core/qdev-properties.c | 2 +-
> migration/ram.c | 262 ++++++++++++++++++++++++++++++++++++++
> qapi/migration.json | 2 +-
> tests/migration-test.c | 6 +
> 4 files changed, 270 insertions(+), 2 deletions(-)
>
> diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
> index ebeeb5c88d..e40aa806e2 100644
> --- a/hw/core/qdev-properties.c
> +++ b/hw/core/qdev-properties.c
> @@ -651,7 +651,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
> const PropertyInfo qdev_prop_multifd_compress = {
> .name = "MultifdCompress",
> .description = "multifd_compress values, "
> - "none",
> + "none/zlib",
> .enum_table = &MultifdCompress_lookup,
> .get = get_enum,
> .set = set_enum,
> diff --git a/migration/ram.c b/migration/ram.c
> index 3b0002ddba..691ebd9108 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -583,6 +583,7 @@ exit:
> #define MULTIFD_VERSION 1
>
> #define MULTIFD_FLAG_SYNC (1 << 0)
> +#define MULTIFD_FLAG_ZLIB (1 << 1)
>
> /* This value needs to be a multiple of qemu_target_page_size() */
> #define MULTIFD_PACKET_SIZE (512 * 1024)
> @@ -625,6 +626,15 @@ typedef struct {
> RAMBlock *block;
> } MultiFDPages_t;
>
> +struct zlib_data {
> + /* stream for compression */
> + z_stream zs;
> + /* compressed buffer */
> + uint8_t *zbuff;
> + /* size of compressed buffer */
> + uint32_t zbuff_len;
> +};
> +
> typedef struct {
> /* this fields are not changed once the thread is created */
> /* channel number */
> @@ -841,8 +851,260 @@ static MultiFDMethods multifd_nocomp_ops = {
> .recv_pages = nocomp_recv_pages
> };
>
> +/* Multifd zlib compression */
> +
> +/**
> + * zlib_send_setup: setup send side
> + *
> + * Setup each channel with zlib compression.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> + struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
> + z_stream *zs = &z->zs;
> +
> + p->data = z;
> + zs->zalloc = Z_NULL;
> + zs->zfree = Z_NULL;
> + zs->opaque = Z_NULL;
> + if (deflateInit(zs, migrate_compress_level()) != Z_OK) {
> + error_setg(errp, "multifd %d: deflate init failed", p->id);
> + return -1;
Are we leaking 'z' here? Or does zlib_send_cleanup still run in that case?
Other than that I think we're OK.
Dave
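One way to make that error path safe whether or not cleanup runs afterwards
is to release whatever was allocated before returning, and to publish the
state in p->data only once setup has fully succeeded. The following is a
minimal sketch of that pattern, not the code from this series: struct
codec_data, struct channel, codec_send_setup() and codec_send_cleanup() are
hypothetical stand-ins for zlib_data, MultiFDSendParams and the zlib methods,
and the init_ok argument merely simulates deflateInit() succeeding or failing.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct zlib_data; no real zlib involved. */
struct codec_data {
    uint8_t *zbuff;      /* compressed buffer */
    uint32_t zbuff_len;  /* size of compressed buffer */
};

/* Hypothetical stand-in for MultiFDSendParams. */
struct channel {
    void *data;
};

/*
 * Set up per-channel state. On any failure, everything allocated so
 * far is released and p->data stays NULL, so the caller never sees a
 * half-built object. init_ok simulates deflateInit() returning Z_OK.
 *
 * Returns 0 for success or -1 for error.
 */
static int codec_send_setup(struct channel *p, int init_ok, uint32_t len)
{
    struct codec_data *z;

    assert(p != NULL);
    p->data = NULL;

    z = calloc(1, sizeof(*z));
    if (!z) {
        return -1;
    }
    if (!init_ok) {
        /* stream init failed: nothing else to undo yet */
        free(z);
        return -1;
    }
    z->zbuff = malloc(len);
    if (!z->zbuff) {
        /* real code would also end the stream here before freeing */
        free(z);
        return -1;
    }
    z->zbuff_len = len;
    p->data = z;  /* publish only after full success */
    return 0;
}

/* Cleanup tolerates a channel whose setup failed or never ran. */
static void codec_send_cleanup(struct channel *p)
{
    struct codec_data *z = p->data;

    if (!z) {
        return;
    }
    free(z->zbuff);
    free(z);
    p->data = NULL;
}
```

With this shape, calling the cleanup function on a channel whose setup failed
is a no-op, so the caller can run cleanup over all channels unconditionally.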
> + }
> + /* We will never have more than page_count pages */
> + z->zbuff_len = page_count * qemu_target_page_size();
> + z->zbuff_len *= 2;
> + z->zbuff = g_try_malloc(z->zbuff_len);
> + if (!z->zbuff) {
> + error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
> + return -1;
> + }
> + return 0;
> +}
> +
> +/**
> + * zlib_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void zlib_send_cleanup(MultiFDSendParams *p)
> +{
> + struct zlib_data *z = p->data;
> +
> + deflateEnd(&z->zs);
> + g_free(z->zbuff);
> + z->zbuff = NULL;
> + g_free(p->data);
> + p->data = NULL;
> +}
> +
> +/**
> + * zlib_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + */
> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> + struct iovec *iov = p->pages->iov;
> + struct zlib_data *z = p->data;
> + z_stream *zs = &z->zs;
> + uint32_t out_size = 0;
> + int ret;
> + uint32_t i;
> +
> + for (i = 0; i < used; i++) {
> + uint32_t available = z->zbuff_len - out_size;
> + int flush = Z_NO_FLUSH;
> +
> + if (i == used - 1) {
> + flush = Z_SYNC_FLUSH;
> + }
> +
> + zs->avail_in = iov[i].iov_len;
> + zs->next_in = iov[i].iov_base;
> +
> + zs->avail_out = available;
> + zs->next_out = z->zbuff + out_size;
> +
> + ret = deflate(zs, flush);
> + if (ret != Z_OK) {
> + error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
> + p->id, ret);
> + return -1;
> + }
> + out_size += available - zs->avail_out;
> + }
> + p->next_packet_size = out_size;
> + p->flags |= MULTIFD_FLAG_ZLIB;
> +
> + return 0;
> +}
> +
> +/**
> + * zlib_send_write: do the actual write of the data
> + *
> + * Do the actual write of the compressed buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> + struct zlib_data *z = p->data;
> +
> + return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
> + errp);
> +}
> +
> +/**
> + * zlib_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> + struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
> + z_stream *zs = &z->zs;
> +
> + p->data = z;
> + zs->zalloc = Z_NULL;
> + zs->zfree = Z_NULL;
> + zs->opaque = Z_NULL;
> + zs->avail_in = 0;
> + zs->next_in = Z_NULL;
> + if (inflateInit(zs) != Z_OK) {
> + error_setg(errp, "multifd %d: inflate init failed", p->id);
> + return -1;
> + }
> + /* We will never have more than page_count pages */
> + z->zbuff_len = page_count * qemu_target_page_size();
> + /* We know compression "could" use more space */
> + z->zbuff_len *= 2;
> + z->zbuff = g_try_malloc(z->zbuff_len);
> + if (!z->zbuff) {
> + error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
> + return -1;
> + }
> + return 0;
> +}
> +
> +/**
> + * zlib_recv_cleanup: cleanup receive side
> + *
> + * End the inflate stream and free the buffers.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void zlib_recv_cleanup(MultiFDRecvParams *p)
> +{
> + struct zlib_data *z = p->data;
> +
> + inflateEnd(&z->zs);
> + g_free(z->zbuff);
> + z->zbuff = NULL;
> + g_free(p->data);
> + p->data = NULL;
> +}
> +
> +/**
> + * zlib_recv_pages: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> + uint32_t in_size = p->next_packet_size;
> + uint32_t out_size = 0;
> + uint32_t expected_size = used * qemu_target_page_size();
> + struct zlib_data *z = p->data;
> + z_stream *zs = &z->zs;
> + int ret;
> + int i;
> +
> + if (p->flags != MULTIFD_FLAG_ZLIB) {
> + error_setg(errp, "multifd %d: flags received %x flags expected %x",
> + p->id, p->flags, MULTIFD_FLAG_ZLIB);
> + return -1;
> + }
> + ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
> +
> + if (ret != 0) {
> + return ret;
> + }
> +
> + zs->avail_in = in_size;
> + zs->next_in = z->zbuff;
> +
> + for (i = 0; i < used; i++) {
> + struct iovec *iov = &p->pages->iov[i];
> + int flush = Z_NO_FLUSH;
> +
> + if (i == used - 1) {
> + flush = Z_SYNC_FLUSH;
> + }
> +
> + zs->avail_out = iov->iov_len;
> + zs->next_out = iov->iov_base;
> +
> + ret = inflate(zs, flush);
> + if (ret != Z_OK) {
> + error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
> + p->id, ret);
> + return ret;
> + }
> + out_size += iov->iov_len;
> + }
> + if (out_size != expected_size) {
> + error_setg(errp, "multifd %d: packet size received %d size expected %d",
> + p->id, out_size, expected_size);
> + return -1;
> + }
> + return 0;
> +}
> +
> +static MultiFDMethods multifd_zlib_ops = {
> + .send_setup = zlib_send_setup,
> + .send_cleanup = zlib_send_cleanup,
> + .send_prepare = zlib_send_prepare,
> + .send_write = zlib_send_write,
> + .recv_setup = zlib_recv_setup,
> + .recv_cleanup = zlib_recv_cleanup,
> + .recv_pages = zlib_recv_pages
> +};
> +
> static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
> [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
> + [MULTIFD_COMPRESS_ZLIB] = &multifd_zlib_ops,
> };
>
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> diff --git a/qapi/migration.json b/qapi/migration.json
> index 153527e120..085eba8f07 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -493,7 +493,7 @@
> #
> ##
> { 'enum': 'MultifdCompress',
> - 'data': [ 'none' ] }
> + 'data': [ 'none', 'zlib' ] }
>
> ##
> # @MigrationParameter:
> diff --git a/tests/migration-test.c b/tests/migration-test.c
> index e5b8125e1c..e6995ae4e7 100644
> --- a/tests/migration-test.c
> +++ b/tests/migration-test.c
> @@ -1219,6 +1219,11 @@ static void test_multifd_tcp_none(void)
> test_multifd_tcp("none");
> }
>
> +static void test_multifd_tcp_zlib(void)
> +{
> + test_multifd_tcp("zlib");
> +}
> +
> int main(int argc, char **argv)
> {
> char template[] = "/tmp/migration-test-XXXXXX";
> @@ -1275,6 +1280,7 @@ int main(int argc, char **argv)
> qtest_add_func("/migration/xbzrle/unix", test_xbzrle_unix);
> qtest_add_func("/migration/fd_proto", test_migrate_fd_proto);
> qtest_add_func("/migration/multifd/tcp/none", test_multifd_tcp_none);
> + qtest_add_func("/migration/multifd/tcp/zlib", test_multifd_tcp_zlib);
>
> ret = g_test_run();
>
> --
> 2.21.0
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
* Re: [Qemu-devel] [PATCH v4 5/6] migration: Make no compression operations into its own structure
2019-06-14 11:26 ` Dr. David Alan Gilbert
@ 2019-06-14 17:33 ` Juan Quintela
0 siblings, 0 replies; 10+ messages in thread
From: Juan Quintela @ 2019-06-14 17:33 UTC (permalink / raw)
To: Dr. David Alan Gilbert
Cc: Laurent Vivier, Thomas Huth, qemu-devel, Markus Armbruster,
Paolo Bonzini
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> It will be used later.
>>
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> + */
>> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
>> +{
>> + if (p->flags != 0) {
>> + error_setg(errp, "multifd %d: flags received %x flags expected %x",
>> + p->id, MULTIFD_FLAG_ZLIB, p->flags);
>
> Can you just explain that a bit - the 'received' seems to be constant
> while the expected is p->flags - is that the right way around?
> Why would you expect FLAG_ZLIB in nocomp?
When I changed the printf's to error_setg I did a bit *too much* copy
and paste. Then I decided to use received/expected consistently in all
messages, and clearly I failed.
Fixing, thanks.
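For reference, a minimal sketch of the ordering the message is meant to have: the value read from the packet goes in the "received" slot and the constant the method requires goes in the "expected" slot. This is not the actual fix from the series; `format_flags_error` and `SKETCH_FLAG_ZLIB` are hypothetical names used only for illustration, and plain `snprintf` stands in for `error_setg`.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for MULTIFD_FLAG_ZLIB; the real value is not
 * shown in this thread. */
#define SKETCH_FLAG_ZLIB 0x2u

/*
 * Hypothetical helper (not part of the patch): formats the message with
 * the flags taken from the packet first ("received") and the flags the
 * method requires second ("expected"), matching the format string.
 */
static int format_flags_error(char *buf, size_t len, int id,
                              uint32_t received, uint32_t expected)
{
    return snprintf(buf, len,
                    "multifd %d: flags received %x flags expected %x",
                    id, (unsigned)received, (unsigned)expected);
}
```

Under these assumptions the no-compression path would pass `p->flags` as `received` and `0` as `expected`, while the zlib path would pass `p->flags` and the zlib flag constant.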