From: Fabiano Rosas <farosas@suse.de>
To: qemu-devel@nongnu.org
Cc: "Claudio Fontana" <cfontana@suse.de>,
	jfehlig@suse.com, dfaggioli@suse.com, dgilbert@redhat.com,
	"Daniel P . Berrangé" <berrange@redhat.com>,
	"Juan Quintela" <quintela@redhat.com>
Subject: [RFC PATCH v1 19/26] migration/multifd: Add pages to the receiving side
Date: Thu, 30 Mar 2023 15:03:29 -0300
Message-ID: <20230330180336.2791-20-farosas@suse.de>
In-Reply-To: <20230330180336.2791-1-farosas@suse.de>

Currently multifd does not need any knowledge of pages on the
receiving side because all the information it needs is contained in
the packets that come in the stream.

We're about to add support for fixed-ram migration, which cannot use
packets because it expects the ramblock section in the migration file
to contain only the guest page data.

Add a MultiFDPages_t pointer to multifd_recv_state and use the pages
similarly to what we already do on the sending side. The pages are
used to transfer data between the ram migration code in the main
migration thread and the multifd receiving threads.
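
To illustrate the intended usage, the ram load code on the
destination would queue pages roughly like this (a simplified sketch;
pages_remaining() and next_page_offset() are hypothetical helpers
used for illustration only, not part of this series):

    /* hypothetical caller in the ram load path (illustration only) */
    while (pages_remaining(block)) {
        ram_addr_t offset = next_page_offset(block);

        /*
         * Accumulates offsets in multifd_recv_state->pages; once a
         * full batch is collected, it is handed off to one of the
         * receiving threads.
         */
        if (multifd_recv_queue_page(f, block, offset) < 0) {
            return -1;
        }
    }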

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/multifd.c | 98 +++++++++++++++++++++++++++++++++++++++++++++
 migration/multifd.h | 12 ++++++
 2 files changed, 110 insertions(+)
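
Note for reviewers: the pages/pending_job handoff mirrors the sending
side. A simplified view of the ownership protocol implemented in
multifd_recv_pages() and multifd_recv_thread() below:

    main migration thread             recv channel thread
    ---------------------             -------------------
    lock(p->mutex)
    find a channel with
      pending_job == 0
    p->pending_job++
    swap multifd_recv_state->pages
      and p->pages
    unlock(p->mutex)
    post(p->sem)         ---------->  wait(p->sem)
                                      copy p->pages->offset[] into
                                        p->normal[]
                                      receive data into host pages
                                      lock(p->mutex)
                                      p->pending_job--
                                      unlock(p->mutex)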

diff --git a/migration/multifd.c b/migration/multifd.c
index 1332b426ce..20ef665218 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1004,6 +1004,8 @@ int multifd_save_setup(Error **errp)
 
 struct {
     MultiFDRecvParams *params;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
     /* number of created threads */
     int count;
     /* syncs main thread and channels */
@@ -1014,6 +1016,66 @@ struct {
     MultiFDMethods *ops;
 } *multifd_recv_state;
 
+static int multifd_recv_pages(QEMUFile *f)
+{
+    int i;
+    static int next_recv_channel;
+    MultiFDRecvParams *p = NULL;
+    MultiFDPages_t *pages = multifd_recv_state->pages;
+
+    /*
+     * next_recv_channel can remain from a previous migration that
+     * was using more channels, so ensure it doesn't overflow if the
+     * limit is lower now.
+     */
+    next_recv_channel %= migrate_multifd_channels();
+    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            error_report("%s: channel %d has already quit!", __func__, i);
+            qemu_mutex_unlock(&p->mutex);
+            return -1;
+        }
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_recv_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+
+    multifd_recv_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return 1;
+}
+
+int multifd_recv_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_recv_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    pages->offset[pages->num] = offset;
+    pages->num++;
+
+    if (pages->num < pages->allocated) {
+        return 1;
+    }
+
+    if (multifd_recv_pages(f) < 0) {
+        return -1;
+    }
+
+    return 1;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -1035,6 +1097,7 @@ static void multifd_recv_terminate_threads(Error *err)
 
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
+        qemu_sem_post(&p->sem);
         /*
          * We could arrive here for two reasons:
          *  - normal quit, i.e. everything went fine, just finished
@@ -1083,9 +1146,12 @@ void multifd_load_cleanup(void)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
@@ -1098,6 +1164,8 @@ void multifd_load_cleanup(void)
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_pages_clear(multifd_recv_state->pages);
+    multifd_recv_state->pages = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 }
@@ -1160,6 +1228,25 @@ static void *multifd_recv_thread(void *opaque)
                 break;
             }
             p->num_packets++;
+        } else {
+            /*
+             * No packets, so we need to wait for the vmstate code to
+             * queue pages.
+             */
+            qemu_sem_wait(&p->sem);
+            qemu_mutex_lock(&p->mutex);
+            if (!p->pending_job) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
+            }
+
+            for (int i = 0; i < p->pages->num; i++) {
+                p->normal[p->normal_num] = p->pages->offset[i];
+                p->normal_num++;
+            }
+
+            p->pages->num = 0;
+            p->host = p->pages->block->host;
         }
 
         flags = p->flags;
@@ -1182,6 +1269,13 @@ static void *multifd_recv_thread(void *opaque)
             qemu_sem_post(&multifd_recv_state->sem_sync);
             qemu_sem_wait(&p->sem_sync);
         }
+
+        if (!use_packets) {
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            p->pages->block = NULL;
+            qemu_mutex_unlock(&p->mutex);
+        }
     }
 
     if (local_err) {
@@ -1216,6 +1310,7 @@ int multifd_load_setup(Error **errp)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+    multifd_recv_state->pages = multifd_pages_init(page_count);
     qatomic_set(&multifd_recv_state->count, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -1224,9 +1319,12 @@ int multifd_load_setup(Error **errp)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
+        p->pending_job = 0;
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
 
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h
index 354150ff55..2f008217c3 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -24,6 +24,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);
 int multifd_send_sync_main(QEMUFile *f);
 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
+int multifd_recv_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
 
 /* Multifd Compression flags */
 #define MULTIFD_FLAG_SYNC (1 << 0)
@@ -153,7 +154,11 @@ typedef struct {
     uint32_t page_size;
     /* number of pages in a full packet */
     uint32_t page_count;
+    /* multifd flags for receiving ram */
+    int read_flags;
 
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
 
@@ -167,6 +172,13 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    int pending_job;
+    /* array of pages to receive.
+     * The owner of 'pages' depends on the 'pending_job' value:
+     * pending_job == 0 -> migration_thread can use it.
+     * pending_job != 0 -> multifd_channel can use it.
+     */
+    MultiFDPages_t *pages;
 
     /* thread local variables. No locking required */
 
-- 
2.35.3


