All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH RFC liburing 0/2] multishot recvmsg
@ 2022-07-08 18:45 Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
  0 siblings, 2 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

This series adds an API (patch 1) and tests (patch 2) for multishot recvmsg.

I have not included documentation yet, but I would like feedback on the API for
handling the result, if anyone has any.

Dylan Yudaken (2):
  add multishot recvmsg API
  add tests for multishot recvmsg

 src/include/liburing.h          |  59 ++++++++++++++
 src/include/liburing/io_uring.h |   7 ++
 test/recv-multishot.c           | 137 ++++++++++++++++++++++++++++----
 3 files changed, 189 insertions(+), 14 deletions(-)


base-commit: 5d0e33f50a06db768b1891972daab40732400778
-- 
2.30.2


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH RFC liburing 1/2] add multishot recvmsg API
  2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
@ 2022-07-08 18:45 ` Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
  1 sibling, 0 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

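This adds a new API for multishot recvmsg. It is more complicated than
multishot recv because the caller must parse a fixed data layout that the
kernel copies into each buffer: a struct io_uring_recvmsg_out header followed
by the name, control and payload areas.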

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 src/include/liburing.h          | 59 +++++++++++++++++++++++++++++++++
 src/include/liburing/io_uring.h |  7 ++++
 2 files changed, 66 insertions(+)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index d35bfa9..3f18bd2 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -419,6 +419,13 @@ static inline void io_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd,
 	sqe->msg_flags = flags;
 }
 
+/*
+ * Prepare a multishot recvmsg request: an ordinary recvmsg SQE with
+ * IORING_RECV_MULTISHOT set in sqe->ioprio.
+ */
+static inline void io_uring_prep_recvmsg_multishot(struct io_uring_sqe *sqe,
+						   int fd,
+						   struct msghdr *msg,
+						   unsigned flags)
+{
+	io_uring_prep_recvmsg(sqe, fd, msg, flags);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+}
+
 static inline void io_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd,
 					 const struct msghdr *msg,
 					 unsigned flags)
@@ -685,6 +692,58 @@ static inline void io_uring_prep_recv_multishot(struct io_uring_sqe *sqe,
 	sqe->ioprio |= IORING_RECV_MULTISHOT;
 }
 
+/*
+ * Check that a buffer filled by a multishot recvmsg completion is large
+ * enough to be parsed.
+ *
+ * @buf:     the provided buffer selected for the completion
+ * @buf_len: number of valid bytes in @buf (e.g. cqe->res); negative values
+ *           are rejected
+ * @m:       the msghdr used to prepare the request; its msg_namelen and
+ *           msg_controllen give the sizes of the areas reserved in @buf
+ *
+ * Returns @buf viewed as a struct io_uring_recvmsg_out when the header, the
+ * reserved name and control areas and the reported payload all fit within
+ * @buf_len bytes, otherwise NULL.
+ */
+static inline struct io_uring_recvmsg_out *io_uring_recvmsg_validate(
+	void *buf, int buf_len, struct msghdr *m)
+{
+	struct io_uring_recvmsg_out *ret;
+	size_t header = m->msg_controllen + m->msg_namelen +
+			sizeof(struct io_uring_recvmsg_out);
+	size_t len;
+
+	/*
+	 * A negative length (e.g. an error cqe->res) would otherwise be
+	 * converted to a huge size_t and pass the checks below.
+	 */
+	if (buf_len < 0)
+		return NULL;
+	len = (size_t)buf_len;
+	if (len < header)
+		return NULL;
+	ret = (struct io_uring_recvmsg_out *)buf;
+	/* Compare with the space left so header + payloadlen cannot wrap. */
+	if (len - header < ret->payloadlen)
+		return NULL;
+	return ret;
+}
+
+/*
+ * Start of the name (address) area, which sits immediately after the
+ * struct io_uring_recvmsg_out header at the beginning of the buffer.
+ */
+static inline void *io_uring_recvmsg_name(struct io_uring_recvmsg_out *o)
+{
+	struct io_uring_recvmsg_out *after_header = o + 1;
+
+	return (void *)after_header;
+}
+
+/*
+ * First control message in a multishot recvmsg buffer.
+ *
+ * The control area begins right after the msg_namelen-byte name area
+ * reserved by @m. Returns NULL when fewer than sizeof(struct cmsghdr)
+ * control bytes were reported in o->controllen.
+ */
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_firsthdr(struct io_uring_recvmsg_out *o,
+							     struct msghdr *m)
+{
+	if (o->controllen >= sizeof(struct cmsghdr)) {
+		unsigned char *name_area = io_uring_recvmsg_name(o);
+
+		return (struct cmsghdr *)(name_area + m->msg_namelen);
+	}
+	return NULL;
+}
+
+/*
+ * Start of the payload data. It follows the header, then the full
+ * msg_namelen-byte name area and msg_controllen-byte control area reserved
+ * by @m, regardless of how many name/control bytes were actually filled in.
+ */
+static inline void *io_uring_recvmsg_payload(struct io_uring_recvmsg_out *o,
+					     struct msghdr *m)
+{
+	unsigned char *base = io_uring_recvmsg_name(o);
+	size_t skip = m->msg_namelen;
+
+	skip += m->msg_controllen;
+	return base + skip;
+}
+
+/*
+ * Return the control message that follows @cmsg in a multishot recvmsg
+ * buffer, or NULL if there is none.
+ *
+ * @o:    header at the start of the buffer
+ * @m:    msghdr used to prepare the request (gives the reserved area sizes)
+ * @cmsg: current message, as returned by io_uring_recvmsg_cmsg_firsthdr()
+ *        or by a previous call to this function
+ *
+ * The walk is bounded by o->controllen, the number of control bytes
+ * reported for this message, not by the whole msg_controllen area reserved
+ * in the buffer: bytes past o->controllen were not filled in for this
+ * message and must not be parsed as cmsg headers.
+ */
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_nexthdr(struct io_uring_recvmsg_out *o,
+							    struct msghdr *m,
+							    struct cmsghdr *cmsg)
+{
+	unsigned char *control = (unsigned char *)io_uring_recvmsg_name(o) +
+				 m->msg_namelen;
+	unsigned char *cur = (unsigned char *)cmsg;
+	size_t control_len = o->controllen;
+	size_t remaining, step;
+	struct cmsghdr *next;
+
+	/* Never walk past the control area reserved in the buffer. */
+	if (control_len > m->msg_controllen)
+		control_len = m->msg_controllen;
+	if (cur < control || cur >= control + control_len)
+		return NULL;
+	if (cmsg->cmsg_len < sizeof(struct cmsghdr))
+		return NULL;
+
+	/*
+	 * Compare lengths against the bytes left instead of forming pointers
+	 * past the end, so a bogus cmsg_len can neither overflow the pointer
+	 * arithmetic nor wrap inside CMSG_ALIGN().
+	 */
+	remaining = (size_t)(control + control_len - cur);
+	if (cmsg->cmsg_len > remaining)
+		return NULL;
+	step = CMSG_ALIGN(cmsg->cmsg_len);
+	if (step > remaining || remaining - step < sizeof(struct cmsghdr))
+		return NULL;
+
+	next = (struct cmsghdr *)(cur + step);
+	remaining -= step;
+	if (next->cmsg_len > remaining || CMSG_ALIGN(next->cmsg_len) > remaining)
+		return NULL;
+
+	return next;
+}
+
 static inline void io_uring_prep_openat2(struct io_uring_sqe *sqe, int dfd,
 					const char *path, struct open_how *how)
 {
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index fbf6403..5e111b4 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -573,6 +573,13 @@ struct io_uring_file_index_range {
 	__u64	resv;
 };
 
+/*
+ * Header found at the start of each buffer filled by a multishot recvmsg.
+ * In the buffer it is followed by the name area (msg_namelen bytes), the
+ * control area (msg_controllen bytes) and then the payload, where
+ * msg_namelen/msg_controllen come from the msghdr passed when the request
+ * was prepared.
+ */
+struct io_uring_recvmsg_out {
+	/* length of the address stored in the name area */
+	__u32 namelen;
+	/* number of control-message bytes stored in the control area */
+	__u32 controllen;
+	/* number of payload bytes following the control area */
+	__u32 payloadlen;
+	/* NOTE(review): presumably the received msg_flags - confirm against the kernel side */
+	__u32 flags;
+};
+
 /*
  * accept flags stored in sqe->ioprio
  */
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH RFC liburing 2/2] add tests for multishot recvmsg
  2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
@ 2022-07-08 18:45 ` Dylan Yudaken
  1 sibling, 0 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

Expand the multishot recv test to cover recvmsg. The test also checks that the
peer sockaddr is returned in the name area and that control messages are
delivered correctly.

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 test/recv-multishot.c | 137 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 123 insertions(+), 14 deletions(-)

diff --git a/test/recv-multishot.c b/test/recv-multishot.c
index 9df8184..b1cc335 100644
--- a/test/recv-multishot.c
+++ b/test/recv-multishot.c
@@ -27,20 +27,42 @@ enum early_error_t {
 struct args {
 	bool stream;
 	bool wait_each;
+	bool recvmsg;
 	enum early_error_t early_error;
 };
 
+/*
+ * Check that @in holds the IPv4 loopback address 127.0.0.1.
+ * Prints a diagnostic to stderr and returns -1 on mismatch, 0 on success.
+ */
+static int check_sockaddr(const struct sockaddr_in *in)
+{
+	struct in_addr expected;
+
+	inet_pton(AF_INET, "127.0.0.1", &expected);
+	if (in->sin_family != AF_INET) {
+		/*
+		 * sin_family is stored in host byte order; print it as-is
+		 * (htons() would byte-swap it on little-endian hosts).
+		 */
+		fprintf(stderr, "bad family %d\n", (int)in->sin_family);
+		return -1;
+	}
+	if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
+		char buff[256];
+		const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));
+		fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
+		return -1;
+	}
+	return 0;
+}
+
 static int test(struct args *args)
 {
 	int const N = 8;
 	int const N_BUFFS = N * 64;
 	int const N_CQE_OVERFLOW = 4;
 	int const min_cqes = 2;
+	int const NAME_LEN = sizeof(struct sockaddr_storage);
+	int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
+					+ sizeof(struct cmsghdr);
 	struct io_uring ring;
 	struct io_uring_cqe *cqe;
 	struct io_uring_sqe *sqe;
 	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
 	int send_buff[256];
+	int *sent_buffs[N_BUFFS];
 	int *recv_buffs[N_BUFFS];
 	int *at;
 	struct io_uring_cqe recv_cqe[N_BUFFS];
@@ -50,7 +72,7 @@ static int test(struct args *args)
 	struct __kernel_timespec timeout = {
 		.tv_sec = 1,
 	};
-
+	struct msghdr msg;
 
 	memset(recv_buffs, 0, sizeof(recv_buffs));
 
@@ -75,21 +97,39 @@ static int test(struct args *args)
 		return ret;
 	}
 
+	if (!args->stream) {
+		bool val = true;
+		/* force some cmsgs to come back to us */
+		if (setsockopt(fds[0], IPPROTO_IP,
+				IP_RECVORIGDSTADDR, &val, sizeof(val))) {
+				fprintf(stderr, "setsockopt failed %d\n", errno);
+				goto cleanup;
+			}
+	}
+
 	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
 		send_buff[i] = i;
 
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
 		/* prepare some different sized buffers */
-		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N;
+		buffer_size *= sizeof(int);
+		if (args->recvmsg) {
+			buffer_size +=
+				sizeof(struct io_uring_recvmsg_out) +
+				NAME_LEN +
+				CONTROL_LEN;
+		}
 
-		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+		recv_buffs[i] = malloc(buffer_size);
 
 		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
 			continue;
 
 		sqe = io_uring_get_sqe(&ring);
 		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
-					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+					buffer_size, 1, 7, i);
+		memset(recv_buffs[i], 0xcc, buffer_size);
 		if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) != 0) {
 			fprintf(stderr, "provide buffers failed: %d\n", ret);
 			ret = -1;
@@ -99,7 +139,15 @@ static int test(struct args *args)
 	}
 
 	sqe = io_uring_get_sqe(&ring);
-	io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	if (args->recvmsg) {
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = NAME_LEN;
+		msg.msg_controllen = CONTROL_LEN;
+
+		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, 0);
+	} else {
+		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	}
 	sqe->flags |= IOSQE_BUFFER_SELECT;
 	sqe->buf_group = 7;
 	io_uring_sqe_set_data64(sqe, 1234);
@@ -111,6 +159,7 @@ static int test(struct args *args)
 		int to_send = sizeof(*at) * (i+1);
 
 		total_sent_bytes += to_send;
+		sent_buffs[i] = at;
 		if (send(fds[1], at, to_send, 0) != to_send) {
 			if (early_error_started)
 				break;
@@ -205,6 +254,8 @@ static int test(struct args *args)
 
 
 		if (should_be_last) {
+			int used_res = cqe->res;
+
 			if (!is_last) {
 				fprintf(stderr, "not last cqe had error %d\n", i);
 				goto cleanup;
@@ -234,7 +285,16 @@ static int test(struct args *args)
 				break;
 			case ERROR_NONE:
 			case ERROR_EARLY_CLOSE_SENDER:
-				if (cqe->res != 0) {
+				if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
+					struct io_uring_recvmsg_out *o =
+						(struct io_uring_recvmsg_out *)recv_buffs[cqe->flags >> 16];
+					if (o->payloadlen != 0) {
+						fprintf(stderr, "early error expected 0 payloadlen, got %u\n",
+							o->payloadlen);
+						goto cleanup;
+					}
+					used_res = 0;
+				} else if (cqe->res != 0) {
 					fprintf(stderr, "early error: res %d\n", cqe->res);
 					goto cleanup;
 				}
@@ -254,7 +314,7 @@ static int test(struct args *args)
 				goto cleanup;
 			}
 
-			if (cqe->res <= 0)
+			if (used_res <= 0)
 				continue;
 		} else {
 			if (!(cqe->flags & IORING_CQE_F_MORE)) {
@@ -268,7 +328,48 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
+		this_recv = recv_buffs[cqe->flags >> 16];
+
+		if (args->recvmsg) {
+			struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
+				this_recv, cqe->res, &msg);
+			if (!o) {
+				fprintf(stderr, "bad recvmsg\n");
+				goto cleanup;
+			}
+			cqe->res = o->payloadlen;
+
+			if (!args->stream) {
+				struct cmsghdr *cmsg;
+				if (o->namelen < sizeof(struct sockaddr_in)) {
+					fprintf(stderr, "bad addr len %d",
+						o->namelen);
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in*)io_uring_recvmsg_name(o)))
+					goto cleanup;
+
+				cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
+				if (!cmsg ||
+				    cmsg->cmsg_level != IPPROTO_IP ||
+				    cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
+					fprintf(stderr, "bad cmsg");
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
+					goto cleanup;
+				cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
+				if (cmsg) {
+					fprintf(stderr, "unexpected extra cmsg\n");
+					goto cleanup;
+				}
+			}
+
+			this_recv = (int*)io_uring_recvmsg_payload(o, &msg);
+		}
+
 		total_recv_bytes += cqe->res;
+
 		if (cqe->res % 4 != 0) {
 			/*
 			 * doesn't seem to happen in practice, would need some
@@ -278,9 +379,19 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
-		/* check buffer arrived in order (for tcp) */
-		this_recv = recv_buffs[cqe->flags >> 16];
-		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+		/*
+		 * for tcp: check buffer arrived in order
+		 * for udp: validate the data based on its size
+		 */
+		if (!args->stream) {
+			int sent_idx = cqe->res / sizeof(*at) - 1;
+			if (sent_idx < 0 || sent_idx > N) {
+				fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
+				goto cleanup;
+			}
+			at = sent_buffs[sent_idx];
+		}
+		for (j = 0; j < cqe->res / 4; j++) {
 			int sent = *at++;
 			int recv = *this_recv++;
 
@@ -297,9 +408,6 @@ static int test(struct args *args)
 		goto cleanup;
 	}
 
-	/* check the final one */
-	cqe = &recv_cqe[recv_cqes-1];
-
 	ret = 0;
 cleanup:
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
@@ -320,10 +428,11 @@ int main(int argc, char *argv[])
 	if (argc > 1)
 		return T_EXIT_SKIP;
 
-	for (loop = 0; loop < 4; loop++) {
+	for (loop = 0; loop < 8; loop++) {
 		struct args a = {
 			.stream = loop & 0x01,
 			.wait_each = loop & 0x2,
+			.recvmsg = loop & 0x04,
 		};
 		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
 			a.early_error = (enum early_error_t)early_error;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2022-07-08 18:45 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.