All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v2 bpf-next] selftests/bpf: Add parallelism to test_progs
@ 2021-09-09 19:31 Yucong Sun
  0 siblings, 0 replies; only message in thread
From: Yucong Sun @ 2021-09-09 19:31 UTC (permalink / raw)
  To: andrii; +Cc: bpf, Yucong Sun

From: Yucong Sun <sunyucong@gmail.com>

This patch adds a "-j" mode to test_progs that executes tests in multiple
processes. "-j" mode is optional and works with all existing test selection
mechanisms, as well as "-v", "-l", etc.

In "-j" mode, the main process uses UDS/DGRAM sockets to communicate with each
forked worker, commanding it to run tests and collecting its logs. After all
tests are finished, a summary is printed. The main process uses multiple
competing threads to dispatch work to the workers, trying to keep them busy.

Example output:

./test_progs -n 15-20 -j
[    8.584709] bpf_testmod: loading out-of-tree module taints kernel.
Launching 2 workers.
[0]: Running test 15.
[1]: Running test 16.
[1]: Running test 17.
[1]: Running test 18.
[1]: Running test 19.
[1]: Running test 20.
[1]: worker exit.
[0]: worker exit.
Summary: 6/20 PASSED, 0 SKIPPED, 0 FAILED

Known issue:

Some tests fail when run concurrently; a later patch will either fix those
tests or pin them to worker 0.

Signed-off-by: Yucong Sun <sunyucong@gmail.com>

V1 -> V2: switch to UDS client/server model.
---
 tools/testing/selftests/bpf/test_progs.c | 456 ++++++++++++++++++++++-
 tools/testing/selftests/bpf/test_progs.h |  36 +-
 2 files changed, 478 insertions(+), 14 deletions(-)

diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index cc1cd240445d..8204dd9aa657 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -12,6 +12,11 @@
 #include <string.h>
 #include <execinfo.h> /* backtrace */
 #include <linux/membarrier.h>
+#include <sys/sysinfo.h> /* get_nprocs */
+#include <netinet/in.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/un.h>
 
 /* Adapted from perf/util/string.c */
 static bool glob_match(const char *str, const char *pat)
