* [PATCHSET RFC 0/7] Send and receive bundles
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw

Hi,

I went back to the drawing board a bit on the send multishot, and this
is what came out.

First, support was added for provided buffers for send. This works like
provided buffers for recv/recvmsg, and the intent here is to use the
buffer ring queue as an outgoing sequence for sending.

But the real meat is adding support for picking multiple buffers at a
time, what I dubbed "bundles" here. Rather than pick a single buffer for
a send, it can pick a bunch of them and send them in one go. The idea
here is that the expensive part of a request is not the SQE issue, it's
the fact that we have to handle each buffer separately. That entails
calling all the way down into the networking stack, locking the socket,
checking what needs doing afterwards (like flushing the backlog),
unlocking the socket, etc. If we have an outgoing send queue, then pick
whatever buffers we have (up to a certain cap), and pass them to the
networking stack in one go.

Bundles must be used with provided buffers, obviously. At completion
time, they pass the starting buffer ID in cqe->flags, like any other
provided buffer completion. cqe->res is the TOTAL number of bytes sent,
so it's up to the application to iterate buffers to figure out how many
completed. This part is trivial. I'll push the proxy changes out soon,
just need to clean them up as I did the sendmsg bundling too and would
love to compare.
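
For illustration, that iteration on the application side could look
roughly like the untested sketch below. It assumes every buffer in the
group is 'buf_len' bytes and that 'bufs[bid]' maps a buffer ID back to
its memory; consume_buffer() is just a placeholder for however the app
handles a finished buffer.

#include <liburing.h>

extern void consume_buffer(void *buf, size_t len);	/* app-defined */

static void handle_bundle_cqe(struct io_uring_cqe *cqe, void **bufs,
			      size_t buf_len)
{
	unsigned bid;
	int res = cqe->res;

	if (!(cqe->flags & IORING_CQE_F_BUFFER))
		return;
	/* buffers are contiguous from the starting buffer ID */
	bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
	while (res > 0) {
		size_t this_len = res < (int) buf_len ? (size_t) res : buf_len;

		consume_buffer(bufs[bid], this_len);
		res -= this_len;
		bid++;
	}
}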

With that in place, I added support for bundles on the recv side as
well. Exactly the same as the send side - if we have a known amount of
data pending, pick enough buffers to satisfy the receive and post a
single completion for that round. Buffer ID in cqe->flags, cqe->res is
the total number of bytes received. Receive can be used with multishot
as well - fire off one multishot recv, and keep getting big completions.
Unfortunately, recvmsg multishot is just not as efficient as recv, as it
carries additional data that needs copying. recv multishot with bundles
provides a good alternative to recvmsg, if all you need is more than one
range of data. I'll compare these soon as well.
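
Arming one of these is just the normal multishot recv setup plus the new
flag. Something like this untested liburing-style sketch, where the
group ID of 0 is arbitrary and the buffer ring for that group is assumed
to already be registered and populated:

#include <liburing.h>

static int arm_bundled_recv(struct io_uring *ring, int sockfd)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	if (!sqe)
		return -1;
	io_uring_prep_recv_multishot(sqe, sockfd, NULL, 0, 0);
	sqe->flags |= IOSQE_BUFFER_SELECT;	/* pick from provided buffers */
	sqe->buf_group = 0;			/* buffer group ID */
	sqe->ioprio |= IORING_RECVSEND_BUNDLE;	/* new flag from this series */
	return io_uring_submit(ring);
}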

This is obviously a bigger win for smaller packets than for large ones,
as the relative cost of entering sys_sendmsg/sys_recvmsg() decreases as
the packet size increases. At the extreme end, using 32b packets,
performance increases substantially. Runtime for proxying 32b packets
between three machines on a 10G link for the test:

Send ring:		3462 msec		1183Mbit
Send ring + bundles	 844 msec		4853Mbit

and bundles reach 100% bandwidth at 80b of packet size, compared to send
ring alone needing 320b to reach 95% of bandwidth (I didn't redo that
test so don't have the 100% number).

Patches are on top of my for-6.9/io_uring branch and can also be found
here:

https://git.kernel.dk/cgit/linux/log/?h=io_uring-recvsend-bundle

 include/linux/io_uring_types.h |   3 +
 include/uapi/linux/io_uring.h  |  10 +
 io_uring/io_uring.c            |   3 +-
 io_uring/kbuf.c                | 203 ++++++++++++-
 io_uring/kbuf.h                |  39 ++-
 io_uring/net.c                 | 528 +++++++++++++++++++++++----------
 io_uring/net.h                 |   2 +-
 io_uring/opdef.c               |   9 +-
 8 files changed, 609 insertions(+), 188 deletions(-)

-- 
Jens Axboe



* [PATCH 1/7] io_uring/net: add generic multishot retry helper
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

This is just moving io_recv_prep_retry() higher up so we can use it
for sends as well, and renaming it to be generically useful for both
sends and receives.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/net.c | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/io_uring/net.c b/io_uring/net.c
index 19451f0dbf81..97559cdec98e 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -191,6 +191,16 @@ static int io_setup_async_msg(struct io_kiocb *req,
 	return -EAGAIN;
 }
 
+static inline void io_mshot_prep_retry(struct io_kiocb *req)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+
+	req->flags &= ~REQ_F_BL_EMPTY;
+	sr->done_io = 0;
+	sr->len = 0; /* get from the provided buffer */
+	req->buf_index = sr->buf_group;
+}
+
 #ifdef CONFIG_COMPAT
 static int io_compat_msg_copy_hdr(struct io_kiocb *req,
 				  struct io_async_msghdr *iomsg,
@@ -668,16 +678,6 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return 0;
 }
 
-static inline void io_recv_prep_retry(struct io_kiocb *req)
-{
-	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
-
-	req->flags &= ~REQ_F_BL_EMPTY;
-	sr->done_io = 0;
-	sr->len = 0; /* get from the provided buffer */
-	req->buf_index = sr->buf_group;
-}
-
 /*
  * Finishes io_recv and io_recvmsg.
  *
@@ -704,7 +704,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 		struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 		int mshot_retry_ret = IOU_ISSUE_SKIP_COMPLETE;
 
-		io_recv_prep_retry(req);
+		io_mshot_prep_retry(req);
 		/* Known not-empty or unknown state, retry */
 		if (cflags & IORING_CQE_F_SOCK_NONEMPTY || msg->msg_inq < 0) {
 			if (sr->nr_multishot_loops++ < MULTISHOT_MAX_RETRY)
-- 
2.43.0



* [PATCH 2/7] io_uring/net: add provided buffer support for IORING_OP_SEND
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

It's pretty trivial to wire up provided buffer support for the send
side, just like we do on the receive side. This enables setting up
a buffer ring that an application can use to push pending sends to,
and then have a send pick a buffer from that ring.

One of the challenges with async IO and networking sends is that you
can get into reordering conditions if you have more than one inflight
at the same time. Consider the following scenario where everything is
fine:

1) App queues sendA for socket1
2) App queues sendB for socket1
3) App does io_uring_submit()
4) sendA is issued, completes successfully, posts CQE
5) sendB is issued, completes successfully, posts CQE

All is fine. Requests are always issued in-order, and both complete
inline as most sends do.

However, if we're flooding socket1 with sends, the following could
also result from the same sequence:

1) App queues sendA for socket1
2) App queues sendB for socket1
3) App does io_uring_submit()
4) sendA is issued, socket1 is full, poll is armed for retry
5) Space frees up in socket1, this triggers sendA retry via task_work
6) sendB is issued, completes successfully, posts CQE
7) sendA is retried, completes successfully, posts CQE

Now we've sent sendB before sendA, which can make things unhappy. If
both sendA and sendB had been using provided buffers, then it would look
as follows instead:

1) App queues dataA for sendA, queues sendA for socket1
2) App queues dataB for sendB, queues sendB for socket1
3) App does io_uring_submit()
4) sendA is issued, socket1 is full, poll is armed for retry
5) Space frees up in socket1, this triggers sendA retry via task_work
6) sendB is issued, picks first buffer (dataA), completes successfully,
   posts CQE (which says "I sent dataA")
7) sendA is retried, picks first buffer (dataB), completes successfully,
   posts CQE (which says "I sent dataB")

Now we've sent the data in order, and everybody is happy.

It's worth noting that this also opens the door for supporting multishot
sends, as provided buffers would be a prerequisite for that. Those can
trigger either when new buffers are added to the outgoing ring, or (if
stalled due to lack of space) when space frees up in the socket.
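
For illustration, the application side of that pattern could look
roughly like the untested liburing-style sketch below. The buffer group
ID of 0 and the buffer ring mask are placeholders, and error handling
(including a NULL SQE) is omitted.

#include <liburing.h>

static void queue_ordered_send(struct io_uring *ring,
			       struct io_uring_buf_ring *br, int sockfd,
			       void *data, unsigned int len,
			       unsigned short bid, int br_mask)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	/* dataX goes into the outgoing buffer ring... */
	io_uring_buf_ring_add(br, data, len, bid, br_mask, 0);
	io_uring_buf_ring_advance(br, 1);

	/* ...and whichever send runs first picks the oldest queued buffer */
	io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = 0;	/* group ID of 'br', placeholder */
}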

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/net.c   | 19 ++++++++++++++++---
 io_uring/opdef.c |  1 +
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/io_uring/net.c b/io_uring/net.c
index 97559cdec98e..566ef401f976 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -484,8 +484,10 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct sockaddr_storage __address;
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
-	struct msghdr msg;
+	size_t len = sr->len;
 	struct socket *sock;
+	unsigned int cflags;
+	struct msghdr msg;
 	unsigned flags;
 	int min_ret = 0;
 	int ret;
@@ -518,7 +520,17 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
-	ret = import_ubuf(ITER_SOURCE, sr->buf, sr->len, &msg.msg_iter);
+	if (io_do_buffer_select(req)) {
+		void __user *buf;
+
+		buf = io_buffer_select(req, &len, issue_flags);
+		if (!buf)
+			return -ENOBUFS;
+		sr->buf = buf;
+		sr->len = len;
+	}
+
+	ret = import_ubuf(ITER_SOURCE, sr->buf, len, &msg.msg_iter);
 	if (unlikely(ret))
 		return ret;
 
@@ -550,7 +562,8 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 		ret += sr->done_io;
 	else if (sr->done_io)
 		ret = sr->done_io;
-	io_req_set_res(req, ret, 0);
+	cflags = io_put_kbuf(req, issue_flags);
+	io_req_set_res(req, ret, cflags);
 	return IOU_OK;
 }
 
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 9c080aadc5a6..88fbe5cfd379 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -273,6 +273,7 @@ const struct io_issue_def io_issue_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.manual_alloc		= 1,
+		.buffer_select		= 1,
 #if defined(CONFIG_NET)
 		.prep			= io_sendmsg_prep,
 		.issue			= io_send,
-- 
2.43.0



* [PATCH 3/7] io_uring/kbuf: add helpers for getting/peeking multiple buffers
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

Our provided buffer interface only allows selection of a single buffer.
Add an API that allows getting/peeking multiple buffers at the same time.

This is only implemented for the ring provided buffers. It could be added
for the legacy provided buffers as well, but since it's strongly
encouraged to use the new interface, let's keep it simpler and just
provide it for the new API. The legacy interface will always just select
a single buffer.

There are two new main functions:

io_buffers_select(), which selects as many buffers as it can. The
caller supplies the iovec array, and io_buffers_select() may allocate
a bigger array if the 'out_len' being passed in is non-zero and bigger
than what we can fit in the provided iovec. Buffers grabbed with this
helper are permanently assigned.

io_buffers_peek(), which works like io_buffers_select(), except the
buffers can be recycled, if needed. Callers using either of these
functions should call io_put_kbufs() rather than io_put_kbuf() at
completion time. The peek interface must be called with the ctx locked
from peek to completion.

This adds a bit of state to the request:

- REQ_F_BUFFERS_COMMIT, which means that the buffers have been
  peeked and should be committed to the buffer ring head when they are
  put as part of completion. Prior to this, we used the fact that
  req->buf_list was cleared to NULL when committed. But with the peek
  interface requiring the ring to be locked throughout the operation,
  we can use that as a lookup cache instead.
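
For reference, the intended calling pattern is roughly the fragment
below. This is an illustrative sketch only, not code from this series:
do_transfer() and buffers_consumed() stand in for the actual transfer
and accounting (see io_bundle_nbufs() later in the series), and the
ring lock is assumed held from the peek through completion.

static int example_bundle_issue(struct io_kiocb *req, unsigned int issue_flags)
{
	struct iovec fast_iov[UIO_FASTIOV], *iov = fast_iov;
	size_t len = 0;		/* 0 == no byte cap */
	unsigned int cflags;
	int nr, ret;

	nr = io_buffers_peek(req, &iov, ARRAY_SIZE(fast_iov), &len);
	if (unlikely(nr < 0))
		return nr;

	ret = do_transfer(req, iov, nr, len);

	/* commit only the buffers actually consumed by 'ret' bytes */
	cflags = io_put_kbufs(req, buffers_consumed(iov, nr, ret), issue_flags);
	io_req_set_res(req, ret, cflags);
	if (iov != fast_iov)
		kfree(iov);
	return IOU_OK;
}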

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/io_uring_types.h |   3 +
 io_uring/kbuf.c                | 203 ++++++++++++++++++++++++++++++---
 io_uring/kbuf.h                |  39 +++++--
 3 files changed, 223 insertions(+), 22 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index e24893625085..971294dfd22e 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -481,6 +481,7 @@ enum {
 	REQ_F_CAN_POLL_BIT,
 	REQ_F_BL_EMPTY_BIT,
 	REQ_F_BL_NO_RECYCLE_BIT,
+	REQ_F_BUFFERS_COMMIT_BIT,
 
 	/* not a real bit, just to check we're not overflowing the space */
 	__REQ_F_LAST_BIT,
@@ -559,6 +560,8 @@ enum {
 	REQ_F_BL_EMPTY		= IO_REQ_FLAG(REQ_F_BL_EMPTY_BIT),
 	/* don't recycle provided buffers for this request */
 	REQ_F_BL_NO_RECYCLE	= IO_REQ_FLAG(REQ_F_BL_NO_RECYCLE_BIT),
+	/* buffer ring head needs incrementing on put */
+	REQ_F_BUFFERS_COMMIT	= IO_REQ_FLAG(REQ_F_BUFFERS_COMMIT_BIT),
 };
 
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 9be42bff936b..921e8e25e027 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -140,34 +140,57 @@ static void __user *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
 	return NULL;
 }
 
+static int io_provided_buffers_select(struct io_kiocb *req, size_t *len,
+				      struct io_buffer_list *bl,
+				      struct iovec *iov)
+{
+	void __user *buf;
+
+	buf = io_provided_buffer_select(req, len, bl);
+	if (unlikely(!buf))
+		return -ENOBUFS;
+
+	iov[0].iov_base = buf;
+	iov[0].iov_len = *len;
+	return 0;
+}
+
+static struct io_uring_buf *io_ring_head_to_buf(struct io_buffer_list *bl,
+						__u16 head)
+{
+	head &= bl->mask;
+
+	/* mmaped buffers are always contig */
+	if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) {
+		return &bl->buf_ring->bufs[head];
+	} else {
+		int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
+		int index = head / IO_BUFFER_LIST_BUF_PER_PAGE;
+		struct io_uring_buf *buf;
+
+		buf = page_address(bl->buf_pages[index]);
+		return buf + off;
+	}
+}
+
 static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
 					  struct io_buffer_list *bl,
 					  unsigned int issue_flags)
 {
-	struct io_uring_buf_ring *br = bl->buf_ring;
 	__u16 tail, head = bl->head;
 	struct io_uring_buf *buf;
 
-	tail = smp_load_acquire(&br->tail);
+	tail = smp_load_acquire(&bl->buf_ring->tail);
 	if (unlikely(tail == head))
 		return NULL;
 
 	if (head + 1 == tail)
 		req->flags |= REQ_F_BL_EMPTY;
 
-	head &= bl->mask;
-	/* mmaped buffers are always contig */
-	if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) {
-		buf = &br->bufs[head];
-	} else {
-		int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
-		int index = head / IO_BUFFER_LIST_BUF_PER_PAGE;
-		buf = page_address(bl->buf_pages[index]);
-		buf += off;
-	}
+	buf = io_ring_head_to_buf(bl, head);
 	if (*len == 0 || *len > buf->len)
 		*len = buf->len;
-	req->flags |= REQ_F_BUFFER_RING;
+	req->flags |= REQ_F_BUFFER_RING | REQ_F_BUFFERS_COMMIT;
 	req->buf_list = bl;
 	req->buf_index = buf->bid;
 
@@ -182,6 +205,7 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
 		 * the transfer completes (or if we get -EAGAIN and must poll of
 		 * retry).
 		 */
+		req->flags &= ~REQ_F_BUFFERS_COMMIT;
 		req->buf_list = NULL;
 		bl->head++;
 	}
@@ -208,6 +232,159 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 	return ret;
 }
 
+static int io_ring_buffers_peek(struct io_kiocb *req, struct iovec **iovs,
+				int nr_iovs, size_t *out_len,
+				struct io_buffer_list *bl)
+{
+	struct iovec *iov = *iovs;
+	__u16 nr_avail, tail, head;
+	struct io_uring_buf *buf;
+	size_t max_len = 0;
+	int i;
+
+	if (*out_len) {
+		max_len = *out_len;
+		*out_len = 0;
+	}
+
+	tail = smp_load_acquire(&bl->buf_ring->tail);
+	head = bl->head;
+	nr_avail = tail - head;
+	if (unlikely(!nr_avail))
+		return -ENOBUFS;
+
+	buf = io_ring_head_to_buf(bl, head);
+	if (max_len) {
+		int needed;
+
+		needed = (max_len + buf->len - 1) / buf->len;
+		/* cap it at a reasonable 256, will be one page even for 4K */
+		needed = min(needed, 256);
+		if (nr_avail > needed)
+			nr_avail = needed;
+	}
+
+	if (nr_avail > UIO_MAXIOV)
+		nr_avail = UIO_MAXIOV;
+
+	/*
+	 * only alloc a bigger array if we know we have data to map, eg not
+	 * a speculative peek operation.
+	 */
+	if (nr_iovs == UIO_FASTIOV && nr_avail > nr_iovs && max_len) {
+		iov = kmalloc_array(nr_avail, sizeof(struct iovec), GFP_KERNEL);
+		if (unlikely(!iov))
+			return -ENOMEM;
+		nr_iovs = nr_avail;
+	} else if (nr_avail < nr_iovs) {
+		nr_iovs = nr_avail;
+	}
+
+	buf = io_ring_head_to_buf(bl, head);
+	req->buf_index = buf->bid;
+
+	i = 0;
+	while (nr_iovs--) {
+		void __user *ubuf;
+
+		/* truncate end piece, if needed */
+		if (max_len && buf->len > max_len)
+			buf->len = max_len;
+
+		ubuf = u64_to_user_ptr(buf->addr);
+		if (!access_ok(ubuf, buf->len))
+			break;
+		iov[i].iov_base = ubuf;
+		iov[i].iov_len = buf->len;
+		*out_len += buf->len;
+		i++;
+		head++;
+		if (max_len) {
+			max_len -= buf->len;
+			if (!max_len)
+				break;
+		}
+		buf = io_ring_head_to_buf(bl, head);
+	}
+
+	if (head == tail)
+		req->flags |= REQ_F_BL_EMPTY;
+
+	if (i) {
+		req->flags |= REQ_F_BUFFER_RING;
+		*iovs = iov;
+		return i;
+	}
+
+	if (iov != *iovs)
+		kfree(iov);
+	*iovs = NULL;
+	return -EFAULT;
+}
+
+int io_buffers_select(struct io_kiocb *req, struct iovec **iovs, int nr_iovs,
+		      size_t *out_len, unsigned int issue_flags)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_buffer_list *bl;
+	int ret = -ENOENT;
+
+	io_ring_submit_lock(ctx, issue_flags);
+	bl = io_buffer_get_list(ctx, req->buf_index);
+	if (unlikely(!bl))
+		goto out_unlock;
+
+	if (bl->is_mapped) {
+		ret = io_ring_buffers_peek(req, iovs, nr_iovs, out_len, bl);
+		/*
+		 * Don't recycle these buffers if we need to go through poll.
+		 * Nobody else can use them anyway, and holding on to provided
+		 * buffers for a send/write operation would happen on the app
+		 * side anyway with normal buffers. Besides, we already
+		 * committed them, they cannot be put back in the queue.
+		 */
+		req->buf_list = bl;
+		if (ret > 0) {
+			req->flags |= REQ_F_BL_NO_RECYCLE;
+			req->buf_list->head += ret;
+		}
+	} else {
+		ret = io_provided_buffers_select(req, out_len, bl, *iovs);
+	}
+out_unlock:
+	io_ring_submit_unlock(ctx, issue_flags);
+	return ret;
+}
+
+int io_buffers_peek(struct io_kiocb *req, struct iovec **iovs, int nr_iovs,
+		    size_t *out_len)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_buffer_list *bl;
+	int ret;
+
+	lockdep_assert_held(&ctx->uring_lock);
+
+	if (req->buf_list) {
+		bl = req->buf_list;
+	} else {
+		bl = io_buffer_get_list(ctx, req->buf_index);
+		if (unlikely(!bl))
+			return -ENOENT;
+	}
+
+	/* don't support multiple buffer selections for legacy */
+	if (!bl->is_mapped)
+		return io_provided_buffers_select(req, out_len, bl, *iovs);
+
+	ret = io_ring_buffers_peek(req, iovs, nr_iovs, out_len, bl);
+	if (ret > 0) {
+		req->buf_list = bl;
+		req->flags |= REQ_F_BUFFERS_COMMIT;
+	}
+	return ret;
+}
+
 static __cold int io_init_bl_list(struct io_ring_ctx *ctx)
 {
 	struct io_buffer_list *bl;
diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h
index 5218bfd79e87..b4f48a144b73 100644
--- a/io_uring/kbuf.h
+++ b/io_uring/kbuf.h
@@ -43,6 +43,10 @@ struct io_buffer {
 
 void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 			      unsigned int issue_flags);
+int io_buffers_select(struct io_kiocb *req, struct iovec **iovs, int nr_iovs,
+		      size_t *out_len, unsigned int issue_flags);
+int io_buffers_peek(struct io_kiocb *req, struct iovec **iovs, int nr_iovs,
+		      size_t *out_len);
 void io_destroy_buffers(struct io_ring_ctx *ctx);
 
 int io_remove_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
@@ -74,7 +78,7 @@ static inline bool io_kbuf_recycle_ring(struct io_kiocb *req)
 	 */
 	if (req->buf_list) {
 		req->buf_index = req->buf_list->bgid;
-		req->flags &= ~REQ_F_BUFFER_RING;
+		req->flags &= ~(REQ_F_BUFFER_RING|REQ_F_BUFFERS_COMMIT);
 		return true;
 	}
 	return false;
@@ -98,11 +102,16 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
 	return false;
 }
 
-static inline void __io_put_kbuf_ring(struct io_kiocb *req)
+static inline void __io_put_kbuf_ring(struct io_kiocb *req, int nr)
 {
-	if (req->buf_list) {
-		req->buf_index = req->buf_list->bgid;
-		req->buf_list->head++;
+	struct io_buffer_list *bl = req->buf_list;
+
+	if (bl) {
+		if (req->flags & REQ_F_BUFFERS_COMMIT) {
+			bl->head += nr;
+			req->flags &= ~REQ_F_BUFFERS_COMMIT;
+		}
+		req->buf_index = bl->bgid;
 	}
 	req->flags &= ~REQ_F_BUFFER_RING;
 }
@@ -111,7 +120,7 @@ static inline void __io_put_kbuf_list(struct io_kiocb *req,
 				      struct list_head *list)
 {
 	if (req->flags & REQ_F_BUFFER_RING) {
-		__io_put_kbuf_ring(req);
+		__io_put_kbuf_ring(req, 1);
 	} else {
 		req->buf_index = req->kbuf->bgid;
 		list_add(&req->kbuf->list, list);
@@ -133,8 +142,8 @@ static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
 	return ret;
 }
 
-static inline unsigned int io_put_kbuf(struct io_kiocb *req,
-				       unsigned issue_flags)
+static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int nbufs,
+					  unsigned issue_flags)
 {
 	unsigned int ret;
 
@@ -143,9 +152,21 @@ static inline unsigned int io_put_kbuf(struct io_kiocb *req,
 
 	ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
 	if (req->flags & REQ_F_BUFFER_RING)
-		__io_put_kbuf_ring(req);
+		__io_put_kbuf_ring(req, nbufs);
 	else
 		__io_put_kbuf(req, issue_flags);
 	return ret;
 }
+
+static inline unsigned int io_put_kbuf(struct io_kiocb *req,
+				       unsigned issue_flags)
+{
+	return __io_put_kbufs(req, 1, issue_flags);
+}
+
+static inline unsigned int io_put_kbufs(struct io_kiocb *req, int nbufs,
+					unsigned issue_flags)
+{
+	return __io_put_kbufs(req, nbufs, issue_flags);
+}
 #endif
-- 
2.43.0



* [PATCH 4/7] io_uring/net: switch io_send() and io_send_zc() to using io_async_msghdr
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

No functional changes in this patch, just in preparation for carrying
more state than we have now, if necessary. While unifying some of this
code, add a generic send setup prep handler that they can both use.

This gets rid of some manual msghdr and sockaddr handling on the stack,
and makes it look a bit more like the sendmsg/recvmsg variants. We can
probably unify a bit more on top of this going forward.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/net.c   | 208 ++++++++++++++++++++++++-----------------------
 io_uring/opdef.c |   1 +
 2 files changed, 109 insertions(+), 100 deletions(-)

diff --git a/io_uring/net.c b/io_uring/net.c
index 566ef401f976..66318fbba805 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -332,33 +332,23 @@ static int io_sendmsg_copy_hdr(struct io_kiocb *req,
 
 int io_send_prep_async(struct io_kiocb *req)
 {
-	struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 	struct io_async_msghdr *io;
 	int ret;
 
-	if (!zc->addr || req_has_async_data(req))
+	if (req_has_async_data(req))
 		return 0;
 	io = io_msg_alloc_async_prep(req);
 	if (!io)
 		return -ENOMEM;
-	ret = move_addr_to_kernel(zc->addr, zc->addr_len, &io->addr);
-	return ret;
-}
-
-static int io_setup_async_addr(struct io_kiocb *req,
-			      struct sockaddr_storage *addr_storage,
-			      unsigned int issue_flags)
-{
-	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
-	struct io_async_msghdr *io;
+	memset(&io->msg, 0, sizeof(io->msg));
 
-	if (!sr->addr || req_has_async_data(req))
-		return -EAGAIN;
-	io = io_msg_alloc_async(req, issue_flags);
-	if (!io)
-		return -ENOMEM;
-	memcpy(&io->addr, addr_storage, sizeof(io->addr));
-	return -EAGAIN;
+	ret = import_ubuf(ITER_SOURCE, sr->buf, sr->len, &io->msg.msg_iter);
+	if (unlikely(ret))
+		return ret;
+	if (sr->addr)
+		return move_addr_to_kernel(sr->addr, sr->addr_len, &io->addr);
+	return 0;
 }
 
 int io_sendmsg_prep_async(struct io_kiocb *req)
@@ -480,46 +470,72 @@ int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags)
 	return IOU_OK;
 }
 
-int io_send(struct io_kiocb *req, unsigned int issue_flags)
+static struct io_async_msghdr *io_send_setup(struct io_kiocb *req,
+					     struct io_async_msghdr *stack_msg,
+					     unsigned int issue_flags)
 {
-	struct sockaddr_storage __address;
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
-	size_t len = sr->len;
-	struct socket *sock;
-	unsigned int cflags;
-	struct msghdr msg;
-	unsigned flags;
-	int min_ret = 0;
+	struct io_async_msghdr *kmsg;
 	int ret;
 
-	msg.msg_name = NULL;
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-	msg.msg_namelen = 0;
-	msg.msg_ubuf = NULL;
-
-	if (sr->addr) {
-		if (req_has_async_data(req)) {
-			struct io_async_msghdr *io = req->async_data;
+	if (req_has_async_data(req)) {
+		kmsg = req->async_data;
+	} else {
+		kmsg = stack_msg;
+		kmsg->free_iov = NULL;
 
-			msg.msg_name = &io->addr;
-		} else {
-			ret = move_addr_to_kernel(sr->addr, sr->addr_len, &__address);
+		if (sr->addr) {
+			ret = move_addr_to_kernel(sr->addr, sr->addr_len,
+						  &kmsg->addr);
 			if (unlikely(ret < 0))
-				return ret;
-			msg.msg_name = (struct sockaddr *)&__address;
+				return ERR_PTR(ret);
+		}
+
+		if (!io_do_buffer_select(req)) {
+			ret = import_ubuf(ITER_SOURCE, sr->buf, sr->len,
+					  &kmsg->msg.msg_iter);
+			if (unlikely(ret))
+				return ERR_PTR(ret);
 		}
-		msg.msg_namelen = sr->addr_len;
 	}
 
+	if (sr->addr) {
+		kmsg->msg.msg_name = &kmsg->addr;
+		kmsg->msg.msg_namelen = sr->addr_len;
+	} else {
+		kmsg->msg.msg_name = NULL;
+		kmsg->msg.msg_namelen = 0;
+	}
+	kmsg->msg.msg_control = NULL;
+	kmsg->msg.msg_controllen = 0;
+	kmsg->msg.msg_ubuf = NULL;
+
 	if (!(req->flags & REQ_F_POLLED) &&
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
-		return io_setup_async_addr(req, &__address, issue_flags);
+		return ERR_PTR(io_setup_async_msg(req, kmsg, issue_flags));
+
+	return kmsg;
+}
+
+int io_send(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+	struct io_async_msghdr iomsg, *kmsg;
+	size_t len = sr->len;
+	struct socket *sock;
+	unsigned int cflags;
+	unsigned flags;
+	int min_ret = 0;
+	int ret;
 
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
+	kmsg = io_send_setup(req, &iomsg, issue_flags);
+	if (IS_ERR(kmsg))
+		return PTR_ERR(kmsg);
+
 	if (io_do_buffer_select(req)) {
 		void __user *buf;
 
@@ -528,31 +544,29 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 			return -ENOBUFS;
 		sr->buf = buf;
 		sr->len = len;
-	}
 
-	ret = import_ubuf(ITER_SOURCE, sr->buf, len, &msg.msg_iter);
-	if (unlikely(ret))
-		return ret;
+		ret = import_ubuf(ITER_SOURCE, sr->buf, len, &kmsg->msg.msg_iter);
+		if (unlikely(ret))
+			return ret;
+	}
 
 	flags = sr->msg_flags;
 	if (issue_flags & IO_URING_F_NONBLOCK)
 		flags |= MSG_DONTWAIT;
 	if (flags & MSG_WAITALL)
-		min_ret = iov_iter_count(&msg.msg_iter);
+		min_ret = iov_iter_count(&kmsg->msg.msg_iter);
 
 	flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
-	msg.msg_flags = flags;
-	ret = sock_sendmsg(sock, &msg);
+	kmsg->msg.msg_flags = flags;
+	ret = sock_sendmsg(sock, &kmsg->msg);
 	if (ret < min_ret) {
 		if (ret == -EAGAIN && (issue_flags & IO_URING_F_NONBLOCK))
-			return io_setup_async_addr(req, &__address, issue_flags);
+			return io_setup_async_msg(req, kmsg, issue_flags);
 
 		if (ret > 0 && io_net_retry(sock, flags)) {
-			sr->len -= ret;
-			sr->buf += ret;
 			sr->done_io += ret;
 			req->flags |= REQ_F_BL_NO_RECYCLE;
-			return io_setup_async_addr(req, &__address, issue_flags);
+			return io_setup_async_msg(req, kmsg, issue_flags);
 		}
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
@@ -562,6 +576,7 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 		ret += sr->done_io;
 	else if (sr->done_io)
 		ret = sr->done_io;
+	io_req_msg_cleanup(req, kmsg, issue_flags);
 	cflags = io_put_kbuf(req, issue_flags);
 	io_req_set_res(req, ret, cflags);
 	return IOU_OK;
@@ -1165,11 +1180,35 @@ static int io_sg_from_iter(struct sock *sk, struct sk_buff *skb,
 	return ret;
 }
 
+static int io_send_zc_import(struct io_kiocb *req, struct io_async_msghdr *kmsg)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+	int ret;
+
+	if (sr->flags & IORING_RECVSEND_FIXED_BUF) {
+		ret = io_import_fixed(ITER_SOURCE, &kmsg->msg.msg_iter, req->imu,
+					(u64)(uintptr_t)sr->buf, sr->len);
+		if (unlikely(ret))
+			return ret;
+		kmsg->msg.sg_from_iter = io_sg_from_iter;
+	} else {
+		io_notif_set_extended(sr->notif);
+		ret = import_ubuf(ITER_SOURCE, sr->buf, sr->len, &kmsg->msg.msg_iter);
+		if (unlikely(ret))
+			return ret;
+		ret = io_notif_account_mem(sr->notif, sr->len);
+		if (unlikely(ret))
+			return ret;
+		kmsg->msg.sg_from_iter = io_sg_from_iter_iovec;
+	}
+
+	return ret;
+}
+
 int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
 {
-	struct sockaddr_storage __address;
 	struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
-	struct msghdr msg;
+	struct io_async_msghdr iomsg, *kmsg;
 	struct socket *sock;
 	unsigned msg_flags;
 	int ret, min_ret = 0;
@@ -1180,67 +1219,35 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
 	if (!test_bit(SOCK_SUPPORT_ZC, &sock->flags))
 		return -EOPNOTSUPP;
 
-	msg.msg_name = NULL;
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-	msg.msg_namelen = 0;
-
-	if (zc->addr) {
-		if (req_has_async_data(req)) {
-			struct io_async_msghdr *io = req->async_data;
+	kmsg = io_send_setup(req, &iomsg, issue_flags);
+	if (IS_ERR(kmsg))
+		return PTR_ERR(kmsg);
 
-			msg.msg_name = &io->addr;
-		} else {
-			ret = move_addr_to_kernel(zc->addr, zc->addr_len, &__address);
-			if (unlikely(ret < 0))
-				return ret;
-			msg.msg_name = (struct sockaddr *)&__address;
-		}
-		msg.msg_namelen = zc->addr_len;
-	}
-
-	if (!(req->flags & REQ_F_POLLED) &&
-	    (zc->flags & IORING_RECVSEND_POLL_FIRST))
-		return io_setup_async_addr(req, &__address, issue_flags);
-
-	if (zc->flags & IORING_RECVSEND_FIXED_BUF) {
-		ret = io_import_fixed(ITER_SOURCE, &msg.msg_iter, req->imu,
-					(u64)(uintptr_t)zc->buf, zc->len);
-		if (unlikely(ret))
-			return ret;
-		msg.sg_from_iter = io_sg_from_iter;
-	} else {
-		io_notif_set_extended(zc->notif);
-		ret = import_ubuf(ITER_SOURCE, zc->buf, zc->len, &msg.msg_iter);
+	if (!zc->done_io) {
+		ret = io_send_zc_import(req, kmsg);
 		if (unlikely(ret))
 			return ret;
-		ret = io_notif_account_mem(zc->notif, zc->len);
-		if (unlikely(ret))
-			return ret;
-		msg.sg_from_iter = io_sg_from_iter_iovec;
 	}
 
 	msg_flags = zc->msg_flags | MSG_ZEROCOPY;
 	if (issue_flags & IO_URING_F_NONBLOCK)
 		msg_flags |= MSG_DONTWAIT;
 	if (msg_flags & MSG_WAITALL)
-		min_ret = iov_iter_count(&msg.msg_iter);
+		min_ret = iov_iter_count(&kmsg->msg.msg_iter);
 	msg_flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
 
-	msg.msg_flags = msg_flags;
-	msg.msg_ubuf = &io_notif_to_data(zc->notif)->uarg;
-	ret = sock_sendmsg(sock, &msg);
+	kmsg->msg.msg_flags = msg_flags;
+	kmsg->msg.msg_ubuf = &io_notif_to_data(zc->notif)->uarg;
+	ret = sock_sendmsg(sock, &kmsg->msg);
 
 	if (unlikely(ret < min_ret)) {
 		if (ret == -EAGAIN && (issue_flags & IO_URING_F_NONBLOCK))
-			return io_setup_async_addr(req, &__address, issue_flags);
+			return io_setup_async_msg(req, kmsg, issue_flags);
 
-		if (ret > 0 && io_net_retry(sock, msg.msg_flags)) {
-			zc->len -= ret;
-			zc->buf += ret;
+		if (ret > 0 && io_net_retry(sock, kmsg->msg.msg_flags)) {
 			zc->done_io += ret;
 			req->flags |= REQ_F_BL_NO_RECYCLE;
-			return io_setup_async_addr(req, &__address, issue_flags);
+			return io_setup_async_msg(req, kmsg, issue_flags);
 		}
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
@@ -1258,6 +1265,7 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
 	 */
 	if (!(issue_flags & IO_URING_F_UNLOCKED)) {
 		io_notif_flush(zc->notif);
+		io_netmsg_recycle(req, issue_flags);
 		req->flags &= ~REQ_F_NEED_CLEANUP;
 	}
 	io_req_set_res(req, ret, IORING_CQE_F_MORE);
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 88fbe5cfd379..dd932d1058f6 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -603,6 +603,7 @@ const struct io_cold_def io_cold_defs[] = {
 		.name			= "SEND",
 #if defined(CONFIG_NET)
 		.async_size		= sizeof(struct io_async_msghdr),
+		.cleanup		= io_sendmsg_recvmsg_cleanup,
 		.fail			= io_sendrecv_fail,
 		.prep_async		= io_send_prep_async,
 #endif
-- 
2.43.0



* [PATCH 5/7] io_uring/net: support bundles for send
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

If IORING_OP_SEND is used with provided buffers, the caller may also
set IORING_RECVSEND_BUNDLE to turn it into a multi-buffer send. The idea
is that an application can fill outgoing buffers in a provided buffer
group, and then arm a single send that will service them all. Once
there are no more buffers to send, or if the requested length has
been sent, the request posts a single completion for all the buffers.

This only enables it for IORING_OP_SEND; IORING_OP_SENDMSG support is
coming in a separate patch. However, this patch does do a lot of the
prep work that makes wiring up the sendmsg variant pretty trivial. They
share the prep side.
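
For illustration, arming a bundle send from the application could look
something like this untested liburing-style sketch, where buffer group
'bgid' is assumed to already hold the queued outgoing buffers. A zero
length asks the kernel to send whatever is queued, subject to its
internal caps.

#include <liburing.h>

static int arm_bundled_send(struct io_uring *ring, int sockfd,
			    unsigned short bgid)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	if (!sqe)
		return -1;
	io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = bgid;
	sqe->ioprio |= IORING_RECVSEND_BUNDLE;
	return io_uring_submit(ring);
}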

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/uapi/linux/io_uring.h |   9 +++
 io_uring/net.c                | 138 +++++++++++++++++++++++++++++-----
 2 files changed, 129 insertions(+), 18 deletions(-)

diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 7bd10201a02b..3a0ff6da35de 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -351,11 +351,20 @@ enum io_uring_op {
  *				0 is reported if zerocopy was actually possible.
  *				IORING_NOTIF_USAGE_ZC_COPIED if data was copied
  *				(at least partially).
+ *
+ * IORING_RECVSEND_BUNDLE	Used with IOSQE_BUFFER_SELECT. If set, send will
+ *				grab as many buffers from the buffer group ID
+ *				given and send them all. The completion result
+ *				will be the number of buffers send, with the
+ *				starting buffer ID in cqe->flags as per usual
+ *				for provided buffer usage. The buffers will be
+ *				contigious from the starting buffer ID.
  */
 #define IORING_RECVSEND_POLL_FIRST	(1U << 0)
 #define IORING_RECV_MULTISHOT		(1U << 1)
 #define IORING_RECVSEND_FIXED_BUF	(1U << 2)
 #define IORING_SEND_ZC_REPORT_USAGE	(1U << 3)
+#define IORING_RECVSEND_BUNDLE		(1U << 4)
 
 /*
  * cqe.res for IORING_CQE_F_NOTIF if
diff --git a/io_uring/net.c b/io_uring/net.c
index 66318fbba805..0c4273005a68 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -370,6 +370,8 @@ void io_sendmsg_recvmsg_cleanup(struct io_kiocb *req)
 	kfree(io->free_iov);
 }
 
+#define SENDMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_BUNDLE)
+
 int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -388,11 +390,20 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
 	sr->flags = READ_ONCE(sqe->ioprio);
-	if (sr->flags & ~IORING_RECVSEND_POLL_FIRST)
+	if (sr->flags & ~SENDMSG_FLAGS)
 		return -EINVAL;
 	sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
 	if (sr->msg_flags & MSG_DONTWAIT)
 		req->flags |= REQ_F_NOWAIT;
+	if (sr->flags & IORING_RECVSEND_BUNDLE) {
+		if (req->opcode == IORING_OP_SENDMSG)
+			return -EINVAL;
+		if (!(req->flags & REQ_F_BUFFER_SELECT))
+			return -EINVAL;
+		sr->msg_flags |= MSG_WAITALL;
+		sr->buf_group = req->buf_index;
+		req->buf_list = NULL;
+	}
 
 #ifdef CONFIG_COMPAT
 	if (req->ctx->compat)
@@ -412,6 +423,84 @@ static void io_req_msg_cleanup(struct io_kiocb *req,
 	io_netmsg_recycle(req, issue_flags);
 }
 
+/*
+ * For bundle completions, we need to figure out how many segments we consumed.
+ * A bundle could be using a single ITER_UBUF if that's all we mapped, or it
+ * could be using an ITER_IOVEC. If the latter, then if we consumed all of
+ * the segments, then it's a trivial questiont o answer. If we have residual
+ * data in the iter, then loop the segments to figure out how much we
+ * transferred.
+ */
+static int io_bundle_nbufs(struct io_async_msghdr *kmsg, int ret)
+{
+	struct iovec *iov;
+	int nbufs;
+
+	/* no data is always zero segments, and a ubuf is always 1 segment */
+	if (ret <= 0)
+		return 0;
+	if (iter_is_ubuf(&kmsg->msg.msg_iter))
+		return 1;
+
+	iov = kmsg->free_iov;
+	if (!iov)
+		iov = kmsg->fast_iov;
+
+	/* if all data was transferred, it's basic pointer math */
+	if (!iov_iter_count(&kmsg->msg.msg_iter))
+		return iter_iov(&kmsg->msg.msg_iter) - iov;
+
+	/* short transfer, count segments */
+	nbufs = 0;
+	do {
+		int this_len = min_t(int, iov[nbufs].iov_len, ret);
+
+		nbufs++;
+		ret -= this_len;
+	} while (ret);
+
+	return nbufs;
+}
+
+static inline bool io_send_finish(struct io_kiocb *req, int *ret,
+				  struct io_async_msghdr *kmsg,
+				  unsigned issue_flags)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+	bool bundle_finished = *ret <= 0;
+	unsigned int cflags;
+
+	if (!(sr->flags & IORING_RECVSEND_BUNDLE)) {
+		cflags = io_put_kbuf(req, issue_flags);
+		goto finish;
+	}
+
+	cflags = io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret), issue_flags);
+
+	if (bundle_finished || req->flags & REQ_F_BL_EMPTY)
+		goto finish;
+
+	/*
+	 * Fill CQE for this receive and see if we should keep trying to
+	 * receive from this socket.
+	 */
+	if (io_fill_cqe_req_aux(req, issue_flags & IO_URING_F_COMPLETE_DEFER,
+				*ret, cflags | IORING_CQE_F_MORE)) {
+		io_mshot_prep_retry(req);
+		if (kmsg->free_iov) {
+			kfree(kmsg->free_iov);
+			kmsg->free_iov = NULL;
+		}
+		return false;
+	}
+
+	/* Otherwise stop bundle and use the current result. */
+finish:
+	io_req_set_res(req, *ret, cflags);
+	*ret = IOU_OK;
+	return true;
+}
+
 int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -521,9 +610,7 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 	struct io_async_msghdr iomsg, *kmsg;
-	size_t len = sr->len;
 	struct socket *sock;
-	unsigned int cflags;
 	unsigned flags;
 	int min_ret = 0;
 	int ret;
@@ -536,24 +623,37 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 	if (IS_ERR(kmsg))
 		return PTR_ERR(kmsg);
 
+	flags = sr->msg_flags;
+	if (issue_flags & IO_URING_F_NONBLOCK)
+		flags |= MSG_DONTWAIT;
+
+retry_bundle:
 	if (io_do_buffer_select(req)) {
-		void __user *buf;
+		size_t len = min_not_zero(sr->len, (unsigned) INT_MAX);
+		int max_segs = ARRAY_SIZE(kmsg->fast_iov);
 
-		buf = io_buffer_select(req, &len, issue_flags);
-		if (!buf)
-			return -ENOBUFS;
-		sr->buf = buf;
-		sr->len = len;
+		if (!(sr->flags & IORING_RECVSEND_BUNDLE))
+			max_segs = 1;
 
-		ret = import_ubuf(ITER_SOURCE, sr->buf, len, &kmsg->msg.msg_iter);
-		if (unlikely(ret))
+		kmsg->free_iov = kmsg->fast_iov;
+		ret = io_buffers_select(req, &kmsg->free_iov, max_segs, &len,
+					issue_flags);
+		if (unlikely(ret < 0))
 			return ret;
+
+		sr->len = len;
+		iov_iter_init(&kmsg->msg.msg_iter, ITER_SOURCE, kmsg->free_iov,
+			      ret, len);
+		if (kmsg->free_iov == kmsg->fast_iov)
+			kmsg->free_iov = NULL;
 	}
 
-	flags = sr->msg_flags;
-	if (issue_flags & IO_URING_F_NONBLOCK)
-		flags |= MSG_DONTWAIT;
-	if (flags & MSG_WAITALL)
+	/*
+	 * If MSG_WAITALL is set, or this is a bundle send, then we need
+	 * the full amount. If just bundle is set, if we do a short send
+	 * then we complete the bundle sequence rather than continue on.
+	 */
+	if (flags & MSG_WAITALL || sr->flags & IORING_RECVSEND_BUNDLE)
 		min_ret = iov_iter_count(&kmsg->msg.msg_iter);
 
 	flags &= ~MSG_INTERNAL_SENDMSG_FLAGS;
@@ -576,10 +676,12 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
 		ret += sr->done_io;
 	else if (sr->done_io)
 		ret = sr->done_io;
+
+	if (!io_send_finish(req, &ret, kmsg, issue_flags))
+		goto retry_bundle;
+
 	io_req_msg_cleanup(req, kmsg, issue_flags);
-	cflags = io_put_kbuf(req, issue_flags);
-	io_req_set_res(req, ret, cflags);
-	return IOU_OK;
+	return ret;
 }
 
 static int io_recvmsg_mshot_prep(struct io_kiocb *req,
-- 
2.43.0



* [PATCH 6/7] io_uring/net: switch io_recv() to using io_async_msghdr
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

No functional changes in this patch, just in preparation for carrying
more state than we have now, if necessary.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 io_uring/net.c   | 78 +++++++++++++++++++++++++++++-------------------
 io_uring/net.h   |  2 +-
 io_uring/opdef.c |  7 +++--
 3 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/io_uring/net.c b/io_uring/net.c
index 0c4273005a68..07831e764068 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -330,7 +330,7 @@ static int io_sendmsg_copy_hdr(struct io_kiocb *req,
 	return ret;
 }
 
-int io_send_prep_async(struct io_kiocb *req)
+int io_sendrecv_prep_async(struct io_kiocb *req)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 	struct io_async_msghdr *io;
@@ -815,13 +815,13 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
  * again (for multishot).
  */
 static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
-				  struct msghdr *msg, bool mshot_finished,
-				  unsigned issue_flags)
+				  struct io_async_msghdr *kmsg,
+				  bool mshot_finished, unsigned issue_flags)
 {
 	unsigned int cflags;
 
 	cflags = io_put_kbuf(req, issue_flags);
-	if (msg->msg_inq > 0)
+	if (kmsg->msg.msg_inq > 0)
 		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
 
 	/*
@@ -836,7 +836,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 
 		io_mshot_prep_retry(req);
 		/* Known not-empty or unknown state, retry */
-		if (cflags & IORING_CQE_F_SOCK_NONEMPTY || msg->msg_inq < 0) {
+		if (cflags & IORING_CQE_F_SOCK_NONEMPTY || kmsg->msg.msg_inq < 0) {
 			if (sr->nr_multishot_loops++ < MULTISHOT_MAX_RETRY)
 				return false;
 			/* mshot retries exceeded, force a requeue */
@@ -1037,7 +1037,7 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	else
 		io_kbuf_recycle(req, issue_flags);
 
-	if (!io_recv_finish(req, &ret, &kmsg->msg, mshot_finished, issue_flags))
+	if (!io_recv_finish(req, &ret, kmsg, mshot_finished, issue_flags))
 		goto retry_multishot;
 
 	if (mshot_finished)
@@ -1051,29 +1051,42 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
-	struct msghdr msg;
+	struct io_async_msghdr iomsg, *kmsg;
 	struct socket *sock;
 	unsigned flags;
 	int ret, min_ret = 0;
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
 	size_t len = sr->len;
 
+	if (req_has_async_data(req)) {
+		kmsg = req->async_data;
+	} else {
+		kmsg = &iomsg;
+		kmsg->free_iov = NULL;
+		kmsg->msg.msg_name = NULL;
+		kmsg->msg.msg_namelen = 0;
+		kmsg->msg.msg_control = NULL;
+		kmsg->msg.msg_get_inq = 1;
+		kmsg->msg.msg_controllen = 0;
+		kmsg->msg.msg_iocb = NULL;
+		kmsg->msg.msg_ubuf = NULL;
+
+		if (!io_do_buffer_select(req)) {
+			ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
+					  &kmsg->msg.msg_iter);
+			if (unlikely(ret))
+				return ret;
+		}
+	}
+
 	if (!(req->flags & REQ_F_POLLED) &&
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
-		return -EAGAIN;
+		return io_setup_async_msg(req, kmsg, issue_flags);
 
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
-	msg.msg_name = NULL;
-	msg.msg_namelen = 0;
-	msg.msg_control = NULL;
-	msg.msg_get_inq = 1;
-	msg.msg_controllen = 0;
-	msg.msg_iocb = NULL;
-	msg.msg_ubuf = NULL;
-
 	flags = sr->msg_flags;
 	if (force_nonblock)
 		flags |= MSG_DONTWAIT;
@@ -1087,22 +1100,23 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 			return -ENOBUFS;
 		sr->buf = buf;
 		sr->len = len;
+		ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
+				  &kmsg->msg.msg_iter);
+		if (unlikely(ret))
+			goto out_free;
 	}
 
-	ret = import_ubuf(ITER_DEST, sr->buf, len, &msg.msg_iter);
-	if (unlikely(ret))
-		goto out_free;
-
-	msg.msg_inq = -1;
-	msg.msg_flags = 0;
+	kmsg->msg.msg_inq = -1;
+	kmsg->msg.msg_flags = 0;
 
 	if (flags & MSG_WAITALL)
-		min_ret = iov_iter_count(&msg.msg_iter);
+		min_ret = iov_iter_count(&kmsg->msg.msg_iter);
 
-	ret = sock_recvmsg(sock, &msg, flags);
+	ret = sock_recvmsg(sock, &kmsg->msg, flags);
 	if (ret < min_ret) {
 		if (ret == -EAGAIN && force_nonblock) {
-			if (issue_flags & IO_URING_F_MULTISHOT) {
+			ret = io_setup_async_msg(req, kmsg, issue_flags);
+			if (ret == -EAGAIN && issue_flags & IO_URING_F_MULTISHOT) {
 				io_kbuf_recycle(req, issue_flags);
 				return IOU_ISSUE_SKIP_COMPLETE;
 			}
@@ -1110,16 +1124,14 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 			return -EAGAIN;
 		}
 		if (ret > 0 && io_net_retry(sock, flags)) {
-			sr->len -= ret;
-			sr->buf += ret;
 			sr->done_io += ret;
 			req->flags |= REQ_F_BL_NO_RECYCLE;
-			return -EAGAIN;
+			return io_setup_async_msg(req, kmsg, issue_flags);
 		}
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
 		req_set_fail(req);
-	} else if ((flags & MSG_WAITALL) && (msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
+	} else if ((flags & MSG_WAITALL) && (kmsg->msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
 out_free:
 		req_set_fail(req);
 	}
@@ -1131,9 +1143,15 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	else
 		io_kbuf_recycle(req, issue_flags);
 
-	if (!io_recv_finish(req, &ret, &msg, ret <= 0, issue_flags))
+	if (!io_recv_finish(req, &ret, kmsg, ret <= 0, issue_flags)) {
+		if (kmsg->free_iov) {
+			kfree(kmsg->free_iov);
+			kmsg->free_iov = NULL;
+		}
 		goto retry_multishot;
+	}
 
+	io_req_msg_cleanup(req, kmsg, issue_flags);
 	return ret;
 }
 
diff --git a/io_uring/net.h b/io_uring/net.h
index 191009979bcb..5c1230f1aaf9 100644
--- a/io_uring/net.h
+++ b/io_uring/net.h
@@ -40,7 +40,7 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags);
 
 int io_send(struct io_kiocb *req, unsigned int issue_flags);
-int io_send_prep_async(struct io_kiocb *req);
+int io_sendrecv_prep_async(struct io_kiocb *req);
 
 int io_recvmsg_prep_async(struct io_kiocb *req);
 int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index dd932d1058f6..352f743d6a69 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -605,13 +605,16 @@ const struct io_cold_def io_cold_defs[] = {
 		.async_size		= sizeof(struct io_async_msghdr),
 		.cleanup		= io_sendmsg_recvmsg_cleanup,
 		.fail			= io_sendrecv_fail,
-		.prep_async		= io_send_prep_async,
+		.prep_async		= io_sendrecv_prep_async,
 #endif
 	},
 	[IORING_OP_RECV] = {
 		.name			= "RECV",
 #if defined(CONFIG_NET)
+		.async_size		= sizeof(struct io_async_msghdr),
+		.cleanup		= io_sendmsg_recvmsg_cleanup,
 		.fail			= io_sendrecv_fail,
+		.prep_async		= io_sendrecv_prep_async,
 #endif
 	},
 	[IORING_OP_OPENAT2] = {
@@ -688,7 +691,7 @@ const struct io_cold_def io_cold_defs[] = {
 		.name			= "SEND_ZC",
 #if defined(CONFIG_NET)
 		.async_size		= sizeof(struct io_async_msghdr),
-		.prep_async		= io_send_prep_async,
+		.prep_async		= io_sendrecv_prep_async,
 		.cleanup		= io_send_zc_cleanup,
 		.fail			= io_sendrecv_fail,
 #endif
-- 
2.43.0



* [PATCH 7/7] io_uring/net: support bundles for recv
From: Jens Axboe @ 2024-03-08 23:34 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw, Jens Axboe

If IORING_OP_RECV is used with provided buffers, the caller may also set
IORING_RECVSEND_BUNDLE to turn it into a multi-buffer recv. This grabs
whatever buffers are available, receives into them, and posts a single
completion for all of it.

This can be used with multishot receive as well, or without it.

Now that both send and receive support bundles, add a feature flag for
it as well. If IORING_FEAT_RECVSEND_BUNDLE is set after registering the
ring, then the kernel supports bundles for recv and send.
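
Checking for it could look something like this untested sketch, using
liburing's io_uring_queue_init_params() to read back the feature flags:

#include <liburing.h>
#include <stdbool.h>

static bool ring_supports_bundles(void)
{
	struct io_uring_params p = { };
	struct io_uring ring;
	bool ret;

	if (io_uring_queue_init_params(8, &ring, &p) < 0)
		return false;
	ret = p.features & IORING_FEAT_RECVSEND_BUNDLE;
	io_uring_queue_exit(&ring);
	return ret;
}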

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/uapi/linux/io_uring.h |  15 +++--
 io_uring/io_uring.c           |   3 +-
 io_uring/net.c                | 119 ++++++++++++++++++++++++++--------
 3 files changed, 101 insertions(+), 36 deletions(-)

diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 3a0ff6da35de..9cf6c45149dd 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -352,13 +352,13 @@ enum io_uring_op {
  *				IORING_NOTIF_USAGE_ZC_COPIED if data was copied
  *				(at least partially).
  *
- * IORING_RECVSEND_BUNDLE	Used with IOSQE_BUFFER_SELECT. If set, send will
- *				grab as many buffers from the buffer group ID
- *				given and send them all. The completion result
- *				will be the number of buffers send, with the
- *				starting buffer ID in cqe->flags as per usual
- *				for provided buffer usage. The buffers will be
- *				contigious from the starting buffer ID.
+ * IORING_RECVSEND_BUNDLE	Used with IOSQE_BUFFER_SELECT. If set, send or
+ *				recv will grab as many buffers from the buffer
+ *				group ID given and send them all. The completion
+ *				result 	will be the number of buffers send, with
+ *				the starting buffer ID in cqe->flags as per
+ *				usual for provided buffer usage. The buffers
+ *				will be	contigious from the starting buffer ID.
  */
 #define IORING_RECVSEND_POLL_FIRST	(1U << 0)
 #define IORING_RECV_MULTISHOT		(1U << 1)
@@ -531,6 +531,7 @@ struct io_uring_params {
 #define IORING_FEAT_CQE_SKIP		(1U << 11)
 #define IORING_FEAT_LINKED_FILE		(1U << 12)
 #define IORING_FEAT_REG_REG_RING	(1U << 13)
+#define IORING_FEAT_RECVSEND_BUNDLE	(1U << 14)
 
 /*
  * io_uring_register(2) opcodes and arguments
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index cf348c33f485..112c21053e6f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -3982,7 +3982,8 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
 			IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
 			IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
 			IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
-			IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
+			IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING |
+			IORING_FEAT_RECVSEND_BUNDLE;
 
 	if (copy_to_user(params, p, sizeof(*p))) {
 		ret = -EFAULT;
diff --git a/io_uring/net.c b/io_uring/net.c
index 07831e764068..c671ecb5b849 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -760,7 +760,8 @@ int io_recvmsg_prep_async(struct io_kiocb *req)
 	return ret;
 }
 
-#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT)
+#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT | \
+			IORING_RECVSEND_BUNDLE)
 
 int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
@@ -774,21 +775,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
 	sr->flags = READ_ONCE(sqe->ioprio);
-	if (sr->flags & ~(RECVMSG_FLAGS))
+	if (sr->flags & ~RECVMSG_FLAGS)
 		return -EINVAL;
 	sr->msg_flags = READ_ONCE(sqe->msg_flags);
 	if (sr->msg_flags & MSG_DONTWAIT)
 		req->flags |= REQ_F_NOWAIT;
 	if (sr->msg_flags & MSG_ERRQUEUE)
 		req->flags |= REQ_F_CLEAR_POLLIN;
-	if (sr->flags & IORING_RECV_MULTISHOT) {
-		if (!(req->flags & REQ_F_BUFFER_SELECT))
-			return -EINVAL;
-		if (sr->msg_flags & MSG_WAITALL)
-			return -EINVAL;
-		if (req->opcode == IORING_OP_RECV && sr->len)
-			return -EINVAL;
-		req->flags |= REQ_F_APOLL_MULTISHOT;
+	if (req->flags & REQ_F_BUFFER_SELECT) {
 		/*
 		 * Store the buffer group for this multishot receive separately,
 		 * as if we end up doing an io-wq based issue that selects a
@@ -798,6 +792,20 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 		 * restore it.
 		 */
 		sr->buf_group = req->buf_index;
+		req->buf_list = NULL;
+	}
+	if (sr->flags & IORING_RECV_MULTISHOT) {
+		if (!(req->flags & REQ_F_BUFFER_SELECT))
+			return -EINVAL;
+		if (sr->msg_flags & MSG_WAITALL)
+			return -EINVAL;
+		if (req->opcode == IORING_OP_RECV && sr->len)
+			return -EINVAL;
+		req->flags |= REQ_F_APOLL_MULTISHOT;
+	}
+	if (sr->flags & IORING_RECVSEND_BUNDLE) {
+		if (req->opcode == IORING_OP_RECVMSG)
+			return -EINVAL;
 	}
 
 #ifdef CONFIG_COMPAT
@@ -818,12 +826,22 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 				  struct io_async_msghdr *kmsg,
 				  bool mshot_finished, unsigned issue_flags)
 {
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 	unsigned int cflags;
 
-	cflags = io_put_kbuf(req, issue_flags);
+	if (sr->flags & IORING_RECVSEND_BUNDLE)
+		cflags = io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret),
+				      issue_flags);
+	else
+		cflags = io_put_kbuf(req, issue_flags);
+
 	if (kmsg->msg.msg_inq > 0)
 		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
 
+	/* bundle with no more immediate buffers, we're done */
+	if (sr->flags & IORING_RECVSEND_BUNDLE && req->flags & REQ_F_BL_EMPTY)
+		goto finish;
+
 	/*
 	 * Fill CQE for this receive and see if we should keep trying to
 	 * receive from this socket.
@@ -831,14 +849,18 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 	if ((req->flags & REQ_F_APOLL_MULTISHOT) && !mshot_finished &&
 	    io_fill_cqe_req_aux(req, issue_flags & IO_URING_F_COMPLETE_DEFER,
 				*ret, cflags | IORING_CQE_F_MORE)) {
-		struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
 		int mshot_retry_ret = IOU_ISSUE_SKIP_COMPLETE;
 
 		io_mshot_prep_retry(req);
 		/* Known not-empty or unknown state, retry */
 		if (cflags & IORING_CQE_F_SOCK_NONEMPTY || kmsg->msg.msg_inq < 0) {
-			if (sr->nr_multishot_loops++ < MULTISHOT_MAX_RETRY)
+			if (sr->nr_multishot_loops++ < MULTISHOT_MAX_RETRY) {
+				if (kmsg->free_iov) {
+					kfree(kmsg->free_iov);
+					kmsg->free_iov = NULL;
+				}
 				return false;
+			}
 			/* mshot retries exceeded, force a requeue */
 			sr->nr_multishot_loops = 0;
 			mshot_retry_ret = IOU_REQUEUE;
@@ -851,6 +873,7 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 	}
 
 	/* Finish the request / stop multishot. */
+finish:
 	io_req_set_res(req, *ret, cflags);
 
 	if (issue_flags & IO_URING_F_MULTISHOT)
@@ -1048,6 +1071,58 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	return ret;
 }
 
+static int io_recv_buf_select(struct io_kiocb *req, struct io_async_msghdr *kmsg,
+			      size_t *len, unsigned int issue_flags)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
+	int ret;
+
+	/*
+	 * If the ring isn't locked, then don't use the peek interface
+	 * to grab multiple buffers as we will lock/unlock between
+	 * this selection and posting the buffers.
+	 */
+	if (!(issue_flags & IO_URING_F_UNLOCKED) &&
+	    sr->flags & IORING_RECVSEND_BUNDLE) {
+		struct iovec *iov = kmsg->fast_iov;
+
+		*len = 0;
+		if (kmsg->msg.msg_inq > 0) {
+			*len = kmsg->msg.msg_inq;
+			if (sr->len && *len > sr->len)
+				*len = sr->len;
+		}
+		ret = io_buffers_peek(req, &iov, ARRAY_SIZE(kmsg->fast_iov), len);
+		if (unlikely(ret < 0))
+			return ret;
+
+		if (ret == 1) {
+			sr->buf = iov->iov_base;
+			sr->len = iov->iov_len;
+			goto ubuf;
+		}
+		iov_iter_init(&kmsg->msg.msg_iter, ITER_DEST, iov, ret, *len);
+		if (iov != kmsg->fast_iov)
+			kmsg->free_iov = iov;
+	} else {
+		void __user *buf;
+
+		*len = sr->len;
+		buf = io_buffer_select(req, len, issue_flags);
+		if (!buf)
+			return -ENOBUFS;
+		sr->buf = buf;
+		sr->len = *len;
+ubuf:
+		ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
+				  &kmsg->msg.msg_iter);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
+}
+
 int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -1093,17 +1168,10 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 
 retry_multishot:
 	if (io_do_buffer_select(req)) {
-		void __user *buf;
-
-		buf = io_buffer_select(req, &len, issue_flags);
-		if (!buf)
-			return -ENOBUFS;
-		sr->buf = buf;
-		sr->len = len;
-		ret = import_ubuf(ITER_DEST, sr->buf, sr->len,
-				  &kmsg->msg.msg_iter);
+		ret = io_recv_buf_select(req, kmsg, &len, issue_flags);
 		if (unlikely(ret))
 			goto out_free;
+		sr->buf = NULL;
 	}
 
 	kmsg->msg.msg_inq = -1;
@@ -1143,13 +1211,8 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	else
 		io_kbuf_recycle(req, issue_flags);
 
-	if (!io_recv_finish(req, &ret, kmsg, ret <= 0, issue_flags)) {
-		if (kmsg->free_iov) {
-			kfree(kmsg->free_iov);
-			kmsg->free_iov = NULL;
-		}
+	if (!io_recv_finish(req, &ret, kmsg, ret <= 0, issue_flags))
 		goto retry_multishot;
-	}
 
 	io_req_msg_cleanup(req, kmsg, issue_flags);
 	return ret;
-- 
2.43.0
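
For anyone wiring this up from userspace, here is a rough sketch (not part
of the patch; the liburing calls are standard, but BGID, NR_BUFS and
BUF_SIZE are made-up application constants, and the bundle flag is OR'ed
into sqe->ioprio by hand since no helper for it is assumed here) of
checking the new feature bit and arming a bundled multishot recv against a
provided buffer ring:

/*
 * Userspace sketch only, not from this series: detect the new feature
 * bit and arm a bundled multishot recv. BGID, NR_BUFS and BUF_SIZE are
 * hypothetical application constants.
 */
#include <liburing.h>
#include <stdio.h>

#define BGID		1
#define NR_BUFS		64
#define BUF_SIZE	4096

static int arm_bundled_recv(struct io_uring *ring, int sockfd,
			    struct io_uring_buf_ring *br, void *bufs)
{
	struct io_uring_sqe *sqe;
	int i;

	/* hand NR_BUFS fixed-size chunks to the provided buffer ring */
	for (i = 0; i < NR_BUFS; i++)
		io_uring_buf_ring_add(br, (char *) bufs + i * BUF_SIZE,
				      BUF_SIZE, i,
				      io_uring_buf_ring_mask(NR_BUFS), i);
	io_uring_buf_ring_advance(br, NR_BUFS);

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_recv_multishot(sqe, sockfd, NULL, 0, 0);
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = BGID;
	sqe->ioprio |= IORING_RECVSEND_BUNDLE;	/* ask for bundled completions */
	return io_uring_submit(ring);
}

int main(void)
{
	struct io_uring_params p = { 0 };
	struct io_uring ring;

	if (io_uring_queue_init_params(64, &ring, &p))
		return 1;
	if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
		fprintf(stderr, "no recv/send bundle support\n");
		return 1;
	}
	/* socket setup and io_uring_setup_buf_ring() omitted for brevity */
	io_uring_queue_exit(&ring);
	return 0;
}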


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* Re: [PATCHSET RFC 0/7] Send and receive bundles
  2024-03-08 23:34 [PATCHSET RFC 0/7] Send and receive bundles Jens Axboe
                   ` (6 preceding siblings ...)
  2024-03-08 23:34 ` [PATCH 7/7] io_uring/net: support bundles for recv Jens Axboe
@ 2024-03-10 18:15 ` Jens Axboe
  7 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2024-03-10 18:15 UTC (permalink / raw)
  To: io-uring; +Cc: asml.silence, dyudaken, dw

[-- Attachment #1: Type: text/plain, Size: 3274 bytes --]

On 3/8/24 4:34 PM, Jens Axboe wrote:
> Hi,
> 
> I went back to the drawing board a bit on the send multishot, and this
> is what came out.
> 
> First support was added for provided buffers for send. This works like
> provided buffers for recv/recvmsg, and the intent here to use the buffer
> ring queue as an outgoing sequence for sending.
> 
> But the real meat is adding support for picking multiple buffers at the
> time, what I dubbed "bundles" here. Rather than just pick a single buffer
> for send, it can pick a bunch of them and send them in one go. The idea
> here is that the expensive part of a request is not the sqe issue, it's
> the fact that we have to do each buffer separately. That entails calling
> all the way down into the networking stack, locking the socket, checking
> what needs doing afterwards (like flushing the backlog), unlocking the
> socket, etc. If we have an outgoing send queue, then pick what buffers
> we have (up to a certain cap), and pass them to the networking stack in
> one go.
> 
> Bundles must be used with provided buffers, obviously. At completion
> time, they pass the starting buffer ID in cqe->flags, like any other
> provided buffer completion. cqe->res is the TOTAL number of bytes sent,
> so it's up to the application to iterate buffers to figure out how many
> completed. This part is trivial. I'll push the proxy changes out soon,
> just need to cleanup them up as I did the sendmsg bundling too and would
> love to compare.
> 
> With that in place, I added support for recv for bundles as well. Exactly
> the same as the send side - if we have a known amount of data pending,
> pick enough buffers to satisfy the receive and post a single completion
> for that round. Buffer ID in cqe->flags, cqe->res is the total number of
> buffers sent. Receive can be used with multishot as well - fire off one
> multishot recv, and keep getting big completions. Unfortunately, recvmsg
> multishot is just not as efficient as recv, as it carries additional
> data that needs copying. recv multishot with bundles provide a good
> alternative to recvmsg, if all you need is more than one range of data.
> I'll compare these too soon as well.
> 
> This is obviously a bigger win for smaller packets than for large ones,
> as the overall cost of entering sys_sendmsg/sys_recvmsg() in terms of
> throughput decreases as the packet size increases. For the extreme end,
> using 32b packets, performance increases substantially. Runtime for
> proxying 32b packets between three machines on a 10G link for the test:
> 
> Send ring:		3462 msec		1183Mbit
> Send ring + bundles	 844 msec		4853Mbit
> 
> and bundles reach 100% bandwidth at 80b of packet size, compared to send
> ring alone needing 320b to reach 95% of bandwidth (I didn't redo that
> test so don't have the 100% number).

Re-did all the numbers, see attached graph. tldr is that send bundles OR
sendmsg are by far the fastest; they hit line rate very quickly. This is
expected, as both of these send methods can pack more than a single
packet into a send operation, reducing the per-packet cost for the
smaller payloads. Looking at profiles, sendmsg does use ~3.5% more CPU
for the same work, which is also expected: it needs to do a bit more
work to accomplish the same result.
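
As a rough sketch (not taken from the proxy code), this is roughly what
unpacking one of these bundle CQEs looks like on the application side,
assuming the provided ring was filled with BUF_SIZE-sized buffers with
sequential IDs; BUF_SIZE, NR_BUFS, get_buf() and consume() are
hypothetical app-side names:

/*
 * Sketch only: cqe->res carries the total byte count for the bundle
 * and the buffer ID in cqe->flags is the *first* buffer used; the rest
 * follow in ring order. Assumes buffer IDs 0..NR_BUFS-1 were added in
 * order and every buffer is BUF_SIZE bytes.
 */
static void handle_bundle_cqe(struct io_uring_cqe *cqe)
{
	unsigned int bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
	int left = cqe->res;

	while (left > 0) {
		int len = left < BUF_SIZE ? left : BUF_SIZE;

		consume(get_buf(bid), len);		/* app-defined */
		bid = (bid + 1) & (NR_BUFS - 1);	/* next buffer in ring order */
		left -= len;
	}
	/* with multishot, IORING_CQE_F_MORE means more CQEs will follow */
}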

-- 
Jens Axboe

[-- Attachment #2: io_uring-10G-test.png --]
[-- Type: image/png, Size: 64083 bytes --]

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2024-03-10 18:15 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
2024-03-08 23:34 [PATCHSET RFC 0/7] Send and receive bundles Jens Axboe
2024-03-08 23:34 ` [PATCH 1/7] io_uring/net: add generic multishot retry helper Jens Axboe
2024-03-08 23:34 ` [PATCH 2/7] io_uring/net: add provided buffer support for IORING_OP_SEND Jens Axboe
2024-03-08 23:34 ` [PATCH 3/7] io_uring/kbuf: add helpers for getting/peeking multiple buffers Jens Axboe
2024-03-08 23:34 ` [PATCH 4/7] io_uring/net: switch io_send() and io_send_zc() to using io_async_msghdr Jens Axboe
2024-03-08 23:34 ` [PATCH 5/7] io_uring/net: support bundles for send Jens Axboe
2024-03-08 23:34 ` [PATCH 6/7] io_uring/net: switch io_recv() to using io_async_msghdr Jens Axboe
2024-03-08 23:34 ` [PATCH 7/7] io_uring/net: support bundles for recv Jens Axboe
2024-03-10 18:15 ` [PATCHSET RFC 0/7] Send and receive bundles Jens Axboe
