Linux-Block Archive on lore.kernel.org
* [PATCHSET v2 0/3] io_uring CQ ring backpressure
@ 2019-11-06 23:53 Jens Axboe
  2019-11-06 23:53 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
                   ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Jens Axboe @ 2019-11-06 23:53 UTC
  To: io-uring; +Cc: linux-block, asml.silence, jannh

Currently we drop completion events if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder to
use io_uring with networked IO where request completion times are
generally unbounded. Or with POLL, for example, which is also unbounded.

This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
for CQ ring overflows. First of all, it doesn't overflow the ring; it
simply stores a backlog of completions that we weren't able to put into
the CQ ring. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel.

I think that makes for a pretty sane API in terms of how the application
can handle it. With CQ_NODROP enabled, we'll never drop a completion
event, but we'll also not allow submissions with a completion backlog.
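
To make the intended behavior concrete, here is a rough userspace model of
it. This is only a sketch and not the kernel code: the toy_* names, the
4-entry ring, the fixed-size backlog array and the "requests complete
immediately" shortcut are all assumptions made for illustration. It shows
a completion spilling into the backlog, the next submission getting
-EBUSY, and submissions working again once the application has reaped.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

#define TOY_CQ_ENTRIES	4u	/* toy CQ ring size (power of two) */
#define TOY_BACKLOG_MAX	16u	/* toy bound; an assumption of this model */

/* Hypothetical userspace model; none of these names exist in the kernel. */
struct toy_ring {
	unsigned head, tail;		/* free-running CQ indices */
	long cqes[TOY_CQ_ENTRIES];	/* the "CQ ring" */
	long backlog[TOY_BACKLOG_MAX];	/* completions that did not fit */
	unsigned backlog_len;
	unsigned dropped;		/* drop counter for the legacy mode */
	bool nodrop;			/* models IORING_SETUP_CQ_NODROP */
};

bool toy_ring_full(const struct toy_ring *r)
{
	return r->tail - r->head == TOY_CQ_ENTRIES;
}

/* Post one completion: ring first, else backlog (nodrop) or drop. */
void toy_complete(struct toy_ring *r, long res)
{
	if (!toy_ring_full(r))
		r->cqes[r->tail++ & (TOY_CQ_ENTRIES - 1)] = res;
	else if (r->nodrop && r->backlog_len < TOY_BACKLOG_MAX)
		r->backlog[r->backlog_len++] = res;
	else
		r->dropped++;
}

/* Move backlogged completions into the ring while there is room. */
void toy_flush(struct toy_ring *r)
{
	unsigned moved = 0, i;

	while (moved < r->backlog_len && !toy_ring_full(r))
		r->cqes[r->tail++ & (TOY_CQ_ENTRIES - 1)] = r->backlog[moved++];
	for (i = moved; i < r->backlog_len; i++)
		r->backlog[i - moved] = r->backlog[i];
	r->backlog_len -= moved;
}

/* Submit one request that completes immediately; -EBUSY on a backlog. */
int toy_submit(struct toy_ring *r, long res)
{
	if (r->nodrop && r->backlog_len)
		return -EBUSY;
	toy_complete(r, res);
	return 0;
}

/* Application side: consume everything, refilling from the backlog. */
unsigned toy_reap(struct toy_ring *r)
{
	unsigned reaped = 0;

	do {
		reaped += r->tail - r->head;
		r->head = r->tail;
		toy_flush(r);
	} while (r->head != r->tail);
	return reaped;
}

/* Walk through the overflow scenario with a 4-entry ring. */
void toy_demo(void)
{
	struct toy_ring r = { .nodrop = true };
	int i;

	for (i = 0; i < 5; i++)
		assert(toy_submit(&r, i) == 0);	/* 5th lands in the backlog */
	assert(toy_submit(&r, 5) == -EBUSY);	/* back pressure kicks in */
	assert(toy_reap(&r) == 5);		/* nothing was dropped */
	assert(toy_submit(&r, 5) == 0);		/* submissions work again */
}
```

Calling toy_demo() from a main() runs the scenario. With nodrop left
false, the same model counts the extra completions in 'dropped' instead,
which corresponds to the current behavior.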

Changes since v1:

- Drop the cqe_drop structure and allocation, simply use the io_kiocb
  for the overflow backlog
- Rebase on top of Pavel's series which made this cleaner
- Add prep patch for the fill/add CQ handler changes

 fs/io_uring.c                 | 203 +++++++++++++++++++++++-----------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 138 insertions(+), 66 deletions(-)

-- 
Jens Axboe




* [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument
  2019-11-06 23:53 [PATCHSET v2 0/3] io_uring CQ ring backpressure Jens Axboe
@ 2019-11-06 23:53 ` Jens Axboe
  2019-11-06 23:53 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
  2019-11-06 23:53 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
  2 siblings, 0 replies; 7+ messages in thread
From: Jens Axboe @ 2019-11-06 23:53 UTC
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

The rings can be derived from the ctx, and we need the ctx there for
a future change.

No functional changes in this patch.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index b0dd8d322360..36ca7bc38ebf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -859,8 +859,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 	}
 }
 
-static unsigned io_cqring_events(struct io_rings *rings)
+static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
+	struct io_rings *rings = ctx->rings;
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -1016,7 +1018,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 		 * If we do, we can potentially be spinning for commands that
 		 * already triggered a CQE (eg in error).
 		 */
-		if (io_cqring_events(ctx->rings))
+		if (io_cqring_events(ctx))
 			break;
 
 		/*
@@ -3064,7 +3066,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
 	 * started waiting. For timeouts, we always want to return to userspace,
 	 * regardless of event count.
 	 */
-	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+	return io_cqring_events(ctx) >= iowq->to_wait ||
 			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
 }
 
@@ -3099,7 +3101,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	struct io_rings *rings = ctx->rings;
 	int ret = 0;
 
-	if (io_cqring_events(rings) >= min_events)
+	if (io_cqring_events(ctx) >= min_events)
 		return 0;
 
 	if (sig) {
-- 
2.24.0



* [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers
  2019-11-06 23:53 [PATCHSET v2 0/3] io_uring CQ ring backpressure Jens Axboe
  2019-11-06 23:53 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
@ 2019-11-06 23:53 ` Jens Axboe
  2019-11-07  9:53   ` Pavel Begunkov
  2019-11-06 23:53 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
  2 siblings, 1 reply; 7+ messages in thread
From: Jens Axboe @ 2019-11-06 23:53 UTC
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

This is in preparation for handling CQ ring overflow a bit smarter. We
should not have any functional changes in this patch. Most of the changes
are fairly straightforward; the only ones that stick out a bit are the
ones that change:

__io_free_req()

to a double io_put_req(). If the request hasn't been submitted yet, we
know it's safe to simply ignore references and free it. But let's clean
these up too, as later patches will depend on the caller doing the right
thing if the completion logging grabs a reference to the request.
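
As a rough illustration of why the double put matters, here is a small
userspace model. It is only a sketch: the toy_* names are made up, and it
assumes a request starts out with two references (submit and complete),
which is what the "drop both submit and complete references" comments in
this patch imply. It compares freeing directly with dropping references
when a completion backlog has pinned the request.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct io_kiocb; only the refcount is modeled. */
struct toy_req {
	int refs;		/* assumed: 2 at creation (submit + complete) */
	bool freed;
	bool backlogged;	/* a completion backlog still points at us */
};

void toy_req_init(struct toy_req *req)
{
	req->refs = 2;
	req->freed = false;
	req->backlogged = false;
}

/* Models __io_free_req(): frees regardless of outstanding references. */
void toy_free_req(struct toy_req *req)
{
	req->freed = true;
}

/* Models io_put_req(): frees only when the last reference goes away. */
void toy_put_req(struct toy_req *req)
{
	assert(req->refs > 0);
	if (--req->refs == 0)
		toy_free_req(req);
}

/* Models completion logging that grabs a reference to the request. */
void toy_backlog_completion(struct toy_req *req)
{
	req->refs++;
	req->backlogged = true;
}

/* Models flushing the backlog: drop the reference it was holding. */
void toy_backlog_flush(struct toy_req *req)
{
	req->backlogged = false;
	toy_put_req(req);
}

/* True if the request was freed while the backlog still points at it. */
bool toy_dangling(const struct toy_req *req)
{
	return req->freed && req->backlogged;
}

void toy_refs_demo(void)
{
	struct toy_req a, b;

	/* Direct free: the backlog is left with a dangling request. */
	toy_req_init(&a);
	toy_backlog_completion(&a);
	toy_free_req(&a);
	assert(toy_dangling(&a));

	/* Double put: the backlog reference keeps the request alive. */
	toy_req_init(&b);
	toy_backlog_completion(&b);
	toy_put_req(&b);
	toy_put_req(&b);
	assert(!b.freed);
	toy_backlog_flush(&b);
	assert(b.freed && !b.backlogged);
}
```

In the model, the request that is put twice is only freed once the
backlog has been flushed, which is the property the later patches rely on.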

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 90 +++++++++++++++++++++++++--------------------------
 1 file changed, 45 insertions(+), 45 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 36ca7bc38ebf..fb621a564dcf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -369,8 +369,7 @@ struct io_submit_state {
 };
 
 static void io_wq_submit_work(struct io_wq_work **workptr);
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res);
+static void io_cqring_fill_event(struct io_kiocb *req, long res);
 static void __io_free_req(struct io_kiocb *req);
 static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr);
 
@@ -535,8 +534,8 @@ static void io_kill_timeout(struct io_kiocb *req)
 	if (ret != -1) {
 		atomic_inc(&req->ctx->cq_timeouts);
 		list_del_init(&req->list);
-		io_cqring_fill_event(req->ctx, req->user_data, 0);
-		__io_free_req(req);
+		io_cqring_fill_event(req, 0);
+		io_put_req(req, NULL);
 	}
 }
 
@@ -588,12 +587,12 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res)
+static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	struct io_uring_cqe *cqe;
 
-	trace_io_uring_complete(ctx, ki_user_data, res);
+	trace_io_uring_complete(ctx, req->user_data, res);
 
 	/*
 	 * If we can't get a cq entry, userspace overflowed the
@@ -602,7 +601,7 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 	 */
 	cqe = io_get_cqring(ctx);
 	if (cqe) {
-		WRITE_ONCE(cqe->user_data, ki_user_data);
+		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
@@ -621,13 +620,13 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 		eventfd_signal(ctx->cq_ev_fd, 1);
 }
 
-static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
-				long res)
+static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	unsigned long flags;
 
 	spin_lock_irqsave(&ctx->completion_lock, flags);
-	io_cqring_fill_event(ctx, user_data, res);
+	io_cqring_fill_event(req, res);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -721,10 +720,10 @@ static void io_link_cancel_timeout(struct io_ring_ctx *ctx,
 
 	ret = hrtimer_try_to_cancel(&req->timeout.timer);
 	if (ret != -1) {
-		io_cqring_fill_event(ctx, req->user_data, -ECANCELED);
+		io_cqring_fill_event(req, -ECANCELED);
 		io_commit_cqring(ctx);
 		req->flags &= ~REQ_F_LINK;
-		__io_free_req(req);
+		io_put_req(req, NULL);
 	}
 }
 
@@ -804,8 +803,10 @@ static void io_fail_links(struct io_kiocb *req)
 		    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
 			io_link_cancel_timeout(ctx, link);
 		} else {
-			io_cqring_fill_event(ctx, link->user_data, -ECANCELED);
-			__io_free_req(link);
+			io_cqring_fill_event(link, -ECANCELED);
+			/* drop both submit and complete references */
+			io_put_req(link, NULL);
+			io_put_req(link, NULL);
 		}
 	}
 
@@ -891,7 +892,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		req = list_first_entry(done, struct io_kiocb, list);
 		list_del(&req->list);
 
-		io_cqring_fill_event(ctx, req->user_data, req->result);
+		io_cqring_fill_event(req, req->result);
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs)) {
@@ -1087,7 +1088,7 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
 
 	if ((req->flags & REQ_F_LINK) && res != req->result)
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, req->user_data, res);
+	io_cqring_add_event(req, res);
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -1588,15 +1589,14 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
 /*
  * IORING_OP_NOP just posts a completion event, nothing else.
  */
-static int io_nop(struct io_kiocb *req, u64 user_data)
+static int io_nop(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	long err = 0;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
 
-	io_cqring_add_event(ctx, user_data, err);
+	io_cqring_add_event(req, 0);
 	io_put_req(req, NULL);
 	return 0;
 }
@@ -1643,7 +1643,7 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1690,7 +1690,7 @@ static int io_sync_file_range(struct io_kiocb *req,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1726,7 +1726,7 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 			return ret;
 	}
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, nxt);
@@ -1782,7 +1782,7 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	}
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 #else
@@ -1843,7 +1843,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	}
 	spin_unlock_irq(&ctx->completion_lock);
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, NULL);
@@ -1854,7 +1854,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			     __poll_t mask)
 {
 	req->poll.done = true;
-	io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
+	io_cqring_fill_event(req, mangle_poll(mask));
 	io_commit_cqring(ctx);
 }
 
@@ -2048,7 +2048,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
 		list_del_init(&req->list);
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, -ETIME);
+	io_cqring_fill_event(req, -ETIME);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -2092,7 +2092,7 @@ static int io_timeout_remove(struct io_kiocb *req,
 	/* didn't find timeout */
 	if (ret) {
 fill_ev:
-		io_cqring_fill_event(ctx, req->user_data, ret);
+		io_cqring_fill_event(req, ret);
 		io_commit_cqring(ctx);
 		spin_unlock_irq(&ctx->completion_lock);
 		io_cqring_ev_posted(ctx);
@@ -2108,8 +2108,8 @@ static int io_timeout_remove(struct io_kiocb *req,
 		goto fill_ev;
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, 0);
-	io_cqring_fill_event(ctx, treq->user_data, -ECANCELED);
+	io_cqring_fill_event(req, 0);
+	io_cqring_fill_event(treq, -ECANCELED);
 	io_commit_cqring(ctx);
 	spin_unlock_irq(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -2249,7 +2249,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -2288,12 +2288,10 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	int ret, opcode;
 	struct sqe_submit *s = &req->submit;
 
-	req->user_data = READ_ONCE(s->sqe->user_data);
-
 	opcode = READ_ONCE(s->sqe->opcode);
 	switch (opcode) {
 	case IORING_OP_NOP:
-		ret = io_nop(req, req->user_data);
+		ret = io_nop(req);
 		break;
 	case IORING_OP_READV:
 		if (unlikely(s->sqe->buf_index))
@@ -2402,7 +2400,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
 	if (ret) {
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
-		io_cqring_add_event(ctx, sqe->user_data, ret);
+		io_cqring_add_event(req, ret);
 		io_put_req(req, NULL);
 	}
 
@@ -2530,7 +2528,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 	if (prev)
 		ret = io_async_cancel_one(ctx, (void *) prev->user_data);
 
-	io_cqring_add_event(ctx, req->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, NULL);
 	return HRTIMER_NORESTART;
 }
@@ -2573,7 +2571,7 @@ static int io_queue_linked_timeout(struct io_kiocb *req, struct io_kiocb *nxt)
 		 * failed by the regular submission path.
 		 */
 		list_del(&nxt->list);
-		io_cqring_fill_event(ctx, nxt->user_data, ret);
+		io_cqring_fill_event(nxt, ret);
 		trace_io_uring_fail_link(req, nxt);
 		io_commit_cqring(ctx);
 		io_put_req(nxt, NULL);
@@ -2646,7 +2644,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 
 	/* and drop final reference, if we failed */
 	if (ret) {
-		io_cqring_add_event(ctx, req->user_data, ret);
+		io_cqring_add_event(req, ret);
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
 		io_put_req(req, NULL);
@@ -2662,7 +2660,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
+			io_cqring_add_event(req, ret);
 			io_free_req(req, NULL);
 		}
 		return 0;
@@ -2689,8 +2687,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
-			io_free_req(req, NULL);
+			io_cqring_add_event(req, ret);
+			io_put_req(req, NULL);
 			__io_free_req(shadow);
 			return 0;
 		}
@@ -2723,6 +2721,8 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	struct sqe_submit *s = &req->submit;
 	int ret;
 
+	req->user_data = s->sqe->user_data;
+
 	/* enforce forwards compatibility on users */
 	if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
 		ret = -EINVAL;
@@ -2732,13 +2732,13 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_set_file(ctx, state, req);
 	if (unlikely(ret)) {
 err_req:
-		io_cqring_add_event(ctx, s->sqe->user_data, ret);
-		io_free_req(req, NULL);
+		io_cqring_add_event(req, ret);
+		/* drop both submit and complete references */
+		io_put_req(req, NULL);
+		io_put_req(req, NULL);
 		return;
 	}
 
-	req->user_data = s->sqe->user_data;
-
 	/*
 	 * If we already have a head request, queue this one for async
 	 * submittal once the head completes. If we don't have a head but
-- 
2.24.0



* [PATCH 3/3] io_uring: add support for backlogged CQ ring
  2019-11-06 23:53 [PATCHSET v2 0/3] io_uring CQ ring backpressure Jens Axboe
  2019-11-06 23:53 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
  2019-11-06 23:53 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
@ 2019-11-06 23:53 ` Jens Axboe
  2 siblings, 0 replies; 7+ messages in thread
From: Jens Axboe @ 2019-11-06 23:53 UTC
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

Currently we drop completion events if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder to
use io_uring with networked IO where request completion times are
generally unbounded. Or with POLL, for example, which is also unbounded.

This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
for CQ ring overflows. First of all, it doesn't overflow the ring; it
simply stores a backlog of completions that we weren't able to put into
the CQ ring. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel. This is a signal to the application that it has
backlogged CQ events, and that it must reap those before being allowed
to submit more IO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index fb621a564dcf..22373b7b3db0 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -207,6 +207,7 @@ struct io_ring_ctx {
 
 		struct list_head	defer_list;
 		struct list_head	timeout_list;
+		struct list_head	cq_overflow_list;
 
 		wait_queue_head_t	inflight_wait;
 	} ____cacheline_aligned_in_smp;
@@ -413,6 +414,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 
 	ctx->flags = p->flags;
 	init_waitqueue_head(&ctx->cq_wait);
+	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->ctx_done);
 	init_completion(&ctx->sqo_thread_started);
 	mutex_init(&ctx->uring_lock);
@@ -587,6 +589,72 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
+static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+{
+	if (waitqueue_active(&ctx->wait))
+		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
+	if (ctx->cq_ev_fd)
+		eventfd_signal(ctx->cq_ev_fd, 1);
+}
+
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+{
+	struct io_rings *rings = ctx->rings;
+	struct io_uring_cqe *cqe;
+	struct io_kiocb *req;
+	unsigned long flags;
+	LIST_HEAD(list);
+
+	if (list_empty_careful(&ctx->cq_overflow_list))
+		return;
+	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
+	    rings->cq_ring_entries)
+		return;
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+
+	while (!list_empty(&ctx->cq_overflow_list)) {
+		cqe = io_get_cqring(ctx);
+		if (!cqe && !force)
+			break;
+
+		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
+						list);
+		list_move(&req->list, &list);
+		if (cqe) {
+			WRITE_ONCE(cqe->user_data, req->user_data);
+			WRITE_ONCE(cqe->res, req->result);
+			WRITE_ONCE(cqe->flags, 0);
+		}
+	}
+
+	io_commit_cqring(ctx);
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+	io_cqring_ev_posted(ctx);
+
+	while (!list_empty(&list)) {
+		req = list_first_entry(&list, struct io_kiocb, list);
+		list_del(&req->list);
+		io_put_req(req, NULL);
+	}
+}
+
+static void io_cqring_overflow(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			       long res)
+	__must_hold(&ctx->completion_lock)
+{
+	if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
+		WRITE_ONCE(ctx->rings->cq_overflow,
+				atomic_inc_return(&ctx->cached_cq_overflow));
+	} else {
+		refcount_inc(&req->refs);
+		req->result = res;
+		list_add_tail(&req->list, &ctx->cq_overflow_list);
+	}
+}
+
 static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -600,26 +668,15 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
 	 * the ring.
 	 */
 	cqe = io_get_cqring(ctx);
-	if (cqe) {
+	if (likely(cqe)) {
 		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
-		WRITE_ONCE(ctx->rings->cq_overflow,
-				atomic_inc_return(&ctx->cached_cq_overflow));
+		io_cqring_overflow(ctx, req, res);
 	}
 }
 
-static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
-{
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
-	if (waitqueue_active(&ctx->sqo_wait))
-		wake_up(&ctx->sqo_wait);
-	if (ctx->cq_ev_fd)
-		eventfd_signal(ctx->cq_ev_fd, 1);
-}
-
 static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -864,6 +921,9 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
 	struct io_rings *rings = ctx->rings;
 
+	if (ctx->flags & IORING_SETUP_CQ_NODROP)
+		io_cqring_overflow_flush(ctx, false);
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -2863,6 +2923,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
 	int i, submitted = 0;
 	bool mm_fault = false;
 
+	if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
+	    !list_empty(&ctx->cq_overflow_list))
+		return -EBUSY;
+
 	if (nr > IO_PLUG_THRESHOLD) {
 		io_submit_state_start(&state, ctx, nr);
 		statep = &state;
@@ -2951,6 +3015,7 @@ static int io_sq_thread(void *data)
 	timeout = inflight = 0;
 	while (!kthread_should_park()) {
 		unsigned int to_submit;
+		int ret;
 
 		if (inflight) {
 			unsigned nr_events = 0;
@@ -3035,8 +3100,9 @@ static int io_sq_thread(void *data)
 		}
 
 		to_submit = min(to_submit, ctx->sq_entries);
-		inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
-					   true);
+		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
+		if (ret > 0)
+			inflight += ret;
 	}
 
 	set_fs(old_fs);
@@ -4100,8 +4166,10 @@ static int io_uring_flush(struct file *file, void *data)
 	struct io_ring_ctx *ctx = file->private_data;
 
 	io_uring_cancel_files(ctx, data);
-	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
+	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
+		io_cqring_overflow_flush(ctx, true);
 		io_wq_cancel_all(ctx->io_wq);
+	}
 	return 0;
 }
 
@@ -4402,7 +4470,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 	}
 
 	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
-			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
+			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
+			IORING_SETUP_CQ_NODROP))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f1a118b01d18..3d8517eb376e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -56,6 +56,7 @@ struct io_uring_sqe {
 #define IORING_SETUP_SQPOLL	(1U << 1)	/* SQ poll thread */
 #define IORING_SETUP_SQ_AFF	(1U << 2)	/* sq_thread_cpu is valid */
 #define IORING_SETUP_CQSIZE	(1U << 3)	/* app defines CQ size */
+#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* no CQ drops */
 
 #define IORING_OP_NOP		0
 #define IORING_OP_READV		1
-- 
2.24.0



* Re: [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers
  2019-11-06 23:53 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
@ 2019-11-07  9:53   ` Pavel Begunkov
  2019-11-07 13:16     ` Jens Axboe
  0 siblings, 1 reply; 7+ messages in thread
From: Pavel Begunkov @ 2019-11-07  9:53 UTC
  To: Jens Axboe, io-uring; +Cc: linux-block, jannh

On 07/11/2019 02:53, Jens Axboe wrote:
> This is in preparation for handling CQ ring overflow a bit smarter. We
> should not have any functional changes in this patch. Most of the changes
> are fairly straight forward, the only ones that stick out a bit are the
> ones that change:
> 
> __io_free_req()
> 
> to a double io_put_req(). If the request hasn't been submitted yet, we
> know it's safe to simply ignore references and free it. But let's clean
> these up too, as later patches will depend on the caller doing the right
> thing if the completion logging grabs a reference to the request.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/io_uring.c | 90 +++++++++++++++++++++++++--------------------------
>  1 file changed, 45 insertions(+), 45 deletions(-)
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index 36ca7bc38ebf..fb621a564dcf 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -369,8 +369,7 @@ struct io_submit_state {
>  };
>  
>  static void io_wq_submit_work(struct io_wq_work **workptr);
> -static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
> -				 long res);
> +static void io_cqring_fill_event(struct io_kiocb *req, long res);
>  static void __io_free_req(struct io_kiocb *req);
>  static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr);
>  
> @@ -535,8 +534,8 @@ static void io_kill_timeout(struct io_kiocb *req)
>  	if (ret != -1) {
>  		atomic_inc(&req->ctx->cq_timeouts);
>  		list_del_init(&req->list);
> -		io_cqring_fill_event(req->ctx, req->user_data, 0);
> -		__io_free_req(req);
> +		io_cqring_fill_event(req, 0);
> +		io_put_req(req, NULL);
>  	}
>  }
>  
> @@ -588,12 +587,12 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
>  	return &rings->cqes[tail & ctx->cq_mask];
>  }
>  
> -static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
> -				 long res)
> +static void io_cqring_fill_event(struct io_kiocb *req, long res)
>  {
> +	struct io_ring_ctx *ctx = req->ctx;
>  	struct io_uring_cqe *cqe;
>  
> -	trace_io_uring_complete(ctx, ki_user_data, res);
> +	trace_io_uring_complete(ctx, req->user_data, res);
>  
>  	/*
>  	 * If we can't get a cq entry, userspace overflowed the
> @@ -602,7 +601,7 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
>  	 */
>  	cqe = io_get_cqring(ctx);
>  	if (cqe) {
> -		WRITE_ONCE(cqe->user_data, ki_user_data);
> +		WRITE_ONCE(cqe->user_data, req->user_data);
>  		WRITE_ONCE(cqe->res, res);
>  		WRITE_ONCE(cqe->flags, 0);
>  	} else {
> @@ -621,13 +620,13 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
>  		eventfd_signal(ctx->cq_ev_fd, 1);
>  }
>  
> -static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
> -				long res)
> +static void io_cqring_add_event(struct io_kiocb *req, long res)
>  {
> +	struct io_ring_ctx *ctx = req->ctx;
>  	unsigned long flags;
>  
>  	spin_lock_irqsave(&ctx->completion_lock, flags);
> -	io_cqring_fill_event(ctx, user_data, res);
> +	io_cqring_fill_event(req, res);
>  	io_commit_cqring(ctx);
>  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>  
> @@ -721,10 +720,10 @@ static void io_link_cancel_timeout(struct io_ring_ctx *ctx,
>  
>  	ret = hrtimer_try_to_cancel(&req->timeout.timer);
>  	if (ret != -1) {
> -		io_cqring_fill_event(ctx, req->user_data, -ECANCELED);
> +		io_cqring_fill_event(req, -ECANCELED);
>  		io_commit_cqring(ctx);
>  		req->flags &= ~REQ_F_LINK;
> -		__io_free_req(req);
> +		io_put_req(req, NULL);
>  	}
>  }
>  
> @@ -804,8 +803,10 @@ static void io_fail_links(struct io_kiocb *req)
>  		    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
>  			io_link_cancel_timeout(ctx, link);
>  		} else {
> -			io_cqring_fill_event(ctx, link->user_data, -ECANCELED);
> -			__io_free_req(link);
> +			io_cqring_fill_event(link, -ECANCELED);
> +			/* drop both submit and complete references */
> +			io_put_req(link, NULL);
> +			io_put_req(link, NULL);

io_put_req() -> ... -> io_free_req() -> io_fail_links() -> io_put_req()

It shouldn't recurse any further, but it would probably be better to
avoid the recursion altogether.


>  		}
>  	}
>  
> @@ -891,7 +892,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
>  		req = list_first_entry(done, struct io_kiocb, list);
>  		list_del(&req->list);
>  
> -		io_cqring_fill_event(ctx, req->user_data, req->result);
> +		io_cqring_fill_event(req, req->result);
>  		(*nr_events)++;
>  
>  		if (refcount_dec_and_test(&req->refs)) {
> @@ -1087,7 +1088,7 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
>  
>  	if ((req->flags & REQ_F_LINK) && res != req->result)
>  		req->flags |= REQ_F_FAIL_LINK;
> -	io_cqring_add_event(req->ctx, req->user_data, res);
> +	io_cqring_add_event(req, res);
>  }
>  
>  static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
> @@ -1588,15 +1589,14 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
>  /*
>   * IORING_OP_NOP just posts a completion event, nothing else.
>   */
> -static int io_nop(struct io_kiocb *req, u64 user_data)
> +static int io_nop(struct io_kiocb *req)
>  {
>  	struct io_ring_ctx *ctx = req->ctx;
> -	long err = 0;
>  
>  	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
>  		return -EINVAL;
>  
> -	io_cqring_add_event(ctx, user_data, err);
> +	io_cqring_add_event(req, 0);
>  	io_put_req(req, NULL);
>  	return 0;
>  }
> @@ -1643,7 +1643,7 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
>  
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	io_put_req(req, nxt);
>  	return 0;
>  }
> @@ -1690,7 +1690,7 @@ static int io_sync_file_range(struct io_kiocb *req,
>  
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	io_put_req(req, nxt);
>  	return 0;
>  }
> @@ -1726,7 +1726,7 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
>  			return ret;
>  	}
>  
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
>  	io_put_req(req, nxt);
> @@ -1782,7 +1782,7 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
>  	}
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	io_put_req(req, nxt);
>  	return 0;
>  #else
> @@ -1843,7 +1843,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
>  	}
>  	spin_unlock_irq(&ctx->completion_lock);
>  
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
>  	io_put_req(req, NULL);
> @@ -1854,7 +1854,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
>  			     __poll_t mask)
>  {
>  	req->poll.done = true;
> -	io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
> +	io_cqring_fill_event(req, mangle_poll(mask));
>  	io_commit_cqring(ctx);
>  }
>  
> @@ -2048,7 +2048,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
>  		list_del_init(&req->list);
>  	}
>  
> -	io_cqring_fill_event(ctx, req->user_data, -ETIME);
> +	io_cqring_fill_event(req, -ETIME);
>  	io_commit_cqring(ctx);
>  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>  
> @@ -2092,7 +2092,7 @@ static int io_timeout_remove(struct io_kiocb *req,
>  	/* didn't find timeout */
>  	if (ret) {
>  fill_ev:
> -		io_cqring_fill_event(ctx, req->user_data, ret);
> +		io_cqring_fill_event(req, ret);
>  		io_commit_cqring(ctx);
>  		spin_unlock_irq(&ctx->completion_lock);
>  		io_cqring_ev_posted(ctx);
> @@ -2108,8 +2108,8 @@ static int io_timeout_remove(struct io_kiocb *req,
>  		goto fill_ev;
>  	}
>  
> -	io_cqring_fill_event(ctx, req->user_data, 0);
> -	io_cqring_fill_event(ctx, treq->user_data, -ECANCELED);
> +	io_cqring_fill_event(req, 0);
> +	io_cqring_fill_event(treq, -ECANCELED);
>  	io_commit_cqring(ctx);
>  	spin_unlock_irq(&ctx->completion_lock);
>  	io_cqring_ev_posted(ctx);
> @@ -2249,7 +2249,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
>  
>  	if (ret < 0 && (req->flags & REQ_F_LINK))
>  		req->flags |= REQ_F_FAIL_LINK;
> -	io_cqring_add_event(req->ctx, sqe->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	io_put_req(req, nxt);
>  	return 0;
>  }
> @@ -2288,12 +2288,10 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
>  	int ret, opcode;
>  	struct sqe_submit *s = &req->submit;
>  
> -	req->user_data = READ_ONCE(s->sqe->user_data);
> -
>  	opcode = READ_ONCE(s->sqe->opcode);
>  	switch (opcode) {
>  	case IORING_OP_NOP:
> -		ret = io_nop(req, req->user_data);
> +		ret = io_nop(req);
>  		break;
>  	case IORING_OP_READV:
>  		if (unlikely(s->sqe->buf_index))
> @@ -2402,7 +2400,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
>  	if (ret) {
>  		if (req->flags & REQ_F_LINK)
>  			req->flags |= REQ_F_FAIL_LINK;
> -		io_cqring_add_event(ctx, sqe->user_data, ret);
> +		io_cqring_add_event(req, ret);
>  		io_put_req(req, NULL);
>  	}
>  
> @@ -2530,7 +2528,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
>  	if (prev)
>  		ret = io_async_cancel_one(ctx, (void *) prev->user_data);
>  
> -	io_cqring_add_event(ctx, req->user_data, ret);
> +	io_cqring_add_event(req, ret);
>  	io_put_req(req, NULL);
>  	return HRTIMER_NORESTART;
>  }
> @@ -2573,7 +2571,7 @@ static int io_queue_linked_timeout(struct io_kiocb *req, struct io_kiocb *nxt)
>  		 * failed by the regular submission path.
>  		 */
>  		list_del(&nxt->list);
> -		io_cqring_fill_event(ctx, nxt->user_data, ret);
> +		io_cqring_fill_event(nxt, ret);
>  		trace_io_uring_fail_link(req, nxt);
>  		io_commit_cqring(ctx);
>  		io_put_req(nxt, NULL);
> @@ -2646,7 +2644,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
>  
>  	/* and drop final reference, if we failed */
>  	if (ret) {
> -		io_cqring_add_event(ctx, req->user_data, ret);
> +		io_cqring_add_event(req, ret);
>  		if (req->flags & REQ_F_LINK)
>  			req->flags |= REQ_F_FAIL_LINK;
>  		io_put_req(req, NULL);
> @@ -2662,7 +2660,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
>  	ret = io_req_defer(ctx, req);
>  	if (ret) {
>  		if (ret != -EIOCBQUEUED) {
> -			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
> +			io_cqring_add_event(req, ret);
>  			io_free_req(req, NULL);
>  		}
>  		return 0;
> @@ -2689,8 +2687,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
>  	ret = io_req_defer(ctx, req);
>  	if (ret) {
>  		if (ret != -EIOCBQUEUED) {
> -			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
> -			io_free_req(req, NULL);
> +			io_cqring_add_event(req, ret);
> +			io_put_req(req, NULL);
>  			__io_free_req(shadow);
>  			return 0;
>  		}
> @@ -2723,6 +2721,8 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
>  	struct sqe_submit *s = &req->submit;
>  	int ret;
>  
> +	req->user_data = s->sqe->user_data;
> +
>  	/* enforce forwards compatibility on users */
>  	if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
>  		ret = -EINVAL;
> @@ -2732,13 +2732,13 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
>  	ret = io_req_set_file(ctx, state, req);
>  	if (unlikely(ret)) {
>  err_req:
> -		io_cqring_add_event(ctx, s->sqe->user_data, ret);
> -		io_free_req(req, NULL);
> +		io_cqring_add_event(req, ret);
> +		/* drop both submit and complete references */
> +		io_put_req(req, NULL);
> +		io_put_req(req, NULL);
>  		return;
>  	}
>  
> -	req->user_data = s->sqe->user_data;
> -
>  	/*
>  	 * If we already have a head request, queue this one for async
>  	 * submittal once the head completes. If we don't have a head but
> 

-- 
Pavel Begunkov



* Re: [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers
  2019-11-07  9:53   ` Pavel Begunkov
@ 2019-11-07 13:16     ` Jens Axboe
  0 siblings, 0 replies; 7+ messages in thread
From: Jens Axboe @ 2019-11-07 13:16 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring; +Cc: linux-block, jannh

On 11/7/19 2:53 AM, Pavel Begunkov wrote:
>> @@ -804,8 +803,10 @@ static void io_fail_links(struct io_kiocb *req)
>>   		    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
>>   			io_link_cancel_timeout(ctx, link);
>>   		} else {
>> -			io_cqring_fill_event(ctx, link->user_data, -ECANCELED);
>> -			__io_free_req(link);
>> +			io_cqring_fill_event(link, -ECANCELED);
>> +			/* drop both submit and complete references */
>> +			io_put_req(link, NULL);
>> +			io_put_req(link, NULL);
> 
> io_put_req() -> ... -> io_free_req() -> io_fail_links() -> io_put_req()
> 
> It shouldn't recurse further, but probably it would be better to avoid
> it at all.

Not sure how to improve that. We could do something like:

if (refcount_sub_and_test(2, &link->refs))
	__io_free_req(link);

to make it clear and more resistant against recursion.

I also think we need to put that link path out-of-line in io_free_req().
I'll make those two changes.
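
To illustrate the idea, here is a minimal userspace sketch of the double put, using C11 atomics in place of the kernel's refcount_t. Everything prefixed with fake_ is hypothetical and exists only for this example; it is not the kernel code, just a model of dropping both references in a single step and freeing directly, without going back through the link-handling path.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct io_kiocb; only the refcount matters. */
struct fake_req {
	atomic_int refs;
	bool freed;	/* set instead of freeing, so the effect is observable */
};

/* Models refcount_sub_and_test(): subtract n, report whether we hit zero. */
bool fake_sub_and_test(struct fake_req *req, int n)
{
	/* atomic_fetch_sub() returns the value held before the subtraction */
	return atomic_fetch_sub(&req->refs, n) == n;
}

/* Models __io_free_req(): must only run once no references remain. */
void fake_free_req(struct fake_req *req)
{
	assert(atomic_load(&req->refs) == 0);
	req->freed = true;
}

/* Drop both the submit and the complete reference in one step. */
void fake_double_put_req(struct fake_req *req)
{
	if (fake_sub_and_test(req, 2))
		fake_free_req(req);
}
```

With a request that holds exactly two references, one call to fake_double_put_req() frees it. If something else still holds a reference, the request survives with that one reference left.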

-- 
Jens Axboe



* [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers
  2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
@ 2019-11-07 16:00 ` Jens Axboe
  0 siblings, 0 replies; 7+ messages in thread
From: Jens Axboe @ 2019-11-07 16:00 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

This is in preparation for handling CQ ring overflow a bit smarter. We
should not have any functional changes in this patch. Most of the
changes are fairly straightforward; the only ones that stick out a bit
are the ones that change __io_free_req() to take the reference count
into account. If the request hasn't been submitted yet, we know it's
safe to simply ignore references and free it. But let's clean these up
too, as later patches will depend on the caller doing the right thing if
the completion logging grabs a reference to the request.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 96 +++++++++++++++++++++++++++------------------------
 1 file changed, 50 insertions(+), 46 deletions(-)
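
For readers skimming the diff, the shape of the interface change can be modelled in userspace roughly as below. This is only a sketch under stated assumptions: the fake_ types, the two-entry ring and the plain overflow counter are all made up for illustration and do not match the kernel structures. The point it shows is that the fill handler now takes the request and derives both the ring context and user_data from it.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define FAKE_CQ_ENTRIES 2	/* hypothetical; tiny and a power of two */

struct fake_cqe {
	uint64_t user_data;
	int32_t res;
};

struct fake_ctx {
	struct fake_cqe cqes[FAKE_CQ_ENTRIES];
	unsigned head, tail;
	unsigned overflow;	/* completions that did not fit in the ring */
};

struct fake_req {
	struct fake_ctx *ctx;
	uint64_t user_data;
};

/* Return the next free CQ slot, or NULL if the ring is full. */
static struct fake_cqe *fake_get_cqring(struct fake_ctx *ctx)
{
	if (ctx->tail - ctx->head == FAKE_CQ_ENTRIES)
		return NULL;
	return &ctx->cqes[ctx->tail++ & (FAKE_CQ_ENTRIES - 1)];
}

/* New-style handler: everything it needs comes from the request. */
void fake_cqring_fill_event(struct fake_req *req, int32_t res)
{
	struct fake_ctx *ctx;
	struct fake_cqe *cqe;

	assert(req && req->ctx);
	ctx = req->ctx;

	cqe = fake_get_cqring(ctx);
	if (cqe) {
		cqe->user_data = req->user_data;
		cqe->res = res;
	} else {
		/* ring is full: this sketch only counts the overflow */
		ctx->overflow++;
	}
}
```

Because the handler has the request in hand at the point where the ring turns out to be full, a later change could keep the request around there instead of only counting the overflow.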

diff --git a/fs/io_uring.c b/fs/io_uring.c
index da0fac4275d3..f69d9794ce17 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -369,10 +369,10 @@ struct io_submit_state {
 };
 
 static void io_wq_submit_work(struct io_wq_work **workptr);
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res);
+static void io_cqring_fill_event(struct io_kiocb *req, long res);
 static void __io_free_req(struct io_kiocb *req);
 static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr);
+static void io_double_put_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -535,8 +535,8 @@ static void io_kill_timeout(struct io_kiocb *req)
 	if (ret != -1) {
 		atomic_inc(&req->ctx->cq_timeouts);
 		list_del_init(&req->list);
-		io_cqring_fill_event(req->ctx, req->user_data, 0);
-		__io_free_req(req);
+		io_cqring_fill_event(req, 0);
+		io_put_req(req, NULL);
 	}
 }
 
@@ -588,12 +588,12 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res)
+static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	struct io_uring_cqe *cqe;
 
-	trace_io_uring_complete(ctx, ki_user_data, res);
+	trace_io_uring_complete(ctx, req->user_data, res);
 
 	/*
 	 * If we can't get a cq entry, userspace overflowed the
@@ -602,7 +602,7 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 	 */
 	cqe = io_get_cqring(ctx);
 	if (cqe) {
-		WRITE_ONCE(cqe->user_data, ki_user_data);
+		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
@@ -621,13 +621,13 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 		eventfd_signal(ctx->cq_ev_fd, 1);
 }
 
-static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
-				long res)
+static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	unsigned long flags;
 
 	spin_lock_irqsave(&ctx->completion_lock, flags);
-	io_cqring_fill_event(ctx, user_data, res);
+	io_cqring_fill_event(req, res);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -721,10 +721,10 @@ static bool io_link_cancel_timeout(struct io_ring_ctx *ctx,
 
 	ret = hrtimer_try_to_cancel(&req->timeout.timer);
 	if (ret != -1) {
-		io_cqring_fill_event(ctx, req->user_data, -ECANCELED);
+		io_cqring_fill_event(req, -ECANCELED);
 		io_commit_cqring(ctx);
 		req->flags &= ~REQ_F_LINK;
-		__io_free_req(req);
+		io_put_req(req, NULL);
 		return true;
 	}
 
@@ -795,8 +795,8 @@ static void io_fail_links(struct io_kiocb *req)
 		    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
 			io_link_cancel_timeout(ctx, link);
 		} else {
-			io_cqring_fill_event(ctx, link->user_data, -ECANCELED);
-			__io_free_req(link);
+			io_cqring_fill_event(link, -ECANCELED);
+			io_double_put_req(link);
 		}
 	}
 
@@ -866,6 +866,13 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 	}
 }
 
+static void io_double_put_req(struct io_kiocb *req)
+{
+	/* drop both submit and complete references */
+	if (refcount_sub_and_test(2, &req->refs))
+		__io_free_req(req);
+}
+
 static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
 	struct io_rings *rings = ctx->rings;
@@ -898,7 +905,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		req = list_first_entry(done, struct io_kiocb, list);
 		list_del(&req->list);
 
-		io_cqring_fill_event(ctx, req->user_data, req->result);
+		io_cqring_fill_event(req, req->result);
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs)) {
@@ -1094,7 +1101,7 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
 
 	if ((req->flags & REQ_F_LINK) && res != req->result)
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, req->user_data, res);
+	io_cqring_add_event(req, res);
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -1595,15 +1602,14 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
 /*
  * IORING_OP_NOP just posts a completion event, nothing else.
  */
-static int io_nop(struct io_kiocb *req, u64 user_data)
+static int io_nop(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	long err = 0;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
 
-	io_cqring_add_event(ctx, user_data, err);
+	io_cqring_add_event(req, 0);
 	io_put_req(req, NULL);
 	return 0;
 }
@@ -1650,7 +1656,7 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1697,7 +1703,7 @@ static int io_sync_file_range(struct io_kiocb *req,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1733,7 +1739,7 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 			return ret;
 	}
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, nxt);
@@ -1789,7 +1795,7 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	}
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 #else
@@ -1850,7 +1856,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	}
 	spin_unlock_irq(&ctx->completion_lock);
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, NULL);
@@ -1861,7 +1867,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			     __poll_t mask)
 {
 	req->poll.done = true;
-	io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
+	io_cqring_fill_event(req, mangle_poll(mask));
 	io_commit_cqring(ctx);
 }
 
@@ -2055,7 +2061,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
 		list_del_init(&req->list);
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, -ETIME);
+	io_cqring_fill_event(req, -ETIME);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -2099,7 +2105,7 @@ static int io_timeout_remove(struct io_kiocb *req,
 	/* didn't find timeout */
 	if (ret) {
 fill_ev:
-		io_cqring_fill_event(ctx, req->user_data, ret);
+		io_cqring_fill_event(req, ret);
 		io_commit_cqring(ctx);
 		spin_unlock_irq(&ctx->completion_lock);
 		io_cqring_ev_posted(ctx);
@@ -2115,8 +2121,8 @@ static int io_timeout_remove(struct io_kiocb *req,
 		goto fill_ev;
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, 0);
-	io_cqring_fill_event(ctx, treq->user_data, -ECANCELED);
+	io_cqring_fill_event(req, 0);
+	io_cqring_fill_event(treq, -ECANCELED);
 	io_commit_cqring(ctx);
 	spin_unlock_irq(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -2256,7 +2262,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -2295,12 +2301,10 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	int ret, opcode;
 	struct sqe_submit *s = &req->submit;
 
-	req->user_data = READ_ONCE(s->sqe->user_data);
-
 	opcode = READ_ONCE(s->sqe->opcode);
 	switch (opcode) {
 	case IORING_OP_NOP:
-		ret = io_nop(req, req->user_data);
+		ret = io_nop(req);
 		break;
 	case IORING_OP_READV:
 		if (unlikely(s->sqe->buf_index))
@@ -2409,7 +2413,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
 	if (ret) {
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
-		io_cqring_add_event(ctx, sqe->user_data, ret);
+		io_cqring_add_event(req, ret);
 		io_put_req(req, NULL);
 	}
 
@@ -2539,7 +2543,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 		ret = io_async_cancel_one(ctx, user_data);
 	}
 
-	io_cqring_add_event(ctx, req->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, NULL);
 	return HRTIMER_NORESTART;
 }
@@ -2582,7 +2586,7 @@ static int io_queue_linked_timeout(struct io_kiocb *req, struct io_kiocb *nxt)
 		 * failed by the regular submission path.
 		 */
 		list_del(&nxt->list);
-		io_cqring_fill_event(ctx, nxt->user_data, ret);
+		io_cqring_fill_event(nxt, ret);
 		trace_io_uring_fail_link(req, nxt);
 		io_commit_cqring(ctx);
 		io_put_req(nxt, NULL);
@@ -2655,7 +2659,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 
 	/* and drop final reference, if we failed */
 	if (ret) {
-		io_cqring_add_event(ctx, req->user_data, ret);
+		io_cqring_add_event(req, ret);
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
 		io_put_req(req, NULL);
@@ -2671,8 +2675,8 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
-			io_free_req(req, NULL);
+			io_cqring_add_event(req, ret);
+			io_double_put_req(req);
 		}
 		return 0;
 	}
@@ -2698,8 +2702,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
-			io_free_req(req, NULL);
+			io_cqring_add_event(req, ret);
+			io_double_put_req(req);
 			__io_free_req(shadow);
 			return 0;
 		}
@@ -2732,6 +2736,8 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	struct sqe_submit *s = &req->submit;
 	int ret;
 
+	req->user_data = s->sqe->user_data;
+
 	/* enforce forwards compatibility on users */
 	if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
 		ret = -EINVAL;
@@ -2741,13 +2747,11 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_set_file(ctx, state, req);
 	if (unlikely(ret)) {
 err_req:
-		io_cqring_add_event(ctx, s->sqe->user_data, ret);
-		io_free_req(req, NULL);
+		io_cqring_add_event(req, ret);
+		io_double_put_req(req);
 		return;
 	}
 
-	req->user_data = s->sqe->user_data;
-
 	/*
 	 * If we already have a head request, queue this one for async
 	 * submittal once the head completes. If we don't have a head but
-- 
2.24.0



Thread overview: 7+ messages
2019-11-06 23:53 [PATCHSET v2 0/3] io_uring CQ ring backpressure Jens Axboe
2019-11-06 23:53 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
2019-11-06 23:53 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
2019-11-07  9:53   ` Pavel Begunkov
2019-11-07 13:16     ` Jens Axboe
2019-11-06 23:53 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
2019-11-07 16:00 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
