io-uring.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCHSET v3 0/12] Add support for ring mapped provided buffers
@ 2022-04-30 20:50 Jens Axboe
  2022-04-30 20:50 ` [PATCH 01/12] io_uring: kill io_recv_buffer_select() wrapper Jens Axboe
                   ` (11 more replies)
  0 siblings, 12 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring

Hi,

This series builds to adding support for a different way of doing
provided buffers. The interesting bits here are patch 12, which also has
some performance numbers an an explanation of it.

Patches 1..5 are cleanups that should just applied separately, I
think the clean up the existing code quite nicely.

Patch 6 is a generic optimization for the buffer list lookups.

Patch 7 has the caller use already selected buffer information rather
than rely on io_buffer_select() returning it for REQ_F_BUFFER_SELECTED.

Patch 8 adds NOP support for provided buffers, just so that we can
benchmark the last change.

Patches 9..11 are prep for patch 12.

Patch 12 finally adds the feature.

This passes the full liburing suite, and various test cases I adopted
to use ring provided buffers.

v3:	- Speedups
	- Add patch unifying how io_buffer_select() is called when a buffer
	  has already been selected.
	- Build on above change to ensure we handle async + poll retry
	  correctly.

Can also be found in my git repo, for-5.19/io_uring-pbuf branch:

https://git.kernel.dk/cgit/linux-block/log/?h=for-5.19/io_uring-pbuf

 fs/io_uring.c                 | 475 +++++++++++++++++++++++++---------
 include/uapi/linux/io_uring.h |  26 ++
 2 files changed, 382 insertions(+), 119 deletions(-)

-- 
Jens Axboe



^ permalink raw reply	[flat|nested] 16+ messages in thread

* [PATCH 01/12] io_uring: kill io_recv_buffer_select() wrapper
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 02/12] io_uring: make io_buffer_select() return the user address directly Jens Axboe
                   ` (10 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

It's just a thin wrapper around io_buffer_select(), get rid of it.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index dfebbf3a272a..12f61ce429dc 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -5897,14 +5897,6 @@ static int io_recvmsg_copy_hdr(struct io_kiocb *req,
 	return __io_recvmsg_copy_hdr(req, iomsg);
 }
 
-static struct io_buffer *io_recv_buffer_select(struct io_kiocb *req,
-					       unsigned int issue_flags)
-{
-	struct io_sr_msg *sr = &req->sr_msg;
-
-	return io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
-}
-
 static int io_recvmsg_prep_async(struct io_kiocb *req)
 {
 	int ret;
@@ -5961,7 +5953,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	}
 
 	if (req->flags & REQ_F_BUFFER_SELECT) {
-		kbuf = io_recv_buffer_select(req, issue_flags);
+		kbuf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
 		if (IS_ERR(kbuf))
 			return PTR_ERR(kbuf);
 		kmsg->fast_iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
@@ -6022,7 +6014,7 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 		return -ENOTSOCK;
 
 	if (req->flags & REQ_F_BUFFER_SELECT) {
-		kbuf = io_recv_buffer_select(req, issue_flags);
+		kbuf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
 		if (IS_ERR(kbuf))
 			return PTR_ERR(kbuf);
 		buf = u64_to_user_ptr(kbuf->addr);
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 02/12] io_uring: make io_buffer_select() return the user address directly
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
  2022-04-30 20:50 ` [PATCH 01/12] io_uring: kill io_recv_buffer_select() wrapper Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 03/12] io_uring: kill io_rw_buffer_select() wrapper Jens Axboe
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

There's no point in having callers provide a kbuf, we're just returning
the address anyway.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 42 ++++++++++++++++++------------------------
 1 file changed, 18 insertions(+), 24 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 12f61ce429dc..19dfa974ebcf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3571,15 +3571,15 @@ static void io_buffer_add_list(struct io_ring_ctx *ctx,
 	list_add(&bl->list, list);
 }
 
-static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len,
-					  int bgid, unsigned int issue_flags)
+static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
+				     int bgid, unsigned int issue_flags)
 {
 	struct io_buffer *kbuf = req->kbuf;
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_buffer_list *bl;
 
 	if (req->flags & REQ_F_BUFFER_SELECTED)
-		return kbuf;
+		return u64_to_user_ptr(kbuf->addr);
 
 	io_ring_submit_lock(req->ctx, issue_flags);
 
@@ -3591,25 +3591,18 @@ static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len,
 			*len = kbuf->len;
 		req->flags |= REQ_F_BUFFER_SELECTED;
 		req->kbuf = kbuf;
-	} else {
-		kbuf = ERR_PTR(-ENOBUFS);
+		io_ring_submit_unlock(req->ctx, issue_flags);
+		return u64_to_user_ptr(kbuf->addr);
 	}
 
 	io_ring_submit_unlock(req->ctx, issue_flags);
-	return kbuf;
+	return ERR_PTR(-ENOBUFS);
 }
 
 static void __user *io_rw_buffer_select(struct io_kiocb *req, size_t *len,
 					unsigned int issue_flags)
 {
-	struct io_buffer *kbuf;
-	u16 bgid;
-
-	bgid = req->buf_index;
-	kbuf = io_buffer_select(req, len, bgid, issue_flags);
-	if (IS_ERR(kbuf))
-		return kbuf;
-	return u64_to_user_ptr(kbuf->addr);
+	return io_buffer_select(req, len, req->buf_index, issue_flags);
 }
 
 #ifdef CONFIG_COMPAT
@@ -5934,7 +5927,6 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	struct io_async_msghdr iomsg, *kmsg;
 	struct io_sr_msg *sr = &req->sr_msg;
 	struct socket *sock;
-	struct io_buffer *kbuf;
 	unsigned flags;
 	int ret, min_ret = 0;
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
@@ -5953,10 +5945,12 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	}
 
 	if (req->flags & REQ_F_BUFFER_SELECT) {
-		kbuf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
-		if (IS_ERR(kbuf))
-			return PTR_ERR(kbuf);
-		kmsg->fast_iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
+		void __user *buf;
+
+		buf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
+		if (IS_ERR(buf))
+			return PTR_ERR(buf);
+		kmsg->fast_iov[0].iov_base = buf;
 		kmsg->fast_iov[0].iov_len = req->sr_msg.len;
 		iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov,
 				1, req->sr_msg.len);
