io-uring.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH RFC v2 liburing 0/2] multishot recvmsg
@ 2022-07-14 11:54 Dylan Yudaken
  2022-07-14 11:54 ` [PATCH RFC v2 liburing 1/2] add multishot recvmsg API Dylan Yudaken
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Dylan Yudaken @ 2022-07-14 11:54 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

This series adds an API (patch 1) and a test (#2) for multishot recvmsg.

I have not included docs yet, but I want to get feedback on the API for handling
the result (if there is any).

There was no feedback on v1, but in v2 I added more API to make it easier to deal
with truncated payloads. Additionally there is extra testing for this.

Dylan Yudaken (2):
  add multishot recvmsg API
  add tests for multishot recvmsg

 src/include/liburing.h          |  66 ++++++++++++
 src/include/liburing/io_uring.h |   7 ++
 test/recv-multishot.c           | 180 ++++++++++++++++++++++++++++----
 3 files changed, 234 insertions(+), 19 deletions(-)


base-commit: a705a6b307adfa52ba6315df0b8d5964a3b898b2
-- 
2.30.2


^ permalink raw reply	[flat|nested] 5+ messages in thread

* [PATCH RFC v2 liburing 1/2] add multishot recvmsg API
  2022-07-14 11:54 [PATCH RFC v2 liburing 0/2] multishot recvmsg Dylan Yudaken
@ 2022-07-14 11:54 ` Dylan Yudaken
  2022-07-14 11:54 ` [PATCH RFC v2 liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Dylan Yudaken @ 2022-07-14 11:54 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

This adds a new API to do multishot recvmsg. This is more complicated than
multishot recv as it requires handling a well known data layout copied by
the kernel.

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 src/include/liburing.h          | 66 +++++++++++++++++++++++++++++++++
 src/include/liburing/io_uring.h |  7 ++++
 2 files changed, 73 insertions(+)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index d35bfa9..c9e4e75 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -419,6 +419,13 @@ static inline void io_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd,
 	sqe->msg_flags = flags;
 }
 
+static inline void io_uring_prep_recvmsg_multishot(struct io_uring_sqe *sqe, int fd,
+						   struct msghdr *msg, unsigned flags)
+{
+	io_uring_prep_recvmsg(sqe, fd, msg, flags);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+}
+
 static inline void io_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd,
 					 const struct msghdr *msg,
 					 unsigned flags)
@@ -685,6 +692,65 @@ static inline void io_uring_prep_recv_multishot(struct io_uring_sqe *sqe,
 	sqe->ioprio |= IORING_RECV_MULTISHOT;
 }
 
+static inline struct io_uring_recvmsg_out *io_uring_recvmsg_validate(
+	void *buf, int buf_len, struct msghdr *m)
+{
+	size_t header = m->msg_controllen + m->msg_namelen + sizeof(struct io_uring_recvmsg_out);
+
+	if (buf_len < header)
+		return NULL;
+	return (struct io_uring_recvmsg_out *)buf;
+}
+
+static inline void *io_uring_recvmsg_name(struct io_uring_recvmsg_out *o)
+{
+	return (void *)&o[1];
+}
+
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_firsthdr(struct io_uring_recvmsg_out *o,
+							     struct msghdr *m)
+{
+	if (o->controllen < sizeof(struct cmsghdr))
+		return NULL;
+	return (struct cmsghdr *)((unsigned char *)io_uring_recvmsg_name(o) + m->msg_namelen);
+}
+
+static inline void *io_uring_recvmsg_payload(struct io_uring_recvmsg_out *o,
+					     struct msghdr *m)
+{
+	return (void *)((unsigned char *)io_uring_recvmsg_name(o) +
+			m->msg_namelen + m->msg_controllen);
+}
+
+static inline size_t io_uring_recvmsg_payload_length(struct io_uring_recvmsg_out *o,
+						     int buf_length,
+						     struct msghdr *m)
+{
+	unsigned long payload_start = (unsigned long)io_uring_recvmsg_payload(o, m);
+	unsigned long payload_end = (unsigned long)o + buf_length;
+
+	return payload_end - payload_start;
+}
+
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_nexthdr(struct io_uring_recvmsg_out *o,
+							    struct msghdr *m,
+							    struct cmsghdr *cmsg)
+{
+	unsigned char *end;
+
+	if (cmsg->cmsg_len < sizeof(struct cmsghdr))
+		return NULL;
+	end = (unsigned char *)io_uring_recvmsg_payload(o, m);
+	cmsg = (struct cmsghdr *)((unsigned char *)cmsg + CMSG_ALIGN(cmsg->cmsg_len));
+
+	if ((unsigned char *)(cmsg + 1) > end)
+		return NULL;
+	if (((unsigned char *)cmsg) + CMSG_ALIGN(cmsg->cmsg_len) > end)
+		return NULL;
+
+	return cmsg;
+}
+
 static inline void io_uring_prep_openat2(struct io_uring_sqe *sqe, int dfd,
 					const char *path, struct open_how *how)
 {
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 6b260e2..51126c4 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -573,6 +573,13 @@ struct io_uring_file_index_range {
 	__u64	resv;
 };
 
+struct io_uring_recvmsg_out {
+	__u32 namelen;
+	__u32 controllen;
+	__u32 payloadlen;
+	__u32 flags;
+};
+
 /*
  * accept flags stored in sqe->ioprio
  */
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH RFC v2 liburing 2/2] add tests for multishot recvmsg
  2022-07-14 11:54 [PATCH RFC v2 liburing 0/2] multishot recvmsg Dylan Yudaken
  2022-07-14 11:54 ` [PATCH RFC v2 liburing 1/2] add multishot recvmsg API Dylan Yudaken
