From: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
To: qemu-devel@nongnu.org
Cc: Den Lunev <den@openvz.org>,
	Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>,
	Eric Blake <eblake@redhat.com>,
	Paolo Bonzini <pbonzini@redhat.com>,
	Juan Quintela <quintela@redhat.com>,
	"Dr . David Alan Gilbert" <dgilbert@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Peter Xu <peterx@redhat.com>,
	David Hildenbrand <david@redhat.com>,
	Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
Subject: [RFC PATCH v2 6/7] migration/snapshot: Implementation of qemu-snapshot load path
Date: Thu, 13 May 2021 01:31:26 +0300
Message-ID: <20210512223127.586885-7-andrey.gruzdev@virtuozzo.com>
In-Reply-To: <20210512223127.586885-1-andrey.gruzdev@virtuozzo.com>

This part implements snapshot loading in precopy mode. The load path
validates the snapshot-specific vmstate header, rebuilds the RAM block
list from the stashed leading part of the vmstate stream, replays that
leader to the outgoing migration channel, then iteratively reads RAM
content from the image through the block-layer AIO ring and re-encodes
it as QEMU_VM_SECTION_PART sections of a regular precopy RAM stream.
Finally, the non-iterable device state sections are forwarded to the
destination verbatim.
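
For reference, the snapshot-specific vmstate header consumed by
load_state_header() can be pictured as follows. This is an illustrative
sketch reconstructed from the load code, with hypothetical field names;
the on-image data is a sequence of big-endian fields, not a C struct:

    struct QemuSnapshotVmstateHeader {     /* hypothetical name */
        uint32_t magic;            /* VMSTATE_HEADER_MAGIC */
        uint32_t page_size;        /* must match the current page_size */
        uint64_t normal_pages;     /* non-zero pages in all RAM blocks */
        /* vmstate stream offsets, counted from QEMU_VM_FILE_MAGIC: */
        uint32_t ram_list_offset;  /* section with the RAM block list */
        uint32_t device_offset;    /* first non-iterable device section */
        uint32_t eof_offset;       /* end of the vmstate stream */
    };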

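Each loaded RAM page is re-encoded onto the outgoing stream in the usual
precopy record format produced by send_page_header() and send_zeropage().
Again a sketch; the authoritative encoding is defined by migration/ram.c:

    /*
     * One RAM page record on the outgoing stream:
     *
     *   be64 offset|flags       RAM_SAVE_FLAG_{ZERO,PAGE,CONTINUE}
     *   [u8  idstr_len]         present only when RAM_SAVE_FLAG_CONTINUE
     *   [u8  idstr[idstr_len]]  is clear: RAM block id string
     *   u8   zero_byte          for RAM_SAVE_FLAG_ZERO records, or
     *   u8   data[page_size]    for RAM_SAVE_FLAG_PAGE records
     */
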
Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  24 +-
 qemu-snapshot-vm.c      | 588 +++++++++++++++++++++++++++++++++++++++-
 qemu-snapshot.c         |  47 +++-
 3 files changed, 654 insertions(+), 5 deletions(-)

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 52519f76c4..aae730d70e 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -34,6 +34,13 @@
 /* RAM slice size for snapshot revert */
 #define SLICE_SIZE_REVERT           (16 * PAGE_SIZE_MAX)
 
+/* AIO transfer size */
+#define AIO_TRANSFER_SIZE           BDRV_CLUSTER_SIZE
+/* AIO ring size */
+#define AIO_RING_SIZE               64
+/* AIO ring in-flight limit */
+#define AIO_RING_INFLIGHT           16
+
 typedef struct AioRing AioRing;
 
 typedef struct AioRingRequest {
@@ -88,7 +95,20 @@ typedef struct StateSaveCtx {
 } StateSaveCtx;
 
 typedef struct StateLoadCtx {
-    BlockBackend *blk;          /* Block backend */
+    BlockBackend *blk;              /* Block backend */
+    QEMUFile *f_fd;                 /* QEMUFile for outgoing stream */
+    QEMUFile *f_vmstate;            /* QEMUFile for vmstate backing */
+
+    QIOChannelBuffer *ioc_leader;   /* vmstate stream leader */
+
+    AioRing *aio_ring;              /* AIO ring */
+
+    /* vmstate offset of the section containing list of RAM blocks */
+    int64_t ram_list_offset;
+    /* vmstate offset of the first non-iterable device section */
+    int64_t device_offset;
+    /* vmstate EOF */
+    int64_t eof_offset;
 } StateLoadCtx;
 
 extern int64_t page_size;       /* Page size */
@@ -100,6 +120,8 @@ extern int slice_bits;          /* RAM slice size bits */
 
 void ram_init_state(void);
 void ram_destroy_state(void);
+ssize_t coroutine_fn ram_load_aio_co(AioRingRequest *req);
+
 StateSaveCtx *get_save_context(void);
 StateLoadCtx *get_load_context(void);
 int coroutine_fn save_state_main(StateSaveCtx *s);
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
index 2d8f2d3d79..b7ce85c5d9 100644
--- a/qemu-snapshot-vm.c
+++ b/qemu-snapshot-vm.c
@@ -57,6 +57,11 @@ typedef struct RAMPage {
 /* RAM transfer context */
 typedef struct RAMCtx {
     int64_t normal_pages;       /* Total number of normal pages */
+    int64_t loaded_pages;       /* Number of normal pages loaded */
+
+    RAMPage last_page;          /* Last loaded page */
+
+    RAMBlock *last_sent_block;  /* RAM block of last sent page */
 
     /* RAM block list head */
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
@@ -100,17 +105,26 @@ typedef struct SectionHandlers {
 
 /* Forward declarations */
 static int default_save(QEMUFile *f, void *opaque, int version_id);
+static int default_load(QEMUFile *f, void *opaque, int version_id);
+
 static int ram_save(QEMUFile *f, void *opaque, int version_id);
+static int ram_load(QEMUFile *f, void *opaque, int version_id);
+static int ram_load_iterate(QEMUFile *f, void *opaque, int version_id);
+
 static int save_state_complete(StateSaveCtx *s);
+static int load_section_start_full(StateLoadCtx *s);
 
 static RAMCtx ram_ctx;
 
 static SectionHandlerOps default_handler_ops = {
     .save_state = default_save,
+    .load_state = default_load,
 };
 
 static SectionHandlerOps ram_handler_ops = {
     .save_state = ram_save,
+    .load_state = ram_load,
+    .load_state_iterate = ram_load_iterate,
 };
 
 static SectionHandlers section_handlers = {
@@ -218,6 +232,19 @@ static RAMBlock *ram_block_by_idstr(const char *idstr)
     return NULL;
 }
 
+static RAMBlock *ram_block_by_bdrv_offset(int64_t bdrv_offset)
+{
+    RAMBlock *block;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        if (ram_bdrv_offset_in_block(block, bdrv_offset)) {
+            return block;
+        }
+    }
+
+    return NULL;
+}
+
 static RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
 {
     static RAMBlock *block;
@@ -803,10 +830,555 @@ int coroutine_fn save_state_main(StateSaveCtx *s)
     return MIN(res, 0);
 }
 
+static void load_state_check_errors(StateLoadCtx *s, int *res)
+{
+    /*
+     * Check for file errors on success. Replace generic -EINVAL
+     * retcode with file error if possible.
+     */
+    if (*res >= 0 || *res == -EINVAL) {
+        int f_res = qemu_file_get_error(s->f_fd);
+
+        if (!f_res) {
+            f_res = qemu_file_get_error(s->f_vmstate);
+        }
+        if (f_res) {
+            *res = f_res;
+        }
+    }
+}
+
+static void send_section_header_part_end(QEMUFile *f, SectionHandlersEntry *se,
+        uint8_t section_type)
+{
+    assert(section_type == QEMU_VM_SECTION_PART ||
+           section_type == QEMU_VM_SECTION_END);
+
+    qemu_put_byte(f, section_type);
+    qemu_put_be32(f, se->real_section_id);
+}
+
+static void send_section_footer(QEMUFile *f, SectionHandlersEntry *se)
+{
+    qemu_put_byte(f, QEMU_VM_SECTION_FOOTER);
+    qemu_put_be32(f, se->real_section_id);
+}
+
+static void send_page_header(QEMUFile *f, RAMBlock *block, int64_t offset)
+{
+    uint8_t hdr_buf[512];
+    int hdr_len = 8;
+
+    stq_be_p(hdr_buf, offset);
+    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
+        int id_len;
+
+        id_len = strlen(block->idstr);
+        assert(id_len < 256);
+
+        hdr_buf[hdr_len] = id_len;
+        memcpy((hdr_buf + hdr_len + 1), block->idstr, id_len);
+
+        hdr_len += 1 + id_len;
+    }
+
+    qemu_put_buffer(f, hdr_buf, hdr_len);
+}
+
+static void send_zeropage(QEMUFile *f, RAMBlock *block, int64_t offset)
+{
+    send_page_header(f, block, offset | RAM_SAVE_FLAG_ZERO);
+    qemu_put_byte(f, 0);
+}
+
+static bool find_next_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block = ram->last_page.block;
+    int64_t slice = ram->last_page.offset >> slice_bits;
+    bool full_round = false;
+    bool found = false;
+
+    if (!block) {
+restart:
+        block = QSIMPLEQ_FIRST(&ram->ram_block_list);
+        slice = 0;
+        full_round = true;
+    }
+
+    while (!found && block) {
+        slice = find_next_bit(block->bitmap, block->nr_slices, slice);
+        /* Can't find unsent slice in block? */
+        if (slice >= block->nr_slices) {
+            /* Try next block */
+            block = QSIMPLEQ_NEXT(block, next);
+            slice = 0;
+
+            continue;
+        }
+
+        found = true;
+    }
+
+    /*
+     * Restart from the beginning if we couldn't find an unsent slice,
+     * but do it only once.
+     */
+    if (!found && !full_round) {
+        goto restart;
+    }
+
+    if (found) {
+        page->block = block;
+        page->offset = slice << slice_bits;
+    }
+
+    return found;
+}
+
+static inline
+void get_page_range(RAMPage *page, unsigned *length, unsigned max_length)
+{
+    int64_t start_slice;
+    int64_t end_slice;
+    int64_t tmp;
+
+    assert(QEMU_IS_ALIGNED(page->offset, slice_size));
+    assert(max_length >= slice_size);
+
+    start_slice = page->offset >> slice_bits;
+    end_slice = find_next_zero_bit(page->block->bitmap, page->block->nr_slices,
+                                   page->offset >> slice_bits);
+
+    tmp = (end_slice - start_slice) << slice_bits;
+    tmp = MIN(page->block->length - page->offset, tmp);
+
+    /*
+     * Length is always aligned to slice_size, except when it is
+     * the last slice in the RAM block.
+     */
+    *length = MIN(max_length, tmp);
+}
+
+static inline
+void clear_page_range(RAMPage *page, unsigned length)
+{
+    assert(QEMU_IS_ALIGNED(page->offset, slice_size));
+    assert(length);
+
+    /*
+     * Page offsets are aligned to the slice boundary, so we only need
+     * to round up the length when we load the last slice in the block.
+     */
+    bitmap_clear(page->block->bitmap, page->offset >> slice_bits,
+                 ((length - 1) >> slice_bits) + 1);
+}
+
+ssize_t coroutine_fn ram_load_aio_co(AioRingRequest *req)
+{
+    return blk_pread((BlockBackend *) req->opaque, req->offset,
+            req->data, req->size);
+}
+
+static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
+{
+    RAMCtx *ram = &ram_ctx;
+    AioRingRequest *req;
+
+    while ((req = aio_ring_get_request(s->aio_ring))) {
+        RAMPage page;
+        unsigned max_length = AIO_TRANSFER_SIZE;
+        unsigned length;
+
+        if (!find_next_page(&page)) {
+            break;
+        }
+
+        /* Get range of contiguous pages that were not transferred yet */
+        get_page_range(&page, &length, max_length);
+        /* Clear range of pages to be queued for I/O */
+        clear_page_range(&page, length);
+
+        /* Used by find_next_page() */
+        ram->last_page.block = page.block;
+        ram->last_page.offset = page.offset + length;
+
+        /* Setup I/O request */
+        req->opaque = s->blk;
+        req->data = qemu_blockalign(blk_bs(s->blk), length);
+        req->offset = ram_bdrv_from_block_offset(page.block, page.offset);
+        req->size = length;
+
+        aio_ring_submit(s->aio_ring);
+    }
+}
+
+static int ram_load_complete_aio(StateLoadCtx *s, AioRingEvent *ev)
+{
+    QEMUFile *f = s->f_fd;
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block = ram->last_sent_block;
+    void *bdrv_data = ev->origin->data;
+    int64_t bdrv_offset = ev->origin->offset;
+    ssize_t bdrv_count = ev->status;
+    int64_t offset;
+    int64_t flags = RAM_SAVE_FLAG_CONTINUE;
+    int pages = 0;
+
+    /* Need to switch to another RAM block? */
+    if (!ram_bdrv_offset_in_block(block, bdrv_offset)) {
+        /*
+         * Look up the RAM block by BDRV offset, because in postcopy
+         * we can issue AIO loads from arbitrary blocks.
+         */
+        block = ram_block_by_bdrv_offset(bdrv_offset);
+        ram->last_sent_block = block;
+
+        /* Reset RAM_SAVE_FLAG_CONTINUE */
+        flags = 0;
+    }
+    offset = ram_block_offset_from_bdrv(block, bdrv_offset);
+
+    for (ssize_t count = 0; count < bdrv_count; count += page_size) {
+        if (buffer_is_zero(bdrv_data, page_size)) {
+            send_zeropage(f, block, (offset | flags));
+        } else {
+            send_page_header(f, block, (offset | RAM_SAVE_FLAG_PAGE | flags));
+            qemu_put_buffer_async(f, bdrv_data, page_size, false);
+
+            /* Update normal page count */
+            ram->loaded_pages++;
+        }
+
+        /*
+         * A BDRV request never crosses a RAM block boundary, so we
+         * can set RAM_SAVE_FLAG_CONTINUE here unconditionally.
+         */
+        flags = RAM_SAVE_FLAG_CONTINUE;
+
+        bdrv_data += page_size;
+        offset += page_size;
+        pages++;
+    }
+
+    /* Need to flush here because we use qemu_put_buffer_async() */
+    qemu_fflush(f);
+
+    return pages;
+}
+
+static int coroutine_fn ram_load_pages(StateLoadCtx *s)
+{
+    AioRingEvent *event;
+    int res = 0;
+
+    /* Fill blockdev AIO queue */
+    ram_load_submit_aio(s);
+
+    /* Check for AIO completion event */
+    event = aio_ring_wait_event(s->aio_ring);
+    if (event) {
+        /* Check completion status */
+        res = event->status;
+        if (res > 0) {
+            res = ram_load_complete_aio(s, event);
+        }
+
+        qemu_vfree(event->origin->data);
+        aio_ring_complete(s->aio_ring);
+    }
+
+    return res;
+}
+
+static int coroutine_fn ram_load_pages_flush(StateLoadCtx *s)
+{
+    AioRingEvent *event;
+
+    while ((event = aio_ring_wait_event(s->aio_ring))) {
+        /* Check completion status */
+        if (event->status > 0) {
+            ram_load_complete_aio(s, event);
+        }
+
+        qemu_vfree(event->origin->data);
+        aio_ring_complete(s->aio_ring);
+    }
+
+    return 0;
+}
+
+static int ram_load(QEMUFile *f, void *opaque, int version_id)
+{
+    int compat_flags = RAM_SAVE_FLAG_MEM_SIZE | RAM_SAVE_FLAG_EOS;
+    int flags = 0;
+    int res = 0;
+
+    if (version_id != 4) {
+        error_report("Unsupported version %d for 'ram' handler v4", version_id);
+        return -EINVAL;
+    }
+
+    while (!res && !(flags & RAM_SAVE_FLAG_EOS)) {
+        int64_t offset;
+
+        offset = qemu_get_be64(f);
+        flags = offset & ~page_mask;
+        offset &= page_mask;
+
+        if (flags & ~compat_flags) {
+            error_report("Incompatible RAM page flags 0x%x", flags);
+            res = -EINVAL;
+            break;
+        }
+
+        switch (flags) {
+        case RAM_SAVE_FLAG_MEM_SIZE:
+            /* Fill RAM block list */
+            ram_block_list_from_stream(f, offset);
+            break;
+
+        case RAM_SAVE_FLAG_EOS:
+            /* Normal exit */
+            break;
+
+        default:
+            error_report("Unknown combination of RAM page flags 0x%x", flags);
+            res = -EINVAL;
+        }
+
+        /* Check for file errors even if everything looks good */
+        if (!res) {
+            res = qemu_file_get_error(f);
+        }
+    }
+
+    return res;
+}
+
+#define YIELD_AFTER_MS  500 /* ms */
+
+static int ram_load_iterate(QEMUFile *f, void *opaque, int version_id)
+{
+    StateLoadCtx *s = (StateLoadCtx *) opaque;
+    int64_t t_start;
+    int tmp_res;
+    int res = 1;
+
+    t_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+    for (int iter = 0; res > 0; iter++) {
+        res = ram_load_pages(s);
+
+        if (!(iter & 7)) {
+            int64_t t_cur = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+            if ((t_cur - t_start) > YIELD_AFTER_MS) {
+                break;
+            }
+        }
+    }
+
+    /* Zero retcode means that there are no more pages to load */
+    if (res >= 0) {
+        res = res ? 0 : 1;
+    }
+
+    /* Process pending AIO ring events */
+    tmp_res = ram_load_pages_flush(s);
+    res = tmp_res ? tmp_res : res;
+
+    /* Send EOS flag before section footer */
+    qemu_put_be64(s->f_fd, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(s->f_fd);
+
+    return res;
+}
+
+static int ram_load_memory(StateLoadCtx *s)
+{
+    SectionHandlersEntry *se;
+    int res;
+
+    se = find_se("ram", 0);
+    assert(se && se->ops->load_state_iterate);
+
+    /* Send section header with QEMU_VM_SECTION_PART type */
+    send_section_header_part_end(s->f_fd, se, QEMU_VM_SECTION_PART);
+    res = se->ops->load_state_iterate(s->f_fd, s, se->real_version_id);
+    send_section_footer(s->f_fd, se);
+
+    return res;
+}
+
+static int default_load(QEMUFile *f, void *opaque, int version_id)
+{
+    error_report("Unexpected (non-iterable device state) section");
+    return -EINVAL;
+}
+
+static int load_state_header(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    int v;
+
+    /* Validate magic */
+    v = qemu_get_be32(f);
+    if (v != VMSTATE_HEADER_MAGIC) {
+        error_report("Not a valid snapshot");
+        return -EINVAL;
+    }
+
+    v = qemu_get_be32(f);
+    if (v != page_size) {
+        error_report("Incompatible page size: got %d expected %d",
+                     v, (int) page_size);
+        return -EINVAL;
+    }
+
+    /* Number of non-zero pages in all RAM blocks */
+    ram_ctx.normal_pages = qemu_get_be64(f);
+
+    /* vmstate stream offsets, counted from QEMU_VM_FILE_MAGIC */
+    s->ram_list_offset = qemu_get_be32(f);
+    s->device_offset = qemu_get_be32(f);
+    s->eof_offset = qemu_get_be32(f);
+
+    /* Check that offsets are within the limits */
+    if ((VMSTATE_HEADER_SIZE + s->device_offset) > INPLACE_READ_MAX ||
+            s->device_offset <= s->ram_list_offset) {
+        error_report("Corrupted snapshot header");
+        return -EINVAL;
+    }
+
+    /* Skip up to RAM block list section */
+    qemu_file_skip(f, s->ram_list_offset);
+
+    return 0;
+}
+
+static int load_state_ramlist(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    uint8_t section_type;
+    int res;
+
+    section_type = qemu_get_byte(f);
+
+    if (section_type == QEMU_VM_EOF) {
+        error_report("Unexpected EOF token");
+        return -EINVAL;
+    } else if (section_type != QEMU_VM_SECTION_FULL &&
+               section_type != QEMU_VM_SECTION_START) {
+        error_report("Unexpected section type %d", section_type);
+        return -EINVAL;
+    }
+
+    res = load_section_start_full(s);
+    if (!res) {
+        ram_block_list_init_bitmaps();
+    }
+
+    return res;
+}
+
+static int load_state_complete(StateLoadCtx *s)
+{
+    /* Forward non-iterable device state */
+    qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+
+    qemu_fflush(s->f_fd);
+
+    return 1;
+}
+
+static int load_section_start_full(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_vmstate;
+    int section_id;
+    int instance_id;
+    int version_id;
+    char idstr[256];
+    SectionHandlersEntry *se;
+    int res;
+
+    section_id = qemu_get_be32(f);
+
+    if (!qemu_get_counted_string(f, idstr)) {
+        error_report("Failed to get section name(%d)", section_id);
+        return -EINVAL;
+    }
+
+    instance_id = qemu_get_be32(f);
+    version_id = qemu_get_be32(f);
+
+    /* Find section handler */
+    se = find_se(idstr, instance_id);
+    if (!se) {
+        se = &section_handlers.default_;
+    } else if (version_id > se->version_id) {
+        /* Validate version */
+        error_report("Unsupported version %d for '%s' v%d",
+                     version_id, idstr, se->version_id);
+        return -EINVAL;
+    }
+
+    se->real_section_id = section_id;
+    se->real_version_id = version_id;
+
+    res = se->ops->load_state(f, s, se->real_version_id);
+    if (res) {
+        return res;
+    }
+
+    if (!check_section_footer(f, se)) {
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static int send_state_leader(StateLoadCtx *s)
+{
+    qemu_put_buffer(s->f_fd, s->ioc_leader->data + VMSTATE_HEADER_SIZE,
+                    s->device_offset);
+    return qemu_file_get_error(s->f_fd);
+}
+
 int coroutine_fn load_state_main(StateLoadCtx *s)
 {
-    /* TODO: implement */
-    return 0;
+    int res;
+
+    res = load_state_header(s);
+    if (res) {
+        goto fail;
+    }
+
+    res = load_state_ramlist(s);
+    if (res) {
+        goto fail;
+    }
+
+    res = send_state_leader(s);
+    if (res) {
+        goto fail;
+    }
+
+    do {
+        res = ram_load_memory(s);
+        /* Check for file errors */
+        load_state_check_errors(s, &res);
+    } while (!res);
+
+    if (res == 1) {
+        res = load_state_complete(s);
+    }
+
+fail:
+    load_state_check_errors(s, &res);
+
+    /* Replace positive retcode with 0 */
+    return MIN(res, 0);
 }
 
 /* Initialize snapshot RAM state */
@@ -815,10 +1387,20 @@ void ram_init_state(void)
     RAMCtx *ram = &ram_ctx;
 
     memset(ram, 0, sizeof(ram_ctx));
+
+    /* Initialize RAM block list head */
+    QSIMPLEQ_INIT(&ram->ram_block_list);
 }
 
 /* Destroy snapshot RAM state */
 void ram_destroy_state(void)
 {
-    /* TODO: implement */
+    RAMBlock *block;
+    RAMBlock *next_block;
+
+    /* Free RAM blocks */
+    QSIMPLEQ_FOREACH_SAFE(block, &ram_ctx.ram_block_list, next, next_block) {
+        g_free(block->bitmap);
+        g_free(block);
+    }
 }
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index f559d5e54d..f9692900db 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -121,7 +121,20 @@ static void init_load_context(void)
 
 static void destroy_load_context(void)
 {
-    /* TODO: implement */
+    StateLoadCtx *s = get_load_context();
+
+    if (s->f_vmstate) {
+        qemu_fclose(s->f_vmstate);
+    }
+    if (s->blk) {
+        blk_unref(s->blk);
+    }
+    if (s->aio_ring) {
+        aio_ring_free(s->aio_ring);
+    }
+    if (s->ioc_leader) {
+        object_unref(OBJECT(s->ioc_leader));
+    }
 }
 
 static BlockBackend *image_open_opts(const char *optstr, QDict *options, int flags)
@@ -212,6 +225,9 @@ fail:
 static void coroutine_fn snapshot_load_co(void *opaque)
 {
     StateLoadCtx *s = get_load_context();
+    QIOChannel *ioc_fd;
+    uint8_t *buf;
+    size_t count;
     int res = -1;
 
     init_load_context();
@@ -223,6 +239,35 @@ static void coroutine_fn snapshot_load_co(void *opaque)
         goto fail;
     }
 
+    /* QEMUFile on vmstate */
+    s->f_vmstate = qemu_fopen_bdrv_vmstate(blk_bs(s->blk), 0);
+    qemu_file_set_blocking(s->f_vmstate, false);
+
+    /* QEMUFile on migration fd */
+    ioc_fd = qio_channel_new_fd(params.fd, NULL);
+    qio_channel_set_name(QIO_CHANNEL(ioc_fd), "migration-channel-outgoing");
+    s->f_fd = qemu_fopen_channel_output(ioc_fd);
+    object_unref(OBJECT(ioc_fd));
+    qemu_file_set_blocking(s->f_fd, false);
+
+    /* Buffer channel to store leading part of migration stream */
+    s->ioc_leader = qio_channel_buffer_new(INPLACE_READ_MAX);
+    qio_channel_set_name(QIO_CHANNEL(s->ioc_leader), "migration-leader-buffer");
+
+    /* AIO ring */
+    s->aio_ring = aio_ring_new(ram_load_aio_co, AIO_RING_SIZE, AIO_RING_INFLIGHT);
+
+    /*
+     * Here we stash the leading part of the vmstate stream without
+     * advancing the read position.
+     */
+    count = qemu_peek_buffer(s->f_vmstate, &buf, INPLACE_READ_MAX, 0);
+    res = qemu_file_get_error(s->f_vmstate);
+    if (res < 0) {
+        goto fail;
+    }
+    qio_channel_write(QIO_CHANNEL(s->ioc_leader), (char *) buf, count, NULL);
+
     res = load_state_main(s);
     if (res) {
         error_report("Failed to load snapshot: %s", strerror(-res));
-- 
2.27.0