@@ -5999,7 +5993,6 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 
 static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 {
-	struct io_buffer *kbuf;
 	struct io_sr_msg *sr = &req->sr_msg;
 	struct msghdr msg;
 	void __user *buf = sr->buf;
@@ -6014,10 +6007,11 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 		return -ENOTSOCK;
 
 	if (req->flags & REQ_F_BUFFER_SELECT) {
-		kbuf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
-		if (IS_ERR(kbuf))
-			return PTR_ERR(kbuf);
-		buf = u64_to_user_ptr(kbuf->addr);
+		void __user *buf;
+
+		buf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
+		if (IS_ERR(buf))
+			return PTR_ERR(buf);
 	}
 
 	ret = import_single_range(READ, buf, sr->len, &iov, &msg.msg_iter);
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 03/12] io_uring: kill io_rw_buffer_select() wrapper
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
  2022-04-30 20:50 ` [PATCH 01/12] io_uring: kill io_recv_buffer_select() wrapper Jens Axboe
  2022-04-30 20:50 ` [PATCH 02/12] io_uring: make io_buffer_select() return the user address directly Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 04/12] io_uring: ignore ->buf_index if REQ_F_BUFFER_SELECT isn't set Jens Axboe
                   ` (8 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

After the recent changes, this is direct call to io_buffer_select()
anyway. With this change, there are no wrappers left for provided
buffer selection.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 19dfa974ebcf..cdb23f9861c5 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3599,12 +3599,6 @@ static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 	return ERR_PTR(-ENOBUFS);
 }
 
-static void __user *io_rw_buffer_select(struct io_kiocb *req, size_t *len,
-					unsigned int issue_flags)
-{
-	return io_buffer_select(req, len, req->buf_index, issue_flags);
-}
-
 #ifdef CONFIG_COMPAT
 static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
 				unsigned int issue_flags)
@@ -3612,7 +3606,7 @@ static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
 	struct compat_iovec __user *uiov;
 	compat_ssize_t clen;
 	void __user *buf;
-	ssize_t len;
+	size_t len;
 
 	uiov = u64_to_user_ptr(req->rw.addr);
 	if (!access_ok(uiov, sizeof(*uiov)))
@@ -3623,7 +3617,7 @@ static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
 		return -EINVAL;
 
 	len = clen;
-	buf = io_rw_buffer_select(req, &len, issue_flags);
+	buf = io_buffer_select(req, &len, req->buf_index, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
 	iov[0].iov_base = buf;
@@ -3645,7 +3639,7 @@ static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 	len = iov[0].iov_len;
 	if (len < 0)
 		return -EINVAL;
-	buf = io_rw_buffer_select(req, &len, issue_flags);
+	buf = io_buffer_select(req, &len, req->buf_index, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
 	iov[0].iov_base = buf;
@@ -3701,7 +3695,8 @@ static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
 
 	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
 		if (req->flags & REQ_F_BUFFER_SELECT) {
-			buf = io_rw_buffer_select(req, &sqe_len, issue_flags);
+			buf = io_buffer_select(req, &sqe_len, req->buf_index,
+						issue_flags);
 			if (IS_ERR(buf))
 				return ERR_CAST(buf);
 			req->rw.len = sqe_len;
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 04/12] io_uring: ignore ->buf_index if REQ_F_BUFFER_SELECT isn't set
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (2 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 03/12] io_uring: kill io_rw_buffer_select() wrapper Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 05/12] io_uring: always use req->buf_index for the provided buffer group Jens Axboe
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

There's no point in validity checking buf_index if the request doesn't
have REQ_F_BUFFER_SELECT set, as we will never use it for that case.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index cdb23f9861c5..a570f47e3f76 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3686,10 +3686,6 @@ static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
 		return NULL;
 	}
 
-	/* buffer index only valid with fixed read/write, or buffer select  */
-	if (unlikely(req->buf_index && !(req->flags & REQ_F_BUFFER_SELECT)))
-		return ERR_PTR(-EINVAL);
-
 	buf = u64_to_user_ptr(req->rw.addr);
 	sqe_len = req->rw.len;
 
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 05/12] io_uring: always use req->buf_index for the provided buffer group
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (3 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 04/12] io_uring: ignore ->buf_index if REQ_F_BUFFER_SELECT isn't set Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 06/12] io_uring: cache last io_buffer_list lookup Jens Axboe
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

The read/write opcodes use it already, but the recv/recvmsg do not. If
we switch them over and read and validate this at init time while we're
checking if the opcode supports it anyway, then we can do it in one spot
and we don't have to pass in a separate group ID for io_buffer_select().

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index a570f47e3f76..41205548180d 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -644,7 +644,6 @@ struct io_sr_msg {
 		void __user			*buf;
 	};
 	int				msg_flags;
-	int				bgid;
 	size_t				len;
 	size_t				done_io;
 };
@@ -3412,6 +3411,7 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	req->rw.addr = READ_ONCE(sqe->addr);
 	req->rw.len = READ_ONCE(sqe->len);
 	req->rw.flags = READ_ONCE(sqe->rw_flags);
+	/* used for fixed read/write too - just read unconditionally */
 	req->buf_index = READ_ONCE(sqe->buf_index);
 	return 0;
 }
@@ -3572,7 +3572,7 @@ static void io_buffer_add_list(struct io_ring_ctx *ctx,
 }
 
 static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
-				     int bgid, unsigned int issue_flags)
+				     unsigned int issue_flags)
 {
 	struct io_buffer *kbuf = req->kbuf;
 	struct io_ring_ctx *ctx = req->ctx;
@@ -3583,7 +3583,7 @@ static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 
 	io_ring_submit_lock(req->ctx, issue_flags);
 
-	bl = io_buffer_get_list(ctx, bgid);
+	bl = io_buffer_get_list(ctx, req->buf_index);
 	if (bl && !list_empty(&bl->buf_list)) {
 		kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list);
 		list_del(&kbuf->list);
@@ -3617,7 +3617,7 @@ static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
 		return -EINVAL;
 
 	len = clen;
-	buf = io_buffer_select(req, &len, req->buf_index, issue_flags);
+	buf = io_buffer_select(req, &len, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
 	iov[0].iov_base = buf;
@@ -3639,7 +3639,7 @@ static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 	len = iov[0].iov_len;
 	if (len < 0)
 		return -EINVAL;
-	buf = io_buffer_select(req, &len, req->buf_index, issue_flags);
+	buf = io_buffer_select(req, &len, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
 	iov[0].iov_base = buf;
@@ -3691,8 +3691,7 @@ static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
 
 	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
 		if (req->flags & REQ_F_BUFFER_SELECT) {
-			buf = io_buffer_select(req, &sqe_len, req->buf_index,
-						issue_flags);
+			buf = io_buffer_select(req, &sqe_len, issue_flags);
 			if (IS_ERR(buf))
 				return ERR_CAST(buf);
 			req->rw.len = sqe_len;
@@ -5900,7 +5899,6 @@ static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 
 	sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
-	sr->bgid = READ_ONCE(sqe->buf_group);
 	sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
 	if (sr->msg_flags & MSG_DONTWAIT)
 		req->flags |= REQ_F_NOWAIT;
@@ -5938,7 +5936,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	if (req->flags & REQ_F_BUFFER_SELECT) {
 		void __user *buf;
 
-		buf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
+		buf = io_buffer_select(req, &sr->len, issue_flags);
 		if (IS_ERR(buf))
 			return PTR_ERR(buf);
 		kmsg->fast_iov[0].iov_base = buf;
@@ -6000,7 +5998,7 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	if (req->flags & REQ_F_BUFFER_SELECT) {
 		void __user *buf;
 
-		buf = io_buffer_select(req, &sr->len, sr->bgid, issue_flags);
+		buf = io_buffer_select(req, &sr->len, issue_flags);
 		if (IS_ERR(buf))
 			return PTR_ERR(buf);
 	}
@@ -8273,9 +8271,11 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
 		/* enforce forwards compatibility on users */
 		if (sqe_flags & ~SQE_VALID_FLAGS)
 			return -EINVAL;
-		if ((sqe_flags & IOSQE_BUFFER_SELECT) &&
-		    !io_op_defs[opcode].buffer_select)
-			return -EOPNOTSUPP;
+		if (sqe_flags & IOSQE_BUFFER_SELECT) {
+			if (!io_op_defs[opcode].buffer_select)
+				return -EOPNOTSUPP;
+			req->buf_index = READ_ONCE(sqe->buf_group);
+		}
 		if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
 			ctx->drain_disabled = true;
 		if (sqe_flags & IOSQE_IO_DRAIN) {
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 06/12] io_uring: cache last io_buffer_list lookup
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (4 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 05/12] io_uring: always use req->buf_index for the provided buffer group Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 07/12] io_uring: never call io_buffer_select() for a buffer re-select Jens Axboe
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

Most use cases have 1 or few buffer groups, and even if they have
multiple, there's often some locality in looking them up. Add a basic
one-hit cache to avoid hashing the group ID and starting the list
iteration.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 41205548180d..4bd2f4d868c2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -411,14 +411,16 @@ struct io_ring_ctx {
 		struct io_mapped_ubuf	**user_bufs;
 
 		struct io_submit_state	submit_state;
+		struct list_head	*io_buffers;
+		struct io_buffer_list	*io_bl_last;
+		unsigned int		io_bl_bgid;
+		u32			pers_next;
+		struct list_head	io_buffers_cache;
 		struct list_head	timeout_list;
 		struct list_head	ltimeout_list;
 		struct list_head	cq_overflow_list;
-		struct list_head	*io_buffers;
-		struct list_head	io_buffers_cache;
 		struct list_head	apoll_cache;
 		struct xarray		personalities;
-		u32			pers_next;
 		unsigned		sq_thread_idle;
 	} ____cacheline_aligned_in_smp;
 
@@ -1616,10 +1618,17 @@ static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
 	struct list_head *hash_list;
 	struct io_buffer_list *bl;
 
+	if (bgid == ctx->io_bl_bgid)
+		return ctx->io_bl_last;
+
 	hash_list = &ctx->io_buffers[hash_32(bgid, IO_BUFFERS_HASH_BITS)];
-	list_for_each_entry(bl, hash_list, list)
-		if (bl->bgid == bgid || bgid == -1U)
+	list_for_each_entry(bl, hash_list, list) {
+		if (bl->bgid == bgid || bgid == -1U) {
+			ctx->io_bl_bgid = bgid;
+			ctx->io_bl_last = bl;
 			return bl;
+		}
+	}
 
 	return NULL;
 }
@@ -1760,6 +1769,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 		goto err;
 	for (i = 0; i < (1U << IO_BUFFERS_HASH_BITS); i++)
 		INIT_LIST_HEAD(&ctx->io_buffers[i]);
+	ctx->io_bl_bgid = -1U;
 
 	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
 			    PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 07/12] io_uring: never call io_buffer_select() for a buffer re-select
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (5 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 06/12] io_uring: cache last io_buffer_list lookup Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 08/12] io_uring: add buffer selection support to IORING_OP_NOP Jens Axboe
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

Callers already have room to store the addr and length information,
clean it up by having the caller just assign the previously provided
data.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 4bd2f4d868c2..3b61cf06275d 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3588,9 +3588,6 @@ static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_buffer_list *bl;
 
-	if (req->flags & REQ_F_BUFFER_SELECTED)
-		return u64_to_user_ptr(kbuf->addr);
-
 	io_ring_submit_lock(req->ctx, issue_flags);
 
 	bl = io_buffer_get_list(ctx, req->buf_index);
@@ -3630,8 +3627,9 @@ static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
 	buf = io_buffer_select(req, &len, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
+	req->rw.addr = (unsigned long) buf;
 	iov[0].iov_base = buf;
-	iov[0].iov_len = (compat_size_t) len;
+	req->rw.len = iov[0].iov_len = (compat_size_t) len
 	return 0;
 }
 #endif
@@ -3652,8 +3650,9 @@ static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 	buf = io_buffer_select(req, &len, issue_flags);
 	if (IS_ERR(buf))
 		return PTR_ERR(buf);
+	req->rw.addr = (unsigned long) buf;
 	iov[0].iov_base = buf;
-	iov[0].iov_len = len;
+	req->rw.len = iov[0].iov_len = len;
 	return 0;
 }
 
@@ -3661,10 +3660,8 @@ static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 				    unsigned int issue_flags)
 {
 	if (req->flags & REQ_F_BUFFER_SELECTED) {
-		struct io_buffer *kbuf = req->kbuf;
-
-		iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
-		iov[0].iov_len = kbuf->len;
+		iov[0].iov_base = u64_to_user_ptr(req->rw.addr);
+		iov[0].iov_len = req->rw.len;
 		return 0;
 	}
 	if (req->rw.len != 1)
@@ -3678,6 +3675,13 @@ static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 	return __io_iov_buffer_select(req, iov, issue_flags);
 }
 
+static inline bool io_do_buffer_select(struct io_kiocb *req)
+{
+	if (!(req->flags & REQ_F_BUFFER_SELECT))
+		return false;
+	return !(req->flags & REQ_F_BUFFER_SELECTED);
+}
+
 static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
 				       struct io_rw_state *s,
 				       unsigned int issue_flags)
@@ -3700,10 +3704,11 @@ static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
 	sqe_len = req->rw.len;
 
 	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
-		if (req->flags & REQ_F_BUFFER_SELECT) {
+		if (io_do_buffer_select(req)) {
 			buf = io_buffer_select(req, &sqe_len, issue_flags);
 			if (IS_ERR(buf))
 				return ERR_CAST(buf);
+			req->rw.addr = (unsigned long) buf;
 			req->rw.len = sqe_len;
 		}
 
@@ -5943,7 +5948,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 		kmsg = &iomsg;
 	}
 
-	if (req->flags & REQ_F_BUFFER_SELECT) {
+	if (io_do_buffer_select(req)) {
 		void __user *buf;
 
 		buf = io_buffer_select(req, &sr->len, issue_flags);
@@ -6005,12 +6010,13 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
-	if (req->flags & REQ_F_BUFFER_SELECT) {
+	if (io_do_buffer_select(req)) {
 		void __user *buf;
 
 		buf = io_buffer_select(req, &sr->len, issue_flags);
 		if (IS_ERR(buf))
 			return PTR_ERR(buf);
+		sr->buf = buf;
 	}
 
 	ret = import_single_range(READ, buf, sr->len, &iov, &msg.msg_iter);
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 08/12] io_uring: add buffer selection support to IORING_OP_NOP
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (6 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 07/12] io_uring: never call io_buffer_select() for a buffer re-select Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 09/12] io_uring: add io_pin_pages() helper Jens Axboe
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

Obviously not really useful since it's not transferring data, but it
is helpful in benchmarking overhead of provided buffers.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 3b61cf06275d..eb168a7c6e06 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1052,7 +1052,9 @@ struct io_op_def {
 };
 
 static const struct io_op_def io_op_defs[] = {
-	[IORING_OP_NOP] = {},
+	[IORING_OP_NOP] = {
+		.buffer_select		= 1,
+	},
 	[IORING_OP_READV] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
@@ -4910,11 +4912,20 @@ static int io_splice(struct io_kiocb *req, unsigned int issue_flags)
 static int io_nop(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
+	void __user *buf;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
 
-	__io_req_complete(req, issue_flags, 0, 0);
+	if (req->flags & REQ_F_BUFFER_SELECT) {
+		size_t len = 1;
+
+		buf = io_buffer_select(req, &len, issue_flags);
+		if (IS_ERR(buf))
+			return PTR_ERR(buf);
+	}
+
+	__io_req_complete(req, issue_flags, 0, io_put_kbuf(req, issue_flags));
 	return 0;
 }
 
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 09/12] io_uring: add io_pin_pages() helper
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (7 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 08/12] io_uring: add buffer selection support to IORING_OP_NOP Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 10/12] io_uring: abstract out provided buffer list selection Jens Axboe
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

Abstract this out from io_sqe_buffer_register() so we can use it
elsewhere too without duplicating this code.

No intended functional changes in this patch.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 77 +++++++++++++++++++++++++++++++++------------------
 1 file changed, 50 insertions(+), 27 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index eb168a7c6e06..8b9636043dbf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -10166,30 +10166,18 @@ static int io_buffer_account_pin(struct io_ring_ctx *ctx, struct page **pages,
 	return ret;
 }
 
-static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
-				  struct io_mapped_ubuf **pimu,
-				  struct page **last_hpage)
+static struct page **io_pin_pages(unsigned long ubuf, unsigned long len,
+				  int *npages)
 {
-	struct io_mapped_ubuf *imu = NULL;
+	unsigned long start, end, nr_pages;
 	struct vm_area_struct **vmas = NULL;
 	struct page **pages = NULL;
-	unsigned long off, start, end, ubuf;
-	size_t size;
-	int ret, pret, nr_pages, i;
-
-	if (!iov->iov_base) {
-		*pimu = ctx->dummy_ubuf;
-		return 0;
-	}
+	int i, pret, ret = -ENOMEM;
 
-	ubuf = (unsigned long) iov->iov_base;
-	end = (ubuf + iov->iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	end = (ubuf + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
 	start = ubuf >> PAGE_SHIFT;
 	nr_pages = end - start;
 
-	*pimu = NULL;
-	ret = -ENOMEM;
-
 	pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL);
 	if (!pages)
 		goto done;
@@ -10199,10 +10187,6 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
 	if (!vmas)
 		goto done;
 
-	imu = kvmalloc(struct_size(imu, bvec, nr_pages), GFP_KERNEL);
-	if (!imu)
-		goto done;
-
 	ret = 0;
 	mmap_read_lock(current->mm);
 	pret = pin_user_pages(ubuf, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
@@ -10220,6 +10204,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
 				break;
 			}
 		}
+		*npages = nr_pages;
 	} else {
 		ret = pret < 0 ? pret : -EFAULT;
 	}
@@ -10233,14 +10218,53 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
 			unpin_user_pages(pages, pret);
 		goto done;
 	}
+	ret = 0;
+done:
+	kvfree(vmas);
+	if (ret < 0) {
+		kvfree(pages);
+		pages = ERR_PTR(ret);
+	}
+	return pages;
+}
 
-	ret = io_buffer_account_pin(ctx, pages, pret, imu, last_hpage);
+static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
+				  struct io_mapped_ubuf **pimu,
+				  struct page **last_hpage)
+{
+	struct io_mapped_ubuf *imu = NULL;
+	struct page **pages = NULL;
+	unsigned long off;
+	size_t size;
+	int ret, nr_pages, i;
+
+	if (!iov->iov_base) {
+		*pimu = ctx->dummy_ubuf;
+		return 0;
+	}
+
+	*pimu = NULL;
+	ret = -ENOMEM;
+
+	pages = io_pin_pages((unsigned long) iov->iov_base, iov->iov_len,
+				&nr_pages);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		pages = NULL;
+		goto done;
+	}
+
+	imu = kvmalloc(struct_size(imu, bvec, nr_pages), GFP_KERNEL);
+	if (!imu)
+		goto done;
+
+	ret = io_buffer_account_pin(ctx, pages, nr_pages, imu, last_hpage);
 	if (ret) {
-		unpin_user_pages(pages, pret);
+		unpin_user_pages(pages, nr_pages);
 		goto done;
 	}
 
-	off = ubuf & ~PAGE_MASK;
+	off = (unsigned long) iov->iov_base & ~PAGE_MASK;
 	size = iov->iov_len;
 	for (i = 0; i < nr_pages; i++) {
 		size_t vec_len;
@@ -10253,8 +10277,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
 		size -= vec_len;
 	}
 	/* store original address for later verification */
-	imu->ubuf = ubuf;
-	imu->ubuf_end = ubuf + iov->iov_len;
+	imu->ubuf = (unsigned long) iov->iov_base;
+	imu->ubuf_end = imu->ubuf + iov->iov_len;
 	imu->nr_bvecs = nr_pages;
 	*pimu = imu;
 	ret = 0;
@@ -10262,7 +10286,6 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
 	if (ret)
 		kvfree(imu);
 	kvfree(pages);
-	kvfree(vmas);
 	return ret;
 }
 
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 10/12] io_uring: abstract out provided buffer list selection
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (8 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 09/12] io_uring: add io_pin_pages() helper Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 11/12] io_uring: move provided and fixed buffers into the same io_kiocb area Jens Axboe
  2022-04-30 20:50 ` [PATCH 12/12] io_uring: add support for ring mapped supplied buffers Jens Axboe
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

