IO-Uring Archive on lore.kernel.org
 help / color / Atom feed
* [PATCHSET v3 0/3] io_uring CQ ring backpressure
@ 2019-11-07 16:00 Jens Axboe
  2019-11-07 16:00 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
                   ` (3 more replies)
  0 siblings, 4 replies; 9+ messages in thread
From: Jens Axboe @ 2019-11-07 16:00 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh

Currently we drop completion events, if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder to
use io_uring with networked IO where request completion times are
generally unbounded. Or with POLL, for example, which is also unbounded.

This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
for CQ ring overflows. First of all, it doesn't overflow the ring, it
simply stores backlog of completions that we weren't able to put into
the CQ ring. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel.

I think that makes for a pretty sane API in terms of how the application
can handle it. With CQ_NODROP enabled, we'll never drop a completion
event, but we'll also not allow submissions with a completion backlog.

Changes since v2:

- Add io_double_put_req() helper for the cases where we need to drop both
  the submit and complete reference. We didn't need this before as we
  could just free the request unconditionally, but we don't know if that's
  the case anymore if add/fill grabs a reference to it.
- Fix linked request dropping.

Changes since v1:

- Drop the cqe_drop structure and allocation, simply use the io_kiocb
  for the overflow backlog
- Rebase on top of Pavel's series which made this cleaner
- Add prep patch for the fill/add CQ handler changes

 fs/io_uring.c                 | 209 +++++++++++++++++++++++-----------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 143 insertions(+), 67 deletions(-)

-- 
Jens Axboe



^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument
  2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
@ 2019-11-07 16:00 ` Jens Axboe
  2019-11-07 16:00 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2019-11-07 16:00 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

The rings can be derived from the ctx, and we need the ctx there for
a future change.

No functional changes in this patch.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 22c948db536a..da0fac4275d3 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -866,8 +866,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 	}
 }
 
-static unsigned io_cqring_events(struct io_rings *rings)
+static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
+	struct io_rings *rings = ctx->rings;
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -1023,7 +1025,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 		 * If we do, we can potentially be spinning for commands that
 		 * already triggered a CQE (eg in error).
 		 */
-		if (io_cqring_events(ctx->rings))
+		if (io_cqring_events(ctx))
 			break;
 
 		/*
@@ -3076,7 +3078,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
 	 * started waiting. For timeouts, we always want to return to userspace,
 	 * regardless of event count.
 	 */
-	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+	return io_cqring_events(ctx) >= iowq->to_wait ||
 			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
 }
 
@@ -3111,7 +3113,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	struct io_rings *rings = ctx->rings;
 	int ret = 0;
 
-	if (io_cqring_events(rings) >= min_events)
+	if (io_cqring_events(ctx) >= min_events)
 		return 0;
 
 	if (sig) {
-- 
2.24.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers
  2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
  2019-11-07 16:00 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
@ 2019-11-07 16:00 ` Jens Axboe
  2019-11-07 16:00 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
  2019-11-08  9:26 ` [PATCHSET v3 0/3] io_uring CQ ring backpressure Pavel Begunkov
  3 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2019-11-07 16:00 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

This is in preparation for handling CQ ring overflow a bit smarter. We
should not have any functional changes in this patch. Most of the
changes are fairly straight forward, the only ones that stick out a bit
are the ones that change __io_free_req() to take the reference count
into account. If the request hasn't been submitted yet, we know it's
safe to simply ignore references and free it. But let's clean these up
too, as later patches will depend on the caller doing the right thing if
the completion logging grabs a reference to the request.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 96 +++++++++++++++++++++++++++------------------------
 1 file changed, 50 insertions(+), 46 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index da0fac4275d3..f69d9794ce17 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -369,10 +369,10 @@ struct io_submit_state {
 };
 
 static void io_wq_submit_work(struct io_wq_work **workptr);
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res);
+static void io_cqring_fill_event(struct io_kiocb *req, long res);
 static void __io_free_req(struct io_kiocb *req);
 static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr);
+static void io_double_put_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -535,8 +535,8 @@ static void io_kill_timeout(struct io_kiocb *req)
 	if (ret != -1) {
 		atomic_inc(&req->ctx->cq_timeouts);
 		list_del_init(&req->list);
-		io_cqring_fill_event(req->ctx, req->user_data, 0);
-		__io_free_req(req);
+		io_cqring_fill_event(req, 0);
+		io_put_req(req, NULL);
 	}
 }
 
@@ -588,12 +588,12 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
-static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
-				 long res)
+static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	struct io_uring_cqe *cqe;
 
-	trace_io_uring_complete(ctx, ki_user_data, res);
+	trace_io_uring_complete(ctx, req->user_data, res);
 
 	/*
 	 * If we can't get a cq entry, userspace overflowed the
@@ -602,7 +602,7 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 	 */
 	cqe = io_get_cqring(ctx);
 	if (cqe) {
-		WRITE_ONCE(cqe->user_data, ki_user_data);
+		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
@@ -621,13 +621,13 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 		eventfd_signal(ctx->cq_ev_fd, 1);
 }
 