@ 2022-07-14 11:54 ` Dylan Yudaken
  2022-07-14 18:01 ` [PATCH RFC v2 liburing 0/2] " Jens Axboe
  2022-07-14 18:02 ` Jens Axboe
  3 siblings, 0 replies; 5+ messages in thread
From: Dylan Yudaken @ 2022-07-14 11:54 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

Expand the multishot recv test to include recvmsg.
This also checks that sockaddr comes back, and that control messages work
properly.

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 test/recv-multishot.c | 180 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 161 insertions(+), 19 deletions(-)

diff --git a/test/recv-multishot.c b/test/recv-multishot.c
index 9df8184..a322e43 100644
--- a/test/recv-multishot.c
+++ b/test/recv-multishot.c
@@ -27,20 +27,45 @@ enum early_error_t {
 struct args {
 	bool stream;
 	bool wait_each;
+	bool recvmsg;
 	enum early_error_t early_error;
 };
 
+static int check_sockaddr(struct sockaddr_in *in)
+{
+	struct in_addr expected;
+
+	inet_pton(AF_INET, "127.0.0.1", &expected);
+	if (in->sin_family != AF_INET) {
+		fprintf(stderr, "bad family %d\n", (int)htons(in->sin_family));
+		return -1;
+	}
+	if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
+		char buff[256];
+		const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));
+
+		fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
+		return -1;
+	}
+	return 0;
+}
+
 static int test(struct args *args)
 {
 	int const N = 8;
 	int const N_BUFFS = N * 64;
 	int const N_CQE_OVERFLOW = 4;
 	int const min_cqes = 2;
+	int const NAME_LEN = sizeof(struct sockaddr_storage);
+	int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
+					+ sizeof(struct cmsghdr);
 	struct io_uring ring;
 	struct io_uring_cqe *cqe;
 	struct io_uring_sqe *sqe;
-	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
+	int fds[2], ret, i, j;
+	int total_sent_bytes = 0, total_recv_bytes = 0, total_dropped_bytes = 0;
 	int send_buff[256];
+	int *sent_buffs[N_BUFFS];
 	int *recv_buffs[N_BUFFS];
 	int *at;
 	struct io_uring_cqe recv_cqe[N_BUFFS];
@@ -50,7 +75,7 @@ static int test(struct args *args)
 	struct __kernel_timespec timeout = {
 		.tv_sec = 1,
 	};
-
+	struct msghdr msg;
 
 	memset(recv_buffs, 0, sizeof(recv_buffs));
 
@@ -75,21 +100,42 @@ static int test(struct args *args)
 		return ret;
 	}
 
+	if (!args->stream) {
+		bool val = true;
+
+		/* force some cmsgs to come back to us */
+		ret = setsockopt(fds[0], IPPROTO_IP, IP_RECVORIGDSTADDR, &val,
+				 sizeof(val));
+		if (ret) {
+			fprintf(stderr, "setsockopt failed %d\n", errno);
+			goto cleanup;
+		}
+	}
+
 	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
 		send_buff[i] = i;
 
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
 		/* prepare some different sized buffers */
-		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+		int buffer_size = (i % 2 == 0 && (args->stream || args->recvmsg)) ? 1 : N;
+
+		buffer_size *= sizeof(int);
+		if (args->recvmsg) {
+			buffer_size +=
+				sizeof(struct io_uring_recvmsg_out) +
+				NAME_LEN +
+				CONTROL_LEN;
+		}
 
-		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+		recv_buffs[i] = malloc(buffer_size);
 
 		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
 			continue;
 
 		sqe = io_uring_get_sqe(&ring);
 		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
-					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+					buffer_size, 1, 7, i);
+		memset(recv_buffs[i], 0xcc, buffer_size);
 		if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) != 0) {
 			fprintf(stderr, "provide buffers failed: %d\n", ret);
 			ret = -1;
@@ -99,7 +145,19 @@ static int test(struct args *args)
 	}
 
 	sqe = io_uring_get_sqe(&ring);
-	io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	if (args->recvmsg) {
+		unsigned int flags = 0;
+
+		if (!args->stream)
+			flags |= MSG_TRUNC;
+
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = NAME_LEN;
+		msg.msg_controllen = CONTROL_LEN;
+		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, flags);
+	} else {
+		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	}
 	sqe->flags |= IOSQE_BUFFER_SELECT;
 	sqe->buf_group = 7;
 	io_uring_sqe_set_data64(sqe, 1234);
@@ -111,6 +169,7 @@ static int test(struct args *args)
 		int to_send = sizeof(*at) * (i+1);
 
 		total_sent_bytes += to_send;
+		sent_buffs[i] = at;
 		if (send(fds[1], at, to_send, 0) != to_send) {
 			if (early_error_started)
 				break;
@@ -202,9 +261,12 @@ static int test(struct args *args)
 			(args->early_error == ERROR_EARLY_OVERFLOW &&
 			 !args->wait_each && i == N_CQE_OVERFLOW);
 		int *this_recv;
+		int orig_payload_size = cqe->res;
 
 
 		if (should_be_last) {
+			int used_res = cqe->res;
+
 			if (!is_last) {
 				fprintf(stderr, "not last cqe had error %d\n", i);
 				goto cleanup;
@@ -234,7 +296,22 @@ static int test(struct args *args)
 				break;
 			case ERROR_NONE:
 			case ERROR_EARLY_CLOSE_SENDER:
-				if (cqe->res != 0) {
+				if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
+					void *buff = recv_buffs[cqe->flags >> 16];
+					struct io_uring_recvmsg_out *o =
+						io_uring_recvmsg_validate(buff, cqe->res, &msg);
+
+					if (!o) {
+						fprintf(stderr, "invalid buff\n");
+						goto cleanup;
+					}
+					if (o->payloadlen != 0) {
+						fprintf(stderr, "expected 0 payloadlen, got %u\n",
+							o->payloadlen);
+						goto cleanup;
+					}
+					used_res = 0;
+				} else if (cqe->res != 0) {
 					fprintf(stderr, "early error: res %d\n", cqe->res);
 					goto cleanup;
 				}
@@ -254,7 +331,7 @@ static int test(struct args *args)
 				goto cleanup;
 			}
 
-			if (cqe->res <= 0)
+			if (used_res <= 0)
 				continue;
 		} else {
 			if (!(cqe->flags & IORING_CQE_F_MORE)) {
@@ -268,7 +345,61 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
+		this_recv = recv_buffs[cqe->flags >> 16];
+
+		if (args->recvmsg) {
+			struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
+				this_recv, cqe->res, &msg);
+
+			if (!o) {
+				fprintf(stderr, "bad recvmsg\n");
+				goto cleanup;
+			}
+			orig_payload_size = o->payloadlen;
+
+			if (!args->stream) {
+				orig_payload_size = o->payloadlen;
+
+				struct cmsghdr *cmsg;
+
+				if (o->namelen < sizeof(struct sockaddr_in)) {
+					fprintf(stderr, "bad addr len %d",
+						o->namelen);
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in *)io_uring_recvmsg_name(o)))
+					goto cleanup;
+
+				cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
+				if (!cmsg ||
+				    cmsg->cmsg_level != IPPROTO_IP ||
+				    cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
+					fprintf(stderr, "bad cmsg");
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
+					goto cleanup;
+				cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
+				if (cmsg) {
+					fprintf(stderr, "unexpected extra cmsg\n");
+					goto cleanup;
+				}
+
+			}
+
+			this_recv = (int *)io_uring_recvmsg_payload(o, &msg);
+			cqe->res = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
+			if (o->payloadlen != cqe->res) {
+				if (!(o->flags & MSG_TRUNC)) {
+					fprintf(stderr, "expected truncated flag\n");
+					goto cleanup;
+				}
+				total_dropped_bytes += (o->payloadlen - cqe->res);
+			}
+		}
+
 		total_recv_bytes += cqe->res;
+
 		if (cqe->res % 4 != 0) {
 			/*
 			 * doesn't seem to happen in practice, would need some
@@ -278,9 +409,20 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
-		/* check buffer arrived in order (for tcp) */
-		this_recv = recv_buffs[cqe->flags >> 16];
-		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+		/*
+		 * for tcp: check buffer arrived in order
+		 * for udp: based on size validate data based on size
+		 */
+		if (!args->stream) {
+			int sent_idx = orig_payload_size / sizeof(*at) - 1;
+
+			if (sent_idx < 0 || sent_idx > N) {
+				fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
+				goto cleanup;
+			}
+			at = sent_buffs[sent_idx];
+		}
+		for (j = 0; j < cqe->res / 4; j++) {
 			int sent = *at++;
 			int recv = *this_recv++;
 
@@ -291,15 +433,14 @@ static int test(struct args *args)
 		}
 	}
 
-	if (args->early_error == ERROR_NONE && total_recv_bytes < total_sent_bytes) {
+	if (args->early_error == ERROR_NONE &&
+	    total_recv_bytes + total_dropped_bytes < total_sent_bytes) {
 		fprintf(stderr,
-			"missing recv: recv=%d sent=%d\n", total_recv_bytes, total_sent_bytes);
+			"missing recv: recv=%d dropped=%d sent=%d\n",
+			total_recv_bytes, total_sent_bytes, total_dropped_bytes);
 		goto cleanup;
 	}
 
-	/* check the final one */
-	cqe = &recv_cqe[recv_cqes-1];
-
 	ret = 0;
 cleanup:
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
@@ -320,18 +461,19 @@ int main(int argc, char *argv[])
 	if (argc > 1)
 		return T_EXIT_SKIP;
 
-	for (loop = 0; loop < 4; loop++) {
+	for (loop = 0; loop < 8; loop++) {
 		struct args a = {
 			.stream = loop & 0x01,
 			.wait_each = loop & 0x2,
+			.recvmsg = loop & 0x04,
 		};
 		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
 			a.early_error = (enum early_error_t)early_error;
 			ret = test(&a);
 			if (ret) {
 				fprintf(stderr,
-					"test stream=%d wait_each=%d early_error=%d failed\n",
-					a.stream, a.wait_each, a.early_error);
+					"test stream=%d wait_each=%d recvmsg=%d early_error=%d failed\n",
+					a.stream, a.wait_each, a.recvmsg, a.early_error);
 				return T_EXIT_FAIL;
 			}
 			if (no_recv_mshot)
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 5+ messages in thread

* Re: [PATCH RFC v2 liburing 0/2] multishot recvmsg
  2022-07-14 11:54 [PATCH RFC v2 liburing 0/2] multishot recvmsg Dylan Yudaken
  2022-07-14 11:54 ` [PATCH RFC v2 liburing 1/2] add multishot recvmsg API Dylan Yudaken
  2022-07-14 11:54 ` [PATCH RFC v2 liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
@ 2022-07-14 18:01 ` Jens Axboe
  2022-07-14 18:02 ` Jens Axboe
  3 siblings, 0 replies; 5+ messages in thread
From: Jens Axboe @ 2022-07-14 18:01 UTC (permalink / raw)
  To: Dylan Yudaken, asml.silence; +Cc: io-uring, Kernel-team

On 7/14/22 5:54 AM, Dylan Yudaken wrote:
> This series adds an API (patch 1) and a test (#2) for multishot
> recvmsg.
> 
> I have not included docs yet, but I want to get feedback on the API
> for handling the result (if there is any).
> 
> There was no feedback on v1, but in v2 I added more API to make it
> easier to deal with truncated payloads. Additionally there is extra
> testing for this.

I'm going to shove this in for some testing. We have a bit of time to
play with before the next release, might make sense to attempt to mature
it in tree.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [PATCH RFC v2 liburing 0/2] multishot recvmsg
  2022-07-14 11:54 [PATCH RFC v2 liburing 0/2] multishot recvmsg Dylan Yudaken
                   ` (2 preceding siblings ...)
  2022-07-14 18:01 ` [PATCH RFC v2 liburing 0/2] " Jens Axboe
@ 2022-07-14 18:02 ` Jens Axboe
  3 siblings, 0 replies; 5+ messages in thread