@@ -48,6 +53,7 @@ struct prog_test_def {
 	bool force_log;
 	int error_cnt;
 	int skip_cnt;
+	int sub_succ_cnt;
 	bool tested;
 	bool need_cgroup_cleanup;
 
@@ -97,6 +103,10 @@ static void dump_test_log(const struct prog_test_def *test, bool failed)
 	if (stdout == env.stdout)
 		return;
 
+	/* worker always holds log */
+	if (env.worker_index != -1)
+		return;
+
 	fflush(stdout); /* exports env.log_buf & env.log_cnt */
 
 	if (env.verbosity > VERBOSE_NONE || test->force_log || failed) {
@@ -172,14 +182,14 @@ void test__end_subtest()
 
 	dump_test_log(test, sub_error_cnt);
 
-	fprintf(env.stdout, "#%d/%d %s/%s:%s\n",
+	fprintf(stdout, "#%d/%d %s/%s:%s\n",
 	       test->test_num, test->subtest_num, test->test_name, test->subtest_name,
 	       sub_error_cnt ? "FAIL" : (test->skip_cnt ? "SKIP" : "OK"));
 
 	if (sub_error_cnt)
-		env.fail_cnt++;
+		test->error_cnt++;
 	else if (test->skip_cnt == 0)
-		env.sub_succ_cnt++;
+		test->sub_succ_cnt++;
 	skip_account();
 
 	free(test->subtest_name);
@@ -474,6 +484,7 @@ enum ARG_KEYS {
 	ARG_LIST_TEST_NAMES = 'l',
 	ARG_TEST_NAME_GLOB_ALLOWLIST = 'a',
 	ARG_TEST_NAME_GLOB_DENYLIST = 'd',
+	ARG_NUM_WORKERS = 'j',
 };
 
 static const struct argp_option opts[] = {
@@ -495,6 +506,8 @@ static const struct argp_option opts[] = {
 	  "Run tests with name matching the pattern (supports '*' wildcard)." },
 	{ "deny", ARG_TEST_NAME_GLOB_DENYLIST, "NAMES", 0,
 	  "Don't run tests with name matching the pattern (supports '*' wildcard)." },
+	{ "workers", ARG_NUM_WORKERS, "WORKERS", OPTION_ARG_OPTIONAL,
+	  "Number of workers to run in parallel, default to number of cpus." },
 	{},
 };
 
@@ -661,6 +674,17 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
 	case ARG_LIST_TEST_NAMES:
 		env->list_test_names = true;
 		break;
+	case ARG_NUM_WORKERS:
+		if (arg) {
+			env->workers = atoi(arg);
+			if (!env->workers) {
+				fprintf(stderr, "Invalid number of worker: %s.", arg);
+				return -1;
+			}
+		} else {
+			env->workers = get_nprocs();
+		}
+		break;
 	case ARGP_KEY_ARG:
 		argp_usage(state);
 		break;
@@ -678,7 +702,7 @@ static void stdio_hijack(void)
 	env.stdout = stdout;
 	env.stderr = stderr;
 
-	if (env.verbosity > VERBOSE_NONE) {
+	if (env.verbosity > VERBOSE_NONE && env.worker_index == -1) {
 		/* nothing to do, output to stdout by default */
 		return;
 	}
@@ -704,10 +728,6 @@ static void stdio_restore(void)
 		return;
 
 	fclose(stdout);
-	free(env.log_buf);
-
-	env.log_buf = NULL;
-	env.log_cnt = 0;
 
 	stdout = env.stdout;
 	stderr = env.stderr;
@@ -760,6 +780,364 @@ void crash_handler(int signum)
 	backtrace_symbols_fd(bt, sz, STDERR_FILENO);
 }
 
+int current_test_idx; /* next index into prog_test_defs[] to hand out; starts at 0, not -1 */
+pthread_mutex_t current_test_lock = PTHREAD_MUTEX_INITIALIZER; /* guards current_test_idx */
+
+struct test_result { /* outcome of one test, as collected by the main process */
+	int error_cnt; /* copied from the worker's MSG_TEST_DONE reply */
+	int skip_cnt; /* copied from the worker's MSG_TEST_DONE reply */
+	int sub_succ_cnt; /* copied from the worker's MSG_TEST_DONE reply */
+
+	size_t log_cnt; /* size of log_buf, maintained by open_memstream() */
+	char *log_buf; /* log text reassembled from MSG_TEST_LOG messages */
+};
+struct test_result test_results[ARRAY_SIZE(prog_test_defs)]; /* indexed like prog_test_defs[] */
+
+
+/* Render @msg for the verbose traces; returns a per-thread buffer reused by every call. */
+static inline const char *str_msg(const struct message *msg)
+{
+	/* thread-local: dispatcher threads may format messages concurrently */
+	static __thread char buf[255];
+
+	switch (msg->type) {
+	case MSG_DO_TEST:
+		snprintf(buf, sizeof(buf), "MSG_DO_TEST %d", msg->u.message_do_test.num);
+		break;
+	case MSG_TEST_DONE:
+		snprintf(buf, sizeof(buf), "MSG_TEST_DONE %d (log: %d)",
+			 msg->u.message_test_done.num, msg->u.message_test_done.have_log);
+		break;
+	case MSG_TEST_LOG: /* strlen() returns size_t, hence %zu rather than %ld */
+		snprintf(buf, sizeof(buf), "MSG_TEST_LOG (cnt: %zu, last: %d)",
+			 strlen(msg->u.message_test_log.log_buf), msg->u.message_test_log.is_last);
+		break;
+	case MSG_EXIT:
+		snprintf(buf, sizeof(buf), "MSG_EXIT");
+		break;
+	default:
+		snprintf(buf, sizeof(buf), "UNKNOWN");
+		break;
+	}
+
+	return buf;
+}
+
+/* Send one fixed-size message on @sock; returns send()'s result (< 0 on error). */
+int send_message(int sock, const struct message *msg)
+{
+	if (env.verbosity > VERBOSE_NONE)
+		fprintf(stderr, "Sending msg: %s\n", str_msg(msg));
+	return send(sock, msg, sizeof(*msg), 0);
+}
+
+/* Zero @msg, then receive one message into it; returns recv()'s result. */
+int recv_message(int sock, struct message *msg)
+{
+	int ret;
+
+	memset(msg, 0, sizeof(*msg));
+	ret = recv(sock, msg, sizeof(*msg), 0);
+	if (ret >= 0 && env.verbosity > VERBOSE_NONE)
+		fprintf(stderr, "Received msg: %s\n", str_msg(msg));
+	return ret;
+}
+
+struct dispatch_data { /* argument handed to each dispatch_thread() */
+	int idx; /* index of the worker served by this thread */
+	int sock; /* main-process end of that worker's socket pair */
+};
+
+/* Dispatcher thread: hands tests to one worker until no test is left.
+ * @_data points to this thread's struct dispatch_data (worker index, socket).
+ * Counters and logs reported by the worker are stored in test_results[].
+ * Stops dispatching on a protocol or I/O error; always sends MSG_EXIT to
+ * the worker before returning NULL.
+ */
+void *dispatch_thread(void *_data)
+{
+	struct dispatch_data *data = _data;
+	int sock = data->sock;
+	FILE *log_fd = NULL;
+	/* reused for every command sent and every MSG_TEST_DONE reply */
+	struct message msg;
+
+	while (true) {
+		struct prog_test_def *test;
+		struct test_result *result;
+		int test_to_run;
+
+		/* grab the next unclaimed test index */
+		pthread_mutex_lock(&current_test_lock);
+		if (current_test_idx >= prog_test_cnt) {
+			pthread_mutex_unlock(&current_test_lock);
+			goto done;
+		}
+		test_to_run = current_test_idx++;
+		pthread_mutex_unlock(&current_test_lock);
+
+		test = &prog_test_defs[test_to_run];
+		test->test_num = test_to_run + 1;
+
+		if (!should_run(&env.test_selector,
+				test->test_num, test->test_name))
+			continue;
+
+		/* run test through worker; the message is zeroed first so that
+		 * no uninitialized stack bytes are written to the socket
+		 */
+		memset(&msg, 0, sizeof(msg));
+		msg.type = MSG_DO_TEST;
+		msg.u.message_do_test.num = test_to_run;
+		if (send_message(sock, &msg) < 0) {
+			perror("Fail to send command");
+			goto done;
+		}
+		env.worker_current_test[data->idx] = test_to_run;
+
+		/* wait for test done */
+		if (recv_message(sock, &msg) < 0)
+			goto error;
+		if (msg.type != MSG_TEST_DONE ||
+		    msg.u.message_test_done.num != test_to_run) {
+			errno = EPROTO; /* unexpected reply, not an I/O failure */
+			goto error;
+		}
+
+		result = &test_results[test_to_run];
+
+		/* server_main() only reports tests marked as tested */
+		test->tested = true;
+
+		result->error_cnt = msg.u.message_test_done.error_cnt;
+		result->skip_cnt = msg.u.message_test_done.skip_cnt;
+		result->sub_succ_cnt = msg.u.message_test_done.sub_succ_cnt;
+
+		if (!msg.u.message_test_done.have_log)
+			continue;
+
+		/* collect all logs into result->log_buf */
+		log_fd = open_memstream(&result->log_buf, &result->log_cnt);
+		if (!log_fd)
+			goto error;
+
+		while (true) {
+			struct message msg_log;
+
+			if (recv_message(sock, &msg_log) < 0)
+				goto error;
+			if (msg_log.type != MSG_TEST_LOG) {
+				errno = EPROTO;
+				goto error;
+			}
+
+			/* each chunk is a NUL-terminated string; append until the last one */
+			fprintf(log_fd, "%s", msg_log.u.message_test_log.log_buf);
+			if (msg_log.u.message_test_log.is_last)
+				break;
+		}
+		fclose(log_fd);
+		log_fd = NULL;
+	} /* while (true) */
+error:
+	fprintf(stderr, "[%d]: Protocol/IO error: %s\n", data->idx, strerror(errno));
+
+	if (log_fd)
+		fclose(log_fd);
+done:
+	/* tell the worker to quit, whether we finished or bailed out */
+	memset(&msg, 0, sizeof(msg));
+	msg.type = MSG_EXIT;
+	if (send_message(sock, &msg) < 0)
+		fprintf(stderr, "[%d]: send_message msg_exit: %s\n", data->idx, strerror(errno));
+
+	return NULL;
+}
+
+/* Main process in -j mode: dispatch tests to the workers, print the collected logs,
+ * per-test results and summary, then reap the workers. Returns 0, or -1 on setup failure.
+ */
+int server_main(void)
+{
+	pthread_t *dispatcher_threads;
+	struct dispatch_data *data;
+	int rc;
+
+	dispatcher_threads = calloc(env.workers, sizeof(*dispatcher_threads));
+	data = calloc(env.workers, sizeof(*data));
+	env.worker_current_test = calloc(env.workers, sizeof(int));
+	if (!dispatcher_threads || !data || !env.worker_current_test) {
+		perror("Failed to allocate dispatcher state");
+		return -1;
+	}
+
+	for (int i = 0; i < env.workers; i++) {
+		data[i].idx = i;
+		data[i].sock = env.worker_socks[i];
+		rc = pthread_create(&dispatcher_threads[i], NULL, dispatch_thread, &data[i]);
+		if (rc) { /* returns an error number and leaves errno alone */
+			errno = rc;
+			perror("Failed to launch dispatcher thread");
+			return -1;
+		}
+	}
+
+	/* wait for all dispatcher to finish */
+	for (int i = 0; i < env.workers; i++) {
+		while (true) {
+			struct timespec timeout = { .tv_sec = time(NULL) + 5 };
+
+			if (pthread_timedjoin_np(dispatcher_threads[i], NULL, &timeout) != ETIMEDOUT)
+				break;
+			fprintf(stderr, "Still waiting for thread %d (test %d).\n", i, env.worker_current_test[i] + 1);
+		}
+	}
+	free(dispatcher_threads);
+	free(env.worker_current_test);
+	free(data);
+
+	/* generate summary */
+	fflush(stderr);
+	fflush(stdout);
+	for (int i = 0; i < prog_test_cnt; i++) {
+		struct prog_test_def *current_test = &prog_test_defs[i];
+		struct test_result *result = &test_results[i];
+
+		if (!current_test->tested)
+			continue;
+
+		env.succ_cnt += result->error_cnt ? 0 : 1;
+		env.skip_cnt += result->skip_cnt;
+		env.fail_cnt += result->error_cnt;
+		env.sub_succ_cnt += result->sub_succ_cnt;
+
+		if (result->log_cnt) { /* open_memstream() already NUL-terminated the buffer */
+			fprintf(stdout, "%s", result->log_buf);
+			if (result->log_buf[result->log_cnt - 1] != '\n')
+				fprintf(stdout, "\n");
+		}
+		free(result->log_buf); /* the caller owns open_memstream() buffers */
+		fprintf(stdout, "#%d %s:%s\n", current_test->test_num, current_test->test_name,
+			result->error_cnt ? "FAIL" : (result->skip_cnt ? "SKIP" : "OK"));
+	}
+
+	fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
+		env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
+
+	/* reap all workers */
+	for (int i = 0; i < env.workers; i++) {
+		int wstatus, pid;
+
+		pid = waitpid(env.worker_pids[i], &wstatus, 0);
+		assert(pid == env.worker_pids[i]);
+	}
+
+	return 0;
+}
+
+/* Worker loop: serve MSG_DO_TEST requests from @sock until MSG_EXIT. Each test's
+ * counters go back in MSG_TEST_DONE, followed by its log in MSG_TEST_LOG chunks.
+ * Returns 0 after MSG_EXIT, -1 on a socket error or an unknown message.
+ */
+int worker_main(int sock)
+{
+	save_netns();
+
+	while (true) {
+		struct message msg;
+
+		/* receive command; kept out of assert(), which NDEBUG compiles away */
+		if (recv_message(sock, &msg) < 0)
+			goto io_error;
+
+		switch (msg.type) {
+		case MSG_EXIT:
+			fprintf(stderr, "[%d]: worker exit.\n", env.worker_index);
+			goto out;
+		case MSG_DO_TEST: {
+			int test_to_run = msg.u.message_do_test.num;
+			struct prog_test_def *test = &prog_test_defs[test_to_run];
+			struct message msg_done;
+
+			fprintf(stderr, "[%d]: Running test %d.\n",
+				env.worker_index, test_to_run + 1);
+
+			env.test = test;
+			test->test_num = test_to_run + 1;
+
+			stdio_hijack();
+
+			test->run_test();
+
+			/* ensure last sub-test is finalized properly */
+			if (test->subtest_name)
+				test__end_subtest();
+
+			stdio_restore();
+
+			test->tested = true;
+
+			skip_account();
+
+			reset_affinity();
+			restore_netns();
+			if (test->need_cgroup_cleanup)
+				cleanup_cgroup_environment();
+
+			/* zeroed so no uninitialized bytes reach the socket */
+			memset(&msg_done, 0, sizeof(msg_done));
+			msg_done.type = MSG_TEST_DONE;
+			msg_done.u.message_test_done.num = test_to_run;
+			msg_done.u.message_test_done.error_cnt = test->error_cnt;
+			msg_done.u.message_test_done.skip_cnt = test->skip_cnt;
+			msg_done.u.message_test_done.sub_succ_cnt = test->sub_succ_cnt;
+			/* a log is sent only when verbose, forced or failed */
+			msg_done.u.message_test_done.have_log = env.log_cnt &&
+				(env.verbosity > VERBOSE_NONE || test->force_log || test->error_cnt);
+			if (send_message(sock, &msg_done) < 0)
+				goto io_error;
+
+			/* send logs */
+			if (msg_done.u.message_test_done.have_log) {
+				char *src = env.log_buf;
+				size_t slen = env.log_cnt;
+
+				while (slen) {
+					struct message msg_log;
+					size_t len;
+
+					memset(&msg_log, 0, sizeof(msg_log));
+					msg_log.type = MSG_TEST_LOG;
+					len = slen >= MAX_LOG_TRUNK_SIZE ? MAX_LOG_TRUNK_SIZE : slen;
+					memcpy(msg_log.u.message_test_log.log_buf, src, len);
+
+					src += len;
+					slen -= len;
+					if (!slen)
+						msg_log.u.message_test_log.is_last = true;
+
+					if (send_message(sock, &msg_log) < 0)
+						goto io_error;
+				}
+			}
+			free(env.log_buf);
+			env.log_buf = NULL;
+			env.log_cnt = 0;
+			break;
+		} /* case MSG_DO_TEST */
+		default:
+			fprintf(stderr, "[%d]: unknown message.\n", env.worker_index);
+			return -1;
+		}
+	}
+out:
+	restore_netns();
+	return 0;
+io_error:
+	fprintf(stderr, "[%d]: socket I/O error: %s\n", env.worker_index, strerror(errno));
+	return -1;
+}
+
 int main(int argc, char **argv)
 {
 	static const struct argp argp = {
@@ -798,13 +1176,57 @@ int main(int argc, char **argv)
 		return -1;
 	}
 
-	save_netns();
-	stdio_hijack();
 	env.has_testmod = true;
 	if (!env.list_test_names && load_bpf_testmod()) {
 		fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n");
 		env.has_testmod = false;
 	}
+
+	/* ignore workers if we are just listing */
+	if (env.get_test_cnt || env.list_test_names)
+		env.workers = 0;
+
+	/* launch workers if requested */
+	env.worker_index = -1; /* main process */
+	if (env.workers) {
+		env.worker_pids = calloc(sizeof(__pid_t), env.workers);
+		env.worker_socks = calloc(sizeof(int), env.workers);
+		fprintf(stdout, "Launching %d workers.\n", env.workers);
+		for (int i = 0; i < env.workers; i++) {
+			int sv[2];
+			__pid_t pid;
+
+			if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0) {
+				perror("Fail to create worker socket");
+				return -1;
+			}
+			pid = fork();
+			if (pid < 0) {
+				perror("Failed to fork worker");
+				return -1;
+			} else if (pid != 0) { /* main process */
+				close(sv[1]);
+				env.worker_pids[i] = pid;
+				env.worker_socks[i] = sv[0];
+			} else { /* inside each worker process */
+				close(sv[0]);
+				env.worker_index = i;
+				return worker_main(sv[1]);
+			}
+		}
+
+		if (env.worker_index == -1) {
+			server_main();
+			goto out;
+		}
+	}
+
+	/* The rest of the main process */
+
+	/* on single mode */
+	save_netns();
+	stdio_hijack();
+
 	for (i = 0; i < prog_test_cnt; i++) {
 		struct prog_test_def *test = &prog_test_defs[i];
 
@@ -827,6 +1249,7 @@ int main(int argc, char **argv)
 		}
 
 		test->run_test();
+
 		/* ensure last sub-test is finalized properly */
 		if (test->subtest_name)
 			test__end_subtest();
@@ -843,16 +1266,21 @@ int main(int argc, char **argv)
 			env.fail_cnt++;
 		else
 			env.succ_cnt++;
+
 		skip_account();
+		env.sub_succ_cnt += test->sub_succ_cnt;
 
 		reset_affinity();
 		restore_netns();
 		if (test->need_cgroup_cleanup)
 			cleanup_cgroup_environment();
 	}
-	if (!env.list_test_names && env.has_testmod)
-		unload_bpf_testmod();
 	stdio_restore();
+	if (env.log_buf) {
+		free(env.log_buf);
+		env.log_buf = NULL;
+		env.log_cnt = 0;
+	}
 
 	if (env.get_test_cnt) {
 		printf("%d\n", env.succ_cnt);
@@ -865,14 +1293,16 @@ int main(int argc, char **argv)
 	fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
 		env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
 
+	close(env.saved_netns_fd);
 out:
+	if (!env.list_test_names && env.has_testmod)
+		unload_bpf_testmod();
 	free_str_set(&env.test_selector.blacklist);
 	free_str_set(&env.test_selector.whitelist);
 	free(env.test_selector.num_set);
 	free_str_set(&env.subtest_selector.blacklist);
 	free_str_set(&env.subtest_selector.whitelist);
 	free(env.subtest_selector.num_set);
-	close(env.saved_netns_fd);
 
 	if (env.succ_cnt + env.fail_cnt + env.skip_cnt == 0)
 		return EXIT_NO_TEST;
diff --git a/tools/testing/selftests/bpf/test_progs.h b/tools/testing/selftests/bpf/test_progs.h
index c8c2bf878f67..085e3580ec08 100644
--- a/tools/testing/selftests/bpf/test_progs.h
+++ b/tools/testing/selftests/bpf/test_progs.h
@@ -69,7 +69,8 @@ struct test_env {
 	bool get_test_cnt;
 	bool list_test_names;
 
-	struct prog_test_def *test;
+	struct prog_test_def *test; /* current running tests */
+
 	FILE *stdout;
 	FILE *stderr;
 	char *log_buf;
@@ -82,6 +83,39 @@ struct test_env {
 	int skip_cnt; /* skipped tests */
 
 	int saved_netns_fd;
+	int workers; /* number of worker process */
+	int worker_index; /* index number of current worker, main process is -1 */
+	__pid_t *worker_pids; /* array of worker pids */
+	int *worker_socks; /* array of worker socks */
+	int *worker_current_test; /* array of current running test for each worker */
+};
+
+#define MAX_LOG_TRUNK_SIZE 8192 /* max log bytes carried by one MSG_TEST_LOG */
+enum message_type { /* selects which member of the union in struct message is used */
+	MSG_DO_TEST = 0, /* main -> worker: run the test with the given index */
+	MSG_TEST_DONE = 1, /* worker -> main: test finished, counters attached */
+	MSG_TEST_LOG = 2, /* worker -> main: one chunk of the captured log */
+	MSG_EXIT = 255, /* main -> worker: leave the worker loop */
+};
+struct message { /* fixed-size datagram exchanged between the main process and workers */
+	enum message_type type;
+	union {
+		struct {
+			int num; /* 0-based index into prog_test_defs[] */
+		} message_do_test;
+		struct {
+			int num; /* index of the finished test, echoed back */
+
+			int sub_succ_cnt;
+			int error_cnt;
+			int skip_cnt;
+			bool have_log; /* true if MSG_TEST_LOG messages follow */
+		} message_test_done;
+		struct {
+			char log_buf[MAX_LOG_TRUNK_SIZE + 1]; /* +1 leaves room for a NUL */
+			bool is_last; /* set on the final chunk of a test's log */
+		} message_test_log;
+	} u;
 };
 
 extern struct test_env env;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2021-09-09 19:31 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-09 19:31 [PATCH v2 bpf-next] selftests/bpf: Add parallelism to test_progs Yucong Sun

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.