All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH liburing v2 0/5] sendzc test improvements
@ 2023-03-05  5:13 Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support Pavel Begunkov
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

Add CPU affinity, multithreading and a receive (server) mode, and also
fix TCP performance issues.

v2: rebase
    add defer support (patch 1/5)
    fix rx tcp problems (patch 5/5) 

Pavel Begunkov (5):
  examples/send-zc: add defer taskrun support
  examples/send-zc: add affinity / CPU pinning
  examples/send-zc: add multithreading
  examples/send-zc: add the receive part
  examples/send-zc: kill sock bufs configuration

 examples/Makefile        |   3 +
 examples/send-zerocopy.c | 290 ++++++++++++++++++++++++++++++++++-----
 2 files changed, 260 insertions(+), 33 deletions(-)

-- 
2.39.1


^ permalink raw reply	[flat|nested] 7+ messages in thread

* [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
@ 2023-03-05  5:13 ` Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 2/5] examples/send-zc: add affinity / CPU pinning Pavel Begunkov
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 examples/send-zerocopy.c | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index 2844491..d60335c 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -48,6 +48,7 @@ static bool cfg_zc = 1;
 static int  cfg_nr_reqs = 8;
 static bool cfg_fixed_buf = 1;
 static bool cfg_hugetlb = 0;
+static bool cfg_defer_taskrun = 0;
 
 static int  cfg_family		= PF_UNSPEC;
 static int  cfg_payload_len;
@@ -151,6 +152,7 @@ static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
 
 static void do_tx(int domain, int type, int protocol)
 {
+	const int notif_slack = 128;
 	unsigned long packets = 0;
 	unsigned long bytes = 0;
 	struct io_uring ring;
@@ -158,10 +160,14 @@ static void do_tx(int domain, int type, int protocol)
 	uint64_t tstop;
 	int i, fd, ret;
 	int compl_cqes = 0;
+	int ring_flags = IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;
+
+	if (cfg_defer_taskrun)
+		ring_flags |= IORING_SETUP_DEFER_TASKRUN;
 
 	fd = do_setup_tx(domain, type, protocol);
 
-	ret = io_uring_queue_init(512, &ring, IORING_SETUP_COOP_TASKRUN);
+	ret = io_uring_queue_init(512, &ring, ring_flags);
 	if (ret)
 		t_error(1, ret, "io_uring: queue init");
 
@@ -211,7 +217,11 @@ static void do_tx(int domain, int type, int protocol)
 			}
 		}
 
-		ret = io_uring_submit(&ring);
+		if (cfg_defer_taskrun && compl_cqes >= notif_slack)
+			ret = io_uring_submit_and_get_events(&ring);
+		else
+			ret = io_uring_submit(&ring);
+
 		if (ret != cfg_nr_reqs)
 			t_error(1, ret, "submit");
 
@@ -292,7 +302,7 @@ static void parse_opts(int argc, char **argv)
 
 	cfg_payload_len = max_payload_len;
 
-	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:")) != -1) {
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:d")) != -1) {
 		switch (c) {
 		case '4':
 			if (cfg_family != PF_UNSPEC)
@@ -330,6 +340,9 @@ static void parse_opts(int argc, char **argv)
 		case 'l':
 			cfg_hugetlb = strtoul(optarg, NULL, 0);
 			break;
+		case 'd':
+			cfg_defer_taskrun = 1;
+			break;
 		}
 	}
 
-- 
2.39.1


^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH liburing v2 2/5] examples/send-zc: add affinity / CPU pinning
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support Pavel Begunkov
@ 2023-03-05  5:13 ` Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 3/5] examples/send-zc: add multithreading Pavel Begunkov
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

Pass '-C <cpu_num>' to pin threads and io-wq to the specified CPU.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 examples/send-zerocopy.c | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index d60335c..baa2bdf 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -12,6 +12,7 @@
 #include <stdarg.h>
 #include <string.h>
 
+#include <sched.h>
 #include <arpa/inet.h>
 #include <linux/if_packet.h>
 #include <linux/ipv6.h>