In preparation for providing another way to select a buffer, move the
existing logic into a helper.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 31 ++++++++++++++++++++-----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8b9636043dbf..616a1c07c369 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3583,29 +3583,38 @@ static void io_buffer_add_list(struct io_ring_ctx *ctx,
 	list_add(&bl->list, list);
 }
 
+static void __user *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
+					      struct io_buffer_list *bl,
+					      unsigned int issue_flags)
+{
+	struct io_buffer *kbuf;
+
+	kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list);
+	list_del(&kbuf->list);
+	if (*len > kbuf->len)
+		*len = kbuf->len;
+	req->flags |= REQ_F_BUFFER_SELECTED;
+	req->kbuf = kbuf;
+	io_ring_submit_unlock(req->ctx, issue_flags);
+	return u64_to_user_ptr(kbuf->addr);
+}
+
 static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 				     unsigned int issue_flags)
 {
-	struct io_buffer *kbuf = req->kbuf;
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_buffer_list *bl;
 
 	io_ring_submit_lock(req->ctx, issue_flags);
 
 	bl = io_buffer_get_list(ctx, req->buf_index);
-	if (bl && !list_empty(&bl->buf_list)) {
-		kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list);
-		list_del(&kbuf->list);
-		if (*len > kbuf->len)
-			*len = kbuf->len;
-		req->flags |= REQ_F_BUFFER_SELECTED;
-		req->kbuf = kbuf;
+	if (unlikely(!bl)) {
 		io_ring_submit_unlock(req->ctx, issue_flags);
-		return u64_to_user_ptr(kbuf->addr);
+		return ERR_PTR(-ENOBUFS);
 	}
 
-	io_ring_submit_unlock(req->ctx, issue_flags);
-	return ERR_PTR(-ENOBUFS);
+	/* selection helpers drop the submit lock again, if needed */
+	return io_provided_buffer_select(req, len, bl, issue_flags);
 }
 
 #ifdef CONFIG_COMPAT
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 11/12] io_uring: move provided and fixed buffers into the same io_kiocb area
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (9 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 10/12] io_uring: abstract out provided buffer list selection Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-04-30 20:50 ` [PATCH 12/12] io_uring: add support for ring mapped supplied buffers Jens Axboe
  11 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

These are mutually exclusive - if you use provided buffers, then you
cannot use fixed buffers and vice versa. Move them into the same spot
in the io_kiocb, which is also advantageous for provided buffers as
they get near the submit side hot cacheline.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 616a1c07c369..3d5d02b40347 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -977,8 +977,14 @@ struct io_kiocb {
 	struct task_struct		*task;
 
 	struct io_rsrc_node		*rsrc_node;
-	/* store used ubuf, so we can prevent reloading */
-	struct io_mapped_ubuf		*imu;
+
+	union {
+		/* store used ubuf, so we can prevent reloading */
+		struct io_mapped_ubuf	*imu;
+
+		/* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */
+		struct io_buffer	*kbuf;
+	};
 
 	union {
 		/* used by request caches, completion batching and iopoll */
@@ -995,8 +1001,6 @@ struct io_kiocb {
 	struct async_poll		*apoll;
 	/* opcode allocated if it needs to store data for async defer */
 	void				*async_data;
-	/* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */
-	struct io_buffer		*kbuf;
 	/* linked requests, IFF REQ_F_HARDLINK or REQ_F_LINK are set */
 	struct io_kiocb			*link;
 	/* custom credentials, valid IFF REQ_F_CREDS is set */
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH 12/12] io_uring: add support for ring mapped supplied buffers
  2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
                   ` (10 preceding siblings ...)
  2022-04-30 20:50 ` [PATCH 11/12] io_uring: move provided and fixed buffers into the same io_kiocb area Jens Axboe
@ 2022-04-30 20:50 ` Jens Axboe
  2022-05-01 10:28   ` Dylan Yudaken
  11 siblings, 1 reply; 16+ messages in thread
From: Jens Axboe @ 2022-04-30 20:50 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

Provided buffers allow an application to supply io_uring with buffers
that can then be grabbed for a read/receive request, when the data
source is ready to deliver data. The existing scheme relies on using
IORING_OP_PROVIDE_BUFFERS to do that, but it can be difficult to use
in real world applications. It's pretty efficient if the application
is able to supply back batches of provided buffers when they have been
consumed and the application is ready to recycle them, but if
fragmentation occurs in the buffer space, it can become difficult to
supply enough buffers at the time. This hurts efficiency.

Add a register op, IORING_REGISTER_PBUF_RING, which allows an application
to setup a shared queue for each buffer group of provided buffers. The
application can then supply buffers simply by adding them to this ring,
and the kernel can consume then just as easily. The ring shares the head
with the application, the tail remains private in the kernel.

Provided buffers setup with IORING_REGISTER_PBUF_RING cannot use
IORING_OP_{PROVIDE,REMOVE}_BUFFERS for adding or removing entries to the
ring, they must use the mapped ring. Mapped provided buffer rings can
co-exist with normal provided buffers, just not within the same group ID.

To gauge overhead of the existing scheme and evaluate the mapped ring
approach, a simple NOP benchmark was written. It uses a ring of 128
entries, and submits/completes 32 at the time. 'Replenish' is how
many buffers are provided back at the time after they have been
consumed:

Test			Replenish			NOPs/sec
================================================================
No provided buffers	NA				~30M
Provided buffers	32				~16M
Provided buffers	 1				~10M
Ring buffers		32				~27M
Ring buffers		 1				~27M

The ring mapped buffers perform almost as well as not using provided
buffers at all, and they don't care if you provided 1 or more back at
the same time. This means application can just replenish as they go,
rather than need to batch and compact, further reducing overhead in the
application. The NOP benchmark above doesn't need to do any compaction,
so that overhead isn't even reflected in the above test.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 227 +++++++++++++++++++++++++++++++---
 include/uapi/linux/io_uring.h |  26 ++++
 2 files changed, 238 insertions(+), 15 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 3d5d02b40347..a9691727652c 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -285,7 +285,16 @@ struct io_rsrc_data {
 struct io_buffer_list {
 	struct list_head list;
 	struct list_head buf_list;
+	struct page **buf_pages;
 	__u16 bgid;
+
+	/* below is for ring provided buffers */
+	__u16 buf_nr_pages;
+	__u16 nr_entries;
+	__u16 buf_per_page;
+	struct io_uring_buf_ring *buf_ring;
+	__u32 tail;
+	__u32 mask;
 };
 
 struct io_buffer {
@@ -815,6 +824,7 @@ enum {
 	REQ_F_NEED_CLEANUP_BIT,
 	REQ_F_POLLED_BIT,
 	REQ_F_BUFFER_SELECTED_BIT,
+	REQ_F_BUFFER_RING_BIT,
 	REQ_F_COMPLETE_INLINE_BIT,
 	REQ_F_REISSUE_BIT,
 	REQ_F_CREDS_BIT,
@@ -865,6 +875,8 @@ enum {
 	REQ_F_POLLED		= BIT(REQ_F_POLLED_BIT),
 	/* buffer already selected */
 	REQ_F_BUFFER_SELECTED	= BIT(REQ_F_BUFFER_SELECTED_BIT),
+	/* buffer selected from ring, needs commit */
+	REQ_F_BUFFER_RING	= BIT(REQ_F_BUFFER_RING_BIT),
 	/* completion is deferred through io_comp_state */
 	REQ_F_COMPLETE_INLINE	= BIT(REQ_F_COMPLETE_INLINE_BIT),
 	/* caller should reissue async */
@@ -984,6 +996,15 @@ struct io_kiocb {
 
 		/* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */
 		struct io_buffer	*kbuf;
+
+		/*
+		 * stores buffer ID for ring provided buffers, valid IFF
+		 * REQ_F_BUFFER_RING is set.
+		 */
+		 struct {
+			 struct io_buffer_list	*buf_list;
+			 __u32			bid;
+		 };
 	};
 
 	union {
@@ -1564,21 +1585,30 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req,
 
 static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 {
-	struct io_buffer *kbuf = req->kbuf;
 	unsigned int cflags;
 
-	cflags = IORING_CQE_F_BUFFER | (kbuf->bid << IORING_CQE_BUFFER_SHIFT);
-	req->flags &= ~REQ_F_BUFFER_SELECTED;
-	list_add(&kbuf->list, list);
-	req->kbuf = NULL;
-	return cflags;
+	if (req->flags & REQ_F_BUFFER_RING) {
+		if (req->buf_list)
+			req->buf_list->tail++;
+
+		cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
+		req->flags &= ~REQ_F_BUFFER_RING;
+	} else {
+		struct io_buffer *kbuf = req->kbuf;
+
+		cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
+		list_add(&kbuf->list, list);
+		req->kbuf = NULL;
+		req->flags &= ~REQ_F_BUFFER_SELECTED;
+	}
+	return cflags | IORING_CQE_F_BUFFER;
 }
 
 static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
 {
 	lockdep_assert_held(&req->ctx->completion_lock);
 
-	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+	if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
 		return 0;
 	return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
 }
@@ -1588,7 +1618,7 @@ static inline unsigned int io_put_kbuf(struct io_kiocb *req,
 {
 	unsigned int cflags;
 
-	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+	if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
 		return 0;
 
 	/*
@@ -1603,7 +1633,10 @@ static inline unsigned int io_put_kbuf(struct io_kiocb *req,
 	 * We migrate buffers from the comp_list to the issue cache list
 	 * when we need one.
 	 */
-	if (issue_flags & IO_URING_F_UNLOCKED) {
+	if (req->flags & REQ_F_BUFFER_RING) {
+		/* no buffers to recycle for this case */
+		cflags = __io_put_kbuf(req, NULL);
+	} else if (issue_flags & IO_URING_F_UNLOCKED) {
 		struct io_ring_ctx *ctx = req->ctx;
 
 		spin_lock(&ctx->completion_lock);
@@ -1645,11 +1678,21 @@ static void io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
 	struct io_buffer_list *bl;
 	struct io_buffer *buf;
 
-	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+	if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
 		return;
 	/* don't recycle if we already did IO to this buffer */
 	if (req->flags & REQ_F_PARTIAL_IO)
 		return;
+	/*
+	 * We don't need to recycle for REQ_F_BUFFER_RING, we can just clear
+	 * the flag and hence ensure that bl->tail doesn't get incremented.
+	 * If the tail has already been incremented, hang on to it.
+	 */
+	if (req->flags & REQ_F_BUFFER_RING) {
+		if (req->buf_list)
+			req->flags &= ~REQ_F_BUFFER_RING;
+		return;
+	}
 
 	io_ring_submit_lock(ctx, issue_flags);
 
@@ -3603,6 +3646,53 @@ static void __user *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
 	return u64_to_user_ptr(kbuf->addr);
 }
 
+static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
+					  struct io_buffer_list *bl,
+					  unsigned int issue_flags)
+{
+	struct io_uring_buf_ring *br = bl->buf_ring;
+	struct io_uring_buf *buf = &br->bufs[0];
+	__u32 tail = bl->tail;
+
+	if (unlikely(smp_load_acquire(&br->head) == tail))
+		return ERR_PTR(-ENOBUFS);
+
+	tail &= bl->mask;
+	if (tail < bl->buf_per_page) {
+		buf = &br->bufs[tail];
+	} else {
+		int index = tail - bl->buf_per_page;
+		int off = index & bl->buf_per_page;
+
+		index = (index >> (PAGE_SHIFT - 4)) + 1;
+		buf = page_address(bl->buf_pages[index]);
+		buf += off;
+	}
+	if (*len > buf->len)
+		*len = buf->len;
+	req->flags |= REQ_F_BUFFER_RING;
+	req->buf_list = bl;
+	req->bid = buf->bid;
+
+	if (!(issue_flags & IO_URING_F_UNLOCKED))
+		return u64_to_user_ptr(buf->addr);
+
+	/*
+	 * If we came in unlocked, we have no choice but to
+	 * consume the buffer here. This does mean it'll be
+	 * pinned until the IO completes. But coming in
+	 * unlocked means we're in io-wq context, hence there
+	 * should be no further retry. For the locked case, the
+	 * caller must ensure to call the commit when the
+	 * transfer completes (or if we get -EAGAIN and must
+	 * poll or retry).
+	 */
+	req->buf_list = NULL;
+	bl->tail++;
+	io_ring_submit_unlock(req->ctx, issue_flags);
+	return u64_to_user_ptr(buf->addr);
+}
+
 static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 				     unsigned int issue_flags)
 {
@@ -3618,6 +3708,9 @@ static void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
 	}
 
 	/* selection helpers drop the submit lock again, if needed */
+	if (bl->buf_pages)
+		return io_ring_buffer_select(req, len, bl, issue_flags);
+
 	return io_provided_buffer_select(req, len, bl, issue_flags);
 }
 
@@ -3674,7 +3767,7 @@ static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
 				    unsigned int issue_flags)
 {
-	if (req->flags & REQ_F_BUFFER_SELECTED) {
+	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
 		iov[0].iov_base = u64_to_user_ptr(req->rw.addr);
 		iov[0].iov_len = req->rw.len;
 		return 0;
@@ -3694,7 +3787,7 @@ static inline bool io_do_buffer_select(struct io_kiocb *req)
 {
 	if (!(req->flags & REQ_F_BUFFER_SELECT))
 		return false;
-	return !(req->flags & REQ_F_BUFFER_SELECTED);
+	return !(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING));
 }
 
 static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
@@ -5216,6 +5309,17 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx,
 	if (!nbufs)
 		return 0;
 
+	if (bl->buf_pages) {
+		int j;
+
+		if (WARN_ON_ONCE(nbufs != -1U))
+			return -EINVAL;
+		for (j = 0; j < bl->buf_nr_pages; j++)
+			unpin_user_page(bl->buf_pages[j]);
+		kvfree(bl->buf_pages);
+		bl->buf_pages = NULL;
+	}
+
 	/* the head kbuf is the list itself */
 	while (!list_empty(&bl->buf_list)) {
 		struct io_buffer *nxt;
@@ -5242,8 +5346,12 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
 
 	ret = -ENOENT;
 	bl = io_buffer_get_list(ctx, p->bgid);
-	if (bl)
-		ret = __io_remove_buffers(ctx, bl, p->nbufs);
+	if (bl) {
+		ret = -EINVAL;
+		/* can't use provide/remove buffers command on mapped buffers */
+		if (!bl->buf_pages)
+			ret = __io_remove_buffers(ctx, bl, p->nbufs);
+	}
 	if (ret < 0)
 		req_set_fail(req);
 
@@ -5368,13 +5476,18 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
 
 	bl = io_buffer_get_list(ctx, p->bgid);
 	if (unlikely(!bl)) {
-		bl = kmalloc(sizeof(*bl), GFP_KERNEL);
+		bl = kzalloc(sizeof(*bl), GFP_KERNEL);
 		if (!bl) {
 			ret = -ENOMEM;
 			goto err;
 		}
 		io_buffer_add_list(ctx, bl, p->bgid);
 	}
+	/* can't add buffers via this command for a mapped buffer ring */
+	if (bl->buf_pages) {
+		ret = -EINVAL;
+		goto err;
+	}
 
 	ret = io_add_buffers(ctx, p, bl);
 err:
@@ -12318,6 +12431,77 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 	return ret;
 }
 
+static int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+{
+	struct io_uring_buf_ring *br;
+	struct io_uring_buf_reg reg;
+	struct io_buffer_list *bl;
+	struct page **pages;
+	int nr_pages;
+
+	if (copy_from_user(&reg, arg, sizeof(reg)))
+		return -EFAULT;
+
+	if (reg.resv[0] || reg.resv[1] || reg.resv[2])
+		return -EINVAL;
+	if (!reg.ring_addr)
+		return -EFAULT;
+	if (reg.ring_addr & ~PAGE_MASK)
+		return -EINVAL;
+	if (!is_power_of_2(reg.ring_entries))
+		return -EINVAL;
+
+	bl = io_buffer_get_list(ctx, reg.bgid);
+	if (bl)
+		return -EEXIST;
+	bl = kzalloc(sizeof(*bl), GFP_KERNEL);
+	if (!bl)
+		return -ENOMEM;
+
+	pages = io_pin_pages(reg.ring_addr,
+			     struct_size(br, bufs, reg.ring_entries),
+			     &nr_pages);
+	if (IS_ERR(pages)) {
+		kfree(bl);
+		return PTR_ERR(pages);
+	}
+
+	br = page_address(pages[0]);
+	br->head = 0;
+	bl->buf_pages = pages;
+	bl->buf_nr_pages = nr_pages;
+	bl->nr_entries = reg.ring_entries;
+	BUILD_BUG_ON(sizeof(struct io_uring_buf) != 16);
+	bl->buf_per_page = (PAGE_SIZE - sizeof(struct io_uring_buf)) /
+				sizeof(struct io_uring_buf);
+	bl->buf_ring = br;
+	bl->mask = reg.ring_entries - 1;
+	io_buffer_add_list(ctx, bl, reg.bgid);
+	return 0;
+}
+
+static int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
+{
+	struct io_uring_buf_reg reg;
+	struct io_buffer_list *bl;
+
+	if (copy_from_user(&reg, arg, sizeof(reg)))
+		return -EFAULT;
+	if (reg.resv[0] || reg.resv[1] || reg.resv[2])
+		return -EINVAL;
+
+	bl = io_buffer_get_list(ctx, reg.bgid);
+	if (!bl)
+		return -ENOENT;
+	if (!bl->buf_pages)
+		return -EINVAL;
+
+	__io_remove_buffers(ctx, bl, -1U);
+	list_del(&bl->list);
+	kfree(bl);
+	return 0;
+}
+
 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			       void __user *arg, unsigned nr_args)
 	__releases(ctx->uring_lock)
@@ -12446,6 +12630,18 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 	case IORING_UNREGISTER_RING_FDS:
 		ret = io_ringfd_unregister(ctx, arg, nr_args);
 		break;
+	case IORING_REGISTER_PBUF_RING:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_register_pbuf_ring(ctx, arg);
+		break;
+	case IORING_UNREGISTER_PBUF_RING:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_unregister_pbuf_ring(ctx, arg);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
@@ -12531,6 +12727,7 @@ static int __init io_uring_init(void)
 
 	/* ->buf_index is u16 */
 	BUILD_BUG_ON(IORING_MAX_REG_BUFFERS >= (1u << 16));
+	BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 16);
 
 	/* should fit into one byte */
 	BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 49d1f3994f8d..90b70071110a 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -352,6 +352,10 @@ enum {
 	IORING_REGISTER_RING_FDS		= 20,
 	IORING_UNREGISTER_RING_FDS		= 21,
 
+	/* register ring based provide buffer group */
+	IORING_REGISTER_PBUF_RING		= 22,
+	IORING_UNREGISTER_PBUF_RING		= 23,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };
@@ -423,6 +427,28 @@ struct io_uring_restriction {
 	__u32 resv2[3];
 };
 
+struct io_uring_buf {
+	__u64	addr;
+	__u32	len;
+	__u32	bid;
+};
+
+struct io_uring_buf_ring {
+	union {
+		__u32			head;
+		struct io_uring_buf	pad;
+	};
+	struct io_uring_buf		bufs[];
+};
+
+/* argument for IORING_(UN)REGISTER_PBUF_RING */
+struct io_uring_buf_reg {
+	__u64	ring_addr;
+	__u32	ring_entries;
+	__u32	bgid;
+	__u64	resv[3];
+};
+
 /*
  * io_uring_restriction->opcode values
  */
-- 
2.35.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* Re: [PATCH 12/12] io_uring: add support for ring mapped supplied buffers
  2022-04-30 20:50 ` [PATCH 12/12] io_uring: add support for ring mapped supplied buffers Jens Axboe
