* [PULL 00/18] Pull migration patches
@ 2020-01-29 11:15 Juan Quintela
  2020-01-29 11:15 ` [PULL 01/18] migration-test: Use g_free() instead of free() Juan Quintela
                   ` (18 more replies)
  0 siblings, 19 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

The following changes since commit 4c60e3289875ae6c516a37523bcecb87f68ce67c:

  Merge remote-tracking branch 'remotes/rth/tags/pull-pa-20200127' into staging (2020-01-28 15:11:04 +0000)

are available in the Git repository at:

  https://github.com/juanquintela/qemu.git tags/pull-migration-pull-request

for you to fetch changes up to 42d24611afc7610808ecb8770cf40e84714dd28e:

  migration/compress: compress QEMUFile is not writable (2020-01-29 11:28:59 +0100)

----------------------------------------------------------------
Migration pull request

(this is a rerun of the previous pull request without the --- bits and
rebased to latest)

This pull request includes:
- simplify get_qlist (eric)
- fix null in multifd_send_terminate_threads (zhimin)
- small fix for compress (wei)
- migrate multifd + cancel fixes (juan)
- migrate compression: the bits that are reviewed (juan)

----------------------------------------------------------------

Eric Auger (1):
  migration: Simplify get_qlist

Juan Quintela (15):
  migration-test: Use g_free() instead of free()
  multifd: Make sure that we don't do any IO after an error
  qemu-file: Don't do IO after shutdown
  migration: Don't send data if we have stopped
  migration-test: Make sure that multifd and cancel works
  migration: Create migration_is_running()
  ram_addr: Split RAMBlock definition
  multifd: multifd_send_pages only needs the qemufile
  multifd: multifd_queue_page only needs the qemufile
  multifd: multifd_send_sync_main only needs the qemufile
  multifd: Use qemu_target_page_size()
  migration: Make checkpatch happy with comments
  multifd: Make multifd_save_setup() get an Error parameter
  multifd: Make multifd_load_setup() get an Error parameter
  multifd: Split multifd code into its own file

Wei Yang (1):
  migration/compress: compress QEMUFile is not writable

Zhimin Feng (1):
  migration/multifd: fix nullptr access in
    multifd_send_terminate_threads

 MAINTAINERS                  |    1 +
 include/exec/ram_addr.h      |   40 +-
 include/exec/ramblock.h      |   64 +++
 include/qemu/queue.h         |   19 +-
 migration/Makefile.objs      |    1 +
 migration/migration.c        |   69 ++-
 migration/migration.h        |    3 +-
 migration/multifd.c          |  899 ++++++++++++++++++++++++++++++
 migration/multifd.h          |  139 +++++
 migration/qemu-file.c        |   38 +-
 migration/ram.c              | 1004 +---------------------------------
 migration/ram.h              |    7 -
 migration/rdma.c             |    2 +-
 migration/savevm.c           |    4 +-
 migration/vmstate-types.c    |   10 +-
 tests/qtest/migration-test.c |  114 +++-
 16 files changed, 1332 insertions(+), 1082 deletions(-)
 create mode 100644 include/exec/ramblock.h
 create mode 100644 migration/multifd.c
 create mode 100644 migration/multifd.h

-- 
2.24.1




* [PULL 01/18] migration-test: Use g_free() instead of free()
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 02/18] multifd: Make sure that we don't do any IO after an error Juan Quintela
                   ` (17 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela,
	Dr. David Alan Gilbert, Paolo Bonzini,
	Philippe Mathieu-Daudé

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Thomas Huth <thuth@redhat.com>
Reviewed-by: Philippe Mathieu-Daudé <philmd@redhat.com>
---
 tests/qtest/migration-test.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 26e2e77289..b6a74a05ce 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -1291,7 +1291,7 @@ static void test_multifd_tcp(void)
     wait_for_serial("dest_serial");
     wait_for_migration_complete(from);
     test_migrate_end(from, to, true);
-    free(uri);
+    g_free(uri);
 }
 
 int main(int argc, char **argv)
-- 
2.24.1




* [PULL 02/18] multifd: Make sure that we don't do any IO after an error
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
  2020-01-29 11:15 ` [PULL 01/18] migration-test: Use g_free() instead of free() Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 03/18] qemu-file: Don't do IO after shutdown Juan Quintela
                   ` (16 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index d2208b5534..f95d656c26 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -3445,7 +3445,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
 {
     RAMState **temp = opaque;
     RAMState *rs = *temp;
-    int ret;
+    int ret = 0;
     int i;
     int64_t t0;
     int done = 0;
@@ -3524,12 +3524,14 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
 out:
-    multifd_send_sync_main(rs);
-    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
-    qemu_fflush(f);
-    ram_counters.transferred += 8;
+    if (ret >= 0) {
+        multifd_send_sync_main(rs);
+        qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+        qemu_fflush(f);
+        ram_counters.transferred += 8;
 
-    ret = qemu_file_get_error(f);
+        ret = qemu_file_get_error(f);
+    }
     if (ret < 0) {
         return ret;
     }
@@ -3581,9 +3583,11 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     }
 
-    multifd_send_sync_main(rs);
-    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
-    qemu_fflush(f);
+    if (ret >= 0) {
+        multifd_send_sync_main(rs);
+        qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+        qemu_fflush(f);
+    }
 
     return ret;
 }
-- 
2.24.1




* [PULL 03/18] qemu-file: Don't do IO after shutdown
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
  2020-01-29 11:15 ` [PULL 01/18] migration-test: Use g_free() instead of free() Juan Quintela
  2020-01-29 11:15 ` [PULL 02/18] multifd: Make sure that we don't do any IO after an error Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 04/18] migration: Don't send data if we have stopped Juan Quintela
                   ` (15 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Make sure that we do neither reads nor writes after the QEMUFile has
been shut down.
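
The idea can be illustrated with a small standalone sketch. This is not
the QEMU code: ToyFile and the toy_* helpers are hypothetical stand-ins
for QEMUFile and its functions, reduced to a pending buffer, an output
buffer, an error field and the new shutdown flag. It only shows the
pattern of latching a flag at shutdown time and returning early from the
I/O path afterwards.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for QEMUFile; only the fields needed here. */
typedef struct ToyFile {
    char out[64];       /* bytes that reached the "wire" */
    size_t out_len;
    char buf[64];       /* pending bytes, not yet flushed */
    size_t buf_len;
    int last_error;     /* 0, or a negative errno-style value */
    bool shutdown;      /* set once by toy_shutdown(), never cleared */
} ToyFile;

/* Queue data in the pending buffer, truncating if it does not fit. */
static void toy_put(ToyFile *f, const char *data, size_t len)
{
    size_t room;

    assert(f != NULL);
    room = sizeof(f->buf) - f->buf_len;
    if (len > room) {
        len = room;
    }
    memcpy(f->buf + f->buf_len, data, len);
    f->buf_len += len;
}

/* Move pending bytes to the output, unless the file was shut down. */
static void toy_flush(ToyFile *f)
{
    size_t room, n;

    if (f->shutdown) {
        return;         /* no I/O after shutdown */
    }
    room = sizeof(f->out) - f->out_len;
    n = f->buf_len < room ? f->buf_len : room;
    memcpy(f->out + f->out_len, f->buf, n);
    f->out_len += n;
    f->buf_len = 0;
}

/* Latch the flag and record an error if none was recorded before. */
static void toy_shutdown(ToyFile *f)
{
    f->shutdown = true;
    if (!f->last_error) {
        f->last_error = -EIO;
    }
}
```

In this sketch, data queued before toy_shutdown() but not yet flushed
never reaches the output, and later callers that check last_error see
-EIO instead of silently continuing.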

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/qemu-file.c | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 26fb25ddc1..bbb2b63927 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -53,6 +53,8 @@ struct QEMUFile {
 
     int last_error;
     Error *last_error_obj;
+    /* has the file has been shutdown */
+    bool shutdown;
 };
 
 /*
@@ -61,10 +63,18 @@ struct QEMUFile {
  */
 int qemu_file_shutdown(QEMUFile *f)
 {
+    int ret;
+
+    f->shutdown = true;
     if (!f->ops->shut_down) {
         return -ENOSYS;
     }
-    return f->ops->shut_down(f->opaque, true, true, NULL);
+    ret = f->ops->shut_down(f->opaque, true, true, NULL);
+
+    if (!f->last_error) {
+        qemu_file_set_error(f, -EIO);
+    }
+    return ret;
 }
 
 /*
@@ -214,6 +224,9 @@ void qemu_fflush(QEMUFile *f)
         return;
     }
 
+    if (f->shutdown) {
+        return;
+    }
     if (f->iovcnt > 0) {
         expect = iov_size(f->iov, f->iovcnt);
         ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos,
@@ -328,6 +341,10 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
     f->buf_index = 0;
     f->buf_size = pending;
 
+    if (f->shutdown) {
+        return 0;
+    }
+
     len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos,
                              IO_BUF_SIZE - pending, &local_error);
     if (len > 0) {
@@ -642,6 +659,9 @@ int64_t qemu_ftell(QEMUFile *f)
 
 int qemu_file_rate_limit(QEMUFile *f)
 {
+    if (f->shutdown) {
+        return 1;
+    }
     if (qemu_file_get_error(f)) {
         return 1;
     }
-- 
2.24.1




* [PULL 04/18] migration: Don't send data if we have stopped
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (2 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 03/18] qemu-file: Don't do IO after shutdown Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 05/18] migration-test: Make sure that multifd and cancel works Juan Quintela
                   ` (14 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

If the migration is cancelled, we leave the loop without an error, but
we must not send the rest of the output as we would in a normal run.
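
A minimal sketch of the guard, under stated assumptions: ToyState,
toy_is_setup_or_active() and toy_finish_iteration() are hypothetical
names, the state list is shortened to four values, and "sending the
end-of-section marker" is modelled as incrementing a counter. It is
meant to show why checking ret alone is not enough once a cancel has
been requested.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, shortened list of migration states. */
typedef enum {
    TOY_SETUP,
    TOY_ACTIVE,
    TOY_CANCELLING,
    TOY_CANCELLED
} ToyState;

/* Toy counterpart of a "setup or active" predicate. */
static bool toy_is_setup_or_active(ToyState s)
{
    return s == TOY_SETUP || s == TOY_ACTIVE;
}

/*
 * Epilogue of one save iteration.  The trailing marker is written only
 * when there was no error AND the migration is still in progress; a
 * cancel leaves ret >= 0, so the state has to be checked as well.
 */
static int toy_finish_iteration(int ret, ToyState state, int *markers_sent)
{
    assert(markers_sent != NULL);
    if (ret >= 0 && toy_is_setup_or_active(state)) {
        (*markers_sent)++;
    }
    return ret;
}
```

With ret == 0 the marker is sent in TOY_ACTIVE but skipped in
TOY_CANCELLING; with a negative ret it is skipped in every state.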

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index f95d656c26..3fd7fdffcf 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -3524,7 +3524,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
 out:
-    if (ret >= 0) {
+    if (ret >= 0
+        && migration_is_setup_or_active(migrate_get_current()->state)) {
         multifd_send_sync_main(rs);
         qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
         qemu_fflush(f);
-- 
2.24.1




* [PULL 05/18] migration-test: Make sure that multifd and cancel works
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (3 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 04/18] migration: Don't send data if we have stopped Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 06/18] migration: Create migration_is_running() Juan Quintela
                   ` (13 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Test that this sequence works:

- launch source
- launch target
- start migration
- cancel migration
- relaunch target
- do migration again

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 tests/qtest/migration-test.c | 112 ++++++++++++++++++++++++++++++++++-
 1 file changed, 111 insertions(+), 1 deletion(-)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index b6a74a05ce..cf27ebbc9d 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -424,6 +424,14 @@ static void migrate_recover(QTestState *who, const char *uri)
     qobject_unref(rsp);
 }
 
+static void migrate_cancel(QTestState *who)
+{
+    QDict *rsp;
+
+    rsp = wait_command(who, "{ 'execute': 'migrate_cancel' }");
+    qobject_unref(rsp);
+}
+
 static void migrate_set_capability(QTestState *who, const char *capability,
                                    bool value)
 {
@@ -456,6 +464,8 @@ static void migrate_postcopy_start(QTestState *from, QTestState *to)
 typedef struct {
     bool hide_stderr;
     bool use_shmem;
+    /* only launch the target process */
+    bool only_target;
     char *opts_source;
     char *opts_target;
 } MigrateStart;
@@ -571,7 +581,9 @@ static int test_migrate_start(QTestState **from, QTestState **to,
                                  arch_source, shmem_opts, args->opts_source,
                                  ignore_stderr);
     g_free(arch_source);
-    *from = qtest_init(cmd_source);
+    if (!args->only_target) {
+        *from = qtest_init(cmd_source);
+    }
     g_free(cmd_source);
 
     cmd_target = g_strdup_printf("-accel kvm -accel tcg%s%s "
@@ -1294,6 +1306,103 @@ static void test_multifd_tcp(void)
     g_free(uri);
 }
 
+/*
+ * This test does:
+ *  source               target
+ *                       migrate_incoming
+ *     migrate
+ *     migrate_cancel
+ *                       launch another target
+ *     migrate
+ *
+ *  And see that it works
+ */
+
+static void test_multifd_tcp_cancel(void)
+{
+    MigrateStart *args = migrate_start_new();
+    QTestState *from, *to, *to2;
+    QDict *rsp;
+    char *uri;
+
+    args->hide_stderr = true;
+
+    if (test_migrate_start(&from, &to, "defer", args)) {
+        return;
+    }
+
+    /*
+     * We want to pick a speed slow enough that the test completes
+     * quickly, but that it doesn't complete precopy even on a slow
+     * machine, so also set the downtime.
+     */
+    /* 1 ms should make it not converge*/
+    migrate_set_parameter_int(from, "downtime-limit", 1);
+    /* 300MB/s */
+    migrate_set_parameter_int(from, "max-bandwidth", 30000000);
+
+    migrate_set_parameter_int(from, "multifd-channels", 16);
+    migrate_set_parameter_int(to, "multifd-channels", 16);
+
+    migrate_set_capability(from, "multifd", "true");
+    migrate_set_capability(to, "multifd", "true");
+
+    /* Start incoming migration from the 1st socket */
+    rsp = wait_command(to, "{ 'execute': 'migrate-incoming',"
+                           "  'arguments': { 'uri': 'tcp:127.0.0.1:0' }}");
+    qobject_unref(rsp);
+
+    /* Wait for the first serial output from the source */
+    wait_for_serial("src_serial");
+
+    uri = migrate_get_socket_address(to, "socket-address");
+
+    migrate_qmp(from, uri, "{}");
+
+    wait_for_migration_pass(from);
+
+    migrate_cancel(from);
+
+    args = migrate_start_new();
+    args->only_target = true;
+
+    if (test_migrate_start(&from, &to2, "defer", args)) {
+        return;
+    }
+
+    migrate_set_parameter_int(to2, "multifd-channels", 16);
+
+    migrate_set_capability(to2, "multifd", "true");
+
+    /* Start incoming migration from the 1st socket */
+    rsp = wait_command(to2, "{ 'execute': 'migrate-incoming',"
+                            "  'arguments': { 'uri': 'tcp:127.0.0.1:0' }}");
+    qobject_unref(rsp);
+
+    uri = migrate_get_socket_address(to2, "socket-address");
+
+    wait_for_migration_status(from, "cancelled", NULL);
+
+    /* 300ms it should converge */
+    migrate_set_parameter_int(from, "downtime-limit", 300);
+    /* 1GB/s */
+    migrate_set_parameter_int(from, "max-bandwidth", 1000000000);
+
+    migrate_qmp(from, uri, "{}");
+
+    wait_for_migration_pass(from);
+
+    if (!got_stop) {
+        qtest_qmp_eventwait(from, "STOP");
+    }
+    qtest_qmp_eventwait(to2, "RESUME");
+
+    wait_for_serial("dest_serial");
+    wait_for_migration_complete(from);
+    test_migrate_end(from, to2, true);
+    g_free(uri);
+}
+
 int main(int argc, char **argv)
 {
     char template[] = "/tmp/migration-test-XXXXXX";
@@ -1359,6 +1468,7 @@ int main(int argc, char **argv)
 
     qtest_add_func("/migration/auto_converge", test_migrate_auto_converge);
     qtest_add_func("/migration/multifd/tcp", test_multifd_tcp);
+    qtest_add_func("/migration/multifd/tcp/cancel", test_multifd_tcp_cancel);
 
     ret = g_test_run();
 
-- 
2.24.1




* [PULL 06/18] migration: Create migration_is_running()
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (4 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 05/18] migration-test: Make sure that multifd and cancel works Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 07/18] migration/multifd: fix nullptr access in multifd_send_terminate_threads Juan Quintela
                   ` (12 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

This function returns true if we are in the middle of a migration.
It is like migration_is_setup_or_active(), but it also returns true for
the CANCELLING and COLO states.  Adapt the callers that need it.
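
The relationship between the two predicates can be sketched as follows.
The enum is a hypothetical, shortened stand-in for the real status list
(the TOY_* names are not QEMU identifiers); the point is only that the
"running" predicate is the "setup or active" one extended with the
cancelling and COLO states, which is what the open-coded checks in the
callers used to spell out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, shortened status list for illustration only. */
typedef enum {
    TOY_STATUS_NONE,
    TOY_STATUS_SETUP,
    TOY_STATUS_ACTIVE,
    TOY_STATUS_CANCELLING,
    TOY_STATUS_CANCELLED,
    TOY_STATUS_COLO,
    TOY_STATUS_COMPLETED,
    TOY_STATUS__MAX
} ToyStatus;

/* True while data may still be sent in the normal way. */
static bool toy_is_setup_or_active(ToyStatus s)
{
    assert(s < TOY_STATUS__MAX);
    switch (s) {
    case TOY_STATUS_SETUP:
    case TOY_STATUS_ACTIVE:
        return true;
    default:
        return false;
    }
}

/* True while a migration is in flight, including cancel and COLO. */
static bool toy_is_running(ToyStatus s)
{
    return toy_is_setup_or_active(s) ||
           s == TOY_STATUS_CANCELLING ||
           s == TOY_STATUS_COLO;
}
```

A caller that must refuse a new migration while an old one is still
winding down would use the second predicate; a caller that decides
whether to keep sending data would use the first.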

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 29 ++++++++++++++++++++++++-----
 migration/migration.h |  1 +
 migration/savevm.c    |  4 +---
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index efd5350e84..77768fb2c7 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -829,6 +829,27 @@ bool migration_is_setup_or_active(int state)
     }
 }
 
+bool migration_is_running(int state)
+{
+    switch (state) {
+    case MIGRATION_STATUS_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
+    case MIGRATION_STATUS_SETUP:
+    case MIGRATION_STATUS_PRE_SWITCHOVER:
+    case MIGRATION_STATUS_DEVICE:
+    case MIGRATION_STATUS_WAIT_UNPLUG:
+    case MIGRATION_STATUS_CANCELLING:
+    case MIGRATION_STATUS_COLO:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
 static void populate_time_info(MigrationInfo *info, MigrationState *s)
 {
     info->has_status = true;
@@ -1077,7 +1098,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     MigrationCapabilityStatusList *cap;
     bool cap_list[MIGRATION_CAPABILITY__MAX];
 
-    if (migration_is_setup_or_active(s->state)) {
+    if (migration_is_running(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -1590,7 +1611,7 @@ static void migrate_fd_cancel(MigrationState *s)
 
     do {
         old_state = s->state;
-        if (!migration_is_setup_or_active(old_state)) {
+        if (!migration_is_running(old_state)) {
             break;
         }
         /* If the migration is paused, kick it out of the pause */
@@ -1888,9 +1909,7 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
         return true;
     }
 
-    if (migration_is_setup_or_active(s->state) ||
-        s->state == MIGRATION_STATUS_CANCELLING ||
-        s->state == MIGRATION_STATUS_COLO) {
+    if (migration_is_running(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return false;
     }
diff --git a/migration/migration.h b/migration/migration.h
index aa9ff6f27b..44b1d56929 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -279,6 +279,7 @@ void migrate_fd_error(MigrationState *s, const Error *error);
 void migrate_fd_connect(MigrationState *s, Error *error_in);
 
 bool migration_is_setup_or_active(int state);
+bool migration_is_running(int state);
 
 void migrate_init(MigrationState *s);
 bool migration_is_blocked(Error **errp);
diff --git a/migration/savevm.c b/migration/savevm.c
index adfdca26ac..f19cb9ec7a 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -1531,9 +1531,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp)
     MigrationState *ms = migrate_get_current();
     MigrationStatus status;
 
-    if (migration_is_setup_or_active(ms->state) ||
-        ms->state == MIGRATION_STATUS_CANCELLING ||
-        ms->state == MIGRATION_STATUS_COLO) {
+    if (migration_is_running(ms->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return -EINVAL;
     }
-- 
2.24.1




* [PULL 07/18] migration/multifd: fix nullptr access in multifd_send_terminate_threads
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (5 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 06/18] migration: Create migration_is_running() Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 08/18] ram_addr: Split RAMBlock definition Juan Quintela
                   ` (11 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela, Zhimin Feng,
	Dr. David Alan Gilbert, Paolo Bonzini

From: Zhimin Feng <fengzhimin1@huawei.com>

If the multifd send threads have not been created when the migration
fails, multifd_save_cleanup() is called twice. In this scenario,
multifd_send_state is accessed after it has been released, and the
source VM crashes.

Here is the coredump stack:
    Program received signal SIGSEGV, Segmentation fault.
    0x00005629333a78ef in multifd_send_terminate_threads (err=err@entry=0x0) at migration/ram.c:1012
    1012            MultiFDSendParams *p = &multifd_send_state->params[i];
    #0  0x00005629333a78ef in multifd_send_terminate_threads (err=err@entry=0x0) at migration/ram.c:1012
    #1  0x00005629333ab8a9 in multifd_save_cleanup () at migration/ram.c:1028
    #2  0x00005629333abaea in multifd_new_send_channel_async (task=0x562935450e70, opaque=<optimized out>) at migration/ram.c:1202
    #3  0x000056293373a562 in qio_task_complete (task=task@entry=0x562935450e70) at io/task.c:196
    #4  0x000056293373a6e0 in qio_task_thread_result (opaque=0x562935450e70) at io/task.c:111
    #5  0x00007f475d4d75a7 in g_idle_dispatch () from /usr/lib64/libglib-2.0.so.0
    #6  0x00007f475d4da9a9 in g_main_context_dispatch () from /usr/lib64/libglib-2.0.so.0
    #7  0x0000562933785b33 in glib_pollfds_poll () at util/main-loop.c:219
    #8  os_host_main_loop_wait (timeout=<optimized out>) at util/main-loop.c:242
    #9  main_loop_wait (nonblocking=nonblocking@entry=0) at util/main-loop.c:518
    #10 0x00005629334c5acf in main_loop () at vl.c:1810
    #11 0x000056293334d7bb in main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>) at vl.c:4471

To fix this, when the multifd send threads have not been created and the
migration fails, we no longer call multifd_save_cleanup() from
multifd_new_send_channel_async().
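
The shape of the new error path can be sketched in isolation. Everything
here is hypothetical: ToySem is a plain counter standing in for a real
semaphore, and ToyChannel, ToySendState and toy_channel_connect_done()
are illustrative names. The sketch only shows that the callback wakes up
whoever may be waiting and marks the channel as quit, while leaving the
shared state alive for the single, later cleanup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Counter standing in for a semaphore; posting just increments it. */
typedef struct ToySem {
    int count;
} ToySem;

static void toy_sem_post(ToySem *s)
{
    s->count++;
}

/* Hypothetical per-channel and global send state. */
typedef struct ToyChannel {
    ToySem sem_sync;
    bool connected;
    bool quit;
} ToyChannel;

typedef struct ToySendState {
    ToySem channels_ready;
} ToySendState;

/*
 * Completion callback for an asynchronous channel connect.  On failure
 * it does NOT tear down the shared state; it only unblocks waiters and
 * records that this channel will never run.
 */
static void toy_channel_connect_done(ToySendState *st, ToyChannel *p,
                                     bool failed)
{
    assert(st != NULL && p != NULL);
    if (failed) {
        toy_sem_post(&st->channels_ready);
        toy_sem_post(&p->sem_sync);
        p->quit = true;
    } else {
        p->connected = true;
    }
}
```

Because the callback no longer frees anything, the cleanup routine
remains the only place where the shared state is released.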

Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index 3fd7fdffcf..82c7edb083 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1233,7 +1233,15 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
     trace_multifd_new_send_channel_async(p->id);
     if (qio_task_propagate_error(task, &local_err)) {
         migrate_set_error(migrate_get_current(), local_err);
-        multifd_save_cleanup();
+        /* Error happen, we need to tell who pay attention to me */
+        qemu_sem_post(&multifd_send_state->channels_ready);
+        qemu_sem_post(&p->sem_sync);
+        /*
+         * Although multifd_send_thread is not created, but main migration
+         * thread neet to judge whether it is running, so we need to mark
+         * its status.
+         */
+        p->quit = true;
     } else {
         p->c = QIO_CHANNEL(sioc);
         qio_channel_set_delay(p->c, false);
-- 
2.24.1




* [PULL 08/18] ram_addr: Split RAMBlock definition
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (6 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 07/18] migration/multifd: fix nullptr access in multifd_send_terminate_threads Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 09/18] multifd: multifd_send_pages only needs the qemufile Juan Quintela
                   ` (10 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

We need some of the fields without having to poison everything else.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 MAINTAINERS             |  1 +
 include/exec/ram_addr.h | 40 +-------------------------
 include/exec/ramblock.h | 64 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 66 insertions(+), 39 deletions(-)
 create mode 100644 include/exec/ramblock.h

diff --git a/MAINTAINERS b/MAINTAINERS
index efd3f3875f..c45e886d88 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1975,6 +1975,7 @@ F: ioport.c
 F: include/exec/memop.h
 F: include/exec/memory.h
 F: include/exec/ram_addr.h
+F: include/exec/ramblock.h
 F: memory.c
 F: include/exec/memory-internal.h
 F: exec.c
diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h
index 5adebb0bc7..5e59a3d8d7 100644
--- a/include/exec/ram_addr.h
+++ b/include/exec/ram_addr.h
@@ -24,45 +24,7 @@
 #include "hw/xen/xen.h"
 #include "sysemu/tcg.h"
 #include "exec/ramlist.h"
-
-struct RAMBlock {
-    struct rcu_head rcu;
-    struct MemoryRegion *mr;
-    uint8_t *host;
-    uint8_t *colo_cache; /* For colo, VM's ram cache */
-    ram_addr_t offset;
-    ram_addr_t used_length;
-    ram_addr_t max_length;
-    void (*resized)(const char*, uint64_t length, void *host);
-    uint32_t flags;
-    /* Protected by iothread lock.  */
-    char idstr[256];
-    /* RCU-enabled, writes protected by the ramlist lock */
-    QLIST_ENTRY(RAMBlock) next;
-    QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers;
-    int fd;
-    size_t page_size;
-    /* dirty bitmap used during migration */
-    unsigned long *bmap;
-    /* bitmap of already received pages in postcopy */
-    unsigned long *receivedmap;
-
-    /*
-     * bitmap to track already cleared dirty bitmap.  When the bit is
-     * set, it means the corresponding memory chunk needs a log-clear.
-     * Set this up to non-NULL to enable the capability to postpone
-     * and split clearing of dirty bitmap on the remote node (e.g.,
-     * KVM).  The bitmap will be set only when doing global sync.
-     *
-     * NOTE: this bitmap is different comparing to the other bitmaps
-     * in that one bit can represent multiple guest pages (which is
-     * decided by the `clear_bmap_shift' variable below).  On
-     * destination side, this should always be NULL, and the variable
-     * `clear_bmap_shift' is meaningless.
-     */
-    unsigned long *clear_bmap;
-    uint8_t clear_bmap_shift;
-};
+#include "exec/ramblock.h"
 
 /**
  * clear_bmap_size: calculate clear bitmap size
diff --git a/include/exec/ramblock.h b/include/exec/ramblock.h
new file mode 100644
index 0000000000..07d50864d8
--- /dev/null
+++ b/include/exec/ramblock.h
@@ -0,0 +1,64 @@
+/*
+ * Declarations for cpu physical memory functions
+ *
+ * Copyright 2011 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *  Avi Kivity <avi@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ *
+ */
+
+/*
+ * This header is for use by exec.c and memory.c ONLY.  Do not include it.
+ * The functions declared here will be removed soon.
+ */
+
+#ifndef QEMU_EXEC_RAMBLOCK_H
+#define QEMU_EXEC_RAMBLOCK_H
+
+#ifndef CONFIG_USER_ONLY
+#include "cpu-common.h"
+
+struct RAMBlock {
+    struct rcu_head rcu;
+    struct MemoryRegion *mr;
+    uint8_t *host;
+    uint8_t *colo_cache; /* For colo, VM's ram cache */
+    ram_addr_t offset;
+    ram_addr_t used_length;
+    ram_addr_t max_length;
+    void (*resized)(const char*, uint64_t length, void *host);
+    uint32_t flags;
+    /* Protected by iothread lock.  */
+    char idstr[256];
+    /* RCU-enabled, writes protected by the ramlist lock */
+    QLIST_ENTRY(RAMBlock) next;
+    QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers;
+    int fd;
+    size_t page_size;
+    /* dirty bitmap used during migration */
+    unsigned long *bmap;
+    /* bitmap of already received pages in postcopy */
+    unsigned long *receivedmap;
+
+    /*
+     * bitmap to track already cleared dirty bitmap.  When the bit is
+     * set, it means the corresponding memory chunk needs a log-clear.
+     * Set this up to non-NULL to enable the capability to postpone
+     * and split clearing of dirty bitmap on the remote node (e.g.,
+     * KVM).  The bitmap will be set only when doing global sync.
+     *
+     * NOTE: this bitmap is different comparing to the other bitmaps
+     * in that one bit can represent multiple guest pages (which is
+     * decided by the `clear_bmap_shift' variable below).  On
+     * destination side, this should always be NULL, and the variable
+     * `clear_bmap_shift' is meaningless.
+     */
+    unsigned long *clear_bmap;
+    uint8_t clear_bmap_shift;
+};
+#endif
+#endif
-- 
2.24.1




* [PULL 09/18] multifd: multifd_send_pages only needs the qemufile
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (7 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 08/18] ram_addr: Split RAMBlock definition Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 10/18] multifd: multifd_queue_page " Juan Quintela
                   ` (9 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 82c7edb083..602e9ca5a0 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -929,7 +929,7 @@ struct {
  * false.
  */
 
-static int multifd_send_pages(RAMState *rs)
+static int multifd_send_pages(QEMUFile *f)
 {
     int i;
     static int next_channel;
@@ -965,7 +965,7 @@ static int multifd_send_pages(RAMState *rs)
     multifd_send_state->pages = p->pages;
     p->pages = pages;
     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
-    qemu_file_update_transfer(rs->f, transferred);
+    qemu_file_update_transfer(f, transferred);
     ram_counters.multifd_bytes += transferred;
     ram_counters.transferred += transferred;;
     qemu_mutex_unlock(&p->mutex);
@@ -993,7 +993,7 @@ static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
         }
     }
 
-    if (multifd_send_pages(rs) < 0) {
+    if (multifd_send_pages(rs->f) < 0) {
         return -1;
     }
 
@@ -1090,7 +1090,7 @@ static void multifd_send_sync_main(RAMState *rs)
         return;
     }
     if (multifd_send_state->pages->used) {
-        if (multifd_send_pages(rs) < 0) {
+        if (multifd_send_pages(rs->f) < 0) {
             error_report("%s: multifd_send_pages fail", __func__);
             return;
         }
-- 
2.24.1




* [PULL 10/18] multifd: multifd_queue_page only needs the qemufile
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (8 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 09/18] multifd: multifd_send_pages only needs the qemufile Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 11/18] multifd: multifd_send_sync_main " Juan Quintela
                   ` (8 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 602e9ca5a0..ad9cc3e7bd 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -974,7 +974,7 @@ static int multifd_send_pages(QEMUFile *f)
     return 1;
 }
 
-static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
+static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 {
     MultiFDPages_t *pages = multifd_send_state->pages;
 
@@ -993,12 +993,12 @@ static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
         }
     }
 
-    if (multifd_send_pages(rs->f) < 0) {
+    if (multifd_send_pages(f) < 0) {
         return -1;
     }
 
     if (pages->block != block) {
-        return  multifd_queue_page(rs, block, offset);
+        return  multifd_queue_page(f, block, offset);
     }
 
     return 1;
@@ -2136,7 +2136,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
                                  ram_addr_t offset)
 {
-    if (multifd_queue_page(rs, block, offset) < 0) {
+    if (multifd_queue_page(rs->f, block, offset) < 0) {
         return -1;
     }
     ram_counters.normal++;
-- 
2.24.1




* [PULL 11/18] multifd: multifd_send_sync_main only needs the qemufile
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (9 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 10/18] multifd: multifd_queue_page " Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 12/18] multifd: Use qemu_target_page_size() Juan Quintela
                   ` (7 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index ad9cc3e7bd..152e9cf46d 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1082,7 +1082,7 @@ void multifd_save_cleanup(void)
     multifd_send_state = NULL;
 }
 
-static void multifd_send_sync_main(RAMState *rs)
+static void multifd_send_sync_main(QEMUFile *f)
 {
     int i;
 
@@ -1090,7 +1090,7 @@ static void multifd_send_sync_main(RAMState *rs)
         return;
     }
     if (multifd_send_state->pages->used) {
-        if (multifd_send_pages(rs->f) < 0) {
+        if (multifd_send_pages(f) < 0) {
             error_report("%s: multifd_send_pages fail", __func__);
             return;
         }
@@ -1111,7 +1111,7 @@ static void multifd_send_sync_main(RAMState *rs)
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
-        qemu_file_update_transfer(rs->f, p->packet_len);
+        qemu_file_update_transfer(f, p->packet_len);
         ram_counters.multifd_bytes += p->packet_len;
         ram_counters.transferred += p->packet_len;
         qemu_mutex_unlock(&p->mutex);
@@ -3434,7 +3434,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
-    multifd_send_sync_main(*rsp);
+    multifd_send_sync_main(f);
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     qemu_fflush(f);
 
@@ -3534,7 +3534,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
 out:
     if (ret >= 0
         && migration_is_setup_or_active(migrate_get_current()->state)) {
-        multifd_send_sync_main(rs);
+        multifd_send_sync_main(rs->f);
         qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
         qemu_fflush(f);
         ram_counters.transferred += 8;
@@ -3593,7 +3593,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
     }
 
     if (ret >= 0) {
-        multifd_send_sync_main(rs);
+        multifd_send_sync_main(rs->f);
         qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
         qemu_fflush(f);
     }
-- 
2.24.1




* [PULL 12/18] multifd: Use qemu_target_page_size()
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (10 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 11/18] multifd: multifd_send_sync_main " Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 13/18] migration: Make checkpatch happy with comments Juan Quintela
                   ` (6 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Use qemu_target_page_size() instead of TARGET_PAGE_SIZE, because we
will make the multifd code CPU independent.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
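A rough sketch of the difference, under stated assumptions: a page size
that comes from a function call is resolved at run time, so the calling
code does not have to be compiled against one fixed value.  The
example_* names are hypothetical, and 12 page bits (4 KiB) is only an
example value, not something taken from QEMU.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical run-time setting; 12 bits means 4 KiB pages. */
static unsigned int example_page_bits = 12;

/* Stand-in for a run-time page size query. */
static size_t example_target_page_size(void)
{
    return (size_t)1 << example_page_bits;
}

/*
 * Same shape as the accounting in multifd_send_pages(): payload bytes
 * for the queued pages plus the length of the packet header.
 */
static uint64_t example_transferred(uint32_t pages_used, uint32_t packet_len)
{
    return (uint64_t)pages_used * example_target_page_size() + packet_len;
}
```
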
 migration/ram.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 152e9cf46d..8f04b5ab3a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -882,14 +882,14 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     for (i = 0; i < p->pages->used; i++) {
         uint64_t offset = be64_to_cpu(packet->offset[i]);
 
-        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
+        if (offset > (block->used_length - qemu_target_page_size())) {
             error_setg(errp, "multifd: offset too long %" PRIu64
                        " (max " RAM_ADDR_FMT ")",
                        offset, block->max_length);
             return -1;
         }
         p->pages->iov[i].iov_base = block->host + offset;
-        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+        p->pages->iov[i].iov_len = qemu_target_page_size();
     }
 
     return 0;
@@ -964,7 +964,8 @@ static int multifd_send_pages(QEMUFile *f)
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
-    transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
+    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
+                + p->packet_len;
     qemu_file_update_transfer(f, transferred);
     ram_counters.multifd_bytes += transferred;
     ram_counters.transferred += transferred;;
@@ -985,7 +986,7 @@ static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
     if (pages->block == block) {
         pages->offset[pages->used] = offset;
         pages->iov[pages->used].iov_base = block->host + offset;
-        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->iov[pages->used].iov_len = qemu_target_page_size();
         pages->used++;
 
         if (pages->used < pages->allocated) {
-- 
2.24.1




* [PULL 13/18] migration: Make checkpatch happy with comments
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (11 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 12/18] multifd: Use qemu_target_page_size() Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 14/18] multifd: Make multifd_save_setup() get an Error parameter Juan Quintela
                   ` (5 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 8f04b5ab3a..12b76b7841 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1320,10 +1320,12 @@ static void multifd_recv_terminate_threads(Error *err)
 
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
-        /* We could arrive here for two reasons:
-           - normal quit, i.e. everything went fine, just finished
-           - error quit: We close the channels so the channel threads
-             finish the qio_channel_read_all_eof() */
+        /*
+         * We could arrive here for two reasons:
+         *  - normal quit, i.e. everything went fine, just finished
+         *  - error quit: We close the channels so the channel threads
+         *    finish the qio_channel_read_all_eof()
+         */
         if (p->c) {
             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
-- 
2.24.1




* [PULL 14/18] multifd: Make multifd_save_setup() get an Error parameter
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (12 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 13/18] migration: Make checkpatch happy with comments Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 15/18] multifd: Make multifd_load_setup() " Juan Quintela
                   ` (4 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 4 +++-
 migration/ram.c       | 2 +-
 migration/ram.h       | 2 +-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 77768fb2c7..d478f832ea 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -3367,6 +3367,7 @@ static void *migration_thread(void *opaque)
 
 void migrate_fd_connect(MigrationState *s, Error *error_in)
 {
+    Error *local_err = NULL;
     int64_t rate_limit;
     bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED;
 
@@ -3415,7 +3416,8 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
         return;
     }
 
-    if (multifd_save_setup() != 0) {
+    if (multifd_save_setup(&local_err) != 0) {
+        error_report_err(local_err);
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
         migrate_fd_cleanup(s);
diff --git a/migration/ram.c b/migration/ram.c
index 12b76b7841..78483247ad 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1252,7 +1252,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
     }
 }
 
-int multifd_save_setup(void)
+int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index bd0eee79b6..da22a417ea 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -41,7 +41,7 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_total(void);
 
-int multifd_save_setup(void);
+int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
-- 
2.24.1




* [PULL 15/18] multifd: Make multifd_load_setup() get an Error parameter
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (13 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 14/18] multifd: Make multifd_save_setup() get an Error parameter Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 16/18] multifd: Split multifd code into its own file Juan Quintela
                   ` (3 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

multifd_load_setup() now takes an Error parameter, so we need to change
the full chain of callers to pass it.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
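A simplified, self-contained sketch of passing an error up a call
chain.  It does not use QEMU's real Error API from qapi/error.h; the
ExampleError type and the example_* helpers are hypothetical miniatures
written only to show the local_err-then-propagate shape.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature error object. */
typedef struct ExampleError {
    char msg[128];
} ExampleError;

static void example_error_set(ExampleError **errp, const char *msg)
{
    if (errp == NULL) {
        return;                 /* caller does not want the details */
    }
    *errp = calloc(1, sizeof(**errp));
    if (*errp) {
        strncpy((*errp)->msg, msg, sizeof((*errp)->msg) - 1);
    }
}

/* Hand a local error to the caller, or drop it if nobody wants it. */
static void example_error_propagate(ExampleError **dst, ExampleError *local)
{
    if (dst && !*dst) {
        *dst = local;
    } else {
        free(local);
    }
}

/* Innermost step: fails when asked for zero channels. */
static int example_load_setup(int channels, ExampleError **errp)
{
    if (channels <= 0) {
        example_error_set(errp, "no channels requested");
        return -1;
    }
    return 0;
}

/* Middle of the chain: collect into a local error, then pass it up. */
static int example_incoming_setup(int channels, ExampleError **errp)
{
    ExampleError *local_err = NULL;

    if (example_load_setup(channels, &local_err) != 0) {
        example_error_propagate(errp, local_err);
        return -1;
    }
    return 0;
}
```

Every function between the failing step and the place that reports the
error needs the extra parameter, which is why the signature change
ripples through several files.
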
 migration/migration.c | 35 +++++++++++++++++++++++++++++------
 migration/migration.h |  2 +-
 migration/ram.c       |  2 +-
 migration/ram.h       |  2 +-
 migration/rdma.c      |  2 +-
 5 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index d478f832ea..adc7d08e93 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -518,13 +518,23 @@ fail:
     exit(EXIT_FAILURE);
 }
 
-static void migration_incoming_setup(QEMUFile *f)
+/**
+ * @migration_incoming_setup: Setup incoming migration
+ *
+ * Returns 0 for no error or 1 for error
+ *
+ * @f: file for main migration channel
+ * @errp: where to put errors
+ */
+static int migration_incoming_setup(QEMUFile *f, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    Error *local_err = NULL;
 
-    if (multifd_load_setup() != 0) {
+    if (multifd_load_setup(&local_err) != 0) {
         /* We haven't been able to create multifd threads
            nothing better to do */
+        error_report_err(local_err);
         exit(EXIT_FAILURE);
     }
 
@@ -532,6 +542,7 @@ static void migration_incoming_setup(QEMUFile *f)
         mis->from_src_file = f;
     }
     qemu_file_set_blocking(f, false);
+    return 0;
 }
 
 void migration_incoming_process(void)
@@ -572,19 +583,27 @@ static bool postcopy_try_recover(QEMUFile *f)
     return false;
 }
 
-void migration_fd_process_incoming(QEMUFile *f)
+void migration_fd_process_incoming(QEMUFile *f, Error **errp)
 {
+    Error *local_err = NULL;
+
     if (postcopy_try_recover(f)) {
         return;
     }
 
-    migration_incoming_setup(f);
+    if (migration_incoming_setup(f, &local_err)) {
+        if (local_err) {
+            error_propagate(errp, local_err);
+        }
+        return;
+    }
     migration_incoming_process();
 }
 
 void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    Error *local_err = NULL;
     bool start_migration;
 
     if (!mis->from_src_file) {
@@ -596,7 +615,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
             return;
         }
 
-        migration_incoming_setup(f);
+        if (migration_incoming_setup(f, &local_err)) {
+            if (local_err) {
+                error_propagate(errp, local_err);
+            }
+            return;
+        }
 
         /*
          * Common migration only needs one channel, so we can start
@@ -604,7 +628,6 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
          */
         start_migration = !migrate_use_multifd();
     } else {
-        Error *local_err = NULL;
         /* Multiple connections */
         assert(migrate_use_multifd());
         start_migration = multifd_recv_new_channel(ioc, &local_err);
diff --git a/migration/migration.h b/migration/migration.h
index 44b1d56929..8473ddfc88 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -265,7 +265,7 @@ struct MigrationState
 
 void migrate_set_state(int *state, int old_state, int new_state);
 
-void migration_fd_process_incoming(QEMUFile *f);
+void migration_fd_process_incoming(QEMUFile *f, Error **errp);
 void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp);
 void migration_incoming_process(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 78483247ad..3abd41ad33 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1474,7 +1474,7 @@ static void *multifd_recv_thread(void *opaque)
     return NULL;
 }
 
-int multifd_load_setup(void)
+int multifd_load_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
diff --git a/migration/ram.h b/migration/ram.h
index da22a417ea..42be471d52 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -43,7 +43,7 @@ uint64_t ram_bytes_total(void);
 
 int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
-int multifd_load_setup(void);
+int multifd_load_setup(Error **errp);
 int multifd_load_cleanup(Error **errp);
 bool multifd_recv_all_channels_created(void);
 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
diff --git a/migration/rdma.c b/migration/rdma.c
index e241dcb992..2379b8345b 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -4004,7 +4004,7 @@ static void rdma_accept_incoming_migration(void *opaque)
     }
 
     rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f);
+    migration_fd_process_incoming(f, errp);
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
-- 
2.24.1




* [PULL 16/18] multifd: Split multifd code into its own file
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (14 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 15/18] multifd: Make multifd_load_setup() " Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 17/18] migration: Simplify get_qlist Juan Quintela
                   ` (2 subsequent siblings)
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth,
	Dr. David Alan Gilbert, Juan Quintela

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
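The "pages" ownership scheme described in the multifd.c comment in this
patch can be sketched roughly as below.  This is an illustration with
hypothetical Example* types, not the real structures; the channel mutex
that the migration thread holds during the swap is left out.

```c
#include <assert.h>
#include <stddef.h>

typedef struct ExamplePages {
    size_t used;        /* entries currently queued */
    size_t allocated;   /* capacity, fixed at setup time */
} ExamplePages;

typedef struct ExampleChannel {
    ExamplePages *pages;    /* owned by the channel thread */
    int pending_job;
} ExampleChannel;

typedef struct ExampleSendState {
    ExamplePages *pages;    /* owned by the migration thread */
} ExampleSendState;

/*
 * Give the filled set to an idle channel and take its empty set in
 * return.  No allocation happens: the two pointers just trade places.
 */
static void example_hand_over(ExampleSendState *s, ExampleChannel *c)
{
    ExamplePages *filled = s->pages;

    assert(!c->pending_job);        /* channel must be idle */
    assert(c->pages->used == 0);    /* and its set must be empty */

    c->pending_job++;
    s->pages = c->pages;
    c->pages = filled;
}
```

Because the number of "pages" structs never changes, cleanup only has
to free one per channel plus the main one.
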
 migration/Makefile.objs |   1 +
 migration/migration.c   |   1 +
 migration/multifd.c     | 899 ++++++++++++++++++++++++++++++++++++
 migration/multifd.h     | 139 ++++++
 migration/ram.c         | 988 +---------------------------------------
 migration/ram.h         |   7 -
 6 files changed, 1041 insertions(+), 994 deletions(-)
 create mode 100644 migration/multifd.c
 create mode 100644 migration/multifd.h
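
The "exiting" flag in multifd_send_terminate_threads() guards against
running the shutdown loop twice.  A minimal sketch of that idea, using
C11 atomic_exchange() as a stand-in for QEMU's atomic_xchg() (the
example_* names are hypothetical):

```c
#include <stdatomic.h>
#include <stdbool.h>

/* 0 until the first caller starts the shutdown, 1 afterwards. */
static atomic_int example_exiting;

/*
 * Returns true only for the first caller.  The exchange stores 1 and
 * returns the previous value in one atomic step, so two threads that
 * hit an error at the same time cannot both see 0.
 */
static bool example_begin_exit(void)
{
    return atomic_exchange(&example_exiting, 1) == 0;
}
```
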

diff --git a/migration/Makefile.objs b/migration/Makefile.objs
index a4f3bafd86..d3623d5f9b 100644
--- a/migration/Makefile.objs
+++ b/migration/Makefile.objs
@@ -7,6 +7,7 @@ common-obj-y += qemu-file-channel.o
 common-obj-y += xbzrle.o postcopy-ram.o
 common-obj-y += qjson.o
 common-obj-y += block-dirty-bitmap.o
+common-obj-y += multifd.o
 
 common-obj-$(CONFIG_RDMA) += rdma.o
 
diff --git a/migration/migration.c b/migration/migration.c
index adc7d08e93..3a21a4686c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -53,6 +53,7 @@
 #include "monitor/monitor.h"
 #include "net/announce.h"
 #include "qemu/queue.h"
+#include "multifd.h"
 
 #define MAX_THROTTLE  (32 << 20)      /* Migration transfer speed throttling */
 
diff --git a/migration/multifd.c b/migration/multifd.c
new file mode 100644
index 0000000000..b3e8ae9bcc
--- /dev/null
+++ b/migration/multifd.c
@@ -0,0 +1,899 @@
+/*
+ * Multifd common code
+ *
+ * Copyright (c) 2019-2020 Red Hat Inc
+ *
+ * Authors:
+ *  Juan Quintela <quintela@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/rcu.h"
+#include "exec/target_page.h"
+#include "sysemu/sysemu.h"
+#include "exec/ramblock.h"
+#include "qemu/error-report.h"
+#include "qapi/error.h"
+#include "ram.h"
+#include "migration.h"
+#include "socket.h"
+#include "qemu-file.h"
+#include "trace.h"
+#include "multifd.h"
+
+/* Multiple fd's */
+
+#define MULTIFD_MAGIC 0x11223344U
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    unsigned char uuid[16]; /* QemuUUID */
+    uint8_t id;
+    uint8_t unused1[7];     /* Reserved for future use */
+    uint64_t unused2[4];    /* Reserved for future use */
+} __attribute__((packed)) MultiFDInit_t;
+
+static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+{
+    MultiFDInit_t msg = {};
+    int ret;
+
+    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
+    msg.version = cpu_to_be32(MULTIFD_VERSION);
+    msg.id = p->id;
+    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
+
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+
+    msg.magic = be32_to_cpu(msg.magic);
+    msg.version = be32_to_cpu(msg.version);
+
+    if (msg.magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet magic %x "
+                   "expected %x", msg.magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    if (msg.version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet version %d "
+                   "expected %d", msg.version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
+        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
+
+        error_setg(errp, "multifd: received uuid '%s' and expected "
+                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
+        g_free(uuid);
+        g_free(msg_uuid);
+        return -1;
+    }
+
+    if (msg.id > migrate_multifd_channels()) {
+        error_setg(errp, "multifd: received channel id %d "
+                   "expected at most %d", msg.id, migrate_multifd_channels());
+        return -1;
+    }
+
+    return msg.id;
+}
+
+static MultiFDPages_t *multifd_pages_init(size_t size)
+{
+    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    pages->offset = g_new0(ram_addr_t, size);
+
+    return pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->packet_num = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages->offset);
+    pages->offset = NULL;
+    g_free(pages);
+}
+
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int i;
+
+    packet->flags = cpu_to_be32(p->flags);
+    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
+    packet->pages_used = cpu_to_be32(p->pages->used);
+    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
+    packet->packet_num = cpu_to_be64(p->packet_num);
+
+    if (p->pages->block) {
+        strncpy(packet->ramblock, p->pages->block->idstr, 256);
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        /* there are architectures where ram_addr_t is 32 bit */
+        uint64_t temp = p->pages->offset[i];
+
+        packet->offset[i] = cpu_to_be64(temp);
+    }
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    RAMBlock *block;
+    int i;
+
+    packet->magic = be32_to_cpu(packet->magic);
+    if (packet->magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet "
+                   "magic %x and expected magic %x",
+                   packet->magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    packet->version = be32_to_cpu(packet->version);
+    if (packet->version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet "
+                   "version %d and expected version %d",
+                   packet->version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    p->flags = be32_to_cpu(packet->flags);
+
+    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
+    /*
+     * If we received a packet that is 100 times bigger than expected
+     * just stop migration.  It is a magic number.
+     */
+    if (packet->pages_alloc > pages_max * 100) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected a maximum size of %d",
+                   packet->pages_alloc, pages_max * 100);
+        return -1;
+    }
+    /*
+     * We received a packet that is bigger than expected but inside
+     * reasonable limits (see previous comment).  Just reallocate.
+     */
+    if (packet->pages_alloc > p->pages->allocated) {
+        multifd_pages_clear(p->pages);
+        p->pages = multifd_pages_init(packet->pages_alloc);
+    }
+
+    p->pages->used = be32_to_cpu(packet->pages_used);
+    if (p->pages->used > packet->pages_alloc) {
+        error_setg(errp, "multifd: received packet "
+                   "with %d pages and expected maximum pages are %d",
+                   p->pages->used, packet->pages_alloc);
+        return -1;
+    }
+
+    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+    p->packet_num = be64_to_cpu(packet->packet_num);
+
+    if (p->pages->used == 0) {
+        return 0;
+    }
+
+    /* make sure that ramblock is 0 terminated */
+    packet->ramblock[255] = 0;
+    block = qemu_ram_block_by_name(packet->ramblock);
+    if (!block) {
+        error_setg(errp, "multifd: unknown ram block %s",
+                   packet->ramblock);
+        return -1;
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        uint64_t offset = be64_to_cpu(packet->offset[i]);
+
+        if (offset > (block->used_length - qemu_target_page_size())) {
+            error_setg(errp, "multifd: offset too long %" PRIu64
+                       " (max " RAM_ADDR_FMT ")",
+                       offset, block->max_length);
+            return -1;
+        }
+        p->pages->iov[i].iov_base = block->host + offset;
+        p->pages->iov[i].iov_len = qemu_target_page_size();
+    }
+
+    return 0;
+}
+
+struct {
+    MultiFDSendParams *params;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
+    /*
+     * Have we already run terminate threads?  There is a race when we
+     * get an error while we are exiting.
+     * We will use atomic operations.  Only valid values are 0 and 1.
+     */
+    int exiting;
+} *multifd_send_state;
+
+/*
+ * How do we use multifd_send_state->pages and channel->pages?
+ *
+ * We create a "pages" struct for each channel, and a main one.  Each
+ * time that we need to send a batch of pages we swap the one in
+ * multifd_send_state with the one in the channel that is sending it.
+ * There are two reasons for that:
+ *    - to avoid doing so many mallocs during migration
+ *    - to make it easier to know what to free at the end of migration
+ *
+ * This way we always know who is the owner of each "pages" struct,
+ * and we don't need any locking.  It belongs to the migration thread
+ * or to the channel thread.  Switching is safe because the migration
+ * thread is using the channel mutex when changing it, and the channel
+ * has to have finished with its own, otherwise pending_job can't be
+ * false.
+ */
+
+static int multifd_send_pages(QEMUFile *f)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make gcc happy */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+    uint64_t transferred;
+
+    if (atomic_read(&multifd_send_state->exiting)) {
+        return -1;
+    }
+
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            error_report("%s: channel %d has already quit!", __func__, i);
+            qemu_mutex_unlock(&p->mutex);
+            return -1;
+        }
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    assert(!p->pages->used);
+    assert(!p->pages->block);
+
+    p->packet_num = multifd_send_state->packet_num++;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
+                + p->packet_len;
+    qemu_file_update_transfer(f, transferred);
+    ram_counters.multifd_bytes += transferred;
+    ram_counters.transferred += transferred;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return 1;
+}
+
+int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = qemu_target_page_size();
+        pages->used++;
+
+        if (pages->used < pages->allocated) {
+            return 1;
+        }
+    }
+
+    if (multifd_send_pages(f) < 0) {
+        return -1;
+    }
+
+    if (pages->block != block) {
+        return multifd_queue_page(f, block, offset);
+    }
+
+    return 1;
+}
+
+static void multifd_send_terminate_threads(Error *err)
+{
+    int i;
+
+    trace_multifd_send_terminate_threads(err != NULL);
+
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
+            s->state == MIGRATION_STATUS_DEVICE ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
+    /*
+     * We don't want to exit each thread twice.  Depending on where
+     * we get the error, or if there are two independent errors in two
+     * threads at the same time, we can end up calling this function
+     * twice.
+     */
+    if (atomic_xchg(&multifd_send_state->exiting, 1)) {
+        return;
+    }
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        qemu_sem_post(&p->sem);
+        qemu_mutex_unlock(&p->mutex);
+    }
+}
+
+void multifd_save_cleanup(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    multifd_send_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        socket_send_channel_destroy(p->c);
+        p->c = NULL;
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
+        g_free(p->name);
+        p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
+    }
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
+    g_free(multifd_send_state->params);
+    multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
+    g_free(multifd_send_state);
+    multifd_send_state = NULL;
+}
+
+void multifd_send_sync_main(QEMUFile *f)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    if (multifd_send_state->pages->used) {
+        if (multifd_send_pages(f) < 0) {
+            error_report("%s: multifd_send_pages fail", __func__);
+            return;
+        }
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+
+        if (p->quit) {
+            error_report("%s: channel %d has already quit", __func__, i);
+            qemu_mutex_unlock(&p->mutex);
+            return;
+        }
+
+        p->packet_num = multifd_send_state->packet_num++;
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_file_update_transfer(f, p->packet_len);
+        ram_counters.multifd_bytes += p->packet_len;
+        ram_counters.transferred += p->packet_len;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&p->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->packet_num);
+}
+
+static void *multifd_send_thread(void *opaque)
+{
+    MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    int ret = 0;
+    uint32_t flags = 0;
+
+    trace_multifd_send_thread_start(p->id);
+    rcu_register_thread();
+
+    if (multifd_send_initial_packet(p, &local_err) < 0) {
+        ret = -1;
+        goto out;
+    }
+    /* initial packet */
+    p->num_packets = 1;
+
+    while (true) {
+        qemu_sem_wait(&p->sem);
+
+        if (atomic_read(&multifd_send_state->exiting)) {
+            break;
+        }
+        qemu_mutex_lock(&p->mutex);
+
+        if (p->pending_job) {
+            uint32_t used = p->pages->used;
+            uint64_t packet_num = p->packet_num;
+            flags = p->flags;
+
+            p->next_packet_size = used * qemu_target_page_size();
+            multifd_send_fill_packet(p);
+            p->flags = 0;
+            p->num_packets++;
+            p->num_pages += used;
+            p->pages->used = 0;
+            p->pages->block = NULL;
+            qemu_mutex_unlock(&p->mutex);
+
+            trace_multifd_send(p->id, packet_num, used, flags,
+                               p->next_packet_size);
+
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            if (used) {
+                ret = qio_channel_writev_all(p->c, p->pages->iov,
+                                             used, &local_err);
+                if (ret != 0) {
+                    break;
+                }
+            }
+
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            qemu_mutex_unlock(&p->mutex);
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&p->sem_sync);
+            }
+            qemu_sem_post(&multifd_send_state->channels_ready);
+        } else if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
+        }
+    }
+
+out:
+    if (local_err) {
+        trace_multifd_send_error(p->id);
+        multifd_send_terminate_threads(local_err);
+    }
+
+    /*
+     * An error happened and this thread is about to exit.  Before
+     * leaving, wake up anyone who may be waiting on this channel.
+     */
+    if (ret != 0) {
+        qemu_sem_post(&p->sem_sync);
+        qemu_sem_post(&multifd_send_state->channels_ready);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    rcu_unregister_thread();
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+
+    return NULL;
+}
+
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    trace_multifd_new_send_channel_async(p->id);
+    if (qio_task_propagate_error(task, &local_err)) {
+        migrate_set_error(migrate_get_current(), local_err);
+        /* An error happened; wake up anyone waiting on this channel */
+        qemu_sem_post(&multifd_send_state->channels_ready);
+        qemu_sem_post(&p->sem_sync);
+        /*
+         * Although multifd_send_thread has not been created, the main
+         * migration thread needs to know whether it is running, so we
+         * mark its status here.
+         */
+        p->quit = true;
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+        p->running = true;
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+int multifd_save_setup(Error **errp)
+{
+    int thread_count;
+    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    uint8_t i;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_channels();
+    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
+    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
+    multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->channels_ready, 0);
+    atomic_set(&multifd_send_state->exiting, 0);
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
+        p->quit = false;
+        p->pending_job = 0;
+        p->id = i;
+        p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(uint64_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
+        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+        p->name = g_strdup_printf("multifdsend_%d", i);
+        socket_send_channel_create(multifd_new_send_channel_async, p);
+    }
+    return 0;
+}
+
+struct {
+    MultiFDRecvParams *params;
+    /* number of created threads */
+    int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+} *multifd_recv_state;
+
+static void multifd_recv_terminate_threads(Error *err)
+{
+    int i;
+
+    trace_multifd_recv_terminate_threads(err != NULL);
+
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        /*
+         * We could arrive here for two reasons:
+         *  - normal quit, i.e. everything went fine, just finished
+         *  - error quit: We close the channels so the channel threads
+         *    finish the qio_channel_read_all_eof()
+         */
+        if (p->c) {
+            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+}
+
+int multifd_load_cleanup(Error **errp)
+{
+    int i;
+    int ret = 0;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    multifd_recv_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        if (p->running) {
+            p->quit = true;
+            /*
+             * multifd_recv_thread may hang in the MULTIFD_FLAG_SYNC handling
+             * code; waking it up here is harmless during cleanup.
+             */
+            qemu_sem_post(&p->sem_sync);
+            qemu_thread_join(&p->thread);
+        }
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem_sync);
+        g_free(p->name);
+        p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
+    }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
+    g_free(multifd_recv_state->params);
+    multifd_recv_state->params = NULL;
+    g_free(multifd_recv_state);
+    multifd_recv_state = NULL;
+
+    return ret;
+}
+
+void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->packet_num < p->packet_num) {
+            multifd_recv_state->packet_num = p->packet_num;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
+}
+
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *p = opaque;
+    Error *local_err = NULL;
+    int ret;
+
+    trace_multifd_recv_thread_start(p->id);
+    rcu_register_thread();
+
+    while (true) {
+        uint32_t used;
+        uint32_t flags;
+
+        if (p->quit) {
+            break;
+        }
+
+        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+        if (ret == 0) {   /* EOF */
+            break;
+        }
+        if (ret == -1) {   /* Error */
+            break;
+        }
+
+        qemu_mutex_lock(&p->mutex);
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->packet_num, used, flags,
+                           p->next_packet_size);
+        p->num_packets++;
+        p->num_pages += used;
+        qemu_mutex_unlock(&p->mutex);
+
+        if (used) {
+            ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                        used, &local_err);
+            if (ret != 0) {
+                break;
+            }
+        }
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
+        }
+    }
+
+    if (local_err) {
+        multifd_recv_terminate_threads(local_err);
+    }
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    rcu_unregister_thread();
+    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+
+    return NULL;
+}
+
+int multifd_load_setup(Error **errp)
+{
+    int thread_count;
+    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    uint8_t i;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_channels();
+    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
+    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+    atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem_sync, 0);
+        p->quit = false;
+        p->id = i;
+        p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(uint64_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
+        p->name = g_strdup_printf("multifdrecv_%d", i);
+    }
+    return 0;
+}
+
+bool multifd_recv_all_channels_created(void)
+{
+    int thread_count = migrate_multifd_channels();
+
+    if (!migrate_use_multifd()) {
+        return true;
+    }
+
+    return thread_count == atomic_read(&multifd_recv_state->count);
+}
+
+/*
+ * Try to receive all multifd channels to get ready for the migration.
+ * - Return true and do not set @errp when correctly receiving all channels;
+ * - Return false and do not set @errp when correctly receiving the current one;
+ * - Return false and set @errp when failing to receive the current channel.
+ */
+bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
+{
+    MultiFDRecvParams *p;
+    Error *local_err = NULL;
+    int id;
+
+    id = multifd_recv_initial_packet(ioc, &local_err);
+    if (id < 0) {
+        multifd_recv_terminate_threads(local_err);
+        error_propagate_prepend(errp, local_err,
+                                "failed to receive packet"
+                                " via multifd channel %d: ",
+                                atomic_read(&multifd_recv_state->count));
+        return false;
+    }
+    trace_multifd_recv_new_channel(id);
+
+    p = &multifd_recv_state->params[id];
+    if (p->c != NULL) {
+        error_setg(&local_err, "multifd: received id '%d' already setup",
+                   id);
+        multifd_recv_terminate_threads(local_err);
+        error_propagate(errp, local_err);
+        return false;
+    }
+    p->c = ioc;
+    object_ref(OBJECT(ioc));
+    /* initial packet */
+    p->num_packets = 1;
+
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multifd_recv_state->count);
+    return atomic_read(&multifd_recv_state->count) ==
+           migrate_multifd_channels();
+}
+
diff --git a/migration/multifd.h b/migration/multifd.h
new file mode 100644
index 0000000000..d8b0205977
--- /dev/null
+++ b/migration/multifd.h
@@ -0,0 +1,139 @@
+/*
+ * Multifd common functions
+ *
+ * Copyright (c) 2019-2020 Red Hat Inc
+ *
+ * Authors:
+ *  Juan Quintela <quintela@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_MIGRATION_MULTIFD_H
+#define QEMU_MIGRATION_MULTIFD_H
+
+int multifd_save_setup(Error **errp);
+void multifd_save_cleanup(void);
+int multifd_load_setup(Error **errp);
+int multifd_load_cleanup(Error **errp);
+bool multifd_recv_all_channels_created(void);
+bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
+void multifd_recv_sync_main(void);
+void multifd_send_sync_main(QEMUFile *f);
+int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
+
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
+/* This value needs to be a multiple of qemu_target_page_size() */
+#define MULTIFD_PACKET_SIZE (512 * 1024)
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t flags;
+    /* maximum number of allocated pages */
+    uint32_t pages_alloc;
+    uint32_t pages_used;
+    /* size of the next packet that contains pages */
+    uint32_t next_packet_size;
+    uint64_t packet_num;
+    uint64_t unused[4];    /* Reserved for future use */
+    char ramblock[256];
+    uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* offset of each page */
+    ram_addr_t *offset;
+    /* pointer to each page */
+    struct iovec *iov;
+    RAMBlock *block;
+} MultiFDPages_t;
+
+typedef struct {
+    /* these fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* communication channel */
+    QIOChannel *c;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+    /* thread has work to do */
+    int pending_job;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* size of the next packet that contains pages */
+    uint32_t next_packet_size;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* thread local variables */
+    /* packets sent through this channel */
+    uint64_t num_packets;
+    /* pages sent through this channel */
+    uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+} MultiFDSendParams;
+
+typedef struct {
+    /* these fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* communication channel */
+    QIOChannel *c;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* thread local variables */
+    /* size of the next packet that contains pages */
+    uint32_t next_packet_size;
+    /* packets received through this channel */
+    uint64_t num_packets;
+    /* pages received through this channel */
+    uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+} MultiFDRecvParams;
+
+#endif
+
diff --git a/migration/ram.c b/migration/ram.c
index 3abd41ad33..ed23ed1c7c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,7 +36,6 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
-#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -53,9 +52,9 @@
 #include "migration/colo.h"
 #include "block.h"
 #include "sysemu/sysemu.h"
-#include "qemu/uuid.h"
 #include "savevm.h"
 #include "qemu/iov.h"
+#include "multifd.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -575,991 +574,6 @@ exit:
     return -1;
 }
 
-/* Multiple fd's */
-
-#define MULTIFD_MAGIC 0x11223344U
-#define MULTIFD_VERSION 1
-
-#define MULTIFD_FLAG_SYNC (1 << 0)
-
-/* This value needs to be a multiple of qemu_target_page_size() */
-#define MULTIFD_PACKET_SIZE (512 * 1024)
-
-typedef struct {
-    uint32_t magic;
-    uint32_t version;
-    unsigned char uuid[16]; /* QemuUUID */
-    uint8_t id;
-    uint8_t unused1[7];     /* Reserved for future use */
-    uint64_t unused2[4];    /* Reserved for future use */
-} __attribute__((packed)) MultiFDInit_t;
-
-typedef struct {
-    uint32_t magic;
-    uint32_t version;
-    uint32_t flags;
-    /* maximum number of allocated pages */
-    uint32_t pages_alloc;
-    uint32_t pages_used;
-    /* size of the next packet that contains pages */
-    uint32_t next_packet_size;
-    uint64_t packet_num;
-    uint64_t unused[4];    /* Reserved for future use */
-    char ramblock[256];
-    uint64_t offset[];
-} __attribute__((packed)) MultiFDPacket_t;
-
-typedef struct {
-    /* number of used pages */
-    uint32_t used;
-    /* number of allocated pages */
-    uint32_t allocated;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* offset of each page */
-    ram_addr_t *offset;
-    /* pointer to each page */
-    struct iovec *iov;
-    RAMBlock *block;
-} MultiFDPages_t;
-
-typedef struct {
-    /* this fields are not changed once the thread is created */
-    /* channel number */
-    uint8_t id;
-    /* channel thread name */
-    char *name;
-    /* channel thread id */
-    QemuThread thread;
-    /* communication channel */
-    QIOChannel *c;
-    /* sem where to wait for more work */
-    QemuSemaphore sem;
-    /* this mutex protects the following parameters */
-    QemuMutex mutex;
-    /* is this channel thread running */
-    bool running;
-    /* should this thread finish */
-    bool quit;
-    /* thread has work to do */
-    int pending_job;
-    /* array of pages to sent */
-    MultiFDPages_t *pages;
-    /* packet allocated len */
-    uint32_t packet_len;
-    /* pointer to the packet */
-    MultiFDPacket_t *packet;
-    /* multifd flags for each packet */
-    uint32_t flags;
-    /* size of the next packet that contains pages */
-    uint32_t next_packet_size;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* thread local variables */
-    /* packets sent through this channel */
-    uint64_t num_packets;
-    /* pages sent through this channel */
-    uint64_t num_pages;
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
-}  MultiFDSendParams;
-
-typedef struct {
-    /* this fields are not changed once the thread is created */
-    /* channel number */
-    uint8_t id;
-    /* channel thread name */
-    char *name;
-    /* channel thread id */
-    QemuThread thread;
-    /* communication channel */
-    QIOChannel *c;
-    /* this mutex protects the following parameters */
-    QemuMutex mutex;
-    /* is this channel thread running */
-    bool running;
-    /* should this thread finish */
-    bool quit;
-    /* array of pages to receive */
-    MultiFDPages_t *pages;
-    /* packet allocated len */
-    uint32_t packet_len;
-    /* pointer to the packet */
-    MultiFDPacket_t *packet;
-    /* multifd flags for each packet */
-    uint32_t flags;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* thread local variables */
-    /* size of the next packet that contains pages */
-    uint32_t next_packet_size;
-    /* packets sent through this channel */
-    uint64_t num_packets;
-    /* pages sent through this channel */
-    uint64_t num_pages;
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
-} MultiFDRecvParams;
-
-static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
-{
-    MultiFDInit_t msg = {};
-    int ret;
-
-    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
-    msg.version = cpu_to_be32(MULTIFD_VERSION);
-    msg.id = p->id;
-    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
-
-    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
-    if (ret != 0) {
-        return -1;
-    }
-    return 0;
-}
-
-static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
-{
-    MultiFDInit_t msg;
-    int ret;
-
-    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
-    if (ret != 0) {
-        return -1;
-    }
-
-    msg.magic = be32_to_cpu(msg.magic);
-    msg.version = be32_to_cpu(msg.version);
-
-    if (msg.magic != MULTIFD_MAGIC) {
-        error_setg(errp, "multifd: received packet magic %x "
-                   "expected %x", msg.magic, MULTIFD_MAGIC);
-        return -1;
-    }
-
-    if (msg.version != MULTIFD_VERSION) {
-        error_setg(errp, "multifd: received packet version %d "
-                   "expected %d", msg.version, MULTIFD_VERSION);
-        return -1;
-    }
-
-    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
-        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
-        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
-
-        error_setg(errp, "multifd: received uuid '%s' and expected "
-                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
-        g_free(uuid);
-        g_free(msg_uuid);
-        return -1;
-    }
-
-    if (msg.id > migrate_multifd_channels()) {
-        error_setg(errp, "multifd: received channel version %d "
-                   "expected %d", msg.version, MULTIFD_VERSION);
-        return -1;
-    }
-
-    return msg.id;
-}
-
-static MultiFDPages_t *multifd_pages_init(size_t size)
-{
-    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
-
-    pages->allocated = size;
-    pages->iov = g_new0(struct iovec, size);
-    pages->offset = g_new0(ram_addr_t, size);
-
-    return pages;
-}
-
-static void multifd_pages_clear(MultiFDPages_t *pages)
-{
-    pages->used = 0;
-    pages->allocated = 0;
-    pages->packet_num = 0;
-    pages->block = NULL;
-    g_free(pages->iov);
-    pages->iov = NULL;
-    g_free(pages->offset);
-    pages->offset = NULL;
-    g_free(pages);
-}
-
-static void multifd_send_fill_packet(MultiFDSendParams *p)
-{
-    MultiFDPacket_t *packet = p->packet;
-    int i;
-
-    packet->flags = cpu_to_be32(p->flags);
-    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
-    packet->pages_used = cpu_to_be32(p->pages->used);
-    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
-    packet->packet_num = cpu_to_be64(p->packet_num);
-
-    if (p->pages->block) {
-        strncpy(packet->ramblock, p->pages->block->idstr, 256);
-    }
-
-    for (i = 0; i < p->pages->used; i++) {
-        /* there are architectures where ram_addr_t is 32 bit */
-        uint64_t temp = p->pages->offset[i];
-
-        packet->offset[i] = cpu_to_be64(temp);
-    }
-}
-
-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
-{
-    MultiFDPacket_t *packet = p->packet;
-    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
-    RAMBlock *block;
-    int i;
-
-    packet->magic = be32_to_cpu(packet->magic);
-    if (packet->magic != MULTIFD_MAGIC) {
-        error_setg(errp, "multifd: received packet "
-                   "magic %x and expected magic %x",
-                   packet->magic, MULTIFD_MAGIC);
-        return -1;
-    }
-
-    packet->version = be32_to_cpu(packet->version);
-    if (packet->version != MULTIFD_VERSION) {
-        error_setg(errp, "multifd: received packet "
-                   "version %d and expected version %d",
-                   packet->version, MULTIFD_VERSION);
-        return -1;
-    }
-
-    p->flags = be32_to_cpu(packet->flags);
-
-    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
-    /*
-     * If we received a packet that is 100 times bigger than expected
-     * just stop migration.  It is a magic number.
-     */
-    if (packet->pages_alloc > pages_max * 100) {
-        error_setg(errp, "multifd: received packet "
-                   "with size %d and expected a maximum size of %d",
-                   packet->pages_alloc, pages_max * 100) ;
-        return -1;
-    }
-    /*
-     * We received a packet that is bigger than expected but inside
-     * reasonable limits (see previous comment).  Just reallocate.
-     */
-    if (packet->pages_alloc > p->pages->allocated) {
-        multifd_pages_clear(p->pages);
-        p->pages = multifd_pages_init(packet->pages_alloc);
-    }
-
-    p->pages->used = be32_to_cpu(packet->pages_used);
-    if (p->pages->used > packet->pages_alloc) {
-        error_setg(errp, "multifd: received packet "
-                   "with %d pages and expected maximum pages are %d",
-                   p->pages->used, packet->pages_alloc) ;
-        return -1;
-    }
-
-    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
-    p->packet_num = be64_to_cpu(packet->packet_num);
-
-    if (p->pages->used == 0) {
-        return 0;
-    }
-
-    /* make sure that ramblock is 0 terminated */
-    packet->ramblock[255] = 0;
-    block = qemu_ram_block_by_name(packet->ramblock);
-    if (!block) {
-        error_setg(errp, "multifd: unknown ram block %s",
-                   packet->ramblock);
-        return -1;
-    }
-
-    for (i = 0; i < p->pages->used; i++) {
-        uint64_t offset = be64_to_cpu(packet->offset[i]);
-
-        if (offset > (block->used_length - qemu_target_page_size())) {
-            error_setg(errp, "multifd: offset too long %" PRIu64
-                       " (max " RAM_ADDR_FMT ")",
-                       offset, block->max_length);
-            return -1;
-        }
-        p->pages->iov[i].iov_base = block->host + offset;
-        p->pages->iov[i].iov_len = qemu_target_page_size();
-    }
-
-    return 0;
-}
-
-struct {
-    MultiFDSendParams *params;
-    /* array of pages to sent */
-    MultiFDPages_t *pages;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* send channels ready */
-    QemuSemaphore channels_ready;
-    /*
-     * Have we already run terminate threads.  There is a race when it
-     * happens that we got one error while we are exiting.
-     * We will use atomic operations.  Only valid values are 0 and 1.
-     */
-    int exiting;
-} *multifd_send_state;
-
-/*
- * How we use multifd_send_state->pages and channel->pages?
- *
- * We create a pages for each channel, and a main one.  Each time that
- * we need to send a batch of pages we interchange the ones between
- * multifd_send_state and the channel that is sending it.  There are
- * two reasons for that:
- *    - to not have to do so many mallocs during migration
- *    - to make easier to know what to free at the end of migration
- *
- * This way we always know who is the owner of each "pages" struct,
- * and we don't need any locking.  It belongs to the migration thread
- * or to the channel thread.  Switching is safe because the migration
- * thread is using the channel mutex when changing it, and the channel
- * have to had finish with its own, otherwise pending_job can't be
- * false.
- */
-
-static int multifd_send_pages(QEMUFile *f)
-{
-    int i;
-    static int next_channel;
-    MultiFDSendParams *p = NULL; /* make happy gcc */
-    MultiFDPages_t *pages = multifd_send_state->pages;
-    uint64_t transferred;
-
-    if (atomic_read(&multifd_send_state->exiting)) {
-        return -1;
-    }
-
-    qemu_sem_wait(&multifd_send_state->channels_ready);
-    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
-        p = &multifd_send_state->params[i];
-
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            error_report("%s: channel %d has already quit!", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
-            return -1;
-        }
-        if (!p->pending_job) {
-            p->pending_job++;
-            next_channel = (i + 1) % migrate_multifd_channels();
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-    }
-    assert(!p->pages->used);
-    assert(!p->pages->block);
-
-    p->packet_num = multifd_send_state->packet_num++;
-    multifd_send_state->pages = p->pages;
-    p->pages = pages;
-    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
-                + p->packet_len;
-    qemu_file_update_transfer(f, transferred);
-    ram_counters.multifd_bytes += transferred;
-    ram_counters.transferred += transferred;;
-    qemu_mutex_unlock(&p->mutex);
-    qemu_sem_post(&p->sem);
-
-    return 1;
-}
-
-static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
-{
-    MultiFDPages_t *pages = multifd_send_state->pages;
-
-    if (!pages->block) {
-        pages->block = block;
-    }
-
-    if (pages->block == block) {
-        pages->offset[pages->used] = offset;
-        pages->iov[pages->used].iov_base = block->host + offset;
-        pages->iov[pages->used].iov_len = qemu_target_page_size();
-        pages->used++;
-
-        if (pages->used < pages->allocated) {
-            return 1;
-        }
-    }
-
-    if (multifd_send_pages(f) < 0) {
-        return -1;
-    }
-
-    if (pages->block != block) {
-        return  multifd_queue_page(f, block, offset);
-    }
-
-    return 1;
-}
-
-static void multifd_send_terminate_threads(Error *err)
-{
-    int i;
-
-    trace_multifd_send_terminate_threads(err != NULL);
-
-    if (err) {
-        MigrationState *s = migrate_get_current();
-        migrate_set_error(s, err);
-        if (s->state == MIGRATION_STATUS_SETUP ||
-            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
-            s->state == MIGRATION_STATUS_DEVICE ||
-            s->state == MIGRATION_STATUS_ACTIVE) {
-            migrate_set_state(&s->state, s->state,
-                              MIGRATION_STATUS_FAILED);
-        }
-    }
-
-    /*
-     * We don't want to exit each threads twice.  Depending on where
-     * we get the error, or if there are two independent errors in two
-     * threads at the same time, we can end calling this function
-     * twice.
-     */
-    if (atomic_xchg(&multifd_send_state->exiting, 1)) {
-        return;
-    }
-
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        qemu_mutex_lock(&p->mutex);
-        p->quit = true;
-        qemu_sem_post(&p->sem);
-        qemu_mutex_unlock(&p->mutex);
-    }
-}
-
-void multifd_save_cleanup(void)
-{
-    int i;
-
-    if (!migrate_use_multifd()) {
-        return;
-    }
-    multifd_send_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        if (p->running) {
-            qemu_thread_join(&p->thread);
-        }
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        socket_send_channel_destroy(p->c);
-        p->c = NULL;
-        qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem);
-        qemu_sem_destroy(&p->sem_sync);
-        g_free(p->name);
-        p->name = NULL;
-        multifd_pages_clear(p->pages);
-        p->pages = NULL;
-        p->packet_len = 0;
-        g_free(p->packet);
-        p->packet = NULL;
-    }
-    qemu_sem_destroy(&multifd_send_state->channels_ready);
-    g_free(multifd_send_state->params);
-    multifd_send_state->params = NULL;
-    multifd_pages_clear(multifd_send_state->pages);
-    multifd_send_state->pages = NULL;
-    g_free(multifd_send_state);
-    multifd_send_state = NULL;
-}
-
-static void multifd_send_sync_main(QEMUFile *f)
-{
-    int i;
-
-    if (!migrate_use_multifd()) {
-        return;
-    }
-    if (multifd_send_state->pages->used) {
-        if (multifd_send_pages(f) < 0) {
-            error_report("%s: multifd_send_pages fail", __func__);
-            return;
-        }
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        trace_multifd_send_sync_main_signal(p->id);
-
-        qemu_mutex_lock(&p->mutex);
-
-        if (p->quit) {
-            error_report("%s: channel %d has already quit", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
-            return;
-        }
-
-        p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
-        p->pending_job++;
-        qemu_file_update_transfer(f, p->packet_len);
-        ram_counters.multifd_bytes += p->packet_len;
-        ram_counters.transferred += p->packet_len;
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_post(&p->sem);
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        trace_multifd_send_sync_main_wait(p->id);
-        qemu_sem_wait(&p->sem_sync);
-    }
-    trace_multifd_send_sync_main(multifd_send_state->packet_num);
-}
-
-static void *multifd_send_thread(void *opaque)
-{
-    MultiFDSendParams *p = opaque;
-    Error *local_err = NULL;
-    int ret = 0;
-    uint32_t flags = 0;
-
-    trace_multifd_send_thread_start(p->id);
-    rcu_register_thread();
-
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
-        goto out;
-    }
-    /* initial packet */
-    p->num_packets = 1;
-
-    while (true) {
-        qemu_sem_wait(&p->sem);
-
-        if (atomic_read(&multifd_send_state->exiting)) {
-            break;
-        }
-        qemu_mutex_lock(&p->mutex);
-
-        if (p->pending_job) {
-            uint32_t used = p->pages->used;
-            uint64_t packet_num = p->packet_num;
-            flags = p->flags;
-
-            p->next_packet_size = used * qemu_target_page_size();
-            multifd_send_fill_packet(p);
-            p->flags = 0;
-            p->num_packets++;
-            p->num_pages += used;
-            p->pages->used = 0;
-            p->pages->block = NULL;
-            qemu_mutex_unlock(&p->mutex);
-
-            trace_multifd_send(p->id, packet_num, used, flags,
-                               p->next_packet_size);
-
-            ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                        p->packet_len, &local_err);
-            if (ret != 0) {
-                break;
-            }
-
-            if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            }
-
-            qemu_mutex_lock(&p->mutex);
-            p->pending_job--;
-            qemu_mutex_unlock(&p->mutex);
-
-            if (flags & MULTIFD_FLAG_SYNC) {
-                qemu_sem_post(&p->sem_sync);
-            }
-            qemu_sem_post(&multifd_send_state->channels_ready);
-        } else if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        } else {
-            qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
-        }
-    }
-
-out:
-    if (local_err) {
-        trace_multifd_send_error(p->id);
-        multifd_send_terminate_threads(local_err);
-    }
-
-    /*
-     * Error happen, I will exit, but I can't just leave, tell
-     * who pay attention to me.
-     */
-    if (ret != 0) {
-        qemu_sem_post(&p->sem_sync);
-        qemu_sem_post(&multifd_send_state->channels_ready);
-    }
-
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    rcu_unregister_thread();
-    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
-
-    return NULL;
-}
-
-static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
-{
-    MultiFDSendParams *p = opaque;
-    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
-    Error *local_err = NULL;
-
-    trace_multifd_new_send_channel_async(p->id);
-    if (qio_task_propagate_error(task, &local_err)) {
-        migrate_set_error(migrate_get_current(), local_err);
-        /* Error happen, we need to tell who pay attention to me */
-        qemu_sem_post(&multifd_send_state->channels_ready);
-        qemu_sem_post(&p->sem_sync);
-        /*
-         * Although multifd_send_thread is not created, but main migration
-         * thread neet to judge whether it is running, so we need to mark
-         * its status.
-         */
-        p->quit = true;
-    } else {
-        p->c = QIO_CHANNEL(sioc);
-        qio_channel_set_delay(p->c, false);
-        p->running = true;
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-    }
-}
-
-int multifd_save_setup(Error **errp)
-{
-    int thread_count;
-    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
-    uint8_t i;
-
-    if (!migrate_use_multifd()) {
-        return 0;
-    }
-    thread_count = migrate_multifd_channels();
-    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
-    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
-    multifd_send_state->pages = multifd_pages_init(page_count);
-    qemu_sem_init(&multifd_send_state->channels_ready, 0);
-    atomic_set(&multifd_send_state->exiting, 0);
-
-    for (i = 0; i < thread_count; i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
-        p->pending_job = 0;
-        p->id = i;
-        p->pages = multifd_pages_init(page_count);
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-        p->name = g_strdup_printf("multifdsend_%d", i);
-        socket_send_channel_create(multifd_new_send_channel_async, p);
-    }
-    return 0;
-}
-
-struct {
-    MultiFDRecvParams *params;
-    /* number of created threads */
-    int count;
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-} *multifd_recv_state;
-
-static void multifd_recv_terminate_threads(Error *err)
-{
-    int i;
-
-    trace_multifd_recv_terminate_threads(err != NULL);
-
-    if (err) {
-        MigrationState *s = migrate_get_current();
-        migrate_set_error(s, err);
-        if (s->state == MIGRATION_STATUS_SETUP ||
-            s->state == MIGRATION_STATUS_ACTIVE) {
-            migrate_set_state(&s->state, s->state,
-                              MIGRATION_STATUS_FAILED);
-        }
-    }
-
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_lock(&p->mutex);
-        p->quit = true;
-        /*
-         * We could arrive here for two reasons:
-         *  - normal quit, i.e. everything went fine, just finished
-         *  - error quit: We close the channels so the channel threads
-         *    finish the qio_channel_read_all_eof()
-         */
-        if (p->c) {
-            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
-        }
-        qemu_mutex_unlock(&p->mutex);
-    }
-}
-
-int multifd_load_cleanup(Error **errp)
-{
-    int i;
-    int ret = 0;
-
-    if (!migrate_use_multifd()) {
-        return 0;
-    }
-    multifd_recv_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        if (p->running) {
-            p->quit = true;
-            /*
-             * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
-             * however try to wakeup it without harm in cleanup phase.
-             */
-            qemu_sem_post(&p->sem_sync);
-            qemu_thread_join(&p->thread);
-        }
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        object_unref(OBJECT(p->c));
-        p->c = NULL;
-        qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem_sync);
-        g_free(p->name);
-        p->name = NULL;
-        multifd_pages_clear(p->pages);
-        p->pages = NULL;
-        p->packet_len = 0;
-        g_free(p->packet);
-        p->packet = NULL;
-    }
-    qemu_sem_destroy(&multifd_recv_state->sem_sync);
-    g_free(multifd_recv_state->params);
-    multifd_recv_state->params = NULL;
-    g_free(multifd_recv_state);
-    multifd_recv_state = NULL;
-
-    return ret;
-}
-
-static void multifd_recv_sync_main(void)
-{
-    int i;
-
-    if (!migrate_use_multifd()) {
-        return;
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        trace_multifd_recv_sync_main_wait(p->id);
-        qemu_sem_wait(&multifd_recv_state->sem_sync);
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_lock(&p->mutex);
-        if (multifd_recv_state->packet_num < p->packet_num) {
-            multifd_recv_state->packet_num = p->packet_num;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        trace_multifd_recv_sync_main_signal(p->id);
-        qemu_sem_post(&p->sem_sync);
-    }
-    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
-}
-
-static void *multifd_recv_thread(void *opaque)
-{
-    MultiFDRecvParams *p = opaque;
-    Error *local_err = NULL;
-    int ret;
-
-    trace_multifd_recv_thread_start(p->id);
-    rcu_register_thread();
-
-    while (true) {
-        uint32_t used;
-        uint32_t flags;
-
-        if (p->quit) {
-            break;
-        }
-
-        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                       p->packet_len, &local_err);
-        if (ret == 0) {   /* EOF */
-            break;
-        }
-        if (ret == -1) {   /* Error */
-            break;
-        }
-
-        qemu_mutex_lock(&p->mutex);
-        ret = multifd_recv_unfill_packet(p, &local_err);
-        if (ret) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-
-        used = p->pages->used;
-        flags = p->flags;
-        trace_multifd_recv(p->id, p->packet_num, used, flags,
-                           p->next_packet_size);
-        p->num_packets++;
-        p->num_pages += used;
-        qemu_mutex_unlock(&p->mutex);
-
-        if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
-            if (ret != 0) {
-                break;
-            }
-        }
-
-        if (flags & MULTIFD_FLAG_SYNC) {
-            qemu_sem_post(&multifd_recv_state->sem_sync);
-            qemu_sem_wait(&p->sem_sync);
-        }
-    }
-
-    if (local_err) {
-        multifd_recv_terminate_threads(local_err);
-    }
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    rcu_unregister_thread();
-    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
-
-    return NULL;
-}
-
-int multifd_load_setup(Error **errp)
-{
-    int thread_count;
-    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
-    uint8_t i;
-
-    if (!migrate_use_multifd()) {
-        return 0;
-    }
-    thread_count = migrate_multifd_channels();
-    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
-    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
-    atomic_set(&multifd_recv_state->count, 0);
-    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
-
-    for (i = 0; i < thread_count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
-        p->id = i;
-        p->pages = multifd_pages_init(page_count);
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->name = g_strdup_printf("multifdrecv_%d", i);
-    }
-    return 0;
-}
-
-bool multifd_recv_all_channels_created(void)
-{
-    int thread_count = migrate_multifd_channels();
-
-    if (!migrate_use_multifd()) {
-        return true;
-    }
-
-    return thread_count == atomic_read(&multifd_recv_state->count);
-}
-
-/*
- * Try to receive all multifd channels to get ready for the migration.
- * - Return true and do not set @errp when correctly receving all channels;
- * - Return false and do not set @errp when correctly receiving the current one;
- * - Return false and set @errp when failing to receive the current channel.
- */
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
-{
-    MultiFDRecvParams *p;
-    Error *local_err = NULL;
-    int id;
-
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                atomic_read(&multifd_recv_state->count));
-        return false;
-    }
-    trace_multifd_recv_new_channel(id);
-
-    p = &multifd_recv_state->params[id];
-    if (p->c != NULL) {
-        error_setg(&local_err, "multifd: received id '%d' already setup'",
-                   id);
-        multifd_recv_terminate_threads(local_err);
-        error_propagate(errp, local_err);
-        return false;
-    }
-    p->c = ioc;
-    object_ref(OBJECT(ioc));
-    /* initial packet */
-    p->num_packets = 1;
-
-    p->running = true;
-    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                       QEMU_THREAD_JOINABLE);
-    atomic_inc(&multifd_recv_state->count);
-    return atomic_read(&multifd_recv_state->count) ==
-           migrate_multifd_channels();
-}
-
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 42be471d52..a553d40751 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -41,13 +41,6 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_total(void);
 
-int multifd_save_setup(Error **errp);
-void multifd_save_cleanup(void);
-int multifd_load_setup(Error **errp);
-int multifd_load_cleanup(Error **errp);
-bool multifd_recv_all_channels_created(void);
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
-
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
 void acct_update_position(QEMUFile *f, size_t size, bool zero);
-- 
2.24.1




* [PULL 17/18] migration: Simplify get_qlist
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (15 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 16/18] multifd: Split multifd code into its own file Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-29 11:15 ` [PULL 18/18] migration/compress: compress QEMUFile is not writable Juan Quintela
  2020-01-30 13:12 ` [PULL 00/18] Pull migration patches Peter Maydell
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela,
	Dr. David Alan Gilbert, Eric Auger, Paolo Bonzini

From: Eric Auger <eric.auger@redhat.com>

Instead of inserting read elements at the head and
then reversing the list, it is simpler to add
each element after the previous one. Introduce
QLIST_RAW_INSERT_AFTER helper and use it in
get_qlist().
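
As a rough illustration of the idea (a standalone sketch, not the QEMU
macros: the node, list, and function names below are hypothetical, and
only the next/previous pointer layout is modeled on QLIST), building a
list in stream order by appending after the previous element could look
like this:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node: a next pointer plus a pointer to the previous
 * element's next field, similar in spirit to a QLIST entry. */
struct node {
    int value;
    struct node *next;
    struct node **pprev;
};

/* Hypothetical list head. */
struct list {
    struct node *first;
};

/* Link n in front of the current first element. */
static void list_insert_head(struct list *l, struct node *n)
{
    n->next = l->first;
    if (l->first) {
        l->first->pprev = &n->next;
    }
    l->first = n;
    n->pprev = &l->first;
}

/* Link n directly after prev, which must be the current tail. */
static void list_insert_after_tail(struct node *prev, struct node *n)
{
    assert(prev != NULL && prev->next == NULL);
    prev->next = n;
    n->pprev = &prev->next;
    n->next = NULL;
}

/* Build the list in array order; no reversal pass is needed. */
static void list_build(struct list *l, struct node *nodes, size_t count)
{
    struct node *prev = NULL;

    for (size_t i = 0; i < count; i++) {
        if (!prev) {
            list_insert_head(l, &nodes[i]);
        } else {
            list_insert_after_tail(prev, &nodes[i]);
        }
        prev = &nodes[i];
    }
}
```

Each element is linked exactly once, so the list is already in stream
order when the loop ends.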

Signed-off-by: Eric Auger <eric.auger@redhat.com>
Suggested-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/qemu/queue.h      | 19 ++++++-------------
 migration/vmstate-types.c | 10 +++++++---
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index 4d4554a7ce..19425f973f 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -515,6 +515,12 @@ union {                                                                 \
              (elm);                                                            \
              (elm) = *QLIST_RAW_NEXT(elm, entry))
 
+#define QLIST_RAW_INSERT_AFTER(head, prev, elem, entry) do {                   \
+        *QLIST_RAW_NEXT(prev, entry) = elem;                                   \
+        *QLIST_RAW_PREVIOUS(elem, entry) = QLIST_RAW_NEXT(prev, entry);        \
+        *QLIST_RAW_NEXT(elem, entry) = NULL;                                   \
+} while (0)
+
 #define QLIST_RAW_INSERT_HEAD(head, elm, entry) do {                           \
         void *first = *QLIST_RAW_FIRST(head);                                  \
         *QLIST_RAW_FIRST(head) = elm;                                          \
@@ -527,17 +533,4 @@ union {                                                                 \
         }                                                                      \
 } while (0)
 
-#define QLIST_RAW_REVERSE(head, elm, entry) do {                               \
-        void *iter = *QLIST_RAW_FIRST(head), *prev = NULL, *next;              \
-        while (iter) {                                                         \
-            next = *QLIST_RAW_NEXT(iter, entry);                               \
-            *QLIST_RAW_PREVIOUS(iter, entry) = QLIST_RAW_NEXT(next, entry);    \
-            *QLIST_RAW_NEXT(iter, entry) = prev;                               \
-            prev = iter;                                                       \
-            iter = next;                                                       \
-        }                                                                      \
-        *QLIST_RAW_FIRST(head) = prev;                                         \
-        *QLIST_RAW_PREVIOUS(prev, entry) = QLIST_RAW_FIRST(head);              \
-} while (0)
-
 #endif /* QEMU_SYS_QUEUE_H */
diff --git a/migration/vmstate-types.c b/migration/vmstate-types.c
index 1eee36773a..35e784c9d9 100644
--- a/migration/vmstate-types.c
+++ b/migration/vmstate-types.c
@@ -879,7 +879,7 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size,
     /* offset of the QLIST entry in a QLIST element */
     size_t entry_offset = field->start;
     int version_id = field->version_id;
-    void *elm;
+    void *elm, *prev = NULL;
 
     trace_get_qlist(field->name, vmsd->name, vmsd->version_id);
     if (version_id > vmsd->version_id) {
@@ -900,9 +900,13 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size,
             g_free(elm);
             return ret;
         }
-        QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset);
+        if (!prev) {
+            QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset);
+        } else {
+            QLIST_RAW_INSERT_AFTER(pv, prev, elm, entry_offset);
+        }
+        prev = elm;
     }
-    QLIST_RAW_REVERSE(pv, elm, entry_offset);
     trace_get_qlist_end(field->name, vmsd->name);
 
     return ret;
-- 
2.24.1




* [PULL 18/18] migration/compress: compress QEMUFile is not writable
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (16 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 17/18] migration: Simplify get_qlist Juan Quintela
@ 2020-01-29 11:15 ` Juan Quintela
  2020-01-30 13:12 ` [PULL 00/18] Pull migration patches Peter Maydell
  18 siblings, 0 replies; 20+ messages in thread
From: Juan Quintela @ 2020-01-29 11:15 UTC (permalink / raw)
  To: qemu-devel
  Cc: Laurent Vivier, Thomas Huth, Juan Quintela,
	Dr. David Alan Gilbert, Wei Yang, Paolo Bonzini

From: Wei Yang <richardw.yang@linux.intel.com>

The compress QEMUFile is opened with empty_ops, which means it is never
writable. Drop the writable case (flush and retry) from
qemu_put_compression_data().
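
A minimal sketch of the remaining check, under stated assumptions:
BUF_SIZE stands in for IO_BUF_SIZE, worst_case stands in for the value
of compressBound(size), and space_for_compressed() is a hypothetical
helper, not a QEMU function. It only illustrates that, with no flush
possible, a buffer that is too small can lead to nothing but -1:

```c
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

/* Hypothetical stand-in for IO_BUF_SIZE. */
#define BUF_SIZE 32768

/*
 * Return the number of bytes left for compressed data after reserving
 * room for the int32_t length prefix, or -1 if worst_case bytes would
 * not fit.  There is no flush-and-retry path.
 */
static ssize_t space_for_compressed(size_t buf_index, size_t worst_case)
{
    ssize_t blen = (ssize_t)BUF_SIZE - (ssize_t)buf_index
                 - (ssize_t)sizeof(int32_t);

    /* Reject a negative remainder before comparing with the unsigned bound. */
    if (blen < 0 || (size_t)blen < worst_case) {
        return -1;
    }
    return blen;
}
```

A caller would compress into the buffer only when the return value is
not -1.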

Signed-off-by: Wei Yang <richardw.yang@linux.intel.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/qemu-file.c | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index bbb2b63927..1c3a358a14 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -764,11 +764,8 @@ static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
 /* Compress size bytes of data start at p and store the compressed
  * data to the buffer of f.
  *
- * When f is not writable, return -1 if f has no space to save the
- * compressed data.
- * When f is wirtable and it has no space to save the compressed data,
- * do fflush first, if f still has no space to save the compressed
- * data, return -1.
+ * Since the file is dummy file with empty_ops, return -1 if f has no space to
+ * save the compressed data.
  */
 ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
                                   const uint8_t *p, size_t size)
@@ -776,14 +773,7 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
 
     if (blen < compressBound(size)) {
-        if (!qemu_file_is_writable(f)) {
-            return -1;
-        }
-        qemu_fflush(f);
-        blen = IO_BUF_SIZE - sizeof(int32_t);
-        if (blen < compressBound(size)) {
-            return -1;
-        }
+        return -1;
     }
 
     blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
-- 
2.24.1




* Re: [PULL 00/18] Pull migration patches
  2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
                   ` (17 preceding siblings ...)
  2020-01-29 11:15 ` [PULL 18/18] migration/compress: compress QEMUFile is not writable Juan Quintela
@ 2020-01-30 13:12 ` Peter Maydell
  18 siblings, 0 replies; 20+ messages in thread
From: Peter Maydell @ 2020-01-30 13:12 UTC (permalink / raw)
  To: Juan Quintela
  Cc: Laurent Vivier, Paolo Bonzini, Thomas Huth, QEMU Developers,
	Dr. David Alan Gilbert

On Wed, 29 Jan 2020 at 11:16, Juan Quintela <quintela@redhat.com> wrote:
>
> The following changes since commit 4c60e3289875ae6c516a37523bcecb87f68ce67c:
>
>   Merge remote-tracking branch 'remotes/rth/tags/pull-pa-20200127' into staging (2020-01-28 15:11:04 +0000)
>
> are available in the Git repository at:
>
>   https://github.com/juanquintela/qemu.git tags/pull-migration-pull-request
>
> for you to fetch changes up to 42d24611afc7610808ecb8770cf40e84714dd28e:
>
>   migration/compress: compress QEMUFile is not writable (2020-01-29 11:28:59 +0100)
>
> ----------------------------------------------------------------
> Migration pull request
>
> (this is a rerun of the previous pull request without the --- bits and
> rebased to latest)
>
> This pull request includes:
> - simplify get_qlist (eric)
> - fix null in multifd_send_terminate_threads (zhimin)
> - small fix for compress (wei)
> - migrate multifd + cancel fixes (juan)
> - migrate compression: the bits that are reviewed (juan)
>
> ----------------------------------------------------------------


Applied, thanks.

Please update the changelog at https://wiki.qemu.org/ChangeLog/5.0
for any user-visible changes.

-- PMM



Thread overview: 20+ messages
2020-01-29 11:15 [PULL 00/18] Pull migration patches Juan Quintela
2020-01-29 11:15 ` [PULL 01/18] migration-test: Use g_free() instead of free() Juan Quintela
2020-01-29 11:15 ` [PULL 02/18] multifd: Make sure that we don't do any IO after an error Juan Quintela
2020-01-29 11:15 ` [PULL 03/18] qemu-file: Don't do IO after shutdown Juan Quintela
2020-01-29 11:15 ` [PULL 04/18] migration: Don't send data if we have stopped Juan Quintela
2020-01-29 11:15 ` [PULL 05/18] migration-test: Make sure that multifd and cancel works Juan Quintela
2020-01-29 11:15 ` [PULL 06/18] migration: Create migration_is_running() Juan Quintela
2020-01-29 11:15 ` [PULL 07/18] migration/multifd: fix nullptr access in multifd_send_terminate_threads Juan Quintela
2020-01-29 11:15 ` [PULL 08/18] ram_addr: Split RAMBlock definition Juan Quintela
2020-01-29 11:15 ` [PULL 09/18] multifd: multifd_send_pages only needs the qemufile Juan Quintela
2020-01-29 11:15 ` [PULL 10/18] multifd: multifd_queue_page " Juan Quintela
2020-01-29 11:15 ` [PULL 11/18] multifd: multifd_send_sync_main " Juan Quintela
2020-01-29 11:15 ` [PULL 12/18] multifd: Use qemu_target_page_size() Juan Quintela
2020-01-29 11:15 ` [PULL 13/18] migration: Make checkpatch happy with comments Juan Quintela
2020-01-29 11:15 ` [PULL 14/18] multifd: Make multifd_save_setup() get an Error parameter Juan Quintela
2020-01-29 11:15 ` [PULL 15/18] multifd: Make multifd_load_setup() " Juan Quintela
2020-01-29 11:15 ` [PULL 16/18] multifd: Split multifd code into its own file Juan Quintela
2020-01-29 11:15 ` [PULL 17/18] migration: Simplify get_qlist Juan Quintela
2020-01-29 11:15 ` [PULL 18/18] migration/compress: compress QEMUFile is not writable Juan Quintela
2020-01-30 13:12 ` [PULL 00/18] Pull migration patches Peter Maydell
