* [Qemu-devel] [PATCH v7 00/22] Multifd
@ 2017-09-06 11:51 Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions" Juan Quintela
                   ` (21 more replies)
  0 siblings, 22 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Hi

Things NOT done yet:

- be able to return with errors from other threads.
- still connects synchronously.  I need to redo the other changes to fix
  this.  Doing it asynchronously is easy; making sure that everything
  finishes, and testing that, is not.
- renaming outgoing variables, thinking about it some more
- paolo's suggestion of not having a control channel
  needs yet more cleanups to be able to have more than one RAMState; trying it.
- performance work is not done yet, but it has been very stable

Please, comment.

On v7:
- tests fixed as danp wanted
- had to revert danp's qio_*_all patches, as they break multifd; I have to
  investigate why.
- error_abort is gone.  After several tries at error handling, I ended up
  with a single error protected by a lock; the first error wins.
- Addressed basically all reviews (see on ToDo)
- Pointers to struct are done now
- fix lots of leaks
- lots of small fixes
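The "first error wins" scheme mentioned above can be sketched like this (illustrative only: QEMU's real code uses QemuMutex and Error objects, and the helper name here is made up; plain pthreads and a string buffer stand in for them):

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* One error slot for the whole migration, guarded by a mutex. */
static pthread_mutex_t error_mutex = PTHREAD_MUTEX_INITIALIZER;
static char first_error[128];   /* empty until some thread reports an error */

/* Any thread may call this; only the first reported error is kept. */
static void set_migration_error(const char *msg)
{
    pthread_mutex_lock(&error_mutex);
    if (!first_error[0]) {
        snprintf(first_error, sizeof(first_error), "%s", msg);
    }
    pthread_mutex_unlock(&error_mutex);
}
```

Later errors are simply dropped, so whichever thread fails first determines what "info migrate" reports.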


[v6]
- Improve migration_ioc_process_incoming
- teach about G_SOURCE_REMOVE/CONTINUE
- Add test for migration_has_all_channels
- use DEFINE_PROP*
- change recv_state to use pointers to parameters;
  makes it easier to receive channels out of order
- use g_strdup_printf()
- improve count of threads to know when we have to finish
- report channel id's on errors
- Use last_page parameter for multifd_send_page() sooner
- Improve comments for address
- use g_new0() instead of g_malloc()
- create MULTIFD_CONTINUE instead of using UINT16_MAX
- clear memory used by group of pages
  once there, pass everything to the global state variables instead of being
  local to the function.  This way it works if we cancel migration and start
  a new one
- Really wait to create the migration_thread until all channels are created
- split initial_bytes setup to make clearer following patches.
- create RAM_SAVE_FLAG_MULTIFD_SYNC macro, to make clear what we are doing
- move setting of need_flush to inside bitmap_sync
- Lots of other small changes & reorderings

Please, comment.


[v5]

- tests from qio functions (a.k.a. make danp happy)
- 1st message from one channel to the other contains:
   <uuid> multifd <channel number>
   This would allow us to create more channels as we want them.
   a.k.a. Making dave happy
- Waiting in reception for new channels using qio listeners
  Getting threads, qio and reference counters working at the same time
  was interesting.
  Another make danp happy.

- Lots and lots of small changes and fixes.  Notice that the last 70 patches
  or so that I merged were to make this series easier/smaller.

- NOT DONE: I haven't been working on measuring performance
  differences, this was about getting the creation of the
  threads/channels right.
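The per-channel greeting described above could be built and parsed roughly like this (a sketch under assumptions: the helper names and buffer sizes are invented, only the "<uuid> multifd <channel number>" message shape comes from this cover letter):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build the first message sent on a new channel:
 * "<uuid> multifd <channel number>" */
static int format_channel_greeting(char *buf, size_t len,
                                   const char *uuid, int channel_id)
{
    return snprintf(buf, len, "%s multifd %d", uuid, channel_id);
}

/* Parse the greeting on the destination; returns 0 on success. */
static int parse_channel_greeting(const char *msg,
                                  char uuid[37], int *channel_id)
{
    char word[16];

    if (sscanf(msg, "%36s %15s %d", uuid, word, channel_id) != 3 ||
        strcmp(word, "multifd") != 0) {
        return -1;
    }
    return 0;
}
```

Because the uuid and channel number travel in-band, the destination can accept the connections in any order and still match each one to the right migration and slot.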

So, what I want:

- Are people happy with how I have (ab)used qio channels? (yes danp,
  that is you).
- My understanding is th

ToDo:

- Make paolo happy: He wanted to test using control information
  through each channel, not only pages.  This requires yet more
  cleanups to be able to have more than one QEMUFile/RAMState open at
  the same time.

- How I create multiple channels.  Things I know:
  * with current changes, it should work with fd/channels (the multifd bits),
    but we don't have a way to pass multiple fds or exec files.
    Danp, any idea about how to create a UI for it?
  * My idea is that we would split current code to be:
    + channel creation at migration.c
    + rest of bits at ram.c
    + change format to:
      <uuid> main <rest of migration capabilities/parameters> so we can check
      <uuid> postcopy <no clue what parameters are needed>
          Dave wanted a way to create a new fd for postcopy for some time
    + Adding new channels is easy

- Performance data/numbers: yes, I wanted to get this out at once; I
  will continue with this.
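With the split proposed above, dispatch on the destination becomes a matter of reading the first words of each new channel (a sketch; the type names come from the list above, everything else is made up for illustration):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

enum channel_type { CH_MAIN, CH_MULTIFD, CH_POSTCOPY, CH_UNKNOWN };

/* Classify an incoming channel from its first message,
 * "<uuid> <type> ...", once the uuid has been validated. */
static enum channel_type classify_channel(const char *first_msg)
{
    char uuid[37], type[16];

    if (sscanf(first_msg, "%36s %15s", uuid, type) != 2) {
        return CH_UNKNOWN;
    }
    if (strcmp(type, "main") == 0) {
        return CH_MAIN;
    }
    if (strcmp(type, "multifd") == 0) {
        return CH_MULTIFD;
    }
    if (strcmp(type, "postcopy") == 0) {
        return CH_POSTCOPY;
    }
    return CH_UNKNOWN;
}
```

Adding a new channel kind then only means teaching this dispatcher one more keyword, which is what makes "Adding new channels is easy" plausible.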


Please, review.


[v4]
This is the 4th version of multifd. Changes:
- XBZRLE doesn't need to be checked for
- Documentation and defaults are consistent
- split socketArgs
- use iovec instead of creating something similar.
- We now use the exported size of the target page (another HACK removal)
- created qio_channel_{writev,readv}_all functions (the _full() name
  was already taken).
  They do the same as the functions without _all(), but if a call
  returns due to blocking, they redo it.
- it is checkpatch.pl clean now.
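The retry behaviour of those _all() wrappers boils down to a loop like this (a minimal sketch, not QEMU's code: a function pointer and a 2-bytes-at-a-time sink stand in for the channel, and the would-block wait is elided):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

/* Demo sink that accepts at most 2 bytes per call, mimicking short writes. */
static char sink[64];
static size_t sink_pos;

static ssize_t short_write(const char *buf, size_t len)
{
    size_t n = len < 2 ? len : 2;
    memcpy(sink + sink_pos, buf, n);
    sink_pos += n;
    return (ssize_t)n;
}

/* What the *_all() wrappers do: keep writing until everything is out.
 * The real functions additionally wait on the channel when a
 * non-blocking write would block, instead of treating it as failure. */
static int write_all_sketch(ssize_t (*do_write)(const char *, size_t),
                            const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = do_write(buf, len);
        if (n <= 0) {
            return -1;
        }
        buf += (size_t)n;
        len -= (size_t)n;
    }
    return 0;
}
```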

Please comment, Juan.


Juan Quintela (22):
  Revert "io: add new qio_channel_{readv, writev, read, write}_all
    functions"
  migration: Create migration_ioc_process_incoming()
  migration: Teach it about G_SOURCE_REMOVE
  migration: Add comments to channel functions
  migration: Create migration_has_all_channels
  migration: Improve migration thread error handling
  migration: Make migrate_fd_error() the owner of the Error
  qio: Create new qio_channel_{readv,writev}_all
  migration: Add multifd capability
  migration: Create x-multifd-threads parameter
  migration: Create x-multifd-group parameter
  migration: Create multifd migration threads
  migration: Split migration_fd_process_incoming
  migration: Start of multiple fd work
  migration: Create ram_multifd_page
  migration: Really use multiple pages at a time
  migration: Send the fd number which we are going to use for this page
  migration: Create thread infrastructure for multifd recv side
  migration: Test new fd infrastructure
  migration: Rename initial_bytes
  migration: Transfer pages over new channels
  migration: Flush receive queue

 hmp.c                         |  16 ++
 include/glib-compat.h         |   2 +
 include/io/channel.h          |  70 +-----
 io/channel.c                  |  57 ++---
 migration/channel.c           |  19 +-
 migration/exec.c              |   2 +-
 migration/fd.c                |   2 +-
 migration/migration.c         | 176 +++++++++++--
 migration/migration.h         |  17 +-
 migration/qemu-file-channel.c |  29 +--
 migration/ram.c               | 564 +++++++++++++++++++++++++++++++++++++++++-
 migration/ram.h               |   8 +
 migration/socket.c            |  46 +++-
 migration/socket.h            |  10 +
 migration/tls.c               |   1 -
 qapi/migration.json           |  44 +++-
 tests/io-channel-helpers.c    | 120 ++++++++-
 17 files changed, 1016 insertions(+), 167 deletions(-)

-- 
2.13.5

^ permalink raw reply	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions"
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 14:00   ` Eric Blake
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 02/22] migration: Create migration_ioc_process_incoming() Juan Quintela
                   ` (20 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

This reverts commit d4622e55883211072621958d39ddaa73483d201e.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/io/channel.h       |  90 ---------------------------------------
 io/channel.c               |  94 -----------------------------------------
 tests/io-channel-helpers.c | 102 +++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 93 insertions(+), 193 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 8f25893c45..54f3dc252f 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -269,58 +269,6 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                 Error **errp);
 
 /**
- * qio_channel_readv_all:
- * @ioc: the channel object
- * @iov: the array of memory regions to read data into
- * @niov: the length of the @iov array
- * @errp: pointer to a NULL-initialized error object
- *
- * Read data from the IO channel, storing it in the
- * memory regions referenced by @iov. Each element
- * in the @iov will be fully populated with data
- * before the next one is used. The @niov parameter
- * specifies the total number of elements in @iov.
- *
- * The function will wait for all requested data
- * to be read, yielding from the current coroutine
- * if required.
- *
- * If end-of-file occurs before all requested data
- * has been read, an error will be reported.
- *
- * Returns: 0 if all bytes were read, or -1 on error
- */
-int qio_channel_readv_all(QIOChannel *ioc,
-                          const struct iovec *iov,
-                          size_t niov,
-                          Error **errp);
-
-
-/**
- * qio_channel_writev_all:
- * @ioc: the channel object
- * @iov: the array of memory regions to write data from
- * @niov: the length of the @iov array
- * @errp: pointer to a NULL-initialized error object
- *
- * Write data to the IO channel, reading it from the
- * memory regions referenced by @iov. Each element
- * in the @iov will be fully sent, before the next
- * one is used. The @niov parameter specifies the
- * total number of elements in @iov.
- *
- * The function will wait for all requested data
- * to be written, yielding from the current coroutine
- * if required.
- *
- * Returns: 0 if all bytes were written, or -1 on error
- */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
-
-/**
  * qio_channel_readv:
  * @ioc: the channel object
  * @iov: the array of memory regions to read data into
@@ -383,44 +331,6 @@ ssize_t qio_channel_write(QIOChannel *ioc,
                           Error **errp);
 
 /**
- * qio_channel_read_all:
- * @ioc: the channel object
- * @buf: the memory region to read data into
- * @buflen: the number of bytes to @buf
- * @errp: pointer to a NULL-initialized error object
- *
- * Reads @buflen bytes into @buf, possibly blocking or (if the
- * channel is non-blocking) yielding from the current coroutine
- * multiple times until the entire content is read. If end-of-file
- * occurs it will return an error rather than a short-read. Otherwise
- * behaves as qio_channel_read().
- *
- * Returns: 0 if all bytes were read, or -1 on error
- */
-int qio_channel_read_all(QIOChannel *ioc,
-                         char *buf,
-                         size_t buflen,
-                         Error **errp);
-/**
- * qio_channel_write_all:
- * @ioc: the channel object
- * @buf: the memory region to write data into
- * @buflen: the number of bytes to @buf
- * @errp: pointer to a NULL-initialized error object
- *
- * Writes @buflen bytes from @buf, possibly blocking or (if the
- * channel is non-blocking) yielding from the current coroutine
- * multiple times until the entire content is written.  Otherwise
- * behaves as qio_channel_write().
- *
- * Returns: 0 if all bytes were written, or -1 on error
- */
-int qio_channel_write_all(QIOChannel *ioc,
-                          const char *buf,
-                          size_t buflen,
-                          Error **errp);
-
-/**
  * qio_channel_set_blocking:
  * @ioc: the channel object
  * @enabled: the blocking flag state
diff --git a/io/channel.c b/io/channel.c
index 5e8c2f0a91..1cfb8b33a2 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -22,7 +22,6 @@
 #include "io/channel.h"
 #include "qapi/error.h"
 #include "qemu/main-loop.h"
-#include "qemu/iov.h"
 
 bool qio_channel_has_feature(QIOChannel *ioc,
                              QIOChannelFeature feature)
@@ -86,79 +85,6 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
 }
 
 
-
-int qio_channel_readv_all(QIOChannel *ioc,
-                          const struct iovec *iov,
-                          size_t niov,
-                          Error **errp)
-{
-    int ret = -1;
-    struct iovec *local_iov = g_new(struct iovec, niov);
-    struct iovec *local_iov_head = local_iov;
-    unsigned int nlocal_iov = niov;
-
-    nlocal_iov = iov_copy(local_iov, nlocal_iov,
-                          iov, niov,
-                          0, iov_size(iov, niov));
-
-    while (nlocal_iov > 0) {
-        ssize_t len;
-        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
-        if (len == QIO_CHANNEL_ERR_BLOCK) {
-            qio_channel_wait(ioc, G_IO_IN);
-            continue;
-        } else if (len < 0) {
-            goto cleanup;
-        } else if (len == 0) {
-            error_setg(errp,
-                       "Unexpected end-of-file before all bytes were read");
-            goto cleanup;
-        }
-
-        iov_discard_front(&local_iov, &nlocal_iov, len);
-    }
-
-    ret = 0;
-
- cleanup:
-    g_free(local_iov_head);
-    return ret;
-}
-
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
-{
-    int ret = -1;
-    struct iovec *local_iov = g_new(struct iovec, niov);
-    struct iovec *local_iov_head = local_iov;
-    unsigned int nlocal_iov = niov;
-
-    nlocal_iov = iov_copy(local_iov, nlocal_iov,
-                          iov, niov,
-                          0, iov_size(iov, niov));
-
-    while (nlocal_iov > 0) {
-        ssize_t len;
-        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
-        if (len == QIO_CHANNEL_ERR_BLOCK) {
-            qio_channel_wait(ioc, G_IO_OUT);
-            continue;
-        }
-        if (len < 0) {
-            goto cleanup;
-        }
-
-        iov_discard_front(&local_iov, &nlocal_iov, len);
-    }
-
-    ret = 0;
- cleanup:
-    g_free(local_iov_head);
-    return ret;
-}
-
 ssize_t qio_channel_readv(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
@@ -197,26 +123,6 @@ ssize_t qio_channel_write(QIOChannel *ioc,
 }
 
 
-int qio_channel_read_all(QIOChannel *ioc,
-                         char *buf,
-                         size_t buflen,
-                         Error **errp)
-{
-    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
-    return qio_channel_readv_all(ioc, &iov, 1, errp);
-}
-
-
-int qio_channel_write_all(QIOChannel *ioc,
-                          const char *buf,
-                          size_t buflen,
-                          Error **errp)
-{
-    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
-    return qio_channel_writev_all(ioc, &iov, 1, errp);
-}
-
-
 int qio_channel_set_blocking(QIOChannel *ioc,
                               bool enabled,
                               Error **errp)
diff --git a/tests/io-channel-helpers.c b/tests/io-channel-helpers.c
index 5430e1389d..05e5579cf8 100644
--- a/tests/io-channel-helpers.c
+++ b/tests/io-channel-helpers.c
@@ -21,7 +21,6 @@
 #include "qemu/osdep.h"
 #include "io-channel-helpers.h"
 #include "qapi/error.h"
-#include "qemu/iov.h"
 
 struct QIOChannelTest {
     QIOChannel *src;
@@ -38,17 +37,77 @@ struct QIOChannelTest {
 };
 
 
+static void test_skip_iovec(struct iovec **iov,
+                            size_t *niov,
+                            size_t skip,
+                            struct iovec *old)
+{
+    size_t offset = 0;
+    size_t i;
+
+    for (i = 0; i < *niov; i++) {
+        if (skip < (*iov)[i].iov_len) {
+            old->iov_len = (*iov)[i].iov_len;
+            old->iov_base = (*iov)[i].iov_base;
+
+            (*iov)[i].iov_len -= skip;
+            (*iov)[i].iov_base += skip;
+            break;
+        } else {
+            skip -= (*iov)[i].iov_len;
+
+            if (i == 0 && old->iov_base) {
+                (*iov)[i].iov_len = old->iov_len;
+                (*iov)[i].iov_base = old->iov_base;
+                old->iov_len = 0;
+                old->iov_base = NULL;
+            }
+
+            offset++;
+        }
+    }
+
+    *iov = *iov + offset;
+    *niov -= offset;
+}
+
+
 /* This thread sends all data using iovecs */
 static gpointer test_io_thread_writer(gpointer opaque)
 {
     QIOChannelTest *data = opaque;
+    struct iovec *iov = data->inputv;
+    size_t niov = data->niov;
+    struct iovec old = { 0 };
 
     qio_channel_set_blocking(data->src, data->blocking, NULL);
 
-    qio_channel_writev_all(data->src,
-                           data->inputv,
-                           data->niov,
-                           &data->writeerr);
+    while (niov) {
+        ssize_t ret;
+        ret = qio_channel_writev(data->src,
+                                 iov,
+                                 niov,
+                                 &data->writeerr);
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            if (data->blocking) {
+                error_setg(&data->writeerr,
+                           "Unexpected I/O blocking");
+                break;
+            } else {
+                qio_channel_wait(data->src,
+                                 G_IO_OUT);
+                continue;
+            }
+        } else if (ret < 0) {
+            break;
+        } else if (ret == 0) {
+            error_setg(&data->writeerr,
+                       "Unexpected zero length write");
+            break;
+        }
+
+        test_skip_iovec(&iov, &niov, ret, &old);
+    }
 
     return NULL;
 }
@@ -58,13 +117,38 @@ static gpointer test_io_thread_writer(gpointer opaque)
 static gpointer test_io_thread_reader(gpointer opaque)
 {
     QIOChannelTest *data = opaque;
+    struct iovec *iov = data->outputv;
+    size_t niov = data->niov;
+    struct iovec old = { 0 };
 
     qio_channel_set_blocking(data->dst, data->blocking, NULL);
 
-    qio_channel_readv_all(data->dst,
-                          data->outputv,
-                          data->niov,
-                          &data->readerr);
+    while (niov) {
+        ssize_t ret;
+
+        ret = qio_channel_readv(data->dst,
+                                iov,
+                                niov,
+                                &data->readerr);
+
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            if (data->blocking) {
+                error_setg(&data->readerr,
+                           "Unexpected I/O blocking");
+                break;
+            } else {
+                qio_channel_wait(data->dst,
+                                 G_IO_IN);
+                continue;
+            }
+        } else if (ret < 0) {
+            break;
+        } else if (ret == 0) {
+            break;
+        }
+
+        test_skip_iovec(&iov, &niov, ret, &old);
+    }
 
     return NULL;
 }
-- 
2.13.5


* [Qemu-devel] [PATCH v7 02/22] migration: Create migration_ioc_process_incoming()
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions" Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE Juan Quintela
                   ` (19 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We pass the ioc instead of the fd.  This will allow us to have more
than one channel open.  We also make sure that we set the
from_src_file sooner, so we don't need to pass it as a parameter.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
---
 migration/channel.c   |  3 +--
 migration/migration.c | 23 +++++++++++++++++++----
 migration/migration.h |  2 ++
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/migration/channel.c b/migration/channel.c
index 3b7252f5a2..edceebdb7b 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -36,8 +36,7 @@ void migration_channel_process_incoming(QIOChannel *ioc)
             error_report_err(local_err);
         }
     } else {
-        QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_ioc_process_incoming(ioc);
     }
 }
 
diff --git a/migration/migration.c b/migration/migration.c
index 1a2b3ebd1a..d16d8a63ec 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -306,17 +306,16 @@ static void process_incoming_migration_bh(void *opaque)
 
 static void process_incoming_migration_co(void *opaque)
 {
-    QEMUFile *f = opaque;
     MigrationIncomingState *mis = migration_incoming_get_current();
     PostcopyState ps;
     int ret;
 
-    mis->from_src_file = f;
+    assert(mis->from_src_file);
     mis->largest_page_size = qemu_ram_pagesize_largest();
     postcopy_state_set(POSTCOPY_INCOMING_NONE);
     migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
                       MIGRATION_STATUS_ACTIVE);
-    ret = qemu_loadvm_state(f);
+    ret = qemu_loadvm_state(mis->from_src_file);
 
     ps = postcopy_state_get();
     trace_process_incoming_migration_co_end(ret, ps);
@@ -364,12 +363,28 @@ static void process_incoming_migration_co(void *opaque)
 
 void migration_fd_process_incoming(QEMUFile *f)
 {
-    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
+    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
+    MigrationIncomingState *mis = migration_incoming_get_current();
 
+    if (!mis->from_src_file) {
+        mis->from_src_file = f;
+    }
     qemu_file_set_blocking(f, false);
     qemu_coroutine_enter(co);
 }
 
+void migration_ioc_process_incoming(QIOChannel *ioc)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (!mis->from_src_file) {
+        QEMUFile *f = qemu_fopen_channel_input(ioc);
+        mis->from_src_file = f;
+        migration_fd_process_incoming(f);
+    }
+    /* We still only have a single channel.  Nothing to do here yet */
+}
+
 /*
  * Send a 'SHUT' message on the return channel with the given value
  * to indicate that we've finished with the RP.  Non-0 value indicates
diff --git a/migration/migration.h b/migration/migration.h
index 148c9facbc..99c398d484 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -20,6 +20,7 @@
 #include "exec/cpu-common.h"
 #include "qemu/coroutine_int.h"
 #include "hw/qdev.h"
+#include "io/channel.h"
 
 /* State for the incoming migration */
 struct MigrationIncomingState {
@@ -152,6 +153,7 @@ struct MigrationState
 void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
+void migration_ioc_process_incoming(QIOChannel *ioc);
 
 uint64_t migrate_max_downtime(void);
 
-- 
2.13.5


* [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions" Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 02/22] migration: Create migration_ioc_process_incoming() Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-08 17:18   ` Dr. David Alan Gilbert
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 04/22] migration: Add comments to channel functions Juan Quintela
                   ` (18 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

As these are defined in glib 2.32, add compatibility macros for older glibs.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
Reviewed-by: Peter Xu <peterx@redhat.com>
---
 include/glib-compat.h | 2 ++
 migration/exec.c      | 2 +-
 migration/fd.c        | 2 +-
 migration/socket.c    | 2 +-
 4 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/include/glib-compat.h b/include/glib-compat.h
index fcffcd3f07..e15aca2d40 100644
--- a/include/glib-compat.h
+++ b/include/glib-compat.h
@@ -223,6 +223,8 @@ static inline gboolean g_hash_table_contains(GHashTable *hash_table,
 {
     return g_hash_table_lookup_extended(hash_table, key, NULL, NULL);
 }
+#define G_SOURCE_CONTINUE TRUE
+#define G_SOURCE_REMOVE FALSE
 #endif
 
 #ifndef g_assert_true
diff --git a/migration/exec.c b/migration/exec.c
index 08b599e0e2..f3be1baf2e 100644
--- a/migration/exec.c
+++ b/migration/exec.c
@@ -49,7 +49,7 @@ static gboolean exec_accept_incoming_migration(QIOChannel *ioc,
 {
     migration_channel_process_incoming(ioc);
     object_unref(OBJECT(ioc));
-    return FALSE; /* unregister */
+    return G_SOURCE_REMOVE;
 }
 
 void exec_start_incoming_migration(const char *command, Error **errp)
diff --git a/migration/fd.c b/migration/fd.c
index 30f5258a6a..30de4b9847 100644
--- a/migration/fd.c
+++ b/migration/fd.c
@@ -49,7 +49,7 @@ static gboolean fd_accept_incoming_migration(QIOChannel *ioc,
 {
     migration_channel_process_incoming(ioc);
     object_unref(OBJECT(ioc));
-    return FALSE; /* unregister */
+    return G_SOURCE_REMOVE;
 }
 
 void fd_start_incoming_migration(const char *infd, Error **errp)
diff --git a/migration/socket.c b/migration/socket.c
index 757d3821a1..b02d37d7a3 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -154,7 +154,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
 out:
     /* Close listening socket as its no longer needed */
     qio_channel_close(ioc, NULL);
-    return FALSE; /* unregister */
+    return G_SOURCE_REMOVE;
 }
 
 
-- 
2.13.5


* [Qemu-devel] [PATCH v7 04/22] migration: Add comments to channel functions
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (2 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 05/22] migration: Create migration_has_all_channels Juan Quintela
                   ` (17 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
---
 migration/channel.c | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/migration/channel.c b/migration/channel.c
index edceebdb7b..70ec7ea3b7 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -19,6 +19,14 @@
 #include "qapi/error.h"
 #include "io/channel-tls.h"
 
+/**
+ * @migration_channel_process_incoming - Create new incoming migration channel
+ *
+ * Notice that TLS is special.  For it we listen in a listener socket,
+ * and then create a new client socket from the TLS library.
+ *
+ * @ioc: Channel to which we are connecting
+ */
 void migration_channel_process_incoming(QIOChannel *ioc)
 {
     MigrationState *s = migrate_get_current();
@@ -41,6 +49,13 @@ void migration_channel_process_incoming(QIOChannel *ioc)
 }
 
 
+/**
+ * @migration_channel_connect - Create new outgoing migration channel
+ *
+ * @s: Current migration state
+ * @ioc: Channel to which we are connecting
+ * @hostname: Where we want to connect
+ */
 void migration_channel_connect(MigrationState *s,
                                QIOChannel *ioc,
                                const char *hostname)
-- 
2.13.5


* [Qemu-devel] [PATCH v7 05/22] migration: Create migration_has_all_channels
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (3 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 04/22] migration: Add comments to channel functions Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling Juan Quintela
                   ` (16 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

This function allows us to decide when to close the listener socket.
For now, we only need one connection.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
---
 migration/migration.c | 11 +++++++++++
 migration/migration.h |  2 ++
 migration/socket.c    | 10 +++++++---
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index d16d8a63ec..3d57e5ee90 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -385,6 +385,17 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
     /* We still only have a single channel.  Nothing to do here yet */
 }
 
+/**
+ * @migration_has_all_channels: We have received all channels that we need
+ *
+ * Returns true when we have got connections to all the channels that
+ * we need for migration.
+ */
+bool migration_has_all_channels(void)
+{
+    return true;
+}
+
 /*
  * Send a 'SHUT' message on the return channel with the given value
  * to indicate that we've finished with the RP.  Non-0 value indicates
diff --git a/migration/migration.h b/migration/migration.h
index 99c398d484..1881e4a754 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -155,6 +155,8 @@ void migrate_set_state(int *state, int old_state, int new_state);
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
 
+bool  migration_has_all_channels(void);
+
 uint64_t migrate_max_downtime(void);
 
 void migrate_fd_error(MigrationState *s, const Error *error);
diff --git a/migration/socket.c b/migration/socket.c
index b02d37d7a3..dee869044a 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -152,9 +152,13 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
     object_unref(OBJECT(sioc));
 
 out:
-    /* Close listening socket as its no longer needed */
-    qio_channel_close(ioc, NULL);
-    return G_SOURCE_REMOVE;
+    if (migration_has_all_channels()) {
+        /* Close listening socket as its no longer needed */
+        qio_channel_close(ioc, NULL);
+        return G_SOURCE_REMOVE;
+    } else {
+        return G_SOURCE_CONTINUE;
+    }
 }
 
 
-- 
2.13.5


* [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (4 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 05/22] migration: Create migration_has_all_channels Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 19:04   ` Dr. David Alan Gilbert
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
                   ` (15 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We now report errors also when we finish migration, not only on info
migrate.  We plan to use this error from several places, and we want the
first error that happens to win, so we add a mutex to order it.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c | 19 ++++++++++++++++---
 migration/migration.h |  7 ++++++-
 migration/ram.c       |  2 +-
 migration/tls.c       |  1 -
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 3d57e5ee90..5e17168c1c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1013,19 +1013,30 @@ static void migrate_fd_cleanup(void *opaque)
                           MIGRATION_STATUS_CANCELLED);
     }
 
+    if (s->error) {
+        /* It is used on info migrate.  We can't free it */
+        error_report_err(error_copy(s->error));
+    }
     notifier_list_notify(&migration_state_notifiers, s);
     block_cleanup_parameters(s);
 }
 
+void migrate_set_error(MigrationState *s, const Error *error)
+{
+    qemu_mutex_lock(&s->error_mutex);
+    if (!s->error) {
+        s->error = error_copy(error);
+    }
+    qemu_mutex_unlock(&s->error_mutex);
+}
+
 void migrate_fd_error(MigrationState *s, const Error *error)
 {
     trace_migrate_fd_error(error_get_pretty(error));
     assert(s->to_dst_file == NULL);
     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                       MIGRATION_STATUS_FAILED);
-    if (!s->error) {
-        s->error = error_copy(error);
-    }
+    migrate_set_error(s, error);
     notifier_list_notify(&migration_state_notifiers, s);
     block_cleanup_parameters(s);
 }
@@ -2244,6 +2255,7 @@ static void migration_instance_finalize(Object *obj)
     MigrationState *ms = MIGRATION_OBJ(obj);
     MigrationParameters *params = &ms->parameters;
 
+    qemu_mutex_destroy(&ms->error_mutex);
     g_free(params->tls_hostname);
     g_free(params->tls_creds);
 }
@@ -2256,6 +2268,7 @@ static void migration_instance_init(Object *obj)
     ms->state = MIGRATION_STATUS_NONE;
     ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
     ms->mbps = -1;
+    qemu_mutex_init(&ms->error_mutex);
 
     params->tls_hostname = g_strdup("");
     params->tls_creds = g_strdup("");
diff --git a/migration/migration.h b/migration/migration.h
index 1881e4a754..9a81b8a70a 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -129,8 +129,12 @@ struct MigrationState
     int64_t colo_checkpoint_time;
     QEMUTimer *colo_delay_timer;
 
-    /* The last error that occurred */
+    /* The first error that occurred.
+       We use the mutex to be able to return the first error message */
     Error *error;
+    /* mutex to protect error */
+    QemuMutex error_mutex;
+
     /* Do we have to clean up -b/-i from old migrate parameters */
     /* This feature is deprecated and will be removed */
     bool must_remove_block_options;
@@ -159,6 +163,7 @@ bool  migration_has_all_channels(void);
 
 uint64_t migrate_max_downtime(void);
 
+void migrate_set_error(MigrationState *s, const Error *error);
 void migrate_fd_error(MigrationState *s, const Error *error);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/migration/ram.c b/migration/ram.c
index e18b3e2d4f..e0179fc838 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1789,7 +1789,7 @@ int ram_discard_range(const char *rbname, uint64_t start, size_t length)
     RAMBlock *rb = qemu_ram_block_by_name(rbname);
 
     if (!rb) {
-        error_report("ram_discard_range: Failed to find block '%s'", rbname);
+        error_report("ram_discard_range: Failed to find block '%s'", rbname);
         goto err;
     }
 
diff --git a/migration/tls.c b/migration/tls.c
index 596e8790bd..026a008667 100644
--- a/migration/tls.c
+++ b/migration/tls.c
@@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
     if (qio_task_propagate_error(task, &err)) {
         trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
         migrate_fd_error(s, err);
-        error_free(err);
     } else {
         trace_migration_tls_outgoing_handshake_complete();
         migration_channel_connect(s, ioc, NULL);
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (5 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 16:15   ` Eric Blake
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all Juan Quintela
                   ` (14 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

So far, every caller had to free the error afterwards, so just do it
here.  While at it, fix tls.c, which was leaking the error.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/channel.c   |  1 -
 migration/migration.c | 10 ++++------
 migration/migration.h |  4 ++--
 migration/socket.c    |  1 -
 4 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/migration/channel.c b/migration/channel.c
index 70ec7ea3b7..1dd2ae1530 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -71,7 +71,6 @@ void migration_channel_connect(MigrationState *s,
         migration_tls_channel_connect(s, ioc, hostname, &local_err);
         if (local_err) {
             migrate_fd_error(s, local_err);
-            error_free(local_err);
         }
     } else {
         QEMUFile *f = qemu_fopen_channel_output(ioc);
diff --git a/migration/migration.c b/migration/migration.c
index 5e17168c1c..6191551e5a 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1021,16 +1021,14 @@ static void migrate_fd_cleanup(void *opaque)
     block_cleanup_parameters(s);
 }
 
-void migrate_set_error(MigrationState *s, const Error *error)
+void migrate_set_error(MigrationState *s, Error *error)
 {
     qemu_mutex_lock(&s->error_mutex);
-    if (!s->error) {
-        s->error = error_copy(error);
-    }
+    error_propagate(&s->error, error);
     qemu_mutex_unlock(&s->error_mutex);
 }
 
-void migrate_fd_error(MigrationState *s, const Error *error)
+void migrate_fd_error(MigrationState *s, Error *error)
 {
     trace_migrate_fd_error(error_get_pretty(error));
     assert(s->to_dst_file == NULL);
@@ -1304,7 +1302,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     }
 
     if (local_err) {
-        migrate_fd_error(s, local_err);
+        migrate_fd_error(s, error_copy(local_err));
         error_propagate(errp, local_err);
         return;
     }
diff --git a/migration/migration.h b/migration/migration.h
index 9a81b8a70a..7ddb0cdc1a 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
 
 uint64_t migrate_max_downtime(void);
 
-void migrate_set_error(MigrationState *s, const Error *error);
-void migrate_fd_error(MigrationState *s, const Error *error);
+void migrate_set_error(MigrationState *s, Error *error);
+void migrate_fd_error(MigrationState *s, Error *error);
 
 void migrate_fd_connect(MigrationState *s);
 
diff --git a/migration/socket.c b/migration/socket.c
index dee869044a..2d70747a1a 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -80,7 +80,6 @@ static void socket_outgoing_migration(QIOTask *task,
     if (qio_task_propagate_error(task, &err)) {
         trace_migration_socket_outgoing_error(error_get_pretty(err));
         migrate_fd_error(data->s, err);
-        error_free(err);
     } else {
         trace_migration_socket_outgoing_connected(data->hostname);
         migration_channel_connect(data->s, sioc, data->hostname);
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (6 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 14:03   ` Eric Blake
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 09/22] migration: Add multifd capability Juan Quintela
                   ` (13 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The functions wait until they are able to read/write the full iov.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Add tests.

fix reader to check for len == 0.

make reader wait on G_IO_IN (dave)
change tests to make sure that all combinations of call work with all backends
---
 include/io/channel.h          | 44 +++++++++++++++++++++++++
 io/channel.c                  | 77 +++++++++++++++++++++++++++++++++++++++++++
 migration/qemu-file-channel.c | 29 +---------------
 tests/io-channel-helpers.c    | 36 +++++++++++++++-----
 4 files changed, 149 insertions(+), 37 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 54f3dc252f..8e8e2faf1a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -269,6 +269,50 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                 Error **errp);
 
 /**
+ * qio_channel_readv_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to read data into
+ * @niov: the length of the @iov array
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Read data from the IO channel, storing it in the
+ * memory regions referenced by @iov. Each element
+ * in the @iov will be fully populated with data
+ * before the next one is used. The @niov parameter
+ * specifies the total number of elements in @iov.
+ *
+ * Returns: the number of bytes read, or -1 on error
+ */
+ssize_t qio_channel_readv_all(QIOChannel *ioc,
+                              const struct iovec *iov,
+                              size_t niov,
+                              Error **errp);
+
+
+/**
+ * qio_channel_writev_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Write data to the IO channel, reading it from the
+ * memory regions referenced by @iov. Each element
+ * in the @iov will be fully sent, before the next
+ * one is used. The @niov parameter specifies the
+ * total number of elements in @iov.
+ *
+ * It is required for all @iov data to be fully
+ * sent.
+ *
+ * Returns: the number of bytes sent, or -1 on error
+ */
+ssize_t qio_channel_writev_all(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               Error **errp);
+
+/**
  * qio_channel_readv:
  * @ioc: the channel object
  * @iov: the array of memory regions to read data into
diff --git a/io/channel.c b/io/channel.c
index 1cfb8b33a2..702e712798 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -22,6 +22,7 @@
 #include "io/channel.h"
 #include "qapi/error.h"
 #include "qemu/main-loop.h"
+#include "qemu/iov.h"
 
 bool qio_channel_has_feature(QIOChannel *ioc,
                              QIOChannelFeature feature)
@@ -85,6 +86,82 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
 }
 
 
+
+ssize_t qio_channel_readv_all(QIOChannel *ioc,
+                              const struct iovec *iov,
+                              size_t niov,
+                              Error **errp)
+{
+    ssize_t done = 0;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            qio_channel_wait(ioc, G_IO_IN);
+            continue;
+        } else if (len < 0) {
+            error_setg_errno(errp, EIO,
+                             "Channel was not able to read full iov");
+            done = -1;
+            goto cleanup;
+        } else if (len == 0) {
+            goto cleanup;
+        }
+
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+        done += len;
+    }
+
+ cleanup:
+    g_free(local_iov_head);
+    return done;
+}
+
+ssize_t qio_channel_writev_all(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               Error **errp)
+{
+    ssize_t done = 0;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            qio_channel_wait(ioc, G_IO_OUT);
+            continue;
+        }
+        if (len < 0) {
+            error_setg_errno(errp, EIO,
+                             "Channel was not able to write full iov");
+            done = -1;
+            goto cleanup;
+        }
+
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+        done += len;
+    }
+
+ cleanup:
+    g_free(local_iov_head);
+    return done;
+}
+
 ssize_t qio_channel_readv(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c
index e202d73834..457ea6c831 100644
--- a/migration/qemu-file-channel.c
+++ b/migration/qemu-file-channel.c
@@ -36,35 +36,8 @@ static ssize_t channel_writev_buffer(void *opaque,
                                      int64_t pos)
 {
     QIOChannel *ioc = QIO_CHANNEL(opaque);
-    ssize_t done = 0;
-    struct iovec *local_iov = g_new(struct iovec, iovcnt);
-    struct iovec *local_iov_head = local_iov;
-    unsigned int nlocal_iov = iovcnt;
 
-    nlocal_iov = iov_copy(local_iov, nlocal_iov,
-                          iov, iovcnt,
-                          0, iov_size(iov, iovcnt));
-
-    while (nlocal_iov > 0) {
-        ssize_t len;
-        len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL);
-        if (len == QIO_CHANNEL_ERR_BLOCK) {
-            qio_channel_wait(ioc, G_IO_OUT);
-            continue;
-        }
-        if (len < 0) {
-            /* XXX handle Error objects */
-            done = -EIO;
-            goto cleanup;
-        }
-
-        iov_discard_front(&local_iov, &nlocal_iov, len);
-        done += len;
-    }
-
- cleanup:
-    g_free(local_iov_head);
-    return done;
+    return qio_channel_writev_all(ioc, iov, iovcnt, NULL);
 }
 
 
diff --git a/tests/io-channel-helpers.c b/tests/io-channel-helpers.c
index 05e5579cf8..ea7b32b4e5 100644
--- a/tests/io-channel-helpers.c
+++ b/tests/io-channel-helpers.c
@@ -83,11 +83,21 @@ static gpointer test_io_thread_writer(gpointer opaque)
     qio_channel_set_blocking(data->src, data->blocking, NULL);
 
     while (niov) {
+        int use_all;
         ssize_t ret;
-        ret = qio_channel_writev(data->src,
-                                 iov,
-                                 niov,
-                                 &data->writeerr);
+        use_all = g_test_rand_int_range(0, 2);
+
+        if (use_all) {
+            ret = qio_channel_writev_all(data->src,
+                                         iov,
+                                         niov,
+                                         &data->writeerr);
+        } else {
+            ret = qio_channel_writev(data->src,
+                                     iov,
+                                     niov,
+                                     &data->writeerr);
+        }
         if (ret == QIO_CHANNEL_ERR_BLOCK) {
             if (data->blocking) {
                 error_setg(&data->writeerr,
@@ -124,13 +134,21 @@ static gpointer test_io_thread_reader(gpointer opaque)
     qio_channel_set_blocking(data->dst, data->blocking, NULL);
 
     while (niov) {
+        int use_all;
         ssize_t ret;
+        use_all = g_test_rand_int_range(0, 2);
 
-        ret = qio_channel_readv(data->dst,
-                                iov,
-                                niov,
-                                &data->readerr);
-
+        if (use_all) {
+            ret = qio_channel_readv_all(data->dst,
+                                        iov,
+                                        niov,
+                                        &data->readerr);
+        } else {
+            ret = qio_channel_readv(data->dst,
+                                    iov,
+                                    niov,
+                                    &data->readerr);
+        }
         if (ret == QIO_CHANNEL_ERR_BLOCK) {
             if (data->blocking) {
                 error_setg(&data->readerr,
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 09/22] migration: Add multifd capability
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (7 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 10/22] migration: Create x-multifd-threads parameter Juan Quintela
                   ` (12 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>

--

Use new DEFINE_PROP
---
 migration/migration.c | 10 ++++++++++
 migration/migration.h |  1 +
 qapi/migration.json   |  4 +++-
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/migration/migration.c b/migration/migration.c
index 6191551e5a..1e97b3114e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1458,6 +1458,15 @@ bool migrate_use_events(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
 }
 
+bool migrate_use_multifd(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -2236,6 +2245,7 @@ static Property migration_properties[] = {
     DEFINE_PROP_MIG_CAP("x-release-ram", MIGRATION_CAPABILITY_RELEASE_RAM),
     DEFINE_PROP_MIG_CAP("x-block", MIGRATION_CAPABILITY_BLOCK),
     DEFINE_PROP_MIG_CAP("x-return-path", MIGRATION_CAPABILITY_RETURN_PATH),
+    DEFINE_PROP_MIG_CAP("x-multifd", MIGRATION_CAPABILITY_X_MULTIFD),
 
     DEFINE_PROP_END_OF_LIST(),
 };
diff --git a/migration/migration.h b/migration/migration.h
index 7ddb0cdc1a..023b14aa08 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -179,6 +179,7 @@ bool migrate_postcopy_ram(void);
 bool migrate_zero_blocks(void);
 
 bool migrate_auto_converge(void);
+bool migrate_use_multifd(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/qapi/migration.json b/qapi/migration.json
index ee2b3b8733..ec4a88a43a 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -341,12 +341,14 @@
 # @return-path: If enabled, migration will use the return path even
 #               for precopy. (since 2.10)
 #
+# @x-multifd: Use more than one fd for migration (since 2.11)
+#
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
   'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
            'compress', 'events', 'postcopy-ram', 'x-colo', 'release-ram',
-           'block', 'return-path' ] }
+           'block', 'return-path', 'x-multifd' ] }
 
 ##
 # @MigrationCapabilityStatus:
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 10/22] migration: Create x-multifd-threads parameter
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (8 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 09/22] migration: Add multifd capability Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 11/22] migration: Create x-multifd-group parameter Juan Quintela
                   ` (11 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Indicates the number of threads that we would create.  By default we
create 2 threads.
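For reference, a hypothetical QMP session using the new knobs might look
like this (command and parameter names as introduced by this series;
exact usage may differ once it lands):

```
{ "execute": "migrate-set-capabilities",
  "arguments": { "capabilities": [
      { "capability": "x-multifd", "state": true } ] } }
{ "execute": "migrate-set-parameters",
  "arguments": { "x-multifd-threads": 4 } }
```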

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Peter Xu <peterx@redhat.com>

--

Catch inconsistent defaults (eric).
Improve comment stating that the number of threads is the same as the
number of sockets
Use new DEFINE_PROP_*
---
 hmp.c                 |  7 +++++++
 migration/migration.c | 26 ++++++++++++++++++++++++++
 migration/migration.h |  1 +
 qapi/migration.json   | 24 +++++++++++++++++++++---
 4 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/hmp.c b/hmp.c
index cd046c6d71..4931b2fe9f 100644
--- a/hmp.c
+++ b/hmp.c
@@ -336,6 +336,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, "%s: %s\n",
             MigrationParameter_str(MIGRATION_PARAMETER_BLOCK_INCREMENTAL),
             params->block_incremental ? "on" : "off");
+        monitor_printf(mon, "%s: %" PRId64 "\n",
+            MigrationParameter_str(MIGRATION_PARAMETER_X_MULTIFD_THREADS),
+            params->x_multifd_threads);
     }
 
     qapi_free_MigrationParameters(params);
@@ -1621,6 +1624,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
         p->has_block_incremental = true;
         visit_type_bool(v, param, &p->block_incremental, &err);
         break;
+    case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
+        p->has_x_multifd_threads = true;
+        visit_type_int(v, param, &p->x_multifd_threads, &err);
+        break;
     default:
         assert(0);
     }
diff --git a/migration/migration.c b/migration/migration.c
index 1e97b3114e..fe9788ed62 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -77,6 +77,7 @@
  * Note: Please change this default value to 10000 when we support hybrid mode.
  */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
+#define DEFAULT_MIGRATE_MULTIFD_THREADS 2
 
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -482,6 +483,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
     params->has_block_incremental = true;
     params->block_incremental = s->parameters.block_incremental;
+    params->has_x_multifd_threads = true;
+    params->x_multifd_threads = s->parameters.x_multifd_threads;
 
     return params;
 }
@@ -763,6 +766,13 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
                     "is invalid, it should be positive");
         return false;
     }
+    if (params->has_x_multifd_threads &&
+        (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "multifd_threads",
+                   "is invalid, it should be in the range of 1 to 255");
+        return false;
+    }
 
     return true;
 }
@@ -881,6 +891,9 @@ static void migrate_params_apply(MigrateSetParameters *params)
     if (params->has_block_incremental) {
         s->parameters.block_incremental = params->block_incremental;
     }
+    if (params->has_x_multifd_threads) {
+        s->parameters.x_multifd_threads = params->x_multifd_threads;
+    }
 }
 
 void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1467,6 +1480,15 @@ bool migrate_use_multifd(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
 }
 
+int migrate_multifd_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.x_multifd_threads;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -2232,6 +2254,9 @@ static Property migration_properties[] = {
     DEFINE_PROP_INT64("x-checkpoint-delay", MigrationState,
                       parameters.x_checkpoint_delay,
                       DEFAULT_MIGRATE_X_CHECKPOINT_DELAY),
+    DEFINE_PROP_INT64("x-multifd-threads", MigrationState,
+                      parameters.x_multifd_threads,
+                      DEFAULT_MIGRATE_MULTIFD_THREADS),
 
     /* Migration capabilities */
     DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -2291,6 +2316,7 @@ static void migration_instance_init(Object *obj)
     params->has_downtime_limit = true;
     params->has_x_checkpoint_delay = true;
     params->has_block_incremental = true;
+    params->has_x_multifd_threads = true;
 }
 
 /*
diff --git a/migration/migration.h b/migration/migration.h
index 023b14aa08..8a61f460b0 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -180,6 +180,7 @@ bool migrate_zero_blocks(void);
 
 bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
+int migrate_multifd_threads(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/qapi/migration.json b/qapi/migration.json
index ec4a88a43a..b5a5d694f4 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -466,13 +466,19 @@
 # 	migrated and the destination must already have access to the
 # 	same backing chain as was used on the source.  (since 2.10)
 #
+# @x-multifd-threads: Number of threads used to migrate data in
+#                     parallel. This is also the number of sockets
+#                     used for migration.
+#                     The default value is 2 (since 2.11)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
   'data': ['compress-level', 'compress-threads', 'decompress-threads',
            'cpu-throttle-initial', 'cpu-throttle-increment',
            'tls-creds', 'tls-hostname', 'max-bandwidth',
-           'downtime-limit', 'x-checkpoint-delay', 'block-incremental' ] }
+           'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
+           'x-multifd-threads'] }
 
 ##
 # @MigrateSetParameters:
@@ -528,6 +534,11 @@
 # 	migrated and the destination must already have access to the
 # 	same backing chain as was used on the source.  (since 2.10)
 #
+# @x-multifd-threads: Number of threads used to migrate data in
+#                     parallel. This is also the number of sockets
+#                     used for migration.
+#                     The default value is 2 (since 2.11)
+#
 # Since: 2.4
 ##
 # TODO either fuse back into MigrationParameters, or make
@@ -543,7 +554,8 @@
             '*max-bandwidth': 'int',
             '*downtime-limit': 'int',
             '*x-checkpoint-delay': 'int',
-            '*block-incremental': 'bool' } }
+            '*block-incremental': 'bool',
+            '*x-multifd-threads': 'int' } }
 
 ##
 # @migrate-set-parameters:
@@ -614,6 +626,11 @@
 # 	migrated and the destination must already have access to the
 # 	same backing chain as was used on the source.  (since 2.10)
 #
+# @x-multifd-threads: Number of threads used to migrate data in
+#                     parallel. This is also the number of sockets
+#                     used for migration.
+#                     The default value is 2 (since 2.11)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -627,7 +644,8 @@
             '*max-bandwidth': 'int',
             '*downtime-limit': 'int',
             '*x-checkpoint-delay': 'int',
-            '*block-incremental': 'bool' } }
+            '*block-incremental': 'bool' ,
+            '*x-multifd-threads': 'int' } }
 
 ##
 # @query-migrate-parameters:
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 11/22] migration: Create x-multifd-group parameter
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (9 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 10/22] migration: Create x-multifd-threads parameter Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 12/22] migration: Create multifd migration threads Juan Quintela
                   ` (10 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Indicates how many pages we are going to send in each batch to a multifd
thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Peter Xu <peterx@redhat.com>

--

Be consistent with defaults and documentation
Use new DEFINE_PROP_*
---
 hmp.c                 |  7 +++++++
 migration/migration.c | 26 ++++++++++++++++++++++++++
 migration/migration.h |  1 +
 qapi/migration.json   | 17 ++++++++++++++---
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/hmp.c b/hmp.c
index 4931b2fe9f..d9562103ee 100644
--- a/hmp.c
+++ b/hmp.c
@@ -339,6 +339,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, "%s: %" PRId64 "\n",
             MigrationParameter_str(MIGRATION_PARAMETER_X_MULTIFD_THREADS),
             params->x_multifd_threads);
+        monitor_printf(mon, "%s: %" PRId64 "\n",
+            MigrationParameter_str(MIGRATION_PARAMETER_X_MULTIFD_GROUP),
+            params->x_multifd_group);
     }
 
     qapi_free_MigrationParameters(params);
@@ -1628,6 +1631,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
         p->has_x_multifd_threads = true;
         visit_type_int(v, param, &p->x_multifd_threads, &err);
         break;
+    case MIGRATION_PARAMETER_X_MULTIFD_GROUP:
+        p->has_x_multifd_group = true;
+        visit_type_int(v, param, &p->x_multifd_group, &err);
+        break;
     default:
         assert(0);
     }
diff --git a/migration/migration.c b/migration/migration.c
index fe9788ed62..208554dc37 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -78,6 +78,7 @@
  */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
 #define DEFAULT_MIGRATE_MULTIFD_THREADS 2
+#define DEFAULT_MIGRATE_MULTIFD_GROUP 16
 
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -485,6 +486,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->block_incremental = s->parameters.block_incremental;
     params->has_x_multifd_threads = true;
     params->x_multifd_threads = s->parameters.x_multifd_threads;
+    params->has_x_multifd_group = true;
+    params->x_multifd_group = s->parameters.x_multifd_group;
 
     return params;
 }
@@ -773,6 +776,13 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
                    "is invalid, it should be in the range of 1 to 255");
         return false;
     }
+    if (params->has_x_multifd_group &&
+            (params->x_multifd_group < 1 || params->x_multifd_group > 10000)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "multifd_group",
+                   "is invalid, it should be in the range of 1 to 10000");
+        return false;
+    }
 
     return true;
 }
@@ -894,6 +904,9 @@ static void migrate_params_apply(MigrateSetParameters *params)
     if (params->has_x_multifd_threads) {
         s->parameters.x_multifd_threads = params->x_multifd_threads;
     }
+    if (params->has_x_multifd_group) {
+        s->parameters.x_multifd_group = params->x_multifd_group;
+    }
 }
 
 void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1489,6 +1502,15 @@ int migrate_multifd_threads(void)
     return s->parameters.x_multifd_threads;
 }
 
+int migrate_multifd_group(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.x_multifd_group;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -2257,6 +2279,9 @@ static Property migration_properties[] = {
     DEFINE_PROP_INT64("x-multifd-threads", MigrationState,
                       parameters.x_multifd_threads,
                       DEFAULT_MIGRATE_MULTIFD_THREADS),
+    DEFINE_PROP_INT64("x-multifd-group", MigrationState,
+                      parameters.x_multifd_group,
+                      DEFAULT_MIGRATE_MULTIFD_GROUP),
 
     /* Migration capabilities */
     DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -2317,6 +2342,7 @@ static void migration_instance_init(Object *obj)
     params->has_x_checkpoint_delay = true;
     params->has_block_incremental = true;
     params->has_x_multifd_threads = true;
+    params->has_x_multifd_group = true;
 }
 
 /*
diff --git a/migration/migration.h b/migration/migration.h
index 8a61f460b0..cb634fc31f 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -181,6 +181,7 @@ bool migrate_zero_blocks(void);
 bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 int migrate_multifd_threads(void);
+int migrate_multifd_group(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/qapi/migration.json b/qapi/migration.json
index b5a5d694f4..6a838369ef 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -471,6 +471,9 @@
 #                     number of sockets used for migration.
 #                     The default value is 2 (since 2.11)
 #
+# @x-multifd-group: Number of pages sent together to a thread
+#                   The default value is 16 (since 2.11)
+#
 # Since: 2.4
 ##
 { 'enum': 'MigrationParameter',
@@ -478,7 +481,7 @@
            'cpu-throttle-initial', 'cpu-throttle-increment',
            'tls-creds', 'tls-hostname', 'max-bandwidth',
            'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
-           'x-multifd-threads'] }
+           'x-multifd-threads', 'x-multifd-group' ] }
 
 ##
 # @MigrateSetParameters:
@@ -539,6 +542,9 @@
 #                     number of sockets used for migration.
 #                     The default value is 2 (since 2.11)
 #
+# @x-multifd-group: Number of pages sent together to a thread
+#                   The default value is 16 (since 2.11)
+#
 # Since: 2.4
 ##
 # TODO either fuse back into MigrationParameters, or make
@@ -555,7 +561,8 @@
             '*downtime-limit': 'int',
             '*x-checkpoint-delay': 'int',
             '*block-incremental': 'bool',
-            '*x-multifd-threads': 'int' } }
+            '*x-multifd-threads': 'int',
+            '*x-multifd-group': 'int' } }
 
 ##
 # @migrate-set-parameters:
@@ -631,6 +638,9 @@
 #                     number of sockets used for migration.
 #                     The default value is 2 (since 2.11)
 #
+# @x-multifd-group: Number of pages sent together to a thread
+#                   The default value is 16 (since 2.11)
+#
 # Since: 2.4
 ##
 { 'struct': 'MigrationParameters',
@@ -645,7 +655,8 @@
             '*downtime-limit': 'int',
             '*x-checkpoint-delay': 'int',
             '*block-incremental': 'bool' ,
-            '*x-multifd-threads': 'int' } }
+            '*x-multifd-threads': 'int',
+            '*x-multifd-group': 'int' } }
 
 ##
 # @query-migrate-parameters:
-- 
2.13.5
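For reference, a usage sketch (not part of the patch): once applied, the new parameter can be set through QMP together with x-multifd-threads via the migrate-set-parameters command shown in the schema above; the values here are illustrative:

```json
{ "execute": "migrate-set-parameters",
  "arguments": { "x-multifd-threads": 4,
                 "x-multifd-group": 64 } }
```

The equivalent HMP command would be `migrate_set_parameter x-multifd-group 64`, served by the hmp_migrate_set_parameter hunk above.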

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 12/22] migration: Create multifd migration threads
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (10 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 11/22] migration: Create x-multifd-group parameter Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming Juan Quintela
                   ` (9 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Creation of the threads, nothing inside yet.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

--

Use pointers instead of long array names
Move to using semaphores instead of condition variables, as Paolo suggested

Put all the state inside one struct.
Use a counter for the number of threads created.  Needed during cancellation.

Add error return to thread creation

Add id field

Rename functions to multifd_save/load_setup/cleanup
Change recv parameters to a pointer to struct
Change back to a struct
Use Error * for _cleanup

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c |  26 +++++++
 migration/ram.c       | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++
 migration/ram.h       |   5 ++
 3 files changed, 233 insertions(+)

diff --git a/migration/migration.c b/migration/migration.c
index 208554dc37..9fec880a58 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -281,6 +281,10 @@ static void process_incoming_migration_bh(void *opaque)
      */
     qemu_announce_self();
 
+    if (multifd_load_cleanup(&local_err) != 0) {
+        error_report_err(local_err);
+        autostart = false;
+    }
     /* If global state section was not received or we are in running
        state, we need to obey autostart. Any other state is set with
        runstate_set. */
@@ -353,10 +357,15 @@ static void process_incoming_migration_co(void *opaque)
     }
 
     if (ret < 0) {
+        Error *local_err = NULL;
+
         migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                           MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
         qemu_fclose(mis->from_src_file);
+        if (multifd_load_cleanup(&local_err) != 0) {
+            error_report_err(local_err);
+        }
         exit(EXIT_FAILURE);
     }
     mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
@@ -368,6 +377,12 @@ void migration_fd_process_incoming(QEMUFile *f)
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     MigrationIncomingState *mis = migration_incoming_get_current();
 
+    if (multifd_load_setup() != 0) {
+        /* We haven't been able to create the multifd threads;
+           there is nothing better we can do */
+        exit(EXIT_FAILURE);
+    }
+
     if (!mis->from_src_file) {
         mis->from_src_file = f;
     }
@@ -1019,6 +1034,8 @@ static void migrate_fd_cleanup(void *opaque)
     s->cleanup_bh = NULL;
 
     if (s->to_dst_file) {
+        Error *local_err = NULL;
+
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
         if (s->migration_thread_running) {
@@ -1027,6 +1044,9 @@ static void migrate_fd_cleanup(void *opaque)
         }
         qemu_mutex_lock_iothread();
 
+        if (multifd_save_cleanup(&local_err) != 0) {
+            error_report_err(local_err);
+        }
         qemu_fclose(s->to_dst_file);
         s->to_dst_file = NULL;
     }
@@ -2225,6 +2245,12 @@ void migrate_fd_connect(MigrationState *s)
         }
     }
 
+    if (multifd_save_setup() != 0) {
+        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
+                          MIGRATION_STATUS_FAILED);
+        migrate_fd_cleanup(s);
+        return;
+    }
     qemu_thread_create(&s->thread, "live_migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
     s->migration_thread_running = true;
diff --git a/migration/ram.c b/migration/ram.c
index e0179fc838..4e1616b953 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -356,6 +356,208 @@ static void compress_threads_save_setup(void)
     }
 }
 
+/* Multiple fd's */
+
+struct MultiFDSendParams {
+    uint8_t id;
+    char *name;
+    QemuThread thread;
+    QemuSemaphore sem;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDSendParams MultiFDSendParams;
+
+struct {
+    MultiFDSendParams *params;
+    /* number of created threads */
+    int count;
+} *multifd_send_state;
+
+static void terminate_multifd_send_threads(Error *errp)
+{
+    int i;
+
+    for (i = 0; i < multifd_send_state->count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        qemu_sem_post(&p->sem);
+        qemu_mutex_unlock(&p->mutex);
+    }
+}
+
+int multifd_save_cleanup(Error **errp)
+{
+    int i;
+    int ret = 0;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    terminate_multifd_send_threads(NULL);
+    for (i = 0; i < multifd_send_state->count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_thread_join(&p->thread);
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
+        g_free(p->name);
+        p->name = NULL;
+    }
+    g_free(multifd_send_state->params);
+    multifd_send_state->params = NULL;
+    g_free(multifd_send_state);
+    multifd_send_state = NULL;
+    return ret;
+}
+
+static void *multifd_send_thread(void *opaque)
+{
+    MultiFDSendParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    return NULL;
+}
+
+int multifd_save_setup(void)
+{
+    int thread_count;
+    uint8_t i;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
+    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
+    multifd_send_state->count = 0;
+    for (i = 0; i < thread_count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
+        p->quit = false;
+        p->id = i;
+        p->name = g_strdup_printf("multifdsend_%d", i);
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        multifd_send_state->count++;
+    }
+    return 0;
+}
+
+struct MultiFDRecvParams {
+    uint8_t id;
+    char *name;
+    QemuThread thread;
+    QemuSemaphore sem;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDRecvParams MultiFDRecvParams;
+
+struct {
+    MultiFDRecvParams *params;
+    /* number of created threads */
+    int count;
+} *multifd_recv_state;
+
+static void terminate_multifd_recv_threads(Error *errp)
+{
+    int i;
+
+    for (i = 0; i < multifd_recv_state->count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        qemu_sem_post(&p->sem);
+        qemu_mutex_unlock(&p->mutex);
+    }
+}
+
+int multifd_load_cleanup(Error **errp)
+{
+    int i;
+    int ret = 0;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    terminate_multifd_recv_threads(NULL);
+    for (i = 0; i < multifd_recv_state->count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_thread_join(&p->thread);
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
+        g_free(p->name);
+        p->name = NULL;
+    }
+    g_free(multifd_recv_state->params);
+    multifd_recv_state->params = NULL;
+    g_free(multifd_recv_state);
+    multifd_recv_state = NULL;
+
+    return ret;
+}
+
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    return NULL;
+}
+
+int multifd_load_setup(void)
+{
+    int thread_count;
+    uint8_t i;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
+    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+    multifd_recv_state->count = 0;
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
+        p->quit = false;
+        p->id = i;
+        p->name = g_strdup_printf("multifdrecv_%d", i);
+        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                           QEMU_THREAD_JOINABLE);
+        multifd_recv_state->count++;
+    }
+    return 0;
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index c081fde86c..4a72d66503 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -39,6 +39,11 @@ int64_t xbzrle_cache_resize(int64_t new_size);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_total(void);
 
+int multifd_save_setup(void);
+int multifd_save_cleanup(Error **errp);
+int multifd_load_setup(void);
+int multifd_load_cleanup(Error **errp);
+
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
 void acct_update_position(QEMUFile *f, size_t size, bool zero);
-- 
2.13.5


* [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (11 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 12/22] migration: Create multifd migration threads Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-08 17:22   ` Dr. David Alan Gilbert
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 14/22] migration: Start of multiple fd work Juan Quintela
                   ` (8 subsequent siblings)
  21 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We need this in later patches.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
---
 migration/migration.c | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 9fec880a58..18bd24a14c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -372,9 +372,8 @@ static void process_incoming_migration_co(void *opaque)
     qemu_bh_schedule(mis->bh);
 }
 
-void migration_fd_process_incoming(QEMUFile *f)
+static void migration_incoming_setup(QEMUFile *f)
 {
-    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     MigrationIncomingState *mis = migration_incoming_get_current();
 
     if (multifd_load_setup() != 0) {
@@ -387,9 +386,20 @@ void migration_fd_process_incoming(QEMUFile *f)
         mis->from_src_file = f;
     }
     qemu_file_set_blocking(f, false);
+}
+
+static void migration_incoming_process(void)
+{
+    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
 }
 
+void migration_fd_process_incoming(QEMUFile *f)
+{
+    migration_incoming_setup(f);
+    migration_incoming_process();
+}
+
 void migration_ioc_process_incoming(QIOChannel *ioc)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
-- 
2.13.5


* [Qemu-devel] [PATCH v7 14/22] migration: Start of multiple fd work
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (12 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 15/22] migration: Create ram_multifd_page Juan Quintela
                   ` (7 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We create a new channel for each new thread. Through it we send a
string containing "<uuid> multifd <channel number>" so we can be sure
that we connect the right channels on both sides.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Split SocketArgs into incoming and outgoing args

Use UUID's on the initial message, so we are sure we are connecting to
the right channel.

Remove init semaphore.  Now that we use uuids on the init message, we
know that this is our channel.

Fix recv socket destroy; we were destroying the send channels.
This was very interesting, because we were using an unreferenced object
without problems.

Move to struct of pointers
init channel sooner.
split recv thread creation.
listen on main thread
We count the number of created threads to know when we need to stop listening
Use g_strdup_printf
report channel id on errors
Add name parameter
Use local_err
Add Error * parameter to socket_send_channel_create()
---
 migration/migration.c |   5 +++
 migration/ram.c       | 120 ++++++++++++++++++++++++++++++++++++++++++++------
 migration/ram.h       |   3 ++
 migration/socket.c    |  33 +++++++++++++-
 migration/socket.h    |  10 +++++
 5 files changed, 157 insertions(+), 14 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 18bd24a14c..b06de8b189 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -420,6 +420,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+    if (migrate_use_multifd()) {
+        int thread_count = migrate_multifd_threads();
+
+        return thread_count == multifd_created_threads();
+    }
     return true;
 }
 
diff --git a/migration/ram.c b/migration/ram.c
index 4e1616b953..9d45f4c7ca 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -46,6 +47,8 @@
 #include "exec/ram_addr.h"
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -362,6 +365,7 @@ struct MultiFDSendParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -378,6 +382,12 @@ static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+    }
     for (i = 0; i < multifd_send_state->count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -403,6 +413,7 @@ int multifd_save_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -413,9 +424,32 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+/* Default uuid for multifd when qemu is not started with uuid */
+static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";
+/* strlen("multifd") + ' ' + <3-digit channel id> + ' ' + UUID_FMT + '\0' */
+#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    char *string;
+    char *string_uuid;
+    size_t ret;
+
+    if (qemu_uuid_set) {
+        string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+    } else {
+        string_uuid = g_strdup(multifd_uuid);
+    }
+    string = g_strdup_printf("%s multifd %03d", string_uuid, p->id);
+    g_free(string_uuid);
+    ret = qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &local_err);
+    g_free(string);
+    if (ret != MULTIFD_UUID_MSG) {
+        terminate_multifd_send_threads(local_err);
+        return NULL;
+    }
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -432,6 +466,7 @@ static void *multifd_send_thread(void *opaque)
 
 int multifd_save_setup(void)
 {
+    Error *local_err = NULL;
     int thread_count;
     uint8_t i;
 
@@ -449,6 +484,13 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->c = socket_send_channel_create(&local_err);
+        if (!p->c) {
+            if (multifd_save_cleanup(&local_err) != 0) {
+                migrate_set_error(migrate_get_current(), local_err);
+            }
+            return -1;
+        }
         p->name = g_strdup_printf("multifdsend_%d", i);
         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                            QEMU_THREAD_JOINABLE);
@@ -462,6 +504,7 @@ struct MultiFDRecvParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -472,12 +515,22 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* Should we finish */
+    bool quit;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+    }
+    multifd_recv_state->quit = true;
+
     for (i = 0; i < multifd_recv_state->count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -503,6 +556,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_recv_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -531,10 +585,56 @@ static void *multifd_recv_thread(void *opaque)
     return NULL;
 }
 
+void multifd_new_channel(QIOChannel *ioc)
+{
+    MultiFDRecvParams *p;
+    char string[MULTIFD_UUID_MSG];
+    char string_uuid[UUID_FMT_LEN + 1]; /* +1 for sscanf's trailing NUL */
+    Error *local_err = NULL;
+    char *uuid;
+    size_t ret;
+    int id;
+
+    ret = qio_channel_read(ioc, string, sizeof(string), &local_err);
+    if (ret != sizeof(string)) {
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    sscanf(string, "%s multifd %03d", string_uuid, &id);
+
+    if (qemu_uuid_set) {
+        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+    } else {
+        uuid = g_strdup(multifd_uuid);
+    }
+    if (strcmp(string_uuid, uuid)) {
+        error_setg(&local_err, "multifd: received uuid '%s' and expected "
+                   "uuid '%s' for channel %d", string_uuid, uuid, id);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    g_free(uuid);
+
+    p = &multifd_recv_state->params[id];
+    if (p->id != 0) {
+        error_setg(&local_err, "multifd: received id '%d' already setup", id);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    qemu_mutex_init(&p->mutex);
+    qemu_sem_init(&p->sem, 0);
+    p->quit = false;
+    p->id = id;
+    p->c = ioc;
+    multifd_recv_state->count++;
+    p->name = g_strdup_printf("multifdrecv_%d", id);
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+}
+
 int multifd_load_setup(void)
 {
     int thread_count;
-    uint8_t i;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -543,21 +643,15 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
-    for (i = 0; i < thread_count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        p->quit = false;
-        p->id = i;
-        p->name = g_strdup_printf("multifdrecv_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
-    }
+    multifd_recv_state->quit = false;
     return 0;
 }
 
+int multifd_created_threads(void)
+{
+    return multifd_recv_state->count;
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 4a72d66503..5572f52f0a 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -31,6 +31,7 @@
 
 #include "qemu-common.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -43,6 +44,8 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+void multifd_new_channel(QIOChannel *ioc);
+int multifd_created_threads(void);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/socket.c b/migration/socket.c
index 2d70747a1a..58e81ae87b 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -26,6 +26,36 @@
 #include "io/channel-socket.h"
 #include "trace.h"
 
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* Remove channel */
+    object_unref(OBJECT(recv));
+    return 0;
+}
+
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+} outgoing_args;
+
+QIOChannel *socket_send_channel_create(Error **errp)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr, errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
 
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -95,6 +125,8 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+    outgoing_args.saddr = saddr;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -105,7 +137,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9db38..8dd1a78d29 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,16 @@
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+
+QIOChannel *socket_send_channel_create(Error **errp);
+
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
-- 
2.13.5


* [Qemu-devel] [PATCH v7 15/22] migration: Create ram_multifd_page
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (13 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 14/22] migration: Start of multiple fd work Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 16/22] migration: Really use multiple pages at a time Juan Quintela
                   ` (6 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA code paths are gone.  We have added
a new counter and a new flag for this type of page.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
---
 hmp.c                 |  2 ++
 migration/migration.c |  1 +
 migration/ram.c       | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 qapi/migration.json   |  5 ++-
 4 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/hmp.c b/hmp.c
index d9562103ee..7e865f5955 100644
--- a/hmp.c
+++ b/hmp.c
@@ -233,6 +233,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
             monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
                            info->ram->postcopy_requests);
         }
+        monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
+                       info->ram->multifd);
     }
 
     if (info->has_disk) {
diff --git a/migration/migration.c b/migration/migration.c
index b06de8b189..b5875c0b15 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -556,6 +556,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
     info->ram->postcopy_requests = ram_counters.postcopy_requests;
     info->ram->page_size = qemu_target_page_size();
+    info->ram->multifd = ram_counters.multifd;
 
     if (migrate_use_xbzrle()) {
         info->has_xbzrle_cache = true;
diff --git a/migration/ram.c b/migration/ram.c
index 9d45f4c7ca..2ee2699bb2 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -68,6 +68,7 @@
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
@@ -362,13 +363,23 @@ static void compress_threads_save_setup(void)
 /* Multiple fd's */
 
 struct MultiFDSendParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    /* This is a temp field.  For now we use it to transmit the
+       address of the page.  Later in the series, we will change it
+       to carry the real page data.
+    */
+    uint8_t *address;
+    /* protected by multifd mutex */
+    /* has the thread finished the last submitted job */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
@@ -376,6 +387,8 @@ struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    QemuMutex mutex;
+    QemuSemaphore sem;
 } *multifd_send_state;
 
 static void terminate_multifd_send_threads(Error *errp)
@@ -450,6 +463,7 @@ static void *multifd_send_thread(void *opaque)
         terminate_multifd_send_threads(local_err);
         return NULL;
     }
+    qemu_sem_post(&multifd_send_state->sem);
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -457,6 +471,15 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->address) {
+            p->address = 0;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_mutex_lock(&multifd_send_state->mutex);
+            p->done = true;
+            qemu_mutex_unlock(&multifd_send_state->mutex);
+            qemu_sem_post(&multifd_send_state->sem);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
@@ -477,6 +500,8 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->count = 0;
+    qemu_mutex_init(&multifd_send_state->mutex);
+    qemu_sem_init(&multifd_send_state->sem, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -484,6 +509,8 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->done = true;
+        p->address = 0;
         p->c = socket_send_channel_create(&local_err);
         if (!p->c) {
             if (multifd_save_cleanup(&local_err) != 0) {
@@ -500,6 +527,30 @@ int multifd_save_setup(void)
     return 0;
 }
 
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
+{
+    int i;
+    MultiFDSendParams *p = NULL; /* make gcc happy */
+
+    qemu_sem_wait(&multifd_send_state->sem);
+    qemu_mutex_lock(&multifd_send_state->mutex);
+    for (i = 0; i < multifd_send_state->count; i++) {
+        p = &multifd_send_state->params[i];
+
+        if (p->done) {
+            p->done = false;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_state->mutex);
+    qemu_mutex_lock(&p->mutex);
+    p->address = address;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     uint8_t id;
     char *name;
@@ -1082,6 +1133,32 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset, p);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p, rs->migration_dirty_pages == 1);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+        ram_counters.multifd++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1510,6 +1587,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }
@@ -2802,6 +2881,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (!migrate_use_compression()) {
         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
     }
+
+    if (!migrate_use_multifd()) {
+        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
+    }
     /* This RCU critical section can be very long running.
      * When RCU reclaims in the code start to become numerous,
      * it will be necessary to reduce the granularity of this
@@ -2826,13 +2909,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
                 error_report("Received an unexpected compressed page");
             }
+            if (flags & invalid_flags & RAM_SAVE_FLAG_MULTIFD_PAGE) {
+                error_report("Received an unexpected multifd page");
+            }
 
             ret = -EINVAL;
             break;
         }
 
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
-                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);
 
             host = host_from_ram_block_offset(block, addr);
@@ -2920,6 +3007,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 break;
             }
             break;
+
+        case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            break;
+
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
             break;
diff --git a/qapi/migration.json b/qapi/migration.json
index 6a838369ef..f7efa703de 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -39,6 +39,8 @@
 # @page-size: The number of bytes per page for the various page-based
 #        statistics (since 2.10)
 #
+# @multifd: number of pages sent with multifd (since 2.10)
+#
 # Since: 0.14.0
 ##
 { 'struct': 'MigrationStats',
@@ -46,7 +48,8 @@
            'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
            'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
            'mbps' : 'number', 'dirty-sync-count' : 'int',
-           'postcopy-requests' : 'int', 'page-size' : 'int' } }
+           'postcopy-requests' : 'int', 'page-size' : 'int',
+           'multifd': 'int' } }
 
 ##
 # @XBZRLECacheStats:
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 16/22] migration: Really use multiple pages at a time
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (14 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 15/22] migration: Create ram_multifd_page Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 17/22] migration: Send the fd number which we are going to use for this page Juan Quintela
                   ` (5 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We now send several pages at a time, each time we wake up a thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Use iovec's instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0() (danp)
Define MULTIFD_CONTINUE
---
 migration/ram.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 48 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 2ee2699bb2..4329039f8c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -49,6 +49,7 @@
 #include "migration/colo.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -362,6 +363,15 @@ static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
+/* used to continue on the same multifd group */
+#define MULTIFD_CONTINUE UINT16_MAX
+
+typedef struct {
+    int num;
+    size_t size;
+    struct iovec *iov;
+} multifd_pages_t;
+
 struct MultiFDSendParams {
     /* not changed */
     uint8_t id;
@@ -372,11 +382,7 @@ struct MultiFDSendParams {
     QemuMutex mutex;
     /* protected by param mutex */
     bool quit;
-    /* This is a temp field.  We are using it now to transmit
-       the address of the page.  Later in the series, we
-       change it for the real page.
-    */
-    uint8_t *address;
+    multifd_pages_t pages;
     /* protected by multifd mutex */
     /* has the thread finished the last submitted job */
     bool done;
@@ -389,8 +395,24 @@ struct {
     int count;
     QemuMutex mutex;
     QemuSemaphore sem;
+    multifd_pages_t pages;
 } *multifd_send_state;
 
+static void multifd_init_group(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = migrate_multifd_group();
+    pages->iov = g_new0(struct iovec, pages->size);
+}
+
+static void multifd_clear_group(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = 0;
+    g_free(pages->iov);
+    pages->iov = NULL;
+}
+
 static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
@@ -429,9 +451,11 @@ int multifd_save_cleanup(Error **errp)
         socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
+        multifd_clear_group(&p->pages);
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_clear_group(&multifd_send_state->pages);
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -471,8 +495,8 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
-        if (p->address) {
-            p->address = 0;
+        if (p->pages.num) {
+            p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
@@ -502,6 +526,7 @@ int multifd_save_setup(void)
     multifd_send_state->count = 0;
     qemu_mutex_init(&multifd_send_state->mutex);
     qemu_sem_init(&multifd_send_state->sem, 0);
+    multifd_init_group(&multifd_send_state->pages);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -510,7 +535,7 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->done = true;
-        p->address = 0;
+        multifd_init_group(&p->pages);
         p->c = socket_send_channel_create(&local_err);
         if (!p->c) {
             if (multifd_save_cleanup(&local_err) != 0) {
@@ -531,6 +556,17 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 {
     int i;
     MultiFDSendParams *p = NULL; /* make gcc happy */
+    multifd_pages_t *pages = &multifd_send_state->pages;
+
+    pages->iov[pages->num].iov_base = address;
+    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
+    pages->num++;
+
+    if (!last_page) {
+        if (pages->num < (pages->size - 1)) {
+            return MULTIFD_CONTINUE;
+        }
+    }
 
     qemu_sem_wait(&multifd_send_state->sem);
     qemu_mutex_lock(&multifd_send_state->mutex);
@@ -544,7 +580,10 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
     }
     qemu_mutex_unlock(&multifd_send_state->mutex);
     qemu_mutex_lock(&p->mutex);
-    p->address = address;
+    p->pages.num = pages->num;
+    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
+             iov_size(pages->iov, pages->num));
+    pages->num = 0;
     qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 17/22] migration: Send the fd number which we are going to use for this page
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (15 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 16/22] migration: Really use multiple pages at a time Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 18/22] migration: Create thread infrastructure for multifd recv side Juan Quintela
                   ` (4 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We are still sending the page through the main channel; that will
change later in the series.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 4329039f8c..7eba87b84c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -587,7 +587,7 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
     qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
-    return 0;
+    return i;
 }
 
 struct MultiFDRecvParams {
@@ -1176,6 +1176,7 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                             bool last_stage)
 {
     int pages;
+    uint16_t fd_num;
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
@@ -1187,8 +1188,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
         ram_counters.transferred +=
             save_page_header(rs, rs->f, block,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
+        qemu_put_be16(rs->f, fd_num);
+        ram_counters.transferred += 2; /* size of fd_num */
         qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-        multifd_send_page(p, rs->migration_dirty_pages == 1);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -2938,6 +2941,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
+        uint16_t fd_num;
         uint8_t ch;
 
         addr = qemu_get_be64(f);
@@ -3048,6 +3052,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            fd_num = qemu_get_be16(f);
+            if (fd_num != 0) {
+                /* fd_num is not used yet; this changes later in the series */
+                fd_num = fd_num;
+            }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 18/22] migration: Create thread infrastructure for multifd recv side
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (16 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 17/22] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 19/22] migration: Test new fd infrastructure Juan Quintela
                   ` (3 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We make the locking and the transfer of information explicit, even
though we are still receiving everything through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

We split the creation of the main channel from the start of the main
migration thread, so we can wait for the creation of the other threads.

Use multifd_clear_group().
---
 migration/migration.c |  7 ++++---
 migration/migration.h |  1 +
 migration/ram.c       | 55 +++++++++++++++++++++++++++++++++++++++++++++++----
 migration/socket.c    |  2 +-
 4 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index b5875c0b15..157e8fd1d0 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -388,7 +388,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -407,9 +407,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         mis->from_src_file = f;
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_new_channel(ioc);
 }
 
 /**
diff --git a/migration/migration.h b/migration/migration.h
index cb634fc31f..a1b50e53ef 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -158,6 +158,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 7eba87b84c..b897d490f2 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -591,13 +591,18 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 }
 
 struct MultiFDRecvParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
+    QemuSemaphore ready;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    multifd_pages_t pages;
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -607,6 +612,7 @@ struct {
     int count;
     /* Should we finish */
     bool quit;
+    multifd_pages_t pages;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
@@ -628,6 +634,7 @@ static void terminate_multifd_recv_threads(Error *errp)
         p->quit = true;
         qemu_sem_post(&p->sem);
         qemu_mutex_unlock(&p->mutex);
+        multifd_clear_group(&p->pages);
     }
 }
 
@@ -652,6 +659,7 @@ int multifd_load_cleanup(Error **errp)
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_clear_group(&multifd_recv_state->pages);
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
@@ -662,12 +670,20 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
+    qemu_sem_post(&p->ready);
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages.num) {
+            p->pages.num = 0;
+            p->done = true;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->ready);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
@@ -713,8 +729,11 @@ void multifd_new_channel(QIOChannel *ioc)
     }
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
+    qemu_sem_init(&p->ready, 0);
     p->quit = false;
     p->id = id;
+    p->done = false;
+    multifd_init_group(&p->pages);
     p->c = ioc;
     multifd_recv_state->count++;
     p->name = g_strdup_printf("multifdrecv_%d", id);
@@ -734,6 +753,7 @@ int multifd_load_setup(void)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
     multifd_recv_state->quit = false;
+    multifd_init_group(&multifd_recv_state->pages);
     return 0;
 }
 
@@ -742,6 +762,36 @@ int multifd_created_threads(void)
     return multifd_recv_state->count;
 }
 
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+    int thread_count;
+    MultiFDRecvParams *p;
+    multifd_pages_t *pages = &multifd_recv_state->pages;
+
+    pages->iov[pages->num].iov_base = address;
+    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
+    pages->num++;
+
+    if (fd_num == MULTIFD_CONTINUE) {
+        return;
+    }
+
+    thread_count = migrate_multifd_threads();
+    assert(fd_num < thread_count);
+    p = &multifd_recv_state->params[fd_num];
+
+    qemu_sem_wait(&p->ready);
+
+    qemu_mutex_lock(&p->mutex);
+    p->done = false;
+    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
+             iov_size(pages->iov, pages->num));
+    p->pages.num = pages->num;
+    pages->num = 0;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -3053,10 +3103,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* fd_num is not used yet; this changes later in the series */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
diff --git a/migration/socket.c b/migration/socket.c
index 58e81ae87b..530962a286 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -179,12 +179,12 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
 
     qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
     migration_channel_process_incoming(QIO_CHANNEL(sioc));
-    object_unref(OBJECT(sioc));
 
 out:
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_channel_close(ioc, NULL);
+        migration_incoming_process();
         return G_SOURCE_REMOVE;
     } else {
         return G_SOURCE_CONTINUE;
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 19/22] migration: Test new fd infrastructure
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (17 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 18/22] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 20/22] migration: Rename initial_bytes Juan Quintela
                   ` (2 subsequent siblings)
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We just send the page address through the alternate channels and check
that it matches what the receiving side expects.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index b897d490f2..348ce1141a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -496,8 +496,24 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
+            Error *local_err = NULL;
+            size_t ret;
+            int i;
+            int num;
+
+            num = p->pages.num;
             p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
+
+            for (i = 0; i < num; i++) {
+                ret = qio_channel_write(p->c,
+                                        (const char *)&p->pages.iov[i].iov_base,
+                                        sizeof(uint8_t *), &local_err);
+                if (ret != sizeof(uint8_t *)) {
+                    terminate_multifd_send_threads(local_err);
+                    return NULL;
+                }
+            }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
             qemu_mutex_unlock(&multifd_send_state->mutex);
@@ -669,6 +685,7 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    uint8_t *recv_address;
 
     qemu_sem_post(&p->ready);
     while (true) {
@@ -678,7 +695,29 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
+            Error *local_err = NULL;
+            size_t ret;
+            int i;
+            int num;
+
+            num = p->pages.num;
             p->pages.num = 0;
+
+            for (i = 0; i < num; i++) {
+                ret = qio_channel_read(p->c, (char *)&recv_address,
+                                       sizeof(uint8_t *), &local_err);
+                if (ret != sizeof(uint8_t *)) {
+                    terminate_multifd_recv_threads(local_err);
+                    return NULL;
+                }
+                if (recv_address != p->pages.iov[i].iov_base) {
+                    error_setg(&local_err, "received %p and expecting %p (%d)",
+                               recv_address, p->pages.iov[i].iov_base, i);
+                    terminate_multifd_recv_threads(local_err);
+                    return NULL;
+                }
+            }
+
             p->done = true;
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 20/22] migration: Rename initial_bytes
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (18 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 19/22] migration: Test new fd infrastructure Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 21/22] migration: Transfer pages over new channels Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 22/22] migration: Flush receive queue Juan Quintela
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

It is now called qemu_file_bytes, which better reflects what it does,
and we create qemu_file_bytes_now to avoid calling qemu_ftell() twice.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 157e8fd1d0..62c410aac3 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2064,13 +2064,13 @@ static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
     /*
      * The final stage happens when the remaining data is smaller than
      * this threshold; it's calculated from the requested downtime and
      * measured bandwidth
      */
     int64_t threshold_size = 0;
+    int64_t qemu_file_bytes = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2158,8 +2158,9 @@ static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t transferred_bytes =
+                qemu_file_bytes_now - qemu_file_bytes;
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2178,7 +2179,7 @@ static void *migration_thread(void *opaque)
 
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 21/22] migration: Transfer pages over new channels
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (19 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 20/22] migration: Rename initial_bytes Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 22/22] migration: Flush receive queue Juan Quintela
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We switch from sending the page addresses (the previous test setup) to
sending the real pages over the new channels.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Remove the HACK bit; the function that calculates the size of a page
is now exported.
---
 migration/migration.c |  7 ++++++-
 migration/ram.c       | 38 +++++++++++---------------------------
 2 files changed, 17 insertions(+), 28 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 62c410aac3..1f6efc3207 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2071,6 +2071,7 @@ static void *migration_thread(void *opaque)
      */
     int64_t threshold_size = 0;
     int64_t qemu_file_bytes = 0;
+    int64_t multifd_pages = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2159,8 +2160,11 @@ static void *migration_thread(void *opaque)
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
             uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t multifd_pages_now = ram_counters.multifd;
             uint64_t transferred_bytes =
-                qemu_file_bytes_now - qemu_file_bytes;
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                (multifd_pages_now - multifd_pages) *
+                qemu_target_page_size();
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2180,6 +2184,7 @@ static void *migration_thread(void *opaque)
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
             qemu_file_bytes = qemu_file_bytes_now;
+            multifd_pages = multifd_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 348ce1141a..0c2782c452 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -498,21 +498,16 @@ static void *multifd_send_thread(void *opaque)
         if (p->pages.num) {
             Error *local_err = NULL;
             size_t ret;
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            for (i = 0; i < num; i++) {
-                ret = qio_channel_write(p->c,
-                                        (const char *)&p->pages.iov[i].iov_base,
-                                        sizeof(uint8_t *), &local_err);
-                if (ret != sizeof(uint8_t *)) {
-                    terminate_multifd_send_threads(local_err);
-                    return NULL;
-                }
+            ret = qio_channel_writev_all(p->c, p->pages.iov, num, &local_err);
+            if (ret != num * TARGET_PAGE_SIZE) {
+                terminate_multifd_send_threads(local_err);
+                return NULL;
             }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
@@ -685,7 +680,6 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
-    uint8_t *recv_address;
 
     qemu_sem_post(&p->ready);
     while (true) {
@@ -697,27 +691,16 @@ static void *multifd_recv_thread(void *opaque)
         if (p->pages.num) {
             Error *local_err = NULL;
             size_t ret;
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
 
-            for (i = 0; i < num; i++) {
-                ret = qio_channel_read(p->c, (char *)&recv_address,
-                                       sizeof(uint8_t *), &local_err);
-                if (ret != sizeof(uint8_t *)) {
-                    terminate_multifd_recv_threads(local_err);
-                    return NULL;
-                }
-                if (recv_address != p->pages.iov[i].iov_base) {
-                    error_setg(&local_err, "received %p and expecting %p (%d)\n",
-                               recv_address, p->pages.iov[i].iov_base, i);
-                    terminate_multifd_recv_threads(local_err);
-                    return NULL;
-                }
+            ret = qio_channel_readv_all(p->c, p->pages.iov, num, &local_err);
+            if (ret != num * TARGET_PAGE_SIZE) {
+                terminate_multifd_recv_threads(local_err);
+                return NULL;
             }
-
             p->done = true;
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
@@ -1279,8 +1262,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
         qemu_put_be16(rs->f, fd_num);
+        if (fd_num != MULTIFD_CONTINUE) {
+            qemu_fflush(rs->f);
+        }
         ram_counters.transferred += 2; /* size of fd_num */
-        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -3143,7 +3128,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
         case RAM_SAVE_FLAG_EOS:
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* [Qemu-devel] [PATCH v7 22/22] migration: Flush receive queue
  2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
                   ` (20 preceding siblings ...)
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 21/22] migration: Transfer pages over new channels Juan Quintela
@ 2017-09-06 11:51 ` Juan Quintela
  21 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 11:51 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Each time we sync the bitmap, there is a possibility that we receive a
page that is still being processed by a different thread.  We fix this
problem by making sure that we wait for all receiving threads to
finish their work before we proceed to the next stage.

We are low on page flags, so we use a combination that is not valid to
emit that message:  MULTIFD_PAGE and COMPRESSED.

I tried to make a migration command for it, but it doesn't work because
we sometimes sync the bitmap when we have already sent the beginning
of the section, so I just added a new page flag.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Create RAM_SAVE_FLAG_MULTIFD_SYNC (dave suggestion)
Move the set of need_flush to inside the bitmap_sync code (peter suggestion)
---
 migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 0c2782c452..295b6002ac 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -71,6 +71,13 @@
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
+/* We are getting low on page flags, so we start using combinations.
+   When we need to flush, we send the page as
+   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO,
+   a combination that is otherwise invalid.
+*/
+#define RAM_SAVE_FLAG_MULTIFD_SYNC (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
+
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
     return buffer_is_zero(p, size);
@@ -193,6 +200,9 @@ struct RAMState {
     uint64_t iterations_prev;
     /* Iterations since start */
     uint64_t iterations;
+    /* Indicates that we have synced the bitmap and need to ensure that
+       the target has processed all previous pages */
+    bool multifd_needs_flush;
     /* number of dirty bits in the bitmap */
     uint64_t migration_dirty_pages;
     /* protects modification of the bitmap */
@@ -609,9 +619,11 @@ struct MultiFDRecvParams {
     QIOChannel *c;
     QemuSemaphore ready;
     QemuSemaphore sem;
+    QemuCond cond_sync;
     QemuMutex mutex;
      /* protected by param mutex */
     bool quit;
+    bool sync;
     multifd_pages_t pages;
     bool done;
 };
@@ -664,6 +676,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_cond_destroy(&p->cond_sync);
         socket_recv_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
@@ -702,6 +715,10 @@ static void *multifd_recv_thread(void *opaque)
                 return NULL;
             }
             p->done = true;
+            if (p->sync) {
+                qemu_cond_signal(&p->cond_sync);
+                p->sync = false;
+            }
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
             continue;
@@ -752,9 +769,11 @@ void multifd_new_channel(QIOChannel *ioc)
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
     qemu_sem_init(&p->ready, 0);
+    qemu_cond_init(&p->cond_sync);
     p->quit = false;
     p->id = id;
     p->done = false;
+    p->sync = false;
     multifd_init_group(&p->pages);
     p->c = ioc;
     multifd_recv_state->count++;
@@ -814,6 +833,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
     qemu_sem_post(&p->sem);
 }
 
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        while (!p->done) {
+            p->sync = true;
+            qemu_cond_wait(&p->cond_sync, &p->mutex);
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    return 0;
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -831,6 +871,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
 {
     size_t size, len;
 
+    if (rs->multifd_needs_flush &&
+        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+        offset |= RAM_SAVE_FLAG_ZERO;
+        rs->multifd_needs_flush = false;
+    }
+
     if (block == rs->last_sent_block) {
         offset |= RAM_SAVE_FLAG_CONTINUE;
     }
@@ -1116,6 +1162,9 @@ static void migration_bitmap_sync(RAMState *rs)
     if (migrate_use_events()) {
         qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
     }
+    if (!rs->ram_bulk_stage && migrate_use_multifd()) {
+        rs->multifd_needs_flush = true;
+    }
 }
 
 /**
@@ -3034,6 +3083,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         }
 
+        if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC)
+            == RAM_SAVE_FLAG_MULTIFD_SYNC) {
+            multifd_flush();
+            flags = flags & ~RAM_SAVE_FLAG_ZERO;
+        }
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions"
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions" Juan Quintela
@ 2017-09-06 14:00   ` Eric Blake
  2017-09-06 14:42     ` Juan Quintela
  0 siblings, 1 reply; 35+ messages in thread
From: Eric Blake @ 2017-09-06 14:00 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: lvivier, dgilbert, peterx

[-- Attachment #1: Type: text/plain, Size: 1089 bytes --]

On 09/06/2017 06:51 AM, Juan Quintela wrote:
> This reverts commit d4622e55883211072621958d39ddaa73483d201e.

But with no reason why?  What bugs are you fixing by reverting this?

> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/io/channel.h       |  90 ---------------------------------------
>  io/channel.c               |  94 -----------------------------------------
>  tests/io-channel-helpers.c | 102 +++++++++++++++++++++++++++++++++++++++++----
>  3 files changed, 93 insertions(+), 193 deletions(-)
>

Looking ahead, I see 8/22 recreates qio_channel_readv_all (but not
qio_channel_read_all); how does that differ from this one?

Should you be squashing 1/22 and 8/22 into a single non-revert patch
that just fixes bugs on top of what is already in the tree?

Also, have you seen my patches, that also fix bugs in the _all functions?
https://lists.gnu.org/archive/html/qemu-devel/2017-09/msg01053.html

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 619 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all Juan Quintela
@ 2017-09-06 14:03   ` Eric Blake
  2017-09-06 14:11     ` Daniel P. Berrange
  0 siblings, 1 reply; 35+ messages in thread
From: Eric Blake @ 2017-09-06 14:03 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: lvivier, dgilbert, peterx, Daniel P. Berrange

[-- Attachment #1: Type: text/plain, Size: 651 bytes --]

On 09/06/2017 06:51 AM, Juan Quintela wrote:
> The functions wait until they are able to write the full iov.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> 
> Add tests.
> 
> fix reader to check for len == 0.
> 
> make reader wait on G_IO_IN (dave)
> change tests to make sure that all combinations of call work with all backends

Since Dan's patch to add qio_channel_readv_all has already landed,
shouldn't this patch just be rebased to add the tests? Or is it also
fixing some bugs?

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 619 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all
  2017-09-06 14:03   ` Eric Blake
@ 2017-09-06 14:11     ` Daniel P. Berrange
  0 siblings, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-09-06 14:11 UTC (permalink / raw)
  To: Eric Blake; +Cc: Juan Quintela, qemu-devel, lvivier, dgilbert, peterx

On Wed, Sep 06, 2017 at 09:03:40AM -0500, Eric Blake wrote:
> On 09/06/2017 06:51 AM, Juan Quintela wrote:
> > The functions wait until they are able to write the full iov.
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > 
> > --
> > 
> > Add tests.
> > 
> > fix reader to check for len == 0.
> > 
> > make reader wait on G_IO_IN (dave)
> > change tests to make sure that all combinations of call work with all backends
> 
> Since Dan's patch to add qio_channel_readv_all has already landed,
> shouldn't this patch just be rebased to add the tests? Or is it also
> fixing some bugs?

I don't think this patch is needed at all from the QIO pov now - the test
script changes don't add any extra coverage compared to what was merged.

IOW, only the migration/ bit of the patch is needed.

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions"
  2017-09-06 14:00   ` Eric Blake
@ 2017-09-06 14:42     ` Juan Quintela
  2017-09-06 16:09       ` Eric Blake
  2017-09-06 16:32       ` Daniel P. Berrange
  0 siblings, 2 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-06 14:42 UTC (permalink / raw)
  To: Eric Blake; +Cc: qemu-devel, lvivier, dgilbert, peterx

Eric Blake <eblake@redhat.com> wrote:
> On 09/06/2017 06:51 AM, Juan Quintela wrote:
>> This reverts commit d4622e55883211072621958d39ddaa73483d201e.
>
> But with no reason why?  What bugs are you fixing by reverting this?

I put it in the cover letter.  I am investigating *why* it fails for me.
It got the thread hung.

>
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  include/io/channel.h       |  90 ---------------------------------------
>>  io/channel.c               |  94 -----------------------------------------
>>  tests/io-channel-helpers.c | 102 +++++++++++++++++++++++++++++++++++++++++----
>>  3 files changed, 93 insertions(+), 193 deletions(-)
>>
>
> Looking ahead, I see 8/22 recreates qio_channel_readv_all (but not
> qio_channel_read_all); how does that differ from this one?
>
> Should you be squashing 1/22 and 8/22 into a single non-revert patch
> that just fixes bugs on top of what is already in the tree?

My plan is to fix whatever is there and see why it is failing.

> Also, have you seen my patches, that also fix bugs in the _all functions?
> https://lists.gnu.org/archive/html/qemu-devel/2017-09/msg01053.html

No, I have to take a look, thanks.

Thanks, Juan.

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions"
  2017-09-06 14:42     ` Juan Quintela
@ 2017-09-06 16:09       ` Eric Blake
  2017-09-06 16:32       ` Daniel P. Berrange
  1 sibling, 0 replies; 35+ messages in thread
From: Eric Blake @ 2017-09-06 16:09 UTC (permalink / raw)
  To: quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

[-- Attachment #1: Type: text/plain, Size: 1093 bytes --]

On 09/06/2017 09:42 AM, Juan Quintela wrote:
> Eric Blake <eblake@redhat.com> wrote:
>> On 09/06/2017 06:51 AM, Juan Quintela wrote:
>>> This reverts commit d4622e55883211072621958d39ddaa73483d201e.
>>
>> But with no reason why?  What bugs are you fixing by reverting this?
> 
> I put it in the cover letter.  I am investigating *why* it fails for me.
> It got the thread hung.

Then I think I ran into the same problems as you (NBD was hanging for me
due to the nested event loop being called from a coroutine context),...

> 
> My plan is to fix whatever is there and see why it is failing.
> 
>> Also, have you seen my patches, that also fix bugs in the _all functions?
>> https://lists.gnu.org/archive/html/qemu-devel/2017-09/msg01053.html
> 
> No, I have to take a look, thanks.

...and hopefully my patch helps the hang you were seeing (now in a pull
request:
https://lists.gnu.org/archive/html/qemu-devel/2017-09/msg01338.html).

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 619 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2017-09-06 16:15   ` Eric Blake
  2017-09-07 11:53     ` Juan Quintela
  2017-09-11 18:58     ` Dr. David Alan Gilbert
  0 siblings, 2 replies; 35+ messages in thread
From: Eric Blake @ 2017-09-06 16:15 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: lvivier, dgilbert, peterx

[-- Attachment #1: Type: text/plain, Size: 1442 bytes --]

On 09/06/2017 06:51 AM, Juan Quintela wrote:
> So far, we had to free the error after each caller, so just do it
> here.  Once there, tls.c was leaking the error.

You mention tls.c,

> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/channel.c   |  1 -
>  migration/migration.c | 10 ++++------
>  migration/migration.h |  4 ++--
>  migration/socket.c    |  1 -
>  4 files changed, 6 insertions(+), 10 deletions(-)

but don't touch it.  Am I missing something?

>  
> -void migrate_fd_error(MigrationState *s, const Error *error)
> +void migrate_fd_error(MigrationState *s, Error *error)
>  {

No comments at definition,

> +++ b/migration/migration.h
> @@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
>  
>  uint64_t migrate_max_downtime(void);
>  
> -void migrate_set_error(MigrationState *s, const Error *error);
> -void migrate_fd_error(MigrationState *s, const Error *error);
> +void migrate_set_error(MigrationState *s, Error *error);
> +void migrate_fd_error(MigrationState *s, Error *error);

or at declaration. That would be worth adding at some point, but this
patch isn't making it worse.

The code looks okay in isolation, so if it is only the commit message
that needs fixing,
Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 619 bytes --]

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions"
  2017-09-06 14:42     ` Juan Quintela
  2017-09-06 16:09       ` Eric Blake
@ 2017-09-06 16:32       ` Daniel P. Berrange
  1 sibling, 0 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-09-06 16:32 UTC (permalink / raw)
  To: Juan Quintela; +Cc: Eric Blake, lvivier, qemu-devel, peterx, dgilbert

On Wed, Sep 06, 2017 at 04:42:18PM +0200, Juan Quintela wrote:
> Eric Blake <eblake@redhat.com> wrote:
> > On 09/06/2017 06:51 AM, Juan Quintela wrote:
> >> This reverts commit d4622e55883211072621958d39ddaa73483d201e.
> >
> > But with no reason why?  What bugs are you fixing by reverting this?
> 
> I put it in the cover letter.  I am investigating *why* it fails for me.
> It got the thread hung.

Your functions return the number of bytes written.  My impl only
returns 0 or -1, on the basis that the caller does not need
to know how many bytes were written - it's always exactly the
amount asked for.  You probably need to adjust your code to take
that into account.

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling Juan Quintela
@ 2017-09-06 19:04   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-09-06 19:04 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We now report errors also when we finish migration, not only on info
> migrate.  We plan to use this error from several places, and we want
> the first error that happens to win, so we add a mutex to order it.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I think this is OK for errors from the migration code itself;
I'd prefer not to lose any of the multiple layers of errors we get
from devices (e.g. we see an error about a particular bit, and then
see it's from a PCI device, etc.)

> diff --git a/migration/ram.c b/migration/ram.c
> index e18b3e2d4f..e0179fc838 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -1789,7 +1789,7 @@ int ram_discard_range(const char *rbname, uint64_t start, size_t length)
>      RAMBlock *rb = qemu_ram_block_by_name(rbname);
>  
>      if (!rb) {
> -        error_report("ram_discard_range: Failed to find block '%s'", rbname);
> +        error_report("ram_discard_rang0e: Failed to find block '%s'", rbname);

Typo!

Dave

>          goto err;
>      }
>  
> diff --git a/migration/tls.c b/migration/tls.c
> index 596e8790bd..026a008667 100644
> --- a/migration/tls.c
> +++ b/migration/tls.c
> @@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
>      if (qio_task_propagate_error(task, &err)) {
>          trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
>          migrate_fd_error(s, err);
> -        error_free(err);
>      } else {
>          trace_migration_tls_outgoing_handshake_complete();
>          migration_channel_connect(s, ioc, NULL);
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error
  2017-09-06 16:15   ` Eric Blake
@ 2017-09-07 11:53     ` Juan Quintela
  2017-09-11 18:58     ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-09-07 11:53 UTC (permalink / raw)
  To: Eric Blake; +Cc: qemu-devel, lvivier, dgilbert, peterx

Eric Blake <eblake@redhat.com> wrote:
> On 09/06/2017 06:51 AM, Juan Quintela wrote:
>> So far, we had to free the error after each caller, so just do it
>> here.  Once there, tls.c was leaking the error.
>
> You mention tls.c,
>
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/channel.c   |  1 -
>>  migration/migration.c | 10 ++++------
>>  migration/migration.h |  4 ++--
>>  migration/socket.c    |  1 -
>>  4 files changed, 6 insertions(+), 10 deletions(-)
>
> but don't touch it.  Am I missing something?
>

It was missing error_free(), so it leaked the Error * variable.
I will improve the message for the next version.



>>  
>> -void migrate_fd_error(MigrationState *s, const Error *error)
>> +void migrate_fd_error(MigrationState *s, Error *error)
>>  {
>
> No comments at definition,

We free it inside now, so it can't be const.

>> +++ b/migration/migration.h
>> @@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
>>  
>>  uint64_t migrate_max_downtime(void);
>>  
>> -void migrate_set_error(MigrationState *s, const Error *error);
>> -void migrate_fd_error(MigrationState *s, const Error *error);
>> +void migrate_set_error(MigrationState *s, Error *error);
>> +void migrate_fd_error(MigrationState *s, Error *error);
>
> or at declaration. That would be worth adding at some point, but this
> patch isn't making it worse.

will add them, thanks.

> The code looks okay in isolation, so if it is only the commit message
> that needs fixing,
> Reviewed-by: Eric Blake <eblake@redhat.com>

Thanks.

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE Juan Quintela
@ 2017-09-08 17:18   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-09-08 17:18 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> As this is defined on glib 2.32, add compatibility macros for older glibs.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
> Reviewed-by: Peter Xu <peterx@redhat.com>

I think 03 and 04 could both be merged to current qemu?

I think you could remove the G_SOURCE_CONTINUE/REMOVE defines in
contrib/vhost-user-scsi/vhost-user-scsi.c as well.

Dave

> ---
>  include/glib-compat.h | 2 ++
>  migration/exec.c      | 2 +-
>  migration/fd.c        | 2 +-
>  migration/socket.c    | 2 +-
>  4 files changed, 5 insertions(+), 3 deletions(-)
> 
> diff --git a/include/glib-compat.h b/include/glib-compat.h
> index fcffcd3f07..e15aca2d40 100644
> --- a/include/glib-compat.h
> +++ b/include/glib-compat.h
> @@ -223,6 +223,8 @@ static inline gboolean g_hash_table_contains(GHashTable *hash_table,
>  {
>      return g_hash_table_lookup_extended(hash_table, key, NULL, NULL);
>  }
> +#define G_SOURCE_CONTINUE TRUE
> +#define G_SOURCE_REMOVE FALSE
>  #endif
>  
>  #ifndef g_assert_true
> diff --git a/migration/exec.c b/migration/exec.c
> index 08b599e0e2..f3be1baf2e 100644
> --- a/migration/exec.c
> +++ b/migration/exec.c
> @@ -49,7 +49,7 @@ static gboolean exec_accept_incoming_migration(QIOChannel *ioc,
>  {
>      migration_channel_process_incoming(ioc);
>      object_unref(OBJECT(ioc));
> -    return FALSE; /* unregister */
> +    return G_SOURCE_REMOVE;
>  }
>  
>  void exec_start_incoming_migration(const char *command, Error **errp)
> diff --git a/migration/fd.c b/migration/fd.c
> index 30f5258a6a..30de4b9847 100644
> --- a/migration/fd.c
> +++ b/migration/fd.c
> @@ -49,7 +49,7 @@ static gboolean fd_accept_incoming_migration(QIOChannel *ioc,
>  {
>      migration_channel_process_incoming(ioc);
>      object_unref(OBJECT(ioc));
> -    return FALSE; /* unregister */
> +    return G_SOURCE_REMOVE;
>  }
>  
>  void fd_start_incoming_migration(const char *infd, Error **errp)
> diff --git a/migration/socket.c b/migration/socket.c
> index 757d3821a1..b02d37d7a3 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -154,7 +154,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>  out:
>      /* Close listening socket as its no longer needed */
>      qio_channel_close(ioc, NULL);
> -    return FALSE; /* unregister */
> +    return G_SOURCE_REMOVE;
>  }
>  
>  
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming
  2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming Juan Quintela
@ 2017-09-08 17:22   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-09-08 17:22 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We need that in later patches.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> Reviewed-by: Peter Xu <peterx@redhat.com>
> Reviewed-by: Daniel P. Berrange <berrange@redhat.com>

I think this could also go into current qemu?

Dave

> ---
>  migration/migration.c | 14 ++++++++++++--
>  1 file changed, 12 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 9fec880a58..18bd24a14c 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -372,9 +372,8 @@ static void process_incoming_migration_co(void *opaque)
>      qemu_bh_schedule(mis->bh);
>  }
>  
> -void migration_fd_process_incoming(QEMUFile *f)
> +static void migration_incoming_setup(QEMUFile *f)
>  {
> -    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
>      MigrationIncomingState *mis = migration_incoming_get_current();
>  
>      if (multifd_load_setup() != 0) {
> @@ -387,9 +386,20 @@ void migration_fd_process_incoming(QEMUFile *f)
>          mis->from_src_file = f;
>      }
>      qemu_file_set_blocking(f, false);
> +}
> +
> +static void migration_incoming_process(void)
> +{
> +    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
>      qemu_coroutine_enter(co);
>  }
>  
> +void migration_fd_process_incoming(QEMUFile *f)
> +{
> +    migration_incoming_setup(f);
> +    migration_incoming_process();
> +}
> +
>  void migration_ioc_process_incoming(QIOChannel *ioc)
>  {
>      MigrationIncomingState *mis = migration_incoming_get_current();
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error
  2017-09-06 16:15   ` Eric Blake
  2017-09-07 11:53     ` Juan Quintela
@ 2017-09-11 18:58     ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-09-11 18:58 UTC (permalink / raw)
  To: Eric Blake; +Cc: Juan Quintela, qemu-devel, lvivier, peterx

* Eric Blake (eblake@redhat.com) wrote:
> On 09/06/2017 06:51 AM, Juan Quintela wrote:
> > So far, we had to free the error after each caller, so just do it
> > here.  Once there, tls.c was leaking the error.
> 
> You mention tls.c,
> 
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > ---
> >  migration/channel.c   |  1 -
> >  migration/migration.c | 10 ++++------
> >  migration/migration.h |  4 ++--
> >  migration/socket.c    |  1 -
> >  4 files changed, 6 insertions(+), 10 deletions(-)
> 
> but don't touch it.  Am I missing something?


hmm well I see in migration/tls.c:

    if (qio_task_propagate_error(task, &err)) {
        trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
        migrate_fd_error(s, err);
        error_free(err);
    } else {
        trace_migration_tls_outgoing_handshake_complete();
        migration_channel_connect(s, ioc, NULL);
    }

so I think that error_free has to go?

Dave

> >  
> > -void migrate_fd_error(MigrationState *s, const Error *error)
> > +void migrate_fd_error(MigrationState *s, Error *error)
> >  {
> 
> No comments at definition,
> 
> > +++ b/migration/migration.h
> > @@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
> >  
> >  uint64_t migrate_max_downtime(void);
> >  
> > -void migrate_set_error(MigrationState *s, const Error *error);
> > -void migrate_fd_error(MigrationState *s, const Error *error);
> > +void migrate_set_error(MigrationState *s, Error *error);
> > +void migrate_fd_error(MigrationState *s, Error *error);
> 
> or at declaration. That would be worth adding at some point, but this
> patch isn't making it worse.
> 
> The code looks okay in isolation, so if it is only the commit message
> that needs fixing,
> Reviewed-by: Eric Blake <eblake@redhat.com>
> 
> -- 
> Eric Blake, Principal Software Engineer
> Red Hat, Inc.           +1-919-301-3266
> Virtualization:  qemu.org | libvirt.org
> 



--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 35+ messages in thread

end of thread, other threads:[~2017-09-11 18:58 UTC | newest]

Thread overview: 35+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-09-06 11:51 [Qemu-devel] [PATCH v7 00/22] Multifd Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 01/22] Revert "io: add new qio_channel_{readv, writev, read, write}_all functions" Juan Quintela
2017-09-06 14:00   ` Eric Blake
2017-09-06 14:42     ` Juan Quintela
2017-09-06 16:09       ` Eric Blake
2017-09-06 16:32       ` Daniel P. Berrange
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 02/22] migration: Create migration_ioc_process_incoming() Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 03/22] migration: Teach it about G_SOURCE_REMOVE Juan Quintela
2017-09-08 17:18   ` Dr. David Alan Gilbert
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 04/22] migration: Add comments to channel functions Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 05/22] migration: Create migration_has_all_channels Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 06/22] migration: Improve migration thread error handling Juan Quintela
2017-09-06 19:04   ` Dr. David Alan Gilbert
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 07/22] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
2017-09-06 16:15   ` Eric Blake
2017-09-07 11:53     ` Juan Quintela
2017-09-11 18:58     ` Dr. David Alan Gilbert
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 08/22] qio: Create new qio_channel_{readv, writev}_all Juan Quintela
2017-09-06 14:03   ` Eric Blake
2017-09-06 14:11     ` Daniel P. Berrange
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 09/22] migration: Add multifd capability Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 10/22] migration: Create x-multifd-threads parameter Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 11/22] migration: Create x-multifd-group parameter Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 12/22] migration: Create multifd migration threads Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 13/22] migration: Split migration_fd_process_incoming Juan Quintela
2017-09-08 17:22   ` Dr. David Alan Gilbert
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 14/22] migration: Start of multiple fd work Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 15/22] migration: Create ram_multifd_page Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 16/22] migration: Really use multiple pages at a time Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 17/22] migration: Send the fd number which we are going to use for this page Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 18/22] migration: Create thread infrastructure for multifd recv side Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 19/22] migration: Test new fd infrastructure Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 20/22] migration: Rename initial_bytes Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 21/22] migration: Transfer pages over new channels Juan Quintela
2017-09-06 11:51 ` [Qemu-devel] [PATCH v7 22/22] migration: Flush receive queue Juan Quintela
