All of lore.kernel.org
 help / color / mirror / Atom feed
From: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
To: qemu-devel@nongnu.org
Cc: Den Lunev <den@openvz.org>,
	Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>,
	Eric Blake <eblake@redhat.com>,
	Paolo Bonzini <pbonzini@redhat.com>,
	Juan Quintela <quintela@redhat.com>,
	"Dr . David Alan Gilbert" <dgilbert@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Peter Xu <peterx@redhat.com>,
	David Hildenbrand <david@redhat.com>,
	Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
Subject: [RFC PATCH v1 4/7] migration/snapshot: Block layer AIO support in qemu-snapshot
Date: Wed, 12 May 2021 22:26:16 +0300	[thread overview]
Message-ID: <20210512192619.537268-5-andrey.gruzdev@virtuozzo.com> (raw)
In-Reply-To: <20210512192619.537268-1-andrey.gruzdev@virtuozzo.com>

This commit enables asynchronous block layer I/O for the qemu-snapshot tool.
The implementation delivers request completions in submission order, which
simplifies the migration code.

Several file utility routines are introduced as well.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  30 +++++
 meson.build             |   2 +-
 qemu-snapshot-io.c      | 266 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 297 insertions(+), 1 deletion(-)
 create mode 100644 qemu-snapshot-io.c

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 154e11e9a5..7b3406fd56 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -34,6 +34,23 @@
 /* RAM slice size for snapshot revert */
 #define SLICE_SIZE_REVERT           (16 * PAGE_SIZE_MAX)
 
+typedef struct AioRing AioRing;
+
+/*
+ * I/O request descriptor. The caller fills it in after
+ * aio_ring_get_request(), and it is passed unchanged to the ring's
+ * AioRingFunc. The ring code itself does not interpret any of these fields.
+ */
+typedef struct AioRingRequest {
+    void *opaque;               /* Caller-defined context pointer */
+
+    void *data;                 /* Data buffer */
+    int64_t offset;             /* Offset (interpreted by AioRingFunc) */
+    size_t size;                /* Size (interpreted by AioRingFunc) */
+} AioRingRequest;
+
+/*
+ * Completion event returned by aio_ring_wait_event(). It lives inside the
+ * ring and is reset (origin = NULL, status = 0) by aio_ring_complete().
+ */
+typedef struct AioRingEvent {
+    AioRingRequest *origin;     /* Originating request */
+    ssize_t status;             /* Value returned by the AioRingFunc */
+} AioRingEvent;
+
+/*
+ * Per-request I/O routine. aio_ring_submit() runs it in a new coroutine,
+ * and its return value is stored in AioRingEvent.status.
+ */
+typedef ssize_t coroutine_fn (*AioRingFunc)(AioRingRequest *req);
+
 typedef struct StateSaveCtx {
     BlockBackend *blk;          /* Block backend */
 } StateSaveCtx;
@@ -56,4 +73,17 @@ StateLoadCtx *get_load_context(void);
 int coroutine_fn save_state_main(StateSaveCtx *s);
 int coroutine_fn load_state_main(StateLoadCtx *s);
 
+/*
+ * AIO ring: a bounded queue of requests, each processed in its own
+ * coroutine, with completion events delivered in submission order.
+ * Typical loop: aio_ring_get_request() + aio_ring_submit() while a
+ * request is available, then aio_ring_wait_event() + aio_ring_complete().
+ */
+AioRing *coroutine_fn aio_ring_new(AioRingFunc func, unsigned ring_entries,
+        unsigned max_inflight);
+void aio_ring_free(AioRing *ring);
+void aio_ring_set_max_inflight(AioRing *ring, unsigned max_inflight);
+AioRingRequest *coroutine_fn aio_ring_get_request(AioRing *ring);
+void coroutine_fn aio_ring_submit(AioRing *ring);
+AioRingEvent *coroutine_fn aio_ring_wait_event(AioRing *ring);
+void coroutine_fn aio_ring_complete(AioRing *ring);
+
+/* QEMUFile access to a BlockDriverState's vmstate area */
+QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable);
+/* Copy @size bytes from @f_src to @f_dst */
+void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size);
+/* Copy everything left in @f_src to @f_dst */
+void qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src);
+
 #endif /* QEMU_SNAPSHOT_H */
diff --git a/meson.build b/meson.build
index b851671914..c25fc518df 100644
--- a/meson.build
+++ b/meson.build
@@ -2361,7 +2361,7 @@ if have_tools
              dependencies: [block, qemuutil], install: true)
   qemu_nbd = executable('qemu-nbd', files('qemu-nbd.c'),
                dependencies: [blockdev, qemuutil, gnutls], install: true)