-static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
-				long res)
+static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
+	struct io_ring_ctx *ctx = req->ctx;
 	unsigned long flags;
 
 	spin_lock_irqsave(&ctx->completion_lock, flags);
-	io_cqring_fill_event(ctx, user_data, res);
+	io_cqring_fill_event(req, res);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -721,10 +721,10 @@ static bool io_link_cancel_timeout(struct io_ring_ctx *ctx,
 
 	ret = hrtimer_try_to_cancel(&req->timeout.timer);
 	if (ret != -1) {
-		io_cqring_fill_event(ctx, req->user_data, -ECANCELED);
+		io_cqring_fill_event(req, -ECANCELED);
 		io_commit_cqring(ctx);
 		req->flags &= ~REQ_F_LINK;
-		__io_free_req(req);
+		io_put_req(req, NULL);
 		return true;
 	}
 
@@ -795,8 +795,8 @@ static void io_fail_links(struct io_kiocb *req)
 		    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
 			io_link_cancel_timeout(ctx, link);
 		} else {
-			io_cqring_fill_event(ctx, link->user_data, -ECANCELED);
-			__io_free_req(link);
+			io_cqring_fill_event(link, -ECANCELED);
+			io_double_put_req(link);
 		}
 	}
 
@@ -866,6 +866,13 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 	}
 }
 
+static void io_double_put_req(struct io_kiocb *req)
+{
+	/* drop both submit and complete references */
+	if (refcount_sub_and_test(2, &req->refs))
+		__io_free_req(req);
+}
+
 static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
 	struct io_rings *rings = ctx->rings;
@@ -898,7 +905,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		req = list_first_entry(done, struct io_kiocb, list);
 		list_del(&req->list);
 
-		io_cqring_fill_event(ctx, req->user_data, req->result);
+		io_cqring_fill_event(req, req->result);
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs)) {
@@ -1094,7 +1101,7 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
 
 	if ((req->flags & REQ_F_LINK) && res != req->result)
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, req->user_data, res);
+	io_cqring_add_event(req, res);
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -1595,15 +1602,14 @@ static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
 /*
  * IORING_OP_NOP just posts a completion event, nothing else.
  */
-static int io_nop(struct io_kiocb *req, u64 user_data)
+static int io_nop(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	long err = 0;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
 
-	io_cqring_add_event(ctx, user_data, err);
+	io_cqring_add_event(req, 0);
 	io_put_req(req, NULL);
 	return 0;
 }
@@ -1650,7 +1656,7 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1697,7 +1703,7 @@ static int io_sync_file_range(struct io_kiocb *req,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -1733,7 +1739,7 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 			return ret;
 	}
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, nxt);
@@ -1789,7 +1795,7 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	}
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 #else
@@ -1850,7 +1856,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	}
 	spin_unlock_irq(&ctx->completion_lock);
 
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
 	io_put_req(req, NULL);
@@ -1861,7 +1867,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			     __poll_t mask)
 {
 	req->poll.done = true;
-	io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
+	io_cqring_fill_event(req, mangle_poll(mask));
 	io_commit_cqring(ctx);
 }
 
@@ -2055,7 +2061,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
 		list_del_init(&req->list);
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, -ETIME);
+	io_cqring_fill_event(req, -ETIME);
 	io_commit_cqring(ctx);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