@@ -49,6 +50,7 @@ static int  cfg_nr_reqs = 8;
 static bool cfg_fixed_buf = 1;
 static bool cfg_hugetlb = 0;
 static bool cfg_defer_taskrun = 0;
+static int  cfg_cpu = -1;
 
 static int  cfg_family		= PF_UNSPEC;
 static int  cfg_payload_len;
@@ -78,6 +80,32 @@ static void t_error(int status, int errnum, const char *format, ...)
 	exit(status);
 }
 
+static void set_cpu_affinity(void)
+{
+	cpu_set_t mask;
+
+	if (cfg_cpu == -1)
+		return;
+
+	CPU_ZERO(&mask);
+	CPU_SET(cfg_cpu, &mask);
+	if (sched_setaffinity(0, sizeof(mask), &mask))
+		t_error(1, errno, "unable to pin cpu\n");
+}
+
+static void set_iowq_affinity(struct io_uring *ring)
+{
+	cpu_set_t mask;
+	int ret;
+
+	if (cfg_cpu == -1)
+		return;
+
+	ret = io_uring_register_iowq_aff(ring, 1, &mask);
+	if (ret)
+		t_error(1, ret, "unabled to set io-wq affinity\n");
+}
+
 static unsigned long gettimeofday_ms(void)
 {
 	struct timeval tv;
@@ -171,6 +199,9 @@ static void do_tx(int domain, int type, int protocol)
 	if (ret)
 		t_error(1, ret, "io_uring: queue init");
 
+	set_cpu_affinity();
+	set_iowq_affinity(&ring);
+
 	if (cfg_fixed_files) {
 		ret = io_uring_register_files(&ring, &fd, 1);
 		if (ret < 0)
@@ -302,7 +333,7 @@ static void parse_opts(int argc, char **argv)
 
 	cfg_payload_len = max_payload_len;
 
-	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:d")) != -1) {
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:")) != -1) {
 		switch (c) {
 		case '4':
 			if (cfg_family != PF_UNSPEC)
@@ -343,6 +374,9 @@ static void parse_opts(int argc, char **argv)
 		case 'd':
 			cfg_defer_taskrun = 1;
 			break;
+		case 'C':
+			cfg_cpu = strtol(optarg, NULL, 0);
+			break;
 		}
 	}
 
@@ -362,6 +396,7 @@ int main(int argc, char **argv)
 	const char *cfg_test;
 
 	parse_opts(argc, argv);
+	set_cpu_affinity();
 
 	payload = payload_buf;
 	if (cfg_hugetlb) {
-- 
2.39.1


^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH liburing v2 3/5] examples/send-zc: add multithreading
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 2/5] examples/send-zc: add affinity / CPU pinning Pavel Begunkov
@ 2023-03-05  5:13 ` Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 4/5] examples/send-zc: add the receive part Pavel Begunkov
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

'-T <nr_threads>' will create the specified number of threads to test in
parallel. Each thread will have its own connection.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 examples/Makefile        |   3 +
 examples/send-zerocopy.c | 116 ++++++++++++++++++++++++++-------------
 2 files changed, 81 insertions(+), 38 deletions(-)

diff --git a/examples/Makefile b/examples/Makefile
index ef79e42..ce33af9 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -10,6 +10,9 @@ ifneq ($(MAKECMDGOALS),clean)
 include ../config-host.mak
 endif
 
+LDFLAGS ?=
+override LDFLAGS += -L../src/ -luring -lpthread
+
 example_srcs := \
 	io_uring-close-test.c \
 	io_uring-cp.c \
diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index baa2bdf..683a965 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -11,6 +11,7 @@
 #include <stdbool.h>
 #include <stdarg.h>
 #include <string.h>
+#include <pthread.h>
 
 #include <sched.h>
 #include <arpa/inet.h>
@@ -42,6 +43,16 @@
 
 #define ZC_TAG 0xfffffffULL
 #define MAX_SUBMIT_NR 512
+#define MAX_THREADS 100
+
+struct thread_data {
+	pthread_t thread;
+	void *ret;
+	int idx;
+	unsigned long long packets;
+	unsigned long long bytes;
+	struct sockaddr_storage dst_addr;
+};
 
 static bool cfg_reg_ringfd = true;
 static bool cfg_fixed_files = 1;
@@ -51,17 +62,21 @@ static bool cfg_fixed_buf = 1;
 static bool cfg_hugetlb = 0;
 static bool cfg_defer_taskrun = 0;
 static int  cfg_cpu = -1;
+static unsigned  cfg_nr_threads = 1;
 
 static int  cfg_family		= PF_UNSPEC;
+static int  cfg_type		= 0;
 static int  cfg_payload_len;
 static int  cfg_port		= 8000;
 static int  cfg_runtime_ms	= 4200;
 
 static socklen_t cfg_alen;
-static struct sockaddr_storage cfg_dst_addr;
+static char *str_addr = NULL;
 
 static char payload_buf[IP_MAXPACKET] __attribute__((aligned(4096)));
 static char *payload;
+static struct thread_data threads[MAX_THREADS];
+static pthread_barrier_t barrier;
 
 /*
  * Implementation of error(3), prints an error message and exits.
@@ -125,12 +140,13 @@ static void setup_sockaddr(int domain, const char *str_addr,
 {
 	struct sockaddr_in6 *addr6 = (void *) sockaddr;
 	struct sockaddr_in *addr4 = (void *) sockaddr;
+	int port = cfg_port;
 
 	switch (domain) {
 	case PF_INET:
 		memset(addr4, 0, sizeof(*addr4));
 		addr4->sin_family = AF_INET;
-		addr4->sin_port = htons(cfg_port);
+		addr4->sin_port = htons(port);
 		if (str_addr &&
 		    inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
 			t_error(1, 0, "ipv4 parse error: %s", str_addr);
@@ -138,7 +154,7 @@ static void setup_sockaddr(int domain, const char *str_addr,
 	case PF_INET6:
 		memset(addr6, 0, sizeof(*addr6));
 		addr6->sin6_family = AF_INET6;
-		addr6->sin6_port = htons(cfg_port);
+		addr6->sin6_port = htons(port);
 		if (str_addr &&
 		    inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
 			t_error(1, 0, "ipv6 parse error: %s", str_addr);
@@ -148,21 +164,6 @@ static void setup_sockaddr(int domain, const char *str_addr,
 	}
 }
 
-static int do_setup_tx(int domain, int type, int protocol)
-{
-	int fd;
-
-	fd = socket(domain, type, protocol);
-	if (fd == -1)
-		t_error(1, errno, "socket t");
-
-	do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21);
-
-	if (connect(fd, (void *) &cfg_dst_addr, cfg_alen))
-		t_error(1, errno, "connect");
-	return fd;
-}
-
 static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
 {
 	struct io_uring_cqe *cqe;
@@ -178,11 +179,9 @@ static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
 	return cqe;
 }
 
-static void do_tx(int domain, int type, int protocol)
+static void do_tx(struct thread_data *td, int domain, int type, int protocol)
 {
 	const int notif_slack = 128;
-	unsigned long packets = 0;
-	unsigned long bytes = 0;
 	struct io_uring ring;
 	struct iovec iov;
 	uint64_t tstop;
@@ -193,7 +192,14 @@ static void do_tx(int domain, int type, int protocol)
 	if (cfg_defer_taskrun)
 		ring_flags |= IORING_SETUP_DEFER_TASKRUN;
 
-	fd = do_setup_tx(domain, type, protocol);
+	fd = socket(domain, type, protocol);
+	if (fd == -1)
+		t_error(1, errno, "socket t");
+
+	do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21);
+
+	if (connect(fd, (void *)&td->dst_addr, cfg_alen))
+		t_error(1, errno, "connect, idx %i", td->idx);
 
 	ret = io_uring_queue_init(512, &ring, ring_flags);
 	if (ret)
@@ -220,6 +226,8 @@ static void do_tx(int domain, int type, int protocol)
 	if (ret)
 		t_error(1, ret, "io_uring: buffer registration");
 
+	pthread_barrier_wait(&barrier);
+
 	tstop = gettimeofday_ms() + cfg_runtime_ms;
 	do {
 		struct io_uring_sqe *sqe;
@@ -271,8 +279,8 @@ static void do_tx(int domain, int type, int protocol)
 				compl_cqes++;
 
 			if (cqe->res >= 0) {
-				packets++;
-				bytes += cqe->res;
+				td->packets++;
+				td->bytes += cqe->res;
 			} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
 				   cqe->res == -ECONNRESET) {
 				fprintf(stderr, "Connection failure");
@@ -289,11 +297,6 @@ out_fail:
 	if (close(fd))
 		t_error(1, errno, "close");
 
-	fprintf(stderr, "tx=%lu (MB=%lu), tx/s=%lu (MB/s=%lu)\n",
-			packets, bytes >> 20,
-			packets / (cfg_runtime_ms / 1000),
-			(bytes >> 20) / (cfg_runtime_ms / 1000));
-
 	while (compl_cqes) {
 		struct io_uring_cqe *cqe = wait_cqe_fast(&ring);
 
@@ -303,14 +306,16 @@ out_fail:
 	io_uring_queue_exit(&ring);
 }
 
-static void do_test(int domain, int type, int protocol)
+
+static void *do_test(void *arg)
 {
-	int i;
+	struct thread_data *td = arg;
+	int protocol = 0;
 
-	for (i = 0; i < IP_MAXPACKET; i++)
-		payload[i] = 'a' + (i % 26);
+	setup_sockaddr(cfg_family, str_addr, &td->dst_addr);
 
-	do_tx(domain, type, protocol);
+	do_tx(td, cfg_family, cfg_type, protocol);
+	pthread_exit(&td->ret);
 }
 
 static void usage(const char *filepath)
@@ -333,7 +338,7 @@ static void parse_opts(int argc, char **argv)
 
 	cfg_payload_len = max_payload_len;
 
-	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:")) != -1) {
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) {
 		switch (c) {
 		case '4':
 			if (cfg_family != PF_UNSPEC)
@@ -377,6 +382,11 @@ static void parse_opts(int argc, char **argv)
 		case 'C':
 			cfg_cpu = strtol(optarg, NULL, 0);
 			break;
+		case 'T':
+			cfg_nr_threads = strtol(optarg, NULL, 0);
+			if (cfg_nr_threads > MAX_THREADS)
+				t_error(1, 0, "too many threads\n");
+			break;
 		}
 	}
 
@@ -385,7 +395,7 @@ static void parse_opts(int argc, char **argv)
 	if (cfg_payload_len > max_payload_len)
 		t_error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);
 
-	setup_sockaddr(cfg_family, daddr, &cfg_dst_addr);
+	str_addr = daddr;
 
 	if (optind != argc - 1)
 		usage(argv[0]);
@@ -393,7 +403,11 @@ static void parse_opts(int argc, char **argv)
 
 int main(int argc, char **argv)
 {
+	unsigned long long packets = 0, bytes = 0;
+	struct thread_data *td;
 	const char *cfg_test;
+	void *res;
+	int i;
 
 	parse_opts(argc, argv);
 	set_cpu_affinity();
@@ -411,11 +425,37 @@ int main(int argc, char **argv)
 
 	cfg_test = argv[argc - 1];
 	if (!strcmp(cfg_test, "tcp"))
-		do_test(cfg_family, SOCK_STREAM, 0);
+		cfg_type = SOCK_STREAM;
 	else if (!strcmp(cfg_test, "udp"))
-		do_test(cfg_family, SOCK_DGRAM, 0);
+		cfg_type = SOCK_DGRAM;
 	else
 		t_error(1, 0, "unknown cfg_test %s", cfg_test);
 
+	pthread_barrier_init(&barrier, NULL, cfg_nr_threads);
+
+	for (i = 0; i < IP_MAXPACKET; i++)
+		payload[i] = 'a' + (i % 26);
+
+	for (i = 0; i < cfg_nr_threads; i++) {
+		td = &threads[i];
+		td->idx = i;
+	}
+
+	for (i = 0; i < cfg_nr_threads; i++)
+		pthread_create(&threads[i].thread, NULL, do_test, td);
+
+	for (i = 0; i < cfg_nr_threads; i++) {
+		td = &threads[i];
+		pthread_join(td->thread, &res);
+		packets += td->packets;
+		bytes += td->bytes;
+	}
+
+	fprintf(stderr, "tx=%llu (MB=%llu), tx/s=%llu (MB/s=%llu)\n",
+		packets, bytes >> 20,
+		packets / (cfg_runtime_ms / 1000),
+		(bytes >> 20) / (cfg_runtime_ms / 1000));
+
+	pthread_barrier_destroy(&barrier);
 	return 0;
 }
-- 
2.39.1


^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH liburing v2 4/5] examples/send-zc: add the receive part
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
                   ` (2 preceding siblings ...)
  2023-03-05  5:13 ` [PATCH liburing v2 3/5] examples/send-zc: add multithreading Pavel Begunkov
@ 2023-03-05  5:13 ` Pavel Begunkov
  2023-03-05  5:13 ` [PATCH liburing v2 5/5] examples/send-zc: kill sock bufs configuration Pavel Begunkov
  2023-03-05 14:35 ` [PATCH liburing v2 0/5] sendzc test improvements Jens Axboe
  5 siblings, 0 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

