qemu-devel.nongnu.org archive mirror
* [RFC patch v1 0/3] qemu-file writing performance improving
@ 2020-04-13 11:12 Denis Plotnikov
  2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
                   ` (5 more replies)
  0 siblings, 6 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-13 11:12 UTC (permalink / raw)
  To: qemu-devel; +Cc: den, dgilbert, quintela

Problem description: qcow2 internal snapshot saving takes too long on HDD: ~25 sec

When a qcow2 image is placed on a regular HDD and opened with
O_DIRECT, the snapshot saving time is around 26 sec.
The snapshot saving time can be made 4 times shorter.
This patch series proposes a way to achieve that.

Why does saving take ~25 sec?

There are three reasons:
1. the qemu-file iov limit (currently 64)
2. direct qemu_fflush calls, inducing disk writes
3. ram data copying and synchronous disk writes

While 1 and 2 are quite clear, the 3rd needs some explanation:

Internal snapshots use qemu-file as an interface to store the data with
stream semantics.
qemu-file avoids data copying when possible (mostly for ram data)
and uses iovectors to propagate the data to the underlying block driver state.
When qcow2 is opened with O_DIRECT, this is suboptimal.

This is what happens on writing: when the iovector request goes from qemu-file
to bdrv (here and below, by bdrv I mean qcow2 with a POSIX O_DIRECT-opened backend),
bdrv checks that all iovectors are base- and size-aligned. If that is not the case,
the data is copied to an internal buffer and a synchronous pwrite is called.
If the iovectors are aligned, io_submit is called.

In our case, snapshotting almost always induces pwrite, since we never have all
the iovectors in the request aligned: a short iovector, an 8-byte ram-page
delimiter, is added after each ram-page iovector.
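To illustrate why the 8-byte delimiters matter, the alignment check that forces the copying pwrite fallback can be sketched like this (a simplified stand-in for the real bdrv logic; the 512-byte requirement and the function name are assumptions for illustration):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/uio.h>

/* Assumed O_DIRECT alignment requirement of the backing storage. */
#define ALIGNMENT 512

/*
 * Simplified stand-in for the bdrv alignment check: a request can go
 * through io_submit only if every iovec base and length is aligned;
 * a single unaligned element (e.g. an 8-byte ram-page delimiter)
 * demotes the whole request to the copy + synchronous pwrite path.
 */
static bool iov_is_aligned(const struct iovec *iov, unsigned int iovcnt)
{
    for (unsigned int i = 0; i < iovcnt; i++) {
        if ((uintptr_t)iov[i].iov_base % ALIGNMENT != 0 ||
            iov[i].iov_len % ALIGNMENT != 0) {
            return false;
        }
    }
    return true;
}
```

So a request of perfectly aligned ram pages is still sent down the slow path because of the short delimiter iovec that follows each page.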

So, in this case, the qemu-file code:
1. doesn't avoid ram copying
2. works fully synchronously

How to improve the snapshot time:

1. The easy way: increase the iov limit to IOV_MAX (1024).
This reduces the frequency of synchronous writes.
My test showed that with iov limit = IOV_MAX the snapshot time is *~12 sec*.

2. The complex way: do the writes asynchronously.
Introduce a base- and size-aligned buffer, write the data only when
the buffer is full, write the buffer asynchronously, and meanwhile fill
another buffer with snapshot data.
My test showed that this complex way brings the snapshot time down to *~6 sec*,
2 times better than just increasing the iov limit.
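The double-buffering scheme described above can be sketched roughly as follows (a toy model with illustrative names and a tiny buffer size; the real series uses large aligned buffers and submits each full buffer as an AioTask instead of merely counting flushed bytes):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BUF_SIZE 8           /* tiny for illustration; the patch uses 1 MiB */
#define BUF_NUM  2

typedef struct {
    unsigned char data[BUF_SIZE];
    size_t used;
} Buffer;

typedef struct {
    Buffer bufs[BUF_NUM];
    int cur;                 /* index of the buffer being filled */
    size_t flushed;          /* bytes handed to the (would-be async) writer */
} Writer;

/* Stand-in for starting an asynchronous write of a full buffer.
 * In the real series this submits an AioTask; here we only count bytes. */
static void start_async_write(Writer *w, Buffer *b)
{
    w->flushed += b->used;
    b->used = 0;
}

/* Copy data into the current buffer, flushing and switching buffers
 * whenever the current one fills up. */
static void buffered_put(Writer *w, const unsigned char *p, size_t len)
{
    while (len > 0) {
        Buffer *b = &w->bufs[w->cur];
        size_t n = BUF_SIZE - b->used;
        if (n == 0) {
            start_async_write(w, b);
            w->cur = (w->cur + 1) % BUF_NUM;  /* fill the other buffer */
            continue;
        }
        if (n > len) {
            n = len;
        }
        memcpy(b->data + b->used, p, n);
        b->used += n;
        p += n;
        len -= n;
    }
}
```

The point of the scheme is that the storage write of one buffer can overlap with filling the next, instead of stalling the snapshot stream on every synchronous pwrite.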

This series implements the complex way, allowing asynchronous
writes to be used when needed.

This is an RFC series, as I'm not confident that I fully understand all
qemu-file use cases. I tried to write the series in a safe way so as not
to break anything related to qemu-file usage in other places, like migration.

All comments are *VERY* appreciated!

Thanks,
Denis

Denis Plotnikov (3):
  qemu-file: introduce current buffer
  qemu-file: add buffered mode
  migration/savevm: use qemu-file buffered mode for non-cached bdrv

 include/qemu/typedefs.h |   2 +
 migration/qemu-file.c   | 479 +++++++++++++++++++++++++++++++++++++++++-------
 migration/qemu-file.h   |   9 +
 migration/savevm.c      |  38 +++-
 4 files changed, 456 insertions(+), 72 deletions(-)

-- 
1.8.3.1



^ permalink raw reply	[flat|nested] 19+ messages in thread

* [RFC patch v1 1/3] qemu-file: introduce current buffer
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
@ 2020-04-13 11:12 ` Denis Plotnikov
  2020-04-24 17:47   ` Vladimir Sementsov-Ogievskiy
  2020-04-24 21:12   ` Eric Blake
  2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
                   ` (4 subsequent siblings)
  5 siblings, 2 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-13 11:12 UTC (permalink / raw)
  To: qemu-devel; +Cc: den, dgilbert, quintela

To prepare for async writing in further commits, the buffer
allocated in the QEMUFile struct is replaced with a link to the
current buffer. We're going to use many buffers to write the
qemu-file stream to the underlying storage asynchronously. The
current buffer points to the buffer currently being filled
with data.

This patch doesn't add any features to qemu-file and doesn't
change any qemu-file behavior.

Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
---
 include/qemu/typedefs.h |   1 +
 migration/qemu-file.c   | 156 +++++++++++++++++++++++++++++-------------------
 2 files changed, 95 insertions(+), 62 deletions(-)

diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 375770a..88dce54 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -97,6 +97,7 @@ typedef struct QDict QDict;
 typedef struct QEMUBH QEMUBH;
 typedef struct QemuConsole QemuConsole;
 typedef struct QEMUFile QEMUFile;
+typedef struct QEMUFileBuffer QEMUFileBuffer;
 typedef struct QemuLockable QemuLockable;
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuOpt QemuOpt;
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1c3a358..285c6ef 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -33,6 +33,17 @@
 #define IO_BUF_SIZE 32768
 #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
 
+QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
+
+struct QEMUFileBuffer {
+    int buf_index;
+    int buf_size; /* 0 when writing */
+    uint8_t *buf;
+    unsigned long *may_free;
+    struct iovec *iov;
+    unsigned int iovcnt;
+};
+
 struct QEMUFile {
     const QEMUFileOps *ops;
     const QEMUFileHooks *hooks;
@@ -43,18 +54,12 @@ struct QEMUFile {
 
     int64_t pos; /* start of buffer when writing, end of buffer
                     when reading */
-    int buf_index;
-    int buf_size; /* 0 when writing */
-    uint8_t buf[IO_BUF_SIZE];
-
-    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);
-    struct iovec iov[MAX_IOV_SIZE];
-    unsigned int iovcnt;
-
     int last_error;
     Error *last_error_obj;
     /* has the file has been shutdown */
     bool shutdown;
+    /* currently used buffer */
+    QEMUFileBuffer *current_buf;
 };
 
 /*
@@ -109,6 +114,12 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
 
     f->opaque = opaque;
     f->ops = ops;
+
+    f->current_buf = g_new0(QEMUFileBuffer, 1);
+    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
+    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
+    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
+
     return f;
 }
 
@@ -177,35 +188,37 @@ static void qemu_iovec_release_ram(QEMUFile *f)
 {
     struct iovec iov;
     unsigned long idx;
+    QEMUFileBuffer *fb = f->current_buf;
 
     /* Find and release all the contiguous memory ranges marked as may_free. */
-    idx = find_next_bit(f->may_free, f->iovcnt, 0);
-    if (idx >= f->iovcnt) {
+    idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
+    if (idx >= fb->iovcnt) {
         return;
     }
-    iov = f->iov[idx];
+    iov = fb->iov[idx];
 
     /* The madvise() in the loop is called for iov within a continuous range and
      * then reinitialize the iov. And in the end, madvise() is called for the
      * last iov.
      */
-    while ((idx = find_next_bit(f->may_free, f->iovcnt, idx + 1)) < f->iovcnt) {
+    while ((idx = find_next_bit(fb->may_free,
+                                fb->iovcnt, idx + 1)) < fb->iovcnt) {
         /* check for adjacent buffer and coalesce them */
-        if (iov.iov_base + iov.iov_len == f->iov[idx].iov_base) {
-            iov.iov_len += f->iov[idx].iov_len;
+        if (iov.iov_base + iov.iov_len == fb->iov[idx].iov_base) {
+            iov.iov_len += fb->iov[idx].iov_len;
             continue;
         }
         if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) {
             error_report("migrate: madvise DONTNEED failed %p %zd: %s",
                          iov.iov_base, iov.iov_len, strerror(errno));
         }
-        iov = f->iov[idx];
+        iov = fb->iov[idx];
     }
     if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) {
             error_report("migrate: madvise DONTNEED failed %p %zd: %s",
                          iov.iov_base, iov.iov_len, strerror(errno));
     }
-    memset(f->may_free, 0, sizeof(f->may_free));
+    bitmap_zero(fb->may_free, MAX_IOV_SIZE);
 }
 
 /**
@@ -219,6 +232,7 @@ void qemu_fflush(QEMUFile *f)
     ssize_t ret = 0;
     ssize_t expect = 0;
     Error *local_error = NULL;
+    QEMUFileBuffer *fb = f->current_buf;
 
     if (!qemu_file_is_writable(f)) {
         return;
@@ -227,9 +241,9 @@ void qemu_fflush(QEMUFile *f)
     if (f->shutdown) {
         return;
     }
-    if (f->iovcnt > 0) {
-        expect = iov_size(f->iov, f->iovcnt);
-        ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos,
+    if (fb->iovcnt > 0) {
+        expect = iov_size(fb->iov, fb->iovcnt);
+        ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
                                     &local_error);
 
         qemu_iovec_release_ram(f);
@@ -244,8 +258,8 @@ void qemu_fflush(QEMUFile *f)
     if (ret != expect) {
         qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
     }
-    f->buf_index = 0;
-    f->iovcnt = 0;
+    fb->buf_index = 0;
+    fb->iovcnt = 0;
 }
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
@@ -331,24 +345,25 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
     int len;
     int pending;
     Error *local_error = NULL;
+    QEMUFileBuffer *fb = f->current_buf;
 
     assert(!qemu_file_is_writable(f));
 
-    pending = f->buf_size - f->buf_index;
+    pending = fb->buf_size - fb->buf_index;
     if (pending > 0) {
-        memmove(f->buf, f->buf + f->buf_index, pending);
+        memmove(fb->buf, fb->buf + fb->buf_index, pending);
     }
-    f->buf_index = 0;
-    f->buf_size = pending;
+    fb->buf_index = 0;
+    fb->buf_size = pending;
 
     if (f->shutdown) {
         return 0;
     }
 
-    len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos,
+    len = f->ops->get_buffer(f->opaque, fb->buf + pending, f->pos,
                              IO_BUF_SIZE - pending, &local_error);
     if (len > 0) {
-        f->buf_size += len;
+        fb->buf_size += len;
         f->pos += len;
     } else if (len == 0) {
         qemu_file_set_error_obj(f, -EIO, local_error);
@@ -393,6 +408,10 @@ int qemu_fclose(QEMUFile *f)
         ret = f->last_error;
     }
     error_free(f->last_error_obj);
+    g_free(f->current_buf->buf);
+    g_free(f->current_buf->iov);
+    g_free(f->current_buf->may_free);
+    g_free(f->current_buf);
     g_free(f);
     trace_qemu_file_fclose();
     return ret;
@@ -409,21 +428,22 @@ int qemu_fclose(QEMUFile *f)
 static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
                         bool may_free)
 {
+    QEMUFileBuffer *fb = f->current_buf;
     /* check for adjacent buffer and coalesce them */
-    if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base +
-        f->iov[f->iovcnt - 1].iov_len &&
-        may_free == test_bit(f->iovcnt - 1, f->may_free))
+    if (fb->iovcnt > 0 && buf == fb->iov[fb->iovcnt - 1].iov_base +
+        fb->iov[fb->iovcnt - 1].iov_len &&
+        may_free == test_bit(fb->iovcnt - 1, fb->may_free))
     {
-        f->iov[f->iovcnt - 1].iov_len += size;
+        fb->iov[fb->iovcnt - 1].iov_len += size;
     } else {
         if (may_free) {
-            set_bit(f->iovcnt, f->may_free);
+            set_bit(fb->iovcnt, fb->may_free);
         }
-        f->iov[f->iovcnt].iov_base = (uint8_t *)buf;
-        f->iov[f->iovcnt++].iov_len = size;
+        fb->iov[fb->iovcnt].iov_base = (uint8_t *)buf;
+        fb->iov[fb->iovcnt++].iov_len = size;
     }
 
-    if (f->iovcnt >= MAX_IOV_SIZE) {
+    if (fb->iovcnt >= MAX_IOV_SIZE) {
         qemu_fflush(f);
         return 1;
     }
@@ -433,9 +453,10 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
 
 static void add_buf_to_iovec(QEMUFile *f, size_t len)
 {
-    if (!add_to_iovec(f, f->buf + f->buf_index, len, false)) {
-        f->buf_index += len;
-        if (f->buf_index == IO_BUF_SIZE) {
+    QEMUFileBuffer *fb = f->current_buf;
+    if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
+        fb->buf_index += len;
+        if (fb->buf_index == IO_BUF_SIZE) {
             qemu_fflush(f);
         }
     }
@@ -455,17 +476,18 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
 {
     size_t l;
+    QEMUFileBuffer *fb = f->current_buf;
 
     if (f->last_error) {
         return;
     }
 
     while (size > 0) {
-        l = IO_BUF_SIZE - f->buf_index;
+        l = IO_BUF_SIZE - fb->buf_index;
         if (l > size) {
             l = size;
         }
-        memcpy(f->buf + f->buf_index, buf, l);
+        memcpy(fb->buf + fb->buf_index, buf, l);
         f->bytes_xfer += l;
         add_buf_to_iovec(f, l);
         if (qemu_file_get_error(f)) {
@@ -478,19 +500,23 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
 
 void qemu_put_byte(QEMUFile *f, int v)
 {
+    QEMUFileBuffer *fb = f->current_buf;
+
     if (f->last_error) {
         return;
     }
 
-    f->buf[f->buf_index] = v;
+    fb->buf[fb->buf_index] = v;
     f->bytes_xfer++;
     add_buf_to_iovec(f, 1);
 }
 
 void qemu_file_skip(QEMUFile *f, int size)
 {
-    if (f->buf_index + size <= f->buf_size) {
-        f->buf_index += size;
+    QEMUFileBuffer *fb = f->current_buf;
+
+    if (fb->buf_index + size <= fb->buf_size) {
+        fb->buf_index += size;
     }
 }
 
@@ -506,15 +532,16 @@ size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset)
 {
     ssize_t pending;
     size_t index;
+    QEMUFileBuffer *fb = f->current_buf;
 
     assert(!qemu_file_is_writable(f));
     assert(offset < IO_BUF_SIZE);
     assert(size <= IO_BUF_SIZE - offset);
 
     /* The 1st byte to read from */
-    index = f->buf_index + offset;
+    index = fb->buf_index + offset;
     /* The number of available bytes starting at index */
-    pending = f->buf_size - index;
+    pending = fb->buf_size - index;
 
     /*
      * qemu_fill_buffer might return just a few bytes, even when there isn't
@@ -527,8 +554,8 @@ size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset)
             break;
         }
 
-        index = f->buf_index + offset;
-        pending = f->buf_size - index;
+        index = fb->buf_index + offset;
+        pending = fb->buf_size - index;
     }
 
     if (pending <= 0) {
@@ -538,7 +565,7 @@ size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset)
         size = pending;
     }
 
-    *buf = f->buf + index;
+    *buf = fb->buf + index;
     return size;
 }
 
@@ -615,19 +642,21 @@ size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size)
  */
 int qemu_peek_byte(QEMUFile *f, int offset)
 {
-    int index = f->buf_index + offset;
+    QEMUFileBuffer *fb = f->current_buf;
+
+    int index = fb->buf_index + offset;
 
     assert(!qemu_file_is_writable(f));
     assert(offset < IO_BUF_SIZE);
 
-    if (index >= f->buf_size) {
+    if (index >= fb->buf_size) {
         qemu_fill_buffer(f);
-        index = f->buf_index + offset;
-        if (index >= f->buf_size) {
+        index = fb->buf_index + offset;
+        if (index >= fb->buf_size) {
             return 0;
         }
     }
-    return f->buf[index];
+    return fb->buf[index];
 }
 
 int qemu_get_byte(QEMUFile *f)
@@ -643,9 +672,10 @@ int64_t qemu_ftell_fast(QEMUFile *f)
 {
     int64_t ret = f->pos;
     int i;
+    QEMUFileBuffer *fb = f->current_buf;
 
-    for (i = 0; i < f->iovcnt; i++) {
-        ret += f->iov[i].iov_len;
+    for (i = 0; i < fb->iovcnt; i++) {
+        ret += fb->iov[i].iov_len;
     }
 
     return ret;
@@ -770,13 +800,14 @@ static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
 ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
                                   const uint8_t *p, size_t size)
 {
-    ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+    QEMUFileBuffer *fb = f->current_buf;
+    ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
 
     if (blen < compressBound(size)) {
         return -1;
     }
 
-    blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
+    blen = qemu_compress_data(stream, fb->buf + fb->buf_index + sizeof(int32_t),
                               blen, p, size);
     if (blen < 0) {
         return -1;
@@ -794,12 +825,13 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
 int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
 {
     int len = 0;
+    QEMUFileBuffer *fb_src = f_src->current_buf;
 
-    if (f_src->buf_index > 0) {
-        len = f_src->buf_index;
-        qemu_put_buffer(f_des, f_src->buf, f_src->buf_index);
-        f_src->buf_index = 0;
-        f_src->iovcnt = 0;
+    if (fb_src->buf_index > 0) {
+        len = fb_src->buf_index;
+        qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
+        fb_src->buf_index = 0;
+        fb_src->iovcnt = 0;
     }
     return len;
 }
-- 
1.8.3.1



^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
  2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
@ 2020-04-13 11:12 ` Denis Plotnikov
  2020-04-24 21:25   ` Eric Blake
                     ` (2 more replies)
  2020-04-13 11:12 ` [RFC patch v1 3/3] migration/savevm: use qemu-file buffered mode for non-cached bdrv Denis Plotnikov
                   ` (3 subsequent siblings)
  5 siblings, 3 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-13 11:12 UTC (permalink / raw)
  To: qemu-devel; +Cc: den, dgilbert, quintela

This patch adds the ability for qemu-file to write data
asynchronously to improve write performance.
Before, only synchronous writing was supported.

Enabling the asynchronous mode is managed by the new
"enable_buffered" callback.
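Roughly, the hook is consulted once at open time. A minimal model of that decision, with stub types standing in for the real QEMUFile structures (names here are illustrative, not the actual QEMU API), might look like:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stub modeled on the new QEMUFileOps hook; only enable_buffered is kept. */
typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);

typedef struct {
    QEMUFileEnableBufferedFunc *enable_buffered;
} QEMUFileOpsStub;

/* A hypothetical backend that opts into buffered mode only for O_DIRECT. */
typedef struct {
    bool o_direct;
} BackendStub;

static bool backend_enable_buffered(void *opaque)
{
    return ((BackendStub *)opaque)->o_direct;
}

/* Mirrors the check done at open time: the hook is optional, and the
 * file stays in the old non-buffered mode when it is absent. */
static bool would_use_buffered_mode(const QEMUFileOpsStub *ops, void *opaque)
{
    return ops->enable_buffered && ops->enable_buffered(opaque);
}
```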

Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
---
 include/qemu/typedefs.h |   1 +
 migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
 migration/qemu-file.h   |   9 ++
 3 files changed, 339 insertions(+), 22 deletions(-)

diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 88dce54..9b388c8 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
 typedef struct QemuConsole QemuConsole;
 typedef struct QEMUFile QEMUFile;
 typedef struct QEMUFileBuffer QEMUFileBuffer;
+typedef struct QEMUFileAioTask QEMUFileAioTask;
 typedef struct QemuLockable QemuLockable;
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuOpt QemuOpt;
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 285c6ef..f42f949 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -29,19 +29,25 @@
 #include "qemu-file.h"
 #include "trace.h"
 #include "qapi/error.h"
+#include "block/aio_task.h"
 
-#define IO_BUF_SIZE 32768
+#define IO_BUF_SIZE (1024 * 1024)
 #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
+#define IO_BUF_NUM 2
+#define IO_BUF_ALIGNMENT 512
 
-QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
+QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
+QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
+QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
 
 struct QEMUFileBuffer {
     int buf_index;
-    int buf_size; /* 0 when writing */
+    int buf_size; /* 0 when non-buffered writing */
     uint8_t *buf;
     unsigned long *may_free;
     struct iovec *iov;
     unsigned int iovcnt;
+    QLIST_ENTRY(QEMUFileBuffer) link;
 };
 
 struct QEMUFile {
@@ -60,6 +66,22 @@ struct QEMUFile {
     bool shutdown;
     /* currently used buffer */
     QEMUFileBuffer *current_buf;
+    /*
+     * with buffered_mode enabled, all the data is copied to a 512-byte
+     * aligned buffer, including iov data. Then the buffer is passed
+     * to the writev_buffer callback.
+     */
+    bool buffered_mode;
+    /* for async buffer writing */
+    AioTaskPool *pool;
+    /* the list of free buffers; the currently used one is NOT there */
+    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
+};
+
+struct QEMUFileAioTask {
+    AioTask task;
+    QEMUFile *f;
+    QEMUFileBuffer *fb;
 };
 
 /*
@@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
     f->opaque = opaque;
     f->ops = ops;
 
-    f->current_buf = g_new0(QEMUFileBuffer, 1);
-    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
-    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
-    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
+    if (f->ops->enable_buffered) {
+        f->buffered_mode = f->ops->enable_buffered(f->opaque);
+    }
+
+    if (f->buffered_mode && qemu_file_is_writable(f)) {
+        int i;
+        /*
+         * in buffered_mode we don't use the internal io vectors
+         * and the may_free bitmap, because we copy the data to be
+         * written straight into the buffer
+         */
+        f->pool = aio_task_pool_new(IO_BUF_NUM);
+
+        /* allocate io buffers */
+        for (i = 0; i < IO_BUF_NUM; i++) {
+            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
+
+            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
+            fb->buf_size = IO_BUF_SIZE;
+
+            /*
+             * make the first buffer the current one and put the rest
+             * on the list of free buffers
+             */
+            if (i == 0) {
+                f->current_buf = fb;
+            } else {
+                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
+            }
+        }
+    } else {
+        f->current_buf = g_new0(QEMUFileBuffer, 1);
+        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
+        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
+        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
+    }
 
     return f;
 }
@@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
     unsigned long idx;
     QEMUFileBuffer *fb = f->current_buf;
 
+    assert(!f->buffered_mode);
+
     /* Find and release all the contiguous memory ranges marked as may_free. */
     idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
     if (idx >= fb->iovcnt) {
@@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
     bitmap_zero(fb->may_free, MAX_IOV_SIZE);
 }
 
+static void advance_buf_ptr(QEMUFile *f, size_t size)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* must not advance by 0 */
+    assert(size);
+    /* must not overflow buf_index (int) */
+    assert(fb->buf_index + size <= INT_MAX);
+    /* must not exceed buf_size */
+    assert(fb->buf_index + size <= fb->buf_size);
+
+    fb->buf_index += size;
+}
+
+static size_t get_buf_free_size(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* buf_index can't be greater than buf_size */
+    assert(fb->buf_size >= fb->buf_index);
+    return fb->buf_size - fb->buf_index;
+}
+
+static size_t get_buf_used_size(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    return fb->buf_index;
+}
+
+static uint8_t *get_buf_ptr(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* protects against out-of-bounds reading */
+    assert(fb->buf_index <= IO_BUF_SIZE);
+    return fb->buf + fb->buf_index;
+}
+
+static bool buf_is_full(QEMUFile *f)
+{
+    return get_buf_free_size(f) == 0;
+}
+
+static void reset_buf(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    fb->buf_index = 0;
+}
+
+static int write_task_fn(AioTask *task)
+{
+    int ret;
+    Error *local_error = NULL;
+    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
+    QEMUFile *f = t->f;
+    QEMUFileBuffer *fb = t->fb;
+    uint64_t pos = f->pos;
+    struct iovec v = (struct iovec) {
+        .iov_base = fb->buf,
+        .iov_len = fb->buf_index,
+    };
+
+    assert(f->buffered_mode);
+
+    /*
+     * Increment the file position.
+     * This needs to happen before calling writev_buffer, because
+     * writev_buffer is asynchronous and there could be more than one
+     * writev_buffer started simultaneously. Each writev_buffer should
+     * use its own file pos to write to. writev_buffer may write less
+     * than buf_index bytes, but we treat that situation as an error.
+     * If an error appears, further use of the file is meaningless.
+     * We expect that most of the time the full buffer is written
+     * (when buf_size == buf_index). The only case when a non-full
+     * buffer is written (buf_size != buf_index) is file close,
+     * when we need to flush the rest of the buffer content.
+     */
+    f->pos += fb->buf_index;
+
+    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
+
+    /* return the just written buffer to the free list */
+    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
+
+    /* check that we have written everything */
+    if (ret != fb->buf_index) {
+        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
+    }
+
+    /*
+     * always return 0 - don't use task error handling, rely on
+     * qemu-file error handling
+     */
+    return 0;
+}
+
+static void qemu_file_switch_current_buf(QEMUFile *f)
+{
+    /*
+     * if the list is empty, wait until some task returns a buffer
+     * to the list of free buffers.
+     */
+    if (QLIST_EMPTY(&f->free_buffers)) {
+        aio_task_pool_wait_slot(f->pool);
+    }
+
+    /*
+     * sanity check that the list isn't empty:
+     * if the free list was empty, we waited for a task completion,
+     * and the completed task must return a buffer to the free list
+     */
+    assert(!QLIST_EMPTY(&f->free_buffers));
+
+    /* take the next current buffer from the free list */
+    f->current_buf = QLIST_FIRST(&f->free_buffers);
+    reset_buf(f);
+
+    QLIST_REMOVE(f->current_buf, link);
+}
+
+/**
+ *  Asynchronously flushes QEMUFile buffer
+ *
+ * This will flush all pending data. If data was only partially flushed, it
+ * will set an error state. The function may return before the data is
+ * actually written.
+ */
+static void flush_buffer(QEMUFile *f)
+{
+    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
+
+    *t = (QEMUFileAioTask) {
+        .task.func = &write_task_fn,
+        .f = f,
+        .fb = f->current_buf,
+    };
+
+    /* aio_task_pool should free t for us */
+    aio_task_pool_start_task(f->pool, (AioTask *) t);
+
+    /* if there were no errors, this switches the buffer */
+    qemu_file_switch_current_buf(f);
+}
+
 /**
  * Flushes QEMUFile buffer
  *
@@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
     if (f->shutdown) {
         return;
     }
+
+    if (f->buffered_mode) {
+        return;
+    }
+
     if (fb->iovcnt > 0) {
+        /* this is non-buffered mode */
         expect = iov_size(fb->iov, fb->iovcnt);
         ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
                                     &local_error);
@@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
 
 void qemu_update_position(QEMUFile *f, size_t size)
 {
+    assert(!f->buffered_mode);
     f->pos += size;
 }
 
@@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
 int qemu_fclose(QEMUFile *f)
 {
     int ret;
-    qemu_fflush(f);
+
+    if (qemu_file_is_writable(f) && f->buffered_mode) {
+        ret = qemu_file_get_error(f);
+        if (!ret) {
+            flush_buffer(f);
+        }
+        /* wait until all tasks are done */
+        aio_task_pool_wait_all(f->pool);
+    } else {
+        qemu_fflush(f);
+    }
+
     ret = qemu_file_get_error(f);
 
     if (f->ops->close) {
@@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
         ret = f->last_error;
     }
     error_free(f->last_error_obj);
-    g_free(f->current_buf->buf);
-    g_free(f->current_buf->iov);
-    g_free(f->current_buf->may_free);
-    g_free(f->current_buf);
+
+    if (f->buffered_mode) {
+        QEMUFileBuffer *fb, *next;
+        /*
+         * put the current back to the free buffers list
+         * to destroy all the buffers in one loop
+         */
+        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
+
+        /* destroy all the buffers */
+        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
+            QLIST_REMOVE(fb, link);
+            /* looks like qemu_vfree pairs with qemu_memalign */
+            qemu_vfree(fb->buf);
+            g_free(fb);
+        }
+        g_free(f->pool);
+    } else {
+        g_free(f->current_buf->buf);
+        g_free(f->current_buf->iov);
+        g_free(f->current_buf->may_free);
+        g_free(f->current_buf);
+    }
+
     g_free(f);
     trace_qemu_file_fclose();
     return ret;
 }
 
 /*
+ * Copy an external buffer to the internal current buffer.
+ */
+static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
+                     bool may_free)
+{
+    size_t data_size = size;
+    const uint8_t *src_ptr = buf;
+
+    assert(f->buffered_mode);
+    assert(size <= INT_MAX);
+
+    while (data_size > 0) {
+        size_t chunk_size;
+
+        if (buf_is_full(f)) {
+            flush_buffer(f);
+            if (qemu_file_get_error(f)) {
+                return;
+            }
+        }
+
+        chunk_size = MIN(get_buf_free_size(f), data_size);
+
+        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
+
+        advance_buf_ptr(f, chunk_size);
+
+        src_ptr += chunk_size;
+        data_size -= chunk_size;
+        f->bytes_xfer += chunk_size;
+    }
+
+    if (may_free) {
+        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
+            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
+                         buf, size, strerror(errno));
+        }
+    }
+}
+
+/*
  * Add buf to iovec. Do flush if iovec is full.
  *
  * Return values:
@@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
 static void add_buf_to_iovec(QEMUFile *f, size_t len)
 {
     QEMUFileBuffer *fb = f->current_buf;
+
+    assert(!f->buffered_mode);
+
     if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
         fb->buf_index += len;
         if (fb->buf_index == IO_BUF_SIZE) {
@@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
         return;
     }
 
-    f->bytes_xfer += size;
-    add_to_iovec(f, buf, size, may_free);
+    if (f->buffered_mode) {
+        copy_buf(f, buf, size, may_free);
+    } else {
+        f->bytes_xfer += size;
+        add_to_iovec(f, buf, size, may_free);
+    }
 }
 
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
@@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
         return;
     }
 
+    if (f->buffered_mode) {
+        copy_buf(f, buf, size, false);
+        return;
+    }
+
     while (size > 0) {
         l = IO_BUF_SIZE - fb->buf_index;
         if (l > size) {
@@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
         return;
     }
 
-    fb->buf[fb->buf_index] = v;
-    f->bytes_xfer++;
-    add_buf_to_iovec(f, 1);
+    if (f->buffered_mode) {
+        copy_buf(f, (const uint8_t *) &v, 1, false);
+    } else {
+        fb->buf[fb->buf_index] = v;
+        add_buf_to_iovec(f, 1);
+        f->bytes_xfer++;
+    }
 }
 
 void qemu_file_skip(QEMUFile *f, int size)
 {
     QEMUFileBuffer *fb = f->current_buf;
 
+    assert(!f->buffered_mode);
+
     if (fb->buf_index + size <= fb->buf_size) {
         fb->buf_index += size;
     }
@@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
 {
     int64_t ret = f->pos;
     int i;
-    QEMUFileBuffer *fb = f->current_buf;
 
-    for (i = 0; i < fb->iovcnt; i++) {
-        ret += fb->iov[i].iov_len;
+    if (f->buffered_mode) {
+        ret += get_buf_used_size(f);
+    } else {
+        QEMUFileBuffer *fb = f->current_buf;
+        for (i = 0; i < fb->iovcnt; i++) {
+            ret += fb->iov[i].iov_len;
+        }
     }
 
     return ret;
@@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
 
 int64_t qemu_ftell(QEMUFile *f)
 {
-    qemu_fflush(f);
-    return f->pos;
+    if (f->buffered_mode) {
+        return qemu_ftell_fast(f);
+    } else {
+        qemu_fflush(f);
+        return f->pos;
+    }
 }
 
 int qemu_file_rate_limit(QEMUFile *f)
@@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     QEMUFileBuffer *fb = f->current_buf;
     ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
 
+    assert(!f->buffered_mode);
+
     if (blen < compressBound(size)) {
         return -1;
     }
@@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
     int len = 0;
     QEMUFileBuffer *fb_src = f_src->current_buf;
 
+    assert(!f_des->buffered_mode);
+    assert(!f_src->buffered_mode);
+
     if (fb_src->buf_index > 0) {
         len = fb_src->buf_index;
         qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6c..08655d2 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
 typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
                                    Error **errp);
 
+/*
+ * Enables or disables the buffered mode
+ * Existing blocking reads/writes must be woken
+ * Returns true if the buffered mode has to be enabled,
+ * false if it has to be disabled.
+ */
+typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
+
 typedef struct QEMUFileOps {
     QEMUFileGetBufferFunc *get_buffer;
     QEMUFileCloseFunc *close;
@@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
     QEMUFileWritevBufferFunc *writev_buffer;
     QEMURetPathFunc *get_return_path;
     QEMUFileShutdownFunc *shut_down;
+    QEMUFileEnableBufferedFunc *enable_buffered;
 } QEMUFileOps;
 
 typedef struct QEMUFileHooks {
-- 
1.8.3.1



^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [RFC patch v1 3/3] migration/savevm: use qemu-file buffered mode for non-cached bdrv
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
  2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
  2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
@ 2020-04-13 11:12 ` Denis Plotnikov
  2020-04-13 12:10 ` [RFC patch v1 0/3] qemu-file writing performance improving Denis V. Lunev
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-13 11:12 UTC (permalink / raw)
  To: qemu-devel; +Cc: den, dgilbert, quintela

This makes internal snapshots of HDD-placed qcow2 images opened with
the O_DIRECT flag 4 times faster.

The test:
   creates a 500M internal snapshot of a qcow2 image placed on an HDD
Result times:
   with the patch: ~6 sec
   without the patch: ~24 sec

Snapshot saving is slow because it produces a lot of synchronous
pwrites: the internal buffers are flushed with non-aligned io vectors
and qemu_fflush is called directly.

To fix this, we introduce internal pointer- and size-aligned buffers.
Most of the time a buffer is flushed only when it is full, regardless
of direct qemu_fflush calls. When a buffer is full, it is written
asynchronously.

This gives us a couple of advantages leading to the performance
improvement:

1. because of pointer- and size-aligned buffers we can use an
   asynchronous OS write syscall, like io_submit
2. while one buffer is being written, another buffer is filled with
   data.
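The double-buffering scheme described above can be sketched roughly as
follows (a standalone illustration, not the actual QEMUFile code; the
buffer size, structure names, and the synchronous stand-in for the
asynchronous write are all made up for the example):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BUF_SIZE 512   /* illustrative; the patch uses 1 MiB buffers */
#define BUF_NUM  2

typedef struct Buffer {
    unsigned char data[BUF_SIZE];
    size_t index;           /* bytes filled so far */
} Buffer;

typedef struct Writer {
    Buffer bufs[BUF_NUM];
    int current;            /* buffer currently being filled */
    size_t flushed;         /* total bytes handed to the backend */
} Writer;

/* Stand-in for the asynchronous write: here it just accounts the bytes
 * and immediately returns the buffer to the pool. */
static void submit_async_write(Writer *w, Buffer *b)
{
    w->flushed += b->index;
    b->index = 0;
}

/* Copy data into the current buffer; swap buffers when one fills up,
 * so the previous buffer can be written out while this one fills. */
static void writer_put(Writer *w, const void *buf, size_t size)
{
    const unsigned char *p = buf;
    while (size > 0) {
        Buffer *cur = &w->bufs[w->current];
        size_t room = BUF_SIZE - cur->index;
        size_t len = size < room ? size : room;

        memcpy(cur->data + cur->index, p, len);
        cur->index += len;
        p += len;
        size -= len;

        if (cur->index == BUF_SIZE) {
            submit_async_write(w, cur);
            w->current = (w->current + 1) % BUF_NUM;
        }
    }
}
```

Only full, aligned buffers ever reach the (stand-in) write path; a
partial buffer survives until close, matching the flush-on-close case.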

Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
---
 migration/savevm.c | 38 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)

diff --git a/migration/savevm.c b/migration/savevm.c
index c00a680..db0cac9 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -63,6 +63,7 @@
 #include "migration/colo.h"
 #include "qemu/bitmap.h"
 #include "net/announce.h"
+#include "block/block_int.h"
 
 const unsigned int postcopy_ram_discard_version = 0;
 
@@ -153,6 +154,12 @@ static int bdrv_fclose(void *opaque, Error **errp)
     return bdrv_flush(opaque);
 }
 
+static bool qemu_file_is_buffered(void *opaque)
+{
+    BlockDriverState *bs = (BlockDriverState *) opaque;
+    return !!(bs->open_flags & BDRV_O_NOCACHE);
+}
+
 static const QEMUFileOps bdrv_read_ops = {
     .get_buffer = block_get_buffer,
     .close =      bdrv_fclose
@@ -160,7 +167,8 @@ static const QEMUFileOps bdrv_read_ops = {
 
 static const QEMUFileOps bdrv_write_ops = {
     .writev_buffer  = block_writev_buffer,
-    .close          = bdrv_fclose
+    .close          = bdrv_fclose,
+    .enable_buffered = qemu_file_is_buffered
 };
 
 static QEMUFile *qemu_fopen_bdrv(BlockDriverState *bs, int is_writable)
@@ -2624,7 +2632,7 @@ int qemu_load_device_state(QEMUFile *f)
     return 0;
 }
 
-int save_snapshot(const char *name, Error **errp)
+static int coroutine_fn save_snapshot_fn(const char *name, Error **errp)
 {
     BlockDriverState *bs, *bs1;
     QEMUSnapshotInfo sn1, *sn = &sn1, old_sn1, *old_sn = &old_sn1;
@@ -2747,6 +2755,32 @@ int save_snapshot(const char *name, Error **errp)
     return ret;
 }
 
+typedef struct SaveVMParams {
+    const char *name;
+    Error **errp;
+    int ret;
+} SaveVMParams;
+
+static void coroutine_fn save_snapshot_entry(void *opaque)
+{
+    SaveVMParams *p = (SaveVMParams *) opaque;
+    p->ret = save_snapshot_fn(p->name, p->errp);
+}
+
+int save_snapshot(const char *name, Error **errp)
+{
+    SaveVMParams p = (SaveVMParams) {
+        .name = name,
+        .errp = errp,
+        .ret = -EINPROGRESS,
+    };
+
+    Coroutine *co = qemu_coroutine_create(save_snapshot_entry, &p);
+    aio_co_enter(qemu_get_aio_context(), co);
+    AIO_WAIT_WHILE(qemu_get_aio_context(), p.ret == -EINPROGRESS);
+    return p.ret;
+}
+
 void qmp_xen_save_devices_state(const char *filename, bool has_live, bool live,
                                 Error **errp)
 {
-- 
1.8.3.1




* Re: [RFC patch v1 0/3] qemu-file writing performance improving
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
                   ` (2 preceding siblings ...)
  2020-04-13 11:12 ` [RFC patch v1 3/3] migration/savevm: use qemu-file buffered mode for non-cached bdrv Denis Plotnikov
@ 2020-04-13 12:10 ` Denis V. Lunev
  2020-04-13 13:51 ` no-reply
  2020-04-21  8:13 ` Denis Plotnikov
  5 siblings, 0 replies; 19+ messages in thread
From: Denis V. Lunev @ 2020-04-13 12:10 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: dgilbert, quintela

On 4/13/20 2:12 PM, Denis Plotnikov wrote:
> Problem description: qcow2 internal snapshot saving time is too big on HDD ~ 25 sec
>
> When a qcow2 image is placed on a regular HDD and the image is openned with
> O_DIRECT the snapshot saving time is around 26 sec.
> The snapshot saving time can be 4 times sorter.
> The patch series propose the way to achive that. 
>
> Why is the saving time = ~25 sec?
>
> There are three things:
> 1. qemu-file iov limit (currently 64)
> 2. direct qemu_fflush calls, inducing disk writings
in a non-aligned way, which further results in READ-MODIFY-WRITE
operations at the beginning and at the end of the written data.
With synchronous operations this slows down the process a lot!
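The alignment check the block layer performs before choosing between
io_submit and the bounce-buffer pwrite path can be illustrated with a
small standalone helper (a sketch assuming a 512-byte request
alignment; this is not the actual QEMU code):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/uio.h>

#define REQ_ALIGN 512  /* typical request alignment for O_DIRECT */

/* Returns true only if every iovec base address and length is
 * REQ_ALIGN-aligned; one short element spoils the whole vector. */
static bool qiov_is_aligned(const struct iovec *iov, int iovcnt)
{
    for (int i = 0; i < iovcnt; i++) {
        if ((uintptr_t)iov[i].iov_base % REQ_ALIGN != 0 ||
            iov[i].iov_len % REQ_ALIGN != 0) {
            return false;
        }
    }
    return true;
}
```

An 8-byte delimiter appended after each aligned ram page is enough to
make the check fail, which is exactly the slow path described above.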

> 3. ram data copying and synchronous disk wrtings
>
> When 1, 2 are quite clear, the 3rd needs some explaination:
>
> Internal snapshot uses qemu-file as an interface to store the data with
> stream semantics.
> qemu-file avoids data coping when possible (mostly for ram data)
> and use iovectors to propagate the data to an undelying block driver state.
> In the case when qcow2 openned with O_DIRECT it is suboptimal.
>
> This is what happens: on writing, when the iovectors query goes from qemu-file
> to bdrv (here and further by brdv I mean qcow2 with posix O_DIRECT openned backend),
> the brdv checks all iovectors to be base and size aligned, if it's not the case,
> the data copied to an internal buffer and synchronous pwrite is called.
> If the iovectors are aligned, io_submit is called.
>
> In our case, snapshot almost always induces pwrite, since we never have all
> the iovectors aligned in the query, because of frequent adding a short iovector:
> 8 byte ram-page delimiters, after adding each ram page iovector.
>
> So the qemu-file code in this case:
> 1. doesn't aviod ram copying
> 2. works fully synchronously
>
> How to improve the snapshot time:
>
> 1. easy way: to increase iov limit to IOV_MAX (1024).
> This will reduce synchronous writing frequency.
> My test revealed that with iov limit = IOV_MAX the snapshot time *~12 sec*.
>
> 2. complex way: do writings asynchronously.
> Introduce both base- and size-aligned buffer, write the data only when
> the buffer is full, write the buffer asynchronously, meanwhile filling another
> buffer with snapshot data.
> My test revealed that this complex way provides the snapshot time *~6 sec*,
> 2 times better than just iov limit increasing.

We also align the written data, as flush operations to the disk
are not mandatory.

> The patch proposes how to improve the snapshot performance in the complex way,
> allowing to use the asyncronous writings when needed.
>
> This is an RFC series, as I didn't confident that I fully understand all
> qemu-file use cases. I tried to make the series in a safe way to not break
> anything related to qemu-file using in other places, like migration.
>
> All comments are *VERY* appriciated!
>
> Thanks,
> Denis
>
> Denis Plotnikov (3):
>   qemu-file: introduce current buffer
>   qemu-file: add buffered mode
>   migration/savevm: use qemu-file buffered mode for non-cached bdrv
>
>  include/qemu/typedefs.h |   2 +
>  migration/qemu-file.c   | 479 +++++++++++++++++++++++++++++++++++++++++-------
>  migration/qemu-file.h   |   9 +
>  migration/savevm.c      |  38 +++-
>  4 files changed, 456 insertions(+), 72 deletions(-)
>




* Re: [RFC patch v1 0/3] qemu-file writing performance improving
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
                   ` (3 preceding siblings ...)
  2020-04-13 12:10 ` [RFC patch v1 0/3] qemu-file writing performance improving Denis V. Lunev
@ 2020-04-13 13:51 ` no-reply
  2020-04-21  8:13 ` Denis Plotnikov
  5 siblings, 0 replies; 19+ messages in thread
From: no-reply @ 2020-04-13 13:51 UTC (permalink / raw)
  To: dplotnikov; +Cc: den, quintela, qemu-devel, dgilbert

Patchew URL: https://patchew.org/QEMU/1586776334-641239-1-git-send-email-dplotnikov@virtuozzo.com/



Hi,

This series failed the asan build test. Please find the testing commands and
their output below. If you have Docker installed, you can probably reproduce it
locally.

=== TEST SCRIPT BEGIN ===
#!/bin/bash
export ARCH=x86_64
make docker-image-fedora V=1 NETWORK=1
time make docker-test-debug@fedora TARGET_LIST=x86_64-softmmu J=14 NETWORK=1
=== TEST SCRIPT END ===

/tmp/qemu-test/src/migration/qemu-file.c:415: undefined reference to `aio_task_pool_start_task'
/usr/bin/ld: migration/qemu-file.o: in function `qemu_file_switch_current_buf':
/tmp/qemu-test/src/migration/qemu-file.c:380: undefined reference to `aio_task_pool_wait_slot'
clang-8: error: linker command failed with exit code 1 (use -v to see invocation)
make: *** [/tmp/qemu-test/src/rules.mak:124: tests/test-vmstate] Error 1
make: *** Waiting for unfinished jobs....
Traceback (most recent call last):
  File "./tests/docker/docker.py", line 664, in <module>
---
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['sudo', '-n', 'docker', 'run', '--label', 'com.qemu.instance.uuid=305548e9be434ddeb3a06c706e9dab1a', '-u', '1001', '--security-opt', 'seccomp=unconfined', '--rm', '-e', 'TARGET_LIST=x86_64-softmmu', '-e', 'EXTRA_CONFIGURE_OPTS=', '-e', 'V=', '-e', 'J=14', '-e', 'DEBUG=', '-e', 'SHOW_ENV=', '-e', 'CCACHE_DIR=/var/tmp/ccache', '-v', '/home/patchew/.cache/qemu-docker-ccache:/var/tmp/ccache:z', '-v', '/var/tmp/patchew-tester-tmp-aounzkok/src/docker-src.2020-04-13-09.44.50.27035:/var/tmp/qemu:z,ro', 'qemu:fedora', '/var/tmp/qemu/run', 'test-debug']' returned non-zero exit status 2.
filter=--filter=label=com.qemu.instance.uuid=305548e9be434ddeb3a06c706e9dab1a
make[1]: *** [docker-run] Error 1
make[1]: Leaving directory `/var/tmp/patchew-tester-tmp-aounzkok/src'
make: *** [docker-run-test-debug@fedora] Error 2

real    6m39.403s
user    0m9.069s


The full log is available at
http://patchew.org/logs/1586776334-641239-1-git-send-email-dplotnikov@virtuozzo.com/testing.asan/?type=message.
---
Email generated automatically by Patchew [https://patchew.org/].
Please send your feedback to patchew-devel@redhat.com


* Re: [RFC patch v1 0/3] qemu-file writing performance improving
  2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
                   ` (4 preceding siblings ...)
  2020-04-13 13:51 ` no-reply
@ 2020-04-21  8:13 ` Denis Plotnikov
  5 siblings, 0 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-21  8:13 UTC (permalink / raw)
  To: qemu-devel; +Cc: den, dgilbert, quintela

Ping!

On 13.04.2020 14:12, Denis Plotnikov wrote:
> Problem description: qcow2 internal snapshot saving time is too big on HDD ~ 25 sec
>
> When a qcow2 image is placed on a regular HDD and the image is openned with
> O_DIRECT the snapshot saving time is around 26 sec.
> The snapshot saving time can be 4 times sorter.
> The patch series propose the way to achive that.
>
> Why is the saving time = ~25 sec?
>
> There are three things:
> 1. qemu-file iov limit (currently 64)
> 2. direct qemu_fflush calls, inducing disk writings
> 3. ram data copying and synchronous disk wrtings
>
> When 1, 2 are quite clear, the 3rd needs some explaination:
>
> Internal snapshot uses qemu-file as an interface to store the data with
> stream semantics.
> qemu-file avoids data coping when possible (mostly for ram data)
> and use iovectors to propagate the data to an undelying block driver state.
> In the case when qcow2 openned with O_DIRECT it is suboptimal.
>
> This is what happens: on writing, when the iovectors query goes from qemu-file
> to bdrv (here and further by brdv I mean qcow2 with posix O_DIRECT openned backend),
> the brdv checks all iovectors to be base and size aligned, if it's not the case,
> the data copied to an internal buffer and synchronous pwrite is called.
> If the iovectors are aligned, io_submit is called.
>
> In our case, snapshot almost always induces pwrite, since we never have all
> the iovectors aligned in the query, because of frequent adding a short iovector:
> 8 byte ram-page delimiters, after adding each ram page iovector.
>
> So the qemu-file code in this case:
> 1. doesn't aviod ram copying
> 2. works fully synchronously
>
> How to improve the snapshot time:
>
> 1. easy way: to increase iov limit to IOV_MAX (1024).
> This will reduce synchronous writing frequency.
> My test revealed that with iov limit = IOV_MAX the snapshot time *~12 sec*.
>
> 2. complex way: do writings asynchronously.
> Introduce both base- and size-aligned buffer, write the data only when
> the buffer is full, write the buffer asynchronously, meanwhile filling another
> buffer with snapshot data.
> My test revealed that this complex way provides the snapshot time *~6 sec*,
> 2 times better than just iov limit increasing.
>
> The patch proposes how to improve the snapshot performance in the complex way,
> allowing to use the asyncronous writings when needed.
>
> This is an RFC series, as I didn't confident that I fully understand all
> qemu-file use cases. I tried to make the series in a safe way to not break
> anything related to qemu-file using in other places, like migration.
>
> All comments are *VERY* appriciated!
>
> Thanks,
> Denis
>
> Denis Plotnikov (3):
>    qemu-file: introduce current buffer
>    qemu-file: add buffered mode
>    migration/savevm: use qemu-file buffered mode for non-cached bdrv
>
>   include/qemu/typedefs.h |   2 +
>   migration/qemu-file.c   | 479 +++++++++++++++++++++++++++++++++++++++++-------
>   migration/qemu-file.h   |   9 +
>   migration/savevm.c      |  38 +++-
>   4 files changed, 456 insertions(+), 72 deletions(-)
>




* Re: [RFC patch v1 1/3] qemu-file: introduce current buffer
  2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
@ 2020-04-24 17:47   ` Vladimir Sementsov-Ogievskiy
  2020-04-24 21:12   ` Eric Blake
  1 sibling, 0 replies; 19+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2020-04-24 17:47 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: den, dgilbert, quintela

13.04.2020 14:12, Denis Plotnikov wrote:
> To approach async wrtiting in the further commits, the buffer
> allocated in QEMUFile struct is replaced with the link to the
> current buffer. We're going to use many buffers to write the
> qemu file stream to the unerlying storage asynchronously. The
> current buffer points out to the buffer is currently filled
> with data.
> 
> This patch doesn't add any features to qemu-file and doesn't
> change any qemu-file behavior.
> 
> Signed-off-by: Denis Plotnikov<dplotnikov@virtuozzo.com>

Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>

-- 
Best regards,
Vladimir



* Re: [RFC patch v1 1/3] qemu-file: introduce current buffer
  2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
  2020-04-24 17:47   ` Vladimir Sementsov-Ogievskiy
@ 2020-04-24 21:12   ` Eric Blake
  1 sibling, 0 replies; 19+ messages in thread
From: Eric Blake @ 2020-04-24 21:12 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: den, dgilbert, quintela

On 4/13/20 6:12 AM, Denis Plotnikov wrote:
> To approach async wrtiting in the further commits, the buffer

writing

> allocated in QEMUFile struct is replaced with the link to the
> current buffer. We're going to use many buffers to write the
> qemu file stream to the unerlying storage asynchronously. The

underlying

> current buffer points out to the buffer is currently filled
> with data.

This sentence is confusing.  I'd suggest: The current_buf pointer tracks 
which buffer is currently filled with data.

> 
> This patch doesn't add any features to qemu-file and doesn't
> change any qemu-file behavior.
> 
> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
> ---
>   include/qemu/typedefs.h |   1 +
>   migration/qemu-file.c   | 156 +++++++++++++++++++++++++++++-------------------
>   2 files changed, 95 insertions(+), 62 deletions(-)
> 
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 375770a..88dce54 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -97,6 +97,7 @@ typedef struct QDict QDict;
>   typedef struct QEMUBH QEMUBH;
>   typedef struct QemuConsole QemuConsole;
>   typedef struct QEMUFile QEMUFile;
> +typedef struct QEMUFileBuffer QEMUFileBuffer;
>   typedef struct QemuLockable QemuLockable;
>   typedef struct QemuMutex QemuMutex;
>   typedef struct QemuOpt QemuOpt;
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1c3a358..285c6ef 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -33,6 +33,17 @@
>   #define IO_BUF_SIZE 32768
>   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
>   
> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
> +
> +struct QEMUFileBuffer {
> +    int buf_index;
> +    int buf_size; /* 0 when writing */
> +    uint8_t *buf;
> +    unsigned long *may_free;

Do we really want something that compiles differently on 32- vs. 64-bit?
/me reads ahead...

> +    struct iovec *iov;
> +    unsigned int iovcnt;
> +};
> +
>   struct QEMUFile {
>       const QEMUFileOps *ops;
>       const QEMUFileHooks *hooks;
> @@ -43,18 +54,12 @@ struct QEMUFile {
>   
>       int64_t pos; /* start of buffer when writing, end of buffer
>                       when reading */
> -    int buf_index;
> -    int buf_size; /* 0 when writing */
> -    uint8_t buf[IO_BUF_SIZE];
> -
> -    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);

...ah, you're replacing a bitmap, and our bitmap code _does_ use 'long' 
as its core for optimum speed (which overcomes the fact that it does 
mean annoying differences on 32- vs. 64-bit).

> -    struct iovec iov[MAX_IOV_SIZE];
> -    unsigned int iovcnt;
> -
>       int last_error;
>       Error *last_error_obj;
>       /* has the file has been shutdown */
>       bool shutdown;
> +    /* currently used buffer */
> +    QEMUFileBuffer *current_buf;
>   };
>   

Most of the patch is mechanical conversion to the rearranged struct.

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org




* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
@ 2020-04-24 21:25   ` Eric Blake
  2020-04-27  8:21     ` Denis Plotnikov
  2020-04-25  9:10   ` Vladimir Sementsov-Ogievskiy
  2020-04-27 12:14   ` Dr. David Alan Gilbert
  2 siblings, 1 reply; 19+ messages in thread
From: Eric Blake @ 2020-04-24 21:25 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: den, dgilbert, quintela

On 4/13/20 6:12 AM, Denis Plotnikov wrote:
> The patch adds ability to qemu-file to write the data
> asynchronously to improve the performance on writing.
> Before, only synchronous writing was supported.
> 
> Enabling of the asyncronous mode is managed by new

asynchronous

> "enabled_buffered" callback.

The term "enabled_buffered" does not appear in the patch.  Did you mean...

> 
> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
> ---
>   include/qemu/typedefs.h |   1 +
>   migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>   migration/qemu-file.h   |   9 ++
>   3 files changed, 339 insertions(+), 22 deletions(-)
> 

> @@ -60,6 +66,22 @@ struct QEMUFile {
>       bool shutdown;
>       /* currently used buffer */
>       QEMUFileBuffer *current_buf;
> +    /*
> +     * with buffered_mode enabled all the data copied to 512 byte
> +     * aligned buffer, including iov data. Then the buffer is passed
> +     * to writev_buffer callback.
> +     */
> +    bool buffered_mode;

..."Asynchronous mode is managed by setting the new buffered_mode flag"? 
  ...


> +    /* for async buffer writing */
> +    AioTaskPool *pool;
> +    /* the list of free buffers, currently used on is NOT there */

s/on/one/

> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
> +};
> +
> +struct QEMUFileAioTask {
> +    AioTask task;
> +    QEMUFile *f;
> +    QEMUFileBuffer *fb;
>   };
>   
>   /*
> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>       f->opaque = opaque;
>       f->ops = ops;
>   
> -    f->current_buf = g_new0(QEMUFileBuffer, 1);
> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> +    if (f->ops->enable_buffered) {
> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);

...ah, you meant 'enable_buffered'.  But still, why do we need a 
callback function?  Is it not sufficient to just have a bool flag?


> +static size_t get_buf_free_size(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* buf_index can't be greated than buf_size */

greater

> +    assert(fb->buf_size >= fb->buf_index);
> +    return fb->buf_size - fb->buf_index;
> +}
> +

> +static int write_task_fn(AioTask *task)
> +{

> +    /*
> +     * Increment file position.
> +     * This needs to be here before calling writev_buffer, because
> +     * writev_buffer is asynchronous and there could be more than one
> +     * writev_buffer started simultaniously. Each writev_buffer should

simultaneously

> +     * use its own file pos to write to. writev_buffer may write less
> +     * than buf_index bytes but we treat this situation as an error.
> +     * If error appeared, further file using is meaningless.

s/using/use/

> +     * We expect that, the most of the time the full buffer is written,
> +     * (when buf_size == buf_index). The only case when the non-full
> +     * buffer is written (buf_size != buf_index) is file close,
> +     * when we need to flush the rest of the buffer content.

We expect that most of the time, the full buffer will be written 
(buf_size == buf_index), with the exception at file close where we need 
to flush the final partial buffer.

> +     */
> +    f->pos += fb->buf_index;
> +
> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
> +
> +    /* return the just written buffer to the free list */
> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +
> +    /* check that we have written everything */
> +    if (ret != fb->buf_index) {
> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
> +    }
> +
> +    /*
> +     * always return 0 - don't use task error handling, relay on

rely

> +     * qemu file error handling
> +     */
> +    return 0;
> +}
> +
> +static void qemu_file_switch_current_buf(QEMUFile *f)
> +{
> +    /*
> +     * if the list is empty, wait until some task returns a buffer
> +     * to the list of free buffers.
> +     */
> +    if (QLIST_EMPTY(&f->free_buffers)) {
> +        aio_task_pool_wait_slot(f->pool);
> +    }
> +
> +    /*
> +     * sanity check that the list isn't empty
> +     * if the free list was empty, we waited for a task complition,

completion

> +     * and the pompleted task must return a buffer to a list of free buffers

completed

> +     */
> +    assert(!QLIST_EMPTY(&f->free_buffers));
> +
> +    /* set the current buffer for using from the free list */
> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
> +    reset_buf(f);
> +
> +    QLIST_REMOVE(f->current_buf, link);
> +}
> +

>   
>   /*
> + * Copy an external buffer to the intenal current buffer.

internal

> + */
> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
> +                     bool may_free)
> +{

> +++ b/migration/qemu-file.h
> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>                                      Error **errp);
>   
> +/*
> + * Enables or disables the buffered mode
> + * Existing blocking reads/writes must be woken
> + * Returns true if the buffered mode has to be enabled,
> + * false if it has to be disabled.
> + */
> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);

If this never gets called outside of initial creation of the QemuFile 
(that is, it is not dynamic), then making it a straight flag instead of 
a callback function is simpler.

> +
>   typedef struct QEMUFileOps {
>       QEMUFileGetBufferFunc *get_buffer;
>       QEMUFileCloseFunc *close;
> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>       QEMUFileWritevBufferFunc *writev_buffer;
>       QEMURetPathFunc *get_return_path;
>       QEMUFileShutdownFunc *shut_down;
> +    QEMUFileEnableBufferedFunc *enable_buffered;
>   } QEMUFileOps;
>   
>   typedef struct QEMUFileHooks {
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org




* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
  2020-04-24 21:25   ` Eric Blake
@ 2020-04-25  9:10   ` Vladimir Sementsov-Ogievskiy
  2020-04-27  8:19     ` Denis Plotnikov
  2020-04-27 12:14   ` Dr. David Alan Gilbert
  2 siblings, 1 reply; 19+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2020-04-25  9:10 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: den, dgilbert, quintela

13.04.2020 14:12, Denis Plotnikov wrote:
> The patch adds ability to qemu-file to write the data
> asynchronously to improve the performance on writing.
> Before, only synchronous writing was supported.
> 
> Enabling of the asyncronous mode is managed by new
> "enabled_buffered" callback.

Hmm.

I don't like the resulting architecture very much:

1. Function naming is not clean: how can I tell that copy_buf is for buffered mode, when add_to_iovec does more or less the same thing for non-buffered mode?

Hmm, actually, you alter several significant functions of QEMUFile - open, close, put, flush. In the old mode we do one thing, in the new mode - absolutely another. This looks like a driver. So maybe we want to add a QEMUFileDriver struct, define these functions as callbacks, move the old implementations to a default driver, and add the new functionality as a new driver. What do you think?

2. Terminology: you say you add a buffered mode, but qemu-file actually already works in a buffered mode, so this should be clarified somehow..
You also add asynchronicity, but the old implementation already has qemu_put_buffer_async..
You use the aio task pool, but don't say that it may be used only from a coroutine.
Maybe we'd better call it "coroutine mode"?


Also, am I correct that the new mode can't be used for reads? Should we document/assert that somehow? Or maybe support reading? Would snapshot loading benefit if we implemented reading in the new mode?
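A minimal sketch of what such a driver split could look like (the
QEMUFileDriver name comes from the review suggestion; everything else
here - the File struct, counters, and callback names - is hypothetical,
purely to illustrate dispatching per mode instead of branching on a
flag in every function):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback table: one set of ops per mode. */
typedef struct QEMUFileDriver {
    void (*put_buffer)(void *file, const unsigned char *buf, size_t size);
    void (*flush)(void *file);
} QEMUFileDriver;

typedef struct File {
    const QEMUFileDriver *drv;
    size_t iovec_bytes;     /* bytes taken by the default (iovec) path */
    size_t copied_bytes;    /* bytes taken by the buffered path */
} File;

static void default_put(void *file, const unsigned char *buf, size_t size)
{
    (void)buf;
    ((File *)file)->iovec_bytes += size;   /* would call add_to_iovec() */
}

static void buffered_put(void *file, const unsigned char *buf, size_t size)
{
    (void)buf;
    ((File *)file)->copied_bytes += size;  /* would call copy_buf() */
}

static void noop_flush(void *file)
{
    (void)file;
}

static const QEMUFileDriver default_drv = { default_put, noop_flush };
static const QEMUFileDriver buffered_drv = { buffered_put, noop_flush };

/* Callers dispatch through the table; no buffered_mode checks. */
static void file_put(File *f, const unsigned char *buf, size_t size)
{
    f->drv->put_buffer(f, buf, size);
}
```

The mode is then chosen once at open time by picking the driver, and
qemu_put_buffer/qemu_fflush stay branch-free.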

> 
> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
> ---
>   include/qemu/typedefs.h |   1 +
>   migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>   migration/qemu-file.h   |   9 ++
>   3 files changed, 339 insertions(+), 22 deletions(-)
> 
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 88dce54..9b388c8 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>   typedef struct QemuConsole QemuConsole;
>   typedef struct QEMUFile QEMUFile;
>   typedef struct QEMUFileBuffer QEMUFileBuffer;
> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>   typedef struct QemuLockable QemuLockable;
>   typedef struct QemuMutex QemuMutex;
>   typedef struct QemuOpt QemuOpt;
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 285c6ef..f42f949 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -29,19 +29,25 @@
>   #include "qemu-file.h"
>   #include "trace.h"
>   #include "qapi/error.h"
> +#include "block/aio_task.h"
>   
> -#define IO_BUF_SIZE 32768
> +#define IO_BUF_SIZE (1024 * 1024)
>   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
> +#define IO_BUF_NUM 2

Interesting - how much better is this than setting it to 1, which would limit the influence of the series to the alignment of written chunks?

> +#define IO_BUF_ALIGNMENT 512
>   
> -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>   
>   struct QEMUFileBuffer {
>       int buf_index;
> -    int buf_size; /* 0 when writing */
> +    int buf_size; /* 0 when non-buffered writing */
>       uint8_t *buf;
>       unsigned long *may_free;
>       struct iovec *iov;
>       unsigned int iovcnt;
> +    QLIST_ENTRY(QEMUFileBuffer) link;
>   };
>   
>   struct QEMUFile {
> @@ -60,6 +66,22 @@ struct QEMUFile {
>       bool shutdown;
>       /* currently used buffer */
>       QEMUFileBuffer *current_buf;
> +    /*
> +     * with buffered_mode enabled all the data copied to 512 byte
> +     * aligned buffer, including iov data. Then the buffer is passed
> +     * to writev_buffer callback.
> +     */
> +    bool buffered_mode;
> +    /* for async buffer writing */
> +    AioTaskPool *pool;
> +    /* the list of free buffers, currently used on is NOT there */
> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
> +};
> +
> +struct QEMUFileAioTask {
> +    AioTask task;
> +    QEMUFile *f;
> +    QEMUFileBuffer *fb;
>   };
>   
>   /*
> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>       f->opaque = opaque;
>       f->ops = ops;
>   
> -    f->current_buf = g_new0(QEMUFileBuffer, 1);
> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> +    if (f->ops->enable_buffered) {
> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
> +    }
> +
> +    if (f->buffered_mode && qemu_file_is_writable(f)) {

So we actually go to buffered_mode only if the file is writable. Shouldn't we set buffered_mode back to false otherwise?

> +        int i;
> +        /*
> +         * in buffered_mode we don't use internal io vectors
> +         * and may_free bitmap, because we copy the data to be
> +         * written right away to the buffer
> +         */
> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
> +
> +        /* allocate io buffers */
> +        for (i = 0; i < IO_BUF_NUM; i++) {
> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
> +
> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
> +            fb->buf_size = IO_BUF_SIZE;
> +
> +            /*
> +             * put the first buffer to the current buf and the rest
> +             * to the list of free buffers
> +             */
> +            if (i == 0) {
> +                f->current_buf = fb;
> +            } else {
> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +            }
> +        }
> +    } else {
> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> +    }
>   
>       return f;
>   }
> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>       unsigned long idx;
>       QEMUFileBuffer *fb = f->current_buf;
>   
> +    assert(!f->buffered_mode);
> +
>       /* Find and release all the contiguous memory ranges marked as may_free. */
>       idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>       if (idx >= fb->iovcnt) {
> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>       bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>   }
>   
> +static void advance_buf_ptr(QEMUFile *f, size_t size)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* must not advance to 0 */
> +    assert(size);
> +    /* must not overflow buf_index (int) */
> +    assert(fb->buf_index + size <= INT_MAX);

To avoid overflow in the check itself: assert(fb->buf_index <= INT_MAX - size)

> +    /* must not exceed buf_size */
> +    assert(fb->buf_index + size <= fb->buf_size);
> +
> +    fb->buf_index += size;
> +}
> +
> +static size_t get_buf_free_size(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* buf_index can't be greated than buf_size */
> +    assert(fb->buf_size >= fb->buf_index);
> +    return fb->buf_size - fb->buf_index;
> +}
> +
> +static size_t get_buf_used_size(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    return fb->buf_index;
> +}
> +
> +static uint8_t *get_buf_ptr(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* protects from out of bound reading */
> +    assert(fb->buf_index <= IO_BUF_SIZE);
> +    return fb->buf + fb->buf_index;
> +}
> +
> +static bool buf_is_full(QEMUFile *f)
> +{
> +    return get_buf_free_size(f) == 0;
> +}
> +
> +static void reset_buf(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    fb->buf_index = 0;
> +}
> +
> +static int write_task_fn(AioTask *task)
> +{
> +    int ret;
> +    Error *local_error = NULL;
> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
> +    QEMUFile *f = t->f;
> +    QEMUFileBuffer *fb = t->fb;
> +    uint64_t pos = f->pos;
> +    struct iovec v = (struct iovec) {
> +        .iov_base = fb->buf,
> +        .iov_len = fb->buf_index,
> +    };
> +
> +    assert(f->buffered_mode);
> +
> +    /*
> +     * Increment file position.
> +     * This needs to be here before calling writev_buffer, because
> +     * writev_buffer is asynchronous and there could be more than one
> +     * writev_buffer started simultaniously. Each writev_buffer should
> +     * use its own file pos to write to. writev_buffer may write less
> +     * than buf_index bytes but we treat this situation as an error.
> +     * If error appeared, further file using is meaningless.
> +     * We expect that, the most of the time the full buffer is written,
> +     * (when buf_size == buf_index). The only case when the non-full
> +     * buffer is written (buf_size != buf_index) is file close,
> +     * when we need to flush the rest of the buffer content.
> +     */
> +    f->pos += fb->buf_index;

It seems safer to add pos to QEMUFileAioTask instead, and to manage the global f->pos
in the main coroutine, not in the tasks.

> +
> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
> +
> +    /* return the just written buffer to the free list */
> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +
> +    /* check that we have written everything */
> +    if (ret != fb->buf_index) {
> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
> +    }
> +
> +    /*
> +     * always return 0 - don't use task error handling, relay on
> +     * qemu file error handling
> +     */
> +    return 0;
> +}
> +
> +static void qemu_file_switch_current_buf(QEMUFile *f)
> +{
> +    /*
> +     * if the list is empty, wait until some task returns a buffer
> +     * to the list of free buffers.
> +     */
> +    if (QLIST_EMPTY(&f->free_buffers)) {
> +        aio_task_pool_wait_slot(f->pool);
> +    }
> +
> +    /*
> +     * sanity check that the list isn't empty
> +     * if the free list was empty, we waited for a task complition,
> +     * and the pompleted task must return a buffer to a list of free buffers
> +     */
> +    assert(!QLIST_EMPTY(&f->free_buffers));
> +
> +    /* set the current buffer for using from the free list */
> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
> +    reset_buf(f);
> +
> +    QLIST_REMOVE(f->current_buf, link);
> +}
> +
> +/**
> + *  Asynchronously flushes QEMUFile buffer
> + *
> + * This will flush all pending data. If data was only partially flushed, it
> + * will set an error state. The function may return before the data actually
> + * written.
> + */
> +static void flush_buffer(QEMUFile *f)
> +{
> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
> +
> +    *t = (QEMUFileAioTask) {
> +        .task.func = &write_task_fn,
> +        .f = f,
> +        .fb = f->current_buf,
> +    };
> +
> +    /* aio_task_pool should free t for us */
> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
> +
> +    /* if no errors this will switch the buffer */
> +    qemu_file_switch_current_buf(f);
> +}
> +
>   /**
>    * Flushes QEMUFile buffer
>    *
> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>       if (f->shutdown) {
>           return;
>       }
> +
> +    if (f->buffered_mode) {
> +        return;

I don't think this is correct. qemu_fflush is a public interface of QEMUFile, and it is assumed to write all in-flight data.

> +    }
> +
>       if (fb->iovcnt > 0) {
> +        /* this is non-buffered mode */
>           expect = iov_size(fb->iov, fb->iovcnt);
>           ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
>                                       &local_error);
> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>   
>   void qemu_update_position(QEMUFile *f, size_t size)
>   {
> +    assert(!f->buffered_mode);

This is a public interface. Why are you sure it's not used for snapshots? It is used in migration/ram.c ...

>       f->pos += size;

And it shouldn't be a problem for the new mode: just add a pos parameter to QEMUFileAioTask.

>   }
>   
> @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
>   int qemu_fclose(QEMUFile *f)
>   {
>       int ret;
> -    qemu_fflush(f);
> +
> +    if (qemu_file_is_writable(f) && f->buffered_mode) {
> +        ret = qemu_file_get_error(f);
> +        if (!ret) {
> +            flush_buffer(f);
> +        }
> +        /* wait until all tasks are done */
> +        aio_task_pool_wait_all(f->pool);
> +    } else {
> +        qemu_fflush(f);
> +    }

Again, not a clean thing: for buffered mode we use flush_buffer, for non-buffered we use qemu_fflush. The function naming doesn't make this distinction clear.

> +
>       ret = qemu_file_get_error(f);
>   
>       if (f->ops->close) {
> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
>           ret = f->last_error;
>       }
>       error_free(f->last_error_obj);
> -    g_free(f->current_buf->buf);
> -    g_free(f->current_buf->iov);
> -    g_free(f->current_buf->may_free);
> -    g_free(f->current_buf);
> +
> +    if (f->buffered_mode) {
> +        QEMUFileBuffer *fb, *next;
> +        /*
> +         * put the current back to the free buffers list
> +         * to destroy all the buffers in one loop
> +         */
> +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
> +
> +        /* destroy all the buffers */
> +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
> +            QLIST_REMOVE(fb, link);
> +            /* looks like qemu_vfree pairs with qemu_memalign */
> +            qemu_vfree(fb->buf);
> +            g_free(fb);
> +        }
> +        g_free(f->pool);
> +    } else {
> +        g_free(f->current_buf->buf);
> +        g_free(f->current_buf->iov);
> +        g_free(f->current_buf->may_free);
> +        g_free(f->current_buf);
> +    }
> +
>       g_free(f);
>       trace_qemu_file_fclose();
>       return ret;
>   }
>   
>   /*
> + * Copy an external buffer to the intenal current buffer.
> + */
> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
> +                     bool may_free)
> +{
> +    size_t data_size = size;
> +    const uint8_t *src_ptr = buf;
> +
> +    assert(f->buffered_mode);
> +    assert(size <= INT_MAX);
> +
> +    while (data_size > 0) {
> +        size_t chunk_size;
> +
> +        if (buf_is_full(f)) {
> +            flush_buffer(f);
> +            if (qemu_file_get_error(f)) {
> +                return;
> +            }
> +        }
> +
> +        chunk_size = MIN(get_buf_free_size(f), data_size);
> +
> +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
> +
> +        advance_buf_ptr(f, chunk_size);
> +
> +        src_ptr += chunk_size;
> +        data_size -= chunk_size;
> +        f->bytes_xfer += chunk_size;
> +    }
> +
> +    if (may_free) {
> +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
> +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
> +                         buf, size, strerror(errno));
> +        }
> +    }
> +}
> +
> +/*
>    * Add buf to iovec. Do flush if iovec is full.
>    *
>    * Return values:
> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
>   static void add_buf_to_iovec(QEMUFile *f, size_t len)
>   {
>       QEMUFileBuffer *fb = f->current_buf;
> +
> +    assert(!f->buffered_mode);
> +
>       if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
>           fb->buf_index += len;
>           if (fb->buf_index == IO_BUF_SIZE) {
> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
>           return;
>       }
>   
> -    f->bytes_xfer += size;
> -    add_to_iovec(f, buf, size, may_free);
> +    if (f->buffered_mode) {
> +        copy_buf(f, buf, size, may_free);
> +    } else {
> +        f->bytes_xfer += size;
> +        add_to_iovec(f, buf, size, may_free);
> +    }
>   }
>   
>   void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>           return;
>       }
>   
> +    if (f->buffered_mode) {
> +        copy_buf(f, buf, size, false);
> +        return;
> +    }
> +
>       while (size > 0) {
>           l = IO_BUF_SIZE - fb->buf_index;
>           if (l > size) {
> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
>           return;
>       }
>   
> -    fb->buf[fb->buf_index] = v;
> -    f->bytes_xfer++;
> -    add_buf_to_iovec(f, 1);
> +    if (f->buffered_mode) {
> +        copy_buf(f, (const uint8_t *) &v, 1, false);
> +    } else {
> +        fb->buf[fb->buf_index] = v;
> +        add_buf_to_iovec(f, 1);
> +        f->bytes_xfer++;
> +    }
>   }
>   
>   void qemu_file_skip(QEMUFile *f, int size)
>   {
>       QEMUFileBuffer *fb = f->current_buf;
>   
> +    assert(!f->buffered_mode);
> +
>       if (fb->buf_index + size <= fb->buf_size) {
>           fb->buf_index += size;
>       }
> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>   {
>       int64_t ret = f->pos;
>       int i;
> -    QEMUFileBuffer *fb = f->current_buf;
>   
> -    for (i = 0; i < fb->iovcnt; i++) {
> -        ret += fb->iov[i].iov_len;
> +    if (f->buffered_mode) {
> +        ret += get_buf_used_size(f);
> +    } else {
> +        QEMUFileBuffer *fb = f->current_buf;
> +        for (i = 0; i < fb->iovcnt; i++) {
> +            ret += fb->iov[i].iov_len;
> +        }
>       }
>   
>       return ret;
> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>   
>   int64_t qemu_ftell(QEMUFile *f)
>   {
> -    qemu_fflush(f);
> -    return f->pos;
> +    if (f->buffered_mode) {
> +        return qemu_ftell_fast(f);
> +    } else {
> +        qemu_fflush(f);
> +        return f->pos;
> +    }
>   }
>   
>   int qemu_file_rate_limit(QEMUFile *f)
> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>       QEMUFileBuffer *fb = f->current_buf;
>       ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>   
> +    assert(!f->buffered_mode);
> +
>       if (blen < compressBound(size)) {
>           return -1;
>       }
> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
>       int len = 0;
>       QEMUFileBuffer *fb_src = f_src->current_buf;
>   
> +    assert(!f_des->buffered_mode);
> +    assert(!f_src->buffered_mode);
> +
>       if (fb_src->buf_index > 0) {
>           len = fb_src->buf_index;
>           qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> index a9b6d6c..08655d2 100644
> --- a/migration/qemu-file.h
> +++ b/migration/qemu-file.h
> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>                                      Error **errp);
>   
> +/*
> + * Enables or disables the buffered mode
> + * Existing blocking reads/writes must be woken
> + * Returns true if the buffered mode has to be enabled,
> + * false if it has to be disabled.
> + */
> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
> +
>   typedef struct QEMUFileOps {
>       QEMUFileGetBufferFunc *get_buffer;
>       QEMUFileCloseFunc *close;
> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>       QEMUFileWritevBufferFunc *writev_buffer;
>       QEMURetPathFunc *get_return_path;
>       QEMUFileShutdownFunc *shut_down;
> +    QEMUFileEnableBufferedFunc *enable_buffered;
>   } QEMUFileOps;
>   
>   typedef struct QEMUFileHooks {
> 


-- 
Best regards,
Vladimir



* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-25  9:10   ` Vladimir Sementsov-Ogievskiy
@ 2020-04-27  8:19     ` Denis Plotnikov
  2020-04-27 11:04       ` Vladimir Sementsov-Ogievskiy
  0 siblings, 1 reply; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-27  8:19 UTC (permalink / raw)
  To: Vladimir Sementsov-Ogievskiy, qemu-devel; +Cc: den, dgilbert, quintela



On 25.04.2020 12:10, Vladimir Sementsov-Ogievskiy wrote:
> 13.04.2020 14:12, Denis Plotnikov wrote:
>> The patch adds ability to qemu-file to write the data
>> asynchronously to improve the performance on writing.
>> Before, only synchronous writing was supported.
>>
>> Enabling of the asyncronous mode is managed by new
>> "enabled_buffered" callback.
>
> Hmm.
>
> I don't like resulting architecture very much:
>
> 1. Function naming is not clean: how can I understand that copy_buf is 
> for buffered mode when add_to_iovec is +- same thing for non-buffered 
> mode?
Yes, this is to be changed in the next patches
>
> Hmm actually, you just alter several significant functions of QEMUFile 
> - open, close, put, flush. In old mode we do one thing, in a new mode 
> - absolutely another. This looks like a driver. So may be we want to 
> add QEMUFileDriver struct, to define these functions as callbacks, 
> move old realizations to default driver, and add new functionality as 
> a new driver, what do you think?
Yes, it looks like that, but on the other hand I can't imagine another 
driver being added, and converting the code to a driver scheme would 
involve adding more code. So should we really do that? Anyway, your 
suggestion looks cleaner.
>
> 2. Terminology: you say you add buffered mode, but actually qemu file 
> is already work in buffered mode, so it should be clarified somehow..
The initial implementation is mixed: it has a buffer 
and an iovec array. The buffer is used to store *some* parts (excluding 
RAM) of a vm state plus service information. Everything written to the 
buffer is added to the iovec array as a separate entry. RAM pages aren't 
added to the buffer; instead they are added to the iovec array directly, 
without copying to the buffer. This is why we almost always get an iovec 
array consisting of size- and pointer-unaligned iovecs, and why we have 
the performance issues (described in more detail in patch 0000 of this 
series).
So, I can't say that qemu-file has a "real" buffered mode.
> You also add asynchronisity, but old implementation has already 
> qemu_put_buffer_async..
In my opinion, this function name is kind of confusing. What the 
function does is add the buffer pointer to the internal iovec array 
without copying the buffer. It's not related to any asynchronous operation.
> You use aio task pool, but don't say that it may be used only from 
> coroutine.
> May be, we'd better call it "coroutine-mode" ?
I don't think that mentioning any implementation-specific name is a good 
idea. aio_task_pool is a good option for implementing the async operation, 
but it could be any other interface.
I'd rather just assert qemu_in_coroutine() when in "buffered mode".
>
>
> Also, am I correct that new mode can't be used for read? Should we 
> document/assert it somehow? 
It can't, for now, since buffered reading isn't implemented. 
Enabling buffered mode for reading changes nothing - qemu-file works as 
before.
The same qemu-file instance can't be opened for reading and writing at 
the same time. I'd add an assert for that on qemu-file open.
> Or may be support reading? Will switch to snapshot benefit if we 
> implement reading in a new mode?
I thought about that. I think it could benefit from some kind of 
read-ahead into a number of buffers, while the "current" buffer is used to 
fill the vm state.
I didn't want to focus on the reading improvements because reading isn't 
as big a problem as writing: the qemu-file reading path already uses a 
buffer and fills that buffer with a single io operation.

>
>>
>> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
>> ---
>>   include/qemu/typedefs.h |   1 +
>>   migration/qemu-file.c   | 351 
>> +++++++++++++++++++++++++++++++++++++++++++++---
>>   migration/qemu-file.h   |   9 ++
>>   3 files changed, 339 insertions(+), 22 deletions(-)
>>
>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>> index 88dce54..9b388c8 100644
>> --- a/include/qemu/typedefs.h
>> +++ b/include/qemu/typedefs.h
>> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>>   typedef struct QemuConsole QemuConsole;
>>   typedef struct QEMUFile QEMUFile;
>>   typedef struct QEMUFileBuffer QEMUFileBuffer;
>> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>>   typedef struct QemuLockable QemuLockable;
>>   typedef struct QemuMutex QemuMutex;
>>   typedef struct QemuOpt QemuOpt;
>> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
>> index 285c6ef..f42f949 100644
>> --- a/migration/qemu-file.c
>> +++ b/migration/qemu-file.c
>> @@ -29,19 +29,25 @@
>>   #include "qemu-file.h"
>>   #include "trace.h"
>>   #include "qapi/error.h"
>> +#include "block/aio_task.h"
>>   -#define IO_BUF_SIZE 32768
>> +#define IO_BUF_SIZE (1024 * 1024)
>>   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
>> +#define IO_BUF_NUM 2
>
> Interesting, how much is it better than if we set to 1, limiting the 
> influence of the series to alignment of written chunks?
>> +#define IO_BUF_ALIGNMENT 512
>>   -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
>> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
>> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
>> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>>     struct QEMUFileBuffer {
>>       int buf_index;
>> -    int buf_size; /* 0 when writing */
>> +    int buf_size; /* 0 when non-buffered writing */
>>       uint8_t *buf;
>>       unsigned long *may_free;
>>       struct iovec *iov;
>>       unsigned int iovcnt;
>> +    QLIST_ENTRY(QEMUFileBuffer) link;
>>   };
>>     struct QEMUFile {
>> @@ -60,6 +66,22 @@ struct QEMUFile {
>>       bool shutdown;
>>       /* currently used buffer */
>>       QEMUFileBuffer *current_buf;
>> +    /*
>> +     * with buffered_mode enabled all the data copied to 512 byte
>> +     * aligned buffer, including iov data. Then the buffer is passed
>> +     * to writev_buffer callback.
>> +     */
>> +    bool buffered_mode;
>> +    /* for async buffer writing */
>> +    AioTaskPool *pool;
>> +    /* the list of free buffers, currently used on is NOT there */
>> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
>> +};
>> +
>> +struct QEMUFileAioTask {
>> +    AioTask task;
>> +    QEMUFile *f;
>> +    QEMUFileBuffer *fb;
>>   };
>>     /*
>> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const 
>> QEMUFileOps *ops)
>>       f->opaque = opaque;
>>       f->ops = ops;
>>   -    f->current_buf = g_new0(QEMUFileBuffer, 1);
>> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>> +    if (f->ops->enable_buffered) {
>> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
>> +    }
>> +
>> +    if (f->buffered_mode && qemu_file_is_writable(f)) {
>
> So we actually go to buffered_mode if file is writable. Then, 
> shouldn't we otherwise set buffered_mode to false otherwise?
buffered_mode is initialized with false
>
>> +        int i;
>> +        /*
>> +         * in buffered_mode we don't use internal io vectors
>> +         * and may_free bitmap, because we copy the data to be
>> +         * written right away to the buffer
>> +         */
>> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
>> +
>> +        /* allocate io buffers */
>> +        for (i = 0; i < IO_BUF_NUM; i++) {
>> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
>> +
>> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
>> +            fb->buf_size = IO_BUF_SIZE;
>> +
>> +            /*
>> +             * put the first buffer to the current buf and the rest
>> +             * to the list of free buffers
>> +             */
>> +            if (i == 0) {
>> +                f->current_buf = fb;
>> +            } else {
>> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>> +            }
>> +        }
>> +    } else {
>> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
>> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>> +    }
>>         return f;
>>   }
>> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>       unsigned long idx;
>>       QEMUFileBuffer *fb = f->current_buf;
>>   +    assert(!f->buffered_mode);
>> +
>>       /* Find and release all the contiguous memory ranges marked as 
>> may_free. */
>>       idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>>       if (idx >= fb->iovcnt) {
>> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>       bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>>   }
>>   +static void advance_buf_ptr(QEMUFile *f, size_t size)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* must not advance to 0 */
>> +    assert(size);
>> +    /* must not overflow buf_index (int) */
>> +    assert(fb->buf_index + size <= INT_MAX);
>
> to not overflow in check: assert(fb->buf_index <= INT_MAX - size)
good catch
>
>> +    /* must not exceed buf_size */
>> +    assert(fb->buf_index + size <= fb->buf_size);
>> +
>> +    fb->buf_index += size;
>> +}
>> +
>> +static size_t get_buf_free_size(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* buf_index can't be greated than buf_size */
>> +    assert(fb->buf_size >= fb->buf_index);
>> +    return fb->buf_size - fb->buf_index;
>> +}
>> +
>> +static size_t get_buf_used_size(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    return fb->buf_index;
>> +}
>> +
>> +static uint8_t *get_buf_ptr(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* protects from out of bound reading */
>> +    assert(fb->buf_index <= IO_BUF_SIZE);
>> +    return fb->buf + fb->buf_index;
>> +}
>> +
>> +static bool buf_is_full(QEMUFile *f)
>> +{
>> +    return get_buf_free_size(f) == 0;
>> +}
>> +
>> +static void reset_buf(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    fb->buf_index = 0;
>> +}
>> +
>> +static int write_task_fn(AioTask *task)
>> +{
>> +    int ret;
>> +    Error *local_error = NULL;
>> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
>> +    QEMUFile *f = t->f;
>> +    QEMUFileBuffer *fb = t->fb;
>> +    uint64_t pos = f->pos;
>> +    struct iovec v = (struct iovec) {
>> +        .iov_base = fb->buf,
>> +        .iov_len = fb->buf_index,
>> +    };
>> +
>> +    assert(f->buffered_mode);
>> +
>> +    /*
>> +     * Increment file position.
>> +     * This needs to be here before calling writev_buffer, because
>> +     * writev_buffer is asynchronous and there could be more than one
>> +     * writev_buffer started simultaniously. Each writev_buffer should
>> +     * use its own file pos to write to. writev_buffer may write less
>> +     * than buf_index bytes but we treat this situation as an error.
>> +     * If error appeared, further file using is meaningless.
>> +     * We expect that, the most of the time the full buffer is written,
>> +     * (when buf_size == buf_index). The only case when the non-full
>> +     * buffer is written (buf_size != buf_index) is file close,
>> +     * when we need to flush the rest of the buffer content.
>> +     */
>> +    f->pos += fb->buf_index;
>
> Seems safer to add pos to QEMUFileAioTask instead, and manage global 
> f->pos
> in main coroutine, not in tasks.
I also thought about that, but I didn't find any benefit, because with 
coroutines we shouldn't get race problems.
>
>> +
>> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
>> +
>> +    /* return the just written buffer to the free list */
>> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>> +
>> +    /* check that we have written everything */
>> +    if (ret != fb->buf_index) {
>> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
>> +    }
>> +
>> +    /*
>> +     * always return 0 - don't use task error handling, relay on
>> +     * qemu file error handling
>> +     */
>> +    return 0;
>> +}
>> +
>> +static void qemu_file_switch_current_buf(QEMUFile *f)
>> +{
>> +    /*
>> +     * if the list is empty, wait until some task returns a buffer
>> +     * to the list of free buffers.
>> +     */
>> +    if (QLIST_EMPTY(&f->free_buffers)) {
>> +        aio_task_pool_wait_slot(f->pool);
>> +    }
>> +
>> +    /*
>> +     * sanity check that the list isn't empty
>> +     * if the free list was empty, we waited for a task complition,
>> +     * and the pompleted task must return a buffer to a list of free 
>> buffers
>> +     */
>> +    assert(!QLIST_EMPTY(&f->free_buffers));
>> +
>> +    /* set the current buffer for using from the free list */
>> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
>> +    reset_buf(f);
>> +
>> +    QLIST_REMOVE(f->current_buf, link);
>> +}
>> +
>> +/**
>> + *  Asynchronously flushes QEMUFile buffer
>> + *
>> + * This will flush all pending data. If data was only partially 
>> flushed, it
>> + * will set an error state. The function may return before the data 
>> actually
>> + * written.
>> + */
>> +static void flush_buffer(QEMUFile *f)
>> +{
>> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
>> +
>> +    *t = (QEMUFileAioTask) {
>> +        .task.func = &write_task_fn,
>> +        .f = f,
>> +        .fb = f->current_buf,
>> +    };
>> +
>> +    /* aio_task_pool should free t for us */
>> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
>> +
>> +    /* if no errors this will switch the buffer */
>> +    qemu_file_switch_current_buf(f);
>> +}
>> +
>>   /**
>>    * Flushes QEMUFile buffer
>>    *
>> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>>       if (f->shutdown) {
>>           return;
>>       }
>> +
>> +    if (f->buffered_mode) {
>> +        return;
>
> I don't think it's correct. qemu_fflush is public interface of 
> QEMUFile and it's assumed to write all in-flight data.
Yes, it should when you use sockets and you want to make sure that 
you've sent what your peer is waiting for.
But that isn't the case when writing an internal snapshot, especially 
when you explicitly asked to open the qemu-file in "buffered_mode".
>
>> +    }
>> +
>>       if (fb->iovcnt > 0) {
>> +        /* this is non-buffered mode */
>>           expect = iov_size(fb->iov, fb->iovcnt);
>>           ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, 
>> f->pos,
>>                                       &local_error);
>> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>>     void qemu_update_position(QEMUFile *f, size_t size)
>>   {
>> +    assert(!f->buffered_mode);
>
> Public interface. Why are you sure that it's not used in snapshot? It 
> is used in migration/ram.c ...
Looks like that's on the reading path... I didn't hit that assert on the 
writing path when testing. I added this protection just in case, because ...
>
>>       f->pos += size;
>
> And it shouldn't be the problem for the new mode, just ass pos 
> parameter to QEMUFileAioTask,
...It could be a problem. In the new mode we want our buffers 
size-aligned; if we update pos, we would need to fill the rest of the 
buffer with zeros, submit the buffer for writing, and get another free 
buffer to use.
But I didn't see qemu_update_position() used in the writing path.
>
>>   }
>>   @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t 
>> size)
>>   int qemu_fclose(QEMUFile *f)
>>   {
>>       int ret;
>> -    qemu_fflush(f);
>> +
>> +    if (qemu_file_is_writable(f) && f->buffered_mode) {
>> +        ret = qemu_file_get_error(f);
>> +        if (!ret) {
>> +            flush_buffer(f);
>> +        }
>> +        /* wait until all tasks are done */
>> +        aio_task_pool_wait_all(f->pool);
>> +    } else {
>> +        qemu_fflush(f);
>> +    }
>
> Again, not a clean thing: for buffered mode we use flush_buffer, for 
> non-buffered we use qemu_fflush. It's not clear from the function naming.
I will improve the naming.
>
>> +
>>       ret = qemu_file_get_error(f);
>>         if (f->ops->close) {
>> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
>>           ret = f->last_error;
>>       }
>>       error_free(f->last_error_obj);
>> -    g_free(f->current_buf->buf);
>> -    g_free(f->current_buf->iov);
>> -    g_free(f->current_buf->may_free);
>> -    g_free(f->current_buf);
>> +
>> +    if (f->buffered_mode) {
>> +        QEMUFileBuffer *fb, *next;
>> +        /*
>> +         * put the current back to the free buffers list
>> +         * to destroy all the buffers in one loop
>> +         */
>> +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
>> +
>> +        /* destroy all the buffers */
>> +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
>> +            QLIST_REMOVE(fb, link);
>> +            /* looks like qemu_vfree pairs with qemu_memalign */
>> +            qemu_vfree(fb->buf);
>> +            g_free(fb);
>> +        }
>> +        g_free(f->pool);
>> +    } else {
>> +        g_free(f->current_buf->buf);
>> +        g_free(f->current_buf->iov);
>> +        g_free(f->current_buf->may_free);
>> +        g_free(f->current_buf);
>> +    }
>> +
>>       g_free(f);
>>       trace_qemu_file_fclose();
>>       return ret;
>>   }
>>     /*
>> + * Copy an external buffer to the intenal current buffer.
>> + */
>> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
>> +                     bool may_free)
>> +{
>> +    size_t data_size = size;
>> +    const uint8_t *src_ptr = buf;
>> +
>> +    assert(f->buffered_mode);
>> +    assert(size <= INT_MAX);
>> +
>> +    while (data_size > 0) {
>> +        size_t chunk_size;
>> +
>> +        if (buf_is_full(f)) {
>> +            flush_buffer(f);
>> +            if (qemu_file_get_error(f)) {
>> +                return;
>> +            }
>> +        }
>> +
>> +        chunk_size = MIN(get_buf_free_size(f), data_size);
>> +
>> +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
>> +
>> +        advance_buf_ptr(f, chunk_size);
>> +
>> +        src_ptr += chunk_size;
>> +        data_size -= chunk_size;
>> +        f->bytes_xfer += chunk_size;
>> +    }
>> +
>> +    if (may_free) {
>> +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
>> +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
>> +                         buf, size, strerror(errno));
>> +        }
>> +    }
>> +}
>> +
>> +/*
>>    * Add buf to iovec. Do flush if iovec is full.
>>    *
>>    * Return values:
>> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const 
>> uint8_t *buf, size_t size,
>>   static void add_buf_to_iovec(QEMUFile *f, size_t len)
>>   {
>>       QEMUFileBuffer *fb = f->current_buf;
>> +
>> +    assert(!f->buffered_mode);
>> +
>>       if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
>>           fb->buf_index += len;
>>           if (fb->buf_index == IO_BUF_SIZE) {
>> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const 
>> uint8_t *buf, size_t size,
>>           return;
>>       }
>>   -    f->bytes_xfer += size;
>> -    add_to_iovec(f, buf, size, may_free);
>> +    if (f->buffered_mode) {
>> +        copy_buf(f, buf, size, may_free);
>> +    } else {
>> +        f->bytes_xfer += size;
>> +        add_to_iovec(f, buf, size, may_free);
>> +    }
>>   }
>>     void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t 
>> *buf, size_t size)
>>           return;
>>       }
>>   +    if (f->buffered_mode) {
>> +        copy_buf(f, buf, size, false);
>> +        return;
>> +    }
>> +
>>       while (size > 0) {
>>           l = IO_BUF_SIZE - fb->buf_index;
>>           if (l > size) {
>> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
>>           return;
>>       }
>>   -    fb->buf[fb->buf_index] = v;
>> -    f->bytes_xfer++;
>> -    add_buf_to_iovec(f, 1);
>> +    if (f->buffered_mode) {
>> +        copy_buf(f, (const uint8_t *) &v, 1, false);
>> +    } else {
>> +        fb->buf[fb->buf_index] = v;
>> +        add_buf_to_iovec(f, 1);
>> +        f->bytes_xfer++;
>> +    }
>>   }
>>     void qemu_file_skip(QEMUFile *f, int size)
>>   {
>>       QEMUFileBuffer *fb = f->current_buf;
>>   +    assert(!f->buffered_mode);
>> +
>>       if (fb->buf_index + size <= fb->buf_size) {
>>           fb->buf_index += size;
>>       }
>> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>   {
>>       int64_t ret = f->pos;
>>       int i;
>> -    QEMUFileBuffer *fb = f->current_buf;
>>   -    for (i = 0; i < fb->iovcnt; i++) {
>> -        ret += fb->iov[i].iov_len;
>> +    if (f->buffered_mode) {
>> +        ret += get_buf_used_size(f);
>> +    } else {
>> +        QEMUFileBuffer *fb = f->current_buf;
>> +        for (i = 0; i < fb->iovcnt; i++) {
>> +            ret += fb->iov[i].iov_len;
>> +        }
>>       }
>>         return ret;
>> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>     int64_t qemu_ftell(QEMUFile *f)
>>   {
>> -    qemu_fflush(f);
>> -    return f->pos;
>> +    if (f->buffered_mode) {
>> +        return qemu_ftell_fast(f);
>> +    } else {
>> +        qemu_fflush(f);
>> +        return f->pos;
>> +    }
>>   }
>>     int qemu_file_rate_limit(QEMUFile *f)
>> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, 
>> z_stream *stream,
>>       QEMUFileBuffer *fb = f->current_buf;
>>       ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>>   +    assert(!f->buffered_mode);
>> +
>>       if (blen < compressBound(size)) {
>>           return -1;
>>       }
>> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile 
>> *f_src)
>>       int len = 0;
>>       QEMUFileBuffer *fb_src = f_src->current_buf;
>>   +    assert(!f_des->buffered_mode);
>> +    assert(!f_src->buffered_mode);
>> +
>>       if (fb_src->buf_index > 0) {
>>           len = fb_src->buf_index;
>>           qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
>> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
>> index a9b6d6c..08655d2 100644
>> --- a/migration/qemu-file.h
>> +++ b/migration/qemu-file.h
>> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>>   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>>                                      Error **errp);
>>   +/*
>> + * Enables or disables the buffered mode
>> + * Existing blocking reads/writes must be woken
>> + * Returns true if the buffered mode has to be enabled,
>> + * false if it has to be disabled.
>> + */
>> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
>> +
>>   typedef struct QEMUFileOps {
>>       QEMUFileGetBufferFunc *get_buffer;
>>       QEMUFileCloseFunc *close;
>> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>>       QEMUFileWritevBufferFunc *writev_buffer;
>>       QEMURetPathFunc *get_return_path;
>>       QEMUFileShutdownFunc *shut_down;
>> +    QEMUFileEnableBufferedFunc *enable_buffered;
>>   } QEMUFileOps;
>>     typedef struct QEMUFileHooks {
>>
>
>




* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-24 21:25   ` Eric Blake
@ 2020-04-27  8:21     ` Denis Plotnikov
  0 siblings, 0 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-27  8:21 UTC (permalink / raw)
  To: Eric Blake, qemu-devel; +Cc: den, dgilbert, quintela



On 25.04.2020 00:25, Eric Blake wrote:
> On 4/13/20 6:12 AM, Denis Plotnikov wrote:
>> The patch adds ability to qemu-file to write the data
>> asynchronously to improve the performance on writing.
>> Before, only synchronous writing was supported.
>>
>> Enabling of the asyncronous mode is managed by new
>
> asynchronous
>
>> "enabled_buffered" callback.
>
> The term "enabled_buffered" does not appear in the patch.  Did you 
> mean...
>
>>
>> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
>> ---
>>   include/qemu/typedefs.h |   1 +
>>   migration/qemu-file.c   | 351 
>> +++++++++++++++++++++++++++++++++++++++++++++---
>>   migration/qemu-file.h   |   9 ++
>>   3 files changed, 339 insertions(+), 22 deletions(-)
>>
>
>> @@ -60,6 +66,22 @@ struct QEMUFile {
>>       bool shutdown;
>>       /* currently used buffer */
>>       QEMUFileBuffer *current_buf;
>> +    /*
>> +     * with buffered_mode enabled all the data copied to 512 byte
>> +     * aligned buffer, including iov data. Then the buffer is passed
>> +     * to writev_buffer callback.
>> +     */
>> +    bool buffered_mode;
>
> ..."Asynchronous mode is managed by setting the new buffered_mode 
> flag"?  ...
>
>
>> +    /* for async buffer writing */
>> +    AioTaskPool *pool;
>> +    /* the list of free buffers, currently used on is NOT there */
>
> s/on/one/
>
>> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
>> +};
>> +
>> +struct QEMUFileAioTask {
>> +    AioTask task;
>> +    QEMUFile *f;
>> +    QEMUFileBuffer *fb;
>>   };
>>     /*
>> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const 
>> QEMUFileOps *ops)
>>       f->opaque = opaque;
>>       f->ops = ops;
>>   -    f->current_buf = g_new0(QEMUFileBuffer, 1);
>> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>> +    if (f->ops->enable_buffered) {
>> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
>
> ...ah, you meant 'enable_buffered'.  But still, why do we need a 
> callback function?  Is it not sufficient to just have a bool flag?
>
>
>> +static size_t get_buf_free_size(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* buf_index can't be greated than buf_size */
>
> greater
>
>> +    assert(fb->buf_size >= fb->buf_index);
>> +    return fb->buf_size - fb->buf_index;
>> +}
>> +
>
>> +static int write_task_fn(AioTask *task)
>> +{
>
>> +    /*
>> +     * Increment file position.
>> +     * This needs to be here before calling writev_buffer, because
>> +     * writev_buffer is asynchronous and there could be more than one
>> +     * writev_buffer started simultaniously. Each writev_buffer should
>
> simultaneously
>
>> +     * use its own file pos to write to. writev_buffer may write less
>> +     * than buf_index bytes but we treat this situation as an error.
>> +     * If error appeared, further file using is meaningless.
>
> s/using/use/
>
>> +     * We expect that, the most of the time the full buffer is written,
>> +     * (when buf_size == buf_index). The only case when the non-full
>> +     * buffer is written (buf_size != buf_index) is file close,
>> +     * when we need to flush the rest of the buffer content.
>
> We expect that most of the time, the full buffer will be written 
> (buf_size == buf_index), with the exception at file close where we 
> need to flush the final partial buffer.
>
>> +     */
>> +    f->pos += fb->buf_index;
>> +
>> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
>> +
>> +    /* return the just written buffer to the free list */
>> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>> +
>> +    /* check that we have written everything */
>> +    if (ret != fb->buf_index) {
>> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
>> +    }
>> +
>> +    /*
>> +     * always return 0 - don't use task error handling, relay on
>
> rely
>
>> +     * qemu file error handling
>> +     */
>> +    return 0;
>> +}
>> +
>> +static void qemu_file_switch_current_buf(QEMUFile *f)
>> +{
>> +    /*
>> +     * if the list is empty, wait until some task returns a buffer
>> +     * to the list of free buffers.
>> +     */
>> +    if (QLIST_EMPTY(&f->free_buffers)) {
>> +        aio_task_pool_wait_slot(f->pool);
>> +    }
>> +
>> +    /*
>> +     * sanity check that the list isn't empty
>> +     * if the free list was empty, we waited for a task complition,
>
> completion
>
>> +     * and the pompleted task must return a buffer to a list of free 
>> buffers
>
> completed
>
>> +     */
>> +    assert(!QLIST_EMPTY(&f->free_buffers));
>> +
>> +    /* set the current buffer for using from the free list */
>> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
>> +    reset_buf(f);
>> +
>> +    QLIST_REMOVE(f->current_buf, link);
>> +}
>> +
>
>>     /*
>> + * Copy an external buffer to the intenal current buffer.
>
> internal
>
>> + */
>> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
>> +                     bool may_free)
>> +{
>
>> +++ b/migration/qemu-file.h
>> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>>   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>>                                      Error **errp);
>>   +/*
>> + * Enables or disables the buffered mode
>> + * Existing blocking reads/writes must be woken
>> + * Returns true if the buffered mode has to be enabled,
>> + * false if it has to be disabled.
>> + */
>> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
>
> If this never gets called outside of initial creation of the QemuFile 
> (that is, it is not dynamic), then making it a straight flag instead 
> of a callback function is simpler.
Yes, I agree.

Thanks for the review and all the grammar fixes!
>
>
>> +
>>   typedef struct QEMUFileOps {
>>       QEMUFileGetBufferFunc *get_buffer;
>>       QEMUFileCloseFunc *close;
>> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>>       QEMUFileWritevBufferFunc *writev_buffer;
>>       QEMURetPathFunc *get_return_path;
>>       QEMUFileShutdownFunc *shut_down;
>> +    QEMUFileEnableBufferedFunc *enable_buffered;
>>   } QEMUFileOps;
>>     typedef struct QEMUFileHooks {
>>
>




* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-27  8:19     ` Denis Plotnikov
@ 2020-04-27 11:04       ` Vladimir Sementsov-Ogievskiy
  0 siblings, 0 replies; 19+ messages in thread
From: Vladimir Sementsov-Ogievskiy @ 2020-04-27 11:04 UTC (permalink / raw)
  To: Denis Plotnikov, qemu-devel; +Cc: den, dgilbert, quintela

27.04.2020 11:19, Denis Plotnikov wrote:
> 
> 
> On 25.04.2020 12:10, Vladimir Sementsov-Ogievskiy wrote:
>> 13.04.2020 14:12, Denis Plotnikov wrote:
>>> The patch adds ability to qemu-file to write the data
>>> asynchronously to improve the performance on writing.
>>> Before, only synchronous writing was supported.
>>>
>>> Enabling of the asyncronous mode is managed by new
>>> "enabled_buffered" callback.
>>
>> Hmm.
>>
>> I don't like the resulting architecture very much:
>>
>> 1. Function naming is not clean: how can I understand that copy_buf is for buffered mode when add_to_iovec is more or less the same thing for non-buffered mode?
> Yes, this is to be changed in the next patches
>>
>> Hmm actually, you just alter several significant functions of QEMUFile - open, close, put, flush. In the old mode we do one thing, in the new mode - something absolutely different. This looks like a driver. So maybe we want to add a QEMUFileDriver struct, define these functions as callbacks, move the old implementations to a default driver, and add the new functionality as a new driver; what do you think?
> Yes, it looks like that, but on the other hand I can't imagine another driver being added, and converting the code to the driver scheme would mean adding more code. So, should we really do that? Anyway, your suggestion looks cleaner.
>>
>> 2. Terminology: you say you add buffered mode, but actually qemu-file already works in a buffered mode, so this should be clarified somehow.
> The initial implementation is mixed: it has both a buffer and an iovec array. The buffer is used to store *some* parts of a vm state (excluding RAM) plus service information. Each chunk written to the buffer is added to the iovec array as a separate entry. RAM pages aren't added to the buffer; instead, they are added to the iovec array directly, without copying to the buffer. This is why we almost always get an iovec array consisting of size- and pointer-unaligned iovecs, and why we have the performance issues (detailed in the cover letter of this series).
> So, I can't say that qemu-file has a "real" buffered mode.
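A minimal standalone sketch of the alignment check behind the slow path described above (hypothetical, simplified; the real logic lives in the O_DIRECT block backend): only when every base and length in the vector is aligned can the backend submit it as-is, otherwise it bounces the data through an aligned buffer and does a synchronous write.

```c
#include <stdbool.h>
#include <stdint.h>
#include <sys/uio.h>

#define MEM_ALIGN 512

/*
 * True iff every iovec base and length is aligned; only then can an
 * O_DIRECT backend submit the vector directly.  A single 8-byte
 * ram-page delimiter entry makes the whole vector take the slow path.
 */
static bool qiov_is_aligned(const struct iovec *iov, int cnt)
{
    for (int i = 0; i < cnt; i++) {
        if ((uintptr_t)iov[i].iov_base % MEM_ALIGN != 0 ||
            iov[i].iov_len % MEM_ALIGN != 0) {
            return false;
        }
    }
    return true;
}

/* demo storage, aligned the way a bounce buffer would be */
static unsigned char page[4096] __attribute__((aligned(512)));
```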

But if someone comes and tries to understand the code, it will be difficult because of such naming. If we keep it, we should add your description as a comment next to that name.

>> You also add asynchronicity, but the old implementation already has qemu_put_buffer_async..
> In my opinion, that function name is kind of confusing. What the function does is add the buffer pointer to the internal iovec array without copying the buffer. It's not related to any asynchronous operation.
>> You use the aio task pool, but don't say that it may be used only from a coroutine.
>> Maybe we'd better call it "coroutine-mode"?
> I don't think that mentioning any implementation-specific name is a good idea. aio_task_pool is a good option to implement the async operation, but it could be any other interface.
> I'd rather just assert qemu_in_coroutine() when in "buffered-mode".
>>
>>
>> Also, am I correct that the new mode can't be used for reads? Should we document/assert it somehow?
> It can't, just for now, since read-buffered mode isn't implemented. Enabling buffered-mode for reading affects nothing: qemu-file works as before.
> The same qemu-file instance can't be opened for reading and writing at the same time. I'd add that assert on qemu-file open.

Ah, OK. I saw it, but somehow thought that enabling writing doesn't protect against reading.

>> Or maybe support reading? Would switching to a snapshot benefit if we implement reading in the new mode?
> I thought about that. I think it could benefit from some kind of read-ahead into a number of buffers, while the "current" buffer is used to fill the vm state.
> I didn't want to focus on the reading improvements because it's not that big a problem in comparison to writing. The qemu-file reading uses a buffer and fills that buffer with a single io operation.

OK

> 
>>
>>>
>>> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
>>> ---
>>>   include/qemu/typedefs.h |   1 +
>>>   migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>>>   migration/qemu-file.h   |   9 ++
>>>   3 files changed, 339 insertions(+), 22 deletions(-)
>>>
>>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>>> index 88dce54..9b388c8 100644
>>> --- a/include/qemu/typedefs.h
>>> +++ b/include/qemu/typedefs.h
>>> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>>>   typedef struct QemuConsole QemuConsole;
>>>   typedef struct QEMUFile QEMUFile;
>>>   typedef struct QEMUFileBuffer QEMUFileBuffer;
>>> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>>>   typedef struct QemuLockable QemuLockable;
>>>   typedef struct QemuMutex QemuMutex;
>>>   typedef struct QemuOpt QemuOpt;
>>> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
>>> index 285c6ef..f42f949 100644
>>> --- a/migration/qemu-file.c
>>> +++ b/migration/qemu-file.c
>>> @@ -29,19 +29,25 @@
>>>   #include "qemu-file.h"
>>>   #include "trace.h"
>>>   #include "qapi/error.h"
>>> +#include "block/aio_task.h"
>>>   -#define IO_BUF_SIZE 32768
>>> +#define IO_BUF_SIZE (1024 * 1024)
>>>   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
>>> +#define IO_BUF_NUM 2
>>
>> Interesting, how much better is it than if we set it to 1, limiting the influence of the series to the alignment of written chunks?
>>> +#define IO_BUF_ALIGNMENT 512
>>>   -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
>>> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
>>> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
>>> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>>>     struct QEMUFileBuffer {
>>>       int buf_index;
>>> -    int buf_size; /* 0 when writing */
>>> +    int buf_size; /* 0 when non-buffered writing */
>>>       uint8_t *buf;
>>>       unsigned long *may_free;
>>>       struct iovec *iov;
>>>       unsigned int iovcnt;
>>> +    QLIST_ENTRY(QEMUFileBuffer) link;
>>>   };
>>>     struct QEMUFile {
>>> @@ -60,6 +66,22 @@ struct QEMUFile {
>>>       bool shutdown;
>>>       /* currently used buffer */
>>>       QEMUFileBuffer *current_buf;
>>> +    /*
>>> +     * with buffered_mode enabled all the data copied to 512 byte
>>> +     * aligned buffer, including iov data. Then the buffer is passed
>>> +     * to writev_buffer callback.
>>> +     */
>>> +    bool buffered_mode;
>>> +    /* for async buffer writing */
>>> +    AioTaskPool *pool;
>>> +    /* the list of free buffers, currently used on is NOT there */
>>> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
>>> +};
>>> +
>>> +struct QEMUFileAioTask {
>>> +    AioTask task;
>>> +    QEMUFile *f;
>>> +    QEMUFileBuffer *fb;
>>>   };
>>>     /*
>>> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>>>       f->opaque = opaque;
>>>       f->ops = ops;
>>>   -    f->current_buf = g_new0(QEMUFileBuffer, 1);
>>> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>>> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>>> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>>> +    if (f->ops->enable_buffered) {
>>> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
>>> +    }
>>> +
>>> +    if (f->buffered_mode && qemu_file_is_writable(f)) {
>>
>> So we actually go to buffered_mode only if the file is writable. Then, shouldn't we set buffered_mode to false otherwise?
> buffered_mode is initialized with false

I mean, if enable_buffered() returned true but qemu_file_is_writable() returned false... I now think it should be an error.

>>
>>> +        int i;
>>> +        /*
>>> +         * in buffered_mode we don't use internal io vectors
>>> +         * and may_free bitmap, because we copy the data to be
>>> +         * written right away to the buffer
>>> +         */
>>> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
>>> +
>>> +        /* allocate io buffers */
>>> +        for (i = 0; i < IO_BUF_NUM; i++) {
>>> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
>>> +
>>> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
>>> +            fb->buf_size = IO_BUF_SIZE;
>>> +
>>> +            /*
>>> +             * put the first buffer to the current buf and the rest
>>> +             * to the list of free buffers
>>> +             */
>>> +            if (i == 0) {
>>> +                f->current_buf = fb;
>>> +            } else {
>>> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>>> +            }
>>> +        }
>>> +    } else {
>>> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
>>> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>>> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>>> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>>> +    }
>>>         return f;
>>>   }
>>> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>>       unsigned long idx;
>>>       QEMUFileBuffer *fb = f->current_buf;
>>>   +    assert(!f->buffered_mode);
>>> +
>>>       /* Find and release all the contiguous memory ranges marked as may_free. */
>>>       idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>>>       if (idx >= fb->iovcnt) {
>>> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>>       bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>>>   }
>>>   +static void advance_buf_ptr(QEMUFile *f, size_t size)
>>> +{
>>> +    QEMUFileBuffer *fb = f->current_buf;
>>> +    /* must not advance to 0 */
>>> +    assert(size);
>>> +    /* must not overflow buf_index (int) */
>>> +    assert(fb->buf_index + size <= INT_MAX);
>>
>> to not overflow in check: assert(fb->buf_index <= INT_MAX - size)
> good catch
>>
>>> +    /* must not exceed buf_size */
>>> +    assert(fb->buf_index + size <= fb->buf_size);
>>> +
>>> +    fb->buf_index += size;
>>> +}
>>> +
>>> +static size_t get_buf_free_size(QEMUFile *f)
>>> +{
>>> +    QEMUFileBuffer *fb = f->current_buf;
>>> +    /* buf_index can't be greated than buf_size */
>>> +    assert(fb->buf_size >= fb->buf_index);
>>> +    return fb->buf_size - fb->buf_index;
>>> +}
>>> +
>>> +static size_t get_buf_used_size(QEMUFile *f)
>>> +{
>>> +    QEMUFileBuffer *fb = f->current_buf;
>>> +    return fb->buf_index;
>>> +}
>>> +
>>> +static uint8_t *get_buf_ptr(QEMUFile *f)
>>> +{
>>> +    QEMUFileBuffer *fb = f->current_buf;
>>> +    /* protects from out of bound reading */
>>> +    assert(fb->buf_index <= IO_BUF_SIZE);
>>> +    return fb->buf + fb->buf_index;
>>> +}
>>> +
>>> +static bool buf_is_full(QEMUFile *f)
>>> +{
>>> +    return get_buf_free_size(f) == 0;
>>> +}
>>> +
>>> +static void reset_buf(QEMUFile *f)
>>> +{
>>> +    QEMUFileBuffer *fb = f->current_buf;
>>> +    fb->buf_index = 0;
>>> +}
>>> +
>>> +static int write_task_fn(AioTask *task)
>>> +{
>>> +    int ret;
>>> +    Error *local_error = NULL;
>>> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
>>> +    QEMUFile *f = t->f;
>>> +    QEMUFileBuffer *fb = t->fb;
>>> +    uint64_t pos = f->pos;
>>> +    struct iovec v = (struct iovec) {
>>> +        .iov_base = fb->buf,
>>> +        .iov_len = fb->buf_index,
>>> +    };
>>> +
>>> +    assert(f->buffered_mode);
>>> +
>>> +    /*
>>> +     * Increment file position.
>>> +     * This needs to be here before calling writev_buffer, because
>>> +     * writev_buffer is asynchronous and there could be more than one
>>> +     * writev_buffer started simultaniously. Each writev_buffer should
>>> +     * use its own file pos to write to. writev_buffer may write less
>>> +     * than buf_index bytes but we treat this situation as an error.
>>> +     * If error appeared, further file using is meaningless.
>>> +     * We expect that, the most of the time the full buffer is written,
>>> +     * (when buf_size == buf_index). The only case when the non-full
>>> +     * buffer is written (buf_size != buf_index) is file close,
>>> +     * when we need to flush the rest of the buffer content.
>>> +     */
>>> +    f->pos += fb->buf_index;
>>
>> Seems safer to add pos to QEMUFileAioTask instead, and manage global f->pos
>> in main coroutine, not in tasks.
> I also thought about that, but I didn't find any benefit, because with coroutines we shouldn't get race problems

Still, it would be a more obvious code pattern IMHO: a task is an offset plus bytes, instead of just bytes plus a global offset variable which we have to update with care.
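A minimal sketch of that pattern (hypothetical names, not the actual patch code): the shared cursor is advanced once, at submission time, and each task keeps its own offset, so tasks may complete in any order without touching shared state.

```c
#include <stdint.h>
#include <stddef.h>

/* Each in-flight write carries the file offset it was assigned. */
typedef struct WriteTask {
    uint64_t offset;  /* where this buffer lands in the file */
    size_t len;       /* number of bytes to write */
} WriteTask;

/* Assign the task its slice of the file and advance the shared cursor;
 * after this, the task never reads or updates the global position. */
static WriteTask submit_write(uint64_t *file_pos, size_t len)
{
    WriteTask t = { .offset = *file_pos, .len = len };

    *file_pos += len;
    return t;
}
```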

>>
>>> +
>>> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
>>> +
>>> +    /* return the just written buffer to the free list */
>>> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>>> +
>>> +    /* check that we have written everything */
>>> +    if (ret != fb->buf_index) {
>>> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
>>> +    }
>>> +
>>> +    /*
>>> +     * always return 0 - don't use task error handling, relay on
>>> +     * qemu file error handling
>>> +     */
>>> +    return 0;
>>> +}
>>> +
>>> +static void qemu_file_switch_current_buf(QEMUFile *f)
>>> +{
>>> +    /*
>>> +     * if the list is empty, wait until some task returns a buffer
>>> +     * to the list of free buffers.
>>> +     */
>>> +    if (QLIST_EMPTY(&f->free_buffers)) {
>>> +        aio_task_pool_wait_slot(f->pool);
>>> +    }
>>> +
>>> +    /*
>>> +     * sanity check that the list isn't empty
>>> +     * if the free list was empty, we waited for a task complition,
>>> +     * and the pompleted task must return a buffer to a list of free buffers
>>> +     */
>>> +    assert(!QLIST_EMPTY(&f->free_buffers));
>>> +
>>> +    /* set the current buffer for using from the free list */
>>> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
>>> +    reset_buf(f);
>>> +
>>> +    QLIST_REMOVE(f->current_buf, link);
>>> +}
>>> +
>>> +/**
>>> + *  Asynchronously flushes QEMUFile buffer
>>> + *
>>> + * This will flush all pending data. If data was only partially flushed, it
>>> + * will set an error state. The function may return before the data actually
>>> + * written.
>>> + */
>>> +static void flush_buffer(QEMUFile *f)
>>> +{
>>> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
>>> +
>>> +    *t = (QEMUFileAioTask) {
>>> +        .task.func = &write_task_fn,
>>> +        .f = f,
>>> +        .fb = f->current_buf,
>>> +    };
>>> +
>>> +    /* aio_task_pool should free t for us */
>>> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
>>> +
>>> +    /* if no errors this will switch the buffer */
>>> +    qemu_file_switch_current_buf(f);
>>> +}
>>> +
>>>   /**
>>>    * Flushes QEMUFile buffer
>>>    *
>>> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>>>       if (f->shutdown) {
>>>           return;
>>>       }
>>> +
>>> +    if (f->buffered_mode) {
>>> +        return;
>>
>> I don't think it's correct. qemu_fflush is a public interface of QEMUFile and it's assumed to write all in-flight data.
> Yes, it should when you use sockets and you want to make sure that you've sent what your peer is waiting for.
> But this isn't the case when you write an internal snapshot, especially when you explicitly ask to open the qemu-file in "buffered_mode".

Hmm, it would be good to add a comment about that in the code.

>>
>>> +    }
>>> +
>>>       if (fb->iovcnt > 0) {
>>> +        /* this is non-buffered mode */
>>>           expect = iov_size(fb->iov, fb->iovcnt);
>>>           ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
>>>                                       &local_error);
>>> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>>>     void qemu_update_position(QEMUFile *f, size_t size)
>>>   {

[..]


-- 
Best regards,
Vladimir



* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
  2020-04-24 21:25   ` Eric Blake
  2020-04-25  9:10   ` Vladimir Sementsov-Ogievskiy
@ 2020-04-27 12:14   ` Dr. David Alan Gilbert
  2020-04-28  8:06     ` Denis Plotnikov
  2020-05-04  9:08     ` Daniel P. Berrangé
  2 siblings, 2 replies; 19+ messages in thread
From: Dr. David Alan Gilbert @ 2020-04-27 12:14 UTC (permalink / raw)
  To: Denis Plotnikov; +Cc: den, qemu-devel, quintela

* Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
> The patch adds ability to qemu-file to write the data
> asynchronously to improve the performance on writing.
> Before, only synchronous writing was supported.
> 
> Enabling of the asyncronous mode is managed by new
> "enabled_buffered" callback.

It's a bit invasive, isn't it - it changes a lot of functions in a lot of
places!
The multifd code separated the control headers from the data on separate
fd's - but that doesn't help your case.

Is there any chance you could do this by using the existing 'save_page'
hook (the one that RDMA uses)?

In the cover letter you mention direct qemu_fflush calls - have we got a
few too many in some places that you think we can clean out?

Dave

> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
> ---
>  include/qemu/typedefs.h |   1 +
>  migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>  migration/qemu-file.h   |   9 ++
>  3 files changed, 339 insertions(+), 22 deletions(-)
> 
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 88dce54..9b388c8 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>  typedef struct QemuConsole QemuConsole;
>  typedef struct QEMUFile QEMUFile;
>  typedef struct QEMUFileBuffer QEMUFileBuffer;
> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>  typedef struct QemuLockable QemuLockable;
>  typedef struct QemuMutex QemuMutex;
>  typedef struct QemuOpt QemuOpt;
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 285c6ef..f42f949 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -29,19 +29,25 @@
>  #include "qemu-file.h"
>  #include "trace.h"
>  #include "qapi/error.h"
> +#include "block/aio_task.h"
>  
> -#define IO_BUF_SIZE 32768
> +#define IO_BUF_SIZE (1024 * 1024)
>  #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
> +#define IO_BUF_NUM 2
> +#define IO_BUF_ALIGNMENT 512
>  
> -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>  
>  struct QEMUFileBuffer {
>      int buf_index;
> -    int buf_size; /* 0 when writing */
> +    int buf_size; /* 0 when non-buffered writing */
>      uint8_t *buf;
>      unsigned long *may_free;
>      struct iovec *iov;
>      unsigned int iovcnt;
> +    QLIST_ENTRY(QEMUFileBuffer) link;
>  };
>  
>  struct QEMUFile {
> @@ -60,6 +66,22 @@ struct QEMUFile {
>      bool shutdown;
>      /* currently used buffer */
>      QEMUFileBuffer *current_buf;
> +    /*
> +     * with buffered_mode enabled, all the data is copied to a 512-byte
> +     * aligned buffer, including iov data. Then the buffer is passed
> +     * to writev_buffer callback.
> +     */
> +    bool buffered_mode;
> +    /* for async buffer writing */
> +    AioTaskPool *pool;
> +    /* the list of free buffers; the currently used one is NOT there */
> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
> +};
> +
> +struct QEMUFileAioTask {
> +    AioTask task;
> +    QEMUFile *f;
> +    QEMUFileBuffer *fb;
>  };
>  
>  /*
> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>      f->opaque = opaque;
>      f->ops = ops;
>  
> -    f->current_buf = g_new0(QEMUFileBuffer, 1);
> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> +    if (f->ops->enable_buffered) {
> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
> +    }
> +
> +    if (f->buffered_mode && qemu_file_is_writable(f)) {
> +        int i;
> +        /*
> +         * in buffered_mode we don't use internal io vectors
> +         * and may_free bitmap, because we copy the data to be
> +         * written right away to the buffer
> +         */
> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
> +
> +        /* allocate io buffers */
> +        for (i = 0; i < IO_BUF_NUM; i++) {
> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
> +
> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
> +            fb->buf_size = IO_BUF_SIZE;
> +
> +            /*
> +             * put the first buffer to the current buf and the rest
> +             * to the list of free buffers
> +             */
> +            if (i == 0) {
> +                f->current_buf = fb;
> +            } else {
> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +            }
> +        }
> +    } else {
> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> +    }
>  
>      return f;
>  }
> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>      unsigned long idx;
>      QEMUFileBuffer *fb = f->current_buf;
>  
> +    assert(!f->buffered_mode);
> +
>      /* Find and release all the contiguous memory ranges marked as may_free. */
>      idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>      if (idx >= fb->iovcnt) {
> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>      bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>  }
>  
> +static void advance_buf_ptr(QEMUFile *f, size_t size)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* must not advance to 0 */
> +    assert(size);
> +    /* must not overflow buf_index (int) */
> +    assert(fb->buf_index + size <= INT_MAX);
> +    /* must not exceed buf_size */
> +    assert(fb->buf_index + size <= fb->buf_size);
> +
> +    fb->buf_index += size;
> +}
> +
> +static size_t get_buf_free_size(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* buf_index can't be greater than buf_size */
> +    assert(fb->buf_size >= fb->buf_index);
> +    return fb->buf_size - fb->buf_index;
> +}
> +
> +static size_t get_buf_used_size(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    return fb->buf_index;
> +}
> +
> +static uint8_t *get_buf_ptr(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    /* protects from out of bound reading */
> +    assert(fb->buf_index <= IO_BUF_SIZE);
> +    return fb->buf + fb->buf_index;
> +}
> +
> +static bool buf_is_full(QEMUFile *f)
> +{
> +    return get_buf_free_size(f) == 0;
> +}
> +
> +static void reset_buf(QEMUFile *f)
> +{
> +    QEMUFileBuffer *fb = f->current_buf;
> +    fb->buf_index = 0;
> +}
> +
> +static int write_task_fn(AioTask *task)
> +{
> +    int ret;
> +    Error *local_error = NULL;
> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
> +    QEMUFile *f = t->f;
> +    QEMUFileBuffer *fb = t->fb;
> +    uint64_t pos = f->pos;
> +    struct iovec v = (struct iovec) {
> +        .iov_base = fb->buf,
> +        .iov_len = fb->buf_index,
> +    };
> +
> +    assert(f->buffered_mode);
> +
> +    /*
> +     * Increment file position.
> +     * This needs to be here before calling writev_buffer, because
> +     * writev_buffer is asynchronous and there could be more than one
> +     * writev_buffer started simultaneously. Each writev_buffer should
> +     * use its own file pos to write to. writev_buffer may write less
> +     * than buf_index bytes, but we treat this situation as an error.
> +     * If an error appears, further use of the file is meaningless.
> +     * We expect that most of the time the full buffer is written
> +     * (when buf_size == buf_index). The only case when a non-full
> +     * buffer is written (buf_size != buf_index) is file close,
> +     * when we need to flush the rest of the buffer content.
> +     */
> +    f->pos += fb->buf_index;
> +
> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
> +
> +    /* return the just written buffer to the free list */
> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> +
> +    /* check that we have written everything */
> +    if (ret != fb->buf_index) {
> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
> +    }
> +
> +    /*
> +     * always return 0 - don't use task error handling, rely on
> +     * qemu file error handling
> +     */
> +    return 0;
> +}
> +
> +static void qemu_file_switch_current_buf(QEMUFile *f)
> +{
> +    /*
> +     * if the list is empty, wait until some task returns a buffer
> +     * to the list of free buffers.
> +     */
> +    if (QLIST_EMPTY(&f->free_buffers)) {
> +        aio_task_pool_wait_slot(f->pool);
> +    }
> +
> +    /*
> +     * sanity check that the list isn't empty
> +     * if the free list was empty, we waited for a task completion,
> +     * and the completed task must return a buffer to the list of free buffers
> +     */
> +    assert(!QLIST_EMPTY(&f->free_buffers));
> +
> +    /* set the current buffer for using from the free list */
> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
> +    reset_buf(f);
> +
> +    QLIST_REMOVE(f->current_buf, link);
> +}
> +
> +/**
> + *  Asynchronously flushes QEMUFile buffer
> + *
> + * This will flush all pending data. If data was only partially flushed, it
> + * will set an error state. The function may return before the data is
> + * actually written.
> + */
> +static void flush_buffer(QEMUFile *f)
> +{
> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
> +
> +    *t = (QEMUFileAioTask) {
> +        .task.func = &write_task_fn,
> +        .f = f,
> +        .fb = f->current_buf,
> +    };
> +
> +    /* aio_task_pool should free t for us */
> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
> +
> +    /* if no errors this will switch the buffer */
> +    qemu_file_switch_current_buf(f);
> +}
> +
>  /**
>   * Flushes QEMUFile buffer
>   *
> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>      if (f->shutdown) {
>          return;
>      }
> +
> +    if (f->buffered_mode) {
> +        return;
> +    }
> +
>      if (fb->iovcnt > 0) {
> +        /* this is non-buffered mode */
>          expect = iov_size(fb->iov, fb->iovcnt);
>          ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
>                                      &local_error);
> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>  
>  void qemu_update_position(QEMUFile *f, size_t size)
>  {
> +    assert(!f->buffered_mode);
>      f->pos += size;
>  }
>  
> @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
>  int qemu_fclose(QEMUFile *f)
>  {
>      int ret;
> -    qemu_fflush(f);
> +
> +    if (qemu_file_is_writable(f) && f->buffered_mode) {
> +        ret = qemu_file_get_error(f);
> +        if (!ret) {
> +            flush_buffer(f);
> +        }
> +        /* wait until all tasks are done */
> +        aio_task_pool_wait_all(f->pool);
> +    } else {
> +        qemu_fflush(f);
> +    }
> +
>      ret = qemu_file_get_error(f);
>  
>      if (f->ops->close) {
> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
>          ret = f->last_error;
>      }
>      error_free(f->last_error_obj);
> -    g_free(f->current_buf->buf);
> -    g_free(f->current_buf->iov);
> -    g_free(f->current_buf->may_free);
> -    g_free(f->current_buf);
> +
> +    if (f->buffered_mode) {
> +        QEMUFileBuffer *fb, *next;
> +        /*
> +         * put the current back to the free buffers list
> +         * to destroy all the buffers in one loop
> +         */
> +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
> +
> +        /* destroy all the buffers */
> +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
> +            QLIST_REMOVE(fb, link);
> +            /* looks like qemu_vfree pairs with qemu_memalign */
> +            qemu_vfree(fb->buf);
> +            g_free(fb);
> +        }
> +        g_free(f->pool);
> +    } else {
> +        g_free(f->current_buf->buf);
> +        g_free(f->current_buf->iov);
> +        g_free(f->current_buf->may_free);
> +        g_free(f->current_buf);
> +    }
> +
>      g_free(f);
>      trace_qemu_file_fclose();
>      return ret;
>  }
>  
>  /*
> + * Copy an external buffer to the internal current buffer.
> + */
> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
> +                     bool may_free)
> +{
> +    size_t data_size = size;
> +    const uint8_t *src_ptr = buf;
> +
> +    assert(f->buffered_mode);
> +    assert(size <= INT_MAX);
> +
> +    while (data_size > 0) {
> +        size_t chunk_size;
> +
> +        if (buf_is_full(f)) {
> +            flush_buffer(f);
> +            if (qemu_file_get_error(f)) {
> +                return;
> +            }
> +        }
> +
> +        chunk_size = MIN(get_buf_free_size(f), data_size);
> +
> +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
> +
> +        advance_buf_ptr(f, chunk_size);
> +
> +        src_ptr += chunk_size;
> +        data_size -= chunk_size;
> +        f->bytes_xfer += chunk_size;
> +    }
> +
> +    if (may_free) {
> +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
> +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
> +                         buf, size, strerror(errno));
> +        }
> +    }
> +}
> +
> +/*
>   * Add buf to iovec. Do flush if iovec is full.
>   *
>   * Return values:
> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
>  static void add_buf_to_iovec(QEMUFile *f, size_t len)
>  {
>      QEMUFileBuffer *fb = f->current_buf;
> +
> +    assert(!f->buffered_mode);
> +
>      if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
>          fb->buf_index += len;
>          if (fb->buf_index == IO_BUF_SIZE) {
> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
>          return;
>      }
>  
> -    f->bytes_xfer += size;
> -    add_to_iovec(f, buf, size, may_free);
> +    if (f->buffered_mode) {
> +        copy_buf(f, buf, size, may_free);
> +    } else {
> +        f->bytes_xfer += size;
> +        add_to_iovec(f, buf, size, may_free);
> +    }
>  }
>  
>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>          return;
>      }
>  
> +    if (f->buffered_mode) {
> +        copy_buf(f, buf, size, false);
> +        return;
> +    }
> +
>      while (size > 0) {
>          l = IO_BUF_SIZE - fb->buf_index;
>          if (l > size) {
> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
>          return;
>      }
>  
> -    fb->buf[fb->buf_index] = v;
> -    f->bytes_xfer++;
> -    add_buf_to_iovec(f, 1);
> +    if (f->buffered_mode) {
> +        copy_buf(f, (const uint8_t *) &v, 1, false);
> +    } else {
> +        fb->buf[fb->buf_index] = v;
> +        add_buf_to_iovec(f, 1);
> +        f->bytes_xfer++;
> +    }
>  }
>  
>  void qemu_file_skip(QEMUFile *f, int size)
>  {
>      QEMUFileBuffer *fb = f->current_buf;
>  
> +    assert(!f->buffered_mode);
> +
>      if (fb->buf_index + size <= fb->buf_size) {
>          fb->buf_index += size;
>      }
> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>  {
>      int64_t ret = f->pos;
>      int i;
> -    QEMUFileBuffer *fb = f->current_buf;
>  
> -    for (i = 0; i < fb->iovcnt; i++) {
> -        ret += fb->iov[i].iov_len;
> +    if (f->buffered_mode) {
> +        ret += get_buf_used_size(f);
> +    } else {
> +        QEMUFileBuffer *fb = f->current_buf;
> +        for (i = 0; i < fb->iovcnt; i++) {
> +            ret += fb->iov[i].iov_len;
> +        }
>      }
>  
>      return ret;
> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>  
>  int64_t qemu_ftell(QEMUFile *f)
>  {
> -    qemu_fflush(f);
> -    return f->pos;
> +    if (f->buffered_mode) {
> +        return qemu_ftell_fast(f);
> +    } else {
> +        qemu_fflush(f);
> +        return f->pos;
> +    }
>  }
>  
>  int qemu_file_rate_limit(QEMUFile *f)
> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>      QEMUFileBuffer *fb = f->current_buf;
>      ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>  
> +    assert(!f->buffered_mode);
> +
>      if (blen < compressBound(size)) {
>          return -1;
>      }
> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
>      int len = 0;
>      QEMUFileBuffer *fb_src = f_src->current_buf;
>  
> +    assert(!f_des->buffered_mode);
> +    assert(!f_src->buffered_mode);
> +
>      if (fb_src->buf_index > 0) {
>          len = fb_src->buf_index;
>          qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> index a9b6d6c..08655d2 100644
> --- a/migration/qemu-file.h
> +++ b/migration/qemu-file.h
> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>  typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>                                     Error **errp);
>  
> +/*
> + * Enables or disables the buffered mode
> + * Existing blocking reads/writes must be woken
> + * Returns true if the buffered mode has to be enabled,
> + * false if it has to be disabled.
> + */
> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
> +
>  typedef struct QEMUFileOps {
>      QEMUFileGetBufferFunc *get_buffer;
>      QEMUFileCloseFunc *close;
> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>      QEMUFileWritevBufferFunc *writev_buffer;
>      QEMURetPathFunc *get_return_path;
>      QEMUFileShutdownFunc *shut_down;
> +    QEMUFileEnableBufferedFunc *enable_buffered;
>  } QEMUFileOps;
>  
>  typedef struct QEMUFileHooks {
> -- 
> 1.8.3.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-27 12:14   ` Dr. David Alan Gilbert
@ 2020-04-28  8:06     ` Denis Plotnikov
  2020-04-28 17:54       ` Dr. David Alan Gilbert
  2020-05-04  9:08     ` Daniel P. Berrangé
  1 sibling, 1 reply; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-28  8:06 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: den, qemu-devel, quintela



On 27.04.2020 15:14, Dr. David Alan Gilbert wrote:
> * Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
>> The patch adds the ability for qemu-file to write data
>> asynchronously, to improve write performance.
>> Before, only synchronous writing was supported.
>>
>> Enabling the asynchronous mode is managed by the new
>> "enable_buffered" callback.
> It's a bit invasive isn't it - changes a lot of functions in a lot of
> places!

If you mean changing the qemu-file code - yes, it is.

If you mean changing the qemu-file usage in the code - no.
The only place that changes is the snapshot code, where the buffered mode
is enabled via a callback.
The change is in patch 03 of the series.

> The multifd code separated the control headers from the data on separate
> fd's - but that doesn't help your case.

yes, that doesn't help
>
> Is there any chance you could do this by using the existing 'save_page'
> hook (that RDMA uses)?

I don't think so. My goal is to improve the writing performance of
the internal snapshot to a qcow2 image. The snapshot is saved in qcow2 as
a continuous stream placed at the end of the address space.
To achieve the best writing speed I need a size- and base-aligned buffer
containing the vm state (with ram), which (for the ram part) looks like this:

... | ram page header | ram page | ram page header | ram page | ... and so on

so that the buffer can be stored in qcow2 with a single operation.

'save_page' would allow me not to store the 'ram page' in qemu-file's
internal structures and to write my own ram page storing logic.
I think that wouldn't help me a lot, because:
1. I need each page together with its ram page header
2. I want to reduce the number of io operations
3. I want to save other parts of vm state as fast as possible

Maybe I can't see a better way of using the 'save_page' callback.
Could you suggest anything?

Denis
> In the cover letter you mention direct qemu_fflush calls - have we got a
> few too many in some places that you think we can clean out?

I'm not sure that any of them are excessive. To the best of my knowledge,
qemu-file is used for source-destination communication during migration,
and removing some qemu_fflush-es may break the communication logic.

Snapshot saving is just a special case (if not the only one) where we know
that we can do buffered (cached) writes. Do you know of any other cases
where the buffered (cached) mode could be useful?

>
> Dave
>
>> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
>> ---
>>   include/qemu/typedefs.h |   1 +
>>   migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>>   migration/qemu-file.h   |   9 ++
>>   3 files changed, 339 insertions(+), 22 deletions(-)
>>
>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>> index 88dce54..9b388c8 100644
>> --- a/include/qemu/typedefs.h
>> +++ b/include/qemu/typedefs.h
>> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>>   typedef struct QemuConsole QemuConsole;
>>   typedef struct QEMUFile QEMUFile;
>>   typedef struct QEMUFileBuffer QEMUFileBuffer;
>> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>>   typedef struct QemuLockable QemuLockable;
>>   typedef struct QemuMutex QemuMutex;
>>   typedef struct QemuOpt QemuOpt;
>> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
>> index 285c6ef..f42f949 100644
>> --- a/migration/qemu-file.c
>> +++ b/migration/qemu-file.c
>> @@ -29,19 +29,25 @@
>>   #include "qemu-file.h"
>>   #include "trace.h"
>>   #include "qapi/error.h"
>> +#include "block/aio_task.h"
>>   
>> -#define IO_BUF_SIZE 32768
>> +#define IO_BUF_SIZE (1024 * 1024)
>>   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
>> +#define IO_BUF_NUM 2
>> +#define IO_BUF_ALIGNMENT 512
>>   
>> -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
>> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
>> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
>> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>>   
>>   struct QEMUFileBuffer {
>>       int buf_index;
>> -    int buf_size; /* 0 when writing */
>> +    int buf_size; /* 0 when non-buffered writing */
>>       uint8_t *buf;
>>       unsigned long *may_free;
>>       struct iovec *iov;
>>       unsigned int iovcnt;
>> +    QLIST_ENTRY(QEMUFileBuffer) link;
>>   };
>>   
>>   struct QEMUFile {
>> @@ -60,6 +66,22 @@ struct QEMUFile {
>>       bool shutdown;
>>       /* currently used buffer */
>>       QEMUFileBuffer *current_buf;
>> +    /*
>> +     * with buffered_mode enabled, all the data is copied to a 512-byte
>> +     * aligned buffer, including iov data. Then the buffer is passed
>> +     * to writev_buffer callback.
>> +     */
>> +    bool buffered_mode;
>> +    /* for async buffer writing */
>> +    AioTaskPool *pool;
>> +    /* the list of free buffers; the currently used one is NOT there */
>> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
>> +};
>> +
>> +struct QEMUFileAioTask {
>> +    AioTask task;
>> +    QEMUFile *f;
>> +    QEMUFileBuffer *fb;
>>   };
>>   
>>   /*
>> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>>       f->opaque = opaque;
>>       f->ops = ops;
>>   
>> -    f->current_buf = g_new0(QEMUFileBuffer, 1);
>> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>> +    if (f->ops->enable_buffered) {
>> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
>> +    }
>> +
>> +    if (f->buffered_mode && qemu_file_is_writable(f)) {
>> +        int i;
>> +        /*
>> +         * in buffered_mode we don't use internal io vectors
>> +         * and may_free bitmap, because we copy the data to be
>> +         * written right away to the buffer
>> +         */
>> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
>> +
>> +        /* allocate io buffers */
>> +        for (i = 0; i < IO_BUF_NUM; i++) {
>> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
>> +
>> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
>> +            fb->buf_size = IO_BUF_SIZE;
>> +
>> +            /*
>> +             * put the first buffer to the current buf and the rest
>> +             * to the list of free buffers
>> +             */
>> +            if (i == 0) {
>> +                f->current_buf = fb;
>> +            } else {
>> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>> +            }
>> +        }
>> +    } else {
>> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
>> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>> +    }
>>   
>>       return f;
>>   }
>> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>       unsigned long idx;
>>       QEMUFileBuffer *fb = f->current_buf;
>>   
>> +    assert(!f->buffered_mode);
>> +
>>       /* Find and release all the contiguous memory ranges marked as may_free. */
>>       idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>>       if (idx >= fb->iovcnt) {
>> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>       bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>>   }
>>   
>> +static void advance_buf_ptr(QEMUFile *f, size_t size)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* must not advance to 0 */
>> +    assert(size);
>> +    /* must not overflow buf_index (int) */
>> +    assert(fb->buf_index + size <= INT_MAX);
>> +    /* must not exceed buf_size */
>> +    assert(fb->buf_index + size <= fb->buf_size);
>> +
>> +    fb->buf_index += size;
>> +}
>> +
>> +static size_t get_buf_free_size(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* buf_index can't be greater than buf_size */
>> +    assert(fb->buf_size >= fb->buf_index);
>> +    return fb->buf_size - fb->buf_index;
>> +}
>> +
>> +static size_t get_buf_used_size(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    return fb->buf_index;
>> +}
>> +
>> +static uint8_t *get_buf_ptr(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    /* protects from out of bound reading */
>> +    assert(fb->buf_index <= IO_BUF_SIZE);
>> +    return fb->buf + fb->buf_index;
>> +}
>> +
>> +static bool buf_is_full(QEMUFile *f)
>> +{
>> +    return get_buf_free_size(f) == 0;
>> +}
>> +
>> +static void reset_buf(QEMUFile *f)
>> +{
>> +    QEMUFileBuffer *fb = f->current_buf;
>> +    fb->buf_index = 0;
>> +}
>> +
>> +static int write_task_fn(AioTask *task)
>> +{
>> +    int ret;
>> +    Error *local_error = NULL;
>> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
>> +    QEMUFile *f = t->f;
>> +    QEMUFileBuffer *fb = t->fb;
>> +    uint64_t pos = f->pos;
>> +    struct iovec v = (struct iovec) {
>> +        .iov_base = fb->buf,
>> +        .iov_len = fb->buf_index,
>> +    };
>> +
>> +    assert(f->buffered_mode);
>> +
>> +    /*
>> +     * Increment file position.
>> +     * This needs to be here before calling writev_buffer, because
>> +     * writev_buffer is asynchronous and there could be more than one
>> +     * writev_buffer started simultaneously. Each writev_buffer should
>> +     * use its own file pos to write to. writev_buffer may write less
>> +     * than buf_index bytes, but we treat this situation as an error.
>> +     * If an error appears, further use of the file is meaningless.
>> +     * We expect that most of the time the full buffer is written
>> +     * (when buf_size == buf_index). The only case when a non-full
>> +     * buffer is written (buf_size != buf_index) is file close,
>> +     * when we need to flush the rest of the buffer content.
>> +     */
>> +    f->pos += fb->buf_index;
>> +
>> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
>> +
>> +    /* return the just written buffer to the free list */
>> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>> +
>> +    /* check that we have written everything */
>> +    if (ret != fb->buf_index) {
>> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
>> +    }
>> +
>> +    /*
>> +     * always return 0 - don't use task error handling, rely on
>> +     * qemu file error handling
>> +     */
>> +    return 0;
>> +}
>> +
>> +static void qemu_file_switch_current_buf(QEMUFile *f)
>> +{
>> +    /*
>> +     * if the list is empty, wait until some task returns a buffer
>> +     * to the list of free buffers.
>> +     */
>> +    if (QLIST_EMPTY(&f->free_buffers)) {
>> +        aio_task_pool_wait_slot(f->pool);
>> +    }
>> +
>> +    /*
>> +     * sanity check that the list isn't empty
>> +     * if the free list was empty, we waited for a task completion,
>> +     * and the completed task must return a buffer to the list of free buffers
>> +     */
>> +    assert(!QLIST_EMPTY(&f->free_buffers));
>> +
>> +    /* set the current buffer for using from the free list */
>> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
>> +    reset_buf(f);
>> +
>> +    QLIST_REMOVE(f->current_buf, link);
>> +}
>> +
>> +/**
>> + *  Asynchronously flushes QEMUFile buffer
>> + *
>> + * This will flush all pending data. If data was only partially flushed, it
>> + * will set an error state. The function may return before the data is
>> + * actually written.
>> + */
>> +static void flush_buffer(QEMUFile *f)
>> +{
>> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
>> +
>> +    *t = (QEMUFileAioTask) {
>> +        .task.func = &write_task_fn,
>> +        .f = f,
>> +        .fb = f->current_buf,
>> +    };
>> +
>> +    /* aio_task_pool should free t for us */
>> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
>> +
>> +    /* if no errors this will switch the buffer */
>> +    qemu_file_switch_current_buf(f);
>> +}
>> +
>>   /**
>>    * Flushes QEMUFile buffer
>>    *
>> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>>       if (f->shutdown) {
>>           return;
>>       }
>> +
>> +    if (f->buffered_mode) {
>> +        return;
>> +    }
>> +
>>       if (fb->iovcnt > 0) {
>> +        /* this is non-buffered mode */
>>           expect = iov_size(fb->iov, fb->iovcnt);
>>           ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
>>                                       &local_error);
>> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>>   
>>   void qemu_update_position(QEMUFile *f, size_t size)
>>   {
>> +    assert(!f->buffered_mode);
>>       f->pos += size;
>>   }
>>   
>> @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
>>   int qemu_fclose(QEMUFile *f)
>>   {
>>       int ret;
>> -    qemu_fflush(f);
>> +
>> +    if (qemu_file_is_writable(f) && f->buffered_mode) {
>> +        ret = qemu_file_get_error(f);
>> +        if (!ret) {
>> +            flush_buffer(f);
>> +        }
>> +        /* wait until all tasks are done */
>> +        aio_task_pool_wait_all(f->pool);
>> +    } else {
>> +        qemu_fflush(f);
>> +    }
>> +
>>       ret = qemu_file_get_error(f);
>>   
>>       if (f->ops->close) {
>> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
>>           ret = f->last_error;
>>       }
>>       error_free(f->last_error_obj);
>> -    g_free(f->current_buf->buf);
>> -    g_free(f->current_buf->iov);
>> -    g_free(f->current_buf->may_free);
>> -    g_free(f->current_buf);
>> +
>> +    if (f->buffered_mode) {
>> +        QEMUFileBuffer *fb, *next;
>> +        /*
>> +         * put the current back to the free buffers list
>> +         * to destroy all the buffers in one loop
>> +         */
>> +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
>> +
>> +        /* destroy all the buffers */
>> +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
>> +            QLIST_REMOVE(fb, link);
>> +            /* looks like qemu_vfree pairs with qemu_memalign */
>> +            qemu_vfree(fb->buf);
>> +            g_free(fb);
>> +        }
>> +        g_free(f->pool);
>> +    } else {
>> +        g_free(f->current_buf->buf);
>> +        g_free(f->current_buf->iov);
>> +        g_free(f->current_buf->may_free);
>> +        g_free(f->current_buf);
>> +    }
>> +
>>       g_free(f);
>>       trace_qemu_file_fclose();
>>       return ret;
>>   }
>>   
>>   /*
>> + * Copy an external buffer to the internal current buffer.
>> + */
>> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
>> +                     bool may_free)
>> +{
>> +    size_t data_size = size;
>> +    const uint8_t *src_ptr = buf;
>> +
>> +    assert(f->buffered_mode);
>> +    assert(size <= INT_MAX);
>> +
>> +    while (data_size > 0) {
>> +        size_t chunk_size;
>> +
>> +        if (buf_is_full(f)) {
>> +            flush_buffer(f);
>> +            if (qemu_file_get_error(f)) {
>> +                return;
>> +            }
>> +        }
>> +
>> +        chunk_size = MIN(get_buf_free_size(f), data_size);
>> +
>> +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
>> +
>> +        advance_buf_ptr(f, chunk_size);
>> +
>> +        src_ptr += chunk_size;
>> +        data_size -= chunk_size;
>> +        f->bytes_xfer += chunk_size;
>> +    }
>> +
>> +    if (may_free) {
>> +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
>> +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
>> +                         buf, size, strerror(errno));
>> +        }
>> +    }
>> +}
>> +
>> +/*
>>    * Add buf to iovec. Do flush if iovec is full.
>>    *
>>    * Return values:
>> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
>>   static void add_buf_to_iovec(QEMUFile *f, size_t len)
>>   {
>>       QEMUFileBuffer *fb = f->current_buf;
>> +
>> +    assert(!f->buffered_mode);
>> +
>>       if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
>>           fb->buf_index += len;
>>           if (fb->buf_index == IO_BUF_SIZE) {
>> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
>>           return;
>>       }
>>   
>> -    f->bytes_xfer += size;
>> -    add_to_iovec(f, buf, size, may_free);
>> +    if (f->buffered_mode) {
>> +        copy_buf(f, buf, size, may_free);
>> +    } else {
>> +        f->bytes_xfer += size;
>> +        add_to_iovec(f, buf, size, may_free);
>> +    }
>>   }
>>   
>>   void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>>           return;
>>       }
>>   
>> +    if (f->buffered_mode) {
>> +        copy_buf(f, buf, size, false);
>> +        return;
>> +    }
>> +
>>       while (size > 0) {
>>           l = IO_BUF_SIZE - fb->buf_index;
>>           if (l > size) {
>> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
>>           return;
>>       }
>>   
>> -    fb->buf[fb->buf_index] = v;
>> -    f->bytes_xfer++;
>> -    add_buf_to_iovec(f, 1);
>> +    if (f->buffered_mode) {
>> +        copy_buf(f, (const uint8_t *) &v, 1, false);
>> +    } else {
>> +        fb->buf[fb->buf_index] = v;
>> +        add_buf_to_iovec(f, 1);
>> +        f->bytes_xfer++;
>> +    }
>>   }
>>   
>>   void qemu_file_skip(QEMUFile *f, int size)
>>   {
>>       QEMUFileBuffer *fb = f->current_buf;
>>   
>> +    assert(!f->buffered_mode);
>> +
>>       if (fb->buf_index + size <= fb->buf_size) {
>>           fb->buf_index += size;
>>       }
>> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>   {
>>       int64_t ret = f->pos;
>>       int i;
>> -    QEMUFileBuffer *fb = f->current_buf;
>>   
>> -    for (i = 0; i < fb->iovcnt; i++) {
>> -        ret += fb->iov[i].iov_len;
>> +    if (f->buffered_mode) {
>> +        ret += get_buf_used_size(f);
>> +    } else {
>> +        QEMUFileBuffer *fb = f->current_buf;
>> +        for (i = 0; i < fb->iovcnt; i++) {
>> +            ret += fb->iov[i].iov_len;
>> +        }
>>       }
>>   
>>       return ret;
>> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>   
>>   int64_t qemu_ftell(QEMUFile *f)
>>   {
>> -    qemu_fflush(f);
>> -    return f->pos;
>> +    if (f->buffered_mode) {
>> +        return qemu_ftell_fast(f);
>> +    } else {
>> +        qemu_fflush(f);
>> +        return f->pos;
>> +    }
>>   }
>>   
>>   int qemu_file_rate_limit(QEMUFile *f)
>> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>>       QEMUFileBuffer *fb = f->current_buf;
>>       ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>>   
>> +    assert(!f->buffered_mode);
>> +
>>       if (blen < compressBound(size)) {
>>           return -1;
>>       }
>> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
>>       int len = 0;
>>       QEMUFileBuffer *fb_src = f_src->current_buf;
>>   
>> +    assert(!f_des->buffered_mode);
>> +    assert(!f_src->buffered_mode);
>> +
>>       if (fb_src->buf_index > 0) {
>>           len = fb_src->buf_index;
>>           qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
>> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
>> index a9b6d6c..08655d2 100644
>> --- a/migration/qemu-file.h
>> +++ b/migration/qemu-file.h
>> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>>   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>>                                      Error **errp);
>>   
>> +/*
>> + * Enables or disables the buffered mode
>> + * Existing blocking reads/writes must be woken
>> + * Returns true if the buffered mode has to be enabled,
>> + * false if it has to be disabled.
>> + */
>> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
>> +
>>   typedef struct QEMUFileOps {
>>       QEMUFileGetBufferFunc *get_buffer;
>>       QEMUFileCloseFunc *close;
>> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>>       QEMUFileWritevBufferFunc *writev_buffer;
>>       QEMURetPathFunc *get_return_path;
>>       QEMUFileShutdownFunc *shut_down;
>> +    QEMUFileEnableBufferedFunc *enable_buffered;
>>   } QEMUFileOps;
>>   
>>   typedef struct QEMUFileHooks {
>> -- 
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>



^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-28  8:06     ` Denis Plotnikov
@ 2020-04-28 17:54       ` Dr. David Alan Gilbert
  2020-04-28 20:25         ` Denis Plotnikov
  0 siblings, 1 reply; 19+ messages in thread
From: Dr. David Alan Gilbert @ 2020-04-28 17:54 UTC (permalink / raw)
  To: Denis Plotnikov; +Cc: den, qemu-devel, quintela

* Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
> 
> 
> On 27.04.2020 15:14, Dr. David Alan Gilbert wrote:
> > * Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
> > > The patch adds the ability for qemu-file to write data
> > > asynchronously to improve write performance.
> > > Before, only synchronous writing was supported.
> > > 
> > > Enabling the asynchronous mode is managed by the new
> > > "enable_buffered" callback.
> > It's a bit invasive isn't it - changes a lot of functions in a lot of
> > places!
> 
> If you mean changing the qemu-file code - yes, it is.

Yeh that's what I worry about; qemu-file is pretty complex as it is.
Especially when it then passes it to the channel code etc

> If you mean changing the qemu-file usage in the code - no.
> The only place to change is the snapshot code when the buffered mode is
> enabled with a callback.
> The change is in 03 patch of the series.

That's fine - that's easy.

> > The multifd code separated the control headers from the data on separate
> > fd's - but that doesn't help your case.
> 
> yes, that doesn't help
> > 
> > Is there any chance you could do this by using the existing 'save_page'
> > hook (that RDMA uses).
> 
> I don't think so. My goal is to improve writing performance of
> the internal snapshot to a qcow2 image. The snapshot is saved in qcow2 as a
> continuous stream placed at the end of the address space.
> To achieve the best writing speed I need a size and base-aligned buffer
> containing the vm state (with ram) which looks like that (related to ram):
> 
> ... | ram page header | ram page | ram page header | ram page | ... and so
> on
> 
> to store the buffer in qcow2 with a single operation.
> 
> 'save_page' would allow me not to store 'ram page' in the qemu-file internal
> structures,
> and write my own ram page storing logic. I think that wouldn't help me a lot
> because:
> 1. I need a page with the ram page header
> 2. I want to reduce the number of io operations
> 3. I want to save other parts of vm state as fast as possible
> 
> Maybe I can't see a better way of using the 'save_page' callback.
> Could you suggest anything?

I guess it depends if we care about keeping the format of the snapshot
the same here;  if we were open to changing it, then we could use
the save_page hook to delay the writes, so we'd have a pile of headers
followed by a pile of pages.
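The layout Denis describes, small 8-byte page headers interleaved with ram pages, is exactly what defeats the alignment check in the block layer. Below is a minimal, hypothetical C sketch (invented names and sizes, not QEMU code) of the bounce-buffer approach the series takes: copy every chunk, aligned or not, into one base- and size-aligned buffer and issue a single large write whenever it fills.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical sizes for illustration only; the patch uses
 * IO_BUF_SIZE = 1 MiB and IO_BUF_ALIGNMENT = 512. */
#define BUF_ALIGN 512
#define BUF_SIZE  (4 * BUF_ALIGN)
#define PAGE_SIZE 1024

typedef struct {
    uint8_t *buf;     /* base- and size-aligned bounce buffer */
    size_t   index;   /* bytes currently used */
    int      flushes; /* number of (simulated) writes issued */
} BounceBuf;

static void bounce_init(BounceBuf *b)
{
    /* aligned allocation so O_DIRECT-style writes stay aligned */
    assert(posix_memalign((void **)&b->buf, BUF_ALIGN, BUF_SIZE) == 0);
    b->index = 0;
    b->flushes = 0;
}

/* stand-in for the (asynchronous) writev_buffer call in the patch */
static void bounce_flush(BounceBuf *b)
{
    if (b->index == 0) {
        return;
    }
    b->flushes++;   /* a real implementation would submit the I/O here */
    b->index = 0;
}

/* copy arbitrary-sized chunks, flushing whenever the buffer fills,
 * mirroring the copy_buf() loop in the patch */
static void bounce_put(BounceBuf *b, const void *data, size_t size)
{
    const uint8_t *p = data;

    while (size > 0) {
        size_t chunk = BUF_SIZE - b->index;
        if (chunk == 0) {
            bounce_flush(b);
            chunk = BUF_SIZE;
        }
        if (chunk > size) {
            chunk = size;
        }
        memcpy(b->buf + b->index, p, chunk);
        b->index += chunk;
        p += chunk;
        size -= chunk;
    }
}
```

With these toy sizes, writing four (header, page) records produces only three large writes instead of eight small, mostly unaligned ones, which is the whole point of buffered mode.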

> Denis
> > In the cover letter you mention direct qemu_fflush calls - have we got a
> > few too many in some places that you think we can clean out?
> 
> I'm not sure that some of them are excessive. To the best of my knowledge,
> qemu-file is used for the source-destination communication on migration
> and removing some qemu_fflush-es may break communication logic.

I can't see any obvious places where it's called during the ram
migration; can you try and give me a hint as to where you're seeing it?

> Snapshot is just a special case (if not the only one) where we know that
> we can do buffered (cached) writes. Do you know any other cases where the
> buffered (cached) mode could be useful?

The RDMA code does it because it's really not good at small transfers,
but maybe generally it would be a good idea to do larger writes if
possible - something that multifd manages.

Dave

> 
> > 
> > Dave
> > 
> > > Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
> > > ---
> > >   include/qemu/typedefs.h |   1 +
> > >   migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
> > >   migration/qemu-file.h   |   9 ++
> > >   3 files changed, 339 insertions(+), 22 deletions(-)
> > > 
> > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> > > index 88dce54..9b388c8 100644
> > > --- a/include/qemu/typedefs.h
> > > +++ b/include/qemu/typedefs.h
> > > @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
> > >   typedef struct QemuConsole QemuConsole;
> > >   typedef struct QEMUFile QEMUFile;
> > >   typedef struct QEMUFileBuffer QEMUFileBuffer;
> > > +typedef struct QEMUFileAioTask QEMUFileAioTask;
> > >   typedef struct QemuLockable QemuLockable;
> > >   typedef struct QemuMutex QemuMutex;
> > >   typedef struct QemuOpt QemuOpt;
> > > diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> > > index 285c6ef..f42f949 100644
> > > --- a/migration/qemu-file.c
> > > +++ b/migration/qemu-file.c
> > > @@ -29,19 +29,25 @@
> > >   #include "qemu-file.h"
> > >   #include "trace.h"
> > >   #include "qapi/error.h"
> > > +#include "block/aio_task.h"
> > > -#define IO_BUF_SIZE 32768
> > > +#define IO_BUF_SIZE (1024 * 1024)
> > >   #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
> > > +#define IO_BUF_NUM 2
> > > +#define IO_BUF_ALIGNMENT 512
> > > -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
> > > +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
> > > +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
> > > +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
> > >   struct QEMUFileBuffer {
> > >       int buf_index;
> > > -    int buf_size; /* 0 when writing */
> > > +    int buf_size; /* 0 when non-buffered writing */
> > >       uint8_t *buf;
> > >       unsigned long *may_free;
> > >       struct iovec *iov;
> > >       unsigned int iovcnt;
> > > +    QLIST_ENTRY(QEMUFileBuffer) link;
> > >   };
> > >   struct QEMUFile {
> > > @@ -60,6 +66,22 @@ struct QEMUFile {
> > >       bool shutdown;
> > >       /* currently used buffer */
> > >       QEMUFileBuffer *current_buf;
> > > +    /*
> > > +     * with buffered_mode enabled, all the data is copied to a 512 byte
> > > +     * aligned buffer, including iov data. Then the buffer is passed
> > > +     * to the writev_buffer callback.
> > > +     */
> > > +    bool buffered_mode;
> > > +    /* for async buffer writing */
> > > +    AioTaskPool *pool;
> > > +    /* the list of free buffers; the currently used one is NOT there */
> > > +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
> > > +};
> > > +
> > > +struct QEMUFileAioTask {
> > > +    AioTask task;
> > > +    QEMUFile *f;
> > > +    QEMUFileBuffer *fb;
> > >   };
> > >   /*
> > > @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
> > >       f->opaque = opaque;
> > >       f->ops = ops;
> > > -    f->current_buf = g_new0(QEMUFileBuffer, 1);
> > > -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> > > -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> > > -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> > > +    if (f->ops->enable_buffered) {
> > > +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
> > > +    }
> > > +
> > > +    if (f->buffered_mode && qemu_file_is_writable(f)) {
> > > +        int i;
> > > +        /*
> > > +         * in buffered_mode we don't use internal io vectors
> > > +         * and may_free bitmap, because we copy the data to be
> > > +         * written right away to the buffer
> > > +         */
> > > +        f->pool = aio_task_pool_new(IO_BUF_NUM);
> > > +
> > > +        /* allocate io buffers */
> > > +        for (i = 0; i < IO_BUF_NUM; i++) {
> > > +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
> > > +
> > > +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
> > > +            fb->buf_size = IO_BUF_SIZE;
> > > +
> > > +            /*
> > > +             * put the first buffer to the current buf and the rest
> > > +             * to the list of free buffers
> > > +             */
> > > +            if (i == 0) {
> > > +                f->current_buf = fb;
> > > +            } else {
> > > +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> > > +            }
> > > +        }
> > > +    } else {
> > > +        f->current_buf = g_new0(QEMUFileBuffer, 1);
> > > +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
> > > +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
> > > +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
> > > +    }
> > >       return f;
> > >   }
> > > @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
> > >       unsigned long idx;
> > >       QEMUFileBuffer *fb = f->current_buf;
> > > +    assert(!f->buffered_mode);
> > > +
> > >       /* Find and release all the contiguous memory ranges marked as may_free. */
> > >       idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
> > >       if (idx >= fb->iovcnt) {
> > > @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
> > >       bitmap_zero(fb->may_free, MAX_IOV_SIZE);
> > >   }
> > > +static void advance_buf_ptr(QEMUFile *f, size_t size)
> > > +{
> > > +    QEMUFileBuffer *fb = f->current_buf;
> > > +    /* must not advance by 0 */
> > > +    assert(size);
> > > +    /* must not overflow buf_index (int) */
> > > +    assert(fb->buf_index + size <= INT_MAX);
> > > +    /* must not exceed buf_size */
> > > +    assert(fb->buf_index + size <= fb->buf_size);
> > > +
> > > +    fb->buf_index += size;
> > > +}
> > > +
> > > +static size_t get_buf_free_size(QEMUFile *f)
> > > +{
> > > +    QEMUFileBuffer *fb = f->current_buf;
> > > +    /* buf_index can't be greater than buf_size */
> > > +    assert(fb->buf_size >= fb->buf_index);
> > > +    return fb->buf_size - fb->buf_index;
> > > +}
> > > +
> > > +static size_t get_buf_used_size(QEMUFile *f)
> > > +{
> > > +    QEMUFileBuffer *fb = f->current_buf;
> > > +    return fb->buf_index;
> > > +}
> > > +
> > > +static uint8_t *get_buf_ptr(QEMUFile *f)
> > > +{
> > > +    QEMUFileBuffer *fb = f->current_buf;
> > > +    /* protects from out-of-bounds reading */
> > > +    assert(fb->buf_index <= IO_BUF_SIZE);
> > > +    return fb->buf + fb->buf_index;
> > > +}
> > > +
> > > +static bool buf_is_full(QEMUFile *f)
> > > +{
> > > +    return get_buf_free_size(f) == 0;
> > > +}
> > > +
> > > +static void reset_buf(QEMUFile *f)
> > > +{
> > > +    QEMUFileBuffer *fb = f->current_buf;
> > > +    fb->buf_index = 0;
> > > +}
> > > +
> > > +static int write_task_fn(AioTask *task)
> > > +{
> > > +    int ret;
> > > +    Error *local_error = NULL;
> > > +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
> > > +    QEMUFile *f = t->f;
> > > +    QEMUFileBuffer *fb = t->fb;
> > > +    uint64_t pos = f->pos;
> > > +    struct iovec v = (struct iovec) {
> > > +        .iov_base = fb->buf,
> > > +        .iov_len = fb->buf_index,
> > > +    };
> > > +
> > > +    assert(f->buffered_mode);
> > > +
> > > +    /*
> > > +     * Increment file position.
> > > +     * This needs to be here before calling writev_buffer, because
> > > +     * writev_buffer is asynchronous and there could be more than one
> > > +     * writev_buffer started simultaneously. Each writev_buffer should
> > > +     * use its own file pos to write to. writev_buffer may write less
> > > +     * than buf_index bytes but we treat this situation as an error.
> > > +     * If an error occurs, further use of the file is meaningless.
> > > +     * We expect that most of the time the full buffer is written
> > > +     * (when buf_size == buf_index). The only case when a non-full
> > > +     * buffer is written (buf_size != buf_index) is file close,
> > > +     * when we need to flush the rest of the buffer content.
> > > +     */
> > > +    f->pos += fb->buf_index;
> > > +
> > > +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
> > > +
> > > +    /* return the just written buffer to the free list */
> > > +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
> > > +
> > > +    /* check that we have written everything */
> > > +    if (ret != fb->buf_index) {
> > > +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
> > > +    }
> > > +
> > > +    /*
> > > +     * always return 0 - don't use task error handling, rely on
> > > +     * qemu file error handling
> > > +     */
> > > +    return 0;
> > > +}
> > > +
> > > +static void qemu_file_switch_current_buf(QEMUFile *f)
> > > +{
> > > +    /*
> > > +     * if the list is empty, wait until some task returns a buffer
> > > +     * to the list of free buffers.
> > > +     */
> > > +    if (QLIST_EMPTY(&f->free_buffers)) {
> > > +        aio_task_pool_wait_slot(f->pool);
> > > +    }
> > > +
> > > +    /*
> > > +     * sanity check that the list isn't empty:
> > > +     * if the free list was empty, we waited for a task completion,
> > > +     * and the completed task must return a buffer to the list of free buffers
> > > +     */
> > > +    assert(!QLIST_EMPTY(&f->free_buffers));
> > > +
> > > +    /* set the current buffer for using from the free list */
> > > +    f->current_buf = QLIST_FIRST(&f->free_buffers);
> > > +    reset_buf(f);
> > > +
> > > +    QLIST_REMOVE(f->current_buf, link);
> > > +}
> > > +
> > > +/**
> > > + *  Asynchronously flushes QEMUFile buffer
> > > + *
> > > + * This will flush all pending data. If data was only partially flushed, it
> > > + * will set an error state. The function may return before the data is
> > > + * actually written.
> > > + */
> > > +static void flush_buffer(QEMUFile *f)
> > > +{
> > > +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
> > > +
> > > +    *t = (QEMUFileAioTask) {
> > > +        .task.func = &write_task_fn,
> > > +        .f = f,
> > > +        .fb = f->current_buf,
> > > +    };
> > > +
> > > +    /* aio_task_pool should free t for us */
> > > +    aio_task_pool_start_task(f->pool, (AioTask *) t);
> > > +
> > > +    /* if no errors this will switch the buffer */
> > > +    qemu_file_switch_current_buf(f);
> > > +}
> > > +
> > >   /**
> > >    * Flushes QEMUFile buffer
> > >    *
> > > @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
> > >       if (f->shutdown) {
> > >           return;
> > >       }
> > > +
> > > +    if (f->buffered_mode) {
> > > +        return;
> > > +    }
> > > +
> > >       if (fb->iovcnt > 0) {
> > > +        /* this is non-buffered mode */
> > >           expect = iov_size(fb->iov, fb->iovcnt);
> > >           ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
> > >                                       &local_error);
> > > @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
> > >   void qemu_update_position(QEMUFile *f, size_t size)
> > >   {
> > > +    assert(!f->buffered_mode);
> > >       f->pos += size;
> > >   }
> > > @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
> > >   int qemu_fclose(QEMUFile *f)
> > >   {
> > >       int ret;
> > > -    qemu_fflush(f);
> > > +
> > > +    if (qemu_file_is_writable(f) && f->buffered_mode) {
> > > +        ret = qemu_file_get_error(f);
> > > +        if (!ret) {
> > > +            flush_buffer(f);
> > > +        }
> > > +        /* wait until all tasks are done */
> > > +        aio_task_pool_wait_all(f->pool);
> > > +    } else {
> > > +        qemu_fflush(f);
> > > +    }
> > > +
> > >       ret = qemu_file_get_error(f);
> > >       if (f->ops->close) {
> > > @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
> > >           ret = f->last_error;
> > >       }
> > >       error_free(f->last_error_obj);
> > > -    g_free(f->current_buf->buf);
> > > -    g_free(f->current_buf->iov);
> > > -    g_free(f->current_buf->may_free);
> > > -    g_free(f->current_buf);
> > > +
> > > +    if (f->buffered_mode) {
> > > +        QEMUFileBuffer *fb, *next;
> > > +        /*
> > > +         * put the current buffer back to the free buffers list
> > > +         * to destroy all the buffers in one loop
> > > +         */
> > > +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
> > > +
> > > +        /* destroy all the buffers */
> > > +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
> > > +            QLIST_REMOVE(fb, link);
> > > +            /* looks like qemu_vfree pairs with qemu_memalign */
> > > +            qemu_vfree(fb->buf);
> > > +            g_free(fb);
> > > +        }
> > > +        g_free(f->pool);
> > > +    } else {
> > > +        g_free(f->current_buf->buf);
> > > +        g_free(f->current_buf->iov);
> > > +        g_free(f->current_buf->may_free);
> > > +        g_free(f->current_buf);
> > > +    }
> > > +
> > >       g_free(f);
> > >       trace_qemu_file_fclose();
> > >       return ret;
> > >   }
> > >   /*
> > > + * Copy an external buffer to the internal current buffer.
> > > + */
> > > +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
> > > +                     bool may_free)
> > > +{
> > > +    size_t data_size = size;
> > > +    const uint8_t *src_ptr = buf;
> > > +
> > > +    assert(f->buffered_mode);
> > > +    assert(size <= INT_MAX);
> > > +
> > > +    while (data_size > 0) {
> > > +        size_t chunk_size;
> > > +
> > > +        if (buf_is_full(f)) {
> > > +            flush_buffer(f);
> > > +            if (qemu_file_get_error(f)) {
> > > +                return;
> > > +            }
> > > +        }
> > > +
> > > +        chunk_size = MIN(get_buf_free_size(f), data_size);
> > > +
> > > +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
> > > +
> > > +        advance_buf_ptr(f, chunk_size);
> > > +
> > > +        src_ptr += chunk_size;
> > > +        data_size -= chunk_size;
> > > +        f->bytes_xfer += chunk_size;
> > > +    }
> > > +
> > > +    if (may_free) {
> > > +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
> > > +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
> > > +                         buf, size, strerror(errno));
> > > +        }
> > > +    }
> > > +}
> > > +
> > > +/*
> > >    * Add buf to iovec. Do flush if iovec is full.
> > >    *
> > >    * Return values:
> > > @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
> > >   static void add_buf_to_iovec(QEMUFile *f, size_t len)
> > >   {
> > >       QEMUFileBuffer *fb = f->current_buf;
> > > +
> > > +    assert(!f->buffered_mode);
> > > +
> > >       if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
> > >           fb->buf_index += len;
> > >           if (fb->buf_index == IO_BUF_SIZE) {
> > > @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
> > >           return;
> > >       }
> > > -    f->bytes_xfer += size;
> > > -    add_to_iovec(f, buf, size, may_free);
> > > +    if (f->buffered_mode) {
> > > +        copy_buf(f, buf, size, may_free);
> > > +    } else {
> > > +        f->bytes_xfer += size;
> > > +        add_to_iovec(f, buf, size, may_free);
> > > +    }
> > >   }
> > >   void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> > > @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
> > >           return;
> > >       }
> > > +    if (f->buffered_mode) {
> > > +        copy_buf(f, buf, size, false);
> > > +        return;
> > > +    }
> > > +
> > >       while (size > 0) {
> > >           l = IO_BUF_SIZE - fb->buf_index;
> > >           if (l > size) {
> > > @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
> > >           return;
> > >       }
> > > -    fb->buf[fb->buf_index] = v;
> > > -    f->bytes_xfer++;
> > > -    add_buf_to_iovec(f, 1);
> > > +    if (f->buffered_mode) {
> > > +        copy_buf(f, (const uint8_t *) &v, 1, false);
> > > +    } else {
> > > +        fb->buf[fb->buf_index] = v;
> > > +        add_buf_to_iovec(f, 1);
> > > +        f->bytes_xfer++;
> > > +    }
> > >   }
> > >   void qemu_file_skip(QEMUFile *f, int size)
> > >   {
> > >       QEMUFileBuffer *fb = f->current_buf;
> > > +    assert(!f->buffered_mode);
> > > +
> > >       if (fb->buf_index + size <= fb->buf_size) {
> > >           fb->buf_index += size;
> > >       }
> > > @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
> > >   {
> > >       int64_t ret = f->pos;
> > >       int i;
> > > -    QEMUFileBuffer *fb = f->current_buf;
> > > -    for (i = 0; i < fb->iovcnt; i++) {
> > > -        ret += fb->iov[i].iov_len;
> > > +    if (f->buffered_mode) {
> > > +        ret += get_buf_used_size(f);
> > > +    } else {
> > > +        QEMUFileBuffer *fb = f->current_buf;
> > > +        for (i = 0; i < fb->iovcnt; i++) {
> > > +            ret += fb->iov[i].iov_len;
> > > +        }
> > >       }
> > >       return ret;
> > > @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
> > >   int64_t qemu_ftell(QEMUFile *f)
> > >   {
> > > -    qemu_fflush(f);
> > > -    return f->pos;
> > > +    if (f->buffered_mode) {
> > > +        return qemu_ftell_fast(f);
> > > +    } else {
> > > +        qemu_fflush(f);
> > > +        return f->pos;
> > > +    }
> > >   }
> > >   int qemu_file_rate_limit(QEMUFile *f)
> > > @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
> > >       QEMUFileBuffer *fb = f->current_buf;
> > >       ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
> > > +    assert(!f->buffered_mode);
> > > +
> > >       if (blen < compressBound(size)) {
> > >           return -1;
> > >       }
> > > @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
> > >       int len = 0;
> > >       QEMUFileBuffer *fb_src = f_src->current_buf;
> > > +    assert(!f_des->buffered_mode);
> > > +    assert(!f_src->buffered_mode);
> > > +
> > >       if (fb_src->buf_index > 0) {
> > >           len = fb_src->buf_index;
> > >           qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
> > > diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> > > index a9b6d6c..08655d2 100644
> > > --- a/migration/qemu-file.h
> > > +++ b/migration/qemu-file.h
> > > @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
> > >   typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
> > >                                      Error **errp);
> > > +/*
> > > + * Enables or disables the buffered mode
> > > + * Existing blocking reads/writes must be woken
> > > + * Returns true if the buffered mode has to be enabled,
> > > + * false if it has to be disabled.
> > > + */
> > > +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
> > > +
> > >   typedef struct QEMUFileOps {
> > >       QEMUFileGetBufferFunc *get_buffer;
> > >       QEMUFileCloseFunc *close;
> > > @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
> > >       QEMUFileWritevBufferFunc *writev_buffer;
> > >       QEMURetPathFunc *get_return_path;
> > >       QEMUFileShutdownFunc *shut_down;
> > > +    QEMUFileEnableBufferedFunc *enable_buffered;
> > >   } QEMUFileOps;
> > >   typedef struct QEMUFileHooks {
> > > -- 
> > > 1.8.3.1
> > > 
> > --
> > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> > 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-28 17:54       ` Dr. David Alan Gilbert
@ 2020-04-28 20:25         ` Denis Plotnikov
  0 siblings, 0 replies; 19+ messages in thread
From: Denis Plotnikov @ 2020-04-28 20:25 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: den, qemu-devel, quintela



On 28.04.2020 20:54, Dr. David Alan Gilbert wrote:
> * Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
>>
>> On 27.04.2020 15:14, Dr. David Alan Gilbert wrote:
>>> * Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
>>>> The patch adds the ability for qemu-file to write data
>>>> asynchronously to improve write performance.
>>>> Before, only synchronous writing was supported.
>>>>
>>>> Enabling the asynchronous mode is managed by the new
>>>> "enable_buffered" callback.
>>> It's a bit invasive isn't it - changes a lot of functions in a lot of
>>> places!
>> If you mean changing the qemu-file code - yes, it is.
> Yeh that's what I worry about; qemu-file is pretty complex as it is.
> Especially when it then passes it to the channel code etc
>
>> If you mean changing the qemu-file usage in the code - no.
>> The only place to change is the snapshot code when the buffered mode is
>> enabled with a callback.
>> The change is in 03 patch of the series.
> That's fine - that's easy.
>
>>> The multifd code separated the control headers from the data on separate
>>> fd's - but that doesn't help your case.
>> yes, that doesn't help
>>> Is there any chance you could do this by using the existing 'save_page'
>>> hook (that RDMA uses).
>> I don't think so. My goal is to improve writing performance of
>> the internal snapshot to a qcow2 image. The snapshot is saved in qcow2 as a
>> continuous stream placed at the end of the address space.
>> To achieve the best writing speed I need a size and base-aligned buffer
>> containing the vm state (with ram) which looks like that (related to ram):
>>
>> ... | ram page header | ram page | ram page header | ram page | ... and so
>> on
>>
>> to store the buffer in qcow2 with a single operation.
>>
>> 'save_page' would allow me not to store 'ram page' in the qemu-file internal
>> structures,
>> and write my own ram page storing logic. I think that wouldn't help me a lot
>> because:
>> 1. I need a page with the ram page header
>> 2. I want to reduce the number of io operations
>> 3. I want to save other parts of vm state as fast as possible
>>
>> Maybe I can't see a better way of using the 'save_page' callback.
>> Could you suggest anything?
> I guess it depends if we care about keeping the format of the snapshot
> the same here;  if we were open to changing it, then we could use
> the save_page hook to delay the writes, so we'd have a pile of headers
> followed by a pile of pages.

I think we have to care about keeping the format: many users already
have internal snapshots saved in their qcow2 images. If we change the
format, we can't load the snapshots from those images, and new snapshots
become unreadable for older QEMUs; otherwise we'd need to support two
versions of the format, which I think is too complicated.

>
>> Denis
>>> In the cover letter you mention direct qemu_fflush calls - have we got a
>>> few too many in some places that you think we can clean out?
>> I'm not sure that some of them are excessive. To the best of my knowledge,
>> qemu-file is used for the source-destination communication on migration
>> and removing some qemu_fflush-es may break communication logic.
> I can't see any obvious places where it's called during the ram
> migration; can you try and give me a hint as to where you're seeing it?

I think those qemu_fflush-es aren't in the ram migration but in the
other vm state parts. Although those parts are quite small in comparison
to ram, I saw quite a lot of qemu_fflush-es while debugging.
Still, we could benefit from saving them with a smaller number of io
operations if we're going to use buffered mode.

Denis

>
>> Snapshot is just a special case (if not the only one) where we know that we
>> can do buffered (cached) writes. Do you know any other cases where the
>> buffered (cached) mode could be useful?
> The RDMA code does it because it's really not good at small transfers,
> but maybe generally it would be a good idea to do larger writes if
> possible - something that multifd manages.
>
> Dave
>
>>> Dave
>>>
>>>> Signed-off-by: Denis Plotnikov <dplotnikov@virtuozzo.com>
>>>> ---
>>>>    include/qemu/typedefs.h |   1 +
>>>>    migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
>>>>    migration/qemu-file.h   |   9 ++
>>>>    3 files changed, 339 insertions(+), 22 deletions(-)
>>>>
>>>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>>>> index 88dce54..9b388c8 100644
>>>> --- a/include/qemu/typedefs.h
>>>> +++ b/include/qemu/typedefs.h
>>>> @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
>>>>    typedef struct QemuConsole QemuConsole;
>>>>    typedef struct QEMUFile QEMUFile;
>>>>    typedef struct QEMUFileBuffer QEMUFileBuffer;
>>>> +typedef struct QEMUFileAioTask QEMUFileAioTask;
>>>>    typedef struct QemuLockable QemuLockable;
>>>>    typedef struct QemuMutex QemuMutex;
>>>>    typedef struct QemuOpt QemuOpt;
>>>> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
>>>> index 285c6ef..f42f949 100644
>>>> --- a/migration/qemu-file.c
>>>> +++ b/migration/qemu-file.c
>>>> @@ -29,19 +29,25 @@
>>>>    #include "qemu-file.h"
>>>>    #include "trace.h"
>>>>    #include "qapi/error.h"
>>>> +#include "block/aio_task.h"
>>>> -#define IO_BUF_SIZE 32768
>>>> +#define IO_BUF_SIZE (1024 * 1024)
>>>>    #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
>>>> +#define IO_BUF_NUM 2
>>>> +#define IO_BUF_ALIGNMENT 512
>>>> -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
>>>> +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
>>>> +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
>>>> +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
>>>>    struct QEMUFileBuffer {
>>>>        int buf_index;
>>>> -    int buf_size; /* 0 when writing */
>>>> +    int buf_size; /* 0 when non-buffered writing */
>>>>        uint8_t *buf;
>>>>        unsigned long *may_free;
>>>>        struct iovec *iov;
>>>>        unsigned int iovcnt;
>>>> +    QLIST_ENTRY(QEMUFileBuffer) link;
>>>>    };
>>>>    struct QEMUFile {
>>>> @@ -60,6 +66,22 @@ struct QEMUFile {
>>>>        bool shutdown;
>>>>        /* currently used buffer */
>>>>        QEMUFileBuffer *current_buf;
>>>> +    /*
>>>> +     * with buffered_mode enabled, all the data is copied to a 512-byte
>>>> +     * aligned buffer, including iov data. Then the buffer is passed
>>>> +     * to the writev_buffer callback.
>>>> +     */
>>>> +    bool buffered_mode;
>>>> +    /* for async buffer writing */
>>>> +    AioTaskPool *pool;
>>>> +    /* the list of free buffers; the currently used one is NOT there */
>>>> +    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
>>>> +};
>>>> +
>>>> +struct QEMUFileAioTask {
>>>> +    AioTask task;
>>>> +    QEMUFile *f;
>>>> +    QEMUFileBuffer *fb;
>>>>    };
>>>>    /*
>>>> @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
>>>>        f->opaque = opaque;
>>>>        f->ops = ops;
>>>> -    f->current_buf = g_new0(QEMUFileBuffer, 1);
>>>> -    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>>>> -    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>>>> -    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>>>> +    if (f->ops->enable_buffered) {
>>>> +        f->buffered_mode = f->ops->enable_buffered(f->opaque);
>>>> +    }
>>>> +
>>>> +    if (f->buffered_mode && qemu_file_is_writable(f)) {
>>>> +        int i;
>>>> +        /*
>>>> +         * in buffered_mode we don't use internal io vectors
>>>> +         * and may_free bitmap, because we copy the data to be
>>>> +         * written right away to the buffer
>>>> +         */
>>>> +        f->pool = aio_task_pool_new(IO_BUF_NUM);
>>>> +
>>>> +        /* allocate io buffers */
>>>> +        for (i = 0; i < IO_BUF_NUM; i++) {
>>>> +            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
>>>> +
>>>> +            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
>>>> +            fb->buf_size = IO_BUF_SIZE;
>>>> +
>>>> +            /*
>>>> +             * put the first buffer to the current buf and the rest
>>>> +             * to the list of free buffers
>>>> +             */
>>>> +            if (i == 0) {
>>>> +                f->current_buf = fb;
>>>> +            } else {
>>>> +                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>>>> +            }
>>>> +        }
>>>> +    } else {
>>>> +        f->current_buf = g_new0(QEMUFileBuffer, 1);
>>>> +        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
>>>> +        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
>>>> +        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
>>>> +    }
>>>>        return f;
>>>>    }
>>>> @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>>>        unsigned long idx;
>>>>        QEMUFileBuffer *fb = f->current_buf;
>>>> +    assert(!f->buffered_mode);
>>>> +
>>>>        /* Find and release all the contiguous memory ranges marked as may_free. */
>>>>        idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
>>>>        if (idx >= fb->iovcnt) {
>>>> @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
>>>>        bitmap_zero(fb->may_free, MAX_IOV_SIZE);
>>>>    }
>>>> +static void advance_buf_ptr(QEMUFile *f, size_t size)
>>>> +{
>>>> +    QEMUFileBuffer *fb = f->current_buf;
>>>> +    /* must not advance by 0 */
>>>> +    assert(size);
>>>> +    /* must not overflow buf_index (int) */
>>>> +    assert(fb->buf_index + size <= INT_MAX);
>>>> +    /* must not exceed buf_size */
>>>> +    assert(fb->buf_index + size <= fb->buf_size);
>>>> +
>>>> +    fb->buf_index += size;
>>>> +}
>>>> +
>>>> +static size_t get_buf_free_size(QEMUFile *f)
>>>> +{
>>>> +    QEMUFileBuffer *fb = f->current_buf;
>>>> +    /* buf_index can't be greater than buf_size */
>>>> +    assert(fb->buf_size >= fb->buf_index);
>>>> +    return fb->buf_size - fb->buf_index;
>>>> +}
>>>> +
>>>> +static size_t get_buf_used_size(QEMUFile *f)
>>>> +{
>>>> +    QEMUFileBuffer *fb = f->current_buf;
>>>> +    return fb->buf_index;
>>>> +}
>>>> +
>>>> +static uint8_t *get_buf_ptr(QEMUFile *f)
>>>> +{
>>>> +    QEMUFileBuffer *fb = f->current_buf;
>>>> +    /* protects from out-of-bounds reading */
>>>> +    assert(fb->buf_index <= IO_BUF_SIZE);
>>>> +    return fb->buf + fb->buf_index;
>>>> +}
>>>> +
>>>> +static bool buf_is_full(QEMUFile *f)
>>>> +{
>>>> +    return get_buf_free_size(f) == 0;
>>>> +}
>>>> +
>>>> +static void reset_buf(QEMUFile *f)
>>>> +{
>>>> +    QEMUFileBuffer *fb = f->current_buf;
>>>> +    fb->buf_index = 0;
>>>> +}
>>>> +
>>>> +static int write_task_fn(AioTask *task)
>>>> +{
>>>> +    int ret;
>>>> +    Error *local_error = NULL;
>>>> +    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
>>>> +    QEMUFile *f = t->f;
>>>> +    QEMUFileBuffer *fb = t->fb;
>>>> +    uint64_t pos = f->pos;
>>>> +    struct iovec v = (struct iovec) {
>>>> +        .iov_base = fb->buf,
>>>> +        .iov_len = fb->buf_index,
>>>> +    };
>>>> +
>>>> +    assert(f->buffered_mode);
>>>> +
>>>> +    /*
>>>> +     * Increment file position.
>>>> +     * This needs to be here before calling writev_buffer, because
>>>> +     * writev_buffer is asynchronous and there could be more than one
>>>> +     * writev_buffer started simultaneously. Each writev_buffer should
>>>> +     * use its own file pos to write to. writev_buffer may write less
>>>> +     * than buf_index bytes, but we treat this situation as an error.
>>>> +     * If an error appears, further use of the file is meaningless.
>>>> +     * We expect that most of the time the full buffer is written
>>>> +     * (when buf_size == buf_index). The only case when the non-full
>>>> +     * buffer is written (buf_size != buf_index) is file close,
>>>> +     * when we need to flush the rest of the buffer content.
>>>> +     */
>>>> +    f->pos += fb->buf_index;
>>>> +
>>>> +    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
>>>> +
>>>> +    /* return the just written buffer to the free list */
>>>> +    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
>>>> +
>>>> +    /* check that we have written everything */
>>>> +    if (ret != fb->buf_index) {
>>>> +        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
>>>> +    }
>>>> +
>>>> +    /*
>>>> +     * always return 0 - don't use task error handling, rely on
>>>> +     * qemu-file error handling
>>>> +     */
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static void qemu_file_switch_current_buf(QEMUFile *f)
>>>> +{
>>>> +    /*
>>>> +     * if the list is empty, wait until some task returns a buffer
>>>> +     * to the list of free buffers.
>>>> +     */
>>>> +    if (QLIST_EMPTY(&f->free_buffers)) {
>>>> +        aio_task_pool_wait_slot(f->pool);
>>>> +    }
>>>> +
>>>> +    /*
>>>> +     * sanity check that the list isn't empty
>>>> +     * if the free list was empty, we waited for a task completion,
>>>> +     * and the completed task must return a buffer to the list of free buffers
>>>> +     */
>>>> +    assert(!QLIST_EMPTY(&f->free_buffers));
>>>> +
>>>> +    /* take the next buffer to use from the free list */
>>>> +    f->current_buf = QLIST_FIRST(&f->free_buffers);
>>>> +    reset_buf(f);
>>>> +
>>>> +    QLIST_REMOVE(f->current_buf, link);
>>>> +}
>>>> +
>>>> +/**
>>>> + * Asynchronously flushes the QEMUFile buffer
>>>> + *
>>>> + * This will flush all pending data. If data was only partially flushed, it
>>>> + * will set an error state. The function may return before the data is
>>>> + * actually written.
>>>> + */
>>>> +static void flush_buffer(QEMUFile *f)
>>>> +{
>>>> +    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
>>>> +
>>>> +    *t = (QEMUFileAioTask) {
>>>> +        .task.func = &write_task_fn,
>>>> +        .f = f,
>>>> +        .fb = f->current_buf,
>>>> +    };
>>>> +
>>>> +    /* aio_task_pool should free t for us */
>>>> +    aio_task_pool_start_task(f->pool, (AioTask *) t);
>>>> +
>>>> +    /* if there are no errors, this will switch the buffer */
>>>> +    qemu_file_switch_current_buf(f);
>>>> +}
>>>> +
>>>>    /**
>>>>     * Flushes QEMUFile buffer
>>>>     *
>>>> @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
>>>>        if (f->shutdown) {
>>>>            return;
>>>>        }
>>>> +
>>>> +    if (f->buffered_mode) {
>>>> +        return;
>>>> +    }
>>>> +
>>>>        if (fb->iovcnt > 0) {
>>>> +        /* this is non-buffered mode */
>>>>            expect = iov_size(fb->iov, fb->iovcnt);
>>>>            ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
>>>>                                        &local_error);
>>>> @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
>>>>    void qemu_update_position(QEMUFile *f, size_t size)
>>>>    {
>>>> +    assert(!f->buffered_mode);
>>>>        f->pos += size;
>>>>    }
>>>> @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
>>>>    int qemu_fclose(QEMUFile *f)
>>>>    {
>>>>        int ret;
>>>> -    qemu_fflush(f);
>>>> +
>>>> +    if (qemu_file_is_writable(f) && f->buffered_mode) {
>>>> +        ret = qemu_file_get_error(f);
>>>> +        if (!ret) {
>>>> +            flush_buffer(f);
>>>> +        }
>>>> +        /* wait until all tasks are done */
>>>> +        aio_task_pool_wait_all(f->pool);
>>>> +    } else {
>>>> +        qemu_fflush(f);
>>>> +    }
>>>> +
>>>>        ret = qemu_file_get_error(f);
>>>>        if (f->ops->close) {
>>>> @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
>>>>            ret = f->last_error;
>>>>        }
>>>>        error_free(f->last_error_obj);
>>>> -    g_free(f->current_buf->buf);
>>>> -    g_free(f->current_buf->iov);
>>>> -    g_free(f->current_buf->may_free);
>>>> -    g_free(f->current_buf);
>>>> +
>>>> +    if (f->buffered_mode) {
>>>> +        QEMUFileBuffer *fb, *next;
>>>> +        /*
>>>> +         * put the current buffer back on the free list
>>>> +         * to destroy all the buffers in one loop
>>>> +         */
>>>> +        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
>>>> +
>>>> +        /* destroy all the buffers */
>>>> +        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
>>>> +            QLIST_REMOVE(fb, link);
>>>> +            /* looks like qemu_vfree pairs with qemu_memalign */
>>>> +            qemu_vfree(fb->buf);
>>>> +            g_free(fb);
>>>> +        }
>>>> +        g_free(f->pool);
>>>> +    } else {
>>>> +        g_free(f->current_buf->buf);
>>>> +        g_free(f->current_buf->iov);
>>>> +        g_free(f->current_buf->may_free);
>>>> +        g_free(f->current_buf);
>>>> +    }
>>>> +
>>>>        g_free(f);
>>>>        trace_qemu_file_fclose();
>>>>        return ret;
>>>>    }
>>>>    /*
>>>> + * Copy an external buffer to the internal current buffer.
>>>> + */
>>>> +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
>>>> +                     bool may_free)
>>>> +{
>>>> +    size_t data_size = size;
>>>> +    const uint8_t *src_ptr = buf;
>>>> +
>>>> +    assert(f->buffered_mode);
>>>> +    assert(size <= INT_MAX);
>>>> +
>>>> +    while (data_size > 0) {
>>>> +        size_t chunk_size;
>>>> +
>>>> +        if (buf_is_full(f)) {
>>>> +            flush_buffer(f);
>>>> +            if (qemu_file_get_error(f)) {
>>>> +                return;
>>>> +            }
>>>> +        }
>>>> +
>>>> +        chunk_size = MIN(get_buf_free_size(f), data_size);
>>>> +
>>>> +        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
>>>> +
>>>> +        advance_buf_ptr(f, chunk_size);
>>>> +
>>>> +        src_ptr += chunk_size;
>>>> +        data_size -= chunk_size;
>>>> +        f->bytes_xfer += chunk_size;
>>>> +    }
>>>> +
>>>> +    if (may_free) {
>>>> +        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
>>>> +            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
>>>> +                         buf, size, strerror(errno));
>>>> +        }
>>>> +    }
>>>> +}
>>>> +
>>>> +/*
>>>>     * Add buf to iovec. Do flush if iovec is full.
>>>>     *
>>>>     * Return values:
>>>> @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
>>>>    static void add_buf_to_iovec(QEMUFile *f, size_t len)
>>>>    {
>>>>        QEMUFileBuffer *fb = f->current_buf;
>>>> +
>>>> +    assert(!f->buffered_mode);
>>>> +
>>>>        if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
>>>>            fb->buf_index += len;
>>>>            if (fb->buf_index == IO_BUF_SIZE) {
>>>> @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
>>>>            return;
>>>>        }
>>>> -    f->bytes_xfer += size;
>>>> -    add_to_iovec(f, buf, size, may_free);
>>>> +    if (f->buffered_mode) {
>>>> +        copy_buf(f, buf, size, may_free);
>>>> +    } else {
>>>> +        f->bytes_xfer += size;
>>>> +        add_to_iovec(f, buf, size, may_free);
>>>> +    }
>>>>    }
>>>>    void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>>>> @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
>>>>            return;
>>>>        }
>>>> +    if (f->buffered_mode) {
>>>> +        copy_buf(f, buf, size, false);
>>>> +        return;
>>>> +    }
>>>> +
>>>>        while (size > 0) {
>>>>            l = IO_BUF_SIZE - fb->buf_index;
>>>>            if (l > size) {
>>>> @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
>>>>            return;
>>>>        }
>>>> -    fb->buf[fb->buf_index] = v;
>>>> -    f->bytes_xfer++;
>>>> -    add_buf_to_iovec(f, 1);
>>>> +    if (f->buffered_mode) {
>>>> +        copy_buf(f, (const uint8_t *) &v, 1, false);
>>>> +    } else {
>>>> +        fb->buf[fb->buf_index] = v;
>>>> +        add_buf_to_iovec(f, 1);
>>>> +        f->bytes_xfer++;
>>>> +    }
>>>>    }
>>>>    void qemu_file_skip(QEMUFile *f, int size)
>>>>    {
>>>>        QEMUFileBuffer *fb = f->current_buf;
>>>> +    assert(!f->buffered_mode);
>>>> +
>>>>        if (fb->buf_index + size <= fb->buf_size) {
>>>>            fb->buf_index += size;
>>>>        }
>>>> @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>>>    {
>>>>        int64_t ret = f->pos;
>>>>        int i;
>>>> -    QEMUFileBuffer *fb = f->current_buf;
>>>> -    for (i = 0; i < fb->iovcnt; i++) {
>>>> -        ret += fb->iov[i].iov_len;
>>>> +    if (f->buffered_mode) {
>>>> +        ret += get_buf_used_size(f);
>>>> +    } else {
>>>> +        QEMUFileBuffer *fb = f->current_buf;
>>>> +        for (i = 0; i < fb->iovcnt; i++) {
>>>> +            ret += fb->iov[i].iov_len;
>>>> +        }
>>>>        }
>>>>        return ret;
>>>> @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
>>>>    int64_t qemu_ftell(QEMUFile *f)
>>>>    {
>>>> -    qemu_fflush(f);
>>>> -    return f->pos;
>>>> +    if (f->buffered_mode) {
>>>> +        return qemu_ftell_fast(f);
>>>> +    } else {
>>>> +        qemu_fflush(f);
>>>> +        return f->pos;
>>>> +    }
>>>>    }
>>>>    int qemu_file_rate_limit(QEMUFile *f)
>>>> @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>>>>        QEMUFileBuffer *fb = f->current_buf;
>>>>        ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
>>>> +    assert(!f->buffered_mode);
>>>> +
>>>>        if (blen < compressBound(size)) {
>>>>            return -1;
>>>>        }
>>>> @@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
>>>>        int len = 0;
>>>>        QEMUFileBuffer *fb_src = f_src->current_buf;
>>>> +    assert(!f_des->buffered_mode);
>>>> +    assert(!f_src->buffered_mode);
>>>> +
>>>>        if (fb_src->buf_index > 0) {
>>>>            len = fb_src->buf_index;
>>>>            qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
>>>> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
>>>> index a9b6d6c..08655d2 100644
>>>> --- a/migration/qemu-file.h
>>>> +++ b/migration/qemu-file.h
>>>> @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
>>>>    typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
>>>>                                       Error **errp);
>>>> +/*
>>>> + * Enables or disables the buffered mode
>>>> + * Existing blocking reads/writes must be woken
>>>> + * Returns true if the buffered mode has to be enabled,
>>>> + * false if it has to be disabled.
>>>> + */
>>>> +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
>>>> +
>>>>    typedef struct QEMUFileOps {
>>>>        QEMUFileGetBufferFunc *get_buffer;
>>>>        QEMUFileCloseFunc *close;
>>>> @@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
>>>>        QEMUFileWritevBufferFunc *writev_buffer;
>>>>        QEMURetPathFunc *get_return_path;
>>>>        QEMUFileShutdownFunc *shut_down;
>>>> +    QEMUFileEnableBufferedFunc *enable_buffered;
>>>>    } QEMUFileOps;
>>>>    typedef struct QEMUFileHooks {
>>>> -- 
>>>> 1.8.3.1
>>>>
>>> --
>>> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>




* Re: [RFC patch v1 2/3] qemu-file: add buffered mode
  2020-04-27 12:14   ` Dr. David Alan Gilbert
  2020-04-28  8:06     ` Denis Plotnikov
@ 2020-05-04  9:08     ` Daniel P. Berrangé
  1 sibling, 0 replies; 19+ messages in thread
From: Daniel P. Berrangé @ 2020-05-04  9:08 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: den, Denis Plotnikov, qemu-devel, quintela

On Mon, Apr 27, 2020 at 01:14:33PM +0100, Dr. David Alan Gilbert wrote:
> * Denis Plotnikov (dplotnikov@virtuozzo.com) wrote:
> > The patch adds the ability for qemu-file to write data
> > asynchronously to improve write performance.
> > Before, only synchronous writing was supported.
> > 
> > Enabling the asynchronous mode is managed by the new
> > "enable_buffered" callback.
> 
> It's a bit invasive isn't it - changes a lot of functions in a lot of
> places!
> The multifd code separated the control headers from the data on separate
> fd's - but that doesn't help your case.
> 
> Is there any chance you could do this by using the existing 'save_page'
> hook (that RDMA uses).
> 
> In the cover letter you mention direct qemu_fflush calls - have we got a
> few too many in some places that you think we can clean out?

When I first introduced the QIOChannel framework, I hoped that we could
largely eliminate QEMUFile as a concept.  Thus I'm a bit suspicious of
the idea of introducing more functionality to QEMUFile, especially as the
notion of buffering I/O is rather generic. Is there scope for having a
QIOChannelBuffered object for doing buffering? Would that provide better
isolation from the migration code and thus be less invasive/complex to
maintain?


Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




end of thread, other threads:[~2020-05-04  9:09 UTC | newest]

Thread overview: 19+ messages
2020-04-13 11:12 [RFC patch v1 0/3] qemu-file writing performance improving Denis Plotnikov
2020-04-13 11:12 ` [RFC patch v1 1/3] qemu-file: introduce current buffer Denis Plotnikov
2020-04-24 17:47   ` Vladimir Sementsov-Ogievskiy
2020-04-24 21:12   ` Eric Blake
2020-04-13 11:12 ` [RFC patch v1 2/3] qemu-file: add buffered mode Denis Plotnikov
2020-04-24 21:25   ` Eric Blake
2020-04-27  8:21     ` Denis Plotnikov
2020-04-25  9:10   ` Vladimir Sementsov-Ogievskiy
2020-04-27  8:19     ` Denis Plotnikov
2020-04-27 11:04       ` Vladimir Sementsov-Ogievskiy
2020-04-27 12:14   ` Dr. David Alan Gilbert
2020-04-28  8:06     ` Denis Plotnikov
2020-04-28 17:54       ` Dr. David Alan Gilbert
2020-04-28 20:25         ` Denis Plotnikov
2020-05-04  9:08     ` Daniel P. Berrangé
2020-04-13 11:12 ` [RFC patch v1 3/3] migration/savevm: use qemu-file buffered mode for non-cached bdrv Denis Plotnikov
2020-04-13 12:10 ` [RFC patch v1 0/3] qemu-file writing performance improving Denis V. Lunev
2020-04-13 13:51 ` no-reply
2020-04-21  8:13 ` Denis Plotnikov
