Linux-Fsdevel Archive on lore.kernel.org
 help / Atom feed
From: Jens Axboe <axboe@kernel.dk>
To: linux-fsdevel@vger.kernel.org, linux-block@vger.kernel.org
Cc: viro@zeniv.linux.org.uk, Jens Axboe <axboe@kernel.dk>
Subject: [PATCH 3/3] io_uring: add support for sqe links
Date: Fri, 17 May 2019 15:41:31 -0600
Message-ID: <20190517214131.5925-4-axboe@kernel.dk> (raw)
In-Reply-To: <20190517214131.5925-1-axboe@kernel.dk>

With SQE links, we can create chains of dependent SQEs. One example
would be queueing an SQE that's a read from one file descriptor, with
the linked SQE being a write to another with the same set of buffers.

An SQE link will not stall the pipeline, it'll just ensure that
dependent SQEs aren't issued before the previous link has completed.

Any error at submission or completion time will break the chain of SQEs.
For completions, this also includes short reads or writes, as the next
SQE could depend on the previous one being fully completed.

Any SQE in a chain that gets canceled due to any of the above errors,
will get an CQE fill with -ECANCELED as the error value.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c                 | 211 +++++++++++++++++++++++++++-------
 include/uapi/linux/io_uring.h |   1 +
 2 files changed, 172 insertions(+), 40 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 3e7f08094a85..b45313ed8337 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -322,6 +322,7 @@ struct io_kiocb {
 
 	struct io_ring_ctx	*ctx;
 	struct list_head	list;
+	struct list_head	link_list;
 	unsigned int		flags;
 	refcount_t		refs;
 #define REQ_F_NOWAIT		1	/* must not punt to workers */
@@ -330,8 +331,10 @@ struct io_kiocb {
 #define REQ_F_SEQ_PREV		8	/* sequential with previous */
 #define REQ_F_IO_DRAIN		16	/* drain existing IO first */
 #define REQ_F_IO_DRAINED	32	/* drain done */
+#define REQ_F_LINK		64	/* linked sqes */
+#define REQ_F_FAIL_LINK		128	/* fail rest of links */
 	u64			user_data;
-	u32			error;	/* iopoll result from callback */
+	u32			result;
 	u32			sequence;
 
 	struct work_struct	work;
@@ -583,6 +586,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
 	req->flags = 0;
 	/* one is dropped after submission, the other at completion */
 	refcount_set(&req->refs, 2);
+	req->result = 0;
 	return req;
 out:
 	io_ring_drop_ctx_refs(ctx, 1);
@@ -598,7 +602,7 @@ static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
 	}
 }
 
-static void io_free_req(struct io_kiocb *req)
+static void __io_free_req(struct io_kiocb *req)
 {
 	if (req->file && !(req->flags & REQ_F_FIXED_FILE))
 		fput(req->file);
@@ -606,6 +610,56 @@ static void io_free_req(struct io_kiocb *req)
 	kmem_cache_free(req_cachep, req);
 }
 