'-R' will switch the benchmark into server mode, in which it accepts data.
For TCP, the number of threads should match the number of threads of the
client. For UDP, only a single thread/connection is supported.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 examples/send-zerocopy.c | 148 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 144 insertions(+), 4 deletions(-)

diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index 683a965..8e1242e 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -13,6 +13,7 @@
 #include <string.h>
 #include <pthread.h>
 
+#include <poll.h>
 #include <sched.h>
 #include <arpa/inet.h>
 #include <linux/if_packet.h>
@@ -52,6 +53,7 @@ struct thread_data {
 	unsigned long long packets;
 	unsigned long long bytes;
 	struct sockaddr_storage dst_addr;
+	int fd;
 };
 
 static bool cfg_reg_ringfd = true;
@@ -62,6 +64,7 @@ static bool cfg_fixed_buf = 1;
 static bool cfg_hugetlb = 0;
 static bool cfg_defer_taskrun = 0;
 static int  cfg_cpu = -1;
+static bool cfg_rx = 0;
 static unsigned  cfg_nr_threads = 1;
 
 static int  cfg_family		= PF_UNSPEC;
@@ -164,6 +167,135 @@ static void setup_sockaddr(int domain, const char *str_addr,
 	}
 }
 
+static int do_poll(int fd, int events)
+{
+	struct pollfd pfd;
+	int ret;
+
+	pfd.events = events;
+	pfd.revents = 0;
+	pfd.fd = fd;
+
+	ret = poll(&pfd, 1, -1);
+	if (ret == -1)
+		t_error(1, errno, "poll");
+
+	return ret && (pfd.revents & events);
+}
+
+/* Flush all outstanding bytes for the tcp receive queue */
+static int do_flush_tcp(struct thread_data *td, int fd)
+{
+	int ret;
+
+	/* MSG_TRUNC flushes up to len bytes */
+	ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
+	if (ret == -1 && errno == EAGAIN)
+		return 0;
+	if (ret == -1)
+		t_error(1, errno, "flush");
+	if (!ret)
+		return 1;
+
+	td->packets++;
+	td->bytes += ret;
+	return 0;
+}
+
+/* Flush all outstanding datagrams. Verify first few bytes of each. */
+static int do_flush_datagram(struct thread_data *td, int fd, int type)
+{
+	int ret, off = 0;
+	char buf[64];
+
+	/* MSG_TRUNC will return full datagram length */
+	ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
+	if (ret == -1 && errno == EAGAIN)
+		return 0;
+
+	if (ret == -1)
+		t_error(1, errno, "recv");
+	if (ret != cfg_payload_len)
+		t_error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
+	if (ret > sizeof(buf) - off)
+		ret = sizeof(buf) - off;
+	if (memcmp(buf + off, payload, ret))
+		t_error(1, 0, "recv: data mismatch");
+
+	td->packets++;
+	td->bytes += cfg_payload_len;
+	return 0;
+}
+
+static void do_setup_rx(int domain, int type, int protocol)
+{
+	struct sockaddr_storage addr = {};
+	struct thread_data *td;
+	int listen_fd, fd, i;
+
+	fd = socket(domain, type, protocol);
+	if (fd == -1)
+		t_error(1, errno, "socket r");
+
+	do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21);
+	do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16);
+	do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
+
+	setup_sockaddr(cfg_family, str_addr, &addr);
+
+	if (bind(fd, (void *)&addr, cfg_alen))
+		t_error(1, errno, "bind");
+
+	if (type != SOCK_STREAM) {
+		if (cfg_nr_threads != 1)
+			t_error(1, 0, "udp rx cant multithread");
+		threads[0].fd = fd;
+		return;
+	}
+
+	listen_fd = fd;
+	if (listen(listen_fd, cfg_nr_threads))
+		t_error(1, errno, "listen");
+
+	for (i = 0; i < cfg_nr_threads; i++) {
+		td = &threads[i];
+
+		fd = accept(listen_fd, NULL, NULL);
+		if (fd == -1)
+			t_error(1, errno, "accept");
+		td->fd = fd;
+	}
+
+	if (close(listen_fd))
+		t_error(1, errno, "close listen sock");
+}
+
+static void *do_rx(void *arg)
+{
+	struct thread_data *td = arg;
+	const int cfg_receiver_wait_ms = 400;
+	uint64_t tstop;
+	int ret, fd = td->fd;
+
+	tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms;
+	do {
+		if (cfg_type == SOCK_STREAM)
+			ret = do_flush_tcp(td, fd);
+		else
+			ret = do_flush_datagram(td, fd, cfg_type);
+
+		if (ret)
+			break;
+
+		do_poll(fd, POLLIN);
+	} while (gettimeofday_ms() < tstop);
+
+	if (close(fd))
+		t_error(1, errno, "close");
+	pthread_exit(&td->ret);
+	return NULL;
+}
+
 static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
 {
 	struct io_uring_cqe *cqe;
@@ -283,7 +415,7 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol)
 				td->bytes += cqe->res;
 			} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
 				   cqe->res == -ECONNRESET) {
-				fprintf(stderr, "Connection failure");
+				fprintf(stderr, "Connection failure\n");
 				goto out_fail;
 			} else if (cqe->res != -EAGAIN) {
 				t_error(1, cqe->res, "send failed");
@@ -316,6 +448,7 @@ static void *do_test(void *arg)
 
 	do_tx(td, cfg_family, cfg_type, protocol);
 	pthread_exit(&td->ret);
+	return NULL;
 }
 
 static void usage(const char *filepath)
