* [Qemu-devel] [RFC 00/13] Multifd v2
@ 2016-10-21 19:42 Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time Juan Quintela
                   ` (13 more replies)
  0 siblings, 14 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Hi

This is a version against current code, on top of the QIO work.  It
improves the thread synchronization and fixes the problem where two
threads could end up handling the same page.
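
The synchronization now works roughly like this (a simplified sketch
of what patches 08-10 implement, using names from the series):

    /* main migration thread, once per page */
    fd_num = multifd_send_page(p);  /* waits for an idle send thread,
                                       then hands it the page */
    qemu_put_be16(f, fd_num);       /* tells the destination which
                                       channel carries this page */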

Please comment, Juan.

Juan Quintela (13):
  migration: create Migration Incoming State at init time
  migration: [HACK] Don't create decompression threads if not enabled
  migration: Add multifd capability
  migration: Create x-multifd-threads parameter
  migration: create multifd migration threads
  migration: Start of multiple fd work
  migration: create ram_multifd_page
  migration: Create thread infrastructure for multifd send side
  migration: Send the fd number which we are going to use for this page
  migration: Create thread infrastructure for multifd recv side
  migration: Test new fd infrastructure
  migration: [HACK]Transfer pages over new channels
  migration: flush receive queue

 hmp.c                         |  10 +
 include/migration/migration.h |  16 +-
 migration/migration.c         |  93 +++++++---
 migration/ram.c               | 420 +++++++++++++++++++++++++++++++++++++++++-
 migration/savevm.c            |   4 +-
 migration/socket.c            |  53 +++++-
 qapi-schema.json              |  20 +-
 7 files changed, 581 insertions(+), 35 deletions(-)

-- 
2.7.4


* [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 18:05   ` Dr. David Alan Gilbert
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 02/13] migration: [HACK] Don't create decompression threads if not enabled Juan Quintela
                   ` (12 subsequent siblings)
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert
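
The incoming state becomes a lazily-initialized static singleton.
Callers that previously went through migration_incoming_state_new(f)
now fetch the singleton and set the file themselves, as in this
fragment of the diff below:

    MigrationIncomingState *mis = migration_incoming_get_current();
    mis->from_src_file = f;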

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  1 -
 migration/migration.c         | 38 +++++++++++++++++---------------------
 migration/savevm.c            |  4 ++--
 3 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 2791b90..37ef4f2 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -112,7 +112,6 @@ struct MigrationIncomingState {
 };

 MigrationIncomingState *migration_incoming_get_current(void);
-MigrationIncomingState *migration_incoming_state_new(QEMUFile *f);
 void migration_incoming_state_destroy(void);

 /*
diff --git a/migration/migration.c b/migration/migration.c
index 4d417b7..a71921f 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -104,32 +104,28 @@ MigrationState *migrate_get_current(void)
     return &current_migration;
 }

-/* For incoming */
-static MigrationIncomingState *mis_current;
-
 MigrationIncomingState *migration_incoming_get_current(void)
 {
-    return mis_current;
-}
+    static bool once;
+    static MigrationIncomingState mis_current;

-MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
-{
-    mis_current = g_new0(MigrationIncomingState, 1);
-    mis_current->from_src_file = f;
-    mis_current->state = MIGRATION_STATUS_NONE;
-    QLIST_INIT(&mis_current->loadvm_handlers);
-    qemu_mutex_init(&mis_current->rp_mutex);
-    qemu_event_init(&mis_current->main_thread_load_event, false);
-
-    return mis_current;
+    if (!once) {
+        memset(&mis_current, 0, sizeof(MigrationIncomingState));
+        mis_current.state = MIGRATION_STATUS_NONE;
+        QLIST_INIT(&mis_current.loadvm_handlers);
+        qemu_mutex_init(&mis_current.rp_mutex);
+        qemu_event_init(&mis_current.main_thread_load_event, false);
+        once = true;
+    }
+    return &mis_current;
 }

 void migration_incoming_state_destroy(void)
 {
-    qemu_event_destroy(&mis_current->main_thread_load_event);
-    loadvm_free_handlers(mis_current);
-    g_free(mis_current);
-    mis_current = NULL;
+    struct MigrationIncomingState *mis = migration_incoming_get_current();
+
+    qemu_event_destroy(&mis->main_thread_load_event);
+    loadvm_free_handlers(mis);
 }


@@ -375,11 +371,11 @@ static void process_incoming_migration_bh(void *opaque)
 static void process_incoming_migration_co(void *opaque)
 {
     QEMUFile *f = opaque;
-    MigrationIncomingState *mis;
+    MigrationIncomingState *mis = migration_incoming_get_current();
     PostcopyState ps;
     int ret;

-    mis = migration_incoming_state_new(f);
+    mis->from_src_file = f;
     postcopy_state_set(POSTCOPY_INCOMING_NONE);
     migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
                       MIGRATION_STATUS_ACTIVE);
diff --git a/migration/savevm.c b/migration/savevm.c
index a831ec2..de84be5 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -2107,7 +2107,6 @@ void qmp_xen_load_devices_state(const char *filename, Error **errp)
     }
     f = qemu_fopen_channel_input(QIO_CHANNEL(ioc));

-    migration_incoming_state_new(f);
     ret = qemu_loadvm_state(f);
     qemu_fclose(f);
     if (ret < 0) {
@@ -2123,6 +2122,7 @@ int load_vmstate(const char *name)
     QEMUFile *f;
     int ret;
     AioContext *aio_context;
+    MigrationIncomingState *mis = migration_incoming_get_current();

     if (!bdrv_all_can_snapshot(&bs)) {
         error_report("Device '%s' is writable but does not support snapshots.",
@@ -2173,7 +2173,7 @@ int load_vmstate(const char *name)
     }

     qemu_system_reset(VMRESET_SILENT);
-    migration_incoming_state_new(f);
+    mis->from_src_file = f;

     aio_context_acquire(aio_context);
     ret = qemu_loadvm_state(f);
-- 
2.7.4


* [Qemu-devel] [PATCH 02/13] migration: [HACK] Don't create decompression threads if not enabled
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 03/13] migration: Add multifd capability Juan Quintela
                   ` (11 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

This is a partial fix; we also need to reject incoming compressed
pages when compression is not enabled.
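
A hedged sketch of that missing receive-side check (not part of this
patch): ram_load() would have to fail compressed chunks when the
capability is off, something like:

    case RAM_SAVE_FLAG_COMPRESS_PAGE:
        if (!migrate_use_compression()) {
            error_report("compressed page received, but compression "
                         "is disabled");
            ret = -EINVAL;
            break;
        }
        ...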

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index bc6154f..495a931 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -2241,6 +2241,9 @@ void migrate_decompress_threads_create(void)
 {
     int i, thread_count;

+    if (!migrate_use_compression()) {
+        return;
+    }
     thread_count = migrate_decompress_threads();
     decompress_threads = g_new0(QemuThread, thread_count);
     decomp_param = g_new0(DecompressParam, thread_count);
@@ -2262,6 +2265,9 @@ void migrate_decompress_threads_join(void)
 {
     int i, thread_count;

+    if (!migrate_use_compression()) {
+        return;
+    }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
         qemu_mutex_lock(&decomp_param[i].mutex);
-- 
2.7.4


* [Qemu-devel] [PATCH 03/13] migration: Add multifd capability
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 02/13] migration: [HACK] Don't create decompression threads if not enabled Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 17:57   ` Dr. David Alan Gilbert
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter Juan Quintela
                   ` (10 subsequent siblings)
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert
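
Once applied, the capability can be toggled like any other one (a
usage sketch, not part of the patch):

    (qemu) migrate_set_capability x-multifd on

or through QMP:

    { "execute": "migrate-set-capabilities",
      "arguments": { "capabilities": [
          { "capability": "x-multifd", "state": true } ] } }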

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h | 1 +
 migration/migration.c         | 9 +++++++++
 qapi-schema.json              | 5 +++--
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 37ef4f2..5666068 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -290,6 +290,7 @@ bool migrate_postcopy_ram(void);
 bool migrate_zero_blocks(void);

 bool migrate_auto_converge(void);
+bool migrate_multifd(void);

 int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
                          uint8_t *dst, int dlen);
diff --git a/migration/migration.c b/migration/migration.c
index a71921f..5f7a570 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1273,6 +1273,15 @@ bool migrate_use_events(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
 }

+bool migrate_multifd(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index 5a8ec38..bc96ee4 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -574,12 +574,13 @@
 #          been migrated, pulling the remaining pages along as needed. NOTE: If
 #          the migration fails during postcopy the VM will fail.  (since 2.6)
 #
+# @x-multifd: Use more than one fd for migration (since 2.8)
+#
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
   'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
-           'compress', 'events', 'postcopy-ram'] }
-
+           'compress', 'events', 'postcopy-ram', 'x-multifd'] }
 ##
 # @MigrationCapabilityStatus
 #
-- 
2.7.4


* [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (2 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 03/13] migration: Add multifd capability Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 18:33   ` Dr. David Alan Gilbert
  2016-10-26 21:16   ` Eric Blake
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads Juan Quintela
                   ` (9 subsequent siblings)
  13 siblings, 2 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Indicates the number of multifd threads that we will create.  By
default we create 2 threads.
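
A usage sketch (not part of the patch); the parameter has to be set on
both sides before starting the migration:

    (qemu) migrate_set_parameter x-multifd-threads 4

or through QMP:

    { "execute": "migrate-set-parameters",
      "arguments": { "x-multifd-threads": 4 } }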

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hmp.c                         |  8 ++++++++
 include/migration/migration.h |  2 ++
 migration/migration.c         | 24 ++++++++++++++++++++++++
 qapi-schema.json              | 11 +++++++++--
 4 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index 80f7f1f..54f9f03 100644
--- a/hmp.c
+++ b/hmp.c
@@ -318,6 +318,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, " %s: %" PRId64 " milliseconds",
             MigrationParameter_lookup[MIGRATION_PARAMETER_DOWNTIME_LIMIT],
             params->downtime_limit);
+        monitor_printf(mon, " %s: %" PRId64,
+            MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
+            params->x_multifd_threads);
         monitor_printf(mon, "\n");
     }

@@ -1386,6 +1389,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.has_downtime_limit = true;
                 use_int_value = true;
                 break;
+            case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
+                p.has_x_multifd_threads = true;
+                use_int_value = true;
+                break;
             }

             if (use_int_value) {
@@ -1402,6 +1409,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.cpu_throttle_initial = valueint;
                 p.cpu_throttle_increment = valueint;
                 p.downtime_limit = valueint;
+                p.x_multifd_threads = valueint;
             }

             qmp_migrate_set_parameters(&p, &err);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 5666068..709355e 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -240,6 +240,8 @@ bool migration_in_postcopy(MigrationState *);
 bool migration_in_postcopy_after_devices(MigrationState *);
 MigrationState *migrate_get_current(void);

+int migrate_multifd_threads(void);
+
 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
 void migrate_decompress_threads_create(void);
diff --git a/migration/migration.c b/migration/migration.c
index 5f7a570..217ccbc 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -62,6 +62,8 @@
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)

+#define DEFAULT_MIGRATE_MULTIFD_THREADS 2
+
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);

@@ -94,6 +96,7 @@ MigrationState *migrate_get_current(void)
             .cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
             .max_bandwidth = MAX_THROTTLE,
             .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
+            .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
         },
     };

@@ -567,6 +570,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->max_bandwidth = s->parameters.max_bandwidth;
     params->has_downtime_limit = true;
     params->downtime_limit = s->parameters.downtime_limit;
+    params->has_x_multifd_threads = true;
+    params->x_multifd_threads = s->parameters.x_multifd_threads;

     return params;
 }
@@ -813,6 +818,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
                    "an integer in the range of 0 to 2000000 milliseconds");
         return;
     }
+    if (params->has_x_multifd_threads &&
+            (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "multifd_threads",
+                   "is invalid, it should be in the range of 1 to 255");
+        return;
+    }

     if (params->has_compress_level) {
         s->parameters.compress_level = params->compress_level;
@@ -847,6 +859,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
     if (params->has_downtime_limit) {
         s->parameters.downtime_limit = params->downtime_limit;
     }
+    if (params->has_x_multifd_threads) {
+        s->parameters.x_multifd_threads = params->x_multifd_threads;
+    }
 }


@@ -1282,6 +1297,15 @@ bool migrate_multifd(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
 }

+int migrate_multifd_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.x_multifd_threads;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index bc96ee4..b5c9a06 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -665,13 +665,16 @@
 # @downtime-limit: set maximum tolerated downtime for migration. maximum
 #                  downtime in milliseconds (Since 2.8)
 #
+# @x-multifd-threads: Number of threads used to migrate data in parallel.
+#                     The default value is 2 (since 2.8)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
   'data': ['compress-level', 'compress-threads', 'decompress-threads',
            'cpu-throttle-initial', 'cpu-throttle-increment',
            'tls-creds', 'tls-hostname', 'max-bandwidth',
-           'downtime-limit'] }
+           'downtime-limit', 'x-multifd-threads'] }

 #
 # @migrate-set-parameters
@@ -726,6 +729,9 @@
 # @downtime-limit: set maximum tolerated downtime for migration. maximum
 #                  downtime in milliseconds (Since 2.8)
 #
+# @x-multifd-threads: Number of threads used to migrate data in parallel.
+#                     The default value is 2 (since 2.8)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -737,7 +743,8 @@
             '*tls-creds': 'str',
             '*tls-hostname': 'str',
             '*max-bandwidth': 'int',
-            '*downtime-limit': 'int'} }
+            '*downtime-limit': 'int',
+            '*x-multifd-threads': 'int'} }
 ##
 # @query-migrate-parameters
 #
-- 
2.7.4


* [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (3 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 18:43   ` Dr. David Alan Gilbert
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 06/13] migration: Start of multiple fd work Juan Quintela
                   ` (8 subsequent siblings)
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Creation of the threads; there is nothing inside them yet.
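
Their lifecycle mirrors the existing compress/decompress helpers
(taken from the migration.c hunks below):

    send side: migrate_fd_connect()            -> ..._send_threads_create()
               migrate_fd_cleanup()            -> ..._send_threads_join()
    recv side: migration_fd_process_incoming() -> ..._recv_threads_create()
               process_incoming_migration_bh() -> ..._recv_threads_join()

Each join sets quit under the thread's mutex, signals its condition
variable, then qemu_thread_join()s it and destroys the mutex/cond.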

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |   4 ++
 migration/migration.c         |   6 ++
 migration/ram.c               | 148 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 158 insertions(+)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 709355e..80ab8c0 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -241,6 +241,10 @@ bool migration_in_postcopy_after_devices(MigrationState *);
 MigrationState *migrate_get_current(void);

 int migrate_multifd_threads(void);
+void migrate_multifd_send_threads_create(void);
+void migrate_multifd_send_threads_join(void);
+void migrate_multifd_recv_threads_create(void);
+void migrate_multifd_recv_threads_join(void);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 217ccbc..a4615f5 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -336,6 +336,7 @@ static void process_incoming_migration_bh(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report_err(local_err);
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -360,6 +361,7 @@ static void process_incoming_migration_bh(void *opaque)
         runstate_set(global_state_get_runstate());
     }
     migrate_decompress_threads_join();
+    migrate_multifd_recv_threads_join();
     /*
      * This must happen after any state changes since as soon as an external
      * observer sees this event they might start to prod at the VM assuming
@@ -413,6 +415,7 @@ static void process_incoming_migration_co(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -425,6 +428,7 @@ void migration_fd_process_incoming(QEMUFile *f)
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);

     migrate_decompress_threads_create();
+    migrate_multifd_recv_threads_create();
     qemu_file_set_blocking(f, false);
     qemu_coroutine_enter(co);
 }
@@ -916,6 +920,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_mutex_lock_iothread();

         migrate_compress_threads_join();
+        migrate_multifd_send_threads_join();
         qemu_fclose(s->to_dst_file);
         s->to_dst_file = NULL;
     }
@@ -1922,6 +1927,7 @@ void migrate_fd_connect(MigrationState *s)
     }

     migrate_compress_threads_create();
+    migrate_multifd_send_threads_create();
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
     s->migration_thread_running = true;
diff --git a/migration/ram.c b/migration/ram.c
index 495a931..78d400e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -389,6 +389,154 @@ void migrate_compress_threads_create(void)
     }
 }

+/* Multiple fd's */
+
+struct MultiFDSendParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDSendParams MultiFDSendParams;
+
+static MultiFDSendParams *multifd_send;
+
+static void *multifd_send_thread(void *opaque)
+{
+    MultiFDSendParams *params = opaque;
+
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_send_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        multifd_send[i].quit = true;
+        qemu_cond_signal(&multifd_send[i].cond);
+        qemu_mutex_unlock(&multifd_send[i].mutex);
+    }
+}
+
+void migrate_multifd_send_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return;
+    }
+    terminate_multifd_send_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_send[i].thread);
+        qemu_mutex_destroy(&multifd_send[i].mutex);
+        qemu_cond_destroy(&multifd_send[i].cond);
+    }
+    g_free(multifd_send);
+    multifd_send = NULL;
+}
+
+void migrate_multifd_send_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_send = g_new0(MultiFDSendParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&multifd_send[i].mutex);
+        qemu_cond_init(&multifd_send[i].cond);
+        multifd_send[i].quit = false;
+        qemu_thread_create(&multifd_send[i].thread, "multifd_send",
+                           multifd_send_thread, &multifd_send[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+struct MultiFDRecvParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDRecvParams MultiFDRecvParams;
+
+static MultiFDRecvParams *multifd_recv;
+
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *params = opaque;
+ 
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_recv_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        multifd_recv[i].quit = true;
+        qemu_cond_signal(&multifd_recv[i].cond);
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
+    }
+}
+
+void migrate_multifd_recv_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return;
+    }
+    terminate_multifd_recv_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_recv[i].thread);
+        qemu_mutex_destroy(&multifd_recv[i].mutex);
+        qemu_cond_destroy(&multifd_recv[i].cond);
+    }
+    g_free(multifd_recv);
+    multifd_recv = NULL;
+}
+
+void migrate_multifd_recv_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_recv = g_new0(MultiFDRecvParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&multifd_recv[i].mutex);
+        qemu_cond_init(&multifd_recv[i].cond);
+        multifd_recv[i].quit = false;
+        qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
+                           multifd_recv_thread, &multifd_recv[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
 /**
  * save_page_header: Write page header to wire
  *
-- 
2.7.4


* [Qemu-devel] [PATCH 06/13] migration: Start of multiple fd work
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (4 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page Juan Quintela
                   ` (7 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We create a new channel for each new thread created.  For now we only
send a single character through each one, to make sure that we are
creating the channels in the right order.

Note: reference counting/freeing of the channels is not done yet.
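
The per-channel handshake is one byte each way (a sketch of the hunks
below):

    send thread i:  qio_channel_write(c, &start, 1, ...);  /* 's' */
    recv thread i:  qio_channel_read(c, &start, 1, ...);

and the creating side waits on each thread's started flag before
creating the next channel, which is what forces the channels to be
accepted in order.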

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  6 +++++
 migration/ram.c               | 45 +++++++++++++++++++++++++++++++++++-
 migration/socket.c            | 53 ++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 80ab8c0..0b455d6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -21,6 +21,7 @@
 #include "migration/vmstate.h"
 #include "qapi-types.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"

 #define QEMU_VM_FILE_MAGIC           0x5145564d
 #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
@@ -211,6 +212,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);

 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);

+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void unix_start_incoming_migration(const char *path, Error **errp);

 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 78d400e..0ea40eb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -396,6 +396,8 @@ struct MultiFDSendParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
+    QIOChannel *c;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

@@ -404,6 +406,13 @@ static MultiFDSendParams *multifd_send;
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
+    char start = 's';
+
+    qio_channel_write(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
@@ -440,6 +449,7 @@ void migrate_multifd_send_threads_join(void)
         qemu_thread_join(&multifd_send[i].thread);
         qemu_mutex_destroy(&multifd_send[i].mutex);
         qemu_cond_destroy(&multifd_send[i].cond);
+        socket_send_channel_destroy(multifd_send[i].c);
     }
     g_free(multifd_send);
     multifd_send = NULL;
@@ -458,9 +468,20 @@ void migrate_multifd_send_threads_create(void)
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
+        multifd_send[i].started = false;
+        multifd_send[i].c = socket_send_channel_create();
+        if (!multifd_send[i].c) {
+            error_report("error creating a send channel");
+            exit(EXIT_FAILURE);
+        }
         qemu_thread_create(&multifd_send[i].thread, "multifd_send",
                            multifd_send_thread, &multifd_send[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        while (!multifd_send[i].started) {
+            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_send[i].mutex);
     }
 }

@@ -469,6 +490,8 @@ struct MultiFDRecvParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
+    QIOChannel *c;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

@@ -477,7 +500,14 @@ static MultiFDRecvParams *multifd_recv;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
- 
+    char start;
+
+    qio_channel_read(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+
     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
         qemu_cond_wait(&params->cond, &params->mutex);
@@ -513,6 +543,7 @@ void migrate_multifd_recv_threads_join(void)
         qemu_thread_join(&multifd_recv[i].thread);
         qemu_mutex_destroy(&multifd_recv[i].mutex);
         qemu_cond_destroy(&multifd_recv[i].cond);
+        socket_send_channel_destroy(multifd_recv[i].c);
     }
     g_free(multifd_recv);
     multifd_recv = NULL;
@@ -531,9 +562,21 @@ void migrate_multifd_recv_threads_create(void)
         qemu_mutex_init(&multifd_recv[i].mutex);
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
+        multifd_recv[i].started = false;
+        multifd_recv[i].c = socket_recv_channel_create();
+
+        if (!multifd_recv[i].c) {
+            error_report("error creating a recv channel");
+            exit(EXIT_FAILURE);
+        }
         qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
                            multifd_recv_thread, &multifd_recv[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        while (!multifd_recv[i].started) {
+            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
     }
 }

diff --git a/migration/socket.c b/migration/socket.c
index a21c0c5..f001396 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -24,6 +24,48 @@
 #include "io/channel-socket.h"
 #include "trace.h"

+struct SocketArgs {
+    QIOChannelSocket *ioc;
+    SocketAddress *saddr;
+    Error **errp;
+} socket_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+    QIOChannelSocket *sioc;
+    Error *err = NULL;
+
+    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
+                                     &err);
+    if (!sioc) {
+        error_report("could not accept migration connection (%s)",
+                     error_get_pretty(err));
+        return NULL;
+    }
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* TODO: remove channel here */
+    return 0;
+}
+
+QIOChannel *socket_send_channel_create(void)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
+                                    socket_args.errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* TODO: remove channel here */
+    return 0;
+}

 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -96,6 +138,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);

     data->s = s;
+
+    socket_args.saddr = saddr;
+    socket_args.errp = errp;
+
     if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
         data->hostname = g_strdup(saddr->u.inet.data->host);
     }
@@ -105,7 +151,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
+    /*
+      We are not freeing saddr yet, we need some kind of reference
+       counting
     qapi_free_SocketAddress(saddr);
+    */
 }

 void tcp_start_outgoing_migration(MigrationState *s,
@@ -152,7 +202,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,

 out:
     /* Close listening socket as its no longer needed */
-    qio_channel_close(ioc, NULL);
+//    qio_channel_close(ioc, NULL);
     return FALSE; /* unregister */
 }

@@ -161,6 +211,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
                                             Error **errp)
 {
     QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+    socket_args.ioc = listen_ioc;

     if (qio_channel_socket_listen_sync(listen_ioc, saddr, errp) < 0) {
         object_unref(OBJECT(listen_ioc));
-- 
2.7.4


* [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (5 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 06/13] migration: Start of multiple fd work Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 18:50   ` Dr. David Alan Gilbert
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 08/13] migration: Create thread infrastructure for multifd send side Juan Quintela
                   ` (6 subsequent siblings)
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

The function still doesn't use multifd, but it is a simplified
ram_save_page: the xbzrle and RDMA code paths are gone.  We have added
a new counter and a new flag for this type of page.
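
The resulting stream layout for one of these pages, still all on the
main channel at this point, is roughly:

    offset | RAM_SAVE_FLAG_MULTIFD_PAGE   (be64 page header)
    [block name, unless RAM_SAVE_FLAG_CONTINUE is set]
    page contents                         (TARGET_PAGE_SIZE bytes)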

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hmp.c                         |  2 ++
 include/migration/migration.h |  1 +
 migration/migration.c         |  1 +
 migration/ram.c               | 44 ++++++++++++++++++++++++++++++++++++++++++-
 qapi-schema.json              |  4 +++-
 5 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index 54f9f03..17a0ee2 100644
--- a/hmp.c
+++ b/hmp.c
@@ -222,6 +222,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
             monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
                            info->ram->postcopy_requests);
         }
+        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
+                       info->ram->multifd);
     }

     if (info->has_disk) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 0b455d6..afdc7ec 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -274,6 +274,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
 uint64_t xbzrle_mig_pages_overflow(void);
 uint64_t xbzrle_mig_pages_cache_miss(void);
 double xbzrle_mig_cache_miss_rate(void);
+uint64_t multifd_mig_pages_transferred(void);

 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
 void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
diff --git a/migration/migration.c b/migration/migration.c
index a4615f5..407e0c3 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -625,6 +625,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->mbps = s->mbps;
     info->ram->dirty_sync_count = s->dirty_sync_count;
     info->ram->postcopy_requests = s->postcopy_requests;
+    info->ram->multifd = multifd_mig_pages_transferred();

     if (s->state != MIGRATION_STATUS_COMPLETED) {
         info->ram->remaining = ram_bytes_remaining();
diff --git a/migration/ram.c b/migration/ram.c
index 0ea40eb..44b9380 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -68,6 +68,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200

 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];

@@ -148,6 +149,7 @@ typedef struct AccountingInfo {
     uint64_t dup_pages;
     uint64_t skipped_pages;
     uint64_t norm_pages;
+    uint64_t multifd_pages;
     uint64_t iterations;
     uint64_t xbzrle_bytes;
     uint64_t xbzrle_pages;
@@ -218,6 +220,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
     return acct_info.xbzrle_overflows;
 }

+uint64_t multifd_mig_pages_transferred(void)
+{
+    return acct_info.multifd_pages;
+}
+
 /* This is the last block that we have visited serching for dirty pages
  */
 static RAMBlock *last_seen_block;
@@ -995,6 +1002,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
     return pages;
 }

+static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
+                            bool last_stage, uint64_t *bytes_transferred)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->offset;
+
+    p = block->host + offset;
+
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    pages = save_zero_page(f, block, offset, p, bytes_transferred);
+    if (pages == -1) {
+        *bytes_transferred +=
+            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        *bytes_transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        acct_info.norm_pages++;
+        acct_info.multifd_pages++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1432,6 +1466,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
             res = ram_save_compressed_page(f, pss,
                                            last_stage,
                                            bytes_transferred);
+        } else if (migrate_multifd()) {
+            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
         } else {
             res = ram_save_page(f, pss, last_stage,
                                 bytes_transferred);
@@ -2678,7 +2714,8 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         addr &= TARGET_PAGE_MASK;

         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
-                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);

             host = host_from_ram_block_offset(block, addr);
@@ -2753,6 +2790,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 break;
             }
             break;
+
+        case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            break;
+
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
             break;
diff --git a/qapi-schema.json b/qapi-schema.json
index b5c9a06..e5b3a84 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -405,6 +405,7 @@
 #
 # @postcopy-requests: The number of page requests received from the destination
 #        (since 2.7)
+# @multifd: number of pages sent with multifd (since 2.8)
 #
 # Since: 0.14.0
 ##
@@ -413,7 +414,8 @@
            'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
            'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
            'mbps' : 'number', 'dirty-sync-count' : 'int',
-           'postcopy-requests' : 'int' } }
+           'postcopy-requests' : 'int',
+           'multifd' : 'int'} }

 ##
 # @XBZRLECacheStats
-- 
2.7.4


* [Qemu-devel] [PATCH 08/13] migration: Create thread infrastructure for multifd send side
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (6 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 09/13] migration: Send the fd number which we are going to use for this page Juan Quintela
                   ` (5 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We make the locking and the transfer of information explicit, even
though we are still transmitting everything through the main thread.
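
The locking is two-level: the global multifd_send_mutex/cond protect
each worker's done flag and let the main thread wait for any idle
worker; the chosen worker's own mutex/cond are then used to hand it a
single page address.  In sketch form (condensed from the hunks below):

    /* main thread, multifd_send_page() */
    qemu_mutex_lock(&multifd_send_mutex);
    while (no worker has done == true) {
        qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
    }
    multifd_send[i].done = false;
    qemu_mutex_unlock(&multifd_send_mutex);

    qemu_mutex_lock(&multifd_send[i].mutex);
    multifd_send[i].address = address;
    qemu_cond_signal(&multifd_send[i].cond);
    qemu_mutex_unlock(&multifd_send[i].mutex);

so a page is owned by exactly one thread at a time.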

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 53 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 44b9380..0098d33 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -399,17 +399,25 @@ void migrate_compress_threads_create(void)
 /* Multiple fd's */

 struct MultiFDSendParams {
+    /* not changed */
     QemuThread thread;
+    QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
-    QIOChannel *c;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

 static MultiFDSendParams *multifd_send;

+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
@@ -423,7 +431,17 @@ static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_send_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_send_cond);
+            qemu_mutex_unlock(&multifd_send_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -471,11 +489,15 @@ void migrate_multifd_send_threads_create(void)
     }
     thread_count = migrate_multifd_threads();
     multifd_send = g_new0(MultiFDSendParams, thread_count);
+    qemu_mutex_init(&multifd_send_mutex);
+    qemu_cond_init(&multifd_send_cond);
     for (i = 0; i < thread_count; i++) {
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
+        multifd_send[i].done = true;
+        multifd_send[i].address = 0;
         multifd_send[i].c = socket_send_channel_create();
         if (!multifd_send[i].c) {
             error_report("error creating a send channel");
@@ -492,6 +514,34 @@ void migrate_multifd_send_threads_create(void)
     }
 }

+static int multifd_send_page(uint8_t *address)
+{
+    int i, thread_count;
+    bool found = false;
+
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    while (!found) {
+        for (i = 0; i < thread_count; i++) {
+            if (multifd_send[i].done) {
+                multifd_send[i].done = false;
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+    qemu_mutex_lock(&multifd_send[i].mutex);
+    multifd_send[i].address = address;
+    qemu_cond_signal(&multifd_send[i].cond);
+    qemu_mutex_unlock(&multifd_send[i].mutex);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     QemuThread thread;
     QemuCond cond;
@@ -1020,6 +1070,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
-- 
2.7.4


* [Qemu-devel] [PATCH 09/13] migration: Send the fd number which we are going to use for this page
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (7 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 08/13] migration: Create thread infrastructure for multifd send side Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 10/13] migration: Create thread infrastructure for multifd recv side Juan Quintela
                   ` (4 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We are still sending the page through the main channel; that will
change later in the series.
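
The wire format for a multifd page becomes:

    offset | RAM_SAVE_FLAG_MULTIFD_PAGE   (be64 page header)
    fd_num                                (be16, channel chosen for it)
    page contents                         (TARGET_PAGE_SIZE bytes,
                                           still on the main channel)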

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 0098d33..8de9b0d 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -539,7 +539,7 @@ static int multifd_send_page(uint8_t *address)
     qemu_cond_signal(&multifd_send[i].cond);
     qemu_mutex_unlock(&multifd_send[i].mutex);

-    return 0;
+    return i;
 }

 struct MultiFDRecvParams {
@@ -1056,6 +1056,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
                             bool last_stage, uint64_t *bytes_transferred)
 {
     int pages;
+    uint16_t fd_num;
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->offset;
@@ -1069,8 +1070,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
     if (pages == -1) {
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        fd_num = multifd_send_page(p);
+        qemu_put_be16(f, fd_num);
+        *bytes_transferred += 2; /* size of fd_num */
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
-        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
@@ -2758,6 +2761,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
+        uint16_t fd_num;
         uint8_t ch;

         addr = qemu_get_be64(f);
@@ -2843,6 +2847,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;

         case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            fd_num = qemu_get_be16(f);
+            if (fd_num != 0) {
+                /* this is yet an unused variable, changed later */
+                fd_num = 0;
+            }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

-- 
2.7.4


* [Qemu-devel] [PATCH 10/13] migration: Create thread infrastructure for multifd recv side
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (8 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 09/13] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 11/13] migration: Test new fd infrastructure Juan Quintela
                   ` (3 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We make the locking and the transfer of information explicit, even
though we are still receiving everything through the main thread.
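
Unlike the send side, the receiver cannot pick just any idle thread:
the fd_num chosen by the source names the thread that must handle the
page, so multifd_recv_page() waits for that specific thread to become
done instead of scanning for a free one (a condensed sketch of the
hunk below):

    params = &multifd_recv[fd_num];
    qemu_mutex_lock(&multifd_recv_mutex);
    while (!params->done) {
        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
    }
    params->done = false;
    qemu_mutex_unlock(&multifd_recv_mutex);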

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 47 ++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 44 insertions(+), 3 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 8de9b0d..294a99e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -543,17 +543,25 @@ static int multifd_send_page(uint8_t *address)
 }

 struct MultiFDRecvParams {
+    /* not changed */
     QemuThread thread;
+    QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
-    QIOChannel *c;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

 static MultiFDRecvParams *multifd_recv;

+QemuMutex multifd_recv_mutex;
+QemuCond  multifd_recv_cond;
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
@@ -567,7 +575,17 @@ static void *multifd_recv_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_recv_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_recv_cond);
+            qemu_mutex_unlock(&multifd_recv_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -620,8 +638,9 @@ void migrate_multifd_recv_threads_create(void)
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
         multifd_recv[i].started = false;
+        multifd_recv[i].done = true;
+        multifd_recv[i].address = 0;
         multifd_recv[i].c = socket_recv_channel_create();
-
         if (!multifd_recv[i].c) {
             error_report("error creating a recv channel");
             exit(EXIT_FAILURE);
@@ -637,6 +656,27 @@ void migrate_multifd_recv_threads_create(void)
     }
 }

+static void multifd_recv_page(uint8_t *address, int fd_num)
+{
+    int thread_count;
+    MultiFDRecvParams *params;
+
+    thread_count = migrate_multifd_threads();
+    assert(fd_num < thread_count);
+    params = &multifd_recv[fd_num];
+
+    qemu_mutex_lock(&multifd_recv_mutex);
+    while (!params->done) {
+        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+    }
+    params->done = false;
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    qemu_mutex_lock(&params->mutex);
+    params->address = address;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -2852,6 +2892,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 /* this is yet an unused variable, changed later */
                 fd_num = 0;
             }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

-- 
2.7.4


* [Qemu-devel] [PATCH 11/13] migration: Test new fd infrastructure
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (9 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 10/13] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels Juan Quintela
                   ` (2 subsequent siblings)
  13 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We just send the page address through the alternate channels and test
on the receiving side that it is the one we expect.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 294a99e..2ead443 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -421,6 +421,7 @@ QemuCond multifd_send_cond;
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
+    uint8_t *address;
     char start = 's';

     qio_channel_write(params->c, &start, 1, &error_abort);
@@ -432,8 +433,16 @@ static void *multifd_send_thread(void *opaque)
     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
         if (params->address) {
+            address = params->address;
             params->address = 0;
             qemu_mutex_unlock(&params->mutex);
+
+            if (qio_channel_write(params->c, (const char *)&address,
+                                  sizeof(uint8_t *), &error_abort)
+                != sizeof(uint8_t*)) {
+                /* Shouldn't ever happen */
+                exit(-1);
+            }
             qemu_mutex_lock(&multifd_send_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_send_cond);
@@ -565,6 +574,8 @@ QemuCond  multifd_recv_cond;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
+    uint8_t *address;
+    uint8_t *recv_address;
     char start;

     qio_channel_read(params->c, &start, 1, &error_abort);
@@ -576,8 +587,23 @@ static void *multifd_recv_thread(void *opaque)
     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
         if (params->address) {
+            address = params->address;
             params->address = 0;
             qemu_mutex_unlock(&params->mutex);
+
+            if (qio_channel_read(params->c, (char *)&recv_address,
+                                 sizeof(uint8_t*), &error_abort)
+                != sizeof(uint8_t *)) {
+                /* shouldn't ever happen */
+                exit(-1);
+            }
+
+            if (address != recv_address) {
+                printf("We received %p what we were expecting %p\n",
+                       recv_address, address);
+                exit(-1);
+            }
+
             qemu_mutex_lock(&multifd_recv_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_recv_cond);
@@ -2888,10 +2914,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)

         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* the variable is still unused; this changes later */
-                fd_num = 0;
-            }
             multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
-- 
2.7.4


* [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (10 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 11/13] migration: Test new fd infrastructure Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 19:08   ` Dr. David Alan Gilbert
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 13/13] migration: flush receive queue Juan Quintela
  2016-10-21 20:26 ` [Qemu-devel] [RFC 00/13] Multifd v2 no-reply
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We switch from sending the page address (for testing) to sending the
real pages.

[HACK]
How we calculate the bandwidth is beyond repair; there is a hack here
that only works for x86 and other archs that have 4kb pages.

If you are having a nice day just go to migration/ram.c and look at
acct_update_position().  Now you are depressed, right?
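
As a sketch of the helper this hack is really asking for (the name and
placement are assumptions; nothing like it exists in the tree at this
point), target-dependent code could export the page size as a plain
function so that target-independent migration.c never hard codes 4096:

    /* somewhere target-dependent, e.g. migration/ram.c */
    size_t migration_target_page_size(void)
    {
        return TARGET_PAGE_SIZE;
    }

migration_thread() would then multiply the page delta by
migration_target_page_size() instead of 4096.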

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 15 +++++++++++----
 migration/ram.c       | 46 +++++++++++++++++++++++++++++++---------------
 2 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 407e0c3..0627f14 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1757,7 +1757,8 @@ static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
+    int64_t qemu_file_bytes = 0;
+    int64_t multifd_pages = 0;
     int64_t max_size = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
@@ -1840,9 +1841,14 @@ static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
             uint64_t time_spent = current_time - initial_time;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t multifd_pages_now = multifd_mig_pages_transferred();
+            /* Hack ahead.  Why the hell don't we have a function to know the
+               target_page_size?  Hard coding it to 4096 */
+            uint64_t transferred_bytes =
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                (multifd_pages_now - multifd_pages) * 4096;
             double bandwidth = (double)transferred_bytes / time_spent;
             max_size = bandwidth * s->parameters.downtime_limit;

@@ -1859,7 +1865,8 @@ static void *migration_thread(void *opaque)

             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
+            multifd_pages = multifd_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 2ead443..9a20f63 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -437,9 +437,9 @@ static void *multifd_send_thread(void *opaque)
             params->address = 0;
             qemu_mutex_unlock(&params->mutex);

-            if (qio_channel_write(params->c, (const char *)&address,
-                                  sizeof(uint8_t *), &error_abort)
-                != sizeof(uint8_t*)) {
+            if (qio_channel_write(params->c, (const char *)address,
+                                  TARGET_PAGE_SIZE, &error_abort)
+                != TARGET_PAGE_SIZE) {
                 /* Shouldn't ever happen */
                 exit(-1);
             }
@@ -551,6 +551,23 @@ static int multifd_send_page(uint8_t *address)
     return i;
 }

+static void flush_multifd_send_data(QEMUFile *f)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return;
+    }
+    qemu_fflush(f);
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    for (i = 0; i < thread_count; i++) {
+        while(!multifd_send[i].done) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+}
+
 struct MultiFDRecvParams {
     /* not changed */
     QemuThread thread;
@@ -575,7 +592,6 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
     uint8_t *address;
-    uint8_t *recv_address;
     char start;

     qio_channel_read(params->c, &start, 1, &error_abort);
@@ -591,19 +607,13 @@ static void *multifd_recv_thread(void *opaque)
             params->address = 0;
             qemu_mutex_unlock(&params->mutex);

-            if (qio_channel_read(params->c, (char *)&recv_address,
-                                 sizeof(uint8_t*), &error_abort)
-                != sizeof(uint8_t *)) {
+            if (qio_channel_read(params->c, (char *)address,
+                                 TARGET_PAGE_SIZE, &error_abort)
+                != TARGET_PAGE_SIZE) {
                 /* shouldn't ever happen */
                 exit(-1);
             }

-            if (address != recv_address) {
-                printf("We received %p what we were expecting %p\n",
-                       recv_address, address);
-                exit(-1);
-            }
-
             qemu_mutex_lock(&multifd_recv_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_recv_cond);
@@ -1126,6 +1136,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->offset;
+    static int count = 32;

     p = block->host + offset;

@@ -1137,9 +1148,14 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         fd_num = multifd_send_page(p);
+        count--;
+        if (!count) {
+            qemu_fflush(f);
+            count = 32;
+        }
+
         qemu_put_be16(f, fd_num);
         *bytes_transferred += 2; /* size of fd_num */
-        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
@@ -2401,6 +2417,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
     }

     flush_compressed_data(f);
+    flush_multifd_send_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);

     rcu_read_unlock();
@@ -2915,7 +2932,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

         case RAM_SAVE_FLAG_EOS:
-- 
2.7.4

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [Qemu-devel] [PATCH 13/13] migration: flush receive queue
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (11 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels Juan Quintela
@ 2016-10-21 19:42 ` Juan Quintela
  2016-10-26 19:10   ` Dr. David Alan Gilbert
  2016-10-21 20:26 ` [Qemu-devel] [RFC 00/13] Multifd v2 no-reply
  13 siblings, 1 reply; 25+ messages in thread
From: Juan Quintela @ 2016-10-21 19:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert, Juan Quintela

From: Juan Quintela <quintela@trasno.org>

Each time that we sync the bitmap, there is a possibility that we
receive a page that is still being processed by a different thread.
We fix this problem by making sure that we wait for all receiving
threads to finish their work before we proceed to the next stage.

I tried to make a migration command for it, but it doesn't work
because we sometimes sync the bitmap when we have already sent the
beginning of the section, so I just added a new page flag.
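
In outline, the flag rides on the next page header and the destination
drains its receive threads when it sees it (a sketch condensed from the
patch below):

    /* source: after migration_bitmap_sync() */
    multifd_needs_flush = true;

    /* source: save_page_header() folds the flag into the offset word */
    if (multifd_needs_flush) {
        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
        multifd_needs_flush = false;
    }

    /* destination: ram_load() waits for every recv thread to be done */
    if (flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) {
        multifd_flush();
        flags &= ~RAM_SAVE_FLAG_MULTIFD_FLUSH;
    }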

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  1 +
 migration/ram.c               | 40 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index afdc7ec..49e2ec6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -251,6 +251,7 @@ void migrate_multifd_send_threads_create(void);
 void migrate_multifd_send_threads_join(void);
 void migrate_multifd_recv_threads_create(void);
 void migrate_multifd_recv_threads_join(void);
+void qemu_savevm_send_multifd_flush(QEMUFile *f);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/ram.c b/migration/ram.c
index 9a20f63..bf2022e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -69,6 +69,7 @@ static uint64_t bitmap_sync_count;
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
+#define RAM_SAVE_FLAG_MULTIFD_FLUSH    0x400

 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];

@@ -398,6 +399,11 @@ void migrate_compress_threads_create(void)

 /* Multiple fd's */

+
+/* Indicates that we have synced the bitmap and need to ensure that
+   the target has processed all previous pages */
+bool multifd_needs_flush = false;
+
 struct MultiFDSendParams {
     /* not changed */
     QemuThread thread;
@@ -713,6 +719,25 @@ static void multifd_recv_page(uint8_t *address, int fd_num)
     qemu_mutex_unlock(&params->mutex);
 }

+
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_recv_mutex);
+    for (i = 0; i < thread_count; i++) {
+        while(!multifd_recv[i].done) {
+            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    return 0;
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -729,6 +754,11 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 {
     size_t size, len;

+    if (multifd_needs_flush) {
+        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
+        multifd_needs_flush = false;
+    }
+
     qemu_put_be64(f, offset);
     size = 8;

@@ -2399,6 +2429,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)

     if (!migration_in_postcopy(migrate_get_current())) {
         migration_bitmap_sync();
+        if (migrate_multifd()) {
+            multifd_needs_flush = true;
+        }
     }

     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2440,6 +2473,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
         qemu_mutex_lock_iothread();
         rcu_read_lock();
         migration_bitmap_sync();
+        if (migrate_multifd()) {
+            multifd_needs_flush = true;
+        }
         rcu_read_unlock();
         qemu_mutex_unlock_iothread();
         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
@@ -2851,6 +2887,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         flags = addr & ~TARGET_PAGE_MASK;
         addr &= TARGET_PAGE_MASK;

+        if (flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) {
+            multifd_flush();
+            flags = flags & (~RAM_SAVE_FLAG_MULTIFD_FLUSH);
+        }
         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-- 
2.7.4

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [RFC 00/13] Multifd v2
  2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
                   ` (12 preceding siblings ...)
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 13/13] migration: flush receive queue Juan Quintela
@ 2016-10-21 20:26 ` no-reply
  13 siblings, 0 replies; 25+ messages in thread
From: no-reply @ 2016-10-21 20:26 UTC (permalink / raw)
  To: quintela; +Cc: famz, qemu-devel, amit.shah, dgilbert

Hi,

Your series seems to have some coding style problems. See output below for
more information:

Type: series
Message-id: 1477078935-7182-1-git-send-email-quintela@redhat.com
Subject: [Qemu-devel] [RFC 00/13] Multifd v2

=== TEST SCRIPT BEGIN ===
#!/bin/bash

BASE=base
n=1
total=$(git log --oneline $BASE.. | wc -l)
failed=0

# Useful git options
git config --local diff.renamelimit 0
git config --local diff.renames True

commits="$(git log --format=%H --reverse $BASE..)"
for c in $commits; do
    echo "Checking PATCH $n/$total: $(git show --no-patch --format=%s $c)..."
    if ! git show $c --format=email | ./scripts/checkpatch.pl --mailback -; then
        failed=1
        echo
    fi
    n=$((n+1))
done

exit $failed
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
From https://github.com/patchew-project/qemu
 - [tag update]      patchew/1476925867-24748-1-git-send-email-eblake@redhat.com -> patchew/1476925867-24748-1-git-send-email-eblake@redhat.com
Switched to a new branch 'test'
ee17fab migration: flush receive queue
27a9800 migration: [HACK]Transfer pages over new channels
25f1b86 migration: Test new fd infrastructure
d48dd5e migration: Create thread infrastructure for multifd recv side
e7a5acd migration: Send the fd number which we are going to use for this page
b2ace91 migration: Create thread infrastructure for multifd send side
bdfb79f migration: create ram_multifd_page
8209364 migration: Start of multiple fd work
eeb3b70 migration: create multifd migration threads
a6d8287 migration: Create x-multifd-threads parameter
8cc47c1 migration: Add multifd capability
287498c migration: [HACK] Don't create decompression threads if not enabled
4dc8431 migration: create Migration Incoming State at init time

=== OUTPUT BEGIN ===
Checking PATCH 1/13: migration: create Migration Incoming State at init time...
Checking PATCH 2/13: migration: [HACK] Don't create decompression threads if not enabled...
Checking PATCH 3/13: migration: Add multifd capability...
Checking PATCH 4/13: migration: Create x-multifd-threads parameter...
WARNING: line over 80 characters
#92: FILE: migration/migration.c:822:
+            (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {

total: 0 errors, 1 warnings, 128 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
Checking PATCH 5/13: migration: create multifd migration threads...
ERROR: space required before the open brace '{'
#103: FILE: migration/ram.c:409:
+    while (!params->quit){

ERROR: space required before the open brace '{'
#128: FILE: migration/ram.c:434:
+    if (!migrate_multifd()){

ERROR: space required before the open brace '{'
#146: FILE: migration/ram.c:452:
+    if (!migrate_multifd()){

ERROR: trailing whitespace
#174: FILE: migration/ram.c:480:
+ $

ERROR: space required before the open brace '{'
#176: FILE: migration/ram.c:482:
+    while (!params->quit){

ERROR: space required before the open brace '{'
#201: FILE: migration/ram.c:507:
+    if (!migrate_multifd()){

ERROR: space required before the open brace '{'
#219: FILE: migration/ram.c:525:
+    if (!migrate_multifd()){

total: 7 errors, 0 warnings, 206 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 6/13: migration: Start of multiple fd work...
ERROR: space required before the open parenthesis '('
#80: FILE: migration/ram.c:473:
+        if(!multifd_send[i].c) {

ERROR: space required before the open parenthesis '('
#135: FILE: migration/ram.c:568:
+        if(!multifd_recv[i].c) {

ERROR: do not use C99 // comments
#181: FILE: migration/socket.c:50:
+    // Remove channel here

ERROR: do not use C99 // comments
#197: FILE: migration/socket.c:66:
+    // Remove channel here

ERROR: do not use C99 // comments
#231: FILE: migration/socket.c:205:
+//    qio_channel_close(ioc, NULL);

total: 5 errors, 0 warnings, 201 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 7/13: migration: create ram_multifd_page...
Checking PATCH 8/13: migration: Create thread infrastructure for multifd send side...
Checking PATCH 9/13: migration: Send the fd number which we are going to use for this page...
Checking PATCH 10/13: migration: Create thread infrastructure for multifd recv side...
Checking PATCH 11/13: migration: Test new fd infrastructure...
ERROR: "(foo*)" should be "(foo *)"
#34: FILE: migration/ram.c:442:
+                != sizeof(uint8_t*)) {

ERROR: "(foo*)" should be "(foo *)"
#59: FILE: migration/ram.c:595:
+                                 sizeof(uint8_t*), &error_abort)

total: 2 errors, 0 warnings, 64 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 12/13: migration: [HACK]Transfer pages over new channels...
ERROR: space required before the open parenthesis '('
#91: FILE: migration/ram.c:565:
+        while(!multifd_send[i].done) {

total: 1 errors, 0 warnings, 134 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 13/13: migration: flush receive queue...
ERROR: do not initialise globals to 0 or NULL
#49: FILE: migration/ram.c:405:
+bool multifd_needs_flush = false;

ERROR: space required before the open parenthesis '('
#69: FILE: migration/ram.c:733:
+        while(!multifd_recv[i].done) {

total: 2 errors, 0 warnings, 89 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

=== OUTPUT END ===

Test command exited with code: 1


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@freelists.org

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 03/13] migration: Add multifd capability
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 03/13] migration: Add multifd capability Juan Quintela
@ 2016-10-26 17:57   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 17:57 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  include/migration/migration.h | 1 +
>  migration/migration.c         | 9 +++++++++
>  qapi-schema.json              | 5 +++--
>  3 files changed, 13 insertions(+), 2 deletions(-)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 37ef4f2..5666068 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -290,6 +290,7 @@ bool migrate_postcopy_ram(void);
>  bool migrate_zero_blocks(void);
> 
>  bool migrate_auto_converge(void);
> +bool migrate_multifd(void);
> 
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
>                           uint8_t *dst, int dlen);
> diff --git a/migration/migration.c b/migration/migration.c
> index a71921f..5f7a570 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1273,6 +1273,15 @@ bool migrate_use_events(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
>  }
> 
> +bool migrate_multifd(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 5a8ec38..bc96ee4 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -574,12 +574,13 @@
>  #          been migrated, pulling the remaining pages along as needed. NOTE: If
>  #          the migration fails during postcopy the VM will fail.  (since 2.6)
>  #
> +# @x-multifd: Use more than one fd for migration (since 2.8)
> +#
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
>    'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
> -           'compress', 'events', 'postcopy-ram'] }
> -
> +           'compress', 'events', 'postcopy-ram', 'x-multifd'] }
>  ##
>  # @MigrationCapabilityStatus
>  #
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time Juan Quintela
@ 2016-10-26 18:05   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 18:05 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  1 -
>  migration/migration.c         | 38 +++++++++++++++++---------------------
>  migration/savevm.c            |  4 ++--
>  3 files changed, 19 insertions(+), 24 deletions(-)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 2791b90..37ef4f2 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -112,7 +112,6 @@ struct MigrationIncomingState {
>  };
> 
>  MigrationIncomingState *migration_incoming_get_current(void);
> -MigrationIncomingState *migration_incoming_state_new(QEMUFile *f);
>  void migration_incoming_state_destroy(void);
> 
>  /*
> diff --git a/migration/migration.c b/migration/migration.c
> index 4d417b7..a71921f 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -104,32 +104,28 @@ MigrationState *migrate_get_current(void)
>      return &current_migration;
>  }
> 
> -/* For incoming */
> -static MigrationIncomingState *mis_current;
> -
>  MigrationIncomingState *migration_incoming_get_current(void)
>  {
> -    return mis_current;
> -}
> +    static bool once;
> +    static MigrationIncomingState mis_current;
> 
> -MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
> -{
> -    mis_current = g_new0(MigrationIncomingState, 1);
> -    mis_current->from_src_file = f;
> -    mis_current->state = MIGRATION_STATUS_NONE;
> -    QLIST_INIT(&mis_current->loadvm_handlers);
> -    qemu_mutex_init(&mis_current->rp_mutex);
> -    qemu_event_init(&mis_current->main_thread_load_event, false);
> -
> -    return mis_current;
> +    if (!once) {
> +        memset(&mis_current, 0, sizeof(MigrationIncomingState));
> +        mis_current.state = MIGRATION_STATUS_NONE;
> +        QLIST_INIT(&mis_current.loadvm_handlers);
> +        qemu_mutex_init(&mis_current.rp_mutex);
> +        qemu_event_init(&mis_current.main_thread_load_event, false);
> +        once = true;
> +    }
> +    return &mis_current;
>  }
> 
>  void migration_incoming_state_destroy(void)
>  {
> -    qemu_event_destroy(&mis_current->main_thread_load_event);
> -    loadvm_free_handlers(mis_current);
> -    g_free(mis_current);
> -    mis_current = NULL;
> +    struct MigrationIncomingState *mis = migration_incoming_get_current();
> +
> +    qemu_event_destroy(&mis->main_thread_load_event);
> +    loadvm_free_handlers(mis);
>  }
> 
> 
> @@ -375,11 +371,11 @@ static void process_incoming_migration_bh(void *opaque)
>  static void process_incoming_migration_co(void *opaque)
>  {
>      QEMUFile *f = opaque;
> -    MigrationIncomingState *mis;
> +    MigrationIncomingState *mis = migration_incoming_get_current();
>      PostcopyState ps;
>      int ret;
> 
> -    mis = migration_incoming_state_new(f);
> +    mis->from_src_file = f;
>      postcopy_state_set(POSTCOPY_INCOMING_NONE);
>      migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
>                        MIGRATION_STATUS_ACTIVE);
> diff --git a/migration/savevm.c b/migration/savevm.c
> index a831ec2..de84be5 100644
> --- a/migration/savevm.c
> +++ b/migration/savevm.c
> @@ -2107,7 +2107,6 @@ void qmp_xen_load_devices_state(const char *filename, Error **errp)
>      }
>      f = qemu_fopen_channel_input(QIO_CHANNEL(ioc));
> 
> -    migration_incoming_state_new(f);

It's curious that this one is the odd case of not setting mis->from_src_file.

Anyway, I think it's OK but please test it with -incoming defer.
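
For reference, a quick way to exercise that path (the URI here is just
an example):

    $ qemu-system-x86_64 ... -incoming defer
    (qemu) migrate_incoming tcp:0:4444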

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>


>      ret = qemu_loadvm_state(f);
>      qemu_fclose(f);
>      if (ret < 0) {
> @@ -2123,6 +2122,7 @@ int load_vmstate(const char *name)
>      QEMUFile *f;
>      int ret;
>      AioContext *aio_context;
> +    MigrationIncomingState *mis = migration_incoming_get_current();
> 
>      if (!bdrv_all_can_snapshot(&bs)) {
>          error_report("Device '%s' is writable but does not support snapshots.",
> @@ -2173,7 +2173,7 @@ int load_vmstate(const char *name)
>      }
> 
>      qemu_system_reset(VMRESET_SILENT);
> -    migration_incoming_state_new(f);
> +    mis->from_src_file = f;
> 
>      aio_context_acquire(aio_context);
>      ret = qemu_loadvm_state(f);
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter Juan Quintela
@ 2016-10-26 18:33   ` Dr. David Alan Gilbert
  2016-10-26 21:16   ` Eric Blake
  1 sibling, 0 replies; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 18:33 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Indicates the number of threads that we would create.  By default we
> create 2 threads.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
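
As a usage sketch once this lands, via HMP (the values are
illustrative):

    (qemu) migrate_set_capability x-multifd on
    (qemu) migrate_set_parameter x-multifd-threads 4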

> ---
>  hmp.c                         |  7 +++++++
>  include/migration/migration.h |  2 ++
>  migration/migration.c         | 24 ++++++++++++++++++++++++
>  qapi-schema.json              | 11 +++++++++--
>  4 files changed, 42 insertions(+), 2 deletions(-)
> 
> diff --git a/hmp.c b/hmp.c
> index 80f7f1f..54f9f03 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -318,6 +318,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
>          monitor_printf(mon, " %s: %" PRId64 " milliseconds",
>              MigrationParameter_lookup[MIGRATION_PARAMETER_DOWNTIME_LIMIT],
>              params->downtime_limit);
> +        monitor_printf(mon, " %s: %" PRId64,
> +            MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
> +            params->x_multifd_threads);
>          monitor_printf(mon, "\n");
>      }
> 
> @@ -1386,6 +1389,9 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.has_downtime_limit = true;
>                  use_int_value = true;
>                  break;
> +            case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
> +                p.has_x_multifd_threads = true;
> +                break;
>              }
> 
>              if (use_int_value) {
> @@ -1402,6 +1408,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.cpu_throttle_initial = valueint;
>                  p.cpu_throttle_increment = valueint;
>                  p.downtime_limit = valueint;
> +                p.x_multifd_threads = valueint;
>              }
> 
>              qmp_migrate_set_parameters(&p, &err);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 5666068..709355e 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -240,6 +240,8 @@ bool migration_in_postcopy(MigrationState *);
>  bool migration_in_postcopy_after_devices(MigrationState *);
>  MigrationState *migrate_get_current(void);
> 
> +int migrate_multifd_threads(void);
> +
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
>  void migrate_decompress_threads_create(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 5f7a570..217ccbc 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -62,6 +62,8 @@
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
> 
> +#define DEFAULT_MIGRATE_MULTIFD_THREADS 2
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
> 
> @@ -94,6 +96,7 @@ MigrationState *migrate_get_current(void)
>              .cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
>              .max_bandwidth = MAX_THROTTLE,
>              .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
> +            .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
>          },
>      };
> 
> @@ -567,6 +570,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
>      params->max_bandwidth = s->parameters.max_bandwidth;
>      params->has_downtime_limit = true;
>      params->downtime_limit = s->parameters.downtime_limit;
> +    params->has_x_multifd_threads = true;
> +    params->x_multifd_threads = s->parameters.x_multifd_threads;
> 
>      return params;
>  }
> @@ -813,6 +818,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
>                     "an integer in the range of 0 to 2000000 milliseconds");
>          return;
>      }
> +    if (params->has_x_multifd_threads &&
> +            (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {
> +        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
> +                   "multifd_threads",
> +                   "is invalid, it should be in the range of 1 to 255");
> +        return;
> +    }
> 
>      if (params->has_compress_level) {
>          s->parameters.compress_level = params->compress_level;
> @@ -847,6 +859,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
>      if (params->has_downtime_limit) {
>          s->parameters.downtime_limit = params->downtime_limit;
>      }
> +    if (params->has_x_multifd_threads) {
> +        s->parameters.x_multifd_threads = params->x_multifd_threads;
> +    }
>  }
> 
> 
> @@ -1282,6 +1297,15 @@ bool migrate_multifd(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
>  }
> 
> +int migrate_multifd_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->parameters.x_multifd_threads;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index bc96ee4..b5c9a06 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -665,13 +665,16 @@
>  # @downtime-limit: set maximum tolerated downtime for migration. maximum
>  #                  downtime in milliseconds (Since 2.8)
>  #
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 2 (since 2.8)
> +#
>  # Since: 2.4
>  ##
>  { 'enum': 'MigrationParameter',
>    'data': ['compress-level', 'compress-threads', 'decompress-threads',
>             'cpu-throttle-initial', 'cpu-throttle-increment',
>             'tls-creds', 'tls-hostname', 'max-bandwidth',
> -           'downtime-limit'] }
> +           'downtime-limit', 'x-multifd-threads'] }
> 
>  #
>  # @migrate-set-parameters
> @@ -726,6 +729,9 @@
>  # @downtime-limit: set maximum tolerated downtime for migration. maximum
>  #                  downtime in milliseconds (Since 2.8)
>  #
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 2 (since 2.8)
> +#
>  # Since: 2.4
>  ##
>  { 'struct': 'MigrationParameters',
> @@ -737,7 +743,8 @@
>              '*tls-creds': 'str',
>              '*tls-hostname': 'str',
>              '*max-bandwidth': 'int',
> -            '*downtime-limit': 'int'} }
> +            '*downtime-limit': 'int',
> +            '*x-multifd-threads': 'int'} }
>  ##
>  # @query-migrate-parameters
>  #
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads Juan Quintela
@ 2016-10-26 18:43   ` Dr. David Alan Gilbert
  2017-01-23 17:15     ` Juan Quintela
  0 siblings, 1 reply; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 18:43 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Creation of the threads, nothing inside yet.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |   4 ++
>  migration/migration.c         |   6 ++
>  migration/ram.c               | 148 ++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 158 insertions(+)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 709355e..80ab8c0 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -241,6 +241,10 @@ bool migration_in_postcopy_after_devices(MigrationState *);
>  MigrationState *migrate_get_current(void);
> 
>  int migrate_multifd_threads(void);
> +void migrate_multifd_send_threads_create(void);
> +void migrate_multifd_send_threads_join(void);
> +void migrate_multifd_recv_threads_create(void);
> +void migrate_multifd_recv_threads_join(void);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 217ccbc..a4615f5 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -336,6 +336,7 @@ static void process_incoming_migration_bh(void *opaque)
>                            MIGRATION_STATUS_FAILED);
>          error_report_err(local_err);
>          migrate_decompress_threads_join();
> +        migrate_multifd_recv_threads_join();
>          exit(EXIT_FAILURE);
>      }
> 
> @@ -360,6 +361,7 @@ static void process_incoming_migration_bh(void *opaque)
>          runstate_set(global_state_get_runstate());
>      }
>      migrate_decompress_threads_join();
> +    migrate_multifd_recv_threads_join();
>      /*
>       * This must happen after any state changes since as soon as an external
>       * observer sees this event they might start to prod at the VM assuming
> @@ -413,6 +415,7 @@ static void process_incoming_migration_co(void *opaque)
>                            MIGRATION_STATUS_FAILED);
>          error_report("load of migration failed: %s", strerror(-ret));
>          migrate_decompress_threads_join();
> +        migrate_multifd_recv_threads_join();
>          exit(EXIT_FAILURE);
>      }
> 
> @@ -425,6 +428,7 @@ void migration_fd_process_incoming(QEMUFile *f)
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
> 
>      migrate_decompress_threads_create();
> +    migrate_multifd_recv_threads_create();
>      qemu_file_set_blocking(f, false);
>      qemu_coroutine_enter(co);
>  }
> @@ -916,6 +920,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_mutex_lock_iothread();
> 
>          migrate_compress_threads_join();
> +        migrate_multifd_send_threads_join();
>          qemu_fclose(s->to_dst_file);
>          s->to_dst_file = NULL;
>      }
> @@ -1922,6 +1927,7 @@ void migrate_fd_connect(MigrationState *s)
>      }
> 
>      migrate_compress_threads_create();
> +    migrate_multifd_send_threads_create();
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
>      s->migration_thread_running = true;
> diff --git a/migration/ram.c b/migration/ram.c
> index 495a931..78d400e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -389,6 +389,154 @@ void migrate_compress_threads_create(void)
>      }
>  }
> 
> +/* Multiple fd's */
> +
> +struct MultiFDSendParams {
> +    QemuThread thread;
> +    QemuCond cond;
> +    QemuMutex mutex;
> +    bool quit;
> +};
> +typedef struct MultiFDSendParams MultiFDSendParams;
> +
> +static MultiFDSendParams *multifd_send;
> +
> +static void *multifd_send_thread(void *opaque)
> +{
> +    MultiFDSendParams *params = opaque;
> +
> +    qemu_mutex_lock(&params->mutex);
> +    while (!params->quit){
> +        qemu_cond_wait(&params->cond, &params->mutex);
> +    }
> +    qemu_mutex_unlock(&params->mutex);
> +
> +    return NULL;
> +}
> +
> +static void terminate_multifd_send_threads(void)
> +{
> +    int i, thread_count;
> +
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&multifd_send[i].mutex);
> +        multifd_send[i].quit = true;
> +        qemu_cond_signal(&multifd_send[i].cond);
> +        qemu_mutex_unlock(&multifd_send[i].mutex);
> +    }
> +}
> +
> +void migrate_multifd_send_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){

You've missed the space before the '{' (and then copied
it everywhere in this patch).

> +        return;
> +    }
> +    terminate_multifd_send_threads();
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(&multifd_send[i].thread);
> +        qemu_mutex_destroy(&multifd_send[i].mutex);
> +        qemu_cond_destroy(&multifd_send[i].cond);
> +    }
> +    g_free(multifd_send);
> +    multifd_send = NULL;
> +}
> +
> +void migrate_multifd_send_threads_create(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_init(&multifd_send[i].mutex);
> +        qemu_cond_init(&multifd_send[i].cond);
> +        multifd_send[i].quit = false;
> +        qemu_thread_create(&multifd_send[i].thread, "multifd_send",

You could make the name of the thread include the thread number;
that way you could easily see in top if any one of the threads
was getting particularly busy (although be careful with the length:
I think Linux will ignore the name if it's over 14 characters).
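
Something like this, say (the name format is illustrative, and mind
that limit for large thread counts):

    char name[16];

    snprintf(name, sizeof(name), "multifdsend_%d", i);
    qemu_thread_create(&multifd_send[i].thread, name,
                       multifd_send_thread, &multifd_send[i],
                       QEMU_THREAD_JOINABLE);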

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> +                           multifd_send_thread, &multifd_send[i],
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +struct MultiFDRecvParams {
> +    QemuThread thread;
> +    QemuCond cond;
> +    QemuMutex mutex;
> +    bool quit;
> +};
> +typedef struct MultiFDRecvParams MultiFDRecvParams;
> +
> +static MultiFDRecvParams *multifd_recv;
> +
> +static void *multifd_recv_thread(void *opaque)
> +{
> +    MultiFDRecvParams *params = opaque;
> + 
> +    qemu_mutex_lock(&params->mutex);
> +    while (!params->quit){
> +        qemu_cond_wait(&params->cond, &params->mutex);
> +    }
> +    qemu_mutex_unlock(&params->mutex);
> +
> +    return NULL;
> +}
> +
> +static void terminate_multifd_recv_threads(void)
> +{
> +    int i, thread_count;
> +
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&multifd_recv[i].mutex);
> +        multifd_recv[i].quit = true;
> +        qemu_cond_signal(&multifd_recv[i].cond);
> +        qemu_mutex_unlock(&multifd_recv[i].mutex);
> +    }
> +}
> +
> +void migrate_multifd_recv_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    terminate_multifd_recv_threads();
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(&multifd_recv[i].thread);
> +        qemu_mutex_destroy(&multifd_recv[i].mutex);
> +        qemu_cond_destroy(&multifd_recv[i].cond);
> +    }
> +    g_free(multifd_recv);
> +    multifd_recv = NULL;
> +}
> +
> +void migrate_multifd_recv_threads_create(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    multifd_recv = g_new0(MultiFDRecvParams, thread_count);
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_init(&multifd_recv[i].mutex);
> +        qemu_cond_init(&multifd_recv[i].cond);
> +        multifd_recv[i].quit = false;
> +        qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
> +                           multifd_recv_thread, &multifd_recv[i],
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page Juan Quintela
@ 2016-10-26 18:50   ` Dr. David Alan Gilbert
  2017-01-23 17:13     ` Juan Quintela
  0 siblings, 1 reply; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 18:50 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of page.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  hmp.c                         |  2 ++
>  include/migration/migration.h |  1 +
>  migration/migration.c         |  1 +
>  migration/ram.c               | 44 ++++++++++++++++++++++++++++++++++++++++++-
>  qapi-schema.json              |  4 +++-
>  5 files changed, 50 insertions(+), 2 deletions(-)
> 
> diff --git a/hmp.c b/hmp.c
> index 54f9f03..17a0ee2 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -222,6 +222,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
>              monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
>                             info->ram->postcopy_requests);
>          }
> +        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
> +                       info->ram->multifd);
>      }
> 
>      if (info->has_disk) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 0b455d6..afdc7ec 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -274,6 +274,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
>  uint64_t xbzrle_mig_pages_overflow(void);
>  uint64_t xbzrle_mig_pages_cache_miss(void);
>  double xbzrle_mig_cache_miss_rate(void);
> +uint64_t multifd_mig_pages_transferred(void);
> 
>  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
>  void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
> diff --git a/migration/migration.c b/migration/migration.c
> index a4615f5..407e0c3 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -625,6 +625,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
>      info->ram->mbps = s->mbps;
>      info->ram->dirty_sync_count = s->dirty_sync_count;
>      info->ram->postcopy_requests = s->postcopy_requests;
> +    info->ram->multifd = multifd_mig_pages_transferred();
> 
>      if (s->state != MIGRATION_STATUS_COMPLETED) {
>          info->ram->remaining = ram_bytes_remaining();
> diff --git a/migration/ram.c b/migration/ram.c
> index 0ea40eb..44b9380 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -68,6 +68,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> 
>  static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
> 
> @@ -148,6 +149,7 @@ typedef struct AccountingInfo {
>      uint64_t dup_pages;
>      uint64_t skipped_pages;
>      uint64_t norm_pages;
> +    uint64_t multifd_pages;
>      uint64_t iterations;
>      uint64_t xbzrle_bytes;
>      uint64_t xbzrle_pages;
> @@ -218,6 +220,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
>      return acct_info.xbzrle_overflows;
>  }
> 
> +uint64_t multifd_mig_pages_transferred(void)
> +{
> +    return acct_info.multifd_pages;
> +}
> +
>  /* This is the last block that we have visited serching for dirty pages
>   */
>  static RAMBlock *last_seen_block;
> @@ -995,6 +1002,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
>      return pages;
>  }
> 
> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> +                            bool last_stage, uint64_t *bytes_transferred)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->offset;
> +
> +    p = block->host + offset;
> +
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +    if (pages == -1) {
> +        *bytes_transferred +=
> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        *bytes_transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        acct_info.norm_pages++;
> +        acct_info.multifd_pages++;

The acct_info is now updated simultaneously from multiple
threads?

Dave
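
If those counters do end up being bumped from the worker threads, a
minimal sketch of a fix (assuming qemu/atomic.h is already in scope):

    /* instead of the plain increments in ram_multifd_page() */
    atomic_inc(&acct_info.norm_pages);
    atomic_inc(&acct_info.multifd_pages);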

> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1432,6 +1466,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
>              res = ram_save_compressed_page(f, pss,
>                                             last_stage,
>                                             bytes_transferred);
> +        } else if (migrate_multifd()) {
> +            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
>          } else {
>              res = ram_save_page(f, pss, last_stage,
>                                  bytes_transferred);
> @@ -2678,7 +2714,8 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          addr &= TARGET_PAGE_MASK;
> 
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
> 
>              host = host_from_ram_block_offset(block, addr);
> @@ -2753,6 +2790,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index b5c9a06..e5b3a84 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -405,6 +405,7 @@
>  #
>  # @postcopy-requests: The number of page requests received from the destination
>  #        (since 2.7)
> +# @multifd: number of pages sent with multifd (since 2.8)
>  #
>  # Since: 0.14.0
>  ##
> @@ -413,7 +414,8 @@
>             'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
>             'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
>             'mbps' : 'number', 'dirty-sync-count' : 'int',
> -           'postcopy-requests' : 'int' } }
> +           'postcopy-requests' : 'int',
> +           'multifd' : 'int'} }
> 
>  ##
>  # @XBZRLECacheStats
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels Juan Quintela
@ 2016-10-26 19:08   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 19:08 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We switch from sending the page address to sending the real pages.
> 
> [HACK]
> How we calculate the bandwidth is beyond repair; there is a hack there
> that only works for x86 and other archs that have 4KB pages.

Is that a problem?  Isn't it always TARGET_PAGE_SIZE?
But it does add an interesting question about what size chunks
to hand out to each fd: do you bother with 4kB chunks, or go for
something larger?
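
One possible shape for larger chunks, as a sketch only (the batch size
and the iovec plumbing here are assumptions, and short writes are
ignored for brevity):

    /* in multifd_send_thread(): queue pages, push 16 at a time */
    struct iovec iov[16];
    size_t niov = 0;

    iov[niov].iov_base = address;
    iov[niov].iov_len  = TARGET_PAGE_SIZE;
    if (++niov == ARRAY_SIZE(iov)) {
        if (qio_channel_writev(params->c, iov, niov, &error_abort) < 0) {
            exit(-1);
        }
        niov = 0;
    }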

> If you are having a nice day, just go to migration/ram.c and look at
> acct_update_position().  Now you are depressed, right?

Been there, then saw the interaction with RDMA.

Dave

> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c | 15 +++++++++++----
>  migration/ram.c       | 46 +++++++++++++++++++++++++++++++---------------
>  2 files changed, 42 insertions(+), 19 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 407e0c3..0627f14 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1757,7 +1757,8 @@ static void *migration_thread(void *opaque)
>      /* Used by the bandwidth calcs, updated later */
>      int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>      int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> -    int64_t initial_bytes = 0;
> +    int64_t qemu_file_bytes = 0;
> +    int64_t multifd_pages = 0;
>      int64_t max_size = 0;
>      int64_t start_time = initial_time;
>      int64_t end_time;
> @@ -1840,9 +1841,14 @@ static void *migration_thread(void *opaque)
>          }
>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>          if (current_time >= initial_time + BUFFER_DELAY) {
> -            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
> -                                         initial_bytes;
>              uint64_t time_spent = current_time - initial_time;
> +            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> +            uint64_t multifd_pages_now = multifd_mig_pages_transferred();
> +            /* Hack ahead.  Why the hell don't we have a function to know the
> +               target_page_size?  Hard coding it to 4096 */
> +            uint64_t transferred_bytes =
> +                (qemu_file_bytes_now - qemu_file_bytes) +
> +                (multifd_pages_now - multifd_pages) * 4096;
>              double bandwidth = (double)transferred_bytes / time_spent;
>              max_size = bandwidth * s->parameters.downtime_limit;
> 
> @@ -1859,7 +1865,8 @@ static void *migration_thread(void *opaque)
> 
>              qemu_file_reset_rate_limit(s->to_dst_file);
>              initial_time = current_time;
> -            initial_bytes = qemu_ftell(s->to_dst_file);
> +            qemu_file_bytes = qemu_file_bytes_now;
> +            multifd_pages = multifd_pages_now;
>          }
>          if (qemu_file_rate_limit(s->to_dst_file)) {
>              /* usleep expects microseconds */
> diff --git a/migration/ram.c b/migration/ram.c
> index 2ead443..9a20f63 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -437,9 +437,9 @@ static void *multifd_send_thread(void *opaque)
>              params->address = 0;
>              qemu_mutex_unlock(&params->mutex);
> 
> -            if (qio_channel_write(params->c, (const char *)&address,
> -                                  sizeof(uint8_t *), &error_abort)
> -                != sizeof(uint8_t*)) {
> +            if (qio_channel_write(params->c, (const char *)address,
> +                                  TARGET_PAGE_SIZE, &error_abort)
> +                != TARGET_PAGE_SIZE) {
>                  /* Shouldn't ever happen */
>                  exit(-1);
>              }
> @@ -551,6 +551,23 @@ static int multifd_send_page(uint8_t *address)
>      return i;
>  }
> 
> +static void flush_multifd_send_data(QEMUFile *f)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()) {
> +        return;
> +    }
> +    qemu_fflush(f);
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_send_mutex);
> +    for (i = 0; i < thread_count; i++) {
> +        while(!multifd_send[i].done) {
> +            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_mutex);
> +}
> +
>  struct MultiFDRecvParams {
>      /* not changed */
>      QemuThread thread;
> @@ -575,7 +592,6 @@ static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
>      uint8_t *address;
> -    uint8_t *recv_address;
>      char start;
> 
>      qio_channel_read(params->c, &start, 1, &error_abort);
> @@ -591,19 +607,13 @@ static void *multifd_recv_thread(void *opaque)
>              params->address = 0;
>              qemu_mutex_unlock(&params->mutex);
> 
> -            if (qio_channel_read(params->c, (char *)&recv_address,
> -                                 sizeof(uint8_t*), &error_abort)
> -                != sizeof(uint8_t *)) {
> +            if (qio_channel_read(params->c, (char *)address,
> +                                 TARGET_PAGE_SIZE, &error_abort)
> +                != TARGET_PAGE_SIZE) {
>                  /* shouldn't ever happen */
>                  exit(-1);
>              }
> 
> -            if (address != recv_address) {
> -                printf("We received %p what we were expecting %p\n",
> -                       recv_address, address);
> -                exit(-1);
> -            }
> -
>              qemu_mutex_lock(&multifd_recv_mutex);
>              params->done = true;
>              qemu_cond_signal(&multifd_recv_cond);
> @@ -1126,6 +1136,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>      uint8_t *p;
>      RAMBlock *block = pss->block;
>      ram_addr_t offset = pss->offset;
> +    static int count = 32;
> 
>      p = block->host + offset;
> 
> @@ -1137,9 +1148,14 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          fd_num = multifd_send_page(p);
> +        count--;
> +        if (!count) {
> +            qemu_fflush(f);
> +            count = 32;
> +        }
> +
>          qemu_put_be16(f, fd_num);
>          *bytes_transferred += 2; /* size of fd_num */
> -        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> @@ -2401,6 +2417,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>      }
> 
>      flush_compressed_data(f);
> +    flush_multifd_send_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> 
>      rcu_read_unlock();
> @@ -2915,7 +2932,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
>              multifd_recv_page(host, fd_num);
> -            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> 
>          case RAM_SAVE_FLAG_EOS:
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 13/13] migration: flush receive queue
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 13/13] migration: flush receive queue Juan Quintela
@ 2016-10-26 19:10   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 25+ messages in thread
From: Dr. David Alan Gilbert @ 2016-10-26 19:10 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah, Juan Quintela

* Juan Quintela (quintela@redhat.com) wrote:
> From: Juan Quintela <quintela@trasno.org>
> 
> Each time that we sync the bitmap, there is a possibility that we
> receive a page that is still being processed by a different thread.
> We fix this problem by making sure that we wait for all receiving
> threads to finish their work before we proceed to the next stage.
> 
> I tried to make a migration command for it, but it doesn't work
> because we sometimes sync the bitmap when we have already sent the
> beginning of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  1 +
>  migration/ram.c               | 40 ++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 41 insertions(+)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index afdc7ec..49e2ec6 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -251,6 +251,7 @@ void migrate_multifd_send_threads_create(void);
>  void migrate_multifd_send_threads_join(void);
>  void migrate_multifd_recv_threads_create(void);
>  void migrate_multifd_recv_threads_join(void);
> +void qemu_savevm_send_multifd_flush(QEMUFile *f);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index 9a20f63..bf2022e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -69,6 +69,7 @@ static uint64_t bitmap_sync_count;
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> +#define RAM_SAVE_FLAG_MULTIFD_FLUSH    0x400

Now you really have run out of flags; there are architectures
with 1kB target pages.

However, we really just shouldn't be gobbling these flags up;
the flush could be MULTIFD_PAGE | one of the other flags.
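
Something along these lines is what I have in mind -- just an untested
sketch, and reusing RAM_SAVE_FLAG_COMPRESS_PAGE here is only for
illustration (any flag that can never legitimately accompany a multifd
page would do):

    /* A multifd page is never also a compressed page, so that bit can
     * double as "flush" when it appears together with MULTIFD_PAGE.
     */
    #define RAM_SAVE_FLAG_MULTIFD_FLUSH RAM_SAVE_FLAG_COMPRESS_PAGE

    /* load side */
    if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_MULTIFD_FLUSH))
        == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_MULTIFD_FLUSH)) {
        multifd_flush();
        flags &= ~RAM_SAVE_FLAG_MULTIFD_FLUSH;
    }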

Dave

>  static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
> 
> @@ -398,6 +399,11 @@ void migrate_compress_threads_create(void)
> 
>  /* Multiple fd's */
> 
> +
> +/* Indicates that we have synced the bitmap and need to ensure that
> +   the target has processed all previous pages */
> +bool multifd_needs_flush = false;
> +
>  struct MultiFDSendParams {
>      /* not changed */
>      QemuThread thread;
> @@ -713,6 +719,25 @@ static void multifd_recv_page(uint8_t *address, int fd_num)
>      qemu_mutex_unlock(&params->mutex);
>  }
> 
> +
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_recv_mutex);
> +    for (i = 0; i < thread_count; i++) {
> +        while (!multifd_recv[i].done) {
> +            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_recv_mutex);
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> @@ -729,6 +754,11 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
>  {
>      size_t size, len;
> 
> +    if (multifd_needs_flush) {
> +        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
> +        multifd_needs_flush = false;
> +    }
> +
>      qemu_put_be64(f, offset);
>      size = 8;
> 
> @@ -2399,6 +2429,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> 
>      if (!migration_in_postcopy(migrate_get_current())) {
>          migration_bitmap_sync();
> +        if (migrate_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>      }
> 
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2440,6 +2473,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync();
> +        if (migrate_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
> @@ -2851,6 +2887,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          flags = addr & ~TARGET_PAGE_MASK;
>          addr &= TARGET_PAGE_MASK;
> 
> +        if (flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) {
> +            multifd_flush();
> +            flags = flags & (~RAM_SAVE_FLAG_MULTIFD_FLUSH);
> +        }
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter
  2016-10-21 19:42 ` [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter Juan Quintela
  2016-10-26 18:33   ` Dr. David Alan Gilbert
@ 2016-10-26 21:16   ` Eric Blake
  1 sibling, 0 replies; 25+ messages in thread
From: Eric Blake @ 2016-10-26 21:16 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert

On 10/21/2016 02:42 PM, Juan Quintela wrote:
> Indicates the number of threads that we would create.  By default we
> create 2 threads.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---

> @@ -1386,6 +1389,9 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.has_downtime_limit = true;
>                  use_int_value = true;
>                  break;
> +            case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
> +                p.has_x_multifd_threads = true;
> +                break;

Won't work unless you add a line 'use_int_value = true;'

>              }
> 
>              if (use_int_value) {
> @@ -1402,6 +1408,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.cpu_throttle_initial = valueint;
>                  p.cpu_throttle_increment = valueint;
>                  p.downtime_limit = valueint;
> +                p.x_multifd_threads = valueint;
>              }

See also commit bb2b777 as a regression fix for a missing use_int_value.
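
Concretely, the case needs to end up looking something like this
(a sketch against this patch's hunk):

    case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
        p.has_x_multifd_threads = true;
        use_int_value = true;
        break;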

> +++ b/qapi-schema.json
> @@ -665,13 +665,16 @@
>  # @downtime-limit: set maximum tolerated downtime for migration. maximum
>  #                  downtime in milliseconds (Since 2.8)
>  #
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 1 (since 2.8)
> +#
>  # Since: 2.4
>  ##
>  { 'enum': 'MigrationParameter',
>    'data': ['compress-level', 'compress-threads', 'decompress-threads',
>             'cpu-throttle-initial', 'cpu-throttle-increment',
>             'tls-creds', 'tls-hostname', 'max-bandwidth',
> -           'downtime-limit'] }
> +           'downtime-limit', 'x-multifd-threads'] }
> 
>  #
>  # @migrate-set-parameters
> @@ -726,6 +729,9 @@
>  # @downtime-limit: set maximum tolerated downtime for migration. maximum
>  #                  downtime in milliseconds (Since 2.8)
>  #
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 1 (since 2.8)

Pre-existing in the other parameters, but we have an inconsistent use of
#optional markers here.
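
For comparison, with the marked style this entry would read something
like the following (the wording is just a suggestion):

    # @x-multifd-threads: #optional Number of threads used to migrate data
    #                     in parallel.  The default value is 1 (since 2.8)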

> +#
>  # Since: 2.4
>  ##
>  { 'struct': 'MigrationParameters',
> @@ -737,7 +743,8 @@
>              '*tls-creds': 'str',
>              '*tls-hostname': 'str',
>              '*max-bandwidth': 'int',
> -            '*downtime-limit': 'int'} }
> +            '*downtime-limit': 'int',
> +            '*x-multifd-threads': 'int'} }
>  ##
>  # @query-migrate-parameters
>  #
> 

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page
  2016-10-26 18:50   ` Dr. David Alan Gilbert
@ 2017-01-23 17:13     ` Juan Quintela
  0 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2017-01-23 17:13 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> The function still doesn't use multifd, but we have simplified
>> ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
>> counter and a new flag for this type of page.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>> +                            bool last_stage, uint64_t *bytes_transferred)
>> +{
>> +    int pages;
>> +    uint8_t *p;
>> +    RAMBlock *block = pss->block;
>> +    ram_addr_t offset = pss->offset;
>> +
>> +    p = block->host + offset;
>> +
>> +    if (block == last_sent_block) {
>> +        offset |= RAM_SAVE_FLAG_CONTINUE;
>> +    }
>> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
>> +    if (pages == -1) {
>> +        *bytes_transferred +=
>> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>> +        *bytes_transferred += TARGET_PAGE_SIZE;
>> +        pages = 1;
>> +        acct_info.norm_pages++;
>> +        acct_info.multifd_pages++;
>
> The acct_info is now updated simultaneously from multiple
> threads?

No.  This is still done from the migration thread.
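
If the accounting ever does move into the send threads, those counters
would have to become atomic updates, something like:

    atomic_inc(&acct_info.multifd_pages);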

Later, Juan.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads
  2016-10-26 18:43   ` Dr. David Alan Gilbert
@ 2017-01-23 17:15     ` Juan Quintela
  0 siblings, 0 replies; 25+ messages in thread
From: Juan Quintela @ 2017-01-23 17:15 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Creation of the threads, nothing inside yet.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> +void migrate_multifd_send_threads_join(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_multifd()){
>
> You've missed the space before the '{' (and then copied
> it everywhere in this patch).

Fixed.  As guessed, copy & paste O:-)


>> +    thread_count = migrate_multifd_threads();
>> +    multifd_send = g_new0(MultiFDSendParams, thread_count);
>> +    for (i = 0; i < thread_count; i++) {
>> +        qemu_mutex_init(&multifd_send[i].mutex);
>> +        qemu_cond_init(&multifd_send[i].cond);
>> +        multifd_send[i].quit = false;
>> +        qemu_thread_create(&multifd_send[i].thread, "multifd_send",
>
> You could make the name of the thread include the thread number,
> that way you could easily see in top if any one of the threads
> was getting particularly busy (although be careful with the length;
> I think Linux will ignore the name if it's over 15 characters).

Names will show correctly up to thread 99 :p
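
Something like this should do it (untested sketch; the trailing
arguments just follow the pattern of the existing call):

    char thread_name[16];

    snprintf(thread_name, sizeof(thread_name), "multifd_send_%d", i);
    qemu_thread_create(&multifd_send[i].thread, thread_name,
                       multifd_send_thread, &multifd_send[i],
                       QEMU_THREAD_JOINABLE);

"multifd_send_" is 13 characters, so two-digit thread numbers land
exactly on the 15-character limit.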

^ permalink raw reply	[flat|nested] 25+ messages in thread

end of thread, other threads:[~2017-01-23 17:15 UTC | newest]

Thread overview: 25+ messages
2016-10-21 19:42 [Qemu-devel] [RFC 00/13] Multifd v2 Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 01/13] migration: create Migration Incoming State at init time Juan Quintela
2016-10-26 18:05   ` Dr. David Alan Gilbert
2016-10-21 19:42 ` [Qemu-devel] [PATCH 02/13] migration: [HACK] Don't create decompression threads if not enabled Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 03/13] migration: Add multifd capability Juan Quintela
2016-10-26 17:57   ` Dr. David Alan Gilbert
2016-10-21 19:42 ` [Qemu-devel] [PATCH 04/13] migration: Create x-multifd-threads parameter Juan Quintela
2016-10-26 18:33   ` Dr. David Alan Gilbert
2016-10-26 21:16   ` Eric Blake
2016-10-21 19:42 ` [Qemu-devel] [PATCH 05/13] migration: create multifd migration threads Juan Quintela
2016-10-26 18:43   ` Dr. David Alan Gilbert
2017-01-23 17:15     ` Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 06/13] migration: Start of multiple fd work Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 07/13] migration: create ram_multifd_page Juan Quintela
2016-10-26 18:50   ` Dr. David Alan Gilbert
2017-01-23 17:13     ` Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 08/13] migration: Create thread infrastructure for multifd send side Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 09/13] migration: Send the fd number which we are going to use for this page Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 10/13] migration: Create thread infrastructure for multifd recv side Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 11/13] migration: Test new fd infrastructure Juan Quintela
2016-10-21 19:42 ` [Qemu-devel] [PATCH 12/13] migration: [HACK]Transfer pages over new channels Juan Quintela
2016-10-26 19:08   ` Dr. David Alan Gilbert
2016-10-21 19:42 ` [Qemu-devel] [PATCH 13/13] migration: flush receive queue Juan Quintela
2016-10-26 19:10   ` Dr. David Alan Gilbert
2016-10-21 20:26 ` [Qemu-devel] [RFC 00/13] Multifd v2 no-reply
