* [RFC PATCH 0/4] Sockperf: Initial multi-threaded throughput client
@ 2017-12-18 15:26 Doug Ledford
From: Doug Ledford @ 2017-12-18 15:26 UTC (permalink / raw)
  To: linux-rdma-u79uwXL29TY76Z2rM5mHXA; +Cc: Doug Ledford

During testing, it has become painfully clear that a single-threaded UDP
test client cannot exercise a 100Gig link due to issues related to
single-core maximum throughput.  This patchset implements a
multi-threaded throughput test in sockperf.  This is just an initial
implementation; there is still more work to be done.  In particular:

1) Although the speed improved with this change, it did not improve
drastically.  As soon as the client send bottleneck was removed, it
became clear that there is another bottleneck on the server.  When
sending to a server from one client, all data is received on a single
queue pair, and due to how interrupts are spread in the RDMA stack
(namely that each queue pair goes to a single interrupt and we rely on
multiple queue pairs being in use to balance interrupts across different
cores), we take all interrupts from a specific host on a single core,
and the receiving side then becomes the bottleneck, with single-core
IPoIB receive processing being the limiting factor.  On a slower
machine, I clocked 30GBit/s of throughput.  On a faster machine as the
server, I was able to get up to 70GBit/s.

2) I thought I might try an experiment to get around the issue of the
queue pair being tied to one CPU.  We use P_Keys in our internal lab
setup, and so on the specific link in question, I actually have a total
of three different IP interfaces on different P_Keys.  I tried to open
tests on several of these interfaces to see how that would impact
performance (so a multithreaded server listening on ports on three
different P_Key interfaces all on the same physical link, which should
use three different queue pairs, and a multithreaded client sending to
those three different P_Key interfaces from three different P_Key
interfaces of its own).  It tanked performance, to less than gigabit
Ethernet speeds.  This warrants some investigation moving forward, I
think.

3) I thought I might try sending from two clients to the server at once
and summing their throughput.  That was fun.  With UDP, the clients are
able to send enough data that flow control on the link kicks in, at
which point each client starts dropping packets on the floor (they're
UDP, after all), and so the net result is that one client claimed
200GBit/s and the other about 175GBit/s.  Meanwhile, the server thought
we were just kidding and didn't actually run a test at all.

4) I reran the test using TCP instead of UDP.  That's a non-starter.
Whether due to my changes, or just because that's the way it is, the TCP
tests all failed.  For larger message sizes, they failed instantly.  For
smaller message sizes, the test might run for a few seconds, but it
would eventually fail too.  The failure was always that the server would
receive a message it deemed too large and would forcibly close all of
the TCP connections, at which point the client just bails.

I should point out that I don't program in C++.  Any issues with these
patches not being done in a typical C++ manner are related to that.
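
For reference, the end result of the series is that a streaming
(throughput) client run with --threads-num fans out over threads the
same way the multi-threaded server already does.  Condensed from the
SockPerf.cpp hunk in patch 4, the client-side dispatch in do_test()
ends up looking like this:

    /* do_test(), client mode -- condensed from patch 4 */
    case MODE_CLIENT:
        if (s_user_params.b_stream && s_user_params.mthread) {
            /* throughput test with --threads-num: run one handler
             * thread per slice of the feed-file sockets */
            select_per_thread(client_handler_multi_thread, fd_num);
        }
        else {
            client_handler(&info);
        }
        break;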

Doug Ledford (4):
  Rename a few variables
  Move num-threads and cpu-affinity to common opts
  Move server thread handler to SockPerf.cpp
  Initial implementation of threaded throughput client

 src/Client.cpp   | 140 +++++++++++++++---------
 src/Client.h     |   3 +-
 src/Defs.h       |  10 +-
 src/Server.cpp   | 137 +----------------------
 src/SockPerf.cpp | 324 ++++++++++++++++++++++++++++++++++++++++++-------------
 5 files changed, 357 insertions(+), 257 deletions(-)

-- 
2.14.3


* [PATCH 1/4] Rename a few variables
From: Doug Ledford @ 2017-12-18 15:26 UTC (permalink / raw)
  To: linux-rdma-u79uwXL29TY76Z2rM5mHXA; +Cc: Doug Ledford

In preparation for making sockperf use multiple threads in both server
and client mode, rename a few variables related to multi-threaded
operation.  Their old names hinted that they applied only to server
mode; give them more generic names that won't soon become misleading.

Signed-off-by: Doug Ledford <dledford-H+wXaHxf7aLQT0dZR+AlfA@public.gmane.org>
---
 src/Defs.h       |  2 +-
 src/Server.cpp   |  4 ++--
 src/SockPerf.cpp | 58 ++++++++++++++++++++++++++++----------------------------
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/Defs.h b/src/Defs.h
index 51098e8ba098..0ee0f1e229c7 100644
--- a/src/Defs.h
+++ b/src/Defs.h
@@ -590,7 +590,7 @@ struct user_params_t {
 	unsigned int  burst_size;
 	bool packetrate_stats_print_details;
 //	bool stream_mode; - use b_stream instead
-	int mthread_server;
+	int mthread;
 	struct timeval* select_timeout;
 	int sock_buff_size;
 	int threads_num;
diff --git a/src/Server.cpp b/src/Server.cpp
index efc0222197bf..adbc29f07b65 100644
--- a/src/Server.cpp
+++ b/src/Server.cpp
@@ -146,7 +146,7 @@ void ServerBase::cleanupAfterLoop() {
 	// cleanup
 	log_dbg("thread %lu released allocations",(unsigned long)os_getthread().tid);
 
-	if (!g_pApp->m_const_params.mthread_server) {
+	if (!g_pApp->m_const_params.mthread) {
 		log_msg("%s() exit", __func__);
 	}
 }
@@ -461,7 +461,7 @@ void server_sig_handler(int signum) {
 	   (g_pApp->m_const_params.packetrate_stats_print_ratio < g_receiveCount))
 		printf("\n");
 
-	if (g_pApp->m_const_params.mthread_server) {
+	if (g_pApp->m_const_params.mthread) {
 		if (os_getthread().tid == thread_pid_array[0].tid) {  //main thread
 			if (g_debug_level >= LOG_LVL_DEBUG) {
 				log_dbg("Main thread %lu got signal %d - exiting",(unsigned long)os_getthread().tid,signum);
diff --git a/src/SockPerf.cpp b/src/SockPerf.cpp
index 9603fa305bf6..a7bc55e6bbc8 100644
--- a/src/SockPerf.cpp
+++ b/src/SockPerf.cpp
@@ -94,9 +94,9 @@ extern void server_handler(handler_info *);
 extern void server_select_per_thread(int fd_num);
 
 static bool sock_lib_started = 0; // 
-static int s_fd_max = 0;
-static int s_fd_min = 0;	/* used as THE fd when single mc group is given (RECVFROM blocked mode) */
-static int s_fd_num = 0;
+static int fd_max = 0;
+static int fd_min = 0;	/* used as THE fd when single mc group is given (RECVFROM blocked mode) */
+static int fd_num = 0;
 static struct mutable_params_t s_mutable_params;
 
 static void set_select_timeout(int time_out_msec);
@@ -1498,7 +1498,7 @@ static int proc_mode_server( int id, int argc, const char **argv )
 			if (aopt_check(common_obj, 'f')) {
 				const char* optarg = aopt_value(server_obj, OPT_THREADS_NUM);
 				if (optarg) {
-					s_user_params.mthread_server = 1;
+					s_user_params.mthread = 1;
 					errno = 0;
 					int threads_num = strtol(optarg, NULL, 0);
 					if (errno != 0  || threads_num <= 0) {
@@ -2190,7 +2190,7 @@ void cleanup()
 	int ifd;
 	if (g_fds_array)
 	{
-		for (ifd = 0; ifd <= s_fd_max; ifd++) {
+		for (ifd = 0; ifd <= fd_max; ifd++) {
 			if (g_fds_array[ifd]) {
 				close(ifd);
 				if (g_fds_array[ifd]->active_fd_list) {
@@ -2212,7 +2212,7 @@ void cleanup()
 	}
 #ifdef USING_VMA_EXTRA_API
 	if (g_vma_api && s_user_params.is_vmazcopyread) {
-		for (int i = s_fd_min; i < s_fd_max; i++) {
+		for (int i = fd_min; i < fd_max; i++) {
 			delete g_zeroCopyData[i];
 		}
 	}
@@ -2291,7 +2291,7 @@ void set_defaults()
 	s_user_params.burst_size	= 1;
 	s_user_params.data_integrity = false;
 	s_user_params.fd_handler_type = RECVFROM;
-	s_user_params.mthread_server = 0;
+	s_user_params.mthread = 0;
 	s_user_params.msg_size = MIN_PAYLOAD_SIZE;
 	s_user_params.msg_size_range = 0;
 	s_user_params.sock_buff_size = SOCK_BUFF_DEFAULT_SIZE;
@@ -2968,7 +2968,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name)
 			/* Check if the same value exists */
 			bool is_exist = false;
 			port_and_type port_type_tmp = {tmp->sock_type,tmp->server_addr.sin_port};
-			for (int i = s_fd_min; i <= s_fd_max; i++) {
+			for (int i = fd_min; i <= fd_max; i++) {
 				/* duplicated values are accepted in case client connection using TCP */
 				/* or in case source address is set for multicast socket */
 				if (((s_user_params.mode == MODE_CLIENT)  && (tmp->sock_type == SOCK_STREAM)) || ((tmp->is_multicast) && (tmp->mc_source_ip_addr.s_addr != INADDR_ANY))) {
@@ -3026,7 +3026,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name)
 					}
 					tmp->memberships_size=0;
 
-					s_fd_num++;
+					fd_num++;
 				}
 				if ( curr_fd >=0 ) {
 					if ( (curr_fd >= MAX_FDS_NUM) ||
@@ -3055,14 +3055,14 @@ static int set_sockets_from_feedfile(const char *feedfile_name)
 						tmp->recv.cur_size = tmp->recv.max_size;
 
 						if (new_socket_flag) {
-							if (s_fd_num == 1){ /*it is the first fd*/
-								s_fd_min = curr_fd;
-								s_fd_max = curr_fd;
+							if (fd_num == 1){ /*it is the first fd*/
+								fd_min = curr_fd;
+								fd_max = curr_fd;
 							}
 							else {
 								g_fds_array[last_fd]->next_fd = curr_fd;
-								s_fd_min = _min(s_fd_min, curr_fd);
-								s_fd_max = _max(s_fd_max, curr_fd);
+								fd_min = _min(fd_min, curr_fd);
+								fd_max = _max(fd_max, curr_fd);
 							}
 							last_fd = curr_fd;
 							g_fds_array[curr_fd] = tmp;
@@ -3088,7 +3088,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name)
 	fclose(file_fd);
 
 	if (!rc) {
-		g_fds_array[s_fd_max]->next_fd = s_fd_min; /* close loop for fast wrap around in client */
+		g_fds_array[fd_max]->next_fd = fd_min; /* close loop for fast wrap around in client */
 
 #ifdef ST_TEST
 		{
@@ -3215,7 +3215,7 @@ int bringup(const int *p_daemonize)
 						else {
 							int i = 0;
 
-							s_fd_num = 1;
+							fd_num = 1;
 
 							for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) {
 								tmp->active_fd_list[i] = (int)INVALID_SOCKET;
@@ -3231,9 +3231,9 @@ int bringup(const int *p_daemonize)
 							tmp->recv.cur_offset = 0;
 							tmp->recv.cur_size = tmp->recv.max_size;
 
-							s_fd_min = s_fd_max = curr_fd;
-							g_fds_array[s_fd_min] = tmp;
-							g_fds_array[s_fd_min]->next_fd = s_fd_min;
+							fd_min = fd_max = curr_fd;
+							g_fds_array[fd_min] = tmp;
+							g_fds_array[fd_min]->next_fd = fd_min;
 						}
 					}
 				}
@@ -3252,7 +3252,7 @@ int bringup(const int *p_daemonize)
 		}
 
 		if ( !rc &&
-				(s_user_params.threads_num > s_fd_num  || s_user_params.threads_num == 0)) {
+				(s_user_params.threads_num > fd_num  || s_user_params.threads_num == 0)) {
 			log_msg("Number of threads should be less than sockets count");
 			rc = SOCKPERF_ERR_BAD_ARGUMENT;
 		}
@@ -3305,7 +3305,7 @@ int bringup(const int *p_daemonize)
 		}
 
 		s_user_params.warmup_msec = TEST_FIRST_CONNECTION_FIRST_PACKET_TTL_THRESHOLD_MSEC +
-				s_fd_num * TEST_ANY_CONNECTION_FIRST_PACKET_TTL_THRESHOLD_MSEC;
+				fd_num * TEST_ANY_CONNECTION_FIRST_PACKET_TTL_THRESHOLD_MSEC;
 		if (s_user_params.warmup_msec < TEST_START_WARMUP_MSEC) {
 			s_user_params.warmup_msec = TEST_START_WARMUP_MSEC;
 		} else {
@@ -3327,12 +3327,12 @@ void do_test()
 	handler_info info;
 
 	info.id = 0;
-	info.fd_min = s_fd_min;
-	info.fd_max = s_fd_max;
-	info.fd_num = s_fd_num;
+	info.fd_min = fd_min;
+	info.fd_max = fd_max;
+	info.fd_num = fd_num;
 #ifdef USING_VMA_EXTRA_API
 	if (g_vma_api && s_user_params.is_vmazcopyread) {
-		for (int i = s_fd_min; i < s_fd_max; i++) {
+		for (int i = fd_min; i < fd_max; i++) {
 			g_zeroCopyData[i] = new ZeroCopyData();
 			g_zeroCopyData[i]->allocate();
 		}
@@ -3343,8 +3343,8 @@ void do_test()
 		client_handler(&info);
 		break;
 	case MODE_SERVER:
-		if (s_user_params.mthread_server) {
-			server_select_per_thread(s_fd_num);
+		if (s_user_params.mthread) {
+			server_select_per_thread(fd_num);
 		}
 		else {
 			server_handler(&info);
@@ -3423,7 +3423,7 @@ packetrate_stats_print_ratio = %d \n\t\
 burst_size = %d \n\t\
 packetrate_stats_print_details = %d \n\t\
 fd_handler_type = %d \n\t\
-mthread_server = %d \n\t\
+mthread = %d \n\t\
 sock_buff_size = %d \n\t\
 threads_num = %d \n\t\
 threads_affinity = %s \n\t\
@@ -3462,7 +3462,7 @@ s_user_params.packetrate_stats_print_ratio,
 s_user_params.burst_size,
 s_user_params.packetrate_stats_print_details,
 s_user_params.fd_handler_type,
-s_user_params.mthread_server,
+s_user_params.mthread,
 s_user_params.sock_buff_size,
 s_user_params.threads_num,
 s_user_params.threads_affinity,
-- 
2.14.3


* [PATCH 2/4] Move num-threads and cpu-affinity to common opts
From: Doug Ledford @ 2017-12-18 15:26 UTC (permalink / raw)
  To: linux-rdma-u79uwXL29TY76Z2rM5mHXA; +Cc: Doug Ledford

The option settings are split into common, server, client (which is
really client common), and client-mode-specific settings.  Move the two
options named in the subject to the common settings.  This does not make
the options *work* for clients yet, it just makes them accepted for
clients.  Aside from updating the help text for num-threads, this change
does nothing other than move the code.

Signed-off-by: Doug Ledford <dledford-H+wXaHxf7aLQT0dZR+AlfA@public.gmane.org>
---
 src/SockPerf.cpp | 90 ++++++++++++++++++++++++++++----------------------------
 1 file changed, 45 insertions(+), 45 deletions(-)

diff --git a/src/SockPerf.cpp b/src/SockPerf.cpp
index a7bc55e6bbc8..7f8d6969e645 100644
--- a/src/SockPerf.cpp
+++ b/src/SockPerf.cpp
@@ -169,6 +169,14 @@ static const AOPT_DESC  common_opt_desc[] =
 		'f', AOPT_ARG, aopt_set_literal( 'f' ), aopt_set_string( "file" ),
 		"Read multiple ip+port combinations from file <file> (will use IO muxer '-F' such as epoll, poll or select)"
 	},
+	{
+		OPT_THREADS_NUM, AOPT_ARG, aopt_set_literal( 0 ), aopt_set_string( "threads-num" ),
+		"Run <N> threads to process sockets (requires '-f' option)."
+	},
+	{
+		OPT_THREADS_AFFINITY, AOPT_ARG,	aopt_set_literal( 0 ),	aopt_set_string( "cpu-affinity" ),
+		"Set threads affinity to the given core ids in list format (see: cat /proc/cpuinfo)."
+	},
 	{
 		'F', AOPT_ARG, aopt_set_literal( 'F' ), aopt_set_string( "iomux-type" ),
 #ifdef WIN32
@@ -1421,14 +1429,6 @@ static int proc_mode_server( int id, int argc, const char **argv )
 			"Run in Bridge mode."
 		}, 
 */		
-		{
-			 OPT_THREADS_NUM, AOPT_ARG, aopt_set_literal( 0 ), aopt_set_string( "threads-num" ),
-			 "Run <N> threads on server side (requires '-f' option)."
-		 },
-		 {
-			 OPT_THREADS_AFFINITY, AOPT_ARG,	aopt_set_literal( 0 ),	aopt_set_string( "cpu-affinity" ),
-			 "Set threads affinity to the given core ids in list format (see: cat /proc/cpuinfo)."
-		 },
 #ifndef WIN32
 		 {
 			 OPT_VMARXFILTERCB, AOPT_NOARG,	aopt_set_literal( 0 ),	aopt_set_string( "vmarxfiltercb" ),
@@ -1493,43 +1493,6 @@ static int proc_mode_server( int id, int argc, const char **argv )
 			s_user_params.mode = MODE_BRIDGE;
 			p_addr->sin_port = htons(5001); /*iperf's default port*/
 		}
-
-		if ( !rc && aopt_check(server_obj, OPT_THREADS_NUM) ) {
-			if (aopt_check(common_obj, 'f')) {
-				const char* optarg = aopt_value(server_obj, OPT_THREADS_NUM);
-				if (optarg) {
-					s_user_params.mthread = 1;
-					errno = 0;
-					int threads_num = strtol(optarg, NULL, 0);
-					if (errno != 0  || threads_num <= 0) {
-						log_msg("'-%d' Invalid threads number: %s", OPT_THREADS_NUM, optarg);
-						rc = SOCKPERF_ERR_BAD_ARGUMENT;
-					}
-					else {
-						s_user_params.threads_num = threads_num;
-					}
-				}
-				else {
-					log_msg("'-%d' Invalid value", OPT_THREADS_NUM);
-					rc = SOCKPERF_ERR_BAD_ARGUMENT;
-				}
-			}
-			else {
-				log_msg("--threads-num must be used with feed file (option '-f')");
-				rc = SOCKPERF_ERR_BAD_ARGUMENT;
-			}
-		}
-
-		if ( !rc && aopt_check(server_obj, OPT_THREADS_AFFINITY) ) {
-			const char* optarg = aopt_value(server_obj, OPT_THREADS_AFFINITY);
-			if (optarg) {
-				strcpy(s_user_params.threads_affinity, optarg);
-			}
-			else {
-				log_msg("'-%d' Invalid threads affinity", OPT_THREADS_AFFINITY);
-				rc = SOCKPERF_ERR_BAD_ARGUMENT;
-			}
-		}
 #ifndef WIN32
 		if ( !rc && aopt_check(server_obj, OPT_VMARXFILTERCB) ) {
 			s_user_params.is_vmarxfiltercb = true;
@@ -1691,6 +1654,43 @@ static int parse_common_opt( const AOPT_OBJECT *common_obj )
 			}
 		}
 
+		if ( !rc && aopt_check(common_obj, OPT_THREADS_NUM) ) {
+			if (aopt_check(common_obj, 'f')) {
+				const char* optarg = aopt_value(common_obj, OPT_THREADS_NUM);
+				if (optarg) {
+					s_user_params.mthread = 1;
+					errno = 0;
+					int threads_num = strtol(optarg, NULL, 0);
+					if (errno != 0  || threads_num <= 0) {
+						log_msg("'-%d' Invalid threads number: %s", OPT_THREADS_NUM, optarg);
+						rc = SOCKPERF_ERR_BAD_ARGUMENT;
+					}
+					else {
+						s_user_params.threads_num = threads_num;
+					}
+				}
+				else {
+					log_msg("'-%d' Invalid value", OPT_THREADS_NUM);
+					rc = SOCKPERF_ERR_BAD_ARGUMENT;
+				}
+			}
+			else {
+				log_msg("--threads-num must be used with feed file (option '-f')");
+				rc = SOCKPERF_ERR_BAD_ARGUMENT;
+			}
+		}
+
+		if ( !rc && aopt_check(common_obj, OPT_THREADS_AFFINITY) ) {
+			const char* optarg = aopt_value(common_obj, OPT_THREADS_AFFINITY);
+			if (optarg) {
+				strcpy(s_user_params.threads_affinity, optarg);
+			}
+			else {
+				log_msg("'-%d' Invalid threads affinity", OPT_THREADS_AFFINITY);
+				rc = SOCKPERF_ERR_BAD_ARGUMENT;
+			}
+		}
+
 		if ( !rc && aopt_check(common_obj, 'F') ) {
 			if (aopt_check(common_obj, 'f')) {
 				const char* optarg = aopt_value(common_obj, 'F');
-- 
2.14.3


* [PATCH 3/4] Move server thread handler to SockPerf.cpp
From: Doug Ledford @ 2017-12-18 15:26 UTC (permalink / raw)
  To: linux-rdma-u79uwXL29TY76Z2rM5mHXA; +Cc: Doug Ledford

In order to make the thread-starting code common, move it to
SockPerf.cpp and make it no longer specific to the server.  Next we'll
add a Client handler and call that from the common code.
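
The net effect is a single generic launcher that takes the per-thread
handler as an argument.  A condensed view of the interface after this
patch (the full body is in the SockPerf.cpp hunk below):

    /* generic thread fan-out, no longer server-specific */
    void select_per_thread(void *(handler)(void *), int _fd_num);

    /* server mode keeps using it exactly as before */
    select_per_thread(server_handler_multi_thread, fd_num);

    /* the next patch adds the client-side equivalent:
     *   select_per_thread(client_handler_multi_thread, fd_num);
     */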

Signed-off-by: Doug Ledford <dledford-H+wXaHxf7aLQT0dZR+AlfA@public.gmane.org>
---
 src/Server.cpp   | 133 ++----------------------------------------------------
 src/SockPerf.cpp | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 136 insertions(+), 132 deletions(-)

diff --git a/src/Server.cpp b/src/Server.cpp
index adbc29f07b65..f6d79a245ff4 100644
--- a/src/Server.cpp
+++ b/src/Server.cpp
@@ -32,8 +32,8 @@
 
 // static members initialization
 /*static*/ seq_num_map SwitchOnCalcGaps::ms_seq_num_map;
-static CRITICAL_SECTION	thread_exit_lock;
-static os_thread_t *thread_pid_array = NULL;
+extern CRITICAL_SECTION	thread_exit_lock;
+extern os_thread_t *thread_pid_array;
 
 //==============================================================================
 
@@ -407,7 +407,7 @@ void server_handler(handler_info *p_info)
 
 
 //------------------------------------------------------------------------------
-void *server_handler_for_multi_threaded(void *arg)
+void *server_handler_multi_thread(void *arg)
 {
 	handler_info *p_info = (handler_info *)arg;
 
@@ -431,23 +431,6 @@ void *server_handler_for_multi_threaded(void *arg)
 }
 
 
-//------------------------------------------------------------------------------
-void find_min_max_fds(int start_look_from, int len, int* p_fd_min, int* p_fd_max) {
-	int num_of_detected_fds;
-	int i;
-
-	for(num_of_detected_fds = 0, i = start_look_from; num_of_detected_fds < len;i++) {
-		if (g_fds_array[i]) {
-			if (!num_of_detected_fds) {
-				*p_fd_min = i;
-			}
-			num_of_detected_fds++;
-		}
-	}
-	*p_fd_max = i - 1;
-}
-
-
 //------------------------------------------------------------------------------
 void server_sig_handler(int signum) {
 	if (g_b_exit) {
@@ -497,116 +480,6 @@ void server_sig_handler(int signum) {
 }
 
 
-//------------------------------------------------------------------------------
-void server_select_per_thread(int _fd_num) {
-	int rc = SOCKPERF_ERR_NONE;
-	int i;
-	os_thread_t thread;
-	int fd_num;
-	int num_of_remainded_fds;
-	int last_fds = 0;
-	handler_info *handler_info_array = NULL;
-
-	handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * g_pApp->m_const_params.threads_num);
-	memset(handler_info_array, 0, sizeof(handler_info) * g_pApp->m_const_params.threads_num);
-	if (!handler_info_array) {
-		log_err("Failed to allocate memory for handler_info_arr");
-		rc = SOCKPERF_ERR_NO_MEMORY;
-	}
-
-	if (rc == SOCKPERF_ERR_NONE) {
-		thread_pid_array = (os_thread_t*)MALLOC(sizeof(os_thread_t)*(g_pApp->m_const_params.threads_num + 1));
-		if(!thread_pid_array) {
-			log_err("Failed to allocate memory for pid array");
-			rc = SOCKPERF_ERR_NO_MEMORY;
-		}
-		else {
-			memset(thread_pid_array, 0, sizeof(os_thread_t)*(g_pApp->m_const_params.threads_num + 1));
-			log_msg("Running %d threads to manage %d sockets", g_pApp->m_const_params.threads_num, _fd_num);
-		}
-	}
-
-	if (rc == SOCKPERF_ERR_NONE) {
-		INIT_CRITICAL(&thread_exit_lock);
-
-		thread_pid_array[0].tid = os_getthread().tid;
-
-		/* Divide fds_arr between threads */
-		num_of_remainded_fds = _fd_num % g_pApp->m_const_params.threads_num;
-		fd_num = _fd_num / g_pApp->m_const_params.threads_num;
-
-		for (i = 0; i < g_pApp->m_const_params.threads_num; i++) {
-			handler_info *cur_handler_info = (handler_info_array + i);
-
-			/* Set ID of handler (thread) */
-			cur_handler_info->id = i;
-
-			/* Set number of processed sockets */
-			cur_handler_info->fd_num = fd_num;
-			if (num_of_remainded_fds) {
-				cur_handler_info->fd_num++;
-				num_of_remainded_fds--;
-			}
-
-			/* Set min/max possible socket to be processed */
-			find_min_max_fds(last_fds, cur_handler_info->fd_num, &(cur_handler_info->fd_min), &(cur_handler_info->fd_max));
-
-			/* Launch handler */
-			errno = 0;
-			int ret = os_thread_exec(&thread, server_handler_for_multi_threaded, (void *)cur_handler_info);
-
-			/*
-			 * There is undocumented behaviour for early versions of libc (for example libc 2.5, 2.6, 2.7)
-			 * as pthread_create() call returns error code 12 ENOMEM and return value 0
-			 * Note: libc-2.9 demonstrates expected behaivour
-			 */
-			if ( (ret != 0) || (errno == ENOMEM) ) {
-				log_err("create thread has failed");
-				rc = SOCKPERF_ERR_FATAL;
-				break;
-			}
-			thread_pid_array[i + 1].tid = thread.tid;
-			last_fds = cur_handler_info->fd_max + 1;
-		}
-
-		/* Wait for ^C */
-		while ((rc == SOCKPERF_ERR_NONE) && !g_b_exit) {
-			sleep(1);
-		}
-
-		/* Stop all launched threads (the first index is reserved for main thread) */
-		for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
-			os_thread_t cur_thread_pid;
-			cur_thread_pid.tid = 0;
-
-			ENTER_CRITICAL(&thread_exit_lock);
-			cur_thread_pid.tid = thread_pid_array[i].tid;
-			if (cur_thread_pid.tid) {
-				os_thread_kill(&cur_thread_pid);
-			}
-			LEAVE_CRITICAL(&thread_exit_lock);
-			if (cur_thread_pid.tid) {
-				os_thread_join(&cur_thread_pid);
-			}
-		}
-
-		DELETE_CRITICAL(&thread_exit_lock);
-	}
-
-	/* Free thread info allocated data */
-	if (handler_info_array) {
-		FREE(handler_info_array);
-	}
-
-	/* Free thread TID array */
-	if (thread_pid_array) {
-		FREE(thread_pid_array);
-	}
-
-	log_msg("%s() exit", __func__);
-}
-
-
 // Temp location because of compilation issue (inline-unit-growth=200) with the way this method was inlined
 void SwitchOnCalcGaps::execute(struct sockaddr_in *clt_addr, uint64_t seq_num, bool is_warmup)
 {
diff --git a/src/SockPerf.cpp b/src/SockPerf.cpp
index 7f8d6969e645..3c628cbc5ba1 100644
--- a/src/SockPerf.cpp
+++ b/src/SockPerf.cpp
@@ -86,12 +86,16 @@
 #include <dlfcn.h>
 #endif
 
+// For use by both Client.cpp & Server.cpp
+CRITICAL_SECTION	thread_exit_lock;
+os_thread_t *thread_pid_array = NULL;
+
 // forward declarations from Client.cpp & Server.cpp
 extern void client_sig_handler(int signum);
 extern void client_handler(handler_info *);
 extern void server_sig_handler(int signum);
 extern void server_handler(handler_info *);
-extern void server_select_per_thread(int fd_num);
+extern void *server_handler_multi_thread(void *);
 
 static bool sock_lib_started = 0; // 
 static int fd_max = 0;
@@ -331,6 +335,133 @@ static const AOPT_DESC  client_opt_desc[] =
 };
 
 
+//------------------------------------------------------------------------------
+void find_min_max_fds(int start_look_from, int len, int* p_fd_min, int* p_fd_max) {
+	int num_of_detected_fds;
+	int i;
+
+	for(num_of_detected_fds = 0, i = start_look_from; num_of_detected_fds < len;i++) {
+		if (g_fds_array[i]) {
+			if (!num_of_detected_fds) {
+				*p_fd_min = i;
+			}
+			num_of_detected_fds++;
+		}
+	}
+	*p_fd_max = i - 1;
+}
+
+
+//------------------------------------------------------------------------------
+void select_per_thread(void *(handler)(void *), int _fd_num) {
+	int rc = SOCKPERF_ERR_NONE;
+	int i;
+	os_thread_t thread;
+	int fd_num;
+	int num_of_remainded_fds;
+	int last_fds = 0;
+	handler_info *handler_info_array = NULL;
+
+	handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * g_pApp->m_const_params.threads_num);
+	memset(handler_info_array, 0, sizeof(handler_info) * g_pApp->m_const_params.threads_num);
+	if (!handler_info_array) {
+		log_err("Failed to allocate memory for handler_info_arr");
+		rc = SOCKPERF_ERR_NO_MEMORY;
+	}
+
+	if (rc == SOCKPERF_ERR_NONE) {
+		thread_pid_array = (os_thread_t*)MALLOC(sizeof(os_thread_t)*(g_pApp->m_const_params.threads_num + 1));
+		if(!thread_pid_array) {
+			log_err("Failed to allocate memory for pid array");
+			rc = SOCKPERF_ERR_NO_MEMORY;
+		}
+		else {
+			memset(thread_pid_array, 0, sizeof(os_thread_t)*(g_pApp->m_const_params.threads_num + 1));
+			log_msg("Running %d threads to manage %d sockets", g_pApp->m_const_params.threads_num, _fd_num);
+		}
+	}
+
+	if (rc == SOCKPERF_ERR_NONE) {
+		INIT_CRITICAL(&thread_exit_lock);
+
+		thread_pid_array[0].tid = os_getthread().tid;
+
+		/* Divide fds_arr between threads */
+		num_of_remainded_fds = _fd_num % g_pApp->m_const_params.threads_num;
+		fd_num = _fd_num / g_pApp->m_const_params.threads_num;
+
+		for (i = 0; i < g_pApp->m_const_params.threads_num; i++) {
+			handler_info *cur_handler_info = (handler_info_array + i);
+
+			/* Set ID of handler (thread) */
+			cur_handler_info->id = i;
+
+			/* Set number of processed sockets */
+			cur_handler_info->fd_num = fd_num;
+			if (num_of_remainded_fds) {
+				cur_handler_info->fd_num++;
+				num_of_remainded_fds--;
+			}
+
+			/* Set min/max possible socket to be processed */
+			find_min_max_fds(last_fds, cur_handler_info->fd_num, &(cur_handler_info->fd_min), &(cur_handler_info->fd_max));
+
+			/* Launch handler */
+			errno = 0;
+			int ret = os_thread_exec(&thread, handler, (void *)cur_handler_info);
+
+			/*
+			 * There is undocumented behaviour for early versions of libc (for example libc 2.5, 2.6, 2.7)
+			 * as pthread_create() call returns error code 12 ENOMEM and return value 0
+			 * Note: libc-2.9 demonstrates expected behaivour
+			 */
+			if ( (ret != 0) || (errno == ENOMEM) ) {
+				log_err("create thread has failed");
+				rc = SOCKPERF_ERR_FATAL;
+				break;
+			}
+			thread_pid_array[i + 1].tid = thread.tid;
+			last_fds = cur_handler_info->fd_max + 1;
+		}
+
+		/* Wait for ^C */
+		while ((rc == SOCKPERF_ERR_NONE) && !g_b_exit) {
+			sleep(1);
+		}
+
+		/* Stop all launched threads (the first index is reserved for main thread) */
+		for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
+			os_thread_t cur_thread_pid;
+			cur_thread_pid.tid = 0;
+
+			ENTER_CRITICAL(&thread_exit_lock);
+			cur_thread_pid.tid = thread_pid_array[i].tid;
+			if (cur_thread_pid.tid) {
+				os_thread_kill(&cur_thread_pid);
+			}
+			LEAVE_CRITICAL(&thread_exit_lock);
+			if (cur_thread_pid.tid) {
+				os_thread_join(&cur_thread_pid);
+			}
+		}
+
+		DELETE_CRITICAL(&thread_exit_lock);
+	}
+
+	/* Free thread info allocated data */
+	if (handler_info_array) {
+		FREE(handler_info_array);
+	}
+
+	/* Free thread TID array */
+	if (thread_pid_array) {
+		FREE(thread_pid_array);
+	}
+
+	log_msg("%s() exit", __func__);
+}
+
+
 //------------------------------------------------------------------------------
 static int proc_mode_help( int id, int argc, const char **argv )
 {
@@ -3344,7 +3475,7 @@ void do_test()
 		break;
 	case MODE_SERVER:
 		if (s_user_params.mthread) {
-			server_select_per_thread(fd_num);
+			select_per_thread(server_handler_multi_thread, fd_num);
 		}
 		else {
 			server_handler(&info);
-- 
2.14.3


* [PATCH 4/4] Initial implementation of threaded throughput client
From: Doug Ledford @ 2017-12-18 15:26 UTC (permalink / raw)
  To: linux-rdma-u79uwXL29TY76Z2rM5mHXA; +Cc: Doug Ledford

We add fields to the handler_info struct and use them to store the
results of individual threads, since the thread destructor would
otherwise delete our performance data before we could sum it up into a
total.  Otherwise, we reuse most of the implementation of the server
thread code.
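
The aggregation happens in select_per_thread() after the worker threads
have been stopped; condensed from the SockPerf.cpp hunk below, slot 0 of
the handler_info array collects the totals that stream_statistics() then
prints as the "[TID: ALL]" line:

    struct handler_info *p0 = handler_info_array; /* slot 0 = totals */
    struct handler_info *t;
    TicksDuration threadRunTime, totalRunTime = TicksDuration::TICKS0;

    for (i = 1; i <= threads; i++) {
        t = handler_info_array + i;
        p0->sendCount += t->sendCount;            /* sum messages sent */
        threadRunTime  = t->c_endTime - t->c_startTime;
        totalRunTime  += threadRunTime;           /* sum per-thread runtimes */
    }
    totalRunTime /= threads;                      /* average across threads */
    p0->c_startTime = t->c_startTime;
    p0->c_endTime   = t->c_startTime + totalRunTime;

    stream_statistics(p0);                        /* combined printout */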

Signed-off-by: Doug Ledford <dledford-H+wXaHxf7aLQT0dZR+AlfA@public.gmane.org>
---
 src/Client.cpp   | 140 ++++++++++++++++++++++++++++++++++++-------------------
 src/Client.h     |   3 +-
 src/Defs.h       |   8 ++++
 src/SockPerf.cpp |  53 +++++++++++++++++++--
 4 files changed, 150 insertions(+), 54 deletions(-)

diff --git a/src/Client.cpp b/src/Client.cpp
index f8c1fdd11256..0edf9c36c3fc 100644
--- a/src/Client.cpp
+++ b/src/Client.cpp
@@ -33,7 +33,9 @@
 #include "PacketTimes.h"
 #include "Switches.h"
 
-TicksTime s_startTime, s_endTime;
+extern CRITICAL_SECTION	thread_exit_lock;
+extern os_thread_t *thread_pid_array;
+TicksTime g_c_startTime, g_c_endTime;
 
 //==============================================================================
 //==============================================================================
@@ -119,7 +121,7 @@ void client_statistics(int serverNo, Message *pMsgRequest)
 
 	/* Print total statistic that is independent on server count */
 	if (SERVER_NO == 0) {
-		TicksDuration totalRunTime = s_endTime - s_startTime;
+		TicksDuration totalRunTime = g_c_endTime - g_c_startTime;
 		if (g_skipCount) {
 			log_msg_file2(f, "[Total Run] RunTime=%.3lf sec; Warm up time=%" PRIu32 " msec; SentMessages=%" PRIu64 "; ReceivedMessages=%" PRIu64 "; SkippedMessages=%" PRIu64 "",
 				totalRunTime.toDecimalUsec()/1000000, g_pApp->m_const_params.warmup_msec, sendCount, receiveCount, g_skipCount);
@@ -261,24 +263,34 @@ void client_statistics(int serverNo, Message *pMsgRequest)
 }
 
 //------------------------------------------------------------------------------
-void stream_statistics(Message *pMsgRequest)
+void stream_statistics(struct handler_info *p_info)
 {
-	TicksDuration totalRunTime = s_endTime - s_startTime;
+	TicksDuration totalRunTime = p_info->c_endTime - p_info->c_startTime;
+	uint64_t sendCount = p_info->sendCount;
+	char prefix[20];
+
+	if (g_pApp->m_const_params.mthread) {
+		if (p_info->id)
+			snprintf(prefix, sizeof prefix, "[TID: %d] ", p_info->id);
+		else
+			snprintf(prefix, sizeof prefix, "[TID: ALL] ");
+	}
+	else {
+		prefix[0] = '\0';
+	}
 
 	if (totalRunTime <= TicksDuration::TICKS0) return;
 	if (!g_pApp->m_const_params.b_stream) return;
 
-	const uint64_t sendCount = pMsgRequest->getSequenceCounter();
-
 	// Send only mode!
 	if (g_skipCount) {
-		log_msg("Total of %" PRIu64 " messages sent in %.3lf sec (%" PRIu64 " messages skipped)\n",
-				sendCount, totalRunTime.toDecimalUsec()/1000000, g_skipCount);
+		log_msg("%sTotal of %" PRIu64 " messages sent in %.3lf sec (%" PRIu64 " messages skipped)\n",
+				prefix, sendCount, totalRunTime.toDecimalUsec()/1000000, g_skipCount);
 	}
 	else 
 	{
-		log_msg("Total of %" PRIu64 " messages sent in %.3lf sec\n",
-				sendCount, totalRunTime.toDecimalUsec()/1000000);
+		log_msg("%sTotal of %" PRIu64 " messages sent in %.3lf sec\n",
+				prefix, sendCount, totalRunTime.toDecimalUsec()/1000000);
 	}
 	if (g_pApp->m_const_params.mps != MPS_MAX) {
 		if (g_pApp->m_const_params.msg_size_range)
@@ -308,17 +320,17 @@ void stream_statistics(Message *pMsgRequest)
 	int total_line_ip_data = g_pApp->m_const_params.msg_size;
 	double MBps = ((double)msgps * total_line_ip_data)/1024/1024; /* No including IP + UDP Headers per fragment */
 	if (ip_frags_per_msg == 1)
-		log_msg("Summary: Message Rate is %d [msg/sec]", msgps);
+		log_msg("%sSummary: Message Rate is %d [msg/sec]", prefix, msgps);
 	else
-		log_msg("Summary: Message Rate is %d [msg/sec], Packet Rate is about %d [pkt/sec] (%d ip frags / msg)", msgps, pktps, ip_frags_per_msg);
+		log_msg("%sSummary: Message Rate is %d [msg/sec], Packet Rate is about %d [pkt/sec] (%d ip frags / msg)", prefix, msgps, pktps, ip_frags_per_msg);
 	if (g_pApp->m_const_params.giga_size){
-		log_msg("Summary: BandWidth is %.3f GBps (%.3f Gbps)", MBps/1000, MBps*8/1000);
+		log_msg("%sSummary: BandWidth is %.3f GBps (%.3f Gbps)", prefix, MBps/1000, MBps*8/1000);
 	}
 	else if (g_pApp->m_const_params.increase_output_precision){
-			log_msg("Summary: BandWidth is %.9f GBps (%.9f Gbps)", MBps, MBps*8);
+			log_msg("%sSummary: BandWidth is %.9f GBps (%.9f Gbps)", prefix, MBps, MBps*8);
 	}
 	else{
-		log_msg("Summary: BandWidth is %.3f MBps (%.3f Mbps)", MBps, MBps*8);
+		log_msg("%sSummary: BandWidth is %.3f MBps (%.3f Mbps)", prefix, MBps, MBps*8);
 	}
 }
 
@@ -329,7 +341,7 @@ void client_sig_handler(int signum)
 		log_msg("Test end (interrupted by signal %d)", signum);
 		return;
 	}
-	s_endTime.setNowNonInline();
+	g_c_endTime.setNowNonInline();
 	g_b_exit = true;
 
 	// Just in case not Activity updates where logged add a '\n'
@@ -370,13 +382,15 @@ ClientBase::~ClientBase()
 	delete m_pMsgRequest;
 }
 
+
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize , class PongModeCare >
-Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>::Client(int _fd_min, int _fd_max, int _fd_num):
+Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>::Client(struct handler_info *_p_info):
 	ClientBase(),
-	m_ioHandler(_fd_min, _fd_max, _fd_num),
+	m_ioHandler(_p_info->fd_min, _p_info->fd_max, _p_info->fd_num),
 	m_pongModeCare(m_pMsgRequest)
 {
+	p_info = _p_info;
 	os_thread_init (&m_receiverTid);
 }
 
@@ -408,6 +422,8 @@ template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, cla
 void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize , PongModeCare>
 ::cleanupAfterLoop()
 {
+	p_info->c_endTime.setNowNonInline();
+
 	usleep(100*1000);//0.1 sec - wait for rx packets for last sends (in normal flow)
 	if (m_receiverTid.tid) {
 		os_thread_kill(&m_receiverTid);
@@ -426,7 +442,9 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
 	}
 	else if (g_pApp->m_const_params.b_stream)
 	{
-		stream_statistics(m_pMsgRequest);
+		p_info->sendCount = m_pMsgRequest->getSequenceCounter();
+
+		stream_statistics(p_info);
 	}
 	else
 	{
@@ -581,9 +599,12 @@ int Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration,
 					}
 
 					if (rc == SOCKPERF_ERR_NONE) {
-						s_startTime.setNowNonInline();
-						g_lastTicks = s_startTime;
-						g_cycleStartTime = s_startTime - g_pApp->m_const_params.cycleDuration;
+						p_info->c_startTime.setNowNonInline();
+						if (g_c_startTime == TicksTime::TICKS0) {
+							g_c_startTime = p_info->c_startTime;
+							g_lastTicks = p_info->c_startTime;
+							g_cycleStartTime = p_info->c_startTime - g_pApp->m_const_params.cycleDuration;
+						}
 					}
 				}
 			}
@@ -635,7 +656,7 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
 ::doPlayback()
 {
 	usleep(100*1000);//wait for receiver thread to start (since we don't use warmup) //TODO: configure!
-	s_startTime.setNowNonInline();//reduce code size by calling non inline func from slow path
+	p_info->c_startTime.setNowNonInline();//reduce code size by calling non inline func from slow path
 	const PlaybackVector &pv = * g_pApp->m_const_params.pPlaybackVector;
 
 	size_t i = 0;
@@ -655,7 +676,7 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
 		m_switchActivityInfo.execute(m_pMsgRequest->getSequenceCounter());
 	}
 	g_cycle_wait_loop_counter++; //for silenting waring at the end
-	s_endTime.setNowNonInline();//reduce code size by calling non inline func from slow path
+	p_info->c_endTime.setNowNonInline();//reduce code size by calling non inline func from slow path
 	usleep(20*1000);//wait for reply of last packet //TODO: configure!
 	g_b_exit = true;
 }
@@ -684,61 +705,61 @@ void Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration
 
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize, class PongModeCare>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
-	Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeCare> c(_fd_min, _fd_max, _fd_num);
+void client_handler(struct handler_info *_p_info) {
+	Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeCare> c(_p_info);
 	c.doHandler();
 }
 
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration, class SwitchMsgSize>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
 	if (g_pApp->m_const_params.b_stream)
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNever> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNever> (_p_info);
 	else if (g_pApp->m_const_params.reply_every == 1)
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeAlways> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeAlways> (_p_info);
 	else
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNormal> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchMsgSize, PongModeNormal> (_p_info);
 }
 
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo, class SwitchCycleDuration>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
 	if (g_pApp->m_const_params.msg_size_range > 0)
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOnMsgSize> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOnMsgSize> (_p_info);
 	else
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOff> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration, SwitchOff> (_p_info);
 }
 
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity, class SwitchActivityInfo>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
 	if (g_pApp->m_const_params.cycleDuration > TicksDuration::TICKS0) {
 		if (g_pApp->m_const_params.dummy_mps) {
-			client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnDummySend> (_fd_min, _fd_max, _fd_num);
+			client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnDummySend> (_p_info);
 		} else {
-			client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnCycleDuration> (_fd_min, _fd_max, _fd_num);
+			client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOnCycleDuration> (_p_info);
 		}
 	}
 	else
-		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOff> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchOff> (_p_info);
 }
 
 //------------------------------------------------------------------------------
 template <class IoType, class SwitchDataIntegrity>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
 	if (g_pApp->m_const_params.packetrate_stats_print_ratio > 0)
-		client_handler<IoType, SwitchDataIntegrity, SwitchOnActivityInfo> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchOnActivityInfo> (_p_info);
 	else
-		client_handler<IoType, SwitchDataIntegrity, SwitchOff> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchDataIntegrity, SwitchOff> (_p_info);
 }
 
 //------------------------------------------------------------------------------
 template <class IoType>
-void client_handler(int _fd_min, int _fd_max, int _fd_num) {
+void client_handler(struct handler_info *_p_info) {
 	if (g_pApp->m_const_params.data_integrity)
-		client_handler<IoType, SwitchOnDataIntegrity> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchOnDataIntegrity> (_p_info);
 	else
-		client_handler<IoType, SwitchOff> (_fd_min, _fd_max, _fd_num);
+		client_handler<IoType, SwitchOff> (_p_info);
 }
 
 //------------------------------------------------------------------------------
@@ -748,29 +769,29 @@ void client_handler(handler_info *p_info)
 		switch (g_pApp->m_const_params.fd_handler_type) {
 			case SELECT:
 			{
-				client_handler<IoSelect> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+				client_handler<IoSelect> (p_info);
 				break;
 			}
 			case RECVFROM:
 			{
-				client_handler<IoRecvfrom> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+				client_handler<IoRecvfrom> (p_info);
 				break;
 			}
 			case RECVFROMMUX:
 			{
-				client_handler<IoRecvfromMUX> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+				client_handler<IoRecvfromMUX> (p_info);
 				break;
 			}
 #ifndef WIN32
 			case POLL:
 			{
-				client_handler<IoPoll> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+				client_handler<IoPoll> (p_info);
 				break;
 			}
 #ifndef __FreeBSD__
 			case EPOLL:
 			{
-				client_handler<IoEpoll> (p_info->fd_min, p_info->fd_max, p_info->fd_num);
+				client_handler<IoEpoll> (p_info);
 				break;
 			}
 #endif
@@ -783,3 +804,26 @@ void client_handler(handler_info *p_info)
 		}
 	}
 }
+
+void *client_handler_multi_thread(void *arg)
+{
+	struct handler_info *p_info = (handler_info *)arg;
+
+	if (p_info) {
+		client_handler(p_info);
+
+		/* Mark this thread as complete (the first index is reserved for main thread) */
+		{
+			int i = p_info->id + 1;
+			if (p_info->id < g_pApp->m_const_params.threads_num) {
+				if (thread_pid_array && thread_pid_array[i].tid && (thread_pid_array[i].tid == os_getthread().tid)) {
+					ENTER_CRITICAL(&thread_exit_lock);
+					thread_pid_array[i].tid = 0;
+					LEAVE_CRITICAL(&thread_exit_lock);
+				}
+			}
+		}
+	}
+
+	return 0;
+}
diff --git a/src/Client.h b/src/Client.h
index 965b3b847cad..a3898705987a 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -53,6 +53,7 @@ private:
 	os_thread_t m_receiverTid;
 	IoType m_ioHandler;
 	addr_to_id   m_ServerList;
+	struct handler_info *p_info;
 
 	SwitchDataIntegrity m_switchDataIntegrity;
 	SwitchActivityInfo  m_switchActivityInfo;
@@ -61,7 +62,7 @@ private:
 	PongModeCare        m_pongModeCare; // has msg_sendto() method and can be one of: PongModeNormal, PongModeAlways, PongModeNever
 
 public:
-	Client(int _fd_min, int _fd_max, int _fd_num);
+	Client(struct handler_info *_p_info);
 	virtual ~Client();
 	void doHandler();
 	void client_receiver_thread();
diff --git a/src/Defs.h b/src/Defs.h
index 0ee0f1e229c7..97c54cd3b248 100644
--- a/src/Defs.h
+++ b/src/Defs.h
@@ -467,6 +467,14 @@ typedef struct handler_info {
 	int fd_min;					/**< minimum descriptor (fd) */
 	int fd_max;					/**< maximum socket descriptor (fd) */
 	int fd_num;					/**< number of socket descriptors */
+	/* These are all of the stats relevant to a single thread's streaming
+	 * I/O performance.  When running a throughput test as client and
+	 * running in multiple threads, we sum these up to across the
+	 * threads to get a total
+	 */
+	uint64_t sendCount;
+	TicksTime c_startTime;
+	TicksTime c_endTime;
 }handler_info;
 
 typedef struct clt_session_info {
diff --git a/src/SockPerf.cpp b/src/SockPerf.cpp
index 3c628cbc5ba1..3f912adf103e 100644
--- a/src/SockPerf.cpp
+++ b/src/SockPerf.cpp
@@ -91,8 +91,10 @@ CRITICAL_SECTION	thread_exit_lock;
 os_thread_t *thread_pid_array = NULL;
 
 // forward declarations from Client.cpp & Server.cpp
+extern void stream_statistics(struct handler_info *p_info);
 extern void client_sig_handler(int signum);
 extern void client_handler(handler_info *);
+extern void *client_handler_multi_thread(void *);
 extern void server_sig_handler(int signum);
 extern void server_handler(handler_info *);
 extern void *server_handler_multi_thread(void *);
@@ -362,8 +364,8 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
 	int last_fds = 0;
 	handler_info *handler_info_array = NULL;
 
-	handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * g_pApp->m_const_params.threads_num);
-	memset(handler_info_array, 0, sizeof(handler_info) * g_pApp->m_const_params.threads_num);
+	handler_info_array = (handler_info*)MALLOC(sizeof(handler_info) * (g_pApp->m_const_params.threads_num + 1));
+	memset(handler_info_array, 0, sizeof(handler_info) * (g_pApp->m_const_params.threads_num + 1));
 	if (!handler_info_array) {
 		log_err("Failed to allocate memory for handler_info_arr");
 		rc = SOCKPERF_ERR_NO_MEMORY;
@@ -390,7 +392,7 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
 		num_of_remainded_fds = _fd_num % g_pApp->m_const_params.threads_num;
 		fd_num = _fd_num / g_pApp->m_const_params.threads_num;
 
-		for (i = 0; i < g_pApp->m_const_params.threads_num; i++) {
+		for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
 			handler_info *cur_handler_info = (handler_info_array + i);
 
 			/* Set ID of handler (thread) */
@@ -420,7 +422,7 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
 				rc = SOCKPERF_ERR_FATAL;
 				break;
 			}
-			thread_pid_array[i + 1].tid = thread.tid;
+			thread_pid_array[i].tid = thread.tid;
 			last_fds = cur_handler_info->fd_max + 1;
 		}
 
@@ -429,6 +431,17 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
 			sleep(1);
 		}
 
+		/* If we are stopped by a timer, we need to wait for the
+		 * sending threads to complete and fill out their p_info
+		 * structs or our totals are off.  We might have been the
+		 * thread that took the timer interrupt, and os_thread_join
+		 * below isn't waiting for us to get results, so just sleep
+		 * for a little extra time
+		 */
+		if (g_pApp->m_const_params.b_stream) {
+			sleep(1);
+		}
+
 		/* Stop all launched threads (the first index is reserved for main thread) */
 		for (i = 1; i <= g_pApp->m_const_params.threads_num; i++) {
 			os_thread_t cur_thread_pid;
@@ -448,6 +461,31 @@ void select_per_thread(void *(handler)(void *), int _fd_num) {
 		DELETE_CRITICAL(&thread_exit_lock);
 	}
 
+	/* Print out stream stats for all threads combined */
+	if (g_pApp->m_const_params.b_stream && g_pApp->m_const_params.mthread &&
+	    g_pApp->m_const_params.threads_num > 1) {
+		struct handler_info *p0 = handler_info_array;
+		struct handler_info *t;
+		int threads = g_pApp->m_const_params.threads_num;
+		TicksDuration threadRunTime, totalRunTime;
+
+		totalRunTime = TicksDuration::TICKS0;
+		/* Sum up the totals fields */
+		for (i = 1; i <= threads; i++) {
+			t = handler_info_array + i;
+			p0->sendCount += t->sendCount;
+			threadRunTime = t->c_endTime - t->c_startTime;
+			totalRunTime += threadRunTime;
+		}
+		/* average out the runtimes across the threads */
+		totalRunTime /= threads;
+		p0->c_startTime = t->c_startTime;
+		p0->c_endTime = t->c_startTime + totalRunTime;
+
+		/* print it out */
+		stream_statistics(p0);
+	}
+
 	/* Free thread info allocated data */
 	if (handler_info_array) {
 		FREE(handler_info_array);
@@ -3471,7 +3509,12 @@ void do_test()
 #endif
 	switch (s_user_params.mode) {
 	case MODE_CLIENT:
-		client_handler(&info);
+		if (s_user_params.b_stream && s_user_params.mthread) {
+			select_per_thread(client_handler_multi_thread, fd_num);
+		}
+		else {
+			client_handler(&info);
+		}
 		break;
 	case MODE_SERVER:
 		if (s_user_params.mthread) {
-- 
2.14.3


* Re: [RFC PATCH 0/4] Sockperf: Initial multi-threaded throughput client
From: Leon Romanovsky @ 2017-12-20  8:52 UTC (permalink / raw)
  To: Doug Ledford
  Cc: linux-rdma-u79uwXL29TY76Z2rM5mHXA, nirni-VPRAkNaXOzVWk0Htik3J/w


On Mon, Dec 18, 2017 at 10:26:42AM -0500, Doug Ledford wrote:
> [cover letter quoted in full; snipped]

Doug,

I contacted the group that is responsible for the development of sockperf:
https://github.com/Mellanox/sockperf

Their maintainer is on vacation until the third week of January, and
unfortunately there is no one else right now who can take a look at your
proposal.

In the meantime, it would be better to push the code to GitHub, because
their development flow is based on GitHub and not on the mailing list.

Thanks