+static void io_req_link_next(struct io_kiocb *req)
+{
+	struct io_kiocb *nxt;
+
+	nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
+	list_del(&nxt->list);
+	if (!list_empty(&req->link_list)) {
+		INIT_LIST_HEAD(&nxt->link_list);
+		list_splice(&req->link_list, &nxt->link_list);
+		nxt->flags |= REQ_F_LINK;
+	}
+
+	INIT_WORK(&nxt->work, io_sq_wq_submit_work);
+	queue_work(req->ctx->sqo_wq, &nxt->work);
+}
+
+/*
+ * Called if REQ_F_LINK is set, and we fail the head request
+ */
+static void io_fail_links(struct io_kiocb *req)
+{
+	struct io_kiocb *link;
+
+	while (!list_empty(&req->link_list)) {
+		link = list_first_entry(&req->link_list, struct io_kiocb, list);
+		list_del(&link->list);
+
+		io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
+		__io_free_req(link);
+	}
+}
+
+static void io_free_req(struct io_kiocb *req)
+{
+	/*
+	 * If LINK is set, we have dependent requests in this chain. If we
+	 * didn't fail this request, queue the first one up, moving any other
+	 * dependencies to the next request. In case of failure, fail the rest
+	 * of the chain.
+	 */
+	if (req->flags & REQ_F_LINK) {
+		if (req->flags & REQ_F_FAIL_LINK)
+			io_fail_links(req);
+		else
+			io_req_link_next(req);
+	}
+
+	__io_free_req(req);
+}
+
 static void io_put_req(struct io_kiocb *req)
 {
 	if (refcount_dec_and_test(&req->refs))
@@ -627,7 +681,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		req = list_first_entry(done, struct io_kiocb, list);
 		list_del(&req->list);
 
-		io_cqring_fill_event(ctx, req->user_data, req->error);
+		io_cqring_fill_event(ctx, req->user_data, req->result);
 		(*nr_events)++;
 
 		if (refcount_dec_and_test(&req->refs)) {
@@ -636,7 +690,8 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 			 * completions for those, only batch free for fixed
 			 * file.
 			 */
-			if (req->flags & REQ_F_FIXED_FILE) {
+			if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
+			    REQ_F_FIXED_FILE) {
 				reqs[to_free++] = req;
 				if (to_free == ARRAY_SIZE(reqs))
 					io_free_req_many(ctx, reqs, &to_free);
@@ -775,6 +830,8 @@ static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
 
 	kiocb_end_write(kiocb);
 
+	if ((req->flags & REQ_F_LINK) && res != req->result)
+		req->flags |= REQ_F_FAIL_LINK;
 	io_cqring_add_event(req->ctx, req->user_data, res);
 	io_put_req(req);
 }
@@ -785,7 +842,9 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
 
 	kiocb_end_write(kiocb);
 
-	req->error = res;
+	if ((req->flags & REQ_F_LINK) && res != req->result)
+		req->flags |= REQ_F_FAIL_LINK;
+	req->result = res;
 	if (res != -EAGAIN)
 		req->flags |= REQ_F_IOPOLL_COMPLETED;
 }
@@ -928,7 +987,6 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 		    !kiocb->ki_filp->f_op->iopoll)
 			return -EOPNOTSUPP;
 
-		req->error = 0;
 		kiocb->ki_flags |= IOCB_HIPRI;
 		kiocb->ki_complete = io_complete_rw_iopoll;
 	} else {
@@ -1106,6 +1164,9 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
 		return ret;
 
 	read_size = ret;
+	if (req->flags & REQ_F_LINK)
+		req->result = read_size;
+
 	iov_count = iov_iter_count(&iter);
 	ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
 	if (!ret) {
@@ -1163,6 +1224,9 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	if (ret < 0)
 		return ret;
 
+	if (req->flags & REQ_F_LINK)
+		req->result = ret;
+
 	iov_count = iov_iter_count(&iter);
 
 	ret = -EAGAIN;
@@ -1266,6 +1330,8 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 				end > 0 ? end : LLONG_MAX,
 				fsync_flags & IORING_FSYNC_DATASYNC);
 
+	if (ret < 0)
+		req->flags |= REQ_F_FAIL_LINK;
 	io_cqring_add_event(req->ctx, sqe->user_data, ret);
 	io_put_req(req);
 	return 0;
@@ -1310,6 +1376,8 @@ static int io_sync_file_range(struct io_kiocb *req,
 
 	ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
 
+	if (ret < 0)
+		req->flags |= REQ_F_FAIL_LINK;
 	io_cqring_add_event(req->ctx, sqe->user_data, ret);
 	io_put_req(req);
 	return 0;
@@ -1337,6 +1405,8 @@ static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		ret = __sys_sendmsg_sock(sock, msg, flags);
 	}
 
+	if (ret < 0)
+		req->flags |= REQ_F_FAIL_LINK;
 	io_cqring_add_event(req->ctx, sqe->user_data, ret);
 	io_put_req(req);
 	return 0;
@@ -1367,6 +1437,8 @@ static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		ret = __sys_recvmsg_sock(sock, msg, flags);
 	}
 
+	if (ret < 0)
+		req->flags |= REQ_F_FAIL_LINK;
 	io_cqring_add_event(req->ctx, sqe->user_data, ret);
 	io_put_req(req);
 	return 0;