@@ -338,7 +471,7 @@ static void parse_opts(int argc, char **argv)
 
 	cfg_payload_len = max_payload_len;
 
-	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) {
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:R")) != -1) {
 		switch (c) {
 		case '4':
 			if (cfg_family != PF_UNSPEC)
@@ -387,6 +520,9 @@ static void parse_opts(int argc, char **argv)
 			if (cfg_nr_threads > MAX_THREADS)
 				t_error(1, 0, "too many threads\n");
 			break;
+		case 'R':
+			cfg_rx = 1;
+			break;
 		}
 	}
 
@@ -441,8 +577,12 @@ int main(int argc, char **argv)
 		td->idx = i;
 	}
 
+	if (cfg_rx)
+		do_setup_rx(cfg_family, cfg_type, 0);
+
 	for (i = 0; i < cfg_nr_threads; i++)
-		pthread_create(&threads[i].thread, NULL, do_test, td);
+		pthread_create(&threads[i].thread, NULL,
+				!cfg_rx ? do_test : do_rx, &threads[i]);
 
 	for (i = 0; i < cfg_nr_threads; i++) {
 		td = &threads[i];
@@ -451,7 +591,7 @@ int main(int argc, char **argv)
 		bytes += td->bytes;
 	}
 
-	fprintf(stderr, "tx=%llu (MB=%llu), tx/s=%llu (MB/s=%llu)\n",
+	fprintf(stderr, "packets=%llu (MB=%llu), rps=%llu (MB/s=%llu)\n",
 		packets, bytes >> 20,
 		packets / (cfg_runtime_ms / 1000),
 		(bytes >> 20) / (cfg_runtime_ms / 1000));
-- 
2.39.1


^ permalink raw reply related	[flat|nested] 7+ messages in thread

* [PATCH liburing v2 5/5] examples/send-zc: kill sock bufs configuration
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
                   ` (3 preceding siblings ...)
  2023-03-05  5:13 ` [PATCH liburing v2 4/5] examples/send-zc: add the receive part Pavel Begunkov
@ 2023-03-05  5:13 ` Pavel Begunkov
  2023-03-05 14:35 ` [PATCH liburing v2 0/5] sendzc test improvements Jens Axboe
  5 siblings, 0 replies; 7+ messages in thread
From: Pavel Begunkov @ 2023-03-05  5:13 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence

Remove the SO_RCVLOWAT / SO_RCVBUF / SO_SNDBUF settings: the values are
arbitrary and drastically affect performance.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 examples/send-zerocopy.c | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index 8e1242e..f400f38 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -237,8 +237,6 @@ static void do_setup_rx(int domain, int type, int protocol)
 	if (fd == -1)
 		t_error(1, errno, "socket r");
 
-	do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21);
-	do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16);
 	do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
 
 	setup_sockaddr(cfg_family, str_addr, &addr);
