All of lore.kernel.org
 help / color / mirror / Atom feed
From: Juan Quintela <quintela@redhat.com>
To: qemu-devel@nongnu.org
Cc: "Laurent Vivier" <lvivier@redhat.com>,
	"Thomas Huth" <thuth@redhat.com>,
	"Daniel P. Berrangé" <berrange@redhat.com>,
	"Eduardo Habkost" <ehabkost@redhat.com>,
	"Juan Quintela" <quintela@redhat.com>,
	"Markus Armbruster" <armbru@redhat.com>,
	"Dr. David Alan Gilbert" <dgilbert@redhat.com>,
	"Paolo Bonzini" <pbonzini@redhat.com>
Subject: [PATCH v3 18/21] migration: Make no compression operations into its own structure
Date: Thu, 23 Jan 2020 12:58:28 +0100	[thread overview]
Message-ID: <20200123115831.36842-19-quintela@redhat.com> (raw)
In-Reply-To: <20200123115831.36842-1-quintela@redhat.com>

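Move the uncompressed multifd send/receive paths behind a MultiFDMethods
operations table.  Later patches in this series add zlib and zstd
implementations alongside it.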

Signed-off-by: Juan Quintela <quintela@redhat.com>

---
Move setup of ->ops helper to proper place (wei)
Rename s/none/nocomp/ (dave)
Introduce MULTIFD_FLAG_NOCOMP
right order of arguments for print
---
 migration/migration.c |   9 +++
 migration/migration.h |   1 +
 migration/multifd.c   | 183 ++++++++++++++++++++++++++++++++++++++++--
 migration/multifd.h   |  22 +++++
 migration/ram.c       |   1 +
 5 files changed, 208 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 3501bc3353..d25fdb3e6b 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2245,6 +2245,15 @@ int migrate_multifd_channels(void)
     return s->parameters.multifd_channels;
 }
 
+int migrate_multifd_method(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_compress;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index 8473ddfc88..3b0b413a93 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -300,6 +300,7 @@ bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 bool migrate_pause_before_switchover(void);
 int migrate_multifd_channels(void);
+int migrate_multifd_method(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/migration/multifd.c b/migration/multifd.c
index 1875bb3aaa..353140cd25 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -38,6 +38,132 @@ typedef struct {
     uint64_t unused2[4];    /* Reserved for future use */
 } __attribute__((packed)) MultiFDInit_t;
 
+/* Multifd without compression */
+
+/**
+ * nocomp_send_setup: setup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_send_cleanup: cleanup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+    return;
+}
+
+/**
+ * nocomp_send_prepare: prepare data to be able to send
+ *
+ * For no compression we just have to calculate the size of the
+ * packet.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
+                               Error **errp)
+{
+    p->next_packet_size = used * qemu_target_page_size();
+    p->flags |= MULTIFD_FLAG_NOCOMP;
+    return 0;
+}
+
+/**
+ * nocomp_send_write: do the actual write of the data
+ *
+ * For no compression we just have to write the data.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+}
+
+/**
+ * nocomp_recv_setup: setup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_recv_cleanup: cleanup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_recv_cleanup(MultiFDRecvParams *p)
+{
+}
+
+/**
+ * nocomp_recv_pages: read the data from the channel into actual pages
+ *
+ * For no compression we just need to read things into the correct place.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+    if (p->flags != MULTIFD_FLAG_NOCOMP) {
+        error_setg(errp, "multifd %d: flags received %x flags expected %x",
+                   p->id, p->flags, MULTIFD_FLAG_NOCOMP);
+        return -1;
+    }
+    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
+}
+
+static MultiFDMethods multifd_nocomp_ops = {
+    .send_setup = nocomp_send_setup,
+    .send_cleanup = nocomp_send_cleanup,
+    .send_prepare = nocomp_send_prepare,
+    .send_write = nocomp_send_write,
+    .recv_setup = nocomp_recv_setup,
+    .recv_cleanup = nocomp_recv_cleanup,
+    .recv_pages = nocomp_recv_pages
+};
+
+static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
+    [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
+};
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg = {};
@@ -246,6 +372,8 @@ struct {
      * We will use atomic operations.  Only valid values are 0 and 1.
      */
     int exiting;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_send_state;
 
 /*
@@ -397,6 +525,7 @@ void multifd_save_cleanup(void)
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
+        Error *local_err = NULL;
 
         socket_send_channel_destroy(p->c);
         p->c = NULL;
@@ -410,6 +539,10 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_send_state->ops->send_cleanup(p, &local_err);
+        if (local_err) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
     }
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
@@ -494,7 +627,14 @@ static void *multifd_send_thread(void *opaque)
             uint64_t packet_num = p->packet_num;
             flags = p->flags;
 
-            p->next_packet_size = used * qemu_target_page_size();
+            if (used) {
+                ret = multifd_send_state->ops->send_prepare(p, used,
+                                                            &local_err);
+                if (ret != 0) {
+                    qemu_mutex_unlock(&p->mutex);
+                    break;
+                }
+            }
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -513,8 +653,7 @@ static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                 if (ret != 0) {
                     break;
                 }
@@ -596,6 +735,7 @@ int multifd_save_setup(Error **errp)
     multifd_send_state->pages = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     atomic_set(&multifd_send_state->exiting, 0);
+    multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -615,6 +755,18 @@ int multifd_save_setup(Error **errp)
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+        Error *local_err = NULL;
+        int ret;
+
+        ret = multifd_send_state->ops->send_setup(p, &local_err);
+        if (ret) {
+            error_propagate(errp, local_err);
+            return ret;
+        }
+    }
     return 0;
 }
 
@@ -626,6 +778,8 @@ struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -665,7 +819,6 @@ static void multifd_recv_terminate_threads(Error *err)
 int multifd_load_cleanup(Error **errp)
 {
     int i;
-    int ret = 0;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -698,6 +851,7 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_recv_state->ops->recv_cleanup(p);
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
@@ -705,7 +859,7 @@ int multifd_load_cleanup(Error **errp)
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
-    return ret;
+    return 0;
 }
 
 void multifd_recv_sync_main(void)
@@ -770,6 +924,8 @@ static void *multifd_recv_thread(void *opaque)
 
         used = p->pages->used;
         flags = p->flags;
+        /* recv methods don't know how to handle the SYNC flag */
+        p->flags &= ~MULTIFD_FLAG_SYNC;
         trace_multifd_recv(p->id, p->packet_num, used, flags,
                            p->next_packet_size);
         p->num_packets++;
@@ -777,8 +933,7 @@ static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
             if (ret != 0) {
                 break;
             }
@@ -817,6 +972,7 @@ int multifd_load_setup(Error **errp)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+    multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
@@ -831,6 +987,18 @@ int multifd_load_setup(Error **errp)
         p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        Error *local_err = NULL;
+        int ret;
+
+        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
+        if (ret) {
+            error_propagate(errp, local_err);
+            return ret;
+        }
+    }
     return 0;
 }
 
@@ -888,4 +1056,3 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     return atomic_read(&multifd_recv_state->count) ==
            migrate_multifd_channels();
 }
-
diff --git a/migration/multifd.h b/migration/multifd.h
index d8b0205977..8edea4fdac 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -24,6 +24,7 @@ void multifd_send_sync_main(QEMUFile *f);
 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
 
 #define MULTIFD_FLAG_SYNC (1 << 0)
+#define MULTIFD_FLAG_NOCOMP (1 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
@@ -96,6 +97,8 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* used for compression methods */
+    void *data;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -133,7 +136,26 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* used for de-compression methods */
+    void *data;
 } MultiFDRecvParams;
 
+typedef struct {
+    /* Setup for sending side */
+    int (*send_setup)(MultiFDSendParams *p, Error **errp);
+    /* Cleanup for sending side */
+    void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
+    /* Prepare the send packet */
+    int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Write the send packet */
+    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Setup for receiving side */
+    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
+    /* Cleanup for receiving side */
+    void (*recv_cleanup)(MultiFDRecvParams *p);
+    /* Read all pages */
+    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
+} MultiFDMethods;
+
 #endif
 
diff --git a/migration/ram.c b/migration/ram.c
index ed23ed1c7c..73a141bb60 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -43,6 +43,7 @@
 #include "page_cache.h"
 #include "qemu/error-report.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
 #include "qapi/qapi-events-migration.h"
 #include "qapi/qmp/qerror.h"
 #include "trace.h"
-- 
2.24.1



  parent reply	other threads:[~2020-01-23 13:31 UTC|newest]

Thread overview: 51+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-01-23 11:58 [PATCH v3 00/21] Multifd Migration Compression Juan Quintela
2020-01-23 11:58 ` [PATCH v3 01/21] migration-test: Use g_free() instead of free() Juan Quintela
2020-01-24  9:37   ` Dr. David Alan Gilbert
2020-01-24  9:50   ` Philippe Mathieu-Daudé
2020-01-24 10:39   ` Daniel P. Berrangé
2020-01-24 11:08     ` Juan Quintela
2020-01-23 11:58 ` [PATCH v3 02/21] multifd: Make sure that we don't do any IO after an error Juan Quintela
2020-01-23 11:58 ` [PATCH v3 03/21] qemu-file: Don't do IO after shutdown Juan Quintela
2020-01-23 11:58 ` [PATCH v3 04/21] migration-test: Make sure that multifd and cancel works Juan Quintela
2020-01-23 11:58 ` [PATCH v3 05/21] migration: Create migration_is_running() Juan Quintela
2020-01-24  9:38   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 06/21] migration: Don't send data if we have stopped Juan Quintela
2020-01-24  9:42   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 07/21] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
2020-01-24 12:57   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 08/21] migration: Make multifd_load_setup() " Juan Quintela
2020-01-24 13:02   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 09/21] migration: Add multifd-compress parameter Juan Quintela
2020-01-24 16:15   ` Eric Blake
2020-01-23 11:58 ` [PATCH v3 10/21] ram_addr: Split RAMBlock definition Juan Quintela
2020-01-24 10:39   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 11/21] multifd: multifd_send_pages only needs the qemufile Juan Quintela
2020-01-24 10:57   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 12/21] multifd: multifd_queue_page " Juan Quintela
2020-01-24 11:37   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 13/21] multifd: multifd_send_sync_main " Juan Quintela
2020-01-24 11:40   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 14/21] multifd: Use qemu_target_page_size() Juan Quintela
2020-01-24 11:42   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 15/21] migration: Make checkpatch happy with comments Juan Quintela
2020-01-24 11:54   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 16/21] migration: Add support for modules Juan Quintela
2020-01-24 18:13   ` Dr. David Alan Gilbert
2020-01-24 18:56     ` Juan Quintela
2020-01-23 11:58 ` [PATCH v3 17/21] multifd: Split multifd code into its own file Juan Quintela
2020-01-24 12:10   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` Juan Quintela [this message]
2020-01-24 12:47   ` [PATCH v3 18/21] migration: Make no compression operations into its own structure Dr. David Alan Gilbert
2020-01-24 13:39     ` Juan Quintela
2020-01-24 13:46       ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 19/21] migration: Add zlib compression multifd support Juan Quintela
2020-01-24 13:44   ` Dr. David Alan Gilbert
2020-01-27 13:43     ` Juan Quintela
2020-01-24 16:16   ` Eric Blake
2020-01-23 11:58 ` [PATCH v3 20/21] configure: Enable test and libs for zstd Juan Quintela
2020-01-24 18:39   ` Dr. David Alan Gilbert
2020-01-23 11:58 ` [PATCH v3 21/21] migration: Add zstd compression multifd support Juan Quintela
2020-01-24 16:18   ` Eric Blake
2020-01-24 18:49     ` Juan Quintela
2020-01-23 12:17 ` [PATCH v3 00/21] Multifd Migration Compression Juan Quintela
2020-01-25  7:20 ` Markus Armbruster

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200123115831.36842-19-quintela@redhat.com \
    --to=quintela@redhat.com \
    --cc=armbru@redhat.com \
    --cc=berrange@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=lvivier@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=thuth@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.