@@ -1622,9 +1694,10 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 {
 	int ret, opcode;
 
+	req->user_data = READ_ONCE(s->sqe->user_data);
+
 	if (unlikely(s->index >= ctx->sq_entries))
 		return -EINVAL;
-	req->user_data = READ_ONCE(s->sqe->user_data);
 
 	opcode = READ_ONCE(s->sqe->opcode);
 	switch (opcode) {
@@ -1674,7 +1747,7 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 		return ret;
 
 	if (ctx->flags & IORING_SETUP_IOPOLL) {
-		if (req->error == -EAGAIN)
+		if (req->result == -EAGAIN)
 			return -EAGAIN;
 
 		/* workqueue context doesn't hold uring_lock, grab it now */
@@ -1900,31 +1973,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
 	return 0;
 }
 
-static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
-			 struct io_submit_state *state)
+static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+			struct sqe_submit *s)
 {
-	struct io_kiocb *req;
 	int ret;
 
-	/* enforce forwards compatibility on users */
-	if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
-		return -EINVAL;
-
-	req = io_get_req(ctx, state);
-	if (unlikely(!req))
-		return -EAGAIN;
-
-	ret = io_req_set_file(ctx, s, state, req);
-	if (unlikely(ret))
-		goto out;
-
-	ret = io_req_defer(ctx, req, s->sqe);
-	if (ret) {
-		if (ret == -EIOCBQUEUED)
-			ret = 0;
-		return ret;
-	}
-
 	ret = __io_submit_sqe(ctx, req, s, true);
 	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 		struct io_uring_sqe *sqe_copy;
@@ -1947,20 +2000,82 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 
 			/*
 			 * Queued up for async execution, worker will release
-			 * submit reference when the iocb is actually
-			 * submitted.
+			 * submit reference when the iocb is actually submitted.
 			 */
 			return 0;
 		}
 	}
 
-out:
 	/* drop submission reference */
 	io_put_req(req);
 
 	/* and drop final reference, if we failed */
-	if (ret)
+	if (ret) {
+		io_cqring_add_event(ctx, req->user_data, ret);
+		if (req->flags & REQ_F_LINK)
+			req->flags |= REQ_F_FAIL_LINK;
 		io_put_req(req);
+	}
+
+	return ret;
+}
+
+#define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
+
+static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
+			 struct io_submit_state *state, struct io_kiocb **link)
+{
+	struct io_uring_sqe *sqe_copy;
+	struct io_kiocb *req;
+	int ret;
+
+	/* enforce forwards compatibility on users */
+	if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS))
+		return -EINVAL;
+
+	req = io_get_req(ctx, state);
+	if (unlikely(!req))
+		return -EAGAIN;
+
+	ret = io_req_set_file(ctx, s, state, req);
+	if (unlikely(ret)) {
+		io_free_req(req);
+		return ret;
+	}
+
+	ret = io_req_defer(ctx, req, s->sqe);
+	if (ret) {
+		if (ret == -EIOCBQUEUED)
+			ret = 0;
+		return ret;
+	}
+
+	/*
+	 * If we already have a head request, queue this one for async
+	 * submittal once the head completes. If we don't have a head but
+	 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
+	 * submitted sync once the chain is complete. If none of those
+	 * conditions are true (normal request), then just queue it.
+	 */
+	if (*link) {
+		struct io_kiocb *prev = *link;
+
+		sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
+		if (!sqe_copy)
+			return -EAGAIN;
+
+		s->sqe = sqe_copy;
+		memcpy(&req->submit, s, sizeof(*s));
+		list_add_tail(&req->list, &prev->link_list);
+	} else if (s->sqe->flags & IOSQE_IO_LINK) {
+		req->flags |= REQ_F_LINK;
+
+		memcpy(&req->submit, s, sizeof(*s));
+		INIT_LIST_HEAD(&req->link_list);
+		*link = req;
+	} else {
+		ret = io_queue_sqe(ctx, req, s);
+	}
 
 	return ret;
 }
