* [Qemu-devel] [PATCH 00/17] multifd v3
@ 2017-01-23 21:32 Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_ Juan Quintela
                   ` (17 more replies)
  0 siblings, 18 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Hi

This is the 3rd version of multifd. Changes:
- comments for the previous version addressed
- lots of bugs fixed
- remove DPRINTF from ram.c

- add the multifd-group parameter, which sets how many pages we send at a
  time to the worker threads.  I am open to better names.
- Better flush support.
- with migrate_set_speed 2G it is able to migrate "stress -vm 2
  -vm-bytes 512M" over loopback.

Please review.

Thanks, Juan.

[v2]

This is a version against current code.  It is based on top of the QIO
work.  It improves the thread synchronization and fixes the problem
where we could have two threads handling the same page.

Please comment, Juan.

Juan Quintela (17):
  migration: transform remained DPRINTF into trace_
  migration: create Migration Incoming State at init time
  migration: Test for disabled features on reception
  migration: Don't create decompression threads if not enabled
  migration: Add multifd capability
  migration: Create x-multifd-threads parameter
  migration: Create x-multifd-group parameter
  migration: create multifd migration threads
  migration: Start of multiple fd work
  migration: create ram_multifd_page
  migration: Create thread infrastructure for multifd send side
  migration: really use multiple pages at a time
  migration: Send the fd number which we are going to use for this page
  migration: Create thread infrastructure for multifd recv side
  migration: Test new fd infrastructure
  migration: [HACK]Transfer pages over new channels
  migration: flush receive queue

 hmp.c                         |  18 ++
 include/migration/migration.h |  17 +-
 migration/migration.c         | 115 ++++++++--
 migration/ram.c               | 515 ++++++++++++++++++++++++++++++++++++++++--
 migration/savevm.c            |   4 +-
 migration/socket.c            |  56 ++++-
 migration/trace-events        |   4 +
 qapi-schema.json              |  29 ++-
 8 files changed, 707 insertions(+), 51 deletions(-)

-- 
2.9.3

^ permalink raw reply	[flat|nested] 57+ messages in thread

* [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-24  2:20   ` Eric Blake
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 02/17] migration: create Migration Incoming State at init time Juan Quintela
                   ` (16 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

So we can remove the DPRINTF() macro.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 18 ++++--------------
 migration/trace-events |  4 ++++
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index a1c8089..ef8fadf 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -45,14 +45,6 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"

-#ifdef DEBUG_MIGRATION_RAM
-#define DPRINTF(fmt, ...) \
-    do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
-#else
-#define DPRINTF(fmt, ...) \
-    do { } while (0)
-#endif
-
 static int dirty_rate_high_cnt;

 static uint64_t bitmap_sync_count;
@@ -507,10 +499,10 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
                                        TARGET_PAGE_SIZE);
     if (encoded_len == 0) {
-        DPRINTF("Skipping unmodified page\n");
+        trace_save_xbzrle_page_skipping();
         return 0;
     } else if (encoded_len == -1) {
-        DPRINTF("Overflow\n");
+        trace_save_xbzrle_page_overflow();
         acct_info.xbzrle_overflows++;
         /* update data in the cache */
         if (!last_stage) {
@@ -2020,8 +2012,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         if ((i & 63) == 0) {
             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
             if (t1 > MAX_WAIT) {
-                DPRINTF("big wait: %" PRIu64 " milliseconds, %d iterations\n",
-                        t1, i);
+                trace_ram_save_iterate_big_wait(t1, i);
                 break;
             }
         }
@@ -2594,8 +2585,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)

     wait_for_decompress_done();
     rcu_read_unlock();
-    DPRINTF("Completed load of VM with exit code %d seq iteration "
-            "%" PRIu64 "\n", ret, seq_iter);
+    trace_ram_load_complete(ret, seq_iter);
     return ret;
 }

diff --git a/migration/trace-events b/migration/trace-events
index 94134f7..6d781cf 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -186,6 +186,10 @@ postcopy_ram_incoming_cleanup_closeuf(void) ""
 postcopy_ram_incoming_cleanup_entry(void) ""
 postcopy_ram_incoming_cleanup_exit(void) ""
 postcopy_ram_incoming_cleanup_join(void) ""
+save_xbzrle_page_skipping(void) ""
+save_xbzrle_page_overflow(void) ""
+ram_save_iterate_big_wait(uint64_t milliseconds, int iterations) "big wait: %" PRIu64 " milliseconds, %d iterations"
+ram_load_complete(int ret, uint64_t seq_iter) "exit_code %d seq iteration %" PRIu64

 # migration/exec.c
 migration_exec_outgoing(const char *cmd) "cmd=%s"
-- 
2.9.3


* [Qemu-devel] [PATCH 02/17] migration: create Migration Incoming State at init time
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_ Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception Juan Quintela
                   ` (15 subsequent siblings)
  17 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 include/migration/migration.h |  1 -
 migration/migration.c         | 38 +++++++++++++++++---------------------
 migration/savevm.c            |  4 ++--
 3 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index c309d23..a184509 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -119,7 +119,6 @@ struct MigrationIncomingState {
 };

 MigrationIncomingState *migration_incoming_get_current(void);
-MigrationIncomingState *migration_incoming_state_new(QEMUFile *f);
 void migration_incoming_state_destroy(void);

 /*
diff --git a/migration/migration.c b/migration/migration.c
index f498ab8..77d7b84 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -111,32 +111,28 @@ MigrationState *migrate_get_current(void)
     return &current_migration;
 }

-/* For incoming */
-static MigrationIncomingState *mis_current;
-
 MigrationIncomingState *migration_incoming_get_current(void)
 {
-    return mis_current;
-}
+    static bool once;
+    static MigrationIncomingState mis_current;

-MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
-{
-    mis_current = g_new0(MigrationIncomingState, 1);
-    mis_current->from_src_file = f;
-    mis_current->state = MIGRATION_STATUS_NONE;
-    QLIST_INIT(&mis_current->loadvm_handlers);
-    qemu_mutex_init(&mis_current->rp_mutex);
-    qemu_event_init(&mis_current->main_thread_load_event, false);
-
-    return mis_current;
+    if (!once) {
+        memset(&mis_current, 0, sizeof(MigrationIncomingState));
+        mis_current.state = MIGRATION_STATUS_NONE;
+        QLIST_INIT(&mis_current.loadvm_handlers);
+        qemu_mutex_init(&mis_current.rp_mutex);
+        qemu_event_init(&mis_current.main_thread_load_event, false);
+        once = true;
+    }
+    return &mis_current;
 }

 void migration_incoming_state_destroy(void)
 {
-    qemu_event_destroy(&mis_current->main_thread_load_event);
-    loadvm_free_handlers(mis_current);
-    g_free(mis_current);
-    mis_current = NULL;
+    struct MigrationIncomingState *mis = migration_incoming_get_current();
+
+    qemu_event_destroy(&mis->main_thread_load_event);
+    loadvm_free_handlers(mis);
 }


@@ -382,11 +378,11 @@ static void process_incoming_migration_bh(void *opaque)
 static void process_incoming_migration_co(void *opaque)
 {
     QEMUFile *f = opaque;
-    MigrationIncomingState *mis;
+    MigrationIncomingState *mis = migration_incoming_get_current();
     PostcopyState ps;
     int ret;

-    mis = migration_incoming_state_new(f);
+    mis->from_src_file = f;
     postcopy_state_set(POSTCOPY_INCOMING_NONE);
     migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
                       MIGRATION_STATUS_ACTIVE);
diff --git a/migration/savevm.c b/migration/savevm.c
index f9c06e9..41a2506 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -2185,7 +2185,6 @@ void qmp_xen_load_devices_state(const char *filename, Error **errp)
     qio_channel_set_name(QIO_CHANNEL(ioc), "migration-xen-load-state");
     f = qemu_fopen_channel_input(QIO_CHANNEL(ioc));

-    migration_incoming_state_new(f);
     ret = qemu_loadvm_state(f);
     qemu_fclose(f);
     if (ret < 0) {
@@ -2201,6 +2200,7 @@ int load_vmstate(const char *name)
     QEMUFile *f;
     int ret;
     AioContext *aio_context;
+    MigrationIncomingState *mis = migration_incoming_get_current();

     if (!bdrv_all_can_snapshot(&bs)) {
         error_report("Device '%s' is writable but does not support snapshots.",
@@ -2251,7 +2251,7 @@ int load_vmstate(const char *name)
     }

     qemu_system_reset(VMRESET_SILENT);
-    migration_incoming_state_new(f);
+    mis->from_src_file = f;

     aio_context_acquire(aio_context);
     ret = qemu_loadvm_state(f);
-- 
2.9.3


* [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_ Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 02/17] migration: create Migration Incoming State at init time Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-24 10:33   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 04/17] migration: Don't create decompression threads if not enabled Juan Quintela
                   ` (14 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Right now, if we receive a compressed page or an xbzrle page while these
features are disabled, Bad Things (TM) can happen.  Just add a test for
them.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 23 ++++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index ef8fadf..4ad814a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -2455,7 +2455,7 @@ static int ram_load_postcopy(QEMUFile *f)

 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
-    int flags = 0, ret = 0;
+    int flags = 0, ret = 0, invalid_flags;
     static uint64_t seq_iter;
     int len = 0;
     /*
@@ -2470,6 +2470,15 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         ret = -EINVAL;
     }

+    invalid_flags = 0;
+
+    if (!migrate_use_xbzrle()) {
+        invalid_flags |= RAM_SAVE_FLAG_XBZRLE;
+    }
+
+    if (!migrate_use_compression()) {
+        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
+    }
     /* This RCU critical section can be very long running.
      * When RCU reclaims in the code start to become numerous,
      * it will be necessary to reduce the granularity of this
@@ -2490,6 +2499,18 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         flags = addr & ~TARGET_PAGE_MASK;
         addr &= TARGET_PAGE_MASK;

+        if (flags & invalid_flags) {
+            if (flags & invalid_flags & RAM_SAVE_FLAG_XBZRLE) {
+                error_report("Received an unexpected XBZRLE page");
+            }
+            if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
+                error_report("Received an unexpected compressed page");
+            }
+
+            ret = -EINVAL;
+            break;
+        }
+
         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);
-- 
2.9.3


* [Qemu-devel] [PATCH 04/17] migration: Don't create decompression threads if not enabled
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (2 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 05/17] migration: Add multifd capability Juan Quintela
                   ` (13 subsequent siblings)
  17 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

I removed the [HACK] part because the previous patch already checks that
compressed pages are not received.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 4ad814a..cea213e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -2251,6 +2251,9 @@ void migrate_decompress_threads_create(void)
 {
     int i, thread_count;

+    if (!migrate_use_compression()) {
+        return;
+    }
     thread_count = migrate_decompress_threads();
     decompress_threads = g_new0(QemuThread, thread_count);
     decomp_param = g_new0(DecompressParam, thread_count);
@@ -2272,6 +2275,9 @@ void migrate_decompress_threads_join(void)
 {
     int i, thread_count;

+    if (!migrate_use_compression()) {
+        return;
+    }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
         qemu_mutex_lock(&decomp_param[i].mutex);
-- 
2.9.3


* [Qemu-devel] [PATCH 05/17] migration: Add multifd capability
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (3 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 04/17] migration: Don't create decompression threads if not enabled Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter Juan Quintela
                   ` (12 subsequent siblings)
  17 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 include/migration/migration.h | 1 +
 migration/migration.c         | 9 +++++++++
 qapi-schema.json              | 5 +++--
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index a184509..19245d6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -297,6 +297,7 @@ bool migrate_postcopy_ram(void);
 bool migrate_zero_blocks(void);

 bool migrate_auto_converge(void);
+bool migrate_use_multifd(void);

 int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
                          uint8_t *dst, int dlen);
diff --git a/migration/migration.c b/migration/migration.c
index 77d7b84..1669e41 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1319,6 +1319,15 @@ bool migrate_use_events(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
 }

+bool migrate_use_multifd(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index ddc8783..d34632e 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -865,12 +865,13 @@
 #        side, this process is called COarse-Grain LOck Stepping (COLO) for
 #        Non-stop Service. (since 2.8)
 #
+# @x-multifd: Use more than one fd for migration (since 2.9)
+#
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
   'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
-           'compress', 'events', 'postcopy-ram', 'x-colo'] }
-
+           'compress', 'events', 'postcopy-ram', 'x-colo', 'x-multifd'] }
 ##
 # @MigrationCapabilityStatus:
 #
-- 
2.9.3


* [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (4 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 05/17] migration: Add multifd capability Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-02 15:06   ` Eric Blake
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter Juan Quintela
                   ` (11 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Indicates the number of threads that we will create.  By default we
create 2 threads.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 hmp.c                         |  8 ++++++++
 include/migration/migration.h |  2 ++
 migration/migration.c         | 23 +++++++++++++++++++++++
 qapi-schema.json              | 13 +++++++++++--
 4 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index 8522efe..8c7e302 100644
--- a/hmp.c
+++ b/hmp.c
@@ -322,6 +322,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, " %s: %" PRId64,
             MigrationParameter_lookup[MIGRATION_PARAMETER_X_CHECKPOINT_DELAY],
             params->x_checkpoint_delay);
+        monitor_printf(mon, " %s: %" PRId64,
+            MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
+            params->x_multifd_threads);
         monitor_printf(mon, "\n");
     }

@@ -1394,6 +1397,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.has_x_checkpoint_delay = true;
                 use_int_value = true;
                 break;
+            case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
+                p.has_x_multifd_threads = true;
+                use_int_value = true;
+                break;
             }

             if (use_int_value) {
@@ -1411,6 +1418,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.cpu_throttle_increment = valueint;
                 p.downtime_limit = valueint;
                 p.x_checkpoint_delay = valueint;
+                p.x_multifd_threads = valueint;
             }

             qmp_migrate_set_parameters(&p, &err);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 19245d6..b35044c 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -247,6 +247,8 @@ bool migration_in_postcopy(MigrationState *);
 bool migration_in_postcopy_after_devices(MigrationState *);
 MigrationState *migrate_get_current(void);

+int migrate_multifd_threads(void);
+
 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
 void migrate_decompress_threads_create(void);
diff --git a/migration/migration.c b/migration/migration.c
index 1669e41..2fe03d8 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -67,6 +67,7 @@
  * Note: Please change this default value to 10000 when we support hybrid mode.
  */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
+#define DEFAULT_MIGRATE_MULTIFD_THREADS 2

 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -101,6 +102,7 @@ MigrationState *migrate_get_current(void)
             .max_bandwidth = MAX_THROTTLE,
             .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
             .x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
+            .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
         },
     };

@@ -591,6 +593,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->downtime_limit = s->parameters.downtime_limit;
     params->has_x_checkpoint_delay = true;
     params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
+    params->has_x_multifd_threads = true;
+    params->x_multifd_threads = s->parameters.x_multifd_threads;

     return params;
 }
@@ -854,6 +858,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
                     "x_checkpoint_delay",
                     "is invalid, it should be positive");
     }
+    if (params->has_x_multifd_threads &&
+            (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "multifd_threads",
+                   "is invalid, it should be in the range of 1 to 255");
+        return;
+    }

     if (params->has_compress_level) {
         s->parameters.compress_level = params->compress_level;
@@ -892,6 +903,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
     if (params->has_x_checkpoint_delay) {
         s->parameters.x_checkpoint_delay = params->x_checkpoint_delay;
     }
+    if (params->has_x_multifd_threads) {
+        s->parameters.x_multifd_threads = params->x_multifd_threads;
+    }
 }


@@ -1328,6 +1342,15 @@ bool migrate_use_multifd(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
 }

+int migrate_multifd_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.x_multifd_threads;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index d34632e..2273864 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -981,13 +981,17 @@
 # @x-checkpoint-delay: The delay time (in ms) between two COLO checkpoints in
 #          periodic mode. (Since 2.8)
 #
+# @x-multifd-threads: Number of threads used to migrate data in parallel
+#                     The default value is 2 (since 2.9)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
   'data': ['compress-level', 'compress-threads', 'decompress-threads',
            'cpu-throttle-initial', 'cpu-throttle-increment',
            'tls-creds', 'tls-hostname', 'max-bandwidth',
-           'downtime-limit', 'x-checkpoint-delay' ] }
+           'downtime-limit', 'x-checkpoint-delay',
+           'x-multifd-threads'] }

 ##
 # @migrate-set-parameters:
@@ -1050,6 +1054,10 @@
 #
 # @x-checkpoint-delay: the delay time between two COLO checkpoints. (Since 2.8)
 #
+#
+# @x-multifd-threads: Number of threads used to migrate data in parallel
+#                     The default value is 2 (since 2.9)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -1062,7 +1070,8 @@
             '*tls-hostname': 'str',
             '*max-bandwidth': 'int',
             '*downtime-limit': 'int',
-            '*x-checkpoint-delay': 'int'} }
+            '*x-checkpoint-delay': 'int',
+            '*x-multifd-threads': 'int'} }

 ##
 # @query-migrate-parameters:
-- 
2.9.3


* [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (5 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-26 11:47   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 08/17] migration: create multifd migration threads Juan Quintela
                   ` (10 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Indicates how many pages we are going to send in each batch to a
multifd thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hmp.c                         |  8 ++++++++
 include/migration/migration.h |  1 +
 migration/migration.c         | 23 +++++++++++++++++++++++
 qapi-schema.json              | 11 +++++++++--
 4 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index 8c7e302..e579766 100644
--- a/hmp.c
+++ b/hmp.c
@@ -325,6 +325,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, " %s: %" PRId64,
             MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
             params->x_multifd_threads);
+        monitor_printf(mon, " %s: %" PRId64,
+            MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_GROUP],
+            params->x_multifd_group);
         monitor_printf(mon, "\n");
     }

@@ -1401,6 +1404,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.has_x_multifd_threads = true;
                 use_int_value = true;
                 break;
+            case MIGRATION_PARAMETER_X_MULTIFD_GROUP:
+                p.has_x_multifd_group = true;
+                use_int_value = true;
+                break;
             }

             if (use_int_value) {
@@ -1419,6 +1426,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
                 p.downtime_limit = valueint;
                 p.x_checkpoint_delay = valueint;
                 p.x_multifd_threads = valueint;
+                p.x_multifd_group = valueint;
             }

             qmp_migrate_set_parameters(&p, &err);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index b35044c..515569d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -248,6 +248,7 @@ bool migration_in_postcopy_after_devices(MigrationState *);
 MigrationState *migrate_get_current(void);

 int migrate_multifd_threads(void);
+int migrate_multifd_group(void);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 2fe03d8..9bde01b 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -68,6 +68,7 @@
  */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
 #define DEFAULT_MIGRATE_MULTIFD_THREADS 2
+#define DEFAULT_MIGRATE_MULTIFD_GROUP 16

 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -103,6 +104,7 @@ MigrationState *migrate_get_current(void)
             .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
             .x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
             .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
+            .x_multifd_group = DEFAULT_MIGRATE_MULTIFD_GROUP,
         },
     };

@@ -595,6 +597,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
     params->has_x_multifd_threads = true;
     params->x_multifd_threads = s->parameters.x_multifd_threads;
+    params->has_x_multifd_group = true;
+    params->x_multifd_group = s->parameters.x_multifd_group;

     return params;
 }
@@ -865,6 +869,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
                    "is invalid, it should be in the range of 1 to 255");
         return;
     }
+    if (params->has_x_multifd_group &&
+            (params->x_multifd_group < 1 || params->x_multifd_group > 10000)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "multifd_group",
+                   "is invalid, it should be in the range of 1 to 10000");
+        return;
+    }

     if (params->has_compress_level) {
         s->parameters.compress_level = params->compress_level;
@@ -906,6 +917,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
     if (params->has_x_multifd_threads) {
         s->parameters.x_multifd_threads = params->x_multifd_threads;
     }
+    if (params->has_x_multifd_group) {
+        s->parameters.x_multifd_group = params->x_multifd_group;
+    }
 }


@@ -1351,6 +1365,15 @@ int migrate_multifd_threads(void)
     return s->parameters.x_multifd_threads;
 }

+int migrate_multifd_group(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.x_multifd_group;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index 2273864..54232ee 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -984,6 +984,9 @@
 # @x-multifd-threads: Number of threads used to migrate data in parallel
 #                     The default value is 2 (since 2.9)
 #
+# @x-multifd-group: Number of pages sent together to a thread
+#                     The default value is 16 (since 2.9)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
@@ -991,7 +994,7 @@
            'cpu-throttle-initial', 'cpu-throttle-increment',
            'tls-creds', 'tls-hostname', 'max-bandwidth',
            'downtime-limit', 'x-checkpoint-delay',
-           'x-multifd-threads'] }
+           'x-multifd-threads', 'x-multifd-group'] }

 ##
 # @migrate-set-parameters:
@@ -1058,6 +1061,9 @@
 # @x-multifd-threads: Number of threads used to migrate data in parallel
 #                     The default value is 1 (since 2.9)
 #
+# @x-multifd-group: Number of pages sent together in a batch
+#                     The default value is 16 (since 2.9)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -1071,7 +1077,8 @@
             '*max-bandwidth': 'int',
             '*downtime-limit': 'int',
             '*x-checkpoint-delay': 'int',
-            '*x-multifd-threads': 'int'} }
+            '*x-multifd-threads': 'int',
+            '*x-multifd-group': 'int'} }

 ##
 # @query-migrate-parameters:
-- 
2.9.3


* [Qemu-devel] [PATCH 08/17] migration: create multifd migration threads
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (6 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work Juan Quintela
                   ` (9 subsequent siblings)
  17 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Create the threads; there is nothing inside them yet.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 include/migration/migration.h |   4 ++
 migration/migration.c         |   6 ++
 migration/ram.c               | 150 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 160 insertions(+)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 515569d..f119ba0 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -249,6 +249,10 @@ MigrationState *migrate_get_current(void);

 int migrate_multifd_threads(void);
 int migrate_multifd_group(void);
+void migrate_multifd_send_threads_create(void);
+void migrate_multifd_send_threads_join(void);
+void migrate_multifd_recv_threads_create(void);
+void migrate_multifd_recv_threads_join(void);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 9bde01b..ab48f06 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -344,6 +344,7 @@ static void process_incoming_migration_bh(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report_err(local_err);
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -368,6 +369,7 @@ static void process_incoming_migration_bh(void *opaque)
         runstate_set(global_state_get_runstate());
     }
     migrate_decompress_threads_join();
+    migrate_multifd_recv_threads_join();
     /*
      * This must happen after any state changes since as soon as an external
      * observer sees this event they might start to prod at the VM assuming
@@ -433,6 +435,7 @@ static void process_incoming_migration_co(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -445,6 +448,7 @@ void migration_fd_process_incoming(QEMUFile *f)
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);

     migrate_decompress_threads_create();
+    migrate_multifd_recv_threads_create();
     qemu_file_set_blocking(f, false);
     qemu_coroutine_enter(co);
 }
@@ -974,6 +978,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_mutex_lock_iothread();

         migrate_compress_threads_join();
+        migrate_multifd_send_threads_join();
         qemu_fclose(s->to_dst_file);
         s->to_dst_file = NULL;
     }
@@ -2020,6 +2025,7 @@ void migrate_fd_connect(MigrationState *s)
     }

     migrate_compress_threads_create();
+    migrate_multifd_send_threads_create();
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
     s->migration_thread_running = true;
diff --git a/migration/ram.c b/migration/ram.c
index cea213e..939f364 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -382,6 +382,156 @@ void migrate_compress_threads_create(void)
     }
 }

+/* Multiple fd's */
+
+struct MultiFDSendParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDSendParams MultiFDSendParams;
+
+static MultiFDSendParams *multifd_send;
+
+static void *multifd_send_thread(void *opaque)
+{
+    MultiFDSendParams *params = opaque;
+
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_send_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        multifd_send[i].quit = true;
+        qemu_cond_signal(&multifd_send[i].cond);
+        qemu_mutex_unlock(&multifd_send[i].mutex);
+    }
+}
+
+void migrate_multifd_send_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    terminate_multifd_send_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_send[i].thread);
+        qemu_mutex_destroy(&multifd_send[i].mutex);
+        qemu_cond_destroy(&multifd_send[i].cond);
+    }
+    g_free(multifd_send);
+    multifd_send = NULL;
+}
+
+void migrate_multifd_send_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_send = g_new0(MultiFDSendParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        char thread_name[15];
+        qemu_mutex_init(&multifd_send[i].mutex);
+        qemu_cond_init(&multifd_send[i].cond);
+        multifd_send[i].quit = false;
+        snprintf(thread_name, 15, "multifd_send_%d", i);
+        qemu_thread_create(&multifd_send[i].thread, thread_name,
+                           multifd_send_thread, &multifd_send[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+struct MultiFDRecvParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDRecvParams MultiFDRecvParams;
+
+static MultiFDRecvParams *multifd_recv;
+
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *params = opaque;
+ 
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_recv_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        multifd_recv[i].quit = true;
+        qemu_cond_signal(&multifd_recv[i].cond);
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
+    }
+}
+
+void migrate_multifd_recv_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    terminate_multifd_recv_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_recv[i].thread);
+        qemu_mutex_destroy(&multifd_recv[i].mutex);
+        qemu_cond_destroy(&multifd_recv[i].cond);
+    }
+    g_free(multifd_recv);
+    multifd_recv = NULL;
+}
+
+void migrate_multifd_recv_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_recv = g_new0(MultiFDRecvParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&multifd_recv[i].mutex);
+        qemu_cond_init(&multifd_recv[i].cond);
+        multifd_recv[i].quit = false;
+        qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
+                           multifd_recv_thread, &multifd_recv[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
 /**
  * save_page_header: Write page header to wire
  *
-- 
2.9.3


* [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (7 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 08/17] migration: create multifd migration threads Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-27 17:45   ` Dr. David Alan Gilbert
  2017-02-13 17:35   ` Daniel P. Berrange
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page Juan Quintela
                   ` (8 subsequent siblings)
  17 siblings, 2 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We create a new channel for each new thread. We only send a single
character through each channel, to make sure that we are creating the
channels in the right order.

Note: Reference count/freeing of channels is not done

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  6 +++++
 migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
 migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index f119ba0..3989bd6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -22,6 +22,7 @@
 #include "qapi-types.h"
 #include "exec/cpu-common.h"
 #include "qemu/coroutine_int.h"
+#include "io/channel.h"

 #define QEMU_VM_FILE_MAGIC           0x5145564d
 #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
@@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);

 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);

+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void unix_start_incoming_migration(const char *path, Error **errp);

 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 939f364..5ad7cb3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)

 struct MultiFDSendParams {
     QemuThread thread;
+    QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

@@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
+    char start = 's';
+
+    qio_channel_write(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
@@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
         qemu_thread_join(&multifd_send[i].thread);
         qemu_mutex_destroy(&multifd_send[i].mutex);
         qemu_cond_destroy(&multifd_send[i].cond);
+        socket_send_channel_destroy(multifd_send[i].c);
     }
     g_free(multifd_send);
     multifd_send = NULL;
@@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
+        multifd_send[i].started = false;
+        multifd_send[i].c = socket_send_channel_create();
+        if (!multifd_send[i].c) {
+            error_report("Error creating a send channel");
+            exit(EXIT_FAILURE);
+        }
         snprintf(thread_name, 15, "multifd_send_%d", i);
         qemu_thread_create(&multifd_send[i].thread, thread_name,
                            multifd_send_thread, &multifd_send[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        while (!multifd_send[i].started) {
+            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_send[i].mutex);
     }
 }

 struct MultiFDRecvParams {
     QemuThread thread;
+    QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

@@ -472,7 +495,14 @@ static MultiFDRecvParams *multifd_recv;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
- 
+    char start;
+
+    qio_channel_read(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+
     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
         qemu_cond_wait(&params->cond, &params->mutex);
@@ -508,6 +538,7 @@ void migrate_multifd_recv_threads_join(void)
         qemu_thread_join(&multifd_recv[i].thread);
         qemu_mutex_destroy(&multifd_recv[i].mutex);
         qemu_cond_destroy(&multifd_recv[i].cond);
+        socket_recv_channel_destroy(multifd_recv[i].c);
     }
     g_free(multifd_recv);
     multifd_recv = NULL;
@@ -526,9 +557,21 @@ void migrate_multifd_recv_threads_create(void)
         qemu_mutex_init(&multifd_recv[i].mutex);
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
+        multifd_recv[i].started = false;
+        multifd_recv[i].c = socket_recv_channel_create();
+
+        if (!multifd_recv[i].c) {
+            error_report("Error creating a recv channel");
+            exit(EXIT_FAILURE);
+        }
         qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
                            multifd_recv_thread, &multifd_recv[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        while (!multifd_recv[i].started) {
+            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
     }
 }

diff --git a/migration/socket.c b/migration/socket.c
index 11f80b1..7cd9213 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -24,6 +24,54 @@
 #include "io/channel-socket.h"
 #include "trace.h"

+struct SocketArgs {
+    QIOChannelSocket *ioc;
+    SocketAddress *saddr;
+    Error **errp;
+} socket_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+    QIOChannelSocket *sioc;
+    Error *err = NULL;
+
+    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
+                                     &err);
+    if (!sioc) {
+        error_report("could not accept migration connection (%s)",
+                     error_get_pretty(err));
+        return NULL;
+    }
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* Remove channel */
+    object_unref(OBJECT(recv));
+    return 0;
+}
+
+QIOChannel *socket_send_channel_create(void)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
+                                    socket_args.errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (socket_args.saddr) {
+        qapi_free_SocketAddress(socket_args.saddr);
+        socket_args.saddr = NULL;
+    }
+    return 0;
+}

 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -96,6 +144,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);

     data->s = s;
+
+    socket_args.saddr = saddr;
+    socket_args.errp = errp;
+
     if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
         data->hostname = g_strdup(saddr->u.inet.data->host);
     }
@@ -106,7 +158,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }

 void tcp_start_outgoing_migration(MigrationState *s,
@@ -154,7 +205,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,

 out:
     /* Close listening socket as its no longer needed */
-    qio_channel_close(ioc, NULL);
+//    qio_channel_close(ioc, NULL);
     return FALSE; /* unregister */
 }

@@ -163,6 +214,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
                                             Error **errp)
 {
     QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+    socket_args.ioc = listen_ioc;

     qio_channel_set_name(QIO_CHANNEL(listen_ioc),
                          "migration-socket-listener");
-- 
2.9.3


* [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (8 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-27 18:02   ` Dr. David Alan Gilbert
  2017-02-02 11:20   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side Juan Quintela
                   ` (7 subsequent siblings)
  17 siblings, 2 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
counter and a new flag for this type of page.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 hmp.c                         |  2 ++
 include/migration/migration.h |  1 +
 migration/migration.c         |  1 +
 migration/ram.c               | 51 ++++++++++++++++++++++++++++++++++++++++++-
 qapi-schema.json              |  4 +++-
 5 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index e579766..76bc8c7 100644
--- a/hmp.c
+++ b/hmp.c
@@ -222,6 +222,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
             monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
                            info->ram->postcopy_requests);
         }
+        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
+                       info->ram->multifd);
     }

     if (info->has_disk) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3989bd6..b3e4f31 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -282,6 +282,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
 uint64_t xbzrle_mig_pages_overflow(void);
 uint64_t xbzrle_mig_pages_cache_miss(void);
 double xbzrle_mig_cache_miss_rate(void);
+uint64_t multifd_mig_pages_transferred(void);

 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
 void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
diff --git a/migration/migration.c b/migration/migration.c
index ab48f06..1d62b91 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -652,6 +652,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->mbps = s->mbps;
     info->ram->dirty_sync_count = s->dirty_sync_count;
     info->ram->postcopy_requests = s->postcopy_requests;
+    info->ram->multifd = multifd_mig_pages_transferred();

     if (s->state != MIGRATION_STATUS_COMPLETED) {
         info->ram->remaining = ram_bytes_remaining();
diff --git a/migration/ram.c b/migration/ram.c
index 5ad7cb3..c71929e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200

 static uint8_t *ZERO_TARGET_PAGE;

@@ -141,6 +142,7 @@ typedef struct AccountingInfo {
     uint64_t dup_pages;
     uint64_t skipped_pages;
     uint64_t norm_pages;
+    uint64_t multifd_pages;
     uint64_t iterations;
     uint64_t xbzrle_bytes;
     uint64_t xbzrle_pages;
@@ -211,6 +213,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
     return acct_info.xbzrle_overflows;
 }

+uint64_t multifd_mig_pages_transferred(void)
+{
+    return acct_info.multifd_pages;
+}
+
 /* This is the last block that we have visited serching for dirty pages
  */
 static RAMBlock *last_seen_block;
@@ -990,6 +997,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
     return pages;
 }

+static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
+                            bool last_stage, uint64_t *bytes_transferred)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->offset;
+
+    p = block->host + offset;
+
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    pages = save_zero_page(f, block, offset, p, bytes_transferred);
+    if (pages == -1) {
+        *bytes_transferred +=
+            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        *bytes_transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        acct_info.norm_pages++;
+        acct_info.multifd_pages++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1427,6 +1461,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
             res = ram_save_compressed_page(f, pss,
                                            last_stage,
                                            bytes_transferred);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
         } else {
             res = ram_save_page(f, pss, last_stage,
                                 bytes_transferred);
@@ -2678,6 +2714,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (!migrate_use_compression()) {
         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
     }
+
+    if (!migrate_use_multifd()) {
+        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
+    }
     /* This RCU critical section can be very long running.
      * When RCU reclaims in the code start to become numerous,
      * it will be necessary to reduce the granularity of this
@@ -2705,13 +2745,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             if (flags & invalid_flags  & RAM_SAVE_FLAG_COMPRESS_PAGE) {
                 error_report("Received an unexpected compressed page");
             }
+            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
+                error_report("Received an unexpected multifd page");
+            }

             ret = -EINVAL;
             break;
         }

         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
-                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);

             host = host_from_ram_block_offset(block, addr);
@@ -2786,6 +2830,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 break;
             }
             break;
+
+        case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            break;
+
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
             break;
diff --git a/qapi-schema.json b/qapi-schema.json
index 54232ee..3e93f7f 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -574,6 +574,7 @@
 #
 # @postcopy-requests: The number of page requests received from the destination
 #        (since 2.7)
+# @multifd: number of pages sent with multifd (since 2.9)
 #
 # Since: 0.14.0
 ##
@@ -582,7 +583,8 @@
            'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
            'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
            'mbps' : 'number', 'dirty-sync-count' : 'int',
-           'postcopy-requests' : 'int' } }
+           'postcopy-requests' : 'int',
+           'multifd' : 'int'} }

 ##
 # @XBZRLECacheStats:
-- 
2.9.3


* [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (9 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-26 12:38   ` Paolo Bonzini
  2017-02-02 12:03   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time Juan Quintela
                   ` (6 subsequent siblings)
  17 siblings, 2 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We make the locking and the transfer of information explicit, even if we
are still transmitting everything through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index c71929e..9d7bc64 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
 /* Multiple fd's */

 struct MultiFDSendParams {
+    /* not changed */
     QemuThread thread;
     QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

 static MultiFDSendParams *multifd_send;

+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
@@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_send_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_send_cond);
+            qemu_mutex_unlock(&multifd_send_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
     }
     thread_count = migrate_multifd_threads();
     multifd_send = g_new0(MultiFDSendParams, thread_count);
+    qemu_mutex_init(&multifd_send_mutex);
+    qemu_cond_init(&multifd_send_cond);
     for (i = 0; i < thread_count; i++) {
         char thread_name[15];
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
+        multifd_send[i].done = true;
+        multifd_send[i].address = 0;
         multifd_send[i].c = socket_send_channel_create();
         if (!multifd_send[i].c) {
             error_report("Error creating a send channel");
@@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
     }
 }

+static int multifd_send_page(uint8_t *address)
+{
+    int i, thread_count;
+    bool found = false;
+
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    while (!found) {
+        for (i = 0; i < thread_count; i++) {
+            if (multifd_send[i].done) {
+                multifd_send[i].done = false;
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+    qemu_mutex_lock(&multifd_send[i].mutex);
+    multifd_send[i].address = address;
+    qemu_cond_signal(&multifd_send[i].cond);
+    qemu_mutex_unlock(&multifd_send[i].mutex);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     QemuThread thread;
     QIOChannel *c;
@@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
-- 
2.9.3


* [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (10 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-03 10:54   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page Juan Quintela
                   ` (5 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We now send several pages at a time each time we wake up a thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 9d7bc64..1267730 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)

 /* Multiple fd's */

+
+typedef struct {
+    int num;
+    int size;
+    uint8_t **address;
+} multifd_pages_t;
+
 struct MultiFDSendParams {
     /* not changed */
     QemuThread thread;
@@ -400,7 +407,7 @@ struct MultiFDSendParams {
     /* protected by param mutex */
     bool quit;
     bool started;
-    uint8_t *address;
+    multifd_pages_t pages;
     /* protected by multifd mutex */
     bool done;
 };
@@ -424,8 +431,8 @@ static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit) {
-        if (params->address) {
-            params->address = 0;
+        if (params->pages.num) {
+            params->pages.num = 0;
             qemu_mutex_unlock(&params->mutex);
             qemu_mutex_lock(&multifd_send_mutex);
             params->done = true;
@@ -473,6 +480,13 @@ void migrate_multifd_send_threads_join(void)
     multifd_send = NULL;
 }

+static void multifd_init_group(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = migrate_multifd_group();
+    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));
+}
+
 void migrate_multifd_send_threads_create(void)
 {
     int i, thread_count;
@@ -491,7 +505,7 @@ void migrate_multifd_send_threads_create(void)
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
         multifd_send[i].done = true;
-        multifd_send[i].address = 0;
+        multifd_init_group(&multifd_send[i].pages);
         multifd_send[i].c = socket_send_channel_create();
         if (!multifd_send[i].c) {
             error_report("Error creating a send channel");
@@ -511,8 +525,22 @@ void migrate_multifd_send_threads_create(void)

 static int multifd_send_page(uint8_t *address)
 {
-    int i, thread_count;
+    int i, j, thread_count;
     bool found = false;
+    static multifd_pages_t pages;
+    static bool once = false;
+
+    if (!once) {
+        multifd_init_group(&pages);
+        once = true;
+    }
+
+    pages.address[pages.num] = address;
+    pages.num++;
+
+    if (pages.num < (pages.size - 1)) {
+        return UINT16_MAX;
+    }

     thread_count = migrate_multifd_threads();
     qemu_mutex_lock(&multifd_send_mutex);
@@ -530,7 +558,11 @@ static int multifd_send_page(uint8_t *address)
     }
     qemu_mutex_unlock(&multifd_send_mutex);
     qemu_mutex_lock(&multifd_send[i].mutex);
-    multifd_send[i].address = address;
+    multifd_send[i].pages.num = pages.num;
+    for(j = 0; j < pages.size; j++) {
+        multifd_send[i].pages.address[j] = pages.address[j];
+    }
+    pages.num = 0;
     qemu_cond_signal(&multifd_send[i].cond);
     qemu_mutex_unlock(&multifd_send[i].mutex);

-- 
2.9.3

^ permalink raw reply related	[flat|nested] 57+ messages in thread

* [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (11 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-03 10:59   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side Juan Quintela
                   ` (4 subsequent siblings)
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We are still sending the page through the main channel; that will
change later in the series.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 1267730..ca94704 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -566,7 +566,7 @@ static int multifd_send_page(uint8_t *address)
     qemu_cond_signal(&multifd_send[i].cond);
     qemu_mutex_unlock(&multifd_send[i].mutex);

-    return 0;
+    return i;
 }

 struct MultiFDRecvParams {
@@ -1083,6 +1083,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
                             bool last_stage, uint64_t *bytes_transferred)
 {
     int pages;
+    uint16_t fd_num;
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->offset;
@@ -1096,8 +1097,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
     if (pages == -1) {
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        fd_num = multifd_send_page(p);
+        qemu_put_be16(f, fd_num);
+        *bytes_transferred += 2; /* size of fd_num */
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
-        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
@@ -2815,6 +2818,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
+        uint16_t fd_num;
         uint8_t ch;

         addr = qemu_get_be64(f);
@@ -2915,6 +2919,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;

         case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            fd_num = qemu_get_be16(f);
+            if (fd_num != 0) {
+                /* this is yet an unused variable, changed later */
+                fd_num = fd_num;
+            }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

-- 
2.9.3


* [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (12 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-01-26 12:39   ` Paolo Bonzini
  2017-02-03 11:24   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure Juan Quintela
                   ` (3 subsequent siblings)
  17 siblings, 2 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We make the locking and the transfer of information explicit, even
though we are still receiving pages through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 67 insertions(+), 10 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index ca94704..4e530ea 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
     }
 }

-static int multifd_send_page(uint8_t *address)
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 {
     int i, j, thread_count;
     bool found = false;
@@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
     pages.address[pages.num] = address;
     pages.num++;

-    if (pages.num < (pages.size - 1)) {
-        return UINT16_MAX;
+    if (!last_page) {
+        if (pages.num < (pages.size - 1)) {
+            return UINT16_MAX;
+        }
     }

     thread_count = migrate_multifd_threads();
@@ -570,17 +572,25 @@ static int multifd_send_page(uint8_t *address)
 }

 struct MultiFDRecvParams {
+    /* not changed */
     QemuThread thread;
     QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
+    multifd_pages_t pages;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

 static MultiFDRecvParams *multifd_recv;

+QemuMutex multifd_recv_mutex;
+QemuCond  multifd_recv_cond;
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
@@ -594,7 +604,17 @@ static void *multifd_recv_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->pages.num) {
+            params->pages.num = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_recv_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_recv_cond);
+            qemu_mutex_unlock(&multifd_recv_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -647,8 +667,9 @@ void migrate_multifd_recv_threads_create(void)
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
         multifd_recv[i].started = false;
+        multifd_recv[i].done = true;
+        multifd_init_group(&multifd_recv[i].pages);
         multifd_recv[i].c = socket_recv_channel_create();
-
         if(!multifd_recv[i].c) {
             error_report("Error creating a recv channel");
             exit(0);
@@ -664,6 +685,45 @@ void migrate_multifd_recv_threads_create(void)
     }
 }

+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+    int i, thread_count;
+    MultiFDRecvParams *params;
+    static multifd_pages_t pages;
+    static bool once = false;
+
+    if (!once) {
+        multifd_init_group(&pages);
+        once = true;
+    }
+
+    pages.address[pages.num] = address;
+    pages.num++;
+
+    if (fd_num == UINT16_MAX) {
+        return;
+    }
+
+    thread_count = migrate_multifd_threads();
+    assert(fd_num < thread_count);
+    params = &multifd_recv[fd_num];
+
+    qemu_mutex_lock(&multifd_recv_mutex);
+    while (!params->done) {
+        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+    }
+    params->done = false;
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    qemu_mutex_lock(&params->mutex);
+    for(i = 0; i < pages.num; i++) {
+        params->pages.address[i] = pages.address[i];
+    }
+    params->pages.num = pages.num;
+    pages.num = 0;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -1097,7 +1157,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
     if (pages == -1) {
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
-        fd_num = multifd_send_page(p);
+        fd_num = multifd_send_page(p, migration_dirty_pages == 1);
         qemu_put_be16(f, fd_num);
         *bytes_transferred += 2; /* size of fd_num */
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
@@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)

         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* this is yet an unused variable, changed later */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

-- 
2.9.3


* [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (13 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-03 11:36   ` Dr. David Alan Gilbert
  2017-02-14 11:15   ` Daniel P. Berrange
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels Juan Quintela
                   ` (2 subsequent siblings)
  17 siblings, 2 replies; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We just send the page addresses through the alternate channels and
verify on the receiving side that they arrive as expected.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 4e530ea..95af694 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -432,8 +432,22 @@ static void *multifd_send_thread(void *opaque)
     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
         if (params->pages.num) {
+            int i;
+            int num;
+
+            num = params->pages.num;
             params->pages.num = 0;
             qemu_mutex_unlock(&params->mutex);
+
+            for(i=0; i < num; i++) {
+                if (qio_channel_write(params->c,
+                                      (const char *)&params->pages.address[i],
+                                      sizeof(uint8_t *), &error_abort)
+                    != sizeof(uint8_t*)) {
+                    /* Shouldn't ever happen */
+                    exit(-1);
+                }
+            }
             qemu_mutex_lock(&multifd_send_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_send_cond);
@@ -594,6 +608,7 @@ QemuCond  multifd_recv_cond;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
+    uint8_t *recv_address;
     char start;

     qio_channel_read(params->c, &start, 1, &error_abort);
@@ -605,8 +620,29 @@ static void *multifd_recv_thread(void *opaque)
     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
         if (params->pages.num) {
+            int i;
+            int num;
+
+            num = params->pages.num;
             params->pages.num = 0;
             qemu_mutex_unlock(&params->mutex);
+
+            for(i = 0; i < num; i++) {
+                if (qio_channel_read(params->c,
+                                     (char *)&recv_address,
+                                     sizeof(uint8_t*), &error_abort)
+                    != sizeof(uint8_t *)) {
+                    /* shouldn't ever happen */
+                    exit(-1);
+                }
+                if (recv_address != params->pages.address[i]) {
+                    printf("We received %p what we were expecting %p (%d)\n",
+                           recv_address,
+                           params->pages.address[i], i);
+                    exit(-1);
+                }
+            }
+
             qemu_mutex_lock(&multifd_recv_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_recv_cond);
-- 
2.9.3


* [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (14 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-03 11:41   ` Dr. David Alan Gilbert
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 17/17] migration: flush receive queue Juan Quintela
  2017-01-23 22:12 ` [Qemu-devel] [PATCH 00/17] multifd v3 no-reply
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

We switch from sending the page address (for testing) to sending the
real page contents.

[HACK]
How we calculate the bandwidth is beyond repair; there is a hack there
that only works for x86 and other archs that have 4KB pages.

If you are having a nice day, just go to migration/ram.c and look at
acct_update_position().  Now you are depressed, right?

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 15 +++++++++++----
 migration/ram.c       | 25 +++++++++----------------
 2 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 1d62b91..cbbf2a3 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1839,7 +1839,8 @@ static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
+    int64_t qemu_file_bytes = 0;
+    int64_t multifd_pages = 0;
     int64_t max_size = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
@@ -1923,9 +1924,14 @@ static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
             uint64_t time_spent = current_time - initial_time;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t multifd_pages_now = multifd_mig_pages_transferred();
+            /* Hack ahead.  Why the hell don't we have a function to know the
+               target_page_size?  Hard coding it to 4096 */
+            uint64_t transferred_bytes =
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                (multifd_pages_now - multifd_pages) * 4096;
             double bandwidth = (double)transferred_bytes / time_spent;
             max_size = bandwidth * s->parameters.downtime_limit;

@@ -1942,7 +1948,8 @@ static void *migration_thread(void *opaque)

             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
+            multifd_pages = multifd_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 95af694..28d099f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -441,9 +441,9 @@ static void *multifd_send_thread(void *opaque)

             for(i=0; i < num; i++) {
                 if (qio_channel_write(params->c,
-                                      (const char *)&params->pages.address[i],
-                                      sizeof(uint8_t *), &error_abort)
-                    != sizeof(uint8_t*)) {
+                                      (const char *)params->pages.address[i],
+                                      TARGET_PAGE_SIZE, &error_abort)
+                    != TARGET_PAGE_SIZE) {
                     /* Shouldn't ever happen */
                     exit(-1);
                 }
@@ -608,7 +608,6 @@ QemuCond  multifd_recv_cond;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
-    uint8_t *recv_address;
     char start;

     qio_channel_read(params->c, &start, 1, &error_abort);
@@ -629,20 +628,13 @@ static void *multifd_recv_thread(void *opaque)

             for(i = 0; i < num; i++) {
                 if (qio_channel_read(params->c,
-                                     (char *)&recv_address,
-                                     sizeof(uint8_t*), &error_abort)
-                    != sizeof(uint8_t *)) {
+                                     (char *)params->pages.address[i],
+                                     TARGET_PAGE_SIZE, &error_abort)
+                    != TARGET_PAGE_SIZE) {
                     /* shouldn't ever happen */
                     exit(-1);
                 }
-                if (recv_address != params->pages.address[i]) {
-                    printf("We received %p what we were expecting %p (%d)\n",
-                           recv_address,
-                           params->pages.address[i], i);
-                    exit(-1);
-                }
             }
-
             qemu_mutex_lock(&multifd_recv_mutex);
             params->done = true;
             qemu_cond_signal(&multifd_recv_cond);
@@ -1195,8 +1187,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         fd_num = multifd_send_page(p, migration_dirty_pages == 1);
         qemu_put_be16(f, fd_num);
+        if (fd_num != UINT16_MAX) {
+            qemu_fflush(f);
+        }
         *bytes_transferred += 2; /* size of fd_num */
-        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
@@ -3017,7 +3011,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;

         case RAM_SAVE_FLAG_EOS:
-- 
2.9.3


* [Qemu-devel] [PATCH 17/17] migration: flush receive queue
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (15 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels Juan Quintela
@ 2017-01-23 21:32 ` Juan Quintela
  2017-02-03 12:28   ` Dr. David Alan Gilbert
  2017-01-23 22:12 ` [Qemu-devel] [PATCH 00/17] multifd v3 no-reply
  17 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-23 21:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: amit.shah, dgilbert

Each time we sync the bitmap, there is a possibility that we receive
a page that is still being processed by a different thread.  We fix this
problem by making sure that we wait for all receiving threads to
finish their work before we proceed with the next stage.

We are low on page flags, so we use a combination that is not valid to
emit that message:  MULTIFD_PAGE and COMPRESSED.

I tried to make a migration command for it, but it doesn't work because
we sometimes sync the bitmap after we have already sent the beginning
of the section, so I just added a new page flag.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  1 +
 migration/ram.c               | 46 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index b3e4f31..1bd6bc0 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -259,6 +259,7 @@ void migrate_multifd_send_threads_create(void);
 void migrate_multifd_send_threads_join(void);
 void migrate_multifd_recv_threads_create(void);
 void migrate_multifd_recv_threads_join(void);
+void qemu_savevm_send_multifd_flush(QEMUFile *f);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/ram.c b/migration/ram.c
index 28d099f..3baead8 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -63,6 +63,13 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200

+/* We are getting low on pages flags, so we start using combinations 
   When we need to flush a page, we send it as
+   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE
+   We don't allow that combination
+*/
+
+
 static uint8_t *ZERO_TARGET_PAGE;

 static inline bool is_zero_range(uint8_t *p, uint64_t size)
@@ -391,6 +398,9 @@ void migrate_compress_threads_create(void)

 /* Multiple fd's */

+/* Indicates if we have synced the bitmap and we need to ensure that the
+   target has processed all previous pages */
+bool multifd_needs_flush = false;

 typedef struct {
     int num;
@@ -752,6 +762,25 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
     qemu_mutex_unlock(&params->mutex);
 }

+
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_recv_mutex);
+    for (i = 0; i < thread_count; i++) {
+        while(!multifd_recv[i].done) {
+            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    return 0;
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -768,6 +797,12 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 {
     size_t size, len;

+    if (multifd_needs_flush &&
+        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+        offset |= RAM_SAVE_FLAG_COMPRESS;
+        multifd_needs_flush = false;
+    }
+
     qemu_put_be64(f, offset);
     size = 8;

@@ -2450,6 +2485,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)

     if (!migration_in_postcopy(migrate_get_current())) {
         migration_bitmap_sync();
+        if (migrate_use_multifd()) {
+            multifd_needs_flush = true;
+        }
     }

     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2491,6 +2529,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
         qemu_mutex_lock_iothread();
         rcu_read_lock();
         migration_bitmap_sync();
+        if (migrate_use_multifd()) {
+            multifd_needs_flush = true;
+        }
         rcu_read_unlock();
         qemu_mutex_unlock_iothread();
         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
@@ -2930,6 +2971,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         }

+        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS))
+                  == (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS)) {
+            multifd_flush();
+            flags = flags & ~RAM_SAVE_FLAG_COMPRESS;
+        }
         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-- 
2.9.3


* Re: [Qemu-devel] [PATCH 00/17] multifd v3
  2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
                   ` (16 preceding siblings ...)
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 17/17] migration: flush receive queue Juan Quintela
@ 2017-01-23 22:12 ` no-reply
  17 siblings, 0 replies; 57+ messages in thread
From: no-reply @ 2017-01-23 22:12 UTC (permalink / raw)
  To: quintela; +Cc: famz, qemu-devel, amit.shah, dgilbert

Hi,

Your series seems to have some coding style problems. See output below for
more information:

Type: series
Subject: [Qemu-devel] [PATCH 00/17] multifd v3
Message-id: 1485207141-1941-1-git-send-email-quintela@redhat.com

=== TEST SCRIPT BEGIN ===
#!/bin/bash

BASE=base
n=1
total=$(git log --oneline $BASE.. | wc -l)
failed=0

# Useful git options
git config --local diff.renamelimit 0
git config --local diff.renames True

commits="$(git log --format=%H --reverse $BASE..)"
for c in $commits; do
    echo "Checking PATCH $n/$total: $(git log -n 1 --format=%s $c)..."
    if ! git show $c --format=email | ./scripts/checkpatch.pl --mailback -; then
        failed=1
        echo
    fi
    n=$((n+1))
done

exit $failed
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
Switched to a new branch 'test'
977ae47 migration: flush receive queue
94cd0ed migration: [HACK]Transfer pages over new channels
403e546 migration: Test new fd infrastructure
1b26e24 migration: Create thread infrastructure for multifd recv side
20ead0e migration: Send the fd number which we are going to use for this page
24b9a1c migration: really use multiple pages at a time
d97ec5e migration: Create thread infrastructure for multifd send side
b2952e8 migration: create ram_multifd_page
62325d2 migration: Start of multiple fd work
8734a27 migration: create multifd migration threads
7723249 migration: Create x-multifd-group parameter
9b1abf5 migration: Create x-multifd-threads parameter
d9ec656 migration: Add multifd capability
bb058af migration: Don't create decompression threads if not enabled
3bb5588 migration: Test for disabled features on reception
cefb07a migration: create Migration Incoming State at init time
ce46b29 migration: transform remained DPRINTF into trace_

=== OUTPUT BEGIN ===
Checking PATCH 1/17: migration: transform remained DPRINTF into trace_...
Checking PATCH 2/17: migration: create Migration Incoming State at init time...
Checking PATCH 3/17: migration: Test for disabled features on reception...
Checking PATCH 4/17: migration: Don't create decompression threads if not enabled...
Checking PATCH 5/17: migration: Add multifd capability...
Checking PATCH 6/17: migration: Create x-multifd-threads parameter...
WARNING: line over 80 characters
#93: FILE: migration/migration.c:862:
+            (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {

total: 0 errors, 1 warnings, 130 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
Checking PATCH 7/17: migration: Create x-multifd-group parameter...
Checking PATCH 8/17: migration: create multifd migration threads...
ERROR: space required before the open brace '{'
#104: FILE: migration/ram.c:402:
+    while (!params->quit){

ERROR: trailing whitespace
#177: FILE: migration/ram.c:475:
+ $

ERROR: space required before the open brace '{'
#179: FILE: migration/ram.c:477:
+    while (!params->quit){

total: 3 errors, 0 warnings, 208 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 9/17: migration: Start of multiple fd work...
ERROR: space required before the open parenthesis '('
#83: FILE: migration/ram.c:467:
+        if(!multifd_send[i].c) {

ERROR: space required before the open parenthesis '('
#140: FILE: migration/ram.c:563:
+        if(!multifd_recv[i].c) {

ERROR: do not use C99 // comments
#186: FILE: migration/socket.c:50:
+    // Remove channel

ERROR: do not use C99 // comments
#203: FILE: migration/socket.c:67:
+    // Remove channel

ERROR: do not use C99 // comments
#238: FILE: migration/socket.c:209:
+//    qio_channel_close(ioc, NULL);

total: 5 errors, 0 warnings, 209 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 10/17: migration: create ram_multifd_page...
Checking PATCH 11/17: migration: Create thread infrastructure for multifd send side...
Checking PATCH 12/17: migration: really use multiple pages at a time...
ERROR: do not initialise statics to 0 or NULL
#80: FILE: migration/ram.c:531:
+    static bool once = false;

ERROR: space required before the open parenthesis '('
#102: FILE: migration/ram.c:562:
+    for(j = 0; j < pages.size; j++) {

total: 2 errors, 0 warnings, 87 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 13/17: migration: Send the fd number which we are going to use for this page...
Checking PATCH 14/17: migration: Create thread infrastructure for multifd recv side...
ERROR: do not initialise statics to 0 or NULL
#103: FILE: migration/ram.c:693:
+    static bool once = false;

ERROR: space required before the open parenthesis '('
#128: FILE: migration/ram.c:718:
+    for(i = 0; i < pages.num; i++) {

total: 2 errors, 0 warnings, 137 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 15/17: migration: Test new fd infrastructure...
ERROR: spaces required around that '=' (ctx:VxV)
#27: FILE: migration/ram.c:442:
+            for(i=0; i < num; i++) {
                  ^

ERROR: space required before the open parenthesis '('
#27: FILE: migration/ram.c:442:
+            for(i=0; i < num; i++) {

ERROR: "(foo*)" should be "(foo *)"
#31: FILE: migration/ram.c:446:
+                    != sizeof(uint8_t*)) {

ERROR: space required before the open parenthesis '('
#58: FILE: migration/ram.c:630:
+            for(i = 0; i < num; i++) {

ERROR: "(foo*)" should be "(foo *)"
#61: FILE: migration/ram.c:633:
+                                     sizeof(uint8_t*), &error_abort)

total: 5 errors, 0 warnings, 58 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

Checking PATCH 16/17: migration: [HACK]Transfer pages over new channels...
Checking PATCH 17/17: migration: flush receive queue...
ERROR: trailing whitespace
#41: FILE: migration/ram.c:66:
+/* We are getting low on pages flags, so we start using combinations $

ERROR: do not initialise globals to 0 or NULL
#57: FILE: migration/ram.c:403:
+bool multifd_needs_flush = false;

ERROR: space required before the open parenthesis '('
#76: FILE: migration/ram.c:776:
+        while(!multifd_recv[i].done) {

ERROR: spaces required around that '|' (ctx:VxV)
#124: FILE: migration/ram.c:2974:
+        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS))
                                                 ^

ERROR: spaces required around that '|' (ctx:VxV)
#125: FILE: migration/ram.c:2975:
+                  == (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS)) {
                                                 ^

total: 5 errors, 0 warnings, 95 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

=== OUTPUT END ===

Test command exited with code: 1


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@freelists.org

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_ Juan Quintela
@ 2017-01-24  2:20   ` Eric Blake
  2017-01-24 12:20     ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 57+ messages in thread
From: Eric Blake @ 2017-01-24  2:20 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert


On 01/23/2017 03:32 PM, Juan Quintela wrote:

In the subject: s/remained/remaining/

> So we can remove DPRINTF() macro
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c        | 18 ++++--------------
>  migration/trace-events |  4 ++++
>  2 files changed, 8 insertions(+), 14 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index a1c8089..ef8fadf 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -45,14 +45,6 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
> 
> -#ifdef DEBUG_MIGRATION_RAM
> -#define DPRINTF(fmt, ...) \
> -    do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
> -#else
> -#define DPRINTF(fmt, ...) \
> -    do { } while (0)

Yay - you're getting rid of the bitrot pattern in this file!

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org




* Re: [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception Juan Quintela
@ 2017-01-24 10:33   ` Dr. David Alan Gilbert
  2017-02-09 17:12     ` Juan Quintela
  0 siblings, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-01-24 10:33 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Right now, if we receive a compressed page or a xbzrle page while these
> features are disabled, Bad Things (TM) can happen.  Just add a test for
> them.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 23 ++++++++++++++++++++++-
>  1 file changed, 22 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index ef8fadf..4ad814a 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -2455,7 +2455,7 @@ static int ram_load_postcopy(QEMUFile *f)
> 
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
> -    int flags = 0, ret = 0;
> +    int flags = 0, ret = 0, invalid_flags;
>      static uint64_t seq_iter;
>      int len = 0;
>      /*
> @@ -2470,6 +2470,15 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          ret = -EINVAL;
>      }
> 
> +    invalid_flags = 0;
> +
> +    if (!migrate_use_xbzrle()) {
> +        invalid_flags |= RAM_SAVE_FLAG_XBZRLE;
> +    }

Is that really the case? I thought we used to ignore the flags on the incoming
side and didn't need to enable xbzrle on the destination?

Dave

> +    if (!migrate_use_compression()) {
> +        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2490,6 +2499,18 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          flags = addr & ~TARGET_PAGE_MASK;
>          addr &= TARGET_PAGE_MASK;
> 
> +        if (flags & invalid_flags) {
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_XBZRLE) {
> +                error_report("Received an unexpected XBRLE page");
> +            }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_COMPRESS_PAGE) {
> +                error_report("Received an unexpected compressed page");
> +            }
> +
> +            ret = -EINVAL;
> +            break;
> +        }
> +
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_
  2017-01-24  2:20   ` Eric Blake
@ 2017-01-24 12:20     ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-01-24 12:20 UTC (permalink / raw)
  To: Eric Blake; +Cc: Juan Quintela, qemu-devel, amit.shah

* Eric Blake (eblake@redhat.com) wrote:
> On 01/23/2017 03:32 PM, Juan Quintela wrote:
> 
> In the subject: s/remained/remaining/
> 
> > So we can remove DPRINTF() macro
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > ---
> >  migration/ram.c        | 18 ++++--------------
> >  migration/trace-events |  4 ++++
> >  2 files changed, 8 insertions(+), 14 deletions(-)
> > 
> > diff --git a/migration/ram.c b/migration/ram.c
> > index a1c8089..ef8fadf 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -45,14 +45,6 @@
> >  #include "qemu/rcu_queue.h"
> >  #include "migration/colo.h"
> > 
> > -#ifdef DEBUG_MIGRATION_RAM
> > -#define DPRINTF(fmt, ...) \
> > -    do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
> > -#else
> > -#define DPRINTF(fmt, ...) \
> > -    do { } while (0)
> 
> Yay - you're getting rid of the bitrot pattern in this file!
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>

This change doesn't need the rest of the series so,
queued.
(Fixed up 'remained')

Dave

> 
> -- 
> Eric Blake   eblake redhat com    +1-919-301-3266
> Libvirt virtualization library http://libvirt.org
> 



--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter Juan Quintela
@ 2017-01-26 11:47   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-01-26 11:47 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Indicates how many pages we are going to send in each bach to a multifd
> thread.

bach->batch

> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  hmp.c                         |  8 ++++++++
>  include/migration/migration.h |  1 +
>  migration/migration.c         | 23 +++++++++++++++++++++++
>  qapi-schema.json              | 11 +++++++++--
>  4 files changed, 41 insertions(+), 2 deletions(-)
> 
> diff --git a/hmp.c b/hmp.c
> index 8c7e302..e579766 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -325,6 +325,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
>          monitor_printf(mon, " %s: %" PRId64,
>              MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
>              params->x_multifd_threads);
> +        monitor_printf(mon, " %s: %" PRId64,
> +            MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_GROUP],
> +            params->x_multifd_group);
>          monitor_printf(mon, "\n");
>      }
> 
> @@ -1401,6 +1404,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.has_x_multifd_threads = true;
>                  use_int_value = true;
>                  break;
> +            case MIGRATION_PARAMETER_X_MULTIFD_GROUP:
> +                p.has_x_multifd_group = true;
> +                use_int_value = true;
> +                break;
>              }
> 
>              if (use_int_value) {
> @@ -1419,6 +1426,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>                  p.downtime_limit = valueint;
>                  p.x_checkpoint_delay = valueint;
>                  p.x_multifd_threads = valueint;
> +                p.x_multifd_group = valueint;
>              }
> 
>              qmp_migrate_set_parameters(&p, &err);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index b35044c..515569d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -248,6 +248,7 @@ bool migration_in_postcopy_after_devices(MigrationState *);
>  MigrationState *migrate_get_current(void);
> 
>  int migrate_multifd_threads(void);
> +int migrate_multifd_group(void);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 2fe03d8..9bde01b 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -68,6 +68,7 @@
>   */
>  #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
>  #define DEFAULT_MIGRATE_MULTIFD_THREADS 2
> +#define DEFAULT_MIGRATE_MULTIFD_GROUP 16

See below ^^^

> 
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
> @@ -103,6 +104,7 @@ MigrationState *migrate_get_current(void)
>              .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
>              .x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
>              .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
> +            .x_multifd_group = DEFAULT_MIGRATE_MULTIFD_GROUP,
>          },
>      };
> 
> @@ -595,6 +597,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
>      params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
>      params->has_x_multifd_threads = true;
>      params->x_multifd_threads = s->parameters.x_multifd_threads;
> +    params->has_x_multifd_group = true;
> +    params->x_multifd_group = s->parameters.x_multifd_group;
> 
>      return params;
>  }
> @@ -865,6 +869,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
>                     "is invalid, it should be in the range of 1 to 255");
>          return;
>      }
> +    if (params->has_x_multifd_group &&
> +            (params->x_multifd_group < 1 || params->x_multifd_group > 10000)) {
> +        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
> +                   "multifd_group",
> +                   "is invalid, it should be in the range of 1 to 10000");
> +        return;
> +    }
> 
>      if (params->has_compress_level) {
>          s->parameters.compress_level = params->compress_level;
> @@ -906,6 +917,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
>      if (params->has_x_multifd_threads) {
>          s->parameters.x_multifd_threads = params->x_multifd_threads;
>      }
> +    if (params->has_x_multifd_group) {
> +        s->parameters.x_multifd_group = params->x_multifd_group;
> +    }
>  }
> 
> 
> @@ -1351,6 +1365,15 @@ int migrate_multifd_threads(void)
>      return s->parameters.x_multifd_threads;
>  }
> 
> +int migrate_multifd_group(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->parameters.x_multifd_group;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 2273864..54232ee 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -984,6 +984,9 @@
>  # @x-multifd-threads: Number of threads used to migrate data in parallel
>  #                     The default value is 2 (since 2.9)
>  #
> +# @x-multifd-group: Number of pages sent together to a thread
> +#                     The default value is 32 (since 2.9)

Your default is set above as 16.
Other than that;

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> +#
>  # Since: 2.4
>  ##
>  { 'enum': 'MigrationParameter',
> @@ -991,7 +994,7 @@
>             'cpu-throttle-initial', 'cpu-throttle-increment',
>             'tls-creds', 'tls-hostname', 'max-bandwidth',
>             'downtime-limit', 'x-checkpoint-delay',
> -           'x-multifd-threads'] }
> +           'x-multifd-threads', 'x-multifd-group'] }
> 
>  ##
>  # @migrate-set-parameters:
> @@ -1058,6 +1061,9 @@
>  # @x-multifd-threads: Number of threads used to migrate data in parallel
>  #                     The default value is 1 (since 2.9)
>  #
> +# @x-multifd-group: Number of pages sent together in a bunch
> +#                     The default value is 32 (since 2.9)
> +#
>  # Since: 2.4
>  ##
>  { 'struct': 'MigrationParameters',
> @@ -1071,7 +1077,8 @@
>              '*max-bandwidth': 'int',
>              '*downtime-limit': 'int',
>              '*x-checkpoint-delay': 'int',
> -            '*x-multifd-threads': 'int'} }
> +            '*x-multifd-threads': 'int',
> +            '*x-multifd-group': 'int'} }
> 
>  ##
>  # @query-migrate-parameters:
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side Juan Quintela
@ 2017-01-26 12:38   ` Paolo Bonzini
  2017-02-13 16:38     ` Juan Quintela
  2017-02-02 12:03   ` Dr. David Alan Gilbert
  1 sibling, 1 reply; 57+ messages in thread
From: Paolo Bonzini @ 2017-01-26 12:38 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert



On 23/01/2017 22:32, Juan Quintela wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmitting things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 52 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c71929e..9d7bc64 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>  /* Multiple fd's */
> 
>  struct MultiFDSendParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    uint8_t *address;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
>  static MultiFDSendParams *multifd_send;
> 
> +QemuMutex multifd_send_mutex;
> +QemuCond multifd_send_cond;

Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
efficient.  See thread-pool.c for an example.

Paolo

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->address) {
> +            params->address = 0;
> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_send_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_send_cond);
> +            qemu_mutex_unlock(&multifd_send_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
> 
> @@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    qemu_mutex_init(&multifd_send_mutex);
> +    qemu_cond_init(&multifd_send_cond);
>      for (i = 0; i < thread_count; i++) {
>          char thread_name[15];
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
> +        multifd_send[i].done = true;
> +        multifd_send[i].address = 0;
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
> 
> +static int multifd_send_page(uint8_t *address)
> +{
> +    int i, thread_count;
> +    bool found = false;
> +
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_send_mutex);
> +    while (!found) {
> +        for (i = 0; i < thread_count; i++) {
> +            if (multifd_send[i].done) {
> +                multifd_send[i].done = false;
> +                found = true;
> +                break;
> +            }
> +        }
> +        if (!found) {
> +            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_mutex);
> +    qemu_mutex_lock(&multifd_send[i].mutex);
> +    multifd_send[i].address = address;
> +    qemu_cond_signal(&multifd_send[i].cond);
> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      QemuThread thread;
>      QIOChannel *c;
> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        multifd_send_page(p);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> 


* Re: [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2017-01-26 12:39   ` Paolo Bonzini
  2017-02-03 11:24   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 57+ messages in thread
From: Paolo Bonzini @ 2017-01-26 12:39 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert



On 23/01/2017 22:32, Juan Quintela wrote:
>      bool started;
> +    multifd_pages_t pages;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
> 
>  static MultiFDRecvParams *multifd_recv;
> 
> +QemuMutex multifd_recv_mutex;
> +QemuCond  multifd_recv_cond;
> +
>  static void *multifd_recv_thread(void *opaque)

Same here (and it should also be wrapped in an abstract data type).

Paolo


* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work Juan Quintela
@ 2017-01-27 17:45   ` Dr. David Alan Gilbert
  2017-02-13 16:34     ` Juan Quintela
  2017-02-13 17:35   ` Daniel P. Berrange
  1 sibling, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-01-27 17:45 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
> 
> Note: Reference count/freeing of channels is not done
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  6 +++++
>  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
>  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
>  3 files changed, 104 insertions(+), 3 deletions(-)

One thing not directly in here: you should probably look at the migration cancel
code and get it to call shutdown() on all your extra sockets; that stops things
blocking in any one of them.

> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index f119ba0..3989bd6 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -22,6 +22,7 @@
>  #include "qapi-types.h"
>  #include "exec/cpu-common.h"
>  #include "qemu/coroutine_int.h"
> +#include "io/channel.h"
> 
>  #define QEMU_VM_FILE_MAGIC           0x5145564d
>  #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
> @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
> 
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
> 
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +QIOChannel *socket_send_channel_create(void);
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void unix_start_incoming_migration(const char *path, Error **errp);
> 
>  void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
> diff --git a/migration/ram.c b/migration/ram.c
> index 939f364..5ad7cb3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
> 
>  struct MultiFDSendParams {
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
>      bool quit;
> +    bool started;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
> @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> +    char start = 's';
> +
> +    qio_channel_write(params->c, &start, 1, &error_abort);

I'd be tempted to send something stronger as a guarantee
that you're connecting the right thing to the right place;
maybe something like a QEMU + UUID + fd index?
I guarantee someone is going to mess up the fd's in the wrong
order or connect some random other process to one of them.

> +    qemu_mutex_lock(&params->mutex);
> +    params->started = true;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> 
>      qemu_mutex_lock(&params->mutex);

That unlock/lock pair is odd.

>      while (!params->quit){
> @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
>          qemu_thread_join(&multifd_send[i].thread);
>          qemu_mutex_destroy(&multifd_send[i].mutex);
>          qemu_cond_destroy(&multifd_send[i].cond);
> +        socket_send_channel_destroy(multifd_send[i].c);
>      }
>      g_free(multifd_send);
>      multifd_send = NULL;
> @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
> +        multifd_send[i].started = false;
> +        multifd_send[i].c = socket_send_channel_create();
> +        if(!multifd_send[i].c) {
> +            error_report("Error creating a send channel");
> +            exit(0);

Hmm no exit!

> +        }
>          snprintf(thread_name, 15, "multifd_send_%d", i);
>          qemu_thread_create(&multifd_send[i].thread, thread_name,
>                             multifd_send_thread, &multifd_send[i],
>                             QEMU_THREAD_JOINABLE);
> +        qemu_mutex_lock(&multifd_send[i].mutex);
> +        while (!multifd_send[i].started) {
> +            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
> +        }
> +        qemu_mutex_unlock(&multifd_send[i].mutex);
>      }
>  }
> 
>  struct MultiFDRecvParams {
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
>      bool quit;
> +    bool started;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
> 
> @@ -472,7 +495,14 @@ static MultiFDRecvParams *multifd_recv;
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> - 
> +    char start;
> +
> +    qio_channel_read(params->c, &start, 1, &error_abort);
> +    qemu_mutex_lock(&params->mutex);
> +    params->started = true;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> +
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          qemu_cond_wait(&params->cond, &params->mutex);
> @@ -508,6 +538,7 @@ void migrate_multifd_recv_threads_join(void)
>          qemu_thread_join(&multifd_recv[i].thread);
>          qemu_mutex_destroy(&multifd_recv[i].mutex);
>          qemu_cond_destroy(&multifd_recv[i].cond);
> +        socket_send_channel_destroy(multifd_recv[i].c);
>      }
>      g_free(multifd_recv);
>      multifd_recv = NULL;
> @@ -526,9 +557,21 @@ void migrate_multifd_recv_threads_create(void)
>          qemu_mutex_init(&multifd_recv[i].mutex);
>          qemu_cond_init(&multifd_recv[i].cond);
>          multifd_recv[i].quit = false;
> +        multifd_recv[i].started = false;
> +        multifd_recv[i].c = socket_recv_channel_create();
> +
> +        if(!multifd_recv[i].c) {
> +            error_report("Error creating a recv channel");
> +            exit(0);
> +        }
>          qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
>                             multifd_recv_thread, &multifd_recv[i],
>                             QEMU_THREAD_JOINABLE);
> +        qemu_mutex_lock(&multifd_recv[i].mutex);
> +        while (!multifd_recv[i].started) {
> +            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
> +        }
> +        qemu_mutex_unlock(&multifd_recv[i].mutex);
>      }
>  }
> 
> diff --git a/migration/socket.c b/migration/socket.c
> index 11f80b1..7cd9213 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -24,6 +24,54 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
> 
> +struct SocketArgs {
> +    QIOChannelSocket *ioc;
> +    SocketAddress *saddr;
> +    Error **errp;
> +} socket_args;
> +
> +QIOChannel *socket_recv_channel_create(void)
> +{
> +    QIOChannelSocket *sioc;
> +    Error *err = NULL;
> +
> +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
> +                                     &err);
> +    if (!sioc) {
> +        error_report("could not accept migration connection (%s)",
> +                     error_get_pretty(err));
> +        return NULL;
> +    }
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    // Remove channel
> +    object_unref(OBJECT(send));
> +    return 0;
> +}
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> +    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
> +                                    socket_args.errp);

We need to be careful there since that's a sync; it depends what
calls this, if I'm reading the code above correctly then it gets called
from the main thread and that would be bad if it blocked; it's ok if it
was the fd threads or the migration thread though.

> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    // Remove channel
> +    object_unref(OBJECT(send));
> +    if (socket_args.saddr) {
> +        qapi_free_SocketAddress(socket_args.saddr);
> +        socket_args.saddr = NULL;
> +    }
> +    return 0;
> +}
> 
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -96,6 +144,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
> 
>      data->s = s;
> +
> +    socket_args.saddr = saddr;
> +    socket_args.errp = errp;
> +
>      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
>          data->hostname = g_strdup(saddr->u.inet.data->host);
>      }
> @@ -106,7 +158,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
> 
>  void tcp_start_outgoing_migration(MigrationState *s,
> @@ -154,7 +205,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
> 
>  out:
>      /* Close listening socket as its no longer needed */
> -    qio_channel_close(ioc, NULL);
> +//    qio_channel_close(ioc, NULL);
>      return FALSE; /* unregister */
>  }
> 
> @@ -163,6 +214,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
>                                              Error **errp)
>  {
>      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> +    socket_args.ioc = listen_ioc;
> 
>      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
>                           "migration-socket-listener");
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page Juan Quintela
@ 2017-01-27 18:02   ` Dr. David Alan Gilbert
  2017-01-30 10:06     ` Juan Quintela
  2017-02-13 16:36     ` Juan Quintela
  2017-02-02 11:20   ` Dr. David Alan Gilbert
  1 sibling, 2 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-01-27 18:02 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  hmp.c                         |  2 ++
>  include/migration/migration.h |  1 +
>  migration/migration.c         |  1 +
>  migration/ram.c               | 51 ++++++++++++++++++++++++++++++++++++++++++-
>  qapi-schema.json              |  4 +++-
>  5 files changed, 57 insertions(+), 2 deletions(-)
> 
> diff --git a/hmp.c b/hmp.c
> index e579766..76bc8c7 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -222,6 +222,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
>              monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
>                             info->ram->postcopy_requests);
>          }
> +        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
> +                       info->ram->multifd);
>      }
> 
>      if (info->has_disk) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3989bd6..b3e4f31 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -282,6 +282,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
>  uint64_t xbzrle_mig_pages_overflow(void);
>  uint64_t xbzrle_mig_pages_cache_miss(void);
>  double xbzrle_mig_cache_miss_rate(void);
> +uint64_t multifd_mig_pages_transferred(void);
> 
>  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
>  void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
> diff --git a/migration/migration.c b/migration/migration.c
> index ab48f06..1d62b91 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -652,6 +652,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
>      info->ram->mbps = s->mbps;
>      info->ram->dirty_sync_count = s->dirty_sync_count;
>      info->ram->postcopy_requests = s->postcopy_requests;
> +    info->ram->multifd = multifd_mig_pages_transferred();
> 
>      if (s->state != MIGRATION_STATUS_COMPLETED) {
>          info->ram->remaining = ram_bytes_remaining();
> diff --git a/migration/ram.c b/migration/ram.c
> index 5ad7cb3..c71929e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200

I think a similar reminder from the last iteration of this patch;
I think we're out of bits here - I'm not sure if 0x200 is even
available.

>  static uint8_t *ZERO_TARGET_PAGE;
> 
> @@ -141,6 +142,7 @@ typedef struct AccountingInfo {
>      uint64_t dup_pages;
>      uint64_t skipped_pages;
>      uint64_t norm_pages;
> +    uint64_t multifd_pages;
>      uint64_t iterations;
>      uint64_t xbzrle_bytes;
>      uint64_t xbzrle_pages;
> @@ -211,6 +213,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
>      return acct_info.xbzrle_overflows;
>  }
> 
> +uint64_t multifd_mig_pages_transferred(void)
> +{
> +    return acct_info.multifd_pages;
> +}
> +
>  /* This is the last block that we have visited serching for dirty pages
>   */
>  static RAMBlock *last_seen_block;
> @@ -990,6 +997,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
>      return pages;
>  }
> 
> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> +                            bool last_stage, uint64_t *bytes_transferred)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->offset;
> +
> +    p = block->host + offset;
> +
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +    if (pages == -1) {
> +        *bytes_transferred +=
> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        *bytes_transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        acct_info.norm_pages++;
> +        acct_info.multifd_pages++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1427,6 +1461,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
>              res = ram_save_compressed_page(f, pss,
>                                             last_stage,
>                                             bytes_transferred);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);

I'm curious whether it's best to pick the destination fd at this level or one level
higher; for example would it be good to keep all the components of a host page or huge
page together on the same fd? If so then it would be best to pick the fd
at ram_save_host_page level.
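For illustration only (none of these helpers exist in the series; the names are invented), picking the channel at host-page granularity could be as simple as hashing on the host-page-aligned address, so every target-page fragment of a huge page lands on the same fd:

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

/* Hypothetical sketch: choose a channel per *host* page rather than per
 * target page, so all target-page fragments of a huge page share one fd. */
static int pick_fd_for_host_page(uintptr_t addr,
                                 size_t host_page_size,
                                 int num_channels)
{
    /* align down to the containing host page, then hash that */
    uintptr_t host_page = addr & ~(host_page_size - 1);
    return (int)((host_page / host_page_size) % (unsigned)num_channels);
}
```

With this shape the decision would naturally live in ram_save_host_page, which already iterates over the target pages of one host page.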

Dave

>          } else {
>              res = ram_save_page(f, pss, last_stage,
>                                  bytes_transferred);
> @@ -2678,6 +2714,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (!migrate_use_compression()) {
>          invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
>      }
> +
> +    if (!migrate_use_multifd()) {
> +        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2705,13 +2745,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              if (flags & invalid_flags  & RAM_SAVE_FLAG_COMPRESS_PAGE) {
>                  error_report("Received an unexpected compressed page");
>              }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
> +                error_report("Received an unexpected multifd page");
> +            }
> 
>              ret = -EINVAL;
>              break;
>          }
> 
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
> 
>              host = host_from_ram_block_offset(block, addr);
> @@ -2786,6 +2830,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 54232ee..3e93f7f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -574,6 +574,7 @@
>  #
>  # @postcopy-requests: The number of page requests received from the destination
>  #        (since 2.7)
> +# @multifd: number of pages sent with multifd (since 2.9)
>  #
>  # Since: 0.14.0
>  ##
> @@ -582,7 +583,8 @@
>             'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
>             'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
>             'mbps' : 'number', 'dirty-sync-count' : 'int',
> -           'postcopy-requests' : 'int' } }
> +           'postcopy-requests' : 'int',
> +           'multifd' : 'int'} }

>  ##
>  # @XBZRLECacheStats:
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-27 18:02   ` Dr. David Alan Gilbert
@ 2017-01-30 10:06     ` Juan Quintela
  2017-02-02 11:04       ` Dr. David Alan Gilbert
  2017-02-13 16:36     ` Juan Quintela
  1 sibling, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-01-30 10:06 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> The function still doesn't use multifd, but we have simplified
>> ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
>> counter and a new flag for this type of page.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
>>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>>  /* 0x80 is reserved in migration.h start with 0x100 next */
>>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>
> I think a similar reminder from the last iteration of this patch;
> I think we're out of bits here - I'm not sure if 0x200 is even
> available.

In the previous iteration, I used *two* bits.  As per your recommendation, I
"reused" an old one for doing the synchronization.


Later, Juan.


* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-30 10:06     ` Juan Quintela
@ 2017-02-02 11:04       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-02 11:04 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> The function still doesn't use multifd, but we have simplified
> >> ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
> >> counter and a new flag for this type of page.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
> >>  #define RAM_SAVE_FLAG_XBZRLE   0x40
> >>  /* 0x80 is reserved in migration.h start with 0x100 next */
> >>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> >> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> >
> > I think a similar reminder from the last iteration of this patch;
> > I think we're out of bits here - I'm not sure if 0x200 is even
> > available.
> 
> In the previous iteration, I used *two* bits.  As per your recommendation, I
> "reused" an old one for doing the synchronization.

Hmm OK, but that's still the last free bit; given TARGET_PAGE_BITS 10
on 3 targets, we can't use 0x400.
I guess the next user of a new bit will need to use RAM_SAVE_FLAG_FULL,
which has never really been used, and that will have to be some
magic that causes another block of bits to be read.
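For illustration (a sketch, not code from the series; the flag name mirrors the RAM_SAVE_FLAG_* defines but everything else is invented), the constraint is that these flags are OR-ed into the low bits of a page-aligned address, so with TARGET_PAGE_BITS of 10 only bits up to 0x200 are free:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical mirror of the flag encoding in migration/ram.c */
#define FLAG_MULTIFD_PAGE 0x200u

/* Smallest target page among supported targets: 1 KiB */
static const unsigned target_page_bits = 10;

/* Flags ride in the bits below the page size of a page-aligned address. */
static uint64_t encode(uint64_t page_addr, uint64_t flags)
{
    return page_addr | flags;
}

static uint64_t addr_of(uint64_t word)
{
    return word & ~((1ull << target_page_bits) - 1);
}

static uint64_t flags_of(uint64_t word)
{
    return word & ((1ull << target_page_bits) - 1);
}
```

Since 0x400 is bit 10, on a 1 KiB-page target it is part of the address itself, not a usable flag — which is the clash being discussed.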

Dave

> 
> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page Juan Quintela
  2017-01-27 18:02   ` Dr. David Alan Gilbert
@ 2017-02-02 11:20   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-02 11:20 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of page.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

Dave

> ---
>  hmp.c                         |  2 ++
>  include/migration/migration.h |  1 +
>  migration/migration.c         |  1 +
>  migration/ram.c               | 51 ++++++++++++++++++++++++++++++++++++++++++-
>  qapi-schema.json              |  4 +++-
>  5 files changed, 57 insertions(+), 2 deletions(-)
> 
> diff --git a/hmp.c b/hmp.c
> index e579766..76bc8c7 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -222,6 +222,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
>              monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
>                             info->ram->postcopy_requests);
>          }
> +        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
> +                       info->ram->multifd);
>      }
> 
>      if (info->has_disk) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3989bd6..b3e4f31 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -282,6 +282,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
>  uint64_t xbzrle_mig_pages_overflow(void);
>  uint64_t xbzrle_mig_pages_cache_miss(void);
>  double xbzrle_mig_cache_miss_rate(void);
> +uint64_t multifd_mig_pages_transferred(void);
> 
>  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
>  void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
> diff --git a/migration/migration.c b/migration/migration.c
> index ab48f06..1d62b91 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -652,6 +652,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
>      info->ram->mbps = s->mbps;
>      info->ram->dirty_sync_count = s->dirty_sync_count;
>      info->ram->postcopy_requests = s->postcopy_requests;
> +    info->ram->multifd = multifd_mig_pages_transferred();
> 
>      if (s->state != MIGRATION_STATUS_COMPLETED) {
>          info->ram->remaining = ram_bytes_remaining();
> diff --git a/migration/ram.c b/migration/ram.c
> index 5ad7cb3..c71929e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> 
>  static uint8_t *ZERO_TARGET_PAGE;
> 
> @@ -141,6 +142,7 @@ typedef struct AccountingInfo {
>      uint64_t dup_pages;
>      uint64_t skipped_pages;
>      uint64_t norm_pages;
> +    uint64_t multifd_pages;
>      uint64_t iterations;
>      uint64_t xbzrle_bytes;
>      uint64_t xbzrle_pages;
> @@ -211,6 +213,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
>      return acct_info.xbzrle_overflows;
>  }
> 
> +uint64_t multifd_mig_pages_transferred(void)
> +{
> +    return acct_info.multifd_pages;
> +}
> +
>  /* This is the last block that we have visited serching for dirty pages
>   */
>  static RAMBlock *last_seen_block;
> @@ -990,6 +997,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
>      return pages;
>  }
> 
> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> +                            bool last_stage, uint64_t *bytes_transferred)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->offset;
> +
> +    p = block->host + offset;
> +
> +    if (block == last_sent_block) {
> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> +    }
> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
> +    if (pages == -1) {
> +        *bytes_transferred +=
> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        *bytes_transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        acct_info.norm_pages++;
> +        acct_info.multifd_pages++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1427,6 +1461,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
>              res = ram_save_compressed_page(f, pss,
>                                             last_stage,
>                                             bytes_transferred);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
>          } else {
>              res = ram_save_page(f, pss, last_stage,
>                                  bytes_transferred);
> @@ -2678,6 +2714,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (!migrate_use_compression()) {
>          invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
>      }
> +
> +    if (!migrate_use_multifd()) {
> +        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2705,13 +2745,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              if (flags & invalid_flags  & RAM_SAVE_FLAG_COMPRESS_PAGE) {
>                  error_report("Received an unexpected compressed page");
>              }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
> +                error_report("Received an unexpected multifd page");
> +            }
> 
>              ret = -EINVAL;
>              break;
>          }
> 
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
> 
>              host = host_from_ram_block_offset(block, addr);
> @@ -2786,6 +2830,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 54232ee..3e93f7f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -574,6 +574,7 @@
>  #
>  # @postcopy-requests: The number of page requests received from the destination
>  #        (since 2.7)
> +# @multifd: number of pages sent with multifd (since 2.9)
>  #
>  # Since: 0.14.0
>  ##
> @@ -582,7 +583,8 @@
>             'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
>             'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
>             'mbps' : 'number', 'dirty-sync-count' : 'int',
> -           'postcopy-requests' : 'int' } }
> +           'postcopy-requests' : 'int',
> +           'multifd' : 'int'} }
> 
>  ##
>  # @XBZRLECacheStats:
> -- 
> 2.9.3
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side Juan Quintela
  2017-01-26 12:38   ` Paolo Bonzini
@ 2017-02-02 12:03   ` Dr. David Alan Gilbert
  2017-02-13 16:40     ` Juan Quintela
  1 sibling, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-02 12:03 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmitting things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 52 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c71929e..9d7bc64 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>  /* Multiple fd's */
> 
>  struct MultiFDSendParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    uint8_t *address;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
>  static MultiFDSendParams *multifd_send;
> 
> +QemuMutex multifd_send_mutex;
> +QemuCond multifd_send_cond;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->address) {
> +            params->address = 0;

This confused me (I wondered what happens to the 1st block) but
I see in the next patch this gets replaced by something more complex;
so I suggest just using params->dummy and commenting that it's about
to get replaced.

> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_send_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_send_cond);
> +            qemu_mutex_unlock(&multifd_send_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
> 
> @@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    qemu_mutex_init(&multifd_send_mutex);
> +    qemu_cond_init(&multifd_send_cond);
>      for (i = 0; i < thread_count; i++) {
>          char thread_name[15];
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
> +        multifd_send[i].done = true;
> +        multifd_send[i].address = 0;
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
> 
> +static int multifd_send_page(uint8_t *address)
> +{
> +    int i, thread_count;
> +    bool found = false;
> +
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_send_mutex);
> +    while (!found) {
> +        for (i = 0; i < thread_count; i++) {
> +            if (multifd_send[i].done) {
> +                multifd_send[i].done = false;
> +                found = true;
> +                break;
> +            }
> +        }
> +        if (!found) {
> +            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_mutex);
> +    qemu_mutex_lock(&multifd_send[i].mutex);

Having a 'multifd_send_mutex' and a
         'multifd_send[i].mutex'
is pretty confusing!

> +    multifd_send[i].address = address;
> +    qemu_cond_signal(&multifd_send[i].cond);
> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      QemuThread thread;
>      QIOChannel *c;
> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        multifd_send_page(p);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> -- 
> 2.9.3

I think I'm pretty OK with this; but we'll see what it looks like
after you think about Paolo's suggestion; it does feel like it should
be possible to do the locking etc. more simply; I just don't know how.
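One possible shape for a simpler scheme (purely a sketch with invented names, using plain pthreads rather than QEMU's qemu_mutex/qemu_cond wrappers): a single lock and condvar guarding a stack of idle worker indices, replacing the per-thread mutex/cond pair plus the global multifd_send_mutex/cond:

```c
#include <assert.h>
#include <pthread.h>

/* Sketch: one lock + condvar over a stack of idle worker indices. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t idle_cond;
    int idle[16];   /* indices of workers with nothing to do */
    int n_idle;
} IdlePool;

static void idle_pool_init(IdlePool *p)
{
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->idle_cond, NULL);
    p->n_idle = 0;
}

/* A worker calls this when it finishes its batch. */
static void idle_pool_put(IdlePool *p, int worker)
{
    pthread_mutex_lock(&p->lock);
    p->idle[p->n_idle++] = worker;
    pthread_cond_signal(&p->idle_cond);
    pthread_mutex_unlock(&p->lock);
}

/* The migration thread calls this to claim a free worker, blocking
 * until one is available. */
static int idle_pool_get(IdlePool *p)
{
    int worker;
    pthread_mutex_lock(&p->lock);
    while (p->n_idle == 0) {
        pthread_cond_wait(&p->idle_cond, &p->lock);
    }
    worker = p->idle[--p->n_idle];
    pthread_mutex_unlock(&p->lock);
    return worker;
}
```

With this, handing a batch to a worker only needs the claimed worker's own queue; whether that actually simplifies the done/started handshaking in the patch is an open question.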

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter Juan Quintela
@ 2017-02-02 15:06   ` Eric Blake
  2017-02-09 17:28     ` Juan Quintela
  0 siblings, 1 reply; 57+ messages in thread
From: Eric Blake @ 2017-02-02 15:06 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert

On 01/23/2017 03:32 PM, Juan Quintela wrote:
> Indicates the number of threads that we would create.  By default we
> create 2 threads.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> ---

> +++ b/qapi-schema.json
> @@ -981,13 +981,17 @@
>  # @x-checkpoint-delay: The delay time (in ms) between two COLO checkpoints in
>  #          periodic mode. (Since 2.8)
>  #
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 2 (since 2.9)


> +#
> +# @x-multifd-threads: Number of threads used to migrate data in parallel
> +#                     The default value is 1 (since 2.9)

So which is it? I understand why you document it twice (one for setting
the value, and one for listing the current setting), but not why you
have two different defaults.

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org




* Re: [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time Juan Quintela
@ 2017-02-03 10:54   ` Dr. David Alan Gilbert
  2017-02-13 16:47     ` Juan Quintela
  0 siblings, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 10:54 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We now send several pages at a time each time that we wakeup a thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 38 insertions(+), 6 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 9d7bc64..1267730 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)
> 
>  /* Multiple fd's */
> 
> +
> +typedef struct {
> +    int num;
> +    int size;
> +    uint8_t **address;
> +} multifd_pages_t;

The naming is odd for QEMU; should be MultiFDPages ?
You might want to make num & size unsigned.

I was trying to understand why you store 'size' - is that because you worry about
someone changing the size parameter while we're running?  But given that we call
init in a few places I'm not sure it covers it.
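The two style suggestions combined might look like this (a sketch only; calloc stands in for QEMU's g_new0, and the capacity would come from migrate_multifd_group()):

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Sketch of the suggested naming/types: CamelCase typedef, unsigned
 * counters.  calloc stands in for g_new0 here. */
typedef struct {
    unsigned num;       /* pages currently queued */
    unsigned size;      /* capacity of the address array */
    uint8_t **address;
} MultiFDPages;

static void multifd_pages_init(MultiFDPages *pages, unsigned size)
{
    pages->num = 0;
    pages->size = size;
    pages->address = calloc(size, sizeof(uint8_t *));
}
```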


>  struct MultiFDSendParams {
>      /* not changed */
>      QemuThread thread;
> @@ -400,7 +407,7 @@ struct MultiFDSendParams {
>      /* protected by param mutex */
>      bool quit;
>      bool started;
> -    uint8_t *address;
> +    multifd_pages_t pages;
>      /* protected by multifd mutex */
>      bool done;
>  };
> @@ -424,8 +431,8 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        if (params->address) {
> -            params->address = 0;
> +        if (params->pages.num) {
> +            params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
>              qemu_mutex_lock(&multifd_send_mutex);
>              params->done = true;
> @@ -473,6 +480,13 @@ void migrate_multifd_send_threads_join(void)
>      multifd_send = NULL;
>  }
> 
> +static void multifd_init_group(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = migrate_multifd_group();
> +    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));

g_new0(uint8_t *, pages->size)

> +}
> +
>  void migrate_multifd_send_threads_create(void)
>  {
>      int i, thread_count;
> @@ -491,7 +505,7 @@ void migrate_multifd_send_threads_create(void)
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
>          multifd_send[i].done = true;
> -        multifd_send[i].address = 0;
> +        multifd_init_group(&multifd_send[i].pages);
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -511,8 +525,22 @@ void migrate_multifd_send_threads_create(void)
> 
>  static int multifd_send_page(uint8_t *address)

Can you comment multifd_send_page to explain what it returns.

(Do we really need u16 for fd number? More than 256 streams would seem
surprising).
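As I read the series, the record on the main stream ends up as: page header, then the channel number as big-endian 16 bits, then the page body; UINT16_MAX doubles as the "batch not yet flushed" sentinel, which may be why the field is wider than a realistic channel count needs. A sketch of just the fd_num field (invented helpers, standing in for qemu_put_be16/qemu_get_be16):

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

/* Write a 16-bit value big-endian; returns bytes written. */
static size_t put_be16(uint8_t *buf, uint16_t v)
{
    buf[0] = (uint8_t)(v >> 8);
    buf[1] = (uint8_t)(v & 0xff);
    return 2;
}

static uint16_t get_be16(const uint8_t *buf)
{
    return (uint16_t)((buf[0] << 8) | buf[1]);
}
```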

>  {
> -    int i, thread_count;
> +    int i, j, thread_count;
>      bool found = false;
> +    static multifd_pages_t pages;
> +    static bool once = false;
> +
> +    if (!once) {
> +        multifd_init_group(&pages);
> +        once = true;
> +    }
> +
> +    pages.address[pages.num] = address;
> +    pages.num++;
> +
> +    if (pages.num < (pages.size - 1)) {
> +        return UINT16_MAX;
> +    }
> 
>      thread_count = migrate_multifd_threads();
>      qemu_mutex_lock(&multifd_send_mutex);
> @@ -530,7 +558,11 @@ static int multifd_send_page(uint8_t *address)
>      }
>      qemu_mutex_unlock(&multifd_send_mutex);
>      qemu_mutex_lock(&multifd_send[i].mutex);
> -    multifd_send[i].address = address;
> +    multifd_send[i].pages.num = pages.num;
> +    for(j = 0; j < pages.size; j++) {
> +        multifd_send[i].pages.address[j] = pages.address[j];
> +    }
> +    pages.num = 0;
>      qemu_cond_signal(&multifd_send[i].cond);
>      qemu_mutex_unlock(&multifd_send[i].mutex);
> 
> -- 
> 2.9.3

Dave

> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-02-03 10:59   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 10:59 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We are still sending the page through the main channel; that will
> change later in the series.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/ram.c | 13 +++++++++++--
>  1 file changed, 11 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 1267730..ca94704 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -566,7 +566,7 @@ static int multifd_send_page(uint8_t *address)
>      qemu_cond_signal(&multifd_send[i].cond);
>      qemu_mutex_unlock(&multifd_send[i].mutex);
> 
> -    return 0;
> +    return i;
>  }
> 
>  struct MultiFDRecvParams {
> @@ -1083,6 +1083,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>                              bool last_stage, uint64_t *bytes_transferred)
>  {
>      int pages;
> +    uint16_t fd_num;
>      uint8_t *p;
>      RAMBlock *block = pss->block;
>      ram_addr_t offset = pss->offset;
> @@ -1096,8 +1097,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>      if (pages == -1) {
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        fd_num = multifd_send_page(p);
> +        qemu_put_be16(f, fd_num);
> +        *bytes_transferred += 2; /* size of fd_num */
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> -        multifd_send_page(p);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> @@ -2815,6 +2818,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
>          ram_addr_t addr, total_ram_bytes;
>          void *host = NULL;
> +        uint16_t fd_num;
>          uint8_t ch;
> 
>          addr = qemu_get_be64(f);
> @@ -2915,6 +2919,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
> 
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            fd_num = qemu_get_be16(f);
> +            if (fd_num != 0) {
> +                /* this is yet an unused variable, changed later */
> +                fd_num = fd_num;
> +            }
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> 
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side Juan Quintela
  2017-01-26 12:39   ` Paolo Bonzini
@ 2017-02-03 11:24   ` Dr. David Alan Gilbert
  2017-02-13 16:56     ` Juan Quintela
  1 sibling, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 11:24 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still receiving things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 67 insertions(+), 10 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index ca94704..4e530ea 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
> 
> -static int multifd_send_page(uint8_t *address)
> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
>  {
>      int i, j, thread_count;
>      bool found = false;
> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
>      pages.address[pages.num] = address;
>      pages.num++;
> 
> -    if (pages.num < (pages.size - 1)) {
> -        return UINT16_MAX;
> +    if (!last_page) {
> +        if (pages.num < (pages.size - 1)) {
> +            return UINT16_MAX;
> +        }
>      }

This should be in the previous patch?
(and the place that adds the last_page parameter below)?

> 
>      thread_count = migrate_multifd_threads();
> @@ -570,17 +572,25 @@ static int multifd_send_page(uint8_t *address)
>  }
> 
>  struct MultiFDRecvParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    multifd_pages_t pages;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
> 
>  static MultiFDRecvParams *multifd_recv;
> 
> +QemuMutex multifd_recv_mutex;
> +QemuCond  multifd_recv_cond;
> +
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> @@ -594,7 +604,17 @@ static void *multifd_recv_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->pages.num) {
> +            params->pages.num = 0;
> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_recv_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_recv_cond);
> +            qemu_mutex_unlock(&multifd_recv_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
> 
> @@ -647,8 +667,9 @@ void migrate_multifd_recv_threads_create(void)
>          qemu_cond_init(&multifd_recv[i].cond);
>          multifd_recv[i].quit = false;
>          multifd_recv[i].started = false;
> +        multifd_recv[i].done = true;
> +        multifd_init_group(&multifd_recv[i].pages);
>          multifd_recv[i].c = socket_recv_channel_create();
> -
>          if(!multifd_recv[i].c) {
>              error_report("Error creating a recv channel");
>              exit(0);
> @@ -664,6 +685,45 @@ void migrate_multifd_recv_threads_create(void)
>      }
>  }
> 
> +static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
> +{
> +    int i, thread_count;
> +    MultiFDRecvParams *params;
> +    static multifd_pages_t pages;
> +    static bool once = false;
> +
> +    if (!once) {
> +        multifd_init_group(&pages);
> +        once = true;
> +    }
> +
> +    pages.address[pages.num] = address;
> +    pages.num++;
> +
> +    if (fd_num == UINT16_MAX) {
> +        return;
> +    }
> +
> +    thread_count = migrate_multifd_threads();
> +    assert(fd_num < thread_count);
> +    params = &multifd_recv[fd_num];
> +
> +    qemu_mutex_lock(&multifd_recv_mutex);
> +    while (!params->done) {
> +        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
> +    }
> +    params->done = false;
> +    qemu_mutex_unlock(&multifd_recv_mutex);
> +    qemu_mutex_lock(&params->mutex);
> +    for(i = 0; i < pages.num; i++) {
> +        params->pages.address[i] = pages.address[i];
> +    }
> +    params->pages.num = pages.num;
> +    pages.num = 0;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> @@ -1097,7 +1157,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>      if (pages == -1) {
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> -        fd_num = multifd_send_page(p);
> +        fd_num = multifd_send_page(p, migration_dirty_pages == 1);
>          qemu_put_be16(f, fd_num);
>          *bytes_transferred += 2; /* size of fd_num */
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> 
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
> -            if (fd_num != 0) {
> -                /* this is yet an unused variable, changed later */
> -                fd_num = fd_num;
> -            }
> +            multifd_recv_page(host, fd_num);

This is going to be quite tricky to fit into ram_load_postcopy
in this form; somehow it's going to have to find addresses to use for the place-page
operation, and with anything where the host page size != target page size it gets messy.

Dave

>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> 
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure Juan Quintela
@ 2017-02-03 11:36   ` Dr. David Alan Gilbert
  2017-02-13 16:57     ` Juan Quintela
  2017-02-14 11:15   ` Daniel P. Berrange
  1 sibling, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 11:36 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We just send the address through the alternate channels and test that it
> is ok.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 36 ++++++++++++++++++++++++++++++++++++
>  1 file changed, 36 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 4e530ea..95af694 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -432,8 +432,22 @@ static void *multifd_send_thread(void *opaque)
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          if (params->pages.num) {
> +            int i;
> +            int num;
> +
> +            num = params->pages.num;
>              params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
> +
> +            for(i=0; i < num; i++) {
> +                if (qio_channel_write(params->c,
> +                                      (const char *)&params->pages.address[i],
> +                                      sizeof(uint8_t *), &error_abort)
> +                    != sizeof(uint8_t*)) {
> +                    /* Shouldn't ever happen */
> +                    exit(-1);
> +                }

Nope, need to find a way to cleanly fail the migration; that
might actually be tricky from one of these threads?
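One generic way to fail cleanly from a worker thread is to record the error in shared state and ask everyone to wind down, letting the migration thread do the actual teardown. A minimal sketch under that assumption (the struct and helper names here are illustrative, not QEMU's API):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical shared state: a worker reports failure here instead of
 * calling exit(-1); the controlling thread polls it and cleans up. */
typedef struct {
    atomic_int error;   /* 0 = ok, negative = first errno-style failure */
    atomic_bool quit;   /* tells every worker to stop */
} MigrationErrorState;

static void worker_report_error(MigrationErrorState *s, int err)
{
    int expected = 0;
    /* keep only the first error; later failures are usually fallout */
    atomic_compare_exchange_strong(&s->error, &expected, err);
    atomic_store(&s->quit, true);
}

static int migration_check_error(MigrationErrorState *s)
{
    return atomic_load(&s->error);
}
```

The worker threads check the quit flag in their loop; only the thread that owns the migration state turns the recorded error into a failed migration.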

> +            }
>              qemu_mutex_lock(&multifd_send_mutex);
>              params->done = true;
>              qemu_cond_signal(&multifd_send_cond);
> @@ -594,6 +608,7 @@ QemuCond  multifd_recv_cond;
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> +    uint8_t *recv_address;
>      char start;
> 
>      qio_channel_read(params->c, &start, 1, &error_abort);
> @@ -605,8 +620,29 @@ static void *multifd_recv_thread(void *opaque)
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          if (params->pages.num) {
> +            int i;
> +            int num;
> +
> +            num = params->pages.num;
>              params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
> +
> +            for(i = 0; i < num; i++) {
> +                if (qio_channel_read(params->c,
> +                                     (char *)&recv_address,
> +                                     sizeof(uint8_t*), &error_abort)
> +                    != sizeof(uint8_t *)) {
> +                    /* shouldn't ever happen */
> +                    exit(-1);
> +                }
> +                if (recv_address != params->pages.address[i]) {
> +                    printf("We received %p what we were expecting %p (%d)\n",
> +                           recv_address,
> +                           params->pages.address[i], i);
> +                    exit(-1);
> +                }
> +            }
> +
>              qemu_mutex_lock(&multifd_recv_mutex);
>              params->done = true;
>              qemu_cond_signal(&multifd_recv_cond);
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels Juan Quintela
@ 2017-02-03 11:41   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 11:41 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> We switch from sending the page address to sending the real pages.
> 
> [HACK]
> How we calculate the bandwidth is beyond repair, there is a hack there
> that would work for x86 and archs that have 4kb pages.
> 
> If you are having a nice day just go to migration/ram.c and look at
> acct_update_position().  Now you are depressed, right?
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c | 15 +++++++++++----
>  migration/ram.c       | 25 +++++++++----------------
>  2 files changed, 20 insertions(+), 20 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 1d62b91..cbbf2a3 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1839,7 +1839,8 @@ static void *migration_thread(void *opaque)
>      /* Used by the bandwidth calcs, updated later */
>      int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>      int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> -    int64_t initial_bytes = 0;
> +    int64_t qemu_file_bytes = 0;
> +    int64_t multifd_pages = 0;
>      int64_t max_size = 0;
>      int64_t start_time = initial_time;
>      int64_t end_time;
> @@ -1923,9 +1924,14 @@ static void *migration_thread(void *opaque)
>          }
>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>          if (current_time >= initial_time + BUFFER_DELAY) {
> -            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
> -                                         initial_bytes;
>              uint64_t time_spent = current_time - initial_time;
> +            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> +            uint64_t multifd_pages_now = multifd_mig_pages_transferred();
> +            /* Hack ahead.  Why the hell we don't have a function to know the
> +               target_page_size.  Hard coding it to 4096 */
> +            uint64_t transferred_bytes =
> +                (qemu_file_bytes_now - qemu_file_bytes) +
> +                (multifd_pages_now - multifd_pages) * 4096;

The problem was that it was target specific so the macros aren't allowed in
migration.c that's target independent; but you'll see I added the
qemu_target_page_bits() function, so you can do  1ul << qemu_target_page_bits()

Still, I don't quite understand why we need the transferred_bytes hack;
can't you just accumulate them as you pass them to multifd_send_page ?
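With a page-bits query in place, the accounting Dave describes is just arithmetic. A sketch (the `target_page_bits()` stand-in and its 4 KiB value are assumptions for illustration; QEMU's real helper is `qemu_target_page_bits()`):

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for qemu_target_page_bits(); assumes 4 KiB target pages. */
static int target_page_bits(void) { return 12; }

/* Bytes moved since the last bandwidth sample: the delta of the main
 * QEMUFile position plus the pages the side channels pushed, scaled
 * by the target page size instead of a hard-coded 4096. */
static uint64_t transferred_bytes(uint64_t qemu_file_now,
                                  uint64_t qemu_file_prev,
                                  uint64_t multifd_pages_now,
                                  uint64_t multifd_pages_prev)
{
    uint64_t page_size = 1ULL << target_page_bits();
    return (qemu_file_now - qemu_file_prev) +
           (multifd_pages_now - multifd_pages_prev) * page_size;
}
```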

>              double bandwidth = (double)transferred_bytes / time_spent;
>              max_size = bandwidth * s->parameters.downtime_limit;
> 
> @@ -1942,7 +1948,8 @@ static void *migration_thread(void *opaque)
> 
>              qemu_file_reset_rate_limit(s->to_dst_file);
>              initial_time = current_time;
> -            initial_bytes = qemu_ftell(s->to_dst_file);
> +            qemu_file_bytes = qemu_file_bytes_now;
> +            multifd_pages = multifd_pages_now;
>          }
>          if (qemu_file_rate_limit(s->to_dst_file)) {
>              /* usleep expects microseconds */
> diff --git a/migration/ram.c b/migration/ram.c
> index 95af694..28d099f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -441,9 +441,9 @@ static void *multifd_send_thread(void *opaque)
> 
>              for(i=0; i < num; i++) {
>                  if (qio_channel_write(params->c,
> -                                      (const char *)&params->pages.address[i],
> -                                      sizeof(uint8_t *), &error_abort)
> -                    != sizeof(uint8_t*)) {
> +                                      (const char *)params->pages.address[i],
> +                                      TARGET_PAGE_SIZE, &error_abort)
> +                    != TARGET_PAGE_SIZE) {
>                      /* Shouldn't ever happen */
>                      exit(-1);
>                  }
> @@ -608,7 +608,6 @@ QemuCond  multifd_recv_cond;
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> -    uint8_t *recv_address;
>      char start;
> 
>      qio_channel_read(params->c, &start, 1, &error_abort);
> @@ -629,20 +628,13 @@ static void *multifd_recv_thread(void *opaque)
> 
>              for(i = 0; i < num; i++) {
>                  if (qio_channel_read(params->c,
> -                                     (char *)&recv_address,
> -                                     sizeof(uint8_t*), &error_abort)
> -                    != sizeof(uint8_t *)) {
> +                                     (char *)params->pages.address[i],
> +                                     TARGET_PAGE_SIZE, &error_abort)
> +                    != TARGET_PAGE_SIZE) {
>                      /* shouldn't ever happen */
>                      exit(-1);
>                  }
> -                if (recv_address != params->pages.address[i]) {
> -                    printf("We received %p what we were expecting %p (%d)\n",
> -                           recv_address,
> -                           params->pages.address[i], i);
> -                    exit(-1);
> -                }
>              }
> -
>              qemu_mutex_lock(&multifd_recv_mutex);
>              params->done = true;
>              qemu_cond_signal(&multifd_recv_cond);
> @@ -1195,8 +1187,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          fd_num = multifd_send_page(p, migration_dirty_pages == 1);
>          qemu_put_be16(f, fd_num);
> +        if (fd_num != UINT16_MAX) {
> +            qemu_fflush(f);
> +        }
>          *bytes_transferred += 2; /* size of fd_num */
> -        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> @@ -3017,7 +3011,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
>              multifd_recv_page(host, fd_num);

I think also at some point you need to validate that fd_num that's
come off the wire.
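The check is cheap: an fd_num read from the stream is either the "handled inline" sentinel or must index a real receive thread. A sketch of that validation (the macro name is illustrative; the sentinel value UINT16_MAX matches what the series already uses):

```c
#include <assert.h>
#include <stdint.h>

/* Sentinel meaning "no side channel used for this page". */
#define MULTIFD_NO_THREAD UINT16_MAX

/* An fd_num off the wire is valid only if it is the sentinel or a
 * real thread index; anything else is a corrupt or hostile stream. */
static int multifd_fd_num_valid(uint16_t fd_num, int thread_count)
{
    return fd_num == MULTIFD_NO_THREAD || fd_num < thread_count;
}
```

On failure the loader would set `ret = -EINVAL` and bail out of ram_load rather than index past the multifd_recv array.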

Dave

> -            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> 
>          case RAM_SAVE_FLAG_EOS:
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 17/17] migration: flush receive queue
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 17/17] migration: flush receive queue Juan Quintela
@ 2017-02-03 12:28   ` Dr. David Alan Gilbert
  2017-02-13 17:13     ` Juan Quintela
  0 siblings, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-03 12:28 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed with the next stage.
> 
> We are low on page flags, so to emit that message we use a combination
> that is otherwise not valid:  MULTIFD_PAGE and COMPRESSED.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sometimes sync the bitmap when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Is there something that makes sure that the very last page is marked
with a flush to ensure that the read completes before we start
trying to do anything after ram_load ?
As I read this the flag gets added to the first page of the next round.

Dave

> ---
>  include/migration/migration.h |  1 +
>  migration/ram.c               | 46 +++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 47 insertions(+)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index b3e4f31..1bd6bc0 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -259,6 +259,7 @@ void migrate_multifd_send_threads_create(void);
>  void migrate_multifd_send_threads_join(void);
>  void migrate_multifd_recv_threads_create(void);
>  void migrate_multifd_recv_threads_join(void);
> +void qemu_savevm_send_multifd_flush(QEMUFile *f);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index 28d099f..3baead8 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -63,6 +63,13 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> 
> +/* We are getting low on page flags, so we start using combinations.
> +   When we need to flush a page, we send it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS
> +   We don't allow that combination otherwise.
> +*/
> +
> +
>  static uint8_t *ZERO_TARGET_PAGE;
> 
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
> @@ -391,6 +398,9 @@ void migrate_compress_threads_create(void)
> 
>  /* Multiple fd's */
> 
> +/* Indicates if we have synced the bitmap and we need to ensure that
> +   the target has processed all previous pages */
> +bool multifd_needs_flush = false;
> 
>  typedef struct {
>      int num;
> @@ -752,6 +762,25 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>      qemu_mutex_unlock(&params->mutex);
>  }
> 
> +
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_recv_mutex);
> +    for (i = 0; i < thread_count; i++) {
> +        while(!multifd_recv[i].done) {
> +            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_recv_mutex);
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> @@ -768,6 +797,12 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
>  {
>      size_t size, len;
> 
> +    if (multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> +        offset |= RAM_SAVE_FLAG_COMPRESS;
> +        multifd_needs_flush = false;
> +    }
> +
>      qemu_put_be64(f, offset);
>      size = 8;
> 
> @@ -2450,6 +2485,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> 
>      if (!migration_in_postcopy(migrate_get_current())) {
>          migration_bitmap_sync();
> +        if (migrate_use_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>      }
> 
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2491,6 +2529,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync();
> +        if (migrate_use_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
> @@ -2930,6 +2971,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
> 
> +        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS))
> +                  == (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS)) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_COMPRESS;
> +        }
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.9.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception
  2017-01-24 10:33   ` Dr. David Alan Gilbert
@ 2017-02-09 17:12     ` Juan Quintela
  0 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-02-09 17:12 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Right now, if we receive a compressed page or an xbzrle page while these
>> features are disabled, Bad Things (TM) can happen.  Just add a test for
>> them.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 23 ++++++++++++++++++++++-
>>  1 file changed, 22 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index ef8fadf..4ad814a 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -2455,7 +2455,7 @@ static int ram_load_postcopy(QEMUFile *f)
>> 
>>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>>  {
>> -    int flags = 0, ret = 0;
>> +    int flags = 0, ret = 0, invalid_flags;
>>      static uint64_t seq_iter;
>>      int len = 0;
>>      /*
>> @@ -2470,6 +2470,15 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>>          ret = -EINVAL;
>>      }
>> 
>> +    invalid_flags = 0;
>> +
>> +    if (!migrate_use_xbzrle()) {
>> +        invalid_flags |= RAM_SAVE_FLAG_XBZRLE;
>> +    }
>
> Is that really the case? I thought we used to ignore the flags on the incoming
> side and didn't need to enable xbzrle on the destination?

we don't need infrastructure for xbzrle, but we need it for compression.

Removing xbzrle.

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter
  2017-02-02 15:06   ` Eric Blake
@ 2017-02-09 17:28     ` Juan Quintela
  0 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-02-09 17:28 UTC (permalink / raw)
  To: Eric Blake; +Cc: qemu-devel, amit.shah, dgilbert

Eric Blake <eblake@redhat.com> wrote:
> On 01/23/2017 03:32 PM, Juan Quintela wrote:
>> Indicates the number of threads that we would create.  By default we
>> create 2 threads.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
>> ---
>
>> +++ b/qapi-schema.json
>> @@ -981,13 +981,17 @@
>>  # @x-checkpoint-delay: The delay time (in ms) between two COLO checkpoints in
>>  #          periodic mode. (Since 2.8)
>>  #
>> +# @x-multifd-threads: Number of threads used to migrate data in parallel
>> +#                     The default value is 2 (since 2.9)
>
>
>> +#
>> +# @x-multifd-threads: Number of threads used to migrate data in parallel
>> +#                     The default value is 1 (since 2.9)
>
> So which is it? I understand why you document it twice (one for setting
> the value, and one for listing the current setting), but not why you
> have two different defaults.

Because of copy & paste, and only updating one of the places.

Move both of them to 2.

Thanks, Juan.

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-01-27 17:45   ` Dr. David Alan Gilbert
@ 2017-02-13 16:34     ` Juan Quintela
  2017-02-13 16:39       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:34 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We create new channels for each new thread created. We only send through
>> them a character to be sure that we are creating the channels in the
>> right order.
>> 
>> Note: Reference count/freeing of channels is not done
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  include/migration/migration.h |  6 +++++
>>  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
>>  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
>>  3 files changed, 104 insertions(+), 3 deletions(-)
>
> One thing not directly in here: you should probably look at the migration cancel
> code to get it to call shutdown() on all your extra sockets; it stops things
> blocking in any one of them.
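The mechanism being suggested is just shutdown() on every extra socket: any thread blocked in a read or write on one of them then wakes up with EOF or an error instead of hanging. A generic sketch (plain fds for illustration; the series actually holds QIOChannel objects):

```c
#include <assert.h>
#include <sys/socket.h>
#include <unistd.h>

/* On cancel, shut down every side-channel socket so blocked worker
 * threads wake up and can notice the quit flag. */
static void multifd_shutdown_all(int *fds, int nfds)
{
    for (int i = 0; i < nfds; i++) {
        shutdown(fds[i], SHUT_RDWR);
    }
}
```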

Will do.

>> 
>> diff --git a/include/migration/migration.h b/include/migration/migration.h
>> index f119ba0..3989bd6 100644
>> --- a/include/migration/migration.h
>> +++ b/include/migration/migration.h
>> @@ -22,6 +22,7 @@
>>  #include "qapi-types.h"
>>  #include "exec/cpu-common.h"
>>  #include "qemu/coroutine_int.h"
>> +#include "io/channel.h"
>> 
>>  #define QEMU_VM_FILE_MAGIC           0x5145564d
>>  #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
>> @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
>> 
>>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
>> 
>> +QIOChannel *socket_recv_channel_create(void);
>> +int socket_recv_channel_destroy(QIOChannel *recv);
>> +QIOChannel *socket_send_channel_create(void);
>> +int socket_send_channel_destroy(QIOChannel *send);
>> +
>>  void unix_start_incoming_migration(const char *path, Error **errp);
>> 
>>  void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 939f364..5ad7cb3 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
>> 
>>  struct MultiFDSendParams {
>>      QemuThread thread;
>> +    QIOChannel *c;
>>      QemuCond cond;
>>      QemuMutex mutex;
>>      bool quit;
>> +    bool started;
>>  };
>>  typedef struct MultiFDSendParams MultiFDSendParams;
>> 
>> @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
>>  static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *params = opaque;
>> +    char start = 's';
>> +
>> +    qio_channel_write(params->c, &start, 1, &error_abort);
>
> I'd be tempted to send something stronger as a guarantee
> that you're connecting the right thing to the right place;
> maybe something like a QEMU + UUID + fd index?
> I guarantee someone is going to mess up the fd's in the wrong
> order or connect some random other process to one of them.

which UUID? I can put anything there.
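A handshake along the lines Dave suggests could carry a fixed magic, the source QEMU's UUID, and the fd index, so a misordered or foreign connection is rejected immediately. A hypothetical wire layout (field names, sizes, and the magic value are illustrative, not the format that was eventually merged):

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define MULTIFD_MAGIC 0x11223344U   /* illustrative constant */

/* First bytes sent on each side channel, instead of a lone 's'. */
typedef struct {
    uint32_t magic;
    uint8_t  uuid[16];   /* e.g. the source's qemu_uuid */
    uint16_t fd_index;   /* which channel this is meant to be */
} __attribute__((packed)) MultiFDInit;

static int multifd_check_init(const MultiFDInit *msg,
                              const uint8_t *expected_uuid,
                              uint16_t expected_index)
{
    return msg->magic == MULTIFD_MAGIC &&
           memcmp(msg->uuid, expected_uuid, 16) == 0 &&
           msg->fd_index == expected_index;
}
```

The receiver reads one MultiFDInit per incoming connection and drops anything that does not match, which also catches the "random other process connected" case.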

>> +    qemu_mutex_lock(&params->mutex);
>> +    params->started = true;
>> +    qemu_cond_signal(&params->cond);
>> +    qemu_mutex_unlock(&params->mutex);
>> 
>>      qemu_mutex_lock(&params->mutex);
>
> That unlock/lock pair is odd.

Fixed.

>
>>      while (!params->quit){
>> @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
>>          qemu_thread_join(&multifd_send[i].thread);
>>          qemu_mutex_destroy(&multifd_send[i].mutex);
>>          qemu_cond_destroy(&multifd_send[i].cond);
>> +        socket_send_channel_destroy(multifd_send[i].c);
>>      }
>>      g_free(multifd_send);
>>      multifd_send = NULL;
>> @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
>>          qemu_mutex_init(&multifd_send[i].mutex);
>>          qemu_cond_init(&multifd_send[i].cond);
>>          multifd_send[i].quit = false;
>> +        multifd_send[i].started = false;
>> +        multifd_send[i].c = socket_send_channel_create();
>> +        if(!multifd_send[i].c) {
>> +            error_report("Error creating a send channel");
>> +            exit(0);
>
> Hmm no exit!

I have to add the error path before to the callers :-(


>
> We need to be careful there since that's a sync; it depends what
> calls this, if I'm reading the code above correctly then it gets called
> from the main thread and that would be bad if it blocked; it's ok if it
> was the fd threads or the migration thread though.

I think it is from the migration thread, no?
/me checks

I stand corrected.  It is called from the main thread.  It works if
destination is not up.  It segfaults if destination is launched but not
conffigured for multithread.

Will post fix later.

Later, Juan.

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-01-27 18:02   ` Dr. David Alan Gilbert
  2017-01-30 10:06     ` Juan Quintela
@ 2017-02-13 16:36     ` Juan Quintela
  2017-02-14 11:26       ` Dr. David Alan Gilbert
  1 sibling, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:36 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> The function still doesn't use multifd, but we have simplified
>> ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
>> counter and a new flag for this type of page.


>> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>> +                            bool last_stage, uint64_t *bytes_transferred)
>> +{
>> +    int pages;
>> +    uint8_t *p;
>> +    RAMBlock *block = pss->block;
>> +    ram_addr_t offset = pss->offset;
>> +
>> +    p = block->host + offset;
>> +
>> +    if (block == last_sent_block) {
>> +        offset |= RAM_SAVE_FLAG_CONTINUE;
>> +    }
>> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
>> +    if (pages == -1) {
>> +        *bytes_transferred +=
>> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>> +        *bytes_transferred += TARGET_PAGE_SIZE;
>> +        pages = 1;
>> +        acct_info.norm_pages++;
>> +        acct_info.multifd_pages++;
>> +    }
>> +
>> +    return pages;
>> +}
>> +
>>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>>                                  ram_addr_t offset)
>>  {
>> @@ -1427,6 +1461,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
>>              res = ram_save_compressed_page(f, pss,
>>                                             last_stage,
>>                                             bytes_transferred);
>> +        } else if (migrate_use_multifd()) {
>> +            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
>
> I'm curious whether it's best to pick the destination fd at this level or one level
> higher; for example would it be good to keep all the components of a
> host page or huge
> page together on the same fd? If so then it would be best to pick the fd
> at ram_save_host_page level.

my plan here would be to change the migration code to be able to call
with bigger sizes, not page by page; then the problem solves
itself, no?

Later, Juan.

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-01-26 12:38   ` Paolo Bonzini
@ 2017-02-13 16:38     ` Juan Quintela
  0 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:38 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, amit.shah, dgilbert

Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 23/01/2017 22:32, Juan Quintela wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still transmitting things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 52 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c71929e..9d7bc64 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>>  /* Multiple fd's */
>> 
>>  struct MultiFDSendParams {
>> +    /* not changed */
>>      QemuThread thread;
>>      QIOChannel *c;
>>      QemuCond cond;
>>      QemuMutex mutex;
>> +    /* protected by param mutex */
>>      bool quit;
>>      bool started;
>> +    uint8_t *address;
>> +    /* protected by multifd mutex */
>> +    bool done;
>>  };
>>  typedef struct MultiFDSendParams MultiFDSendParams;
>> 
>>  static MultiFDSendParams *multifd_send;
>> 
>> +QemuMutex multifd_send_mutex;
>> +QemuCond multifd_send_cond;
>
> Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
> efficient.  See thread-pool.c for an example.

Did that.  See next version.

Only partial success.  It goes faster, and the code is somewhat
simpler.  But on reception I end up having to add 3 sems per thread
(OK, I could move to only two by reusing them, but still).  On the send
side I got speedups; on the reception side I didn't, and I haven't
found the cause yet.
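For reference, the semaphore-based handoff Paolo suggests amounts to
something like the following self-contained sketch.  POSIX semaphores
stand in here for QEMU's QemuSemaphore, and all names are invented for
illustration; this is not the code from the series, just the shape of
the idea: a pair of semaphores replaces a cond/mutex pair for the
producer/worker handoff.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

/* Hypothetical sketch: one worker thread, semaphore handoff instead
 * of cond/mutex.  "start" means "work is ready", "done" means "the
 * worker is free again" -- no mutex is needed for the handoff. */
typedef struct {
    sem_t start;          /* posted by producer when work is ready */
    sem_t done;           /* posted by worker when work is consumed */
    int work;             /* stand-in for the page being handed over */
    int processed;        /* how many items the worker consumed */
    int quit;
} Worker;

static void *worker_thread(void *opaque)
{
    Worker *w = opaque;
    for (;;) {
        sem_wait(&w->start);
        if (w->quit) {
            break;
        }
        w->processed += w->work;   /* stand-in for sending the page */
        sem_post(&w->done);
    }
    return NULL;
}

int run_handoff(int items)
{
    Worker w = { .work = 0, .processed = 0, .quit = 0 };
    pthread_t th;
    int i;

    sem_init(&w.start, 0, 0);
    sem_init(&w.done, 0, 1);      /* worker starts out idle ("done") */
    pthread_create(&th, NULL, worker_thread, &w);
    for (i = 0; i < items; i++) {
        sem_wait(&w.done);        /* wait until the worker is free */
        w.work = 1;
        sem_post(&w.start);       /* hand the item over */
    }
    sem_wait(&w.done);            /* drain the last item */
    w.quit = 1;
    sem_post(&w.start);
    pthread_join(th, NULL);
    sem_destroy(&w.start);
    sem_destroy(&w.done);
    return w.processed;
}
```

The ordering guarantees come entirely from the semaphores: the
producer only writes `work` after waiting on `done`, and the worker
only reads it after waiting on `start`.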

Thanks, Juan.


* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-02-13 16:34     ` Juan Quintela
@ 2017-02-13 16:39       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-13 16:39 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We create new channels for each new thread created. We only send through
> >> them a character to be sure that we are creating the channels in the
> >> right order.
> >> 
> >> Note: Reference count/freeing of channels is not done
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  include/migration/migration.h |  6 +++++
> >>  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
> >>  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
> >>  3 files changed, 104 insertions(+), 3 deletions(-)
> >
> > One thing not directly in here, you should probably look at the migration cancel
> > code to get it to call shutdown() on all your extra sockets, it stops things
> > blocking in any one of them.
> 
> Will do.
> 
> >> 
> >> diff --git a/include/migration/migration.h b/include/migration/migration.h
> >> index f119ba0..3989bd6 100644
> >> --- a/include/migration/migration.h
> >> +++ b/include/migration/migration.h
> >> @@ -22,6 +22,7 @@
> >>  #include "qapi-types.h"
> >>  #include "exec/cpu-common.h"
> >>  #include "qemu/coroutine_int.h"
> >> +#include "io/channel.h"
> >> 
> >>  #define QEMU_VM_FILE_MAGIC           0x5145564d
> >>  #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
> >> @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
> >> 
> >>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
> >> 
> >> +QIOChannel *socket_recv_channel_create(void);
> >> +int socket_recv_channel_destroy(QIOChannel *recv);
> >> +QIOChannel *socket_send_channel_create(void);
> >> +int socket_send_channel_destroy(QIOChannel *send);
> >> +
> >>  void unix_start_incoming_migration(const char *path, Error **errp);
> >> 
> >>  void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index 939f364..5ad7cb3 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
> >> 
> >>  struct MultiFDSendParams {
> >>      QemuThread thread;
> >> +    QIOChannel *c;
> >>      QemuCond cond;
> >>      QemuMutex mutex;
> >>      bool quit;
> >> +    bool started;
> >>  };
> >>  typedef struct MultiFDSendParams MultiFDSendParams;
> >> 
> >> @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
> >>  static void *multifd_send_thread(void *opaque)
> >>  {
> >>      MultiFDSendParams *params = opaque;
> >> +    char start = 's';
> >> +
> >> +    qio_channel_write(params->c, &start, 1, &error_abort);
> >
> > I'd be tempted to send something stronger as a guarantee
> > that you're connecting the right thing to the right place;
> > maybe something like a QEMU + UUID + fd index?
> > I guarantee someone is going to mess up the fd's in the wrong
> > order or connect some random other process to one of them.
> 
> which UUID? I can put anything there.

I was thinking just something to stop two migrations getting mixed
up; but I see there is a qemu_uuid variable defined, might be as good
as anything.
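A handshake frame along the lines Dave suggests could look roughly
like this.  The magic value, struct layout and helper names are all
invented for illustration; the real series would presumably use
qemu_uuid as the 16-byte identifier:

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical handshake: instead of a single 's' byte, each new
 * channel announces a magic value, the source's UUID and its fd
 * index, so a misconnected or out-of-order stream is detected
 * immediately on the receiving side. */
#define MULTIFD_MAGIC 0x11223344u   /* invented value */

typedef struct {
    uint32_t magic;
    uint8_t  uuid[16];    /* e.g. qemu_uuid on the source side */
    uint16_t fd_index;    /* which channel this claims to be */
} MultiFDHandshake;

/* Fill the frame the sending thread would write on its channel. */
void handshake_init(MultiFDHandshake *hs, const uint8_t *uuid,
                    uint16_t index)
{
    hs->magic = MULTIFD_MAGIC;
    memcpy(hs->uuid, uuid, 16);
    hs->fd_index = index;
}

/* Receiver-side check: returns 0 on match, -1 on any mismatch. */
int handshake_check(const MultiFDHandshake *hs, const uint8_t *uuid,
                    uint16_t expected_index)
{
    if (hs->magic != MULTIFD_MAGIC ||
        memcmp(hs->uuid, uuid, 16) != 0 ||
        hs->fd_index != expected_index) {
        return -1;
    }
    return 0;
}
```

A wire format would also need explicit endianness handling
(cpu_to_be32() and friends) before being sent between hosts.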

> >> +    qemu_mutex_lock(&params->mutex);
> >> +    params->started = true;
> >> +    qemu_cond_signal(&params->cond);
> >> +    qemu_mutex_unlock(&params->mutex);
> >> 
> >>      qemu_mutex_lock(&params->mutex);
> >
> > That unlock/lock pair is odd.
> 
> Fixed.
> 
> >
> >>      while (!params->quit){
> >> @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
> >>          qemu_thread_join(&multifd_send[i].thread);
> >>          qemu_mutex_destroy(&multifd_send[i].mutex);
> >>          qemu_cond_destroy(&multifd_send[i].cond);
> >> +        socket_send_channel_destroy(multifd_send[i].c);
> >>      }
> >>      g_free(multifd_send);
> >>      multifd_send = NULL;
> >> @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
> >>          qemu_mutex_init(&multifd_send[i].mutex);
> >>          qemu_cond_init(&multifd_send[i].cond);
> >>          multifd_send[i].quit = false;
> >> +        multifd_send[i].started = false;
> >> +        multifd_send[i].c = socket_send_channel_create();
> >> +        if(!multifd_send[i].c) {
> >> +            error_report("Error creating a send channel");
> >> +            exit(0);
> >
> > Hmm no exit!
> 
> I have to add the error path before to the callers :-(
> 
> 
> >
> > We need to be careful there since that's a sync; it depends what
> > calls this, if I'm reading the code above correctly then it gets called
> > from the main thread and that would be bad if it blocked; it's ok if it
> > was the fd threads or the migration thread though.
> 
> I think it is from the migration thread, no?
> /me checks

Probably worth a comment saying which thread it comes from :-)

> I stand corrected.  It is called from the main thread.  It works if the
> destination is not up.  It segfaults if the destination is launched but
> not configured for multifd.

I think I was more worried what happens if the destination or network
is blocked.

Dave

> Will post fix later.
> 
> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-02-02 12:03   ` Dr. David Alan Gilbert
@ 2017-02-13 16:40     ` Juan Quintela
  2017-02-14 11:58       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:40 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still transmitting things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 52 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c71929e..9d7bc64 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>>  /* Multiple fd's */
>> 
>>  struct MultiFDSendParams {
>> +    /* not changed */
>>      QemuThread thread;
>>      QIOChannel *c;
>>      QemuCond cond;
>>      QemuMutex mutex;
>> +    /* protected by param mutex */
>>      bool quit;
>>      bool started;
>> +    uint8_t *address;
>> +    /* protected by multifd mutex */
>> +    bool done;
>>  };
>>  typedef struct MultiFDSendParams MultiFDSendParams;
>> 
>>  static MultiFDSendParams *multifd_send;
>> 
>> +QemuMutex multifd_send_mutex;
>> +QemuCond multifd_send_cond;
>> +
>>  static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *params = opaque;
>> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
>> 
>>      qemu_mutex_lock(&params->mutex);
>>      while (!params->quit){
>> -        qemu_cond_wait(&params->cond, &params->mutex);
>> +        if (params->address) {
>> +            params->address = 0;
>
> This confused me (I wondered what happens to the 1st block) but
> I see in the next patch this gets replaced by something more complex;
> so I suggest just using params->dummy and commented it's about
> to get replaced.

If you prefer.  I wanted to minimize the change in the next patch;
otherwise I also have to change the places where I check the value of
address.


>> +    qemu_mutex_unlock(&multifd_send_mutex);
>> +    qemu_mutex_lock(&multifd_send[i].mutex);
>
> Having a 'multifd_send_mutex' and a
>          'multifd_send[i].mutex'
> is pretty confusing!

For a different reason, I have moved all the

  multifd_send[i].  references to "p->"

Better?


>
>> +    multifd_send[i].address = address;
>> +    qemu_cond_signal(&multifd_send[i].cond);
>> +    qemu_mutex_unlock(&multifd_send[i].mutex);
>> +
>> +    return 0;
>> +}
>> +
>>  struct MultiFDRecvParams {
>>      QemuThread thread;
>>      QIOChannel *c;
>> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>>          *bytes_transferred +=
>>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>> +        multifd_send_page(p);
>>          *bytes_transferred += TARGET_PAGE_SIZE;
>>          pages = 1;
>>          acct_info.norm_pages++;
>> -- 
>> 2.9.3
>
> I think I'm pretty OK with this; but we'll see what it looks like
> after you think about Paolo's suggestion; it does feel like it should
> be possible to do the locking etc simpler; I just don't know how.

Locking can be simpler, but the problem is speed :-(
Paolo's suggestion has helped.
The fact that our measurement of bandwidth is lame hasn't :-(

Later, Juan.


* Re: [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time
  2017-02-03 10:54   ` Dr. David Alan Gilbert
@ 2017-02-13 16:47     ` Juan Quintela
  0 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:47 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We now send several pages at a time each time that we wakeup a thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 38 insertions(+), 6 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 9d7bc64..1267730 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)
>> 
>>  /* Multiple fd's */
>> 
>> +
>> +typedef struct {
>> +    int num;
>> +    int size;
>> +    uint8_t **address;
>> +} multifd_pages_t;
>
> The naming is odd for QEMU; should be MultiFDPages ?
> You might want to make num & size unsigned.

ok.

> I was trying to understand why you store 'size' - is that because you worry about
> someone changing the size parameter while we're running?  But given that we call
> init in a few places I'm not sure it covers it.

No.  I am planning to change the code so that I call:

ram_save_block(&multifd_pages, &size,...)

and it returns at most "size" pages, depending on whatever is easy to
do.  The plan is not to change blocks or do any expensive operation,
just to return the next "size" pages available that are easy to pick.
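The batching structure being discussed can be sketched as below
(renamed MultiFDPages per Dave's comment, with unsigned fields; the
helper names are invented, and this is simplified -- it flushes exactly
at capacity, whereas the patch's `pages.num < (pages.size - 1)` check
is slightly different):

```c
#include <stdint.h>
#include <stdlib.h>

/* Sketch: pages are queued until the group is full, then the whole
 * batch is handed to a worker thread in one wakeup. */
typedef struct {
    unsigned int num;       /* pages queued so far */
    unsigned int size;      /* batch capacity (x-multifd-group) */
    uint8_t **address;
} MultiFDPages;

void multifd_pages_init(MultiFDPages *pages, unsigned int size)
{
    pages->num = 0;
    pages->size = size;
    pages->address = calloc(size, sizeof(uint8_t *));
}

/* Queue one page; returns 1 when the batch is full and should be
 * flushed to a worker thread, 0 otherwise. */
int multifd_queue_page(MultiFDPages *pages, uint8_t *address)
{
    pages->address[pages->num++] = address;
    return pages->num == pages->size;
}
```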


>> +    pages->num = 0;
>> +    pages->size = migrate_multifd_group();
>> +    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));
>
> g_new0(uint8_t *, pages->size)

changing it, I don't  really care O:-)

>> +}
>> +
>>  void migrate_multifd_send_threads_create(void)
>>  {
>>      int i, thread_count;
>> @@ -491,7 +505,7 @@ void migrate_multifd_send_threads_create(void)
>>          multifd_send[i].quit = false;
>>          multifd_send[i].started = false;
>>          multifd_send[i].done = true;
>> -        multifd_send[i].address = 0;
>> +        multifd_init_group(&multifd_send[i].pages);
>>          multifd_send[i].c = socket_send_channel_create();
>>          if(!multifd_send[i].c) {
>>              error_report("Error creating a send channel");
>> @@ -511,8 +525,22 @@ void migrate_multifd_send_threads_create(void)
>> 
>>  static int multifd_send_page(uint8_t *address)
>
> Can you comment multifd_send_page to explain what it returns.
>
> (Do we really need u16 for fd number? More than 256 streams would seem
> surprising).

I *guess* (big guess) that people are going to use the same number of
threads as CPUs.  And we are going to have more than 256 CPUs in the
near future, so it looked more future-proof to use uint16_t.  I don't
really care, though.

Thanks, Juan.


* Re: [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
  2017-02-03 11:24   ` Dr. David Alan Gilbert
@ 2017-02-13 16:56     ` Juan Quintela
  2017-02-14 11:34       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:56 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still receiving things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
>>  1 file changed, 67 insertions(+), 10 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index ca94704..4e530ea 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
>>      }
>>  }
>> 
>> -static int multifd_send_page(uint8_t *address)
>> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
>>  {
>>      int i, j, thread_count;
>>      bool found = false;
>> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
>>      pages.address[pages.num] = address;
>>      pages.num++;
>> 
>> -    if (pages.num < (pages.size - 1)) {
>> -        return UINT16_MAX;
>> +    if (!last_page) {
>> +        if (pages.num < (pages.size - 1)) {
>> +            return UINT16_MAX;
>> +        }
>>      }
>
> This should be in the previous patch?
> (and the place that adds the last_page parameter below)?

ok.

>> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>> 
>>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>>              fd_num = qemu_get_be16(f);
>> -            if (fd_num != 0) {
>> -                /* this is yet an unused variable, changed later */
>> -                fd_num = fd_num;
>> -            }
>> +            multifd_recv_page(host, fd_num);
>
> This is going to be quite tricky to fit into ram_load_postcopy
> in this form; somehow it's going to have to find addresses to use for place page
> and with anything with a page size != target page size it gets messy.

What do you have in mind?

Later, Juan.


* Re: [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure
  2017-02-03 11:36   ` Dr. David Alan Gilbert
@ 2017-02-13 16:57     ` Juan Quintela
  2017-02-14 11:05       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 16:57 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We just send the address through the alternate channels and test that it
>> is ok.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 36 ++++++++++++++++++++++++++++++++++++
>>  1 file changed, 36 insertions(+)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 4e530ea..95af694 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -432,8 +432,22 @@ static void *multifd_send_thread(void *opaque)
>>      qemu_mutex_lock(&params->mutex);
>>      while (!params->quit){
>>          if (params->pages.num) {
>> +            int i;
>> +            int num;
>> +
>> +            num = params->pages.num;
>>              params->pages.num = 0;
>>              qemu_mutex_unlock(&params->mutex);
>> +
>> +            for(i=0; i < num; i++) {
>> +                if (qio_channel_write(params->c,
>> +                                      (const char *)&params->pages.address[i],
>> +                                      sizeof(uint8_t *), &error_abort)
>> +                    != sizeof(uint8_t*)) {
>> +                    /* Shouldn't ever happen */
>> +                    exit(-1);
>> +                }
>
> Nope, need to find a way to cleanly find the migration; that
> might actually be tricky from one of these threads?

It is tricky, but the error handling is already wrong in the callers of
this.  I will try to improve it in the next series, but the problem is
already there.


* Re: [Qemu-devel] [PATCH 17/17] migration: flush receive queue
  2017-02-03 12:28   ` Dr. David Alan Gilbert
@ 2017-02-13 17:13     ` Juan Quintela
  0 siblings, 0 replies; 57+ messages in thread
From: Juan Quintela @ 2017-02-13 17:13 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, amit.shah

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Each time that we sync the bitmap, there is a possibility that we
>> receive a page that is being processed by a different thread.  We fix
>> this problem by making sure that we wait for all receiving threads to
>> finish their work before we proceed with the next stage.
>> 
>> We are low on page flags, so we use a combination that is not valid to
>> emit that message:  MULTIFD_PAGE and COMPRESSED.
>> 
>> I tried to make a migration command for it, but it doesn't work
>> because we sync the bitmap sometimes when we have already sent the
>> beginning of the section, so I just added a new page flag.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> Is there something that makes sure that the very last page is marked
> with a flush to ensure that the read completes before we start
> trying to do anything after ram_load ?
> As I read this the flag gets added to the first page of the next round.

Yes.  It is easier.  We handle this flag before we handle the following
page, so it is safe, no?

>> 
>> +        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS))
>> +                  == (RAM_SAVE_FLAG_MULTIFD_PAGE|RAM_SAVE_FLAG_COMPRESS)) {
>> +            multifd_flush();
>> +            flags = flags & ~RAM_SAVE_FLAG_COMPRESS;
>> +        }
>>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
>>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {

Here is where we make sure that we are done.

<think, think , think ...>

OK, I think I see what you mean: what do we do for the last page of the
last round.  I will try to move it to the place that you asked for.  It
is not trivial because we generate the header before we do anything
else, and the last page might not be a multifd one.  Will think about
it.

Thanks, Juan.
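The flag-combination trick under discussion boils down to a test like
the following sketch.  The flag values here are placeholders, not the
ones from the real ram.c; the point is only that MULTIFD_PAGE and
COMPRESS never legitimately appear together on one page, so the pair
is free to mean "flush the receive threads":

```c
#include <stdbool.h>
#include <stdint.h>

/* Placeholder flag values for illustration only. */
#define RAM_SAVE_FLAG_COMPRESS      0x0002u
#define RAM_SAVE_FLAG_MULTIFD_PAGE  0x0200u

/* True when the (otherwise invalid) combination of both flags is
 * present, i.e. when the sender is asking for a multifd flush. */
bool is_multifd_flush(uint64_t flags)
{
    uint64_t mask = RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS;
    return (flags & mask) == mask;
}
```

After recognizing the combination, the receiver clears the COMPRESS
bit again (as in the quoted hunk) so the rest of the page handling
sees an ordinary multifd page.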


>> -- 
>> 2.9.3
>> 
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work Juan Quintela
  2017-01-27 17:45   ` Dr. David Alan Gilbert
@ 2017-02-13 17:35   ` Daniel P. Berrange
  2017-02-15 14:46     ` Dr. David Alan Gilbert
  1 sibling, 1 reply; 57+ messages in thread
From: Daniel P. Berrange @ 2017-02-13 17:35 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah, dgilbert

On Mon, Jan 23, 2017 at 10:32:13PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
> 
> Note: Reference count/freeing of channels is not done
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  6 +++++
>  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
>  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--

BTW, right now libvirt never uses QEMU's tcp: protocol - it does everything
with the fd: protocol.  So either we need multi-fd support for fd: protocol,
or libvirt needs to switch to use tcp:

In fact, having said that, we're going to have to switch to use  the tcp:
protocol anyway in order to support TLS, so this is just another good
reason for the switch.

We avoided tcp: in the past because QEMU was incapable of reporting error
messages when the connection failed. That's fixed since

  commit d59ce6f34434bf47a9b26138c908650bf9a24be1
  Author: Daniel P. Berrange <berrange@redhat.com>
  Date:   Wed Apr 27 11:05:00 2016 +0100

    migration: add reporting of errors for outgoing migration

so libvirt should be ok to use tcp: now.

>  3 files changed, 104 insertions(+), 3 deletions(-)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index f119ba0..3989bd6 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -22,6 +22,7 @@
>  #include "qapi-types.h"
>  #include "exec/cpu-common.h"
>  #include "qemu/coroutine_int.h"
> +#include "io/channel.h"
> 
>  #define QEMU_VM_FILE_MAGIC           0x5145564d
>  #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
> @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
> 
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
> 
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +QIOChannel *socket_send_channel_create(void);
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void unix_start_incoming_migration(const char *path, Error **errp);
> 
>  void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
> diff --git a/migration/ram.c b/migration/ram.c
> index 939f364..5ad7cb3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
> 
>  struct MultiFDSendParams {
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
>      bool quit;
> +    bool started;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
> @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> +    char start = 's';
> +
> +    qio_channel_write(params->c, &start, 1, &error_abort);
> +    qemu_mutex_lock(&params->mutex);
> +    params->started = true;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
>          qemu_thread_join(&multifd_send[i].thread);
>          qemu_mutex_destroy(&multifd_send[i].mutex);
>          qemu_cond_destroy(&multifd_send[i].cond);
> +        socket_send_channel_destroy(multifd_send[i].c);
>      }
>      g_free(multifd_send);
>      multifd_send = NULL;
> @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
> +        multifd_send[i].started = false;
> +        multifd_send[i].c = socket_send_channel_create();
> +        if(!multifd_send[i].c) {
> +            error_report("Error creating a send channel");
> +            exit(0);
> +        }
>          snprintf(thread_name, 15, "multifd_send_%d", i);
>          qemu_thread_create(&multifd_send[i].thread, thread_name,
>                             multifd_send_thread, &multifd_send[i],
>                             QEMU_THREAD_JOINABLE);
> +        qemu_mutex_lock(&multifd_send[i].mutex);
> +        while (!multifd_send[i].started) {
> +            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
> +        }
> +        qemu_mutex_unlock(&multifd_send[i].mutex);
>      }
>  }
> 
>  struct MultiFDRecvParams {
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
>      bool quit;
> +    bool started;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
> 
> @@ -472,7 +495,14 @@ static MultiFDRecvParams *multifd_recv;
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> - 
> +    char start;
> +
> +    qio_channel_read(params->c, &start, 1, &error_abort);
> +    qemu_mutex_lock(&params->mutex);
> +    params->started = true;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> +
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          qemu_cond_wait(&params->cond, &params->mutex);
> @@ -508,6 +538,7 @@ void migrate_multifd_recv_threads_join(void)
>          qemu_thread_join(&multifd_recv[i].thread);
>          qemu_mutex_destroy(&multifd_recv[i].mutex);
>          qemu_cond_destroy(&multifd_recv[i].cond);
> +        socket_send_channel_destroy(multifd_recv[i].c);
>      }
>      g_free(multifd_recv);
>      multifd_recv = NULL;
> @@ -526,9 +557,21 @@ void migrate_multifd_recv_threads_create(void)
>          qemu_mutex_init(&multifd_recv[i].mutex);
>          qemu_cond_init(&multifd_recv[i].cond);
>          multifd_recv[i].quit = false;
> +        multifd_recv[i].started = false;
> +        multifd_recv[i].c = socket_recv_channel_create();
> +
> +        if(!multifd_recv[i].c) {
> +            error_report("Error creating a recv channel");
> +            exit(0);
> +        }
>          qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
>                             multifd_recv_thread, &multifd_recv[i],
>                             QEMU_THREAD_JOINABLE);
> +        qemu_mutex_lock(&multifd_recv[i].mutex);
> +        while (!multifd_recv[i].started) {
> +            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
> +        }
> +        qemu_mutex_unlock(&multifd_recv[i].mutex);
>      }
>  }
> 
> diff --git a/migration/socket.c b/migration/socket.c
> index 11f80b1..7cd9213 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -24,6 +24,54 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
> 
> +struct SocketArgs {
> +    QIOChannelSocket *ioc;
> +    SocketAddress *saddr;
> +    Error **errp;
> +} socket_args;

Passing data from one method to another indirectly via this random
global var feels rather dirty, since two different pairs of methods
are both using the same global var. It happens to be ok since one
pair of methods is only ever called on the target, and one pair is
only ever called on the source.  It is a recipe for future unpleasant
surprises though, so I think this needs rethinking.

> +QIOChannel *socket_recv_channel_create(void)
> +{
> +    QIOChannelSocket *sioc;
> +    Error *err = NULL;
> +
> +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
> +                                     &err);
> +    if (!sioc) {
> +        error_report("could not accept migration connection (%s)",
> +                     error_get_pretty(err));
> +        return NULL;
> +    }
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    // Remove channel
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> +    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
> +                                    socket_args.errp);
> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    // Remove channel
> +    object_unref(OBJECT(send));
> +    if (socket_args.saddr) {
> +        qapi_free_SocketAddress(socket_args.saddr);
> +        socket_args.saddr = NULL;
> +    }
> +    return 0;
> +}
> 
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -96,6 +144,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
> 
>      data->s = s;
> +
> +    socket_args.saddr = saddr;
> +    socket_args.errp = errp;
> +
>      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
>          data->hostname = g_strdup(saddr->u.inet.data->host);
>      }
> @@ -106,7 +158,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
> 
>  void tcp_start_outgoing_migration(MigrationState *s,
> @@ -154,7 +205,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
> 
>  out:
>      /* Close listening socket as its no longer needed */
> -    qio_channel_close(ioc, NULL);
> +//    qio_channel_close(ioc, NULL);
>      return FALSE; /* unregister */
>  }

If you changed this to return TRUE, then this existing code would be
automatically invoked when the client makes its 2nd, 3rd, etc
connection. You'd just have to put some logic in
migration_channel_process_incoming to take different behaviour when
seeing the 1st vs the additional connections.
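Daniel's suggestion -- return TRUE from the accept watch so GLib keeps
it registered until every expected connection has arrived -- reduces
to logic like this sketch (the counters and function name are
invented; in the real code the boolean return feeds straight back to
the GSourceFunc contract):

```c
#include <stdbool.h>

unsigned int multifd_expected;   /* 1 main + n multifd channels */
unsigned int multifd_accepted;

/* Returns true to keep the listening watch registered, false to
 * unregister it once all expected connections have been accepted. */
bool accept_one_connection(void)
{
    multifd_accepted++;
    if (multifd_accepted == 1) {
        /* first connection: the main migration stream */
    } else {
        /* additional connection: hand it to a multifd recv thread */
    }
    return multifd_accepted < multifd_expected;
}
```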


> 
> @@ -163,6 +214,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
>                                              Error **errp)
>  {
>      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> +    socket_args.ioc = listen_ioc;
> 
>      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
>                           "migration-socket-listener");



Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|

^ permalink raw reply	[flat|nested] 57+ messages in thread

* Re: [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure
  2017-02-13 16:57     ` Juan Quintela
@ 2017-02-14 11:05       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-14 11:05 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We just send the address through the alternate channels and test that it
> >> is ok.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 36 ++++++++++++++++++++++++++++++++++++
> >>  1 file changed, 36 insertions(+)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index 4e530ea..95af694 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -432,8 +432,22 @@ static void *multifd_send_thread(void *opaque)
> >>      qemu_mutex_lock(&params->mutex);
> >>      while (!params->quit){
> >>          if (params->pages.num) {
> >> +            int i;
> >> +            int num;
> >> +
> >> +            num = params->pages.num;
> >>              params->pages.num = 0;
> >>              qemu_mutex_unlock(&params->mutex);
> >> +
> >> +            for(i=0; i < num; i++) {
> >> +                if (qio_channel_write(params->c,
> >> +                                      (const char *)&params->pages.address[i],
> >> +                                      sizeof(uint8_t *), &error_abort)
> >> +                    != sizeof(uint8_t*)) {
> >> +                    /* Shouldn't ever happen */
> >> +                    exit(-1);
> >> +                }
> >
> > Nope, need to find a way to cleanly find the migration; that
> > might actually be tricky from one of these threads?
> 
> It is tricky, but the error handling is wrong in the callers of this
> already.  Will try to improve it on next series, but the problem is
> already there.

Well, we should never kill the source because of a failed migration;
especially just due to a fairly boring IO error.
You could just exit the thread and return a marker value that gets
delivered to the pthread_join.
There is qemu_file_set_error() that you could call - although you'd
have to figure out thread safety.
(Which might be as simple as cmpxchg the error values and only
change the error value if it was previously 0).
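The cmpxchg idea Dave sketches, record an error only if none has been recorded yet so the first failure wins, can be shown with C11 atomics. This is only the pattern; qemu_file_set_error() itself is not reproduced here:

```c
/* Sketch of "only the first error sticks" using a compare-exchange,
 * as suggested in the review.  migration_error is a stand-in for the
 * error field inside QEMUFile. */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int migration_error;   /* 0 = no error recorded yet */

/* Returns true if this call recorded the error, false if another
 * thread already recorded one. */
static bool record_error_once(int err)
{
    int expected = 0;
    return atomic_compare_exchange_strong(&migration_error, &expected, err);
}
```

A multifd send thread hitting an IO error would call this and then exit with a marker return value for pthread_join, instead of calling exit().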

Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure
  2017-01-23 21:32 ` [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure Juan Quintela
  2017-02-03 11:36   ` Dr. David Alan Gilbert
@ 2017-02-14 11:15   ` Daniel P. Berrange
  1 sibling, 0 replies; 57+ messages in thread
From: Daniel P. Berrange @ 2017-02-14 11:15 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah, dgilbert

On Mon, Jan 23, 2017 at 10:32:19PM +0100, Juan Quintela wrote:
> We just send the address through the alternate channels and test that it
> is ok.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 36 ++++++++++++++++++++++++++++++++++++
>  1 file changed, 36 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 4e530ea..95af694 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -432,8 +432,22 @@ static void *multifd_send_thread(void *opaque)
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          if (params->pages.num) {
> +            int i;
> +            int num;
> +
> +            num = params->pages.num;

Is this likely to be a small or a large value ? .....

>              params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
> +
> +            for(i=0; i < num; i++) {
> +                if (qio_channel_write(params->c,
> +                                      (const char *)&params->pages.address[i],
> +                                      sizeof(uint8_t *), &error_abort)
> +                    != sizeof(uint8_t*)) {
> +                    /* Shouldn't ever happen */
> +                    exit(-1);
> +                }
> +            }

If 'num' is large, then you would be better off populating an iovec
and using qio_channel_writev() rather than sending one uint8_t *
at a time.
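The iovec suggestion can be sketched against plain POSIX writev() on a pipe; the QIOChannel API is assumed to mirror this shape:

```c
/* Hedged sketch: batch the page addresses into one iovec and issue a
 * single writev() instead of one write per pointer.  POSIX stand-in
 * for qio_channel_writev(); NUM_PAGES is an illustrative batch size. */
#include <assert.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

#define NUM_PAGES 4

static ssize_t send_addresses(int fd, void **addrs, int num)
{
    struct iovec iov[NUM_PAGES];
    for (int i = 0; i < num; i++) {
        iov[i].iov_base = &addrs[i];    /* send the pointer value itself */
        iov[i].iov_len  = sizeof(void *);
    }
    return writev(fd, iov, num);        /* one syscall for the whole batch */
}
```

One gather write amortises the per-syscall cost across the batch, which matters once 'num' grows to the multifd group size.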


> @@ -605,8 +620,29 @@ static void *multifd_recv_thread(void *opaque)
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
>          if (params->pages.num) {
> +            int i;
> +            int num;
> +
> +            num = params->pages.num;
>              params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
> +
> +            for(i = 0; i < num; i++) {
> +                if (qio_channel_read(params->c,
> +                                     (char *)&recv_address,
> +                                     sizeof(uint8_t*), &error_abort)
> +                    != sizeof(uint8_t *)) {
> +                    /* shouldn't ever happen */
> +                    exit(-1);
> +                }
> +                if (recv_address != params->pages.address[i]) {
> +                    printf("We received %p what we were expecting %p (%d)\n",
> +                           recv_address,
> +                           params->pages.address[i], i);
> +                    exit(-1);
> +                }
> +            }

Same comment as above.

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|


* Re: [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page
  2017-02-13 16:36     ` Juan Quintela
@ 2017-02-14 11:26       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-14 11:26 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> The function still doesn't use multifd, but we have simplified
> >> ram_save_page: xbzrle and RDMA stuff is gone.  We have added a new
> >> counter and a new flag for this type of pages.
> 
> 
> >> +static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> >> +                            bool last_stage, uint64_t *bytes_transferred)
> >> +{
> >> +    int pages;
> >> +    uint8_t *p;
> >> +    RAMBlock *block = pss->block;
> >> +    ram_addr_t offset = pss->offset;
> >> +
> >> +    p = block->host + offset;
> >> +
> >> +    if (block == last_sent_block) {
> >> +        offset |= RAM_SAVE_FLAG_CONTINUE;
> >> +    }
> >> +    pages = save_zero_page(f, block, offset, p, bytes_transferred);
> >> +    if (pages == -1) {
> >> +        *bytes_transferred +=
> >> +            save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> >> +        qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> >> +        *bytes_transferred += TARGET_PAGE_SIZE;
> >> +        pages = 1;
> >> +        acct_info.norm_pages++;
> >> +        acct_info.multifd_pages++;
> >> +    }
> >> +
> >> +    return pages;
> >> +}
> >> +
> >>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
> >>                                  ram_addr_t offset)
> >>  {
> >> @@ -1427,6 +1461,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
> >>              res = ram_save_compressed_page(f, pss,
> >>                                             last_stage,
> >>                                             bytes_transferred);
> >> +        } else if (migrate_use_multifd()) {
> >> +            res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
> >
> > I'm curious whether it's best to pick the destination fd at this level or one level
> > higher; for example would it be good to keep all the components of a
> > host page or huge
> > page together on the same fd? If so then it would be best to pick the fd
> > at ram_save_host_page level.
> 
> my plan here would be to change the migration code to be able to call
> with a bigger sizes, not page by page, and then the problem is solved by
> itself?

Yes it might be, but you may have to be careful to keep all your FDs busy;
for example, imagine that we're using huge pages, and you try and stuff
an entire hugepage down one FD, for 2MB hugepages that's about half of the
write buffer (tcp-wmem?) so there's a fair chance it'll block.  But there again
I think it's probably the right thing to do as long as you can get
different pages down different FDs.

Dave

> Later, Juan.
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side
  2017-02-13 16:56     ` Juan Quintela
@ 2017-02-14 11:34       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-14 11:34 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We make the locking and the transfer of information specific, even if we
> >> are still receiving things through the main thread.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
> >>  1 file changed, 67 insertions(+), 10 deletions(-)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index ca94704..4e530ea 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
> >>      }
> >>  }
> >> 
> >> -static int multifd_send_page(uint8_t *address)
> >> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
> >>  {
> >>      int i, j, thread_count;
> >>      bool found = false;
> >> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
> >>      pages.address[pages.num] = address;
> >>      pages.num++;
> >> 
> >> -    if (pages.num < (pages.size - 1)) {
> >> -        return UINT16_MAX;
> >> +    if (!last_page) {
> >> +        if (pages.num < (pages.size - 1)) {
> >> +            return UINT16_MAX;
> >> +        }
> >>      }
> >
> > This should be in the previous patch?
> > (and the place that adds the last_page parameter below)?
> 
> ok.
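The batching that the last_page flag controls can be sketched as a queue-and-flush helper; BATCH_SIZE and every name below are illustrative stand-ins for pages.size and friends, not the patch's exact logic:

```c
/* Toy batch-and-flush model of multifd_send_page(): queue addresses
 * until the batch fills, but let the caller force a flush for the
 * final page of an iteration via last_page. */
#include <assert.h>
#include <stdbool.h>

#define BATCH_SIZE   4
#define NOT_SENT_YET (-1)

struct page_batch {
    const void *address[BATCH_SIZE];
    int num;
    int flushes;    /* how many batches were handed to worker threads */
};

/* Returns NOT_SENT_YET while batching, or a (fake) fd/thread number
 * once the batch is dispatched. */
static int queue_page(struct page_batch *b, const void *addr, bool last_page)
{
    b->address[b->num++] = addr;
    if (!last_page && b->num < BATCH_SIZE) {
        return NOT_SENT_YET;
    }
    /* Full batch, or end of iteration: dispatch and reset. */
    b->flushes++;
    b->num = 0;
    return b->flushes;  /* stand-in for the chosen fd number */
}
```

The flush-on-last_page path is exactly the part Dave suggests belongs in the previous patch alongside the parameter itself.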
> 
> >> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> >> 
> >>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
> >>              fd_num = qemu_get_be16(f);
> >> -            if (fd_num != 0) {
> >> -                /* this is yet an unused variable, changed later */
> >> -                fd_num = fd_num;
> >> -            }
> >> +            multifd_recv_page(host, fd_num);
> >
> > This is going to be quite tricky to fit into ram_load_postcopy
> > in this form; somehow it's going to have to find addresses to use for place page
> > and with anything with a page size != target page size it gets messy.
> 
> What do you have in mind?

The problem is that for postcopy we read the data into a temporary buffer
and then call a system call to 'place' the page atomically in memory.
At the moment there's a single temporary buffer; for x86 this is easy -
read a page into buffer; place it.  For Power/ARM or hugepages we
read consecutive target-pages into the temporary buffer and at the end
of the page place the whole host/huge page at once.
If you're reading multiple pages in parallel then you're going to need
to take care with multiple temporary buffers; having one hugepage/hostpage
per fd would probably be the easiest way.
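The per-fd temporary buffer Dave describes can be sketched like this, with memcpy standing in for the atomic placement (in real postcopy that final step would be one UFFDIO_COPY for the whole host page); page sizes are illustrative:

```c
/* Sketch: consecutive target pages accumulate in a host-page-sized
 * temporary buffer, and only when the buffer is complete is the whole
 * host page "placed" at the destination in one go. */
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define TARGET_PAGE_SIZE    4096
#define PAGES_PER_HOST_PAGE 4     /* e.g. 16K host pages, 4K target pages */

struct recv_buffer {
    unsigned char tmp[TARGET_PAGE_SIZE * PAGES_PER_HOST_PAGE];
    int filled;                   /* target pages accumulated so far */
};

/* Returns true once the full host page has been placed at dest. */
static bool place_target_page(struct recv_buffer *b,
                              const unsigned char *data,
                              unsigned char *dest)
{
    memcpy(b->tmp + (size_t)b->filled * TARGET_PAGE_SIZE,
           data, TARGET_PAGE_SIZE);
    if (++b->filled < PAGES_PER_HOST_PAGE) {
        return false;             /* keep accumulating */
    }
    /* Atomic placement point: real code would issue one UFFDIO_COPY
     * covering the whole host page here. */
    memcpy(dest, b->tmp, sizeof(b->tmp));
    b->filled = 0;
    return true;
}
```

Giving each multifd recv thread its own recv_buffer is the "one hugepage/hostpage per fd" arrangement suggested above.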

A related thing to take care of is that when switching to postcopy mode
we probably need to take care to sync all of the fds to make sure
any outstanding RAM load has completed before we start doing any postcopy
magic.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side
  2017-02-13 16:40     ` Juan Quintela
@ 2017-02-14 11:58       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-14 11:58 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, amit.shah

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We make the locking and the transfer of information specific, even if we
> >> are still transmitting things through the main thread.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >>  1 file changed, 52 insertions(+), 1 deletion(-)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c71929e..9d7bc64 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
> >>  /* Multiple fd's */
> >> 
> >>  struct MultiFDSendParams {
> >> +    /* not changed */
> >>      QemuThread thread;
> >>      QIOChannel *c;
> >>      QemuCond cond;
> >>      QemuMutex mutex;
> >> +    /* protected by param mutex */
> >>      bool quit;
> >>      bool started;
> >> +    uint8_t *address;
> >> +    /* protected by multifd mutex */
> >> +    bool done;
> >>  };
> >>  typedef struct MultiFDSendParams MultiFDSendParams;
> >> 
> >>  static MultiFDSendParams *multifd_send;
> >> 
> >> +QemuMutex multifd_send_mutex;
> >> +QemuCond multifd_send_cond;
> >> +
> >>  static void *multifd_send_thread(void *opaque)
> >>  {
> >>      MultiFDSendParams *params = opaque;
> >> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> >> 
> >>      qemu_mutex_lock(&params->mutex);
> >>      while (!params->quit){
> >> -        qemu_cond_wait(&params->cond, &params->mutex);
> >> +        if (params->address) {
> >> +            params->address = 0;
> >
> > This confused me (I wondered what happens to the 1st block) but
> > I see in the next patch this gets replaced by something more complex;
> > so I suggest just using params->dummy and commenting that it's about
> > to get replaced.
> 
> If you prefer; I wanted to minimize the change in the next patch,
> otherwise I also have to change the places where I check the value of
> address.
> 

OK, perhaps just adding a comment to say it's going to go in the
next patch would work.

> >> +    qemu_mutex_unlock(&multifd_send_mutex);
> >> +    qemu_mutex_lock(&multifd_send[i].mutex);
> >
> > Having a 'multifd_send_mutex' and a
> >          'multifd_send[i].mutex'
> > is pretty confusing!
> 
> For different reason, I have moved all the
> 
>   multifd_send[i]. to "p->"
> 
> Better?

Maybe!

> >
> >> +    multifd_send[i].address = address;
> >> +    qemu_cond_signal(&multifd_send[i].cond);
> >> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> >> +
> >> +    return 0;
> >> +}
> >> +
> >>  struct MultiFDRecvParams {
> >>      QemuThread thread;
> >>      QIOChannel *c;
> >> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> >>          *bytes_transferred +=
> >>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> >>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> >> +        multifd_send_page(p);
> >>          *bytes_transferred += TARGET_PAGE_SIZE;
> >>          pages = 1;
> >>          acct_info.norm_pages++;
> >> -- 
> >> 2.9.3
> >
> > I think I'm pretty OK with this; but we'll see what it looks like
> > after you think about Paolo's suggestion; it does feel like it should
> > be possible to do the locking etc simpler; I just don't know how.
> 
> Locking can be simpler, but the problem is speed :-(
> Paolo's suggestion has helped.
> The fact that our bandwidth measurement is lame hasn't :-(

Are you sure that your performance problems are anything to do with locking?

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-02-13 17:35   ` Daniel P. Berrange
@ 2017-02-15 14:46     ` Dr. David Alan Gilbert
  2017-02-15 15:01       ` Daniel P. Berrange
  0 siblings, 1 reply; 57+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-15 14:46 UTC (permalink / raw)
  To: Daniel P. Berrange; +Cc: Juan Quintela, qemu-devel

* Daniel P. Berrange (berrange@redhat.com) wrote:
> On Mon, Jan 23, 2017 at 10:32:13PM +0100, Juan Quintela wrote:
> > We create new channels for each new thread created. We only send through
> > them a character to be sure that we are creating the channels in the
> > right order.
> > 
> > Note: Reference count/freeing of channels is not done
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > ---
> >  include/migration/migration.h |  6 +++++
> >  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
> >  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
> 
> BTW, right now libvirt never uses QEMU's tcp: protocol - it does everything
> with the fd: protocol.  So either we need multi-fd support for fd: protocol,
> or libvirt needs to switch to use tcp:

I thought using fd was safer than tcp: because of the race when something else
could listen on the proposed port on the incoming side between the point of libvirt
picking the port number and qemu starting.
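The fd: scheme that avoids this race can be sketched as follows: the management process binds and listens itself, so the port is owned before QEMU ever starts, and the already-listening fd is what gets inherited across exec. Binding port 0 below just lets the kernel pick a free port for the demonstration; none of this is libvirt's actual code:

```c
/* "Management side" sketch: create the listener that would be passed
 * to QEMU via the fd: protocol.  Because bind()+listen() happen here,
 * no other process can grab the port between port selection and QEMU
 * accepting on it. */
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static int create_inheritable_listener(unsigned short *port_out)
{
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                       /* kernel picks a free port */
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(fd, 1) < 0 ||
        getsockname(fd, (struct sockaddr *)&addr, &len) < 0) {
        close(fd);
        return -1;
    }
    *port_out = ntohs(addr.sin_port);        /* already owned: no race */
    return fd;                               /* inherited by exec'd QEMU */
}
```

With tcp:, by contrast, the manager only *chooses* a number and QEMU binds it later, leaving the window Dave describes.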

> In fact, having said that, we're going to have to switch to use  the tcp:
> protocol anyway in order to support TLS, so this is just another good
> reason for the switch.

I thought you had a way of allowing fd to work for TLS?

Dave

> 
> We avoided tcp: in the past because QEMU was incapable of reporting error
> messages when the connection failed. That's fixed since
> 
>   commit d59ce6f34434bf47a9b26138c908650bf9a24be1
>   Author: Daniel P. Berrange <berrange@redhat.com>
>   Date:   Wed Apr 27 11:05:00 2016 +0100
> 
>     migration: add reporting of errors for outgoing migration
> 
> so libvirt should be ok to use tcp: now.
> 
> >  3 files changed, 104 insertions(+), 3 deletions(-)
> > 
> > diff --git a/include/migration/migration.h b/include/migration/migration.h
> > index f119ba0..3989bd6 100644
> > --- a/include/migration/migration.h
> > +++ b/include/migration/migration.h
> > @@ -22,6 +22,7 @@
> >  #include "qapi-types.h"
> >  #include "exec/cpu-common.h"
> >  #include "qemu/coroutine_int.h"
> > +#include "io/channel.h"
> > 
> >  #define QEMU_VM_FILE_MAGIC           0x5145564d
> >  #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
> > @@ -218,6 +219,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
> > 
> >  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
> > 
> > +QIOChannel *socket_recv_channel_create(void);
> > +int socket_recv_channel_destroy(QIOChannel *recv);
> > +QIOChannel *socket_send_channel_create(void);
> > +int socket_send_channel_destroy(QIOChannel *send);
> > +
> >  void unix_start_incoming_migration(const char *path, Error **errp);
> > 
> >  void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
> > diff --git a/migration/ram.c b/migration/ram.c
> > index 939f364..5ad7cb3 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -386,9 +386,11 @@ void migrate_compress_threads_create(void)
> > 
> >  struct MultiFDSendParams {
> >      QemuThread thread;
> > +    QIOChannel *c;
> >      QemuCond cond;
> >      QemuMutex mutex;
> >      bool quit;
> > +    bool started;
> >  };
> >  typedef struct MultiFDSendParams MultiFDSendParams;
> > 
> > @@ -397,6 +399,13 @@ static MultiFDSendParams *multifd_send;
> >  static void *multifd_send_thread(void *opaque)
> >  {
> >      MultiFDSendParams *params = opaque;
> > +    char start = 's';
> > +
> > +    qio_channel_write(params->c, &start, 1, &error_abort);
> > +    qemu_mutex_lock(&params->mutex);
> > +    params->started = true;
> > +    qemu_cond_signal(&params->cond);
> > +    qemu_mutex_unlock(&params->mutex);
> > 
> >      qemu_mutex_lock(&params->mutex);
> >      while (!params->quit){
> > @@ -433,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
> >          qemu_thread_join(&multifd_send[i].thread);
> >          qemu_mutex_destroy(&multifd_send[i].mutex);
> >          qemu_cond_destroy(&multifd_send[i].cond);
> > +        socket_send_channel_destroy(multifd_send[i].c);
> >      }
> >      g_free(multifd_send);
> >      multifd_send = NULL;
> > @@ -452,18 +462,31 @@ void migrate_multifd_send_threads_create(void)
> >          qemu_mutex_init(&multifd_send[i].mutex);
> >          qemu_cond_init(&multifd_send[i].cond);
> >          multifd_send[i].quit = false;
> > +        multifd_send[i].started = false;
> > +        multifd_send[i].c = socket_send_channel_create();
> > +        if(!multifd_send[i].c) {
> > +            error_report("Error creating a send channel");
> > +            exit(0);
> > +        }
> >          snprintf(thread_name, 15, "multifd_send_%d", i);
> >          qemu_thread_create(&multifd_send[i].thread, thread_name,
> >                             multifd_send_thread, &multifd_send[i],
> >                             QEMU_THREAD_JOINABLE);
> > +        qemu_mutex_lock(&multifd_send[i].mutex);
> > +        while (!multifd_send[i].started) {
> > +            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
> > +        }
> > +        qemu_mutex_unlock(&multifd_send[i].mutex);
> >      }
> >  }
> > 
> >  struct MultiFDRecvParams {
> >      QemuThread thread;
> > +    QIOChannel *c;
> >      QemuCond cond;
> >      QemuMutex mutex;
> >      bool quit;
> > +    bool started;
> >  };
> >  typedef struct MultiFDRecvParams MultiFDRecvParams;
> > 
> > @@ -472,7 +495,14 @@ static MultiFDRecvParams *multifd_recv;
> >  static void *multifd_recv_thread(void *opaque)
> >  {
> >      MultiFDRecvParams *params = opaque;
> > - 
> > +    char start;
> > +
> > +    qio_channel_read(params->c, &start, 1, &error_abort);
> > +    qemu_mutex_lock(&params->mutex);
> > +    params->started = true;
> > +    qemu_cond_signal(&params->cond);
> > +    qemu_mutex_unlock(&params->mutex);
> > +
> >      qemu_mutex_lock(&params->mutex);
> >      while (!params->quit){
> >          qemu_cond_wait(&params->cond, &params->mutex);
> > @@ -508,6 +538,7 @@ void migrate_multifd_recv_threads_join(void)
> >          qemu_thread_join(&multifd_recv[i].thread);
> >          qemu_mutex_destroy(&multifd_recv[i].mutex);
> >          qemu_cond_destroy(&multifd_recv[i].cond);
> > +        socket_send_channel_destroy(multifd_recv[i].c);
> >      }
> >      g_free(multifd_recv);
> >      multifd_recv = NULL;
> > @@ -526,9 +557,21 @@ void migrate_multifd_recv_threads_create(void)
> >          qemu_mutex_init(&multifd_recv[i].mutex);
> >          qemu_cond_init(&multifd_recv[i].cond);
> >          multifd_recv[i].quit = false;
> > +        multifd_recv[i].started = false;
> > +        multifd_recv[i].c = socket_recv_channel_create();
> > +
> > +        if(!multifd_recv[i].c) {
> > +            error_report("Error creating a recv channel");
> > +            exit(0);
> > +        }
> >          qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
> >                             multifd_recv_thread, &multifd_recv[i],
> >                             QEMU_THREAD_JOINABLE);
> > +        qemu_mutex_lock(&multifd_recv[i].mutex);
> > +        while (!multifd_recv[i].started) {
> > +            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
> > +        }
> > +        qemu_mutex_unlock(&multifd_recv[i].mutex);
> >      }
> >  }
> > 
> > diff --git a/migration/socket.c b/migration/socket.c
> > index 11f80b1..7cd9213 100644
> > --- a/migration/socket.c
> > +++ b/migration/socket.c
> > @@ -24,6 +24,54 @@
> >  #include "io/channel-socket.h"
> >  #include "trace.h"
> > 
> > +struct SocketArgs {
> > +    QIOChannelSocket *ioc;
> > +    SocketAddress *saddr;
> > +    Error **errp;
> > +} socket_args;
> 
> Passing data from one method to another indirectly via this random
> global var feels rather dirty, since two different pairs of methods
> are both using the same global var. It happens to be ok since one
> pair of methods is only ever called on the target, and one pair is
> > only ever called on the source. It is a recipe for future unpleasant
> > surprises though, so I think this needs rethinking.
> 
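Daniel's objection could be addressed by threading the state through an opaque pointer instead of a file-scope global, so the source and target sides each carry their own context; a toy sketch of that shape (all names illustrative):

```c
/* Sketch: per-migration state travels with the call via an opaque
 * pointer, replacing the shared socket_args global.  struct
 * channel_ctx is a hypothetical stand-in for the saddr/ioc bundle. */
#include <assert.h>

struct channel_ctx {
    const char *name;     /* stand-in for saddr / listening ioc */
    int channels_made;
};

/* The channel factory receives its context instead of reading a
 * global, so two concurrent users cannot trample each other. */
static int make_channel(void *opaque)
{
    struct channel_ctx *ctx = opaque;
    ctx->channels_made++;
    return ctx->channels_made;
}
```

Each side (outgoing and incoming) would own one context, and the surprise-by-shared-global goes away.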
> > +QIOChannel *socket_recv_channel_create(void)
> > +{
> > +    QIOChannelSocket *sioc;
> > +    Error *err = NULL;
> > +
> > +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
> > +                                     &err);
> > +    if (!sioc) {
> > +        error_report("could not accept migration connection (%s)",
> > +                     error_get_pretty(err));
> > +        return NULL;
> > +    }
> > +    return QIO_CHANNEL(sioc);
> > +}
> > +
> > +int socket_recv_channel_destroy(QIOChannel *recv)
> > +{
> > +    // Remove channel
> > > +    object_unref(OBJECT(recv));
> > +    return 0;
> > +}
> > +
> > +QIOChannel *socket_send_channel_create(void)
> > +{
> > +    QIOChannelSocket *sioc = qio_channel_socket_new();
> > +
> > +    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
> > +                                    socket_args.errp);
> > +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> > +    return QIO_CHANNEL(sioc);
> > +}
> > +
> > +int socket_send_channel_destroy(QIOChannel *send)
> > +{
> > +    // Remove channel
> > +    object_unref(OBJECT(send));
> > +    if (socket_args.saddr) {
> > +        qapi_free_SocketAddress(socket_args.saddr);
> > +        socket_args.saddr = NULL;
> > +    }
> > +    return 0;
> > +}
> > 
> >  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
> >  {
> > @@ -96,6 +144,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
> >      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
> > 
> >      data->s = s;
> > +
> > +    socket_args.saddr = saddr;
> > +    socket_args.errp = errp;
> > +
> >      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
> >          data->hostname = g_strdup(saddr->u.inet.data->host);
> >      }
> > @@ -106,7 +158,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
> >                                       socket_outgoing_migration,
> >                                       data,
> >                                       socket_connect_data_free);
> > -    qapi_free_SocketAddress(saddr);
> >  }
> > 
> >  void tcp_start_outgoing_migration(MigrationState *s,
> > @@ -154,7 +205,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
> > 
> >  out:
> >      /* Close listening socket as its no longer needed */
> > -    qio_channel_close(ioc, NULL);
> > +//    qio_channel_close(ioc, NULL);
> >      return FALSE; /* unregister */
> >  }
> 
> If you changed this to return TRUE, then this existing code would be
> automatically invoked when the client makes its 2nd, 3rd, etc
> connection. You'd just have to put some logic in
> migration_channel_process_incoming to take different behaviour when
> seeing the 1st vs the additional connections.
> 
> 
> > 
> > @@ -163,6 +214,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
> >                                              Error **errp)
> >  {
> >      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> > +    socket_args.ioc = listen_ioc;
> > 
> >      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
> >                           "migration-socket-listener");
> 
> 
> 
> Regards,
> Daniel
> -- 
> |: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
> |: http://libvirt.org              -o-             http://virt-manager.org :|
> |: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work
  2017-02-15 14:46     ` Dr. David Alan Gilbert
@ 2017-02-15 15:01       ` Daniel P. Berrange
  0 siblings, 0 replies; 57+ messages in thread
From: Daniel P. Berrange @ 2017-02-15 15:01 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: Juan Quintela, qemu-devel

On Wed, Feb 15, 2017 at 02:46:15PM +0000, Dr. David Alan Gilbert wrote:
> * Daniel P. Berrange (berrange@redhat.com) wrote:
> > On Mon, Jan 23, 2017 at 10:32:13PM +0100, Juan Quintela wrote:
> > > We create new channels for each new thread created. We only send through
> > > them a character to be sure that we are creating the channels in the
> > > right order.
> > > 
> > > Note: Reference count/freeing of channels is not done
> > > 
> > > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > > ---
> > >  include/migration/migration.h |  6 +++++
> > >  migration/ram.c               | 45 +++++++++++++++++++++++++++++++++-
> > >  migration/socket.c            | 56 +++++++++++++++++++++++++++++++++++++++++--
> > 
> > BTW, right now libvirt never uses QEMU's tcp: protocol - it does everything
> > with the fd: protocol.  So either we need multi-fd support for fd: protocol,
> > or libvirt needs to switch to use tcp:
> 
> I thought using fd was safer than tcp: because of the race when something else
> could listen on the proposed port on the incoming side between the point of libvirt
> picking the port number and qemu starting.

Hmm, good point.

> > In fact, having said that, we're going to have to switch to use  the tcp:
> > protocol anyway in order to support TLS, so this is just another good
> > reason for the switch.
> 
> I thought you had a way of allowing fd to work for TLS?

Oh yes, I forgot that I made that work :-)

Regards,
Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://entangle-photo.org       -o-    http://search.cpan.org/~danberr/ :|


end of thread, other threads:[~2017-02-15 15:01 UTC | newest]

Thread overview: 57+ messages
2017-01-23 21:32 [Qemu-devel] [PATCH 00/17] multifd v3 Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 01/17] migration: transform remained DPRINTF into trace_ Juan Quintela
2017-01-24  2:20   ` Eric Blake
2017-01-24 12:20     ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 02/17] migration: create Migration Incoming State at init time Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 03/17] migration: Test for disabled features on reception Juan Quintela
2017-01-24 10:33   ` Dr. David Alan Gilbert
2017-02-09 17:12     ` Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 04/17] migration: Don't create decompression threads if not enabled Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 05/17] migration: Add multifd capability Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 06/17] migration: Create x-multifd-threads parameter Juan Quintela
2017-02-02 15:06   ` Eric Blake
2017-02-09 17:28     ` Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 07/17] migration: Create x-multifd-group parameter Juan Quintela
2017-01-26 11:47   ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 08/17] migration: create multifd migration threads Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 09/17] migration: Start of multiple fd work Juan Quintela
2017-01-27 17:45   ` Dr. David Alan Gilbert
2017-02-13 16:34     ` Juan Quintela
2017-02-13 16:39       ` Dr. David Alan Gilbert
2017-02-13 17:35   ` Daniel P. Berrange
2017-02-15 14:46     ` Dr. David Alan Gilbert
2017-02-15 15:01       ` Daniel P. Berrange
2017-01-23 21:32 ` [Qemu-devel] [PATCH 10/17] migration: create ram_multifd_page Juan Quintela
2017-01-27 18:02   ` Dr. David Alan Gilbert
2017-01-30 10:06     ` Juan Quintela
2017-02-02 11:04       ` Dr. David Alan Gilbert
2017-02-13 16:36     ` Juan Quintela
2017-02-14 11:26       ` Dr. David Alan Gilbert
2017-02-02 11:20   ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 11/17] migration: Create thread infrastructure for multifd send side Juan Quintela
2017-01-26 12:38   ` Paolo Bonzini
2017-02-13 16:38     ` Juan Quintela
2017-02-02 12:03   ` Dr. David Alan Gilbert
2017-02-13 16:40     ` Juan Quintela
2017-02-14 11:58       ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 12/17] migration: really use multiple pages at a time Juan Quintela
2017-02-03 10:54   ` Dr. David Alan Gilbert
2017-02-13 16:47     ` Juan Quintela
2017-01-23 21:32 ` [Qemu-devel] [PATCH 13/17] migration: Send the fd number which we are going to use for this page Juan Quintela
2017-02-03 10:59   ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 14/17] migration: Create thread infrastructure for multifd recv side Juan Quintela
2017-01-26 12:39   ` Paolo Bonzini
2017-02-03 11:24   ` Dr. David Alan Gilbert
2017-02-13 16:56     ` Juan Quintela
2017-02-14 11:34       ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 15/17] migration: Test new fd infrastructure Juan Quintela
2017-02-03 11:36   ` Dr. David Alan Gilbert
2017-02-13 16:57     ` Juan Quintela
2017-02-14 11:05       ` Dr. David Alan Gilbert
2017-02-14 11:15   ` Daniel P. Berrange
2017-01-23 21:32 ` [Qemu-devel] [PATCH 16/17] migration: [HACK]Transfer pages over new channels Juan Quintela
2017-02-03 11:41   ` Dr. David Alan Gilbert
2017-01-23 21:32 ` [Qemu-devel] [PATCH 17/17] migration: flush receive queue Juan Quintela
2017-02-03 12:28   ` Dr. David Alan Gilbert
2017-02-13 17:13     ` Juan Quintela
2017-01-23 22:12 ` [Qemu-devel] [PATCH 00/17] multifd v3 no-reply
