* [Qemu-devel] [PATCH v12 00/21] Multifd
@ 2018-04-25 11:27 Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
                   ` (22 more replies)
  0 siblings, 23 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx


Hi


[v12]

Big news: it is not an RFC anymore; it works reliably for me.

Changes:
- Locking changed completely (several times)
- We now send all pages through the channels.  In a 2GB guest with 1 disk and a network card, the amount of data sent for RAM was 80KB.
- This is not optimized yet, but it shows clear improvements over precopy.  Testing over localhost networking I can get:
  - 2 VCPUs guest
  - 2GB RAM
  - run stress --vm 4 --vm-bytes 500M (i.e. dirtying 2GB of RAM each second)

  - Total time: precopy ~50 seconds, multifd ~11 seconds
  - Bandwidth usage is around 273MB/s vs 71MB/s on the same hardware

This is very preliminary testing; I will send more numbers when I get them, but it looks promising.

Things that will be improved later:
- Initial synchronization is too slow (around 1s)
- We synchronize all threads after each RAM section; we could move to only
  synchronizing them after a bitmap synchronization
- We can improve bitmap walking (but that is independent of multifd)

Please review.

Later, Juan.


[v11]

Changes on top of the previous submission:
- Now on top of migration-tests/v6 that I sent on Wednesday
- Rebased to latest upstream
- Everything that is sent through the network should be converted correctly
  (famous last words)
- Still an RFC (it sometimes loses some packets at the end), just to
  show how things are going.  Problems are only in the last patch.

- Redid some locking (again).  Now the problem is being able to send the
  synchronization through the multifd channels; I end the migration
  _before_ all the channels have received all the packets.

- Trying to get a flags argument into each packet, to be able to synchronize
  through the network, not from the "main" incoming coroutine.

- Related to the network-safe fields: everything is now in its own
  routine, so it should be easier to understand/review.  While there, I
  check that all values are inside range.
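The flags-in-packet and range-checking ideas above can be sketched like this; everything here (names, magic values, layout) is invented for illustration and is not the actual QEMU MultiFDPacket format:

```c
#include <stdint.h>
#include <arpa/inet.h>   /* htonl/ntohl for network byte order */

/* Hypothetical on-wire packet header carrying a flags word. */
#define DEMO_MAGIC     0x11223344u
#define DEMO_VERSION   1u
#define DEMO_FLAG_SYNC 0x1u      /* "synchronize now" travels in-band */

typedef struct {
    uint32_t magic;
    uint32_t version;
    uint32_t flags;
} DemoPacketHeader;

void demo_header_pack(DemoPacketHeader *h, uint32_t flags)
{
    h->magic = htonl(DEMO_MAGIC);
    h->version = htonl(DEMO_VERSION);
    h->flags = htonl(flags);
}

/* Validate each field on reception; returns 0 on success, -1 if any
 * field is out of range. */
int demo_header_unpack(DemoPacketHeader *h, uint32_t *flags)
{
    if (ntohl(h->magic) != DEMO_MAGIC) {
        return -1;
    }
    if (ntohl(h->version) != DEMO_VERSION) {
        return -1;
    }
    *flags = ntohl(h->flags);
    if (*flags & ~DEMO_FLAG_SYNC) {
        return -1;               /* unknown flag bits set */
    }
    return 0;
}
```

Because the synchronization request rides inside the packet flags, the receiver can react to it on the channel itself instead of waiting for the main incoming coroutine.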

So, please comment.

Thanks, Juan.


[v10]
Lots of changes from previous versions:
a - everything is sent now through the multifd channels, nothing is sent through the main channel
b - locking is brand new; I was digging myself into a hole with the previous approach.  Right now, there is a single way to
    do locking (both source and destination):
       main thread: sets a ->sync variable for each thread and wakes it up
       multifd threads: clear the variable and signal (sem) back to the main thread

    using this for either:
    - all threads have started
    - we need to synchronize after each round through memory
    - all threads have finished
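The handshake in (b) can be sketched with plain pthreads as a stand-in for QEMU's QemuMutex/QemuSemaphore; this is a minimal illustration, not the actual migration code:

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

/* Per-thread state: ->sync is set by the main thread, cleared by the
 * worker, which then signals the semaphore back. */
typedef struct {
    pthread_mutex_t mutex;
    sem_t sem;      /* worker -> main: "sync request seen" */
    bool sync;      /* main -> worker: "please synchronize" */
} DemoThreadState;

static void *demo_worker(void *opaque)
{
    DemoThreadState *p = opaque;

    pthread_mutex_lock(&p->mutex);
    p->sync = false;            /* clear the variable ... */
    pthread_mutex_unlock(&p->mutex);
    sem_post(&p->sem);          /* ... and signal the main thread */
    return NULL;
}

/* One round-trip: main sets ->sync, worker clears it and posts the
 * semaphore.  Returns 0 when the handshake completed. */
int demo_sync_roundtrip(void)
{
    DemoThreadState p;
    pthread_t th;

    pthread_mutex_init(&p.mutex, NULL);
    sem_init(&p.sem, 0, 0);

    p.sync = true;              /* main thread requests a sync */
    pthread_create(&th, NULL, demo_worker, &p);
    sem_wait(&p.sem);           /* wait until the worker has seen it */
    pthread_join(th, NULL);

    return p.sync ? -1 : 0;
}
```

The same pattern covers all three cases: one round-trip per thread at startup, after each pass through memory, and at shutdown.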

c - I had to use a qio watcher for a thread to wait for data being ready to read

d - lots of cleanups

e - to make things easier, I have included the missing tests stuff on
    this round of patches, because they build on top of them

f - lots of traces, it is now much easier to follow what is happening

Now, why it is an RFC:

- in the last patch, there is still a race between the watcher, the
  ->quit of the threads and the last synchronization.  Technically they
  are done in order, but in practice they sometimes hang.

- I *know* I can optimize the synchronization of the threads sending
  the "we start a new round" through the multifd channels, have to add a flag here.

- Not having a thread on the incoming side is a mess; I can't block waiting for things to happen :-(

- When doing the synchronization, I need to optimize the sending of the "not finished packet" of pages; working on that.

please, take a look and review.

Thanks, Juan.

[v9]

This series is on top of my migration test series just sent; only rejects should be in the test system, though.

On v9 series for you:
- qobject_unref() as requested by dan

  Yes, he was right: I had a reference leak for _non_ multifd.  I
  *thought* he meant multifd, and that took a while to understand
  (and then find when/where).

- multifd page count: it is dropped for good
- uuid handling: we use the default qemu uuid of 0000...
- uuid handling: using a struct and sending the struct
  * the idea is to add a size field and add more parameters after that
  * anyone have a good idea how to "output" the
    migrate_capabilities/parameters info into a JSON string and how to read it back?
- changed how we test that all threads/channels are already created.
  Should be more robust.
- Add multifd tests.  Still not ported on top of the migration-tests series sent earlier;
  waiting for review of the ideas there.
- Rebase and remove all the integrated patches (back at 12)
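The size-field idea mentioned above can be sketched like this; the layout and names are made up for illustration (byte-order conversion omitted), not the real wire format:

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical setup packet: a leading size field lets an old reader
 * skip trailing fields it does not understand. */
typedef struct {
    uint32_t size;       /* total packet size, grows as fields are added */
    uint8_t  uuid[16];
    uint32_t channel_id;
} DemoSetupPacket;

/* Parse the fields we know about and skip the rest.  Returns the number
 * of unknown trailing bytes, or -1 on a malformed packet. */
int demo_setup_read(const uint8_t *buf, uint32_t buflen, DemoSetupPacket *out)
{
    const uint32_t known = 4 + 16 + 4;   /* size + uuid + channel_id */

    if (buflen < known) {
        return -1;
    }
    memcpy(&out->size, buf, 4);
    if (out->size < known || out->size > buflen) {
        return -1;
    }
    memcpy(out->uuid, buf + 4, 16);
    memcpy(&out->channel_id, buf + 20, 4);
    return (int)(out->size - known);     /* bytes a newer sender appended */
}
```

A newer sender just appends fields and bumps `size`; an older receiver parses what it knows and skips the remainder, so the format stays backward compatible.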

Please, review.

Later, Juan.

[v8]
Things NOT done yet:

- drop x-multifd-page-count?  We can use performance testing to set a default value
- paolo's suggestion of not having a control channel
  needs yet more cleanups to be able to have more than one RAMState; trying it.
- performance work is still not done, but it has been very stable

On v8:
- use connect_async
- rename multifd-group to multifd-page-count (danp suggestion)
- rename multifd-threads to multifd-channels (danp suggestion)
- use new qio*channel functions
- Address rest of comments left


So, please review.

My plan is to pull these changes in and continue the performance work
on top; basically everything is already reviewed.

Thanks, Juan.

On v7:
- tests fixed as danp wanted
- had to revert danp's qio_*_all patches, as they break multifd; I have to investigate why.
- error_abort is gone.  After several tries at getting error handling right, I ended up with a single error
  protected by a lock, and the first error wins.
- Addressed basically all reviews (see on ToDo)
- Pointers to struct are done now
- fix lots of leaks
- lots of small fixes


[v6]
- Improve migration_ioc_process_incoming
- teach about G_SOURCE_REMOVE/CONTINUE
- Add test for migration_has_all_channels
- use DEFINE_PROP*
- change recv_state to use pointers to parameters,
  making it easier to receive channels out of order
- use g_strdup_printf()
- improve count of threads to know when we have to finish
- report channel id's on errors
- Use last_page parameter for multifd_send_page() sooner
- Improve comments for address
- use g_new0() instead of g_malloc()
- create MULTIFD_CONTINUE instead of using UINT16_MAX
- clear memory used by group of pages
  once there, pass everything to the global state variables instead of being
  local to the function.  This way it works if we cancel migration and start
  a new one
- Really wait to create the migration_thread until all channels are created
- split initial_bytes setup to make clearer following patches.
- create a RAM_SAVE_FLAG_MULTIFD_SYNC macro, to make clear what we are doing
- move setting of need_flush to inside bitmap_sync
- Lots of other small changes & reorderings

Please, comment.


[v5]

- tests from qio functions (a.k.a. make danp happy)
- 1st message from one channel to the other contains:
   <uuid> multifd <channel number>
   This would allow us to create more channels as we want them.
   a.k.a. Making dave happy
- Waiting in reception for new channels using qio listeners
  Getting threads, qio and reference counters working at the same time
  was interesting.
  Another make danp happy.

- Lots and lots of small changes and fixes.  Notice that the last ~70 patches
  that I merged were to make this series easier/smaller.

- NOT DONE: I haven't been working on measuring performance
  differences, this was about getting the creation of the
  threads/channels right.
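The first-message scheme described above can be sketched as follows; the exact text format is an assumption for illustration, not the actual wire format:

```c
#include <stdio.h>
#include <string.h>

/* Build the per-channel hello: "<uuid> multifd <channel number>". */
int demo_format_hello(char *buf, size_t len, const char *uuid, int channel)
{
    return snprintf(buf, len, "%s multifd %d", uuid, channel);
}

/* Parse a hello message.  Fills in uuid (37 bytes, 36-char UUID + NUL)
 * and returns the channel number, or -1 if it is not a multifd hello. */
int demo_parse_hello(const char *buf, char uuid[37])
{
    char word[16];
    int channel;

    if (sscanf(buf, "%36s %15s %d", uuid, word, &channel) != 3) {
        return -1;
    }
    if (strcmp(word, "multifd") != 0) {
        return -1;
    }
    return channel;
}
```

Because each channel announces its own id, the destination can accept channels in any order and can grow the scheme to other channel types (e.g. a postcopy channel) by changing the keyword.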

So, what I want:

- Are people happy with how I have (ab)used qio channels? (yes danp,
  that is you).
- My understanding is th

ToDo:

- Make paolo happy: He wanted to test using control information
  through each channel, not only pages.  This requires yet more
  cleanups to be able to have more than one QEMUFile/RAMState open at
  the same time.

- How I create multiple channels.  Things I know:
  * with current changes, it should work with fd/channels (the multifd bits),
    but we don't have a way to pass multiple fd's or exec files.
    Danp, any idea about how to create an UI for it?
  * My idea is that we would split current code to be:
    + channel creation at migration.c
    + rest of bits at ram.c
    + change format to:
      <uuid> main <rest of migration capabilities/parameters> so we can check
      <uuid> postcopy <no clue what parameters are needed>
          Dave wanted a way to create a new fd for postcopy for some time
    + Adding new channels is easy

- Performance data/numbers: Yes, I wanted to get this out at once, I
  would continue with this.


Please, review.


[v4]
This is the 4th version of multifd. Changes:
- XBZRLE doesn't need to be checked for
- Documentation and defaults are consistent
- split socketArgs
- use iovec instead of creating something similar.
- We now use the exported size of the target page (another HACK removal)
- created qio_channel_{writev,readv}_all functions.  The _full() name
  was already taken.
  They do the same as the versions without _all(), but if the call
  returns early due to blocking, they redo the call.
- it is checkpatch.pl clean now.
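The "_all" retry behaviour can be sketched over a plain file descriptor; the real helpers operate on QIOChannels and iovecs, so this is only an analogy:

```c
#include <unistd.h>
#include <errno.h>
#include <string.h>

/* Keep calling write() until the whole buffer is out, retrying short
 * writes and EINTR.  Returns len on success, -1 on a real error. */
ssize_t demo_write_all(int fd, const char *buf, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);
        if (n < 0) {
            if (errno == EINTR) {
                continue;       /* interrupted: redo the call */
            }
            return -1;          /* real error */
        }
        done += (size_t)n;
    }
    return (ssize_t)done;
}

/* Self-contained check: push a message through a pipe and read it back. */
int demo_roundtrip(void)
{
    int fds[2];
    char in[] = "multifd", out[sizeof(in)];

    if (pipe(fds) < 0) {
        return -1;
    }
    if (demo_write_all(fds[1], in, sizeof(in)) != (ssize_t)sizeof(in)) {
        return -1;
    }
    if (read(fds[0], out, sizeof(out)) != (ssize_t)sizeof(out)) {
        return -1;
    }
    close(fds[0]);
    close(fds[1]);
    return memcmp(in, out, sizeof(in)) == 0 ? 0 : -1;
}
```

This is exactly the property the migration threads need: a single call that either moves the whole buffer or reports a genuine error, instead of each caller open-coding the partial-write loop.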

Please comment, Juan.


Juan Quintela (21):
  migration: Set error state in case of error
  migration: Introduce multifd_recv_new_channel()
  migration: terminate_* can be called for other threads
  migration: Be sure all recv channels are created
  migration: Export functions to create send channels
  migration: Create multifd channels
  migration: Delay start of migration main routines
  migration: Transmit initial package through the multifd channels
  migration: Define MultifdRecvParams sooner
  migration: Create multipage support
  migration: Create multifd packet
  migration: Add multifd traces for start/end thread
  migration: Calculate transferred ram correctly
  migration: Multifd channels always wait on the sem
  migration: Add block where to send/receive packets
  migration: Synchronize multifd threads with main thread
  migration: Create ram_multifd_page
  migration: Start sending messages
  migration: Wait for blocking IO
  migration: Remove not needed semaphore and quit
  migration: Stop sending whole pages through main channel

 migration/migration.c  |  24 +-
 migration/migration.h  |   1 +
 migration/ram.c        | 710 ++++++++++++++++++++++++++++++++++++++---
 migration/ram.h        |   3 +
 migration/socket.c     |  32 +-
 migration/socket.h     |   7 +
 migration/trace-events |  12 +
 7 files changed, 740 insertions(+), 49 deletions(-)

-- 
2.17.0

^ permalink raw reply	[flat|nested] 60+ messages in thread

* [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 15:53   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel() Juan Quintela
                   ` (21 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 24 ++++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 0e90efa092..2ae560ea80 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -415,10 +415,20 @@ struct {
     int count;
 } *multifd_send_state;
 
-static void terminate_multifd_send_threads(Error *errp)
+static void terminate_multifd_send_threads(Error *err)
 {
     int i;
 
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
     for (i = 0; i < multifd_send_state->count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -515,10 +525,20 @@ struct {
     int count;
 } *multifd_recv_state;
 
-static void terminate_multifd_recv_threads(Error *errp)
+static void terminate_multifd_recv_threads(Error *err)
 {
     int i;
 
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
     for (i = 0; i < multifd_recv_state->count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-- 
2.17.0


* [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel()
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads Juan Quintela
                   ` (20 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/migration.c | 3 ++-
 migration/ram.c       | 6 ++++++
 migration/ram.h       | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 7eca65d1f0..604722cec9 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -466,8 +466,9 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         migration_fd_process_incoming(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_recv_new_channel(ioc);
 }
 
 /**
diff --git a/migration/ram.c b/migration/ram.c
index 2ae560ea80..c3c330b0e0 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -619,6 +620,11 @@ int multifd_load_setup(void)
     return 0;
 }
 
+void multifd_recv_new_channel(QIOChannel *ioc)
+{
+    /* nothing to do yet */
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 5030be110a..06dbddc2a2 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -32,6 +32,7 @@
 #include "qemu-common.h"
 #include "qapi/qapi-types-migration.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -44,6 +45,7 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+void multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
-- 
2.17.0


* [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel() Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created Juan Quintela
                   ` (19 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

While at it, make the count field always be accessed with atomic
operations.  To be able to do blocking operations (like joining the
thread), we need to know that the thread is running, so create a bool
to indicate that.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>

--

Once here, s/terminate_multifd_*_threads/multifd_*_terminate_threads/
This is consistent with every other function.
---
 migration/ram.c | 44 ++++++++++++++++++++++++++++++--------------
 1 file changed, 30 insertions(+), 14 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index c3c330b0e0..c0ad6d8e9f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -406,6 +406,7 @@ struct MultiFDSendParams {
     QemuThread thread;
     QemuSemaphore sem;
     QemuMutex mutex;
+    bool running;
     bool quit;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
@@ -416,7 +417,7 @@ struct {
     int count;
 } *multifd_send_state;
 
-static void terminate_multifd_send_threads(Error *err)
+static void multifd_send_terminate_threads(Error *err)
 {
     int i;
 
@@ -430,7 +431,7 @@ static void terminate_multifd_send_threads(Error *err)
         }
     }
 
-    for (i = 0; i < multifd_send_state->count; i++) {
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -448,11 +449,13 @@ int multifd_save_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_send_threads(NULL);
-    for (i = 0; i < multifd_send_state->count; i++) {
+    multifd_send_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -479,6 +482,10 @@ static void *multifd_send_thread(void *opaque)
         qemu_sem_wait(&p->sem);
     }
 
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
     return NULL;
 }
 
@@ -493,7 +500,7 @@ int multifd_save_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
-    multifd_send_state->count = 0;
+    atomic_set(&multifd_send_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -502,10 +509,11 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
+        p->running = true;
         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                            QEMU_THREAD_JOINABLE);
 
-        multifd_send_state->count++;
+        atomic_inc(&multifd_send_state->count);
     }
     return 0;
 }
@@ -516,6 +524,7 @@ struct MultiFDRecvParams {
     QemuThread thread;
     QemuSemaphore sem;
     QemuMutex mutex;
+    bool running;
     bool quit;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -526,7 +535,7 @@ struct {
     int count;
 } *multifd_recv_state;
 
-static void terminate_multifd_recv_threads(Error *err)
+static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
 
@@ -540,7 +549,7 @@ static void terminate_multifd_recv_threads(Error *err)
         }
     }
 
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -558,11 +567,13 @@ int multifd_load_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_recv_threads(NULL);
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    multifd_recv_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -590,6 +601,10 @@ static void *multifd_recv_thread(void *opaque)
         qemu_sem_wait(&p->sem);
     }
 
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
     return NULL;
 }
 
@@ -604,7 +619,7 @@ int multifd_load_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
-    multifd_recv_state->count = 0;
+    atomic_set(&multifd_recv_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -613,9 +628,10 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
+        p->running = true;
         qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                            QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
+        atomic_inc(&multifd_recv_state->count);
     }
     return 0;
 }
-- 
2.17.0


* [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (2 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
                   ` (18 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We need them before we start migration.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/migration.c |  6 +++++-
 migration/ram.c       | 11 +++++++++++
 migration/ram.h       |  1 +
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 604722cec9..98f85e982c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -479,7 +479,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
-    return true;
+    bool all_channels;
+
+    all_channels = multifd_recv_all_channels_created();
+
+    return all_channels;
 }
 
 /*
diff --git a/migration/ram.c b/migration/ram.c
index c0ad6d8e9f..3a01472835 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -636,6 +636,17 @@ int multifd_load_setup(void)
     return 0;
 }
 
+bool multifd_recv_all_channels_created(void)
+{
+    int thread_count = migrate_multifd_channels();
+
+    if (!migrate_use_multifd()) {
+        return true;
+    }
+
+    return thread_count == atomic_read(&multifd_recv_state->count);
+}
+
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
     /* nothing to do yet */
diff --git a/migration/ram.h b/migration/ram.h
index 06dbddc2a2..3f4b7daee8 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -45,6 +45,7 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+bool multifd_recv_all_channels_created(void);
 void multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
-- 
2.17.0


* [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (3 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-26  7:28   ` Peter Xu
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels Juan Quintela
                   ` (17 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/socket.c | 28 +++++++++++++++++++++++++++-
 migration/socket.h |  7 +++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/migration/socket.c b/migration/socket.c
index edf33c70cf..893a04f4cc 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -29,6 +29,28 @@
 #include "trace.h"
 
 
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+} outgoing_args;
+
+void socket_send_channel_create(QIOTaskFunc f, void *data)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
+                                     f, data, NULL, NULL);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
+
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
     SocketAddress *saddr;
@@ -96,6 +118,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+
+    /* in case previous migration leaked it */
+    qapi_free_SocketAddress(outgoing_args.saddr);
+    outgoing_args.saddr = saddr;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -107,7 +134,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      data,
                                      socket_connect_data_free,
                                      NULL);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9db38..528c3b0202 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,13 @@
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+#include "io/task.h"
+
+void socket_send_channel_create(QIOTaskFunc f, void *data);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
-- 
2.17.0


* [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (4 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines Juan Quintela
                   ` (16 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

On both sides.  We still don't transmit anything through them.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/ram.c | 52 +++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 10 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 3a01472835..54b1f8e836 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -404,6 +404,7 @@ struct MultiFDSendParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
@@ -456,6 +457,8 @@ int multifd_save_cleanup(Error **errp)
         if (p->running) {
             qemu_thread_join(&p->thread);
         }
+        socket_send_channel_destroy(p->c);
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -489,6 +492,27 @@ static void *multifd_send_thread(void *opaque)
     return NULL;
 }
 
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        if (multifd_save_cleanup(&local_err) != 0) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+        p->running = true;
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        atomic_inc(&multifd_send_state->count);
+    }
+}
+
 int multifd_save_setup(void)
 {
     int thread_count;
@@ -509,11 +533,7 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
-        p->running = true;
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-
-        atomic_inc(&multifd_send_state->count);
+        socket_send_channel_create(multifd_new_send_channel_async, p);
     }
     return 0;
 }
@@ -522,6 +542,7 @@ struct MultiFDRecvParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
@@ -574,6 +595,8 @@ int multifd_load_cleanup(Error **errp)
         if (p->running) {
             qemu_thread_join(&p->thread);
         }
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -628,10 +651,6 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
-        p->running = true;
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        atomic_inc(&multifd_recv_state->count);
     }
     return 0;
 }
@@ -649,7 +668,20 @@ bool multifd_recv_all_channels_created(void)
 
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
-    /* nothing to do yet */
+    MultiFDRecvParams *p;
+    /* we need to invent channels id's until we transmit */
+    /* we will remove this on a later patch */
+    static int i;
+
+    p = &multifd_recv_state->params[i];
+    i++;
+    p->c = ioc;
+    object_ref(OBJECT(ioc));
+
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multifd_recv_state->count);
 }
 
 /**
-- 
2.17.0

^ permalink raw reply related	[flat|nested] 60+ messages in thread

* [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (5 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
                   ` (15 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We need to make sure that we have started all the multifd threads.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/migration.c | 4 ++--
 migration/migration.h | 1 +
 migration/ram.c       | 3 +++
 migration/socket.c    | 4 ++++
 4 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 98f85e982c..9b510a809a 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -447,7 +447,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -465,7 +465,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
         return;
     }
     multifd_recv_new_channel(ioc);
diff --git a/migration/migration.h b/migration/migration.h
index 4774ee305f..e20f680bac 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -188,6 +188,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 54b1f8e836..5a87d74862 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -682,6 +682,9 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
     atomic_inc(&multifd_recv_state->count);
+    if (multifd_recv_state->count == migrate_multifd_channels()) {
+        migration_incoming_process();
+    }
 }
 
 /**
diff --git a/migration/socket.c b/migration/socket.c
index 893a04f4cc..d4a2c1e916 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -171,6 +171,10 @@ static void socket_accept_incoming_migration(QIONetListener *listener,
         qio_net_listener_disconnect(listener);
 
         object_unref(OBJECT(listener));
+
+        if (!migrate_use_multifd()) {
+            migration_incoming_process();
+        }
     }
 }
 
-- 
2.17.0


* [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (6 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 17:19   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
                   ` (14 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>

--

Be network agnostic.
Add error checking for all values.
---
 migration/ram.c | 101 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 96 insertions(+), 5 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 5a87d74862..1aab392d8f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -52,6 +52,8 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -400,6 +402,16 @@ static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
+#define MULTIFD_MAGIC 0x11223344U
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    unsigned char uuid[16]; /* QemuUUID */
+    uint8_t id;
+} __attribute__((packed)) MultiFDInit_t;
+
 struct MultiFDSendParams {
     uint8_t id;
     char *name;
@@ -412,6 +424,65 @@ struct MultiFDSendParams {
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
+static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
+    msg.version = cpu_to_be32(MULTIFD_VERSION);
+    msg.id = p->id;
+    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
+
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+
+    be32_to_cpus(&msg.magic);
+    be32_to_cpus(&msg.version);
+
+    if (msg.magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet magic %d "
+                   "expected %d", msg.magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    if (msg.version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet version %d "
+                   "expected %d", msg.version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
+        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+        error_setg(errp, "multifd: received a different uuid, expected "
+                   "uuid '%s' for channel %hhd", uuid, msg.id);
+        g_free(uuid);
+        return -1;
+    }
+
+    if (msg.id >= migrate_multifd_channels()) {
+        error_setg(errp, "multifd: received channel id %d "
+                   "expected a channel id below %d", msg.id, migrate_multifd_channels());
+        return -1;
+    }
+
+    return msg.id;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
@@ -474,6 +545,11 @@ int multifd_save_cleanup(Error **errp)
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+
+    if (multifd_send_initial_packet(p, &local_err) < 0) {
+        goto out;
+    }
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -485,6 +561,11 @@ static void *multifd_send_thread(void *opaque)
         qemu_sem_wait(&p->sem);
     }
 
+out:
+    if (local_err) {
+        multifd_send_terminate_threads(local_err);
+    }
+
     qemu_mutex_lock(&p->mutex);
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
@@ -669,12 +750,22 @@ bool multifd_recv_all_channels_created(void)
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
     MultiFDRecvParams *p;
-    /* we need to invent channels id's until we transmit */
-    /* we will remove this on a later patch */
-    static int i;
+    Error *local_err = NULL;
+    int id;
 
-    p = &multifd_recv_state->params[i];
-    i++;
+    id = multifd_recv_initial_packet(ioc, &local_err);
+    if (id < 0) {
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
+
+    p = &multifd_recv_state->params[id];
+    if (p->c != NULL) {
+        error_setg(&local_err, "multifd: received id '%d' already setup'",
+                   id);
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
     p->c = ioc;
     object_ref(OBJECT(ioc));
 
-- 
2.17.0


* [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (7 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 17:32   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
                   ` (13 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Once moved, we don't need the struct names anywhere, just the
typedefs.  While at it, document all the fields.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 1aab392d8f..ffefa73099 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -412,17 +412,45 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
-struct MultiFDSendParams {
+typedef struct {
+    /* this fields are not changed once the thread is created */
+    /* channel number */
     uint8_t id;
+    /* channel thread name */
     char *name;
+    /* channel thread id */
     QemuThread thread;
+    /* communication channel */
     QIOChannel *c;
+    /* sem where to wait for more work */
     QemuSemaphore sem;
+    /* this mutex protects the following parameters */
     QemuMutex mutex;
+    /* is this channel thread running */
     bool running;
+    /* should this thread finish */
     bool quit;
-};
-typedef struct MultiFDSendParams MultiFDSendParams;
+}  MultiFDSendParams;
+
+typedef struct {
+    /* this fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* communication channel */
+    QIOChannel *c;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+} MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
@@ -619,18 +647,6 @@ int multifd_save_setup(void)
     return 0;
 }
 
-struct MultiFDRecvParams {
-    uint8_t id;
-    char *name;
-    QemuThread thread;
-    QIOChannel *c;
-    QemuSemaphore sem;
-    QemuMutex mutex;
-    bool running;
-    bool quit;
-};
-typedef struct MultiFDRecvParams MultiFDRecvParams;
-
 struct {
     MultiFDRecvParams *params;
     /* number of created threads */
-- 
2.17.0


* [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (8 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-26  7:15   ` Peter Xu
  2018-05-02 17:52   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
                   ` (12 subsequent siblings)
  22 siblings, 2 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We only create/destroy the page list here.  We will use it later.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index ffefa73099..b19300992e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -412,6 +412,20 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint32_t seq;
+    /* offset of each page */
+    ram_addr_t *offset;
+    /* pointer to each page */
+    struct iovec *iov;
+    RAMBlock *block;
+} MultiFDPages_t;
+
 typedef struct {
     /* this fields are not changed once the thread is created */
     /* channel number */
@@ -430,6 +444,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* array of pages to sent */
+    MultiFDPages_t *pages;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -450,6 +466,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -511,10 +529,35 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     return msg.id;
 }
 
+static void multifd_pages_init(MultiFDPages_t **ppages, size_t size)
+{
+    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    pages->offset = g_new0(ram_addr_t, size);
+    *ppages = pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->seq = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages->offset);
+    pages->offset = NULL;
+    g_free(pages);
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    /* array of pages to sent */
+    MultiFDPages_t *pages;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -562,9 +605,13 @@ int multifd_save_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -625,6 +672,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 int multifd_save_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -634,6 +682,8 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
+    multifd_pages_init(&multifd_send_state->pages, page_count);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -641,6 +691,7 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        multifd_pages_init(&p->pages, page_count);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -698,6 +749,8 @@ int multifd_load_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -731,6 +784,7 @@ static void *multifd_recv_thread(void *opaque)
 int multifd_load_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -740,6 +794,7 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -747,6 +802,7 @@ int multifd_load_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        multifd_pages_init(&p->pages, page_count);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
     return 0;
-- 
2.17.0


* [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (9 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 18:04   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
                   ` (11 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We still don't put any data in the packet.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 136 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index b19300992e..804c83ed89 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -412,6 +412,17 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t flags;
+    uint32_t size;
+    uint32_t used;
+    uint32_t seq;
+    char ramblock[256];
+    uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t used;
@@ -446,6 +457,14 @@ typedef struct {
     bool quit;
     /* array of pages to sent */
     MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -468,6 +487,14 @@ typedef struct {
     bool quit;
     /* array of pages to receive */
     MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -552,6 +579,91 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
     g_free(pages);
 }
 
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int i;
+
+    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->flags = cpu_to_be32(p->flags);
+    packet->size = cpu_to_be32(migrate_multifd_page_count());
+    packet->used = cpu_to_be32(p->pages->used);
+    packet->seq = cpu_to_be32(p->seq);
+
+    if (p->pages->block) {
+        strncpy(packet->ramblock, p->pages->block->idstr, 256);
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
+    }
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    RAMBlock *block;
+    int i;
+
+    /* ToDo: We can't use this until we have received a packet */
+    return 0;
+
+    be32_to_cpus(&packet->magic);
+    if (packet->magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet "
+                   "magic %x and expected magic %x",
+                   packet->magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->version);
+    if (packet->version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet "
+                   "version %d and expected version %d",
+                   packet->version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    p->flags = be32_to_cpu(packet->flags);
+
+    be32_to_cpus(&packet->size);
+    if (packet->size > migrate_multifd_page_count()) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   packet->size, migrate_multifd_page_count());
+        return -1;
+    }
+
+    p->pages->used = be32_to_cpu(packet->used);
+    if (p->pages->used > packet->size) {
+        error_setg(errp, "multifd: received packet "
+                   "with %d pages and expected maximum pages %d",
+                   p->pages->used, packet->size);
+        return -1;
+    }
+
+    p->seq = be32_to_cpu(packet->seq);
+
+    if (p->pages->used) {
+        block = qemu_ram_block_by_name(packet->ramblock);
+        if (!block) {
+            error_setg(errp, "multifd: unknown ram block %s",
+                       packet->ramblock);
+            return -1;
+        }
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
+
+        p->pages->iov[i].iov_base = block->host + offset;
+        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+    }
+
+    return 0;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
@@ -607,6 +719,9 @@ int multifd_save_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -628,6 +743,7 @@ static void *multifd_send_thread(void *opaque)
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
+        multifd_send_fill_packet(p);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -692,6 +808,9 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -751,6 +870,9 @@ int multifd_load_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -763,10 +885,20 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    Error *local_err = NULL;
+    int ret;
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
+        if (false)  {
+            /* ToDo: Packet reception goes here */
+
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            qemu_mutex_unlock(&p->mutex);
+            if (ret) {
+                break;
+            }
+        } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -803,6 +935,9 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
     return 0;
-- 
2.17.0


* [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (10 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 18:35   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
                   ` (10 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We want to know how many pages/packets each channel has sent.  Add
counters for those.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 20 ++++++++++++++++++++
 migration/trace-events |  4 ++++
 2 files changed, 24 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 804c83ed89..0f1340b4e3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -465,6 +465,10 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint32_t seq;
+    /* packets sent through this channel */
+    uint32_t num_packets;
+    /* pages sent through this channel */
+    uint32_t num_pages;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -495,6 +499,10 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint32_t seq;
+    /* packets received through this channel */
+    uint32_t num_packets;
+    /* pages received through this channel */
+    uint32_t num_pages;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -737,9 +745,13 @@ static void *multifd_send_thread(void *opaque)
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
 
+    trace_multifd_send_thread_start(p->id);
+
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
     }
+    /* initial packet */
+    p->num_packets = 1;
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -761,6 +773,8 @@ out:
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
@@ -888,6 +902,8 @@ static void *multifd_recv_thread(void *opaque)
     Error *local_err = NULL;
     int ret;
 
+    trace_multifd_recv_thread_start(p->id);
+
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (false)  {
@@ -910,6 +926,8 @@ static void *multifd_recv_thread(void *opaque)
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
@@ -975,6 +993,8 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     }
     p->c = ioc;
     object_ref(OBJECT(ioc));
+    /* initial packet */
+    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
diff --git a/migration/trace-events b/migration/trace-events
index a180d7b008..e480eb050e 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,6 +77,10 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
 ram_postcopy_send_discard_bitmap(void) ""
 ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
 ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
+multifd_send_thread_start(uint8_t id) "%d"
+multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
+multifd_recv_thread_start(uint8_t id) "%d"
+multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.17.0


* [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (11 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-02 18:59   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
                   ` (9 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

With multifd we send data from more places than just the main channel.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 9b510a809a..75d30661e9 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2246,12 +2246,19 @@ static void migration_update_counters(MigrationState *s,
 {
     uint64_t transferred, time_spent;
     double bandwidth;
+    uint64_t now;
 
     if (current_time < s->iteration_start_time + BUFFER_DELAY) {
         return;
     }
 
-    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
+    if (migrate_use_multifd()) {
+        now = ram_counters.normal * qemu_target_page_size()
+            + qemu_ftell(s->to_dst_file);
+    } else {
+        now = qemu_ftell(s->to_dst_file);
+    }
+    transferred = now - s->iteration_initial_bytes;
     time_spent = current_time - s->iteration_start_time;
     bandwidth = (double)transferred / time_spent;
     s->threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2271,7 +2278,7 @@ static void migration_update_counters(MigrationState *s,
     qemu_file_reset_rate_limit(s->to_dst_file);
 
     s->iteration_start_time = current_time;
-    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
+    s->iteration_initial_bytes = now;
 
     trace_migrate_transferred(transferred, time_spent,
                               bandwidth, s->threshold_size);
-- 
2.17.0


* [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (12 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03  9:36   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
                   ` (8 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Whether it is for quit, sync or a new packet, we always wake the
channel threads through the semaphore first.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 0f1340b4e3..21b448c4ed 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -754,6 +754,7 @@ static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
         multifd_send_fill_packet(p);
         if (p->quit) {
@@ -761,7 +762,9 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+        /* this is impossible */
+        error_setg(&local_err, "multifd_send_thread: Unknown command");
+        break;
     }
 
 out:
@@ -905,6 +908,7 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
         if (false)  {
             /* ToDo: Packet reception goes here */
@@ -919,9 +923,14 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+        /* this is impossible */
+        error_setg(&local_err, "multifd_recv_thread: Unknown command");
+        break;
     }
 
+    if (local_err) {
+        multifd_recv_terminate_threads(local_err);
+    }
     qemu_mutex_lock(&p->mutex);
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
-- 
2.17.0


* [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (13 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03 10:03   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Juan Quintela
                   ` (7 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Once the blocks are there, add tracepoints.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 49 +++++++++++++++++++++++++++++++++++++-----
 migration/trace-events |  2 ++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 21b448c4ed..c4c185cc4c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -455,6 +455,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* thread has work to do */
+    int pending_job;
     /* array of pages to sent */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -489,6 +491,8 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* thread has work to do */
+    bool pending_job;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -756,8 +760,28 @@ static void *multifd_send_thread(void *opaque)
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        multifd_send_fill_packet(p);
-        if (p->quit) {
+
+        if (p->pending_job) {
+            uint32_t used = p->pages->used;
+            uint32_t seq = p->seq;
+            uint32_t flags = p->flags;
+
+            multifd_send_fill_packet(p);
+            p->flags = 0;
+            p->num_packets++;
+            p->num_pages += used;
+            p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
+
+            trace_multifd_send(p->id, seq, used, flags);
+
+            /* ToDo: send packet here */
+
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            qemu_mutex_unlock(&p->mutex);
+            continue;
+        } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -823,6 +847,7 @@ int multifd_save_setup(void)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
+        p->pending_job = 0;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
@@ -910,14 +935,27 @@ static void *multifd_recv_thread(void *opaque)
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        if (false)  {
-            /* ToDo: Packet reception goes here */
+        if (p->pending_job) {
+            uint32_t used;
+            uint32_t flags;
+            qemu_mutex_unlock(&p->mutex);
 
+            /* ToDo: recv packet here */
+
+            qemu_mutex_lock(&p->mutex);
             ret = multifd_recv_unfill_packet(p, &local_err);
-            qemu_mutex_unlock(&p->mutex);
             if (ret) {
+                qemu_mutex_unlock(&p->mutex);
                 break;
             }
+
+            used = p->pages->used;
+            flags = p->flags;
+            trace_multifd_recv(p->id, p->seq, used, flags);
+            p->pending_job = false;
+            p->num_packets++;
+            p->num_pages += used;
+            qemu_mutex_unlock(&p->mutex);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -960,6 +998,7 @@ int multifd_load_setup(void)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
+        p->pending_job = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/trace-events b/migration/trace-events
index e480eb050e..9eee048287 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -81,6 +81,8 @@ multifd_send_thread_start(uint8_t id) "%d"
 multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
 multifd_recv_thread_start(uint8_t id) "%d"
 multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
+multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
+multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.17.0


* [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (14 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03 10:44   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
                   ` (6 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a ram section, so we are safe
against two channels trying to overwrite the same memory.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
 migration/trace-events |   6 +++
 2 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index c4c185cc4c..398cb0af3b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -471,6 +473,8 @@ typedef struct {
     uint32_t num_packets;
     /* pages sent through this channel */
     uint32_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -507,6 +511,8 @@ typedef struct {
     uint32_t num_packets;
     /* pages sent through this channel */
     uint32_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -682,6 +688,10 @@ struct {
     int count;
     /* array of pages to sent */
     MultiFDPages_t *pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     multifd_pages_clear(multifd_send_state->pages);
@@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->seq);
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque)
             /* ToDo: send packet here */
 
             qemu_mutex_lock(&p->mutex);
+            p->flags = 0;
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
-            continue;
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_send_state->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_send_thread: Unknown command");
-        break;
     }
 
 out:
@@ -840,12 +882,14 @@ int multifd_save_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
     multifd_pages_init(&multifd_send_state->pages, page_count);
+    qemu_sem_init(&multifd_send_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
@@ -863,6 +907,10 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_mutex_lock(&p->mutex);
+        p->pending_job = true;
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->seq < p->seq) {
+            multifd_recv_state->seq = p->seq;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->seq);
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
-        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        if (p->pending_job) {
+        if (true || p->pending_job) {
             uint32_t used;
             uint32_t flags;
             qemu_mutex_unlock(&p->mutex);
@@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
             p->num_packets++;
             p->num_pages += used;
             qemu_mutex_unlock(&p->mutex);
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_recv_state->sem_sync);
+                qemu_sem_wait(&p->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_recv_thread: Unknown command");
-        break;
     }
 
     if (local_err) {
@@ -991,12 +1080,14 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = false;
         p->id = i;
@@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     ram_counters.transferred += 8;
@@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 9eee048287..b0ab8e2d03 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d"
 multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
 multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
 multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
+multifd_send_sync_main(uint32_t seq) "seq %d"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
+multifd_recv_sync_main(uint32_t seq) "seq %d"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.17.0


* [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (15 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-26  7:43   ` Peter Xu
  2018-04-26  8:18   ` Peter Xu
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
                   ` (5 subsequent siblings)
  22 siblings, 2 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
counter.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove multifd field, it is the same as normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovecs instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0() (danp)
define MULTIFD_CONTINUE
now pages member is a pointer
Fix off-by-one in number of pages in one packet
Remove RAM_SAVE_FLAG_MULTIFD_PAGE
s/multifd_pages_t/MultiFDPages_t/
---
 migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 398cb0af3b..862ec53d32 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -54,6 +54,7 @@
 #include "migration/block.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -692,8 +693,65 @@ struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint32_t seq;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
 } *multifd_send_state;
 
+static void multifd_send_pages(void)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make happy gcc */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    p->pages->used = 0;
+    multifd_send_state->seq++;
+    p->seq = multifd_send_state->seq;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
+static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->used++;
+
+        if (pages->used < pages->allocated) {
+            return;
+        }
+    }
+
+    multifd_send_pages();
+
+    if (pages->block != block) {
+        multifd_queue_page(block, offset);
+    }
+}
+
 static void multifd_send_terminate_threads(Error *err)
 {
     int i;
@@ -746,6 +804,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
     qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -763,12 +822,17 @@ static void multifd_send_sync_main(void)
     if (!migrate_use_multifd()) {
         return;
     }
+    if (multifd_send_state->pages->used) {
+        multifd_send_pages();
+    }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         trace_multifd_send_sync_main_signal(p->id);
 
         qemu_mutex_lock(&p->mutex);
+        multifd_send_state->seq++;
+        p->seq = multifd_send_state->seq;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
         qemu_mutex_unlock(&p->mutex);
@@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_send_state->sem_sync);
             }
+            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -883,6 +948,7 @@ int multifd_save_setup(void)
     atomic_set(&multifd_send_state->count, 0);
     multifd_pages_init(&multifd_send_state->pages, page_count);
     qemu_sem_init(&multifd_send_state->sem_sync, 0);
+    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -1576,6 +1642,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_PAGE);
+        multifd_queue_page(block, offset);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -2004,6 +2095,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }
-- 
2.17.0


* [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (16 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03 14:55   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
                   ` (4 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 30 ++++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 862ec53d32..9adbaa81f9 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -625,9 +625,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     RAMBlock *block;
     int i;
 
-    /* ToDo: We can't use it until we haven't received a message */
-    return 0;
-
     be32_to_cpus(&packet->magic);
     if (packet->magic != MULTIFD_MAGIC) {
         error_setg(errp, "multifd: received packet "
@@ -851,6 +848,7 @@ static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
 
     trace_multifd_send_thread_start(p->id);
 
@@ -878,10 +876,18 @@ static void *multifd_send_thread(void *opaque)
 
             trace_multifd_send(p->id, seq, used, flags);
 
-            /* ToDo: send packet here */
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
-            p->flags = 0;
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
 
@@ -1091,7 +1097,14 @@ static void *multifd_recv_thread(void *opaque)
             uint32_t flags;
             qemu_mutex_unlock(&p->mutex);
 
-            /* ToDo: recv packet here */
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0) {   /* EOF */
+                break;
+            }
+            if (ret == -1) {   /* Error */
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
             ret = multifd_recv_unfill_packet(p, &local_err);
@@ -1108,6 +1121,11 @@ static void *multifd_recv_thread(void *opaque)
             p->num_pages += used;
             qemu_mutex_unlock(&p->mutex);
 
+            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);
-- 
2.17.0


* [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (17 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03 15:04   ` Dr. David Alan Gilbert
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit Juan Quintela
                   ` (3 subsequent siblings)
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We have three conditions here:
- the channel fails -> error
- we have to quit: we close the channel and the read fails
- a normal read succeeds, and we are in business

So forget the complications of waiting in a semaphore.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 81 ++++++++++++++++++-------------------------------
 1 file changed, 29 insertions(+), 52 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 9adbaa81f9..2734f91ded 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -496,8 +496,6 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
-    /* thread has work to do */
-    bool pending_job;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -1056,14 +1054,6 @@ static void multifd_recv_sync_main(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        trace_multifd_recv_sync_main_signal(p->id);
-        qemu_mutex_lock(&p->mutex);
-        p->pending_job = true;
-        qemu_mutex_unlock(&p->mutex);
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
         trace_multifd_recv_sync_main_wait(p->id);
         qemu_sem_wait(&multifd_recv_state->sem_sync);
         qemu_mutex_lock(&p->mutex);
@@ -1076,7 +1066,6 @@ static void multifd_recv_sync_main(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         trace_multifd_recv_sync_main_signal(p->id);
-
         qemu_sem_post(&p->sem_sync);
     }
     trace_multifd_recv_sync_main(multifd_recv_state->seq);
@@ -1091,51 +1080,40 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
+        uint32_t used;
+        uint32_t flags;
+
+        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+        if (ret == 0) {   /* EOF */
+            break;
+        }
+        if (ret == -1) {   /* Error */
+            break;
+        }
+
         qemu_mutex_lock(&p->mutex);
-        if (true || p->pending_job) {
-            uint32_t used;
-            uint32_t flags;
-            qemu_mutex_unlock(&p->mutex);
-
-            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                           p->packet_len, &local_err);
-            if (ret == 0) {   /* EOF */
-                break;
-            }
-            if (ret == -1) {   /* Error */
-                break;
-            }
-
-            qemu_mutex_lock(&p->mutex);
-            ret = multifd_recv_unfill_packet(p, &local_err);
-            if (ret) {
-                qemu_mutex_unlock(&p->mutex);
-                break;
-            }
-
-            used = p->pages->used;
-            flags = p->flags;
-            trace_multifd_recv(p->id, p->seq, used, flags);
-            p->pending_job = false;
-            p->num_packets++;
-            p->num_pages += used;
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
             qemu_mutex_unlock(&p->mutex);
+            break;
+        }
 
-            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
-            if (ret != 0) {
-                break;
-            }
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->seq, used, flags);
+        p->num_packets++;
+        p->num_pages += used;
+        qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
-                qemu_sem_post(&multifd_recv_state->sem_sync);
-                qemu_sem_wait(&p->sem_sync);
-            }
-        } else if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
+        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+        if (ret != 0) {
             break;
-        } else {
-            qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
+        }
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
         }
     }
 
@@ -1173,7 +1151,6 @@ int multifd_load_setup(void)
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
-        p->pending_job = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
-- 
2.17.0

^ permalink raw reply related	[flat|nested] 60+ messages in thread

* [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (18 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
                   ` (2 subsequent siblings)
  22 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We now quit by closing the QIO channel.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 2734f91ded..23203756b7 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -488,14 +488,10 @@ typedef struct {
     QemuThread thread;
     /* communication channel */
     QIOChannel *c;
-    /* sem where to wait for more work */
-    QemuSemaphore sem;
     /* this mutex protects the following parameters */
     QemuMutex mutex;
     /* is this channel thread running */
     bool running;
-    /* should this thread finish */
-    bool quit;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -1001,8 +997,8 @@ static void multifd_recv_terminate_threads(Error *err)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
-        p->quit = true;
-        qemu_sem_post(&p->sem);
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
         qemu_mutex_unlock(&p->mutex);
     }
 }
@@ -1025,7 +1021,6 @@ int multifd_load_cleanup(Error **errp)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
@@ -1148,9 +1143,7 @@ int multifd_load_setup(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
-- 
2.17.0


* [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (19 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit Juan Quintela
@ 2018-04-25 11:27 ` Juan Quintela
  2018-05-03 15:24   ` Dr. David Alan Gilbert
  2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
  2018-04-26  8:28 ` Peter Xu
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:27 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We have to flush() the QEMUFile because we now send very little data
through that channel.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 23203756b7..f5cff2eb59 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1634,20 +1634,12 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                             bool last_stage)
 {
     int pages;
-    uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
 
-    p = block->host + offset;
-
     pages = save_zero_page(rs, block, offset);
     if (pages == -1) {
-        ram_counters.transferred +=
-            save_page_header(rs, rs->f, block,
-                             offset | RAM_SAVE_FLAG_PAGE);
         multifd_queue_page(block, offset);
-        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-        ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
     }
@@ -2869,6 +2861,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
 
     multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
@@ -2946,6 +2939,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
     multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
     ram_counters.transferred += 8;
 
     ret = qemu_file_get_error(f);
@@ -2999,6 +2993,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
-- 
2.17.0


* Re: [Qemu-devel] [PATCH v12 00/21] Multifd
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (20 preceding siblings ...)
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
@ 2018-04-25 11:44 ` Juan Quintela
  2018-05-03 15:32   ` Dr. David Alan Gilbert
  2018-04-26  8:28 ` Peter Xu
  22 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-04-25 11:44 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Juan Quintela <quintela@redhat.com> wrote:
> Hi
>
>
> [v12]
>
> Big news, it is not RFC anymore, it works reliably for me.
>
> Changes:
> - Locking changed completely (several times)
> - We now send all pages through the channels.  In a 2GB guest with 1 disk and a network card, the amount of data sent for RAM was 80KB.
> - This is not optimized yet, but it shows clear improvements over precopy.  Testing over localhost networking I get:
>   - 2 VCPUs guest
>   - 2GB RAM
>   - run stress --vm 4 --vm-bytes 500M (i.e. dirtying 2GB of RAM each second)
>
>   - Total time: precopy ~50 seconds, multifd around 11 seconds
>   - Bandwidth usage is around 273MB/s vs 71MB/s on the same hardware
>
> This is very preliminary testing; I will send more numbers when I get them.  But it looks promising.
>
> Things that will be improved later:
> - Initial synchronization is too slow (around 1s)
> - We synchronize all threads after each RAM section; we can move to only
>   synchronizing them after we have done a bitmap synchronization
> - We can improve bitmap walking (but that is independent of multifd)

I forgot to mention that for the last 4 patches, I have not been able
to split them in a way that:
- is logical for review
- works for multifd tests in all versions

So, I ended up trying to get the "logical" view, and it works after the
last patch.  Why is that?
- Before I am able to transmit data, I need to be able to
  end/synchronize the different channels
- To finish channels in case of error, I just close the channels,
  but I can't open them yet.

I have to think about whether I can come up with a simpler way to split
it, but you can also consider the last 3-4 patches as a single one.

Later, Juan.


* Re: [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
@ 2018-04-26  7:15   ` Peter Xu
  2018-05-09 10:52     ` Juan Quintela
  2018-05-02 17:52   ` Dr. David Alan Gilbert
  1 sibling, 1 reply; 60+ messages in thread
From: Peter Xu @ 2018-04-26  7:15 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Apr 25, 2018 at 01:27:12PM +0200, Juan Quintela wrote:

[...]

> +static void multifd_pages_init(MultiFDPages_t **ppages, size_t size)
> +{
> +    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
> +
> +    pages->allocated = size;
> +    pages->iov = g_new0(struct iovec, size);
> +    pages->offset = g_new0(ram_addr_t, size);
> +    *ppages = pages;
> +}

Can we just return the pages pointer?  Then it can be:

  static MultiFDPages_t *multifd_pages_init(size_t size)

[...]

> @@ -731,6 +784,7 @@ static void *multifd_recv_thread(void *opaque)
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> +    uint32_t page_count = migrate_multifd_page_count();
>      uint8_t i;
>  
>      if (!migrate_use_multifd()) {
> @@ -740,6 +794,7 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
> +

Useless line?

Otherwise:

Reviewed-by: Peter Xu <peterx@redhat.com>

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
@ 2018-04-26  7:28   ` Peter Xu
  2018-05-09  8:05     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Peter Xu @ 2018-04-26  7:28 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Apr 25, 2018 at 01:27:07PM +0200, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
> ---
>  migration/socket.c | 28 +++++++++++++++++++++++++++-
>  migration/socket.h |  7 +++++++
>  2 files changed, 34 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/socket.c b/migration/socket.c
> index edf33c70cf..893a04f4cc 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -29,6 +29,28 @@
>  #include "trace.h"
>  
>  
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +} outgoing_args;

I am not sure whether I have asked before, but... could we put this
into MigrationState *?  The thing is that introducing more global
variables will make things scattered, and then we do work to merge them
(like the RAMState cleanup work).  IMHO it saves time if we can do it
from the very beginning.

> +
> +void socket_send_channel_create(QIOTaskFunc f, void *data)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> +                                     f, data, NULL, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}

Here I would avoid introducing the two new APIs, since AFAIU they
don't do much and both of them are only called once...  And IMHO when
we call socket_send_channel_create() in multifd_save_setup() we can
initialize MultiFDSendParams->c already with the object returned by
qio_channel_socket_new(), without the API, instead of waiting until
multifd_new_send_channel_async() is called.

Thanks,

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
@ 2018-04-26  7:43   ` Peter Xu
  2018-04-26  8:18   ` Peter Xu
  1 sibling, 0 replies; 60+ messages in thread
From: Peter Xu @ 2018-04-26  7:43 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:

[...]

> +static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
> +{
> +    MultiFDPages_t *pages = multifd_send_state->pages;
> +
> +    if (!pages->block) {
> +        pages->block = block;
> +    }
> +
> +    if (pages->block == block) {
> +        pages->offset[pages->used] = offset;
> +        pages->iov[pages->used].iov_base = block->host + offset;
> +        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
> +        pages->used++;
> +
> +        if (pages->used < pages->allocated) {
> +            return;
> +        }
> +    }
> +
> +    multifd_send_pages();
> +
> +    if (pages->block != block) {
> +        multifd_queue_page(block, offset);

Nits: could we avoid the recursive call here?  E.g.:

multifd_queue_page()
{
  /* flush pages if necessary */
  if (pages->block && (pages->block != block ||
                       pages->used >= pages->allocated))
    multifd_send_pages();

  if (!pages->block)
    pages->block = block;

  pages->offset[pages->used] = ...
  pages->iov[pages->used].iov_base = ...
  pages->iov[pages->used].iov_len = ...
  pages->used++;
}

> +    }
> +}
> +
>  static void multifd_send_terminate_threads(Error *err)
>  {
>      int i;
> @@ -746,6 +804,7 @@ int multifd_save_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_send_state->channels_ready);
>      qemu_sem_destroy(&multifd_send_state->sem_sync);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> @@ -763,12 +822,17 @@ static void multifd_send_sync_main(void)
>      if (!migrate_use_multifd()) {
>          return;
>      }
> +    if (multifd_send_state->pages->used) {
> +        multifd_send_pages();
> +    }
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          trace_multifd_send_sync_main_signal(p->id);
>  
>          qemu_mutex_lock(&p->mutex);
> +        multifd_send_state->seq++;
> +        p->seq = multifd_send_state->seq;
>          p->flags |= MULTIFD_FLAG_SYNC;
>          p->pending_job++;
>          qemu_mutex_unlock(&p->mutex);
> @@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque)
>              if (flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&multifd_send_state->sem_sync);
>              }
> +            qemu_sem_post(&multifd_send_state->channels_ready);
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> @@ -883,6 +948,7 @@ int multifd_save_setup(void)
>      atomic_set(&multifd_send_state->count, 0);
>      multifd_pages_init(&multifd_send_state->pages, page_count);
>      qemu_sem_init(&multifd_send_state->sem_sync, 0);
> +    qemu_sem_init(&multifd_send_state->channels_ready, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> @@ -1576,6 +1642,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> +                            bool last_stage)
> +{

Maybe name it as ram_send_multifd_page()? :)

So that it can be aligned with the rest like ram_save_page() and
ram_save_compressed_page().

> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
> +
> +    p = block->host + offset;
> +
> +    pages = save_zero_page(rs, block, offset);

I suspect the series will need to rebase to Dave's next pull request
(or after its merging) since Guangrong's work should have refactored
this part in the coming pull request...

Thanks,

> +    if (pages == -1) {
> +        ram_counters.transferred +=
> +            save_page_header(rs, rs->f, block,
> +                             offset | RAM_SAVE_FLAG_PAGE);
> +        multifd_queue_page(block, offset);
> +        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
> +        ram_counters.transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        ram_counters.normal++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -2004,6 +2095,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
>          if (migrate_use_compression() &&
>              (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
>              res = ram_save_compressed_page(rs, pss, last_stage);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(rs, pss, last_stage);
>          } else {
>              res = ram_save_page(rs, pss, last_stage);
>          }
> -- 
> 2.17.0
> 

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
  2018-04-26  7:43   ` Peter Xu
@ 2018-04-26  8:18   ` Peter Xu
  2018-05-03 11:30     ` Dr. David Alan Gilbert
  2018-05-23 11:13     ` Juan Quintela
  1 sibling, 2 replies; 60+ messages in thread
From: Peter Xu @ 2018-04-26  8:18 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
> counter.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Add last_page parameter
> Add comments for done and address
> Remove multifd field, it is the same than normal pages
> Merge next patch, now we send multiple pages at a time
> Remove counter for multifd pages, it is identical to normal pages
> Use iovec's instead of creating the equivalent.
> Clear memory used by pages (dave)
> Use g_new0(danp)
> define MULTIFD_CONTINUE
> now pages member is a pointer
> Fix off-by-one in number of pages in one packet
> Remove RAM_SAVE_FLAG_MULTIFD_PAGE
> s/multifd_pages_t/MultiFDPages_t/
> ---
>  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 93 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 398cb0af3b..862ec53d32 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -54,6 +54,7 @@
>  #include "migration/block.h"
>  #include "sysemu/sysemu.h"
>  #include "qemu/uuid.h"
> +#include "qemu/iov.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -692,8 +693,65 @@ struct {
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint32_t seq;
> +    /* send channels ready */
> +    QemuSemaphore channels_ready;
>  } *multifd_send_state;
>  
> +static void multifd_send_pages(void)
> +{
> +    int i;
> +    static int next_channel;
> +    MultiFDSendParams *p = NULL; /* make happy gcc */
> +    MultiFDPages_t *pages = multifd_send_state->pages;
> +
> +    qemu_sem_wait(&multifd_send_state->channels_ready);

This sem is posted when a thread has finished its work.  However, this
is called in the main migration thread.  With this line, are the
threads really sending things in parallel?  It looks to me that
this function (and the main thread) won't send the 2nd page array if
the 1st hasn't finished, won't send the 3rd if the 2nd hasn't, and
so on...

Maybe I misunderstood something.  Please feel free to correct me.

> +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_send_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    p->pages->used = 0;
> +    multifd_send_state->seq++;
> +    p->seq = multifd_send_state->seq;
> +    p->pages->block = NULL;
> +    multifd_send_state->pages = p->pages;
> +    p->pages = pages;

Here we directly replaced MultiFDSendParams.pages with
multifd_send_state->pages.  Then are we always using a single
MultiFDPages_t struct?  And if so, will all the initial
MultiFDSendParams.pages memory leaked without freed?

> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +}

Thanks,

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v12 00/21] Multifd
  2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
                   ` (21 preceding siblings ...)
  2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
@ 2018-04-26  8:28 ` Peter Xu
  22 siblings, 0 replies; 60+ messages in thread
From: Peter Xu @ 2018-04-26  8:28 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Apr 25, 2018 at 01:27:02PM +0200, Juan Quintela wrote:
> 
> Hi
> 
> 
> [v12]
> 
> Big news, it is not RFC anymore, it works reliably for me.
> 
> Changes:
> - Locking changed completely (several times)
> - We now send all pages through the channels.  In a 2GB guest with 1 disk and a network card, the amount of data sent for RAM was 80KB.
> - This is not optimized yet, but it shows clear improvements over precopy.  Testing over localhost networking I get:
>   - 2 VCPUs guest
>   - 2GB RAM
>   - run stress --vm 4 --vm-bytes 500M (i.e. dirtying 2GB of RAM each second)
> 
>   - Total time: precopy ~50 seconds, multifd around 11 seconds
>   - Bandwidth usage is around 273MB/s vs 71MB/s on the same hardware
> 
> This is very preliminary testing; I will send more numbers when I get them.  But it looks promising.
> 
> Things that will be improved later:
> - Initial synchronization is too slow (around 1s)
> - We synchronize all threads after each RAM section; we can move to only
>   synchronizing them after we have done a bitmap synchronization
> - We can improve bitmap walking (but that is independent of multifd)

Hi, Juan,

I got some high level review comments and notes:

- This series may need to rebase after Guangrong's cleanup series.

- Looks like we now allow multifd and compression to be enabled
  together.  Shall we restrict that?

- Is multifd only for TCP?  If so, do we check against that?  E.g.,
  should we fail the unix/fd/exec migrations when multifd is enabled?

- Why is the init sync slow (1s)?  Is there any clue about that problem?

- Currently the sync between threads is still very complicated to
  me... we have these on the sender side (I didn't dig into the recv side):

  - two global semaphores in multifd_send_state,
  - one mutex and two semaphores in each of the send thread,

  So in total we'll have 2+3*N such locks/sems.

  I'm thinking whether we can further simplify the sync logic a bit...

Thanks,

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
@ 2018-05-02 15:53   ` Dr. David Alan Gilbert
  2018-05-09  8:15     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 15:53 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 24 ++++++++++++++++++++++--
>  1 file changed, 22 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 0e90efa092..2ae560ea80 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -415,10 +415,20 @@ struct {
>      int count;
>  } *multifd_send_state;
>  
> -static void terminate_multifd_send_threads(Error *errp)
> +static void terminate_multifd_send_threads(Error *err)
>  {
>      int i;
>  
> +    if (err) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, err);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {

Can you explain/add comment why these only set it in some states?
For example what about PRE_SWITCHOVER, DEVICE or postcopy?

Dave

> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
> +
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -515,10 +525,20 @@ struct {
>      int count;
>  } *multifd_recv_state;
>  
> -static void terminate_multifd_recv_threads(Error *errp)
> +static void terminate_multifd_recv_threads(Error *err)
>  {
>      int i;
>  
> +    if (err) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, err);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
@ 2018-05-02 17:19   ` Dr. David Alan Gilbert
  2018-05-09  8:34     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 17:19 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
> 
> --
> 
> Be network agnostic.
> Add error checking for all values.
> ---
>  migration/ram.c | 101 +++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 96 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a87d74862..1aab392d8f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -52,6 +52,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -400,6 +402,16 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> +#define MULTIFD_MAGIC 0x11223344U
> +#define MULTIFD_VERSION 1
> +
> +typedef struct {
> +    uint32_t magic;
> +    uint32_t version;
> +    unsigned char uuid[16]; /* QemuUUID */
> +    uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
> @@ -412,6 +424,65 @@ struct MultiFDSendParams {
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
>  
> +static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> +{
> +    MultiFDInit_t msg;
> +    int ret;
> +
> +    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
> +    msg.version = cpu_to_be32(MULTIFD_VERSION);
> +    msg.id = p->id;
> +    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
> +
> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
> +    if (ret != 0) {
> +        return -1;
> +    }
> +    return 0;
> +}
> +
> +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
> +{
> +    MultiFDInit_t msg;
> +    int ret;
> +
> +    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
> +    if (ret != 0) {
> +        return -1;
> +    }
> +
> +    be32_to_cpus(&msg.magic);
> +    be32_to_cpus(&msg.version);
> +
> +    if (msg.magic != MULTIFD_MAGIC) {
> +        error_setg(errp, "multifd: received packet magic %d "
> +                   "expected %d", msg.magic, MULTIFD_MAGIC);

Please print the magic with %x.

> +        return -1;
> +    }
> +
> +    if (msg.version != MULTIFD_VERSION) {
> +        error_setg(errp, "multifd: received packet version %d "
> +                   "expected %d", msg.version, MULTIFD_VERSION);
> +        return -1;
> +    }
> +
> +    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
> +        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +        error_setg(errp, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);

I don't think you can just print msg.uuid there; it's the raw 16 bytes
isn't it rather than a string?
I suspect you need to do the unparse on both of them.

Dave

> +        g_free(uuid);
> +        return -1;
> +    }
> +
> +    if (msg.id > migrate_multifd_channels()) {
> +        error_setg(errp, "multifd: received channel version %d "
> +                   "expected %d", msg.version, MULTIFD_VERSION);
> +        return -1;
> +    }
> +
> +    return msg.id;
> +}
> +
>  struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
> @@ -474,6 +545,11 @@ int multifd_save_cleanup(Error **errp)
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    Error *local_err = NULL;
> +
> +    if (multifd_send_initial_packet(p, &local_err) < 0) {
> +        goto out;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -485,6 +561,11 @@ static void *multifd_send_thread(void *opaque)
>          qemu_sem_wait(&p->sem);
>      }
>  
> +out:
> +    if (local_err) {
> +        multifd_send_terminate_threads(local_err);
> +    }
> +
>      qemu_mutex_lock(&p->mutex);
>      p->running = false;
>      qemu_mutex_unlock(&p->mutex);
> @@ -669,12 +750,22 @@ bool multifd_recv_all_channels_created(void)
>  void multifd_recv_new_channel(QIOChannel *ioc)
>  {
>      MultiFDRecvParams *p;
> -    /* we need to invent channels id's until we transmit */
> -    /* we will remove this on a later patch */
> -    static int i;
> +    Error *local_err = NULL;
> +    int id;
>  
> -    p = &multifd_recv_state->params[i];
> -    i++;
> +    id = multifd_recv_initial_packet(ioc, &local_err);
> +    if (id < 0) {
> +        multifd_recv_terminate_threads(local_err);
> +        return;
> +    }
> +
> +    p = &multifd_recv_state->params[id];
> +    if (p->c != NULL) {
> +        error_setg(&local_err, "multifd: received id '%d' already setup'",
> +                   id);
> +        multifd_recv_terminate_threads(local_err);
> +        return;
> +    }
>      p->c = ioc;
>      object_ref(OBJECT(ioc));
>  
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 60+ messages in thread

* Re: [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
@ 2018-05-02 17:32   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 17:32 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Once there, we don't need the struct names anywhere, just the
> typedefs.  And now also document all fields.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/ram.c | 46 +++++++++++++++++++++++++++++++---------------
>  1 file changed, 31 insertions(+), 15 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 1aab392d8f..ffefa73099 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -412,17 +412,45 @@ typedef struct {
>      uint8_t id;
>  } __attribute__((packed)) MultiFDInit_t;
>  
> -struct MultiFDSendParams {
> +typedef struct {
> +    /* this fields are not changed once the thread is created */
> +    /* channel number */
>      uint8_t id;
> +    /* channel thread name */
>      char *name;
> +    /* channel thread id */
>      QemuThread thread;
> +    /* communication channel */
>      QIOChannel *c;
> +    /* sem where to wait for more work */
>      QemuSemaphore sem;
> +    /* this mutex protects the following parameters */
>      QemuMutex mutex;
> +    /* is this channel thread running */
>      bool running;
> +    /* should this thread finish */
>      bool quit;
> -};
> -typedef struct MultiFDSendParams MultiFDSendParams;
> +}  MultiFDSendParams;
> +
> +typedef struct {
> +    /* this fields are not changed once the thread is created */
> +    /* channel number */
> +    uint8_t id;
> +    /* channel thread name */
> +    char *name;
> +    /* channel thread id */
> +    QemuThread thread;
> +    /* communication channel */
> +    QIOChannel *c;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
> +    /* this mutex protects the following parameters */
> +    QemuMutex mutex;
> +    /* is this channel thread running */
> +    bool running;
> +    /* should this thread finish */
> +    bool quit;
> +} MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
> @@ -619,18 +647,6 @@ int multifd_save_setup(void)
>      return 0;
>  }
>  
> -struct MultiFDRecvParams {
> -    uint8_t id;
> -    char *name;
> -    QemuThread thread;
> -    QIOChannel *c;
> -    QemuSemaphore sem;
> -    QemuMutex mutex;
> -    bool running;
> -    bool quit;
> -};
> -typedef struct MultiFDRecvParams MultiFDRecvParams;
> -
>  struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
  2018-04-26  7:15   ` Peter Xu
@ 2018-05-02 17:52   ` Dr. David Alan Gilbert
  2018-05-09 10:53     ` Juan Quintela
  1 sibling, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 17:52 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We only create/destroy the page list here.  We will use it later.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 56 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index ffefa73099..b19300992e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -412,6 +412,20 @@ typedef struct {
>      uint8_t id;
>  } __attribute__((packed)) MultiFDInit_t;
>  
> +typedef struct {
> +    /* number of used pages */
> +    uint32_t used;
> +    /* number of allocated pages */
> +    uint32_t allocated;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;

Is that sufficiently large?
Consider a 40Gbps link; that's ~4GByte/second, i.e. roughly a million
4K pages per second, so a 32-bit seq wraps in just over one hour at
that link speed; that's a long migration on a fast link, but
it's not impossible, is it? And what happens when it wraps?

> +    /* offset of each page */
> +    ram_addr_t *offset;
> +    /* pointer to each page */
> +    struct iovec *iov;
> +    RAMBlock *block;
> +} MultiFDPages_t;
> +
>  typedef struct {
>      /* this fields are not changed once the thread is created */
>      /* channel number */
> @@ -430,6 +444,8 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> +    /* array of pages to sent */

s/sent/send/

Dave

> +    MultiFDPages_t *pages;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -450,6 +466,8 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> +    /* array of pages to receive */
> +    MultiFDPages_t *pages;
>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -511,10 +529,35 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
>      return msg.id;
>  }
>  
> +static void multifd_pages_init(MultiFDPages_t **ppages, size_t size)
> +{
> +    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
> +
> +    pages->allocated = size;
> +    pages->iov = g_new0(struct iovec, size);
> +    pages->offset = g_new0(ram_addr_t, size);
> +    *ppages = pages;
> +}
> +
> +static void multifd_pages_clear(MultiFDPages_t *pages)
> +{
> +    pages->used = 0;
> +    pages->allocated = 0;
> +    pages->seq = 0;
> +    pages->block = NULL;
> +    g_free(pages->iov);
> +    pages->iov = NULL;
> +    g_free(pages->offset);
> +    pages->offset = NULL;
> +    g_free(pages);
> +}
> +
>  struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
>      int count;
> +    /* array of pages to sent */
> +    MultiFDPages_t *pages;
>  } *multifd_send_state;
>  
>  static void multifd_send_terminate_threads(Error *err)
> @@ -562,9 +605,13 @@ int multifd_save_cleanup(Error **errp)
>          qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_pages_clear(p->pages);
> +        p->pages = NULL;
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> +    multifd_pages_clear(multifd_send_state->pages);
> +    multifd_send_state->pages = NULL;
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>      return ret;
> @@ -625,6 +672,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> +    uint32_t page_count = migrate_multifd_page_count();
>      uint8_t i;
>  
>      if (!migrate_use_multifd()) {
> @@ -634,6 +682,8 @@ int multifd_save_setup(void)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      atomic_set(&multifd_send_state->count, 0);
> +    multifd_pages_init(&multifd_send_state->pages, page_count);
> +
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -641,6 +691,7 @@ int multifd_save_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        multifd_pages_init(&p->pages, page_count);
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
> @@ -698,6 +749,8 @@ int multifd_load_cleanup(Error **errp)
>          qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_pages_clear(p->pages);
> +        p->pages = NULL;
>      }
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> @@ -731,6 +784,7 @@ static void *multifd_recv_thread(void *opaque)
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> +    uint32_t page_count = migrate_multifd_page_count();
>      uint8_t i;
>  
>      if (!migrate_use_multifd()) {
> @@ -740,6 +794,7 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
> +
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -747,6 +802,7 @@ int multifd_load_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        multifd_pages_init(&p->pages, page_count);
>          p->name = g_strdup_printf("multifdrecv_%d", i);
>      }
>      return 0;
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
@ 2018-05-02 18:04   ` Dr. David Alan Gilbert
  2018-05-09 11:09     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 18:04 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We still don't put anything there.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 136 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index b19300992e..804c83ed89 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -412,6 +412,17 @@ typedef struct {
>      uint8_t id;
>  } __attribute__((packed)) MultiFDInit_t;
>  
> +typedef struct {
> +    uint32_t magic;
> +    uint32_t version;
> +    uint32_t flags;
> +    uint32_t size;
> +    uint32_t used;
> +    uint32_t seq;
> +    char ramblock[256];
> +    uint64_t offset[];
> +} __attribute__((packed)) MultiFDPacket_t;
> +
>  typedef struct {
>      /* number of used pages */
>      uint32_t used;
> @@ -446,6 +457,14 @@ typedef struct {
>      bool quit;
>      /* array of pages to sent */
>      MultiFDPages_t *pages;
> +    /* packet allocated len */
> +    uint32_t packet_len;
> +    /* pointer to the packet */
> +    MultiFDPacket_t *packet;
> +    /* multifd flags for each packet */
> +    uint32_t flags;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -468,6 +487,14 @@ typedef struct {
>      bool quit;
>      /* array of pages to receive */
>      MultiFDPages_t *pages;
> +    /* packet allocated len */
> +    uint32_t packet_len;
> +    /* pointer to the packet */
> +    MultiFDPacket_t *packet;
> +    /* multifd flags for each packet */
> +    uint32_t flags;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;
>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -552,6 +579,91 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
>      g_free(pages);
>  }
>  
> +static void multifd_send_fill_packet(MultiFDSendParams *p)
> +{
> +    MultiFDPacket_t *packet = p->packet;
> +    int i;
> +
> +    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> +    packet->version = cpu_to_be32(MULTIFD_VERSION);
> +    packet->flags = cpu_to_be32(p->flags);
> +    packet->size = cpu_to_be32(migrate_multifd_page_count());
> +    packet->used = cpu_to_be32(p->pages->used);
> +    packet->seq = cpu_to_be32(p->seq);
> +
> +    if (p->pages->block) {
> +        strncpy(packet->ramblock, p->pages->block->idstr, 256);
> +    }
> +
> +    for (i = 0; i < p->pages->used; i++) {
> +        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
> +    }
> +}
> +
> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> +{
> +    MultiFDPacket_t *packet = p->packet;
> +    RAMBlock *block;
> +    int i;
> +
> +    /* ToDo: We can't use it until we haven't received a message */
> +    return 0;
> +
> +    be32_to_cpus(&packet->magic);
> +    if (packet->magic != MULTIFD_MAGIC) {
> +        error_setg(errp, "multifd: received packet "
> +                   "version %d and expected version %d",
> +                   packet->magic, MULTIFD_VERSION);

That's mixing magic and version: the message says "version" but prints packet->magic. (Print magic as %x please.)

> +        return -1;
> +    }
> +
> +    be32_to_cpus(&packet->version);
> +    if (packet->version != MULTIFD_VERSION) {
> +        error_setg(errp, "multifd: received packet "
> +                   "version %d and expected version %d",
> +                   packet->version, MULTIFD_VERSION);
> +        return -1;
> +    }
> +
> +    p->flags = be32_to_cpu(packet->flags);
> +
> +    be32_to_cpus(&packet->size);
> +    if (packet->size > migrate_multifd_page_count()) {
> +        error_setg(errp, "multifd: received packet "
> +                   "with size %d and expected maximum size %d",
> +                   packet->size, migrate_multifd_page_count()) ;
> +        return -1;
> +    }
> +
> +    p->pages->used = be32_to_cpu(packet->used);
> +    if (p->pages->used > packet->size) {
> +        error_setg(errp, "multifd: received packet "
> +                   "with size %d and expected maximum size %d",
> +                   p->pages->used, packet->size) ;
> +        return -1;
> +    }
> +
> +    p->seq = be32_to_cpu(packet->seq);
> +
> +    if (p->pages->used) {
> +        block = qemu_ram_block_by_name(packet->ramblock);

Do you need to ensure that packet->ramblock is a terminated string
first?

> +        if (!block) {
> +            error_setg(errp, "multifd: unknown ram block %s",
> +                       packet->ramblock);
> +            return -1;
> +        }
> +    }
> +
> +    for (i = 0; i < p->pages->used; i++) {
> +        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
> +
> +        p->pages->iov[i].iov_base = block->host + offset;

I think that needs validating to ensure that the source didn't
send us junk and cause us to overwrite after the end of block->host

> +        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
> +    }
> +
> +    return 0;
> +}
> +
>  struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
> @@ -607,6 +719,9 @@ int multifd_save_cleanup(Error **errp)
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
>          p->pages = NULL;
> +        p->packet_len = 0;
> +        g_free(p->packet);
> +        p->packet = NULL;
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> @@ -628,6 +743,7 @@ static void *multifd_send_thread(void *opaque)
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> +        multifd_send_fill_packet(p);
>          if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> @@ -692,6 +808,9 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
> +        p->packet_len = sizeof(MultiFDPacket_t)
> +                      + sizeof(ram_addr_t) * page_count;
> +        p->packet = g_malloc0(p->packet_len);
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
> @@ -751,6 +870,9 @@ int multifd_load_cleanup(Error **errp)
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
>          p->pages = NULL;
> +        p->packet_len = 0;
> +        g_free(p->packet);
> +        p->packet = NULL;
>      }
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> @@ -763,10 +885,20 @@ int multifd_load_cleanup(Error **errp)
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> +    Error *local_err = NULL;
> +    int ret;
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> -        if (p->quit) {
> +        if (false)  {
> +            /* ToDo: Packet reception goes here */
> +
> +            ret = multifd_recv_unfill_packet(p, &local_err);
> +            qemu_mutex_unlock(&p->mutex);
> +            if (ret) {
> +                break;
> +            }
> +        } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> @@ -803,6 +935,9 @@ int multifd_load_setup(void)
>          p->quit = false;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
> +        p->packet_len = sizeof(MultiFDPacket_t)
> +                      + sizeof(ram_addr_t) * page_count;
> +        p->packet = g_malloc0(p->packet_len);
>          p->name = g_strdup_printf("multifdrecv_%d", i);
>      }
>      return 0;
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
@ 2018-05-02 18:35   ` Dr. David Alan Gilbert
  2018-05-09 11:11     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 18:35 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We want to know how many pages/packets each channel has sent.  Add
> counters for those.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c        | 20 ++++++++++++++++++++
>  migration/trace-events |  4 ++++
>  2 files changed, 24 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 804c83ed89..0f1340b4e3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -465,6 +465,10 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint32_t seq;
> +    /* packets sent through this channel */
> +    uint32_t num_packets;
> +    /* pages sent through this channel */
> +    uint32_t num_pages;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -495,6 +499,10 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint32_t seq;
> +    /* packets sent through this channel */
> +    uint32_t num_packets;
> +    /* pages sent through this channel */
> +    uint32_t num_pages;

Doesn't the comment for 'mutex' say 'protects the following parameters'
(added in 09/21)? In which case...

>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -737,9 +745,13 @@ static void *multifd_send_thread(void *opaque)
>      MultiFDSendParams *p = opaque;
>      Error *local_err = NULL;
>  
> +    trace_multifd_send_thread_start(p->id);
> +
>      if (multifd_send_initial_packet(p, &local_err) < 0) {
>          goto out;
>      }
> +    /* initial packet */
> +    p->num_packets = 1;

That's writing to num_packets before the mutex is taken.

>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -761,6 +773,8 @@ out:
>      p->running = false;
>      qemu_mutex_unlock(&p->mutex);
>  
> +    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
> +

and should this happen before the mutex is released? (OK, it's only a trace)

>      return NULL;
>  }
>  
> @@ -888,6 +902,8 @@ static void *multifd_recv_thread(void *opaque)
>      Error *local_err = NULL;
>      int ret;
>  
> +    trace_multifd_recv_thread_start(p->id);
> +
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
>          if (false)  {
> @@ -910,6 +926,8 @@ static void *multifd_recv_thread(void *opaque)
>      p->running = false;
>      qemu_mutex_unlock(&p->mutex);
>  
> +    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
> +
>      return NULL;
>  }
>  
> @@ -975,6 +993,8 @@ void multifd_recv_new_channel(QIOChannel *ioc)
>      }
>      p->c = ioc;
>      object_ref(OBJECT(ioc));
> +    /* initial packet */
> +    p->num_packets = 1;

OK, the receive side here is different.

>      p->running = true;
>      qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> diff --git a/migration/trace-events b/migration/trace-events
> index a180d7b008..e480eb050e 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -77,6 +77,10 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
>  ram_postcopy_send_discard_bitmap(void) ""
>  ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
>  ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
> +multifd_send_thread_start(uint8_t id) "%d"
> +multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
> +multifd_recv_thread_start(uint8_t id) "%d"
> +multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"

Please put the m's before the r's

Dave

>  # migration/migration.c
>  await_return_path_close_on_source_close(void) ""
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
@ 2018-05-02 18:59   ` Dr. David Alan Gilbert
  2018-05-09 11:14     ` Juan Quintela
  2018-05-09 19:46     ` Juan Quintela
  0 siblings, 2 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-02 18:59 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> On multifd we send data from more places than just the main channel.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c | 11 +++++++++--
>  1 file changed, 9 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 9b510a809a..75d30661e9 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2246,12 +2246,19 @@ static void migration_update_counters(MigrationState *s,
>  {
>      uint64_t transferred, time_spent;
>      double bandwidth;
> +    uint64_t now;
>  
>      if (current_time < s->iteration_start_time + BUFFER_DELAY) {
>          return;
>      }
>  
> -    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
> +    if (migrate_use_multifd()) {
> +        now = ram_counters.normal * qemu_target_page_size()
> +            + qemu_ftell(s->to_dst_file);

OK, can I just confirm: the 'multifd packets' go over the main fd, and
just the pages go over the other fd's?  In which case this is
right; but if the headers are going over the other fd's as well then
this is wrong.

Dave

> +    } else {
> +        now = qemu_ftell(s->to_dst_file);
> +    }
> +    transferred = now - s->iteration_initial_bytes;
>      time_spent = current_time - s->iteration_start_time;
>      bandwidth = (double)transferred / time_spent;
>      s->threshold_size = bandwidth * s->parameters.downtime_limit;
> @@ -2271,7 +2278,7 @@ static void migration_update_counters(MigrationState *s,
>      qemu_file_reset_rate_limit(s->to_dst_file);
>  
>      s->iteration_start_time = current_time;
> -    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
> +    s->iteration_initial_bytes = now;
>  
>      trace_migrate_transferred(transferred, time_spent,
>                                bandwidth, s->threshold_size);
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
@ 2018-05-03  9:36   ` Dr. David Alan Gilbert
  2018-05-23 10:59     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03  9:36 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Whether it is for quit, sync or packet, we first wake them.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 13 +++++++++++--
>  1 file changed, 11 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 0f1340b4e3..21b448c4ed 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -754,6 +754,7 @@ static void *multifd_send_thread(void *opaque)
>      p->num_packets = 1;
>  
>      while (true) {
> +        qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
>          multifd_send_fill_packet(p);
>          if (p->quit) {
> @@ -761,7 +762,9 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          qemu_mutex_unlock(&p->mutex);
> -        qemu_sem_wait(&p->sem);
> +        /* this is impossible */
> +        error_setg(&local_err, "multifd_send_thread: Unknown command");
> +        break;

This error disappears in a later patch, which says that you can have
spurious wakeups.

>      }
>  
>  out:
> @@ -905,6 +908,7 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> +        qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);

All this stuff seems to change again in later patches.

Is there stuff here that can be flattened into the other patches?

Dave

>          if (false)  {
>              /* ToDo: Packet reception goes here */
> @@ -919,9 +923,14 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          qemu_mutex_unlock(&p->mutex);
> -        qemu_sem_wait(&p->sem);
> +        /* this is impossible */
> +        error_setg(&local_err, "multifd_recv_thread: Unknown command");
> +        break;
>      }
>  
> +    if (local_err) {
> +        multifd_recv_terminate_threads(local_err);
> +    }
>      qemu_mutex_lock(&p->mutex);
>      p->running = false;
>      qemu_mutex_unlock(&p->mutex);
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
@ 2018-05-03 10:03   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 10:03 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Once there, add tracepoints.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c        | 49 +++++++++++++++++++++++++++++++++++++-----
>  migration/trace-events |  2 ++
>  2 files changed, 46 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 21b448c4ed..c4c185cc4c 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -455,6 +455,8 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> +    /* thread has work to do */
> +    int pending_job;
>      /* array of pages to sent */
>      MultiFDPages_t *pages;
>      /* packet allocated len */
> @@ -489,6 +491,8 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> +    /* thread has work to do */
> +    bool pending_job;
>      /* array of pages to receive */
>      MultiFDPages_t *pages;
>      /* packet allocated len */
> @@ -756,8 +760,28 @@ static void *multifd_send_thread(void *opaque)
>      while (true) {
>          qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
> -        multifd_send_fill_packet(p);
> -        if (p->quit) {
> +
> +        if (p->pending_job) {
> +            uint32_t used = p->pages->used;
> +            uint32_t seq = p->seq;
> +            uint32_t flags = p->flags;
> +
> +            multifd_send_fill_packet(p);
> +            p->flags = 0;
> +            p->num_packets++;
> +            p->num_pages += used;
> +            p->pages->used = 0;
> +            qemu_mutex_unlock(&p->mutex);
> +
> +            trace_multifd_send(p->id, seq, used, flags);
> +
> +            /* ToDo: send packet here */
> +
> +            qemu_mutex_lock(&p->mutex);
> +            p->pending_job--;
> +            qemu_mutex_unlock(&p->mutex);
> +            continue;

OK, but this continue is an artifact of the "unknown command" warning
that you remove in a later patch, so you could merge this down; other
than that:


Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> +        } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> @@ -823,6 +847,7 @@ int multifd_save_setup(void)
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> +        p->pending_job = 0;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
> @@ -910,14 +935,27 @@ static void *multifd_recv_thread(void *opaque)
>      while (true) {
>          qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
> -        if (false)  {
> -            /* ToDo: Packet reception goes here */
> +        if (p->pending_job) {
> +            uint32_t used;
> +            uint32_t flags;
> +            qemu_mutex_unlock(&p->mutex);
>  
> +            /* ToDo: recv packet here */
> +
> +            qemu_mutex_lock(&p->mutex);
>              ret = multifd_recv_unfill_packet(p, &local_err);
> -            qemu_mutex_unlock(&p->mutex);
>              if (ret) {
> +                qemu_mutex_unlock(&p->mutex);
>                  break;
>              }
> +
> +            used = p->pages->used;
> +            flags = p->flags;
> +            trace_multifd_recv(p->id, p->seq, used, flags);
> +            p->pending_job = false;
> +            p->num_packets++;
> +            p->num_pages += used;
> +            qemu_mutex_unlock(&p->mutex);
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> @@ -960,6 +998,7 @@ int multifd_load_setup(void)
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> +        p->pending_job = false;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
> diff --git a/migration/trace-events b/migration/trace-events
> index e480eb050e..9eee048287 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -81,6 +81,8 @@ multifd_send_thread_start(uint8_t id) "%d"
>  multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
>  multifd_recv_thread_start(uint8_t id) "%d"
>  multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
> +multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
> +multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
>  
>  # migration/migration.c
>  await_return_path_close_on_source_close(void) ""
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 60+ messages in thread

* Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Juan Quintela
@ 2018-05-03 10:44   ` Dr. David Alan Gilbert
  2018-05-09 19:45     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 10:44 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We synchronize all threads at each RAM_SAVE_FLAG_EOS.  Bitmap
> synchronizations don't happen inside a ram section, so we are safe
> against two channels trying to overwrite the same memory.

OK, that's quite neat - so you don't need any extra flags in the stream
to do the sync;  it probably needs a comment in the code somewhere so we
don't forget!


> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
>  migration/trace-events |   6 +++
>  2 files changed, 113 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c4c185cc4c..398cb0af3b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
>  #define MULTIFD_MAGIC 0x11223344U
>  #define MULTIFD_VERSION 1
>  
> +#define MULTIFD_FLAG_SYNC (1 << 0)
> +
>  typedef struct {
>      uint32_t magic;
>      uint32_t version;
> @@ -471,6 +473,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -507,6 +511,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -682,6 +688,10 @@ struct {
>      int count;
>      /* array of pages to sent */
>      MultiFDPages_t *pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;

It's interesting that you use the same comment for 'seq' in
MultiFDSendParams - but I guess that means only this one is the global
version and the others aren't really global numbers - they're just
local to that thread?

>  } *multifd_send_state;
>  
>  static void multifd_send_terminate_threads(Error *err)
> @@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_send_state->sem_sync);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
>      multifd_pages_clear(multifd_send_state->pages);
> @@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_send_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_signal(p->id);
> +
> +        qemu_mutex_lock(&p->mutex);
> +        p->flags |= MULTIFD_FLAG_SYNC;
> +        p->pending_job++;
> +        qemu_mutex_unlock(&p->mutex);
> +        qemu_sem_post(&p->sem);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> +    }
> +    trace_multifd_send_sync_main(multifd_send_state->seq);
> +}
> +

OK, so this just makes each of the sending threads ack, so that seems
OK.
But what happens on an error?  multifd_send_thread exits its
loop with a 'break' if the writes fail, and that could mean it never
comes back to post the flag-sync sem.
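For what it's worth, the failure mode above can be modelled with plain POSIX threads and semaphores. This is a standalone sketch, not QEMU code; the names `sem_sync`, `pending_job` and `fail` are borrowed from (or invented around) the patch. The point is that posting the sync semaphore on *every* exit path of the send thread, including the error one, keeps the main-thread wait from hanging:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

/* Hypothetical, simplified model of one send channel. */
typedef struct {
    pthread_mutex_t mutex;
    bool fail;          /* simulate a failed qio_channel_write_all() */
    int pending_job;
} Channel;

static sem_t sem_sync;  /* stands in for multifd_send_state->sem_sync */

static void *send_thread(void *opaque)
{
    Channel *p = opaque;

    pthread_mutex_lock(&p->mutex);
    if (p->fail) {
        /* a failed write makes the real thread 'break' out of its loop */
        p->pending_job = 0;
    } else {
        p->pending_job--;
    }
    pthread_mutex_unlock(&p->mutex);

    /* The point of the sketch: post sem_sync on *every* exit path,
     * including the error one, so the sync-main wait always returns. */
    sem_post(&sem_sync);
    return NULL;
}

int run_sync(int nchannels, int broken_channel)
{
    Channel ch[nchannels];
    pthread_t tid[nchannels];

    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < nchannels; i++) {
        pthread_mutex_init(&ch[i].mutex, NULL);
        ch[i].fail = (i == broken_channel);
        ch[i].pending_job = 1;
        pthread_create(&tid[i], NULL, send_thread, &ch[i]);
    }
    /* multifd_send_sync_main() equivalent: this loop would deadlock if
     * the broken channel never posted. */
    for (int i = 0; i < nchannels; i++) {
        sem_wait(&sem_sync);
    }
    for (int i = 0; i < nchannels; i++) {
        pthread_join(tid[i], NULL);
    }
    return 0;
}
```

With one channel marked as failing, `run_sync()` still returns instead of blocking forever, which is the behaviour the error path should guarantee.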

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> @@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque)
>              /* ToDo: send packet here */
>  
>              qemu_mutex_lock(&p->mutex);
> +            p->flags = 0;
>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
> -            continue;
> +
> +            if (flags & MULTIFD_FLAG_SYNC) {
> +                qemu_sem_post(&multifd_send_state->sem_sync);
> +            }
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> +        } else {
> +            qemu_mutex_unlock(&p->mutex);
> +            /* sometimes there are spurious wakeups */
>          }
> -        qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_send_thread: Unknown command");
> -        break;
>      }
>  
>  out:
> @@ -840,12 +882,14 @@ int multifd_save_setup(void)
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      atomic_set(&multifd_send_state->count, 0);
>      multifd_pages_init(&multifd_send_state->pages, page_count);
> +    qemu_sem_init(&multifd_send_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = 0;
>          p->id = i;
> @@ -863,6 +907,10 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint32_t seq;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
>      g_free(multifd_recv_state);
> @@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_recv_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +        qemu_mutex_lock(&p->mutex);
> +        p->pending_job = true;
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> +        qemu_mutex_lock(&p->mutex);
> +        if (multifd_recv_state->seq < p->seq) {
> +            multifd_recv_state->seq = p->seq;
> +        }

Can you explain what this is for?
Something like the latest received block?

> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +
> +        qemu_sem_post(&p->sem_sync);
> +    }
> +    trace_multifd_recv_sync_main(multifd_recv_state->seq);
> +}
> +
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> -        qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
> -        if (p->pending_job) {
> +        if (true || p->pending_job) {

A TODO I guess???

>              uint32_t used;
>              uint32_t flags;
>              qemu_mutex_unlock(&p->mutex);
> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
>              p->num_packets++;
>              p->num_pages += used;
>              qemu_mutex_unlock(&p->mutex);
> +
> +            if (flags & MULTIFD_FLAG_SYNC) {
> +                qemu_sem_post(&multifd_recv_state->sem_sync);
> +                qemu_sem_wait(&p->sem_sync);
> +            }

Can you explain the receive side logic - I think this is waiting for all
receive threads to 'ack' - but how do we know that they've finished
receiving all data that was sent?

Dave
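A minimal standalone model of that handshake may help answer the question (plain pthreads, not QEMU code; `NCH`, `recv_sync_main` and the two semaphore roles are invented for the sketch, following the patch's per-channel `p->sem_sync` plus shared `multifd_recv_state->sem_sync`). The ordering guarantee it demonstrates: by the time the main thread has collected one post per channel, every channel has consumed its stream up to the SYNC packet, and only then are the channels released past the barrier:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define NCH 4

static sem_t state_sem_sync;       /* shared: multifd_recv_state->sem_sync */
static sem_t chan_sem_sync[NCH];   /* per channel: p->sem_sync */
static int packets_done[NCH];

static void *recv_thread(void *opaque)
{
    int id = (int)(long)opaque;

    /* ... read and unfill packets for this channel ... */
    packets_done[id] = 1;

    /* Packet carried MULTIFD_FLAG_SYNC: tell the main thread this
     * channel is done with everything before the sync point, then
     * block until released. */
    sem_post(&state_sem_sync);
    sem_wait(&chan_sem_sync[id]);
    return NULL;
}

int recv_sync_main(void)
{
    pthread_t tid[NCH];

    sem_init(&state_sem_sync, 0, 0);
    for (int i = 0; i < NCH; i++) {
        sem_init(&chan_sem_sync[i], 0, 0);
        pthread_create(&tid[i], NULL, recv_thread, (void *)(long)i);
    }
    /* One post per channel: after this loop every channel has reached
     * its SYNC packet, i.e. consumed all data sent before the sync. */
    for (int i = 0; i < NCH; i++) {
        sem_wait(&state_sem_sync);
    }
    int done = 0;
    for (int i = 0; i < NCH; i++) {
        done += packets_done[i];
    }
    /* Only now release the channels past the barrier. */
    for (int i = 0; i < NCH; i++) {
        sem_post(&chan_sem_sync[i]);
        pthread_join(tid[i], NULL);
    }
    return done;    /* NCH if all channels reached the sync point */
}
```

The "how do we know they've finished" part rests on in-order delivery per channel: the SYNC-flagged packet is the last thing sent on each channel before the sender's barrier, so a thread posting `state_sem_sync` has necessarily read everything before it.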

>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> +        } else {
> +            qemu_mutex_unlock(&p->mutex);
> +            /* sometimes there are spurious wakeups */
>          }
> -        qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_recv_thread: Unknown command");
> -        break;
>      }
>  
>      if (local_err) {
> @@ -991,12 +1080,14 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
> +    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = false;
>          p->id = i;
> @@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      ram_control_before_iterate(f, RAM_CONTROL_SETUP);
>      ram_control_after_iterate(f, RAM_CONTROL_SETUP);
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>       */
>      ram_control_after_iterate(f, RAM_CONTROL_ROUND);
>  
> +    multifd_send_sync_main();
>  out:
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>      ram_counters.transferred += 8;
> @@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      rcu_read_unlock();
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              error_report("Unknown combination of migration flags: %#x"
> @@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              if (flags & RAM_SAVE_FLAG_HOOK) {
> diff --git a/migration/trace-events b/migration/trace-events
> index 9eee048287..b0ab8e2d03 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d"
>  multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
>  multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
>  multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
> +multifd_send_sync_main(uint32_t seq) "seq %d"
> +multifd_send_sync_main_signal(uint8_t id) "channel %d"
> +multifd_send_sync_main_wait(uint8_t id) "channel %d"
> +multifd_recv_sync_main(uint32_t seq) "seq %d"
> +multifd_recv_sync_main_signal(uint8_t id) "channel %d"
> +multifd_recv_sync_main_wait(uint8_t id) "channel %d"
>  
>  # migration/migration.c
>  await_return_path_close_on_source_close(void) ""
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page
  2018-04-26  8:18   ` Peter Xu
@ 2018-05-03 11:30     ` Dr. David Alan Gilbert
  2018-05-23 11:13     ` Juan Quintela
  1 sibling, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 11:30 UTC (permalink / raw)
  To: Peter Xu; +Cc: Juan Quintela, qemu-devel, lvivier

* Peter Xu (peterx@redhat.com) wrote:
> On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
> > The function still doesn't use multifd, but we have simplified
> > ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
> > counter.
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > 
> > --
> > Add last_page parameter
> > Add comments for done and address
> > Remove multifd field, it is the same than normal pages
> > Merge next patch, now we send multiple pages at a time
> > Remove counter for multifd pages, it is identical to normal pages
> > Use iovec's instead of creating the equivalent.
> > Clear memory used by pages (dave)
> > Use g_new0(danp)
> > define MULTIFD_CONTINUE
> > now pages member is a pointer
> > Fix off-by-one in number of pages in one packet
> > Remove RAM_SAVE_FLAG_MULTIFD_PAGE
> > s/multifd_pages_t/MultiFDPages_t/
> > ---
> >  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 93 insertions(+)
> > 
> > diff --git a/migration/ram.c b/migration/ram.c
> > index 398cb0af3b..862ec53d32 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -54,6 +54,7 @@
> >  #include "migration/block.h"
> >  #include "sysemu/sysemu.h"
> >  #include "qemu/uuid.h"
> > +#include "qemu/iov.h"
> >  
> >  /***********************************************************/
> >  /* ram save/restore */
> > @@ -692,8 +693,65 @@ struct {
> >      QemuSemaphore sem_sync;
> >      /* global number of generated multifd packets */
> >      uint32_t seq;
> > +    /* send channels ready */
> > +    QemuSemaphore channels_ready;
> >  } *multifd_send_state;
> >  
> > +static void multifd_send_pages(void)
> > +{
> > +    int i;
> > +    static int next_channel;
> > +    MultiFDSendParams *p = NULL; /* make happy gcc */
> > +    MultiFDPages_t *pages = multifd_send_state->pages;
> > +
> > +    qemu_sem_wait(&multifd_send_state->channels_ready);
> 
> This sem is posted when a thread has finished its work.  However this
> is called in the main migration thread.  With this line, are the
> threads really sending things in parallel?  Since it looks to me that
> this function (and the main thread) won't send the 2nd page array if
> the 1st hasn't finished, won't send the 3rd if the 2nd hasn't, and so
> on...
> 
> Maybe I misunderstood something.  Please feel free to correct me.

I share a similar misunderstanding;  except I can't understand how the
first item ever gets sent if we're waiting for channels_ready.
I think I could have understood it if there was a sem_post at the top of
multifd_send_thread.

Dave
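The concern generalizes to any "ready tokens" dispatcher. A standalone sketch (plain pthreads, not QEMU code; `NCH`, `NJOBS` and the function names are made up) shows why the first dispatch needs the semaphore pre-charged: initializing `channels_ready` to the channel count is equivalent to the `sem_post` at the top of `multifd_send_thread` that Dave suggests, and without either, the very first wait blocks forever:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define NCH 2
#define NJOBS 6

static sem_t channels_ready;   /* multifd_send_state->channels_ready */
static sem_t job_sem[NCH];     /* p->sem, one per channel */
static int jobs_run[NCH];
static int quit;

static void *send_thread(void *opaque)
{
    int id = (int)(long)opaque;

    for (;;) {
        sem_wait(&job_sem[id]);
        if (quit) {
            return NULL;
        }
        jobs_run[id]++;              /* "send" the queued pages */
        sem_post(&channels_ready);   /* this channel is idle again */
    }
}

int dispatch_jobs(void)
{
    pthread_t tid[NCH];

    /* Pre-charge with one "ready" token per channel so the first
     * dispatch does not deadlock. */
    sem_init(&channels_ready, 0, NCH);
    for (int i = 0; i < NCH; i++) {
        sem_init(&job_sem[i], 0, 0);
        jobs_run[i] = 0;
        pthread_create(&tid[i], NULL, send_thread, (void *)(long)i);
    }
    for (int j = 0; j < NJOBS; j++) {
        /* multifd_send_pages() equivalent: block until *some* channel
         * is free, then hand work to the next channel round-robin. */
        sem_wait(&channels_ready);
        sem_post(&job_sem[j % NCH]);
    }
    /* Drain the remaining tokens: this completes only after every
     * dispatched job has finished. */
    for (int i = 0; i < NCH; i++) {
        sem_wait(&channels_ready);
    }
    quit = 1;
    int total = 0;
    for (int i = 0; i < NCH; i++) {
        sem_post(&job_sem[i]);
        pthread_join(tid[i], NULL);
        total += jobs_run[i];
    }
    return total;   /* NJOBS once all channels have run their jobs */
}
```

Note the channels do run in parallel here: the dispatcher only blocks when *no* channel is free, not after each page array.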

> > +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> > +        p = &multifd_send_state->params[i];
> > +
> > +        qemu_mutex_lock(&p->mutex);
> > +        if (!p->pending_job) {
> > +            p->pending_job++;
> > +            next_channel = (i + 1) % migrate_multifd_channels();
> > +            break;
> > +        }
> > +        qemu_mutex_unlock(&p->mutex);
> > +    }
> > +    p->pages->used = 0;
> > +    multifd_send_state->seq++;
> > +    p->seq = multifd_send_state->seq;
> > +    p->pages->block = NULL;
> > +    multifd_send_state->pages = p->pages;
> > +    p->pages = pages;
> 
> > Here we directly replaced MultiFDSendParams.pages with
> > multifd_send_state->pages.  Then are we always using a single
> > MultiFDPages_t struct?  And if so, will all the initial
> > MultiFDSendParams.pages memory be leaked without being freed?
> 
> > +    qemu_mutex_unlock(&p->mutex);
> > +    qemu_sem_post(&p->sem);
> > +}
> 
> Thanks,
> 
> -- 
> Peter Xu
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
@ 2018-05-03 14:55   ` Dr. David Alan Gilbert
  2018-05-23 10:51     ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 14:55 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 30 ++++++++++++++++++++++++------
>  1 file changed, 24 insertions(+), 6 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 862ec53d32..9adbaa81f9 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -625,9 +625,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>      RAMBlock *block;
>      int i;
>  
> -    /* ToDo: We can't use it until we haven't received a message */
> -    return 0;
> -
>      be32_to_cpus(&packet->magic);
>      if (packet->magic != MULTIFD_MAGIC) {
>          error_setg(errp, "multifd: received packet "
> @@ -851,6 +848,7 @@ static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
>      Error *local_err = NULL;
> +    int ret;
>  
>      trace_multifd_send_thread_start(p->id);
>  
> @@ -878,10 +876,18 @@ static void *multifd_send_thread(void *opaque)
>  
>              trace_multifd_send(p->id, seq, used, flags);
>  
> -            /* ToDo: send packet here */
> +            ret = qio_channel_write_all(p->c, (void *)p->packet,
> +                                        p->packet_len, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
> -            p->flags = 0;

What's this change?

Other than that looks OK.

Dave

>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
>  
> @@ -1091,7 +1097,14 @@ static void *multifd_recv_thread(void *opaque)
>              uint32_t flags;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            /* ToDo: recv packet here */
> +            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                           p->packet_len, &local_err);
> +            if (ret == 0) {   /* EOF */
> +                break;
> +            }
> +            if (ret == -1) {   /* Error */
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
>              ret = multifd_recv_unfill_packet(p, &local_err);
> @@ -1108,6 +1121,11 @@ static void *multifd_recv_thread(void *opaque)
>              p->num_pages += used;
>              qemu_mutex_unlock(&p->mutex);
>  
> +            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
>              if (flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
@ 2018-05-03 15:04   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 15:04 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We have three conditions here:
> - channel fails -> error
> - we have to quit: we close the channel and reads fail
> - normal read that succeeds: we are in business
> 
> So forget the complications of waiting in a semaphore.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I *think* this is OK; but I'd prefer to see most of this folded into
earlier patches where I think they'd be cleaner rather than putting
stuff in and shuffling it around.

Dave

> ---
>  migration/ram.c | 81 ++++++++++++++++++-------------------------------
>  1 file changed, 29 insertions(+), 52 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 9adbaa81f9..2734f91ded 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -496,8 +496,6 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> -    /* thread has work to do */
> -    bool pending_job;
>      /* array of pages to receive */
>      MultiFDPages_t *pages;
>      /* packet allocated len */
> @@ -1056,14 +1054,6 @@ static void multifd_recv_sync_main(void)
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> -        trace_multifd_recv_sync_main_signal(p->id);
> -        qemu_mutex_lock(&p->mutex);
> -        p->pending_job = true;
> -        qemu_mutex_unlock(&p->mutex);
> -    }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
>          trace_multifd_recv_sync_main_wait(p->id);
>          qemu_sem_wait(&multifd_recv_state->sem_sync);
>          qemu_mutex_lock(&p->mutex);
> @@ -1076,7 +1066,6 @@ static void multifd_recv_sync_main(void)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          trace_multifd_recv_sync_main_signal(p->id);
> -
>          qemu_sem_post(&p->sem_sync);
>      }
>      trace_multifd_recv_sync_main(multifd_recv_state->seq);
> @@ -1091,51 +1080,40 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> +        uint32_t used;
> +        uint32_t flags;
> +
> +        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                       p->packet_len, &local_err);
> +        if (ret == 0) {   /* EOF */
> +            break;
> +        }
> +        if (ret == -1) {   /* Error */
> +            break;
> +        }
> +
>          qemu_mutex_lock(&p->mutex);
> -        if (true || p->pending_job) {
> -            uint32_t used;
> -            uint32_t flags;
> -            qemu_mutex_unlock(&p->mutex);
> -
> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                           p->packet_len, &local_err);
> -            if (ret == 0) {   /* EOF */
> -                break;
> -            }
> -            if (ret == -1) {   /* Error */
> -                break;
> -            }
> -
> -            qemu_mutex_lock(&p->mutex);
> -            ret = multifd_recv_unfill_packet(p, &local_err);
> -            if (ret) {
> -                qemu_mutex_unlock(&p->mutex);
> -                break;
> -            }
> -
> -            used = p->pages->used;
> -            flags = p->flags;
> -            trace_multifd_recv(p->id, p->seq, used, flags);
> -            p->pending_job = false;
> -            p->num_packets++;
> -            p->num_pages += used;
> +        ret = multifd_recv_unfill_packet(p, &local_err);
> +        if (ret) {
>              qemu_mutex_unlock(&p->mutex);
> +            break;
> +        }
>  
> -            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> -            if (ret != 0) {
> -                break;
> -            }
> +        used = p->pages->used;
> +        flags = p->flags;
> +        trace_multifd_recv(p->id, p->seq, used, flags);
> +        p->num_packets++;
> +        p->num_pages += used;
> +        qemu_mutex_unlock(&p->mutex);
>  
> -            if (flags & MULTIFD_FLAG_SYNC) {
> -                qemu_sem_post(&multifd_recv_state->sem_sync);
> -                qemu_sem_wait(&p->sem_sync);
> -            }
> -        } else if (p->quit) {
> -            qemu_mutex_unlock(&p->mutex);
> +        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +        if (ret != 0) {
>              break;
> -        } else {
> -            qemu_mutex_unlock(&p->mutex);
> -            /* sometimes there are spurious wakeups */
> +        }
> +
> +        if (flags & MULTIFD_FLAG_SYNC) {
> +            qemu_sem_post(&multifd_recv_state->sem_sync);
> +            qemu_sem_wait(&p->sem_sync);
>          }
>      }
>  
> @@ -1173,7 +1151,6 @@ int multifd_load_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
> -        p->pending_job = false;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel
  2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
@ 2018-05-03 15:24   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 15:24 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We have to flush() the QEMUFile because now we send really little data
> through that channel.

I think this makes sense, so


Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

However, I think again it could really be folded into an earlier patch,
except for the fflush()es themselves, which do make sense here.

Dave

> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 11 +++--------
>  1 file changed, 3 insertions(+), 8 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 23203756b7..f5cff2eb59 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -1634,20 +1634,12 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>                              bool last_stage)
>  {
>      int pages;
> -    uint8_t *p;
>      RAMBlock *block = pss->block;
>      ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
>  
> -    p = block->host + offset;
> -
>      pages = save_zero_page(rs, block, offset);
>      if (pages == -1) {
> -        ram_counters.transferred +=
> -            save_page_header(rs, rs->f, block,
> -                             offset | RAM_SAVE_FLAG_PAGE);
>          multifd_queue_page(block, offset);
> -        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
> -        ram_counters.transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          ram_counters.normal++;
>      }
> @@ -2869,6 +2861,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>  
>      multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
> +    qemu_fflush(f);
>  
>      return 0;
>  }
> @@ -2946,6 +2939,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>      multifd_send_sync_main();
>  out:
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
> +    qemu_fflush(f);
>      ram_counters.transferred += 8;
>  
>      ret = qemu_file_get_error(f);
> @@ -2999,6 +2993,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
> +    qemu_fflush(f);
>  
>      return 0;
>  }
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 00/21] Multifd
  2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
@ 2018-05-03 15:32   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-03 15:32 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Juan Quintela <quintela@redhat.com> wrote:
> > Hi
> >
> >
> > [v12]
> >
> > Big news: it is not RFC anymore, it works reliably for me.
> >
> > Changes:
> > - Locking changed completely (several times)
> > - We now send all pages through the channels.  In a 2GB guest with 1 disk and a network card, the amount of data sent for RAM was 80KB.
> > - This is not optimized yet, but it shows clear improvements over precopy.  Testing over localhost networking I can get:
> >   - 2 VCPUs guest
> >   - 2GB RAM
> >   - run stress --vm 4 --vm-bytes 500MB (i.e. dirtying 2GB of RAM each second)
> >
> >   - Total time: precopy ~50 seconds, multifd around 11 seconds
> >   - Bandwidth usage is around 273MB/s vs 71MB/s on the same hardware
> >
> > This is very preliminary testing; I will send more numbers when I get them.  But it looks promising.
> >
> > Things that will be improved later:
> > - Initial synchronization is too slow (around 1s)
> > - We synchronize all threads after each RAM section; we can move to only
> >   synchronize them after we have done a bitmap synchronization
> > - We can improve bitmap walking (but that is independent of multifd)
> 
> I forgot to mention that for the last 4 patches, I have not been able
> to split them in a way that:
> - is logical for review
> - works for multifd tests in all versions
> 
> So I ended up trying to get the "logical" view, and it works after the
> last patch.  Why is that?
> - Before I am able to transmit data, I need to be able to
>   end/synchronize the different channels
> - To finish channels in case of error, I just close the channels.
>   But I can't open them yet.
> 
> I have to think about whether I can come up with a simpler way to split
> it, but you can also consider the last 3-4 patches a single one.

I think most of the last few can be flattened into earlier patches;
I'd prefer that to having patches that add stuff which then gets
reworked/removed later.

I don't think it matters that the order of the last few doesn't work
until the end; since it didn't work at the beginning, it doesn't matter
until the end of the series.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels
  2018-04-26  7:28   ` Peter Xu
@ 2018-05-09  8:05     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09  8:05 UTC (permalink / raw)
  To: Peter Xu; +Cc: qemu-devel, dgilbert, lvivier

Peter Xu <peterx@redhat.com> wrote:
> On Wed, Apr 25, 2018 at 01:27:07PM +0200, Juan Quintela wrote:
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
>> ---
>>  migration/socket.c | 28 +++++++++++++++++++++++++++-
>>  migration/socket.h |  7 +++++++
>>  2 files changed, 34 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/socket.c b/migration/socket.c
>> index edf33c70cf..893a04f4cc 100644
>> --- a/migration/socket.c
>> +++ b/migration/socket.c
>> @@ -29,6 +29,28 @@
>>  #include "trace.h"
>>  
>>  
>> +struct SocketOutgoingArgs {
>> +    SocketAddress *saddr;
>> +} outgoing_args;
>
> I am not sure whether I have asked before, but... could we put this
> into MigrateState*?  The thing is that introducing more global
> variables will make things scattered, and we do stuff to merge them
> (like the RAMState cleanup work).  IMHO it saves time if we can do it
> from the very beginning.

We could, but this file doesn't depend at all on migration, so I didn't
want to put that outside of this file; that is the reason it is
this way.

>
>> +
>> +void socket_send_channel_create(QIOTaskFunc f, void *data)
>> +{
>> +    QIOChannelSocket *sioc = qio_channel_socket_new();
>> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
>> +                                     f, data, NULL, NULL);
>> +}
>> +
>> +int socket_send_channel_destroy(QIOChannel *send)
>> +{
>> +    /* Remove channel */
>> +    object_unref(OBJECT(send));
>> +    if (outgoing_args.saddr) {
>> +        qapi_free_SocketAddress(outgoing_args.saddr);
>> +        outgoing_args.saddr = NULL;
>> +    }
>> +    return 0;
>> +}
>
> Here I would possibly avoid bothering introducing the two new APIs
> since AFAIU they didn't do much things, and both of them are only
> called once...  And IMHO when we call socket_send_channel_create() in
> multifd_save_setup() we can initialize MultiFDSendParams->c already
> with the object returned by qio_channel_socket_new() if without the
> API, instead of waiting until multifd_new_send_channel_async() is
> called.

We can do it that way, but then we need the migration code to learn more
about this channel stuff.  You can't have both.  My understanding is
that the other functions are already quite complicated.

Later, Juan.

* Re: [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error
  2018-05-02 15:53   ` Dr. David Alan Gilbert
@ 2018-05-09  8:15     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09  8:15 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 24 ++++++++++++++++++++++--
>>  1 file changed, 22 insertions(+), 2 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 0e90efa092..2ae560ea80 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -415,10 +415,20 @@ struct {
>>      int count;
>>  } *multifd_send_state;
>>  
>> -static void terminate_multifd_send_threads(Error *errp)
>> +static void terminate_multifd_send_threads(Error *err)
>>  {
>>      int i;
>>  
>> +    if (err) {
>> +        MigrationState *s = migrate_get_current();
>> +        migrate_set_error(s, err);
>> +        if (s->state == MIGRATION_STATUS_SETUP ||
>> +            s->state == MIGRATION_STATUS_ACTIVE) {
>
> Can you explain/add comment why these only set it in some states?
> For example what about PRE_SWITCHOVER, DEVICE or postcopy?

These were the ones needed when I did the patch? O:-)

Ok, looking at all of them:

    MIGRATION_STATUS_NONE = 0,

We can't be in that state.

    MIGRATION_STATUS_SETUP = 1,
There

    MIGRATION_STATUS_CANCELLING = 2,
    MIGRATION_STATUS_CANCELLED = 3,

We don't want to touch the state in the case of those two.

    MIGRATION_STATUS_ACTIVE = 4,

There.

    MIGRATION_STATUS_POSTCOPY_ACTIVE = 5,
    MIGRATION_STATUS_POSTCOPY_PAUSED = 6,

We are not supporting those yet.

    MIGRATION_STATUS_COMPLETED = 7,

This is impossible (TM)

    MIGRATION_STATUS_FAILED = 8,

We don't want to change.

    MIGRATION_STATUS_COLO = 9,

Not contemplated yet
    MIGRATION_STATUS_PRE_SWITCHOVER = 10,
    MIGRATION_STATUS_DEVICE = 11,

Needed.
    MIGRATION_STATUS__MAX = 12,

Irrelevant.

So, I agree with your suggestion.

Later, Juan.


> Dave
>
>> +            migrate_set_state(&s->state, s->state,
>> +                              MIGRATION_STATUS_FAILED);
>> +        }
>> +    }
>> +
>>      for (i = 0; i < multifd_send_state->count; i++) {
>>          MultiFDSendParams *p = &multifd_send_state->params[i];
>>  
>> @@ -515,10 +525,20 @@ struct {
>>      int count;
>>  } *multifd_recv_state;
>>  
>> -static void terminate_multifd_recv_threads(Error *errp)
>> +static void terminate_multifd_recv_threads(Error *err)
>>  {
>>      int i;
>>  
>> +    if (err) {
>> +        MigrationState *s = migrate_get_current();
>> +        migrate_set_error(s, err);
>> +        if (s->state == MIGRATION_STATUS_SETUP ||
>> +            s->state == MIGRATION_STATUS_ACTIVE) {
>> +            migrate_set_state(&s->state, s->state,
>> +                              MIGRATION_STATUS_FAILED);
>> +        }
>> +    }
>> +
>>      for (i = 0; i < multifd_recv_state->count; i++) {
>>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>  
>> -- 
>> 2.17.0
>> 
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels
  2018-05-02 17:19   ` Dr. David Alan Gilbert
@ 2018-05-09  8:34     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09  8:34 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
>
> Please print the magic with %x.

done.

>
>> +        return -1;
>> +    }
>> +
>> +    if (msg.version != MULTIFD_VERSION) {
>> +        error_setg(errp, "multifd: received packet version %d "
>> +                   "expected %d", msg.version, MULTIFD_VERSION);
>> +        return -1;
>> +    }
>> +
>> +    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
>> +        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
>> +        error_setg(errp, "multifd: received uuid '%s' and expected "
>> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
>
> I don't think you can just print msg.uuid there; it's the raw 16 bytes
> isn't it rather than a string?

You are right.

> I suspect you need to do the unparse on both of them.

Done.

Thanks, Juan.

* Re: [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support
  2018-04-26  7:15   ` Peter Xu
@ 2018-05-09 10:52     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 10:52 UTC (permalink / raw)
  To: Peter Xu; +Cc: qemu-devel, dgilbert, lvivier

Peter Xu <peterx@redhat.com> wrote:
> On Wed, Apr 25, 2018 at 01:27:12PM +0200, Juan Quintela wrote:
>
> [...]
>
>> +static void multifd_pages_init(MultiFDPages_t **ppages, size_t size)
>> +{
>> +    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
>> +
>> +    pages->allocated = size;
>> +    pages->iov = g_new0(struct iovec, size);
>> +    pages->offset = g_new0(ram_addr_t, size);
>> +    *ppages = pages;
>> +}
>
> Can we just return the pages pointer?  Then it can be:
>
>   static MultiFDPages_t *multifd_pages_init(size_t size)

Done.

>
> [...]
>
>> @@ -731,6 +784,7 @@ static void *multifd_recv_thread(void *opaque)
>>  int multifd_load_setup(void)
>>  {
>>      int thread_count;
>> +    uint32_t page_count = migrate_multifd_page_count();
>>      uint8_t i;
>>  
>>      if (!migrate_use_multifd()) {
>> @@ -740,6 +794,7 @@ int multifd_load_setup(void)
>>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>>      atomic_set(&multifd_recv_state->count, 0);
>> +
>
> Useless line?

Changed it a lot in other places.

Later, Juan.

* Re: [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support
  2018-05-02 17:52   ` Dr. David Alan Gilbert
@ 2018-05-09 10:53     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 10:53 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We only create/destroy the page list here.  We will use it later.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++
>>  1 file changed, 56 insertions(+)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index ffefa73099..b19300992e 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -412,6 +412,20 @@ typedef struct {
>>      uint8_t id;
>>  } __attribute__((packed)) MultiFDInit_t;
>>  
>> +typedef struct {
>> +    /* number of used pages */
>> +    uint32_t used;
>> +    /* number of allocated pages */
>> +    uint32_t allocated;
>> +    /* global number of generated multifd packets */
>> +    uint32_t seq;
>
> Is that sufficiently large?
> Consider a 40Gbps link; that's ~4GByte/second,
> so if each packet is a 4K page, that's just over one hour at
> that link speed;  that's a long migration on a fast link, but
> it's not impossible is it - and what happens when it wraps?

Nothing really, it is just a counter that is used for traces.

Later, Juan.

* Re: [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-05-02 18:04   ` Dr. David Alan Gilbert
@ 2018-05-09 11:09     ` Juan Quintela
  2018-05-09 11:12       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 11:09 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We still don't put anything there.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 136 insertions(+), 1 deletion(-)
>> +    be32_to_cpus(&packet->magic);
>> +    if (packet->magic != MULTIFD_MAGIC) {
>> +        error_setg(errp, "multifd: received packet "
>> +                   "version %d and expected version %d",
>> +                   packet->magic, MULTIFD_VERSION);
>
> That's mixing magic and version. (Magic's as %x please)

Oops, fixed.


>> +    p->seq = be32_to_cpu(packet->seq);
>> +
>> +    if (p->pages->used) {
>> +        block = qemu_ram_block_by_name(packet->ramblock);
>
> Do you need to ensure that packet->ramblock is a terminated string
> first?

packet->ramblock[255] = 0;

>
>> +        if (!block) {
>> +            error_setg(errp, "multifd: unknown ram block %s",
>> +                       packet->ramblock);
>> +            return -1;
>> +        }
>> +    }
>> +
>> +    for (i = 0; i < p->pages->used; i++) {
>> +        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
>> +
>> +        p->pages->iov[i].iov_base = block->host + offset;
>
> I think that needs validating to ensure that the source didn't
> send us junk and cause us to overwrite after the end of block->host

        if (offset > block->used_length) {
            error_setg(errp, "multifd: offset too long %" PRId64
                       " (max %" PRId64 ")",
                       offset, block->used_length);
            return -1;
        }
??

Thanks, Juan.

* Re: [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread
  2018-05-02 18:35   ` Dr. David Alan Gilbert
@ 2018-05-09 11:11     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 11:11 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We want to know how many pages/packets each channel has sent.  Add
>> counters for those.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c        | 20 ++++++++++++++++++++
>>  migration/trace-events |  4 ++++
>>  2 files changed, 24 insertions(+)

>>  typedef struct {
>> @@ -495,6 +499,10 @@ typedef struct {
>>      uint32_t flags;
>>      /* global number of generated multifd packets */
>>      uint32_t seq;
>> +    /* packets sent through this channel */
>> +    uint32_t num_packets;
>> +    /* pages sent through this channel */
>> +    uint32_t num_pages;
>
> Doesn't the comment for 'mutex' say 'protects the following parameters'
> (added in 09/21) ?  In which case....

These two are really local.  Fixing the comment.

Thanks, Juan.

* Re: [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-05-09 11:09     ` Juan Quintela
@ 2018-05-09 11:12       ` Dr. David Alan Gilbert
  2018-05-09 19:46         ` Juan Quintela
  0 siblings, 1 reply; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-09 11:12 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We still don't put anything there.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++-
> >>  1 file changed, 136 insertions(+), 1 deletion(-)
> >> +    be32_to_cpus(&packet->magic);
> >> +    if (packet->magic != MULTIFD_MAGIC) {
> >> +        error_setg(errp, "multifd: received packet "
> >> +                   "version %d and expected version %d",
> >> +                   packet->magic, MULTIFD_VERSION);
> >
> > That's mixing magic and version. (Magic's as %x please)
> 
> Oops, fixed.
> 
> 
> >> +    p->seq = be32_to_cpu(packet->seq);
> >> +
> >> +    if (p->pages->used) {
> >> +        block = qemu_ram_block_by_name(packet->ramblock);
> >
> > Do you need to ensure that packet->ramblock is a terminated string
> > first?
> 
> packet->ramblock[255] = 0;
> 
> >
> >> +        if (!block) {
> >> +            error_setg(errp, "multifd: unknown ram block %s",
> >> +                       packet->ramblock);
> >> +            return -1;
> >> +        }
> >> +    }
> >> +
> >> +    for (i = 0; i < p->pages->used; i++) {
> >> +        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
> >> +
> >> +        p->pages->iov[i].iov_base = block->host + offset;
> >
> > I think that needs validating to ensure that the source didn't
> > send us junk and cause us to overwrite after the end of block->host
> 
>         if (offset > block->used_length) {
>             error_setg(errp, "multifd: offset too long %" PRId64
>                        " (max %" PRId64 ")",
>                        offset, block->used_length);
>             return -1;
>         }
> ??

It's probably  (offset + TARGET_PAGE_SIZE) that needs checking
but it needs doing in a wrap-safe way.

Dave

> Thanks, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly
  2018-05-02 18:59   ` Dr. David Alan Gilbert
@ 2018-05-09 11:14     ` Juan Quintela
  2018-05-09 19:46     ` Juan Quintela
  1 sibling, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 11:14 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> On multifd we send data from more places than the main channel.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/migration.c | 11 +++++++++--
>>  1 file changed, 9 insertions(+), 2 deletions(-)
>> 
>> diff --git a/migration/migration.c b/migration/migration.c
>> index 9b510a809a..75d30661e9 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -2246,12 +2246,19 @@ static void migration_update_counters(MigrationState *s,
>>  {
>>      uint64_t transferred, time_spent;
>>      double bandwidth;
>> +    uint64_t now;
>>  
>>      if (current_time < s->iteration_start_time + BUFFER_DELAY) {
>>          return;
>>      }
>>  
>> -    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
>> +    if (migrate_use_multifd()) {
>> +        now = ram_counters.normal * qemu_target_page_size()
>> +            + qemu_ftell(s->to_dst_file);
>
> OK, can I just confirm, the 'multifd packets' go over the main fd, and
> just the pages are going over the other fd's?  In which case this is
> right; but if the headers are going over the other fd's as well then
> this is wrong.

We are not counting the headers on the multifd pages; you are right.  Fixing.

Thanks, Juan.

* Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
  2018-05-03 10:44   ` Dr. David Alan Gilbert
@ 2018-05-09 19:45     ` Juan Quintela
  2018-05-11 16:32       ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 19:45 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We synchronize all threads each RAM_SAVE_FLAG_EOS.  Bitmap
>> synchronizations don't happen inside a ram section, so we are safe
>> about two channels trying to overwrite the same memory.
>
> OK, that's quite neat - so you don't need any extra flags in the stream
> to do the sync;  it probably needs a comment in the code somewhere so we
> don't forget!

Thanks.

>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
>>  migration/trace-events |   6 +++
>>  2 files changed, 113 insertions(+), 11 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c4c185cc4c..398cb0af3b 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
>>  #define MULTIFD_MAGIC 0x11223344U
>>  #define MULTIFD_VERSION 1
>>  
>> +#define MULTIFD_FLAG_SYNC (1 << 0)
>> +
>>  typedef struct {
>>      uint32_t magic;
>>      uint32_t version;
>> @@ -471,6 +473,8 @@ typedef struct {
>>      uint32_t num_packets;
>>      /* pages sent through this channel */
>>      uint32_t num_pages;
>> +    /* syncs main thread and channels */
>> +    QemuSemaphore sem_sync;
>>  }  MultiFDSendParams;
>>  
>>  typedef struct {
>> @@ -507,6 +511,8 @@ typedef struct {
>>      uint32_t num_packets;
>>      /* pages sent through this channel */
>>      uint32_t num_pages;
>> +    /* syncs main thread and channels */
>> +    QemuSemaphore sem_sync;
>>  } MultiFDRecvParams;
>>  
>>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>> @@ -682,6 +688,10 @@ struct {
>>      int count;
>>      /* array of pages to sent */
>>      MultiFDPages_t *pages;
>> +    /* syncs main thread and channels */
>> +    QemuSemaphore sem_sync;
>> +    /* global number of generated multifd packets */
>> +    uint32_t seq;
>
> It's interesting you use the same comment for 'seq' in
> MultiFDSendParams - but I guess that means only this one is the global
> version and the others aren't really global number - they're just
> local to that thread?

The only place that "increases/generates" seq is multifd_send_pages(), which
is what creates a new packet to be sent.  So, if we see _any_ packet on
the wire, we know the real global ordering.  They are only used for
traces, to see that packet 42 was sent through channel 3, and on
reception you check that packet 42 is what you received through channel
3.  They only appear in traces, but I find them useful for debugging
synchronization errors.


>> +    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +        MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> +        trace_multifd_send_sync_main_signal(p->id);
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        p->flags |= MULTIFD_FLAG_SYNC;
>> +        p->pending_job++;
>> +        qemu_mutex_unlock(&p->mutex);
>> +        qemu_sem_post(&p->sem);
>> +    }

[1]

>> +    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +        MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> +        trace_multifd_send_sync_main_wait(p->id);
>> +        qemu_sem_wait(&multifd_send_state->sem_sync);
>> +    }

[2]

>> +    trace_multifd_send_sync_main(multifd_send_state->seq);
>> +}
>> +
>
> OK, so this just makes each of the sending threads ack, so that seems
> OK.
> But what happens with an error? multifd_send_sync_main exits it's
> loop with a 'break' if the writes fail, and that could mean they never
> come and post the flag-sync sem.

Let's see.

[1]: We are just doing mutex_lock()/sem_post(); if we are not able to do
that, we have got a big race that needs to be fixed.  So that bit is ok.

[2]: We do an unconditional sem_wait().  Looking at the worker code:
     at this patch level we are ok, but I agree with you that in later
     patches we need to also do the post in the error case.  Changing.

>> +
>> +        trace_multifd_recv_sync_main_wait(p->id);
>> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
>> +        qemu_mutex_lock(&p->mutex);
>> +        if (multifd_recv_state->seq < p->seq) {
>> +            multifd_recv_state->seq = p->seq;
>> +        }
>
> Can you explain what this is for?
> Something like the latest received block?

When we are at a synchronization point, we don't know on the main thread
when that synchronization happened (at which packet, considering the
packets as a logically ordered list).  So, we choose 'seq' from the
channel with the highest number.  That is the one that we want.  We only
use this for tracing, so we can "match" that we did a synchronization on
the send side at packet N and we see in the trace on the reception side
that we did it at packet N also.

Remember that in a previous patch you asked me what happens if this
wraps around?  At that point, nothing.  But now I need to change
this code to be:


    multifd_recv_state->seq = 0;
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        ...
        if (multifd_recv_state->seq < p->seq) {
            multifd_recv_state->seq = p->seq;
        }

And I have fixed the wraparound problem, no?

>> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
>>      trace_multifd_recv_thread_start(p->id);
>>  
>>      while (true) {
>> -        qemu_sem_wait(&p->sem);
>>          qemu_mutex_lock(&p->mutex);
>> -        if (p->pending_job) {
>> +        if (true || p->pending_job) {
>
> A TODO I guess???

Oops, that should be out.

Fixed on next version.

>>              uint32_t used;
>>              uint32_t flags;
>>              qemu_mutex_unlock(&p->mutex);
>> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
>>              p->num_packets++;
>>              p->num_pages += used;
>>              qemu_mutex_unlock(&p->mutex);
>> +
>> +            if (flags & MULTIFD_FLAG_SYNC) {
>> +                qemu_sem_post(&multifd_recv_state->sem_sync);
>> +                qemu_sem_wait(&p->sem_sync);
>> +            }
>
> Can you explain the receive side logic - I think this is waiting for all
> receive threads to 'ack' - but how do we know that they've finished
> receiving all data that was sent?

Because they need to receive a packet with MULTIFD_FLAG_SYNC set.  And
if they receive that flag, we know it is the last one of the sequence.

Synchronization works like this (2 channels to make things easy):

                main thread:
                we finish a RAM_SECTION;
                flush pending packets to one of the channels
                send a packet with MULTIFD_FLAG_SYNC to all the channels
                wait until all the channels have processed the FLAG_SYNC
                At this point, send the RAM_SECTION_EOS footer.

worker1                                                worker2

if there is a pending packet, send it                  if there is a pending packet, send it
(notice that there can't ever be more than one)
send a packet with the SYNC flag set                   send a packet with the SYNC flag set

On the reception side:


              main thread
              receives the RAM_SECTION_EOS footer
              waits for the workers to receive a sync

worker1                                                worker2
process any pending packet (no sync)                   process any pending packet (no sync)
process the packet with SYNC                           process the packet with SYNC
post the main thread                                   post the main thread

              now the main thread can continue

Notice that we don't care what happens first, receiving the packet with
SYNC in the workers or RAM_SECTION_EOS on the main thread; all works as
expected.

Noticing how long it took to explain this, I think that I am going to add
this to the migration documentation.  I will wait for any questions you
have before adding it.

Later, Juan.

* Re: [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-05-09 11:12       ` Dr. David Alan Gilbert
@ 2018-05-09 19:46         ` Juan Quintela
  2018-05-11 16:36           ` Dr. David Alan Gilbert
  0 siblings, 1 reply; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 19:46 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
>> > * Juan Quintela (quintela@redhat.com) wrote:
>> > I think that needs validating to ensure that the source didn't
>> > send us junk and cause us to overwrite after the end of block->host
>> 
>>         if (offset > block->used_length) {
>>             error_setg(errp, "multifd: offest too long %" PRId64
>>                        " (max %" PRId64 ")",
>>                        offset, block->max_length);
>>             return -1;
>>         }
>> ??
>
> It's probably  (offset + TARGET_PAGE_SIZE) that needs checking
> but it needs doing in a wrap-safe way.
>

        if ((offset + TARGET_PAGE_SIZE) < offset) {
            error_setg(errp, "multifd: offset %" PRId64 " wraps around"
                       " when adding the page size", offset);
            return -1;
        }
        if ((offset + TARGET_PAGE_SIZE) > block->used_length) {
            error_setg(errp, "multifd: offset too long %" PRId64
                       " (max %" PRId64 ")",
                       offset, block->used_length);
            return -1;
        }

Sometimes I wonder how it is that we don't have

ramblock_contains_range(ramblock, start, size);

But well, c'est la vie.

Later, Juan.

* Re: [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly
  2018-05-02 18:59   ` Dr. David Alan Gilbert
  2018-05-09 11:14     ` Juan Quintela
@ 2018-05-09 19:46     ` Juan Quintela
  1 sibling, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-09 19:46 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> On multifd we send data from more places than the main channel.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/migration.c | 11 +++++++++--
>>  1 file changed, 9 insertions(+), 2 deletions(-)
>> 
>> diff --git a/migration/migration.c b/migration/migration.c
>> index 9b510a809a..75d30661e9 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -2246,12 +2246,19 @@ static void migration_update_counters(MigrationState *s,
>>  {
>>      uint64_t transferred, time_spent;
>>      double bandwidth;
>> +    uint64_t now;
>>  
>>      if (current_time < s->iteration_start_time + BUFFER_DELAY) {
>>          return;
>>      }
>>  
>> -    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
>> +    if (migrate_use_multifd()) {
>> +        now = ram_counters.normal * qemu_target_page_size()
>> +            + qemu_ftell(s->to_dst_file);
>
> OK, can I just confirm, the 'multifd packets' go over the main fd, and
> just the pages are going over the other fd's?

Nope.  We send (in the end) pages and metadata through the
multifd channels.  We don't send anything over the normal channel for
multifd pages.  Multifd metadata is not taken into account.  It is
multifd_send_state->seq * sizeof(packet).  Fixing that.

> In which case this is
> right; but if the headers are going over the other fd's as well then
> this is wrong.

They are small, but still (around 64 offsets plus some 20 bytes of
header), once every 64 pages; but as said, it is not difficult to do the
right thing.

Thanks, Juan.

* Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
  2018-05-09 19:45     ` Juan Quintela
@ 2018-05-11 16:32       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-11 16:32 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We synchronize all threads each RAM_SAVE_FLAG_EOS.  Bitmap
> >> synchronizations don't happen inside a  ram section, so we are safe
> >> about two channels trying to overwrite the same memory.
> >
> > OK, that's quite neat - so you don't need any extra flags in the stream
> > to do the sync;  it probably needs a comment in the code somewhere so we
> > don't forget!
> 
> Thanks.
> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
> >>  migration/trace-events |   6 +++
> >>  2 files changed, 113 insertions(+), 11 deletions(-)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c4c185cc4c..398cb0af3b 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
> >>  #define MULTIFD_MAGIC 0x11223344U
> >>  #define MULTIFD_VERSION 1
> >>  
> >> +#define MULTIFD_FLAG_SYNC (1 << 0)
> >> +
> >>  typedef struct {
> >>      uint32_t magic;
> >>      uint32_t version;
> >> @@ -471,6 +473,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  }  MultiFDSendParams;
> >>  
> >>  typedef struct {
> >> @@ -507,6 +511,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  } MultiFDRecvParams;
> >>  
> >>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> >> @@ -682,6 +688,10 @@ struct {
> >>      int count;
> >>      /* array of pages to sent */
> >>      MultiFDPages_t *pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >> +    /* global number of generated multifd packets */
> >> +    uint32_t seq;
> >
> > It's interesting you use the same comment for 'seq' in
> > MultiFDSendParams - but I guess that means only this one is the global
> > version and the others aren't really global number - they're just
> > local to that thread?
> 
> Only place that "increases/generates" seq is multifd_send_pages(), which
> is what creates a new packet to be sent.  So, if we see _any_ packet on
> the wire, we know the real global ordering.  They are only used for
> traces, to see that packet 42 was sent through channel 3, and on
> reception you check that packet 42 is what you received through channel
> 3.  They only appear in traces, but I find them useful for debugging
> synchronization errors.

Ah, and multifd_send_pages is the main thread, and it always operates
on the multifd_send_state->seq and then passes it to the SendParams; OK.
I'm not sure how to explain that better; but it's a little confusing.

> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_signal(p->id);
> >> +
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        p->flags |= MULTIFD_FLAG_SYNC;
> >> +        p->pending_job++;
> >> +        qemu_mutex_unlock(&p->mutex);
> >> +        qemu_sem_post(&p->sem);
> >> +    }
> 
> [1]
> 
> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> >> +    }
> 
> [2]
> 
> >> +    trace_multifd_send_sync_main(multifd_send_state->seq);
> >> +}
> >> +
> >
> > OK, so this just makes each of the sending threads ack, so that seems
> > OK.
> > But what happens with an error? multifd_send_sync_main exits it's
> > loop with a 'break' if the writes fail, and that could mean they never
> > come and post the flag-sync sem.
> 
> Let's see.
> 
> [1]: we are just doing mutex_lock/sem_post(), if we are not able to do
> that, we have got a big race that needs to be fixed.  So that bit is ok.
> 
> [2]: We do an unconditional sem_wait().  Looking at the worker code.
>      In this patch level, we are ok, but I agree with you than in later
>      patches, we need to also do the post on the error case.  Changing.
OK.


> >> +
> >> +        trace_multifd_recv_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        if (multifd_recv_state->seq < p->seq) {
> >> +            multifd_recv_state->seq = p->seq;
> >> +        }
> >
> > Can you explain what this is for?
> > Something like the latest received block?
> 
> When we are at a synchronization point, we don't know on the main thread
> when that synchronization happened (at which packet, considering packets
> as a logical ordered list).  So we choose 'seq' from the channel with
> the highest number.   That is the one that we want.  We only use this
> for tracing, so we can "match" that we did a synchronization on the send
> side at packet N and see in the trace at the reception side that we did it
> at packet N as well.

OK, I think I see; again, this code is main thread, and it's
going around all the subthreads; so it's updating the central copy
seeing who has been received - OK.

> Remember that in a previous patch you asked me what happened if this
> wraps around?  At that point, nothing.  But now I need to change
> this code to:
> 
> 
>     multifd_recv_state->seq = 0;
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         MultiFDRecvParams *p = &multifd_recv_state->params[i];
>         ...
>         if (multifd_recv_state->seq < p->seq) {
>             multifd_recv_state->seq = p->seq;
>         }
> 
> And I have fixed the workaround problem, no?

Yes.  Adding a note somewhere saying it's just for debug would help as
well.

> >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
> >>      trace_multifd_recv_thread_start(p->id);
> >>  
> >>      while (true) {
> >> -        qemu_sem_wait(&p->sem);
> >>          qemu_mutex_lock(&p->mutex);
> >> -        if (p->pending_job) {
> >> +        if (true || p->pending_job) {
> >
> > A TODO I guess???
> 
> Oops, that should be out.
> 
> Fixed on next version.
> 
> >>              uint32_t used;
> >>              uint32_t flags;
> >>              qemu_mutex_unlock(&p->mutex);
> >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
> >>              p->num_packets++;
> >>              p->num_pages += used;
> >>              qemu_mutex_unlock(&p->mutex);
> >> +
> >> +            if (flags & MULTIFD_FLAG_SYNC) {
> >> +                qemu_sem_post(&multifd_recv_state->sem_sync);
> >> +                qemu_sem_wait(&p->sem_sync);
> >> +            }
> >
> > Can you explain the receive side logic - I think this is waiting for all
> > receive threads to 'ack' - but how do we know that they've finished
> > receiving all data that was sent?
> 
> Because they need to receive a packet with MULTIFD_FLAG_SYNC set.  And
> if they receive that flag, we know that it is the last one of the sequence.
> 
> synchronization works like this (2 channels to make things easy):
> 
>                 main thread:
>                 we finish a RAM_SECTION;
>                 flush pending packets to one of the channels
>                 send packet with MULTIFD_FLAG_SYNC for all the channels
>                 wait until all the channels have processed the FLAG_SYNC
>                 At this point send the RAM_SECTION_EOS footer.
> 
> worker1                                                worker 2
> 
> if there is a pending packet, send it                  if there is a pending packet, send it
> (notice that there can't be more than one ever)
> send a packet with SYNC flag set                       send a packet with SYNC flag set
> 
> On the reception side
> 
> 
>               main thread
>               receives RAM_SECTION_EOS footer
>               wait for workers to receive a sync
> 
> worker1                                                worker2
> process any pending packet(no sync)                    process any pending packet(no sync)
> process packet with SYNC                               process packet with SYNC
> post main thread                                       post main thread
> 
>               now main thread can continue
> 
> Notice that we don't care what happens first, receiving the packet with SYNC
> in the workers or RAM_SECTION_EOS on the main thread; all works as expected.
> 
> Noticing how long it took to explain this, I think that I am going to add
> this to the migration documentation.  I will wait for any questions you have
> before adding it.

Thanks; that I think makes sense.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
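The handshake described above (main thread sets MULTIFD_FLAG_SYNC on every channel at [1], then waits at [2] until every channel has acknowledged it) can be modeled in a few lines.  Below is a minimal, self-contained sketch using plain pthreads and POSIX semaphores — hypothetical names and trimmed-down state, not the actual QEMU code, and it only models a single sync point:

```c
#include <pthread.h>
#include <semaphore.h>

#define NCHANNELS 2
#define FLAG_SYNC (1 << 0)

typedef struct {
    int id;
    int flags;
    int synced;              /* set once the SYNC flag is processed */
    pthread_mutex_t mutex;
    sem_t sem;               /* main -> worker: work (or sync) pending */
} Channel;

static Channel channels[NCHANNELS];
static sem_t sem_sync;       /* worker -> main: sync point reached */

static void *worker(void *opaque)
{
    Channel *c = opaque;

    for (;;) {
        sem_wait(&c->sem);
        pthread_mutex_lock(&c->mutex);
        int flags = c->flags;
        c->flags = 0;
        pthread_mutex_unlock(&c->mutex);
        if (flags & FLAG_SYNC) {
            c->synced = 1;
            sem_post(&sem_sync);   /* ack the sync to the main thread */
            return NULL;           /* the model stops after one sync */
        }
    }
}

/* Returns the number of channels that processed the sync point. */
static int run_sync(void)
{
    pthread_t th[NCHANNELS];
    int i, done = 0;

    sem_init(&sem_sync, 0, 0);
    for (i = 0; i < NCHANNELS; i++) {
        channels[i].id = i;
        pthread_mutex_init(&channels[i].mutex, NULL);
        sem_init(&channels[i].sem, 0, 0);
        pthread_create(&th[i], NULL, worker, &channels[i]);
    }
    /* main thread: mark SYNC on every channel, then wake it (the [1] loop) */
    for (i = 0; i < NCHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].flags |= FLAG_SYNC;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
    }
    /* wait until every channel has acknowledged the sync (the [2] loop) */
    for (i = 0; i < NCHANNELS; i++) {
        sem_wait(&sem_sync);
    }
    for (i = 0; i < NCHANNELS; i++) {
        pthread_join(th[i], NULL);
        done += channels[i].synced;
    }
    return done;
}
```

Only after run_sync() returns would the main thread emit the RAM_SECTION_EOS footer; whichever side reaches its sync point first simply blocks until the other catches up, which is why the ordering doesn't matter.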


* Re: [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet
  2018-05-09 19:46         ` Juan Quintela
@ 2018-05-11 16:36           ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 60+ messages in thread
From: Dr. David Alan Gilbert @ 2018-05-11 16:36 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> >> > * Juan Quintela (quintela@redhat.com) wrote:
> >> > I think that needs validating to ensure that the source didn't
> >> > send us junk and cause us to overwrite after the end of block->host
> >> 
> >>         if (offset > block->used_length) {
> >>             error_setg(errp, "multifd: offset too long %" PRId64
> >>                        " (max %" PRId64 ")",
> >>                        offset, block->used_length);
> >>             return -1;
> >>         }
> >> ??
> >
> > It's probably  (offset + TARGET_PAGE_SIZE) that needs checking
> > but it needs doing in a wrap-safe way.
> >
> 
>         if ((offset + TARGET_PAGE_SIZE) < offset) {
>             error_setg(errp, "multifd: offset %" PRId64 " wraps around"
>                        " when adding TARGET_PAGE_SIZE", offset);
>             return -1;
>         }
>         if ((offset + TARGET_PAGE_SIZE) > block->used_length) {
>             error_setg(errp, "multifd: offset too long %" PRId64
>                        " (max %" PRId64 ")",
>                        offset, block->used_length);
>             return -1;
>         }

How about:
   if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
    ....
   }
  (*assuming that block->used_length is always at least a
TARGET_PAGE_SIZE ?)
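The wrap-safe form suggested here can be sketched as a small standalone helper (hypothetical name, in the spirit of the ramblock_contains_range() Juan wishes for below; it makes the used_length >= TARGET_PAGE_SIZE assumption explicit):

```c
#include <stdint.h>
#include <stdbool.h>

#define TARGET_PAGE_SIZE 4096

/*
 * True when [offset, offset + TARGET_PAGE_SIZE) fits inside a block of
 * used_length bytes.  Subtracting on the right-hand side instead of
 * adding on the left avoids the offset + TARGET_PAGE_SIZE overflow.
 */
static bool offset_fits_block(uint64_t offset, uint64_t used_length)
{
    if (used_length < TARGET_PAGE_SIZE) {
        return false;      /* degenerate block, no page can fit */
    }
    return offset <= used_length - TARGET_PAGE_SIZE;
}
```

Note that a huge offset near UINT64_MAX is rejected here without any explicit wrap check, which is the point of Dave's formulation.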

Dave

> Sometimes I wonder how it is that we don't have
> 
> ramblock_contains_range(ramblock, start, size);
> 
> But well, c'est la vie.
> 
> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages
  2018-05-03 14:55   ` Dr. David Alan Gilbert
@ 2018-05-23 10:51     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-23 10:51 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 30 ++++++++++++++++++++++++------
>>  1 file changed, 24 insertions(+), 6 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 862ec53d32..9adbaa81f9 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -625,9 +625,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>>      RAMBlock *block;
>>      int i;
>>  
>> -    /* ToDo: We can't use it until we haven't received a message */
>> -    return 0;
>> -
>>      be32_to_cpus(&packet->magic);
>>      if (packet->magic != MULTIFD_MAGIC) {
>>          error_setg(errp, "multifd: received packet "
>> @@ -851,6 +848,7 @@ static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *p = opaque;
>>      Error *local_err = NULL;
>> +    int ret;
>>  
>>      trace_multifd_send_thread_start(p->id);
>>  
>> @@ -878,10 +876,18 @@ static void *multifd_send_thread(void *opaque)
>>  
>>              trace_multifd_send(p->id, seq, used, flags);
>>  
>> -            /* ToDo: send packet here */
>> +            ret = qio_channel_write_all(p->c, (void *)p->packet,
>> +                                        p->packet_len, &local_err);
>> +            if (ret != 0) {
>> +                break;
>> +            }
>> +
>> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
>> +            if (ret != 0) {
>> +                break;
>> +            }
>>  
>>              qemu_mutex_lock(&p->mutex);
>> -            p->flags = 0;
>
> What's this change?

Leftover from a previous approach in patch 16; we already do that
assignment several lines before.  I removed it in patch 16, as it should be.

Thanks, Juan.


* Re: [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem
  2018-05-03  9:36   ` Dr. David Alan Gilbert
@ 2018-05-23 10:59     ` Juan Quintela
  0 siblings, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-23 10:59 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Either for quit, sync or packet, we first wake them.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 13 +++++++++++--
>>  1 file changed, 11 insertions(+), 2 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 0f1340b4e3..21b448c4ed 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -754,6 +754,7 @@ static void *multifd_send_thread(void *opaque)
>>      p->num_packets = 1;
>>  
>>      while (true) {
>> +        qemu_sem_wait(&p->sem);
>>          qemu_mutex_lock(&p->mutex);
>>          multifd_send_fill_packet(p);
>>          if (p->quit) {
>> @@ -761,7 +762,9 @@ static void *multifd_send_thread(void *opaque)
>>              break;
>>          }
>>          qemu_mutex_unlock(&p->mutex);
>> -        qemu_sem_wait(&p->sem);
>> +        /* this is impossible */
>> +        error_setg(&local_err, "multifd_send_thread: Unknown command");
>> +        break;
>
> This error disappears in a later patch saying that you can have spurious
> wakeups.
>
>>      }
>>  
>>  out:
>> @@ -905,6 +908,7 @@ static void *multifd_recv_thread(void *opaque)
>>      trace_multifd_recv_thread_start(p->id);
>>  
>>      while (true) {
>> +        qemu_sem_wait(&p->sem);
>>          qemu_mutex_lock(&p->mutex);
>
> All this stuff seems to change again in later patches.

Tricky :-(

Except for patches 18, 19 and 20, everything else is independent and
works at every point in the series.  18 and 19 don't work (for multifd).  So I
have the option of:
- creating a bigger patch that is more difficult to understand (my
  humble opinion)
- or having it split logically, even though the intermediate steps don't work.

The real problem is that before we really send data through the
channels, we synchronize on one sem (that is, reception).  After we start
sending data through the channels, reception synchronizes on one
read().  "Faking" synchronizations on read()s while you are not reading
is "interesting".

Later, Juan.


* Re: [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page
  2018-04-26  8:18   ` Peter Xu
  2018-05-03 11:30     ` Dr. David Alan Gilbert
@ 2018-05-23 11:13     ` Juan Quintela
  1 sibling, 0 replies; 60+ messages in thread
From: Juan Quintela @ 2018-05-23 11:13 UTC (permalink / raw)
  To: Peter Xu; +Cc: qemu-devel, dgilbert, lvivier

Peter Xu <peterx@redhat.com> wrote:
> On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
>> The function still don't use multifd, but we have simplified
>> ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
>> counter.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> 
>> --
>> Add last_page parameter
>> Add comments for done and address
>> Remove multifd field, it is the same as normal pages
>> Merge next patch, now we send multiple pages at a time
>> Remove counter for multifd pages, it is identical to normal pages
>> Use iovec's instead of creating the equivalent.
>> Clear memory used by pages (dave)
>> Use g_new0(danp)
>> define MULTIFD_CONTINUE
>> now pages member is a pointer
>> Fix off-by-one in number of pages in one packet
>> Remove RAM_SAVE_FLAG_MULTIFD_PAGE
>> s/multifd_pages_t/MultiFDPages_t/
>> ---
>>  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
>>  1 file changed, 93 insertions(+)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 398cb0af3b..862ec53d32 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -54,6 +54,7 @@
>>  #include "migration/block.h"
>>  #include "sysemu/sysemu.h"
>>  #include "qemu/uuid.h"
>> +#include "qemu/iov.h"
>>  
>>  /***********************************************************/
>>  /* ram save/restore */
>> @@ -692,8 +693,65 @@ struct {
>>      QemuSemaphore sem_sync;
>>      /* global number of generated multifd packets */
>>      uint32_t seq;
>> +    /* send channels ready */
>> +    QemuSemaphore channels_ready;
>>  } *multifd_send_state;
>>  
>> +static void multifd_send_pages(void)
>> +{
>> +    int i;
>> +    static int next_channel;
>> +    MultiFDSendParams *p = NULL; /* make happy gcc */
>> +    MultiFDPages_t *pages = multifd_send_state->pages;
>> +
>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>
> This sem is posted when a thread has finished its work.  However this
> is called in the main migration thread.  If with this line, are the
> threads really sending things in parallel?  Since it looks to me that
> this function (and the main thread) won't send the 2nd page array if
> the 1st hasn't finished, and won't send the 3rd if the 2nd hasn't,
> vice versa...
>
> Maybe I misunderstood something.  Please feel free to correct me.

@@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_send_state->sem_sync);
             }
+            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;


Notice this bit in multifd_send_thread.  We are adding one to channels_ready.
But we can enter there for two reasons:
- we need to send a new packet full of pages
- we need to send a synchronization packet.
  This is what happens when we start.

Before the main thread starts, all the other channels have to be created,
so we have that semaphore initialized to the right number of channels.

How do you prefer this to be documented?

Later, Juan.



>> +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +        p = &multifd_send_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        if (!p->pending_job) {
>> +            p->pending_job++;
>> +            next_channel = (i + 1) % migrate_multifd_channels();
>> +            break;
>> +        }
>> +        qemu_mutex_unlock(&p->mutex);
>> +    }
>> +    p->pages->used = 0;
>> +    multifd_send_state->seq++;
>> +    p->seq = multifd_send_state->seq;
>> +    p->pages->block = NULL;
>> +    multifd_send_state->pages = p->pages;
>> +    p->pages = pages;

[1]

>
> Here we directly replaced MultiFDSendParams.pages with
> multifd_send_state->pages.  Then are we always using a single
> MultiFDPages_t struct?  And if so, will all the initial
> MultiFDSendParams.pages memory leaked without freed?

multifd_send_state->pages is stored in the pages variable; we store it
into the channel at [1].

What we have (sending side) is:

- 1 multifd_pages per channel
- 1 multifd_pages for the main thread

What we do here is:

  pages = multifd_send_state->pages;
  multifd_send_state->pages = channel[i]->pages;
  channel[i]->pages = pages;

So we are just doing a swap.  We do it that way through the whole loop to have
shorter names (i.e. basically we do everything through pages->foo), but
the idea is what I put there.  (OK, what I called channel[i] is "p".)

But everything in that file (compression threads and multifd ones) uses p
to mean the parameters of a thread.
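The swap can be shown in isolation.  This is a hedged sketch with trimmed-down stand-in types (the real MultiFDPages_t carries iovecs, offsets and a block pointer; the names below are hypothetical):

```c
#include <stdint.h>
#include <stddef.h>

/* trimmed-down stand-ins for MultiFDPages_t and friends */
typedef struct {
    void *block;
    uint32_t used;
} Pages;

typedef struct {
    Pages *pages;       /* per-channel buffer, idle between jobs */
} Channel;

typedef struct {
    Pages *pages;       /* buffer currently filled by the main thread */
    uint32_t seq;
} SendState;

/*
 * Hand the filled buffer to channel p and take its idle buffer back,
 * so the main thread can keep filling pages without waiting for the
 * channel to finish sending.  No copying, just a pointer swap.
 */
static void hand_over_pages(SendState *s, Channel *p)
{
    Pages *pages = s->pages;    /* the buffer we just filled */

    s->seq++;
    s->pages = p->pages;        /* take the channel's idle buffer */
    p->pages = pages;           /* the channel now owns the filled one */
    s->pages->used = 0;         /* the fresh buffer starts empty */
    s->pages->block = NULL;
}
```

Because each side always owns exactly one buffer, there is no leak: the buffer allocated for the channel at setup ends up back in multifd_send_state, and vice versa.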

Later, Juan.


end of thread, other threads:[~2018-05-23 11:13 UTC | newest]

Thread overview: 60+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
2018-05-02 15:53   ` Dr. David Alan Gilbert
2018-05-09  8:15     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel() Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
2018-04-26  7:28   ` Peter Xu
2018-05-09  8:05     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
2018-05-02 17:19   ` Dr. David Alan Gilbert
2018-05-09  8:34     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
2018-05-02 17:32   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
2018-04-26  7:15   ` Peter Xu
2018-05-09 10:52     ` Juan Quintela
2018-05-02 17:52   ` Dr. David Alan Gilbert
2018-05-09 10:53     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
2018-05-02 18:04   ` Dr. David Alan Gilbert
2018-05-09 11:09     ` Juan Quintela
2018-05-09 11:12       ` Dr. David Alan Gilbert
2018-05-09 19:46         ` Juan Quintela
2018-05-11 16:36           ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
2018-05-02 18:35   ` Dr. David Alan Gilbert
2018-05-09 11:11     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
2018-05-02 18:59   ` Dr. David Alan Gilbert
2018-05-09 11:14     ` Juan Quintela
2018-05-09 19:46     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
2018-05-03  9:36   ` Dr. David Alan Gilbert
2018-05-23 10:59     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
2018-05-03 10:03   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Juan Quintela
2018-05-03 10:44   ` Dr. David Alan Gilbert
2018-05-09 19:45     ` Juan Quintela
2018-05-11 16:32       ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
2018-04-26  7:43   ` Peter Xu
2018-04-26  8:18   ` Peter Xu
2018-05-03 11:30     ` Dr. David Alan Gilbert
2018-05-23 11:13     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
2018-05-03 14:55   ` Dr. David Alan Gilbert
2018-05-23 10:51     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
2018-05-03 15:04   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
2018-05-03 15:24   ` Dr. David Alan Gilbert
2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-05-03 15:32   ` Dr. David Alan Gilbert
2018-04-26  8:28 ` Peter Xu