@@ -328,8 +326,6 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol)
 	if (fd == -1)
 		t_error(1, errno, "socket t");
 
-	do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21);
-
 	if (connect(fd, (void *)&td->dst_addr, cfg_alen))
 		t_error(1, errno, "connect, idx %i", td->idx);
 
-- 
2.39.1


^ permalink raw reply related	[flat|nested] 7+ messages in thread

* Re: [PATCH liburing v2 0/5] sendzc test improvements
  2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
                   ` (4 preceding siblings ...)
  2023-03-05  5:13 ` [PATCH liburing v2 5/5] examples/send-zc: kill sock bufs configuration Pavel Begunkov
@ 2023-03-05 14:35 ` Jens Axboe
  5 siblings, 0 replies; 7+ messages in thread
From: Jens Axboe @ 2023-03-05 14:35 UTC (permalink / raw)
  To: io-uring, Pavel Begunkov


On Sun, 05 Mar 2023 05:13:03 +0000, Pavel Begunkov wrote:
> Add affinity, multithreading and the server, and also fix TCP
> performance issues
> 
> v2: rebase
>     add defer support (patch 1/5)
>     fix rx tcp problems (patch 5/5)
> 
> [...]

Applied, thanks!

[1/5] examples/send-zc: add defer taskrun support
      commit: 209fb0e9b6a8f813276262790066c162e13975ac
[2/5] examples/send-zc: add affinity / CPU pinning
      commit: bacbc4ca724c12d303395fb55a03e8d7a40c036b
[3/5] examples/send-zc: add multithreading
      commit: d0e68bc1132c52867649889570e86ae620604833
[4/5] examples/send-zc: add the receive part
      commit: f1af5ff51a3320a8971c611368c693c1dec560c5
[5/5] examples/send-zc: kill sock bufs configuration
      commit: 38d357b73791a31912c3ef13b42b74e568e71dbb

Best regards,
-- 
Jens Axboe




^ permalink raw reply	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2023-03-05 14:35 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-03-05  5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
2023-03-05  5:13 ` [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support Pavel Begunkov
2023-03-05  5:13 ` [PATCH liburing v2 2/5] examples/send-zc: add affinity / CPU pinning Pavel Begunkov
2023-03-05  5:13 ` [PATCH liburing v2 3/5] examples/send-zc: add multithreading Pavel Begunkov
2023-03-05  5:13 ` [PATCH liburing v2 4/5] examples/send-zc: add the receive part Pavel Begunkov
2023-03-05  5:13 ` [PATCH liburing v2 5/5] examples/send-zc: kill sock bufs configuration Pavel Begunkov
2023-03-05 14:35 ` [PATCH liburing v2 0/5] sendzc test improvements Jens Axboe

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.