@@ -2099,7 +2105,7 @@ static int io_timeout_remove(struct io_kiocb *req,
 	/* didn't find timeout */
 	if (ret) {
 fill_ev:
-		io_cqring_fill_event(ctx, req->user_data, ret);
+		io_cqring_fill_event(req, ret);
 		io_commit_cqring(ctx);
 		spin_unlock_irq(&ctx->completion_lock);
 		io_cqring_ev_posted(ctx);
@@ -2115,8 +2121,8 @@ static int io_timeout_remove(struct io_kiocb *req,
 		goto fill_ev;
 	}
 
-	io_cqring_fill_event(ctx, req->user_data, 0);
-	io_cqring_fill_event(ctx, treq->user_data, -ECANCELED);
+	io_cqring_fill_event(req, 0);
+	io_cqring_fill_event(treq, -ECANCELED);
 	io_commit_cqring(ctx);
 	spin_unlock_irq(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -2256,7 +2262,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
-	io_cqring_add_event(req->ctx, sqe->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, nxt);
 	return 0;
 }
@@ -2295,12 +2301,10 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	int ret, opcode;
 	struct sqe_submit *s = &req->submit;
 
-	req->user_data = READ_ONCE(s->sqe->user_data);
-
 	opcode = READ_ONCE(s->sqe->opcode);
 	switch (opcode) {
 	case IORING_OP_NOP:
-		ret = io_nop(req, req->user_data);
+		ret = io_nop(req);
 		break;
 	case IORING_OP_READV:
 		if (unlikely(s->sqe->buf_index))
@@ -2409,7 +2413,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
 	if (ret) {
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
-		io_cqring_add_event(ctx, sqe->user_data, ret);
+		io_cqring_add_event(req, ret);
 		io_put_req(req, NULL);
 	}
 
@@ -2539,7 +2543,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 		ret = io_async_cancel_one(ctx, user_data);
 	}
 
-	io_cqring_add_event(ctx, req->user_data, ret);
+	io_cqring_add_event(req, ret);
 	io_put_req(req, NULL);
 	return HRTIMER_NORESTART;
 }
@@ -2582,7 +2586,7 @@ static int io_queue_linked_timeout(struct io_kiocb *req, struct io_kiocb *nxt)
 		 * failed by the regular submission path.
 		 */
 		list_del(&nxt->list);
-		io_cqring_fill_event(ctx, nxt->user_data, ret);
+		io_cqring_fill_event(nxt, ret);
 		trace_io_uring_fail_link(req, nxt);
 		io_commit_cqring(ctx);
 		io_put_req(nxt, NULL);
@@ -2655,7 +2659,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 
 	/* and drop final reference, if we failed */
 	if (ret) {
-		io_cqring_add_event(ctx, req->user_data, ret);
+		io_cqring_add_event(req, ret);
 		if (req->flags & REQ_F_LINK)
 			req->flags |= REQ_F_FAIL_LINK;
 		io_put_req(req, NULL);
@@ -2671,8 +2675,8 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req)
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
-			io_free_req(req, NULL);
+			io_cqring_add_event(req, ret);
+			io_double_put_req(req);
 		}
 		return 0;
 	}