@ 2022-05-01 10:28   ` Dylan Yudaken
  2022-05-01 12:25     ` Jens Axboe
  0 siblings, 1 reply; 16+ messages in thread
From: Dylan Yudaken @ 2022-05-01 10:28 UTC (permalink / raw)
  To: axboe, io-uring

On Sat, 2022-04-30 at 14:50 -0600, Jens Axboe wrote:
> Provided buffers allow an application to supply io_uring with buffers
> that can then be grabbed for a read/receive request, when the data
> source is ready to deliver data. The existing scheme relies on using
> IORING_OP_PROVIDE_BUFFERS to do that, but it can be difficult to use
> in real world applications. It's pretty efficient if the application
> is able to supply back batches of provided buffers when they have
> been
> consumed and the application is ready to recycle them, but if
> fragmentation occurs in the buffer space, it can become difficult to
> supply enough buffers at the time. This hurts efficiency.
> 
> Add a register op, IORING_REGISTER_PBUF_RING, which allows an
> application
> to setup a shared queue for each buffer group of provided buffers.
> The
> application can then supply buffers simply by adding them to this
> ring,
> and the kernel can consume then just as easily. The ring shares the
> head
> with the application, the tail remains private in the kernel.
> 
> Provided buffers setup with IORING_REGISTER_PBUF_RING cannot use
> IORING_OP_{PROVIDE,REMOVE}_BUFFERS for adding or removing entries to
> the
> ring, they must use the mapped ring. Mapped provided buffer rings can
> co-exist with normal provided buffers, just not within the same group
> ID.
> 
> To gauge overhead of the existing scheme and evaluate the mapped ring
> approach, a simple NOP benchmark was written. It uses a ring of 128
> entries, and submits/completes 32 at the time. 'Replenish' is how
> many buffers are provided back at the time after they have been
> consumed:
> 
> Test                    Replenish                       NOPs/sec
> ================================================================
> No provided buffers     NA                              ~30M
> Provided buffers        32                              ~16M
> Provided buffers         1                              ~10M
> Ring buffers            32                              ~27M
> Ring buffers             1                              ~27M
> 
> The ring mapped buffers perform almost as well as not using provided
> buffers at all, and they don't care if you provided 1 or more back at
> the same time. This means application can just replenish as they go,
> rather than need to batch and compact, further reducing overhead in
> the
> application. The NOP benchmark above doesn't need to do any
> compaction,
> so that overhead isn't even reflected in the above test.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/io_uring.c                 | 227 +++++++++++++++++++++++++++++++-
> --
>  include/uapi/linux/io_uring.h |  26 ++++
>  2 files changed, 238 insertions(+), 15 deletions(-)
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index 3d5d02b40347..a9691727652c 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -285,7 +285,16 @@ struct io_rsrc_data {
>  struct io_buffer_list {
>         struct list_head list;
>         struct list_head buf_list;
> +       struct page **buf_pages;
>         __u16 bgid;
> +
> +       /* below is for ring provided buffers */
> +       __u16 buf_nr_pages;
> +       __u16 nr_entries;
> +       __u16 buf_per_page;
> +       struct io_uring_buf_ring *buf_ring;
> +       __u32 tail;
> +       __u32 mask;
>  };
>  
>  struct io_buffer {
> @@ -815,6 +824,7 @@ enum {
>         REQ_F_NEED_CLEANUP_BIT,
>         REQ_F_POLLED_BIT,
>         REQ_F_BUFFER_SELECTED_BIT,
> +       REQ_F_BUFFER_RING_BIT,
>         REQ_F_COMPLETE_INLINE_BIT,
>         REQ_F_REISSUE_BIT,
>         REQ_F_CREDS_BIT,
> @@ -865,6 +875,8 @@ enum {
>         REQ_F_POLLED            = BIT(REQ_F_POLLED_BIT),
>         /* buffer already selected */
>         REQ_F_BUFFER_SELECTED   = BIT(REQ_F_BUFFER_SELECTED_BIT),
> +       /* buffer selected from ring, needs commit */
> +       REQ_F_BUFFER_RING       = BIT(REQ_F_BUFFER_RING_BIT),
>         /* completion is deferred through io_comp_state */
>         REQ_F_COMPLETE_INLINE   = BIT(REQ_F_COMPLETE_INLINE_BIT),
>         /* caller should reissue async */
> @@ -984,6 +996,15 @@ struct io_kiocb {
>  
>                 /* stores selected buf, valid IFF
> REQ_F_BUFFER_SELECTED is set */
>                 struct io_buffer        *kbuf;
> +
> +               /*
> +                * stores buffer ID for ring provided buffers, valid
> IFF
> +                * REQ_F_BUFFER_RING is set.
> +                */
> +                struct {
> +                        struct io_buffer_list  *buf_list;
> +                        __u32                  bid;
> +                };
>         };
>  
>         union {
> @@ -1564,21 +1585,30 @@ static inline void
> io_req_set_rsrc_node(struct io_kiocb *req,
>  
>  static unsigned int __io_put_kbuf(struct io_kiocb *req, struct
> list_head *list)
>  {
> -       struct io_buffer *kbuf = req->kbuf;
>         unsigned int cflags;
>  
> -       cflags = IORING_CQE_F_BUFFER | (kbuf->bid <<
> IORING_CQE_BUFFER_SHIFT);
> -       req->flags &= ~REQ_F_BUFFER_SELECTED;
> -       list_add(&kbuf->list, list);
> -       req->kbuf = NULL;
> -       return cflags;
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               if (req->buf_list)
> +                       req->buf_list->tail++;

does this need locking? both on buf_list being available or atomic
increment on tail.

> +
> +               cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
> +               req->flags &= ~REQ_F_BUFFER_RING;
> +       } else {
> +               struct io_buffer *kbuf = req->kbuf;
> +
> +               cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
> +               list_add(&kbuf->list, list);
> +               req->kbuf = NULL;

I wonder if this is not necessary? we don't do it above to buf_list and
it seems to work? For consistency though maybe better to pick one
approach?

> +               req->flags &= ~REQ_F_BUFFER_SELECTED;
> +       }
> +       return cflags | IORING_CQE_F_BUFFER;
>  }
>  
>  static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
>  {
>         lockdep_assert_held(&req->ctx->completion_lock);
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return 0;
>         return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
>  }
> @@ -1588,7 +1618,7 @@ static inline unsigned int io_put_kbuf(struct
> io_kiocb *req,
>  {
>         unsigned int cflags;
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return 0;
>  
>         /*
> @@ -1603,7 +1633,10 @@ static inline unsigned int io_put_kbuf(struct
> io_kiocb *req,
>          * We migrate buffers from the comp_list to the issue cache
> list
>          * when we need one.
>          */
> -       if (issue_flags & IO_URING_F_UNLOCKED) {
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               /* no buffers to recycle for this case */
> +               cflags = __io_put_kbuf(req, NULL);
> +       } else if (issue_flags & IO_URING_F_UNLOCKED) {
>                 struct io_ring_ctx *ctx = req->ctx;
>  
>                 spin_lock(&ctx->completion_lock);
> @@ -1645,11 +1678,21 @@ static void io_kbuf_recycle(struct io_kiocb
> *req, unsigned issue_flags)
>         struct io_buffer_list *bl;
>         struct io_buffer *buf;
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return;
>         /* don't recycle if we already did IO to this buffer */
>         if (req->flags & REQ_F_PARTIAL_IO)
>                 return;
> +       /*
> +        * We don't need to recycle for REQ_F_BUFFER_RING, we can
> just clear
> +        * the flag and hence ensure that bl->tail doesn't get
> incremented.
> +        * If the tail has already been incremented, hang on to it.
> +        */
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               if (req->buf_list)
> +                       req->flags &= ~REQ_F_BUFFER_RING;
> +               return;
> +       }
>  
>         io_ring_submit_lock(ctx, issue_flags);
>  
> @@ -3603,6 +3646,53 @@ static void __user
> *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
>         return u64_to_user_ptr(kbuf->addr);
>  }
>  
> +static void __user *io_ring_buffer_select(struct io_kiocb *req,
> size_t *len,
> +                                         struct io_buffer_list *bl,
> +                                         unsigned int issue_flags)
> +{
> +       struct io_uring_buf_ring *br = bl->buf_ring;
> +       struct io_uring_buf *buf = &br->bufs[0];
> +       __u32 tail = bl->tail;
> +
> +       if (unlikely(smp_load_acquire(&br->head) == tail))
> +               return ERR_PTR(-ENOBUFS);
> +
> +       tail &= bl->mask;
> +       if (tail < bl->buf_per_page) {
> +               buf = &br->bufs[tail];
> +       } else {
> +               int index = tail - bl->buf_per_page;
> +               int off = index & bl->buf_per_page;
> +
> +               index = (index >> (PAGE_SHIFT - 4)) + 1;
> +               buf = page_address(bl->buf_pages[index]);
> +               buf += off;
> +       }
> +       if (*len > buf->len)
> +               *len = buf->len;
> +       req->flags |= REQ_F_BUFFER_RING;
> +       req->buf_list = bl;
> +       req->bid = buf->bid;
> +
> +       if (!(issue_flags & IO_URING_F_UNLOCKED))
> +               return u64_to_user_ptr(buf->addr);
> +
> +       /*
> +        * If we came in unlocked, we have no choice but to
> +        * consume the buffer here. This does mean it'll be
> +        * pinned until the IO completes. But coming in
> +        * unlocked means we're in io-wq context, hence there

I do not know if this matters here, but task work can also run unlocked
operations.

> +        * should be no further retry. For the locked case, the
> +        * caller must ensure to call the commit when the
> +        * transfer completes (or if we get -EAGAIN and must
> +        * poll or retry).
> +        */
> +       req->buf_list = NULL;
> +       bl->tail++;
> +       io_ring_submit_unlock(req->ctx, issue_flags);
> +       return u64_to_user_ptr(buf->addr);
> +}
> +
>  static void __user *io_buffer_select(struct io_kiocb *req, size_t
> *len,
>                                      unsigned int issue_flags)
>  {
> @@ -3618,6 +3708,9 @@ static void __user *io_buffer_select(struct
> io_kiocb *req, size_t *len,
>         }
>  
>         /* selection helpers drop the submit lock again, if needed */
> +       if (bl->buf_pages)
> +               return io_ring_buffer_select(req, len, bl,
> issue_flags);
> +
>         return io_provided_buffer_select(req, len, bl, issue_flags);
>  }
>  
> @@ -3674,7 +3767,7 @@ static ssize_t __io_iov_buffer_select(struct
> io_kiocb *req, struct iovec *iov,
>  static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct
> iovec *iov,
>                                     unsigned int issue_flags)
>  {
> -       if (req->flags & REQ_F_BUFFER_SELECTED) {
> +       if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
>                 iov[0].iov_base = u64_to_user_ptr(req->rw.addr);
>                 iov[0].iov_len = req->rw.len;
>                 return 0;
> @@ -3694,7 +3787,7 @@ static inline bool io_do_buffer_select(struct
> io_kiocb *req)
>  {
>         if (!(req->flags & REQ_F_BUFFER_SELECT))
>                 return false;
> -       return !(req->flags & REQ_F_BUFFER_SELECTED);
> +       return !(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING));
>  }
>  
>  static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
> @@ -5216,6 +5309,17 @@ static int __io_remove_buffers(struct
> io_ring_ctx *ctx,
>         if (!nbufs)
>                 return 0;
>  
> +       if (bl->buf_pages) {
> +               int j;
> +
> +               if (WARN_ON_ONCE(nbufs != -1U))
> +                       return -EINVAL;
> +               for (j = 0; j < bl->buf_nr_pages; j++)
> +                       unpin_user_page(bl->buf_pages[j]);
> +               kvfree(bl->buf_pages);
> +               bl->buf_pages = NULL;
> +       }
> +
>         /* the head kbuf is the list itself */
>         while (!list_empty(&bl->buf_list)) {
>                 struct io_buffer *nxt;
> @@ -5242,8 +5346,12 @@ static int io_remove_buffers(struct io_kiocb
> *req, unsigned int issue_flags)
>  
>         ret = -ENOENT;
>         bl = io_buffer_get_list(ctx, p->bgid);
> -       if (bl)
> -               ret = __io_remove_buffers(ctx, bl, p->nbufs);
> +       if (bl) {
> +               ret = -EINVAL;
> +               /* can't use provide/remove buffers command on mapped
> buffers */
> +               if (!bl->buf_pages)
> +                       ret = __io_remove_buffers(ctx, bl, p->nbufs);
> +       }
>         if (ret < 0)
>                 req_set_fail(req);
>  
> @@ -5368,13 +5476,18 @@ static int io_provide_buffers(struct io_kiocb
> *req, unsigned int issue_flags)
>  
>         bl = io_buffer_get_list(ctx, p->bgid);
>         if (unlikely(!bl)) {
> -               bl = kmalloc(sizeof(*bl), GFP_KERNEL);
> +               bl = kzalloc(sizeof(*bl), GFP_KERNEL);
>                 if (!bl) {
>                         ret = -ENOMEM;
>                         goto err;
>                 }
>                 io_buffer_add_list(ctx, bl, p->bgid);
>         }
> +       /* can't add buffers via this command for a mapped buffer
> ring */
> +       if (bl->buf_pages) {
> +               ret = -EINVAL;
> +               goto err;
> +       }
>  
>         ret = io_add_buffers(ctx, p, bl);
>  err:
> @@ -12318,6 +12431,77 @@ static __cold int
> io_register_iowq_max_workers(struct io_ring_ctx *ctx,
>         return ret;
>  }
>  
> +static int io_register_pbuf_ring(struct io_ring_ctx *ctx, void
> __user *arg)
> +{
> +       struct io_uring_buf_ring *br;
> +       struct io_uring_buf_reg reg;
> +       struct io_buffer_list *bl;
> +       struct page **pages;
> +       int nr_pages;
> +
> +       if (copy_from_user(&reg, arg, sizeof(reg)))
> +               return -EFAULT;
> +
> +       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
> +               return -EINVAL;
> +       if (!reg.ring_addr)
> +               return -EFAULT;
> +       if (reg.ring_addr & ~PAGE_MASK)
> +               return -EINVAL;
> +       if (!is_power_of_2(reg.ring_entries))
> +               return -EINVAL;
> +
> +       bl = io_buffer_get_list(ctx, reg.bgid);
> +       if (bl)
> +               return -EEXIST;
> +       bl = kzalloc(sizeof(*bl), GFP_KERNEL);
> +       if (!bl)
> +               return -ENOMEM;
> +
> +       pages = io_pin_pages(reg.ring_addr,
> +                            struct_size(br, bufs, reg.ring_entries),
> +                            &nr_pages);
> +       if (IS_ERR(pages)) {
> +               kfree(bl);
> +               return PTR_ERR(pages);
> +       }
> +
> +       br = page_address(pages[0]);
> +       br->head = 0;
> +       bl->buf_pages = pages;
> +       bl->buf_nr_pages = nr_pages;
> +       bl->nr_entries = reg.ring_entries;
> +       BUILD_BUG_ON(sizeof(struct io_uring_buf) != 16);
> +       bl->buf_per_page = (PAGE_SIZE - sizeof(struct io_uring_buf))
> /
> +                               sizeof(struct io_uring_buf);
> +       bl->buf_ring = br;
> +       bl->mask = reg.ring_entries - 1;
> +       io_buffer_add_list(ctx, bl, reg.bgid);
> +       return 0;
> +}
> +
> +static int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void
> __user *arg)
> +{
> +       struct io_uring_buf_reg reg;
> +       struct io_buffer_list *bl;
> +
> +       if (copy_from_user(&reg, arg, sizeof(reg)))
> +               return -EFAULT;
> +       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
> +               return -EINVAL;
> +
> +       bl = io_buffer_get_list(ctx, reg.bgid);
> +       if (!bl)
> +               return -ENOENT;
> +       if (!bl->buf_pages)
> +               return -EINVAL;
> +
> +       __io_remove_buffers(ctx, bl, -1U);
> +       list_del(&bl->list);
> +       kfree(bl);
> +       return 0;
> +}
> +
>  static int __io_uring_register(struct io_ring_ctx *ctx, unsigned
> opcode,
>                                void __user *arg, unsigned nr_args)
>         __releases(ctx->uring_lock)
> @@ -12446,6 +12630,18 @@ static int __io_uring_register(struct
> io_ring_ctx *ctx, unsigned opcode,
>         case IORING_UNREGISTER_RING_FDS:
>                 ret = io_ringfd_unregister(ctx, arg, nr_args);
>                 break;
> +       case IORING_REGISTER_PBUF_RING:
> +               ret = -EINVAL;
> +               if (!arg || nr_args != 1)
> +                       break;
> +               ret = io_register_pbuf_ring(ctx, arg);
> +               break;
> +       case IORING_UNREGISTER_PBUF_RING:
> +               ret = -EINVAL;
> +               if (!arg || nr_args != 1)
> +                       break;
> +               ret = io_unregister_pbuf_ring(ctx, arg);
> +               break;
>         default:
>                 ret = -EINVAL;
>                 break;
> @@ -12531,6 +12727,7 @@ static int __init io_uring_init(void)
>  
>         /* ->buf_index is u16 */
>         BUILD_BUG_ON(IORING_MAX_REG_BUFFERS >= (1u << 16));
> +       BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 16);
>  
>         /* should fit into one byte */
>         BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
> diff --git a/include/uapi/linux/io_uring.h
> b/include/uapi/linux/io_uring.h
> index 49d1f3994f8d..90b70071110a 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -352,6 +352,10 @@ enum {
>         IORING_REGISTER_RING_FDS                = 20,
>         IORING_UNREGISTER_RING_FDS              = 21,
>  
> +       /* register ring based provide buffer group */
> +       IORING_REGISTER_PBUF_RING               = 22,
> +       IORING_UNREGISTER_PBUF_RING             = 23,
> +
>         /* this goes last */
>         IORING_REGISTER_LAST
>  };
> @@ -423,6 +427,28 @@ struct io_uring_restriction {
>         __u32 resv2[3];
>  };
>  
> +struct io_uring_buf {
> +       __u64   addr;
> +       __u32   len;
> +       __u32   bid;
> +};

I believe bid is 16 bits due to the way it comes back in CQE flags

> +
> +struct io_uring_buf_ring {
> +       union {
> +               __u32                   head;
> +               struct io_uring_buf     pad;
> +       };
> +       struct io_uring_buf             bufs[];
> +};
> +
> +/* argument for IORING_(UN)REGISTER_PBUF_RING */
> +struct io_uring_buf_reg {
> +       __u64   ring_addr;
> +       __u32   ring_entries;
> +       __u32   bgid;

ditto for bgid



^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 12/12] io_uring: add support for ring mapped supplied buffers
  2022-05-01 10:28   ` Dylan Yudaken
@ 2022-05-01 12:25     ` Jens Axboe
  2022-05-01 13:25       ` Jens Axboe
  0 siblings, 1 reply; 16+ messages in thread
From: Jens Axboe @ 2022-05-01 12:25 UTC (permalink / raw)
  To: Dylan Yudaken, io-uring

On 5/1/22 4:28 AM, Dylan Yudaken wrote:
> On Sat, 2022-04-30 at 14:50 -0600, Jens Axboe wrote:
>> Provided buffers allow an application to supply io_uring with buffers
>> that can then be grabbed for a read/receive request, when the data
>> source is ready to deliver data. The existing scheme relies on using
>> IORING_OP_PROVIDE_BUFFERS to do that, but it can be difficult to use
>> in real world applications. It's pretty efficient if the application
>> is able to supply back batches of provided buffers when they have
>> been
>> consumed and the application is ready to recycle them, but if
>> fragmentation occurs in the buffer space, it can become difficult to
>> supply enough buffers at the time. This hurts efficiency.
>>
>> Add a register op, IORING_REGISTER_PBUF_RING, which allows an
>> application
>> to setup a shared queue for each buffer group of provided buffers.
>> The
>> application can then supply buffers simply by adding them to this
>> ring,
>> and the kernel can consume then just as easily. The ring shares the
>> head
>> with the application, the tail remains private in the kernel.
>>
>> Provided buffers setup with IORING_REGISTER_PBUF_RING cannot use
>> IORING_OP_{PROVIDE,REMOVE}_BUFFERS for adding or removing entries to
>> the
>> ring, they must use the mapped ring. Mapped provided buffer rings can
>> co-exist with normal provided buffers, just not within the same group
>> ID.
>>
>> To gauge overhead of the existing scheme and evaluate the mapped ring
>> approach, a simple NOP benchmark was written. It uses a ring of 128
>> entries, and submits/completes 32 at the time. 'Replenish' is how
>> many buffers are provided back at the time after they have been
>> consumed:
>>
>> Test                    Replenish                       NOPs/sec
>> ================================================================
>> No provided buffers     NA                              ~30M
>> Provided buffers        32                              ~16M
>> Provided buffers         1                              ~10M
>> Ring buffers            32                              ~27M
>> Ring buffers             1                              ~27M
>>
>> The ring mapped buffers perform almost as well as not using provided
>> buffers at all, and they don't care if you provided 1 or more back at
>> the same time. This means application can just replenish as they go,
>> rather than need to batch and compact, further reducing overhead in
>> the
>> application. The NOP benchmark above doesn't need to do any
>> compaction,
>> so that overhead isn't even reflected in the above test.
>>
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>> ---
>>  fs/io_uring.c                 | 227 +++++++++++++++++++++++++++++++-
>> --
>>  include/uapi/linux/io_uring.h |  26 ++++
>>  2 files changed, 238 insertions(+), 15 deletions(-)
>>
>> diff --git a/fs/io_uring.c b/fs/io_uring.c
>> index 3d5d02b40347..a9691727652c 100644
>> --- a/fs/io_uring.c
>> +++ b/fs/io_uring.c
>> @@ -285,7 +285,16 @@ struct io_rsrc_data {
>>  struct io_buffer_list {
>>         struct list_head list;
>>         struct list_head buf_list;
>> +       struct page **buf_pages;
>>         __u16 bgid;
>> +
>> +       /* below is for ring provided buffers */
>> +       __u16 buf_nr_pages;
>> +       __u16 nr_entries;
>> +       __u16 buf_per_page;
>> +       struct io_uring_buf_ring *buf_ring;
>> +       __u32 tail;
>> +       __u32 mask;
>>  };
>>  
>>  struct io_buffer {
>> @@ -815,6 +824,7 @@ enum {
>>         REQ_F_NEED_CLEANUP_BIT,
>>         REQ_F_POLLED_BIT,
>>         REQ_F_BUFFER_SELECTED_BIT,
>> +       REQ_F_BUFFER_RING_BIT,
>>         REQ_F_COMPLETE_INLINE_BIT,
>>         REQ_F_REISSUE_BIT,
>>         REQ_F_CREDS_BIT,
>> @@ -865,6 +875,8 @@ enum {
>>         REQ_F_POLLED            = BIT(REQ_F_POLLED_BIT),
>>         /* buffer already selected */
>>         REQ_F_BUFFER_SELECTED   = BIT(REQ_F_BUFFER_SELECTED_BIT),
>> +       /* buffer selected from ring, needs commit */
>> +       REQ_F_BUFFER_RING       = BIT(REQ_F_BUFFER_RING_BIT),
>>         /* completion is deferred through io_comp_state */
>>         REQ_F_COMPLETE_INLINE   = BIT(REQ_F_COMPLETE_INLINE_BIT),
>>         /* caller should reissue async */
>> @@ -984,6 +996,15 @@ struct io_kiocb {
>>  
>>                 /* stores selected buf, valid IFF
>> REQ_F_BUFFER_SELECTED is set */
>>                 struct io_buffer        *kbuf;
>> +
>> +               /*
>> +                * stores buffer ID for ring provided buffers, valid
>> IFF
>> +                * REQ_F_BUFFER_RING is set.
>> +                */
>> +                struct {
>> +                        struct io_buffer_list  *buf_list;
>> +                        __u32                  bid;
>> +                };
>>         };
>>  
>>         union {
>> @@ -1564,21 +1585,30 @@ static inline void
>> io_req_set_rsrc_node(struct io_kiocb *req,
>>  
>>  static unsigned int __io_put_kbuf(struct io_kiocb *req, struct
>> list_head *list)
>>  {
>> -       struct io_buffer *kbuf = req->kbuf;
>>         unsigned int cflags;
>>  
>> -       cflags = IORING_CQE_F_BUFFER | (kbuf->bid <<
>> IORING_CQE_BUFFER_SHIFT);
>> -       req->flags &= ~REQ_F_BUFFER_SELECTED;
>> -       list_add(&kbuf->list, list);
>> -       req->kbuf = NULL;
>> -       return cflags;
>> +       if (req->flags & REQ_F_BUFFER_RING) {
>> +               if (req->buf_list)
>> +                       req->buf_list->tail++;
> 
> does this need locking? both on buf_list being available or atomic
> increment on tail.

This needs some comments and checks around the expectation. But the idea
is that the fast path will invoke eg the recv with the uring_lock
already held, and we'll hold it until we complete it.

Basically we have two cases:

1) Op invoked with uring_lock held. Either the request completes
   successfully in this invocation, and we put the kbuf with it still
   held. The completion just increments the tail, buf now consumed. Or
   we need to retry somehow, and we can just clear REQ_F_BUFFER_RING to
   recycle the buf, that's it.

2) Op invoked without uring_lock held. We get a buf and increment the
   tail, as we'd otherwise need to grab it again for the completion.
   We're now stuck with the buf, hold it until the request completes.

#1 is the above code, just need some checks and safe guards to ensure
that if ->buf_list is still set, we are still holding the lock.

>> +
>> +               cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
>> +               req->flags &= ~REQ_F_BUFFER_RING;
>> +       } else {
>> +               struct io_buffer *kbuf = req->kbuf;
>> +
>> +               cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
>> +               list_add(&kbuf->list, list);
>> +               req->kbuf = NULL;
> 
> I wonder if this is not necessary? we don't do it above to buf_list and
> it seems to work? For consistency though maybe better to pick one
> approach?

You mean clearing ->kbuf? Yes that's not needed, we're clearing
REQ_F_BUFFER_SELECTED anyway, it's more of a belt and suspenders thing.
I just left the original code alone here, we can remove the NULL
separately.

>> +       /*
>> +        * If we came in unlocked, we have no choice but to
>> +        * consume the buffer here. This does mean it'll be
>> +        * pinned until the IO completes. But coming in
>> +        * unlocked means we're in io-wq context, hence there
> 
> I do not know if this matters here, but task work can also run unlocked
> operations.

As long as the issue_flags are correct, then we should be fine.

>> @@ -423,6 +427,28 @@ struct io_uring_restriction {
>>         __u32 resv2[3];
>>  };
>>  
>> +struct io_uring_buf {
>> +       __u64   addr;
>> +       __u32   len;
>> +       __u32   bid;
>> +};
> 
> I believe bid is 16 bits due to the way it comes back in CQE flags

That is correctl it's limited to 64K on each. We can make this __u16
and add some reserved field. Ditto for the io_uring_buf_reg as well.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH 12/12] io_uring: add support for ring mapped supplied buffers
  2022-05-01 12:25     ` Jens Axboe
@ 2022-05-01 13:25       ` Jens Axboe
  0 siblings, 0 replies; 16+ messages in thread
From: Jens Axboe @ 2022-05-01 13:25 UTC (permalink / raw)
  To: Dylan Yudaken, io-uring

On 5/1/22 6:25 AM, Jens Axboe wrote:
>>> @@ -1564,21 +1585,30 @@ static inline void
>>> io_req_set_rsrc_node(struct io_kiocb *req,
>>>  
>>>  static unsigned int __io_put_kbuf(struct io_kiocb *req, struct
>>> list_head *list)
>>>  {
>>> -       struct io_buffer *kbuf = req->kbuf;
>>>         unsigned int cflags;
>>>  
>>> -       cflags = IORING_CQE_F_BUFFER | (kbuf->bid <<
>>> IORING_CQE_BUFFER_SHIFT);
>>> -       req->flags &= ~REQ_F_BUFFER_SELECTED;
>>> -       list_add(&kbuf->list, list);
>>> -       req->kbuf = NULL;
>>> -       return cflags;
>>> +       if (req->flags & REQ_F_BUFFER_RING) {
>>> +               if (req->buf_list)
>>> +                       req->buf_list->tail++;
>>
>> does this need locking? both on buf_list being available or atomic
>> increment on tail.
> 
> This needs some comments and checks around the expectation. But the idea
> is that the fast path will invoke eg the recv with the uring_lock
> already held, and we'll hold it until we complete it.
> 
> Basically we have two cases:
> 
> 1) Op invoked with uring_lock held. Either the request completes
>    successfully in this invocation, and we put the kbuf with it still
>    held. The completion just increments the tail, buf now consumed. Or
>    we need to retry somehow, and we can just clear REQ_F_BUFFER_RING to
>    recycle the buf, that's it.
> 
> 2) Op invoked without uring_lock held. We get a buf and increment the
>    tail, as we'd otherwise need to grab it again for the completion.
>    We're now stuck with the buf, hold it until the request completes.
> 
> #1 is the above code, just need some checks and safe guards to ensure
> that if ->buf_list is still set, we are still holding the lock.

Here's a debug patch that does a lock sequence check for it. Totally
untested, but it'd be able to catch a violation of the above.

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 28654864201e..51e3536befe2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -389,6 +389,7 @@ struct io_ring_ctx {
 	/* submission data */
 	struct {
 		struct mutex		uring_lock;
+		int			__lock_seq;
 
 		/*
 		 * Ring buffer of indices into array of io_uring_sqe, which is
@@ -1004,6 +1005,7 @@ struct io_kiocb {
 		 struct {
 			 struct io_buffer_list	*buf_list;
 			 __u32			bid;
+			 int			buf_lock_seq;
 		 };
 	};
 
@@ -1443,11 +1445,32 @@ static inline bool io_file_need_scm(struct file *filp)
 }
 #endif
 
+static inline void ctx_lock(struct io_ring_ctx *ctx)
+{
+	mutex_lock(&ctx->uring_lock);
+	ctx->__lock_seq++;
+}
+
+static inline bool ctx_trylock(struct io_ring_ctx *ctx)
+{
+	if (mutex_trylock(&ctx->uring_lock)) {
+		ctx->__lock_seq++;
+		return true;
+	}
+	return false;
+}
+
+static inline void ctx_unlock(struct io_ring_ctx *ctx)
+{
+	ctx->__lock_seq++;
+	mutex_unlock(&ctx->uring_lock);
+}
+
 static void io_ring_submit_unlock(struct io_ring_ctx *ctx, unsigned issue_flags)
 {
 	lockdep_assert_held(&ctx->uring_lock);
 	if (issue_flags & IO_URING_F_UNLOCKED)
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 }
 
 static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags)
@@ -1459,14 +1482,14 @@ static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags)
 	 * from an async worker thread, grab the lock for that case.
 	 */
 	if (issue_flags & IO_URING_F_UNLOCKED)
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	lockdep_assert_held(&ctx->uring_lock);
 }
 
 static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
 {
 	if (!*locked) {
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		*locked = true;
 	}
 }
@@ -1588,8 +1611,10 @@ static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 	unsigned int cflags;
 
 	if (req->flags & REQ_F_BUFFER_RING) {
-		if (req->buf_list)
+		if (req->buf_list) {
+			WARN_ON_ONCE(req->buf_lock_seq != req->ctx->__lock_seq);
 			req->buf_list->tail++;
+		}
 
 		cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
 		req->flags &= ~REQ_F_BUFFER_RING;
@@ -1777,7 +1802,7 @@ static __cold void io_fallback_req_func(struct work_struct *work)
 
 	if (locked) {
 		io_submit_flush_completions(ctx);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	percpu_ref_put(&ctx->refs);
 }
@@ -2237,10 +2262,10 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
 	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
 		/* iopoll syncs against uring_lock, not completion_lock */
 		if (ctx->flags & IORING_SETUP_IOPOLL)
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 		ret = __io_cqring_overflow_flush(ctx, false);
 		if (ctx->flags & IORING_SETUP_IOPOLL)
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 	}
 
 	return ret;
@@ -2698,7 +2723,7 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 		return;
 	if (*locked) {
 		io_submit_flush_completions(ctx);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		*locked = false;
 	}
 	percpu_ref_put(&ctx->refs);
@@ -2731,7 +2756,7 @@ static void handle_prev_tw_list(struct io_wq_work_node *node,
 			ctx_flush_and_put(*ctx, uring_locked);
 			*ctx = req->ctx;
 			/* if not contended, grab and improve batching */
-			*uring_locked = mutex_trylock(&(*ctx)->uring_lock);
+			*uring_locked = ctx_trylock(*ctx);
 			percpu_ref_get(&(*ctx)->refs);
 			if (unlikely(!*uring_locked))
 				spin_lock(&(*ctx)->completion_lock);
@@ -2762,7 +2787,7 @@ static void handle_tw_list(struct io_wq_work_node *node,
 			ctx_flush_and_put(*ctx, locked);
 			*ctx = req->ctx;
 			/* if not contended, grab and improve batching */
-			*locked = mutex_trylock(&(*ctx)->uring_lock);
+			*locked = ctx_trylock(*ctx);
 			percpu_ref_get(&(*ctx)->refs);
 		}
 		req->io_task_work.func(req, locked);
@@ -3126,7 +3151,7 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
 	if (!(ctx->flags & IORING_SETUP_IOPOLL))
 		return;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	while (!wq_list_empty(&ctx->iopoll_list)) {
 		/* let it sleep and repeat later if can't complete a request */
 		if (io_do_iopoll(ctx, true) == 0)
@@ -3137,12 +3162,12 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
 		 * Also let task_work, etc. to progress by releasing the mutex
 		 */
 		if (need_resched()) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			cond_resched();
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 		}
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 }
 
 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
@@ -3183,9 +3208,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 		if (wq_list_empty(&ctx->iopoll_list)) {
 			u32 tail = ctx->cached_cq_tail;
 
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			io_run_task_work();
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 
 			/* some requests don't go through iopoll_list */
 			if (tail != ctx->cached_cq_tail ||
@@ -3347,7 +3372,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
 
 	/* workqueue context doesn't hold uring_lock, grab it now */
 	if (unlikely(needs_lock))
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 
 	/*
 	 * Track whether we have multiple files in our lists. This will impact
@@ -3385,7 +3410,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
 		    wq_has_sleeper(&ctx->sq_data->wait))
 			wake_up(&ctx->sq_data->wait);
 
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 }
 
@@ -3674,8 +3699,10 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
 	req->buf_list = bl;
 	req->bid = buf->bid;
 
-	if (!(issue_flags & IO_URING_F_UNLOCKED))
+	if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+		req->buf_lock_seq = req->ctx->__lock_seq;
 		return u64_to_user_ptr(buf->addr);
+	}
 
 	/*
 	 * If we came in unlocked, we have no choice but to
@@ -8740,7 +8767,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 		if (ctx->sq_creds != current_cred())
 			creds = override_creds(ctx->sq_creds);
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		if (!wq_list_empty(&ctx->iopoll_list))
 			io_do_iopoll(ctx, true);
 
@@ -8751,7 +8778,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
 		    !(ctx->flags & IORING_SETUP_R_DISABLED))
 			ret = io_submit_sqes(ctx, to_submit);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 
 		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
 			wake_up(&ctx->sqo_sq_wait);
@@ -9165,17 +9192,17 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 		/* kill initial ref, already quiesced if zero */
 		if (atomic_dec_and_test(&data->refs))
 			break;
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		flush_delayed_work(&ctx->rsrc_put_work);
 		ret = wait_for_completion_interruptible(&data->done);
 		if (!ret) {
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 			if (atomic_read(&data->refs) > 0) {
 				/*
 				 * it has been revived by another thread while
 				 * we were unlocked
 				 */
-				mutex_unlock(&ctx->uring_lock);
+				ctx_unlock(ctx);
 			} else {
 				break;
 			}
@@ -9187,7 +9214,7 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 		reinit_completion(&data->done);
 
 		ret = io_run_task_work_sig();
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	} while (ret >= 0);
 	data->quiesce = false;
 
@@ -9572,7 +9599,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 
 		if (prsrc->tag) {
 			if (ctx->flags & IORING_SETUP_IOPOLL)
-				mutex_lock(&ctx->uring_lock);
+				ctx_lock(ctx);
 
 			spin_lock(&ctx->completion_lock);
 			io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
@@ -9581,7 +9608,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 			io_cqring_ev_posted(ctx);
 
 			if (ctx->flags & IORING_SETUP_IOPOLL)
-				mutex_unlock(&ctx->uring_lock);
+				ctx_unlock(ctx);
 		}
 
 		rsrc_data->do_put(ctx, prsrc);
@@ -9879,19 +9906,19 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
 	struct io_wq_data data;
 	unsigned int concurrency;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	hash = ctx->hash_map;
 	if (!hash) {
 		hash = kzalloc(sizeof(*hash), GFP_KERNEL);
 		if (!hash) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			return ERR_PTR(-ENOMEM);
 		}
 		refcount_set(&hash->refs, 1);
 		init_waitqueue_head(&hash->wait);
 		ctx->hash_map = hash;
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	data.hash = hash;
 	data.task = task;
@@ -10642,7 +10669,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
 	struct io_submit_state *state = &ctx->submit_state;
 	int nr = 0;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	io_flush_cached_locked_reqs(ctx, state);
 
 	while (!io_req_cache_empty(ctx)) {
@@ -10656,7 +10683,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
 	}
 	if (nr)
 		percpu_ref_put_many(&ctx->refs, nr);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 }
 
 static void io_wait_rsrc_data(struct io_rsrc_data *data)
@@ -10691,7 +10718,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_wait_rsrc_data(ctx->buf_data);
 	io_wait_rsrc_data(ctx->file_data);
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	if (ctx->buf_data)
 		__io_sqe_buffers_unregister(ctx);
 	if (ctx->file_data)
@@ -10700,7 +10727,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 		__io_cqring_overflow_flush(ctx, true);
 	io_eventfd_unregister(ctx);
 	io_flush_apoll_cache(ctx);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	io_destroy_buffers(ctx);
 	if (ctx->sq_creds)
 		put_cred(ctx->sq_creds);
@@ -10859,7 +10886,7 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 	 * completion_lock, see io_req_task_submit(). Apart from other work,
 	 * this lock/unlock section also waits them to finish.
 	 */
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	while (!list_empty(&ctx->tctx_list)) {
 		WARN_ON_ONCE(time_after(jiffies, timeout));
 
@@ -10871,11 +10898,11 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 		if (WARN_ON_ONCE(ret))
 			continue;
 
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		wait_for_completion(&exit.completion);
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	spin_lock(&ctx->completion_lock);
 	spin_unlock(&ctx->completion_lock);
 
@@ -10910,13 +10937,13 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	unsigned long index;
 	struct creds *creds;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	percpu_ref_kill(&ctx->refs);
 	if (ctx->rings)
 		__io_cqring_overflow_flush(ctx, true);
 	xa_for_each(&ctx->personalities, index, creds)
 		io_unregister_personality(ctx, index);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	/* failed during ring init, it couldn't have issued any requests */
 	if (ctx->rings) {
@@ -10991,7 +11018,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
 	enum io_wq_cancel cret;
 	bool ret = false;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
 		struct io_uring_task *tctx = node->task->io_uring;
 
@@ -11004,7 +11031,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
 		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
 		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	return ret;
 }
@@ -11091,9 +11118,9 @@ static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx)
 			return ret;
 		}
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		list_add(&node->ctx_node, &ctx->tctx_list);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	tctx->last = ctx;
 	return 0;
@@ -11128,9 +11155,9 @@ static __cold void io_uring_del_tctx_node(unsigned long index)
 	WARN_ON_ONCE(current != node->task);
 	WARN_ON_ONCE(list_empty(&node->ctx_node));
 
-	mutex_lock(&node->ctx->uring_lock);
+	ctx_lock(node->ctx);
 	list_del(&node->ctx_node);
-	mutex_unlock(&node->ctx->uring_lock);
+	ctx_unlock(node->ctx);
 
 	if (tctx->last == node->ctx)
 		tctx->last = NULL;
@@ -11296,9 +11323,9 @@ static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
 	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
 		return -EINVAL;
 
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	ret = io_uring_add_tctx_node(ctx);
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	if (ret)
 		return ret;
 
@@ -11583,15 +11610,15 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 		if (unlikely(ret))
 			goto out;
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		ret = io_submit_sqes(ctx, to_submit);
 		if (ret != to_submit) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			goto out;
 		}
 		if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
 			goto iopoll_locked;
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	if (flags & IORING_ENTER_GETEVENTS) {
 		int ret2;
@@ -11602,7 +11629,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 			 * prevent racing with polled issue that got punted to
 			 * a workqueue.
 			 */
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 iopoll_locked:
 			ret2 = io_validate_ext_arg(flags, argp, argsz);
 			if (likely(!ret2)) {
@@ -11610,7 +11637,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 						   ctx->cq_entries);
 				ret2 = io_iopoll_check(ctx, min_complete);
 			}
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 		} else {
 			const sigset_t __user *sig;
 			struct __kernel_timespec __user *ts;
@@ -12372,9 +12399,9 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 			 * a ref to the ctx.
 			 */
 			refcount_inc(&sqd->refs);
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			mutex_lock(&sqd->lock);
-			mutex_lock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			if (sqd->thread)
 				tctx = sqd->thread->io_uring;
 		}
@@ -12668,9 +12695,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 
 	io_run_task_work();
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	ret = __io_uring_register(ctx, opcode, arg, nr_args);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
 out_fput:
 	fdput(f);

-- 
Jens Axboe


^ permalink raw reply related	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2022-05-01 13:25 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-04-30 20:50 [PATCHSET v3 0/12] Add support for ring mapped provided buffers Jens Axboe
2022-04-30 20:50 ` [PATCH 01/12] io_uring: kill io_recv_buffer_select() wrapper Jens Axboe
2022-04-30 20:50 ` [PATCH 02/12] io_uring: make io_buffer_select() return the user address directly Jens Axboe
2022-04-30 20:50 ` [PATCH 03/12] io_uring: kill io_rw_buffer_select() wrapper Jens Axboe
2022-04-30 20:50 ` [PATCH 04/12] io_uring: ignore ->buf_index if REQ_F_BUFFER_SELECT isn't set Jens Axboe
2022-04-30 20:50 ` [PATCH 05/12] io_uring: always use req->buf_index for the provided buffer group Jens Axboe
2022-04-30 20:50 ` [PATCH 06/12] io_uring: cache last io_buffer_list lookup Jens Axboe
2022-04-30 20:50 ` [PATCH 07/12] io_uring: never call io_buffer_select() for a buffer re-select Jens Axboe
2022-04-30 20:50 ` [PATCH 08/12] io_uring: add buffer selection support to IORING_OP_NOP Jens Axboe
2022-04-30 20:50 ` [PATCH 09/12] io_uring: add io_pin_pages() helper Jens Axboe
2022-04-30 20:50 ` [PATCH 10/12] io_uring: abstract out provided buffer list selection Jens Axboe
2022-04-30 20:50 ` [PATCH 11/12] io_uring: move provided and fixed buffers into the same io_kiocb area Jens Axboe
2022-04-30 20:50 ` [PATCH 12/12] io_uring: add support for ring mapped supplied buffers Jens Axboe
2022-05-01 10:28   ` Dylan Yudaken
2022-05-01 12:25     ` Jens Axboe
2022-05-01 13:25       ` Jens Axboe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).