* [RFC 1/1] selftests/bpf: Add parallelism to test_progs
2021-08-27 23:13 [RFC 0/1] add parallelism to test_progs Yucong Sun
@ 2021-08-27 23:13 ` Yucong Sun
2021-08-31 3:37 ` Andrii Nakryiko
2021-08-31 4:03 ` [RFC 0/1] add " Andrii Nakryiko
1 sibling, 1 reply; 5+ messages in thread
From: Yucong Sun @ 2021-08-27 23:13 UTC (permalink / raw)
To: andrii; +Cc: sunyucong, bpf
From: Yucong Sun <sunyucong@gmail.com>
This patch adds a "-p" parameter to test_progs, which will spawn workers and
distribute tests evenly among all workers, speeding up execution.
"-p" mode is optional, and works with all existing test selection mechanisms,
including "-l".
Each worker prints its own summary and exits with its own status; the main
process will collect all statuses together and exit with an overall status.
---
tools/testing/selftests/bpf/test_progs.c | 94 ++++++++++++++++++++++--
tools/testing/selftests/bpf/test_progs.h | 3 +
2 files changed, 91 insertions(+), 6 deletions(-)
diff --git a/tools/testing/selftests/bpf/test_progs.c b/tools/testing/selftests/bpf/test_progs.c
index cc1cd240445d..740698360526 100644
--- a/tools/testing/selftests/bpf/test_progs.c
+++ b/tools/testing/selftests/bpf/test_progs.c
@@ -474,6 +474,7 @@ enum ARG_KEYS {
ARG_LIST_TEST_NAMES = 'l',
ARG_TEST_NAME_GLOB_ALLOWLIST = 'a',
ARG_TEST_NAME_GLOB_DENYLIST = 'd',
+ ARG_NUM_WORKERS = 'p',
};
static const struct argp_option opts[] = {
@@ -494,7 +495,9 @@ static const struct argp_option opts[] = {
{ "allow", ARG_TEST_NAME_GLOB_ALLOWLIST, "NAMES", 0,
"Run tests with name matching the pattern (supports '*' wildcard)." },
{ "deny", ARG_TEST_NAME_GLOB_DENYLIST, "NAMES", 0,
- "Don't run tests with name matching the pattern (supports '*' wildcard)." },
+ "Don't run tests with name matching the pattern (supports '*' wildcard)." },
+ { "workers", ARG_NUM_WORKERS, "WORKERS", 0,
+ "Number of workers to run in parallel, default to 1." },
{},
};
@@ -661,6 +664,13 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
case ARG_LIST_TEST_NAMES:
env->list_test_names = true;
break;
+ case ARG_NUM_WORKERS:
+ env->workers = atoi(arg);
+ if (!env->workers) {
+ fprintf(stderr, "Invalid worker number, must be bigger than 1.");
+ return -1;
+ }
+ break;
case ARGP_KEY_ARG:
argp_usage(state);
break;
@@ -694,6 +704,10 @@ static void stdio_hijack(void)
}
stderr = stdout;
+
+ /* force line buffering on stdio, so they interleave naturally. */
+ setvbuf(stdout, NULL, _IOLBF, 8192);
+ setvbuf(stderr, NULL, _IOLBF, 8192);
#endif
}
@@ -798,14 +812,74 @@ int main(int argc, char **argv)
return -1;
}
- save_netns();
- stdio_hijack();
env.has_testmod = true;
if (!env.list_test_names && load_bpf_testmod()) {
fprintf(env.stderr, "WARNING! Selftests relying on bpf_testmod.ko will be skipped.\n");
env.has_testmod = false;
}
+
+ /* launch workers if requested */
+ env.worker_index = -1; /* main process */
+ if (env.workers) {
+ env.worker_pids = calloc(sizeof(__pid_t), env.workers);
+ fprintf(stdout, "Launching %d workers.\n", env.workers);
+ for(int i = 0; i < env.workers; i++) {
+ __pid_t pid = fork();
+ if (pid < 0) {
+ perror("Failed to fork worker");
+ return -1;
+ } else if (pid != 0) {
+ env.worker_pids[i] = pid;
+ } else {
+ env.worker_index = i;
+ fprintf(stdout, "[%d] worker launched with pid %d.\n", i, getpid());
+ break;
+ }
+ }
+ }
+
+ /* If we have worker, here is the rest of the main process */
+ if (env.workers && env.worker_index == -1) {
+ int abnormal_exit_cnt = 0;
+ for(int i = 0; i < env.workers; i++) {
+ int status;
+ assert(waitpid(env.worker_pids[i], &status, 0) == env.worker_pids[i]);
+ if (WIFEXITED(status)) {
+ switch (WEXITSTATUS(status)) {
+ case EXIT_SUCCESS:
+ env.succ_cnt++;
+ break;
+ case EXIT_FAILURE:
+ env.fail_cnt++;
+ break;
+ case EXIT_NO_TEST:
+ env.skip_cnt++;
+ break;
+ }
+ } else {
+ abnormal_exit_cnt++;
+ env.fail_cnt++;
+ }
+ }
+ fprintf(stdout, "Worker Summary: %d SUCCESS, %d FAILED, %d IDLE",
+ env.succ_cnt, env.fail_cnt, env.skip_cnt);
+ fprintf(stdout, "\n");
+
+ goto main_out;
+ }
+
+ /* no worker, or inside each worker process */
+ // sigaction(SIGSEGV, &sigact, NULL); /* set crash handler again */
+
+ save_netns();
+ stdio_hijack();
+
for (i = 0; i < prog_test_cnt; i++) {
+ /* skip tests not assigned to this worker */
+ if (env.workers) {
+ if (i % env.workers != env.worker_index)
+ continue;
+ }
struct prog_test_def *test = &prog_test_defs[i];
env.test = test;
@@ -821,6 +895,8 @@ int main(int argc, char **argv)
}
if (env.list_test_names) {
+ if (env.worker_index != -1)
+ fprintf(env.stdout, "[%d] ", env.worker_index);
fprintf(env.stdout, "%s\n", test->test_name);
env.succ_cnt++;
continue;
@@ -835,6 +911,8 @@ int main(int argc, char **argv)
dump_test_log(test, test->error_cnt);
+ if (env.worker_index != -1)
+ fprintf(env.stdout, "[%d] ", env.worker_index);
fprintf(env.stdout, "#%d %s:%s\n",
test->test_num, test->test_name,
test->error_cnt ? "FAIL" : (test->skip_cnt ? "SKIP" : "OK"));
@@ -850,8 +928,6 @@ int main(int argc, char **argv)
if (test->need_cgroup_cleanup)
cleanup_cgroup_environment();
}
- if (!env.list_test_names && env.has_testmod)
- unload_bpf_testmod();
stdio_restore();
if (env.get_test_cnt) {
@@ -862,17 +938,23 @@ int main(int argc, char **argv)
if (env.list_test_names)
goto out;
+ if (env.worker_index != -1)
+ fprintf(env.stdout, "[%d] ", env.worker_index);
fprintf(stdout, "Summary: %d/%d PASSED, %d SKIPPED, %d FAILED\n",
env.succ_cnt, env.sub_succ_cnt, env.skip_cnt, env.fail_cnt);
out:
+ close(env.saved_netns_fd);
+main_out:
+ if (env.worker_index == -1)
+ if (!env.list_test_names && env.has_testmod)
+ unload_bpf_testmod();
free_str_set(&env.test_selector.blacklist);
free_str_set(&env.test_selector.whitelist);
free(env.test_selector.num_set);
free_str_set(&env.subtest_selector.blacklist);
free_str_set(&env.subtest_selector.whitelist);
free(env.subtest_selector.num_set);
- close(env.saved_netns_fd);
if (env.succ_cnt + env.fail_cnt + env.skip_cnt == 0)
return EXIT_NO_TEST;
diff --git a/tools/testing/selftests/bpf/test_progs.h b/tools/testing/selftests/bpf/test_progs.h
index c8c2bf878f67..c55274a3b767 100644
--- a/tools/testing/selftests/bpf/test_progs.h
+++ b/tools/testing/selftests/bpf/test_progs.h
@@ -82,6 +82,9 @@ struct test_env {
int skip_cnt; /* skipped tests */
int saved_netns_fd;
+ int workers; /* number of worker process */
+ __pid_t *worker_pids; /* array of worker pids */
+ int worker_index;
};
extern struct test_env env;
--
2.30.2
^ permalink raw reply related [flat|nested] 5+ messages in thread
* Re: [RFC 0/1] add parallelism to test_progs
2021-08-27 23:13 [RFC 0/1] add parallelism to test_progs Yucong Sun
2021-08-27 23:13 ` [RFC 1/1] selftests/bpf: Add " Yucong Sun
@ 2021-08-31 4:03 ` Andrii Nakryiko
1 sibling, 0 replies; 5+ messages in thread
From: Andrii Nakryiko @ 2021-08-31 4:03 UTC (permalink / raw)
To: Yucong Sun; +Cc: Andrii Nakryiko, sunyucong, bpf
On Fri, Aug 27, 2021 at 4:13 PM Yucong Sun <fallentree@fb.com> wrote:
>
> This patch added an optional "-p" to test_progs to run tests in multiple
> processes, speeding up the tests.
>
> Example:
>
> time ./test_progs
> real 5m51.393s
> user 0m4.695s
> sys 5m48.055s
>
> time ./test_progs -p 16 (on a 8 core vm)
> real 3m45.673s
> user 0m4.434s
> sys 5m47.465s
>
> The feedback area I'm looking for :
>
> 1.Some tests are taking too long to run (for example:
> bpf_verif_scale/pyperf* takes almost 80% of the total runtime). If we
> need a work-stealing pool mechanism it would be a bigger change.
Seems like you did just a static assignment based on worker number and
test number in this RFC. I think that's way too simplistic to work
well in practice. But I don't think we need a work stealing queue
either (not any explicit queue at all).
I'd rather go with a simple client/server model, where the server is
the main process which does all the coordination. It would "dispense"
task to each forked worker one by one, wait for that test to complete,
accumulating test's output in per-worker temporary output. If we are
running in verbose mode or a test failed, output accumulated logs. If
not verbose and test is successful, just emit a summary with test name
and OK message and discard accumulated output. I think we can easily
extend this to support running multiple sub-tests on *different*
workers, "breaking up" and scaling that bpf_verif_scale test nicely.
But that could be a pretty easy step #2 after the whole client/server
machinery is setup.
Look into Unix domain sockets (UDS). But not the SOCK_STREAM kind,
rather SOCK_DGRAM. UDS allows to establish bi-directional connection
between server and worker. And it preserves packet boundaries, so you
don't have TCP stream problems of delineating boundaries of logical
packets. And it preserves ordering between packets. All great
properties. With this we can set up client/server communication with a
very simple protocol:
1. Server sends "RUN_TEST" command, specifying the number of the test
to execute by the worker.
2. Worker sends back "TEST_COMPLETED" command with the test number,
test result (success, failure, skipped), and, optionally, console
output.
3. Repeat #1-#2 as many times as needed.
4. Server sends "SHUTDOWN" command and worker exits.
(Well, probably we need a bit more flexibility to report sub-test
successes, so maybe worker will have two possible messages:
SUBTEST_COMPLETED and TEST_COMPLETED, or something along those lines).
On the server side, we can use as suboptimal and simplistic locking
scheme as possible to coordinate everything. It's probably simplest to
have a thread per worker that would take global lock to take the next
test to run (just i++, but under lock). And just remember all the
statuses (and error outputs, for dumping failed tests details).
Some refactoring will be needed to make existing code work in both
non-parallelized and parallelized modes with minimal amount of
changes, but this seems simple enough.
>
> 2. The tests output from workers are currently interleaved from all
> workers, making it harder to read, one option would be redirect all
> outputs onto pipes and have main process collect and print in sequence
> for each worker finish, but that will make seeing real time progress
> harder.
Yeah, I don't think that's acceptable. Good news is that we needed
some more complexity to hold onto test output until the very end for
error summary reporting anyway.
>
> 3. If main process want to collect tests results from worker, I plan
> to have each worker writes a stats file to /tmp, or I can use IPC, any
> preference?
See above, I think UDS is the way to go.
>
> 4. Some tests would fail if run in parallel, I think we would need to
> pin some tasks onto worker 0.
Yeah, we can mark such tests with some special naming convention
(e.g., to test_blahblah_noparallel) and run them sequentially.
>
> Yucong Sun (1):
> selftests/bpf: Add parallelism to test_progs
>
> tools/testing/selftests/bpf/test_progs.c | 94 ++++++++++++++++++++++--
> tools/testing/selftests/bpf/test_progs.h | 3 +
> 2 files changed, 91 insertions(+), 6 deletions(-)
>
> --
> 2.30.2
>
^ permalink raw reply [flat|nested] 5+ messages in thread