@@ -2698,8 +2702,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_defer(ctx, req);
 	if (ret) {
 		if (ret != -EIOCBQUEUED) {
-			io_cqring_add_event(ctx, req->submit.sqe->user_data, ret);
-			io_free_req(req, NULL);
+			io_cqring_add_event(req, ret);
+			io_double_put_req(req);
 			__io_free_req(shadow);
 			return 0;
 		}
@@ -2732,6 +2736,8 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	struct sqe_submit *s = &req->submit;
 	int ret;
 
+	req->user_data = s->sqe->user_data;
+
 	/* enforce forwards compatibility on users */
 	if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
 		ret = -EINVAL;
@@ -2741,13 +2747,11 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	ret = io_req_set_file(ctx, state, req);
 	if (unlikely(ret)) {
 err_req:
-		io_cqring_add_event(ctx, s->sqe->user_data, ret);
-		io_free_req(req, NULL);
+		io_cqring_add_event(req, ret);
+		io_double_put_req(req);
 		return;
 	}
 
-	req->user_data = s->sqe->user_data;
-
 	/*
 	 * If we already have a head request, queue this one for async
 	 * submittal once the head completes. If we don't have a head but
-- 
2.24.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 3/3] io_uring: add support for backlogged CQ ring
  2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
  2019-11-07 16:00 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
  2019-11-07 16:00 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
@ 2019-11-07 16:00 ` Jens Axboe
  2019-11-09 12:25   ` Pavel Begunkov
  2019-11-08  9:26 ` [PATCHSET v3 0/3] io_uring CQ ring backpressure Pavel Begunkov
  3 siblings, 1 reply; 9+ messages in thread
From: Jens Axboe @ 2019-11-07 16:00 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

Currently we drop completion events, if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder to
use io_uring with networked IO where request completion times are
generally unbounded. Or with POLL, for example, which is also unbounded.

This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
for CQ ring overflows. First of all, it doesn't overflow the ring, it
simply stores a backlog of completions that we weren't able to put into
the CQ ring. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel. This is a signal to the application that it has
backlogged CQ events, and that it must reap those before being allowed
to submit more IO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index f69d9794ce17..ff0f79a57f7b 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -207,6 +207,7 @@ struct io_ring_ctx {
 
 		struct list_head	defer_list;
 		struct list_head	timeout_list;
+		struct list_head	cq_overflow_list;
 
 		wait_queue_head_t	inflight_wait;
 	} ____cacheline_aligned_in_smp;
@@ -414,6 +415,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 
 	ctx->flags = p->flags;
 	init_waitqueue_head(&ctx->cq_wait);
+	INIT_LIST_HEAD(&ctx->cq_overflow_list);
 	init_completion(&ctx->ctx_done);
 	init_completion(&ctx->sqo_thread_started);
 	mutex_init(&ctx->uring_lock);
@@ -588,6 +590,72 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
+static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+{
+	if (waitqueue_active(&ctx->wait))
+		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
+	if (ctx->cq_ev_fd)
+		eventfd_signal(ctx->cq_ev_fd, 1);
+}
+
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+{
+	struct io_rings *rings = ctx->rings;
+	struct io_uring_cqe *cqe;
+	struct io_kiocb *req;
+	unsigned long flags;
+	LIST_HEAD(list);
+
+	if (list_empty_careful(&ctx->cq_overflow_list))
+		return;
+	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
+	    rings->cq_ring_entries)
+		return;
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+
+	while (!list_empty(&ctx->cq_overflow_list)) {
+		cqe = io_get_cqring(ctx);
+		if (!cqe && !force)
+			break;
+
+		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
+						list);
+		list_move(&req->list, &list);
+		if (cqe) {
+			WRITE_ONCE(cqe->user_data, req->user_data);
+			WRITE_ONCE(cqe->res, req->result);
+			WRITE_ONCE(cqe->flags, 0);
+		}
+	}
+
+	io_commit_cqring(ctx);
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+	io_cqring_ev_posted(ctx);
+
+	while (!list_empty(&list)) {
+		req = list_first_entry(&list, struct io_kiocb, list);
+		list_del(&req->list);
+		io_put_req(req, NULL);
+	}
+}
+
+static void io_cqring_overflow(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			       long res)
+	__must_hold(&ctx->completion_lock)
+{
+	if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
+		WRITE_ONCE(ctx->rings->cq_overflow,
+				atomic_inc_return(&ctx->cached_cq_overflow));
+	} else {
+		refcount_inc(&req->refs);
+		req->result = res;
+		list_add_tail(&req->list, &ctx->cq_overflow_list);
+	}
+}
+
 static void io_cqring_fill_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -601,26 +669,15 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
 	 * the ring.
 	 */
 	cqe = io_get_cqring(ctx);
-	if (cqe) {
+	if (likely(cqe)) {
 		WRITE_ONCE(cqe->user_data, req->user_data);
 		WRITE_ONCE(cqe->res, res);
 		WRITE_ONCE(cqe->flags, 0);
 	} else {
-		WRITE_ONCE(ctx->rings->cq_overflow,
-				atomic_inc_return(&ctx->cached_cq_overflow));
+		io_cqring_overflow(ctx, req, res);
 	}
 }
 
-static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
-{
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
-	if (waitqueue_active(&ctx->sqo_wait))
-		wake_up(&ctx->sqo_wait);
-	if (ctx->cq_ev_fd)
-		eventfd_signal(ctx->cq_ev_fd, 1);
-}
-
 static void io_cqring_add_event(struct io_kiocb *req, long res)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -877,6 +934,9 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
 	struct io_rings *rings = ctx->rings;
 
+	if (ctx->flags & IORING_SETUP_CQ_NODROP)
+		io_cqring_overflow_flush(ctx, false);
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -2876,6 +2936,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
 	int i, submitted = 0;
 	bool mm_fault = false;
 
+	if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
+	    !list_empty(&ctx->cq_overflow_list))
+		return -EBUSY;
+
 	if (nr > IO_PLUG_THRESHOLD) {
 		io_submit_state_start(&state, ctx, nr);
 		statep = &state;
@@ -2967,6 +3031,7 @@ static int io_sq_thread(void *data)
 	timeout = inflight = 0;
 	while (!kthread_should_park()) {
 		unsigned int to_submit;
+		int ret;
 
 		if (inflight) {
 			unsigned nr_events = 0;
@@ -3051,8 +3116,9 @@ static int io_sq_thread(void *data)
 		}
 
 		to_submit = min(to_submit, ctx->sq_entries);
-		inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
-					   true);
+		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
+		if (ret > 0)
+			inflight += ret;
 	}
 
 	set_fs(old_fs);
@@ -4116,8 +4182,10 @@ static int io_uring_flush(struct file *file, void *data)
 	struct io_ring_ctx *ctx = file->private_data;
 
 	io_uring_cancel_files(ctx, data);
-	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
+	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
+		io_cqring_overflow_flush(ctx, true);
 		io_wq_cancel_all(ctx->io_wq);
+	}
 	return 0;
 }
 
@@ -4418,7 +4486,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 	}
 
 	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
-			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
+			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
+			IORING_SETUP_CQ_NODROP))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f1a118b01d18..3d8517eb376e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -56,6 +56,7 @@ struct io_uring_sqe {
 #define IORING_SETUP_SQPOLL	(1U << 1)	/* SQ poll thread */
 #define IORING_SETUP_SQ_AFF	(1U << 2)	/* sq_thread_cpu is valid */
 #define IORING_SETUP_CQSIZE	(1U << 3)	/* app defines CQ size */
+#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* no CQ drops */
 
 #define IORING_OP_NOP		0
 #define IORING_OP_READV		1
-- 
2.24.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCHSET v3 0/3] io_uring CQ ring backpressure
  2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
                   ` (2 preceding siblings ...)
  2019-11-07 16:00 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
@ 2019-11-08  9:26 ` Pavel Begunkov
  3 siblings, 0 replies; 9+ messages in thread