@@ -2047,6 +2162,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 			  unsigned int nr, bool has_user, bool mm_fault)
 {
 	struct io_submit_state state, *statep = NULL;
+	struct io_kiocb *link = NULL;
+	bool prev_was_link = false;
 	int ret, i, submitted = 0;
 
 	if (nr > IO_PLUG_THRESHOLD) {
@@ -2055,22 +2172,28 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 	}
 
 	for (i = 0; i < nr; i++) {
+		if (!prev_was_link && link) {
+			io_queue_sqe(ctx, link, &link->submit);
+			link = NULL;
+		}
+		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
+
 		if (unlikely(mm_fault)) {
 			ret = -EFAULT;
 		} else {
 			sqes[i].has_user = has_user;
 			sqes[i].needs_lock = true;
 			sqes[i].needs_fixed_file = true;
-			ret = io_submit_sqe(ctx, &sqes[i], statep);
+			ret = io_submit_sqe(ctx, &sqes[i], statep, &link);
 		}
 		if (!ret) {
 			submitted++;
 			continue;
 		}
-
-		io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
 	}
 
+	if (link)
+		io_queue_sqe(ctx, link, &link->submit);
 	if (statep)
 		io_submit_state_end(&state);
 
@@ -2211,6 +2334,8 @@ static int io_sq_thread(void *data)
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
 	struct io_submit_state state, *statep = NULL;
+	struct io_kiocb *link = NULL;
+	bool prev_was_link = false;
 	int i, submit = 0;
 
 	if (to_submit > IO_PLUG_THRESHOLD) {
@@ -2225,17 +2350,23 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 		if (!io_get_sqring(ctx, &s))
 			break;
 
+		if (!prev_was_link && link) {
+			io_queue_sqe(ctx, link, &link->submit);
+			link = NULL;
+		}
+		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
+
 		s.has_user = true;
 		s.needs_lock = false;
 		s.needs_fixed_file = false;
 		submit++;
 
-		ret = io_submit_sqe(ctx, &s, statep);
-		if (ret)
-			io_cqring_add_event(ctx, s.sqe->user_data, ret);
+		ret = io_submit_sqe(ctx, &s, statep, &link);
 	}
 	io_commit_sqring(ctx);
 
+	if (link)
+		io_queue_sqe(ctx, link, &link->submit);
 	if (statep)
 		io_submit_state_end(statep);
 
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index c5ea202684e3..1e1652f25cc1 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -41,6 +41,7 @@ struct io_uring_sqe {
  */
 #define IOSQE_FIXED_FILE	(1U << 0)	/* use fixed fileset */
 #define IOSQE_IO_DRAIN		(1U << 1)	/* issue after inflight IO */
+#define IOSQE_IO_LINK		(1U << 2)	/* links next sqe */
 
 /*
  * io_uring_setup() flags
-- 
2.17.1


  parent reply index

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-05-17 21:41 [PATCHSET 0/3] io_uring: support for linked SQEs Jens Axboe
2019-05-17 21:41 ` [PATCH 1/3] uio: make import_iovec()/compat_import_iovec() return bytes on success Jens Axboe
2019-05-17 21:41 ` [PATCH 2/3] io_uring: punt short reads to async context Jens Axboe
2019-05-17 21:41 ` Jens Axboe [this message]
2019-05-22 16:34 ` [PATCHSET 0/3] io_uring: support for linked SQEs Christoph Hellwig
2019-05-22 16:49   ` Jens Axboe

Reply instructions:

You may reply publically to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190517214131.5925-4-axboe@kernel.dk \
    --to=axboe@kernel.dk \
    --cc=linux-block@vger.kernel.org \
    --cc=linux-fsdevel@vger.kernel.org \
    --cc=viro@zeniv.linux.org.uk \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Linux-Fsdevel Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/linux-fsdevel/0 linux-fsdevel/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 linux-fsdevel linux-fsdevel/ https://lore.kernel.org/linux-fsdevel \
		linux-fsdevel@vger.kernel.org linux-fsdevel@archiver.kernel.org
	public-inbox-index linux-fsdevel


Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.linux-fsdevel


AGPL code for this site: git clone https://public-inbox.org/ public-inbox