* [Qemu-devel] [RFC v11 00/15] mutifd
@ 2018-03-16 11:53 Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error Juan Quintela
                   ` (15 more replies)
  0 siblings, 16 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Multifd


Hi

[v11]

Changes on top of the previous submission:
- Now on top of migration-tests/v6 that I sent on Wednesday
- Rebased to latest upstream
- Everything that is sent through the network should be converted correctly
  (famous last words)
- Still an RFC (sometimes some packets are left over at the end); it is
  sent just to show how things are going.  Problems are only in the last patch.

- Redo some locking (again).  Now the problem is being able to send the
  synchronization through the multifd channels: I end the migration
  _before_ all the channels have received all the packets.

- Trying to get a flags argument into each packet, to be able to synchronize
  through the network, not from the "main" incoming coroutine.

- Regarding the network-safe fields: every conversion is now in its own
  routine, so it should be easier to understand/review.  While there, I
  check that all values are within range.
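A minimal C sketch of the idea in the last two items: a per-packet header carrying a flags field, converted to network byte order by one routine in each direction, with the receiver rejecting out-of-range values. The struct, its field names, the magic value and the PKT_* constants are all hypothetical and are not the series' actual packet format; only htonl()/ntohl() are standard calls.

```c
#include <arpa/inet.h>  /* htonl(), ntohl() */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* All names and values below are hypothetical, chosen for illustration. */
#define PKT_MAGIC      0x11223344U
#define PKT_VERSION    1U
#define PKT_FLAG_SYNC  (1U << 0)   /* "synchronize after this packet" */
#define PKT_MAX_PAGES  128U

typedef struct {
    uint32_t magic;
    uint32_t version;
    uint32_t flags;
    uint32_t num_pages;   /* number of pages carried by this packet */
} PacketHeader;

/* Sender: convert a host-order header to big-endian (network) order. */
static void header_to_wire(const PacketHeader *host, PacketHeader *wire)
{
    assert(host->num_pages <= PKT_MAX_PAGES);  /* sender must stay in range */
    wire->magic = htonl(host->magic);
    wire->version = htonl(host->version);
    wire->flags = htonl(host->flags);
    wire->num_pages = htonl(host->num_pages);
}

/* Receiver: convert back to host order and reject out-of-range values. */
static bool header_from_wire(const PacketHeader *wire, PacketHeader *host)
{
    host->magic = ntohl(wire->magic);
    host->version = ntohl(wire->version);
    host->flags = ntohl(wire->flags);
    host->num_pages = ntohl(wire->num_pages);

    if (host->magic != PKT_MAGIC || host->version != PKT_VERSION) {
        return false;
    }
    if (host->flags & ~PKT_FLAG_SYNC) {   /* unknown flag bits are set */
        return false;
    }
    return host->num_pages <= PKT_MAX_PAGES;
}
```

Keeping each direction in a single routine means the range checks live in exactly one place on the receiving side.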

So, please comment.

Thanks, Juan.


[v10]
Lots of changes from previous versions:
a - everything is now sent through the multifd channels; nothing is sent through the main channel
b - locking is brand new.  I was getting into a hole with the previous approach; right now there is a single way to
    do locking (both source and destination):
       main thread: sets a ->sync variable for each thread and wakes it
       multifd threads: clear the variable and signal (sem) back to the main thread

    This is used when:
    - all threads have started
    - we need to synchronize after each round through memory
    - all threads have finished
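The locking scheme described in item b can be sketched with plain POSIX threads and semaphores. This is an illustration under assumptions, not the series' code: Channel, sync_all_channels() and the other names are hypothetical stand-ins for MultiFDSendParams, multifd_send_sync_main() and friends, and QemuSemaphore/QemuMutex are replaced by sem_t/pthread_mutex_t.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define NUM_CHANNELS 4   /* hypothetical channel count */

typedef struct {
    pthread_t thread;
    pthread_mutex_t mutex;
    sem_t sem;        /* main -> channel: "look at your flags" */
    bool sync;        /* set by main, cleared by the channel */
    bool quit;
    int syncs_done;   /* only for demonstration */
} Channel;

static Channel channels[NUM_CHANNELS];
static sem_t sem_main;  /* channel -> main: "sync acknowledged" */

static void *channel_thread(void *opaque)
{
    Channel *c = opaque;

    for (;;) {
        sem_wait(&c->sem);
        pthread_mutex_lock(&c->mutex);
        if (c->sync) {
            c->sync = false;           /* clear the variable ... */
            c->syncs_done++;
            pthread_mutex_unlock(&c->mutex);
            sem_post(&sem_main);       /* ... and signal back to main */
            continue;
        }
        if (c->quit) {
            pthread_mutex_unlock(&c->mutex);
            break;
        }
        pthread_mutex_unlock(&c->mutex);
    }
    return NULL;
}

/* Main thread: flag every channel, wake it, then wait for every ack. */
static void sync_all_channels(void)
{
    for (int i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].sync = true;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
    }
    for (int i = 0; i < NUM_CHANNELS; i++) {
        sem_wait(&sem_main);
    }
}

static void start_channels(void)
{
    sem_init(&sem_main, 0, 0);
    for (int i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_init(&channels[i].mutex, NULL);
        sem_init(&channels[i].sem, 0, 0);
        int rc = pthread_create(&channels[i].thread, NULL,
                                channel_thread, &channels[i]);
        assert(rc == 0);
        (void)rc;
    }
}

static void stop_channels(void)
{
    for (int i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].quit = true;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
    }
    for (int i = 0; i < NUM_CHANNELS; i++) {
        pthread_join(channels[i].thread, NULL);
        pthread_mutex_destroy(&channels[i].mutex);
        sem_destroy(&channels[i].sem);
    }
    sem_destroy(&sem_main);
}
```

Because each channel posts sem_main exactly once per round, the main thread's NUM_CHANNELS waits return only after every channel has acknowledged that round.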

c - I have to use a qio watcher so that a thread can wait until data is ready to read

d - lots of cleanups

e - to make things easier, I have included the missing test patches in
    this round, because these patches build on top of them

f - lots of traces, it is now much easier to follow what is happening

Now, why it is an RFC:

- in the last patch, there is still a race between the watcher, the
  ->quit of the threads and the last synchronization.  Technically they
  are done in order, but in practice they sometimes hang.

- I *know* I can optimize the synchronization of the threads by sending
  the "we start a new round" message through the multifd channels; I have to add a flag for that.

- Not having a thread on the incoming side is a mess; I can't block waiting for things to happen :-(

- When doing the synchronization, I need to optimize the sending of the "not finished" packet of pages; I am working on that.

Please take a look and review.

Thanks, Juan.

[v9]

This series is on top of my migration test series just sent; the only rejects should be in the test system, though.

On v9 series for you:
- qobject_unref() as requested by dan

  Yes, he was right: I had a reference leak for _non_ multifd.  I
  *thought* he meant for multifd, and that took a while to understand
  (and then to find when/where).

- multifd page count: it is dropped for good
- uuid handling: we use the default qemu uuid of 0000...
- uuid handling: using a struct and sending the struct
  * the idea is to add a size field and add more parameters after that
  * does anyone have a good idea how to "output" the
    migrate_capabilities/parameters json into a string and how to read it back?
- changed how we test that all threads/channels are already created.
  Should be more robust.
- Add multifd tests.  Still not ported on top of the migration-tests series sent earlier,
  waiting for review on the ideas there.
- Rebase and remove all the integrated patches (back at 12)

Please, review.

Later, Juan.

[v8]
Things NOT done yet:

- drop x-multifd-page-count?  We can use performance measurements to set a default value
- paolo's suggestion of not having a control channel
  needs yet more cleanups to be able to have more than one RAMState; trying it.
- performance work still not done, but it has been very stable

On v8:
- use connect_async
- rename multifd-group to multifd-page-count (danp suggestion)
- rename multifd-threads to multifd-channels (danp suggestion)
- use new qio*channel functions
- Address rest of comments left


So, please review.

My idea would be to pull these changes and continue the performance changes
in-tree; basically everything is already reviewed.

Thanks, Juan.

On v7:
- tests fixed as danp wanted
- have to revert danp's qio_*_all patches, as they break multifd; I have to investigate why.
- error_abort is gone.  After several tries at getting errors right, I ended up with a single error
  protected by a lock, and the first error wins.
- Addressed basically all reviews (see on ToDo)
- Pointers to struct are done now
- fix lots of leaks
- lots of small fixes
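The "single error protected by a lock, first error wins" approach from the v7 notes can be sketched as follows. The names are hypothetical and the error is a plain string rather than QEMU's Error object; in this series, patch 01 records errors through migrate_set_error(), and this sketch does not claim to reproduce that function.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the migration-wide error: one slot, one lock. */
static pthread_mutex_t error_lock = PTHREAD_MUTEX_INITIALIZER;
static char *first_error;   /* NULL until some thread reports an error */

/* Record msg only if nothing has been recorded yet: the first error wins. */
static void report_error(const char *msg)
{
    assert(msg != NULL);
    pthread_mutex_lock(&error_lock);
    if (first_error == NULL) {
        size_t len = strlen(msg) + 1;
        first_error = malloc(len);
        if (first_error != NULL) {
            memcpy(first_error, msg, len);
        }
    }
    pthread_mutex_unlock(&error_lock);
}

/* Return a copy of the recorded error (caller frees), or NULL if none. */
static char *get_error(void)
{
    char *copy = NULL;

    pthread_mutex_lock(&error_lock);
    if (first_error != NULL) {
        size_t len = strlen(first_error) + 1;
        copy = malloc(len);
        if (copy != NULL) {
            memcpy(copy, first_error, len);
        }
    }
    pthread_mutex_unlock(&error_lock);
    return copy;
}

/* Forget the error, e.g. before starting a new migration. */
static void clear_error(void)
{
    pthread_mutex_lock(&error_lock);
    free(first_error);
    first_error = NULL;
    pthread_mutex_unlock(&error_lock);
}
```

Later errors from other channels are silently dropped, which keeps the report focused on the root cause rather than on the cascade of failures it triggers.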


[v6]
- Improve migration_ioc_process_incoming
- teach about G_SOURCE_REMOVE/CONTINUE
- Add test for migration_has_all_channels
- use DEFINE_PROP*
- change recv_state to use pointers to parameters;
  this makes it easier to receive channels out of order
- use g_strdup_printf()
- improve count of threads to know when we have to finish
- report channel id's on errors
- Use last_page parameter for multifd_send_page() sooner
- Improve comments for address
- use g_new0() instead of g_malloc()
- create MULTIFD_CONTINUE instead of using UINT16_MAX
- clear memory used by group of pages;
  while there, move everything to the global state variables instead of keeping it
  local to the function.  This way it works if we cancel migration and start
  a new one
- Really wait to create the migration_thread until all channels are created
- split initial_bytes setup to make the following patches clearer.
- create RAM_SAVE_FLAG_MULTIFD_SYNC macro, to make clear what we are doing
- move setting of need_flush to inside bitmap_sync
- Lots of other small changes & reorderings

Please, comment.


[v5]

- tests for qio functions (a.k.a. make danp happy)
- 1st message from one channel to the other contains:
   <uuid> multifd <channel number>
   This would allow us to create more channels as we want them.
   a.k.a. Making dave happy
- Waiting in reception for new channels using qio listeners
  Getting threads, qio and reference counters working at the same time
  was interesting.
  Another make danp happy.

- Lots and lots of small changes and fixes.  Notice that the last 70 or so
  patches that I merged were there to make this series easier/smaller.

- NOT DONE: I haven't been working on measuring performance
  differences, this was about getting the creation of the
  threads/channels right.
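For illustration, the v5 handshake message "<uuid> multifd <channel number>" could be built and checked as below. This is a hypothetical text encoding using snprintf()/sscanf(); according to the v9 notes the series later moved to sending a struct, and the function names here are not taken from the series.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define UUID_STR_LEN 36   /* "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" */

/* Build "<uuid> multifd <channel>"; returns its length, or -1 if it won't fit. */
static int handshake_format(char *buf, size_t len, const char *uuid, int channel)
{
    assert(strlen(uuid) == UUID_STR_LEN);
    int n = snprintf(buf, len, "%s multifd %d", uuid, channel);
    return (n < 0 || (size_t)n >= len) ? -1 : n;
}

/* Parse the message; returns 0 on success, -1 if malformed or out of range. */
static int handshake_parse(const char *msg, char uuid[UUID_STR_LEN + 1],
                           int *channel, int max_channels)
{
    char kind[16];

    if (sscanf(msg, "%36s %15s %d", uuid, kind, channel) != 3) {
        return -1;
    }
    if (strlen(uuid) != UUID_STR_LEN || strcmp(kind, "multifd") != 0) {
        return -1;   /* wrong uuid length or a different channel kind */
    }
    if (*channel < 0 || *channel >= max_channels) {
        return -1;   /* channel numbers must index the configured channels */
    }
    return 0;
}
```

Checking the second word against "multifd" is what would let other kinds of channels (such as the "main" and "postcopy" variants floated in the ToDo below) share the same listener.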

So, what I want:

- Are people happy with how I have (ab)used qio channels? (yes danp,
  that is you).
- My understanding is th

ToDo:

- Make paolo happy: He wanted to test using control information
  through each channel, not only pages.  This requires yet more
  cleanups to be able to have more than one QEMUFile/RAMState open at
  the same time.

- How I create multiple channels.  Things I know:
  * with current changes, it should work with fd/channels (the multifd bits),
    but we don't have a way to pass multiple fds or exec files.
    Danp, any idea about how to create a UI for it?
  * My idea is that we would split current code to be:
    + channel creation at migration.c
    + rest of bits at ram.c
    + change format to:
      <uuid> main <rest of migration capabilities/parameters> so we can check
      <uuid> postcopy <no clue what parameters are needed>
          Dave wanted a way to create a new fd for postcopy for some time
    + Adding new channels is easy

- Performance data/numbers: Yes, I wanted to get this out first; I
  will continue with this.


Please, review.


[v4]
This is the 4th version of multifd. Changes:
- XBZRLE doesn't need to be checked for
- Documentation and defaults are consistent
- split socketArgs
- use iovec instead of creating something similar.
- We use now the exported size of target page (another HACK removal)
- created qio_channel_{writev,readv}_all functions.  The _full() name
  was already taken.
  They do the same as the functions without _all(), but if a call
  returns due to blocking, they redo it.
- it is checkpatch.pl clean now.
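The retry behaviour of the _all() variants can be illustrated with plain POSIX writev(). This sketch is not QEMU's qio_channel_writev_all(); writev_all() is a hypothetical helper that assumes a file descriptor which may be non-blocking, and it uses poll() to wait until the descriptor is writable before redoing the call.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/uio.h>
#include <unistd.h>

/* Write every byte described by iov[0..iovcnt), retrying on short writes,
 * EINTR and EAGAIN.  iov is modified in place.  Returns 0, or -1 with errno set. */
static int writev_all(int fd, struct iovec *iov, int iovcnt)
{
    assert(iovcnt >= 0);
    while (iovcnt > 0) {
        ssize_t n = writev(fd, iov, iovcnt);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                struct pollfd pfd = { .fd = fd, .events = POLLOUT };
                poll(&pfd, 1, -1);   /* block until writable, then redo */
                continue;
            }
            return -1;
        }
        /* Skip the iovecs that were written completely ... */
        while (iovcnt > 0 && (size_t)n >= iov->iov_len) {
            n -= (ssize_t)iov->iov_len;
            iov++;
            iovcnt--;
        }
        /* ... and advance into the one that was written partially. */
        if (iovcnt > 0) {
            iov->iov_base = (char *)iov->iov_base + n;
            iov->iov_len -= (size_t)n;
        }
    }
    return 0;
}
```

A readv counterpart would follow the same shape, with the additional case that a read returning 0 means end of file and must be reported instead of retried.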

Please comment, Juan.

Juan Quintela (15):
  migration: Set error state in case of error
  migration: In case of error just end the migration
  migration: terminate_* can be called for other threads
  migration: Introduce multifd_recv_new_channel()
  migration: Be sure all recv channels are created
  migration: Export functions to create send channels
  migration: Synchronize send threads
  migration: Synchronize recv threads
  migration: Add multifd traces for start/end thread
  migration: Create multifd channels
  migration: Delay start of migration main routines
  migration: Transmit initial package through the multifd channels
  migration: Create ram_multifd_page
  migration: Create pages structure for reception
  [RFC] migration: Send pages through the multifd channels

 migration/migration.c  |  13 +-
 migration/migration.h  |   1 +
 migration/ram.c        | 671 ++++++++++++++++++++++++++++++++++++++++++++++---
 migration/ram.h        |   3 +
 migration/socket.c     |  36 ++-
 migration/socket.h     |   7 +
 migration/trace-events |  12 +
 7 files changed, 701 insertions(+), 42 deletions(-)

-- 
2.14.3


* [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:49   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration Juan Quintela
                   ` (14 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 7266351fd0..1b8095a358 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -414,6 +414,16 @@ static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
     for (i = 0; i < multifd_send_state->count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -514,6 +524,16 @@ static void terminate_multifd_recv_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
     for (i = 0; i < multifd_recv_state->count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-- 
2.14.3


* [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:49   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads Juan Quintela
                   ` (13 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>

--

As requested, just keep accepting connections in case of error.
---
 migration/socket.c | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/migration/socket.c b/migration/socket.c
index 52db0c0c09..8dda1d9a98 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -140,9 +140,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
     sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      &err);
     if (!sioc) {
-        error_report("could not accept migration connection (%s)",
-                     error_get_pretty(err));
-        goto out;
+        return G_SOURCE_CONTINUE;
     }
 
     trace_migration_socket_incoming_accepted();
@@ -151,7 +149,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
     migration_channel_process_incoming(QIO_CHANNEL(sioc));
     object_unref(OBJECT(sioc));
 
-out:
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_channel_close(ioc, NULL);
-- 
2.14.3


* [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:51   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel() Juan Quintela
                   ` (12 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

While there, make the count field always be accessed with atomic
operations.  To do blocking operations, we need to know that the
thread is running, so create a bool to indicate that.
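A rough sketch of the two points in this message, using C11 atomics and pthreads: the channel counter is only touched through atomic operations, and cleanup loops over every configured slot but only joins threads that were actually started. The names are hypothetical, and "running" here means "created and not yet joined", which is a simplification of the flag in the patch below (where the thread itself clears it on quit).

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

#define CHANNELS 3   /* hypothetical: configured number of channels */

typedef struct {
    pthread_t thread;
    bool running;    /* true once the thread was created and not yet joined */
} Worker;

static Worker workers[CHANNELS];
static atomic_int created;   /* other threads read this, so always use atomics */

static void *worker_fn(void *opaque)
{
    (void)opaque;   /* real channel work would go here */
    return NULL;
}

/* Create the first n threads; n < CHANNELS simulates setup stopping early. */
static void setup(int n)
{
    assert(n >= 0 && n <= CHANNELS);
    atomic_store(&created, 0);
    for (int i = 0; i < n; i++) {
        if (pthread_create(&workers[i].thread, NULL, worker_fn, &workers[i]) != 0) {
            break;
        }
        workers[i].running = true;
        atomic_fetch_add(&created, 1);
    }
}

static bool all_created(void)
{
    return atomic_load(&created) == CHANNELS;
}

/* Loop over every configured slot, but only join threads that were started. */
static void cleanup(void)
{
    for (int i = 0; i < CHANNELS; i++) {
        if (workers[i].running) {
            pthread_join(workers[i].thread, NULL);
            workers[i].running = false;
        }
    }
}
```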

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

While here, s/terminate_multifd_*_threads/multifd_*_terminate_threads/.
This is consistent with every other function.
---
 migration/ram.c | 38 ++++++++++++++++++++++++--------------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 1b8095a358..2d51c8b94c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -400,6 +400,7 @@ struct MultiFDSendParams {
     QemuThread thread;
     QemuSemaphore sem;
     QemuMutex mutex;
+    bool running;
     bool quit;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
@@ -410,7 +411,7 @@ struct {
     int count;
 } *multifd_send_state;
 
-static void terminate_multifd_send_threads(Error *errp)
+static void multifd_send_terminate_threads(Error *errp)
 {
     int i;
 
@@ -424,7 +425,7 @@ static void terminate_multifd_send_threads(Error *errp)
         }
     }
 
-    for (i = 0; i < multifd_send_state->count; i++) {
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -442,11 +443,13 @@ int multifd_save_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_send_threads(NULL);
-    for (i = 0; i < multifd_send_state->count; i++) {
+    multifd_send_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -466,6 +469,7 @@ static void *multifd_send_thread(void *opaque)
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
+            p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -487,7 +491,7 @@ int multifd_save_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
-    multifd_send_state->count = 0;
+    atomic_set(&multifd_send_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -496,10 +500,11 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
+        p->running = true;
         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                            QEMU_THREAD_JOINABLE);
 
-        multifd_send_state->count++;
+        atomic_inc(&multifd_send_state->count);
     }
     return 0;
 }
@@ -510,6 +515,7 @@ struct MultiFDRecvParams {
     QemuThread thread;
     QemuSemaphore sem;
     QemuMutex mutex;
+    bool running;
     bool quit;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -520,7 +526,7 @@ struct {
     int count;
 } *multifd_recv_state;
 
-static void terminate_multifd_recv_threads(Error *errp)
+static void multifd_recv_terminate_threads(Error *errp)
 {
     int i;
 
@@ -534,7 +540,7 @@ static void terminate_multifd_recv_threads(Error *errp)
         }
     }
 
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -552,11 +558,13 @@ int multifd_load_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_recv_threads(NULL);
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    multifd_recv_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -577,6 +585,7 @@ static void *multifd_recv_thread(void *opaque)
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
+            p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
@@ -598,7 +607,7 @@ int multifd_load_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
-    multifd_recv_state->count = 0;
+    atomic_set(&multifd_recv_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -607,9 +616,10 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
+        p->running = true;
         qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                            QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
+        atomic_inc(&multifd_recv_state->count);
     }
     return 0;
 }
-- 
2.14.3


* [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel()
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (2 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:51   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created Juan Quintela
                   ` (11 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 3 ++-
 migration/ram.c       | 6 ++++++
 migration/ram.h       | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 3b811c213a..21c651b4ee 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -448,8 +448,9 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         migration_fd_process_incoming(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_recv_new_channel(ioc);
 }
 
 /**
diff --git a/migration/ram.c b/migration/ram.c
index 2d51c8b94c..f958a7aad3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -624,6 +625,11 @@ int multifd_load_setup(void)
     return 0;
 }
 
+void multifd_recv_new_channel(QIOChannel *ioc)
+{
+    /* nothing to do yet */
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 53f0021c51..a2031acf59 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -32,6 +32,7 @@
 #include "qemu-common.h"
 #include "qapi/qapi-types-migration.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -44,6 +45,7 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+void multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
-- 
2.14.3


* [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (3 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel() Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:52   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels Juan Quintela
                   ` (10 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We need them before we start migration.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c |  6 +++++-
 migration/ram.c       | 11 +++++++++++
 migration/ram.h       |  1 +
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 21c651b4ee..b3c6198e12 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -461,7 +461,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
-    return true;
+    bool all_channels;
+
+    all_channels = multifd_recv_all_channels_created();
+
+    return all_channels;
 }
 
 /*
diff --git a/migration/ram.c b/migration/ram.c
index f958a7aad3..7e60fc82a6 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -625,6 +625,17 @@ int multifd_load_setup(void)
     return 0;
 }
 
+bool multifd_recv_all_channels_created(void)
+{
+    int thread_count = migrate_multifd_channels();
+
+    if (!migrate_use_multifd()) {
+        return true;
+    }
+
+    return thread_count == atomic_read(&multifd_recv_state->count);
+}
+
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
     /* nothing to do yet */
diff --git a/migration/ram.h b/migration/ram.h
index a2031acf59..3daf074bcc 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -45,6 +45,7 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+bool multifd_recv_all_channels_created(void);
 void multifd_recv_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
-- 
2.14.3


* [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (4 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:53   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 07/15] migration: Synchronize send threads Juan Quintela
                   ` (9 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/socket.c | 28 +++++++++++++++++++++++++++-
 migration/socket.h |  7 +++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/migration/socket.c b/migration/socket.c
index 8dda1d9a98..7889753fab 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -28,6 +28,28 @@
 #include "trace.h"
 
 
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+} outgoing_args;
+
+void socket_send_channel_create(QIOTaskFunc f, void *data)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
+                                     f, data, NULL, NULL);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
+
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
     SocketAddress *saddr;
@@ -95,6 +117,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+
+    /* in case previous migration leaked it */
+    qapi_free_SocketAddress(outgoing_args.saddr);
+    outgoing_args.saddr = saddr;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -106,7 +133,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      data,
                                      socket_connect_data_free,
                                      NULL);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9db38..528c3b0202 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,13 @@
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+#include "io/task.h"
+
+void socket_send_channel_create(QIOTaskFunc f, void *data);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
-- 
2.14.3


* [Qemu-devel] [PATCH v11 07/15] migration: Synchronize send threads
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (5 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 08/15] migration: Synchronize recv threads Juan Quintela
                   ` (8 subsequent siblings)
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We synchronize all threads at each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a RAM section, so we are safe
from two channels trying to overwrite the same memory.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 52 +++++++++++++++++++++++++++++++++++++++++++++++++-
 migration/trace-events |  3 +++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index 7e60fc82a6..6aeb63f6ef 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -403,6 +403,7 @@ struct MultiFDSendParams {
     QemuMutex mutex;
     bool running;
     bool quit;
+    bool sync;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
@@ -410,6 +411,8 @@ struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_main;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *errp)
@@ -456,6 +459,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->name);
         p->name = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->sem_main);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     g_free(multifd_send_state);
@@ -463,19 +467,59 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_signal(p->id, p->quit, p->running);
+
+        qemu_mutex_lock(&p->mutex);
+        p->sync = true;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+        bool wait;
+
+        trace_multifd_send_sync_wait(p->id, p->quit, p->running);
+
+        qemu_mutex_lock(&p->mutex);
+        wait = p->running;
+        qemu_mutex_unlock(&p->mutex);
+
+        if (wait) {
+            qemu_sem_wait(&multifd_send_state->sem_main);
+        }
+    }
+    trace_multifd_send_sync_main();
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
+        if (p->sync) {
+            p->sync = false;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&multifd_send_state->sem_main);
+            continue;
+        }
         if (p->quit) {
             p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
     }
 
     return NULL;
@@ -493,6 +537,8 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
+    qemu_sem_init(&multifd_send_state->sem_main, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -507,6 +553,7 @@ int multifd_save_setup(void)
 
         atomic_inc(&multifd_send_state->count);
     }
+
     return 0;
 }
 
@@ -2283,6 +2330,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -2358,6 +2406,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     ram_counters.transferred += 8;
@@ -2411,6 +2460,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
diff --git a/migration/trace-events b/migration/trace-events
index 93961dea16..845612c177 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,6 +77,9 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
 ram_postcopy_send_discard_bitmap(void) ""
 ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
 ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
+multifd_send_sync_main(void) ""
+multifd_send_sync_signal(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
+multifd_send_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

* [Qemu-devel] [PATCH v11 08/15] migration: Synchronize recv threads
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (6 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 07/15] migration: Synchronize send threads Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 09/15] migration: Add multifd traces for start/end thread Juan Quintela
                   ` (7 subsequent siblings)
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We synchronize all threads at each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a RAM section, so we are safe
from two channels trying to overwrite the same memory.
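Below is a minimal, self-contained sketch of the handshake that this patch and the previous one implement. It uses plain POSIX threads and semaphores instead of QEMU's qemu_sem_*/qemu_mutex_* wrappers. Channel, NCHANNELS, sync_main(), channels_start() and channels_stop() are hypothetical names. As in multifd_recv_sync_main(), the main thread first sets ->sync and posts every channel, then waits on one shared semaphore once per channel. Unlike the patch, the sketch does not skip channels whose ->running flag is false.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define NCHANNELS 4

/* Hypothetical stand-in for MultiFDRecvParams / MultiFDSendParams. */
typedef struct {
    pthread_mutex_t mutex;
    sem_t sem;          /* main -> channel: "wake up" */
    bool sync;          /* main thread requests a sync point */
    bool quit;
    int syncs_seen;     /* sync requests this channel has acknowledged */
    pthread_t thread;
} Channel;

static Channel channels[NCHANNELS];
static sem_t sem_main;  /* channel -> main: "sync acknowledged" */

static void *channel_thread(void *opaque)
{
    Channel *c = opaque;

    for (;;) {
        sem_wait(&c->sem);                  /* sleep until main posts */
        pthread_mutex_lock(&c->mutex);
        if (c->sync) {
            c->sync = false;
            c->syncs_seen++;
            pthread_mutex_unlock(&c->mutex);
            sem_post(&sem_main);            /* acknowledge to main thread */
            continue;
        }
        if (c->quit) {
            pthread_mutex_unlock(&c->mutex);
            break;
        }
        pthread_mutex_unlock(&c->mutex);
    }
    return NULL;
}

/* Same shape as multifd_*_sync_main(): signal every channel, then wait for all. */
static void sync_main(void)
{
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].sync = true;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
    }
    for (int i = 0; i < NCHANNELS; i++) {
        sem_wait(&sem_main);
    }
}

static void channels_start(void)
{
    sem_init(&sem_main, 0, 0);
    for (int i = 0; i < NCHANNELS; i++) {
        Channel *c = &channels[i];
        pthread_mutex_init(&c->mutex, NULL);
        sem_init(&c->sem, 0, 0);
        c->sync = false;
        c->quit = false;
        c->syncs_seen = 0;
        pthread_create(&c->thread, NULL, channel_thread, c);
    }
}

static void channels_stop(void)
{
    for (int i = 0; i < NCHANNELS; i++) {
        Channel *c = &channels[i];
        pthread_mutex_lock(&c->mutex);
        c->quit = true;
        pthread_mutex_unlock(&c->mutex);
        sem_post(&c->sem);
        pthread_join(c->thread, NULL);
        pthread_mutex_destroy(&c->mutex);
        sem_destroy(&c->sem);
    }
    sem_destroy(&sem_main);
}
```

sync_main() returns only after it has collected one acknowledgement per channel. A second sync therefore cannot begin while any channel is still handling the first.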

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 49 ++++++++++++++++++++++++++++++++++++++++++++++++-
 migration/trace-events |  3 +++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index 6aeb63f6ef..4ba03cf9c9 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -565,6 +565,7 @@ struct MultiFDRecvParams {
     QemuMutex mutex;
     bool running;
     bool quit;
+    bool sync;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -572,6 +573,8 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_main;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *errp)
@@ -618,6 +621,7 @@ int multifd_load_cleanup(Error **errp)
         g_free(p->name);
         p->name = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_main);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -626,19 +630,59 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_signal(p->id, p->quit, p->running);
+
+        qemu_mutex_lock(&p->mutex);
+        p->sync = true;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        bool wait;
+
+        trace_multifd_recv_sync_wait(p->id, p->quit, p->running);
+
+        qemu_mutex_lock(&p->mutex);
+        wait = p->running;
+        qemu_mutex_unlock(&p->mutex);
+
+        if (wait) {
+            qemu_sem_wait(&multifd_recv_state->sem_main);
+        }
+    }
+    trace_multifd_recv_sync_main();
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
+        if (p->sync) {
+            p->sync = false;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&multifd_recv_state->sem_main);
+            continue;
+        }
         if (p->quit) {
             p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
     }
 
     return NULL;
@@ -656,6 +700,7 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_main, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -2890,6 +2935,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3075,6 +3121,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 845612c177..551d325daf 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -80,6 +80,9 @@ ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0
 multifd_send_sync_main(void) ""
 multifd_send_sync_signal(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 multifd_send_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
+multifd_recv_sync_main(void) ""
+multifd_recv_sync_signal(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
+multifd_recv_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

* [Qemu-devel] [PATCH v11 09/15] migration: Add multifd traces for start/end thread
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (7 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 08/15] migration: Synchronize recv threads Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels Juan Quintela
                   ` (6 subsequent siblings)
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
---
 migration/ram.c        | 6 ++++++
 migration/trace-events | 4 ++++
 2 files changed, 10 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 4ba03cf9c9..7d9e363bbe 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -505,6 +505,8 @@ static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
 
+    trace_multifd_send_thread_start(p->id);
+
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
@@ -521,6 +523,7 @@ static void *multifd_send_thread(void *opaque)
         }
         qemu_mutex_unlock(&p->mutex);
     }
+    trace_multifd_send_thread_end(p->id);
 
     return NULL;
 }
@@ -668,6 +671,8 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
+    trace_multifd_recv_thread_start(p->id);
+
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
@@ -685,6 +690,7 @@ static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
     }
 
+    trace_multifd_recv_thread_end(p->id);
     return NULL;
 }
 
diff --git a/migration/trace-events b/migration/trace-events
index 551d325daf..9c92d3ec14 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -83,6 +83,10 @@ multifd_send_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d
 multifd_recv_sync_main(void) ""
 multifd_recv_sync_signal(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 multifd_recv_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
+multifd_send_thread_start(uint8_t id) "%d"
+multifd_send_thread_end(uint8_t id) "%d"
+multifd_recv_thread_start(uint8_t id) "%d"
+multifd_recv_thread_end(uint8_t id) "%d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

* [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (8 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 09/15] migration: Add multifd traces for start/end thread Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 17:58   ` Daniel P. Berrangé
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines Juan Quintela
                   ` (5 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Create the multifd channels on both sides.  We still don't transmit anything through them.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 50 ++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 40 insertions(+), 10 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 7d9e363bbe..5dade41243 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -399,6 +399,7 @@ struct MultiFDSendParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
@@ -454,6 +455,8 @@ int multifd_save_cleanup(Error **errp)
         if (p->running) {
             qemu_thread_join(&p->thread);
         }
+        socket_send_channel_destroy(p->c);
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -528,6 +531,25 @@ static void *multifd_send_thread(void *opaque)
     return NULL;
 }
 
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        multifd_send_terminate_threads(local_err);
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+        p->running = true;
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        atomic_inc(&multifd_send_state->count);
+    }
+}
+
 int multifd_save_setup(void)
 {
     int thread_count;
@@ -550,11 +572,7 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
-        p->running = true;
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-
-        atomic_inc(&multifd_send_state->count);
+        socket_send_channel_create(multifd_new_send_channel_async, p);
     }
 
     return 0;
@@ -564,6 +582,7 @@ struct MultiFDRecvParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
@@ -619,6 +638,8 @@ int multifd_load_cleanup(Error **errp)
         if (p->running) {
             qemu_thread_join(&p->thread);
         }
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -715,10 +736,6 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
-        p->running = true;
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        atomic_inc(&multifd_recv_state->count);
     }
     return 0;
 }
@@ -736,7 +753,20 @@ bool multifd_recv_all_channels_created(void)
 
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
-    /* nothing to do yet */
+    MultiFDRecvParams *p;
+    /* we need to invent channels id's until we transmit */
+    /* we will remove this on a later patch */
+    static int i;
+
+    p = &multifd_recv_state->params[i];
+    i++;
+    p->c = ioc;
+    object_ref(OBJECT(ioc));
+
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multifd_recv_state->count);
 }
 
 /**
-- 
2.14.3

* [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (9 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels Juan Quintela
@ 2018-03-16 11:53 ` Juan Quintela
  2018-03-16 18:03   ` Daniel P. Berrangé
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels Juan Quintela
                   ` (4 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:53 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We need to make sure that all the multifd threads have started before
the main incoming migration routine runs.
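The readiness rule can be modelled without sockets. In the hypothetical sketch below, IncomingState, on_main_channel(), on_multifd_channel() and start_incoming() are illustrative names and not QEMU API. When multifd is off, the main channel starts processing immediately. When multifd is on, the connection that raises the channel count to the expected number triggers the start, which mirrors the count == migrate_multifd_channels() check added to multifd_recv_new_channel().

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical model of the incoming side's "delay start" logic. */
typedef struct {
    int expected;            /* stands in for migrate_multifd_channels() */
    atomic_int count;        /* multifd channels connected so far */
    bool main_ready;         /* main migration stream has been set up */
    int started;             /* times start_incoming() has run */
} IncomingState;

static void incoming_init(IncomingState *s, int expected)
{
    s->expected = expected;
    atomic_init(&s->count, 0);
    s->main_ready = false;
    s->started = 0;
}

/* Stand-in for migration_incoming_process(). */
static void start_incoming(IncomingState *s)
{
    s->started++;
}

/* First connection: the main stream (cf. migration_incoming_setup()). */
static void on_main_channel(IncomingState *s, bool use_multifd)
{
    s->main_ready = true;
    if (!use_multifd) {
        start_incoming(s);   /* no extra channels to wait for */
    }
}

/* Every later connection is a multifd channel. */
static void on_multifd_channel(IncomingState *s)
{
    /* atomic_fetch_add returns the old value, so +1 yields the new count */
    int now = atomic_fetch_add(&s->count, 1) + 1;

    if (now == s->expected) {
        start_incoming(s);   /* only the caller that sees the final count */
    }
}
```

The patch calls atomic_inc() and then reads the counter in a separate step. Deriving the new value from the fetch-add result, as the sketch does, keeps the check correct even if two channels were ever registered concurrently.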

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 4 ++--
 migration/migration.h | 1 +
 migration/ram.c       | 3 +++
 migration/socket.c    | 3 +++
 4 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index b3c6198e12..8856860c44 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -429,7 +429,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -447,7 +447,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
         return;
     }
     multifd_recv_new_channel(ioc);
diff --git a/migration/migration.h b/migration/migration.h
index 36b9c70fd6..cef4c189c5 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -184,6 +184,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 5dade41243..55af077abc 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -767,6 +767,9 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
     atomic_inc(&multifd_recv_state->count);
+    if (multifd_recv_state->count == migrate_multifd_channels()) {
+        migration_incoming_process();
+    }
 }
 
 /**
diff --git a/migration/socket.c b/migration/socket.c
index 7889753fab..55af3b8180 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -178,6 +178,9 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_channel_close(ioc, NULL);
+        if (!migrate_use_multifd()) {
+            migration_incoming_process();
+        }
         return G_SOURCE_REMOVE;
     } else {
         return G_SOURCE_CONTINUE;
-- 
2.14.3

* [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (10 preceding siblings ...)
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines Juan Quintela
@ 2018-03-16 11:54 ` Juan Quintela
  2018-03-16 18:06   ` Daniel P. Berrangé
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 13/15] migration: Create ram_multifd_page Juan Quintela
                   ` (3 subsequent siblings)
  15 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:54 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Be network agnostic.
Add error checking for all values.
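"Network agnostic" here means that every multi-byte field has a fixed byte order on the wire. The hypothetical sketch below builds the same 25-byte header (4-byte magic, 4-byte version, 16-byte UUID, 1-byte channel id). It serializes the header byte by byte in big-endian order instead of using a packed struct with cpu_to_be32(). init_packet_encode() and init_packet_decode() are illustrative names. The id check uses >= so that valid ids run from 0 to nchannels - 1.

```c
#include <stdint.h>
#include <string.h>

#define INIT_MAGIC   0x11223344U  /* same value as MULTIFD_MAGIC */
#define INIT_VERSION 1U
#define INIT_LEN     (4 + 4 + 16 + 1)

static void put_be32(uint8_t *b, uint32_t v)
{
    b[0] = (uint8_t)(v >> 24);
    b[1] = (uint8_t)(v >> 16);
    b[2] = (uint8_t)(v >> 8);
    b[3] = (uint8_t)v;
}

static uint32_t get_be32(const uint8_t *b)
{
    return (uint32_t)b[0] << 24 | (uint32_t)b[1] << 16 |
           (uint32_t)b[2] << 8 | (uint32_t)b[3];
}

/* Wire layout: magic | version | uuid[16] | id */
static void init_packet_encode(uint8_t out[INIT_LEN],
                               const uint8_t uuid[16], uint8_t id)
{
    put_be32(out, INIT_MAGIC);
    put_be32(out + 4, INIT_VERSION);
    memcpy(out + 8, uuid, 16);
    out[24] = id;
}

/* Returns the channel id, or -1 if any field fails validation. */
static int init_packet_decode(const uint8_t in[INIT_LEN],
                              const uint8_t expected_uuid[16], int nchannels)
{
    if (get_be32(in) != INIT_MAGIC) {
        return -1;
    }
    if (get_be32(in + 4) != INIT_VERSION) {
        return -1;
    }
    if (memcmp(in + 8, expected_uuid, 16) != 0) {
        return -1;
    }
    if (in[24] >= nchannels) {
        return -1;
    }
    return in[24];
}
```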
---
 migration/ram.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 92 insertions(+), 5 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 55af077abc..dd77c78016 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -52,6 +52,8 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -395,6 +397,16 @@ static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
+#define MULTIFD_MAGIC 0x11223344U
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    unsigned char uuid[16]; /* QemuUUID */
+    uint8_t id;
+} __attribute__((packed)) MultiFDInit_t;
+
 struct MultiFDSendParams {
     uint8_t id;
     char *name;
@@ -408,6 +420,65 @@ struct MultiFDSendParams {
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
+static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
+    msg.version = cpu_to_be32(MULTIFD_VERSION);
+    msg.id = p->id;
+    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
+
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+
+    be32_to_cpus(&msg.magic);
+    be32_to_cpus(&msg.version);
+
+    if (msg.magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet magic %x "
+                   "expected %x", msg.magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    if (msg.version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet version %d "
+                   "expected %d", msg.version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
+        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+        error_setg(errp, "multifd: received uuid does not match expected "
+                   "uuid '%s' for channel %hhu", uuid, msg.id);
+        g_free(uuid);
+        return -1;
+    }
+
+    if (msg.id >= migrate_multifd_channels()) {
+        error_setg(errp, "multifd: received channel id %d is out of range "
+                   "(channels: %d)", msg.id, migrate_multifd_channels());
+        return -1;
+    }
+
+    return msg.id;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
@@ -507,9 +578,15 @@ static void multifd_send_sync_main(void)
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
 
     trace_multifd_send_thread_start(p->id);
 
+    if (multifd_send_initial_packet(p, &local_err) < 0) {
+        multifd_send_terminate_threads(local_err);
+        return NULL;
+    }
+
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
@@ -754,12 +831,22 @@ bool multifd_recv_all_channels_created(void)
 void multifd_recv_new_channel(QIOChannel *ioc)
 {
     MultiFDRecvParams *p;
-    /* we need to invent channels id's until we transmit */
-    /* we will remove this on a later patch */
-    static int i;
+    Error *local_err = NULL;
+    int id;
 
-    p = &multifd_recv_state->params[i];
-    i++;
+    id = multifd_recv_initial_packet(ioc, &local_err);
+    if (id < 0) {
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
+
+    p = &multifd_recv_state->params[id];
+    if (p->c != NULL) {
+        error_setg(&local_err, "multifd: received id '%d' already set up",
+                   id);
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
     p->c = ioc;
     object_ref(OBJECT(ioc));
 
-- 
2.14.3

* [Qemu-devel] [PATCH v11 13/15] migration: Create ram_multifd_page
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (11 preceding siblings ...)
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels Juan Quintela
@ 2018-03-16 11:54 ` Juan Quintela
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 14/15] migration: Create pages structure for reception Juan Quintela
                   ` (2 subsequent siblings)
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:54 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA code is gone.  We have added a new
counter.
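The batching in this patch works in two steps. First, pages are collected in multifd_send_state->pages. The full batch is then swapped with the buffer of an idle channel, found by a round-robin search that starts at next_channel. That logic can be exercised single-threaded. In the hypothetical sketch below, Pages, SendChannel, Sender, dispatch(), queue_page() and channel_finish() are illustrative names, and channel_finish() stands in for the channel thread. When every channel is busy, multifd_send_page() blocks on a semaphore; the sketch returns -2 instead.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Cut-down, hypothetical analogue of MultiFDPages_t. */
typedef struct {
    uint32_t used;       /* pages queued in this batch */
    uint32_t allocated;  /* capacity of offset[] */
    uint32_t seq;        /* batch sequence number */
    uint64_t *offset;    /* page offsets (ram_addr_t in the patch) */
} Pages;

typedef struct {
    Pages *pages;        /* buffer currently owned by this channel */
    bool done;           /* idle, ready for a new batch */
    uint32_t packets_sent;
} SendChannel;

typedef struct {
    Pages *cur;          /* batch being filled by the migration thread */
    SendChannel *ch;
    int nch;
    int next;            /* where the round-robin search starts */
} Sender;

static Pages *pages_new(uint32_t n)
{
    Pages *p = calloc(1, sizeof(*p));
    if (!p || !(p->offset = calloc(n, sizeof(*p->offset)))) {
        abort();
    }
    p->allocated = n;
    return p;
}

static void pages_free(Pages *p)
{
    free(p->offset);
    free(p);
}

/* Give s->cur to the first idle channel by swapping it with that channel's
 * spare buffer. Returns the channel index, or -2 if all channels are busy. */
static int dispatch(Sender *s)
{
    for (int k = 0; k < s->nch; k++) {
        int i = (s->next + k) % s->nch;
        SendChannel *c = &s->ch[i];
        if (c->done) {
            Pages *spare = c->pages;
            c->done = false;
            spare->used = 0;
            spare->seq = s->cur->seq + 1;
            c->pages = s->cur;      /* channel takes the full batch */
            s->cur = spare;         /* caller refills the spare buffer */
            s->next = (i + 1) % s->nch;
            return i;
        }
    }
    return -2;
}

/* Queue one page. Returns -1 if it was only buffered, else dispatch()'s
 * result. After -2, call dispatch() again once a channel is idle. */
static int queue_page(Sender *s, uint64_t offset, bool last_page)
{
    Pages *p = s->cur;

    p->offset[p->used++] = offset;
    if (!last_page && p->used < p->allocated) {
        return -1;
    }
    return dispatch(s);
}

/* Simulates the channel thread finishing its batch. */
static void channel_finish(SendChannel *c)
{
    c->pages->used = 0;
    c->packets_sent++;
    c->done = true;
}
```

Because buffers are swapped rather than copied, handing off a batch costs a few pointer assignments, and the migration thread only has to wait when no channel is idle.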

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove multifd field, it is the same as normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovecs instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0 (danp)
define MULTIFD_CONTINUE
now pages member is a pointer
Fix off-by-one in number of pages in one packet
Remove RAM_SAVE_FLAG_MULTIFD_PAGE
s/multifd_pages_t/MultiFDPages_t/
---
 migration/ram.c        | 148 ++++++++++++++++++++++++++++++++++++++++++++++++-
 migration/trace-events |   3 +-
 2 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index dd77c78016..9919777a21 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -54,6 +54,7 @@
 #include "migration/block.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -407,7 +408,22 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint32_t seq;
+    /* offset of each page */
+    ram_addr_t *offset;
+    /* pointer to each page */
+    struct iovec *iov;
+    RAMBlock *block;
+} MultiFDPages_t;
+
 struct MultiFDSendParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
@@ -415,8 +431,15 @@ struct MultiFDSendParams {
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
+    /* protected by param mutex */
     bool quit;
     bool sync;
+    MultiFDPages_t *pages;
+    /* how many packets this channel has sent */
+    uint32_t packets_sent;
+    /* protected by multifd mutex */
+    /* has the thread finished the last submitted job */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
@@ -485,8 +508,34 @@ struct {
     int count;
     /* syncs main thread and channels */
     QemuSemaphore sem_main;
+    QemuMutex mutex;
+    QemuSemaphore sem;
+    MultiFDPages_t *pages;
 } *multifd_send_state;
 
+static void multifd_pages_init(MultiFDPages_t **ppages, size_t size)
+{
+    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    pages->offset = g_new0(ram_addr_t, size);
+    *ppages = pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->seq = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages->offset);
+    pages->offset = NULL;
+    g_free(pages);
+}
+
 static void multifd_send_terminate_threads(Error *errp)
 {
     int i;
@@ -532,10 +581,14 @@ int multifd_save_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     qemu_sem_destroy(&multifd_send_state->sem_main);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -586,6 +639,7 @@ static void *multifd_send_thread(void *opaque)
         multifd_send_terminate_threads(local_err);
         return NULL;
     }
+    qemu_sem_post(&multifd_send_state->sem);
 
     while (true) {
         qemu_sem_wait(&p->sem);
@@ -601,9 +655,23 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages->used) {
+            trace_multifd_send(p->id, p->pages->seq, p->pages->used);
+            p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
+
+            /* ToDo: send page here */
+
+            qemu_mutex_lock(&multifd_send_state->mutex);
+            p->done = true;
+            p->packets_sent++;
+            qemu_mutex_unlock(&multifd_send_state->mutex);
+            qemu_sem_post(&multifd_send_state->sem);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
     }
-    trace_multifd_send_thread_end(p->id);
+    trace_multifd_send_thread_end(p->id, p->packets_sent);
 
     return NULL;
 }
@@ -630,6 +698,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 int multifd_save_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -640,6 +709,9 @@ int multifd_save_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
     qemu_sem_init(&multifd_send_state->sem_main, 0);
+    qemu_mutex_init(&multifd_send_state->mutex);
+    qemu_sem_init(&multifd_send_state->sem, 0);
+    multifd_pages_init(&multifd_send_state->pages, page_count);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -648,6 +720,8 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->done = true;
+        multifd_pages_init(&p->pages, page_count);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -655,6 +729,51 @@ int multifd_save_setup(void)
     return 0;
 }
 
+static void multifd_send_page(RAMBlock *block, ram_addr_t offset,
+                              bool last_page)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make gcc happy */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    pages->offset[pages->used] = offset;
+    pages->iov[pages->used].iov_base = block->host + offset;
+    pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+    pages->used++;
+
+    if (!last_page) {
+        if (pages->used < pages->allocated) {
+            return;
+        }
+    }
+
+    qemu_sem_wait(&multifd_send_state->sem);
+    qemu_mutex_lock(&multifd_send_state->mutex);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        if (p->done) {
+            p->done = false;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_state->mutex);
+    qemu_mutex_lock(&p->mutex);
+    p->pages->used = 0;
+    p->pages->seq = pages->seq + 1;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
 struct MultiFDRecvParams {
     uint8_t id;
     char *name;
@@ -1291,6 +1410,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_PAGE);
+        multifd_send_page(block, offset, rs->migration_dirty_pages == 1);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1719,6 +1863,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }
diff --git a/migration/trace-events b/migration/trace-events
index 9c92d3ec14..06a9ead811 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -84,9 +84,10 @@ multifd_recv_sync_main(void) ""
 multifd_recv_sync_signal(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 multifd_recv_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d running %d"
 multifd_send_thread_start(uint8_t id) "%d"
-multifd_send_thread_end(uint8_t id) "%d"
+multifd_send_thread_end(uint8_t id, uint32_t packets) "channel %d packets %d"
 multifd_recv_thread_start(uint8_t id) "%d"
 multifd_recv_thread_end(uint8_t id) "%d"
+multifd_send(uint8_t id, int seq, int num) "channel %d sequence %d num pages %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

* [Qemu-devel] [PATCH v11 14/15] migration: Create pages structure for reception
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (12 preceding siblings ...)
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 13/15] migration: Create ram_multifd_page Juan Quintela
@ 2018-03-16 11:54 ` Juan Quintela
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 15/15] [RFC] migration: Send pages through the multifd channels Juan Quintela
  2018-03-16 18:08 ` [Qemu-devel] [RFC v11 00/15] mutifd Daniel P. Berrangé
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:54 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 9919777a21..0132de6e02 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -775,6 +775,7 @@ static void multifd_send_page(RAMBlock *block, ram_addr_t offset,
 }
 
 struct MultiFDRecvParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
@@ -782,8 +783,13 @@ struct MultiFDRecvParams {
     QemuSemaphore sem;
     QemuMutex mutex;
     bool running;
+    /* protected by param mutex */
     bool quit;
     bool sync;
+    /* how many patckets has recv this channel */
+    uint32_t packets_recv;
+    MultiFDPages_t *pages;
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -840,6 +846,8 @@ int multifd_load_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
     }
     qemu_sem_destroy(&multifd_recv_state->sem_main);
     g_free(multifd_recv_state->params);
@@ -914,6 +922,7 @@ static void *multifd_recv_thread(void *opaque)
 int multifd_load_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -924,6 +933,7 @@ int multifd_load_setup(void)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_main, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -932,6 +942,7 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
+        multifd_pages_init(&p->pages, page_count);
     }
     return 0;
 }
-- 
2.14.3

* [Qemu-devel] [PATCH v11 15/15] [RFC] migration: Send pages through the multifd channels
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (13 preceding siblings ...)
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 14/15] migration: Create pages structure for reception Juan Quintela
@ 2018-03-16 11:54 ` Juan Quintela
  2018-03-16 18:08 ` [Qemu-devel] [RFC v11 00/15] mutifd Daniel P. Berrangé
  15 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-03-16 11:54 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Migration ends correctly, but there is still a race between cleanup
and the last synchronization.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 240 ++++++++++++++++++++++++++++++++++++++++++-------
 migration/trace-events |   3 +-
 2 files changed, 211 insertions(+), 32 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 0132de6e02..d8ad456eca 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -408,6 +408,16 @@ typedef struct {
     uint8_t id;
 } __attribute__((packed)) MultiFDInit_t;
 
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t size;
+    uint32_t used;
+    uint32_t seq;
+    char ramblock[256];
+    uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t used;
@@ -422,7 +432,7 @@ typedef struct {
     RAMBlock *block;
 } MultiFDPages_t;
 
-struct MultiFDSendParams {
+typedef struct {
     /* not changed */
     uint8_t id;
     char *name;
@@ -440,8 +450,29 @@ struct MultiFDSendParams {
     /* protected by multifd mutex */
     /* has the thread finish the last submitted job */
     bool done;
-};
-typedef struct MultiFDSendParams MultiFDSendParams;
+    uint32_t packet_len;
+    MultiFDPacket_t *packet;
+} MultiFDSendParams;
+
+typedef struct {
+    /* not changed */
+    uint8_t id;
+    char *name;
+    QemuThread thread;
+    QIOChannel *c;
+    QemuSemaphore sem;
+    QemuMutex mutex;
+    bool running;
+    /* protected by param mutex */
+    bool quit;
+    bool sync;
+    MultiFDPages_t *pages;
+    /* how many packets this channel has received */
+    uint32_t packets_recv;
+    bool done;
+    uint32_t packet_len;
+    MultiFDPacket_t *packet;
+} MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
@@ -502,6 +533,80 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     return msg.id;
 }
 
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int i;
+
+    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->size = cpu_to_be32(migrate_multifd_page_count());
+    packet->used = cpu_to_be32(p->pages->used);
+    packet->seq = cpu_to_be32(p->pages->seq);
+
+    for (i = 0; i < p->pages->used; i++) {
+        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
+    }
+
+    strncpy(packet->ramblock, p->pages->block->idstr, 256);
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    RAMBlock *block;
+    int i;
+
+    be32_to_cpus(&packet->magic);
+    if (packet->magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet "
+                   "magic %x and expected magic %x",
+                   packet->magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->version);
+    if (packet->version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet "
+                   "version %d and expected version %d",
+                   packet->version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->size);
+    if (packet->size > migrate_multifd_page_count()) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   packet->size, migrate_multifd_page_count());
+        return -1;
+    }
+
+    p->pages->used = be32_to_cpu(packet->used);
+    if (p->pages->used > packet->size) {
+        error_setg(errp, "multifd: received packet "
+                   "with %d pages and expected maximum pages are %d",
+                   p->pages->used, packet->size);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->seq);
+
+    block = qemu_ram_block_by_name(packet->ramblock);
+    if (!block) {
+        error_setg(errp, "multifd: unknown ram block %s",
+                   packet->ramblock);
+        return -1;
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
+
+        p->pages->iov[i].iov_base = block->host + offset;
+        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+    }
+    return 0;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
@@ -583,6 +688,9 @@ int multifd_save_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     qemu_sem_destroy(&multifd_send_state->sem_main);
     g_free(multifd_send_state->params);
@@ -632,12 +740,13 @@ static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
 
     trace_multifd_send_thread_start(p->id);
 
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
-        multifd_send_terminate_threads(local_err);
-        return NULL;
+    ret = multifd_send_initial_packet(p, &local_err);
+    if (ret < 0) {
+        goto out;
     }
     qemu_sem_post(&multifd_send_state->sem);
 
@@ -651,17 +760,28 @@ static void *multifd_send_thread(void *opaque)
             continue;
         }
         if (p->quit) {
-            p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         if (p->pages->used) {
+            /* errors are reported via the function-level local_err */
+            uint32_t used;
+
+            multifd_send_fill_packet(p);
+            used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, p->pages->seq, p->pages->used);
-            /* ToDo: send page here */
-
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+            trace_multifd_send(p->id, p->pages->seq, used);
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
             p->packets_sent++;
@@ -671,6 +791,15 @@ static void *multifd_send_thread(void *opaque)
         }
         qemu_mutex_unlock(&p->mutex);
     }
+out:
+    if (ret) {
+        multifd_send_terminate_threads(local_err);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
     trace_multifd_send_thread_end(p->id, p->packets_sent);
 
     return NULL;
@@ -722,6 +851,9 @@ int multifd_save_setup(void)
         p->id = i;
         p->done = true;
         multifd_pages_init(&p->pages, page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -774,25 +906,6 @@ static void multifd_send_page(RAMBlock *block, ram_addr_t offset,
     qemu_sem_post(&p->sem);
 }
 
-struct MultiFDRecvParams {
-    /* not changed */
-    uint8_t id;
-    char *name;
-    QemuThread thread;
-    QIOChannel *c;
-    QemuSemaphore sem;
-    QemuMutex mutex;
-    bool running;
-    /* protected by param mutex */
-    bool quit;
-    bool sync;
-    /* how many patckets has recv this channel */
-    uint32_t packets_recv;
-    MultiFDPages_t *pages;
-    bool done;
-};
-typedef struct MultiFDRecvParams MultiFDRecvParams;
-
 struct {
     MultiFDRecvParams *params;
     /* number of created threads */
@@ -848,6 +961,9 @@ int multifd_load_cleanup(Error **errp)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
     qemu_sem_destroy(&multifd_recv_state->sem_main);
     g_free(multifd_recv_state->params);
@@ -892,12 +1008,34 @@ static void multifd_recv_sync_main(void)
     trace_multifd_recv_sync_main();
 }
 
+static gboolean recv_channel_ready(QIOChannel *ioc,
+                                   GIOCondition condition,
+                                   gpointer opaque)
+{
+    MultiFDRecvParams *p = opaque;
+
+    if (condition != G_IO_IN) {
+        return G_SOURCE_REMOVE;
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->done = false;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return G_SOURCE_CONTINUE;
+
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
     trace_multifd_recv_thread_start(p->id);
 
+    qio_channel_add_watch(p->c, G_IO_IN | G_IO_HUP | G_IO_ERR,
+                          recv_channel_ready, p, NULL);
+
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
@@ -907,15 +1045,51 @@ static void *multifd_recv_thread(void *opaque)
             qemu_sem_post(&multifd_recv_state->sem_main);
             continue;
         }
+        if (!p->done) {
+            Error *local_err = NULL;
+            int ret;
+
+            qemu_mutex_unlock(&p->mutex);
+
+            ret = qio_channel_read_all(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+            if (ret != 0) {
+                multifd_recv_terminate_threads(local_err);
+                break;
+            }
+
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            if (ret < 0) {
+                multifd_recv_terminate_threads(local_err);
+                break;
+            }
+
+            trace_multifd_recv(p->id, p->pages->seq, p->pages->used);
+            ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                        p->pages->used, &local_err);
+            if (ret != 0) {
+                multifd_recv_terminate_threads(local_err);
+                break;
+            }
+            qemu_mutex_lock(&p->mutex);
+            p->done = true;
+            p->packets_recv++;
+            qemu_mutex_unlock(&p->mutex);
+
+            continue;
+        }
         if (p->quit) {
-            p->running = false;
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         qemu_mutex_unlock(&p->mutex);
     }
 
-    trace_multifd_recv_thread_end(p->id);
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    trace_multifd_recv_thread_end(p->id, p->packets_recv);
     return NULL;
 }
 
@@ -940,9 +1114,13 @@ int multifd_load_setup(void)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
+        p->done = true;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
         multifd_pages_init(&p->pages, page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
     }
     return 0;
 }
diff --git a/migration/trace-events b/migration/trace-events
index 06a9ead811..a6c1c4b20c 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -86,8 +86,9 @@ multifd_recv_sync_wait(uint8_t id, bool quit, bool running) "channel %d quit %d
 multifd_send_thread_start(uint8_t id) "%d"
 multifd_send_thread_end(uint8_t id, uint32_t packets) "channel %d packets %d"
 multifd_recv_thread_start(uint8_t id) "%d"
-multifd_recv_thread_end(uint8_t id) "%d"
+multifd_recv_thread_end(uint8_t id, uint32_t packets) "channel %d packets %d"
 multifd_send(uint8_t id, int seq, int num) "channel %d sequence %d num pages %d"
+multifd_recv(uint8_t id, int seq, int num) "channel %d sequence %d num pages %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.14.3

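For reference, the per-packet header introduced above can also be serialized without relying on a packed struct. The sketch below is a standalone approximation, not the code in this series: `struct pkt_hdr`, `pkt_encode()`, `pkt_decode()` and the `PKT_*` constants are hypothetical, the trailing array of page offsets is left out, and explicit byte shifts stand in for QEMU's `cpu_to_be32()`/`be32_to_cpus()`. It checks the fields in the same order as `multifd_recv_unfill_packet()` (magic, version, size, then `used` against `size`) and, as one possible hardening, forces the RAMBlock name to be NUL-terminated before it would be handed to a lookup such as `qemu_ram_block_by_name()`.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical constants; the real ones are defined in migration/ram.c. */
#define PKT_MAGIC    0x11223344U
#define PKT_VERSION  1U
#define PKT_NAME_LEN 256
/* magic, version, size, used, seq (5 x 32 bits), then the block name */
#define PKT_WIRE_LEN (5 * 4 + PKT_NAME_LEN)

/* Host-order view of the fixed part of a packet (page offsets omitted). */
struct pkt_hdr {
    uint32_t magic, version, size, used, seq;
    char ramblock[PKT_NAME_LEN];
};

static void put_be32(uint8_t *b, uint32_t v)
{
    b[0] = (uint8_t)(v >> 24);
    b[1] = (uint8_t)(v >> 16);
    b[2] = (uint8_t)(v >> 8);
    b[3] = (uint8_t)v;
}

static uint32_t get_be32(const uint8_t *b)
{
    return ((uint32_t)b[0] << 24) | ((uint32_t)b[1] << 16) |
           ((uint32_t)b[2] << 8) | (uint32_t)b[3];
}

/* Sender: write every field big-endian, whatever the host byte order. */
static void pkt_encode(const struct pkt_hdr *h, uint8_t *wire)
{
    assert(memchr(h->ramblock, '\0', PKT_NAME_LEN) != NULL);
    put_be32(wire + 0, h->magic);
    put_be32(wire + 4, h->version);
    put_be32(wire + 8, h->size);
    put_be32(wire + 12, h->used);
    put_be32(wire + 16, h->seq);
    memcpy(wire + 20, h->ramblock, PKT_NAME_LEN);
}

/* Receiver: decode and range-check each field before using it.
 * Returns 0 on success, -1 (with a message in err) on a bad packet. */
static int pkt_decode(const uint8_t *wire, uint32_t max_pages,
                      struct pkt_hdr *h, char *err, size_t errlen)
{
    h->magic = get_be32(wire + 0);
    if (h->magic != PKT_MAGIC) {
        snprintf(err, errlen, "bad magic %" PRIx32 ", expected %" PRIx32,
                 h->magic, (uint32_t)PKT_MAGIC);
        return -1;
    }
    h->version = get_be32(wire + 4);
    if (h->version != PKT_VERSION) {
        snprintf(err, errlen, "bad version %" PRIu32 ", expected %" PRIu32,
                 h->version, (uint32_t)PKT_VERSION);
        return -1;
    }
    h->size = get_be32(wire + 8);
    if (h->size > max_pages) {
        snprintf(err, errlen, "size %" PRIu32 " exceeds maximum %" PRIu32,
                 h->size, max_pages);
        return -1;
    }
    h->used = get_be32(wire + 12);
    if (h->used > h->size) {
        snprintf(err, errlen, "%" PRIu32 " pages used, but size is %" PRIu32,
                 h->used, h->size);
        return -1;
    }
    h->seq = get_be32(wire + 16);
    memcpy(h->ramblock, wire + 20, PKT_NAME_LEN);
    /* The name comes from the peer: force NUL termination before lookup. */
    h->ramblock[PKT_NAME_LEN - 1] = '\0';
    return 0;
}
```

Writing each field at a fixed offset keeps the wire layout independent of compiler padding and host byte order; the cost is a little more code than a single write of the packed struct.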

* Re: [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error Juan Quintela
@ 2018-03-16 17:49   ` Daniel P. Berrangé
  2018-03-16 17:57     ` Daniel P. Berrangé
  0 siblings, 1 reply; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:49 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:49PM +0100, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 20 ++++++++++++++++++++
>  1 file changed, 20 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 7266351fd0..1b8095a358 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -414,6 +414,16 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);

This doesn't look quite right. You're checking if 'errp' is non-NULL,
which just tells you if the caller wants to collect the error, not
whether an error has happened. For the latter you need

  if (errp && *errp)

It seems a little strange, though, for the caller to pass an error into this
method for reporting.

> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
> +
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -514,6 +524,16 @@ static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        if (s->state == MIGRATION_STATUS_SETUP ||
> +            s->state == MIGRATION_STATUS_ACTIVE) {
> +            migrate_set_state(&s->state, s->state,
> +                              MIGRATION_STATUS_FAILED);
> +        }
> +    }
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> -- 
> 2.14.3
> 
> 

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

* Re: [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration Juan Quintela
@ 2018-03-16 17:49   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:49 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:50PM +0100, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
> 
> --
> 
> As requested, just keep accepting connections in case of error.

Just need to touch up the $subject to match the behaviour now. eg

   migration: continue listening for clients if accept fails

> ---
>  migration/socket.c | 5 +----
>  1 file changed, 1 insertion(+), 4 deletions(-)
> 
> diff --git a/migration/socket.c b/migration/socket.c
> index 52db0c0c09..8dda1d9a98 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -140,9 +140,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>      sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
>                                       &err);
>      if (!sioc) {
> -        error_report("could not accept migration connection (%s)",
> -                     error_get_pretty(err));
> -        goto out;
> +        return G_SOURCE_CONTINUE;
>      }
>  
>      trace_migration_socket_incoming_accepted();
> @@ -151,7 +149,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>      migration_channel_process_incoming(QIO_CHANNEL(sioc));
>      object_unref(OBJECT(sioc));
>  
> -out:
>      if (migration_has_all_channels()) {
>          /* Close listening socket as its no longer needed */
>          qio_channel_close(ioc, NULL);
> -- 
> 2.14.3
> 
> 

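A small standalone model of the behaviour the suggested subject describes may help. It is only an illustration: `struct listener`, `on_incoming()` and the `KEEP_LISTENING`/`STOP_LISTENING` values are hypothetical stand-ins for `socket_accept_incoming_migration()`, `migration_has_all_channels()` and GLib's `G_SOURCE_CONTINUE`/`G_SOURCE_REMOVE`, and a negative `fd` plays the part of a failed `qio_channel_socket_accept()`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for GLib's G_SOURCE_REMOVE / G_SOURCE_CONTINUE. */
enum { STOP_LISTENING = 0, KEEP_LISTENING = 1 };

struct listener {
    bool open;      /* listening socket still open */
    int channels;   /* channels accepted so far */
    int needed;     /* main channel plus all multifd channels */
};

/* Called for every incoming connection; fd < 0 models a failed accept. */
static int on_incoming(struct listener *l, int fd)
{
    assert(l->open);            /* only runs while the watch is installed */
    if (fd < 0) {
        /* Not fatal: keep the watch and wait for the next client. */
        return KEEP_LISTENING;
    }
    l->channels++;              /* real code hands the channel over here */
    if (l->channels == l->needed) {
        /* Every channel is connected, so the listener can go away. */
        l->open = false;
        return STOP_LISTENING;
    }
    return KEEP_LISTENING;
}
```

A failed accept leaves the watch in place, and the listening socket is closed only once the last expected channel has arrived.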
Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads Juan Quintela
@ 2018-03-16 17:51   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:51 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:51PM +0100, Juan Quintela wrote:
> While here, make the count field always be accessed with atomic
> operations.  To perform blocking operations, we need to know that the
> thread is running, so add a bool to indicate that.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> 
> While here, s/terminate_multifd_*_threads/multifd_*_terminate_threads/.
> This is consistent with every other function.

Usually I'd suggest that renames be done in a separate patch from
functional changes.

> ---
>  migration/ram.c | 38 ++++++++++++++++++++++++--------------
>  1 file changed, 24 insertions(+), 14 deletions(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>

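The two rules in the commit message (the channel count is only touched atomically, and a per-thread `running` flag says whether it is safe to signal that thread) can be sketched in plain C11. This is a hypothetical model, not the patch: `struct chan`, `chan_register()` and `terminate_all()` are made up for illustration, `<stdatomic.h>` replaces QEMU's `atomic_*` helpers, and the per-channel locking and semaphore posts are only described in comments.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define MAX_CHANNELS 8

struct chan {
    bool running;   /* thread started and has not exited yet */
    bool quit;      /* stop request for the thread */
};

static struct chan chans[MAX_CHANNELS];
static atomic_int chan_count;   /* only accessed with atomic operations */

/* Called once a channel's thread has been created. */
static int chan_register(void)
{
    int id = atomic_fetch_add(&chan_count, 1);

    assert(id < MAX_CHANNELS);
    chans[id].running = true;
    return id;
}

/*
 * Ask every running channel to stop and return how many were asked.
 * The real code reads "running" and sets "quit" under the channel's
 * mutex and then posts its semaphore; this sketch is single-threaded.
 */
static int terminate_all(void)
{
    int n = atomic_load(&chan_count);
    int stopped = 0;

    for (int i = 0; i < n; i++) {
        if (chans[i].running) {
            chans[i].quit = true;
            stopped++;
        }
    }
    return stopped;
}
```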

Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel()
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel() Juan Quintela
@ 2018-03-16 17:51   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:51 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:52PM +0100, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c | 3 ++-
>  migration/ram.c       | 6 ++++++
>  migration/ram.h       | 2 ++
>  3 files changed, 10 insertions(+), 1 deletion(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>


Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created Juan Quintela
@ 2018-03-16 17:52   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:52 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:53PM +0100, Juan Quintela wrote:
> We need them before we start migration.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c |  6 +++++-
>  migration/ram.c       | 11 +++++++++++
>  migration/ram.h       |  1 +
>  3 files changed, 17 insertions(+), 1 deletion(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>


Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels Juan Quintela
@ 2018-03-16 17:53   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:53 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:54PM +0100, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/socket.c | 28 +++++++++++++++++++++++++++-
>  migration/socket.h |  7 +++++++
>  2 files changed, 34 insertions(+), 1 deletion(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>



Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error
  2018-03-16 17:49   ` Daniel P. Berrangé
@ 2018-03-16 17:57     ` Daniel P. Berrangé
  2018-04-06 17:10       ` Juan Quintela
  0 siblings, 1 reply; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:57 UTC (permalink / raw)
  To: Juan Quintela; +Cc: lvivier, qemu-devel, peterx, dgilbert

On Fri, Mar 16, 2018 at 05:49:07PM +0000, Daniel P. Berrangé wrote:
> On Fri, Mar 16, 2018 at 12:53:49PM +0100, Juan Quintela wrote:
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > ---
> >  migration/ram.c | 20 ++++++++++++++++++++
> >  1 file changed, 20 insertions(+)
> > 
> > diff --git a/migration/ram.c b/migration/ram.c
> > index 7266351fd0..1b8095a358 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -414,6 +414,16 @@ static void terminate_multifd_send_threads(Error *errp)
> >  {
> >      int i;
> >  
> > +    if (errp) {
> > +        MigrationState *s = migrate_get_current();
> > +        migrate_set_error(s, errp);
> 
> This doesn't look quite right. You're checking if 'errp' is non-NULL,
> which just tells you if the caller wants to collect the error, not
> whether an error has happened. For the latter you need
> 
>   if (errp && *errp)
> 
> It seems a little strange, though, for the caller to pass an error into this
> method for reporting.

Oh wait, I'm being misled by the unusual parameter name.

An "errp" name should only ever be used for an "Error **", but we
only have an "Error *" here.

So just fix the parameter name to be "err" instead of "errp".

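To spell out the convention with a simplified, hypothetical `Error` type (not QEMU's `qapi/error.h`): a function that can fail takes `Error **errp` and fills in `*errp`, while a function that is handed an error which has already happened, as the terminate helpers are here, takes a plain `Error *err`. `set_error()`, `setup_channels()` and `terminate_threads()` below are illustrative names only.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified, hypothetical stand-in for QEMU's Error object. */
typedef struct Error {
    char msg[128];
} Error;

/* Like QEMU's error_setg(), refuse to overwrite an error already set. */
static void set_error(Error **errp, const char *msg)
{
    if (!errp) {
        return;                 /* caller does not want the details */
    }
    assert(*errp == NULL);
    *errp = calloc(1, sizeof(Error));
    if (*errp) {
        snprintf((*errp)->msg, sizeof((*errp)->msg), "%s", msg);
    }
}

/* Reporter: "errp" is an out-parameter; NULL means "don't care". */
static int setup_channels(int channels, Error **errp)
{
    if (channels <= 0) {
        set_error(errp, "invalid channel count");
        return -1;
    }
    return 0;
}

/* Consumer: "err" is an error that has already happened, or NULL. */
static int failures_recorded;

static void terminate_threads(Error *err)
{
    if (err) {
        failures_recorded++;    /* e.g. store it and mark migration failed */
    }
    /* ...then ask every thread to quit, whether or not there was an error */
}
```

The caller keeps a local `Error *local_err = NULL`, passes `&local_err` to the reporter, and passes `local_err` itself to the consumer.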

> > +        if (s->state == MIGRATION_STATUS_SETUP ||
> > +            s->state == MIGRATION_STATUS_ACTIVE) {
> > +            migrate_set_state(&s->state, s->state,
> > +                              MIGRATION_STATUS_FAILED);
> > +        }
> > +    }
> > +
> >      for (i = 0; i < multifd_send_state->count; i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >  
> > @@ -514,6 +524,16 @@ static void terminate_multifd_recv_threads(Error *errp)

This parameter name needs fixing too.

These are actually pre-existing problems in current git, so they are worth
fixing in a separate patch.

> >  {
> >      int i;
> >  
> > +    if (errp) {
> > +        MigrationState *s = migrate_get_current();
> > +        migrate_set_error(s, errp);
> > +        if (s->state == MIGRATION_STATUS_SETUP ||
> > +            s->state == MIGRATION_STATUS_ACTIVE) {
> > +            migrate_set_state(&s->state, s->state,
> > +                              MIGRATION_STATUS_FAILED);
> > +        }
> > +    }
> > +
> >      for (i = 0; i < multifd_recv_state->count; i++) {
> >          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> >  
> > -- 
> > 2.14.3
> > 
> > 
> 
> Regards,
> Daniel

Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels Juan Quintela
@ 2018-03-16 17:58   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 17:58 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:58PM +0100, Juan Quintela wrote:
> On both sides.  We still don't transmit anything through them.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 50 ++++++++++++++++++++++++++++++++++++++++----------
>  1 file changed, 40 insertions(+), 10 deletions(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>


Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines
  2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines Juan Quintela
@ 2018-03-16 18:03   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 18:03 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:59PM +0100, Juan Quintela wrote:
> We need to make sure that we have started all the multifd threads.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/migration.c | 4 ++--
>  migration/migration.h | 1 +
>  migration/ram.c       | 3 +++
>  migration/socket.c    | 3 +++
>  4 files changed, 9 insertions(+), 2 deletions(-)

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>


Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels Juan Quintela
@ 2018-03-16 18:06   ` Daniel P. Berrangé
  0 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 18:06 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:54:00PM +0100, Juan Quintela wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> 
> Be network agnostic.
> Add error checking for all values.
> ---
>  migration/ram.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 92 insertions(+), 5 deletions(-)
> 


> +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
> +{
> +    MultiFDInit_t msg;
> +    int ret;
> +
> +    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
> +    if (ret != 0) {
> +        return -1;
> +    }
> +
> +    be32_to_cpus(&msg.magic);
> +    be32_to_cpus(&msg.version);
> +
> +    if (msg.magic != MULTIFD_MAGIC) {
> +        error_setg(errp, "multifd: recevied packet magic %d "

s/recevied/received/

and in a few places below, too.

> +                   "expected %d", msg.magic, MULTIFD_MAGIC);
> +        return -1;
> +    }
> +
> +    if (msg.version != MULTIFD_VERSION) {
> +        error_setg(errp, "multifd: recevied packet version %d "
> +                   "expected %d", msg.version, MULTIFD_VERSION);
> +        return -1;
> +    }
> +
> +    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
> +        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +        error_setg(errp, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> +        g_free(uuid);
> +        return -1;
> +    }
> +
> +    if (msg.id > migrate_multifd_channels()) {
> +        error_setg(errp, "multifd: recevied channel version %d "
> +                   "expected %d", msg.version, MULTIFD_VERSION);
> +        return -1;
> +    }
> +
> +    return msg.id;
> +}
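To illustrate what "be network agnostic" and "error checking for all values" can mean for an initial packet like MultiFDInit_t, here is a self-contained sketch: multi-byte fields are written big-endian byte by byte, so host byte order and struct padding do not matter, and the receiver checks every field. The wire layout, the magic and version values and all sketch_* names are assumptions for illustration, not what migration/ram.c uses. The id check assumes channel ids start at 0, so it rejects id == n_channels and reports the id itself.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical values; wire layout assumed: magic(4) | version(4) | uuid(16) | id(1) */
#define SKETCH_MAGIC    0x11223344u
#define SKETCH_VERSION  1u
#define SKETCH_UUID_LEN 16
#define SKETCH_INIT_LEN (4 + 4 + SKETCH_UUID_LEN + 1)

static void put_be32(uint8_t *p, uint32_t v)
{
    p[0] = (uint8_t)(v >> 24);
    p[1] = (uint8_t)(v >> 16);
    p[2] = (uint8_t)(v >> 8);
    p[3] = (uint8_t)v;
}

static uint32_t get_be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Sender: serialize in big-endian order regardless of host byte order. */
static void sketch_init_encode(uint8_t *buf, const uint8_t *uuid, uint8_t id)
{
    put_be32(buf, SKETCH_MAGIC);
    put_be32(buf + 4, SKETCH_VERSION);
    memcpy(buf + 8, uuid, SKETCH_UUID_LEN);
    buf[8 + SKETCH_UUID_LEN] = id;
}

/* Receiver: decode and range-check every field.
 * Returns the channel id, or -1 after writing a message into err. */
static int sketch_init_decode(const uint8_t *buf, const uint8_t *uuid,
                              unsigned n_channels, char *err, size_t errlen)
{
    uint32_t magic = get_be32(buf);
    uint32_t version = get_be32(buf + 4);
    uint8_t id = buf[8 + SKETCH_UUID_LEN];

    if (magic != SKETCH_MAGIC) {
        snprintf(err, errlen, "received packet magic %" PRIx32 ", expected %" PRIx32,
                 magic, (uint32_t)SKETCH_MAGIC);
        return -1;
    }
    if (version != SKETCH_VERSION) {
        snprintf(err, errlen, "received packet version %" PRIu32 ", expected %" PRIu32,
                 version, (uint32_t)SKETCH_VERSION);
        return -1;
    }
    if (memcmp(buf + 8, uuid, SKETCH_UUID_LEN) != 0) {
        snprintf(err, errlen, "received uuid does not match for channel %u",
                 (unsigned)id);
        return -1;
    }
    /* 0-based ids: valid values are 0 .. n_channels - 1. */
    if (id >= n_channels) {
        snprintf(err, errlen, "received channel id %u, expected less than %u",
                 (unsigned)id, n_channels);
        return -1;
    }
    return id;
}
```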

Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>


Regards,
Daniel

* Re: [Qemu-devel] [RFC v11 00/15] mutifd
  2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
                   ` (14 preceding siblings ...)
  2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 15/15] [RFC] migration: Send pages through the multifd channels Juan Quintela
@ 2018-03-16 18:08 ` Daniel P. Berrangé
  15 siblings, 0 replies; 28+ messages in thread
From: Daniel P. Berrangé @ 2018-03-16 18:08 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Fri, Mar 16, 2018 at 12:53:48PM +0100, Juan Quintela wrote:
> Multifd
> 
> 
> Hi
> 
> [v11]
> 
> Changes on top of previous submission:
> - Now on top of migration-tests/v6 that I sent on Wednesday
> - Rebased to latest upstream
> - Everything that is sent through the network should be converted correctly
>   (famous last words)
> - Still on RFC (sometimes some packets are left over at the end), just to
>   show how things are going.  Problems are only in the last patch.
> 
> - Redo some locking (again). Now the problem is being able to send the
>   synchronization through the multifd channels.  I end the migration
>   _before_ all the channels have received all the packets.
> 
> - Trying to get a flags argument into each packet, to be able to synchronize
>   through the network, not from the "main" incoming coroutine.
> 
> - Related to the network-safe fields, now everything is in its own
>   routine, it should be easier to understand/review.  Once there, I
>   check that all values are inside range.
> 
> So, please comment.
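The per-packet flags idea above could look roughly like this. It is a sketch under stated assumptions, not the series' format (which was still being worked out): a fixed header with a flags word, where a hypothetical SKETCH_FLAG_SYNC bit marks the last packet of a round, so each receive thread sees the synchronization point in its own stream rather than via the main incoming coroutine. The header layout, the page limit and all names are invented for illustration; following the "check that all values are inside range" point, unknown flag bits and oversized page counts are rejected.

```c
#include <stdint.h>

/* Hypothetical flag bits and limits. */
#define SKETCH_FLAG_SYNC   0x1u
#define SKETCH_KNOWN_FLAGS (SKETCH_FLAG_SYNC)
#define SKETCH_MAX_PAGES   128u           /* assumed per-packet limit */
#define SKETCH_HDR_LEN     (4 + 4 + 8)    /* flags | num_pages | packet_num */

typedef struct {
    uint32_t flags;       /* SKETCH_FLAG_* bits */
    uint32_t num_pages;   /* pages that follow this header */
    uint64_t packet_num;  /* running sequence number */
} SketchPacketHdr;

static void put_be32(uint8_t *p, uint32_t v)
{
    p[0] = (uint8_t)(v >> 24);
    p[1] = (uint8_t)(v >> 16);
    p[2] = (uint8_t)(v >> 8);
    p[3] = (uint8_t)v;
}

static uint32_t get_be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

static void put_be64(uint8_t *p, uint64_t v)
{
    put_be32(p, (uint32_t)(v >> 32));
    put_be32(p + 4, (uint32_t)v);
}

static uint64_t get_be64(const uint8_t *p)
{
    return ((uint64_t)get_be32(p) << 32) | get_be32(p + 4);
}

static void sketch_hdr_encode(uint8_t *buf, const SketchPacketHdr *h)
{
    put_be32(buf, h->flags);
    put_be32(buf + 4, h->num_pages);
    put_be64(buf + 8, h->packet_num);
}

/* Returns 0 on success, -1 if any field is out of range. */
static int sketch_hdr_decode(const uint8_t *buf, SketchPacketHdr *h)
{
    h->flags = get_be32(buf);
    h->num_pages = get_be32(buf + 4);
    h->packet_num = get_be64(buf + 8);

    if (h->flags & ~SKETCH_KNOWN_FLAGS) {
        return -1;    /* unknown flag bits: refuse rather than guess */
    }
    if (h->num_pages > SKETCH_MAX_PAGES) {
        return -1;
    }
    return 0;
}
```

A receive thread would then test out.flags & SKETCH_FLAG_SYNC and, once the pages of that packet are written, signal the main thread.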

Just a few very minor things I've noticed. No more comments from me;
I'll defer to someone else for review of the actual RAM page
handling patches.


Regards,
Daniel

* Re: [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error
  2018-03-16 17:57     ` Daniel P. Berrangé
@ 2018-04-06 17:10       ` Juan Quintela
  0 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2018-04-06 17:10 UTC (permalink / raw)
  To: Daniel P. Berrangé; +Cc: lvivier, qemu-devel, peterx, dgilbert

Daniel P. Berrange <berrange@redhat.com> wrote:
> On Fri, Mar 16, 2018 at 05:49:07PM +0000, Daniel P. Berrangé wrote:
>> On Fri, Mar 16, 2018 at 12:53:49PM +0100, Juan Quintela wrote:
>> > Signed-off-by: Juan Quintela <quintela@redhat.com>
>> > ---
>> >  migration/ram.c | 20 ++++++++++++++++++++
>> >  1 file changed, 20 insertions(+)
>> > 
>> > diff --git a/migration/ram.c b/migration/ram.c
>> > index 7266351fd0..1b8095a358 100644
>> > --- a/migration/ram.c
>> > +++ b/migration/ram.c
>> > @@ -414,6 +414,16 @@ static void terminate_multifd_send_threads(Error *errp)
>> >  {
>> >      int i;
>> >  
>> > +    if (errp) {
>> > +        MigrationState *s = migrate_get_current();
>> > +        migrate_set_error(s, errp);
>> 
>> This doesn't look quite right. You're checking if 'errp' is non-NULL,
>> which just tells you if the caller wants to collect the error, not
>> whether an error has happened. For the latter you need
>> 
>>   if (errp && *errp)
>> 
>> It seems a little strange, though, for the caller to pass an error into
>> this method for reporting.
>
> Oh wait, I'm being misled by the unusual parameter name.
>
> An "errp" name should only ever be used for an "Error **", but we
> only have an "Error *" here.

Copy & Paste O:-)

> So just fix the parameter name to be "err" instead of "errp".

Done.
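The errp/err distinction discussed above can be shown with a small stand-in. This is a minimal sketch assuming a simplified SketchError type, not QEMU's real Error API (that lives in include/qapi/error.h, with helpers such as error_setg() and error_free()). A function that may produce an error takes Error ** (conventionally errp, which may be NULL); a function that is handed an error that already happened takes Error * (err). Testing errp alone only says whether the caller wants an error back; whether one occurred is *errp, hence errp && *errp. All sketch_* names are hypothetical.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for QEMU's Error, for illustration only. */
typedef struct SketchError {
    char msg[128];
} SketchError;

/* "errp" convention: Error ** lets the callee hand a new error back.
 * errp may be NULL if the caller does not want the details. */
static int sketch_parse_channels(const char *s, SketchError **errp)
{
    char *end;
    long n = strtol(s, &end, 10);

    if (end == s || *end != '\0' || n <= 0 || n > 255) {
        if (errp) {
            *errp = calloc(1, sizeof(**errp));
            if (*errp) {
                snprintf((*errp)->msg, sizeof((*errp)->msg),
                         "invalid channel count '%s'", s);
            }
        }
        return -1;
    }
    return (int)n;
}

/* "err" convention: Error * is an error that already happened, or NULL
 * for "no error"; the callee only reports it. The sketch copies the
 * message rather than taking ownership of err. */
static char sketch_recorded[128];

static void sketch_terminate_threads(SketchError *err)
{
    if (err) {
        snprintf(sketch_recorded, sizeof(sketch_recorded), "%s", err->msg);
    }
    /* ... ask every thread to quit ... */
}
```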

Thanks,

end of thread (last message 2018-04-06 17:10 UTC)

Thread overview: 28+ messages
2018-03-16 11:53 [Qemu-devel] [RFC v11 00/15] mutifd Juan Quintela
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 01/15] migration: Set error state in case of error Juan Quintela
2018-03-16 17:49   ` Daniel P. Berrangé
2018-03-16 17:57     ` Daniel P. Berrangé
2018-04-06 17:10       ` Juan Quintela
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 02/15] migration: In case of error just end the migration Juan Quintela
2018-03-16 17:49   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 03/15] migration: terminate_* can be called for other threads Juan Quintela
2018-03-16 17:51   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 04/15] migration: Introduce multifd_recv_new_channel() Juan Quintela
2018-03-16 17:51   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 05/15] migration: Be sure all recv channels are created Juan Quintela
2018-03-16 17:52   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 06/15] migration: Export functions to create send channels Juan Quintela
2018-03-16 17:53   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 07/15] migration: Synchronize send threads Juan Quintela
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 08/15] migration: Synchronize recv threads Juan Quintela
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 09/15] migration: Add multifd traces for start/end thread Juan Quintela
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 10/15] migration: Create multifd channels Juan Quintela
2018-03-16 17:58   ` Daniel P. Berrangé
2018-03-16 11:53 ` [Qemu-devel] [PATCH v11 11/15] migration: Delay start of migration main routines Juan Quintela
2018-03-16 18:03   ` Daniel P. Berrangé
2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 12/15] migration: Transmit initial package through the multifd channels Juan Quintela
2018-03-16 18:06   ` Daniel P. Berrangé
2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 13/15] migration: Create ram_multifd_page Juan Quintela
2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 14/15] migration: Create pages structure for reception Juan Quintela
2018-03-16 11:54 ` [Qemu-devel] [PATCH v11 15/15] [RFC] migration: Send pages through the multifd channels Juan Quintela
2018-03-16 18:08 ` [Qemu-devel] [RFC v11 00/15] mutifd Daniel P. Berrangé