From: Pavel Begunkov @ 2019-11-08  9:26 UTC (permalink / raw)
  To: Jens Axboe, io-uring; +Cc: linux-block, jannh

[-- Attachment #1.1: Type: text/plain, Size: 1913 bytes --]

On 07/11/2019 19:00, Jens Axboe wrote:
> Currently we drop completion events, if the CQ ring is full. That's fine
> for requests with bounded completion times, but it may make it harder to
> use io_uring with networked IO where request completion times are
> generally unbounded. Or with POLL, for example, which is also unbounded.
> 
> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
> for CQ ring overflows. First of all, it doesn't overflow the ring, it
> simply stores backlog of completions that we weren't able to put into
> the CQ ring. To prevent the backlog from growing indefinitely, if the
> backlog is non-empty, we apply back pressure on IO submissions. Any
> attempt to submit new IO with a non-empty backlog will get an -EBUSY
> return from the kernel.
> 
> I think that makes for a pretty sane API in terms of how the application
> can handle it. With CQ_NODROP enabled, we'll never drop a completion
> event, but we'll also not allow submissions with a completion backlog.
> 
Looks good to me
Reviewed-by: Pavel Begunkov <asml.silence@gmail.com>


> Changes since v2:
> 
> - Add io_double_put_req() helper for the cases where we need to drop both
>   the submit and complete reference. We didn't need this before as we
>   could just free the request unconditionally, but we don't know if that's
>   the case anymore if add/fill grabs a reference to it.
> - Fix linked request dropping.
> 
> Changes since v1:
> 
> - Drop the cqe_drop structure and allocation, simply use the io_kiocb
>   for the overflow backlog
> - Rebase on top of Pavel's series which made this cleaner
> - Add prep patch for the fill/add CQ handler changes
> 
>  fs/io_uring.c                 | 209 +++++++++++++++++++++++-----------
>  include/uapi/linux/io_uring.h |   1 +
>  2 files changed, 143 insertions(+), 67 deletions(-)
> 

-- 
Pavel Begunkov


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 3/3] io_uring: add support for backlogged CQ ring
  2019-11-07 16:00 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
@ 2019-11-09 12:25   ` Pavel Begunkov
  2019-11-09 12:33     ` Pavel Begunkov
  0 siblings, 1 reply; 9+ messages in thread
From: Pavel Begunkov @ 2019-11-09 12:25 UTC (permalink / raw)
  To: Jens Axboe, io-uring; +Cc: linux-block, jannh

On 11/7/2019 7:00 PM, Jens Axboe wrote:
> Currently we drop completion events, if the CQ ring is full. That's fine
> for requests with bounded completion times, but it may make it harder to
> use io_uring with networked IO where request completion times are
> generally unbounded. Or with POLL, for example, which is also unbounded.
> 
> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
> for CQ ring overflows. First of all, it doesn't overflow the ring, it
> simply stores a backlog of completions that we weren't able to put into
> the CQ ring. To prevent the backlog from growing indefinitely, if the
> backlog is non-empty, we apply back pressure on IO submissions. Any
> attempt to submit new IO with a non-empty backlog will get an -EBUSY
> return from the kernel. This is a signal to the application that it has
> backlogged CQ events, and that it must reap those before being allowed
> to submit more IO.>
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>  include/uapi/linux/io_uring.h |   1 +
>  2 files changed, 87 insertions(+), 17 deletions(-)
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index f69d9794ce17..ff0f79a57f7b 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -207,6 +207,7 @@ struct io_ring_ctx {
>  
>  		struct list_head	defer_list;
>  		struct list_head	timeout_list;
> +		struct list_head	cq_overflow_list;
>  
>  		wait_queue_head_t	inflight_wait;
>  	} ____cacheline_aligned_in_smp;
> @@ -414,6 +415,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
>  
>  	ctx->flags = p->flags;
>  	init_waitqueue_head(&ctx->cq_wait);
> +	INIT_LIST_HEAD(&ctx->cq_overflow_list);
>  	init_completion(&ctx->ctx_done);
>  	init_completion(&ctx->sqo_thread_started);
>  	mutex_init(&ctx->uring_lock);
> @@ -588,6 +590,72 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
>  	return &rings->cqes[tail & ctx->cq_mask];
>  }
>  
> +static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> +{
> +	if (waitqueue_active(&ctx->wait))
> +		wake_up(&ctx->wait);
> +	if (waitqueue_active(&ctx->sqo_wait))
> +		wake_up(&ctx->sqo_wait);
> +	if (ctx->cq_ev_fd)
> +		eventfd_signal(ctx->cq_ev_fd, 1);
> +}
> +
> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
> +{
> +	struct io_rings *rings = ctx->rings;
> +	struct io_uring_cqe *cqe;
> +	struct io_kiocb *req;
> +	unsigned long flags;
> +	LIST_HEAD(list);
> +
> +	if (list_empty_careful(&ctx->cq_overflow_list))
> +		return;
> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
> +	    rings->cq_ring_entries)
> +		return;
> +
> +	spin_lock_irqsave(&ctx->completion_lock, flags);
> +
> +	while (!list_empty(&ctx->cq_overflow_list)) {
> +		cqe = io_get_cqring(ctx);
> +		if (!cqe && !force)
> +			break;> +
> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
> +						list);
> +		list_move(&req->list, &list);
> +		if (cqe) {
> +			WRITE_ONCE(cqe->user_data, req->user_data);
> +			WRITE_ONCE(cqe->res, req->result);
> +			WRITE_ONCE(cqe->flags, 0);
> +		}

