All of lore.kernel.org
 help / color / mirror / Atom feed
From: Juan Quintela <quintela@redhat.com>
To: qemu-devel@nongnu.org
Cc: "Laurent Vivier" <lvivier@redhat.com>,
	"Thomas Huth" <thuth@redhat.com>,
	"Daniel P. Berrangé" <berrange@redhat.com>,
	"Eduardo Habkost" <ehabkost@redhat.com>,
	"Juan Quintela" <quintela@redhat.com>,
	"Dr. David Alan Gilbert" <dgilbert@redhat.com>,
	"Markus Armbruster" <armbru@redhat.com>,
	"Paolo Bonzini" <pbonzini@redhat.com>
Subject: [PATCH v5 3/8] multifd: Make no compression operations into its own structure
Date: Wed, 29 Jan 2020 12:56:50 +0100	[thread overview]
Message-ID: <20200129115655.10414-4-quintela@redhat.com> (raw)
In-Reply-To: <20200129115655.10414-1-quintela@redhat.com>

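Move the no-compression send/receive steps into a MultiFDMethods
operations table.  It will be used later in this series, when the zlib
and zstd compression methods are added.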

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/migration.c |   9 ++
 migration/migration.h |   1 +
 migration/multifd.c   | 185 ++++++++++++++++++++++++++++++++++++++++--
 migration/multifd.h   |  25 ++++++
 migration/ram.c       |   1 +
 5 files changed, 213 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index cd72bb6e9a..06f6c2d529 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2245,6 +2245,15 @@ int migrate_multifd_channels(void)
     return s->parameters.multifd_channels;
 }
 
+MultiFDMethod migrate_multifd_method(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_method;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index 8473ddfc88..3d23a0852e 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -300,6 +300,7 @@ bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 bool migrate_pause_before_switchover(void);
 int migrate_multifd_channels(void);
+MultiFDMethod migrate_multifd_method(void);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
diff --git a/migration/multifd.c b/migration/multifd.c
index b3e8ae9bcc..1c49c2a665 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -38,6 +38,134 @@ typedef struct {
     uint64_t unused2[4];    /* Reserved for future use */
 } __attribute__((packed)) MultiFDInit_t;
 
+/* Multifd without compression */
+
+/**
+ * nocomp_send_setup: setup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_send_cleanup: cleanup send side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+    return;
+}
+
+/**
+ * nocomp_send_prepare: prepare data to be able to send
+ *
+ * For no compression we just have to calculate the size of the
+ * packet.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
+                               Error **errp)
+{
+    p->next_packet_size = used * qemu_target_page_size();
+    p->flags |= MULTIFD_FLAG_NOCOMP;
+    return 0;
+}
+
+/**
+ * nocomp_send_write: do the actual write of the data
+ *
+ * For no compression we just have to write the data.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
+{
+    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+}
+
+/**
+ * nocomp_recv_setup: setup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    return 0;
+}
+
+/**
+ * nocomp_recv_cleanup: cleanup receive side
+ *
+ * For no compression this function does nothing.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void nocomp_recv_cleanup(MultiFDRecvParams *p)
+{
+}
+
+/**
+ * nocomp_recv_pages: read the data from the channel into actual pages
+ *
+ * For no compression we just need to read things into the correct place.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @used: number of pages used
+ * @errp: pointer to an error
+ */
+static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
+{
+    uint32_t flags = p->flags & MULTIFD_FLAG_METHOD_MASK;
+
+    if (flags != MULTIFD_FLAG_NOCOMP) {
+        error_setg(errp, "multifd %d: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_NOCOMP);
+        return -1;
+    }
+    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
+}
+
+static MultiFDMethods multifd_nocomp_ops = {
+    .send_setup = nocomp_send_setup,
+    .send_cleanup = nocomp_send_cleanup,
+    .send_prepare = nocomp_send_prepare,
+    .send_write = nocomp_send_write,
+    .recv_setup = nocomp_recv_setup,
+    .recv_cleanup = nocomp_recv_cleanup,
+    .recv_pages = nocomp_recv_pages
+};
+
+static MultiFDMethods *multifd_ops[MULTIFD_METHOD__MAX] = {
+    [MULTIFD_METHOD_NONE] = &multifd_nocomp_ops,
+};
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg = {};
@@ -246,6 +374,8 @@ struct {
      * We will use atomic operations.  Only valid values are 0 and 1.
      */
     int exiting;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_send_state;
 
 /*
@@ -397,6 +527,7 @@ void multifd_save_cleanup(void)
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
+        Error *local_err = NULL;
 
         socket_send_channel_destroy(p->c);
         p->c = NULL;
@@ -410,6 +541,10 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_send_state->ops->send_cleanup(p, &local_err);
+        if (local_err) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
     }
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
@@ -494,7 +629,14 @@ static void *multifd_send_thread(void *opaque)
             uint64_t packet_num = p->packet_num;
             flags = p->flags;
 
-            p->next_packet_size = used * qemu_target_page_size();
+            if (used) {
+                ret = multifd_send_state->ops->send_prepare(p, used,
+                                                            &local_err);
+                if (ret != 0) {
+                    qemu_mutex_unlock(&p->mutex);
+                    break;
+                }
+            }
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -513,8 +655,7 @@ static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                 if (ret != 0) {
                     break;
                 }
@@ -604,6 +745,7 @@ int multifd_save_setup(Error **errp)
     multifd_send_state->pages = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     atomic_set(&multifd_send_state->exiting, 0);
+    multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -623,6 +765,18 @@ int multifd_save_setup(Error **errp)
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+        Error *local_err = NULL;
+        int ret;
+
+        ret = multifd_send_state->ops->send_setup(p, &local_err);
+        if (ret) {
+            error_propagate(errp, local_err);
+            return ret;
+        }
+    }
     return 0;
 }
 
@@ -634,6 +788,8 @@ struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* multifd ops */
+    MultiFDMethods *ops;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -673,7 +829,6 @@ static void multifd_recv_terminate_threads(Error *err)
 int multifd_load_cleanup(Error **errp)
 {
     int i;
-    int ret = 0;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -706,6 +861,7 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        multifd_recv_state->ops->recv_cleanup(p);
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
@@ -713,7 +869,7 @@ int multifd_load_cleanup(Error **errp)
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
-    return ret;
+    return 0;
 }
 
 void multifd_recv_sync_main(void)
@@ -778,6 +934,8 @@ static void *multifd_recv_thread(void *opaque)
 
         used = p->pages->used;
         flags = p->flags;
+        /* recv methods don't know how to handle the SYNC flag */
+        p->flags &= ~MULTIFD_FLAG_SYNC;
         trace_multifd_recv(p->id, p->packet_num, used, flags,
                            p->next_packet_size);
         p->num_packets++;
@@ -785,8 +943,7 @@ static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
             if (ret != 0) {
                 break;
             }
@@ -825,6 +982,7 @@ int multifd_load_setup(Error **errp)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+    multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
@@ -839,6 +997,18 @@ int multifd_load_setup(Error **errp)
         p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        Error *local_err = NULL;
+        int ret;
+
+        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
+        if (ret) {
+            error_propagate(errp, local_err);
+            return ret;
+        }
+    }
     return 0;
 }
 
@@ -896,4 +1066,3 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     return atomic_read(&multifd_recv_state->count) ==
            migrate_multifd_channels();
 }
-
diff --git a/migration/multifd.h b/migration/multifd.h
index d8b0205977..c7fea4914c 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -25,6 +25,10 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
 
 #define MULTIFD_FLAG_SYNC (1 << 0)
 
+/* We reserve 3 bits for METHODS */
+#define MULTIFD_FLAG_METHOD_MASK (7 << 1)
+#define MULTIFD_FLAG_NOCOMP (1 << 1)
+
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
 
@@ -96,6 +100,8 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* used for compression methods */
+    void *data;
 }  MultiFDSendParams;
 
 typedef struct {
@@ -133,7 +139,26 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* used for de-compression methods */
+    void *data;
 } MultiFDRecvParams;
 
+typedef struct {
+    /* Setup for sending side */
+    int (*send_setup)(MultiFDSendParams *p, Error **errp);
+    /* Cleanup for sending side */
+    void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
+    /* Prepare the send packet */
+    int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Write the send packet */
+    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
+    /* Setup for receiving side */
+    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
+    /* Cleanup for receiving side */
+    void (*recv_cleanup)(MultiFDRecvParams *p);
+    /* Read all pages */
+    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
+} MultiFDMethods;
+
 #endif
 
diff --git a/migration/ram.c b/migration/ram.c
index ed23ed1c7c..73a141bb60 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -43,6 +43,7 @@
 #include "page_cache.h"
 #include "qemu/error-report.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
 #include "qapi/qapi-events-migration.h"
 #include "qapi/qmp/qerror.h"
 #include "trace.h"
-- 
2.24.1



  parent reply	other threads:[~2020-01-29 11:59 UTC|newest]

Thread overview: 39+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-01-29 11:56 [PATCH v5 0/8] Multifd Migration Compression Juan Quintela
2020-01-29 11:56 ` [PATCH v5 1/8] multifd: Add multifd-method parameter Juan Quintela
2020-01-30  7:54   ` Markus Armbruster
2020-01-30  9:11     ` Juan Quintela
2020-01-30 12:17       ` Markus Armbruster
2020-02-11 18:50   ` Daniel P. Berrangé
2020-02-13 19:29     ` Juan Quintela
2020-01-29 11:56 ` [PATCH v5 2/8] migration: Add support for modules Juan Quintela
2020-02-11 10:54   ` Dr. David Alan Gilbert
2020-02-13 19:38     ` Juan Quintela
2020-01-29 11:56 ` Juan Quintela [this message]
2020-02-07 18:45   ` [PATCH v5 3/8] multifd: Make no compression operations into its own structure Dr. David Alan Gilbert
2020-02-11 11:23     ` Juan Quintela
2020-01-29 11:56 ` [PATCH v5 4/8] multifd: Add multifd-zlib-level parameter Juan Quintela
2020-01-30  8:03   ` Markus Armbruster
2020-01-30  8:56     ` Juan Quintela
2020-02-11 18:57     ` Daniel P. Berrangé
2020-02-13 13:27       ` Markus Armbruster
2020-02-13 16:33       ` Juan Quintela
2020-01-29 11:56 ` [PATCH v5 5/8] multifd: Add zlib compression multifd support Juan Quintela
2020-01-30  8:04   ` Markus Armbruster
2020-02-11 18:43   ` Dr. David Alan Gilbert
2020-02-13 20:24     ` Juan Quintela
2020-01-29 11:56 ` [PATCH v5 6/8] configure: Enable test and libs for zstd Juan Quintela
2020-02-11 20:11   ` Daniel P. Berrangé
2020-02-13 21:08     ` Juan Quintela
2020-02-14 10:26       ` Daniel P. Berrangé
2020-01-29 11:56 ` [PATCH v5 7/8] multifd: Add multifd-zstd-level parameter Juan Quintela
2020-01-30  8:08   ` Markus Armbruster
2020-02-11 18:47   ` Dr. David Alan Gilbert
2020-02-13 14:04     ` Markus Armbruster
2020-02-13 14:28       ` Dr. David Alan Gilbert
2020-02-13 15:33       ` Juan Quintela
2020-02-14  8:49         ` Markus Armbruster
2020-02-14 18:50           ` Dr. David Alan Gilbert
2020-01-29 11:56 ` [PATCH v5 8/8] multifd: Add zstd compression multifd support Juan Quintela
2020-01-30  8:08   ` Markus Armbruster
2020-02-11 20:01   ` Dr. David Alan Gilbert
2020-02-13 20:39     ` Juan Quintela

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200129115655.10414-4-quintela@redhat.com \
    --to=quintela@redhat.com \
    --cc=armbru@redhat.com \
    --cc=berrange@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=lvivier@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=thuth@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.