bpf.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v5 bpf-next 0/3] selftests/bpf: Add parallelism to test_progs
@ 2021-09-16  3:26 Yucong Sun
  2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
                   ` (2 more replies)
  0 siblings, 3 replies; 9+ messages in thread
From: Yucong Sun @ 2021-09-16  3:26 UTC (permalink / raw)
  To: andrii; +Cc: bpf, Yucong Sun

This patch series adds "-j" parelell execution to test_progs, with
"--debug" to display server/worker communications.

V5 -> V4: 
  * change to SOCK_SEQPACKET for close notification. 
  * move all debug output to "--debug" mode
  * output log as test finish, and all error logs again after summary line.
  * variable naming / style changes
  * adds serial_test_name() to replace serial test lists.

Yucong Sun (3):
  selftests/bpf: Add parallelism to test_progs
  selftests/bpf: add per worker cgroup suffix
  selftests/bpf: pin some tests to worker 0

 tools/testing/selftests/bpf/cgroup_helpers.c  |   5 +-
 tools/testing/selftests/bpf/cgroup_helpers.h  |   1 +
 .../selftests/bpf/prog_tests/bpf_obj_id.c     |   2 +-
 .../bpf/prog_tests/select_reuseport.c         |   2 +-
 .../testing/selftests/bpf/prog_tests/timer.c  |   2 +-
 .../selftests/bpf/prog_tests/xdp_bonding.c    |   2 +-
 .../selftests/bpf/prog_tests/xdp_link.c       |   2 +-
 tools/testing/selftests/bpf/test_progs.c      | 658 +++++++++++++++++-
 tools/testing/selftests/bpf/test_progs.h      |  36 +-
 9 files changed, 667 insertions(+), 43 deletions(-)

-- 
2.30.2


^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH v5 bpf-next 1/3] selftests/bpf: Add parallelism to test_progs
  2021-09-16  3:26 [PATCH v5 bpf-next 0/3] selftests/bpf: Add parallelism to test_progs Yucong Sun
@ 2021-09-16  3:26 ` Yucong Sun
  2021-09-17 18:43   ` Andrii Nakryiko
  2021-09-17 19:44   ` Andrii Nakryiko
  2021-09-16  3:26 ` [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix Yucong Sun
  2021-09-16  3:26 ` [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0 Yucong Sun
  2 siblings, 2 replies; 9+ messages in thread
From: Yucong Sun @ 2021-09-16  3:26 UTC (permalink / raw)
  To: andrii; +Cc: bpf, Yucong Sun

From: Yucong Sun <sunyucong@gmail.com>

This patch adds "-j" mode to test_progs, executing tests in multiple process.
"-j" mode is optional, and works with all existing test selection mechanism, as
well as "-v", "-l" etc.

In "-j" mode, main process use UDS to communicate to each forked worker,
commanding it to run tests and collect logs. After all tests are
finished, a summary is printed. main process use multiple competing
threads to dispatch work to worker, trying to keep them all busy.

The test status will be printed as soon as it is finished, if there are error
logs, it will be printed after the final summary line.

By specifying "--debug", additional debug information on server/worker
communication will be printed.

Example output:
  > ./test_progs -n 15-20 -j
  [   12.801730] bpf_testmod: loading out-of-tree module taints kernel.
  Launching 8 workers.
  #20 btf_split:OK
  #16 btf_endian:OK
  #18 btf_module:OK
  #17 btf_map_in_map:OK
  #19 btf_skc_cls_ingress:OK
  #15 btf_dump:OK
  Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED

Signed-off-by: Yucong Sun <sunyucong@gmail.com>
---
 tools/testing/selftests/bpf/test_progs.c | 577 +++++++++++++++++++++--
 tools/testing/selftests/bpf/test_progs.h |  36 +-
 2 files changed, 581 insertions(+), 32 deletions(-)

diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index 2ed01f615d20..36f130455b2f 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -12,6 +12,11 @@
 #include <string.h>
 #include <execinfo.h> /* backtrace */
 #include <linux/membarrier.h>
+#include <sys/sysinfo.h> /* get_nprocs */
+#include <netinet/in.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/un.h>
 
 /* Adapted from perf/util/string.c */
 static bool glob_match(const char *str, const char *pat)
@@ -48,6 +53,8 @@ struct prog_test_def {
 	bool force_log;
 	int error_cnt;
 	int skip_cnt;
+	int sub_succ_cnt;
+	bool should_run;
 	bool tested;
 	bool need_cgroup_cleanup;
 
@@ -97,6 +104,10 @@ static void dump_test_log(const struct prog_test_def *test, bool failed)
 	if (stdout == env.stdout)
 		return;
 
+	/* worker always holds log */
+	if (env.worker_id != -1)
+		return;
+
 	fflush(stdout); /* exports env.log_buf & env.log_cnt */
 
 	if (env.verbosity > VERBOSE_NONE || test->force_log || failed) {
@@ -172,14 +183,14 @@ void test__end_subtest()
 
 	dump_test_log(test, sub_error_cnt);
 
-	fprintf(env.stdout, "#%d/%d %s/%s:%s\n",
+	fprintf(stdout, "#%d/%d %s/%s:%s\n",
 	       test->test_num, test->subtest_num, test->test_name, test->subtest_name,
 	       sub_error_cnt ? "FAIL" : (test->skip_cnt ? "SKIP" : "OK"));
 
 	if (sub_error_cnt)
-		env.fail_cnt++;
+		test->error_cnt++;
 	else if (test->skip_cnt == 0)
-		env.sub_succ_cnt++;
+		test->sub_succ_cnt++;
 	skip_account();
 
 	free(test->subtest_name);
@@ -474,6 +485,8 @@ enum ARG_KEYS {
 	ARG_LIST_TEST_NAMES = 'l',
 	ARG_TEST_NAME_GLOB_ALLOWLIST = 'a',
 	ARG_TEST_NAME_GLOB_DENYLIST = 'd',
+	ARG_NUM_WORKERS = 'j',
+	ARG_DEBUG = -1,
 };
 
 static const struct argp_option opts[] = {
@@ -495,6 +508,10 @@ static const struct argp_option opts[] = {
 	  "Run tests with name matching the pattern (supports '*' wildcard)." },
 	{ "deny", ARG_TEST_NAME_GLOB_DENYLIST, "NAMES", 0,
 	  "Don't run tests with name matching the pattern (supports '*' wildcard)." },
+	{ "workers", ARG_NUM_WORKERS, "WORKERS", OPTION_ARG_OPTIONAL,
+	  "Number of workers to run in parallel, default to number of cpus." },
+	{ "debug", ARG_DEBUG, NULL, 0,
+	  "print extra debug information for test_progs." },
 	{},
 };
 
@@ -661,6 +678,20 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
 	case ARG_LIST_TEST_NAMES:
 		env->list_test_names = true;
 		break;
+	case ARG_NUM_WORKERS:
+		if (arg) {
+			env->workers = atoi(arg);
+			if (!env->workers) {
+				fprintf(stderr, "Invalid number of worker: %s.", arg);
+				return -1;
+			}
+		} else {
+			env->workers = get_nprocs();
+		}
+		break;
+	case ARG_DEBUG:
+		env->debug = true;
+		break;
 	case ARGP_KEY_ARG:
 		argp_usage(state);
 		break;
@@ -678,7 +709,7 @@ static void stdio_hijack(void)
 	env.stdout = stdout;
 	env.stderr = stderr;
 
-	if (env.verbosity > VERBOSE_NONE) {
+	if (env.verbosity > VERBOSE_NONE && env.worker_id == -1) {
 		/* nothing to do, output to stdout by default */
 		return;
 	}
@@ -704,10 +735,6 @@ static void stdio_restore(void)
 		return;
 
 	fclose(stdout);
-	free(env.log_buf);
-
-	env.log_buf = NULL;
-	env.log_cnt = 0;
 
 	stdout = env.stdout;
 	stderr = env.stderr;
@@ -794,11 +821,444 @@ void crash_handler(int signum)
 		dump_test_log(env.test, true);
 	if (env.stdout)
 		stdio_restore();
-
+	if (env.worker_id != -1)
+		fprintf(stderr, "[%d]: ", env.worker_id);
 	fprintf(stderr, "Caught signal #%d!\nStack trace:\n", signum);
 	backtrace_symbols_fd(bt, sz, STDERR_FILENO);
 }
 
+void sigint_handler(int signum) {
+	for (int i = 0; i < env.workers; i++)
+		close(env.worker_socks[i]);
+}
+
+static int current_test_idx = 0;
+static pthread_mutex_t current_test_lock;
+static pthread_mutex_t stdout_output_lock;
+
+struct test_result {
+	int error_cnt;
+	int skip_cnt;
+	int sub_succ_cnt;
+
+	size_t log_cnt;
+	char *log_buf;
+};
+
+static struct test_result test_results[ARRAY_SIZE(prog_test_defs)];
+
+static inline const char *str_msg(const struct msg *msg, char *buf)
+{
+	switch (msg->type) {
+	case MSG_DO_TEST:
+		sprintf(buf, "MSG_DO_TEST %d", msg->do_test.test_num);
+		break;
+	case MSG_TEST_DONE:
+		sprintf(buf, "MSG_TEST_DONE %d (log: %d)",
+			msg->test_done.test_num,
+			msg->test_done.have_log);
+		break;
+	case MSG_TEST_LOG:
+		sprintf(buf, "MSG_TEST_LOG (cnt: %ld, last: %d)",
+			strlen(msg->test_log.log_buf),
+			msg->test_log.is_last);
+		break;
+	case MSG_EXIT:
+		sprintf(buf, "MSG_EXIT");
+		break;
+	default:
+		sprintf(buf, "UNKNOWN");
+		break;
+	}
+
+	return buf;
+}
+
+static int send_message(int sock, const struct msg *msg)
+{
+	char buf[256];
+	if (env.verbosity > VERBOSE_SUPER)
+		fprintf(stderr, "Sending msg: %s\n", str_msg(msg, buf));
+	return send(sock, msg, sizeof(*msg), 0);
+}
+
+static int recv_message(int sock, struct msg *msg)
+{
+	int ret;
+	char buf[256];
+
+	memset(msg, 0, sizeof(*msg));
+	ret = recv(sock, msg, sizeof(*msg), 0);
+	if (ret >= 0) {
+		if (env.debug)
+			fprintf(stderr, "Received msg: %s\n", str_msg(msg, buf));
+	}
+	return ret;
+}
+
+static void run_one_test(int test_num) {
+	struct prog_test_def *test = &prog_test_defs[test_num];
+
+	env.test = test;
+
+	test->run_test();
+
+	/* ensure last sub-test is finalized properly */
+	if (test->subtest_name)
+	       test__end_subtest();
+
+	test->tested = true;
+
+	dump_test_log(test, test->error_cnt);
+
+	reset_affinity();
+	restore_netns();
+	if (test->need_cgroup_cleanup)
+	       cleanup_cgroup_environment();
+}
+
+static const char *get_test_name(int idx)
+{
+	struct prog_test_def *test;
+
+	test = &prog_test_defs[idx];
+	return test->test_name;
+}
+
+struct dispatch_data {
+	int worker_id;
+	int sock_fd;
+};
+
+void *dispatch_thread(void *ctx)
+{
+	struct dispatch_data *data = ctx;
+	int sock_fd;
+	FILE *log_fd = NULL;
+
+	sock_fd = data->sock_fd;
+
+	while (true) {
+		int test_to_run = -1;
+		struct prog_test_def *test;
+		struct test_result *result;
+
+		/* grab a test */
+		{
+			pthread_mutex_lock(&current_test_lock);
+
+			if (current_test_idx >= prog_test_cnt) {
+				pthread_mutex_unlock(&current_test_lock);
+				goto done;
+			}
+
+			test = &prog_test_defs[current_test_idx];
+			test_to_run = current_test_idx;
+			current_test_idx++;
+
+			pthread_mutex_unlock(&current_test_lock);
+		}
+
+		if (!test->should_run) {
+			continue;
+		}
+
+
+		/* run test through worker */
+		{
+			struct msg msg_do_test;
+
+			msg_do_test.type = MSG_DO_TEST;
+			msg_do_test.do_test.test_num = test_to_run;
+			if (send_message(sock_fd, &msg_do_test) < 0) {
+				perror("Fail to send command");
+				goto done;
+			}
+			env.worker_current_test[data->worker_id] = test_to_run;
+		}
+
+		/* wait for test done */
+		{
+			struct msg msg_test_done;
+
+			if (errno = recv_message(sock_fd, &msg_test_done) < 0)
+				goto error;
+			if (msg_test_done.type != MSG_TEST_DONE)
+				goto error;
+			if (test_to_run != msg_test_done.test_done.test_num)
+				goto error;
+
+			test->tested = true;
+			result = &test_results[test_to_run];
+
+			result->error_cnt = msg_test_done.test_done.error_cnt;
+			result->skip_cnt = msg_test_done.test_done.skip_cnt;
+			result->sub_succ_cnt = msg_test_done.test_done.sub_succ_cnt;
+
+			/* collect all logs */
+			if (msg_test_done.test_done.have_log) {
+				log_fd = open_memstream(&result->log_buf, &result->log_cnt);
+				if (!log_fd)
+					goto error;
+
+				while (true) {
+					struct msg msg_log;
+
+					if (recv_message(sock_fd, &msg_log) < 0)
+						goto error;
+					if (msg_log.type != MSG_TEST_LOG)
+						goto error;
+
+					fprintf(log_fd, "%s", msg_log.test_log.log_buf);
+					if (msg_log.test_log.is_last)
+						break;
+				}
+				fclose(log_fd);
+				log_fd = NULL;
+			}
+			/* output log */
+			{
+				pthread_mutex_lock(&stdout_output_lock);
+
+				if (result->log_cnt) {
+					result->log_buf[result->log_cnt] = '\0';
+					fprintf(stdout, "%s", result->log_buf);
+					if (result->log_buf[result->log_cnt - 1] != '\n')
+						fprintf(stdout, "\n");
+				}
+
+				fprintf(stdout, "#%d %s:%s\n",
+					test->test_num, test->test_name,
+					result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));
+
+				pthread_mutex_unlock(&stdout_output_lock);
+			}
+
+		} /* wait for test done */
+	} /* while (true) */
+error:
+	if (env.debug)
+		fprintf(stderr, "[%d]: Protocol/IO error: %s.\n", data->worker_id, strerror(errno));
+
+	if (log_fd)
+		fclose(log_fd);
+done:
+	{
+		struct msg msg_exit;
+
+		msg_exit.type = MSG_EXIT;
+		if (send_message(sock_fd, &msg_exit) < 0) {
+			if (env.debug)
+				fprintf(stderr, "[%d]: send_message msg_exit: %s.\n",
+					data->worker_id, strerror(errno));
+		}
+	}
+	return NULL;
+}
+
+static int server_main(void)
+{
+	pthread_t *dispatcher_threads;
+	struct dispatch_data *data;
+
+	dispatcher_threads = calloc(sizeof(pthread_t), env.workers);
+	data = calloc(sizeof(struct dispatch_data), env.workers);
+
+	env.worker_current_test = calloc(sizeof(int), env.workers);
+	for (int i = 0; i < env.workers; i++) {
+		int rc;
+
+		data[i].worker_id = i;
+		data[i].sock_fd = env.worker_socks[i];
+		rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
+		if (rc < 0) {
+			perror("Failed to launch dispatcher thread");
+			return -1;
+		}
+	}
+
+	/* wait for all dispatcher to finish */
+	for (int i = 0; i < env.workers; i++) {
+		while (true) {
+			struct timespec timeout = {
+				.tv_sec = time(NULL) + 5,
+				.tv_nsec = 0
+			};
+			if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
+				break;
+			if (env.debug)
+				fprintf(stderr, "Still waiting for thread %d (test %d).\n",
+					i,  env.worker_current_test[i] + 1);
+		}
+	}
+	free(dispatcher_threads);
+	free(env.worker_current_test);
+	free(data);
+
+	/* generate summary */
+	fflush(stderr);
+	fflush(stdout);
+
+	for (int i = 0; i < prog_test_cnt; i++) {
+		struct prog_test_def *current_test;
+		struct test_result *result;
+
+		current_test = &prog_test_defs[i];
+		result = &test_results[i];
+
+		if (!current_test->tested)
+			continue;
+
+		env.succ_cnt += result->error_cnt ? 0 : 1;
+		env.skip_cnt += result->skip_cnt;
+		env.fail_cnt += result->error_cnt;
+		env.sub_succ_cnt += result->sub_succ_cnt;
+	}
+
+	fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
+		env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
+	if (env.fail_cnt)
+		fprintf(stdout, "\nAll error logs:\n");
+
+	/* print error logs again */
+
+	for (int i = 0; i < prog_test_cnt; i++) {
+		struct prog_test_def *current_test;
+		struct test_result *result;
+
+		current_test = &prog_test_defs[i];
+		result = &test_results[i];
+
+		if (!current_test->tested || !result->error_cnt)
+			continue;
+
+		if (result->log_cnt) {
+			result->log_buf[result->log_cnt] = '\0';
+			fprintf(stdout, "%s", result->log_buf);
+			if (result->log_buf[result->log_cnt - 1] != '\n')
+				fprintf(stdout, "\n");
+		}
+		fprintf(stdout, "#%d %s:%s\n",
+			current_test->test_num, current_test->test_name,
+			result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));
+	}
+
+	/* reap all workers */
+	for (int i = 0; i < env.workers; i++) {
+		int wstatus, pid;
+
+		pid = waitpid(env.worker_pids[i], &wstatus, 0);
+		if (pid != env.worker_pids[i])
+			perror("Unable to reap worker");
+	}
+
+	return 0;
+}
+
+static int worker_main(int sock)
+{
+	save_netns();
+
+	while (true) {
+		/* receive command */
+		struct msg msg;
+
+		if (recv_message(sock, &msg) < 0)
+			goto out;
+
+		switch (msg.type) {
+		case MSG_EXIT:
+			if (env.debug)
+				fprintf(stderr, "[%d]: worker exit.\n",
+					env.worker_id);
+			goto out;
+		case MSG_DO_TEST: {
+			int test_to_run;
+			struct prog_test_def *test;
+			struct msg msg_done;
+
+			test_to_run = msg.do_test.test_num;
+
+			if (env.debug)
+				fprintf(stderr, "[%d]: #%d:%s running.\n",
+					env.worker_id,
+					test_to_run + 1,
+					get_test_name(test_to_run));
+
+			test = &prog_test_defs[test_to_run];
+
+			stdio_hijack();
+
+			run_one_test(test_to_run);
+
+			stdio_restore();
+
+			memset(&msg_done, 0, sizeof(msg_done));
+			msg_done.type = MSG_TEST_DONE;
+			msg_done.test_done.test_num = test_to_run;
+			msg_done.test_done.error_cnt = test->error_cnt;
+			msg_done.test_done.skip_cnt = test->skip_cnt;
+			msg_done.test_done.sub_succ_cnt = test->sub_succ_cnt;
+			msg_done.test_done.have_log = false;
+
+			if (env.verbosity > VERBOSE_NONE || test->force_log || test->error_cnt) {
+				if (env.log_cnt)
+					msg_done.test_done.have_log = true;
+			}
+			if (send_message(sock, &msg_done) < 0) {
+				perror("Fail to send message done");
+				goto out;
+			}
+
+			/* send logs */
+			if (msg_done.test_done.have_log) {
+				char *src;
+				size_t slen;
+
+				src = env.log_buf;
+				slen = env.log_cnt;
+				while (slen) {
+					struct msg msg_log;
+					char *dest;
+					size_t len;
+
+					memset(&msg_log, 0, sizeof(msg_log));
+					msg_log.type = MSG_TEST_LOG;
+					dest = msg_log.test_log.log_buf;
+					len = slen >= MAX_LOG_TRUNK_SIZE ? MAX_LOG_TRUNK_SIZE : slen;
+					memcpy(dest, src, len);
+
+					src += len;
+					slen -= len;
+					if (!slen)
+						msg_log.test_log.is_last = true;
+
+					assert(send_message(sock, &msg_log) >= 0);
+				}
+			}
+			if (env.log_buf) {
+				free(env.log_buf);
+				env.log_buf = NULL;
+				env.log_cnt = 0;
+			}
+			if (env.debug)
+				fprintf(stderr, "[%d]: #%d:%s done.\n",
+					env.worker_id,
+					test_to_run + 1,
+					get_test_name(test_to_run));
+			break;
+		} /* case MSG_DO_TEST */
+		default:
+			if (env.debug)
+				fprintf(stderr, "[%d]: unknown message.\n",  env.worker_id);
+			return -1;
+		}
+	}
+out:
+	restore_netns();
+	return 0;
+}
+
 int main(int argc, char **argv)
 {
 	static const struct argp argp = {
@@ -809,10 +1269,15 @@ int main(int argc, char **argv)
 	struct sigaction sigact = {
 		.sa_handler = crash_handler,
 		.sa_flags = SA_RESETHAND,
-	};
+		};
+	struct sigaction sigact_int = {
+		.sa_handler = sigint_handler,
+		.sa_flags = SA_RESETHAND,
+		};
 	int err, i;
 
 	sigaction(SIGSEGV, &sigact, NULL);
+	sigaction(SIGINT, &sigact_int, NULL);
 
 	err = argp_parse(&argp, argc, argv, 0, NULL, &env);
 	if (err)
@@ -837,21 +1302,76 @@ int main(int argc, char **argv)
 		return -1;
 	}
 
-	save_netns();
-	stdio_hijack();
+	env.stdout = stdout;
+	env.stderr = stderr;
+
 	env.has_testmod = true;
 	if (!env.list_test_names && load_bpf_testmod()) {
 		fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n");
 		env.has_testmod = false;
 	}
+
+	/* initializing tests */
 	for (i = 0; i < prog_test_cnt; i++) {
 		struct prog_test_def *test = &prog_test_defs[i];
 
-		env.test = test;
 		test->test_num = i + 1;
-
-		if (!should_run(&env.test_selector,
+		if (should_run(&env.test_selector,
 				test->test_num, test->test_name))
+			test->should_run = true;
+		else
+			test->should_run = false;
+	}
+
+	/* ignore workers if we are just listing */
+	if (env.get_test_cnt || env.list_test_names)
+		env.workers = 0;
+
+	/* launch workers if requested */
+	env.worker_id = -1; /* main process */
+	if (env.workers) {
+		env.worker_pids = calloc(sizeof(__pid_t), env.workers);
+		env.worker_socks = calloc(sizeof(int), env.workers);
+		fprintf(stdout, "Launching %d workers.\n", env.workers);
+		for (int i = 0; i < env.workers; i++) {
+			int sv[2];
+			pid_t pid;
+
+			if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sv) < 0) {
+				perror("Fail to create worker socket");
+				return -1;
+			}
+			pid = fork();
+			if (pid < 0) {
+				perror("Failed to fork worker");
+				return -1;
+			} else if (pid != 0) { /* main process */
+				close(sv[1]);
+				env.worker_pids[i] = pid;
+				env.worker_socks[i] = sv[0];
+			} else { /* inside each worker process */
+				close(sv[0]);
+				env.worker_id = i;
+				return worker_main(sv[1]);
+			}
+		}
+
+		if (env.worker_id == -1) {
+			server_main();
+			goto out;
+		}
+	}
+
+	/* The rest of the main process */
+
+	/* on single mode */
+	save_netns();
+	stdio_hijack();
+
+	for (i = 0; i < prog_test_cnt; i++) {
+		struct prog_test_def *test = &prog_test_defs[i];
+
+		if (!test->should_run)
 			continue;
 
 		if (env.get_test_cnt) {
@@ -865,14 +1385,7 @@ int main(int argc, char **argv)
 			continue;
 		}
 
-		test->run_test();
-		/* ensure last sub-test is finalized properly */
-		if (test->subtest_name)
-			test__end_subtest();
-
-		test->tested = true;
-
-		dump_test_log(test, test->error_cnt);
+		run_one_test(i);
 
 		fprintf(env.stdout, "#%d %s:%s\n",
 			test->test_num, test->test_name,
@@ -882,16 +1395,16 @@ int main(int argc, char **argv)
 			env.fail_cnt++;
 		else
 			env.succ_cnt++;
-		skip_account();
 
-		reset_affinity();
-		restore_netns();
-		if (test->need_cgroup_cleanup)
-			cleanup_cgroup_environment();
+		skip_account();
+		env.sub_succ_cnt += test->sub_succ_cnt;
 	}
-	if (!env.list_test_names && env.has_testmod)
-		unload_bpf_testmod();
 	stdio_restore();
+	if (env.log_buf) {
+		free(env.log_buf);
+		env.log_buf = NULL;
+		env.log_cnt = 0;
+	}
 
 	if (env.get_test_cnt) {
 		printf("%d\n", env.succ_cnt);
@@ -904,14 +1417,16 @@ int main(int argc, char **argv)
 	fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
 		env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
 
+	close(env.saved_netns_fd);
 out:
+	if (!env.list_test_names && env.has_testmod)
+		unload_bpf_testmod();
 	free_str_set(&env.test_selector.blacklist);
 	free_str_set(&env.test_selector.whitelist);
 	free(env.test_selector.num_set);
 	free_str_set(&env.subtest_selector.blacklist);
 	free_str_set(&env.subtest_selector.whitelist);
 	free(env.subtest_selector.num_set);
-	close(env.saved_netns_fd);
 
 	if (env.succ_cnt + env.fail_cnt + env.skip_cnt == 0)
 		return EXIT_NO_TEST;
diff --git a/tools/testing/selftests/bpf/test_progs.h b/tools/testing/selftests/bpf/test_progs.h
index 94bef0aa74cf..b239dc9fcef0 100644
--- a/tools/testing/selftests/bpf/test_progs.h
+++ b/tools/testing/selftests/bpf/test_progs.h
@@ -62,6 +62,7 @@ struct test_env {
 	struct test_selector test_selector;
 	struct test_selector subtest_selector;
 	bool verifier_stats;
+	bool debug;
 	enum verbosity verbosity;
 
 	bool jit_enabled;
@@ -69,7 +70,8 @@ struct test_env {
 	bool get_test_cnt;
 	bool list_test_names;
 
-	struct prog_test_def *test;
+	struct prog_test_def *test; /* current running tests */
+
 	FILE *stdout;
 	FILE *stderr;
 	char *log_buf;
@@ -82,6 +84,38 @@ struct test_env {
 	int skip_cnt; /* skipped tests */
 
 	int saved_netns_fd;
+	int workers; /* number of worker process */
+	int worker_id; /* id number of current worker, main process is -1 */
+	pid_t *worker_pids; /* array of worker pids */
+	int *worker_socks; /* array of worker socks */
+	int *worker_current_test; /* array of current running test for each worker */
+};
+
+#define MAX_LOG_TRUNK_SIZE 8192
+enum msg_type {
+	MSG_DO_TEST = 0,
+	MSG_TEST_DONE = 1,
+	MSG_TEST_LOG = 2,
+	MSG_EXIT = 255,
+};
+struct msg {
+	enum msg_type type;
+	union {
+		struct {
+			int test_num;
+		} do_test;
+		struct {
+			int test_num;
+			int sub_succ_cnt;
+			int error_cnt;
+			int skip_cnt;
+			bool have_log;
+		} test_done;
+		struct {
+			char log_buf[MAX_LOG_TRUNK_SIZE + 1];
+			bool is_last;
+		} test_log;
+	};
 };
 
 extern struct test_env env;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix
  2021-09-16  3:26 [PATCH v5 bpf-next 0/3] selftests/bpf: Add parallelism to test_progs Yucong Sun
  2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
@ 2021-09-16  3:26 ` Yucong Sun
  2021-09-17 21:10   ` Andrii Nakryiko
  2021-09-16  3:26 ` [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0 Yucong Sun
  2 siblings, 1 reply; 9+ messages in thread
From: Yucong Sun @ 2021-09-16  3:26 UTC (permalink / raw)
  To: andrii; +Cc: bpf, Yucong Sun

From: Yucong Sun <sunyucong@gmail.com>

This patch allows each worker to use a unique cgroup base directory, thus
allowing tests that uses cgroups to run concurrently.

Signed-off-by: Yucong Sun <sunyucong@gmail.com>
---
 tools/testing/selftests/bpf/cgroup_helpers.c | 5 +++--
 tools/testing/selftests/bpf/cgroup_helpers.h | 1 +
 tools/testing/selftests/bpf/test_progs.c     | 5 +++++
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/tools/testing/selftests/bpf/cgroup_helpers.c b/tools/testing/selftests/bpf/cgroup_helpers.c
index 033051717ba5..a0429f0d6db2 100644
--- a/tools/testing/selftests/bpf/cgroup_helpers.c
+++ b/tools/testing/selftests/bpf/cgroup_helpers.c
@@ -29,9 +29,10 @@
 #define WALK_FD_LIMIT			16
 #define CGROUP_MOUNT_PATH		"/mnt"
 #define CGROUP_WORK_DIR			"/cgroup-test-work-dir"
+const char *CGROUP_WORK_DIR_SUFFIX = "";
 #define format_cgroup_path(buf, path) \
-	snprintf(buf, sizeof(buf), "%s%s%s", CGROUP_MOUNT_PATH, \
-		 CGROUP_WORK_DIR, path)
+	snprintf(buf, sizeof(buf), "%s%s%s%s", CGROUP_MOUNT_PATH, \
+	CGROUP_WORK_DIR, CGROUP_WORK_DIR_SUFFIX, path)
 
 /**
  * enable_all_controllers() - Enable all available cgroup v2 controllers
diff --git a/tools/testing/selftests/bpf/cgroup_helpers.h b/tools/testing/selftests/bpf/cgroup_helpers.h
index 5fe3d88e4f0d..5657aba02161 100644
--- a/tools/testing/selftests/bpf/cgroup_helpers.h
+++ b/tools/testing/selftests/bpf/cgroup_helpers.h
@@ -16,4 +16,5 @@ int setup_cgroup_environment(void);
 void cleanup_cgroup_environment(void);
 unsigned long long get_cgroup_id(const char *path);
 
+extern const char *CGROUP_WORK_DIR_SUFFIX;
 #endif
diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index 36f130455b2f..77ed9204cc4a 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -1157,6 +1157,11 @@ static int server_main(void)
 
 static int worker_main(int sock)
 {
+	static char suffix[16];
+
+	sprintf(suffix, "%d", env.worker_id);
+	CGROUP_WORK_DIR_SUFFIX = suffix;
+
 	save_netns();
 
 	while (true) {
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0
  2021-09-16  3:26 [PATCH v5 bpf-next 0/3] selftests/bpf: Add parallelism to test_progs Yucong Sun
  2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
  2021-09-16  3:26 ` [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix Yucong Sun
@ 2021-09-16  3:26 ` Yucong Sun
  2021-09-17 21:22   ` Andrii Nakryiko
  2 siblings, 1 reply; 9+ messages in thread
From: Yucong Sun @ 2021-09-16  3:26 UTC (permalink / raw)
  To: andrii; +Cc: bpf, Yucong Sun

From: Yucong Sun <sunyucong@gmail.com>

This patch modify some tests to provide serial_test_name() instead of
test_name() to indicate it must run on worker 0. On encountering these tests,
all other threads will wait on a conditional variable, which worker 0 will
signal once the tests has finished running.

Additionally, before running the test, thread 0 also check and wait until all
other threads has finished their current work, to make sure the pinned test
really are the only test running in the system.

After this change, all tests should pass in '-j' mode.

Signed-off-by: Yucong Sun <sunyucong@gmail.com>
---
 .../selftests/bpf/prog_tests/bpf_obj_id.c     |   2 +-
 .../bpf/prog_tests/select_reuseport.c         |   2 +-
 .../testing/selftests/bpf/prog_tests/timer.c  |   2 +-
 .../selftests/bpf/prog_tests/xdp_bonding.c    |   2 +-
 .../selftests/bpf/prog_tests/xdp_link.c       |   2 +-
 tools/testing/selftests/bpf/test_progs.c      | 112 ++++++++++++++----
 6 files changed, 95 insertions(+), 27 deletions(-)

diff --git a/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c b/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c
index 284d5921c345..eb8eeebe6935 100644
--- a/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c
+++ b/tools/testing/selftests/bpf/prog_tests/bpf_obj_id.c
@@ -3,7 +3,7 @@
 
 #define nr_iters 2
 
-void test_bpf_obj_id(void)
+void serial_test_bpf_obj_id(void)
 {
 	const __u64 array_magic_value = 0xfaceb00c;
 	const __u32 array_key = 0;
diff --git a/tools/testing/selftests/bpf/prog_tests/select_reuseport.c b/tools/testing/selftests/bpf/prog_tests/select_reuseport.c
index 4efd337d6a3c..b5a0b7ed4310 100644
--- a/tools/testing/selftests/bpf/prog_tests/select_reuseport.c
+++ b/tools/testing/selftests/bpf/prog_tests/select_reuseport.c
@@ -858,7 +858,7 @@ void test_map_type(enum bpf_map_type mt)
 	cleanup();
 }
 
-void test_select_reuseport(void)
+void serial_test_select_reuseport(void)
 {
 	saved_tcp_fo = read_int_sysctl(TCP_FO_SYSCTL);
 	if (saved_tcp_fo < 0)
diff --git a/tools/testing/selftests/bpf/prog_tests/timer.c b/tools/testing/selftests/bpf/prog_tests/timer.c
index 25f40e1b9967..bbd074d407fb 100644
--- a/tools/testing/selftests/bpf/prog_tests/timer.c
+++ b/tools/testing/selftests/bpf/prog_tests/timer.c
@@ -39,7 +39,7 @@ static int timer(struct timer *timer_skel)
 	return 0;
 }
 
-void test_timer(void)
+void serial_test_timer(void)
 {
 	struct timer *timer_skel = NULL;
 	int err;
diff --git a/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c b/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c
index 370d220288a6..bb6e0d0c5f79 100644
--- a/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c
+++ b/tools/testing/selftests/bpf/prog_tests/xdp_bonding.c
@@ -468,7 +468,7 @@ static struct bond_test_case bond_test_cases[] = {
 	{ "xdp_bonding_xor_layer34", BOND_MODE_XOR, BOND_XMIT_POLICY_LAYER34, },
 };
 
-void test_xdp_bonding(void)
+void serial_test_xdp_bonding(void)
 {
 	libbpf_print_fn_t old_print_fn;
 	struct skeletons skeletons = {};
diff --git a/tools/testing/selftests/bpf/prog_tests/xdp_link.c b/tools/testing/selftests/bpf/prog_tests/xdp_link.c
index 46eed0a33c23..983ab0b47d30 100644
--- a/tools/testing/selftests/bpf/prog_tests/xdp_link.c
+++ b/tools/testing/selftests/bpf/prog_tests/xdp_link.c
@@ -6,7 +6,7 @@
 
 #define IFINDEX_LO 1
 
-void test_xdp_link(void)
+void serial_test_xdp_link(void)
 {
 	__u32 duration = 0, id1, id2, id0 = 0, prog_fd1, prog_fd2, err;
 	DECLARE_LIBBPF_OPTS(bpf_xdp_set_link_opts, opts, .old_fd = -1);
diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index 77ed9204cc4a..c980ed766947 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -50,6 +50,7 @@ struct prog_test_def {
 	const char *test_name;
 	int test_num;
 	void (*run_test)(void);
+	void (*run_serial_test)(void);
 	bool force_log;
 	int error_cnt;
 	int skip_cnt;
@@ -457,14 +458,17 @@ static int load_bpf_testmod(void)
 }
 
 /* extern declarations for test funcs */
-#define DEFINE_TEST(name) extern void test_##name(void);
+#define DEFINE_TEST(name)				\
+	extern void test_##name(void) __weak;		\
+	extern void serial_test_##name(void) __weak;
 #include <prog_tests/tests.h>
 #undef DEFINE_TEST
 
 static struct prog_test_def prog_test_defs[] = {
-#define DEFINE_TEST(name) {		\
-	.test_name = #name,		\
-	.run_test = &test_##name,	\
+#define DEFINE_TEST(name) {			\
+	.test_name = #name,			\
+	.run_test = &test_##name,		\
+	.run_serial_test = &serial_test_##name,	\
 },
 #include <prog_tests/tests.h>
 #undef DEFINE_TEST
@@ -835,6 +839,7 @@ void sigint_handler(int signum) {
 static int current_test_idx = 0;
 static pthread_mutex_t current_test_lock;
 static pthread_mutex_t stdout_output_lock;
+static pthread_cond_t wait_for_worker0 = PTHREAD_COND_INITIALIZER;
 
 struct test_result {
 	int error_cnt;
@@ -901,7 +906,10 @@ static void run_one_test(int test_num) {
 
 	env.test = test;
 
-	test->run_test();
+	if (test->run_test)
+		test->run_test();
+	else if (test->run_serial_test)
+		test->run_serial_test();
 
 	/* ensure last sub-test is finalized properly */
 	if (test->subtest_name)
@@ -925,6 +933,11 @@ static const char *get_test_name(int idx)
 	return test->test_name;
 }
 
+static inline bool is_serial_test(int idx)
+{
+	return prog_test_defs[idx].run_serial_test != NULL;
+}
+
 struct dispatch_data {
 	int worker_id;
 	int sock_fd;
@@ -943,6 +956,8 @@ void *dispatch_thread(void *ctx)
 		struct prog_test_def *test;
 		struct test_result *result;
 
+		env.worker_current_test[data->worker_id] = -1;
+
 		/* grab a test */
 		{
 			pthread_mutex_lock(&current_test_lock);
@@ -954,15 +969,42 @@ void *dispatch_thread(void *ctx)
 
 			test = &prog_test_defs[current_test_idx];
 			test_to_run = current_test_idx;
-			current_test_idx++;
 
-			pthread_mutex_unlock(&current_test_lock);
-		}
+			test = &prog_test_defs[test_to_run];
 
-		if (!test->should_run) {
-			continue;
-		}
+			if (!test->should_run) {
+				current_test_idx++;
+				pthread_mutex_unlock(&current_test_lock);
+				goto next;
+			}
+
+			if (is_serial_test(current_test_idx)) {
+				if (data->worker_id != 0) {
+					if (env.debug)
+						fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n",
+							data->worker_id, current_test_idx + 1);
+					/* wait for worker 0 to pick this job up and finish */
+					pthread_cond_wait(&wait_for_worker0, &current_test_lock);
+					pthread_mutex_unlock(&current_test_lock);
+					goto next;
+				} else {
+					/* wait until all other worker has parked */
+					for (int i = 1; i < env.workers; i++) {
+						if (env.worker_current_test[i] != -1) {
+							if (env.debug)
+								fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->worker_id);
+							pthread_mutex_unlock(&current_test_lock);
+							usleep(1 * 1000 * 1000);
+							goto next;
+						}
+					}
+				}
+			} else {
+				current_test_idx++;
+			}
 
+			pthread_mutex_unlock(&current_test_lock);
+		}
 
 		/* run test through worker */
 		{
@@ -1035,6 +1077,14 @@ void *dispatch_thread(void *ctx)
 			}
 
 		} /* wait for test done */
+
+		/* unblock all other dispatcher threads */
+		if (is_serial_test(test_to_run) && data->worker_id == 0) {
+			current_test_idx++;
+			pthread_cond_broadcast(&wait_for_worker0);
+		}
+next:
+	continue;
 	} /* while (true) */
 error:
 	if (env.debug)
@@ -1060,16 +1110,19 @@ static int server_main(void)
 {
 	pthread_t *dispatcher_threads;
 	struct dispatch_data *data;
+	int all_finished = false;
 
 	dispatcher_threads = calloc(sizeof(pthread_t), env.workers);
 	data = calloc(sizeof(struct dispatch_data), env.workers);
 
 	env.worker_current_test = calloc(sizeof(int), env.workers);
+
 	for (int i = 0; i < env.workers; i++) {
 		int rc;
 
 		data[i].worker_id = i;
 		data[i].sock_fd = env.worker_socks[i];
+		env.worker_current_test[i] = -1;
 		rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
 		if (rc < 0) {
 			perror("Failed to launch dispatcher thread");
@@ -1078,19 +1131,28 @@ static int server_main(void)
 	}
 
 	/* wait for all dispatcher to finish */
-	for (int i = 0; i < env.workers; i++) {
-		while (true) {
-			struct timespec timeout = {
-				.tv_sec = time(NULL) + 5,
-				.tv_nsec = 0
-			};
-			if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
-				break;
-			if (env.debug)
-				fprintf(stderr, "Still waiting for thread %d (test %d).\n",
-					i,  env.worker_current_test[i] + 1);
+	while (!all_finished) {
+		all_finished = true;
+		for (int i = 0; i < env.workers; i++) {
+			if (!dispatcher_threads[i])
+				continue;
+
+			if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) {
+				all_finished = false;
+				if (!env.debug) continue;
+				if (env.worker_current_test[i] == -1)
+					fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i);
+				else
+					fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n",
+						i, env.worker_current_test[i] + 1,
+						get_test_name(env.worker_current_test[i]));
+			} else {
+				dispatcher_threads[i] = 0;
+			}
 		}
+		usleep(10 * 1000 * 1000);
 	}
+
 	free(dispatcher_threads);
 	free(env.worker_current_test);
 	free(data);
@@ -1326,6 +1388,12 @@ int main(int argc, char **argv)
 			test->should_run = true;
 		else
 			test->should_run = false;
+
+		if (test->run_test == NULL && test->run_serial_test == NULL) {
+			fprintf(stderr, "Test %d:%s must have either test_%s() or serial_test_%sl() defined.\n",
+				test->test_num, test->test_name, test->test_name, test->test_name);
+			exit(EXIT_ERR_SETUP_INFRA);
+		}
 	}
 
 	/* ignore workers if we are just listing */
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* Re: [PATCH v5 bpf-next 1/3] selftests/bpf: Add parallelism to test_progs
  2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
@ 2021-09-17 18:43   ` Andrii Nakryiko
  2021-09-17 18:54     ` Andrii Nakryiko
  2021-09-17 19:44   ` Andrii Nakryiko
  1 sibling, 1 reply; 9+ messages in thread
From: Andrii Nakryiko @ 2021-09-17 18:43 UTC (permalink / raw)
  To: Yucong Sun; +Cc: Andrii Nakryiko, bpf, Yucong Sun

On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote:
>
> From: Yucong Sun <sunyucong@gmail.com>
>
> This patch adds "-j" mode to test_progs, executing tests in multiple process.
> "-j" mode is optional, and works with all existing test selection mechanism, as
> well as "-v", "-l" etc.
>
> In "-j" mode, main process use UDS to communicate to each forked worker,
> commanding it to run tests and collect logs. After all tests are
> finished, a summary is printed. main process use multiple competing
> threads to dispatch work to worker, trying to keep them all busy.
>
> The test status will be printed as soon as it is finished, if there are error
> logs, it will be printed after the final summary line.
>
> By specifying "--debug", additional debug information on server/worker
> communication will be printed.
>
> Example output:
>   > ./test_progs -n 15-20 -j
>   [   12.801730] bpf_testmod: loading out-of-tree module taints kernel.
>   Launching 8 workers.
>   #20 btf_split:OK
>   #16 btf_endian:OK
>   #18 btf_module:OK
>   #17 btf_map_in_map:OK
>   #19 btf_skc_cls_ingress:OK
>   #15 btf_dump:OK
>   Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED
>
> Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> ---

A bit late to review this, sorry. I'm still looking through the code,
but decided to try it out locally first. And here's what I got
immediately running in QEMU:

[vmuser@archvm bpf]$ time sudo ./test_progs -t core
#32 core_autosize:OK
#33 core_extern:OK
#34 core_read_macros:OK
#35 core_reloc:OK
#36 core_retro:OK
Summary: 5/107 PASSED, 0 SKIPPED, 0 FAILED

real    0m0.927s
user    0m0.197s
sys     0m0.103s
[vmuser@archvm bpf]$ time sudo ./test_progs -t core -j
Launching 8 workers.
#34 core_read_macros:OK
#32 core_autosize:OK
#36 core_retro:OK
#33 core_extern:OK
#35 core_reloc:OK
Summary: 5/107 PASSED, 0 SKIPPED, 0 FAILED

real    0m20.048s
user    0m0.194s
sys     0m0.183s


So, first, "Launching 8 workers." should be only displayed with --debug, no?

But most importantly, why does the parallel version take 20 seconds?..
Please take a look, something is not right.

>  tools/testing/selftests/bpf/test_progs.c | 577 +++++++++++++++++++++--
>  tools/testing/selftests/bpf/test_progs.h |  36 +-
>  2 files changed, 581 insertions(+), 32 deletions(-)
>

[...]

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v5 bpf-next 1/3] selftests/bpf: Add parallelism to test_progs
  2021-09-17 18:43   ` Andrii Nakryiko
@ 2021-09-17 18:54     ` Andrii Nakryiko
  0 siblings, 0 replies; 9+ messages in thread
From: Andrii Nakryiko @ 2021-09-17 18:54 UTC (permalink / raw)
  To: Yucong Sun; +Cc: Andrii Nakryiko, bpf, Yucong Sun

On Fri, Sep 17, 2021 at 11:43 AM Andrii Nakryiko
<andrii.nakryiko@gmail.com> wrote:
>
> On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote:
> >
> > From: Yucong Sun <sunyucong@gmail.com>
> >
> > This patch adds "-j" mode to test_progs, executing tests in multiple process.
> > "-j" mode is optional, and works with all existing test selection mechanism, as
> > well as "-v", "-l" etc.
> >
> > In "-j" mode, main process use UDS to communicate to each forked worker,
> > commanding it to run tests and collect logs. After all tests are
> > finished, a summary is printed. main process use multiple competing
> > threads to dispatch work to worker, trying to keep them all busy.
> >
> > The test status will be printed as soon as it is finished, if there are error
> > logs, it will be printed after the final summary line.
> >
> > By specifying "--debug", additional debug information on server/worker
> > communication will be printed.
> >
> > Example output:
> >   > ./test_progs -n 15-20 -j
> >   [   12.801730] bpf_testmod: loading out-of-tree module taints kernel.
> >   Launching 8 workers.
> >   #20 btf_split:OK
> >   #16 btf_endian:OK
> >   #18 btf_module:OK
> >   #17 btf_map_in_map:OK
> >   #19 btf_skc_cls_ingress:OK
> >   #15 btf_dump:OK
> >   Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED
> >
> > Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> > ---
>
> A bit late to review this, sorry. I'm still looking through the code,
> but decided to try it out locally first. And here's what I got
> immediately running in QEMU:
>
> [vmuser@archvm bpf]$ time sudo ./test_progs -t core
> #32 core_autosize:OK
> #33 core_extern:OK
> #34 core_read_macros:OK
> #35 core_reloc:OK
> #36 core_retro:OK
> Summary: 5/107 PASSED, 0 SKIPPED, 0 FAILED
>
> real    0m0.927s
> user    0m0.197s
> sys     0m0.103s
> [vmuser@archvm bpf]$ time sudo ./test_progs -t core -j
> Launching 8 workers.
> #34 core_read_macros:OK
> #32 core_autosize:OK
> #36 core_retro:OK
> #33 core_extern:OK
> #35 core_reloc:OK
> Summary: 5/107 PASSED, 0 SKIPPED, 0 FAILED
>
> real    0m20.048s
> user    0m0.194s
> sys     0m0.183s
>
>
> So, first, "Launching 8 workers." should be only displayed with --debug, no?
>
> But most importantly, why does the parallel version take 20 seconds?..
> Please take a look, something is not right.

Running full run:

Summary: 176/972 PASSED, 4 SKIPPED, 2 FAILED

All error logs:
test_cg_storage_multi:PASS:cg-create-parent 0 nsec
test_cg_storage_multi:PASS:cg-create-child 0 nsec
test_egress_only:PASS:skel-load 0 nsec
test_egress_only:PASS:parent-cg-attach 0 nsec
test_egress_only:PASS:first-connect-send 0 nsec
test_egress_only:PASS:first-invoke 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
assert_storage_noexist:PASS:map-lookup 0 nsec
assert_storage_noexist:PASS:map-lookup 0 nsec
test_egress_only:PASS:child-cg-attach 0 nsec
test_egress_only:PASS:second-connect-send 0 nsec
test_egress_only:PASS:second-invoke 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
#28/1 cg_storage_multi/egress_only:OK
test_isolated:PASS:skel-load 0 nsec
test_isolated:PASS:parent-egress1-cg-attach 0 nsec
test_isolated:PASS:parent-egress2-cg-attach 0 nsec
test_isolated:PASS:parent-ingress-cg-attach 0 nsec
test_isolated:PASS:first-connect-send 0 nsec
test_isolated:FAIL:first-invoke invocations=2#28/2
cg_storage_multi/isolated:FAIL
test_shared:PASS:skel-load 0 nsec
test_shared:PASS:parent-egress1-cg-attach 0 nsec
test_shared:PASS:parent-egress2-cg-attach 0 nsec
test_shared:PASS:parent-ingress-cg-attach 0 nsec
test_shared:PASS:first-connect-send 0 nsec
test_shared:PASS:first-invoke 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
assert_storage_noexist:PASS:map-lookup 0 nsec
assert_storage_noexist:PASS:map-lookup 0 nsec
test_shared:PASS:child-egress1-cg-attach 0 nsec
test_shared:PASS:child-egress2-cg-attach 0 nsec
test_shared:PASS:child-ingress-cg-attach 0 nsec
test_shared:PASS:second-connect-send 0 nsec
test_shared:PASS:second-invoke 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
assert_storage:PASS:map-lookup 0 nsec
assert_storage:PASS:assert-storage 0 nsec
#28/3 cg_storage_multi/shared:OK
#28 cg_storage_multi:FAIL

real    1m0.057s
user    0m4.167s
sys     0m40.824s


Running in sequential mode, I got this timing:

real    1m10.007s
user    0m3.910s
sys     0m37.004s

So not much speed up, unfortunately. I assumed it's bpf_verif_scale
test (which we will break up to parallelize it better). So I re-ran
with it blacklisted, let's see how it did:

Summary: 175/949 PASSED, 4 SKIPPED, 1 FAILED

All error logs:
libbpf: elf: skipping unrecognized data section(7) .rodata.str1.1
test_snprintf_btf:PASS:skel_open 0 nsec
test_snprintf_btf:PASS:skel_load 0 nsec
test_snprintf_btf:PASS:skel_attach 0 nsec
test_snprintf_btf:PASS:system 0 nsec
test_snprintf_btf:PASS:bpf_snprintf_ret 0 nsec
test_snprintf_btf:PASS:check if subtests ran 0 nsec
test_snprintf_btf:FAIL:check all subtests ran only ran 1535 of 1431 tests
#116 snprintf_btf:FAIL

real    0m40.041s
user    0m4.212s
sys     0m7.966s

Note how cg_storage_multi didn't fail this time, but snprintf_btf did.
I think we'll need to audit selftests some more to see which ones are
not filtering by pid and make some system-wide modifications, those
would need to be adapted or marked as serial. We probably don't need
to do it in this first patch set, but definitely would have to do it
before we can start using this parallel mode for real both for local
development and in CI. For now let's concentrate on runner's
correctness (e.g., those 20 seconds don't seem right at all).

But getting back to my comparison, running all but bpf_verif_scale
test sequentially:

Summary: 176/949 PASSED, 5 SKIPPED, 0 FAILED

real    0m39.343s
user    0m3.857s
sys     0m6.509s

It is actually faster to run sequentially... Any idea what might be
going on here?

>
> >  tools/testing/selftests/bpf/test_progs.c | 577 +++++++++++++++++++++--
> >  tools/testing/selftests/bpf/test_progs.h |  36 +-
> >  2 files changed, 581 insertions(+), 32 deletions(-)
> >
>
> [...]

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v5 bpf-next 1/3] selftests/bpf: Add parallelism to test_progs
  2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
  2021-09-17 18:43   ` Andrii Nakryiko
@ 2021-09-17 19:44   ` Andrii Nakryiko
  1 sibling, 0 replies; 9+ messages in thread
From: Andrii Nakryiko @ 2021-09-17 19:44 UTC (permalink / raw)
  To: Yucong Sun; +Cc: Andrii Nakryiko, bpf, Yucong Sun

On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote:
>
> From: Yucong Sun <sunyucong@gmail.com>
>
> This patch adds "-j" mode to test_progs, executing tests in multiple process.
> "-j" mode is optional, and works with all existing test selection mechanism, as
> well as "-v", "-l" etc.
>
> In "-j" mode, main process use UDS to communicate to each forked worker,
> commanding it to run tests and collect logs. After all tests are
> finished, a summary is printed. main process use multiple competing
> threads to dispatch work to worker, trying to keep them all busy.
>
> The test status will be printed as soon as it is finished, if there are error
> logs, it will be printed after the final summary line.
>
> By specifying "--debug", additional debug information on server/worker
> communication will be printed.
>
> Example output:
>   > ./test_progs -n 15-20 -j
>   [   12.801730] bpf_testmod: loading out-of-tree module taints kernel.
>   Launching 8 workers.
>   #20 btf_split:OK
>   #16 btf_endian:OK
>   #18 btf_module:OK
>   #17 btf_map_in_map:OK
>   #19 btf_skc_cls_ingress:OK
>   #15 btf_dump:OK
>   Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED
>
> Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> ---

Love the error summary logic. Can we please have it in sequential mode
as well?... :)

Also didn't find anything obvious that would explain 20 second
parallel run for few small and fast tests.

>  tools/testing/selftests/bpf/test_progs.c | 577 +++++++++++++++++++++--
>  tools/testing/selftests/bpf/test_progs.h |  36 +-
>  2 files changed, 581 insertions(+), 32 deletions(-)
>

[...]

> @@ -661,6 +678,20 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
>         case ARG_LIST_TEST_NAMES:
>                 env->list_test_names = true;
>                 break;
> +       case ARG_NUM_WORKERS:
> +               if (arg) {
> +                       env->workers = atoi(arg);
> +                       if (!env->workers) {
> +                               fprintf(stderr, "Invalid number of worker: %s.", arg);
> +                               return -1;

I missed this problem when SELFTESTS_VERBOSE logic was added, but all
the rest of the code returns -EINVAL, not -EPERM (-1). Let's keep it
consistent.

> +                       }
> +               } else {
> +                       env->workers = get_nprocs();
> +               }
> +               break;
> +       case ARG_DEBUG:
> +               env->debug = true;
> +               break;
>         case ARGP_KEY_ARG:
>                 argp_usage(state);
>                 break;
> @@ -678,7 +709,7 @@ static void stdio_hijack(void)
>         env.stdout = stdout;
>         env.stderr = stderr;
>
> -       if (env.verbosity > VERBOSE_NONE) {
> +       if (env.verbosity > VERBOSE_NONE && env.worker_id == -1) {
>                 /* nothing to do, output to stdout by default */
>                 return;
>         }
> @@ -704,10 +735,6 @@ static void stdio_restore(void)
>                 return;
>
>         fclose(stdout);
> -       free(env.log_buf);
> -
> -       env.log_buf = NULL;
> -       env.log_cnt = 0;
>
>         stdout = env.stdout;
>         stderr = env.stderr;
> @@ -794,11 +821,444 @@ void crash_handler(int signum)
>                 dump_test_log(env.test, true);
>         if (env.stdout)
>                 stdio_restore();
> -
> +       if (env.worker_id != -1)
> +               fprintf(stderr, "[%d]: ", env.worker_id);
>         fprintf(stderr, "Caught signal #%d!\nStack trace:\n", signum);
>         backtrace_symbols_fd(bt, sz, STDERR_FILENO);
>  }
>
> +void sigint_handler(int signum) {

{ on new line, please run checkpatch.pl

> +       for (int i = 0; i < env.workers; i++)
> +               close(env.worker_socks[i]);

this can race with env.worker_socks allocation, no? Better to install
signal handler after we initialize env completely. Also it's a good
idea to reset sigint handler after first ctrl-c just in case something
got stuck. So that second ctrl-c will unconditionally kill the
process. Nothing more annoying than application that got stuck and it
doesn't die on ctrl-c

> +}
> +
> +static int current_test_idx = 0;
> +static pthread_mutex_t current_test_lock;
> +static pthread_mutex_t stdout_output_lock;
> +
> +struct test_result {
> +       int error_cnt;
> +       int skip_cnt;
> +       int sub_succ_cnt;
> +
> +       size_t log_cnt;
> +       char *log_buf;
> +};
> +
> +static struct test_result test_results[ARRAY_SIZE(prog_test_defs)];
> +
> +static inline const char *str_msg(const struct msg *msg, char *buf)
> +{
> +       switch (msg->type) {
> +       case MSG_DO_TEST:
> +               sprintf(buf, "MSG_DO_TEST %d", msg->do_test.test_num);
> +               break;
> +       case MSG_TEST_DONE:
> +               sprintf(buf, "MSG_TEST_DONE %d (log: %d)",
> +                       msg->test_done.test_num,
> +                       msg->test_done.have_log);
> +               break;
> +       case MSG_TEST_LOG:
> +               sprintf(buf, "MSG_TEST_LOG (cnt: %ld, last: %d)",
> +                       strlen(msg->test_log.log_buf),
> +                       msg->test_log.is_last);
> +               break;
> +       case MSG_EXIT:
> +               sprintf(buf, "MSG_EXIT");
> +               break;
> +       default:
> +               sprintf(buf, "UNKNOWN");
> +               break;
> +       }
> +
> +       return buf;
> +}
> +
> +static int send_message(int sock, const struct msg *msg)
> +{
> +       char buf[256];

empty line after variable declaration block

> +       if (env.verbosity > VERBOSE_SUPER)
> +               fprintf(stderr, "Sending msg: %s\n", str_msg(msg, buf));

I thought you were going to make this depend on --debug? you can also
combine check for --debug and verbosity levels, if necessry, so that
you can have more control for debugging

> +       return send(sock, msg, sizeof(*msg), 0);
> +}
> +
> +static int recv_message(int sock, struct msg *msg)
> +{
> +       int ret;
> +       char buf[256];
> +
> +       memset(msg, 0, sizeof(*msg));
> +       ret = recv(sock, msg, sizeof(*msg), 0);
> +       if (ret >= 0) {
> +               if (env.debug)
> +                       fprintf(stderr, "Received msg: %s\n", str_msg(msg, buf));
> +       }
> +       return ret;
> +}
> +
> +static void run_one_test(int test_num) {

{ on separate line

> +       struct prog_test_def *test = &prog_test_defs[test_num];
> +
> +       env.test = test;
> +
> +       test->run_test();
> +
> +       /* ensure last sub-test is finalized properly */
> +       if (test->subtest_name)
> +              test__end_subtest();
> +
> +       test->tested = true;
> +
> +       dump_test_log(test, test->error_cnt);
> +
> +       reset_affinity();
> +       restore_netns();
> +       if (test->need_cgroup_cleanup)
> +              cleanup_cgroup_environment();
> +}
> +
> +static const char *get_test_name(int idx)
> +{
> +       struct prog_test_def *test;
> +
> +       test = &prog_test_defs[idx];
> +       return test->test_name;
> +}

seems like a quite useless helper, prog_tests_defs[idx].test_name is
just as short. If you are worried about prog_test_defs[] setup
changing, then adding helper to return pointer to the test definition
itself is more useful, because you fetch that test def pointer in more
places

> +
> +struct dispatch_data {
> +       int worker_id;
> +       int sock_fd;
> +};
> +
> +void *dispatch_thread(void *ctx)

static

> +{
> +       struct dispatch_data *data = ctx;
> +       int sock_fd;
> +       FILE *log_fd = NULL;

not a FD, log_file/log_stream?

> +
> +       sock_fd = data->sock_fd;
> +
> +       while (true) {
> +               int test_to_run = -1;

no need to initialize, it's always set if it's used. This -1 made me
search the code needlessly to figure out where this -1 matters.

> +               struct prog_test_def *test;
> +               struct test_result *result;
> +
> +               /* grab a test */
> +               {
> +                       pthread_mutex_lock(&current_test_lock);
> +
> +                       if (current_test_idx >= prog_test_cnt) {
> +                               pthread_mutex_unlock(&current_test_lock);
> +                               goto done;
> +                       }
> +
> +                       test = &prog_test_defs[current_test_idx];
> +                       test_to_run = current_test_idx;
> +                       current_test_idx++;
> +
> +                       pthread_mutex_unlock(&current_test_lock);
> +               }
> +
> +               if (!test->should_run) {
> +                       continue;
> +               }

styling: no {} around single-line if bodies. But instead of
locking/unlocking and then checking, why not just find the next test
that should_run above?

> +
> +
> +               /* run test through worker */
> +               {
> +                       struct msg msg_do_test;
> +
> +                       msg_do_test.type = MSG_DO_TEST;
> +                       msg_do_test.do_test.test_num = test_to_run;
> +                       if (send_message(sock_fd, &msg_do_test) < 0) {
> +                               perror("Fail to send command");
> +                               goto done;
> +                       }
> +                       env.worker_current_test[data->worker_id] = test_to_run;
> +               }
> +
> +               /* wait for test done */
> +               {
> +                       struct msg msg_test_done;
> +
> +                       if (errno = recv_message(sock_fd, &msg_test_done) < 0)

is this

errno = (recv_message(sock_fd, &msg_test_done) < 0) && errno != 0

or

errno = recv_message(sock_fd, &msg_test_done);
if (errno < 0) { ... }

?

So please no. And also don't use errno (global thread-local variable
declared by libc) for this, declare `int err`.

> +                               goto error;
> +                       if (msg_test_done.type != MSG_TEST_DONE)
> +                               goto error;
> +                       if (test_to_run != msg_test_done.test_done.test_num)
> +                               goto error;
> +
> +                       test->tested = true;
> +                       result = &test_results[test_to_run];
> +
> +                       result->error_cnt = msg_test_done.test_done.error_cnt;
> +                       result->skip_cnt = msg_test_done.test_done.skip_cnt;
> +                       result->sub_succ_cnt = msg_test_done.test_done.sub_succ_cnt;

s/msg_test_done/msg/, same above for msg_do_test. You have a local
variable in extra {} lexical scope, so no conflict. And it will read
non-redundantly: msg.test_done.sub_succ_cnt.

> +
> +                       /* collect all logs */
> +                       if (msg_test_done.test_done.have_log) {
> +                               log_fd = open_memstream(&result->log_buf, &result->log_cnt);
> +                               if (!log_fd)
> +                                       goto error;
> +
> +                               while (true) {
> +                                       struct msg msg_log;
> +
> +                                       if (recv_message(sock_fd, &msg_log) < 0)
> +                                               goto error;
> +                                       if (msg_log.type != MSG_TEST_LOG)
> +                                               goto error;
> +
> +                                       fprintf(log_fd, "%s", msg_log.test_log.log_buf);
> +                                       if (msg_log.test_log.is_last)
> +                                               break;
> +                               }
> +                               fclose(log_fd);
> +                               log_fd = NULL;
> +                       }
> +                       /* output log */
> +                       {
> +                               pthread_mutex_lock(&stdout_output_lock);
> +
> +                               if (result->log_cnt) {
> +                                       result->log_buf[result->log_cnt] = '\0';
> +                                       fprintf(stdout, "%s", result->log_buf);
> +                                       if (result->log_buf[result->log_cnt - 1] != '\n')
> +                                               fprintf(stdout, "\n");
> +                               }
> +
> +                               fprintf(stdout, "#%d %s:%s\n",
> +                                       test->test_num, test->test_name,
> +                                       result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));
> +
> +                               pthread_mutex_unlock(&stdout_output_lock);
> +                       }
> +
> +               } /* wait for test done */
> +       } /* while (true) */
> +error:
> +       if (env.debug)
> +               fprintf(stderr, "[%d]: Protocol/IO error: %s.\n", data->worker_id, strerror(errno));

you don't always set errno above when you do `goto error`. Please
don't use errno directly, it's better to save it into int err, and do
`err = -errno; goto error;` everywhere. It's way too easy to forget
and clobber errno.

> +
> +       if (log_fd)
> +               fclose(log_fd);
> +done:
> +       {
> +               struct msg msg_exit;
> +
> +               msg_exit.type = MSG_EXIT;
> +               if (send_message(sock_fd, &msg_exit) < 0) {
> +                       if (env.debug)
> +                               fprintf(stderr, "[%d]: send_message msg_exit: %s.\n",
> +                                       data->worker_id, strerror(errno));
> +               }
> +       }
> +       return NULL;
> +}
> +
> +static int server_main(void)
> +{
> +       pthread_t *dispatcher_threads;
> +       struct dispatch_data *data;
> +
> +       dispatcher_threads = calloc(sizeof(pthread_t), env.workers);
> +       data = calloc(sizeof(struct dispatch_data), env.workers);
> +
> +       env.worker_current_test = calloc(sizeof(int), env.workers);

nit: this is backwards use of calloc(). First argument is the count,
the second is the size of the element. I don't know if that matters,
but it might matter for some alignment logic. So please swap the
order.

> +       for (int i = 0; i < env.workers; i++) {

int i inside for  isn't C89, please declare outside. And we should
specify whatever the compiler flags are specified for the kernel to
make the compiler complain about this...

> +               int rc;
> +
> +               data[i].worker_id = i;
> +               data[i].sock_fd = env.worker_socks[i];
> +               rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
> +               if (rc < 0) {
> +                       perror("Failed to launch dispatcher thread");
> +                       return -1;
> +               }
> +       }
> +
> +       /* wait for all dispatcher to finish */
> +       for (int i = 0; i < env.workers; i++) {

same

> +               while (true) {
> +                       struct timespec timeout = {
> +                               .tv_sec = time(NULL) + 5,
> +                               .tv_nsec = 0
> +                       };

empty line goes here, checkpatch.pl goes everywhere :)

> +                       if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
> +                               break;
> +                       if (env.debug)
> +                               fprintf(stderr, "Still waiting for thread %d (test %d).\n",
> +                                       i,  env.worker_current_test[i] + 1);
> +               }
> +       }
> +       free(dispatcher_threads);
> +       free(env.worker_current_test);
> +       free(data);
> +
> +       /* generate summary */
> +       fflush(stderr);
> +       fflush(stdout);
> +
> +       for (int i = 0; i < prog_test_cnt; i++) {
> +               struct prog_test_def *current_test;
> +               struct test_result *result;
> +
> +               current_test = &prog_test_defs[i];
> +               result = &test_results[i];
> +
> +               if (!current_test->tested)
> +                       continue;
> +
> +               env.succ_cnt += result->error_cnt ? 0 : 1;
> +               env.skip_cnt += result->skip_cnt;
> +               env.fail_cnt += result->error_cnt;
> +               env.sub_succ_cnt += result->sub_succ_cnt;
> +       }
> +
> +       fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
> +               env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
> +       if (env.fail_cnt)
> +               fprintf(stdout, "\nAll error logs:\n");
> +
> +       /* print error logs again */
> +
> +       for (int i = 0; i < prog_test_cnt; i++) {
> +               struct prog_test_def *current_test;
> +               struct test_result *result;
> +
> +               current_test = &prog_test_defs[i];
> +               result = &test_results[i];
> +
> +               if (!current_test->tested || !result->error_cnt)
> +                       continue;
> +
> +               if (result->log_cnt) {
> +                       result->log_buf[result->log_cnt] = '\0';
> +                       fprintf(stdout, "%s", result->log_buf);
> +                       if (result->log_buf[result->log_cnt - 1] != '\n')
> +                               fprintf(stdout, "\n");
> +               }
> +               fprintf(stdout, "#%d %s:%s\n",
> +                       current_test->test_num, current_test->test_name,
> +                       result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));

This if and fprintfs is duplicated, can you move it into a helper?

> +       }
> +
> +       /* reap all workers */
> +       for (int i = 0; i < env.workers; i++) {
> +               int wstatus, pid;
> +
> +               pid = waitpid(env.worker_pids[i], &wstatus, 0);
> +               if (pid != env.worker_pids[i])
> +                       perror("Unable to reap worker");
> +       }
> +
> +       return 0;
> +}
> +
> +static int worker_main(int sock)
> +{
> +       save_netns();
> +
> +       while (true) {
> +               /* receive command */
> +               struct msg msg;
> +
> +               if (recv_message(sock, &msg) < 0)
> +                       goto out;
> +
> +               switch (msg.type) {
> +               case MSG_EXIT:
> +                       if (env.debug)
> +                               fprintf(stderr, "[%d]: worker exit.\n",
> +                                       env.worker_id);
> +                       goto out;
> +               case MSG_DO_TEST: {
> +                       int test_to_run;
> +                       struct prog_test_def *test;
> +                       struct msg msg_done;
> +
> +                       test_to_run = msg.do_test.test_num;
> +
> +                       if (env.debug)
> +                               fprintf(stderr, "[%d]: #%d:%s running.\n",
> +                                       env.worker_id,
> +                                       test_to_run + 1,
> +                                       get_test_name(test_to_run));
> +
> +                       test = &prog_test_defs[test_to_run];
> +
> +                       stdio_hijack();
> +
> +                       run_one_test(test_to_run);
> +
> +                       stdio_restore();
> +
> +                       memset(&msg_done, 0, sizeof(msg_done));

just reuse msg, it's not used anymore at this point

> +                       msg_done.type = MSG_TEST_DONE;
> +                       msg_done.test_done.test_num = test_to_run;
> +                       msg_done.test_done.error_cnt = test->error_cnt;
> +                       msg_done.test_done.skip_cnt = test->skip_cnt;
> +                       msg_done.test_done.sub_succ_cnt = test->sub_succ_cnt;
> +                       msg_done.test_done.have_log = false;
> +
> +                       if (env.verbosity > VERBOSE_NONE || test->force_log || test->error_cnt) {
> +                               if (env.log_cnt)
> +                                       msg_done.test_done.have_log = true;
> +                       }
> +                       if (send_message(sock, &msg_done) < 0) {
> +                               perror("Fail to send message done");
> +                               goto out;
> +                       }
> +
> +                       /* send logs */
> +                       if (msg_done.test_done.have_log) {
> +                               char *src;
> +                               size_t slen;
> +
> +                               src = env.log_buf;
> +                               slen = env.log_cnt;
> +                               while (slen) {
> +                                       struct msg msg_log;

same, msg can be just reused

> +                                       char *dest;
> +                                       size_t len;
> +
> +                                       memset(&msg_log, 0, sizeof(msg_log));
> +                                       msg_log.type = MSG_TEST_LOG;
> +                                       dest = msg_log.test_log.log_buf;
> +                                       len = slen >= MAX_LOG_TRUNK_SIZE ? MAX_LOG_TRUNK_SIZE : slen;
> +                                       memcpy(dest, src, len);
> +
> +                                       src += len;
> +                                       slen -= len;
> +                                       if (!slen)
> +                                               msg_log.test_log.is_last = true;
> +
> +                                       assert(send_message(sock, &msg_log) >= 0);

missed assert here

> +                               }
> +                       }
> +                       if (env.log_buf) {
> +                               free(env.log_buf);
> +                               env.log_buf = NULL;
> +                               env.log_cnt = 0;
> +                       }
> +                       if (env.debug)
> +                               fprintf(stderr, "[%d]: #%d:%s done.\n",
> +                                       env.worker_id,
> +                                       test_to_run + 1,
> +                                       get_test_name(test_to_run));
> +                       break;
> +               } /* case MSG_DO_TEST */
> +               default:
> +                       if (env.debug)
> +                               fprintf(stderr, "[%d]: unknown message.\n",  env.worker_id);
> +                       return -1;
> +               }
> +       }
> +out:
> +       restore_netns();
> +       return 0;
> +}
> +
>  int main(int argc, char **argv)
>  {
>         static const struct argp argp = {
> @@ -809,10 +1269,15 @@ int main(int argc, char **argv)
>         struct sigaction sigact = {
>                 .sa_handler = crash_handler,
>                 .sa_flags = SA_RESETHAND,
> -       };
> +               };
> +       struct sigaction sigact_int = {
> +               .sa_handler = sigint_handler,
> +               .sa_flags = SA_RESETHAND,
> +               };
>         int err, i;
>
>         sigaction(SIGSEGV, &sigact, NULL);
> +       sigaction(SIGINT, &sigact_int, NULL);

as mentioned above, let's delay it until we setup environment completely?

>
>         err = argp_parse(&argp, argc, argv, 0, NULL, &env);
>         if (err)
> @@ -837,21 +1302,76 @@ int main(int argc, char **argv)
>                 return -1;
>         }
>
> -       save_netns();
> -       stdio_hijack();
> +       env.stdout = stdout;
> +       env.stderr = stderr;
> +
>         env.has_testmod = true;
>         if (!env.list_test_names && load_bpf_testmod()) {
>                 fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n");
>                 env.has_testmod = false;
>         }
> +
> +       /* initializing tests */
>         for (i = 0; i < prog_test_cnt; i++) {
>                 struct prog_test_def *test = &prog_test_defs[i];
>
> -               env.test = test;
>                 test->test_num = i + 1;
> -
> -               if (!should_run(&env.test_selector,
> +               if (should_run(&env.test_selector,
>                                 test->test_num, test->test_name))
> +                       test->should_run = true;
> +               else
> +                       test->should_run = false;

test->should_run = should_run(...)

> +       }
> +
> +       /* ignore workers if we are just listing */
> +       if (env.get_test_cnt || env.list_test_names)
> +               env.workers = 0;
> +
> +       /* launch workers if requested */
> +       env.worker_id = -1; /* main process */
> +       if (env.workers) {
> +               env.worker_pids = calloc(sizeof(__pid_t), env.workers);
> +               env.worker_socks = calloc(sizeof(int), env.workers);

check for NULLs

> +               fprintf(stdout, "Launching %d workers.\n", env.workers);

if (debug)

> +               for (int i = 0; i < env.workers; i++) {
> +                       int sv[2];
> +                       pid_t pid;
> +
> +                       if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sv) < 0) {
> +                               perror("Fail to create worker socket");
> +                               return -1;
> +                       }
> +                       pid = fork();
> +                       if (pid < 0) {
> +                               perror("Failed to fork worker");
> +                               return -1;
> +                       } else if (pid != 0) { /* main process */
> +                               close(sv[1]);
> +                               env.worker_pids[i] = pid;
> +                               env.worker_socks[i] = sv[0];
> +                       } else { /* inside each worker process */
> +                               close(sv[0]);
> +                               env.worker_id = i;
> +                               return worker_main(sv[1]);
> +                       }
> +               }
> +
> +               if (env.worker_id == -1) {

you have return worker_main above, can't be anything else, drop unnecessary ifs

> +                       server_main();
> +                       goto out;
> +               }
> +       }
> +
> +       /* The rest of the main process */
> +
> +       /* on single mode */
> +       save_netns();
> +       stdio_hijack();
> +
> +       for (i = 0; i < prog_test_cnt; i++) {
> +               struct prog_test_def *test = &prog_test_defs[i];
> +
> +               if (!test->should_run)
>                         continue;
>
>                 if (env.get_test_cnt) {
> @@ -865,14 +1385,7 @@ int main(int argc, char **argv)
>                         continue;
>                 }
>
> -               test->run_test();
> -               /* ensure last sub-test is finalized properly */
> -               if (test->subtest_name)
> -                       test__end_subtest();
> -
> -               test->tested = true;
> -
> -               dump_test_log(test, test->error_cnt);
> +               run_one_test(i);
>
>                 fprintf(env.stdout, "#%d %s:%s\n",
>                         test->test_num, test->test_name,
> @@ -882,16 +1395,16 @@ int main(int argc, char **argv)
>                         env.fail_cnt++;
>                 else
>                         env.succ_cnt++;
> -               skip_account();
>
> -               reset_affinity();
> -               restore_netns();
> -               if (test->need_cgroup_cleanup)
> -                       cleanup_cgroup_environment();
> +               skip_account();

can you please double-check skip accounting in parallel case? I'm not
sure it's there.

> +               env.sub_succ_cnt += test->sub_succ_cnt;
>         }
> -       if (!env.list_test_names && env.has_testmod)
> -               unload_bpf_testmod();
>         stdio_restore();
> +       if (env.log_buf) {
> +               free(env.log_buf);
> +               env.log_buf = NULL;
> +               env.log_cnt = 0;
> +       }

[...]

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix
  2021-09-16  3:26 ` [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix Yucong Sun
@ 2021-09-17 21:10   ` Andrii Nakryiko
  0 siblings, 0 replies; 9+ messages in thread
From: Andrii Nakryiko @ 2021-09-17 21:10 UTC (permalink / raw)
  To: Yucong Sun; +Cc: Andrii Nakryiko, bpf, Yucong Sun

On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote:
>
> From: Yucong Sun <sunyucong@gmail.com>
>
> This patch allows each worker to use a unique cgroup base directory, thus
> allowing tests that uses cgroups to run concurrently.
>
> Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> ---

What if we always set the prefix to be a PID of the process?

BTW, Daniel mentioned that this might need some rebasing and conflict
resolution due to his patch in bpf tree, which is now merged into
bpf-next tree.

>  tools/testing/selftests/bpf/cgroup_helpers.c | 5 +++--
>  tools/testing/selftests/bpf/cgroup_helpers.h | 1 +
>  tools/testing/selftests/bpf/test_progs.c     | 5 +++++
>  3 files changed, 9 insertions(+), 2 deletions(-)
>
> diff --git a/tools/testing/selftests/bpf/cgroup_helpers.c b/tools/testing/selftests/bpf/cgroup_helpers.c
> index 033051717ba5..a0429f0d6db2 100644
> --- a/tools/testing/selftests/bpf/cgroup_helpers.c
> +++ b/tools/testing/selftests/bpf/cgroup_helpers.c
> @@ -29,9 +29,10 @@
>  #define WALK_FD_LIMIT                  16
>  #define CGROUP_MOUNT_PATH              "/mnt"
>  #define CGROUP_WORK_DIR                        "/cgroup-test-work-dir"
> +const char *CGROUP_WORK_DIR_SUFFIX = "";
>  #define format_cgroup_path(buf, path) \
> -       snprintf(buf, sizeof(buf), "%s%s%s", CGROUP_MOUNT_PATH, \
> -                CGROUP_WORK_DIR, path)
> +       snprintf(buf, sizeof(buf), "%s%s%s%s", CGROUP_MOUNT_PATH, \
> +       CGROUP_WORK_DIR, CGROUP_WORK_DIR_SUFFIX, path)
>
>  /**
>   * enable_all_controllers() - Enable all available cgroup v2 controllers
> diff --git a/tools/testing/selftests/bpf/cgroup_helpers.h b/tools/testing/selftests/bpf/cgroup_helpers.h
> index 5fe3d88e4f0d..5657aba02161 100644
> --- a/tools/testing/selftests/bpf/cgroup_helpers.h
> +++ b/tools/testing/selftests/bpf/cgroup_helpers.h
> @@ -16,4 +16,5 @@ int setup_cgroup_environment(void);
>  void cleanup_cgroup_environment(void);
>  unsigned long long get_cgroup_id(const char *path);
>
> +extern const char *CGROUP_WORK_DIR_SUFFIX;
>  #endif
> diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
> index 36f130455b2f..77ed9204cc4a 100644
> --- a/tools/testing/selftests/bpf/test_progs.c
> +++ b/tools/testing/selftests/bpf/test_progs.c
> @@ -1157,6 +1157,11 @@ static int server_main(void)
>
>  static int worker_main(int sock)
>  {
> +       static char suffix[16];
> +
> +       sprintf(suffix, "%d", env.worker_id);
> +       CGROUP_WORK_DIR_SUFFIX = suffix;
> +
>         save_netns();
>
>         while (true) {
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0
  2021-09-16  3:26 ` [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0 Yucong Sun
@ 2021-09-17 21:22   ` Andrii Nakryiko
  0 siblings, 0 replies; 9+ messages in thread
From: Andrii Nakryiko @ 2021-09-17 21:22 UTC (permalink / raw)
  To: Yucong Sun; +Cc: Andrii Nakryiko, bpf, Yucong Sun

On Wed, Sep 15, 2021 at 8:26 PM Yucong Sun <fallentree@fb.com> wrote:
>
> From: Yucong Sun <sunyucong@gmail.com>
>
> This patch modify some tests to provide serial_test_name() instead of
> test_name() to indicate it must run on worker 0. On encountering these tests,
> all other threads will wait on a conditional variable, which worker 0 will
> signal once the tests has finished running.
>
> Additionally, before running the test, thread 0 also check and wait until all
> other threads has finished their current work, to make sure the pinned test
> really are the only test running in the system.
>
> After this change, all tests should pass in '-j' mode.
>
> Signed-off-by: Yucong Sun <sunyucong@gmail.com>
> ---

I don't like this approach, it's over-complicated. I see two ways to
do this more simple:

1) Let main process run all the serial tests before even instantiating
workers, then run parallel tests with worker. The benefit of this is
that we can structure code to have main testing loop that in
sequential mode will run both parallel and serial tests, while in
parallel mode will run only serial tests.

2) Teach worker 0 to run all serial tests one by one on start, before
any parallel tests are run. Then we need to teach other workers to
wait for it or teach main process to wait for worker 0 to finish
before dispatching work to worker 1+. I think this is more convoluted
and complicated.

I certainly prefer #1. Additional benefit is that the worker and
server's code would need to work consistently with all the data
structured (preserving error logs until the end), etc. So it's a good
test and forcing function to unify parallel and sequential modes.

WDYT?

>  .../selftests/bpf/prog_tests/bpf_obj_id.c     |   2 +-
>  .../bpf/prog_tests/select_reuseport.c         |   2 +-
>  .../testing/selftests/bpf/prog_tests/timer.c  |   2 +-
>  .../selftests/bpf/prog_tests/xdp_bonding.c    |   2 +-
>  .../selftests/bpf/prog_tests/xdp_link.c       |   2 +-
>  tools/testing/selftests/bpf/test_progs.c      | 112 ++++++++++++++----
>  6 files changed, 95 insertions(+), 27 deletions(-)
>

[...]

> @@ -954,15 +969,42 @@ void *dispatch_thread(void *ctx)
>
>                         test = &prog_test_defs[current_test_idx];
>                         test_to_run = current_test_idx;
> -                       current_test_idx++;
>
> -                       pthread_mutex_unlock(&current_test_lock);
> -               }
> +                       test = &prog_test_defs[test_to_run];

that's the same test as current_test_idx above?...

>
> -               if (!test->should_run) {
> -                       continue;
> -               }
> +                       if (!test->should_run) {
> +                               current_test_idx++;
> +                               pthread_mutex_unlock(&current_test_lock);
> +                               goto next;
> +                       }
> +
> +                       if (is_serial_test(current_test_idx)) {
> +                               if (data->worker_id != 0) {
> +                                       if (env.debug)
> +                                               fprintf(stderr, "[%d]: Waiting for thread 0 to finish serialized test: %d.\n",
> +                                                       data->worker_id, current_test_idx + 1);
> +                                       /* wait for worker 0 to pick this job up and finish */
> +                                       pthread_cond_wait(&wait_for_worker0, &current_test_lock);
> +                                       pthread_mutex_unlock(&current_test_lock);
> +                                       goto next;
> +                               } else {
> +                                       /* wait until all other worker has parked */
> +                                       for (int i = 1; i < env.workers; i++) {
> +                                               if (env.worker_current_test[i] != -1) {
> +                                                       if (env.debug)
> +                                                               fprintf(stderr, "[%d]: Waiting for other threads to finish current test...\n", data->worker_id);
> +                                                       pthread_mutex_unlock(&current_test_lock);
> +                                                       usleep(1 * 1000 * 1000);


hm... I wonder if this contributes to those 20 seconds run time even
for very fast tests...

> +                                                       goto next;
> +                                               }
> +                                       }
> +                               }
> +                       } else {
> +                               current_test_idx++;
> +                       }

[...]

> +       while (!all_finished) {
> +               all_finished = true;
> +               for (int i = 0; i < env.workers; i++) {
> +                       if (!dispatcher_threads[i])
> +                               continue;
> +
> +                       if (pthread_tryjoin_np(dispatcher_threads[i], NULL) == EBUSY) {
> +                               all_finished = false;
> +                               if (!env.debug) continue;
> +                               if (env.worker_current_test[i] == -1)
> +                                       fprintf(stderr, "Still waiting for thread %d (blocked by thread 0).\n", i);
> +                               else
> +                                       fprintf(stderr, "Still waiting for thread %d (test #%d:%s).\n",
> +                                               i, env.worker_current_test[i] + 1,
> +                                               get_test_name(env.worker_current_test[i]));
> +                       } else {
> +                               dispatcher_threads[i] = 0;
> +                       }
>                 }
> +               usleep(10 * 1000 * 1000);

and here you have 10 seconds just waiting doing nothing...

>         }
> +
>         free(dispatcher_threads);
>         free(env.worker_current_test);
>         free(data);
> @@ -1326,6 +1388,12 @@ int main(int argc, char **argv)
>                         test->should_run = true;
>                 else
>                         test->should_run = false;
> +
> +               if (test->run_test == NULL && test->run_serial_test == NULL) {
> +                       fprintf(stderr, "Test %d:%s must have either test_%s() or serial_test_%sl() defined.\n",

but not both, so let's check !!test->run_test ==
!!test->run_serial_test to make sure that only one is specified

> +                               test->test_num, test->test_name, test->test_name, test->test_name);
> +                       exit(EXIT_ERR_SETUP_INFRA);
> +               }
>         }
>
>         /* ignore workers if we are just listing */
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2021-09-17 21:22 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-16  3:26 [PATCH v5 bpf-next 0/3] selftests/bpf: Add parallelism to test_progs Yucong Sun
2021-09-16  3:26 ` [PATCH v5 bpf-next 1/3] " Yucong Sun
2021-09-17 18:43   ` Andrii Nakryiko
2021-09-17 18:54     ` Andrii Nakryiko
2021-09-17 19:44   ` Andrii Nakryiko
2021-09-16  3:26 ` [PATCH v5 bpf-next 2/3] selftests/bpf: add per worker cgroup suffix Yucong Sun
2021-09-17 21:10   ` Andrii Nakryiko
2021-09-16  3:26 ` [PATCH v5 bpf-next 3/3] selftests/bpf: pin some tests to worker 0 Yucong Sun
2021-09-17 21:22   ` Andrii Nakryiko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).