Hmm, second thought. We should account overflow here.

> +	}
> +
> +	io_commit_cqring(ctx);
> +	spin_unlock_irqrestore(&ctx->completion_lock, flags);
> +	io_cqring_ev_posted(ctx);
> +
> +	while (!list_empty(&list)) {
> +		req = list_first_entry(&list, struct io_kiocb, list);
> +		list_del(&req->list);
> +		io_put_req(req, NULL);
> +	}
> +}
> +
> +static void io_cqring_overflow(struct io_ring_ctx *ctx, struct io_kiocb *req,
> +			       long res)
> +	__must_hold(&ctx->completion_lock)
> +{
> +	if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
> +		WRITE_ONCE(ctx->rings->cq_overflow,
> +				atomic_inc_return(&ctx->cached_cq_overflow));
> +	} else {
> +		refcount_inc(&req->refs);
> +		req->result = res;
> +		list_add_tail(&req->list, &ctx->cq_overflow_list);
> +	}
> +}
> +
>  static void io_cqring_fill_event(struct io_kiocb *req, long res)
>  {
>  	struct io_ring_ctx *ctx = req->ctx;
> @@ -601,26 +669,15 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
>  	 * the ring.
>  	 */
>  	cqe = io_get_cqring(ctx);
> -	if (cqe) {
> +	if (likely(cqe)) {
>  		WRITE_ONCE(cqe->user_data, req->user_data);
>  		WRITE_ONCE(cqe->res, res);
>  		WRITE_ONCE(cqe->flags, 0);
>  	} else {
> -		WRITE_ONCE(ctx->rings->cq_overflow,
> -				atomic_inc_return(&ctx->cached_cq_overflow));
> +		io_cqring_overflow(ctx, req, res);
>  	}
>  }
>  
> -static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> -{
> -	if (waitqueue_active(&ctx->wait))
> -		wake_up(&ctx->wait);
> -	if (waitqueue_active(&ctx->sqo_wait))
> -		wake_up(&ctx->sqo_wait);
> -	if (ctx->cq_ev_fd)
> -		eventfd_signal(ctx->cq_ev_fd, 1);
> -}
> -
>  static void io_cqring_add_event(struct io_kiocb *req, long res)
>  {
>  	struct io_ring_ctx *ctx = req->ctx;
> @@ -877,6 +934,9 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
>  {
>  	struct io_rings *rings = ctx->rings;
>  
> +	if (ctx->flags & IORING_SETUP_CQ_NODROP)
> +		io_cqring_overflow_flush(ctx, false);
> +
>  	/* See comment at the top of this file */
>  	smp_rmb();
>  	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
> @@ -2876,6 +2936,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
>  	int i, submitted = 0;
>  	bool mm_fault = false;
>  
> +	if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
> +	    !list_empty(&ctx->cq_overflow_list))
> +		return -EBUSY;
> +
>  	if (nr > IO_PLUG_THRESHOLD) {
>  		io_submit_state_start(&state, ctx, nr);
>  		statep = &state;
> @@ -2967,6 +3031,7 @@ static int io_sq_thread(void *data)
>  	timeout = inflight = 0;
>  	while (!kthread_should_park()) {
>  		unsigned int to_submit;
> +		int ret;
>  
>  		if (inflight) {
>  			unsigned nr_events = 0;
> @@ -3051,8 +3116,9 @@ static int io_sq_thread(void *data)
>  		}
>  
>  		to_submit = min(to_submit, ctx->sq_entries);
> -		inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
> -					   true);
> +		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
> +		if (ret > 0)
> +			inflight += ret;
>  	}
>  
>  	set_fs(old_fs);
> @@ -4116,8 +4182,10 @@ static int io_uring_flush(struct file *file, void *data)
>  	struct io_ring_ctx *ctx = file->private_data;
>  
>  	io_uring_cancel_files(ctx, data);
> -	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
> +	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
> +		io_cqring_overflow_flush(ctx, true);
>  		io_wq_cancel_all(ctx->io_wq);
> +	}
>  	return 0;
>  }
>  
> @@ -4418,7 +4486,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
>  	}
>  
>  	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
> -			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
> +			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
> +			IORING_SETUP_CQ_NODROP))
>  		return -EINVAL;
>  
>  	ret = io_uring_create(entries, &p);
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> index f1a118b01d18..3d8517eb376e 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -56,6 +56,7 @@ struct io_uring_sqe {
>  #define IORING_SETUP_SQPOLL	(1U << 1)	/* SQ poll thread */
>  #define IORING_SETUP_SQ_AFF	(1U << 2)	/* sq_thread_cpu is valid */
>  #define IORING_SETUP_CQSIZE	(1U << 3)	/* app defines CQ size */
> +#define IORING_SETUP_CQ_NODROP	(1U << 4)	/* no CQ drops */
>  
>  #define IORING_OP_NOP		0
>  #define IORING_OP_READV		1
> 

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 3/3] io_uring: add support for backlogged CQ ring
  2019-11-09 12:25   ` Pavel Begunkov
@ 2019-11-09 12:33     ` Pavel Begunkov
  2019-11-09 14:14       ` Jens Axboe
  0 siblings, 1 reply; 9+ messages in thread
From: Pavel Begunkov @ 2019-11-09 12:33 UTC (permalink / raw)
  To: Jens Axboe, io-uring; +Cc: linux-block, jannh

On 11/9/2019 3:25 PM, Pavel Begunkov wrote:
> On 11/7/2019 7:00 PM, Jens Axboe wrote:
>> Currently we drop completion events, if the CQ ring is full. That's fine
>> for requests with bounded completion times, but it may make it harder to
>> use io_uring with networked IO where request completion times are
>> generally unbounded. Or with POLL, for example, which is also unbounded.
>>
>> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
>> for CQ ring overflows. First of all, it doesn't overflow the ring, it
>> simply stores a backlog of completions that we weren't able to put into
>> the CQ ring. To prevent the backlog from growing indefinitely, if the
>> backlog is non-empty, we apply back pressure on IO submissions. Any
>> attempt to submit new IO with a non-empty backlog will get an -EBUSY
>> return from the kernel. This is a signal to the application that it has
>> backlogged CQ events, and that it must reap those before being allowed
>> to submit more IO.>
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>> ---
>>  fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>>  include/uapi/linux/io_uring.h |   1 +
>>  2 files changed, 87 insertions(+), 17 deletions(-)
>>
>> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
>> +{
>> +	struct io_rings *rings = ctx->rings;
>> +	struct io_uring_cqe *cqe;
>> +	struct io_kiocb *req;
>> +	unsigned long flags;
>> +	LIST_HEAD(list);
>> +
>> +	if (list_empty_careful(&ctx->cq_overflow_list))
>> +		return;
>> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
>> +	    rings->cq_ring_entries)
>> +		return;
>> +
>> +	spin_lock_irqsave(&ctx->completion_lock, flags);
>> +
>> +	while (!list_empty(&ctx->cq_overflow_list)) {
>> +		cqe = io_get_cqring(ctx);
>> +		if (!cqe && !force)
>> +			break;> +
>> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
>> +						list);
>> +		list_move(&req->list, &list);
>> +		if (cqe) {
>> +			WRITE_ONCE(cqe->user_data, req->user_data);
>> +			WRITE_ONCE(cqe->res, req->result);
>> +			WRITE_ONCE(cqe->flags, 0);
>> +		}
> 
> Hmm, second thought. We should account overflow here.
> 
Clarification: We should account overflow in case of (!cqe).

i.e.
if (!cqe) { // else
	WRITE_ONCE(ctx->rings->cq_overflow,
			atomic_inc_return(&ctx->cached_cq_overflow));
}

>> +	}
>> +
>> +	io_commit_cqring(ctx);
>> +	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>> +	io_cqring_ev_posted(ctx);
>> +
>> +	while (!list_empty(&list)) {
>> +		req = list_first_entry(&list, struct io_kiocb, list);
>> +		list_del(&req->list);
>> +		io_put_req(req, NULL);
>> +	}
>> +}
>> +

-- 
Pavel Begunkov

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH 3/3] io_uring: add support for backlogged CQ ring
  2019-11-09 12:33     ` Pavel Begunkov
