* [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
From: Leonardo Bras @ 2021-09-22 22:24 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

This patch series intends to enable MSG_ZEROCOPY in QIOChannel, and make
use of it to improve multifd migration performance.

Patch #1 creates new callbacks for QIOChannel, allowing the implementation
of asynchronous writing.

Patch #2 implements async_write and async_flush on QIOChannelSocket.

Patch #3 makes use of async_write + async_flush to enable MSG_ZEROCOPY
for migration using multifd nocomp.

Results:
So far, the resource usage of __sys_sendmsg() dropped by a factor of 15, and
the overall migration took 13-18% less time, based on a synthetic workload.

The objective is to reduce migration time on hosts with heavy CPU usage.

---
Changes since v2:
- Patch #1: One more fallback
- Patch #2: Fall back to sync if locking buffer memory fails in a MSG_ZEROCOPY send.

Changes since v1:
- Reimplemented the patchset using the async_write + async_flush approach.
- Implemented a flush to be able to tell when all data has been written.

Leonardo Bras (3):
  QIOChannel: Add io_async_writev & io_async_flush callbacks
  QIOChannelSocket: Implement io_async_write & io_async_flush
  multifd: Send using asynchronous write on nocomp to send RAM pages.

 include/io/channel-socket.h |   2 +
 include/io/channel.h        |  94 ++++++++++++++++---
 io/channel-socket.c         | 176 ++++++++++++++++++++++++++++++++++--
 io/channel.c                |  66 +++++++++++---
 migration/multifd.c         |   3 +-
 5 files changed, 300 insertions(+), 41 deletions(-)

-- 
2.33.0




* [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
From: Leonardo Bras @ 2021-09-22 22:24 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Adds io_async_writev and io_async_flush as optional callbacks to QIOChannelClass,
allowing the implementation of asynchronous writes by subclasses.

How to use them:
- Write data using qio_channel_async_writev(),
- Wait for write completion with qio_channel_async_flush().
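
A minimal usage sketch (error handling elided; 'ioc', 'buf' and 'len' are
assumed to exist, and 'buf' must stay untouched until the flush returns):

    struct iovec iov = { .iov_base = buf, .iov_len = len };
    Error *err = NULL;

    qio_channel_async_writev(ioc, &iov, 1, NULL, 0, &err);
    /* ... 'buf' must not be modified or freed here ... */
    qio_channel_async_flush(ioc, &err); /* waits until all queued data is sent */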

Notes:
Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
recommended to keep the write buffer untouched until the return of
qio_channel_async_flush().

As the new callbacks are optional, if a subclass does not implement them
there will be a fallback to the mandatory synchronous implementation:
- io_async_writev will fall back to io_writev,
- io_async_flush will return without changing anything.
This makes it simpler for the user to make use of the asynchronous implementation.

Also, some functions like qio_channel_writev_full_all() were adapted to
offer an async version, making use of the new callbacks.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
 io/channel.c         | 66 ++++++++++++++++++++++++-------
 2 files changed, 129 insertions(+), 30 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..74f2e3ae8a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -136,6 +136,14 @@ struct QIOChannelClass {
                                   IOHandler *io_read,
                                   IOHandler *io_write,
                                   void *opaque);
+    ssize_t (*io_async_writev)(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               int *fds,
+                               size_t nfds,
+                               Error **errp);
+    void (*io_async_flush)(QIOChannel *ioc,
+                           Error **errp);
 };
 
 /* General I/O handling functions */
@@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
  * and the channel is non-blocking
  */
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
 
 /**
  * qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
  *
  * Returns: 0 if all bytes were written, or -1 on error
  */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, true, erp)
 
 /**
  * qio_channel_readv:
@@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
  * Returns: 0 if all bytes were written, or -1 on error
  */
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds, size_t nfds,
+                                  bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but will send
+ * data asynchronously, meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary to wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp);
+
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..c4819b922f 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
 }
 
 
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
@@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
         return -1;
     }
 
+    if (async && klass->io_async_writev) {
+        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
 }
 
@@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
     return ret;
 }
 
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **errp)
 {
-    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
 }
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds, size_t nfds,
-                                Error **errp)
+                                bool async, Error **errp)
 {
     int ret = -1;
     struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
 
     while (nlocal_iov > 0) {
         ssize_t len;
-        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
-                                      errp);
+        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+                                        async, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
                 qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_writev) {
+        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+    return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_flush) {
+        return;
+    }
+
+    klass->io_async_flush(ioc, errp);
+}
+
+
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;
-- 
2.33.0




* [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
From: Leonardo Bras @ 2021-09-22 22:24 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Implement the new optional callbacks io_async_writev and io_async_flush on
QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is
available in the host kernel and TCP sockets are used.

qio_channel_socket_writev() contents were moved to a helper function
__qio_channel_socket_writev() which accepts an extra 'flags' argument.
This helper function is used to implement qio_channel_socket_writev(), with
flags = 0, keeping its behavior unchanged, and
qio_channel_socket_async_writev() with flags = MSG_ZEROCOPY.

qio_channel_socket_async_flush() was implemented by reading the socket's error
queue, which will have information on MSG_ZEROCOPY send completion.
There is no need to worry about re-sending packets in case any error happens,
as MSG_ZEROCOPY only works with TCP, which will retransmit if any error occurs.

Notes on using async_write():
- As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
some caution is necessary to avoid overwriting any buffer before it's sent.
If something like this happens, a newer version of the buffer may be sent instead.
- If this is a problem, it's recommended to use async_flush() before freeing or
re-using the buffer.
- When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require
a larger amount of locked memory than is usually available to a non-root user.
- If the required amount of locked memory is not available, it falls back to
buffer-copying behavior and synchronous sending.
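
To illustrate the hazard (sketch; 'ioc', 'iov', 'buf' and 'err' assumed as in
the usage example from patch 1): with MSG_ZEROCOPY the kernel reads the user
buffer at transmit time, not at sendmsg() time, so:

    qio_channel_async_writev(ioc, &iov, 1, NULL, 0, &err);
    buf[0] = 'X';   /* racy: the modified byte may be what hits the wire */

    qio_channel_async_flush(ioc, &err);
    buf[0] = 'X';   /* safe: the previous contents have already been sent */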

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel-socket.h |   2 +
 include/io/channel.h        |   1 +
 io/channel-socket.c         | 176 ++++++++++++++++++++++++++++++++++--
 3 files changed, 169 insertions(+), 10 deletions(-)

diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..4d1be0637a 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@ struct QIOChannelSocket {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+    ssize_t async_queued;
+    ssize_t async_sent;
 };
 
 
diff --git a/include/io/channel.h b/include/io/channel.h
index 74f2e3ae8a..611bb2ea26 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -31,6 +31,7 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
 
 
 #define QIO_CHANNEL_ERR_BLOCK -2
+#define QIO_CHANNEL_ERR_NOBUFS -3
 
 typedef enum QIOChannelFeature QIOChannelFeature;
 
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..c67832d0bb 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,9 +26,23 @@
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
 
 #define SOCKET_MAX_FDS 16
 
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp);
+
 SocketAddress *
 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
                                      Error **errp)
@@ -55,6 +69,8 @@ qio_channel_socket_new(void)
 
     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
     sioc->fd = -1;
+    sioc->async_queued = 0;
+    sioc->async_sent = 0;
 
     ioc = QIO_CHANNEL(sioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                     Error **errp)
 {
     int fd;
+    int ret, v = 1;
 
     trace_qio_channel_socket_connect_sync(ioc, addr);
     fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
         return -1;
     }
 
+#ifdef CONFIG_LINUX
+    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+        return 0;
+    }
+
+    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+    if (ret >= 0) {
+        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = qio_channel_socket_async_writev;
+        klass->io_async_flush = qio_channel_socket_async_flush;
+    }
+#endif
+
     return 0;
 }
 
@@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
     return ret;
 }
 
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
-                                         const struct iovec *iov,
-                                         size_t niov,
-                                         int *fds,
-                                         size_t nfds,
-                                         Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+                                           const struct iovec *iov,
+                                           size_t niov,
+                                           int *fds,
+                                           size_t nfds,
+                                           int flags,
+                                           Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     ssize_t ret;
@@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     }
 
  retry:
-    ret = sendmsg(sioc->fd, &msg, 0);
+    ret = sendmsg(sioc->fd, &msg, flags);
     if (ret <= 0) {
-        if (errno == EAGAIN) {
+        switch (errno) {
+        case EAGAIN:
             return QIO_CHANNEL_ERR_BLOCK;
-        }
-        if (errno == EINTR) {
+        case EINTR:
             goto retry;
+        case ENOBUFS:
+            return QIO_CHANNEL_ERR_NOBUFS;
         }
+
         error_setg_errno(errp, errno,
                          "Unable to write to socket");
         return -1;
     }
     return ret;
 }
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int *fds,
+                                         size_t nfds,
+                                         Error **errp)
+{
+    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    ssize_t ret;
+
+    sioc->async_queued++;
+
+    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+                                       errp);
+    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
+        /*
+         * Not enough locked memory available to the process.
+         * Fallback to default sync callback.
+         */
+
+        if (errp && *errp) {
+            warn_reportf_err(*errp,
+                             "Process can't lock enough memory for using MSG_ZEROCOPY, "
+                             "falling back to non-zerocopy");
+        }
+
+        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = NULL;
+        klass->io_async_flush = NULL;
+
+        /* Re-send current buffer */
+        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+    return ret;
+}
+
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct pollfd pfd;
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+    int ret;
+
+    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+    msg.msg_control = control;
+    msg.msg_controllen = sizeof(control);
+
+    while (sioc->async_sent < sioc->async_queued) {
+        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                /* Nothing on errqueue, wait */
+                pfd.fd = sioc->fd;
+                pfd.events = 0;
+                ret = poll(&pfd, 1, 250);
+                if (ret == 0) {
+                    /*
+                     * Timeout : After 250ms without receiving any zerocopy
+                     * notification, consider all data as sent.
+                     */
+                    break;
+                } else if (ret < 0 ||
+                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+                    error_setg_errno(errp, errno,
+                                     "Poll error");
+                    break;
+                } else {
+                    continue;
+                }
+            }
+            if (errno == EINTR) {
+                continue;
+            }
+
+            error_setg_errno(errp, errno,
+                             "Unable to read errqueue");
+            break;
+        }
+
+        cm = CMSG_FIRSTHDR(&msg);
+        if (cm->cmsg_level != SOL_IP &&
+            cm->cmsg_type != IP_RECVERR) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            break;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            break;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Error not from zerocopy");
+            break;
+        }
+
+        /* No errors, count sent ids */
+        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+    }
+}
+
+
 #else /* WIN32 */
 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         const struct iovec *iov,
-- 
2.33.0




* [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
From: Leonardo Bras @ 2021-09-22 22:24 UTC (permalink / raw)
  To: Daniel P. Berrangé,
	Juan Quintela, Dr. David Alan Gilbert, Peter Xu, Jason Wang
  Cc: Leonardo Bras, qemu-devel

Change the multifd nocomp version to use asynchronous write for RAM pages, and
benefit from MSG_ZEROCOPY when it's available.

The asynchronous flush happens on cleanup only, before destroying the QIOChannel.

This will work fine for RAM migration because the RAM pages are not usually freed,
and there is no problem in changing the page contents between async_send() and
the actual sending of the buffer: such a change will dirty the page and
cause it to be re-sent in a later iteration anyway.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 migration/multifd.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 377da78f5b..d247207a0a 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -105,7 +105,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
  */
 static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
 {
-    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
+    return qio_channel_async_writev_all(p->c, p->pages->iov, used, errp);
 }
 
 /**
@@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
 
+        qio_channel_async_flush(p->c, NULL);
         socket_send_channel_destroy(p->c);
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-- 
2.33.0




* Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
From: Daniel P. Berrangé @ 2021-09-24 17:16 UTC (permalink / raw)
  To: Leonardo Bras
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Wed, Sep 22, 2021 at 07:24:21PM -0300, Leonardo Bras wrote:
> Adds io_async_writev and io_async_flush as optional callbacks to QIOChannelClass,
> allowing the implementation of asynchronous writes by subclasses.
> 
> How to use them:
> - Write data using qio_channel_async_writev(),
> - Wait for write completion with qio_channel_async_flush().
> 
> Notes:
> Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
> recommended to keep the write buffer untouched until the return of
> qio_channel_async_flush().
> 
> As the new callbacks are optional, if a subclass does not implement them
> there will be a fallback to the mandatory synchronous implementation:
> - io_async_writev will fall back to io_writev,
> - io_async_flush will return without changing anything.
> This makes it simpler for the user to make use of the asynchronous implementation.
> 
> Also, some functions like qio_channel_writev_full_all() were adapted to
> offer an async version, making use of the new callbacks.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
>  io/channel.c         | 66 ++++++++++++++++++++++++-------
>  2 files changed, 129 insertions(+), 30 deletions(-)
> 
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 88988979f8..74f2e3ae8a 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -136,6 +136,14 @@ struct QIOChannelClass {
>                                    IOHandler *io_read,
>                                    IOHandler *io_write,
>                                    void *opaque);
> +    ssize_t (*io_async_writev)(QIOChannel *ioc,
> +                               const struct iovec *iov,
> +                               size_t niov,
> +                               int *fds,
> +                               size_t nfds,
> +                               Error **errp);
> +    void (*io_async_flush)(QIOChannel *ioc,
> +                           Error **errp);
>  };
>  
>  /* General I/O handling functions */
> @@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>   * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
>   * and the channel is non-blocking
>   */
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp);
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,

Using "__" is undesirable as that namespace is reserved.

> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp);
> +#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)

The API docs only cover the first function, not the second.


>  /**
>   * qio_channel_readv_all_eof:
> @@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
>   *
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **erp);
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **erp);
> +#define qio_channel_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, false, erp)
> +#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, true, erp)


>  
>  /**
>   * qio_channel_readv:
> @@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds, size_t nfds,
> -                                Error **errp);
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds, size_t nfds,
> +                                  bool async, Error **errp);
> +#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
> +
> +/**
> + * qio_channel_async_writev:
> + * @ioc: the channel object
> + * @iov: the array of memory regions to write data from
> + * @niov: the length of the @iov array
> + * @fds: an array of file handles to send
> + * @nfds: number of file handles in @fds
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Behaves like qio_channel_writev_full, but will send
> + * data asynchronously, meaning this function
> + * may return before the data is actually sent.
> + *
> + * If at some point it's necessary to wait for all data to be
> + * sent, use qio_channel_async_flush().
> + *
> + * If not implemented, falls back to the default writev
> + */

I'm not convinced by the fallback here. If you're
layering I/O channels this is not going to result in
desirable behaviour.

eg if QIOChannelTLS doesn't implement async, then when
you call async_writev, it'll invoke sync writev on
the QIOChannelTLS, which will in turn invoke the sync
writev on QIOChannelSocket, despite the latter having
async writev support.  I think this is very misleading
behaviour
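
For instance (sketch; creds/hostname assumed from the usual TLS setup):

   QIOChannel *tioc = QIO_CHANNEL(
       qio_channel_tls_new_client(QIO_CHANNEL(sioc), creds, hostname, &err));
   qio_channel_async_writev(tioc, iov, niov, NULL, 0, &err);
   /* QIOChannelTLS has no io_async_writev, so this silently does a
    * synchronous TLS write over the socket, never touching the socket's
    * async (zerocopy) path - and the caller can't tell the difference. */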

> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp);

This is missing any flags. We need something like

   QIO_CHANNEL_WRITE_FLAG_ZEROCOPY

passed in an 'unsigned int flags' parameter. This in
turn makes me question whether we should have the
common helpers at all, as the api is going to be
different for sync vs async.

The QIOChannelFeature enum probably ought to be
extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
support for probing whether that's supported or not.
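
Roughly (sketch; the helper name is illustrative, the flag/feature names are
as suggested above):

   #define QIO_CHANNEL_WRITE_FLAG_ZEROCOPY 0x1

   ssize_t qio_channel_writev_full_flags(QIOChannel *ioc,
                                         const struct iovec *iov,
                                         size_t niov,
                                         int *fds, size_t nfds,
                                         unsigned int flags,
                                         Error **errp);

   if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY)) {
       /* caller may pass QIO_CHANNEL_WRITE_FLAG_ZEROCOPY */
   }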

> +
> +/**
> + * qio_channel_async_flush:
> + * @ioc: the channel object
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Will lock until every packet queued with qio_channel_async_writev()

s/lock/block/ I presume.

> + * is sent.
> + *
> + * If not implemented, returns without changing anything.
> + */
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp);
> +
>  
>  #endif /* QIO_CHANNEL_H */
> diff --git a/io/channel.c b/io/channel.c
> index e8b019dc36..c4819b922f 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>  }
>  
>  
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp)
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp)
>  {
>      QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
>  
> @@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
>          return -1;
>      }
>  
> +    if (async && klass->io_async_writev) {
> +        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
>      return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
>  }
>  
> @@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>      return ret;
>  }
>  
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **errp)
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **errp)
>  {
> -    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
> +    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
>  }
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
>                                  const struct iovec *iov,
>                                  size_t niov,
>                                  int *fds, size_t nfds,
> -                                Error **errp)
> +                                bool async, Error **errp)
>  {
>      int ret = -1;
>      struct iovec *local_iov = g_new(struct iovec, niov);
> @@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
>  
>      while (nlocal_iov > 0) {
>          ssize_t len;
> -        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> -                                      errp);
> +        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> +                                        async, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
>              if (qemu_in_coroutine()) {
>                  qio_channel_yield(ioc, G_IO_OUT);
> @@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
>  }
>  
>  
> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp)
> +{
> +    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_writev) {
> +        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
> +    return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +}
> +
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp)
> +{
> +    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_flush) {
> +        return;
> +    }
> +
> +    klass->io_async_flush(ioc, errp);
> +}
> +
> +
>  static void qio_channel_restart_read(void *opaque)
>  {
>      QIOChannel *ioc = opaque;
> -- 
> 2.33.0
> 

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
From: Daniel P. Berrangé @ 2021-09-24 17:38 UTC (permalink / raw)
  To: Leonardo Bras
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> Implement the new optional callbacks io_async_writev and io_async_flush on
> QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is
> available in the host kernel and TCP sockets are used.
> 
> qio_channel_socket_writev() contents were moved to a helper function
> __qio_channel_socket_writev() which accepts an extra 'flags' argument.
> This helper function is used to implement qio_channel_socket_writev(), with
> flags = 0, keeping its behavior unchanged, and
> qio_channel_socket_async_writev() with flags = MSG_ZEROCOPY.
> 
> qio_channel_socket_async_flush() was implemented by reading the socket's error
> queue, which will have information on MSG_ZEROCOPY send completion.
> There is no need to worry about re-sending packets in case any error happens,
> as MSG_ZEROCOPY only works with TCP, which will retransmit if any error occurs.
> 
> Notes on using async_write():
> - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> some caution is necessary to avoid overwriting any buffer before it's sent.
> If something like this happens, a newer version of the buffer may be sent instead.
> - If this is a problem, it's recommended to use async_flush() before freeing or
> re-using the buffer.
> - When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require
> a larger amount of locked memory than is usually available to a non-root user.
> - If the required amount of locked memory is not available, it falls back to
> buffer-copying behavior and synchronous sending.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel-socket.h |   2 +
>  include/io/channel.h        |   1 +
>  io/channel-socket.c         | 176 ++++++++++++++++++++++++++++++++++--
>  3 files changed, 169 insertions(+), 10 deletions(-)
> 
> diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
> index e747e63514..4d1be0637a 100644
> --- a/include/io/channel-socket.h
> +++ b/include/io/channel-socket.h
> @@ -47,6 +47,8 @@ struct QIOChannelSocket {
>      socklen_t localAddrLen;
>      struct sockaddr_storage remoteAddr;
>      socklen_t remoteAddrLen;
> +    ssize_t async_queued;
> +    ssize_t async_sent;
>  };
>  
>  
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 74f2e3ae8a..611bb2ea26 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -31,6 +31,7 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
>  
>  
>  #define QIO_CHANNEL_ERR_BLOCK -2
> +#define QIO_CHANNEL_ERR_NOBUFS -3
>  
>  typedef enum QIOChannelFeature QIOChannelFeature;
>  
> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index 606ec97cf7..c67832d0bb 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -26,9 +26,23 @@
>  #include "io/channel-watch.h"
>  #include "trace.h"
>  #include "qapi/clone-visitor.h"
> +#ifdef CONFIG_LINUX
> +#include <linux/errqueue.h>
> +#include <poll.h>
> +#endif
>  
>  #define SOCKET_MAX_FDS 16
>  
> +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> +                                               const struct iovec *iov,
> +                                               size_t niov,
> +                                               int *fds,
> +                                               size_t nfds,
> +                                               Error **errp);
> +
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp);
> +
>  SocketAddress *
>  qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
>                                       Error **errp)
> @@ -55,6 +69,8 @@ qio_channel_socket_new(void)
>  
>      sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
>      sioc->fd = -1;
> +    sioc->async_queued = 0;
> +    sioc->async_sent = 0;
>  
>      ioc = QIO_CHANNEL(sioc);
>      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>                                      Error **errp)
>  {
>      int fd;
> +    int ret, v = 1;
>  
>      trace_qio_channel_socket_connect_sync(ioc, addr);
>      fd = socket_connect(addr, errp);
> @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>          return -1;
>      }
>  
> +#ifdef CONFIG_LINUX
> +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> +        return 0;
> +    }
> +
> +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> +    if (ret >= 0) {
> +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +        klass->io_async_writev = qio_channel_socket_async_writev;
> +        klass->io_async_flush = qio_channel_socket_async_flush;
> +    }
> +#endif

This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
usage - we should have them take a flag to request ZEROCOPY behaviour.

> +
>      return 0;
>  }
>  
> @@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
>      return ret;
>  }
>  
> -static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> -                                         const struct iovec *iov,
> -                                         size_t niov,
> -                                         int *fds,
> -                                         size_t nfds,
> -                                         Error **errp)
> +static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
> +                                           const struct iovec *iov,
> +                                           size_t niov,
> +                                           int *fds,
> +                                           size_t nfds,
> +                                           int flags,
> +                                           Error **errp)
>  {
>      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
>      ssize_t ret;
> @@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
>      }
>  
>   retry:
> -    ret = sendmsg(sioc->fd, &msg, 0);
> +    ret = sendmsg(sioc->fd, &msg, flags);
>      if (ret <= 0) {
> -        if (errno == EAGAIN) {
> +        switch (errno) {
> +        case EAGAIN:
>              return QIO_CHANNEL_ERR_BLOCK;
> -        }
> -        if (errno == EINTR) {
> +        case EINTR:
>              goto retry;
> +        case ENOBUFS:
> +            return QIO_CHANNEL_ERR_NOBUFS;
>          }
> +
>          error_setg_errno(errp, errno,
>                           "Unable to write to socket");
>          return -1;
>      }
>      return ret;
>  }
> +
> +static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> +                                         const struct iovec *iov,
> +                                         size_t niov,
> +                                         int *fds,
> +                                         size_t nfds,
> +                                         Error **errp)
> +{
> +    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
> +}
> +
> +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> +                                               const struct iovec *iov,
> +                                               size_t niov,
> +                                               int *fds,
> +                                               size_t nfds,
> +                                               Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    ssize_t ret;
> +
> +    sioc->async_queued++;
> +
> +    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
> +                                       errp);
> +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> +        /*
> +         * Not enough locked memory available to the process.
> +         * Fallback to default sync callback.
> +         */
> +
> +        if (errp && *errp) {
> +            warn_reportf_err(*errp,
> +                             "Process can't lock enough memory for using MSG_ZEROCOPY, "
> +                             "falling back to non-zerocopy");

This is not nice as it hides what is likely mis-configuration error.
If someone asked for zerocopy, we should honour that or report an
error back.

> +        }
> +
> +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +        klass->io_async_writev = NULL;
> +        klass->io_async_flush = NULL;

Clearing the flush callback is wrong. We might have pending async
writes that haven't been processed that, and the lack of buffers
may be a transient problem just caused by a backlog of writes.

> +
> +        /* Re-send current buffer */
> +        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
> +    return ret;
> +}
> +
> +
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    struct msghdr msg = {};
> +    struct pollfd pfd;
> +    struct sock_extended_err *serr;
> +    struct cmsghdr *cm;
> +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];

Err  sizeof(int) * SOCKET_MAX_FDS   doesn't look right. This
buffer needs to hold 'struct sock_extended_err' instances,
not 'int', and SOCKET_MAX_FDS is an unrelated limit.
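
i.e. presumably something more like:

   char control[CMSG_SPACE(sizeof(struct sock_extended_err))];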

> +    int ret;
> +
> +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> +    msg.msg_control = control;
> +    msg.msg_controllen = sizeof(control);
> +
> +    while (sioc->async_sent < sioc->async_queued) {
> +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> +        if (ret < 0) {
> +            if (errno == EAGAIN) {
> +                /* Nothing on errqueue, wait */
> +                pfd.fd = sioc->fd;
> +                pfd.events = 0;
> +                ret = poll(&pfd, 1, 250);
> +                if (ret == 0) {
> +                    /*
> +                     * Timeout : After 250ms without receiving any zerocopy
> +                     * notification, consider all data as sent.
> +                     */

This feels very dubious indeed. If some caller needs a guarantee that the
data was successfully sent, merely waiting 250ms is not going to be reliable
enough. 

A regular non-async + non-zerocopy write will wait as long as is needed
unless SO_SNDTIMEO has been set on the socket.

At the very least the timeout ought to be a parameter passed in, and the
return value should indicate whether it timed out, or report how many
pending writes still aren't processed, so the caller can decide whether
to call flush again.
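
Something like this perhaps (sketch):

   /* Returns the number of writes still pending when the timeout expired,
    * 0 if everything was flushed, or -1 on error */
   int qio_channel_async_flush(QIOChannel *ioc,
                               unsigned int timeout_ms,
                               Error **errp);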

> +                    break;
> +                } else if (ret < 0 ||
> +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> +                    error_setg_errno(errp, errno,
> +                                     "Poll error");
> +                    break;
> +                } else {
> +                    continue;
> +                }
> +            }
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +
> +            error_setg_errno(errp, errno,
> +                             "Unable to read errqueue");
> +            break;
> +        }
> +
> +        cm = CMSG_FIRSTHDR(&msg);
> +        if (cm->cmsg_level != SOL_IP &&
> +            cm->cmsg_type != IP_RECVERR) {
> +            error_setg_errno(errp, EPROTOTYPE,
> +                             "Wrong cmsg in errqueue");
> +            break;
> +        }
> +
> +        serr = (void *) CMSG_DATA(cm);
> +        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
> +            error_setg_errno(errp, serr->ee_errno,
> +                             "Error on socket");
> +            break;
> +        }
> +        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
> +            error_setg_errno(errp, serr->ee_origin,
> +                             "Error not from zerocopy");
> +            break;
> +        }
> +
> +        /* No errors, count sent ids */
> +        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
> +    }
> +}
> +
> +
>  #else /* WIN32 */
>  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
>                                          const struct iovec *iov,

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




* Re: [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
From: Daniel P. Berrangé @ 2021-09-24 17:43 UTC (permalink / raw)
  To: Leonardo Bras
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Wed, Sep 22, 2021 at 07:24:23PM -0300, Leonardo Bras wrote:
> Change multifd nocomp version to use asynchronous write for RAM pages, and
> benefit of MSG_ZEROCOPY when it's available.
> 
> The asynchronous flush happens on cleanup only, before destroying the QIOChannel.
> 
> This will work fine on RAM migration because the RAM pages are not usually freed,
> and there is no problem on changing the pages content between async_send() and
> the actual sending of the buffer, because this change will dirty the page and
> cause it to be re-sent on a next iteration anyway.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  migration/multifd.c | 3 ++-
>  1 file changed, 2 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 377da78f5b..d247207a0a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -105,7 +105,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
>   */
>  static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
>  {
> -    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> +    return qio_channel_async_writev_all(p->c, p->pages->iov, used, errp);
>  }

This needs to be made conditional so zerocopy is only used if requested
by the mgmt app, and it isn't going to work in all cases (eg TLS) so
silently enabling it is not good.
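
e.g. gated on an opt-in migration capability (hypothetical name):

   static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
   {
       if (migrate_use_zerocopy_send()) { /* hypothetical capability check */
           return qio_channel_async_writev_all(p->c, p->pages->iov, used, errp);
       }
       return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
   }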

>  
>  /**
> @@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
>  
> +        qio_channel_async_flush(p->c, NULL);
>          socket_send_channel_destroy(p->c);
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);

This isn't reliable because qio_channel_async_flush will return early
even if not everything is flushed.

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




* Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
From: Peter Xu @ 2021-09-28 21:52 UTC (permalink / raw)
  To: Daniel P. Berrangé
  Cc: qemu-devel, Jason Wang, Leonardo Bras, Dr. David Alan Gilbert,
	Juan Quintela

On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp);
> 
> This is missing any flags. We need something like
> 
>    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> 
> passed in an 'unsigned int flags' parameter. This in
> turn makes me question whether we should have the
> common helpers at all, as the api is going to be
> different for sync vs async.
> 
> The QIOChannelFeature enum probably ought to be
> extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> support for probing whether that's supported or not.

I'm also wondering whether we could just drop the fds/nfds, as to my knowledge
SCM_RIGHTS is the only user, and meanwhile I don't see why an async interface
would pass in any fd anyways..  That would leave something like (sketch):
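
   ssize_t qio_channel_async_writev(QIOChannel *ioc,
                                    const struct iovec *iov,
                                    size_t niov,
                                    Error **errp);

Thanks,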

-- 
Peter Xu




* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
From: Peter Xu @ 2021-09-28 22:45 UTC (permalink / raw)
  To: Leonardo Bras
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    struct msghdr msg = {};
> +    struct pollfd pfd;
> +    struct sock_extended_err *serr;
> +    struct cmsghdr *cm;
> +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> +    int ret;
> +
> +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> +    msg.msg_control = control;
> +    msg.msg_controllen = sizeof(control);
> +
> +    while (sioc->async_sent < sioc->async_queued) {
> +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> +        if (ret < 0) {
> +            if (errno == EAGAIN) {
> +                /* Nothing on errqueue, wait */
> +                pfd.fd = sioc->fd;
> +                pfd.events = 0;
> +                ret = poll(&pfd, 1, 250);
> +                if (ret == 0) {
> +                    /*
> +                     * Timeout : After 250ms without receiving any zerocopy
> +                     * notification, consider all data as sent.
> +                     */
> +                    break;

After a timeout, we'll break the while loop and continue parsing an invalid
msg [1].  Is that what we want?

Also, I don't think we can return from flush() even if it timed out - iiuc we
should keep polling until we have async_sent==async_queued.  It depends on how
we define flush(): if it's "when this function returns all data is sent", then
we should keep polling, and afaict this is what we want here right now.
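
For reference, a minimal self-contained sketch of those semantics, using the
standard Linux MSG_ZEROCOPY completion handling (IPv4 only, error handling
trimmed - this is not the patch's code):

#include <errno.h>
#include <poll.h>
#include <stdint.h>
#include <sys/socket.h>
#include <linux/errqueue.h>
#include <netinet/in.h>

/* Block until 'queued' zerocopy sends have completed on fd. */
static int zerocopy_flush(int fd, uint32_t queued, uint32_t *sent)
{
    char control[CMSG_SPACE(sizeof(struct sock_extended_err))];
    struct pollfd pfd = { .fd = fd, .events = 0 };

    while (*sent < queued) {
        struct msghdr msg = { .msg_control = control,
                              .msg_controllen = sizeof(control) };

        if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
            if (errno == EAGAIN) {
                /* Errqueue data shows up as POLLERR in revents */
                poll(&pfd, 1, -1);
                continue;
            }
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }

        struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
        if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
            continue;   /* no parseable cmsg: skip rather than dereference */
        }

        struct sock_extended_err *serr = (void *)CMSG_DATA(cm);
        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
            return -1;
        }
        /* ee_info..ee_data is an inclusive range of completed send ids */
        *sent += serr->ee_data - serr->ee_info + 1;
    }
    return 0;
}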

> +                } else if (ret < 0 ||
> +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> +                    error_setg_errno(errp, errno,
> +                                     "Poll error");
> +                    break;
> +                } else {
> +                    continue;
> +                }
> +            }
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +
> +            error_setg_errno(errp, errno,
> +                             "Unable to read errqueue");
> +            break;
> +        }
> +
> +        cm = CMSG_FIRSTHDR(&msg);

[1]

> +        if (cm->cmsg_level != SOL_IP &&
> +            cm->cmsg_type != IP_RECVERR) {
> +            error_setg_errno(errp, EPROTOTYPE,
> +                             "Wrong cmsg in errqueue");
> +            break;
> +        }

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
  2021-09-24 17:43   ` Daniel P. Berrangé
@ 2021-09-28 22:48     ` Peter Xu
  2021-09-29 19:46       ` Leonardo Bras Soares Passos
  2021-09-29 19:44     ` Leonardo Bras Soares Passos
  1 sibling, 1 reply; 23+ messages in thread
From: Peter Xu @ 2021-09-28 22:48 UTC (permalink / raw)
  To: Daniel P. Berrangé
  Cc: qemu-devel, Jason Wang, Leonardo Bras, Dr. David Alan Gilbert,
	Juan Quintela

On Fri, Sep 24, 2021 at 06:43:49PM +0100, Daniel P. Berrangé wrote:
> > @@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >          Error *local_err = NULL;
> >  
> > +        qio_channel_async_flush(p->c, NULL);
> >          socket_send_channel_destroy(p->c);
> >          p->c = NULL;
> >          qemu_mutex_destroy(&p->mutex);
> 
> This isn't reliable because qio_channel_async_flush will return early
> even if not everything is flushed.

Right, though I think the problem is in patch 2, where we should make sure
flush() only returns once all data has been sent.

At the same time we may also want to move it to before we send the device
state, for both pre/post copy.  multifd_save_cleanup() is only called once
migration has completed and we're cleaning up; I'm afraid that's too late, as
the device state could arrive and the dest VM start running without the latest
src VM memory.
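
Something along these lines, as a sketch (placement and names illustrative;
migrate_multifd_channels() is the existing channel-count helper):

static void multifd_flush_all_channels(void)
{
    int i;

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* Drain zerocopy completions before any device state
         * goes on the wire; error propagation elided here. */
        qio_channel_async_flush(p->c, NULL);
    }
}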

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
  2021-09-22 22:24 [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
                   ` (2 preceding siblings ...)
  2021-09-22 22:24 ` [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages Leonardo Bras
@ 2021-09-28 22:50 ` Peter Xu
  2021-09-29 18:34   ` Leonardo Bras Soares Passos
  3 siblings, 1 reply; 23+ messages in thread
From: Peter Xu @ 2021-09-28 22:50 UTC (permalink / raw)
  To: Leonardo Bras
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Wed, Sep 22, 2021 at 07:24:20PM -0300, Leonardo Bras wrote:
> This patch series intends to enable MSG_ZEROCOPY in QIOChannel, and make
> use of it for multifd migration performance improvement.
> 
> Patch #1 creates new callbacks for QIOChannel, allowing the implementation
> of asynchronous writing.
> 
> Patch #2 implements async_write and async_flush on QIOChannelSocket,
> 
> Patch #3 Makes use of async_write + async_flush to enable MSG_ZEROCOPY
> for migration using multifd nocomp.
> 
> Results:
> So far, the resource usage of __sys_sendmsg() reduced 15 times, and the
> overall migration took 13-18% less time, based in synthetic workload.
> 
> The objective is to reduce migration time in hosts with heavy cpu usage.

My recollection is that we'll add a capability bit for migration, so it'll
not be enabled until the user specifies it.  Plan to do it in the next version?

It'll still be okay if you want to separate the work of (1) qio channel
zero-copy support on sockets, and (2) apply zero copy to migration, then that
can be done in part 2.  Your call. :)

Thanks,

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
  2021-09-28 22:50 ` [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Peter Xu
@ 2021-09-29 18:34   ` Leonardo Bras Soares Passos
  2021-09-29 19:22     ` Peter Xu
  0 siblings, 1 reply; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 18:34 UTC (permalink / raw)
  To: Peter Xu
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

Hello Peter,

On Tue, Sep 28, 2021 at 7:51 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:20PM -0300, Leonardo Bras wrote:
> > This patch series intends to enable MSG_ZEROCOPY in QIOChannel, and make
> > use of it for multifd migration performance improvement.
> >
> > Patch #1 creates new callbacks for QIOChannel, allowing the implementation
> > of asynchronous writing.
> >
> > Patch #2 implements async_write and async_flush on QIOChannelSocket,
> >
> > Patch #3 Makes use of async_write + async_flush to enable MSG_ZEROCOPY
> > for migration using multifd nocomp.
> >
> > Results:
> > So far, the resource usage of __sys_sendmsg() reduced 15 times, and the
> > overall migration took 13-18% less time, based in synthetic workload.
> >
> > The objective is to reduce migration time in hosts with heavy cpu usage.
>
> My recollection is that we'll add a capability bit for migration, so it'll
> not be enabled until the user specifies it.  Plan to do it in the next version?

You mean like I did in pre-V1, enabling ZEROCOPY for multifd in QMP?
Or is this something else?


>
> It'll still be okay if you want to separate the work of (1) qio channel
> zero-copy support on sockets, and (2) apply zero copy to migration, then that
> can be done in part 2.  Your call. :)

Ok  :)

>
> Thanks,
>
> --
> Peter Xu
>

Thank you Peter,

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
  2021-09-24 17:16   ` Daniel P. Berrangé
  2021-09-28 21:52     ` Peter Xu
@ 2021-09-29 19:03     ` Leonardo Bras Soares Passos
  1 sibling, 0 replies; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:03 UTC (permalink / raw)
  To: Daniel P. Berrangé
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

Hello Daniel, thank you for reviewing!

On Fri, Sep 24, 2021 at 2:16 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:21PM -0300, Leonardo Bras wrote:
> > Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
> > allowing the implementation of asynchronous writes by subclasses.
> >
> > How to use them:
> > - Write data using qio_channel_async_writev(),
> > - Wait write completion with qio_channel_async_flush().
> >
> > Notes:
> > Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
> > recommended to keep the write buffer untouched until the return of
> > qio_channel_async_flush().
> >
> > As the new callbacks are optional, if a subclass does not implement them
> > there will be a fallback to the mandatory synchronous implementation:
> > - io_async_writev will fallback to io_writev,
> > - io_async_flush will return without changing anything.
> > This makes simpler for the user to make use of the asynchronous implementation.
> >
> > Also, some functions like qio_channel_writev_full_all() were adapted to
> > offer an async version, and make better use of the new callbacks.
> >
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
> > ---
> >  include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
> >  io/channel.c         | 66 ++++++++++++++++++++++++-------
> >  2 files changed, 129 insertions(+), 30 deletions(-)
> >
> > diff --git a/include/io/channel.h b/include/io/channel.h
> > index 88988979f8..74f2e3ae8a 100644
> > --- a/include/io/channel.h
> > +++ b/include/io/channel.h
> > @@ -136,6 +136,14 @@ struct QIOChannelClass {
> >                                    IOHandler *io_read,
> >                                    IOHandler *io_write,
> >                                    void *opaque);
> > +    ssize_t (*io_async_writev)(QIOChannel *ioc,
> > +                               const struct iovec *iov,
> > +                               size_t niov,
> > +                               int *fds,
> > +                               size_t nfds,
> > +                               Error **errp);
> > +   void (*io_async_flush)(QIOChannel *ioc,
> > +                          Error **errp);
> >  };
> >
> >  /* General I/O handling functions */
> > @@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
> >   * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
> >   * and the channel is non-blocking
> >   */
> > -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds,
> > -                                size_t nfds,
> > -                                Error **errp);
> > +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
>
> Using "__" is undesirable as that namespace is reserved.

Thank you for the tip!
I will make sure to avoid this in the future.


>
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds,
> > +                                  size_t nfds,
> > +                                  bool async,
> > +                                  Error **errp);
> > +#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
> > +#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
>
> The API docs only cover the first function, not the second.

You are right.
If this ends up being the final implementation, I will make sure to
provide correct docs.

>
>
> >  /**
> >   * qio_channel_readv_all_eof:
> > @@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
> >   *
> >   * Returns: 0 if all bytes were written, or -1 on error
> >   */
> > -int qio_channel_writev_all(QIOChannel *ioc,
> > -                           const struct iovec *iov,
> > -                           size_t niov,
> > -                           Error **erp);
> > +int __qio_channel_writev_all(QIOChannel *ioc,
> > +                             const struct iovec *iov,
> > +                             size_t niov,
> > +                             bool async,
> > +                             Error **erp);
> > +#define qio_channel_writev_all(ioc, iov, niov, erp) \
> > +    __qio_channel_writev_all(ioc, iov, niov, false, erp)
> > +#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
> > +    __qio_channel_writev_all(ioc, iov, niov, true, erp)
>
>
> >
> >  /**
> >   * qio_channel_readv:
> > @@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
> >   * Returns: 0 if all bytes were written, or -1 on error
> >   */
> >
> > -int qio_channel_writev_full_all(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds, size_t nfds,
> > -                                Error **errp);
> > +int __qio_channel_writev_full_all(QIOChannel *ioc,
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds, size_t nfds,
> > +                                  bool async, Error **errp);
> > +#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
> > +#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
> > +
> > +/**
> > + * qio_channel_async_writev:
> > + * @ioc: the channel object
> > + * @iov: the array of memory regions to write data from
> > + * @niov: the length of the @iov array
> > + * @fds: an array of file handles to send
> > + * @nfds: number of file handles in @fds
> > + * @errp: pointer to a NULL-initialized error object
> > + *
> > + * Behaves like qio_channel_writev_full, but will send
> > + * data asynchronously, meaning this function
> > + * may return before the data is actually sent.
> > + *
> > + * If at some point it's necessary wait for all data to be
> > + * sent, use qio_channel_async_flush().
> > + *
> > + * If not implemented, falls back to the default writev
> > + */
>
> I'm not convinced by the fallback here. If you're
> layering I/O channels this is not going to result in
> desirable behaviour.
>
> eg if QIOChannelTLS doesn't implement async, then when
> you call async_writev, it'll invoke sync writev on
> the QIOChannelTLS, which will in turn invoke the sync
> writev on QIOChannelSocket, despite the latter having
> async writev support.  I think this is very misleading
> behaviour

Yeah, it's a good point.
Failing when async is not supported seems a better approach.
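
i.e. roughly this for the next version (a sketch of what I have in mind;
the error message is illustrative):

ssize_t qio_channel_async_writev(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds,
                                 size_t nfds,
                                 Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_async_writev) {
        /* No silent fallback to the sync path any more */
        error_setg(errp, "Channel does not support async writev");
        return -1;
    }

    return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
}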

>
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp);
>
> This is missing any flags. We need something like
>
>    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
>
> passed in an 'unsigned int flags' parameter. This in
> turn makes me question whether we should have the
> common helpers at all, as the api is going to be
> different for sync vs async.
>
> The QIOChannelFeature enum probably ought to be
> extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> support for probing whether that's supported or not.

Yeah, that makes sense to me.

>
> > +
> > +/**
> > + * qio_channel_async_flush:
> > + * @ioc: the channel object
> > + * @errp: pointer to a NULL-initialized error object
> > + *
> > + * Will lock until every packet queued with qio_channel_async_writev()
>
> s/lock/block/ I presume.

correct.

>
> > + * is sent.
> > + *
> > + * If not implemented, returns without changing anything.
> > + */
> > +
> > +void qio_channel_async_flush(QIOChannel *ioc,
> > +                             Error **errp);
> > +
> >
> >  #endif /* QIO_CHANNEL_H */
> > diff --git a/io/channel.c b/io/channel.c
> > index e8b019dc36..c4819b922f 100644
> > --- a/io/channel.c
> > +++ b/io/channel.c
> > @@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
> >  }
> >
> >
> > -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds,
> > -                                size_t nfds,
> > -                                Error **errp)
> > +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds,
> > +                                  size_t nfds,
> > +                                  bool async,
> > +                                  Error **errp)
> >  {
> >      QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> >
> > @@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
> >          return -1;
> >      }
> >
> > +    if (async && klass->io_async_writev) {
> > +        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> >      return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> >  }
> >
> > @@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
> >      return ret;
> >  }
> >
> > -int qio_channel_writev_all(QIOChannel *ioc,
> > -                           const struct iovec *iov,
> > -                           size_t niov,
> > -                           Error **errp)
> > +int __qio_channel_writev_all(QIOChannel *ioc,
> > +                             const struct iovec *iov,
> > +                             size_t niov,
> > +                             bool async,
> > +                             Error **errp)
> >  {
> > -    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
> > +    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
> >  }
> >
> > -int qio_channel_writev_full_all(QIOChannel *ioc,
> > +int __qio_channel_writev_full_all(QIOChannel *ioc,
> >                                  const struct iovec *iov,
> >                                  size_t niov,
> >                                  int *fds, size_t nfds,
> > -                                Error **errp)
> > +                                bool async, Error **errp)
> >  {
> >      int ret = -1;
> >      struct iovec *local_iov = g_new(struct iovec, niov);
> > @@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
> >
> >      while (nlocal_iov > 0) {
> >          ssize_t len;
> > -        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> > -                                      errp);
> > +        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> > +                                        async, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> >              if (qemu_in_coroutine()) {
> >                  qio_channel_yield(ioc, G_IO_OUT);
> > @@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
> >  }
> >
> >
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp)
> > +{
> > +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +
> > +    if (!klass->io_async_writev) {
> > +        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> > +     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> > +}
> > +
> > +
> > +void qio_channel_async_flush(QIOChannel *ioc,
> > +                             Error **errp)
> > +{
> > +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +
> > +    if (!klass->io_async_flush) {
> > +        return;
> > +    }
> > +
> > +     klass->io_async_flush(ioc, errp);
> > +}
> > +
> > +
> >  static void qio_channel_restart_read(void *opaque)
> >  {
> >      QIOChannel *ioc = opaque;
> > --
> > 2.33.0
> >
>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
>



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
  2021-09-28 21:52     ` Peter Xu
@ 2021-09-29 19:06       ` Leonardo Bras Soares Passos
  2021-09-30  8:34         ` Daniel P. Berrangé
  0 siblings, 1 reply; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:06 UTC (permalink / raw)
  To: Peter Xu
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

Hello Peter, thanks for reviewing!

On Tue, Sep 28, 2021 at 6:52 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > > +                                 const struct iovec *iov,
> > > +                                 size_t niov,
> > > +                                 int *fds,
> > > +                                 size_t nfds,
> > > +                                 Error **errp);
> >
> > This is missing any flags. We need something like
> >
> >    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> >
> > passed in an 'unsigned int flags' parameter. This in
> > turn makes me question whether we should have the
> > common helpers at all, as the api is going to be
> > different for sync vs async.
> >
> > The QIOChannelFeature enum probably ought to be
> > extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> > support for probing whether that's supported or not.
>
> I'm also wondering whether we could just drop the fds/nfds: as far as I know
> SCM_RIGHTS is the only user, and at the same time I don't see why an async
> interface would pass in any fds anyway...  Thanks,

FWIW, I think it's a great idea.
Daniel, what do you think?

>
> --
> Peter Xu
>

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
  2021-09-29 18:34   ` Leonardo Bras Soares Passos
@ 2021-09-29 19:22     ` Peter Xu
  2021-09-29 19:48       ` Leonardo Bras Soares Passos
  0 siblings, 1 reply; 23+ messages in thread
From: Peter Xu @ 2021-09-29 19:22 UTC (permalink / raw)
  To: Leonardo Bras Soares Passos
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Wed, Sep 29, 2021 at 03:34:01PM -0300, Leonardo Bras Soares Passos wrote:
> You mean like I did in pre-V1, enabling ZEROCOPY for multifd in QMP?
> Or is this something else?

I mean some way to still be able to turn zerocopy off, e.g. when the user has
no privilege to mlock.
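
Fwiw that case is even detectable up front - a rough sketch, not patch code
(per the kernel zerocopy docs, sends fail with ENOBUFS once the locked-pages
ulimit is exceeded):

#include <stdbool.h>
#include <sys/resource.h>

static bool zerocopy_may_work(void)
{
    struct rlimit lim;

    /* Zerocopy page pinning is accounted against RLIMIT_MEMLOCK,
     * so a zero limit means every zerocopy sendmsg() would fail. */
    if (getrlimit(RLIMIT_MEMLOCK, &lim) != 0) {
        return false;
    }
    return lim.rlim_cur != 0;
}

Thanks,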

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
  2021-09-24 17:38   ` Daniel P. Berrangé
@ 2021-09-29 19:32     ` Leonardo Bras Soares Passos
  2021-09-30  8:39       ` Daniel P. Berrangé
  0 siblings, 1 reply; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:32 UTC (permalink / raw)
  To: Daniel P. Berrangé
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

Hello Daniel,

On Fri, Sep 24, 2021 at 2:38 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
[...]
> > @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> >          return -1;
> >      }
> >
> > +#ifdef CONFIG_LINUX
> > +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> > +        return 0;
> > +    }
> > +
> > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > +    if (ret >= 0) {
> > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +        klass->io_async_writev = qio_channel_socket_async_writev;
> > +        klass->io_async_flush = qio_channel_socket_async_flush;
> > +    }
> > +#endif
>
> This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
> usage - we should have them take a flag to request ZEROCOPY behaviour.

I agree, but I am not aware of how to do asynchronous send in a socket
without MSG_ZEROCOPY.

I mean, I know about non-blocking sends, but I am not sure how one would check
that everything was sent (i.e. the flush part).
Would it also be using the ERRQUEUE for that?

What would you suggest?

>
> > +
> >      return 0;
> >  }
> >
> > @@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
> >      return ret;
> >  }
> >
> > -static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> > -                                         const struct iovec *iov,
> > -                                         size_t niov,
> > -                                         int *fds,
> > -                                         size_t nfds,
> > -                                         Error **errp)
> > +static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
> > +                                           const struct iovec *iov,
> > +                                           size_t niov,
> > +                                           int *fds,
> > +                                           size_t nfds,
> > +                                           int flags,
> > +                                           Error **errp)
> >  {
> >      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> >      ssize_t ret;
> > @@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> >      }
> >
> >   retry:
> > -    ret = sendmsg(sioc->fd, &msg, 0);
> > +    ret = sendmsg(sioc->fd, &msg, flags);
> >      if (ret <= 0) {
> > -        if (errno == EAGAIN) {
> > +        switch (errno) {
> > +        case EAGAIN:
> >              return QIO_CHANNEL_ERR_BLOCK;
> > -        }
> > -        if (errno == EINTR) {
> > +        case EINTR:
> >              goto retry;
> > +        case ENOBUFS:
> > +            return QIO_CHANNEL_ERR_NOBUFS;
> >          }
> > +
> >          error_setg_errno(errp, errno,
> >                           "Unable to write to socket");
> >          return -1;
> >      }
> >      return ret;
> >  }
> > +
> > +static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> > +                                         const struct iovec *iov,
> > +                                         size_t niov,
> > +                                         int *fds,
> > +                                         size_t nfds,
> > +                                         Error **errp)
> > +{
> > +    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
> > +}
> > +
> > +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> > +                                               const struct iovec *iov,
> > +                                               size_t niov,
> > +                                               int *fds,
> > +                                               size_t nfds,
> > +                                               Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    ssize_t ret;
> > +
> > +    sioc->async_queued++;
> > +
> > +    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
> > +                                       errp);
> > +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> > +        /*
> > +         * Not enough locked memory available to the process.
> > +         * Fallback to default sync callback.
> > +         */
> > +
> > +        if (errp && *errp) {
> > +            warn_reportf_err(*errp,
> > +                             "Process can't lock enough memory for using MSG_ZEROCOPY,"
> > +                             "falling back to non-zerocopy");
>
> This is not nice as it hides what is likely a mis-configuration error.
> If someone asked for zerocopy, we should honour that or report an
> error back.

Yeah, that makes sense to me.
Thank you for pointing that out.

>
> > +        }
> > +
> > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +        klass->io_async_writev = NULL;
> > +        klass->io_async_flush = NULL;
>
> Clearing the flush callback is wrong. We might have pending async
> writes that haven't been processed yet, and the lack of buffers
> may be a transient problem just caused by a backlog of writes.

I agree that it's wrong.
But I think it will go away anyway if we implement ZEROCOPY as
a feature instead of async, and avoid falling back to writev when async is
not available.


>
> > +
> > +        /* Re-send current buffer */
> > +        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> > +    return ret;
> > +}
> > +
> > +
> > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > +                                           Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    struct msghdr msg = {};
> > +    struct pollfd pfd;
> > +    struct sock_extended_err *serr;
> > +    struct cmsghdr *cm;
> > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
>
> Err  sizeof(int) * SOCKET_MAX_FDS   doesn't look right. This
> buffer needs to hold 'struct sock_extended_err' instances,
> not 'int', and SOCKET_MAX_FDS is an unrelated limit.

That was a bad mistake:
I got it by reusing code from the functions above, and it got past the review I
did before sending the patch.
Sorry about that.
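
For the next version I have something like this in mind - room for the single
sock_extended_err cmsg the errqueue actually delivers (sketch):

    char control[CMSG_SPACE(sizeof(struct sock_extended_err))];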

>
> > +    int ret;
> > +
> > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > +    msg.msg_control = control;
> > +    msg.msg_controllen = sizeof(control);
> > +
> > +    while (sioc->async_sent < sioc->async_queued) {
> > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > +        if (ret < 0) {
> > +            if (errno == EAGAIN) {
> > +                /* Nothing on errqueue, wait */
> > +                pfd.fd = sioc->fd;
> > +                pfd.events = 0;
> > +                ret = poll(&pfd, 1, 250);
> > +                if (ret == 0) {
> > +                    /*
> > +                     * Timeout : After 250ms without receiving any zerocopy
> > +                     * notification, consider all data as sent.
> > +                     */
>
> This feels very dubious indeed. If some caller needs a guarantee that the
> data was successfully sent, merely waiting 250ms is not going to be reliable
> enough.

That makes sense.
I added this part because at some point while debugging I hit an infinite
loop here (I think some notifications were somehow being missed).

>
> A regular non-async + non-zerocopy write will wait as long as is needed
> unless SO_SNDTIMEO has been set on the socket.

So it would be ok to let it loop here?
Maybe the timeout could be just enough to keep the cpu from getting
stuck in here.

>
> At the very least the timeout ought to be a parameter passed in, and the
> return value should indicate whether it timed out, or report how many
> pending writes still aren't processed, so the caller can decide whether
> to call flush again.

That also makes sense to me.
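
i.e. something like this shape (just a sketch, exact signature TBD):

/*
 * Wait up to timeout_ms for queued zerocopy sends to complete;
 * returns the number still pending, so callers can flush again.
 */
int qio_channel_async_flush(QIOChannel *ioc,
                            int timeout_ms,
                            Error **errp);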

>
> > +                    break;
> > +                } else if (ret < 0 ||
> > +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> > +                    error_setg_errno(errp, errno,
> > +                                     "Poll error");
> > +                    break;
> > +                } else {
> > +                    continue;
> > +                }
> > +            }
> > +            if (errno == EINTR) {
> > +                continue;
> > +            }
> > +
> > +            error_setg_errno(errp, errno,
> > +                             "Unable to read errqueue");
> > +            break;
> > +        }
> > +
> > +        cm = CMSG_FIRSTHDR(&msg);
> > +        if (cm->cmsg_level != SOL_IP &&
> > +            cm->cmsg_type != IP_RECVERR) {
> > +            error_setg_errno(errp, EPROTOTYPE,
> > +                             "Wrong cmsg in errqueue");
> > +            break;
> > +        }
> > +
> > +        serr = (void *) CMSG_DATA(cm);
> > +        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
> > +            error_setg_errno(errp, serr->ee_errno,
> > +                             "Error on socket");
> > +            break;
> > +        }
> > +        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
> > +            error_setg_errno(errp, serr->ee_origin,
> > +                             "Error not from zerocopy");
> > +            break;
> > +        }
> > +
> > +        /* No errors, count sent ids*/
> > +        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
> > +    }
> > +}
> > +
> > +
> >  #else /* WIN32 */
> >  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
> >                                          const struct iovec *iov,
>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
>

Thanks Daniel,

Best regards,
Leonardo Bras



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
  2021-09-28 22:45   ` Peter Xu
@ 2021-09-29 19:36     ` Leonardo Bras Soares Passos
  2021-09-29 19:58       ` Peter Xu
  0 siblings, 1 reply; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:36 UTC (permalink / raw)
  To: Peter Xu
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Tue, Sep 28, 2021 at 7:45 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > +                                           Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    struct msghdr msg = {};
> > +    struct pollfd pfd;
> > +    struct sock_extended_err *serr;
> > +    struct cmsghdr *cm;
> > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> > +    int ret;
> > +
> > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > +    msg.msg_control = control;
> > +    msg.msg_controllen = sizeof(control);
> > +
> > +    while (sioc->async_sent < sioc->async_queued) {
> > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > +        if (ret < 0) {
> > +            if (errno == EAGAIN) {
> > +                /* Nothing on errqueue, wait */
> > +                pfd.fd = sioc->fd;
> > +                pfd.events = 0;
> > +                ret = poll(&pfd, 1, 250);
> > +                if (ret == 0) {
> > +                    /*
> > +                     * Timeout : After 250ms without receiving any zerocopy
> > +                     * notification, consider all data as sent.
> > +                     */
> > +                    break;
>
> After a timeout, we'll break the while loop and continue parsing an invalid
> msg [1].  Is that what we want?

No, the point here was returning from flush if this (long) timeout
happened, as in "if this long has passed, there must be no pending send",
which I agree is quite bad, but it was all I could think of to avoid an
infinite loop here if something goes wrong.

>
> Also, I don't think we can return from flush() even if it timed out - iiuc we
> should keep polling until we have async_sent==async_queued.  It depends on how
> we define flush(): if it's "when this function returns all data is sent", then
> we should keep polling, and afaict this is what we want here right now.

Yeah, I agree.
That is the correct way to deal with this.

>
> > +                } else if (ret < 0 ||
> > +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> > +                    error_setg_errno(errp, errno,
> > +                                     "Poll error");
> > +                    break;
> > +                } else {
> > +                    continue;
> > +                }
> > +            }
> > +            if (errno == EINTR) {
> > +                continue;
> > +            }
> > +
> > +            error_setg_errno(errp, errno,
> > +                             "Unable to read errqueue");
> > +            break;
> > +        }
> > +
> > +        cm = CMSG_FIRSTHDR(&msg);
>
> [1]
>
> > +        if (cm->cmsg_level != SOL_IP &&
> > +            cm->cmsg_type != IP_RECVERR) {
> > +            error_setg_errno(errp, EPROTOTYPE,
> > +                             "Wrong cmsg in errqueue");
> > +            break;
> > +        }
>
> --
> Peter Xu
>

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
  2021-09-24 17:43   ` Daniel P. Berrangé
  2021-09-28 22:48     ` Peter Xu
@ 2021-09-29 19:44     ` Leonardo Bras Soares Passos
  1 sibling, 0 replies; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:44 UTC (permalink / raw)
  To: Daniel P. Berrangé
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Fri, Sep 24, 2021 at 2:44 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:23PM -0300, Leonardo Bras wrote:
> > Change multifd nocomp version to use asynchronous write for RAM pages, and
> > benefit of MSG_ZEROCOPY when it's available.
> >
> > The asynchronous flush happens on cleanup only, before destroying the QIOChannel.
> >
> > This will work fine on RAM migration because the RAM pages are not usually freed,
> > and there is no problem on changing the pages content between async_send() and
> > the actual sending of the buffer, because this change will dirty the page and
> > cause it to be re-sent on a next iteration anyway.
> >
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
> > ---
> >  migration/multifd.c | 3 ++-
> >  1 file changed, 2 insertions(+), 1 deletion(-)
> >
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 377da78f5b..d247207a0a 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -105,7 +105,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
> >   */
> >  static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> >  {
> > -    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> > +    return qio_channel_async_writev_all(p->c, p->pages->iov, used, errp);
> >  }
>
> This needs to be made conditional so zerocopy is only used if requested
> by the mgmt app, and it isn't going to work in all cases (eg TLS), so
> silently enabling it is not good.

I agree, that seems a better approach.


>
> >
> >  /**
> > @@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >          Error *local_err = NULL;
> >
> > +        qio_channel_async_flush(p->c, NULL);
> >          socket_send_channel_destroy(p->c);
> >          p->c = NULL;
> >          qemu_mutex_destroy(&p->mutex);
>
> This isn't reliable because qio_channel_async_flush will return early
> even if not everything is flushed.

Yeah, I need to make sure qio_channel_async_flush will only return after
everything is sent, or at least return the number of missing requests.

>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
>

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages.
  2021-09-28 22:48     ` Peter Xu
@ 2021-09-29 19:46       ` Leonardo Bras Soares Passos
  0 siblings, 0 replies; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:46 UTC (permalink / raw)
  To: Peter Xu
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Tue, Sep 28, 2021 at 7:49 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Fri, Sep 24, 2021 at 06:43:49PM +0100, Daniel P. Berrangé wrote:
> > > @@ -546,6 +546,7 @@ void multifd_save_cleanup(void)
> > >          MultiFDSendParams *p = &multifd_send_state->params[i];
> > >          Error *local_err = NULL;
> > >
> > > +        qio_channel_async_flush(p->c, NULL);
> > >          socket_send_channel_destroy(p->c);
> > >          p->c = NULL;
> > >          qemu_mutex_destroy(&p->mutex);
> >
> > This isn't reliable because qio_channel_async_flush will return early
> > even if not everything is flushed.
>
> Right, though I think the problem is in patch 2, where we should make sure
> flush() only returns once all data has been sent.
>
> At the same time we may also want to move it to before we send the device
> state, for both pre/post copy.  multifd_save_cleanup() is only called once
> migration has completed and we're cleaning up; I'm afraid that's too late, as
> the device state could arrive and the dest VM start running without the latest
> src VM memory.

Thanks for that Peter!
I had some trouble sorting out when to flush, and I think I still missed this.

I will have that improved for v3.

>
> --
> Peter Xu
>

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd
  2021-09-29 19:22     ` Peter Xu
@ 2021-09-29 19:48       ` Leonardo Bras Soares Passos
  0 siblings, 0 replies; 23+ messages in thread
From: Leonardo Bras Soares Passos @ 2021-09-29 19:48 UTC (permalink / raw)
  To: Peter Xu
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Wed, Sep 29, 2021 at 4:23 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Wed, Sep 29, 2021 at 03:34:01PM -0300, Leonardo Bras Soares Passos wrote:
> > You mean like I did in pre-V1, enabling ZEROCOPY for multifd in QMP?
> > Or is this something else?
>
> I mean some way to still be able to turn zerocopy off, e.g. when the user has
> no privilege to mlock.  Thanks,
>
> --
> Peter Xu
>

Yeah, that makes sense in the new approach of failing when zerocopy is
not possible.
I will make sure to do that in v3.

Best regards,
Leonardo



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
  2021-09-29 19:36     ` Leonardo Bras Soares Passos
@ 2021-09-29 19:58       ` Peter Xu
  0 siblings, 0 replies; 23+ messages in thread
From: Peter Xu @ 2021-09-29 19:58 UTC (permalink / raw)
  To: Leonardo Bras Soares Passos
  Cc: qemu-devel, Jason Wang, Daniel P. Berrangé,
	Dr. David Alan Gilbert, Juan Quintela

On Wed, Sep 29, 2021 at 04:36:10PM -0300, Leonardo Bras Soares Passos wrote:
> On Tue, Sep 28, 2021 at 7:45 PM Peter Xu <peterx@redhat.com> wrote:
> >
> > On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> > > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > > +                                           Error **errp)
> > > +{
> > > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > > +    struct msghdr msg = {};
> > > +    struct pollfd pfd;
> > > +    struct sock_extended_err *serr;
> > > +    struct cmsghdr *cm;
> > > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> > > +    int ret;
> > > +
> > > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > > +    msg.msg_control = control;
> > > +    msg.msg_controllen = sizeof(control);
> > > +
> > > +    while (sioc->async_sent < sioc->async_queued) {
> > > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > > +        if (ret < 0) {
> > > +            if (errno == EAGAIN) {
> > > +                /* Nothing on errqueue, wait */
> > > +                pfd.fd = sioc->fd;
> > > +                pfd.events = 0;
> > > +                ret = poll(&pfd, 1, 250);
> > > +                if (ret == 0) {
> > > +                    /*
> > > +                     * Timeout : After 250ms without receiving any zerocopy
> > > +                     * notification, consider all data as sent.
> > > +                     */
> > > +                    break;
> >
> > After a timeout, we'll break the while loop and continue parsing an invalid
> > msg [1].  Is that what we want?
> 
> No, the point here was returning from flush if this (long) timeout
> happened, as in "if this long has passed, there must be no pending send",
> which I agree is quite bad, but it was all I could think of to avoid an
> infinite loop here if something goes wrong.

IMHO it's the same as when we write() to a socket whose buffer is always full:
we'll simply block there until it has some space.  I don't know what we can do
here besides looping on the timeout - we shouldn't eat all the cpu, but we
should still wait?

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
  2021-09-29 19:06       ` Leonardo Bras Soares Passos
@ 2021-09-30  8:34         ` Daniel P. Berrangé
  0 siblings, 0 replies; 23+ messages in thread
From: Daniel P. Berrangé @ 2021-09-30  8:34 UTC (permalink / raw)
  To: Leonardo Bras Soares Passos
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Wed, Sep 29, 2021 at 04:06:33PM -0300, Leonardo Bras Soares Passos wrote:
> Hello Peter, thanks for reviewing!
> 
> On Tue, Sep 28, 2021 at 6:52 PM Peter Xu <peterx@redhat.com> wrote:
> >
> > On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > > > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > > > +                                 const struct iovec *iov,
> > > > +                                 size_t niov,
> > > > +                                 int *fds,
> > > > +                                 size_t nfds,
> > > > +                                 Error **errp);
> > >
> > > This is missing any flags. We need something like
> > >
> > >    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> > >
> > > passed in an 'unsigned int flags' parameter. This in
> > > turn makes me question whether we should have the
> > > common helpers at all, as the api is going to be
> > > different for sync vs async.
> > >
> > > The QIOChannelFeature enum probably ought to be
> > > extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> > > support for probing whether that's supported or not.
> >
> > I'm also wondering whether we could just drop the fds/nfds: as far as I know
> > SCM_RIGHTS is the only user, and at the same time I don't see why an async
> > interface would pass in any fds anyway...  Thanks,
> 
> FWIW, I think it's a great idea.
> Daniel, what do you think?

Yes, FD passing is not compatible with async operations, because it is
too complex to deal with FD lifetimes when sending the IO fails.


Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v3 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
  2021-09-29 19:32     ` Leonardo Bras Soares Passos
@ 2021-09-30  8:39       ` Daniel P. Berrangé
  0 siblings, 0 replies; 23+ messages in thread
From: Daniel P. Berrangé @ 2021-09-30  8:39 UTC (permalink / raw)
  To: Leonardo Bras Soares Passos
  Cc: qemu-devel, Jason Wang, Dr. David Alan Gilbert, Peter Xu, Juan Quintela

On Wed, Sep 29, 2021 at 04:32:12PM -0300, Leonardo Bras Soares Passos wrote:
> Hello Daniel,
> 
> On Fri, Sep 24, 2021 at 2:38 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
> [...]
> > > @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> > >          return -1;
> > >      }
> > >
> > > +#ifdef CONFIG_LINUX
> > > +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > > +    if (ret >= 0) {
> > > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > > +        klass->io_async_writev = qio_channel_socket_async_writev;
> > > +        klass->io_async_flush = qio_channel_socket_async_flush;
> > > +    }
> > > +#endif
> >
> > This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
> > usage - we should have them take a flag to request ZEROCOPY behaviour.
> 
> I agree, but I am not aware of how to do asynchronous send in a socket
> without MSG_ZEROCOPY.
> 
> I mean, I know about non-blocking sends, but I am not sure how one would check
> that everything was sent (i.e. the flush part).
> Would it also be using the ERRQUEUE for that?
> 
> What would you suggest?

Yeah, there isn't any, really. I guess I'm anticipating a future that
probably won't exist.  Let's just call the callbacks 'io_write_zerocopy'
and 'io_flush_zerocopy' and ignore the flag.
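
Under that naming the class hooks would be roughly (sketch; parameter list
still to be settled - no fds, per the other subthread):

    ssize_t (*io_write_zerocopy)(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 Error **errp);
    void (*io_flush_zerocopy)(QIOChannel *ioc,
                              Error **errp);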


Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|



^ permalink raw reply	[flat|nested] 23+ messages in thread
