qemu-devel.nongnu.org archive mirror
* [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
@ 2021-09-22  5:03 Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 1/3] QIOCHannel: Add io_async_writev & io_async_flush callbacks Leonardo Bras
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22  5:03 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

This patch series enables MSG_ZEROCOPY in QIOChannel and makes use of it to
improve multifd migration performance.

Patch #1 adds new optional callbacks to QIOChannel, allowing subclasses to
implement asynchronous writes.

Patch #2 implements async_write and async_flush on QIOChannelSocket.

Patch #3 makes use of async_write + async_flush to enable MSG_ZEROCOPY
for migration using multifd nocomp.


Results:
So far, the resource usage of __sys_sendmsg() was reduced by a factor of 15,
and the overall migration took 13-18% less time, based on a synthetic workload.

The objective is to reduce migration time on hosts with heavy CPU usage.

---
Changes since v1:
- Reimplemented the patchset using the async_write + async_flush approach.
- Implemented a flush operation to tell when all data has been written.

Leonardo Bras (3):
  QIOCHannel: Add io_async_writev & io_async_flush callbacks
  QIOChannelSocket: Implement io_async_write & io_async_flush
  multifd: Send using asynchronous write on nocomp to send RAM pages.

 include/io/channel-socket.h |   2 +
 include/io/channel.h        |  93 +++++++++++++++++++----
 io/channel-socket.c         | 145 ++++++++++++++++++++++++++++++++++--
 io/channel.c                |  66 ++++++++++++----
 migration/multifd.c         |   3 +-
 5 files changed, 271 insertions(+), 38 deletions(-)

-- 
2.33.0




* [PATCH v2 1/3] QIOCHannel: Add io_async_writev & io_async_flush callbacks
  2021-09-22  5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
@ 2021-09-22  5:03 ` Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush Leonardo Bras
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22  5:03 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Adds io_async_writev and io_async_flush as optional callbacks to
QIOChannelClass, allowing subclasses to implement asynchronous writes.

How to use them:
- Write data using qio_channel_async_writev().
- Wait for write completion with qio_channel_async_flush().

Notes:
Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
recommended to keep the write buffer untouched until qio_channel_async_flush()
returns.

As the new callbacks are optional, if a subclass does not implement them
there is a fallback to the mandatory synchronous implementation:
- io_async_writev falls back to io_writev,
- io_async_flush returns without doing anything.
This makes it simpler for callers to use the asynchronous interface.

Also, some functions like qio_channel_writev_full_all() were adapted to
offer an async version that makes use of the new callbacks.
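
As a rough usage sketch (not part of the patch): send_buffers() below is a
hypothetical caller, assuming a connected QIOChannel *ioc and using only the
interfaces added here.

static void send_buffers(QIOChannel *ioc, const struct iovec *iov,
                         size_t niov, Error **errp)
{
    /*
     * Queue the buffers.  This may return before the data is actually
     * sent, so the iovec memory must stay untouched until the flush
     * below returns.
     */
    if (qio_channel_async_writev_all(ioc, iov, niov, errp) < 0) {
        return;
    }

    /*
     * Wait until every queued write has completed.  On channels without
     * an io_async_flush implementation this returns immediately.
     */
    qio_channel_async_flush(ioc, errp);
}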

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
 io/channel.c         | 66 ++++++++++++++++++++++++-------
 2 files changed, 129 insertions(+), 30 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..74f2e3ae8a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -136,6 +136,14 @@ struct QIOChannelClass {
                                   IOHandler *io_read,
                                   IOHandler *io_write,
                                   void *opaque);
+    ssize_t (*io_async_writev)(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               int *fds,
+                               size_t nfds,
+                               Error **errp);
+    void (*io_async_flush)(QIOChannel *ioc,
+                           Error **errp);
 };
 
 /* General I/O handling functions */
@@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
  * and the channel is non-blocking
  */
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
 
 /**
  * qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
  *
  * Returns: 0 if all bytes were written, or -1 on error
  */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, true, erp)
 
 /**
  * qio_channel_readv:
@@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
  * Returns: 0 if all bytes were written, or -1 on error
  */
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds, size_t nfds,
+                                  bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but sends
+ * data asynchronously, meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary to wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will block until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp);
+
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..a35109a006 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
 }
 
 
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
@@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
         return -1;
     }
 
+    if (async) {
+        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
 }
 
@@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
     return ret;
 }
 
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **errp)
 {
-    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
 }
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds, size_t nfds,
-                                Error **errp)
+                                bool async, Error **errp)
 {
     int ret = -1;
     struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
 
     while (nlocal_iov > 0) {
         ssize_t len;
-        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
-                                      errp);
+        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+                                        async, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
                 qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_writev) {
+        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+    return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_flush) {
+        return;
+    }
+
+    klass->io_async_flush(ioc, errp);
+}
+
+
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;
-- 
2.33.0




* [PATCH v2 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
  2021-09-22  5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 1/3] QIOCHannel: Add io_async_writev & io_async_flush callbacks Leonardo Bras
@ 2021-09-22  5:03 ` Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages Leonardo Bras
  2021-09-22 22:28 ` [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras Soares Passos
  3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22  5:03 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Implement the new optional callbacks io_async_writev and io_async_flush on
QIOChannelSocket, enabling them only when the MSG_ZEROCOPY feature is
available in the host kernel and TCP sockets are used.

The contents of qio_channel_socket_writev() were moved to a helper function
__qio_channel_socket_writev(), which accepts an extra 'flags' argument.
This helper is used to implement qio_channel_socket_writev(), with flags = 0,
keeping its behavior unchanged, and qio_channel_socket_async_writev(), with
flags = MSG_ZEROCOPY.

qio_channel_socket_async_flush() is implemented by reading the socket's error
queue, which carries the MSG_ZEROCOPY send-completion notifications.
There is no need to worry about re-sending packets if any error happens, as
MSG_ZEROCOPY only works with TCP, which retransmits on its own when an error
occurs.

Notes on using async_writev():
- As MSG_ZEROCOPY tells the kernel to send directly from the user buffer to
avoid copying, some caution is necessary to avoid overwriting any buffer
before it is actually sent. If that happens, a newer version of the buffer
may be sent instead.
- If this is a problem, it's recommended to call async_flush() before freeing
or re-using the buffer.
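
For background, a rough standalone sketch of the kernel interface this builds
on (Linux only, plain sockets, error handling trimmed; illustration only, not
QEMU code and not part of the patch):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <linux/errqueue.h>  /* struct sock_extended_err, SO_EE_ORIGIN_ZEROCOPY */

#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif

/*
 * Send one buffer with zerocopy on a connected TCP socket, then wait for
 * the completion notification on the socket's error queue.
 */
static int send_zerocopy(int fd, const void *buf, size_t len)
{
    int one = 1;

    /* Opt in once per socket; fails on kernels without MSG_ZEROCOPY. */
    if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0) {
        return -1;
    }

    /*
     * The kernel pins 'buf' instead of copying it, so it must stay
     * untouched until the completion below arrives.
     */
    if (send(fd, buf, len, MSG_ZEROCOPY) < 0) {
        return -1;
    }

    for (;;) {
        char control[64];
        struct msghdr msg;
        struct cmsghdr *cm;
        struct sock_extended_err *serr;

        memset(&msg, 0, sizeof(msg));
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        /* MSG_ERRQUEUE reads never block; EAGAIN means nothing queued yet. */
        if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
            if (errno == EAGAIN || errno == EINTR) {
                continue;   /* a real caller would poll() instead of spinning */
            }
            return -1;
        }

        cm = CMSG_FIRSTHDR(&msg);
        if (!cm) {
            return -1;
        }
        serr = (struct sock_extended_err *) CMSG_DATA(cm);
        if (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
            /* ee_info..ee_data is the range of completed zerocopy sends. */
            printf("zerocopy sends %u..%u completed\n",
                   serr->ee_info, serr->ee_data);
            return 0;
        }
    }
}
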
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel-socket.h |   2 +
 io/channel-socket.c         | 145 ++++++++++++++++++++++++++++++++++--
 2 files changed, 140 insertions(+), 7 deletions(-)

diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..4d1be0637a 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@ struct QIOChannelSocket {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+    ssize_t async_queued;
+    ssize_t async_sent;
 };
 
 
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..128fab4cd2 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,9 +26,23 @@
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
 
 #define SOCKET_MAX_FDS 16
 
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp);
+
 SocketAddress *
 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
                                      Error **errp)
@@ -55,6 +69,8 @@ qio_channel_socket_new(void)
 
     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
     sioc->fd = -1;
+    sioc->async_queued = 0;
+    sioc->async_sent = 0;
 
     ioc = QIO_CHANNEL(sioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                     Error **errp)
 {
     int fd;
+    int ret, v = 1;
 
     trace_qio_channel_socket_connect_sync(ioc, addr);
     fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
         return -1;
     }
 
+#ifdef CONFIG_LINUX
+    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+        return 0;
+    }
+
+    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+    if (ret >= 0) {
+        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = qio_channel_socket_async_writev;
+        klass->io_async_flush = qio_channel_socket_async_flush;
+    }
+#endif
+
     return 0;
 }
 
@@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
     return ret;
 }
 
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
-                                         const struct iovec *iov,
-                                         size_t niov,
-                                         int *fds,
-                                         size_t nfds,
-                                         Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+                                           const struct iovec *iov,
+                                           size_t niov,
+                                           int *fds,
+                                           size_t nfds,
+                                           int flags,
+                                           Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     ssize_t ret;
@@ -558,7 +589,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     }
 
  retry:
-    ret = sendmsg(sioc->fd, &msg, 0);
+    ret = sendmsg(sioc->fd, &msg, flags);
     if (ret <= 0) {
         if (errno == EAGAIN) {
             return QIO_CHANNEL_ERR_BLOCK;
@@ -572,6 +603,106 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     }
     return ret;
 }
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int *fds,
+                                         size_t nfds,
+                                         Error **errp)
+{
+    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+
+    sioc->async_queued++;
+
+    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+                                       errp);
+}
+
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct pollfd pfd;
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+    int ret;
+
+    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+    msg.msg_control = control;
+    msg.msg_controllen = sizeof(control);
+
+    while (sioc->async_sent < sioc->async_queued) {
+        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                /* Nothing on errqueue, wait */
+                pfd.fd = sioc->fd;
+                pfd.events = 0;
+                ret = poll(&pfd, 1, 250);
+                if (ret == 0) {
+                    /*
+                     * Timeout : After 250ms without receiving any zerocopy
+                     * notification, consider all data as sent.
+                     */
+                    break;
+                } else if (ret < 0 ||
+                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+                    error_setg_errno(errp, errno,
+                                     "Poll error");
+                    break;
+                } else {
+                    continue;
+                }
+            }
+            if (errno == EINTR) {
+                continue;
+            }
+
+            error_setg_errno(errp, errno,
+                             "Unable to read errqueue");
+            break;
+        }
+
+        cm = CMSG_FIRSTHDR(&msg);
+        if (cm->cmsg_level != SOL_IP ||
+            cm->cmsg_type != IP_RECVERR) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            break;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            break;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Error not from zerocopy");
+            break;
+        }
+
+        /* No errors, count sent ids*/
+        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+    }
+}
+
+
 #else /* WIN32 */
 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         const struct iovec *iov,
-- 
2.33.0




* [PATCH v2 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
  2021-09-22  5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 1/3] QIOCHannel: Add io_async_writev & io_async_flush callbacks Leonardo Bras
  2021-09-22  5:03 ` [PATCH v2 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush Leonardo Bras
@ 2021-09-22  5:03 ` Leonardo Bras
  2021-09-22 22:28 ` [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras Soares Passos
  3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22  5:03 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Change the multifd nocomp version to use asynchronous writes for RAM pages,
benefiting from MSG_ZEROCOPY when it's available.

The asynchronous flush happens on cleanup only, before destroying the QIOChannel.

This works fine for RAM migration because the RAM pages are not usually freed,
and there is no problem if a page's content changes between async_writev() and
the actual sending of the buffer: the change dirties the page and causes it to
be re-sent in a later iteration anyway.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 migration/multifd.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 377da78f5b..d247207a0a 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -105,7 +105,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
  */
 static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
 {
-    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+    return qio_channel_async_writev_all(p->c, p->pages->iov, used, errp);
 }
 
 /**
@@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
 
+        qio_channel_async_flush(p->c, NULL);
         socket_send_channel_destroy(p->c);
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-- 
2.33.0




* Re: [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
  2021-09-22  5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
                   ` (2 preceding siblings ...)
  2021-09-22  5:03 ` [PATCH v2 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages Leonardo Bras
@ 2021-09-22 22:28 ` Leonardo Bras Soares Passos
  3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-22 22:28 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: qemu-devel

I sent a v3 of this series, due to errors found while testing MSG_ZEROCOPY
as a user with a low memory-locking limit:
 - If there is not enough memory-locking resource available, fall back to the
   sync (non-zerocopy) approach (a rough sketch of that idea follows below).
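
For illustration only, a rough sketch of that fallback idea (hypothetical
can_use_zerocopy() helper; this is not the actual v3 change):

#include <stdbool.h>
#include <sys/resource.h>
#include <sys/socket.h>

#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif

/*
 * Decide whether zerocopy sends are worth enabling: the locked-memory limit
 * must cover the data kept in flight (zerocopy pages are pinned), and the
 * kernel must accept SO_ZEROCOPY.  Otherwise keep the plain writev path.
 */
static bool can_use_zerocopy(int fd, size_t bytes_in_flight)
{
    struct rlimit lim;
    int one = 1;

    if (getrlimit(RLIMIT_MEMLOCK, &lim) < 0) {
        return false;
    }
    if (lim.rlim_cur != RLIM_INFINITY && lim.rlim_cur < bytes_in_flight) {
        return false;   /* not enough lockable memory for pinned buffers */
    }

    return setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) == 0;
}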

Best regards,
Leo