@ 2019-11-09 14:14       ` Jens Axboe
  0 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2019-11-09 14:14 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring; +Cc: linux-block, jannh

On 11/9/19 5:33 AM, Pavel Begunkov wrote:
> On 11/9/2019 3:25 PM, Pavel Begunkov wrote:
>> On 11/7/2019 7:00 PM, Jens Axboe wrote:
>>> Currently we drop completion events, if the CQ ring is full. That's fine
>>> for requests with bounded completion times, but it may make it harder to
>>> use io_uring with networked IO where request completion times are
>>> generally unbounded. Or with POLL, for example, which is also unbounded.
>>>
>>> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
>>> for CQ ring overflows. First of all, it doesn't overflow the ring, it
>>> simply stores a backlog of completions that we weren't able to put into
>>> the CQ ring. To prevent the backlog from growing indefinitely, if the
>>> backlog is non-empty, we apply back pressure on IO submissions. Any
>>> attempt to submit new IO with a non-empty backlog will get an -EBUSY
>>> return from the kernel. This is a signal to the application that it has
>>> backlogged CQ events, and that it must reap those before being allowed
>>> to submit more IO.>
>>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>>> ---
>>>   fs/io_uring.c                 | 103 ++++++++++++++++++++++++++++------
>>>   include/uapi/linux/io_uring.h |   1 +
>>>   2 files changed, 87 insertions(+), 17 deletions(-)
>>>
>>> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
>>> +{
>>> +	struct io_rings *rings = ctx->rings;
>>> +	struct io_uring_cqe *cqe;
>>> +	struct io_kiocb *req;
>>> +	unsigned long flags;
>>> +	LIST_HEAD(list);
>>> +
>>> +	if (list_empty_careful(&ctx->cq_overflow_list))
>>> +		return;
>>> +	if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
>>> +	    rings->cq_ring_entries)
>>> +		return;
>>> +
>>> +	spin_lock_irqsave(&ctx->completion_lock, flags);
>>> +
>>> +	while (!list_empty(&ctx->cq_overflow_list)) {
>>> +		cqe = io_get_cqring(ctx);
>>> +		if (!cqe && !force)
>>> +			break;> +
>>> +		req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
>>> +						list);
>>> +		list_move(&req->list, &list);
>>> +		if (cqe) {
>>> +			WRITE_ONCE(cqe->user_data, req->user_data);
>>> +			WRITE_ONCE(cqe->res, req->result);
>>> +			WRITE_ONCE(cqe->flags, 0);
>>> +		}
>>
>> Hmm, second thought. We should account overflow here.
>>
> Clarification: We should account overflow in case of (!cqe).
> 
> i.e.
> if (!cqe) { // else
> 	WRITE_ONCE(ctx->rings->cq_overflow,
> 			atomic_inc_return(&ctx->cached_cq_overflow));
> }

Ah yes, we should, even if this is only the flush path. I'll send out
a patch for that, unless you beat me to it.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument
  2019-11-06 23:53 [PATCHSET v2 " Jens Axboe
@ 2019-11-06 23:53 ` Jens Axboe
  0 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2019-11-06 23:53 UTC (permalink / raw)
  To: io-uring; +Cc: linux-block, asml.silence, jannh, Jens Axboe