From: Jens Axboe @ 2022-07-14 18:02 UTC (permalink / raw)
  To: dylany, asml.silence; +Cc: Kernel-team, io-uring

On Thu, 14 Jul 2022 04:54:26 -0700, Dylan Yudaken wrote:
> This series adds an API (patch 1) and a test (#2) for multishot recvmsg.
> 
> I have not included docs yet, but I want to get feedback on the API for handling
> the result (if there is any).
> 
> There was no feedback on v1, but in v2 I added more API to make it easier to deal
> with truncated payloads. Additionally there is extra testing for this.
> 
> [...]

Applied, thanks!

[1/2] add multishot recvmsg API
      commit: 874406f7fb0942238a77041d84f07a480400ed81
[2/2] add tests for multishot recvmsg
      commit: 95dc4789e9dbacb7b3913b1a6e0dfef80fabb74d

Best regards,
-- 
Jens Axboe



^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2022-07-14 18:02 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-07-14 11:54 [PATCH RFC v2 liburing 0/2] multishot recvmsg Dylan Yudaken
2022-07-14 11:54 ` [PATCH RFC v2 liburing 1/2] add multishot recvmsg API Dylan Yudaken
2022-07-14 11:54 ` [PATCH RFC v2 liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
2022-07-14 18:01 ` [PATCH RFC v2 liburing 0/2] " Jens Axboe
2022-07-14 18:02 ` Jens Axboe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).