* [PATCHSET 0/3] io_uring support for automatic buffers
@ 2020-02-24  2:56 Jens Axboe
  2020-02-24  2:56 ` [PATCH 1/3] io_uring: buffer registration infrastructure Jens Axboe
                   ` (2 more replies)
  0 siblings, 3 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-24  2:56 UTC (permalink / raw)
  To: io-uring; +Cc: andres

With the poll retry based async IO patchset I posted last week, the one
big missing piece for me was automatic buffer selection. Applications
that handle tons of sockets generally like to poll for activity on
them, then issue IO once a socket becomes ready. That's at least two
system calls, but it also gives the application a chance to manage how
many IO buffers it needs. With the io_uring based polled IO, the
application only needs to issue an IORING_OP_RECV (for example, to
receive socket data); it doesn't need to poll at all. However, this
also means the application no longer has an opportune moment to decide
how many IO buffers to keep in flight; it needs one buffer for every
request it currently has pending.

I had originally intended to use BPF to provide some means of buffer
selection, but I had a hard time imagining how buffer lifetimes could
be managed through that. I had a false start today, but Andres
suggested a nifty approach that also solves the lifetime issue.

Basically the application registers buffers with the kernel. Each
buffer is registered with a group ID and a buffer ID. The buffers are
organized by group ID, and the application selects a buffer pool based
on that group ID. One use case might be to group buffers by size.
There's an opcode for this, IORING_OP_PROVIDE_BUFFER.
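
A rough userspace sketch of queueing one such request (not part of the
patchset): it fills the raw SQE fields that the prep helper in patch 2
reads (sqe->fd carries the group ID, sqe->off the buffer ID). It
assumes uapi headers with these patches applied, and ring setup and
submission are left out; the helper name is made up for illustration.

#include <string.h>
#include <linux/io_uring.h>

/* Sketch only: fill one IORING_OP_PROVIDE_BUFFER SQE.
 * Field mapping per io_provide_buffer_prep() in patch 2:
 *   sqe->addr -> buffer address, sqe->len -> buffer length,
 *   sqe->fd   -> buffer group ID, sqe->off -> buffer ID (<= USHRT_MAX).
 */
static void prep_provide_buffer(struct io_uring_sqe *sqe, void *buf,
				unsigned int len, int group_id,
				unsigned int buffer_id)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_PROVIDE_BUFFER;
	sqe->addr = (unsigned long) buf;	/* buffer handed to the kernel */
	sqe->len = len;				/* usable length of that buffer */
	sqe->fd = group_id;			/* buffer group to add it to */
	sqe->off = buffer_id;			/* echoed back on completion */
}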

With that, when doing the same IORING_OP_RECV, no buffer is passed in
with the request. Instead, the request is flagged with
IOSQE_BUFFER_SELECT, and sqe->buf_group is filled in with a valid group
ID. When the kernel can satisfy the receive, a buffer is selected from
the specified group ID pool. If none are available, the IO is
terminated with -ENOBUFS. On success, the buffer ID is passed back
through the completion event (CQE), telling the application which
specific buffer was used.

A buffer can be used only once. On completion, the application may
choose to free it, or register it again with IORING_OP_PROVIDE_BUFFER.
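
For the submission side of the receive described above, a minimal
sketch (again not part of the patchset, assuming headers with these
patches applied) could look like this; ring setup and CQE reaping are
left out, and the helper name is made up for illustration.

#include <string.h>
#include <linux/io_uring.h>

/* Sketch: a buffer-select IORING_OP_RECV. No buffer address is passed;
 * IOSQE_BUFFER_SELECT tells the kernel to pick one from sqe->buf_group.
 */
static void prep_recv_buffer_select(struct io_uring_sqe *sqe, int sockfd,
				    unsigned int max_len,
				    unsigned short group_id)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_RECV;
	sqe->fd = sockfd;
	sqe->len = max_len;			/* capped to the selected buffer's length */
	sqe->flags = IOSQE_BUFFER_SELECT;	/* let the kernel pick the buffer */
	sqe->buf_group = group_id;		/* pool to pick from */
	/* sqe->addr stays 0: no user-supplied buffer */
}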

Patches can also be found in the below repo:

https://git.kernel.dk/cgit/linux-block/log/?h=io_uring-buf-select

and they are obviously layered on top of the poll retry rework.

-- 
Jens Axboe





* [PATCH 1/3] io_uring: buffer registration infrastructure
  2020-02-24  2:56 [PATCHSET 0/3] io_uring support for automatic buffers Jens Axboe
@ 2020-02-24  2:56 ` Jens Axboe
  2020-02-24  2:56 ` [PATCH 2/3] io_uring: add IORING_OP_PROVIDE_BUFFER Jens Axboe
  2020-02-24  2:56 ` [PATCH 3/3] io_uring: support buffer selection Jens Axboe
  2 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-24  2:56 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

This just prepares the ring for having lists of buffers associated
with it, which the application can provide for SQEs to consume instead
of supplying their own buffers.

The buffers are organized by group ID.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 945547a5e792..98b0e9552ef2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -199,6 +199,13 @@ struct fixed_file_data {
 	struct completion		done;
 };
 
+struct io_buffer {
+	struct list_head list;
+	__u64 addr;
+	__s32 len;
+	__u16 bid;
+};
+
 struct io_ring_ctx {
 	struct {
 		struct percpu_ref	refs;
@@ -276,6 +283,8 @@ struct io_ring_ctx {
 	struct socket		*ring_sock;
 #endif
 
+	struct idr		io_buffer_idr;
+
 	struct idr		personality_idr;
 
 	struct {
@@ -863,6 +872,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->completions[0]);
 	init_completion(&ctx->completions[1]);
+	idr_init(&ctx->io_buffer_idr);
 	idr_init(&ctx->personality_idr);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -6447,6 +6457,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 	return -ENXIO;
 }
 
+static int __io_destroy_buffers(int id, void *p, void *data)
+{
+	struct io_ring_ctx *ctx = data;
+	struct list_head *buf_list = p;
+	struct io_buffer *buf;
+
+	while (!list_empty(buf_list)) {
+		buf = list_first_entry(buf_list, struct io_buffer, list);
+		list_del(&buf->list);
+		kfree(buf);
+	}
+	idr_remove(&ctx->io_buffer_idr, id);
+	kfree(buf_list);
+	return 0;
+}
+
+static void io_destroy_buffers(struct io_ring_ctx *ctx)
+{
+	idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx);
+	idr_destroy(&ctx->io_buffer_idr);
+}
+
 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
 	io_finish_async(ctx);
@@ -6457,6 +6489,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_sqe_buffer_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 	io_eventfd_unregister(ctx);
+	io_destroy_buffers(ctx);
 	idr_destroy(&ctx->personality_idr);
 
 #if defined(CONFIG_UNIX)
-- 
2.25.1



* [PATCH 2/3] io_uring: add IORING_OP_PROVIDE_BUFFER
  2020-02-24  2:56 [PATCHSET 0/3] io_uring support for automatic buffers Jens Axboe
  2020-02-24  2:56 ` [PATCH 1/3] io_uring: buffer registration infrastructure Jens Axboe
@ 2020-02-24  2:56 ` Jens Axboe
  2020-02-24  2:56 ` [PATCH 3/3] io_uring: support buffer selection Jens Axboe
  2 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-24  2:56 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

IORING_OP_PROVIDE_BUFFER uses the buffer registration infrastructure
to support passing in an addr/len pair that is associated with a buffer
ID and a buffer group ID. The group ID is used to index and look up the
buffers, while the buffer ID is used to tell the application which
buffer in the group was consumed.

At least for now, no validation is done of the buffer ID. If the
application provides buffers within the same group with identical
buffer IDs, it'll have a hard time telling which of them was used. The
only restriction is that the buffer ID is limited to 16 bits, so
USHRT_MAX is the highest ID that can be used.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 84 +++++++++++++++++++++++++++++++++++
 include/uapi/linux/io_uring.h |  1 +
 2 files changed, 85 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 98b0e9552ef2..8b7c5ab69658 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -442,6 +442,14 @@ struct io_epoll {
 	struct epoll_event		event;
 };
 
+struct io_provide_buffer {
+	struct file			*file;
+	__u64				addr;
+	__s32				len;
+	__u32				gid;
+	__u16				bid;
+};
+
 struct io_async_connect {
 	struct sockaddr_storage		address;
 };
@@ -566,6 +574,7 @@ struct io_kiocb {
 		struct io_fadvise	fadvise;
 		struct io_madvise	madvise;
 		struct io_epoll		epoll;
+		struct io_provide_buffer	pbuf;
 	};
 
 	struct io_async_ctx		*io;
@@ -790,6 +799,7 @@ static const struct io_op_def io_op_defs[] = {
 		.unbound_nonreg_file	= 1,
 		.file_table		= 1,
 	},
+	[IORING_OP_PROVIDE_BUFFER] = {},
 };
 
 static void io_wq_submit_work(struct io_wq_work **workptr);
@@ -2703,6 +2713,69 @@ static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
 	return io_openat2(req, nxt, force_nonblock);
 }
 
+static int io_provide_buffer_prep(struct io_kiocb *req,
+				  const struct io_uring_sqe *sqe)
+{
+	struct io_provide_buffer *p = &req->pbuf;
+	u64 off;
+
+	p->addr = READ_ONCE(sqe->addr);
+	p->len = READ_ONCE(sqe->len);
+	p->gid = READ_ONCE(sqe->fd);
+	off = READ_ONCE(sqe->off);
+	if (off > USHRT_MAX)
+		return -EINVAL;
+	p->bid = off;
+	return 0;
+}
+
+static int io_provide_buffer(struct io_kiocb *req, struct io_kiocb **nxt)
+{
+	struct io_provide_buffer *p = &req->pbuf;
+	struct io_ring_ctx *ctx = req->ctx;
+	struct list_head *list;
+	struct io_buffer *buf;
+	int ret = 0;
+
+	list = idr_find(&ctx->io_buffer_idr, p->gid);
+	if (!list) {
+		list = kmalloc(sizeof(*list), GFP_KERNEL);
+		if (!list) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		INIT_LIST_HEAD(list);
+		ret = idr_alloc(&ctx->io_buffer_idr, list, p->gid, p->gid + 1,
+					GFP_KERNEL);
+		if (ret < 0) {
+			kfree(list);
+			goto out;
+		}
+	}
+
+	buf = kmalloc(sizeof(*buf), GFP_KERNEL);
+	if (!buf) {
+		if (list_empty(list)) {
+			idr_remove(&ctx->io_buffer_idr, p->gid);
+			kfree(list);
+		}
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	buf->addr = p->addr;
+	buf->len = p->len;
+	buf->bid = p->bid;
+	list_add(&buf->list, list);
+	ret = buf->bid;
+out:
+	if (ret < 0)
+		req_set_fail_links(req);
+	io_cqring_add_event(req, ret);
+	io_put_req_find_next(req, nxt);
+	return 0;
+}
+
 static int io_epoll_ctl_prep(struct io_kiocb *req,
 			     const struct io_uring_sqe *sqe)
 {
@@ -4314,6 +4387,9 @@ static int io_req_defer_prep(struct io_kiocb *req,
 	case IORING_OP_EPOLL_CTL:
 		ret = io_epoll_ctl_prep(req, sqe);
 		break;
+	case IORING_OP_PROVIDE_BUFFER:
+		ret = io_provide_buffer_prep(req, sqe);
+		break;
 	default:
 		printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
 				req->opcode);
@@ -4579,6 +4655,14 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		}
 		ret = io_epoll_ctl(req, nxt, force_nonblock);
 		break;
+	case IORING_OP_PROVIDE_BUFFER:
+		if (sqe) {
+			ret = io_provide_buffer_prep(req, sqe);
+			if (ret)
+				break;
+		}
+		ret = io_provide_buffer(req, nxt);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 653865554691..21915ada9507 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -113,6 +113,7 @@ enum {
 	IORING_OP_RECV,
 	IORING_OP_OPENAT2,
 	IORING_OP_EPOLL_CTL,
+	IORING_OP_PROVIDE_BUFFER,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
-- 
2.25.1



* [PATCH 3/3] io_uring: support buffer selection
  2020-02-24  2:56 [PATCHSET 0/3] io_uring support for automatic buffers Jens Axboe
  2020-02-24  2:56 ` [PATCH 1/3] io_uring: buffer registration infrastructure Jens Axboe
  2020-02-24  2:56 ` [PATCH 2/3] io_uring: add IORING_OP_PROVIDE_BUFFER Jens Axboe
@ 2020-02-24  2:56 ` Jens Axboe
  2020-02-24 15:50   ` Jann Horn
  2 siblings, 1 reply; 10+ messages in thread
From: Jens Axboe @ 2020-02-24  2:56 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

If a server process has tons of pending socket connections, it
generally uses epoll to wait for activity. When a socket becomes ready
for reading (or writing), the task can select a buffer and issue a
recv/send on the given fd.

Now that we have fast (non-async thread) support, a task can have tons
of reads or writes pending. But that means it needs buffers to back
that data, and if the number of connections is high enough,
preallocating buffers for all possible connections is infeasible.

With IORING_OP_PROVIDE_BUFFER, an application can register buffers to
use for any request. The request then sets IOSQE_BUFFER_SELECT in the
sqe, and a given group ID in sqe->buf_group. When the fd becomes ready,
a free buffer from the specified group is selected. If none are
available, the request is terminated with -ENOBUFS. If successful, the
CQE on completion will contain the buffer ID chosen in the cqe->flags
member, encoded as:

	(buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER;
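
On the completion side, decoding that is just a mask and a shift; a
minimal sketch using the IORING_CQE_F_BUFFER / IORING_CQE_BUFFER_SHIFT
definitions this patch adds to the uapi header (the helper name is made
up, and headers with the patch applied are assumed):

#include <linux/io_uring.h>

/* Returns the provided-buffer ID for this completion, or -1 if the
 * request did not consume a provided buffer.
 */
static inline int cqe_buffer_id(const struct io_uring_cqe *cqe)
{
	if (!(cqe->flags & IORING_CQE_F_BUFFER))
		return -1;
	return cqe->flags >> IORING_CQE_BUFFER_SHIFT;
}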

Once a buffer has been consumed by a request, it is no longer available
and must be registered again with IORING_OP_PROVIDE_BUFFER.

Requests need to support this feature. For now, IORING_OP_READ,
IORING_OP_WRITE, IORING_OP_RECV, and IORING_OP_SEND support it. This is
checked at SQE submission time; a CQE with res == -EINVAL will be
posted if the flag is set for an unsupported request.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 169 ++++++++++++++++++++++++++++++----
 include/uapi/linux/io_uring.h |  22 ++++-
 2 files changed, 171 insertions(+), 20 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8b7c5ab69658..1b96b88485d8 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -399,7 +399,9 @@ struct io_sr_msg {
 		void __user		*buf;
 	};
 	int				msg_flags;
+	int				gid;
 	size_t				len;
+	struct io_buffer		*kbuf;
 };
 
 struct io_open {
@@ -484,6 +486,7 @@ enum {
 	REQ_F_LINK_BIT		= IOSQE_IO_LINK_BIT,
 	REQ_F_HARDLINK_BIT	= IOSQE_IO_HARDLINK_BIT,
 	REQ_F_FORCE_ASYNC_BIT	= IOSQE_ASYNC_BIT,
+	REQ_F_BUFFER_SELECT_BIT	= IOSQE_BUFFER_SELECT_BIT,
 
 	REQ_F_LINK_NEXT_BIT,
 	REQ_F_FAIL_LINK_BIT,
@@ -500,6 +503,7 @@ enum {
 	REQ_F_NEED_CLEANUP_BIT,
 	REQ_F_OVERFLOW_BIT,
 	REQ_F_POLLED_BIT,
+	REQ_F_BUFFER_SELECTED_BIT,
 };
 
 enum {
@@ -513,6 +517,8 @@ enum {
 	REQ_F_HARDLINK		= BIT(REQ_F_HARDLINK_BIT),
 	/* IOSQE_ASYNC */
 	REQ_F_FORCE_ASYNC	= BIT(REQ_F_FORCE_ASYNC_BIT),
+	/* IOSQE_BUFFER_SELECT */
+	REQ_F_BUFFER_SELECT	= BIT(REQ_F_BUFFER_SELECT_BIT),
 
 	/* already grabbed next link */
 	REQ_F_LINK_NEXT		= BIT(REQ_F_LINK_NEXT_BIT),
@@ -544,6 +550,8 @@ enum {
 	REQ_F_OVERFLOW		= BIT(REQ_F_OVERFLOW_BIT),
 	/* already went through poll handler */
 	REQ_F_POLLED		= BIT(REQ_F_POLLED_BIT),
+	/* buffer already selected */
+	REQ_F_BUFFER_SELECTED	= BIT(REQ_F_BUFFER_SELECTED_BIT),
 };
 
 struct async_poll {
@@ -606,6 +614,7 @@ struct io_kiocb {
 			struct callback_head	task_work;
 			struct hlist_node	hash_node;
 			struct async_poll	*apoll;
+			int			cflags;
 		};
 		struct io_wq_work	work;
 	};
@@ -655,6 +664,8 @@ struct io_op_def {
 	/* set if opcode supports polled "wait" */
 	unsigned		pollin : 1;
 	unsigned		pollout : 1;
+	/* op supports buffer selection */
+	unsigned		buffer_select : 1;
 };
 
 static const struct io_op_def io_op_defs[] = {
@@ -764,12 +775,14 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.pollin			= 1,
+		.buffer_select		= 1,
 	},
 	[IORING_OP_WRITE] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.pollout		= 1,
+		.buffer_select		= 1,
 	},
 	[IORING_OP_FADVISE] = {
 		.needs_file		= 1,
@@ -782,12 +795,14 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.pollout		= 1,
+		.buffer_select		= 1,
 	},
 	[IORING_OP_RECV] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.pollin			= 1,
+		.buffer_select		= 1,
 	},
 	[IORING_OP_OPENAT2] = {
 		.needs_file		= 1,
@@ -1157,7 +1172,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 		if (cqe) {
 			WRITE_ONCE(cqe->user_data, req->user_data);
 			WRITE_ONCE(cqe->res, req->result);
-			WRITE_ONCE(cqe->flags, 0);
+			WRITE_ONCE(cqe->flags, req->flags);
 		} else {
 			WRITE_ONCE(ctx->rings->cq_overflow,
 				atomic_inc_return(&ctx->cached_cq_overflow));
@@ -1181,7 +1196,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 	return cqe != NULL;
 }
 
-static void io_cqring_fill_event(struct io_kiocb *req, long res)
+static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_uring_cqe *cqe;
@@ -1197,7 +1212,7 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
 	if (likely(cqe)) {
 		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
-		WRITE_ONCE(cqe->flags, 0);
+		WRITE_ONCE(cqe->flags, cflags);
 	} else if (ctx->cq_overflow_flushed) {
 		WRITE_ONCE(ctx->rings->cq_overflow,
 				atomic_inc_return(&ctx->cached_cq_overflow));
@@ -1209,23 +1224,34 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
 		req->flags |= REQ_F_OVERFLOW;
 		refcount_inc(&req->refs);
 		req->result = res;
+		req->cflags = cflags;
 		list_add_tail(&req->list, &ctx->cq_overflow_list);
 	}
 }
 
-static void io_cqring_add_event(struct io_kiocb *req, long res)
+static void io_cqring_fill_event(struct io_kiocb *req, long res)
+{
+	__io_cqring_fill_event(req, res, 0);
+}
+
+static void __io_cqring_add_event(struct io_kiocb *req, long res, long cflags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	unsigned long flags;
 
 	spin_lock_irqsave(&ctx->completion_lock, flags);
-	io_cqring_fill_event(req, res);
+	__io_cqring_fill_event(req, res, cflags);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
 	io_cqring_ev_posted(ctx);
 }
 
+static void io_cqring_add_event(struct io_kiocb *req, long res)
+{
+	__io_cqring_add_event(req, res, 0);
+}
+
 static inline bool io_is_fallback_req(struct io_kiocb *req)
 {
 	return req == (struct io_kiocb *)
@@ -1603,6 +1629,17 @@ static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
 	return true;
 }
 
+static int io_rw_common_cflags(struct io_kiocb *req)
+{
+	struct io_buffer *kbuf = (struct io_buffer *) req->rw.addr;
+	int cflags;
+
+	cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
+	cflags |= IORING_CQE_F_BUFFER;
+	kfree(kbuf);
+	return cflags;
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -1614,10 +1651,15 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 
 	rb.to_free = rb.need_iter = 0;
 	while (!list_empty(done)) {
+		int cflags = 0;
+
 		req = list_first_entry(done, struct io_kiocb, list);
 		list_del(&req->list);
 
-		io_cqring_fill_event(req, req->result);
+		if (req->flags & REQ_F_BUFFER_SELECTED)
+			cflags = io_rw_common_cflags(req);
+
+		__io_cqring_fill_event(req, req->result, cflags);
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs) &&
@@ -1792,13 +1834,16 @@ static inline void req_set_fail_links(struct io_kiocb *req)
 static void io_complete_rw_common(struct kiocb *kiocb, long res)
 {
 	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
+	int cflags = 0;
 
 	if (kiocb->ki_flags & IOCB_WRITE)
 		kiocb_end_write(req);
 
 	if (res != req->result)
 		req_set_fail_links(req);
-	io_cqring_add_event(req, res);
+	if (req->flags & REQ_F_BUFFER_SELECTED)
+		cflags = io_rw_common_cflags(req);
+	__io_cqring_add_event(req, res, cflags);
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -1983,7 +2028,7 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	req->rw.addr = READ_ONCE(sqe->addr);
 	req->rw.len = READ_ONCE(sqe->len);
-	/* we own ->private, reuse it for the buffer index */
+	/* we own ->private, reuse it for the buffer index  / buffer ID */
 	req->rw.kiocb.private = (void *) (unsigned long)
 					READ_ONCE(sqe->buf_index);
 	return 0;
@@ -2097,6 +2142,24 @@ static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
 	return len;
 }
 
+static struct io_buffer *io_buffer_select(struct io_kiocb *req, int gid,
+					  void *buf)
+{
+	struct list_head *list;
+	struct io_buffer *kbuf;
+
+	if (req->flags & REQ_F_BUFFER_SELECTED)
+		return buf;
+
+	list = idr_find(&req->ctx->io_buffer_idr, gid);
+	if (!list || list_empty(list))
+		return ERR_PTR(-ENOBUFS);
+
+	kbuf = list_first_entry(list, struct io_buffer, list);
+	list_del(&kbuf->list);
+	return kbuf;
+}
+
 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
 			       struct iovec **iovec, struct iov_iter *iter)
 {
@@ -2110,12 +2173,30 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
 		return io_import_fixed(req, rw, iter);
 	}
 
-	/* buffer index only valid with fixed read/write */
-	if (req->rw.kiocb.private)
+	/* buffer index only valid with fixed read/write, or buffer select  */
+	if (req->rw.kiocb.private && !(req->flags & REQ_F_BUFFER_SELECT))
 		return -EINVAL;
 
 	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
 		ssize_t ret;
+
+		if (req->flags & REQ_F_BUFFER_SELECT) {
+			struct io_buffer *kbuf;
+			int gid;
+
+			gid = (int) (unsigned long) req->rw.kiocb.private;
+			kbuf = io_buffer_select(req, gid, buf);
+			if (IS_ERR(kbuf)) {
+				*iovec = NULL;
+				return PTR_ERR(kbuf);
+			}
+			req->rw.addr = (u64) kbuf;
+			if (sqe_len > kbuf->len)
+				sqe_len = kbuf->len;
+			req->flags |= REQ_F_BUFFER_SELECTED;
+			buf = u64_to_user_ptr(kbuf->addr);
+		}
+
 		ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
 		*iovec = NULL;
 		return ret;
@@ -3112,6 +3193,7 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	sr->msg_flags = READ_ONCE(sqe->msg_flags);
 	sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
+	sr->gid = READ_ONCE(sqe->buf_group);
 
 	if (!io || req->opcode == IORING_OP_SEND)
 		return 0;
@@ -3202,12 +3284,36 @@ static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
 #endif
 }
 
+static struct io_buffer *io_send_recv_buffer_select(struct io_kiocb *req,
+						    struct io_buffer **kbuf,
+						    int *cflags)
+{
+	struct io_sr_msg *sr = &req->sr_msg;
+
+	if (!(req->flags & REQ_F_BUFFER_SELECT))
+		return req->sr_msg.buf;
+
+	*kbuf = io_buffer_select(req, sr->gid, sr->kbuf);
+	if (IS_ERR(*kbuf))
+		return *kbuf;
+
+	sr->kbuf = *kbuf;
+	if (sr->len > (*kbuf)->len)
+		sr->len = (*kbuf)->len;
+	req->flags |= REQ_F_BUFFER_SELECTED;
+
+	*cflags = (*kbuf)->bid << IORING_CQE_BUFFER_SHIFT;
+	*cflags |= IORING_CQE_F_BUFFER;
+	return u64_to_user_ptr((*kbuf)->addr);
+}
+
 static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
 		   bool force_nonblock)
 {
 #if defined(CONFIG_NET)
+	struct io_buffer *kbuf = NULL;
 	struct socket *sock;
-	int ret;
+	int ret, cflags = 0;
 
 	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
@@ -3217,9 +3323,16 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
 		struct io_sr_msg *sr = &req->sr_msg;
 		struct msghdr msg;
 		struct iovec iov;
+		void __user *buf;
 		unsigned flags;
 
-		ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
+		buf = io_send_recv_buffer_select(req, &kbuf, &cflags);
+		if (IS_ERR(buf)) {
+			ret = PTR_ERR(buf);
+			goto out;
+		}
+
+		ret = import_single_range(WRITE, buf, sr->len, &iov,
 						&msg.msg_iter);
 		if (ret)
 			return ret;
@@ -3243,7 +3356,9 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
 			ret = -EINTR;
 	}
 
-	io_cqring_add_event(req, ret);
+	kfree(kbuf);
+out:
+	__io_cqring_add_event(req, ret, cflags);
 	if (ret < 0)
 		req_set_fail_links(req);
 	io_put_req_find_next(req, nxt);
@@ -3264,6 +3379,7 @@ static int io_recvmsg_prep(struct io_kiocb *req,
 	sr->msg_flags = READ_ONCE(sqe->msg_flags);
 	sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
+	sr->gid = READ_ONCE(sqe->buf_group);
 
 	if (!io || req->opcode == IORING_OP_RECV)
 		return 0;
@@ -3360,8 +3476,9 @@ static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
 		   bool force_nonblock)
 {
 #if defined(CONFIG_NET)
+	struct io_buffer *kbuf = NULL;
 	struct socket *sock;
-	int ret;
+	int ret, cflags = 0;
 
 	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
@@ -3371,9 +3488,16 @@ static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
 		struct io_sr_msg *sr = &req->sr_msg;
 		struct msghdr msg;
 		struct iovec iov;
+		void __user *buf;
 		unsigned flags;
 
-		ret = import_single_range(READ, sr->buf, sr->len, &iov,
+		buf = io_send_recv_buffer_select(req, &kbuf, &cflags);
+		if (IS_ERR(buf)) {
+			ret = PTR_ERR(buf);
+			goto out;
+		}
+
+		ret = import_single_range(READ, buf, sr->len, &iov,
 						&msg.msg_iter);
 		if (ret)
 			return ret;
@@ -3398,7 +3522,9 @@ static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
 			ret = -EINTR;
 	}
 
-	io_cqring_add_event(req, ret);
+	kfree(kbuf);
+out:
+	__io_cqring_add_event(req, ret, cflags);
 	if (ret < 0)
 		req_set_fail_links(req);
 	io_put_req_find_next(req, nxt);
@@ -5006,7 +5132,8 @@ static inline void io_queue_link_head(struct io_kiocb *req)
 }
 
 #define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK|	\
-				IOSQE_IO_HARDLINK | IOSQE_ASYNC)
+				IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
+				IOSQE_BUFFER_SELECT)
 
 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 			  struct io_submit_state *state, struct io_kiocb **link)
@@ -5023,6 +5150,12 @@ static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		goto err_req;
 	}
 
+	if ((sqe_flags & IOSQE_BUFFER_SELECT) &&
+	    !io_op_defs[req->opcode].buffer_select) {
+		ret = -EINVAL;
+		goto err_req;
+	}
+
 	id = READ_ONCE(sqe->personality);
 	if (id) {
 		req->work.creds = idr_find(&ctx->personality_idr, id);
@@ -5035,7 +5168,7 @@ static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	/* same numerical values with corresponding REQ_F_*, safe to copy */
 	req->flags |= sqe_flags & (IOSQE_IO_DRAIN|IOSQE_IO_HARDLINK|
-					IOSQE_ASYNC);
+					IOSQE_ASYNC|IOSQE_BUFFER_SELECT);
 
 	ret = io_req_set_file(state, req, sqe);
 	if (unlikely(ret)) {
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 21915ada9507..d46fd80af913 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -41,8 +41,12 @@ struct io_uring_sqe {
 	__u64	user_data;	/* data to be passed back at completion time */
 	union {
 		struct {
-			/* index into fixed buffers, if used */
-			__u16	buf_index;
+			union {
+				/* index into fixed buffers, if used */
+				__u16	buf_index;
+				/* for grouped buffer selection */
+				__u16	buf_group;
+			};
 			/* personality to use, if used */
 			__u16	personality;
 		};
@@ -56,6 +60,7 @@ enum {
 	IOSQE_IO_LINK_BIT,
 	IOSQE_IO_HARDLINK_BIT,
 	IOSQE_ASYNC_BIT,
+	IOSQE_BUFFER_SELECT_BIT,
 };
 
 /*
@@ -71,6 +76,8 @@ enum {
 #define IOSQE_IO_HARDLINK	(1U << IOSQE_IO_HARDLINK_BIT)
 /* always go async */
 #define IOSQE_ASYNC		(1U << IOSQE_ASYNC_BIT)
+/* select buffer from sqe->buf_group */
+#define IOSQE_BUFFER_SELECT	(1U << IOSQE_BUFFER_SELECT_BIT)
 
 /*
  * io_uring_setup() flags
@@ -138,6 +145,17 @@ struct io_uring_cqe {
 	__u32	flags;
 };
 
+/*
+ * cqe->flags
+ *
+ * IORING_CQE_F_BUFFER	If set, the upper 16 bits are the buffer ID
+ */
+#define IORING_CQE_F_BUFFER		(1U << 0)
+
+enum {
+	IORING_CQE_BUFFER_SHIFT		= 16,
+};
+
 /*
  * Magic offsets for the application to mmap the data it needs
  */
-- 
2.25.1



* Re: [PATCH 3/3] io_uring: support buffer selection
  2020-02-24  2:56 ` [PATCH 3/3] io_uring: support buffer selection Jens Axboe
@ 2020-02-24 15:50   ` Jann Horn
  2020-02-24 15:57     ` Jens Axboe
  0 siblings, 1 reply; 10+ messages in thread
From: Jann Horn @ 2020-02-24 15:50 UTC (permalink / raw)
  To: Jens Axboe; +Cc: io-uring, andres

On Mon, Feb 24, 2020 at 3:56 AM Jens Axboe <axboe@kernel.dk> wrote:
[...]
> With IORING_OP_PROVIDE_BUFFER, an application can register buffers to
> use for any request. The request then sets IOSQE_BUFFER_SELECT in the
> sqe, and a given group ID in sqe->buf_group. When the fd becomes ready,
> a free buffer from the specified group is selected. If none are
> available, the request is terminated with -ENOBUFS. If successful, the
> CQE on completion will contain the buffer ID chosen in the cqe->flags
> member, encoded as:
[...]
> +static struct io_buffer *io_buffer_select(struct io_kiocb *req, int gid,
> +                                         void *buf)
> +{
> +       struct list_head *list;
> +       struct io_buffer *kbuf;
> +
> +       if (req->flags & REQ_F_BUFFER_SELECTED)
> +               return buf;
> +
> +       list = idr_find(&req->ctx->io_buffer_idr, gid);
> +       if (!list || list_empty(list))
> +               return ERR_PTR(-ENOBUFS);
> +
> +       kbuf = list_first_entry(list, struct io_buffer, list);
> +       list_del(&kbuf->list);
> +       return kbuf;
> +}

This function requires some sort of external locking, right? Since
it's removing an entry from a list.

[...]
> +static struct io_buffer *io_send_recv_buffer_select(struct io_kiocb *req,
> +                                                   struct io_buffer **kbuf,
> +                                                   int *cflags)
> +{
> +       struct io_sr_msg *sr = &req->sr_msg;
> +
> +       if (!(req->flags & REQ_F_BUFFER_SELECT))
> +               return req->sr_msg.buf;
> +
> +       *kbuf = io_buffer_select(req, sr->gid, sr->kbuf);

But we use it here in io_send_recv_buffer_select(), not taking any
extra locks before calling it...

> +       if (IS_ERR(*kbuf))
> +               return *kbuf;
> +
> +       sr->kbuf = *kbuf;
> +       if (sr->len > (*kbuf)->len)
> +               sr->len = (*kbuf)->len;
> +       req->flags |= REQ_F_BUFFER_SELECTED;
> +
> +       *cflags = (*kbuf)->bid << IORING_CQE_BUFFER_SHIFT;
> +       *cflags |= IORING_CQE_F_BUFFER;
> +       return u64_to_user_ptr((*kbuf)->addr);
> +}
> +
>  static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
>                    bool force_nonblock)
>  {
>  #if defined(CONFIG_NET)
> +       struct io_buffer *kbuf = NULL;
>         struct socket *sock;
> -       int ret;
> +       int ret, cflags = 0;
>
>         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
>                 return -EINVAL;
> @@ -3217,9 +3323,16 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
>                 struct io_sr_msg *sr = &req->sr_msg;
>                 struct msghdr msg;
>                 struct iovec iov;
> +               void __user *buf;
>                 unsigned flags;
>
> -               ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
> +               buf = io_send_recv_buffer_select(req, &kbuf, &cflags);

... and call io_send_recv_buffer_select() from here without holding
any extra locks in this function. This function is then called from
io_issue_sqe() (no extra locks held there either), which is called
from __io_queue_sqe() (no extra locks AFAICS), which is called from
places like task work.

Am I missing something?

It might in general be helpful to have more lockdep assertions in this
code, both to help the reader understand the locking rules and to help
verify locking correctness.


* Re: [PATCH 3/3] io_uring: support buffer selection
  2020-02-24 15:50   ` Jann Horn
@ 2020-02-24 15:57     ` Jens Axboe
  2020-02-24 17:06       ` Jens Axboe
  0 siblings, 1 reply; 10+ messages in thread
From: Jens Axboe @ 2020-02-24 15:57 UTC (permalink / raw)
  To: Jann Horn; +Cc: io-uring, andres

On 2/24/20 8:50 AM, Jann Horn wrote:
> On Mon, Feb 24, 2020 at 3:56 AM Jens Axboe <axboe@kernel.dk> wrote:
> [...]
>> With IORING_OP_PROVIDE_BUFFER, an application can register buffers to
>> use for any request. The request then sets IOSQE_BUFFER_SELECT in the
>> sqe, and a given group ID in sqe->buf_group. When the fd becomes ready,
>> a free buffer from the specified group is selected. If none are
>> available, the request is terminated with -ENOBUFS. If successful, the
>> CQE on completion will contain the buffer ID chosen in the cqe->flags
>> member, encoded as:
> [...]
>> +static struct io_buffer *io_buffer_select(struct io_kiocb *req, int gid,
>> +                                         void *buf)
>> +{
>> +       struct list_head *list;
>> +       struct io_buffer *kbuf;
>> +
>> +       if (req->flags & REQ_F_BUFFER_SELECTED)
>> +               return buf;
>> +
>> +       list = idr_find(&req->ctx->io_buffer_idr, gid);
>> +       if (!list || list_empty(list))
>> +               return ERR_PTR(-ENOBUFS);
>> +
>> +       kbuf = list_first_entry(list, struct io_buffer, list);
>> +       list_del(&kbuf->list);
>> +       return kbuf;
>> +}
> 
> This function requires some sort of external locking, right? Since
> it's removing an entry from a list.
> 
> [...]
>> +static struct io_buffer *io_send_recv_buffer_select(struct io_kiocb *req,
>> +                                                   struct io_buffer **kbuf,
>> +                                                   int *cflags)
>> +{
>> +       struct io_sr_msg *sr = &req->sr_msg;
>> +
>> +       if (!(req->flags & REQ_F_BUFFER_SELECT))
>> +               return req->sr_msg.buf;
>> +
>> +       *kbuf = io_buffer_select(req, sr->gid, sr->kbuf);
> 
> But we use it here in io_send_recv_buffer_select(), not taking any
> extra locks before calling it...
> 
>> +       if (IS_ERR(*kbuf))
>> +               return *kbuf;
>> +
>> +       sr->kbuf = *kbuf;
>> +       if (sr->len > (*kbuf)->len)
>> +               sr->len = (*kbuf)->len;
>> +       req->flags |= REQ_F_BUFFER_SELECTED;
>> +
>> +       *cflags = (*kbuf)->bid << IORING_CQE_BUFFER_SHIFT;
>> +       *cflags |= IORING_CQE_F_BUFFER;
>> +       return u64_to_user_ptr((*kbuf)->addr);
>> +}
>> +
>>  static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
>>                    bool force_nonblock)
>>  {
>>  #if defined(CONFIG_NET)
>> +       struct io_buffer *kbuf = NULL;
>>         struct socket *sock;
>> -       int ret;
>> +       int ret, cflags = 0;
>>
>>         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
>>                 return -EINVAL;
>> @@ -3217,9 +3323,16 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
>>                 struct io_sr_msg *sr = &req->sr_msg;
>>                 struct msghdr msg;
>>                 struct iovec iov;
>> +               void __user *buf;
>>                 unsigned flags;
>>
>> -               ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
>> +               buf = io_send_recv_buffer_select(req, &kbuf, &cflags);
> 
> ... and call io_send_recv_buffer_select() from here without holding
> any extra locks in this function. This function is then called from
> io_issue_sqe() (no extra locks held there either), which is called
> from __io_queue_sqe() (no extra locks AFAICS), which is called from
> places like task work.
> 
> Am I missing something?

Submission is all done under the ctx->uring_lock mutex, though the
async offload path is not. So for the normal path we should be fine,
as we'll be serialized there. We just need to ensure we do the buffer
select when we prep the request for async execution, so the workers
never have to do it. Either that, or ensure the workers grab the mutex
before doing the buffer grab (less ideal).

> It might in general be helpful to have more lockdep assertions in this
> code, both to help the reader understand the locking rules and to help
> verify locking correctness.

Agree, that'd be a good addition.

-- 
Jens Axboe



* Re: [PATCH 3/3] io_uring: support buffer selection
  2020-02-24 15:57     ` Jens Axboe
@ 2020-02-24 17:06       ` Jens Axboe
  0 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-24 17:06 UTC (permalink / raw)
  To: Jann Horn; +Cc: io-uring, andres

On 2/24/20 8:57 AM, Jens Axboe wrote:
>> ... and call io_send_recv_buffer_select() from here without holding
>> any extra locks in this function. This function is then called from
>> io_issue_sqe() (no extra locks held there either), which is called
>> from __io_queue_sqe() (no extra locks AFAICS), which is called from
>> places like task work.
>>
>> Am I missing something?
> 
> Submission is all done under the ctx->uring_lock mutex, though the async
> stuff does not. So for the normal path we should all be fine, as we'll
> be serialized for that. We just need to ensure we do buffer select when
> we prep the request for async execution, so the workers never have to do
> it. Either that, or ensure the workers grab the mutex before doing the
> grab (less ideal).
> 
>> It might in general be helpful to have more lockdep assertions in this
>> code, both to help the reader understand the locking rules and to help
>> verify locking correctness.
> 
> Agree, that'd be a good addition.

Tried to cater to both; here's an incremental that ensures we also
grab the uring_lock mutex for the async offload case, and adds a
lockdep annotation for this particular case.


diff --git a/fs/io_uring.c b/fs/io_uring.c
index ba8d4e2d9f99..792ef01a521c 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2173,7 +2173,7 @@ static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
 }
 
 static struct io_buffer *io_buffer_select(struct io_kiocb *req, int gid,
-					  void *buf)
+					  void *buf, bool needs_lock)
 {
 	struct list_head *list;
 	struct io_buffer *kbuf;
@@ -2181,17 +2181,34 @@ static struct io_buffer *io_buffer_select(struct io_kiocb *req, int gid,
 	if (req->flags & REQ_F_BUFFER_SELECTED)
 		return buf;
 
+	/*
+	 * "Normal" inline submissions always hold the uring_lock, since we
+	 * grab it from the system call. Same is true for the SQPOLL offload.
+	 * The only exception is when we've detached the request and issue it
+	 * from an async worker thread, grab the lock for that case.
+	 */
+	if (needs_lock)
+		mutex_lock(&req->ctx->uring_lock);
+
+	lockdep_assert_held(&req->ctx->uring_lock);
+
 	list = idr_find(&req->ctx->io_buffer_idr, gid);
-	if (!list || list_empty(list))
-		return ERR_PTR(-ENOBUFS);
+	if (list && !list_empty(list)) {
+		kbuf = list_first_entry(list, struct io_buffer, list);
+		list_del(&kbuf->list);
+	} else {
+		kbuf = ERR_PTR(-ENOBUFS);
+	}
+
+	if (needs_lock)
+		mutex_unlock(&req->ctx->uring_lock);
 
-	kbuf = list_first_entry(list, struct io_buffer, list);
-	list_del(&kbuf->list);
 	return kbuf;
 }
 
 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
-			       struct iovec **iovec, struct iov_iter *iter)
+			       struct iovec **iovec, struct iov_iter *iter,
+			       bool needs_lock)
 {
 	void __user *buf = u64_to_user_ptr(req->rw.addr);
 	size_t sqe_len = req->rw.len;
@@ -2215,7 +2232,7 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
 			int gid;
 
 			gid = (int) (unsigned long) req->rw.kiocb.private;
-			kbuf = io_buffer_select(req, gid, buf);
+			kbuf = io_buffer_select(req, gid, buf, needs_lock);
 			if (IS_ERR(kbuf)) {
 				*iovec = NULL;
 				return PTR_ERR(kbuf);
@@ -2369,7 +2386,7 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	io = req->io;
 	io->rw.iov = io->rw.fast_iov;
 	req->io = NULL;
-	ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
+	ret = io_import_iovec(READ, req, &io->rw.iov, &iter, !force_nonblock);
 	req->io = io;
 	if (ret < 0)
 		return ret;
@@ -2387,7 +2404,7 @@ static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
 	size_t iov_count;
 	ssize_t io_size, ret;
 
-	ret = io_import_iovec(READ, req, &iovec, &iter);
+	ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock);
 	if (ret < 0)
 		return ret;
 
@@ -2459,7 +2476,7 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	io = req->io;
 	io->rw.iov = io->rw.fast_iov;
 	req->io = NULL;
-	ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
+	ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter, !force_nonblock);
 	req->io = io;
 	if (ret < 0)
 		return ret;
@@ -2477,7 +2494,7 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
 	size_t iov_count;
 	ssize_t ret, io_size;
 
-	ret = io_import_iovec(WRITE, req, &iovec, &iter);
+	ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock);
 	if (ret < 0)
 		return ret;
 
@@ -2910,7 +2927,8 @@ static int io_provide_buffer_prep(struct io_kiocb *req,
 	return 0;
 }
 
-static int io_provide_buffer(struct io_kiocb *req, struct io_kiocb **nxt)
+static int io_provide_buffer(struct io_kiocb *req, struct io_kiocb **nxt,
+			     bool force_nonblock)
 {
 	struct io_provide_buf *p = &req->pbuf;
 	struct io_ring_ctx *ctx = req->ctx;
@@ -2918,6 +2936,17 @@ static int io_provide_buffer(struct io_kiocb *req, struct io_kiocb **nxt)
 	struct io_buffer *buf;
 	int ret = 0;
 
+	/*
+	 * "Normal" inline submissions always hold the uring_lock, since we
+	 * grab it from the system call. Same is true for the SQPOLL offload.
+	 * The only exception is when we've detached the request and issue it
+	 * from an async worker thread, grab the lock for that case.
+	 */
+	if (!force_nonblock)
+		mutex_lock(&ctx->uring_lock);
+
+	lockdep_assert_held(&ctx->uring_lock);
+
 	list = idr_find(&ctx->io_buffer_idr, p->gid);
 	if (!list) {
 		list = kmalloc(sizeof(*list), GFP_KERNEL);
@@ -2950,6 +2979,8 @@ static int io_provide_buffer(struct io_kiocb *req, struct io_kiocb **nxt)
 	list_add(&buf->list, list);
 	ret = buf->bid;
 out:
+	if (!force_nonblock)
+		mutex_unlock(&ctx->uring_lock);
 	if (ret < 0)
 		req_set_fail_links(req);
 	io_cqring_add_event(req, ret);
@@ -3387,14 +3418,15 @@ static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
 
 static struct io_buffer *io_send_recv_buffer_select(struct io_kiocb *req,
 						    struct io_buffer **kbuf,
-						    int *cflags)
+						    int *cflags,
+						    bool needs_lock)
 {
 	struct io_sr_msg *sr = &req->sr_msg;
 
 	if (!(req->flags & REQ_F_BUFFER_SELECT))
 		return req->sr_msg.buf;
 
-	*kbuf = io_buffer_select(req, sr->gid, sr->kbuf);
+	*kbuf = io_buffer_select(req, sr->gid, sr->kbuf, needs_lock);
 	if (IS_ERR(*kbuf))
 		return *kbuf;
 
@@ -3427,7 +3459,8 @@ static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
 		void __user *buf;
 		unsigned flags;
 
-		buf = io_send_recv_buffer_select(req, &kbuf, &cflags);
+		buf = io_send_recv_buffer_select(req, &kbuf, &cflags,
+							!force_nonblock);
 		if (IS_ERR(buf)) {
 			ret = PTR_ERR(buf);
 			goto out;
@@ -3592,7 +3625,8 @@ static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
 		void __user *buf;
 		unsigned flags;
 
-		buf = io_send_recv_buffer_select(req, &kbuf, &cflags);
+		buf = io_send_recv_buffer_select(req, &kbuf, &cflags,
+							!force_nonblock);
 		if (IS_ERR(buf)) {
 			ret = PTR_ERR(buf);
 			goto out;
@@ -4903,7 +4937,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 			if (ret)
 				break;
 		}
-		ret = io_provide_buffer(req, nxt);
+		ret = io_provide_buffer(req, nxt, force_nonblock);
 		break;
 	default:
 		ret = -EINVAL;

-- 
Jens Axboe



* [PATCH 1/3] io_uring: buffer registration infrastructure
  2020-02-25 16:20 [PATCHSET v2b 0/3] io_uring support for automatic buffers Jens Axboe
@ 2020-02-25 16:20 ` Jens Axboe
  0 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-25 16:20 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

This just prepares the ring for having lists of buffers associated
with it, which the application can provide for SQEs to consume instead
of supplying their own buffers.

The buffers are organized by group ID.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 040bdfc04874..d985da9252a2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -200,6 +200,13 @@ struct fixed_file_data {
 	struct completion		done;
 };
 
+struct io_buffer {
+	struct list_head list;
+	__u64 addr;
+	__s32 len;
+	__u16 bid;
+};
+
 struct io_ring_ctx {
 	struct {
 		struct percpu_ref	refs;
@@ -277,6 +284,8 @@ struct io_ring_ctx {
 	struct socket		*ring_sock;
 #endif
 
+	struct idr		io_buffer_idr;
+
 	struct idr		personality_idr;
 
 	struct {
@@ -880,6 +889,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->completions[0]);
 	init_completion(&ctx->completions[1]);
+	idr_init(&ctx->io_buffer_idr);
 	idr_init(&ctx->personality_idr);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -6570,6 +6580,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 	return -ENXIO;
 }
 
+static int __io_destroy_buffers(int id, void *p, void *data)
+{
+	struct io_ring_ctx *ctx = data;
+	struct list_head *buf_list = p;
+	struct io_buffer *buf;
+
+	while (!list_empty(buf_list)) {
+		buf = list_first_entry(buf_list, struct io_buffer, list);
+		list_del(&buf->list);
+		kfree(buf);
+	}
+	idr_remove(&ctx->io_buffer_idr, id);
+	kfree(buf_list);
+	return 0;
+}
+
+static void io_destroy_buffers(struct io_ring_ctx *ctx)
+{
+	idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx);
+	idr_destroy(&ctx->io_buffer_idr);
+}
+
 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
 	io_finish_async(ctx);
@@ -6580,6 +6612,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_sqe_buffer_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 	io_eventfd_unregister(ctx);
+	io_destroy_buffers(ctx);
 	idr_destroy(&ctx->personality_idr);
 
 #if defined(CONFIG_UNIX)
-- 
2.25.1



* [PATCH 1/3] io_uring: buffer registration infrastructure
  2020-02-25 16:19 [PATCHSET v2 0/3] io_uring support for automatic buffers Jens Axboe
@ 2020-02-25 16:19 ` Jens Axboe
  0 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-25 16:19 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

This just prepares the ring for having lists of buffers associated
with it, which the application can provide for SQEs to consume instead
of supplying their own buffers.

The buffers are organized by group ID.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 040bdfc04874..d985da9252a2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -200,6 +200,13 @@ struct fixed_file_data {
 	struct completion		done;
 };
 
+struct io_buffer {
+	struct list_head list;
+	__u64 addr;
+	__s32 len;
+	__u16 bid;
+};
+
 struct io_ring_ctx {
 	struct {
 		struct percpu_ref	refs;
@@ -277,6 +284,8 @@ struct io_ring_ctx {
 	struct socket		*ring_sock;
 #endif
 
+	struct idr		io_buffer_idr;
+
 	struct idr		personality_idr;
 
 	struct {
@@ -880,6 +889,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->completions[0]);
 	init_completion(&ctx->completions[1]);
+	idr_init(&ctx->io_buffer_idr);
 	idr_init(&ctx->personality_idr);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -6570,6 +6580,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 	return -ENXIO;
 }
 
+static int __io_destroy_buffers(int id, void *p, void *data)
+{
+	struct io_ring_ctx *ctx = data;
+	struct list_head *buf_list = p;
+	struct io_buffer *buf;
+
+	while (!list_empty(buf_list)) {
+		buf = list_first_entry(buf_list, struct io_buffer, list);
+		list_del(&buf->list);
+		kfree(buf);
+	}
+	idr_remove(&ctx->io_buffer_idr, id);
+	kfree(buf_list);
+	return 0;
+}
+
+static void io_destroy_buffers(struct io_ring_ctx *ctx)
+{
+	idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx);
+	idr_destroy(&ctx->io_buffer_idr);
+}
+
 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
 	io_finish_async(ctx);
@@ -6580,6 +6612,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_sqe_buffer_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 	io_eventfd_unregister(ctx);
+	io_destroy_buffers(ctx);
 	idr_destroy(&ctx->personality_idr);
 
 #if defined(CONFIG_UNIX)
-- 
2.25.1



* [PATCH 1/3] io_uring: buffer registration infrastructure
  2020-02-25 16:04 [PATCHSET v2 0/3] io_uring support for automatic buffers Jens Axboe
@ 2020-02-25 16:04 ` Jens Axboe
  0 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2020-02-25 16:04 UTC (permalink / raw)
  To: io-uring; +Cc: andres, Jens Axboe

This just prepares the ring for having lists of buffers associated
with it, which the application can provide for SQEs to consume instead
of supplying their own buffers.

The buffers are organized by group ID.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 040bdfc04874..d985da9252a2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -200,6 +200,13 @@ struct fixed_file_data {
 	struct completion		done;
 };
 
+struct io_buffer {
+	struct list_head list;
+	__u64 addr;
+	__s32 len;
+	__u16 bid;
+};
+
 struct io_ring_ctx {
 	struct {
 		struct percpu_ref	refs;
@@ -277,6 +284,8 @@ struct io_ring_ctx {
 	struct socket		*ring_sock;
 #endif
 
+	struct idr		io_buffer_idr;
+
 	struct idr		personality_idr;
 
 	struct {
@@ -880,6 +889,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->completions[0]);
 	init_completion(&ctx->completions[1]);
+	idr_init(&ctx->io_buffer_idr);
 	idr_init(&ctx->personality_idr);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -6570,6 +6580,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 	return -ENXIO;
 }
 
+static int __io_destroy_buffers(int id, void *p, void *data)
+{
+	struct io_ring_ctx *ctx = data;
+	struct list_head *buf_list = p;
+	struct io_buffer *buf;
+
+	while (!list_empty(buf_list)) {
+		buf = list_first_entry(buf_list, struct io_buffer, list);
+		list_del(&buf->list);
+		kfree(buf);
+	}
+	idr_remove(&ctx->io_buffer_idr, id);
+	kfree(buf_list);
+	return 0;
+}
+
+static void io_destroy_buffers(struct io_ring_ctx *ctx)
+{
+	idr_for_each(&ctx->io_buffer_idr, __io_destroy_buffers, ctx);
+	idr_destroy(&ctx->io_buffer_idr);
+}
+
 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
 	io_finish_async(ctx);
@@ -6580,6 +6612,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_sqe_buffer_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 	io_eventfd_unregister(ctx);
+	io_destroy_buffers(ctx);
 	idr_destroy(&ctx->personality_idr);
 
 #if defined(CONFIG_UNIX)
-- 
2.25.1