The rings can be derived from the ctx, and we need the ctx there for
a future change.

No functional changes in this patch.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index b0dd8d322360..36ca7bc38ebf 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -859,8 +859,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
 	}
 }
 
-static unsigned io_cqring_events(struct io_rings *rings)
+static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
+	struct io_rings *rings = ctx->rings;
+
 	/* See comment at the top of this file */
 	smp_rmb();
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
@@ -1016,7 +1018,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 		 * If we do, we can potentially be spinning for commands that
 		 * already triggered a CQE (eg in error).
 		 */
-		if (io_cqring_events(ctx->rings))
+		if (io_cqring_events(ctx))
 			break;
 
 		/*
@@ -3064,7 +3066,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
 	 * started waiting. For timeouts, we always want to return to userspace,
 	 * regardless of event count.
 	 */
-	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+	return io_cqring_events(ctx) >= iowq->to_wait ||
 			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
 }
 
@@ -3099,7 +3101,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	struct io_rings *rings = ctx->rings;
 	int ret = 0;
 
-	if (io_cqring_events(rings) >= min_events)
+	if (io_cqring_events(ctx) >= min_events)
 		return 0;
 
 	if (sig) {
-- 
2.24.0


^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, back to index

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-11-07 16:00 [PATCHSET v3 0/3] io_uring CQ ring backpressure Jens Axboe
2019-11-07 16:00 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe
2019-11-07 16:00 ` [PATCH 2/3] io_uring: pass in io_kiocb to fill/add CQ handlers Jens Axboe
2019-11-07 16:00 ` [PATCH 3/3] io_uring: add support for backlogged CQ ring Jens Axboe
2019-11-09 12:25   ` Pavel Begunkov
2019-11-09 12:33     ` Pavel Begunkov
2019-11-09 14:14       ` Jens Axboe
2019-11-08  9:26 ` [PATCHSET v3 0/3] io_uring CQ ring backpressure Pavel Begunkov
  -- strict thread matches above, loose matches on Subject: below --
2019-11-06 23:53 [PATCHSET v2 " Jens Axboe
2019-11-06 23:53 ` [PATCH 1/3] io_uring: make io_cqring_events() take 'ctx' as argument Jens Axboe

IO-Uring Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/io-uring/0 io-uring/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 io-uring io-uring/ https://lore.kernel.org/io-uring \
		io-uring@vger.kernel.org
	public-inbox-index io-uring

Example config snippet for mirrors

Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.io-uring


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git