qemu-devel.nongnu.org archive mirror
* [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility
@ 2021-05-12 19:26 Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool Andrey Gruzdev
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

Changes v0 -> v1:
 * Changed command-line format, now use blockdev specification to
   define vmstate image.
 * Don't deal with image creation in the tool, create externally.
 * Better block layer AIO handling in the load path.
 * Reduced fragmentation of the image backing file by using 'written-slice'
   bitmaps in RAM blocks. A zero block write is issued to a never-written slice
   before the actual memory page write takes place.
 * Improved load performance in postcopy by using 'loaded-slice' bitmaps
   in RAM blocks.
 * Refactored error handling/messages.
 * Refactored naming.

This series is a kind of PoC for asynchronous snapshot reverting. It is
about external snapshots only and doesn't involve block devices. Thus, it's
mainly intended to be used with the new 'background-snapshot' migration
capability and otherwise standard QEMU migration mechanism.

The major ideas behind this first version were:
  * Make it compatible with 'exec:'-style migration - the options were to
    create a separate tool or to integrate into qemu-system.
  * Support asynchronous revert stage by using unaltered postcopy logic
    at destination. To do this, we should be capable of saving RAM pages
    so that any particular page can be directly addressed by its block ID
    and page offset. Possible solutions here seem to be:
      use separate index (and storing it somewhere)
      create sparse file on host FS and address pages with file offset
      use QCOW2 (or other) image container with inherent sparsity support
  * Make the image file dense on the host FS so we don't depend on
    copy/backup tools and how they deal with sparse files. Of course,
    there's some performance cost for this choice.
  * Keep the code that parses the unstructured migration stream format
    reasonably simple. Also, try to have minimal dependencies on QEMU
    migration code, both RAM and device.
  * Try to keep page save latencies small while not degrading migration
    bandwidth too much.

For this first version I decided not to integrate into the main QEMU code but
to create a separate tool. The main reason is that there's not much migration
code that is target-specific and can be used in its unmodified form. Also,
it's still not very clear how to integrate with 'qemu-system' in terms of
command-line (or monitor/QMP?) interface extensions.

For the storage format, QCOW2 as a container with a large (1MB) cluster size
seems to be an optimal choice. A larger cluster is beneficial for performance,
particularly when image preallocation is disabled. Such a cluster size does not
result in too high an internal fragmentation level (~10% of wasted space in
most cases) yet significantly reduces the number of expensive cluster
allocations.

A somewhat tricky part is dispatching the QEMU migration stream, since it is
mostly unstructured and depends on configuration parameters like
'send-configuration' and 'send-section-footer'. However, for the case with
default values in the migration globals, the implemented dispatching code works
well and should not have compatibility issues within a reasonably long time
frame.

I decided to keep the RAM save path synchronous; in any case, it's better to
use writeback cache mode for live snapshots because of their interleaving page
address pattern. A page coalescing buffer is used to merge contiguous pages to
optimize block layer writes.

Since opening the image file in cached mode would not do any good for snapshot
loading, Linux native AIO and O_DIRECT mode are used in the common scenario.
AIO support in the RAM loading path is implemented with a ring of preallocated
fixed-size buffers, arranged so that a number of block requests is always
outstanding. It also ensures in-order request completion.

How to use:

**Save:**
* > qemu-img create -f qcow2 -o size=<2_x_ram_size>,cluster_size=1M,
           preallocation=off,refcount_bits=8 <image-filename>
* qemu> migrate_set_capability background-snapshot on
* qemu> migrate "exec:qemu-snapshot
           <image-filename>,cache.direct=off,file.aio=threads"

**Load:**
* Use 'qemu-system-* -incoming defer'
* qemu> migrate_incoming "exec:qemu-snapshot --revert
           <image-filename>,cache.direct=on,file.aio=native"

**Load with postcopy:**
* Use 'qemu-system-* -incoming defer'
* qemu> migrate_set_capability postcopy-ram on
* qemu> migrate_incoming "exec:qemu-snapshot --revert --postcopy=60
           <image-filename>,cache.direct=on,file.aio=native"

And yes, asynchronous revert works well only with SSDs, not with rotational disks.

Some performance stats:
* SATA SSD drive with ~500/450 MB/s sequential read/write and ~60K IOPS max.
* 220 MB/s average save rate (depends on workload).
* 440 MB/s average load rate in precopy.
* 260 MB/s average load rate in postcopy.


Andrey Gruzdev (7):
  migration/snapshot: Introduce qemu-snapshot tool
  migration/snapshot: Introduce qemu_ftell2() routine
  migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h
  migration/snapshot: Block layer AIO support in qemu-snapshot
  migration/snapshot: Implementation of qemu-snapshot save path
  migration/snapshot: Implementation of qemu-snapshot load path
  migration/snapshot: Implementation of qemu-snapshot load path in
    postcopy mode

 include/qemu-snapshot.h |  155 ++++
 meson.build             |    2 +
 migration/qemu-file.c   |    6 +
 migration/qemu-file.h   |    1 +
 migration/ram.c         |   16 -
 migration/ram.h         |   16 +
 qemu-snapshot-io.c      |  266 ++++++
 qemu-snapshot-vm.c      | 1881 +++++++++++++++++++++++++++++++++++++++
 qemu-snapshot.c         |  554 ++++++++++++
 9 files changed, 2881 insertions(+), 16 deletions(-)
 create mode 100644 include/qemu-snapshot.h
 create mode 100644 qemu-snapshot-io.c
 create mode 100644 qemu-snapshot-vm.c
 create mode 100644 qemu-snapshot.c

-- 
2.27.0



^ permalink raw reply	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 2/7] migration/snapshot: Introduce qemu_ftell2() routine Andrey Gruzdev
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

Execution environment, command-line argument parsing, usage/version info, etc.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  59 ++++++
 meson.build             |   2 +
 qemu-snapshot-vm.c      |  57 ++++++
 qemu-snapshot.c         | 439 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 557 insertions(+)
 create mode 100644 include/qemu-snapshot.h
 create mode 100644 qemu-snapshot-vm.c
 create mode 100644 qemu-snapshot.c

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
new file mode 100644
index 0000000000..154e11e9a5
--- /dev/null
+++ b/include/qemu-snapshot.h
@@ -0,0 +1,59 @@
+/*
+ * QEMU External Snapshot Utility
+ *
+ * Copyright Virtuozzo GmbH, 2021
+ *
+ * Authors:
+ *  Andrey Gruzdev   <andrey.gruzdev@virtuozzo.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_SNAPSHOT_H
+#define QEMU_SNAPSHOT_H
+
+/* Invalid offset */
+#define INVALID_OFFSET              -1
+/* Maximum byte count for qemu_get_buffer_in_place() */
+#define INPLACE_READ_MAX            (32768 - 4096)
+
+/* Backing cluster size */
+#define BDRV_CLUSTER_SIZE           (1024 * 1024)
+
+/* Minimum supported target page size */
+#define PAGE_SIZE_MIN               4096
+/*
+ * Maximum supported target page size. The limit is caused by using
+ * QEMUFile and qemu_get_buffer_in_place() on migration channel.
+ * IO_BUF_SIZE is currently 32KB.
+ */
+#define PAGE_SIZE_MAX               16384
+/* RAM slice size for snapshot saving */
+#define SLICE_SIZE                  PAGE_SIZE_MAX
+/* RAM slice size for snapshot revert */
+#define SLICE_SIZE_REVERT           (16 * PAGE_SIZE_MAX)
+
+typedef struct StateSaveCtx {
+    BlockBackend *blk;          /* Block backend */
+} StateSaveCtx;
+
+typedef struct StateLoadCtx {
+    BlockBackend *blk;          /* Block backend */
+} StateLoadCtx;
+
+extern int64_t page_size;       /* Page size */
+extern int64_t page_mask;       /* Page mask */
+extern int page_bits;           /* Page size bits */
+extern int64_t slice_size;      /* RAM slice size */
+extern int64_t slice_mask;      /* RAM slice mask */
+extern int slice_bits;          /* RAM slice size bits */
+
+void ram_init_state(void);
+void ram_destroy_state(void);
+StateSaveCtx *get_save_context(void);
+StateLoadCtx *get_load_context(void);
+int coroutine_fn save_state_main(StateSaveCtx *s);
+int coroutine_fn load_state_main(StateLoadCtx *s);
+
+#endif /* QEMU_SNAPSHOT_H */
diff --git a/meson.build b/meson.build
index 0b41ff4118..b851671914 100644
--- a/meson.build
+++ b/meson.build
@@ -2361,6 +2361,8 @@ if have_tools
              dependencies: [block, qemuutil], install: true)
   qemu_nbd = executable('qemu-nbd', files('qemu-nbd.c'),
                dependencies: [blockdev, qemuutil, gnutls], install: true)
+  qemu_snapshot = executable('qemu-snapshot', files('qemu-snapshot.c', 'qemu-snapshot-vm.c'),
+               dependencies: [blockdev, qemuutil, migration], install: true)
 
   subdir('storage-daemon')
   subdir('contrib/rdmacm-mux')
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
new file mode 100644
index 0000000000..f7695e75c7
--- /dev/null
+++ b/qemu-snapshot-vm.c
@@ -0,0 +1,57 @@
+/*
+ * QEMU External Snapshot Utility
+ *
+ * Copyright Virtuozzo GmbH, 2021
+ *
+ * Authors:
+ *  Andrey Gruzdev   <andrey.gruzdev@virtuozzo.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "sysemu/block-backend.h"
+#include "qemu/coroutine.h"
+#include "qemu/cutils.h"
+#include "qemu/bitmap.h"
+#include "qemu/error-report.h"
+#include "io/channel-buffer.h"
+#include "migration/qemu-file-channel.h"
+#include "migration/qemu-file.h"
+#include "migration/savevm.h"
+#include "migration/ram.h"
+#include "qemu-snapshot.h"
+
+/* RAM transfer context */
+typedef struct RAMCtx {
+    int64_t normal_pages;       /* Total number of normal pages */
+} RAMCtx;
+
+static RAMCtx ram_ctx;
+
+int coroutine_fn save_state_main(StateSaveCtx *s)
+{
+    /* TODO: implement */
+    return 0;
+}
+
+int coroutine_fn load_state_main(StateLoadCtx *s)
+{
+    /* TODO: implement */
+    return 0;
+}
+
+/* Initialize snapshot RAM state */
+void ram_init_state(void)
+{
+    RAMCtx *ram = &ram_ctx;
+
+    memset(ram, 0, sizeof(ram_ctx));
+}
+
+/* Destroy snapshot RAM state */
+void ram_destroy_state(void)
+{
+    /* TODO: implement */
+}
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
new file mode 100644
index 0000000000..7ac4ef66c4
--- /dev/null
+++ b/qemu-snapshot.c
@@ -0,0 +1,439 @@
+/*
+ * QEMU External Snapshot Utility
+ *
+ * Copyright Virtuozzo GmbH, 2021
+ *
+ * Authors:
+ *  Andrey Gruzdev   <andrey.gruzdev@virtuozzo.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include <getopt.h>
+
+#include "qemu-common.h"
+#include "qemu-version.h"
+#include "qapi/error.h"
+#include "qapi/qmp/qdict.h"
+#include "sysemu/sysemu.h"
+#include "sysemu/block-backend.h"
+#include "sysemu/runstate.h" /* for qemu_system_killed() prototype */
+#include "qemu/cutils.h"
+#include "qemu/coroutine.h"
+#include "qemu/error-report.h"
+#include "qemu/config-file.h"
+#include "qemu/log.h"
+#include "qemu/option_int.h"
+#include "trace/control.h"
+#include "io/channel-util.h"
+#include "io/channel-buffer.h"
+#include "migration/qemu-file-channel.h"
+#include "migration/qemu-file.h"
+#include "qemu-snapshot.h"
+
+int64_t page_size;
+int64_t page_mask;
+int page_bits;
+int64_t slice_size;
+int64_t slice_mask;
+int slice_bits;
+
+static QemuOptsList snap_blk_optslist = {
+    .name = "blockdev",
+    .implied_opt_name = "file.filename",
+    .head = QTAILQ_HEAD_INITIALIZER(snap_blk_optslist.head),
+    .desc = {
+        { /*End of the list */ }
+    },
+};
+
+static struct {
+    bool revert;                /* Operation is snapshot revert */
+
+    int fd;                     /* Migration channel fd */
+    int rp_fd;                  /* Return path fd (for postcopy) */
+
+    const char *blk_optstr;     /* Command-line options for vmstate blockdev */
+    QDict *blk_options;         /* Blockdev options */
+    int blk_flags;              /* Blockdev flags */
+
+    bool postcopy;              /* Use postcopy */
+    int postcopy_percent;       /* Start postcopy after % of normal pages loaded */
+} params;
+
+static StateSaveCtx state_save_ctx;
+static StateLoadCtx state_load_ctx;
+
+static enum {
+    RUNNING = 0,
+    TERMINATED
+} state;
+
+#ifdef CONFIG_POSIX
+void qemu_system_killed(int signum, pid_t pid)
+{
+}
+#endif /* CONFIG_POSIX */
+
+StateSaveCtx *get_save_context(void)
+{
+    return &state_save_ctx;
+}
+
+StateLoadCtx *get_load_context(void)
+{
+    return &state_load_ctx;
+}
+
+static void init_save_context(void)
+{
+    memset(&state_save_ctx, 0, sizeof(state_save_ctx));
+}
+
+static void destroy_save_context(void)
+{
+    /* TODO: implement */
+}
+
+static void init_load_context(void)
+{
+    memset(&state_load_ctx, 0, sizeof(state_load_ctx));
+}
+
+static void destroy_load_context(void)
+{
+    /* TODO: implement */
+}
+
+static BlockBackend *image_open_opts(const char *optstr, QDict *options, int flags)
+{
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    /* Open image and create block backend */
+    blk = blk_new_open(NULL, NULL, options, flags, &local_err);
+    if (!blk) {
+        error_reportf_err(local_err, "Failed to open image '%s': ", optstr);
+        return NULL;
+    }
+
+    blk_set_enable_write_cache(blk, true);
+
+    return blk;
+}
+
+/* Use BH to enter coroutine from the main loop */
+static void enter_co_bh(void *opaque)
+{
+    Coroutine *co = (Coroutine *) opaque;
+    qemu_coroutine_enter(co);
+}
+
+static void coroutine_fn snapshot_save_co(void *opaque)
+{
+    StateSaveCtx *s = get_save_context();
+    int res = -1;
+
+    init_save_context();
+
+    /* Block backend */
+    s->blk = image_open_opts(params.blk_optstr, params.blk_options,
+                             params.blk_flags);
+    if (!s->blk) {
+        goto fail;
+    }
+
+    res = save_state_main(s);
+    if (res) {
+        error_report("Failed to save snapshot: %s", strerror(-res));
+    }
+
+fail:
+    destroy_save_context();
+    state = TERMINATED;
+}
+
+static void coroutine_fn snapshot_load_co(void *opaque)
+{
+    StateLoadCtx *s = get_load_context();
+    int res = -1;
+
+    init_load_context();
+
+    /* Block backend */
+    s->blk = image_open_opts(params.blk_optstr, params.blk_options,
+                             params.blk_flags);
+    if (!s->blk) {
+        goto fail;
+    }
+
+    res = load_state_main(s);
+    if (res) {
+        error_report("Failed to load snapshot: %s", strerror(-res));
+    }
+
+fail:
+    destroy_load_context();
+    state = TERMINATED;
+}
+
+static void usage(const char *name)
+{
+    printf(
+        "Usage: %s [options] <image-blockspec>\n"
+        "QEMU External Snapshot Utility\n"
+        "\n"
+        "'image-blockspec' is a block device specification for vmstate image\n"
+        "\n"
+        "  -h, --help                display this help and exit\n"
+        "  -V, --version             output version information and exit\n"
+        "\n"
+        "Options:\n"
+        "  -T, --trace [[enable=]<pattern>][,events=<file>][,file=<file>]\n"
+        "                            specify tracing options\n"
+        "  -r, --revert              revert to snapshot\n"
+        "      --uri=fd:<fd>         specify migration fd\n"
+        "      --page-size=<size>    specify target page size\n"
+        "      --postcopy=<%%ram>     switch to postcopy after %%ram loaded\n"
+        "\n"
+        QEMU_HELP_BOTTOM "\n", name);
+}
+
+static void version(const char *name)
+{
+    printf(
+        "%s " QEMU_FULL_VERSION "\n"
+        "Written by Andrey Gruzdev.\n"
+        "\n"
+        QEMU_COPYRIGHT "\n"
+        "This is free software; see the source for copying conditions.  There is NO\n"
+        "warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.\n",
+        name);
+}
+
+enum {
+    OPTION_PAGE_SIZE = 256,
+    OPTION_POSTCOPY,
+    OPTION_URI,
+};
+
+static void process_options(int argc, char *argv[])
+{
+    static const char *s_opt = "rhVT:";
+    static const struct option l_opt[] = {
+        { "page-size", required_argument, NULL, OPTION_PAGE_SIZE },
+        { "postcopy", required_argument, NULL, OPTION_POSTCOPY },
+        { "uri", required_argument, NULL,  OPTION_URI },
+        { "revert", no_argument, NULL, 'r' },
+        { "help", no_argument, NULL, 'h' },
+        { "version", no_argument, NULL, 'V' },
+        { "trace", required_argument, NULL, 'T' },
+        { NULL, 0, NULL, 0 }
+    };
+
+    bool has_page_size = false;
+    bool has_uri = false;
+
+    int64_t target_page_size = qemu_real_host_page_size;
+    int uri_fd = -1;
+    bool revert = false;
+    bool postcopy = false;
+    int postcopy_percent = 0;
+    const char *blk_optstr;
+    QemuOpts *blk_opts;
+    QDict *blk_options;
+    int c;
+
+    while ((c = getopt_long(argc, argv, s_opt, l_opt, NULL)) != -1) {
+        switch (c) {
+            case '?':
+                exit(EXIT_FAILURE);
+
+            case 'h':
+                usage(argv[0]);
+                exit(EXIT_SUCCESS);
+
+            case 'V':
+                version(argv[0]);
+                exit(EXIT_SUCCESS);
+
+            case 'T':
+                trace_opt_parse(optarg);
+                break;
+
+            case 'r':
+                if (revert) {
+                    error_report("-r and --revert can only be specified once");
+                    exit(EXIT_FAILURE);
+                }
+                revert = true;
+
+                break;
+
+            case OPTION_POSTCOPY:
+            {
+                char *r;
+
+                if (postcopy) {
+                    error_report("--postcopy can only be specified once");
+                    exit(EXIT_FAILURE);
+                }
+                postcopy = true;
+
+                postcopy_percent = strtol(optarg, &r, 10);
+                if (*r != '\0' || postcopy_percent < 0 || postcopy_percent > 100) {
+                    error_report("Invalid argument to --postcopy");
+                    exit(EXIT_FAILURE);
+                }
+
+                break;
+            }
+
+            case OPTION_PAGE_SIZE:
+            {
+                char *r;
+
+                if (has_page_size) {
+                    error_report("--page-size can only be specified once");
+                    exit(EXIT_FAILURE);
+                }
+                has_page_size = true;
+
+                target_page_size = strtol(optarg, &r, 0);
+                if (*r != '\0' || (target_page_size & (target_page_size - 1)) != 0 ||
+                        target_page_size < PAGE_SIZE_MIN ||
+                        target_page_size > PAGE_SIZE_MAX) {
+                    error_report("Invalid argument to --page-size");
+                    exit(EXIT_FAILURE);
+                }
+
+                break;
+            }
+
+            case OPTION_URI:
+            {
+                const char *p;
+
+                if (has_uri) {
+                    error_report("--uri can only be specified once");
+                    exit(EXIT_FAILURE);
+                }
+                has_uri = true;
+
+                /* Only "--uri=fd:<fd>" is currently supported */
+                if (strstart(optarg, "fd:", &p)) {
+                    char *r;
+                    int fd;
+
+                    fd = strtol(p, &r, 10);
+                    if (*r != '\0' || fd <= STDERR_FILENO) {
+                        error_report("Invalid FD value");
+                        exit(EXIT_FAILURE);
+                    }
+
+                    uri_fd = qemu_dup_flags(fd, O_CLOEXEC);
+                    if (uri_fd < 0) {
+                        error_report("Could not dup FD %d", fd);
+                        exit(EXIT_FAILURE);
+                    }
+
+                    /* Close original fd */
+                    close(fd);
+                } else {
+                    error_report("Invalid argument to --uri");
+                    exit(EXIT_FAILURE);
+                }
+
+                break;
+            }
+
+            default:
+                g_assert_not_reached();
+        }
+    }
+
+    if ((argc - optind) != 1) {
+        error_report("Invalid number of arguments");
+        exit(EXIT_FAILURE);
+    }
+
+    blk_optstr = argv[optind];
+
+    blk_opts = qemu_opts_parse_noisily(&snap_blk_optslist, blk_optstr, true);
+    if (!blk_opts) {
+        exit(EXIT_FAILURE);
+    }
+    blk_options = qemu_opts_to_qdict(blk_opts, NULL);
+    qemu_opts_reset(&snap_blk_optslist);
+
+    /* Enforced block layer options */
+    qdict_put_str(blk_options, "driver", "qcow2");
+    qdict_put_null(blk_options, "backing");
+    qdict_put_str(blk_options, "overlap-check", "none");
+    qdict_put_str(blk_options, "auto-read-only", "off");
+    qdict_put_str(blk_options, "detect-zeroes", "off");
+    qdict_put_str(blk_options, "lazy-refcounts", "on");
+    qdict_put_str(blk_options, "file.auto-read-only", "off");
+    qdict_put_str(blk_options, "file.detect-zeroes", "off");
+
+    params.revert = revert;
+
+    if (uri_fd != -1) {
+        params.fd = params.rp_fd = uri_fd;
+    } else {
+        params.fd = revert ? STDOUT_FILENO : STDIN_FILENO;
+        params.rp_fd = revert ? STDIN_FILENO : -1;
+    }
+    params.blk_optstr = blk_optstr;
+    params.blk_options = blk_options;
+    params.blk_flags = revert ? 0 : BDRV_O_RDWR;
+    params.postcopy = postcopy;
+    params.postcopy_percent = postcopy_percent;
+
+    page_size = target_page_size;
+    page_mask = ~(page_size - 1);
+    page_bits = ctz64(page_size);
+    slice_size = revert ? SLICE_SIZE_REVERT : SLICE_SIZE;
+    slice_mask = ~(slice_size - 1);
+    slice_bits = ctz64(slice_size);
+}
+
+int main(int argc, char **argv)
+{
+    Coroutine *co;
+
+    os_setup_early_signal_handling();
+    os_setup_signal_handling();
+    error_init(argv[0]);
+    qemu_init_exec_dir(argv[0]);
+    module_call_init(MODULE_INIT_TRACE);
+    module_call_init(MODULE_INIT_QOM);
+    qemu_init_main_loop(&error_fatal);
+    bdrv_init();
+
+    qemu_add_opts(&qemu_trace_opts);
+    process_options(argc, argv);
+
+    if (!trace_init_backends()) {
+        exit(EXIT_FAILURE);
+    }
+    trace_init_file();
+    qemu_set_log(LOG_TRACE);
+
+    ram_init_state();
+
+    if (params.revert) {
+        co = qemu_coroutine_create(snapshot_load_co, NULL);
+    } else {
+        co = qemu_coroutine_create(snapshot_save_co, NULL);
+    }
+    aio_bh_schedule_oneshot(qemu_get_aio_context(), enter_co_bh, co);
+
+    do {
+        main_loop_wait(false);
+    } while (state != TERMINATED);
+
+    exit(EXIT_SUCCESS);
+}
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 2/7] migration/snapshot: Introduce qemu_ftell2() routine
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 3/7] migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h Andrey Gruzdev
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

qemu-snapshot needs to retrieve the current QEMUFile offset as the number of
bytes read by qemu_get_byte()/qemu_get_buffer().

The existing qemu_ftell() routine would give the read position as the number
of bytes fetched from the underlying IOChannel, which is not the same.
Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 migration/qemu-file.c | 6 ++++++
 migration/qemu-file.h | 1 +
 2 files changed, 7 insertions(+)

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index d6e03dbc0e..66be5e6460 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -657,6 +657,12 @@ int64_t qemu_ftell(QEMUFile *f)
     return f->pos;
 }
 
+int64_t qemu_ftell2(QEMUFile *f)
+{
+    qemu_fflush(f);
+    return f->pos + f->buf_index - f->buf_size;
+}
+
 int qemu_file_rate_limit(QEMUFile *f)
 {
     if (f->shutdown) {
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6ccb7..bd1a6def02 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -124,6 +124,7 @@ void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int64_t qemu_ftell(QEMUFile *f);
+int64_t qemu_ftell2(QEMUFile *f);
 int64_t qemu_ftell_fast(QEMUFile *f);
 /*
  * put_buffer without copying the buffer.
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 3/7] migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 2/7] migration/snapshot: Introduce qemu_ftell2() routine Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 4/7] migration/snapshot: Block layer AIO support in qemu-snapshot Andrey Gruzdev
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

Move RAM_SAVE_FLAG_xxx defines from migration/ram.c to migration/ram.h

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 migration/ram.c | 16 ----------------
 migration/ram.h | 16 ++++++++++++++++
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index ace8ad431c..0359b63dde 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -63,22 +63,6 @@
 /***********************************************************/
 /* ram save/restore */
 
-/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
- * worked for pages that where filled with the same char.  We switched
- * it to only search for the zero value.  And to avoid confusion with
- * RAM_SSAVE_FLAG_COMPRESS_PAGE just rename it.
- */
-
-#define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
-#define RAM_SAVE_FLAG_ZERO     0x02
-#define RAM_SAVE_FLAG_MEM_SIZE 0x04
-#define RAM_SAVE_FLAG_PAGE     0x08
-#define RAM_SAVE_FLAG_EOS      0x10
-#define RAM_SAVE_FLAG_CONTINUE 0x20
-#define RAM_SAVE_FLAG_XBZRLE   0x40
-/* 0x80 is reserved in migration.h start with 0x100 next */
-#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
-
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
     return buffer_is_zero(p, size);
diff --git a/migration/ram.h b/migration/ram.h
index 4833e9fd5b..d6498b651f 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -33,6 +33,22 @@
 #include "exec/cpu-common.h"
 #include "io/channel.h"
 
+/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
+ * worked for pages that were filled with the same char.  We switched
+ * it to only search for the zero value.  And to avoid confusion with
+ * RAM_SAVE_FLAG_COMPRESS_PAGE just rename it.
+ */
+
+#define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
+#define RAM_SAVE_FLAG_ZERO     0x02
+#define RAM_SAVE_FLAG_MEM_SIZE 0x04
+#define RAM_SAVE_FLAG_PAGE     0x08
+#define RAM_SAVE_FLAG_EOS      0x10
+#define RAM_SAVE_FLAG_CONTINUE 0x20
+#define RAM_SAVE_FLAG_XBZRLE   0x40
+/* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
 extern CompressionStats compression_counters;
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 4/7] migration/snapshot: Block layer AIO support in qemu-snapshot
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
                   ` (2 preceding siblings ...)
  2021-05-12 19:26 ` [RFC PATCH v1 3/7] migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 5/7] migration/snapshot: Implementation of qemu-snapshot save path Andrey Gruzdev
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

This commit enables asynchronous block layer I/O for the qemu-snapshot tool.
The implementation provides in-order request completion delivery to simplify
the migration code.

Several file utility routines are introduced as well.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  30 +++++
 meson.build             |   2 +-
 qemu-snapshot-io.c      | 266 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 297 insertions(+), 1 deletion(-)
 create mode 100644 qemu-snapshot-io.c

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 154e11e9a5..7b3406fd56 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -34,6 +34,23 @@
 /* RAM slice size for snapshot revert */
 #define SLICE_SIZE_REVERT           (16 * PAGE_SIZE_MAX)
 
+typedef struct AioRing AioRing;
+
+typedef struct AioRingRequest {
+    void *opaque;               /* Opaque */
+
+    void *data;                 /* Data buffer */
+    int64_t offset;             /* Offset */
+    size_t size;                /* Size */
+} AioRingRequest;
+
+typedef struct AioRingEvent {
+    AioRingRequest *origin;     /* Originating request */
+    ssize_t status;             /* Completion status */
+} AioRingEvent;
+
+typedef ssize_t coroutine_fn (*AioRingFunc)(AioRingRequest *req);
+
 typedef struct StateSaveCtx {
     BlockBackend *blk;          /* Block backend */
 } StateSaveCtx;
@@ -56,4 +73,17 @@ StateLoadCtx *get_load_context(void);
 int coroutine_fn save_state_main(StateSaveCtx *s);
 int coroutine_fn load_state_main(StateLoadCtx *s);
 
+AioRing *coroutine_fn aio_ring_new(AioRingFunc func, unsigned ring_entries,
+        unsigned max_inflight);
+void aio_ring_free(AioRing *ring);
+void aio_ring_set_max_inflight(AioRing *ring, unsigned max_inflight);
+AioRingRequest *coroutine_fn aio_ring_get_request(AioRing *ring);
+void coroutine_fn aio_ring_submit(AioRing *ring);
+AioRingEvent *coroutine_fn aio_ring_wait_event(AioRing *ring);
+void coroutine_fn aio_ring_complete(AioRing *ring);
+
+QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable);
+void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size);
+void qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src);
+
 #endif /* QEMU_SNAPSHOT_H */
diff --git a/meson.build b/meson.build
index b851671914..c25fc518df 100644
--- a/meson.build
+++ b/meson.build
@@ -2361,7 +2361,7 @@ if have_tools
              dependencies: [block, qemuutil], install: true)
   qemu_nbd = executable('qemu-nbd', files('qemu-nbd.c'),
                dependencies: [blockdev, qemuutil, gnutls], install: true)
-  qemu_snapshot = executable('qemu-snapshot', files('qemu-snapshot.c', 'qemu-snapshot-vm.c'),
+  qemu_snapshot = executable('qemu-snapshot', files('qemu-snapshot.c', 'qemu-snapshot-vm.c', 'qemu-snapshot-io.c'),
                dependencies: [blockdev, qemuutil, migration], install: true)
 
   subdir('storage-daemon')
diff --git a/qemu-snapshot-io.c b/qemu-snapshot-io.c
new file mode 100644
index 0000000000..cd6428a4a2
--- /dev/null
+++ b/qemu-snapshot-io.c
@@ -0,0 +1,266 @@
+/*
+ * QEMU External Snapshot Utility
+ *
+ * Copyright Virtuozzo GmbH, 2021
+ *
+ * Authors:
+ *  Andrey Gruzdev   <andrey.gruzdev@virtuozzo.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/coroutine.h"
+#include "sysemu/block-backend.h"
+#include "migration/qemu-file.h"
+#include "qemu-snapshot.h"
+
+/*
+ * AIO ring.
+ *
+ * Coroutine-based environment to support asynchronous I/O operations
+ * providing in-order completion event delivery.
+ *
+ * All routines (with the exception of aio_ring_free()) must be called
+ * from the same coroutine.
+ *
+ * Call sequence to keep AIO ring filled:
+ *
+ *   aio_ring_new()             !
+ *                              !
+ *   aio_ring_get_request()     !<------!<------!
+ *   aio_ring_submit()          !------>!       !
+ *                              !               !
+ *   aio_ring_wait_event()      !               !
+ *   aio_ring_complete()        !-------------->!
+ *                              !
+ *   aio_ring_free()            !
+ *
+ */
+
+typedef struct AioRingEntry {
+    AioRingRequest request;     /* I/O request */
+    AioRingEvent event;         /* I/O completion event */
+    bool owned;                 /* Owned by caller */
+} AioRingEntry;
+
+typedef struct AioRing {
+    unsigned head;              /* Head entry index */
+    unsigned tail;              /* Tail entry index */
+
+    unsigned ring_mask;         /* Mask for ring entry indices */
+    unsigned ring_entries;      /* Number of entries in the ring */
+
+    AioRingFunc func;           /* Routine to call */
+
+    Coroutine *main_co;         /* Caller's coroutine */
+    bool waiting;               /* Caller is waiting for event */
+
+    unsigned length;            /* Tail-head distance */
+    unsigned inflight;          /* Number of in-flight requests */
+    unsigned max_inflight;      /* Maximum in-flight requests */
+
+    AioRingEntry entries[];     /* Flex-array of AioRingEntry */
+} AioRing;
+
+static void coroutine_fn aio_ring_co(void *opaque)
+{
+    AioRing *ring = (AioRing *) opaque;
+    AioRingEntry *entry = &ring->entries[ring->tail];
+
+    ring->tail = (ring->tail + 1) & ring->ring_mask;
+    ring->length++;
+
+    ring->inflight++;
+    entry->owned = false;
+
+    entry->event.status = ring->func(&entry->request);
+
+    entry->event.origin = &entry->request;
+    entry->owned = true;
+    ring->inflight--;
+
+    if (ring->waiting) {
+        ring->waiting = false;
+        aio_co_wake(ring->main_co);
+    }
+}
+
+AioRingRequest *coroutine_fn aio_ring_get_request(AioRing *ring)
+{
+    assert(qemu_coroutine_self() == ring->main_co);
+
+    if (ring->length >= ring->ring_entries ||
+            ring->inflight >= ring->max_inflight) {
+        return NULL;
+    }
+
+    return &ring->entries[ring->tail].request;
+}
+
+void coroutine_fn aio_ring_submit(AioRing *ring)
+{
+    assert(qemu_coroutine_self() == ring->main_co);
+    assert(ring->length < ring->ring_entries);
+
+    qemu_coroutine_enter(qemu_coroutine_create(aio_ring_co, ring));
+}
+
+AioRingEvent *coroutine_fn aio_ring_wait_event(AioRing *ring)
+{
+    AioRingEntry *entry = &ring->entries[ring->head];
+
+    assert(qemu_coroutine_self() == ring->main_co);
+
+    if (!ring->length) {
+        return NULL;
+    }
+
+    while (true) {
+        if (entry->owned) {
+            return &entry->event;
+        }
+        ring->waiting = true;
+        qemu_coroutine_yield();
+    }
+
+    /* NOTREACHED */
+}
+
+void coroutine_fn aio_ring_complete(AioRing *ring)
+{
+    AioRingEntry *entry = &ring->entries[ring->head];
+
+    assert(qemu_coroutine_self() == ring->main_co);
+    assert(ring->length);
+
+    ring->head = (ring->head + 1) & ring->ring_mask;
+    ring->length--;
+
+    entry->event.origin = NULL;
+    entry->event.status = 0;
+}
+
+/* Create new AIO ring */
+AioRing *coroutine_fn aio_ring_new(AioRingFunc func, unsigned ring_entries,
+                                   unsigned max_inflight)
+{
+    AioRing *ring;
+
+    assert(is_power_of_2(ring_entries));
+    assert(max_inflight && max_inflight <= ring_entries);
+
+    ring = g_malloc0(sizeof(AioRing) + ring_entries * sizeof(AioRingEntry));
+    ring->main_co = qemu_coroutine_self();
+    ring->ring_entries = ring_entries;
+    ring->ring_mask = ring_entries - 1;
+    ring->max_inflight = max_inflight;
+    ring->func = func;
+
+    return ring;
+}
+
+/* Free AIO ring */
+void aio_ring_free(AioRing *ring)
+{
+    assert(!ring->inflight);
+    g_free(ring);
+}
+
+/* Limit the maximum number of in-flight AIO requests */
+void aio_ring_set_max_inflight(AioRing *ring, unsigned max_inflight)
+{
+    ring->max_inflight = MIN(max_inflight, ring->ring_entries);
+}
+
+static ssize_t bdrv_vmstate_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
+                                       size_t size, Error **errp)
+{
+    return bdrv_load_vmstate((BlockDriverState *) opaque, buf, pos, size);
+}
+
+static ssize_t bdrv_vmstate_writev_buffer(void *opaque, struct iovec *iov,
+        int iovcnt, int64_t pos, Error **errp)
+{
+    QEMUIOVector qiov;
+    int res;
+
+    qemu_iovec_init_external(&qiov, iov, iovcnt);
+
+    res = bdrv_writev_vmstate((BlockDriverState *) opaque, &qiov, pos);
+    if (res < 0) {
+        return res;
+    }
+
+    return qiov.size;
+}
+
+static int bdrv_vmstate_fclose(void *opaque, Error **errp)
+{
+    return bdrv_flush((BlockDriverState *) opaque);
+}
+
+static const QEMUFileOps bdrv_vmstate_read_ops = {
+    .get_buffer = bdrv_vmstate_get_buffer,
+    .close      = bdrv_vmstate_fclose,
+};
+
+static const QEMUFileOps bdrv_vmstate_write_ops = {
+    .writev_buffer  = bdrv_vmstate_writev_buffer,
+    .close          = bdrv_vmstate_fclose,
+};
+
+/* Create QEMUFile to access vmstate stream on QCOW2 image */
+QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable)
+{
+    if (is_writable) {
+        return qemu_fopen_ops(bs, &bdrv_vmstate_write_ops);
+    }
+
+    return qemu_fopen_ops(bs, &bdrv_vmstate_read_ops);
+}
+
+/* Move number of bytes from the source QEMUFile to destination */
+void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size)
+{
+    size_t rest = size;
+
+    while (rest) {
+        uint8_t *ptr = NULL;
+        size_t req_size;
+        size_t count;
+
+        req_size = MIN(rest, INPLACE_READ_MAX);
+        count = qemu_peek_buffer(f_src, &ptr, req_size, 0);
+        qemu_file_skip(f_src, count);
+
+        qemu_put_buffer(f_dst, ptr, count);
+        rest -= count;
+    }
+}
+
+/*
+ * Move data from source QEMUFile to destination
+ * until EOF is reached on source.
+ */
+void qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src)
+{
+    bool eof = false;
+
+    while (!eof) {
+        const size_t size = INPLACE_READ_MAX;
+        uint8_t *buffer = NULL;
+        size_t count;
+
+        count = qemu_peek_buffer(f_src, &buffer, size, 0);
+        qemu_file_skip(f_src, count);
+
+        /* Reached EOF on source? */
+        if (count != size) {
+            eof = true;
+        }
+
+        qemu_put_buffer(f_dst, buffer, count);
+    }
+}
-- 
2.27.0




* [RFC PATCH v1 5/7] migration/snapshot: Implementation of qemu-snapshot save path
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
                   ` (3 preceding siblings ...)
  2021-05-12 19:26 ` [RFC PATCH v1 4/7] migration/snapshot: Block layer AIO support in qemu-snapshot Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 6/7] migration/snapshot: Implementation of qemu-snapshot load path Andrey Gruzdev
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

This commit includes code to parse the incoming migration stream,
dispatch data to section handlers, and deal with the complications of the
open-coded migration format without introducing strong dependencies on
QEMU migration code.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  34 +-
 qemu-snapshot-vm.c      | 771 +++++++++++++++++++++++++++++++++++++++-
 qemu-snapshot.c         |  56 ++-
 3 files changed, 857 insertions(+), 4 deletions(-)

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 7b3406fd56..52519f76c4 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -51,8 +51,40 @@ typedef struct AioRingEvent {
 
 typedef ssize_t coroutine_fn (*AioRingFunc)(AioRingRequest *req);
 
+typedef struct QIOChannelBuffer QIOChannelBuffer;
+
 typedef struct StateSaveCtx {
-    BlockBackend *blk;          /* Block backend */
+    BlockBackend *blk;              /* Block backend */
+    QEMUFile *f_fd;                 /* QEMUFile for incoming stream */
+    QEMUFile *f_vmstate;            /* QEMUFile for vmstate backing */
+
+    QIOChannelBuffer *ioc_leader;   /* Migration stream leader */
+    QIOChannelBuffer *ioc_pages;    /* Page coalescing buffer */
+
+    /* Block offset of first page in ioc_pages */
+    int64_t bdrv_offset;
+    /* Block offset of the last page in ioc_pages */
+    int64_t last_bdrv_offset;
+
+    /* Current section offset */
+    int64_t section_offset;
+    /* Offset of the section containing list of RAM blocks */
+    int64_t ram_list_offset;
+    /* Offset of the first RAM section */
+    int64_t ram_offset;
+    /* Offset of the first non-iterable device section */
+    int64_t device_offset;
+
+    /* Zero buffer to fill unwritten slices on backing */
+    void *zero_buf;
+
+    /*
+     * Since we can't rewind the migration stream QEMUFile, we keep the
+     * first few hundred bytes from the beginning of each section in case
+     * that section turns out to be the first non-iterable device section
+     * and we need it when calling default_save().
+     */
+    uint8_t section_header[512];
 } StateSaveCtx;
 
 typedef struct StateLoadCtx {
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
index f7695e75c7..2d8f2d3d79 100644
--- a/qemu-snapshot-vm.c
+++ b/qemu-snapshot-vm.c
@@ -23,17 +23,784 @@
 #include "migration/ram.h"
 #include "qemu-snapshot.h"
 
+/* vmstate header magic */
+#define VMSTATE_HEADER_MAGIC        0x5354564d
+/* vmstate header eof_offset position */
+#define VMSTATE_HEADER_EOF_OFFSET   24
+/* vmstate header size */
+#define VMSTATE_HEADER_SIZE         28
+
+/* Maximum size of page coalescing buffer */
+#define PAGE_COALESC_MAX            (512 * 1024)
+
+/* RAM block */
+typedef struct RAMBlock {
+    int64_t bdrv_offset;        /* Offset on backing storage */
+    int64_t length;             /* Length */
+    int64_t nr_pages;           /* Page count */
+    int64_t nr_slices;          /* Number of slices (for bitmap bookkeeping) */
+
+    unsigned long *bitmap;      /* Bitmap of RAM slices */
+
+    /* Link into ram_list */
+    QSIMPLEQ_ENTRY(RAMBlock) next;
+
+    char idstr[256];            /* RAM block id string */
+} RAMBlock;
+
+/* RAM block page */
+typedef struct RAMPage {
+    RAMBlock *block;            /* RAM block containing the page */
+    int64_t offset;             /* Page offset in RAM block */
+} RAMPage;
+
 /* RAM transfer context */
 typedef struct RAMCtx {
     int64_t normal_pages;       /* Total number of normal pages */
+
+    /* RAM block list head */
+    QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
 } RAMCtx;
 
+/* Section handler ops */
+typedef struct SectionHandlerOps {
+    int (*save_state)(QEMUFile *f, void *opaque, int version_id);
+    int (*load_state)(QEMUFile *f, void *opaque, int version_id);
+    int (*load_state_iterate)(QEMUFile *f, void *opaque, int version_id);
+} SectionHandlerOps;
+
+/* Section handlers entry */
+typedef struct SectionHandlersEntry {
+    const char *idstr;          /* Section id string */
+    const int instance_id;      /* Section instance id */
+    const int version_id;       /* Max. supported section version id */
+
+    int real_section_id;        /* Section id from migration stream */
+    int real_version_id;        /* Version id from migration stream */
+
+    SectionHandlerOps *ops;     /* Section handler callbacks */
+} SectionHandlersEntry;
+
+/* Section handlers */
+typedef struct SectionHandlers {
+    /* Default handler */
+    SectionHandlersEntry default_;
+    /* Handlers */
+    SectionHandlersEntry handlers[];
+} SectionHandlers;
+
+#define SECTION_HANDLERS_ENTRY(_idstr, _instance_id, _version_id, _ops) {   \
+    .idstr          = _idstr,   \
+    .instance_id    = (_instance_id),   \
+    .version_id     = (_version_id),    \
+    .ops            = (_ops),           \
+}
+
+#define SECTION_HANDLERS_END()  { NULL, }
+
+/* Forward declarations */
+static int default_save(QEMUFile *f, void *opaque, int version_id);
+static int ram_save(QEMUFile *f, void *opaque, int version_id);
+static int save_state_complete(StateSaveCtx *s);
+
 static RAMCtx ram_ctx;
 
+static SectionHandlerOps default_handler_ops = {
+    .save_state = default_save,
+};
+
+static SectionHandlerOps ram_handler_ops = {
+    .save_state = ram_save,
+};
+
+static SectionHandlers section_handlers = {
+    .default_ = SECTION_HANDLERS_ENTRY("default", 0, 0, &default_handler_ops),
+    .handlers = {
+        SECTION_HANDLERS_ENTRY("ram", 0, 4, &ram_handler_ops),
+        SECTION_HANDLERS_END(),
+    },
+};
+
+static SectionHandlersEntry *find_se(const char *idstr, int instance_id)
+{
+    SectionHandlersEntry *se;
+
+    for (se = section_handlers.handlers; se->idstr; se++) {
+        if (!strcmp(se->idstr, idstr) && (instance_id == se->instance_id)) {
+            return se;
+        }
+    }
+
+    return NULL;
+}
+
+static SectionHandlersEntry *find_se_by_section_id(int section_id)
+{
+    SectionHandlersEntry *se;
+
+    for (se = section_handlers.handlers; se->idstr; se++) {
+        if (section_id == se->real_section_id) {
+            return se;
+        }
+    }
+
+    return NULL;
+}
+
+static bool check_section_footer(QEMUFile *f, SectionHandlersEntry *se)
+{
+    uint8_t token;
+    int section_id;
+
+    token = qemu_get_byte(f);
+    if (token != QEMU_VM_SECTION_FOOTER) {
+        error_report("Missing footer for section %s(%d)",
+                     se->idstr, se->real_section_id);
+        return false;
+    }
+
+    section_id = qemu_get_be32(f);
+    if (section_id != se->real_section_id) {
+        error_report("Unmatched footer for section %s(%d): %d",
+                     se->idstr, se->real_section_id, section_id);
+        return false;
+    }
+
+    return true;
+}
+
+static inline
+bool ram_offset_in_block(RAMBlock *block, int64_t offset)
+{
+    return block && offset < block->length;
+}
+
+static inline
+bool ram_bdrv_offset_in_block(RAMBlock *block, int64_t bdrv_offset)
+{
+    return block && bdrv_offset >= block->bdrv_offset &&
+            bdrv_offset < block->bdrv_offset + block->length;
+}
+
+static inline
+int64_t ram_bdrv_from_block_offset(RAMBlock *block, int64_t offset)
+{
+    if (!ram_offset_in_block(block, offset)) {
+        return INVALID_OFFSET;
+    }
+
+    return block->bdrv_offset + offset;
+}
+
+static inline
+int64_t ram_block_offset_from_bdrv(RAMBlock *block, int64_t bdrv_offset)
+{
+    int64_t offset;
+
+    if (!block) {
+        return INVALID_OFFSET;
+    }
+
+    offset = bdrv_offset - block->bdrv_offset;
+    return offset >= 0 ? offset : INVALID_OFFSET;
+}
+
+static RAMBlock *ram_block_by_idstr(const char *idstr)
+{
+    RAMBlock *block;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        if (!strcmp(idstr, block->idstr)) {
+            return block;
+        }
+    }
+
+    return NULL;
+}
+
+static RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
+{
+    static RAMBlock *block;
+    char idstr[256];
+
+    if (flags & RAM_SAVE_FLAG_CONTINUE) {
+        if (!block) {
+            error_report("RAM_SAVE_FLAG_CONTINUE outside RAM block");
+            return NULL;
+        }
+
+        return block;
+    }
+
+    if (!qemu_get_counted_string(f, idstr)) {
+        error_report("Failed to get RAM block name");
+        return NULL;
+    }
+
+    block = ram_block_by_idstr(idstr);
+    if (!block) {
+        error_report("Can't find RAM block %s", idstr);
+        return NULL;
+    }
+
+    return block;
+}
+
+static int64_t ram_block_next_bdrv_offset(void)
+{
+    RAMBlock *last_block;
+    int64_t offset;
+
+    last_block = QSIMPLEQ_LAST(&ram_ctx.ram_block_list, RAMBlock, next);
+    if (!last_block) {
+        return 0;
+    }
+
+    offset = last_block->bdrv_offset + last_block->length;
+    return ROUND_UP(offset, BDRV_CLUSTER_SIZE);
+}
+
+static void ram_block_add(const char *idstr, int64_t size)
+{
+    RAMBlock *block;
+
+    block = g_new0(RAMBlock, 1);
+    block->length = size;
+    block->bdrv_offset = ram_block_next_bdrv_offset();
+    strcpy(block->idstr, idstr);
+
+    QSIMPLEQ_INSERT_TAIL(&ram_ctx.ram_block_list, block, next);
+}
+
+static void ram_block_list_init_bitmaps(void)
+{
+    RAMBlock *block;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        block->nr_pages = block->length >> page_bits;
+        block->nr_slices = ROUND_UP(block->length, slice_size) >> slice_bits;
+
+        block->bitmap = bitmap_new(block->nr_slices);
+        bitmap_set(block->bitmap, 0, block->nr_slices);
+    }
+}
+
+static bool ram_block_list_from_stream(QEMUFile *f, int64_t mem_size)
+{
+    int64_t total_ram_bytes;
+
+    total_ram_bytes = mem_size;
+    while (total_ram_bytes > 0) {
+        char idstr[256];
+        int64_t size;
+
+        if (!qemu_get_counted_string(f, idstr)) {
+            error_report("Failed to get RAM block list");
+            return false;
+        }
+        size = qemu_get_be64(f);
+
+        ram_block_add(idstr, size);
+        total_ram_bytes -= size;
+    }
+
+    if (total_ram_bytes != 0) {
+        error_report("Corrupted RAM block list");
+        return false;
+    }
+
+    /* Initialize per-block bitmaps */
+    ram_block_list_init_bitmaps();
+
+    return true;
+}
+
+static void save_state_check_errors(StateSaveCtx *s, int *res)
+{
+    /* Check for -EIO which indicates input stream EOF */
+    if (*res == -EIO) {
+        *res = 0;
+    }
+
+    /*
+     * Check for file errors on success. Replace generic -EINVAL
+     * retcode with file error if possible.
+     */
+    if (*res >= 0 || *res == -EINVAL) {
+        int f_res = qemu_file_get_error(s->f_fd);
+
+        f_res = (f_res == -EIO) ? 0 : f_res;
+        if (!f_res) {
+            f_res = qemu_file_get_error(s->f_vmstate);
+        }
+        if (f_res) {
+            *res = f_res;
+        }
+    }
+}
+
+static int ram_alloc_page_backing(StateSaveCtx *s, RAMPage *page,
+                                  int64_t bdrv_offset)
+{
+    int res = 0;
+
+    /*
+     * Reduce the number of unwritten extents in the image backing file.
+     *
+     * We achieve that by keeping a bitmap of RAM block 'slices': before
+     * the first memory page is stored within a slice, the whole slice is
+     * zero-filled on the block device.
+     */
+    if (test_and_clear_bit(page->offset >> slice_bits, page->block->bitmap)) {
+        res = blk_pwrite(s->blk, bdrv_offset & slice_mask,
+                         s->zero_buf, slice_size, 0);
+    }
+
+    return MIN(res, 0);
+}
+
+static int ram_save_page(StateSaveCtx *s, RAMPage *page, uint8_t *data)
+{
+    size_t usage = s->ioc_pages->usage;
+    int64_t bdrv_offset;
+    int res = 0;
+
+    bdrv_offset = ram_bdrv_from_block_offset(page->block, page->offset);
+    if (bdrv_offset == INVALID_OFFSET) {
+        error_report("Corrupted RAM page");
+        return -EINVAL;
+    }
+
+    /* Deal with fragmentation of the image backing file */
+    res = ram_alloc_page_backing(s, page, bdrv_offset);
+    if (res) {
+        return res;
+    }
+
+    /* Are we saving a contiguous page? */
+    if (bdrv_offset != s->last_bdrv_offset ||
+            (usage + page_size) >= PAGE_COALESC_MAX) {
+        if (usage) {
+            /* Flush coalesced pages to block device */
+            res = blk_pwrite(s->blk, s->bdrv_offset, s->ioc_pages->data,
+                             usage, 0);
+            res = MIN(res, 0);
+        }
+
+        /* Reset coalescing buffer state */
+        s->ioc_pages->usage = 0;
+        s->ioc_pages->offset = 0;
+        /* Switch to the new bdrv_offset */
+        s->bdrv_offset = bdrv_offset;
+    }
+
+    qio_channel_write(QIO_CHANNEL(s->ioc_pages), (char *) data,
+                      page_size, NULL);
+    s->last_bdrv_offset = bdrv_offset + page_size;
+
+    return res;
+}
+
+static int ram_save_page_flush(StateSaveCtx *s)
+{
+    size_t usage = s->ioc_pages->usage;
+    int res = 0;
+
+    if (usage) {
+        /* Flush coalesced pages to block device */
+        res = blk_pwrite(s->blk, s->bdrv_offset,
+                         s->ioc_pages->data, usage, 0);
+        res = MIN(res, 0);
+    }
+
+    /* Reset coalescing buffer state */
+    s->ioc_pages->usage = 0;
+    s->ioc_pages->offset = 0;
+
+    s->last_bdrv_offset = INVALID_OFFSET;
+
+    return res;
+}
+
+static int ram_save(QEMUFile *f, void *opaque, int version_id)
+{
+    StateSaveCtx *s = (StateSaveCtx *) opaque;
+    int incompat_flags = RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE;
+    int flags = 0;
+    int res = 0;
+
+    if (version_id != 4) {
+        error_report("Unsupported version %d for 'ram' handler v4", version_id);
+        return -EINVAL;
+    }
+
+    while (!res && !(flags & RAM_SAVE_FLAG_EOS)) {
+        RAMBlock *block = NULL;
+        int64_t offset;
+
+        offset = qemu_get_be64(f);
+        flags = offset & ~page_mask;
+        offset &= page_mask;
+
+        if (flags & incompat_flags) {
+            error_report("Incompatible RAM page flags 0x%x", flags);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Lookup RAM block for the page */
+        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
+            block = ram_block_from_stream(f, flags);
+            if (!block) {
+                res = -EINVAL;
+                break;
+            }
+        }
+
+        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
+        case RAM_SAVE_FLAG_MEM_SIZE:
+            if (s->ram_list_offset) {
+                error_report("Repeated RAM page with RAM_SAVE_FLAG_MEM_SIZE");
+                res = -EINVAL;
+                break;
+            }
+
+            /* Save position of section with the list of RAM blocks */
+            s->ram_list_offset = s->section_offset;
+
+            /* Get RAM block list */
+            if (!ram_block_list_from_stream(f, offset)) {
+                res = -EINVAL;
+            }
+            break;
+
+        case RAM_SAVE_FLAG_ZERO:
+            /* Nothing to do with zero page */
+            qemu_get_byte(f);
+            break;
+
+        case RAM_SAVE_FLAG_PAGE:
+        {
+            RAMPage page = { .block = block, .offset = offset };
+            uint8_t *data;
+            ssize_t count;
+
+            count = qemu_peek_buffer(f, &data, page_size, 0);
+            qemu_file_skip(f, count);
+            if (count != page_size) {
+                /* I/O error */
+                break;
+            }
+
+            res = ram_save_page(s, &page, data);
+
+            /* Update normal page count */
+            ram_ctx.normal_pages++;
+            break;
+        }
+
+        case RAM_SAVE_FLAG_EOS:
+            /* Normal exit */
+            break;
+
+        default:
+            error_report("RAM page with unknown combination of flags 0x%x", flags);
+            res = -EINVAL;
+
+        }
+
+        /* Make additional check for file errors */
+        if (!res) {
+            res = qemu_file_get_error(f);
+        }
+    }
+
+    /* Flush page coalescing buffer */
+    if (!res) {
+        res = ram_save_page_flush(s);
+    }
+
+    return res;
+}
+
+static int default_save(QEMUFile *f, void *opaque, int version_id)
+{
+    StateSaveCtx *s = (StateSaveCtx *) opaque;
+
+    if (!s->ram_offset) {
+        error_report("Unexpected (non-iterable device state) section");
+        return -EINVAL;
+    }
+
+    if (!s->device_offset) {
+        s->device_offset = s->section_offset;
+        /* Save the rest of vmstate, including non-iterable device state */
+        return save_state_complete(s);
+    }
+
+    /* Should never get here */
+    assert(false);
+    return -EINVAL;
+}
+
+static int save_state_complete(StateSaveCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    int64_t eof_pos;
+    int64_t pos;
+
+    /* Current read offset */
+    pos = qemu_ftell2(f);
+
+    /* vmstate magic */
+    qemu_put_be32(s->f_vmstate, VMSTATE_HEADER_MAGIC);
+    /* Target page size */
+    qemu_put_be32(s->f_vmstate, page_size);
+    /* Number of non-zero pages */
+    qemu_put_be64(s->f_vmstate, ram_ctx.normal_pages);
+
+    /* Offsets relative to QEMU_VM_FILE_MAGIC: */
+
+    /* RAM block list section */
+    qemu_put_be32(s->f_vmstate, s->ram_list_offset);
+    /*
+     * First non-iterable device section.
+     *
+     * Partial RAM sections are skipped in the vmstate stream so
+     * ram_offset shall become the device_offset.
+     */
+    qemu_put_be32(s->f_vmstate, s->ram_offset);
+    /* Slot for eof_offset */
+    qemu_put_be32(s->f_vmstate, 0);
+
+    /*
+     * At the completion stage we save the leading part of the migration
+     * stream, which contains the header, the configuration section and the
+     * 'ram' section of QEMU_VM_SECTION_FULL type with the list of RAM blocks.
+     *
+     * The migration leader ends at the first partial RAM section; the
+     * QEMU_VM_SECTION_PART token for that section is pointed to by s->ram_offset.
+     */
+    qemu_put_buffer(s->f_vmstate, s->ioc_leader->data, s->ram_offset);
+    /*
+     * Trailing part with non-iterable device state.
+     *
+     * First goes the section header which was skipped with QEMUFile
+     * so we need to take it from s->section_header.
+     */
+    qemu_put_buffer(s->f_vmstate, s->section_header, pos - s->device_offset);
+
+    /* Finally we forward the tail of migration stream to vmstate on backing */
+    qemu_fsplice_tail(s->f_vmstate, f);
+    eof_pos = qemu_ftell(s->f_vmstate);
+
+    /* Put eof_offset to the slot in vmstate stream: */
+
+    /* Simulate negative seek() */
+    qemu_update_position(s->f_vmstate,
+                         (size_t)(ssize_t) (VMSTATE_HEADER_EOF_OFFSET - eof_pos));
+    /* Write to the eof_offset header field */
+    qemu_put_be32(s->f_vmstate, eof_pos - VMSTATE_HEADER_SIZE);
+    qemu_fflush(s->f_vmstate);
+
+    return 1;
+}
+
+static int save_section_config(StateSaveCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    uint32_t id_len;
+
+    id_len = qemu_get_be32(f);
+    if (id_len > 255) {
+        error_report("Corrupted configuration section");
+        return -EINVAL;
+    }
+    qemu_file_skip(f, id_len);
+
+    return 0;
+}
+
+static int save_section_start_full(StateSaveCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    SectionHandlersEntry *se;
+    int section_id;
+    int instance_id;
+    int version_id;
+    char idstr[256];
+    int res;
+
+    section_id = qemu_get_be32(f);
+    if (!qemu_get_counted_string(f, idstr)) {
+        error_report("Failed to get section name(%d)", section_id);
+        return -EINVAL;
+    }
+
+    instance_id = qemu_get_be32(f);
+    version_id = qemu_get_be32(f);
+
+    /* Find section handler */
+    se = find_se(idstr, instance_id);
+    if (!se) {
+        se = &section_handlers.default_;
+    } else if (version_id > se->version_id) {
+        /* Validate version */
+        error_report("Unsupported version %d for '%s' v%d",
+                version_id, idstr, se->version_id);
+        return -EINVAL;
+    }
+
+    se->real_section_id = section_id;
+    se->real_version_id = version_id;
+
+    res = se->ops->save_state(f, s, se->real_version_id);
+    /* Positive value indicates completion, no need to check footer */
+    if (res) {
+        return res;
+    }
+
+    /* Check section footer */
+    if (!check_section_footer(f, se)) {
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static int save_section_part_end(StateSaveCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    SectionHandlersEntry *se;
+    int section_id;
+    int res;
+
+    /* First section with QEMU_VM_SECTION_PART type must be a 'ram' section */
+    if (!s->ram_offset) {
+        s->ram_offset = s->section_offset;
+    }
+
+    section_id = qemu_get_be32(f);
+
+    /* Lookup section handler by numeric section id */
+    se = find_se_by_section_id(section_id);
+    if (!se) {
+        error_report("Unknown section id %d", section_id);
+        return -EINVAL;
+    }
+
+    res = se->ops->save_state(f, s, se->real_version_id);
+    /* With partial sections we won't have positive success retcodes */
+    if (res) {
+        return res;
+    }
+
+    /* Check section footer */
+    if (!check_section_footer(f, se)) {
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static int save_state_header(StateSaveCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    uint32_t v;
+
+    /* Validate qemu magic */
+    v = qemu_get_be32(f);
+    if (v != QEMU_VM_FILE_MAGIC) {
+        error_report("Not a migration stream");
+        return -EINVAL;
+    }
+
+    v = qemu_get_be32(f);
+    if (v == QEMU_VM_FILE_VERSION_COMPAT) {
+        error_report("SaveVM v2 format is obsolete");
+        return -EINVAL;
+    }
+
+    if (v != QEMU_VM_FILE_VERSION) {
+        error_report("Unsupported migration stream version");
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
 int coroutine_fn save_state_main(StateSaveCtx *s)
 {
-    /* TODO: implement */
-    return 0;
+    QEMUFile *f = s->f_fd;
+    uint8_t *buf;
+    uint8_t section_type;
+    int res = 0;
+
+    /* Deal with migration stream header */
+    res = save_state_header(s);
+    if (res) {
+        /* Check for file errors in case we have -EINVAL */
+        save_state_check_errors(s, &res);
+        return res;
+    }
+
+    while (!res) {
+        /* Update current section offset */
+        s->section_offset = qemu_ftell2(f);
+
+        /*
+         * We need to keep some data from the beginning of each section.
+         *
+         * When the first non-iterable device section is reached and we are
+         * going to write to the vmstate stream in 'default_handler', this data
+         * is used to restore the already-skipped part of the migration stream.
+         */
+        qemu_peek_buffer(f, &buf, sizeof(s->section_header), 0);
+        memcpy(s->section_header, buf, sizeof(s->section_header));
+
+        /* Read section type token */
+        section_type = qemu_get_byte(f);
+
+        switch (section_type) {
+        case QEMU_VM_CONFIGURATION:
+            res = save_section_config(s);
+            break;
+
+        case QEMU_VM_SECTION_FULL:
+        case QEMU_VM_SECTION_START:
+            res = save_section_start_full(s);
+            break;
+
+        case QEMU_VM_SECTION_PART:
+        case QEMU_VM_SECTION_END:
+            res = save_section_part_end(s);
+            break;
+
+        case QEMU_VM_EOF:
+            /*
+             * End of migration stream.
+             *
+             * Normally we will never get here, since the ending part of the
+             * migration stream is a series of QEMU_VM_SECTION_FULL sections holding
+             * state for non-iterable devices. In our case all those sections
+             * are saved with a single call to save_section_start_full() once
+             * we get an unknown section id and invoke default handler.
+             */
+            res = -EINVAL;
+            break;
+
+        default:
+            error_report("Unknown section type %d", section_type);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Additional check for file errors */
+        save_state_check_errors(s, &res);
+    }
+
+    /* Replace positive retcode with 0 */
+    return MIN(res, 0);
 }
 
 int coroutine_fn load_state_main(StateLoadCtx *s)
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index 7ac4ef66c4..d434b8f245 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -94,7 +94,24 @@ static void init_save_context(void)
 
 static void destroy_save_context(void)
 {
-    /* TODO: implement */
+    StateSaveCtx *s = get_save_context();
+
+    if (s->f_vmstate) {
+        qemu_fclose(s->f_vmstate);
+    }
+    if (s->blk) {
+        blk_flush(s->blk);
+        blk_unref(s->blk);
+    }
+    if (s->zero_buf) {
+        qemu_vfree(s->zero_buf);
+    }
+    if (s->ioc_leader) {
+        object_unref(OBJECT(s->ioc_leader));
+    }
+    if (s->ioc_pages) {
+        object_unref(OBJECT(s->ioc_pages));
+    }
 }
 
 static void init_load_context(void)
@@ -134,6 +151,9 @@ static void enter_co_bh(void *opaque)
 static void coroutine_fn snapshot_save_co(void *opaque)
 {
     StateSaveCtx *s = get_save_context();
+    QIOChannel *ioc_fd;
+    uint8_t *buf;
+    size_t count;
     int res = -1;
 
     init_save_context();
@@ -145,6 +165,40 @@ static void coroutine_fn snapshot_save_co(void *opaque)
         goto fail;
     }
 
+    /* QEMUFile on vmstate */
+    s->f_vmstate = qemu_fopen_bdrv_vmstate(blk_bs(s->blk), 1);
+    qemu_file_set_blocking(s->f_vmstate, false);
+
+    /* QEMUFile on migration fd */
+    ioc_fd = qio_channel_new_fd(params.fd, &error_fatal);
+    qio_channel_set_name(QIO_CHANNEL(ioc_fd), "migration-channel-incoming");
+    s->f_fd = qemu_fopen_channel_input(ioc_fd);
+    object_unref(OBJECT(ioc_fd));
+    /* Use non-blocking mode in coroutine */
+    qemu_file_set_blocking(s->f_fd, false);
+
+    /* Buffer channel to store leading part of migration stream */
+    s->ioc_leader = qio_channel_buffer_new(INPLACE_READ_MAX);
+    qio_channel_set_name(QIO_CHANNEL(s->ioc_leader), "migration-leader-buffer");
+
+    /* Page coalescing buffer */
+    s->ioc_pages = qio_channel_buffer_new(128 * 1024);
+    qio_channel_set_name(QIO_CHANNEL(s->ioc_pages), "migration-page-buffer");
+
+    /* Bounce buffer to fill unwritten extents in image backing */
+    s->zero_buf = qemu_blockalign0(blk_bs(s->blk), slice_size);
+
+    /*
+     * Here we stash the leading part of the migration stream without advancing
+     * the read position. Later we'll make use of it when writing the vmstate
+     * stream.
+     */
+    count = qemu_peek_buffer(s->f_fd, &buf, INPLACE_READ_MAX, 0);
+    res = qemu_file_get_error(s->f_fd);
+    if (res < 0) {
+        goto fail;
+    }
+    qio_channel_write(QIO_CHANNEL(s->ioc_leader), (char *) buf, count, NULL);
+
     res = save_state_main(s);
     if (res) {
         error_report("Failed to save snapshot: %s", strerror(-res));
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 6/7] migration/snapshot: Implementation of qemu-snapshot load path
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
                   ` (4 preceding siblings ...)
  2021-05-12 19:26 ` [RFC PATCH v1 5/7] migration/snapshot: Implementation of qemu-snapshot save path Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 19:26 ` [RFC PATCH v1 7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode Andrey Gruzdev
  2021-05-12 20:18 ` [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility no-reply
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

This part implements snapshot loading in precopy mode.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  24 +-
 qemu-snapshot-vm.c      | 588 +++++++++++++++++++++++++++++++++++++++-
 qemu-snapshot.c         |  47 +++-
 3 files changed, 654 insertions(+), 5 deletions(-)
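The load path in this patch drives block-device reads off per-RAMBlock slice bitmaps: find_next_page() scans for the next set (not-yet-transferred) slice, get_page_range() extends it to a contiguous run, and clear_page_range() clears the run before the AIO read is queued so it is never requested twice. A standalone sketch of that scan-and-clear logic, using simplified hypothetical helpers in place of QEMU's bitops (not the patch's actual API):

```c
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>

#define BITS_PER_LONG (sizeof(unsigned long) * CHAR_BIT)

static bool test_bit_simple(const unsigned long *map, size_t bit)
{
    return map[bit / BITS_PER_LONG] & (1UL << (bit % BITS_PER_LONG));
}

static void clear_bit_simple(unsigned long *map, size_t bit)
{
    map[bit / BITS_PER_LONG] &= ~(1UL << (bit % BITS_PER_LONG));
}

/* Find the next set bit at or after 'start'; returns nbits if none */
static size_t find_next_bit_simple(const unsigned long *map, size_t nbits,
                                   size_t start)
{
    for (size_t i = start; i < nbits; i++) {
        if (test_bit_simple(map, i)) {
            return i;
        }
    }
    return nbits;
}

/*
 * Take the next contiguous run of set slices (capped at max_run),
 * clearing each bit so the same range is never queued twice.
 * Wraps around once, like find_next_page() restarting from the
 * beginning of the block list. Returns the run length, 0 if empty.
 */
static size_t take_next_run(unsigned long *map, size_t nbits,
                            size_t *pos, size_t max_run)
{
    size_t start = find_next_bit_simple(map, nbits, *pos);
    size_t len = 0;

    if (start == nbits) {
        start = find_next_bit_simple(map, nbits, 0);
        if (start == nbits) {
            return 0;
        }
    }

    while (start + len < nbits && len < max_run &&
           test_bit_simple(map, start + len)) {
        clear_bit_simple(map, start + len);
        len++;
    }

    *pos = start + len;
    return len;
}
```

In the patch itself the run length is additionally clamped to the RAM block length and to AIO_TRANSFER_SIZE, and the cleared range maps directly to one blk_pread() request.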

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 52519f76c4..aae730d70e 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -34,6 +34,13 @@
 /* RAM slice size for snapshot revert */
 #define SLICE_SIZE_REVERT           (16 * PAGE_SIZE_MAX)
 
+/* AIO transfer size */
+#define AIO_TRANSFER_SIZE           BDRV_CLUSTER_SIZE
+/* AIO ring size */
+#define AIO_RING_SIZE               64
+/* AIO ring in-flight limit */
+#define AIO_RING_INFLIGHT           16
+
 typedef struct AioRing AioRing;
 
 typedef struct AioRingRequest {
@@ -88,7 +95,20 @@ typedef struct StateSaveCtx {
 } StateSaveCtx;
 
 typedef struct StateLoadCtx {
-    BlockBackend *blk;          /* Block backend */
+    BlockBackend *blk;              /* Block backend */
+    QEMUFile *f_fd;                 /* QEMUFile for outgoing stream */
+    QEMUFile *f_vmstate;            /* QEMUFile for vmstate backing */
+
+    QIOChannelBuffer *ioc_leader;   /* vmstate stream leader */
+
+    AioRing *aio_ring;              /* AIO ring */
+
+    /* vmstate offset of the section containing list of RAM blocks */
+    int64_t ram_list_offset;
+    /* vmstate offset of the first non-iterable device section */
+    int64_t device_offset;
+    /* vmstate EOF */
+    int64_t eof_offset;
 } StateLoadCtx;
 
 extern int64_t page_size;       /* Page size */
@@ -100,6 +120,8 @@ extern int slice_bits;          /* RAM slice size bits */
 
 void ram_init_state(void);
 void ram_destroy_state(void);
+ssize_t coroutine_fn ram_load_aio_co(AioRingRequest *req);
+
 StateSaveCtx *get_save_context(void);
 StateLoadCtx *get_load_context(void);
 int coroutine_fn save_state_main(StateSaveCtx *s);
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
index 2d8f2d3d79..dae5f84b80 100644
--- a/qemu-snapshot-vm.c
+++ b/qemu-snapshot-vm.c
@@ -57,6 +57,11 @@ typedef struct RAMPage {
 /* RAM transfer context */
 typedef struct RAMCtx {
     int64_t normal_pages;       /* Total number of normal pages */
+    int64_t loaded_pages;       /* Number of normal pages loaded */
+
+    RAMPage last_page;          /* Last loaded page */
+
+    RAMBlock *last_sent_block;  /* RAM block of last sent page */
 
     /* RAM block list head */
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
@@ -100,17 +105,26 @@ typedef struct SectionHandlers {
 
 /* Forward declarations */
 static int default_save(QEMUFile *f, void *opaque, int version_id);
+static int default_load(QEMUFile *f, void *opaque, int version_id);
+
 static int ram_save(QEMUFile *f, void *opaque, int version_id);
+static int ram_load(QEMUFile *f, void *opaque, int version_id);
+static int ram_load_iterate(QEMUFile *f, void *opaque, int version_id);
+
 static int save_state_complete(StateSaveCtx *s);
+static int load_section_start_full(StateLoadCtx *s);
 
 static RAMCtx ram_ctx;
 
 static SectionHandlerOps default_handler_ops = {
     .save_state = default_save,
+    .load_state = default_load,
 };
 
 static SectionHandlerOps ram_handler_ops = {
     .save_state = ram_save,
+    .load_state = ram_load,
+    .load_state_iterate = ram_load_iterate,
 };
 
 static SectionHandlers section_handlers = {
@@ -218,6 +232,19 @@ static RAMBlock *ram_block_by_idstr(const char *idstr)
     return NULL;
 }
 
+static RAMBlock *ram_block_by_bdrv_offset(int64_t bdrv_offset)
+{
+    RAMBlock *block;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        if (ram_bdrv_offset_in_block(block, bdrv_offset)) {
+            return block;
+        }
+    }
+
+    return NULL;
+}
+
 static RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
 {
     static RAMBlock *block;
@@ -803,10 +830,555 @@ int coroutine_fn save_state_main(StateSaveCtx *s)
     return MIN(res, 0);
 }
 
+static void load_state_check_errors(StateLoadCtx *s, int *res)
+{
+    /*
+     * Check for file errors on success. Replace generic -EINVAL
+     * retcode with file error if possible.
+     */
+    if (*res >= 0 || *res == -EINVAL) {
+        int f_res = qemu_file_get_error(s->f_fd);
+
+        if (!f_res) {
+            f_res = qemu_file_get_error(s->f_vmstate);
+        }
+        if (f_res) {
+            *res = f_res;
+        }
+    }
+}
+
+static void send_section_header_part_end(QEMUFile *f, SectionHandlersEntry *se,
+        uint8_t section_type)
+{
+    assert(section_type == QEMU_VM_SECTION_PART ||
+           section_type == QEMU_VM_SECTION_END);
+
+    qemu_put_byte(f, section_type);
+    qemu_put_be32(f, se->real_section_id);
+}
+
+static void send_section_footer(QEMUFile *f, SectionHandlersEntry *se)
+{
+    qemu_put_byte(f, QEMU_VM_SECTION_FOOTER);
+    qemu_put_be32(f, se->real_section_id);
+}
+
+static void send_page_header(QEMUFile *f, RAMBlock *block, int64_t offset)
+{
+    uint8_t hdr_buf[512];
+    int hdr_len = 8;
+
+    stq_be_p(hdr_buf, offset);
+    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
+        int id_len;
+
+        id_len = strlen(block->idstr);
+        assert(id_len < 256);
+
+        hdr_buf[hdr_len] = id_len;
+        memcpy((hdr_buf + hdr_len + 1), block->idstr, id_len);
+
+        hdr_len += 1 + id_len;
+    }
+
+    qemu_put_buffer(f, hdr_buf, hdr_len);
+}
+
+static void send_zeropage(QEMUFile *f, RAMBlock *block, int64_t offset)
+{
+    send_page_header(f, block, offset | RAM_SAVE_FLAG_ZERO);
+    qemu_put_byte(f, 0);
+}
+
+static bool find_next_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block = ram->last_page.block;
+    int64_t slice = ram->last_page.offset >> slice_bits;
+    bool full_round = false;
+    bool found = false;
+
+    if (!block) {
+restart:
+        block = QSIMPLEQ_FIRST(&ram->ram_block_list);
+        slice = 0;
+        full_round = true;
+    }
+
+    while (!found && block) {
+        slice = find_next_bit(block->bitmap, block->nr_slices, slice);
+        /* Can't find unsent slice in block? */
+        if (slice >= block->nr_slices) {
+            /* Try next block */
+            block = QSIMPLEQ_NEXT(block, next);
+            slice = 0;
+
+            continue;
+        }
+
+        found = true;
+    }
+
+    /*
+     * Restart from the beginning if we couldn't find an unsent slice,
+     * but do it only once.
+     */
+    if (!found && !full_round) {
+        goto restart;
+    }
+
+    if (found) {
+        page->block = block;
+        page->offset = slice << slice_bits;
+    }
+
+    return found;
+}
+
+static inline
+void get_page_range(RAMPage *page, unsigned *length, unsigned max_length)
+{
+    int64_t start_slice;
+    int64_t end_slice;
+    int64_t tmp;
+
+    assert(QEMU_IS_ALIGNED(page->offset, slice_size));
+    assert(max_length >= slice_size);
+
+    start_slice = page->offset >> slice_bits;
+    end_slice = find_next_zero_bit(page->block->bitmap, page->block->nr_slices,
+                                   page->offset >> slice_bits);
+
+    tmp = (end_slice - start_slice) << slice_bits;
+    tmp = MIN(page->block->length - page->offset, tmp);
+
+    /*
+     * Length is always aligned to slice_size with the exception of case
+     * when it is the last slice in RAM block.
+     */
+    *length = MIN(max_length, tmp);
+}
+
+static inline
+void clear_page_range(RAMPage *page, unsigned length)
+{
+    assert(QEMU_IS_ALIGNED(page->offset, slice_size));
+    assert(length);
+
+    /*
+     * Page offsets are aligned to the slice boundary so we only need
+     * to round up length for the case when we load last slice in the block.
+     */
+    bitmap_clear(page->block->bitmap, page->offset >> slice_bits,
+                 ((length - 1) >> slice_bits) + 1);
+}
+
+ssize_t coroutine_fn ram_load_aio_co(AioRingRequest *req)
+{
+    return blk_pread((BlockBackend *) req->opaque, req->offset,
+            req->data, req->size);
+}
+
+static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
+{
+    RAMCtx *ram = &ram_ctx;
+    AioRingRequest *req;
+
+    while ((req = aio_ring_get_request(s->aio_ring))) {
+        RAMPage page;
+        unsigned max_length = AIO_TRANSFER_SIZE;
+        unsigned length;
+
+        if (!find_next_page(&page)) {
+            break;
+        }
+
+        /* Get range of contiguous pages that were not transferred yet */
+        get_page_range(&page, &length, max_length);
+        /* Clear range of pages to be queued for I/O */
+        clear_page_range(&page, length);
+
+        /* Used by find_next_page() */
+        ram->last_page.block = page.block;
+        ram->last_page.offset = page.offset + length;
+
+        /* Setup I/O request */
+        req->opaque = s->blk;
+        req->data = qemu_blockalign(blk_bs(s->blk), length);
+        req->offset = ram_bdrv_from_block_offset(page.block, page.offset);
+        req->size = length;
+
+        aio_ring_submit(s->aio_ring);
+    }
+}
+
+static int ram_load_complete_aio(StateLoadCtx *s, AioRingEvent *ev)
+{
+    QEMUFile *f = s->f_fd;
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block = ram->last_sent_block;
+    void *bdrv_data = ev->origin->data;
+    int64_t bdrv_offset = ev->origin->offset;
+    ssize_t bdrv_count = ev->status;
+    int64_t offset;
+    int64_t flags = RAM_SAVE_FLAG_CONTINUE;
+    int pages = 0;
+
+    /* Need to switch to another RAM block? */
+    if (!ram_bdrv_offset_in_block(block, bdrv_offset)) {
+        /*
+         * Look up the RAM block by BDRV offset, because in postcopy
+         * we can issue AIO loads from arbitrary blocks.
+         */
+        block = ram_block_by_bdrv_offset(bdrv_offset);
+        ram->last_sent_block = block;
+
+        /* Reset RAM_SAVE_FLAG_CONTINUE */
+        flags = 0;
+    }
+    offset = ram_block_offset_from_bdrv(block, bdrv_offset);
+
+    for (ssize_t count = 0; count < bdrv_count; count += page_size) {
+        if (buffer_is_zero(bdrv_data, page_size)) {
+            send_zeropage(f, block, (offset | flags));
+        } else {
+            send_page_header(f, block, (offset | RAM_SAVE_FLAG_PAGE | flags));
+            qemu_put_buffer_async(f, bdrv_data, page_size, false);
+
+            /* Update normal page count */
+            ram->loaded_pages++;
+        }
+
+        /*
+         * A BDRV request never crosses a RAM block boundary, so we can
+         * set RAM_SAVE_FLAG_CONTINUE here unconditionally.
+         */
+        flags = RAM_SAVE_FLAG_CONTINUE;
+
+        bdrv_data += page_size;
+        offset += page_size;
+        pages++;
+    }
+
+    /* Need to flush here because we use qemu_put_buffer_async() */
+    qemu_fflush(f);
+
+    return pages;
+}
+
+static int coroutine_fn ram_load_pages(StateLoadCtx *s)
+{
+    AioRingEvent *event;
+    int res = 0;
+
+    /* Fill blockdev AIO queue */
+    ram_load_submit_aio(s);
+
+    /* Check for AIO completion event */
+    event = aio_ring_wait_event(s->aio_ring);
+    if (event) {
+        /* Check completion status */
+        res = event->status;
+        if (res > 0) {
+            res = ram_load_complete_aio(s, event);
+        }
+
+        qemu_vfree(event->origin->data);
+        aio_ring_complete(s->aio_ring);
+    }
+
+    return res;
+}
+
+static int coroutine_fn ram_load_pages_flush(StateLoadCtx *s)
+{
+    AioRingEvent *event;
+
+    while ((event = aio_ring_wait_event(s->aio_ring))) {
+        /* Check completion status */
+        if (event->status > 0) {
+            ram_load_complete_aio(s, event);
+        }
+
+        qemu_vfree(event->origin->data);
+        aio_ring_complete(s->aio_ring);
+    }
+
+    return 0;
+}
+
+static int ram_load(QEMUFile *f, void *opaque, int version_id)
+{
+    int compat_flags = RAM_SAVE_FLAG_MEM_SIZE | RAM_SAVE_FLAG_EOS;
+    int flags = 0;
+    int res = 0;
+
+    if (version_id != 4) {
+        error_report("Unsupported version %d for 'ram' handler v4", version_id);
+        return -EINVAL;
+    }
+
+    while (!res && !(flags & RAM_SAVE_FLAG_EOS)) {
+        int64_t offset;
+
+        offset = qemu_get_be64(f);
+        flags = offset & ~page_mask;
+        offset &= page_mask;
+
+        if (flags & ~compat_flags) {
+            error_report("Incompatible RAM page flags 0x%x", flags);
+            res = -EINVAL;
+            break;
+        }
+
+        switch (flags) {
+        case RAM_SAVE_FLAG_MEM_SIZE:
+            /* Fill RAM block list */
+            ram_block_list_from_stream(f, offset);
+            break;
+
+        case RAM_SAVE_FLAG_EOS:
+            /* Normal exit */
+            break;
+
+        default:
+            error_report("Unknown combination of RAM page flags 0x%x", flags);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Check for file errors even if everything looks good */
+        if (!res) {
+            res = qemu_file_get_error(f);
+        }
+    }
+
+    return res;
+}
+
+#define YIELD_AFTER_MS  500 /* ms */
+
+static int ram_load_iterate(QEMUFile *f, void *opaque, int version_id)
+{
+    StateLoadCtx *s = (StateLoadCtx *) opaque;
+    int64_t t_start;
+    int tmp_res;
+    int res = 1;
+
+    t_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+    for (int iter = 0; res > 0; iter++) {
+        res = ram_load_pages(s);
+
+        if (!(iter & 7)) {
+            int64_t t_cur = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+            if ((t_cur - t_start) > YIELD_AFTER_MS) {
+                break;
+            }
+        }
+    }
+
+    /* Zero retcode means that there are no more pages to load */
+    if (res >= 0) {
+        res = res ? 0 : 1;
+    }
+
+    /* Process pending AIO ring events */
+    tmp_res = ram_load_pages_flush(s);
+    res = tmp_res ? tmp_res : res;
+
+    /* Send EOS flag before section footer */
+    qemu_put_be64(s->f_fd, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(s->f_fd);
+
+    return res;
+}
+
+static int ram_load_memory(StateLoadCtx *s)
+{
+    SectionHandlersEntry *se;
+    int res;
+
+    se = find_se("ram", 0);
+    assert(se && se->ops->load_state_iterate);
+
+    /* Send section header with QEMU_VM_SECTION_PART type */
+    send_section_header_part_end(s->f_fd, se, QEMU_VM_SECTION_PART);
+    res = se->ops->load_state_iterate(s->f_fd, s, se->real_version_id);
+    send_section_footer(s->f_fd, se);
+
+    return res;
+}
+
+static int default_load(QEMUFile *f, void *opaque, int version_id)
+{
+    error_report("Unexpected (non-iterable device state) section");
+    return -EINVAL;
+}
+
+static int load_state_header(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    int v;
+
+    /* Validate magic */
+    v = qemu_get_be32(f);
+    if (v != VMSTATE_HEADER_MAGIC) {
+        error_report("Not a valid snapshot");
+        return -EINVAL;
+    }
+
+    v = qemu_get_be32(f);
+    if (v != page_size) {
+        error_report("Incompatible page size: got %d expected %d",
+                     v, (int) page_size);
+        return -EINVAL;
+    }
+
+    /* Number of non-zero pages in all RAM blocks */
+    ram_ctx.normal_pages = qemu_get_be64(f);
+
+    /* vmstate stream offsets, counted from QEMU_VM_FILE_MAGIC */
+    s->ram_list_offset = qemu_get_be32(f);
+    s->device_offset = qemu_get_be32(f);
+    s->eof_offset = qemu_get_be32(f);
+
+    /* Check that offsets are within the limits */
+    if ((VMSTATE_HEADER_SIZE + s->device_offset) > INPLACE_READ_MAX ||
+            s->device_offset <= s->ram_list_offset) {
+        error_report("Corrupted snapshot header");
+        return -EINVAL;
+    }
+
+    /* Skip forward to the RAM block list section */
+    qemu_file_skip(f, s->ram_list_offset);
+
+    return 0;
+}
+
+static int load_state_ramlist(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    uint8_t section_type;
+    int res;
+
+    section_type = qemu_get_byte(f);
+
+    if (section_type == QEMU_VM_EOF) {
+        error_report("Unexpected EOF token");
+        return -EINVAL;
+    } else if (section_type != QEMU_VM_SECTION_FULL &&
+               section_type != QEMU_VM_SECTION_START) {
+        error_report("Unexpected section type %d", section_type);
+        return -EINVAL;
+    }
+
+    res = load_section_start_full(s);
+    if (!res) {
+        ram_block_list_init_bitmaps();
+    }
+
+    return res;
+}
+
+static int load_state_complete(StateLoadCtx *s)
+{
+    /* Forward non-iterable device state */
+    qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+
+    qemu_fflush(s->f_fd);
+
+    return 1;
+}
+
+static int load_section_start_full(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    int section_id;
+    int instance_id;
+    int version_id;
+    char idstr[256];
+    SectionHandlersEntry *se;
+    int res;
+
+    section_id = qemu_get_be32(f);
+
+    if (!qemu_get_counted_string(f, idstr)) {
+        error_report("Failed to get section name (%d)", section_id);
+        return -EINVAL;
+    }
+
+    instance_id = qemu_get_be32(f);
+    version_id = qemu_get_be32(f);
+
+    /* Find section handler */
+    se = find_se(idstr, instance_id);
+    if (!se) {
+        se = &section_handlers.default_;
+    } else if (version_id > se->version_id) {
+        /* Validate version */
+        error_report("Unsupported version %d for '%s' v%d",
+                     version_id, idstr, se->version_id);
+        return -EINVAL;
+    }
+
+    se->real_section_id = section_id;
+    se->real_version_id = version_id;
+
+    res = se->ops->load_state(f, s, se->real_version_id);
+    if (res) {
+        return res;
+    }
+
+    if (!check_section_footer(f, se)) {
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static int send_state_leader(StateLoadCtx *s)
+{
+    qemu_put_buffer(s->f_fd, s->ioc_leader->data + VMSTATE_HEADER_SIZE,
+                    s->device_offset);
+    return qemu_file_get_error(s->f_fd);
+}
+
 int coroutine_fn load_state_main(StateLoadCtx *s)
 {
-    /* TODO: implement */
-    return 0;
+    int res;
+
+    res = load_state_header(s);
+    if (res) {
+        goto fail;
+    }
+
+    res = load_state_ramlist(s);
+    if (res) {
+        goto fail;
+    }
+
+    res = send_state_leader(s);
+    if (res) {
+        goto fail;
+    }
+
+    do {
+        res = ram_load_memory(s);
+        /* Check for file errors */
+        load_state_check_errors(s, &res);
+    } while (!res);
+
+    if (res == 1) {
+        res = load_state_complete(s);
+    }
+
+fail:
+    load_state_check_errors(s, &res);
+
+    /* Replace positive retcode with 0 */
+    return MIN(res, 0);
 }
 
 /* Initialize snapshot RAM state */
@@ -815,10 +1387,20 @@ void ram_init_state(void)
     RAMCtx *ram = &ram_ctx;
 
     memset(ram, 0, sizeof(ram_ctx));
+
+    /* Initialize RAM block list head */
+    QSIMPLEQ_INIT(&ram->ram_block_list);
 }
 
 /* Destroy snapshot RAM state */
 void ram_destroy_state(void)
 {
-    /* TODO: implement */
+    RAMBlock *block;
+    RAMBlock *next_block;
+
+    /* Free RAM blocks */
+    QSIMPLEQ_FOREACH_SAFE(block, &ram_ctx.ram_block_list, next, next_block) {
+        g_free(block->bitmap);
+        g_free(block);
+    }
 }
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index d434b8f245..92956623f7 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -121,7 +121,20 @@ static void init_load_context(void)
 
 static void destroy_load_context(void)
 {
-    /* TODO: implement */
+    StateLoadCtx *s = get_load_context();
+
+    if (s->f_vmstate) {
+        qemu_fclose(s->f_vmstate);
+    }
+    if (s->blk) {
+        blk_unref(s->blk);
+    }
+    if (s->aio_ring) {
+        aio_ring_free(s->aio_ring);
+    }
+    if (s->ioc_leader) {
+        object_unref(OBJECT(s->ioc_leader));
+    }
 }
 
 static BlockBackend *image_open_opts(const char *optstr, QDict *options, int flags)
@@ -212,6 +225,9 @@ fail:
 static void coroutine_fn snapshot_load_co(void *opaque)
 {
     StateLoadCtx *s = get_load_context();
+    QIOChannel *ioc_fd;
+    uint8_t *buf;
+    size_t count;
     int res = -1;
 
     init_load_context();
@@ -223,6 +239,35 @@ static void coroutine_fn snapshot_load_co(void *opaque)
         goto fail;
     }
 
+    /* QEMUFile on vmstate */
+    s->f_vmstate = qemu_fopen_bdrv_vmstate(blk_bs(s->blk), 0);
+    qemu_file_set_blocking(s->f_vmstate, false);
+
+    /* QEMUFile on migration fd */
+    ioc_fd = qio_channel_new_fd(params.fd, NULL);
+    qio_channel_set_name(QIO_CHANNEL(ioc_fd), "migration-channel-outgoing");
+    s->f_fd = qemu_fopen_channel_output(ioc_fd);
+    object_unref(OBJECT(ioc_fd));
+    qemu_file_set_blocking(s->f_fd, false);
+
+    /* Buffer channel to store leading part of migration stream */
+    s->ioc_leader = qio_channel_buffer_new(INPLACE_READ_MAX);
+    qio_channel_set_name(QIO_CHANNEL(s->ioc_leader), "migration-leader-buffer");
+
+    /* AIO ring */
+    s->aio_ring = aio_ring_new(ram_load_aio_co, AIO_RING_SIZE, AIO_RING_INFLIGHT);
+
+    /*
+     * Here we stash the leading part of the vmstate stream without advancing
+     * the read position.
+     */
+    count = qemu_peek_buffer(s->f_vmstate, &buf, INPLACE_READ_MAX, 0);
+    res = qemu_file_get_error(s->f_vmstate);
+    if (res < 0) {
+        goto fail;
+    }
+    qio_channel_write(QIO_CHANNEL(s->ioc_leader), (char *) buf, count, NULL);
+
     res = load_state_main(s);
     if (res) {
         error_report("Failed to load snapshot: %s", strerror(-res));
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH v1 7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
                   ` (5 preceding siblings ...)
  2021-05-12 19:26 ` [RFC PATCH v1 6/7] migration/snapshot: Implementation of qemu-snapshot load path Andrey Gruzdev
@ 2021-05-12 19:26 ` Andrey Gruzdev
  2021-05-12 20:18 ` [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility no-reply
  7 siblings, 0 replies; 9+ messages in thread
From: Andrey Gruzdev @ 2021-05-12 19:26 UTC (permalink / raw)
  To: qemu-devel
  Cc: Den Lunev, Vladimir Sementsov-Ogievskiy, Eric Blake,
	Paolo Bonzini, Juan Quintela, Dr . David Alan Gilbert,
	Markus Armbruster, Peter Xu, David Hildenbrand, Andrey Gruzdev

This commit enables asynchronous snapshot loading using the standard postcopy
migration mechanism on the destination VM.

The switchover point to postcopy is chosen trivially, based on the
percentage of normal (non-zero) pages already loaded in precopy.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  12 +
 qemu-snapshot-vm.c      | 485 +++++++++++++++++++++++++++++++++++++++-
 qemu-snapshot.c         |  16 ++
 3 files changed, 508 insertions(+), 5 deletions(-)
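The commit message describes the switchover heuristic: precopy runs until the share of loaded normal pages crosses the percentage given by --postcopy. A minimal sketch of that predicate, using the loaded_pages/normal_pages counters from RAMCtx; the function name and signature are illustrative, not the patch's actual API:

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical switchover test: returns true once at least 'percent'
 * percent of the normal (non-zero) pages have been loaded in precopy.
 */
static bool postcopy_switchover_reached(int64_t loaded_pages,
                                        int64_t normal_pages,
                                        int percent)
{
    if (normal_pages <= 0) {
        /* Nothing to load in precopy, switch immediately */
        return true;
    }
    /* Avoid floating point; compare cross-multiplied counts */
    return loaded_pages * 100 >= normal_pages * (int64_t) percent;
}
```

Once the predicate holds, the tool would send the postcopy advise/run commands and serve the remaining pages on demand from the return-path page requests.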

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index aae730d70e..84a0c38e08 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -36,10 +36,14 @@
 
 /* AIO transfer size */
 #define AIO_TRANSFER_SIZE           BDRV_CLUSTER_SIZE
+/* AIO transfer size for postcopy */
+#define AIO_TRANSFER_SIZE_LOWLAT    (BDRV_CLUSTER_SIZE / 4)
 /* AIO ring size */
 #define AIO_RING_SIZE               64
 /* AIO ring in-flight limit */
 #define AIO_RING_INFLIGHT           16
+/* AIO ring in-flight limit for postcopy */
+#define AIO_RING_INFLIGHT_LOWLAT    4
 
 typedef struct AioRing AioRing;
 
@@ -97,12 +101,20 @@ typedef struct StateSaveCtx {
 typedef struct StateLoadCtx {
     BlockBackend *blk;              /* Block backend */
     QEMUFile *f_fd;                 /* QEMUFile for outgoing stream */
+    QEMUFile *f_rp_fd;              /* QEMUFile for return path stream */
     QEMUFile *f_vmstate;            /* QEMUFile for vmstate backing */
 
     QIOChannelBuffer *ioc_leader;   /* vmstate stream leader */
 
     AioRing *aio_ring;              /* AIO ring */
 
+    bool postcopy;                  /* From command-line --postcopy */
+    int postcopy_percent;           /* From command-line --postcopy */
+    bool in_postcopy;               /* In postcopy mode */
+
+    QemuThread rp_listen_thread;    /* Return path listening thread */
+    bool has_rp_listen_thread;      /* Have listening thread */
+
     /* vmstate offset of the section containing list of RAM blocks */
     int64_t ram_list_offset;
     /* vmstate offset of the first non-iterable device section */
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
index dae5f84b80..76980520ea 100644
--- a/qemu-snapshot-vm.c
+++ b/qemu-snapshot-vm.c
@@ -40,7 +40,9 @@ typedef struct RAMBlock {
     int64_t nr_pages;           /* Page count */
     int64_t nr_slices;          /* Number of slices (for bitmap bookkeeping) */
 
-    unsigned long *bitmap;      /* Bitmap of RAM slices */
+    int64_t discard_offset;     /* Last page offset sent in precopy */
+
+    unsigned long *bitmap;      /* Bitmap of transferred RAM slices */
 
     /* Link into ram_list */
     QSIMPLEQ_ENTRY(RAMBlock) next;
@@ -54,17 +56,33 @@ typedef struct RAMPage {
     int64_t offset;             /* Page offset in RAM block */
 } RAMPage;
 
+/* Page request from destination in postcopy */
+typedef struct RAMPageRequest {
+    RAMBlock *block;            /* RAM block */
+    int64_t offset;             /* Offset in RAM block */
+    unsigned size;              /* Size of request */
+
+    /* Link into ram_ctx.page_req */
+    QSIMPLEQ_ENTRY(RAMPageRequest) next;
+} RAMPageRequest;
+
 /* RAM transfer context */
 typedef struct RAMCtx {
     int64_t normal_pages;       /* Total number of normal pages */
+    int64_t precopy_pages;      /* Normal pages to load in precopy */
     int64_t loaded_pages;       /* Number of normal pages loaded */
 
     RAMPage last_page;          /* Last loaded page */
 
     RAMBlock *last_sent_block;  /* RAM block of last sent page */
+    RAMBlock *last_req_block;   /* RAM block of last page request */
 
     /* RAM block list head */
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
+
+    /* Page request queue for postcopy */
+    QemuMutex page_req_mutex;
+    QSIMPLEQ_HEAD(, RAMPageRequest) page_req;
 } RAMCtx;
 
 /* Section handler ops */
@@ -848,6 +866,433 @@ static void load_state_check_errors(StateLoadCtx *s, int *res)
     }
 }
 
+static bool get_queued_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+
+    if (QSIMPLEQ_EMPTY_ATOMIC(&ram->page_req)) {
+        return false;
+    }
+
+    QEMU_LOCK_GUARD(&ram->page_req_mutex);
+
+    while (!QSIMPLEQ_EMPTY(&ram->page_req)) {
+        RAMPageRequest *entry = QSIMPLEQ_FIRST(&ram->page_req);
+        RAMBlock *block = entry->block;
+        int64_t slice = entry->offset >> slice_bits;
+
+        QSIMPLEQ_REMOVE_HEAD(&ram->page_req, next);
+        g_free(entry);
+
+        /*
+         * Test respective bit in RAM block's slice bitmap to check if
+         * we still haven't read that slice from the image.
+         */
+        if (test_bit(slice, block->bitmap)) {
+            page->block = block;
+            page->offset = slice << slice_bits;
+
+            return true;
+        }
+    }
+
+    return false;
+}
+
+static int queue_page_request(const char *idstr, int64_t offset, unsigned size)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block;
+    RAMPageRequest *new_entry;
+
+    if (!idstr) {
+        block = ram->last_req_block;
+        if (!block) {
+            error_report("RP-REQ_PAGES: no previous block");
+            return -EINVAL;
+        }
+    } else {
+        block = ram_block_by_idstr(idstr);
+        if (!block) {
+            error_report("RP-REQ_PAGES: cannot find block %s", idstr);
+            return -EINVAL;
+        }
+
+        ram->last_req_block = block;
+    }
+
+    if (!ram_offset_in_block(block, offset)) {
+        error_report("RP-REQ_PAGES: offset 0x%" PRIx64 " out of RAM block %s",
+                     offset, block->idstr);
+        return -EINVAL;
+    }
+
+    new_entry = g_new0(RAMPageRequest, 1);
+    new_entry->block = block;
+    new_entry->offset = offset;
+    new_entry->size = size;
+
+    qemu_mutex_lock(&ram->page_req_mutex);
+    QSIMPLEQ_INSERT_TAIL(&ram->page_req, new_entry, next);
+    qemu_mutex_unlock(&ram->page_req_mutex);
+
+    return 0;
+}
+
+/* QEMU_VM_COMMAND sub-commands */
+typedef enum VmSubCmd {
+    MIG_CMD_OPEN_RETURN_PATH = 1,
+    MIG_CMD_POSTCOPY_ADVISE = 3,
+    MIG_CMD_POSTCOPY_LISTEN = 4,
+    MIG_CMD_POSTCOPY_RUN = 5,
+    MIG_CMD_POSTCOPY_RAM_DISCARD = 6,
+    MIG_CMD_PACKAGED = 7,
+} VmSubCmd;
+
+/* Return-path message types */
+typedef enum RpMsgType {
+    MIG_RP_MSG_INVALID = 0,
+    MIG_RP_MSG_SHUT = 1,
+    MIG_RP_MSG_REQ_PAGES_ID = 3,
+    MIG_RP_MSG_REQ_PAGES = 4,
+    MIG_RP_MSG_MAX = 7,
+} RpMsgType;
+
+typedef struct RpMsgArgs {
+    int len;
+    const char *name;
+} RpMsgArgs;
+
+/*
+ * Return-path message length/name indexed by message type.
+ * -1 value stands for variable message length.
+ */
+static RpMsgArgs rp_msg_args[] = {
+    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
+    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
+    [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
+    [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
+    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
+};
+
+/* Return-path message processing thread */
+static void *rp_listen_thread(void *opaque)
+{
+    StateLoadCtx *s = (StateLoadCtx *) opaque;
+    QEMUFile *f = s->f_rp_fd;
+    int res = 0;
+
+    while (!res) {
+        uint8_t h_buf[512];
+        const int h_max_len = sizeof(h_buf);
+        int h_type;
+        int h_len;
+        size_t count;
+
+        h_type = qemu_get_be16(f);
+        h_len = qemu_get_be16(f);
+
+        /* Make early check for input errors */
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+
+        /* Check message type */
+        if (h_type >= MIG_RP_MSG_MAX || h_type == MIG_RP_MSG_INVALID) {
+            error_report("RP: received invalid message type %d length %d",
+                         h_type, h_len);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Check message length */
+        if (rp_msg_args[h_type].len != -1 && h_len != rp_msg_args[h_type].len) {
+            error_report("RP: received %s message len %d expected %d",
+                         rp_msg_args[h_type].name,
+                         h_len, rp_msg_args[h_type].len);
+            res = -EINVAL;
+            break;
+        } else if (h_len > h_max_len) {
+            error_report("RP: received %s message len %d max_len %d",
+                         rp_msg_args[h_type].name, h_len, h_max_len);
+            res = -EINVAL;
+            break;
+        }
+
+        count = qemu_get_buffer(f, h_buf, h_len);
+        if (count != h_len) {
+            break;
+        }
+
+        switch (h_type) {
+        case MIG_RP_MSG_SHUT:
+        {
+            int shut_error;
+
+            shut_error = be32_to_cpu(*(uint32_t *) h_buf);
+            if (shut_error) {
+                error_report("RP: sibling shutdown, error %d", shut_error);
+            }
+
+            /* Exit processing loop */
+            res = 1;
+            break;
+        }
+
+        case MIG_RP_MSG_REQ_PAGES:
+        case MIG_RP_MSG_REQ_PAGES_ID:
+        {
+            uint64_t offset;
+            uint32_t size;
+            char *id_str = NULL;
+
+            offset = be64_to_cpu(*(uint64_t *) (h_buf + 0));
+            size = be32_to_cpu(*(uint32_t *) (h_buf + 8));
+
+            if (h_type == MIG_RP_MSG_REQ_PAGES_ID) {
+                int h_parsed_len = rp_msg_args[MIG_RP_MSG_REQ_PAGES].len;
+
+                if (h_len > h_parsed_len) {
+                    int id_len;
+
+                    /* RAM block id string */
+                    id_len = h_buf[h_parsed_len];
+                    id_str = (char *) &h_buf[h_parsed_len + 1];
+                    id_str[id_len] = 0;
+
+                    h_parsed_len += id_len + 1;
+                }
+
+                if (h_parsed_len != h_len) {
+                    error_report("RP: received %s message len %d expected %d",
+                                 rp_msg_args[MIG_RP_MSG_REQ_PAGES_ID].name,
+                                 h_len, h_parsed_len);
+                    res = -EINVAL;
+                    break;
+                }
+            }
+
+            res = queue_page_request(id_str, offset, size);
+            break;
+        }
+
+        default:
+            error_report("RP: received unexpected message type %d len %d",
+                         h_type, h_len);
+            res = -EINVAL;
+        }
+    }
+
+    if (res >= 0) {
+        res = qemu_file_get_error(f);
+    }
+    if (res) {
+        error_report("RP: listen thread exit, error %d", res);
+    }
+
+    return NULL;
+}
+
+static void send_command(QEMUFile *f, int cmd, uint16_t len, uint8_t *data)
+{
+    qemu_put_byte(f, QEMU_VM_COMMAND);
+    qemu_put_be16(f, (uint16_t) cmd);
+    qemu_put_be16(f, len);
+
+    qemu_put_buffer_async(f, data, len, false);
+    qemu_fflush(f);
+}
+
+static void send_ram_block_discard(QEMUFile *f, RAMBlock *block)
+{
+    int id_len;
+    int msg_len;
+    uint8_t msg_buf[512];
+
+    id_len = strlen(block->idstr);
+    assert(id_len < 256);
+
+    /* Version, always 0 */
+    msg_buf[0] = 0;
+    /* RAM block ID string length, not including terminating 0 */
+    msg_buf[1] = id_len;
+    /* RAM block ID string with terminating zero */
+    memcpy(msg_buf + 2, block->idstr, (id_len + 1));
+    msg_len = 2 + id_len + 1;
+    /* Discard range offset */
+    stq_be_p(msg_buf + msg_len, block->discard_offset);
+    msg_len += 8;
+    /* Discard range length */
+    stq_be_p(msg_buf + msg_len, (block->length - block->discard_offset));
+    msg_len += 8;
+
+    send_command(f, MIG_CMD_POSTCOPY_RAM_DISCARD, msg_len, msg_buf);
+}
+
+static int send_each_ram_block_discard(QEMUFile *f)
+{
+    RAMBlock *block;
+    int res = 0;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        send_ram_block_discard(f, block);
+
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+    }
+
+    return res;
+}
+
+static int prepare_postcopy(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    uint64_t tmp[2];
+    int res;
+
+    /* Number of pages to load in precopy before switching to postcopy */
+    ram_ctx.precopy_pages = ram_ctx.normal_pages * s->postcopy_percent / 100;
+
+    /* Send POSTCOPY_ADVISE */
+    tmp[0] = cpu_to_be64(page_size);
+    tmp[1] = cpu_to_be64(page_size);
+    send_command(f, MIG_CMD_POSTCOPY_ADVISE, 16, (uint8_t *) tmp);
+
+    /* Open return path on destination */
+    send_command(f, MIG_CMD_OPEN_RETURN_PATH, 0, NULL);
+
+    /*
+     * Check for file errors after sending the POSTCOPY_ADVISE command,
+     * since the destination may have already closed its input pipe if
+     * postcopy has not been enabled in advance.
+     */
+    res = qemu_file_get_error(f);
+    if (!res) {
+        qemu_thread_create(&s->rp_listen_thread, "rp_thread",
+                           rp_listen_thread, s, QEMU_THREAD_JOINABLE);
+        s->has_rp_listen_thread = true;
+    }
+
+    return res;
+}
+
+static int start_postcopy(StateLoadCtx *s)
+{
+    QIOChannelBuffer *bioc;
+    QEMUFile *fb;
+    int eof_pos;
+    uint32_t length;
+    int res = 0;
+
+    /*
+     * Send RAM discards for each block's unsent part. Without discards,
+     * the userfault_fd code on the destination will not trigger page requests
+     * as expected. Also, the UFFDIO_COPY ioctl that is used to place an
+     * incoming page in postcopy would give an error if that page had not
+     * faulted with the MISSING reason.
+     */
+    res = send_each_ram_block_discard(s->f_fd);
+    if (res) {
+        return res;
+    }
+
+    /*
+     * To perform a switch to postcopy on destination, we need to send
+     * commands and the device state data in the following order:
+     *   * MIG_CMD_POSTCOPY_LISTEN
+     *   * Non-iterable device state sections
+     *   * MIG_CMD_POSTCOPY_RUN
+     *
+     * All this has to be packaged into a single blob using MIG_CMD_PACKAGED
+     * command. While loading the device state we may trigger page transfer
+     * requests and the fd must be free to process those, thus the destination
+     * must read the whole device state off the fd before it starts
+     * processing it. A QEMU buffer channel is used to wrap it into a package.
+     */
+    bioc = qio_channel_buffer_new(512 * 1024);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "migration-postcopy-buffer");
+    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+    object_unref(OBJECT(bioc));
+
+    /* MIG_CMD_POSTCOPY_LISTEN command */
+    send_command(fb, MIG_CMD_POSTCOPY_LISTEN, 0, NULL);
+
+    /* The rest of non-iterable device state with an optional vmdesc section */
+    qemu_fsplice(fb, s->f_vmstate, s->eof_offset - s->device_offset);
+    qemu_fflush(fb);
+
+    /*
+     * vmdesc section may optionally be present at the end of the stream
+     * so we'll try to locate it and truncate the trailer.
+     */
+    eof_pos = bioc->usage - 1;
+
+    for (int offset = (bioc->usage - 11); offset >= 0; offset--) {
+        if (bioc->data[offset] == QEMU_VM_SECTION_FOOTER &&
+                bioc->data[offset + 5] == QEMU_VM_EOF &&
+                bioc->data[offset + 6] == QEMU_VM_VMDESCRIPTION) {
+            uint32_t expected_length = bioc->usage - (offset + 11);
+            uint32_t json_length;
+
+            json_length = be32_to_cpu(*(uint32_t  *) &bioc->data[offset + 7]);
+            if (json_length != expected_length) {
+                error_report("Corrupted vmdesc trailer: length %" PRIu32
+                             " expected %" PRIu32,
+                             json_length, expected_length);
+                res = -EINVAL;
+                goto fail;
+            }
+
+            eof_pos = offset + 5;
+            break;
+        }
+    }
+
+    /*
+     * When switching to postcopy we need to skip the QEMU_VM_EOF token,
+     * which is normally placed after the last non-iterable device state
+     * section (but before the vmdesc section).
+     *
+     * Skipping QEMU_VM_EOF is required to allow the migration process to
+     * continue in postcopy. The vmdesc section also has to be skipped here.
+     */
+    if (eof_pos >= 0 && bioc->data[eof_pos] == QEMU_VM_EOF) {
+        bioc->usage = eof_pos;
+        bioc->offset = eof_pos;
+    }
+
+    /* Finally is the MIG_CMD_POSTCOPY_RUN command */
+    send_command(fb, MIG_CMD_POSTCOPY_RUN, 0, NULL);
+
+    /* Now send that blob */
+    length = cpu_to_be32(bioc->usage);
+    send_command(s->f_fd, MIG_CMD_PACKAGED, sizeof(length), (uint8_t *) &length);
+    qemu_put_buffer_async(s->f_fd, bioc->data, bioc->usage, false);
+    qemu_fflush(s->f_fd);
+
+    /*
+     * Switch to a lower limit on in-flight requests
+     * to reduce page request latencies.
+     */
+    aio_ring_set_max_inflight(s->aio_ring, AIO_RING_INFLIGHT_LOWLAT);
+
+    s->in_postcopy = true;
+
+fail:
+    qemu_fclose(fb);
+    load_state_check_errors(s, &res);
+
+    return res;
+}
+
+static bool is_postcopy_switchover(StateLoadCtx *s)
+{
+    return ram_ctx.loaded_pages > ram_ctx.precopy_pages;
+}
+
 static void send_section_header_part_end(QEMUFile *f, SectionHandlersEntry *se,
         uint8_t section_type)
 {
@@ -987,10 +1432,13 @@ static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
 
     while ((req = aio_ring_get_request(s->aio_ring))) {
         RAMPage page;
-        unsigned max_length = AIO_TRANSFER_SIZE;
+        unsigned max_length = s->in_postcopy ? AIO_TRANSFER_SIZE_LOWLAT :
+                AIO_TRANSFER_SIZE;
         unsigned length;
+        bool urgent;
 
-        if (!find_next_page(&page)) {
+        urgent = get_queued_page(&page);
+        if (!urgent && !find_next_page(&page)) {
             break;
         }
         
@@ -1003,6 +1451,9 @@ static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
         ram->last_page.block = page.block;
         ram->last_page.offset = page.offset + length;
 
+        /* Used by send_ram_block_discard() */
+        page.block->discard_offset = ram->last_page.offset;
+
         /* Setup I/O request */
         req->opaque = s->blk;
         req->data = qemu_blockalign(blk_bs(s->blk), length);
@@ -1284,8 +1735,13 @@ static int load_state_ramlist(StateLoadCtx *s)
 
 static int load_state_complete(StateLoadCtx *s)
 {
-    /* Forward non-iterable device state */
-    qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+    if (!s->in_postcopy) {
+        /* Forward non-iterable device state */
+        qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+    } else {
+        /* Send terminating QEMU_VM_EOF if in postcopy */
+        qemu_put_byte(s->f_fd, QEMU_VM_EOF);
+    }
 
     qemu_fflush(s->f_fd);
 
@@ -1364,10 +1820,22 @@ int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    if (s->postcopy) {
+        res = prepare_postcopy(s);
+        if (res) {
+            goto fail;
+        }
+    }
+
     do {
         res = ram_load_memory(s);
         /* Check for file errors */
         load_state_check_errors(s, &res);
+
+        /* Switch to postcopy? */
+        if (!res && s->postcopy && !s->in_postcopy && is_postcopy_switchover(s)) {
+            res = start_postcopy(s);
+        }
     } while (!res);
 
     if (res == 1) {
@@ -1390,6 +1858,10 @@ void ram_init_state(void)
 
     /* Initialize RAM block list head */
     QSIMPLEQ_INIT(&ram->ram_block_list);
+
+    /* Initialize postcopy page request queue */
+    qemu_mutex_init(&ram->page_req_mutex);
+    QSIMPLEQ_INIT(&ram->page_req);
 }
 
 /* Destroy snapshot RAM state */
@@ -1403,4 +1875,7 @@ void ram_destroy_state(void)
         g_free(block->bitmap);
         g_free(block);
     }
+
+    /* Destroy page request mutex */
+    qemu_mutex_destroy(&ram_ctx.page_req_mutex);
 }
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index 92956623f7..29d954c5d6 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -123,6 +123,10 @@ static void destroy_load_context(void)
 {
     StateLoadCtx *s = get_load_context();
 
+    if (s->has_rp_listen_thread) {
+        qemu_thread_join(&s->rp_listen_thread);
+    }
+
     if (s->f_vmstate) {
         qemu_fclose(s->f_vmstate);
     }
@@ -226,12 +230,16 @@ static void coroutine_fn snapshot_load_co(void *opaque)
 {
     StateLoadCtx *s = get_load_context();
     QIOChannel *ioc_fd;
+    QIOChannel *ioc_rp_fd;
     uint8_t *buf;
     size_t count;
     int res = -1;
 
     init_load_context();
 
+    s->postcopy = params.postcopy;
+    s->postcopy_percent = params.postcopy_percent;
+
     /* Block backend */
     s->blk = image_open_opts(params.blk_optstr, params.blk_options,
                              params.blk_flags);
@@ -250,6 +258,14 @@ static void coroutine_fn snapshot_load_co(void *opaque)
     object_unref(OBJECT(ioc_fd));
     qemu_file_set_blocking(s->f_fd, false);
 
+    /* QEMUFile on return path fd if we are going to use postcopy */
+    if (params.postcopy) {
+        ioc_rp_fd = qio_channel_new_fd(params.rp_fd, NULL);
+        qio_channel_set_name(QIO_CHANNEL(ioc_rp_fd), "migration-channel-rp");
+        s->f_rp_fd = qemu_fopen_channel_input(ioc_rp_fd);
+        object_unref(OBJECT(ioc_rp_fd));
+    }
+
     /* Buffer channel to store leading part of migration stream */
     s->ioc_leader = qio_channel_buffer_new(INPLACE_READ_MAX);
     qio_channel_set_name(QIO_CHANNEL(s->ioc_leader), "migration-leader-buffer");
-- 
2.27.0




* Re: [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility
  2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
                   ` (6 preceding siblings ...)
  2021-05-12 19:26 ` [RFC PATCH v1 7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode Andrey Gruzdev
@ 2021-05-12 20:18 ` no-reply
  7 siblings, 0 replies; 9+ messages in thread
From: no-reply @ 2021-05-12 20:18 UTC (permalink / raw)
  To: andrey.gruzdev
  Cc: vsementsov, quintela, armbru, david, qemu-devel, peterx,
	dgilbert, pbonzini, den, andrey.gruzdev

Patchew URL: https://patchew.org/QEMU/20210512192619.537268-1-andrey.gruzdev@virtuozzo.com/



Hi,

This series seems to have some coding style problems. See output below for
more information:

Type: series
Message-id: 20210512192619.537268-1-andrey.gruzdev@virtuozzo.com
Subject: [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility

=== TEST SCRIPT BEGIN ===
#!/bin/bash
git rev-parse base > /dev/null || exit 0
git config --local diff.renamelimit 0
git config --local diff.renames True
git config --local diff.algorithm histogram
./scripts/checkpatch.pl --mailback base..
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
From https://github.com/patchew-project/qemu
   3158964..3e9f48b  master     -> master
 - [tag update]      patchew/20210511163151.45167-1-kwolf@redhat.com -> patchew/20210511163151.45167-1-kwolf@redhat.com
 * [new tag]         patchew/20210512192619.537268-1-andrey.gruzdev@virtuozzo.com -> patchew/20210512192619.537268-1-andrey.gruzdev@virtuozzo.com
Switched to a new branch 'test'
87d9eb7 migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode
dece241 migration/snapshot: Implementation of qemu-snapshot load path
cbaa5b9 migration/snapshot: Implementation of qemu-snapshot save path
f9dd5c5 migration/snapshot: Block layer AIO support in qemu-snapshot
ed9b73d migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h
92f9263 migration/snapshot: Introduce qemu_ftell2() routine
32c347a migration/snapshot: Introduce qemu-snapshot tool

=== OUTPUT BEGIN ===
1/7 Checking commit 32c347a19ef4 (migration/snapshot: Introduce qemu-snapshot tool)
Use of uninitialized value $acpi_testexpected in string eq at ./scripts/checkpatch.pl line 1529.
WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
#19: 
new file mode 100644

WARNING: line over 80 characters
#227: FILE: qemu-snapshot.c:63:
+    int postcopy_percent;       /* Start postcopy after % of normal pages loaded */

WARNING: line over 80 characters
#274: FILE: qemu-snapshot.c:110:
+static BlockBackend *image_open_opts(const char *optstr, QDict *options, int flags)

ERROR: switch and case should be at the same indent
#414: FILE: qemu-snapshot.c:250:
+        switch (c) {
+            case '?':
[...]
+            case 'h':
[...]
+            case 'V':
[...]
+            case 'T':
[...]
+            case 'r':
[...]
+            case OPTION_POSTCOPY:
[...]
+            case OPTION_PAGE_SIZE:
[...]
+            case OPTION_URI:
[...]
+            default:

ERROR: trailing whitespace
#436: FILE: qemu-snapshot.c:272:
+                $

ERROR: consider using qemu_strtol in preference to strtol
#449: FILE: qemu-snapshot.c:285:
+                postcopy_percent = strtol(optarg, &r, 10);

WARNING: line over 80 characters
#450: FILE: qemu-snapshot.c:286:
+                if (*r != '\0' || postcopy_percent < 0 || postcopy_percent > 100) {

ERROR: consider using qemu_strtol in preference to strtol
#468: FILE: qemu-snapshot.c:304:
+                target_page_size = strtol(optarg, &r, 0);

WARNING: line over 80 characters
#469: FILE: qemu-snapshot.c:305:
+                if (*r != '\0' || (target_page_size & (target_page_size - 1)) != 0 ||

ERROR: space required after that ',' (ctx:VxV)
#494: FILE: qemu-snapshot.c:330:
+                    fd = strtol(p, &r,10);
                                      ^

ERROR: consider using qemu_strtol in preference to strtol
#494: FILE: qemu-snapshot.c:330:
+                    fd = strtol(p, &r,10);

total: 6 errors, 5 warnings, 563 lines checked

Patch 1/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

2/7 Checking commit 92f92637b84c (migration/snapshot: Introduce qemu_ftell2() routine)
3/7 Checking commit ed9b73da773a (migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h)
WARNING: Block comments use a leading /* on a separate line
#51: FILE: migration/ram.h:36:
+/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it

total: 0 errors, 1 warnings, 44 lines checked

Patch 3/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
4/7 Checking commit f9dd5c542600 (migration/snapshot: Block layer AIO support in qemu-snapshot)
Use of uninitialized value $acpi_testexpected in string eq at ./scripts/checkpatch.pl line 1529.
WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
#81: 
new file mode 100644

total: 0 errors, 1 warnings, 314 lines checked

Patch 4/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
5/7 Checking commit cbaa5b9bd2a6 (migration/snapshot: Implementation of qemu-snapshot save path)
WARNING: line over 80 characters
#554: FILE: qemu-snapshot-vm.c:507:
+            error_report("RAM page with unknown combination of flags 0x%x", flags);

WARNING: line over 80 characters
#648: FILE: qemu-snapshot-vm.c:601:
+                         (size_t)(ssize_t) (VMSTATE_HEADER_EOF_OFFSET - eof_pos));

WARNING: line over 80 characters
#832: FILE: qemu-snapshot-vm.c:783:
+             * Normally we will never get here since the ending part of migration

total: 0 errors, 3 warnings, 901 lines checked

Patch 5/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
6/7 Checking commit dece2418c08d (migration/snapshot: Implementation of qemu-snapshot load path)
ERROR: trailing whitespace
#296: FILE: qemu-snapshot-vm.c:996:
+        $

ERROR: switch and case should be at the same indent
#435: FILE: qemu-snapshot-vm.c:1135:
+        switch (flags) {
+            case RAM_SAVE_FLAG_MEM_SIZE:
[...]
+            case RAM_SAVE_FLAG_EOS:
[...]
+            default:

WARNING: line over 80 characters
#446: FILE: qemu-snapshot-vm.c:1146:
+                error_report("Unknown combination of RAM page flags 0x%x", flags);

WARNING: line over 80 characters
#765: FILE: qemu-snapshot.c:258:
+    s->aio_ring = aio_ring_new(ram_load_aio_co, AIO_RING_SIZE, AIO_RING_INFLIGHT);

total: 2 errors, 2 warnings, 741 lines checked

Patch 6/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

7/7 Checking commit 87d9eb785185 (migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode)
WARNING: line over 80 characters
#517: FILE: qemu-snapshot-vm.c:1272:
+    send_command(s->f_fd, MIG_CMD_PACKAGED, sizeof(length), (uint8_t *) &length);

WARNING: line over 80 characters
#603: FILE: qemu-snapshot-vm.c:1836:
+        if (!res && s->postcopy && !s->in_postcopy && is_postcopy_switchover(s)) {

total: 0 errors, 2 warnings, 628 lines checked

Patch 7/7 has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.
=== OUTPUT END ===

Test command exited with code: 1


The full log is available at
http://patchew.org/logs/20210512192619.537268-1-andrey.gruzdev@virtuozzo.com/testing.checkpatch/?type=message.
---
Email generated automatically by Patchew [https://patchew.org/].
Please send your feedback to patchew-devel@redhat.com


end of thread, other threads:[~2021-05-12 20:23 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 2/7] migration/snapshot: Introduce qemu_ftell2() routine Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 3/7] migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 4/7] migration/snapshot: Block layer AIO support in qemu-snapshot Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 5/7] migration/snapshot: Implementation of qemu-snapshot save path Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 6/7] migration/snapshot: Implementation of qemu-snapshot load path Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode Andrey Gruzdev
2021-05-12 20:18 ` [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility no-reply
