* [Qemu-devel] [RFC 00/14] Multifd
@ 2018-01-10 12:47 Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
                   ` (14 more replies)
  0 siblings, 15 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Hi

For V10:
- It is a request for comments

- As Paolo suggested at KVM Forum, I changed the series to send page
  metadata over the channels

- I simplified lots of things
- Added reviews
- it is not finished; in no particular order:
  * old code is kept for comparison, for the people who looked at
    the previous version
  * it will be removed later
  * synchronization at the end of each stage will be done with a flags field
    on the communication channel.
- Yes, there is commented-out code in the last patch.
- I am open to suggestions about what to call the packet "message" with
  metadata.

Please, look at it.

Later, Juan.


On the v9 series:
- qobject_unref() as requested by dan

  Yes, he was right: I had a reference leak for _non_ multifd.  I
  *thought* he meant multifd, and that took a while to understand
  (and then to find when/where).

- multifd page count: it is dropped for good
- uuid handling: we use the default qemu uuid of 0000...
- uuid handling: using a struct and sending the struct
  * the idea is to add a size field and add more parameters after it
  * does anyone have a good idea how to output the
    migrate_capabilities/parameters info as JSON into a string and how to
    read it back?
- changed how we test that all threads/channels are already created.
  Should be more robust.
- Added multifd tests.  Still not ported on top of the migration-tests series
  sent earlier; waiting for review of the ideas there.
- Rebased and removed all the integrated patches (back to 12)

Please, review.

Later, Juan.

[v8]
Things NOT done yet:

- drop x-multifd-page-count?  We can use performance measurements to set a
  default value
- Paolo's suggestion of not having a control channel
  needs yet more cleanups to be able to have more than one RAMState; trying it.
- performance work is still not done, but the series has been very stable

On v8:
- use connect_async
- rename multifd-group to multifd-page-count (danp suggestion)
- rename multifd-threads to multifd-channels (danp suggestion)
- use new qio*channel functions
- Addressed the rest of the comments


So, please review.

My idea is to pull these changes in and continue the performance work
on top; basically everything is already reviewed.

Thanks, Juan.

On v7:
- tests fixed as danp wanted
- I had to revert danp's qio_*_all patches, as they break multifd; I have to
  investigate why.
- error_abort is gone.  After several tries at getting errors right, I ended
  up with a single error protected by a lock; the first error wins.
- Addressed basically all reviews (see ToDo)
- Pointers to structs are used now
- fix lots of leaks
- lots of small fixes


[v6]
- Improve migration_ioc_process_incoming
- teach about G_SOURCE_REMOVE/CONTINUE
- Add test for migration_has_all_channels
- use DEFINE_PROP*
- change recv_state to use pointers to parameters;
  makes it easier to receive channels out of order
- use g_strdup_printf()
- improve count of threads to know when we have to finish
- report channel id's on errors
- Use last_page parameter for multifd_send_page() sooner
- Improve comments for address
- use g_new0() instead of g_malloc()
- create MULTIFD_CONTINUE instead of using UINT16_MAX
- clear memory used by group of pages
  once there, pass everything to the global state variables instead of being
  local to the function.  This way it works if we cancel migration and start
  a new one
- Really wait to create the migration_thread until all channels are created
- split initial_bytes setup to make the following patches clearer.
- create RAM_SAVE_FLAG_MULTIFD_SYNC macro, to make clear what we are doing
- move setting of need_flush to inside bitmap_sync
- Lots of other small changes & reorderings

Please, comment.


[v5]

- tests from qio functions (a.k.a. make danp happy)
- 1st message from one channel to the other contains:
   <uuid> multifd <channel number>
   This would allow us to create more channels as we want them.
   a.k.a. Making dave happy
- Waiting in reception for new channels using qio listeners
  Getting threads, qio and reference counters working at the same time
  was interesting.
  Another make danp happy.

- Lots and lots of small changes and fixes.  Notice that the last ~70 patches
  that I merged were there to make this series easier/smaller.

- NOT DONE: I haven't been working on measuring performance
  differences, this was about getting the creation of the
  threads/channels right.

So, what I want:

- Are people happy with how I have (ab)used qio channels? (yes danp,
  that is you).
- My understanding is th

ToDo:

- Make paolo happy: He wanted to test using control information
  through each channel, not only pages.  This requires yet more
  cleanups to be able to have more than one QEMUFile/RAMState open at
  the same time.

- How I create multiple channels.  Things I know:
  * with current changes, it should work with fd/channels (the multifd bits),
    but we don't have a way to pass multiple fds or exec files.
    Danp, any idea about how to create a UI for it?
  * My idea is that we would split current code to be:
    + channel creation at migration.c
    + rest of bits at ram.c
    + change format to:
      <uuid> main <rest of migration capabilities/parameters> so we can check
      <uuid> postcopy <no clue what parameters are needed>
          Dave has wanted a way to create a new fd for postcopy for some time
      (a rough sketch of building such a greeting follows this list)
    + Adding new channels is easy

- Performance data/numbers: I wanted to get this out at once; I will
  continue with this.
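
As a rough sketch, the proposed greeting could be built like this (the
helper name here is made up; the series later switched to sending a packed
binary struct instead of the ASCII form, see PATCH 04):

/* Hypothetical sketch only: build the proposed per-connection greeting.
 * qemu_uuid and qemu_uuid_unparse_strdup() are real; make_greeting() is
 * not part of the series. */
#include "qemu/uuid.h"

static char *make_greeting(const char *kind, int id)
{
    char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
    /* e.g. "<uuid> multifd 3" or "<uuid> main <capabilities>" */
    char *greeting = g_strdup_printf("%s %s %d", uuid, kind, id);

    g_free(uuid);
    return greeting;  /* caller sends it as the 1st message, then frees it */
}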


Please, review.


[v4]
This is the 4th version of multifd. Changes:
- XBZRLE doesn't need to be checked for
- Documentation and defaults are consistent
- split SocketArgs
- use iovec instead of creating something similar.
- We now use the exported target page size (another HACK removal)
- created the qio_channel_{writev,readv}_all functions; the _full() name
  was already taken.
  They do the same as the functions without _all(), but if the call returns
  early due to blocking, they redo it (see the sketch below).
- it is checkpatch.pl clean now.
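
To make the _all() semantics concrete, here is a minimal sketch of the
retry idea using plain POSIX writev(); the real qio_channel_writev_all()
does the equivalent on a QIOChannel with Error-based reporting:

#include <errno.h>
#include <sys/uio.h>

/* Sketch only: keep calling writev() until every byte is written.
 * Note: this mutates the caller's iov array, and a real implementation
 * would poll for POLLOUT instead of spinning on EAGAIN. */
static int writev_all(int fd, struct iovec *iov, int niov)
{
    while (niov > 0) {
        ssize_t done = writev(fd, iov, niov);
        if (done < 0) {
            if (errno == EINTR || errno == EAGAIN) {
                continue;                   /* blocked: redo the call */
            }
            return -1;
        }
        while (niov > 0 && (size_t)done >= iov->iov_len) {
            done -= iov->iov_len;           /* this iovec fully written */
            iov++;
            niov--;
        }
        if (niov > 0) {                     /* partial write: advance */
            iov->iov_base = (char *)iov->iov_base + done;
            iov->iov_len -= done;
        }
    }
    return 0;
}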

Please comment, Juan.

Juan Quintela (14):
  migration: Make migrate_fd_error() the owner of the Error
  migration: Rename initial_bytes
  migration: Drop current address parameter from save_zero_page()
  migration: Start of multiple fd work
  migration: Create ram_multifd_page
  migration: Send the fd number which we are going to use for this page
  migration: Create thread infrastructure for multifd recv side
  migration: Transfer pages over new channels
  migration: Flush receive queue
  migration: Add multifd test
  LOCAL: use trace events for migration-test
  migration: Sent the page list over the normal thread
  migration: Add multifd_send_packet trace
  all works

 migration/channel.c    |   1 -
 migration/migration.c  |  39 ++--
 migration/migration.h  |   5 +-
 migration/ram.c        | 495 ++++++++++++++++++++++++++++++++++++++++++++++---
 migration/ram.h        |   3 +
 migration/socket.c     |  43 ++++-
 migration/socket.h     |  10 +
 migration/trace-events |   5 +
 tests/migration-test.c |  52 ++++++
 9 files changed, 614 insertions(+), 39 deletions(-)

-- 
2.14.3

^ permalink raw reply	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-12 18:50   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 02/14] migration: Rename initial_bytes Juan Quintela
                   ` (13 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

So far, we had to free the error in each caller, so just do it
here.  While at it: tls.c was leaking the error.
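
The new calling convention, as in the channel.c hunk below; the Error is
handed over rather than copied and freed by the caller:

/* After this patch the callee owns the Error, so callers stop freeing: */
Error *local_err = NULL;

migration_tls_channel_connect(s, ioc, hostname, &local_err);
if (local_err) {
    migrate_fd_error(s, local_err);  /* takes ownership: no error_free() */
}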

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/channel.c   |  1 -
 migration/migration.c | 10 ++++------
 migration/migration.h |  4 ++--
 migration/socket.c    |  1 -
 4 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/migration/channel.c b/migration/channel.c
index 70ec7ea3b7..1dd2ae1530 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -71,7 +71,6 @@ void migration_channel_connect(MigrationState *s,
         migration_tls_channel_connect(s, ioc, hostname, &local_err);
         if (local_err) {
             migrate_fd_error(s, local_err);
-            error_free(local_err);
         }
     } else {
         QEMUFile *f = qemu_fopen_channel_output(ioc);
diff --git a/migration/migration.c b/migration/migration.c
index a5be4592a6..085e88c625 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1159,16 +1159,14 @@ static void migrate_fd_cleanup(void *opaque)
     block_cleanup_parameters(s);
 }
 
-void migrate_set_error(MigrationState *s, const Error *error)
+void migrate_set_error(MigrationState *s, Error *error)
 {
     qemu_mutex_lock(&s->error_mutex);
-    if (!s->error) {
-        s->error = error_copy(error);
-    }
+    error_propagate(&s->error, error);
     qemu_mutex_unlock(&s->error_mutex);
 }
 
-void migrate_fd_error(MigrationState *s, const Error *error)
+void migrate_fd_error(MigrationState *s, Error *error)
 {
     trace_migrate_fd_error(error_get_pretty(error));
     assert(s->to_dst_file == NULL);
@@ -1448,7 +1446,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     }
 
     if (local_err) {
-        migrate_fd_error(s, local_err);
+        migrate_fd_error(s, error_copy(local_err));
         error_propagate(errp, local_err);
         return;
     }
diff --git a/migration/migration.h b/migration/migration.h
index 2c8c53847a..29a7b79a39 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -177,8 +177,8 @@ bool  migration_has_all_channels(void);
 
 uint64_t migrate_max_downtime(void);
 
-void migrate_set_error(MigrationState *s, const Error *error);
-void migrate_fd_error(MigrationState *s, const Error *error);
+void migrate_set_error(MigrationState *s, Error *error);
+void migrate_fd_error(MigrationState *s, Error *error);
 
 void migrate_fd_connect(MigrationState *s);
 
diff --git a/migration/socket.c b/migration/socket.c
index 248a798543..6d49903978 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -81,7 +81,6 @@ static void socket_outgoing_migration(QIOTask *task,
     if (qio_task_propagate_error(task, &err)) {
         trace_migration_socket_outgoing_error(error_get_pretty(err));
         migrate_fd_error(data->s, err);
-        error_free(err);
     } else {
         trace_migration_socket_outgoing_connected(data->hostname);
         migration_channel_connect(data->s, sioc, data->hostname);
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 02/14] migration: Rename initial_bytes
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page() Juan Quintela
                   ` (12 subsequent siblings)
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Now it is called qemu_file_bytes, which better reflects what it does,
and we create qemu_file_bytes_now so we don't have to call qemu_ftell() twice.
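
To show what this feeds (the numbers are illustrative only): with the
100 ms BUFFER_DELAY window, suppose 100 MB went out since the last sample.

/* transferred_bytes = qemu_file_bytes_now - qemu_file_bytes = 100e6 bytes
 * bandwidth         = 100e6 bytes / 100 ms = 1e6 bytes per ms
 * with downtime_limit = 300 ms:
 * threshold_size    = 1e6 * 300 = 300 MB
 * i.e. the final stage starts once less than 300 MB remain to be sent. */
double bandwidth = (double)transferred_bytes / time_spent;      /* B/ms  */
threshold_size = bandwidth * s->parameters.downtime_limit;      /* bytes */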

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 085e88c625..e506b9c2c6 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2217,13 +2217,13 @@ static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
     /*
      * The final stage happens when the remaining data is smaller than
      * this threshold; it's calculated from the requested downtime and
      * measured bandwidth
      */
     int64_t threshold_size = 0;
+    int64_t qemu_file_bytes = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2311,8 +2311,9 @@ static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t transferred_bytes =
+                qemu_file_bytes_now - qemu_file_bytes;
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2331,7 +2332,7 @@ static void *migration_thread(void *opaque)
 
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page()
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 02/14] migration: Rename initial_bytes Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-12 18:56   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
                   ` (11 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

It already has the RAMBlock and offset; it can calculate the address itself.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index cb1950f3eb..5a109efeda 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -907,11 +907,10 @@ static void migration_bitmap_sync(RAMState *rs)
  * @rs: current RAM state
  * @block: block that contains the page we want to send
  * @offset: offset inside the block for the page
- * @p: pointer to the page
  */
-static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
-                          uint8_t *p)
+static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
 {
+    uint8_t *p = block->host + offset;
     int pages = -1;
 
     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
@@ -984,7 +983,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
             }
         }
     } else {
-        pages = save_zero_page(rs, block, offset, p);
+        pages = save_zero_page(rs, block, offset);
         if (pages > 0) {
             /* Must let xbzrle know, otherwise a previous (now 0'd) cached
              * page would be stale
@@ -1160,7 +1159,7 @@ static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
          */
         if (block != rs->last_sent_block) {
             flush_compressed_data(rs);
-            pages = save_zero_page(rs, block, offset, p);
+            pages = save_zero_page(rs, block, offset);
             if (pages == -1) {
                 /* Make sure the first page is sent out before other pages */
                 bytes_xmit = save_page_header(rs, rs->f, block, offset |
@@ -1180,7 +1179,7 @@ static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
                 ram_release_pages(block->idstr, offset, pages);
             }
         } else {
-            pages = save_zero_page(rs, block, offset, p);
+            pages = save_zero_page(rs, block, offset);
             if (pages == -1) {
                 pages = compress_page_with_multi_thread(rs, block, offset);
             } else {
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (2 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page() Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-22  7:00   ` Peter Xu
  2018-01-23 19:52   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page Juan Quintela
                   ` (10 subsequent siblings)
  14 siblings, 2 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We create a new channel for each new thread created. Through each one
we send a packed struct.  This way we can check that we connect the
right channels on both sides.
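
On the receive side (wired up later in this series), the check is roughly:

/* Sketch, following the recv-side patch later in the series: read the
 * init message first and refuse channels from a different QEMU. */
MultiFDInit_t msg;
Error *local_err = NULL;

if (qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err) != 0) {
    /* terminate the multifd threads with local_err */
}
if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid)) != 0) {
    /* wrong source QEMU: fail the migration instead of mixing streams */
}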

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Split SocketArgs into incoming and outgoing args

Use UUID's on the initial message, so we are sure we are connecting to
the right channel.

Remove init semaphore.  Now that we use uuids on the init message, we
know that this is our channel.

Fix recv socket destroy, we were destroying send channels.
This was very interesting, because we were using an unreferenced object
without problems.

Move to struct of pointers
init channel sooner.
split recv thread creation.
listen on main thread
We count the number of created threads to know when we need to stop listening
Use g_strdup_printf
report channel id on errors
Add name parameter
Use local_err
Add Error * parameter to socket_send_channel_create()
Use qio_channel_*_all
Use asynchronous connect
Use a struct to send all fields
Use default uuid
Fix local_err = NULL (dave)
Keep lines within 80 columns (checkpatch)
Move multifd_new_channel() and multifd_recv_thread() to the later patches
where they are used.
Add __attribute__((packed))
Use UUIDs as opaque bytes instead of the ASCII representation
rename migrate_new_channel_async to migrate_new_send_channel_async
rename recv_channel_destroy to _unref.  And create the pairing _ref.
---
 migration/migration.c |   7 +++-
 migration/ram.c       | 114 +++++++++++++++++++++++++++++++++++---------------
 migration/ram.h       |   3 ++
 migration/socket.c    |  39 ++++++++++++++++-
 migration/socket.h    |  10 +++++
 5 files changed, 137 insertions(+), 36 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index e506b9c2c6..77fc17f723 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         migration_fd_process_incoming(f);
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_recv_new_channel(ioc);
 }
 
 /**
@@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+    if (migrate_use_multifd()) {
+        int thread_count = migrate_multifd_channels();
+
+        return thread_count == multifd_created_channels();
+    }
     return true;
 }
 
diff --git a/migration/ram.c b/migration/ram.c
index 5a109efeda..aef5a323f3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -49,6 +50,8 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -396,6 +399,7 @@ struct MultiFDSendParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
     for (i = 0; i < multifd_send_state->count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+typedef struct {
+    uint32_t version;
+    unsigned char uuid[16]; /* QemuUUID */
+    uint8_t id;
+} __attribute__((packed)) MultiFDInit_t;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    MultiFDInit_t msg;
+    Error *local_err = NULL;
+    size_t ret;
+
+    msg.version = 1;
+    msg.id = p->id;
+    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
+    if (ret != 0) {
+        terminate_multifd_send_threads(local_err);
+        return NULL;
+    }
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
     return NULL;
 }
 
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        if (multifd_save_cleanup(&local_err) != 0) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        multifd_send_state->count++;
+    }
+}
+
 int multifd_save_setup(void)
 {
     int thread_count;
@@ -484,10 +537,7 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-
-        multifd_send_state->count++;
+        socket_send_channel_create(multifd_new_send_channel_async, p);
     }
     return 0;
 }
@@ -496,6 +546,7 @@ struct MultiFDRecvParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -506,12 +557,25 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* Should we finish */
+    bool quit;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+    multifd_recv_state->quit = true;
+
     for (i = 0; i < multifd_recv_state->count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -537,6 +601,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_recv_channel_unref(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -548,27 +613,9 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
-static void *multifd_recv_thread(void *opaque)
-{
-    MultiFDRecvParams *p = opaque;
-
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
-    }
-
-    return NULL;
-}
-
 int multifd_load_setup(void)
 {
     int thread_count;
-    uint8_t i;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -577,21 +624,20 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
-    for (i = 0; i < thread_count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        p->quit = false;
-        p->id = i;
-        p->name = g_strdup_printf("multifdrecv_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
-    }
+    multifd_recv_state->quit = false;
     return 0;
 }
 
+int multifd_created_channels(void)
+{
+    return multifd_recv_state->count;
+}
+
+void multifd_recv_new_channel(QIOChannel *ioc)
+{
+    socket_recv_channel_unref(ioc);
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 64d81e9f1d..be7d09d0ec 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -31,6 +31,7 @@
 
 #include "qemu-common.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -43,6 +44,8 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+int multifd_created_channels(void);
+void multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/socket.c b/migration/socket.c
index 6d49903978..aedabee8a1 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -27,6 +27,39 @@
 #include "io/channel-socket.h"
 #include "trace.h"
 
+int socket_recv_channel_ref(QIOChannel *recv)
+{
+    object_ref(OBJECT(recv));
+    return 0;
+}
+
+int socket_recv_channel_unref(QIOChannel *recv)
+{
+    object_unref(OBJECT(recv));
+    return 0;
+}
+
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+} outgoing_args;
+
+void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
+                                     f, data, NULL);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
 
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -96,6 +129,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+
+    /* in case previous migration leaked it */
+    qapi_free_SocketAddress(outgoing_args.saddr);
+    outgoing_args.saddr = saddr;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -106,7 +144,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9db38..cbdb8d64c3 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,16 @@
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+#include "io/task.h"
+
+int socket_recv_channel_ref(QIOChannel *recv);
+int socket_recv_channel_unref(QIOChannel *recv);
+
+void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (3 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-23 20:16   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 06/14] migration: Send the fd number which we are going to use for this page Juan Quintela
                   ` (9 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
counter and a new flag for this type of page.
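
In short, the new send path batches pages like this (excerpted from
multifd_send_page() in the diff below):

pages->iov[pages->used].iov_base = block->host + offset;
pages->iov[pages->used].iov_len  = TARGET_PAGE_SIZE;
pages->used++;

if (!last_page && pages->used < pages->allocated) {
    return MULTIFD_CONTINUE;        /* keep filling the current batch */
}
/* batch full (or last page): pick an idle channel, swap page arrays
 * with it, wake its thread, and return that channel's id */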

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove the multifd field, it is the same as normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovecs instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0() (danp)
define MULTIFD_CONTINUE
now pages member is a pointer
Fix off-by-one in number of pages in one packet
---
 migration/ram.c        | 159 ++++++++++++++++++++++++++++++++++++++++++++++++-
 migration/trace-events |   2 +
 2 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index aef5a323f3..5d6b46ac23 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -52,6 +52,7 @@
 #include "migration/block.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -71,6 +72,7 @@
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
@@ -395,14 +397,36 @@ static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
+/* used to continue on the same multifd group */
+#define MULTIFD_CONTINUE UINT16_MAX
+
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint32_t seq;
+    struct iovec *iov;
+    RAMBlock *block;
+} multifd_pages_t;
+
 struct MultiFDSendParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    multifd_pages_t *pages;
+    /* how many packets this channel has sent */
+    uint32_t packets_sent;
+    /* protected by multifd mutex */
+    /* has the thread finished the last submitted job */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
@@ -410,8 +434,31 @@ struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    QemuMutex mutex;
+    QemuSemaphore sem;
+    multifd_pages_t *pages;
 } *multifd_send_state;
 
+static void multifd_pages_init(multifd_pages_t **ppages, size_t size)
+{
+    multifd_pages_t *pages = g_new0(multifd_pages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    *ppages = pages;
+}
+
+static void multifd_pages_clear(multifd_pages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->seq = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages);
+}
+
 static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
@@ -453,9 +500,13 @@ int multifd_save_cleanup(Error **errp)
         socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -482,6 +533,7 @@ static void *multifd_send_thread(void *opaque)
         terminate_multifd_send_threads(local_err);
         return NULL;
     }
+    qemu_sem_post(&multifd_send_state->sem);
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -489,9 +541,24 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages->used) {
+            p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
+
+            trace_multifd_send(p->id, p->pages->seq, p->pages->used);
+            /* ToDo: send page here */
+
+            qemu_mutex_lock(&multifd_send_state->mutex);
+            p->done = true;
+            p->packets_sent++;
+            qemu_mutex_unlock(&multifd_send_state->mutex);
+            qemu_sem_post(&multifd_send_state->sem);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
+    trace_multifd_send_thread(p->id, p->packets_sent);
 
     return NULL;
 }
@@ -529,6 +596,10 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->count = 0;
+    qemu_mutex_init(&multifd_send_state->mutex);
+    qemu_sem_init(&multifd_send_state->sem, 0);
+    multifd_pages_init(&multifd_send_state->pages,
+                       migrate_multifd_page_count());
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -536,12 +607,58 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->done = true;
+        multifd_pages_init(&p->pages, migrate_multifd_page_count());
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
     return 0;
 }
 
+static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
+                                  bool last_page)
+{
+    int i;
+    MultiFDSendParams *p = NULL; /* make gcc happy */
+    multifd_pages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    pages->iov[pages->used].iov_base = block->host + offset;
+    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+    pages->used++;
+
+    if (!last_page) {
+        if (pages->used < pages->allocated) {
+            return MULTIFD_CONTINUE;
+        }
+    }
+
+    qemu_sem_wait(&multifd_send_state->sem);
+    qemu_mutex_lock(&multifd_send_state->mutex);
+    for (i = 0; i < multifd_send_state->count; i++) {
+        p = &multifd_send_state->params[i];
+
+        if (p->done) {
+            p->done = false;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_state->mutex);
+    qemu_mutex_lock(&p->mutex);
+    p->pages->used = 0;
+    p->pages->seq = pages->seq + 1;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return i;
+}
+
 struct MultiFDRecvParams {
     uint8_t id;
     char *name;
@@ -1070,6 +1187,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        multifd_send_page(block, offset, rs->migration_dirty_pages == 1);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1498,6 +1640,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }
@@ -2878,6 +3022,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (!migrate_use_compression()) {
         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
     }
+
+    if (!migrate_use_multifd()) {
+        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
+    }
     /* This RCU critical section can be very long running.
      * When RCU reclaims in the code start to become numerous,
      * it will be necessary to reduce the granularity of this
@@ -2902,13 +3050,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
                 error_report("Received an unexpected compressed page");
             }
+            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
+                error_report("Received an unexpected multifd page");
+            }
 
             ret = -EINVAL;
             break;
         }
 
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
-                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);
 
             host = host_from_ram_block_offset(block, addr);
@@ -2997,6 +3149,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 break;
             }
             break;
+
+        case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            break;
+
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
             break;
diff --git a/migration/trace-events b/migration/trace-events
index 141e773305..61ee21a13e 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,6 +77,8 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
 ram_postcopy_send_discard_bitmap(void) ""
 ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
 ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
+multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
+multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 06/14] migration: Send the fd number which we are going to use for this page
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (4 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side Juan Quintela
                   ` (8 subsequent siblings)
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We are still sending the page through the main channel; that will
change later in the series.
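
The resulting main-channel stream for a multifd page, as the reader
below sees it:

/* <addr|flags: be64> ... <fd_num: be16> <page: TARGET_PAGE_SIZE bytes> */
addr   = qemu_get_be64(f);           /* page address plus RAM_SAVE_FLAG_* */
fd_num = qemu_get_be16(f);           /* channel that will carry this page */
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);   /* page data, still inline  */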

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index 5d6b46ac23..19c8089c4b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1191,6 +1191,7 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                             bool last_stage)
 {
     int pages;
+    uint16_t fd_num;
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
@@ -1202,7 +1203,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
         ram_counters.transferred +=
             save_page_header(rs, rs->f, block,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
-        multifd_send_page(block, offset, rs->migration_dirty_pages == 1);
+        fd_num = multifd_send_page(block, offset,
+                                   rs->migration_dirty_pages == 1);
+        qemu_put_be16(rs->f, fd_num);
+        ram_counters.transferred += 2; /* size of fd_num */
         qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
@@ -3040,6 +3044,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
+        uint16_t fd_num;
         uint8_t ch;
 
         addr = qemu_get_be64(f);
@@ -3151,6 +3156,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            fd_num = qemu_get_be16(f);
+            if (fd_num != 0) {
+                /* this is yet an unused variable, changed later */
+                fd_num = fd_num;
+            }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (5 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 06/14] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-24 13:34   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels Juan Quintela
                   ` (7 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We make the locking and the transfer of information explicit, even if we
are still receiving things through the main thread.
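
The handoff between the main incoming thread and a recv worker, as in
multifd_recv_page() below: 'ready' signals an idle worker, 'sem' wakes
it with a fresh batch.

qemu_sem_wait(&p->ready);                 /* wait for the worker to idle */
qemu_mutex_lock(&p->mutex);
p->done = false;
multifd_recv_state->pages = p->pages;     /* take back its empty batch   */
p->pages = pages;                         /* hand over the filled one    */
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);                   /* kick the worker             */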

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

We split where we create the main channel from where we start the main
migration thread, so we can wait for the creation of the other threads.

Use multifd_clear_pages().
Don't remove object_unref()
We use the channel numbers correctly
Define multifd_new_channel()/multifd_recv_thread() in this patch; that is
where they are used.
rename migrate_new_channel to migrate_new_send_channel
Add ToDo comment
Add trace
---
 migration/migration.c  |   5 +-
 migration/migration.h  |   1 +
 migration/ram.c        | 129 ++++++++++++++++++++++++++++++++++++++++++++++---
 migration/socket.c     |   3 ++
 migration/trace-events |   2 +
 5 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 77fc17f723..1545f3a0b0 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -406,7 +406,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -424,7 +424,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
+        return;
     }
     multifd_recv_new_channel(ioc);
 }
diff --git a/migration/migration.h b/migration/migration.h
index 29a7b79a39..7de193a9c0 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -172,6 +172,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 19c8089c4b..8443806f12 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -660,13 +660,20 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
 }
 
 struct MultiFDRecvParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
+    QemuSemaphore ready;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    /* how many packets this channel has received */
+    uint32_t packets_recv;
+    multifd_pages_t *pages;
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -676,6 +683,7 @@ struct {
     int count;
     /* Should we finish */
     bool quit;
+    multifd_pages_t *pages;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
@@ -721,15 +729,51 @@ int multifd_load_cleanup(Error **errp)
         socket_recv_channel_unref(p->c);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_pages_clear(multifd_recv_state->pages);
+    multifd_recv_state->pages = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
     return ret;
 }
 
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *p = opaque;
+
+    qemu_sem_post(&p->ready);
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        if (p->pages->used) {
+            p->pages->used = 0;
+
+            trace_multifd_recv(p->id, p->pages->seq, p->pages->used);
+
+            /* ToDo: receive pages here */
+
+            p->done = true;
+            p->packets_recv++;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->ready);
+            continue;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+    trace_multifd_recv_thread(p->id, p->packets_recv);
+
+    return NULL;
+}
+
 int multifd_load_setup(void)
 {
     int thread_count;
@@ -742,6 +786,8 @@ int multifd_load_setup(void)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
     multifd_recv_state->quit = false;
+    multifd_pages_init(&multifd_recv_state->pages,
+                       migrate_multifd_page_count());
     return 0;
 }
 
@@ -752,7 +798,80 @@ int multifd_created_channels(void)
 
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
-    socket_recv_channel_unref(ioc);
+    MultiFDRecvParams *p;
+    MultiFDInit_t msg;
+    Error *local_err = NULL;
+    size_t ret;
+
+    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
+    if (ret != 0) {
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+
+    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
+        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+        error_setg(&local_err, "multifd: received uuid '%s' and expected "
+                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
+        g_free(uuid);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+
+    p = &multifd_recv_state->params[msg.id];
+    if (p->id != 0) {
+        error_setg(&local_err, "multifd: received id '%d' already setup'",
+                   msg.id);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    qemu_mutex_init(&p->mutex);
+    qemu_sem_init(&p->sem, 0);
+    qemu_sem_init(&p->ready, 0);
+    p->quit = false;
+    p->id = msg.id;
+    p->done = false;
+    multifd_pages_init(&p->pages, migrate_multifd_page_count());
+    p->c = ioc;
+    multifd_recv_state->count++;
+    p->name = g_strdup_printf("multifdrecv_%d", msg.id);
+    socket_recv_channel_ref(ioc);
+
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    if (multifd_recv_state->count == migrate_multifd_channels()) {
+        migration_incoming_process();
+    }
+}
+
+static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
+                              uint8_t *address, uint16_t fd_num)
+{
+    int thread_count;
+    MultiFDRecvParams *p;
+    multifd_pages_t *pages = multifd_recv_state->pages;
+
+    pages->iov[pages->used].iov_base = address;
+    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+    pages->used++;
+
+    if (fd_num == MULTIFD_CONTINUE) {
+        return;
+    }
+
+    thread_count = migrate_multifd_channels();
+    assert(fd_num < thread_count);
+    p = &multifd_recv_state->params[fd_num];
+
+    qemu_sem_wait(&p->ready);
+
+    qemu_mutex_lock(&p->mutex);
+    p->done = false;
+    p->pages->used = 0;
+    multifd_recv_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
 }
 
 /**
@@ -3042,6 +3161,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     }
 
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
+        RAMBlock *block = NULL;
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
         uint16_t fd_num;
@@ -3066,7 +3186,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-            RAMBlock *block = ram_block_from_stream(f, flags);
+            block = ram_block_from_stream(f, flags);
 
             host = host_from_ram_block_offset(block, addr);
             if (!host) {
@@ -3157,10 +3277,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* this is yet an unused variable, changed later */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(block, addr, host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
diff --git a/migration/socket.c b/migration/socket.c
index aedabee8a1..ba23442175 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -192,6 +192,9 @@ out:
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_channel_close(ioc, NULL);
+        if (!migrate_use_multifd()) {
+            migration_incoming_process();
+        }
         return G_SOURCE_REMOVE;
     } else {
         return G_SOURCE_CONTINUE;
diff --git a/migration/trace-events b/migration/trace-events
index 61ee21a13e..06aef4d4a3 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,7 +77,9 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
 ram_postcopy_send_discard_bitmap(void) ""
 ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
 ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
+multifd_recv(char id, int seq, int num) "channel %d sequence %d num pages %d"
 multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
+multifd_recv_thread(char id, uint32_t packets) "channel %d packets %d"
 multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
 
 # migration/migration.c
-- 
2.14.3

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (6 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-24 13:46   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue Juan Quintela
                   ` (6 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We switch from sending the page number to sending real pages.
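
The core of the new data path, from the ram.c hunk below: each send
thread now pushes its whole batch with a single vectored write over its
own channel.

ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
if (ret != 0) {
    terminate_multifd_send_threads(local_err);
    return NULL;
}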

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Remove the HACK bit; the function that calculates the target page size
is now exported.
Rename multifd_pages{,_now} to sent_pages{,_now}
Remove the multifd pages field, it is the same as normal pages
Merge test channels here
Make sent_pages also work for the non-multifd case
---
 migration/migration.c | 10 +++++++++-
 migration/ram.c       | 41 +++++++++++++++++++++++++++++------------
 2 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 1545f3a0b0..6ebcfa36cc 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2230,6 +2230,8 @@ static void *migration_thread(void *opaque)
      */
     int64_t threshold_size = 0;
     int64_t qemu_file_bytes = 0;
+    /* Stores how many pages we have sent */
+    int64_t sent_pages = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2318,8 +2320,13 @@ static void *migration_thread(void *opaque)
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
             uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t sent_pages_now = ram_counters.normal;
+            /* multifd sends data out of the qemu_file */
+            uint64_t multifd_transferred = migrate_use_multifd() ?
+                (sent_pages_now - sent_pages) * qemu_target_page_size() : 0;
             uint64_t transferred_bytes =
-                qemu_file_bytes_now - qemu_file_bytes;
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                multifd_transferred;
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2339,6 +2346,7 @@ static void *migration_thread(void *opaque)
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
             qemu_file_bytes = qemu_file_bytes_now;
+            sent_pages = sent_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 8443806f12..20f3726909 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -542,12 +542,20 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            Error *local_err = NULL;
+            size_t ret;
+            uint32_t used;
+
+            used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, p->pages->seq, p->pages->used);
-            /* ToDo: send page here */
-
+            trace_multifd_send(p->id, p->pages->seq, used);
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                terminate_multifd_send_threads(local_err);
+                return NULL;
+            }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
             p->packets_sent++;
@@ -754,12 +762,21 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            Error *local_err = NULL;
+            size_t ret;
+            uint32_t used;
+
+            used = p->pages->used;
             p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_recv(p->id, p->pages->seq, p->pages->used);
-
-            /* ToDo: receive pages here */
-
+            trace_multifd_recv(p->id, p->pages->seq, used);
+            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                terminate_multifd_recv_threads(local_err);
+                return NULL;
+            }
+            qemu_mutex_lock(&p->mutex);
             p->done = true;
             p->packets_recv++;
             qemu_mutex_unlock(&p->mutex);
@@ -1311,12 +1328,9 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
 {
     int pages;
     uint16_t fd_num;
-    uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
 
-    p = block->host + offset;
-
     pages = save_zero_page(rs, block, offset);
     if (pages == -1) {
         ram_counters.transferred +=
@@ -1325,8 +1339,12 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
         fd_num = multifd_send_page(block, offset,
                                    rs->migration_dirty_pages == 1);
         qemu_put_be16(rs->f, fd_num);
+        if (fd_num != MULTIFD_CONTINUE) {
+            /* We start with a different channel.
+               Flush pending work */
+            qemu_fflush(rs->f);
+        }
         ram_counters.transferred += 2; /* size of fd_num */
-        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -3278,7 +3296,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(block, addr, host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
         case RAM_SAVE_FLAG_EOS:
-- 
2.14.3


* [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (7 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-24 14:12   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test Juan Quintela
                   ` (5 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Each time we sync the bitmap, there is a possibility that we receive
a page that is still being processed by a different thread.  We fix this
problem by making sure that we wait for all receiving threads to
finish their work before we proceed to the next stage.

We are low on page flags, so we use a combination that is otherwise
invalid to emit that message: MULTIFD_PAGE and ZERO.

I tried to make a migration command for it, but it doesn't work because
we sometimes sync the bitmap when we have already sent the beginning
of the section, so I just added a new page flag.
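
To make the marker concrete, a sketch of both ends (flag values as in
the diff below; illustrative only):

    /* Sender: piggyback the sync request on the next multifd header. */
    if (rs->multifd_needs_flush && (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
        offset |= RAM_SAVE_FLAG_ZERO;   /* invalid combo acts as marker */
        rs->multifd_needs_flush = false;
    }

    /* Receiver: detect the combo, drain all recv threads, strip the bit. */
    if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC) == RAM_SAVE_FLAG_MULTIFD_SYNC) {
        multifd_flush();
        flags &= ~RAM_SAVE_FLAG_ZERO;
    }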

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Create RAM_SAVE_FLAG_MULTIFD_SYNC (dave suggestion)
Move the set of need_flush to inside the bitmap_sync code (peter suggestion)
---
 migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 20f3726909..b1ad7b2730 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -74,6 +74,14 @@
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
+/* We are getting low on page flags, so we start using combinations.
+   When we need to flush a page, we send it as
+   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO.
+   We never allow that combination otherwise.
+*/
+#define RAM_SAVE_FLAG_MULTIFD_SYNC \
+    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
+
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
     return buffer_is_zero(p, size);
@@ -226,6 +234,9 @@ struct RAMState {
     uint64_t iterations_prev;
     /* Iterations since start */
     uint64_t iterations;
+    /* Indicates that we have synced the bitmap and need to ensure that
+       the target has processed all previous pages */
+    bool multifd_needs_flush;
     /* number of dirty bits in the bitmap */
     uint64_t migration_dirty_pages;
     /* protects modification of the bitmap */
@@ -675,11 +686,13 @@ struct MultiFDRecvParams {
     QIOChannel *c;
     QemuSemaphore ready;
     QemuSemaphore sem;
+    QemuCond cond_sync;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    /* how many packets this channel has received */
     uint32_t packets_recv;
+    bool sync;
     multifd_pages_t *pages;
     bool done;
 };
@@ -734,6 +747,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_cond_destroy(&p->cond_sync);
         socket_recv_channel_unref(p->c);
         g_free(p->name);
         p->name = NULL;
@@ -779,6 +793,10 @@ static void *multifd_recv_thread(void *opaque)
             qemu_mutex_lock(&p->mutex);
             p->done = true;
             p->packets_recv++;
+            if (p->sync) {
+                qemu_cond_signal(&p->cond_sync);
+                p->sync = false;
+            }
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
             continue;
@@ -845,9 +863,11 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
     qemu_sem_init(&p->ready, 0);
+    qemu_cond_init(&p->cond_sync);
     p->quit = false;
     p->id = msg.id;
     p->done = false;
+    p->sync = false;
     multifd_pages_init(&p->pages, migrate_multifd_page_count());
     p->c = ioc;
     multifd_recv_state->count++;
@@ -891,6 +911,27 @@ static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
     qemu_sem_post(&p->sem);
 }
 
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_channels();
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        while (!p->done) {
+            p->sync = true;
+            qemu_cond_wait(&p->cond_sync, &p->mutex);
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    return 0;
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -908,6 +949,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
 {
     size_t size, len;
 
+    if (rs->multifd_needs_flush &&
+        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+        offset |= RAM_SAVE_FLAG_ZERO;
+        rs->multifd_needs_flush = false;
+    }
+
     if (block == rs->last_sent_block) {
         offset |= RAM_SAVE_FLAG_CONTINUE;
     }
@@ -1196,6 +1243,9 @@ static void migration_bitmap_sync(RAMState *rs)
     if (migrate_use_events()) {
         qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
     }
+    if (!rs->ram_bulk_stage && migrate_use_multifd()) {
+        rs->multifd_needs_flush = true;
+    }
 }
 
 /**
@@ -3201,6 +3251,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         }
 
+        if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC)
+            == RAM_SAVE_FLAG_MULTIFD_SYNC) {
+            multifd_flush();
+            flags = flags & ~RAM_SAVE_FLAG_ZERO;
+        }
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-- 
2.14.3


* [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (8 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-24 14:23   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 11/14] LOCAL: use trace events for migration-test Juan Quintela
                   ` (4 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We set the x-multifd-page-count and x-multifd-channels parameters.
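
(To run just this test, something along these lines should work,
assuming an x86_64 build; illustrative only:

    QTEST_QEMU_BINARY=x86_64-softmmu/qemu-system-x86_64 \
        tests/migration-test -p /migration/multifd/tcp
)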

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 tests/migration-test.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/tests/migration-test.c b/tests/migration-test.c
index eb44a95aa9..488ae89f34 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -826,6 +826,55 @@ static void test_compress_unix(void)
     g_free(uri);
 }
 
+static void test_multifd_tcp(void)
+{
+    char *uri;
+    char *port;
+    QTestState *from, *to;
+
+    test_migrate_start(&from, &to, "tcp:127.0.0.1:0");
+
+    /* We want to pick a speed slow enough that the test completes
+     * quickly, but that it doesn't complete precopy even on a slow
+     * machine, so also set the downtime.
+     */
+    /* 1 ms should make it not converge */
+    migrate_set_parameter(from, "downtime-limit", "1");
+    /* ~100 MB/s */
+    migrate_set_parameter(from, "max-bandwidth", "100000000");
+
+    migrate_set_parameter(from, "x-multifd-channels", "4");
+    migrate_set_parameter(to, "x-multifd-channels", "4");
+
+    migrate_set_parameter(from, "x-multifd-page-count", "64");
+    migrate_set_parameter(to, "x-multifd-page-count", "64");
+
+    migrate_set_capability(from, "x-multifd", "true");
+    migrate_set_capability(to, "x-multifd", "true");
+    /* Wait for the first serial output from the source */
+    wait_for_serial("src_serial");
+
+    port = migrate_get_parameter(to, "tcp-port");
+    uri = g_strdup_printf("tcp:127.0.0.1:%s", port);
+
+    migrate(from, uri);
+
+    wait_for_migration_pass(from);
+
+    /* with 300 ms downtime it should converge */
+    migrate_set_parameter(from, "downtime-limit", "300");
+
+    if (!got_stop) {
+        qtest_qmp_eventwait(from, "STOP");
+    }
+    qtest_qmp_eventwait(to, "RESUME");
+
+    wait_for_serial("dest_serial");
+    wait_for_migration_complete(from);
+
+    test_migrate_end(from, to);
+}
+
 int main(int argc, char **argv)
 {
     char template[] = "/tmp/migration-test-XXXXXX";
@@ -853,6 +902,7 @@ int main(int argc, char **argv)
     if (0) {
         qtest_add_func("/migration/compress/unix", test_compress_unix);
     }
+    qtest_add_func("/migration/multifd/tcp", test_multifd_tcp);
 
     ret = g_test_run();
 
-- 
2.14.3


* [Qemu-devel] [PATCH v10 11/14] LOCAL: use trace events for migration-test
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (9 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread Juan Quintela
                   ` (3 subsequent siblings)
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 tests/migration-test.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/migration-test.c b/tests/migration-test.c
index 488ae89f34..d0b0c1b64b 100644
--- a/tests/migration-test.c
+++ b/tests/migration-test.c
@@ -457,11 +457,13 @@ static void test_migrate_start(QTestState **from, QTestState **to,
     if (strcmp(arch, "i386") == 0 || strcmp(arch, "x86_64") == 0) {
         init_bootfile_x86(bootpath);
         cmd_src = g_strdup_printf("-machine accel=%s -m 256M"
+                                  " -trace events=/home/quintela/tmp/events"
                                   " -name source,debug-threads=on"
                                   " -serial file:%s/src_serial"
                                   " -drive file=%s,format=raw",
                                   accel, tmpfs, bootpath);
         cmd_dst = g_strdup_printf("-machine accel=%s -m 256M"
+                                  " -trace events=/home/quintela/tmp/events"
                                   " -name target,debug-threads=on"
                                   " -serial file:%s/dest_serial"
                                   " -drive file=%s,format=raw"
-- 
2.14.3


* [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (10 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 11/14] LOCAL: use trace events for migration-test Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-24 14:29   ` Dr. David Alan Gilbert
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 13/14] migration: Add multifd_send_packet trace Juan Quintela
                   ` (2 subsequent siblings)
  14 siblings, 1 reply; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 60 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index b1ad7b2730..f636c7da0a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -411,6 +411,19 @@ static void compress_threads_save_setup(void)
 /* used to continue on the same multifd group */
 #define MULTIFD_CONTINUE UINT16_MAX
 
+#define MULTIFD_MAGIC 0x112233d
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t size;
+    uint32_t used;
+    uint32_t seq;
+    char ramblock[256];
+    ram_addr_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t used;
@@ -420,6 +433,8 @@ typedef struct {
     uint32_t seq;
     struct iovec *iov;
     RAMBlock *block;
+    uint32_t packet_len;
+    MultiFDPacket_t *packet;
 } multifd_pages_t;
 
 struct MultiFDSendParams {
@@ -456,6 +471,8 @@ static void multifd_pages_init(multifd_pages_t **ppages, size_t size)
 
     pages->allocated = size;
     pages->iov = g_new0(struct iovec, size);
+    pages->packet_len = sizeof(MultiFDPacket_t) + sizeof(ram_addr_t) * size;
+    pages->packet = g_malloc0(pages->packet_len);
     *ppages = pages;
 }
 
@@ -467,6 +484,9 @@ static void multifd_pages_clear(multifd_pages_t *pages)
     pages->block = NULL;
     g_free(pages->iov);
     pages->iov = NULL;
+    pages->packet_len = 0;
+    g_free(pages->packet);
+    pages->packet = NULL;
     g_free(pages);
 }
 
@@ -553,16 +573,27 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            MultiFDPacket_t *packet = p->pages->packet;
             Error *local_err = NULL;
             size_t ret;
-            uint32_t used;
 
-            used = p->pages->used;
+            packet->used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
-
-            trace_multifd_send(p->id, p->pages->seq, used);
-            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            packet->magic = MULTIFD_MAGIC;
+            packet->version = MULTIFD_VERSION;
+            strncpy(packet->ramblock, p->pages->block->idstr, 256);
+            packet->size = migrate_multifd_page_count();
+            packet->seq = p->pages->seq;
+            ret = qio_channel_write_all(p->c, (void *)packet,
+                                        p->pages->packet_len, &local_err);
+            if (ret != 0) {
+                terminate_multifd_send_threads(local_err);
+                return NULL;
+            }
+            trace_multifd_send(p->id, p->pages->seq, packet->used);
+            ret = qio_channel_writev_all(p->c, p->pages->iov,
+                                         packet->used, &local_err);
             if (ret != 0) {
                 terminate_multifd_send_threads(local_err);
                 return NULL;
@@ -645,6 +676,7 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
         pages->block = block;
     }
 
+    pages->packet->offset[pages->used] = offset;
     pages->iov[pages->used].iov_base = block->host + offset;
     pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
     pages->used++;
@@ -776,16 +808,35 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            MultiFDPacket_t *packet = p->pages->packet;
+            RAMBlock *block;
             Error *local_err = NULL;
             size_t ret;
-            uint32_t used;
+            int i;
 
-            used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_recv(p->id, p->pages->seq, used);
-            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            ret = qio_channel_read_all(p->c, (void *)packet,
+                                       p->pages->packet_len, &local_err);
+            if (ret != 0) {
+                terminate_multifd_recv_threads(local_err);
+                return NULL;
+            }
+            block = qemu_ram_block_by_name(packet->ramblock);
+            p->pages->seq = packet->seq;
+            for (i = 0; i < packet->used; i++) {
+                if (block->host + packet->offset[i]
+                    != p->pages->iov[i].iov_base) {
+                    printf("page offset %d packet %p pages %p\n", i,
+                           block->host + packet->offset[i],
+                           p->pages->iov[i].iov_base);
+                    break;
+                }
+            }
+            trace_multifd_recv(p->id, p->pages->seq, packet->used);
+            ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                        packet->used, &local_err);
             if (ret != 0) {
                 terminate_multifd_recv_threads(local_err);
                 return NULL;
-- 
2.14.3


* [Qemu-devel] [PATCH v10 13/14] migration: Add multifd_send_packet trace
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (11 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 14/14] all works Juan Quintela
  2018-01-10 15:01 ` [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

I need the block name, so I can't emit this trace any earlier.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 2 ++
 migration/trace-events | 1 +
 2 files changed, 3 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index f636c7da0a..6e45f668d1 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -705,6 +705,8 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
     multifd_send_state->pages = p->pages;
     p->pages = pages;
     qemu_mutex_unlock(&p->mutex);
+    trace_multifd_send_packet(p->id, p->pages->block->idstr, p->pages->seq,
+                              p->pages->used, last_page);
     qemu_sem_post(&p->sem);
 
     return i;
diff --git a/migration/trace-events b/migration/trace-events
index 06aef4d4a3..598620c58e 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -81,6 +81,7 @@ multifd_recv(char id, int seq, int num) "channel %d sequence %d num pages %d"
 multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
 multifd_recv_thread(char id, uint32_t packets) "channel %d packets %d"
 multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
+multifd_send_packet(char id, char *block_name, uint32_t seq, uint32_t packets_used, bool last) "channel %d block %s seq %d used %d last %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3


* [Qemu-devel] [PATCH v10 14/14] all works
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (12 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 13/14] migration: Add multifd_send_packet trace Juan Quintela
@ 2018-01-10 12:47 ` Juan Quintela
  2018-01-10 15:01 ` [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 12:47 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 69 ++++++++++++++++++++++++++++-----------------------------
 1 file changed, 34 insertions(+), 35 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 6e45f668d1..a689d4a218 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -671,22 +671,25 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
     int i;
     MultiFDSendParams *p = NULL; /* make happy gcc */
     multifd_pages_t *pages = multifd_send_state->pages;
+    bool same_block;
 
     if (!pages->block) {
         pages->block = block;
     }
 
-    pages->packet->offset[pages->used] = offset;
-    pages->iov[pages->used].iov_base = block->host + offset;
-    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
-    pages->used++;
+    same_block = pages->block == block;
+    if (same_block) {
+        pages->packet->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->used++;
 
-    if (!last_page) {
-        if (pages->used < pages->allocated) {
-            return MULTIFD_CONTINUE;
+        if (!last_page) {
+            if (pages->used < pages->allocated) {
+                return MULTIFD_CONTINUE;
+            }
         }
     }
-
     qemu_sem_wait(&multifd_send_state->sem);
     qemu_mutex_lock(&multifd_send_state->mutex);
     for (i = 0; i < multifd_send_state->count; i++) {
@@ -709,7 +712,10 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
                               p->pages->used, last_page);
     qemu_sem_post(&p->sem);
 
-    return i;
+    if (!same_block) {
+        multifd_send_page(block, offset, last_page);
+    }
+    return 0;
 }
 
 struct MultiFDRecvParams {
@@ -809,14 +815,13 @@ static void *multifd_recv_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
-        if (p->pages->used) {
+        if (true) {
             MultiFDPacket_t *packet = p->pages->packet;
             RAMBlock *block;
             Error *local_err = NULL;
             size_t ret;
             int i;
 
-            p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
 
             ret = qio_channel_read_all(p->c, (void *)packet,
@@ -828,13 +833,8 @@ static void *multifd_recv_thread(void *opaque)
             block = qemu_ram_block_by_name(packet->ramblock);
             p->pages->seq = packet->seq;
             for (i = 0; i < packet->used; i++) {
-                if (block->host + packet->offset[i]
-                    != p->pages->iov[i].iov_base) {
-                    printf("page offset %d packet %p pages %p\n", i,
-                           block->host + packet->offset[i],
-                           p->pages->iov[i].iov_base);
-                    break;
-                }
+                p->pages->iov[i].iov_base = block->host + packet->offset[i];
+                p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
             }
             trace_multifd_recv(p->id, p->pages->seq, packet->used);
             ret = qio_channel_readv_all(p->c, p->pages->iov,
@@ -851,11 +851,11 @@ static void *multifd_recv_thread(void *opaque)
                 p->sync = false;
             }
             qemu_mutex_unlock(&p->mutex);
-            qemu_sem_post(&p->ready);
+//            qemu_sem_post(&p->ready);
             continue;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+//        qemu_sem_wait(&p->sem);
     }
     trace_multifd_recv_thread(p->id, p->packets_recv);
 
@@ -953,7 +953,7 @@ static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
     assert(fd_num < thread_count);
     p = &multifd_recv_state->params[fd_num];
 
-    qemu_sem_wait(&p->ready);
+//    qemu_sem_wait(&p->ready);
 
     qemu_mutex_lock(&p->mutex);
     p->done = false;
@@ -961,7 +961,7 @@ static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
     multifd_recv_state->pages = p->pages;
     p->pages = pages;
     qemu_mutex_unlock(&p->mutex);
-    qemu_sem_post(&p->sem);
+//    qemu_sem_post(&p->sem);
 }
 
 static int multifd_flush(void)
@@ -1430,24 +1430,23 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                             bool last_stage)
 {
     int pages;
-    uint16_t fd_num;
+//    uint16_t fd_num;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
 
     pages = save_zero_page(rs, block, offset);
     if (pages == -1) {
-        ram_counters.transferred +=
-            save_page_header(rs, rs->f, block,
-                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
-        fd_num = multifd_send_page(block, offset,
-                                   rs->migration_dirty_pages == 1);
-        qemu_put_be16(rs->f, fd_num);
-        if (fd_num != MULTIFD_CONTINUE) {
-            /* We start with a different channel.
-               Flush pending work */
-            qemu_fflush(rs->f);
-        }
-        ram_counters.transferred += 2; /* size of fd_num */
+//        ram_counters.transferred +=
+//            save_page_header(rs, rs->f, block,
+//                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        multifd_send_page(block, offset, rs->migration_dirty_pages == 1);
+//        qemu_put_be16(rs->f, fd_num);
+//        if (fd_num != MULTIFD_CONTINUE) {
+//            /* We start with a different channel.
+//               Flush pending work */
+//            qemu_fflush(rs->f);
+//        }
+//        ram_counters.transferred += 2; /* size of fd_num */
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
-- 
2.14.3


* Re: [Qemu-devel] [RFC 00/14] Multifd
  2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
                   ` (13 preceding siblings ...)
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 14/14] all works Juan Quintela
@ 2018-01-10 15:01 ` Juan Quintela
  14 siblings, 0 replies; 26+ messages in thread
From: Juan Quintela @ 2018-01-10 15:01 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Juan Quintela <quintela@redhat.com> wrote:
> Hi
>
> For V10:
> - It is a request for comments
>
> - I changed as suggested from Paolo on KVM forum to send page metadata
>   over the channels
>
> - I simplified lots of things
> - Added reviews
> - it is not finished, in no-particular order:
>   * old code is not removed for comparison for the people that looked at
>     the previous one
>   * it would be removed later
>   * synchronization at the end of each stage will be done with a flags field
>     on the communication channel.
> - Yes, there is commented code on the last patch.
> - I am open to suggestion about how to call the pacet "message" with
>   metadata.

Oops, somehow I sent the wrong branch, sorry.

The end result is similar, but the last patch has some better comments,
and the LOCAL: one was meant to be just that, local for me O:-)

Sorry for the noise, Juan.


* Re: [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2018-01-12 18:50   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-12 18:50 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> So far, every caller had to free the error, so just do it
> here.  While at it: tls.c was leaking the error.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

This clashes a bit with my
'Route errors down through migration_channel_connect'
'Allow migrate_fd_connect to take an Error *'

pair; I think they also clean up the leak and actually end up displaying
the error.

Dave

> ---
>  migration/channel.c   |  1 -
>  migration/migration.c | 10 ++++------
>  migration/migration.h |  4 ++--
>  migration/socket.c    |  1 -
>  4 files changed, 6 insertions(+), 10 deletions(-)
> 
> diff --git a/migration/channel.c b/migration/channel.c
> index 70ec7ea3b7..1dd2ae1530 100644
> --- a/migration/channel.c
> +++ b/migration/channel.c
> @@ -71,7 +71,6 @@ void migration_channel_connect(MigrationState *s,
>          migration_tls_channel_connect(s, ioc, hostname, &local_err);
>          if (local_err) {
>              migrate_fd_error(s, local_err);
> -            error_free(local_err);
>          }
>      } else {
>          QEMUFile *f = qemu_fopen_channel_output(ioc);
> diff --git a/migration/migration.c b/migration/migration.c
> index a5be4592a6..085e88c625 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1159,16 +1159,14 @@ static void migrate_fd_cleanup(void *opaque)
>      block_cleanup_parameters(s);
>  }
>  
> -void migrate_set_error(MigrationState *s, const Error *error)
> +void migrate_set_error(MigrationState *s, Error *error)
>  {
>      qemu_mutex_lock(&s->error_mutex);
> -    if (!s->error) {
> -        s->error = error_copy(error);
> -    }
> +    error_propagate(&s->error, error);
>      qemu_mutex_unlock(&s->error_mutex);
>  }
>  
> -void migrate_fd_error(MigrationState *s, const Error *error)
> +void migrate_fd_error(MigrationState *s, Error *error)
>  {
>      trace_migrate_fd_error(error_get_pretty(error));
>      assert(s->to_dst_file == NULL);
> @@ -1448,7 +1446,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
>      }
>  
>      if (local_err) {
> -        migrate_fd_error(s, local_err);
> +        migrate_fd_error(s, error_copy(local_err));
>          error_propagate(errp, local_err);
>          return;
>      }
> diff --git a/migration/migration.h b/migration/migration.h
> index 2c8c53847a..29a7b79a39 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -177,8 +177,8 @@ bool  migration_has_all_channels(void);
>  
>  uint64_t migrate_max_downtime(void);
>  
> -void migrate_set_error(MigrationState *s, const Error *error);
> -void migrate_fd_error(MigrationState *s, const Error *error);
> +void migrate_set_error(MigrationState *s, Error *error);
> +void migrate_fd_error(MigrationState *s, Error *error);
>  
>  void migrate_fd_connect(MigrationState *s);
>  
> diff --git a/migration/socket.c b/migration/socket.c
> index 248a798543..6d49903978 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -81,7 +81,6 @@ static void socket_outgoing_migration(QIOTask *task,
>      if (qio_task_propagate_error(task, &err)) {
>          trace_migration_socket_outgoing_error(error_get_pretty(err));
>          migrate_fd_error(data->s, err);
> -        error_free(err);
>      } else {
>          trace_migration_socket_outgoing_connected(data->hostname);
>          migration_channel_connect(data->s, sioc, data->hostname);
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page()
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page() Juan Quintela
@ 2018-01-12 18:56   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-12 18:56 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> It already has the RAMBlock and the offset, so it can calculate it itself.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/ram.c | 11 +++++------
>  1 file changed, 5 insertions(+), 6 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index cb1950f3eb..5a109efeda 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -907,11 +907,10 @@ static void migration_bitmap_sync(RAMState *rs)
>   * @rs: current RAM state
>   * @block: block that contains the page we want to send
>   * @offset: offset inside the block for the page
> - * @p: pointer to the page
>   */
> -static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
> -                          uint8_t *p)
> +static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
>  {
> +    uint8_t *p = block->host + offset;
>      int pages = -1;
>  
>      if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> @@ -984,7 +983,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>              }
>          }
>      } else {
> -        pages = save_zero_page(rs, block, offset, p);
> +        pages = save_zero_page(rs, block, offset);
>          if (pages > 0) {
>              /* Must let xbzrle know, otherwise a previous (now 0'd) cached
>               * page would be stale
> @@ -1160,7 +1159,7 @@ static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
>           */
>          if (block != rs->last_sent_block) {
>              flush_compressed_data(rs);
> -            pages = save_zero_page(rs, block, offset, p);
> +            pages = save_zero_page(rs, block, offset);
>              if (pages == -1) {
>                  /* Make sure the first page is sent out before other pages */
>                  bytes_xmit = save_page_header(rs, rs->f, block, offset |
> @@ -1180,7 +1179,7 @@ static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
>                  ram_release_pages(block->idstr, offset, pages);
>              }
>          } else {
> -            pages = save_zero_page(rs, block, offset, p);
> +            pages = save_zero_page(rs, block, offset);
>              if (pages == -1) {
>                  pages = compress_page_with_multi_thread(rs, block, offset);
>              } else {
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
@ 2018-01-22  7:00   ` Peter Xu
  2018-01-23 19:52   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 26+ messages in thread
From: Peter Xu @ 2018-01-22  7:00 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Jan 10, 2018 at 01:47:13PM +0100, Juan Quintela wrote:
> We create a new channel for each new thread created.  We send a packed
> struct through them.  This way we can check that we connect the right
> channels on both sides.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUIDs on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destroy; we were destroying send channels.
> This was very interesting, because we were using an unreferenced object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines fit in 80 columns (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__((packed))
> Use UUIDs as opaques instead of the ASCII representation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c       | 114 +++++++++++++++++++++++++++++++++++---------------
>  migration/ram.h       |   3 ++
>  migration/socket.c    |  39 ++++++++++++++++-
>  migration/socket.h    |  10 +++++
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>          QEMUFile *f = qemu_fopen_channel_input(ioc);
>          migration_fd_process_incoming(f);
>      }
> -    /* We still only have a single channel.  Nothing to do here yet */
> +    multifd_recv_new_channel(ioc);
>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +    if (migrate_use_multifd()) {
> +        int thread_count = migrate_multifd_channels();
> +
> +        return thread_count == multifd_created_channels();
> +    }
>      return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);

I'm fine with it, but could I ask why we explicitly pass the error
into this function and handle the state machine transition here?
Asking since IMHO we already know the error when calling
terminate_multifd_send_threads() (we passed it in as a parameter),
so why not do that at the call site?

[1]

> +        }
> +    }
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +typedef struct {
> +    uint32_t version;
> +    unsigned char uuid[16]; /* QemuUUID */
> +    uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    MultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    msg.version = 1;
> +    msg.id = p->id;
> +    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

I don't remember whether I asked before, but... do we need to handle
the BE/LE conversion?  That is, do we need to support migration
between BE and LE hosts with multifd?
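
(For illustration, the kind of conversion I mean, assuming the on-wire
layout stays as in this patch; hypothetical, not part of the series:

    /* Sender: store multi-byte fields in a fixed endianness. */
    msg.version = cpu_to_be32(1);

    /* Receiver: convert back before checking. */
    if (be32_to_cpu(msg.version) != 1) {
        /* reject the channel */
    }

uuid[] and id are plain bytes, so they are endian-safe as they are.)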

> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);

Hmm... so we are in a thread handler, but we are calling the global
cleanup function for all the threads.  Will this function be called
more than once?  I think it'll also be called in
multifd_save_cleanup() at the end as well (so that seems to be N+1 times)?

Shall we just keep the single call in multifd_save_cleanup(), since
after all we should reach there when an error happens?
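
(A hypothetical shape for that, not from the series: the thread only
records the error and exits, leaving the join and teardown to the one
call in multifd_save_cleanup():

    if (ret != 0) {
        migrate_set_error(migrate_get_current(), local_err);
        return NULL;    /* join + state teardown happen once, elsewhere */
    }
)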

> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> +    Error *local_err = NULL;
> +
> +    if (qio_task_propagate_error(task, &local_err)) {
> +        if (multifd_save_cleanup(&local_err) != 0) {

Similar question here: are we calling multifd_save_cleanup() (which is
a function to clean up everything related to multifd) in a per-channel
handler?  Will it be called more than once somehow?

> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
> +    } else {
> +        p->c = QIO_CHANNEL(sioc);
> +        qio_channel_set_delay(p->c, false);
> +
> +        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +
> +        multifd_send_state->count++;
> +    }
> +}
> +
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> @@ -484,10 +537,7 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -
> -        multifd_send_state->count++;
> +        socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
>      return 0;
>  }
> @@ -496,6 +546,7 @@ struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -506,12 +557,25 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* Should we finish */
> +    bool quit;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }

Similar question to [1] above.  Would it be better to handle the
error before calling terminate_multifd_recv_threads(), so that we can
avoid touching the state machine in multifd code?

> +    }
> +    multifd_recv_state->quit = true;
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -537,6 +601,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_unref(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -548,27 +613,9 @@ int multifd_load_cleanup(Error **errp)
>      return ret;
>  }
>  
> -static void *multifd_recv_thread(void *opaque)
> -{
> -    MultiFDRecvParams *p = opaque;
> -
> -    while (true) {
> -        qemu_mutex_lock(&p->mutex);
> -        if (p->quit) {
> -            qemu_mutex_unlock(&p->mutex);
> -            break;
> -        }
> -        qemu_mutex_unlock(&p->mutex);
> -        qemu_sem_wait(&p->sem);
> -    }
> -
> -    return NULL;
> -}
> -
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -577,21 +624,20 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        p->name = g_strdup_printf("multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }
> +    multifd_recv_state->quit = false;
>      return 0;
>  }
>  
> +int multifd_created_channels(void)
> +{
> +    return multifd_recv_state->count;
> +}
> +
> +void multifd_recv_new_channel(QIOChannel *ioc)
> +{
> +    socket_recv_channel_unref(ioc);
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> diff --git a/migration/ram.h b/migration/ram.h
> index 64d81e9f1d..be7d09d0ec 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>  
>  #include "qemu-common.h"
>  #include "exec/cpu-common.h"
> +#include "io/channel.h"
>  
>  extern MigrationStats ram_counters;
>  extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
>  int multifd_save_cleanup(Error **errp);
>  int multifd_load_setup(void);
>  int multifd_load_cleanup(Error **errp);
> +int multifd_created_channels(void);
> +void multifd_recv_new_channel(QIOChannel *ioc);
>  
>  uint64_t ram_pagesize_summary(void);
>  int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6d49903978..aedabee8a1 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -27,6 +27,39 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_ref(QIOChannel *recv)
> +{
> +    object_ref(OBJECT(recv));
> +    return 0;
> +}
> +
> +int socket_recv_channel_unref(QIOChannel *recv)
> +{
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> +                                     f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;

I would still prefer to free global variables somewhere else, rather
than doing it in a per-channel destructor.  At least IMHO if we free
globals in socket_send_channel_destroy() then the function is not
thread-safe, and we could have a double free here?
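
(As an illustration of the alternative, hypothetical and not from the
series: keep the destructor purely per-channel and free the global
address exactly once from the save cleanup path:

    int socket_send_channel_destroy(QIOChannel *send)
    {
        object_unref(OBJECT(send));
        return 0;
    }

    /* called once, e.g. at the end of multifd_save_cleanup() */
    static void socket_send_args_cleanup(void)
    {
        qapi_free_SocketAddress(outgoing_args.saddr);
        outgoing_args.saddr = NULL;
    }
)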

Thanks,

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
  2018-01-22  7:00   ` Peter Xu
@ 2018-01-23 19:52   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-23 19:52 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We create a new channel for each new thread created.  We send a packed
> struct through them.  This way we can check that we connect the right
> channels on both sides.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

General comment: given it's reasonably large, it would be nice to split
this into a send patch and a receive patch.
Some tracing wouldn't do any harm either - it's going to be fun debugging
failures where some sockets connect and then fail, so tracing would
help.
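
(For instance, hypothetical trace events in migration/trace-events in
the style of the existing multifd ones:

    multifd_new_send_channel(uint8_t id) "channel %d"
    multifd_new_recv_channel(uint8_t id) "channel %d"

with the matching trace_multifd_new_*_channel() calls at the points
where each socket connects or fails.)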


> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUIDs on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destroy; we were destroying send channels.
> This was very interesting, because we were using an unreferenced object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines fit in 80 columns (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__((packed))
> Use UUIDs as opaques instead of the ASCII representation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c       | 114 +++++++++++++++++++++++++++++++++++---------------
>  migration/ram.h       |   3 ++
>  migration/socket.c    |  39 ++++++++++++++++-
>  migration/socket.h    |  10 +++++
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>          QEMUFile *f = qemu_fopen_channel_input(ioc);
>          migration_fd_process_incoming(f);
>      }
> -    /* We still only have a single channel.  Nothing to do here yet */
> +    multifd_recv_new_channel(ioc);

OK, that looks a little odd at the moment - do we grow the
recv_new_channel and still leave the first one in there?

>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +    if (migrate_use_multifd()) {
> +        int thread_count = migrate_multifd_channels();
> +
> +        return thread_count == multifd_created_channels();
> +    }
>      return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +typedef struct {
> +    uint32_t version;
> +    unsigned char uuid[16]; /* QemuUUID */
> +    uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    MultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    msg.version = 1;
> +    msg.id = p->id;
> +    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);
> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> +    Error *local_err = NULL;
> +
> +    if (qio_task_propagate_error(task, &local_err)) {
> +        if (multifd_save_cleanup(&local_err) != 0) {
> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
> +    } else {
> +        p->c = QIO_CHANNEL(sioc);
> +        qio_channel_set_delay(p->c, false);
> +
> +        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +
> +        multifd_send_state->count++;
> +    }
> +}
> +
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> @@ -484,10 +537,7 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -
> -        multifd_send_state->count++;
> +        socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
>      return 0;
>  }
> @@ -496,6 +546,7 @@ struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -506,12 +557,25 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* Should we finish */
> +    bool quit;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
> +    multifd_recv_state->quit = true;
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -537,6 +601,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_unref(p->c);

I'm a bit confused about who refs what; multifd_recv_new_channel is
currently doing an unref, but we've also got this.
(Although I guess the receive threads are not added to the list yet,
so it's not a problem??)
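
For reference, here is the ref/unref pairing I can piece together from
the later patches in this series (a summary of my reading, not a fix):

    /* incoming side, per extra channel:
     *   migration_ioc_process_incoming(ioc)
     *     -> multifd_recv_new_channel(ioc)
     *          socket_recv_channel_ref(ioc);    recv thread takes its ref
     *          p->c = ioc;
     *   ...
     *   multifd_load_cleanup()
     *          socket_recv_channel_unref(p->c); dropped again at teardown
     */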

>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -548,27 +613,9 @@ int multifd_load_cleanup(Error **errp)
>      return ret;
>  }
>  
> -static void *multifd_recv_thread(void *opaque)
> -{
> -    MultiFDRecvParams *p = opaque;
> -
> -    while (true) {
> -        qemu_mutex_lock(&p->mutex);
> -        if (p->quit) {
> -            qemu_mutex_unlock(&p->mutex);
> -            break;
> -        }
> -        qemu_mutex_unlock(&p->mutex);
> -        qemu_sem_wait(&p->sem);
> -    }
> -
> -    return NULL;
> -}
> -
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -577,21 +624,20 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        p->name = g_strdup_printf("multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }
> +    multifd_recv_state->quit = false;
>      return 0;
>  }
>  
> +int multifd_created_channels(void)
> +{
> +    return multifd_recv_state->count;
> +}
> +
> +void multifd_recv_new_channel(QIOChannel *ioc)
> +{
> +    socket_recv_channel_unref(ioc);
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> diff --git a/migration/ram.h b/migration/ram.h
> index 64d81e9f1d..be7d09d0ec 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>  
>  #include "qemu-common.h"
>  #include "exec/cpu-common.h"
> +#include "io/channel.h"
>  
>  extern MigrationStats ram_counters;
>  extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
>  int multifd_save_cleanup(Error **errp);
>  int multifd_load_setup(void);
>  int multifd_load_cleanup(Error **errp);
> +int multifd_created_channels(void);
> +void multifd_recv_new_channel(QIOChannel *ioc);
>  
>  uint64_t ram_pagesize_summary(void);
>  int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6d49903978..aedabee8a1 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -27,6 +27,39 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_ref(QIOChannel *recv)
> +{
> +    object_ref(OBJECT(recv));
> +    return 0;
> +}
> +
> +int socket_recv_channel_unref(QIOChannel *recv)
> +{
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> +                                     f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -96,6 +129,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +
> +    /* in case previous migration leaked it */
> +    qapi_free_SocketAddress(outgoing_args.saddr);
> +    outgoing_args.saddr = saddr;
> +
>      if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>          data->hostname = g_strdup(saddr->u.inet.host);
>      }
> @@ -106,7 +144,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9db38..cbdb8d64c3 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>  
>  #ifndef QEMU_MIGRATION_SOCKET_H
>  #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +#include "io/task.h"
> +
> +int socket_recv_channel_ref(QIOChannel *recv);
> +int socket_recv_channel_unref(QIOChannel *recv);
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data);
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void tcp_start_incoming_migration(const char *host_port, Error **errp);
>  
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> -- 
> 2.14.3
> 

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page Juan Quintela
@ 2018-01-23 20:16   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-23 20:16 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of page.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Add last_page parameter
> Add comments for done and address
> Remove multifd field, it is the same as normal pages
> Merge next patch, now we send multiple pages at a time
> Remove counter for multifd pages, it is identical to normal pages
> Use iovecs instead of creating the equivalent.
> Clear memory used by pages (dave)
> Use g_new0() (danp)
> define MULTIFD_CONTINUE
> now pages member is a pointer
> Fix off-by-one in number of pages in one packet
> ---
>  migration/ram.c        | 159 ++++++++++++++++++++++++++++++++++++++++++++++++-
>  migration/trace-events |   2 +
>  2 files changed, 160 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index aef5a323f3..5d6b46ac23 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -52,6 +52,7 @@
>  #include "migration/block.h"
>  #include "sysemu/sysemu.h"
>  #include "qemu/uuid.h"
> +#include "qemu/iov.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -71,6 +72,7 @@
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
> @@ -395,14 +397,36 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> +/* used to continue on the same multifd group */
> +#define MULTIFD_CONTINUE UINT16_MAX
> +
> +typedef struct {
> +    /* number of used pages */

Telling us it's used doesn't help much; if I understand
right, this is for accumulating them into one large block
before actually sending them?

> +    uint32_t used;
> +    /* number of allocated pages */
> +    uint32_t allocated;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;
> +    struct iovec *iov;
> +    RAMBlock *block;
> +} multifd_pages_t;
> +
>  struct MultiFDSendParams {
> +    /* not changed */

???

>      uint8_t id;
>      char *name;
>      QemuThread thread;
>      QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
> +    multifd_pages_t *pages;
> +    /* how many patches has sent this channel */

s/patches/packets/

> +    uint32_t packets_sent;
> +    /* protected by multifd mutex */
> +    /* has the thread finish the last submitted job */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
>  
> @@ -410,8 +434,31 @@ struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
>      int count;
> +    QemuMutex mutex;
> +    QemuSemaphore sem;
> +    multifd_pages_t *pages;
>  } *multifd_send_state;
>  
> +static void multifd_pages_init(multifd_pages_t **ppages, size_t size)

What is the 'size' here - it's allocated pages for something?

> +{
> +    multifd_pages_t *pages = g_new0(multifd_pages_t, 1);
> +
> +    pages->allocated = size;
> +    pages->iov = g_new0(struct iovec, size);
> +    *ppages = pages;
> +}
> +
> +static void multifd_pages_clear(multifd_pages_t *pages)
> +{
> +    pages->used = 0;
> +    pages->allocated = 0;
> +    pages->seq = 0;
> +    pages->block = NULL;
> +    g_free(pages->iov);
> +    pages->iov = NULL;
> +    g_free(pages);
> +}
> +
>  static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
> @@ -453,9 +500,13 @@ int multifd_save_cleanup(Error **errp)
>          socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_pages_clear(p->pages);
> +        p->pages = NULL;
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> +    multifd_pages_clear(multifd_send_state->pages);
> +    multifd_send_state->pages = NULL;
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>      return ret;
> @@ -482,6 +533,7 @@ static void *multifd_send_thread(void *opaque)
>          terminate_multifd_send_threads(local_err);
>          return NULL;
>      }
> +    qemu_sem_post(&multifd_send_state->sem);
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -489,9 +541,24 @@ static void *multifd_send_thread(void *opaque)
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> +        if (p->pages->used) {
> +            p->pages->used = 0;
> +            qemu_mutex_unlock(&p->mutex);
> +
> +            trace_multifd_send(p->id, p->pages->seq, p->pages->used);

but p->pages->used has just been set to 0?

> +            /* ToDo: send page here */
> +
> +            qemu_mutex_lock(&multifd_send_state->mutex);
> +            p->done = true;
> +            p->packets_sent++;
> +            qemu_mutex_unlock(&multifd_send_state->mutex);
> +            qemu_sem_post(&multifd_send_state->sem);
> +            continue;
> +        }
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_wait(&p->sem);
>      }
> +    trace_multifd_send_thread(p->id, p->packets_sent);
>  
>      return NULL;
>  }
> @@ -529,6 +596,10 @@ int multifd_save_setup(void)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->count = 0;
> +    qemu_mutex_init(&multifd_send_state->mutex);
> +    qemu_sem_init(&multifd_send_state->sem, 0);
> +    multifd_pages_init(&multifd_send_state->pages,
> +                       migrate_multifd_page_count());
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -536,12 +607,58 @@ int multifd_save_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        p->done = true;
> +        multifd_pages_init(&p->pages, migrate_multifd_page_count());
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
>      return 0;
>  }
>  
> +static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
> +                                  bool last_page)
> +{
> +    int i;
> +    MultiFDSendParams *p = NULL; /* make happy gcc */

(English: Make gcc happy)

> +    multifd_pages_t *pages = multifd_send_state->pages;
> +
> +    if (!pages->block) {
> +        pages->block = block;
> +    }
> +
> +    pages->iov[pages->used].iov_base = block->host + offset;
> +    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
> +    pages->used++;
> +
> +    if (!last_page) {
> +        if (pages->used < pages->allocated) {
> +            return MULTIFD_CONTINUE;
> +        }
> +    }

I'm a bit confused by this.
Isn't the next bit waiting for a free thread?

> +    qemu_sem_wait(&multifd_send_state->sem);
> +    qemu_mutex_lock(&multifd_send_state->mutex);
> +    for (i = 0; i < multifd_send_state->count; i++) {
> +        p = &multifd_send_state->params[i];
> +
> +        if (p->done) {
> +            p->done = false;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_state->mutex);
> +    qemu_mutex_lock(&p->mutex);
> +    p->pages->used = 0;

If we're handing the block of pages to the thread, I don't understand
why we zero used here.
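
Annotating the swap as I read it - a sketch of the intent, not a fix,
using only the names already in this function:

    /* main thread <-> channel thread buffer swap:
     * 'pages' is the just-filled global buffer; p->pages is the buffer
     * the channel finished sending, hence empty. */
    qemu_mutex_lock(&p->mutex);
    p->pages->used = 0;                    /* buffer we reclaim is empty  */
    p->pages->seq = pages->seq + 1;        /* number the next group       */
    p->pages->block = NULL;
    multifd_send_state->pages = p->pages;  /* refill this one from now on */
    p->pages = pages;                      /* full buffer for the thread  */
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);                /* kick the channel thread     */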

> +    p->pages->seq = pages->seq + 1;
> +    p->pages->block = NULL;
> +    multifd_send_state->pages = p->pages;
> +    p->pages = pages;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return i;
> +}
> +
>  struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
> @@ -1070,6 +1187,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> +                            bool last_stage)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
> +
> +    p = block->host + offset;
> +
> +    pages = save_zero_page(rs, block, offset);
> +    if (pages == -1) {
> +        ram_counters.transferred +=
> +            save_page_header(rs, rs->f, block,
> +                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        multifd_send_page(block, offset, rs->migration_dirty_pages == 1);
> +        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);

So that's temporary - we're hoping multifd_send_page will do that?

> +        ram_counters.transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        ram_counters.normal++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1498,6 +1640,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
>          if (migrate_use_compression() &&
>              (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
>              res = ram_save_compressed_page(rs, pss, last_stage);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(rs, pss, last_stage);
>          } else {
>              res = ram_save_page(rs, pss, last_stage);
>          }
> @@ -2878,6 +3022,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (!migrate_use_compression()) {
>          invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
>      }
> +
> +    if (!migrate_use_multifd()) {
> +        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2902,13 +3050,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
>                  error_report("Received an unexpected compressed page");
>              }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
> +                error_report("Received an unexpected multifd page");
> +            }
>  
>              ret = -EINVAL;
>              break;
>          }
>  
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
>  
>              host = host_from_ram_block_offset(block, addr);
> @@ -2997,6 +3149,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;
> diff --git a/migration/trace-events b/migration/trace-events
> index 141e773305..61ee21a13e 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -77,6 +77,8 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
>  ram_postcopy_send_discard_bitmap(void) ""
>  ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
>  ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
> +multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
> +multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
>  
>  # migration/migration.c
>  await_return_path_close_on_source_close(void) ""
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2018-01-24 13:34   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-24 13:34 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still receiving things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

It might be better to split the change to
migration_incoming_process/setup out into a separate patch.
I'm not too sure what it's trying to do here.
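
For what it's worth, the incoming flow I can reconstruct from the diff
below (my reading, not necessarily the intent):

    /* main channel arrives first:
     *   migration_ioc_process_incoming()
     *       migration_incoming_setup(f);          stash f, don't start yet
     * each further channel:
     *       multifd_recv_new_channel(ioc);        spawns a recv thread
     *           if (count == migrate_multifd_channels())
     *               migration_incoming_process(); all channels up: start
     */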

> --
> 
> We split where we create the main channel from where we start the main
> migration thread, so we can wait for the creation of the other threads.
> 
> Use multifd_clear_pages().
> Don't remove object_unref()
> We use the channel numbers correctly
> Define multifd_new_channel/multifd_recv_thread in this patch, which is
> where they are used.
> rename migrate_new_channel to migrate_new_send_channel
> Add ToDo comment
> Add trace
> ---
>  migration/migration.c  |   5 +-
>  migration/migration.h  |   1 +
>  migration/ram.c        | 129 ++++++++++++++++++++++++++++++++++++++++++++++---
>  migration/socket.c     |   3 ++
>  migration/trace-events |   2 +
>  5 files changed, 132 insertions(+), 8 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 77fc17f723..1545f3a0b0 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -406,7 +406,7 @@ static void migration_incoming_setup(QEMUFile *f)
>      qemu_file_set_blocking(f, false);
>  }
>  
> -static void migration_incoming_process(void)
> +void migration_incoming_process(void)
>  {
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
>      qemu_coroutine_enter(co);
> @@ -424,7 +424,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  
>      if (!mis->from_src_file) {
>          QEMUFile *f = qemu_fopen_channel_input(ioc);
> -        migration_fd_process_incoming(f);
> +        migration_incoming_setup(f);
> +        return;
>      }
>      multifd_recv_new_channel(ioc);
>  }
> diff --git a/migration/migration.h b/migration/migration.h
> index 29a7b79a39..7de193a9c0 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -172,6 +172,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
>  
>  void migration_fd_process_incoming(QEMUFile *f);
>  void migration_ioc_process_incoming(QIOChannel *ioc);
> +void migration_incoming_process(void);
>  
>  bool  migration_has_all_channels(void);
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 19c8089c4b..8443806f12 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -660,13 +660,20 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
>  }
>  

mostly the same comments as the send side:

>  struct MultiFDRecvParams {
> +    /* not changed */
?

>      uint8_t id;
>      char *name;
>      QemuThread thread;
>      QIOChannel *c;
> +    QemuSemaphore ready;
>      QemuSemaphore sem;
>      QemuMutex mutex;
> +    /* proteced by param mutex */

Typo: 't'

>      bool quit;
> +    /* how many patches has sent this channel */

s/patches/packets/

> +    uint32_t packets_recv;
> +    multifd_pages_t *pages;
> +    bool done;

>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
>  
> @@ -676,6 +683,7 @@ struct {
>      int count;
>      /* Should we finish */
>      bool quit;
> +    multifd_pages_t *pages;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
> @@ -721,15 +729,51 @@ int multifd_load_cleanup(Error **errp)
>          socket_recv_channel_unref(p->c);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_pages_clear(p->pages);
> +        p->pages = NULL;
>      }
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    multifd_pages_clear(multifd_recv_state->pages);
> +    multifd_recv_state->pages = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  
>      return ret;
>  }
>  
> +static void *multifd_recv_thread(void *opaque)
> +{
> +    MultiFDRecvParams *p = opaque;
> +
> +    qemu_sem_post(&p->ready);
> +    while (true) {
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            qemu_mutex_unlock(&p->mutex);
> +            break;
> +        }
> +        if (p->pages->used) {
> +            p->pages->used = 0;
> +
> +            trace_multifd_recv(p->id, p->pages->seq, p->pages->used);

Same as on send, ->used is 0 here

> +            /* ToDo: receive pages here */
> +
> +            p->done = true;
> +            p->packets_recv++;
> +            qemu_mutex_unlock(&p->mutex);
> +            qemu_sem_post(&p->ready);
> +            continue;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +        qemu_sem_wait(&p->sem);
> +    }
> +    trace_multifd_recv_thread(p->id, p->packets_recv);
> +
> +    return NULL;
> +}
> +
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> @@ -742,6 +786,8 @@ int multifd_load_setup(void)
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
>      multifd_recv_state->quit = false;
> +    multifd_pages_init(&multifd_recv_state->pages,
> +                       migrate_multifd_page_count());

What happens if the migrate parameter for multifd-page-count is
different on source and dest?

>      return 0;
>  }
>  
> @@ -752,7 +798,80 @@ int multifd_created_channels(void)
>  
>  void multifd_recv_new_channel(QIOChannel *ioc)
>  {
> -    socket_recv_channel_unref(ioc);
> +    MultiFDRecvParams *p;
> +    MultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +
> +    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
> +        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> +        g_free(uuid);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +
> +    p = &multifd_recv_state->params[msg.id];

msg.id needs to be checked against the size of params (thread_count ?)
before that array index.
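
A minimal sketch of that check, reusing migrate_multifd_channels() and
the error path this function already has:

    if (msg.id >= migrate_multifd_channels()) {
        error_setg(&local_err, "multifd: received channel id %d, "
                   "expected a value below %d",
                   msg.id, migrate_multifd_channels());
        terminate_multifd_recv_threads(local_err);
        return;
    }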

> +    if (p->id != 0) {
> +        error_setg(&local_err, "multifd: received id '%d' already setup'",
> +                   msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    qemu_mutex_init(&p->mutex);
> +    qemu_sem_init(&p->sem, 0);
> +    qemu_sem_init(&p->ready, 0);
> +    p->quit = false;
> +    p->id = msg.id;
> +    p->done = false;
> +    multifd_pages_init(&p->pages, migrate_multifd_page_count());
> +    p->c = ioc;
> +    multifd_recv_state->count++;
> +    p->name = g_strdup_printf("multifdrecv_%d", msg.id);
> +    socket_recv_channel_ref(ioc);
> +
> +    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +    if (multifd_recv_state->count == migrate_multifd_channels()) {
> +        migration_incoming_process();
> +    }
> +}
> +
> +static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
> +                              uint8_t *address, uint16_t fd_num)
> +{
> +    int thread_count;
> +    MultiFDRecvParams *p;
> +    multifd_pages_t *pages = multifd_recv_state->pages;
> +
> +    pages->iov[pages->used].iov_base = address;
> +    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
> +    pages->used++;
> +
> +    if (fd_num == MULTIFD_CONTINUE) {
> +        return;
> +    }
> +
> +    thread_count = migrate_multifd_channels();
> +    assert(fd_num < thread_count);
> +    p = &multifd_recv_state->params[fd_num];
> +
> +    qemu_sem_wait(&p->ready);
> +
> +    qemu_mutex_lock(&p->mutex);
> +    p->done = false;
> +    p->pages->used = 0;
> +    multifd_recv_state->pages = p->pages;
> +    p->pages = pages;

This needs a comment; I'm a bit confused - there are too many things
called 'pages' to know what is going where.  I think you've just waited
for one of the threads to get data, then passed the received data out
into multifd_recv_state?  But then I'm not sure about the last
p->pages = pages.
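
Mirroring the send side, my guess at the roles (again just a sketch):

    /* main thread <-> recv channel buffer swap:
     * 'pages' holds the iovecs we just filled in for this group;
     * p->pages is the channel's drained buffer from last time. */
    qemu_mutex_lock(&p->mutex);
    p->done = false;                       /* channel is busy again      */
    p->pages->used = 0;                    /* reclaimed buffer is empty  */
    multifd_recv_state->pages = p->pages;  /* build the next group here  */
    p->pages = pages;                      /* channel reads into these   */
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);                /* wake the channel thread    */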


> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
>  }
>  
>  /**
> @@ -3042,6 +3161,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      }
>  
>      while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
> +        RAMBlock *block = NULL;
>          ram_addr_t addr, total_ram_bytes;
>          void *host = NULL;
>          uint16_t fd_num;
> @@ -3066,7 +3186,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -            RAMBlock *block = ram_block_from_stream(f, flags);
> +            block = ram_block_from_stream(f, flags);
>  
>              host = host_from_ram_block_offset(block, addr);
>              if (!host) {
> @@ -3157,10 +3277,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
> -            if (fd_num != 0) {
> -                /* this is yet an unused variable, changed later */
> -                fd_num = fd_num;
> -            }
> +            multifd_recv_page(block, addr, host, fd_num);
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>  
> diff --git a/migration/socket.c b/migration/socket.c
> index aedabee8a1..ba23442175 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -192,6 +192,9 @@ out:
>      if (migration_has_all_channels()) {
>          /* Close listening socket as its no longer needed */
>          qio_channel_close(ioc, NULL);
> +        if (!migrate_use_multifd()) {
> +            migration_incoming_process();
> +        }
>          return G_SOURCE_REMOVE;
>      } else {
>          return G_SOURCE_CONTINUE;
> diff --git a/migration/trace-events b/migration/trace-events
> index 61ee21a13e..06aef4d4a3 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -77,7 +77,9 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
>  ram_postcopy_send_discard_bitmap(void) ""
>  ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
>  ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
> +multifd_recv(char id, int seq, int num) "channel %d sequence %d num pages %d"
>  multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
> +multifd_recv_thread(char id, uint32_t packets) "channel %d packets %d"
>  multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
>  
>  # migration/migration.c
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels Juan Quintela
@ 2018-01-24 13:46   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-24 13:46 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We switch from sending the page number to sending real pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Yes, I think this one makes sense (and explains some of the earlier
stuff), so


Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> --
> 
> Remove the HACK bit, now that the function that calculates the size
> of a page is exported.
> Rename multifd_pages{_now} to sent_pages
> Remove multifd pages field, it is the same as normal pages
> Merge test channels here
> Make sent_pages also work for the non-multifd case
> ---
>  migration/migration.c | 10 +++++++++-
>  migration/ram.c       | 41 +++++++++++++++++++++++++++++------------
>  2 files changed, 38 insertions(+), 13 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 1545f3a0b0..6ebcfa36cc 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2230,6 +2230,8 @@ static void *migration_thread(void *opaque)
>       */
>      int64_t threshold_size = 0;
>      int64_t qemu_file_bytes = 0;
> +    /* Stores how many pages we have sent */
> +    int64_t sent_pages = 0;
>      int64_t start_time = initial_time;
>      int64_t end_time;
>      bool old_vm_running = false;
> @@ -2318,8 +2320,13 @@ static void *migration_thread(void *opaque)
>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>          if (current_time >= initial_time + BUFFER_DELAY) {
>              uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> +            uint64_t sent_pages_now = ram_counters.normal;
> +            /* multifd sends data out of the qemu_file */
> +            uint64_t multifd_transferred = migrate_use_multifd() ?
> +                (sent_pages_now - sent_pages) * qemu_target_page_size() : 0;
>              uint64_t transferred_bytes =
> -                qemu_file_bytes_now - qemu_file_bytes;
> +                (qemu_file_bytes_now - qemu_file_bytes) +
> +                multifd_transferred;
>              uint64_t time_spent = current_time - initial_time;
>              double bandwidth = (double)transferred_bytes / time_spent;
>              threshold_size = bandwidth * s->parameters.downtime_limit;
> @@ -2339,6 +2346,7 @@ static void *migration_thread(void *opaque)
>              qemu_file_reset_rate_limit(s->to_dst_file);
>              initial_time = current_time;
>              qemu_file_bytes = qemu_file_bytes_now;
> +            sent_pages = sent_pages_now;
>          }
>          if (qemu_file_rate_limit(s->to_dst_file)) {
>              /* usleep expects microseconds */
> diff --git a/migration/ram.c b/migration/ram.c
> index 8443806f12..20f3726909 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -542,12 +542,20 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            Error *local_err = NULL;
> +            size_t ret;
> +            uint32_t used;
> +
> +            used = p->pages->used;
>              p->pages->used = 0;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_send(p->id, p->pages->seq, p->pages->used);
> -            /* ToDo: send page here */
> -
> +            trace_multifd_send(p->id, p->pages->seq, used);
> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_send_threads(local_err);
> +                return NULL;
> +            }
>              qemu_mutex_lock(&multifd_send_state->mutex);
>              p->done = true;
>              p->packets_sent++;
> @@ -754,12 +762,21 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            Error *local_err = NULL;
> +            size_t ret;
> +            uint32_t used;
> +
> +            used = p->pages->used;
>              p->pages->used = 0;
> +            qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_recv(p->id, p->pages->seq, p->pages->used);
> -
> -            /* ToDo: receive pages here */
> -
> +            trace_multifd_recv(p->id, p->pages->seq, used);
> +            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_recv_threads(local_err);
> +                return NULL;
> +            }
> +            qemu_mutex_lock(&p->mutex);
>              p->done = true;
>              p->packets_recv++;
>              qemu_mutex_unlock(&p->mutex);
> @@ -1311,12 +1328,9 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>  {
>      int pages;
>      uint16_t fd_num;
> -    uint8_t *p;
>      RAMBlock *block = pss->block;
>      ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
>  
> -    p = block->host + offset;
> -
>      pages = save_zero_page(rs, block, offset);
>      if (pages == -1) {
>          ram_counters.transferred +=
> @@ -1325,8 +1339,12 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>          fd_num = multifd_send_page(block, offset,
>                                     rs->migration_dirty_pages == 1);
>          qemu_put_be16(rs->f, fd_num);
> +        if (fd_num != MULTIFD_CONTINUE) {
> +            /* We start with a different channel.
> +               Flush pending work */
> +            qemu_fflush(rs->f);
> +        }
>          ram_counters.transferred += 2; /* size of fd_num */
> -        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
>          ram_counters.transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          ram_counters.normal++;
> @@ -3278,7 +3296,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
>              multifd_recv_page(block, addr, host, fd_num);
> -            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>  
>          case RAM_SAVE_FLAG_EOS:
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue Juan Quintela
@ 2018-01-24 14:12   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-24 14:12 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is still being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed to the next stage.
> 
> We are low on page flags, so to emit this message we use a combination
> that is otherwise not valid:  MULTIFD_PAGE and ZERO.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sometimes sync the bitmap when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Create RAM_SAVE_FLAG_MULTIFD_SYNC (dave suggestion)
> Move the set of need_flush to inside the bitmap_sync code (peter suggestion)
> ---
>  migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 55 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 20f3726909..b1ad7b2730 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -74,6 +74,14 @@
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
> +/* We are getting low on pages flags, so we start using combinations
> +   When we need to flush a page, we sent it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO
> +   We don't allow that combination
> +*/
> +#define RAM_SAVE_FLAG_MULTIFD_SYNC \
> +    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
> +
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
>      return buffer_is_zero(p, size);
> @@ -226,6 +234,9 @@ struct RAMState {
>      uint64_t iterations_prev;
>      /* Iterations since start */
>      uint64_t iterations;
> +    /* Indicates if we have synced the bitmap and we need to assure that
> +       target has processeed all previous pages */
> +    bool multifd_needs_flush;
>      /* number of dirty bits in the bitmap */
>      uint64_t migration_dirty_pages;
>      /* protects modification of the bitmap */
> @@ -675,11 +686,13 @@ struct MultiFDRecvParams {
>      QIOChannel *c;
>      QemuSemaphore ready;
>      QemuSemaphore sem;
> +    QemuCond cond_sync;
>      QemuMutex mutex;
>      /* proteced by param mutex */
>      bool quit;
>      /* how many patches has sent this channel */
>      uint32_t packets_recv;
> +    bool sync;
>      multifd_pages_t *pages;
>      bool done;
>  };
> @@ -734,6 +747,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_cond_destroy(&p->cond_sync);
>          socket_recv_channel_unref(p->c);
>          g_free(p->name);
>          p->name = NULL;
> @@ -779,6 +793,10 @@ static void *multifd_recv_thread(void *opaque)
>              qemu_mutex_lock(&p->mutex);
>              p->done = true;
>              p->packets_recv++;
> +            if (p->sync) {
> +                qemu_cond_signal(&p->cond_sync);
> +                p->sync = false;
> +            }
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
>              continue;
> @@ -845,9 +863,11 @@ void multifd_recv_new_channel(QIOChannel *ioc)
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
>      qemu_sem_init(&p->ready, 0);
> +    qemu_cond_init(&p->cond_sync);
>      p->quit = false;
>      p->id = msg.id;
>      p->done = false;
> +    p->sync = false;
>      multifd_pages_init(&p->pages, migrate_multifd_page_count());
>      p->c = ioc;
>      multifd_recv_state->count++;
> @@ -891,6 +911,27 @@ static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
>      qemu_sem_post(&p->sem);
>  }
>  
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_channels();
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        while (!p->done) {
> +            p->sync = true;
> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> +        }

I think this can get stuck if there's an error at just the wrong time
in the recv_thread; perhaps it would work if you make sure that
terminate_multifd_recv_threads() wakes all the cond_syncs?

Dave
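
Roughly like this, assuming multifd_flush() is also taught to look at
multifd_recv_state->quit (a sketch of the suggestion, untested):

    /* in terminate_multifd_recv_threads(), with p->mutex held,
     * after setting p->quit: */
    qemu_cond_signal(&p->cond_sync);  /* wake anyone stuck in multifd_flush */

    /* and in multifd_flush(), don't wait forever once we are quitting: */
    while (!p->done && !multifd_recv_state->quit) {
        p->sync = true;
        qemu_cond_wait(&p->cond_sync, &p->mutex);
    }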

> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -908,6 +949,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>  {
>      size_t size, len;
>  
> +    if (rs->multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> +        offset |= RAM_SAVE_FLAG_ZERO;
> +        rs->multifd_needs_flush = false;
> +    }
> +
>      if (block == rs->last_sent_block) {
>          offset |= RAM_SAVE_FLAG_CONTINUE;
>      }
> @@ -1196,6 +1243,9 @@ static void migration_bitmap_sync(RAMState *rs)
>      if (migrate_use_events()) {
>          qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
>      }
> +    if (!rs->ram_bulk_stage && migrate_use_multifd()) {
> +        rs->multifd_needs_flush = true;
> +    }
>  }
>  
>  /**
> @@ -3201,6 +3251,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
>  
> +        if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC)
> +            == RAM_SAVE_FLAG_MULTIFD_SYNC) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_ZERO;

OK, so it's worth a comment saying that a MULTIFD_SYNC is not just a
sync, it's a sync and a page.

Dave
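
Something along these lines above the check would do (wording mine):

    /* RAM_SAVE_FLAG_MULTIFD_SYNC is MULTIFD_PAGE | ZERO: it is not a
     * bare sync marker but a multifd page whose header additionally
     * tells the destination to drain all recv channels first.  Strip
     * the ZERO bit and fall through to normal multifd page handling. */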

> +        }
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test Juan Quintela
@ 2018-01-24 14:23   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-24 14:23 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We set the x-multifd-page-count and x-multifd-channels.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  tests/migration-test.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 50 insertions(+)
> 
> diff --git a/tests/migration-test.c b/tests/migration-test.c
> index eb44a95aa9..488ae89f34 100644
> --- a/tests/migration-test.c
> +++ b/tests/migration-test.c
> @@ -826,6 +826,55 @@ static void test_compress_unix(void)
>      g_free(uri);
>  }
>  
> +static void test_multifd_tcp(void)
> +{
> +    char *uri;
> +    char *port;
> +    QTestState *from, *to;
> +
> +    test_migrate_start(&from, &to, "tcp:127.0.0.1:0");
> +
> +    /* We want to pick a speed slow enough that the test completes
> +     * quickly, but that it doesn't complete precopy even on a slow
> +     * machine, so also set the downtime.
> +     */
> +    /* 1 ms should make it not converge*/
> +    migrate_set_parameter(from, "downtime-limit", "1");
> +    /* 1GB/s */
> +    migrate_set_parameter(from, "max-bandwidth", "100000000");
> +
> +    migrate_set_parameter(from, "x-multifd-channels", "4");
> +    migrate_set_parameter(to, "x-multifd-channels", "4");
> +
> +    migrate_set_parameter(from, "x-multifd-page-count", "64");
> +    migrate_set_parameter(to, "x-multifd-page-count", "64");
> +
> +    migrate_set_capability(from, "x-multifd", "true");
> +    migrate_set_capability(to, "x-multifd", "true");
> +    /* Wait for the first serial output from the source */
> +    wait_for_serial("src_serial");
> +
> +    port = migrate_get_parameter(to, "tcp-port");
> +    uri = g_strdup_printf("tcp:127.0.0.1:%s", port);
> +
> +    migrate(from, uri);
> +
> +    wait_for_migration_pass(from);
> +
> +    /* 300ms it should converge */
> +    migrate_set_parameter(from, "downtime-limit", "300");
> +
> +    if (!got_stop) {
> +        qtest_qmp_eventwait(from, "STOP");
> +    }
> +    qtest_qmp_eventwait(to, "RESUME");
> +
> +    wait_for_serial("dest_serial");
> +    wait_for_migration_complete(from);
> +
> +    test_migrate_end(from, to);
> +}
> +
>  int main(int argc, char **argv)
>  {
>      char template[] = "/tmp/migration-test-XXXXXX";
> @@ -853,6 +902,7 @@ int main(int argc, char **argv)
>      if (0) {
>          qtest_add_func("/migration/compress/unix", test_compress_unix);
>      }
> +    qtest_add_func("/migration/multifd/tcp", test_multifd_tcp);
>  
>      ret = g_test_run();
>  
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread
  2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread Juan Quintela
@ 2018-01-24 14:29   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2018-01-24 14:29 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Hmm, I'm not sure what this one is doing; is this permanent?
Is there a guarantee already that all the pages in one chunk sent over
multifd are from the same RAMBlock?
(and there's a printf down there)

Dave
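
If that guarantee doesn't exist yet, one hypothetical way to provide it
in multifd_send_page() would be to close the current group whenever the
RAMBlock changes (sketch only, the flush path itself is not shown):

    if (!pages->block) {
        pages->block = block;
    } else if (pages->block != block) {
        /* pages from different RAMBlocks must not share a packet,
         * since the header names a single ramblock; the current group
         * would have to be pushed out first, as if last_page had been
         * set for it, before starting a group for the new block. */
    }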

> ---
>  migration/ram.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 60 insertions(+), 9 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index b1ad7b2730..f636c7da0a 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -411,6 +411,19 @@ static void compress_threads_save_setup(void)
>  /* used to continue on the same multifd group */
>  #define MULTIFD_CONTINUE UINT16_MAX
>  
> +#define MULTIFD_MAGIC 0x112233d
> +#define MULTIFD_VERSION 1
> +
> +typedef struct {
> +    uint32_t magic;
> +    uint32_t version;
> +    uint32_t size;
> +    uint32_t used;
> +    uint32_t seq;
> +    char ramblock[256];
> +    ram_addr_t offset[];
> +} __attribute__((packed)) MultiFDPacket_t;
> +
>  typedef struct {
>      /* number of used pages */
>      uint32_t used;
> @@ -420,6 +433,8 @@ typedef struct {
>      uint32_t seq;
>      struct iovec *iov;
>      RAMBlock *block;
> +    uint32_t packet_len;
> +    MultiFDPacket_t *packet;
>  } multifd_pages_t;
>  
>  struct MultiFDSendParams {
> @@ -456,6 +471,8 @@ static void multifd_pages_init(multifd_pages_t **ppages, size_t size)
>  
>      pages->allocated = size;
>      pages->iov = g_new0(struct iovec, size);
> +    pages->packet_len = sizeof(MultiFDPacket_t) + sizeof(ram_addr_t) * size;
> +    pages->packet = g_malloc0(pages->packet_len);
>      *ppages = pages;
>  }
>  
> @@ -467,6 +484,9 @@ static void multifd_pages_clear(multifd_pages_t *pages)
>      pages->block = NULL;
>      g_free(pages->iov);
>      pages->iov = NULL;
> +    pages->packet_len = 0;
> +    g_free(pages->packet);
> +    pages->packet = NULL;
>      g_free(pages);
>  }
>  
> @@ -553,16 +573,27 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            MultiFDPacket_t *packet = p->pages->packet;
>              Error *local_err = NULL;
>              size_t ret;
> -            uint32_t used;
>  
> -            used = p->pages->used;
> +            packet->used = p->pages->used;
>              p->pages->used = 0;
>              qemu_mutex_unlock(&p->mutex);
> -
> -            trace_multifd_send(p->id, p->pages->seq, used);
> -            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            packet->magic = MULTIFD_MAGIC;
> +            packet->version = MULTIFD_VERSION;
> +            strncpy(packet->ramblock, p->pages->block->idstr, 256);
> +            packet->size = migrate_multifd_page_count();
> +            packet->seq = p->pages->seq;
> +            ret = qio_channel_write_all(p->c, (void *)packet,
> +                                        p->pages->packet_len, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_send_threads(local_err);
> +                return NULL;
> +            }
> +            trace_multifd_send(p->id, p->pages->seq, packet->used);
> +            ret = qio_channel_writev_all(p->c, p->pages->iov,
> +                                         packet->used, &local_err);
>              if (ret != 0) {
>                  terminate_multifd_send_threads(local_err);
>                  return NULL;
> @@ -645,6 +676,7 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
>          pages->block = block;
>      }
>  
> +    pages->packet->offset[pages->used] = offset;
>      pages->iov[pages->used].iov_base = block->host + offset;
>      pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
>      pages->used++;
> @@ -776,16 +808,35 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            MultiFDPacket_t *packet = p->pages->packet;
> +            RAMBlock *block;
>              Error *local_err = NULL;
>              size_t ret;
> -            uint32_t used;
> +            int i;
>  
> -            used = p->pages->used;
>              p->pages->used = 0;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_recv(p->id, p->pages->seq, used);
> -            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            ret = qio_channel_read_all(p->c, (void *)packet,
> +                                       p->pages->packet_len, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_recv_threads(local_err);
> +                return NULL;
> +            }
> +            block = qemu_ram_block_by_name(packet->ramblock);
> +            p->pages->seq = packet->seq;
> +            for (i = 0; i < packet->used; i++) {
> +                if (block->host + packet->offset[i]
> +                    != p->pages->iov[i].iov_base) {
> +                    printf("page offset %d packet %p pages %p\n", i,
> +                           block->host + packet->offset[i],
> +                           p->pages->iov[i].iov_base);
> +                    break;
> +                }
> +            }
> +            trace_multifd_recv(p->id, p->pages->seq, packet->used);
> +            ret = qio_channel_readv_all(p->c, p->pages->iov,
> +                                        packet->used, &local_err);
>              if (ret != 0) {
>                  terminate_multifd_recv_threads(local_err);
>                  return NULL;
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


end of thread

Thread overview: 26+ messages
2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
2018-01-12 18:50   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 02/14] migration: Rename initial_bytes Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page() Juan Quintela
2018-01-12 18:56   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
2018-01-22  7:00   ` Peter Xu
2018-01-23 19:52   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page Juan Quintela
2018-01-23 20:16   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 06/14] migration: Send the fd number which we are going to use for this page Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side Juan Quintela
2018-01-24 13:34   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels Juan Quintela
2018-01-24 13:46   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue Juan Quintela
2018-01-24 14:12   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test Juan Quintela
2018-01-24 14:23   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 11/14] LOCAL: use trace events for migration-test Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread Juan Quintela
2018-01-24 14:29   ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 13/14] migration: Add multifd_send_packet trace Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 14/14] all works Juan Quintela
2018-01-10 15:01 ` [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