-  qemu_snapshot = executable('qemu-snapshot', files('qemu-snapshot.c', 'qemu-snapshot-vm.c'),
+  qemu_snapshot = executable('qemu-snapshot', files('qemu-snapshot.c', 'qemu-snapshot-vm.c', 'qemu-snapshot-io.c'),
                dependencies: [blockdev, qemuutil, migration], install: true)
 
   subdir('storage-daemon')
diff --git a/qemu-snapshot-io.c b/qemu-snapshot-io.c
new file mode 100644
index 0000000000..cd6428a4a2
--- /dev/null
+++ b/qemu-snapshot-io.c
@@ -0,0 +1,266 @@
+/*
+ * QEMU External Snapshot Utility
+ *
+ * Copyright Virtuozzo GmbH, 2021
+ *
+ * Authors:
+ *  Andrey Gruzdev   <andrey.gruzdev@virtuozzo.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/coroutine.h"
+#include "sysemu/block-backend.h"
+#include "migration/qemu-file.h"
+#include "qemu-snapshot.h"
+
+/*
+ * AIO ring.
+ *
+ * Coroutine-based environment to support asynchronous I/O operations
+ * providing in-order completion event delivery.
+ *
+ * All routines except aio_ring_free() must be called from the coroutine
+ * that created the ring with aio_ring_new().
+ *
+ * Call sequence to keep AIO ring filled:
+ *
+ *   aio_ring_new()             !
+ *                              !
+ *   aio_ring_get_request()     !<------!<------!
+ *   aio_ring_submit()          !------>!       !
+ *                              !               !
+ *   aio_ring_wait_event()      !               !
+ *   aio_ring_complete()        !-------------->!
+ *                              !
+ *   aio_ring_free()            !
+ *
+ */
+
+/*
+ * One ring slot: the request handed out to the caller and the completion
+ * event produced for it.
+ */
+typedef struct AioRingEntry {
+    AioRingRequest request;     /* I/O request */
+    AioRingEvent event;         /* I/O completion event */
+    bool owned;                 /* Cleared when the request is submitted,
+                                 * set once its AioRingFunc has returned
+                                 * and @event is ready for the caller */
+} AioRingEntry;
+
+/*
+ * Ring state. The @length slots starting at @head (wrapping modulo
+ * @ring_entries) have been submitted but not yet released with
+ * aio_ring_complete(); @tail is the next free slot.
+ */
+typedef struct AioRing {
+    unsigned head;              /* Oldest slot not yet released */
+    unsigned tail;              /* Next slot to hand out and submit */
+
+    unsigned ring_mask;         /* ring_entries - 1, for index wrap-around */
+    unsigned ring_entries;      /* Number of entries (a power of 2) */
+
+    AioRingFunc func;           /* Routine run for each submitted request */
+
+    Coroutine *main_co;         /* Coroutine that created the ring */
+    bool waiting;               /* Caller is yielded in aio_ring_wait_event() */
+
+    unsigned length;            /* Tail-head distance */
+    unsigned inflight;          /* Requests whose AioRingFunc hasn't returned */
+    unsigned max_inflight;      /* Maximum in-flight requests */
+
+    AioRingEntry entries[];     /* Flex-array of AioRingEntry */
+} AioRing;
+
+/*
+ * Body of the per-request coroutine started by aio_ring_submit().
+ *
+ * Claims the current tail slot, runs the ring's AioRingFunc on its request
+ * and publishes the result as the slot's completion event. If the caller
+ * is parked in aio_ring_wait_event(), it is woken up afterwards.
+ */
+static void coroutine_fn aio_ring_co(void *opaque)
+{
+    AioRing *ring = opaque;
+    AioRingEntry *slot = ring->entries + ring->tail;
+    ssize_t status;
+
+    /*
+     * Account for the slot before func() gets a chance to yield, so the
+     * caller sees up-to-date tail/length/inflight values when it resumes.
+     */
+    ring->tail = (ring->tail + 1) & ring->ring_mask;
+    ring->length++;
+    ring->inflight++;
+    slot->owned = false;
+
+    status = ring->func(&slot->request);
+
+    slot->event.status = status;
+    slot->event.origin = &slot->request;
+    slot->owned = true;
+    ring->inflight--;
+
+    if (!ring->waiting) {
+        return;
+    }
+
+    ring->waiting = false;
+    aio_co_wake(ring->main_co);
+}
+
+/*
+ * Return the request slot to fill in before calling aio_ring_submit(),
+ * or NULL if the ring is full or the in-flight limit has been reached.
+ */
+AioRingRequest *coroutine_fn aio_ring_get_request(AioRing *ring)
+{
+    bool ring_full;
+    bool at_limit;
+
+    assert(qemu_coroutine_self() == ring->main_co);
+
+    ring_full = ring->length >= ring->ring_entries;
+    at_limit = ring->inflight >= ring->max_inflight;
+
+    return (ring_full || at_limit) ? NULL
+                                   : &ring->entries[ring->tail].request;
+}
+
+/*
+ * Submit the request obtained from aio_ring_get_request(). Its coroutine
+ * is entered immediately and runs until it either yields or completes.
+ */
+void coroutine_fn aio_ring_submit(AioRing *ring)
+{
+    Coroutine *co;
+
+    assert(qemu_coroutine_self() == ring->main_co);
+    assert(ring->length < ring->ring_entries);
+
+    co = qemu_coroutine_create(aio_ring_co, ring);
+    qemu_coroutine_enter(co);
+}
+
+/*
+ * Wait for the oldest unreleased request to complete.
+ *
+ * Returns a pointer to its completion event, or NULL if no request is
+ * outstanding. A request that finishes early is not reported until all
+ * older ones have been released with aio_ring_complete().
+ */
+AioRingEvent *coroutine_fn aio_ring_wait_event(AioRing *ring)
+{
+    AioRingEntry *oldest = &ring->entries[ring->head];
+
+    assert(qemu_coroutine_self() == ring->main_co);
+
+    if (ring->length == 0) {
+        return NULL;
+    }
+
+    /* Any completion wakes us up, so re-check that it was the head one */
+    while (!oldest->owned) {
+        ring->waiting = true;
+        qemu_coroutine_yield();
+    }
+
+    return &oldest->event;
+}
+
+/*
+ * Release the oldest entry after its event has been consumed, making the
+ * slot available to aio_ring_get_request() again.
+ *
+ * Must only be called once the head request has completed, i.e. after
+ * aio_ring_wait_event() returned its event. Releasing an in-flight slot
+ * would let it be reused while its coroutine still writes into it.
+ */
+void coroutine_fn aio_ring_complete(AioRing *ring)
+{
+    AioRingEntry *entry = &ring->entries[ring->head];
+
+    assert(qemu_coroutine_self() == ring->main_co);
+    assert(ring->length);
+    assert(entry->owned);
+
+    ring->head = (ring->head + 1) & ring->ring_mask;
+    ring->length--;
+
+    entry->event.origin = NULL;
+    entry->event.status = 0;
+}
+
+/*
+ * Create a new AIO ring bound to the calling coroutine.
+ *
+ * @func:         routine run, in its own coroutine, for every request
+ * @ring_entries: number of slots; must be a power of two
+ * @max_inflight: initial in-flight limit; must be in [1, ring_entries]
+ *
+ * Returns a zero-initialized ring; release it with aio_ring_free().
+ */
+AioRing *coroutine_fn aio_ring_new(AioRingFunc func, unsigned ring_entries,
+                                   unsigned max_inflight)
+{
+    AioRing *ring;
+    size_t alloc_size;
+
+    assert(is_power_of_2(ring_entries));
+    assert(max_inflight && max_inflight <= ring_entries);
+
+    alloc_size = sizeof(*ring) + ring_entries * sizeof(ring->entries[0]);
+    ring = g_malloc0(alloc_size);
+
+    ring->func = func;
+    ring->ring_entries = ring_entries;
+    ring->ring_mask = ring_entries - 1;
+    ring->max_inflight = max_inflight;
+    ring->main_co = qemu_coroutine_self();
+
+    return ring;
+}
+
+/*
+ * Free an AIO ring. No request may still be in flight.
+ *
+ * NULL is accepted and ignored (like g_free()), so cleanup paths can
+ * call this unconditionally.
+ */
+void aio_ring_free(AioRing *ring)
+{
+    if (!ring) {
+        return;
+    }
+
+    assert(!ring->inflight);
+    g_free(ring);
+}
+
+/*
+ * Limit the maximum number of in-flight AIO requests.
+ *
+ * Values above the ring size are clamped to ring_entries. Zero is
+ * rejected, as in aio_ring_new(): with a zero limit aio_ring_get_request()
+ * would never return a request again.
+ */
+void aio_ring_set_max_inflight(AioRing *ring, unsigned max_inflight)
+{
+    assert(max_inflight);
+    ring->max_inflight = MIN(max_inflight, ring->ring_entries);
+}
+
+/*
+ * QEMUFileOps.get_buffer: read @size bytes at @pos from the vmstate area
+ * of the BlockDriverState passed as @opaque. @errp is not used.
+ */
+static ssize_t bdrv_vmstate_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
+                                       size_t size, Error **errp)
+{
+    BlockDriverState *bs = opaque;
+
+    return bdrv_load_vmstate(bs, buf, pos, size);
+}
+
+/*
+ * QEMUFileOps.writev_buffer: write the @iovcnt-element @iov to the
+ * vmstate area of the BlockDriverState passed as @opaque, at @pos.
+ *
+ * Returns the total number of bytes written, or the negative error
+ * code from bdrv_writev_vmstate(). @errp is not used.
+ */
+static ssize_t bdrv_vmstate_writev_buffer(void *opaque, struct iovec *iov,
+        int iovcnt, int64_t pos, Error **errp)
+{
+    BlockDriverState *bs = opaque;
+    QEMUIOVector qiov;
+    int ret;
+
+    qemu_iovec_init_external(&qiov, iov, iovcnt);
+    ret = bdrv_writev_vmstate(bs, &qiov, pos);
+
+    return ret < 0 ? (ssize_t) ret : (ssize_t) qiov.size;
+}
+
+/*
+ * QEMUFileOps.close: flush the BlockDriverState passed as @opaque. It
+ * only flushes; @opaque is not released here. @errp is not used.
+ */
+static int bdrv_vmstate_fclose(void *opaque, Error **errp)
+{
+    BlockDriverState *bs = opaque;
+
+    return bdrv_flush(bs);
+}
+
+/* Read-only vmstate QEMUFile: only get_buffer is provided */
+static const QEMUFileOps bdrv_vmstate_read_ops = {
+    .get_buffer = bdrv_vmstate_get_buffer,
+    .close      = bdrv_vmstate_fclose,
+};
+
+/* Write-only vmstate QEMUFile: only writev_buffer is provided */
+static const QEMUFileOps bdrv_vmstate_write_ops = {
+    .writev_buffer  = bdrv_vmstate_writev_buffer,
+    .close          = bdrv_vmstate_fclose,
+};
+
+/*
+ * Open a QEMUFile on the vmstate area of @bs (e.g. a QCOW2 image).
+ *
+ * A nonzero @is_writable yields a write-only file, zero yields a
+ * read-only one. Closing the file flushes @bs.
+ */
+QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable)
+{
+    const QEMUFileOps *ops = is_writable ? &bdrv_vmstate_write_ops
+                                         : &bdrv_vmstate_read_ops;
+
+    return qemu_fopen_ops(bs, ops);
+}
+
+/*
+ * Copy up to @size bytes from @f_src to @f_dst, in chunks of at most
+ * INPLACE_READ_MAX bytes peeked directly from the source buffer.
+ *
+ * If the source reaches EOF or fails before @size bytes have been moved,
+ * copying stops early. The function returns nothing, so callers should
+ * check qemu_file_get_error() on the files.
+ */
+void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size)
+{
+    size_t rest = size;
+
+    while (rest) {
+        uint8_t *ptr = NULL;
+        size_t req_size;
+        size_t count;
+
+        req_size = MIN(rest, INPLACE_READ_MAX);
+        count = qemu_peek_buffer(f_src, &ptr, req_size, 0);
+        if (!count) {
+            /* EOF or error on source: bail out instead of spinning forever */
+            break;
+        }
+        qemu_file_skip(f_src, count);
+
+        qemu_put_buffer(f_dst, ptr, count);
+        rest -= count;
+    }
+}
+
+/*
+ * Copy everything remaining in @f_src to @f_dst, chunk by chunk.
+ *
+ * Stops after the first chunk shorter than INPLACE_READ_MAX, which is
+ * taken to mean that the source reached EOF (or failed).
+ */
+void qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src)
+{
+    const size_t chunk = INPLACE_READ_MAX;
+    size_t got;
+
+    do {
+        uint8_t *data = NULL;
+
+        got = qemu_peek_buffer(f_src, &data, chunk, 0);
+        qemu_file_skip(f_src, got);
+        qemu_put_buffer(f_dst, data, got);
+    } while (got == chunk);
+}
-- 
2.27.0



  parent reply	other threads:[~2021-05-12 20:03 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-05-12 19:26 [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 1/7] migration/snapshot: Introduce qemu-snapshot tool Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 2/7] migration/snapshot: Introduce qemu_ftell2() routine Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 3/7] migration/snapshot: Move RAM_SAVE_FLAG_xxx defines to migration/ram.h Andrey Gruzdev
2021-05-12 19:26 ` Andrey Gruzdev [this message]
2021-05-12 19:26 ` [RFC PATCH v1 5/7] migration/snapshot: Implementation of qemu-snapshot save path Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 6/7] migration/snapshot: Implementation of qemu-snapshot load path Andrey Gruzdev
2021-05-12 19:26 ` [RFC PATCH v1 7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode Andrey Gruzdev
2021-05-12 20:18 ` [RFC PATCH v1 0/7] migration/snapshot: External snapshot utility no-reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210512192619.537268-5-andrey.gruzdev@virtuozzo.com \
    --to=andrey.gruzdev@virtuozzo.com \
    --cc=armbru@redhat.com \
    --cc=david@redhat.com \
    --cc=den@openvz.org \
    --cc=dgilbert@redhat.com \
    --cc=eblake@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=peterx@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=vsementsov@virtuozzo.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.