linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-08-07  2:24   ` Namhyung Kim
  2021-07-30 15:34 ` [RFC PATCH v2 02/10] perf tests: add test for workqueue Riccardo Mancini
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

The workqueue library is made up of two components:
 - threadpool: handles the lifetime of the threads
 - workqueue: handles work distribution among the threads

This first patch introduces the threadpool, starting from its creation
and destruction functions.
Thread management is based on the prototype from Alexey:
https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/

Each thread in the threadpool executes the same function (aka task)
with a different argument tidx.
Threads use a pair of pipes to communicate with the main process.
The threadpool is static (all threads will be spawned at the same time).
Future work could include making it resizable and adding affinity support
(as in Alexey's prototype).

Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/util/Build                  |   1 +
 tools/perf/util/workqueue/Build        |   1 +
 tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
 tools/perf/util/workqueue/threadpool.h |  30 ++++
 4 files changed, 240 insertions(+)
 create mode 100644 tools/perf/util/workqueue/Build
 create mode 100644 tools/perf/util/workqueue/threadpool.c
 create mode 100644 tools/perf/util/workqueue/threadpool.h

diff --git a/tools/perf/util/Build b/tools/perf/util/Build
index 2d4fa13041789cd6..c7b09701661c869d 100644
--- a/tools/perf/util/Build
+++ b/tools/perf/util/Build
@@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
 perf-y += data-convert-json.o
 
 perf-y += scripting-engines/
+perf-y += workqueue/
 
 perf-$(CONFIG_ZLIB) += zlib.o
 perf-$(CONFIG_LZMA) += lzma.o
diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
new file mode 100644
index 0000000000000000..8b72a6cd4e2cba0d
--- /dev/null
+++ b/tools/perf/util/workqueue/Build
@@ -0,0 +1 @@
+perf-y += threadpool.o
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
new file mode 100644
index 0000000000000000..0004ce606d5fa73d
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -0,0 +1,208 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include "debug.h"
+#include <asm/bug.h>
+#include <linux/zalloc.h>
+#include <linux/string.h>
+#include <linux/err.h>
+#include <linux/kernel.h>
+#include "threadpool.h"
+
+enum threadpool_status {
+	THREADPOOL_STATUS__STOPPED,		/* no threads */
+	THREADPOOL_STATUS__ERROR,		/* errors */
+	THREADPOOL_STATUS__MAX
+};
+
+struct threadpool {
+	int			nr_threads;	/* number of threads in the pool */
+	struct threadpool_entry	*threads;	/* array of threads in the pool */
+	struct task_struct	*current_task;	/* current executing function */
+	enum threadpool_status	status;		/* current status of the pool */
+};
+
+struct threadpool_entry {
+	int				idx;	/* idx of thread in pool->threads */
+	pid_t				tid;	/* tid of thread */
+	struct threadpool		*pool;	/* parent threadpool */
+	struct {
+		int ack[2];			/* messages from thread (acks) */
+		int cmd[2];			/* messages to thread (commands) */
+	} pipes;
+};
+
+/**
+ * threadpool_entry__init_pipes - initialize all pipes of @thread
+ */
+static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
+{
+	thread->pipes.ack[0] = -1;
+	thread->pipes.ack[1] = -1;
+	thread->pipes.cmd[0] = -1;
+	thread->pipes.cmd[1] = -1;
+}
+
+/**
+ * threadpool_entry__open_pipes - open all pipes of @thread
+ */
+static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
+{
+	if (pipe(thread->pipes.ack)) {
+		pr_debug2("threadpool: failed to create comm pipe 'from': %s\n",
+			strerror(errno));
+		return -ENOMEM;
+	}
+
+	if (pipe(thread->pipes.cmd)) {
+		pr_debug2("threadpool: failed to create comm pipe 'to': %s\n",
+			strerror(errno));
+		close(thread->pipes.ack[0]);
+		thread->pipes.ack[0] = -1;
+		close(thread->pipes.ack[1]);
+		thread->pipes.ack[1] = -1;
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
+/**
+ * threadpool_entry__close_pipes - close all communication pipes of @thread
+ */
+static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
+{
+	if (thread->pipes.ack[0] != -1) {
+		close(thread->pipes.ack[0]);
+		thread->pipes.ack[0] = -1;
+	}
+	if (thread->pipes.ack[1] != -1) {
+		close(thread->pipes.ack[1]);
+		thread->pipes.ack[1] = -1;
+	}
+	if (thread->pipes.cmd[0] != -1) {
+		close(thread->pipes.cmd[0]);
+		thread->pipes.cmd[0] = -1;
+	}
+	if (thread->pipes.cmd[1] != -1) {
+		close(thread->pipes.cmd[1]);
+		thread->pipes.cmd[1] = -1;
+	}
+}
+
+/**
+ * threadpool__new - create a fixed threadpool with @n_threads threads
+ */
+struct threadpool *threadpool__new(int n_threads)
+{
+	int ret, err, t;
+	struct threadpool *pool = malloc(sizeof(*pool));
+
+	if (!pool) {
+		pr_debug2("threadpool: cannot allocate pool: %s\n",
+			strerror(errno));
+		err = -ENOMEM;
+		goto out_return;
+	}
+
+	if (n_threads <= 0) {
+		pr_debug2("threadpool: invalid number of threads: %d\n",
+			n_threads);
+		err = -EINVAL;
+		goto out_free_pool;
+	}
+
+	pool->nr_threads = n_threads;
+	pool->current_task = NULL;
+
+	pool->threads = calloc(n_threads, sizeof(*pool->threads));
+	if (!pool->threads) {
+		pr_debug2("threadpool: cannot allocate threads: %s\n",
+			strerror(errno));
+		err = -ENOMEM;
+		goto out_free_pool;
+	}
+
+	for (t = 0; t < n_threads; t++) {
+		pool->threads[t].idx = t;
+		pool->threads[t].tid = -1;
+		pool->threads[t].pool = pool;
+		threadpool_entry__init_pipes(&pool->threads[t]);
+	}
+
+	for (t = 0; t < n_threads; t++) {
+		ret = threadpool_entry__open_pipes(&pool->threads[t]);
+		if (ret) {
+			err = -ret;
+			goto out_close_pipes;
+		}
+	}
+
+	pool->status = THREADPOOL_STATUS__STOPPED;
+
+	return pool;
+
+out_close_pipes:
+	for (t = 0; t < n_threads; t++)
+		threadpool_entry__close_pipes(&pool->threads[t]);
+
+	zfree(&pool->threads);
+out_free_pool:
+	free(pool);
+out_return:
+	return ERR_PTR(err);
+}
+
+/**
+ * threadpool__strerror - print message regarding given @err in @pool
+ *
+ * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
+ */
+int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
+{
+	char sbuf[STRERR_BUFSIZE], *emsg;
+
+	emsg = str_error_r(err, sbuf, sizeof(sbuf));
+	return scnprintf(buf, size, "Error: %s.\n", emsg);
+}
+
+/**
+ * threadpool__new_strerror - print message regarding @err_ptr
+ *
+ * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
+ */
+int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size)
+{
+	return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
+}
+
+/**
+ * threadpool__delete - free the @pool and all its resources
+ */
+void threadpool__delete(struct threadpool *pool)
+{
+	int t;
+
+	if (IS_ERR_OR_NULL(pool))
+		return;
+
+	WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
+		&& pool->status != THREADPOOL_STATUS__ERROR);
+
+	for (t = 0; t < pool->nr_threads; t++)
+		threadpool_entry__close_pipes(&pool->threads[t]);
+
+	zfree(&pool->threads);
+	free(pool);
+}
+
+/**
+ * threadpool__size - get number of threads in the threadpool
+ */
+int threadpool__size(struct threadpool *pool)
+{
+	return pool->nr_threads;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
new file mode 100644
index 0000000000000000..fb18aa32fb64f671
--- /dev/null
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_THREADPOOL_H
+#define __WORKQUEUE_THREADPOOL_H
+
+struct threadpool;
+struct task_struct;
+
+typedef void (*task_func_t)(int tidx, struct task_struct *task);
+
+struct task_struct {
+	task_func_t fn;
+};
+
+extern struct threadpool *threadpool__new(int n_threads);
+extern void threadpool__delete(struct threadpool *pool);
+
+extern int threadpool__start(struct threadpool *pool);
+extern int threadpool__stop(struct threadpool *pool);
+
+extern int threadpool__execute(struct threadpool *pool, struct task_struct *task);
+extern int threadpool__wait(struct threadpool *pool);
+
+extern int threadpool__size(struct threadpool *pool);
+
+/* Error management */
+#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
+extern int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size);
+extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size);
+
+#endif /* __WORKQUEUE_THREADPOOL_H */
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 02/10] perf tests: add test for workqueue
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
  2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

It will have subtests testing threadpool and workqueue separately.
This patch only introduces the first subtest, checking that the
threadpool is correctly created and destroyed.
This test will be expanded when new functions are added in next
patches.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/Build          |   1 +
 tools/perf/tests/builtin-test.c |   9 +++
 tools/perf/tests/tests.h        |   3 +
 tools/perf/tests/workqueue.c    | 115 ++++++++++++++++++++++++++++++++
 4 files changed, 128 insertions(+)
 create mode 100644 tools/perf/tests/workqueue.c

diff --git a/tools/perf/tests/Build b/tools/perf/tests/Build
index 650aec19d49052ca..eda6c78a37cfbc13 100644
--- a/tools/perf/tests/Build
+++ b/tools/perf/tests/Build
@@ -64,6 +64,7 @@ perf-y += parse-metric.o
 perf-y += pe-file-parsing.o
 perf-y += expand-cgroup.o
 perf-y += perf-time-to-tsc.o
+perf-y += workqueue.o
 
 $(OUTPUT)tests/llvm-src-base.c: tests/bpf-script-example.c tests/Build
 	$(call rule_mkdir)
diff --git a/tools/perf/tests/builtin-test.c b/tools/perf/tests/builtin-test.c
index 5e6242576236325c..2ff5d38ed83a723d 100644
--- a/tools/perf/tests/builtin-test.c
+++ b/tools/perf/tests/builtin-test.c
@@ -360,6 +360,15 @@ static struct test generic_tests[] = {
 		.func = test__perf_time_to_tsc,
 		.is_supported = test__tsc_is_supported,
 	},
+	{
+		.desc = "Test workqueue lib",
+		.func = test__workqueue,
+		.subtest = {
+			.skip_if_fail	= false,
+			.get_nr		= test__workqueue_subtest_get_nr,
+			.get_desc	= test__workqueue_subtest_get_desc,
+		}
+	},
 	{
 		.func = NULL,
 	},
diff --git a/tools/perf/tests/tests.h b/tools/perf/tests/tests.h
index 1100dd55b657b779..9ca67113a7402463 100644
--- a/tools/perf/tests/tests.h
+++ b/tools/perf/tests/tests.h
@@ -127,6 +127,9 @@ int test__parse_metric(struct test *test, int subtest);
 int test__pe_file_parsing(struct test *test, int subtest);
 int test__expand_cgroup_events(struct test *test, int subtest);
 int test__perf_time_to_tsc(struct test *test, int subtest);
+int test__workqueue(struct test *test, int subtest);
+const char *test__workqueue_subtest_get_desc(int subtest);
+int test__workqueue_subtest_get_nr(void);
 
 bool test__bp_signal_is_supported(void);
 bool test__bp_account_is_supported(void);
diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
new file mode 100644
index 0000000000000000..fb0e86390d466677
--- /dev/null
+++ b/tools/perf/tests/workqueue.c
@@ -0,0 +1,115 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/err.h>
+#include "tests.h"
+#include "util/debug.h"
+#include "util/workqueue/threadpool.h"
+
+struct threadpool_test_args_t {
+	int pool_size;
+};
+
+static int __threadpool__prepare(struct threadpool **pool, int pool_size)
+{
+	*pool = threadpool__new(pool_size);
+	TEST_ASSERT_VAL("threadpool creation failure", !IS_ERR(*pool));
+	TEST_ASSERT_VAL("threadpool size is wrong",
+			threadpool__size(*pool) == pool_size);
+
+	return TEST_OK;
+}
+
+static int __threadpool__teardown(struct threadpool *pool)
+{
+	threadpool__delete(pool);
+
+	return TEST_OK;
+}
+
+
+static int __test__threadpool(void *_args)
+{
+	struct threadpool_test_args_t *args = _args;
+	struct threadpool *pool;
+	int ret;
+
+	ret = __threadpool__prepare(&pool, args->pool_size);
+	if (ret)
+		goto out;
+
+	ret = __threadpool__teardown(pool);
+	if (ret)
+		goto out;
+
+out:
+	return ret;
+}
+
+static const struct threadpool_test_args_t threadpool_test_args[] = {
+	{
+		.pool_size = 1
+	},
+	{
+		.pool_size = 2
+	},
+	{
+		.pool_size = 4
+	},
+	{
+		.pool_size = 8
+	},
+	{
+		.pool_size = 16
+	}
+};
+
+struct test_case {
+	const char *desc;
+	int (*func)(void *args);
+	void *args;
+	int n_args;
+	int arg_size;
+};
+
+static struct test_case workqueue_testcase_table[] = {
+	{
+		.desc = "Threadpool",
+		.func = __test__threadpool,
+		.args = (void *) threadpool_test_args,
+		.n_args = (int)ARRAY_SIZE(threadpool_test_args),
+		.arg_size = sizeof(struct threadpool_test_args_t)
+	}
+};
+
+
+int test__workqueue(struct test *test __maybe_unused, int i)
+{
+	int j, ret;
+	struct test_case *tc;
+
+	if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
+		return TEST_FAIL;
+
+	tc = &workqueue_testcase_table[i];
+
+	for (j = 0; j < tc->n_args; j++) {
+		ret = tc->func((void *)((char *)tc->args + (j*tc->arg_size)));
+		if (ret)
+			return ret;
+	}
+
+	return TEST_OK;
+}
+
+
+int test__workqueue_subtest_get_nr(void)
+{
+	return (int)ARRAY_SIZE(workqueue_testcase_table);
+}
+
+const char *test__workqueue_subtest_get_desc(int i)
+{
+	if (i < 0 || i >= (int)ARRAY_SIZE(workqueue_testcase_table))
+		return NULL;
+	return workqueue_testcase_table[i].desc;
+}
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
  2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 02/10] perf tests: add test for workqueue Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-08-07  2:43   ` Namhyung Kim
  2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds the start and stop functions, alongside the thread
function.
Each thread will run until a stop signal is received.
Furthermore, start and stop are added to the test.

Thread management is based on the prototype from Alexey:
https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/

Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c           |  13 +
 tools/perf/util/workqueue/threadpool.c | 328 ++++++++++++++++++++++++-
 tools/perf/util/workqueue/threadpool.h |   9 +
 3 files changed, 347 insertions(+), 3 deletions(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index fb0e86390d466677..767e4fb60be4b190 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -11,16 +11,29 @@ struct threadpool_test_args_t {
 
 static int __threadpool__prepare(struct threadpool **pool, int pool_size)
 {
+	int ret;
+
 	*pool = threadpool__new(pool_size);
 	TEST_ASSERT_VAL("threadpool creation failure", !IS_ERR(*pool));
 	TEST_ASSERT_VAL("threadpool size is wrong",
 			threadpool__size(*pool) == pool_size);
 
+	ret = threadpool__start(*pool);
+	TEST_ASSERT_VAL("threadpool start failure", ret == 0);
+	TEST_ASSERT_VAL("threadpool is not ready", threadpool__is_ready(*pool));
+
 	return TEST_OK;
 }
 
 static int __threadpool__teardown(struct threadpool *pool)
 {
+	int ret;
+
+	ret = threadpool__stop(pool);
+	TEST_ASSERT_VAL("threadpool stop failure", ret == 0);
+	TEST_ASSERT_VAL("stopped threadpool is ready",
+			!threadpool__is_ready(pool));
+
 	threadpool__delete(pool);
 
 	return TEST_OK;
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
index 0004ce606d5fa73d..850ef7e110566536 100644
--- a/tools/perf/util/workqueue/threadpool.c
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -4,20 +4,38 @@
 #include <unistd.h>
 #include <errno.h>
 #include <string.h>
+#include <pthread.h>
+#include <signal.h>
+#include <syscall.h>
 #include "debug.h"
 #include <asm/bug.h>
 #include <linux/zalloc.h>
 #include <linux/string.h>
 #include <linux/err.h>
 #include <linux/kernel.h>
+#include <internal/lib.h>
 #include "threadpool.h"
 
+#ifndef HAVE_GETTID
+static inline pid_t gettid(void)
+{
+	return (pid_t)syscall(__NR_gettid);
+}
+#endif
+
 enum threadpool_status {
 	THREADPOOL_STATUS__STOPPED,		/* no threads */
+	THREADPOOL_STATUS__READY,		/* threads are ready but idle */
 	THREADPOOL_STATUS__ERROR,		/* errors */
 	THREADPOOL_STATUS__MAX
 };
 
+static const char * const threadpool_status_tags[] = {
+	"stopped",
+	"ready",
+	"error"
+};
+
 struct threadpool {
 	int			nr_threads;	/* number of threads in the pool */
 	struct threadpool_entry	*threads;	/* array of threads in the pool */
@@ -35,6 +53,29 @@ struct threadpool_entry {
 	} pipes;
 };
 
+enum threadpool_msg {
+	THREADPOOL_MSG__UNDEFINED = 0,
+	THREADPOOL_MSG__ACK,		/* from th: create and exit ack */
+	THREADPOOL_MSG__WAKE,		/* to th: wake up */
+	THREADPOOL_MSG__STOP,		/* to th: exit */
+	THREADPOOL_MSG__MAX
+};
+
+static const char * const threadpool_msg_tags[] = {
+	"undefined",
+	"ack",
+	"wake",
+	"stop"
+};
+
+static const char * const threadpool_errno_str[] = {
+	"Error calling sigprocmask",
+	"Error receiving message from thread",
+	"Error sending message to thread",
+	"Thread sent unexpected message",
+	"This operation is not allowed in this state"
+};
+
 /**
  * threadpool_entry__init_pipes - initialize all pipes of @thread
  */
@@ -93,6 +134,130 @@ static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
 	}
 }
 
+/**
+ * threadpool__wait_thread - receive ack from thread
+ *
+ * NB: call only from main thread!
+ */
+static int threadpool__wait_thread(struct threadpool_entry *thread)
+{
+	int res;
+	enum threadpool_msg msg = THREADPOOL_MSG__UNDEFINED;
+
+	res = readn(thread->pipes.ack[0], &msg, sizeof(msg));
+	if (res < 0) {
+		pr_debug2("threadpool: failed to recv msg from tid=%d: %s\n",
+		       thread->tid, strerror(errno));
+		return -THREADPOOL_ERROR__READPIPE;
+	}
+	if (msg != THREADPOOL_MSG__ACK) {
+		pr_debug2("threadpool: received unexpected msg from tid=%d: %s\n",
+		       thread->tid, threadpool_msg_tags[msg]);
+		return -THREADPOOL_ERROR__INVALIDMSG;
+	}
+
+	pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
+
+	return 0;
+}
+
+/**
+ * threadpool__terminate_thread - send stop signal to thread and wait for ack
+ *
+ * NB: call only from main thread!
+ */
+static int threadpool__terminate_thread(struct threadpool_entry *thread)
+{
+	int res;
+	enum threadpool_msg msg = THREADPOOL_MSG__STOP;
+
+	res = writen(thread->pipes.cmd[1], &msg, sizeof(msg));
+	if (res < 0) {
+		pr_debug2("threadpool: error sending stop msg to tid=%d: %s\n",
+			thread->tid, strerror(errno));
+		return -THREADPOOL_ERROR__WRITEPIPE;
+	}
+
+	return threadpool__wait_thread(thread);
+}
+
+/**
+ * threadpool_entry__send_ack - send ack to main thread
+ */
+static int threadpool_entry__send_ack(struct threadpool_entry *thread)
+{
+	enum threadpool_msg msg = THREADPOOL_MSG__ACK;
+	int ret = writen(thread->pipes.ack[1], &msg, sizeof(msg));
+
+	if (ret < 0) {
+		pr_debug("threadpool[%d]: failed to send ack: %s\n",
+			thread->tid, strerror(errno));
+		return -THREADPOOL_ERROR__WRITEPIPE;
+	}
+
+	return 0;
+}
+
+/**
+ * threadpool_entry__recv_cmd - receive command from main thread
+ */
+static int threadpool_entry__recv_cmd(struct threadpool_entry *thread,
+					enum threadpool_msg *cmd)
+{
+	int ret;
+
+	*cmd = THREADPOOL_MSG__UNDEFINED;
+	ret = readn(thread->pipes.cmd[0], cmd, sizeof(*cmd));
+	if (ret < 0) {
+		pr_debug("threadpool[%d]: error receiving command: %s\n",
+			thread->tid, strerror(errno));
+		return -THREADPOOL_ERROR__READPIPE;
+	}
+
+	if (*cmd != THREADPOOL_MSG__WAKE && *cmd != THREADPOOL_MSG__STOP) {
+		pr_debug("threadpool[%d]: received unexpected command: %s\n",
+			thread->tid, threadpool_msg_tags[*cmd]);
+		return -THREADPOOL_ERROR__INVALIDMSG;
+	}
+
+	return 0;
+}
+
+/**
+ * threadpool_entry__function - function running on thread
+ *
+ * This function waits for a signal from main thread to start executing
+ * a task.
+ * On completion, it will go back to sleep, waiting for another signal.
+ * Signals are delivered through pipes.
+ */
+static void *threadpool_entry__function(void *args)
+{
+	struct threadpool_entry *thread = (struct threadpool_entry *) args;
+	enum threadpool_msg cmd;
+
+	thread->tid = gettid();
+
+	pr_debug2("threadpool[%d]: started\n", thread->tid);
+
+	for (;;) {
+		if (threadpool_entry__send_ack(thread))
+			break;
+
+		if (threadpool_entry__recv_cmd(thread, &cmd))
+			break;
+
+		if (cmd == THREADPOOL_MSG__STOP)
+			break;
+	}
+
+	pr_debug2("threadpool[%d]: exit\n", thread->tid);
+
+	threadpool_entry__send_ack(thread);
+
+	return NULL;
+}
+
 /**
  * threadpool__new - create a fixed threadpool with @n_threads threads
  */
@@ -161,12 +326,30 @@ struct threadpool *threadpool__new(int n_threads)
  *
  * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
  */
-int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
+int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size)
 {
 	char sbuf[STRERR_BUFSIZE], *emsg;
+	const char *status_str, *errno_str;
 
-	emsg = str_error_r(err, sbuf, sizeof(sbuf));
-	return scnprintf(buf, size, "Error: %s.\n", emsg);
+	status_str = IS_ERR_OR_NULL(pool) ? "error" : threadpool_status_tags[pool->status];
+
+	switch (err) {
+	case -THREADPOOL_ERROR__SIGPROCMASK:
+	case -THREADPOOL_ERROR__READPIPE:
+	case -THREADPOOL_ERROR__WRITEPIPE:
+		emsg = str_error_r(errno, sbuf, sizeof(sbuf));
+		errno_str = threadpool_errno_str[-err-THREADPOOL_ERROR__OFFSET];
+		return scnprintf(buf, size, "%s: %s.\n", errno_str, emsg);
+	case -THREADPOOL_ERROR__INVALIDMSG:
+		errno_str = threadpool_errno_str[-err-THREADPOOL_ERROR__OFFSET];
+		return scnprintf(buf, size, "%s.\n", errno_str);
+	case -THREADPOOL_ERROR__NOTALLOWED:
+		return scnprintf(buf, size, "%s (%s).\n",
+			threadpool_errno_str[-err], status_str);
+	default:
+		emsg = str_error_r(err, sbuf, sizeof(sbuf));
+		return scnprintf(buf, size, "Error: %s", emsg);
+	}
 }
 
 /**
@@ -206,3 +389,142 @@ int threadpool__size(struct threadpool *pool)
 {
 	return pool->nr_threads;
 }
+
+/**
+ * __threadpool__start - start all threads in the pool.
+ *
+ * NB: use threadpool__start(). This function does not change @pool->status.
+ */
+static int __threadpool__start(struct threadpool *pool)
+{
+	int t, tt, ret, err = 0, nr_threads = pool->nr_threads;
+	sigset_t full, mask;
+	pthread_t handle;
+	pthread_attr_t attrs;
+
+	sigfillset(&full);
+	if (sigprocmask(SIG_SETMASK, &full, &mask)) {
+		pr_debug2("Failed to block signals on threads start: %s\n", strerror(errno));
+		return -THREADPOOL_ERROR__SIGPROCMASK;
+	}
+
+	pthread_attr_init(&attrs);
+	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	for (t = 0; t < nr_threads; t++) {
+		struct threadpool_entry *thread = &pool->threads[t];
+
+		ret = pthread_create(&handle, &attrs, threadpool_entry__function, thread);
+		if (ret) {
+			err = ret;
+			pr_debug2("Failed to start threads: %s\n", strerror(errno));
+			break;
+		}
+	}
+
+	/*
+	 * Wait for all threads that we managed to run.
+	 * In case of further errors, continue to terminate possibly not
+	 * failing threads.
+	 */
+	for (tt = 0; tt < t; tt++) {
+		struct threadpool_entry *thread = &pool->threads[tt];
+
+		ret = threadpool__wait_thread(thread);
+		if (ret)
+			err = ret;
+	}
+
+	/*
+	 * In case of errors, terminate all threads that we managed to run.
+	 */
+	if (err) {
+		for (tt = 0; tt < t; tt++)
+			threadpool__terminate_thread(&pool->threads[tt]);
+	}
+
+	pthread_attr_destroy(&attrs);
+
+	if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
+		pr_debug2("Failed to unblock signals on threads start: %s\n", strerror(errno));
+		err = -THREADPOOL_ERROR__SIGPROCMASK;
+	}
+
+	return err;
+}
+
+/**
+ * threadpool__start - start all threads in the pool.
+ *
+ * The function blocks until all threads are up and running.
+ */
+int threadpool__start(struct threadpool *pool)
+{
+	int ret;
+
+	if (pool->status != THREADPOOL_STATUS__STOPPED) {
+		pr_debug2("threadpool: starting not stopped pool\n");
+		return -THREADPOOL_ERROR__NOTALLOWED;
+	}
+
+	ret = __threadpool__start(pool);
+	if (ret) {
+		pool->status = THREADPOOL_STATUS__ERROR;
+		return ret;
+	}
+	pool->status = THREADPOOL_STATUS__READY;
+	return 0;
+}
+
+/**
+ * __threadpool__stop - stop all threads in the pool.
+ *
+ * NB: use threadpool__stop(). This function does not change @pool->status.
+ */
+static int __threadpool__stop(struct threadpool *pool)
+{
+	int t, ret, err = 0;
+
+	for (t = 0; t < pool->nr_threads; t++) {
+		/*
+		 * Even if a termination fails, we should continue to terminate
+		 * all other threads.
+		 */
+		ret = threadpool__terminate_thread(&pool->threads[t]);
+		if (ret)
+			err = ret;
+	}
+
+	return err;
+}
+
+/**
+ * threadpool__stop - stop all threads in the pool.
+ *
+ * This function blocks waiting for ack from all threads.
+ */
+int threadpool__stop(struct threadpool *pool)
+{
+	int ret;
+
+	if (pool->status != THREADPOOL_STATUS__READY) {
+		pr_debug2("threadpool: stopping not ready pool\n");
+		return -THREADPOOL_ERROR__NOTALLOWED;
+	}
+
+	ret = __threadpool__stop(pool);
+	if (ret) {
+		pool->status = THREADPOOL_STATUS__ERROR;
+		return ret;
+	}
+	pool->status = THREADPOOL_STATUS__STOPPED;
+	return 0;
+}
+
+/**
+ * threadpool__is_ready - check if the threads are running
+ */
+bool threadpool__is_ready(struct threadpool *pool)
+{
+	return pool->status == THREADPOOL_STATUS__READY;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
index fb18aa32fb64f671..7d56e5450fac605b 100644
--- a/tools/perf/util/workqueue/threadpool.h
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -21,9 +21,18 @@ extern int threadpool__execute(struct threadpool *pool, struct task_struct *task
 extern int threadpool__wait(struct threadpool *pool);
 
 extern int threadpool__size(struct threadpool *pool);
+extern bool threadpool__is_ready(struct threadpool *pool);
 
 /* Error management */
 #define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
+#define THREADPOOL_ERROR__OFFSET 512
+enum {
+	THREADPOOL_ERROR__SIGPROCMASK = THREADPOOL_ERROR__OFFSET,
+	THREADPOOL_ERROR__READPIPE,
+	THREADPOOL_ERROR__WRITEPIPE,
+	THREADPOOL_ERROR__INVALIDMSG,
+	THREADPOOL_ERROR__NOTALLOWED
+};
 extern int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size);
 extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size);
 
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (2 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-08-07  2:56   ` Namhyung Kim
  2021-07-30 15:34 ` [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini
                   ` (5 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds:
 - threadpool__execute: assigns a task to the threads to execute
   asynchronously.
 - threadpool__wait: waits for the task to complete on all threads.
Furthermore, testing for these new functions is added.

This patch completes the threadpool.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c           |  85 +++++++++++++-
 tools/perf/util/workqueue/threadpool.c | 146 ++++++++++++++++++++++++-
 tools/perf/util/workqueue/threadpool.h |   1 +
 3 files changed, 230 insertions(+), 2 deletions(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 767e4fb60be4b190..87bf8fc71c346653 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -1,14 +1,60 @@
 // SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
 #include <linux/kernel.h>
 #include <linux/err.h>
+#include <linux/zalloc.h>
 #include "tests.h"
 #include "util/debug.h"
 #include "util/workqueue/threadpool.h"
 
+#define DUMMY_FACTOR 100000
+#define N_DUMMY_WORK_SIZES 7
+
 struct threadpool_test_args_t {
 	int pool_size;
 };
 
+struct test_task {
+	struct task_struct task;
+	int n_threads;
+	int *array;
+};
+
+/**
+ * dummy_work - calculates DUMMY_FACTOR * (idx % N_DUMMY_WORK_SIZES) inefficiently
+ *
+ * This function uses modulus to create work items of different sizes.
+ */
+static void dummy_work(int idx)
+{
+	volatile int prod = 0;	/* prevent possible compiler optimizations */
+	int k = idx % N_DUMMY_WORK_SIZES;
+	int i, j;
+
+	for (i = 0; i < DUMMY_FACTOR; i++)
+		for (j = 0; j < k; j++)
+			prod++;
+
+	pr_debug3("dummy: %d * %d = %d\n", DUMMY_FACTOR, k, prod);
+}
+
+static void test_task_fn1(int tidx, struct task_struct *task)
+{
+	struct test_task *mtask = container_of(task, struct test_task, task);
+
+	dummy_work(tidx);
+	mtask->array[tidx] = tidx+1;
+}
+
+static void test_task_fn2(int tidx, struct task_struct *task)
+{
+	struct test_task *mtask = container_of(task, struct test_task, task);
+
+	dummy_work(tidx);
+	mtask->array[tidx] = tidx*2;
+}
+
+
 static int __threadpool__prepare(struct threadpool **pool, int pool_size)
 {
 	int ret;
@@ -39,22 +85,59 @@ static int __threadpool__teardown(struct threadpool *pool)
 	return TEST_OK;
 }
 
+static int __threadpool__exec_wait(struct threadpool *pool,
+				struct task_struct *task)
+{
+	int ret = threadpool__execute(pool, task);
+
+	TEST_ASSERT_VAL("threadpool execute failure", ret == 0);
+	TEST_ASSERT_VAL("threadpool is not executing", threadpool__is_busy(pool));
+
+	ret = threadpool__wait(pool);
+	TEST_ASSERT_VAL("threadpool wait failure", ret == 0);
+	TEST_ASSERT_VAL("waited threadpool is not ready", threadpool__is_ready(pool));
+
+	return TEST_OK;
+}
 
 static int __test__threadpool(void *_args)
 {
 	struct threadpool_test_args_t *args = _args;
 	struct threadpool *pool;
-	int ret;
+	int ret, i;
+	struct test_task task;
+
+	task.task.fn = test_task_fn1;
+	task.n_threads = args->pool_size;
+	task.array = calloc(args->pool_size, sizeof(*task.array));
+	TEST_ASSERT_VAL("calloc failure", task.array);
 
 	ret = __threadpool__prepare(&pool, args->pool_size);
 	if (ret)
 		goto out;
 
+	ret = __threadpool__exec_wait(pool, &task.task);
+	if (ret)
+		goto out;
+
+	for (i = 0; i < args->pool_size; i++)
+		TEST_ASSERT_VAL("failed array check (1)", task.array[i] == i+1);
+
+	task.task.fn = test_task_fn2;
+
+	ret = __threadpool__exec_wait(pool, &task.task);
+	if (ret)
+		goto out;
+
+	for (i = 0; i < args->pool_size; i++)
+		TEST_ASSERT_VAL("failed array check (2)", task.array[i] == 2*i);
+
 	ret = __threadpool__teardown(pool);
 	if (ret)
 		goto out;
 
 out:
+	free(task.array);
 	return ret;
 }
 
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
index 850ef7e110566536..33320dd0fb44fe38 100644
--- a/tools/perf/util/workqueue/threadpool.c
+++ b/tools/perf/util/workqueue/threadpool.c
@@ -26,6 +26,7 @@ static inline pid_t gettid(void)
 enum threadpool_status {
 	THREADPOOL_STATUS__STOPPED,		/* no threads */
 	THREADPOOL_STATUS__READY,		/* threads are ready but idle */
+	THREADPOOL_STATUS__BUSY,		/* threads are busy */
 	THREADPOOL_STATUS__ERROR,		/* errors */
 	THREADPOOL_STATUS__MAX
 };
@@ -33,6 +34,7 @@ enum threadpool_status {
 static const char * const threadpool_status_tags[] = {
 	"stopped",
 	"ready",
+	"busy",
 	"error"
 };
 
@@ -181,6 +183,28 @@ static int threadpool__terminate_thread(struct threadpool_entry *thread)
 	return threadpool__wait_thread(thread);
 }
 
+/**
+ * threadpool__wake_thread - send wake msg to @thread
+ *
+ * This function does not wait for the thread to actually wake
+ * NB: call only from main thread!
+ */
+static int threadpool__wake_thread(struct threadpool_entry *thread)
+{
+	int res;
+	enum threadpool_msg msg = THREADPOOL_MSG__WAKE;
+
+	res = writen(thread->pipes.cmd[1], &msg, sizeof(msg));
+	if (res < 0) {
+		pr_debug2("threadpool: error sending wake msg: %s\n", strerror(errno));
+		return -THREADPOOL_ERROR__WRITEPIPE;
+	}
+
+	pr_debug2("threadpool: sent wake msg %s to tid=%d\n",
+		threadpool_msg_tags[msg], thread->tid);
+	return 0;
+}
+
 /**
  * threadpool_entry__function - send ack to main thread
  */
@@ -249,6 +273,15 @@ static void *threadpool_entry__function(void *args)
 
 		if (cmd == THREADPOOL_MSG__STOP)
 			break;
+
+		if (!thread->pool->current_task) {
+			pr_debug("threadpool[%d]: received wake without task\n",
+				thread->tid);
+			break;
+		}
+
+		pr_debug("threadpool[%d]: executing task\n", thread->tid);
+		thread->pool->current_task->fn(thread->idx, thread->pool->current_task);
 	}
 
 	pr_debug2("threadpool[%d]: exit\n", thread->tid);
@@ -502,10 +535,14 @@ static int __threadpool__stop(struct threadpool *pool)
  * threadpool__stop - stop all threads in the pool.
  *
  * This function blocks waiting for ack from all threads.
+ * If the pool was busy, it will first wait for the task to finish.
  */
 int threadpool__stop(struct threadpool *pool)
 {
-	int ret;
+	int ret = threadpool__wait(pool);
+
+	if (ret)
+		return ret;
 
 	if (pool->status != THREADPOOL_STATUS__READY) {
 		pr_debug2("threadpool: stopping not ready pool\n");
@@ -528,3 +565,110 @@ bool threadpool__is_ready(struct threadpool *pool)
 {
 	return pool->status == THREADPOOL_STATUS__READY;
 }
+
+/**
+ * __threadpool__execute - execute @task on all threads of the @pool
+ *
+ * NB: use threadpool__execute. This function does not change @pool->status.
+ */
+static int __threadpool__execute(struct threadpool *pool, struct task_struct *task)
+{
+	int t, ret;
+
+	pool->current_task = task;
+
+	for (t = 0; t < pool->nr_threads; t++) {
+		ret = threadpool__wake_thread(&pool->threads[t]);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
+/**
+ * threadpool__execute - execute @task on all threads of the @pool
+ *
+ * The task will run asynchronously wrt the main thread.
+ * The task can be waited with threadpool__wait.
+ *
+ * NB: make sure the pool is ready before calling this, since no queueing is
+ *     performed. If you need queueing, have a look at the workqueue.
+ */
+int threadpool__execute(struct threadpool *pool, struct task_struct *task)
+{
+	int ret;
+
+	if (pool->status != THREADPOOL_STATUS__READY) {
+		pr_debug2("threadpool: executing on not ready pool\n");
+		return -THREADPOOL_ERROR__NOTALLOWED;
+	}
+
+	ret = __threadpool__execute(pool, task);
+	if (ret) {
+		pool->status = THREADPOOL_STATUS__ERROR;
+		return ret;
+	}
+	pool->status = THREADPOOL_STATUS__BUSY;
+	return 0;
+}
+
+/**
+ * __threadpool__wait - wait until all threads in @pool are done
+ *
+ * NB: use threadpool__wait. This function does not change @pool->status.
+ */
+static int __threadpool__wait(struct threadpool *pool)
+{
+	int t, err = 0, ret;
+
+	for (t = 0; t < pool->nr_threads; t++) {
+		ret = threadpool__wait_thread(&pool->threads[t]);
+		if (ret)
+			err = ret;
+	}
+
+	pool->current_task = NULL;
+	return err;
+}
+
+/**
+ * threadpool__wait - wait until all threads in @pool are done
+ *
+ * This function will wait for all threads to finish execution and send their
+ * ack message.
+ *
+ * NB: call only from main thread!
+ */
+int threadpool__wait(struct threadpool *pool)
+{
+	int ret;
+
+	switch (pool->status) {
+	case THREADPOOL_STATUS__BUSY:
+		break;
+	case THREADPOOL_STATUS__READY:
+		return 0;
+	default:
+	case THREADPOOL_STATUS__STOPPED:
+	case THREADPOOL_STATUS__ERROR:
+	case THREADPOOL_STATUS__MAX:
+		return -THREADPOOL_ERROR__NOTALLOWED;
+	}
+
+	ret = __threadpool__wait(pool);
+	if (ret) {
+		pool->status = THREADPOOL_STATUS__ERROR;
+		return ret;
+	}
+	pool->status = THREADPOOL_STATUS__READY;
+	return 0;
+}
+
+/**
+ * threadpool__is_busy - check if the pool is busy
+ */
+int threadpool__is_busy(struct threadpool *pool)
+{
+	return pool->status == THREADPOOL_STATUS__BUSY;
+}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
index 7d56e5450fac605b..f28b3856d30e91e5 100644
--- a/tools/perf/util/workqueue/threadpool.h
+++ b/tools/perf/util/workqueue/threadpool.h
@@ -22,6 +22,7 @@ extern int threadpool__wait(struct threadpool *pool);
 
 extern int threadpool__size(struct threadpool *pool);
 extern bool threadpool__is_ready(struct threadpool *pool);
+extern int threadpool__is_busy(struct threadpool *pool);
 
 /* Error management */
 #define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (3 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch copies sparse context/locking annotations from
include/linux/compiler_types.h to tools/include/linux/compiler_types.h.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/include/linux/compiler_types.h | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/tools/include/linux/compiler_types.h b/tools/include/linux/compiler_types.h
index feea09029f610120..24ae3054f304f274 100644
--- a/tools/include/linux/compiler_types.h
+++ b/tools/include/linux/compiler_types.h
@@ -13,6 +13,24 @@
 #define __has_builtin(x) (0)
 #endif
 
+#ifdef __CHECKER__
+/* context/locking */
+# define __must_hold(x)	__attribute__((context(x,1,1)))
+# define __acquires(x)	__attribute__((context(x,0,1)))
+# define __releases(x)	__attribute__((context(x,1,0)))
+# define __acquire(x)	__context__(x,1)
+# define __release(x)	__context__(x,-1)
+# define __cond_lock(x,c)	((c) ? ({ __acquire(x); 1; }) : 0)
+#else /* __CHECKER__ */
+/* context/locking */
+# define __must_hold(x)
+# define __acquires(x)
+# define __releases(x)
+# define __acquire(x)	(void)0
+# define __release(x)	(void)0
+# define __cond_lock(x,c) (c)
+#endif /* __CHECKER__ */
+
 /* Compiler specific macros. */
 #ifdef __GNUC__
 #include <linux/compiler-gcc.h>
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (4 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-08-09 12:04   ` Jiri Olsa
  2021-07-30 15:34 ` [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management Riccardo Mancini
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds the workqueue definition, along with simple creation and
destruction functions.
Furthermore, a simple subtest is added.

A workqueue is attached to a pool, on which it executes its workers.
Next patches will introduce workers.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c          |  86 +++++++++
 tools/perf/util/workqueue/Build       |   1 +
 tools/perf/util/workqueue/workqueue.c | 268 ++++++++++++++++++++++++++
 tools/perf/util/workqueue/workqueue.h |  38 ++++
 4 files changed, 393 insertions(+)
 create mode 100644 tools/perf/util/workqueue/workqueue.c
 create mode 100644 tools/perf/util/workqueue/workqueue.h

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 87bf8fc71c346653..2165a563e47bd6a5 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -6,6 +6,7 @@
 #include "tests.h"
 #include "util/debug.h"
 #include "util/workqueue/threadpool.h"
+#include "util/workqueue/workqueue.h"
 
 #define DUMMY_FACTOR 100000
 #define N_DUMMY_WORK_SIZES 7
@@ -14,6 +15,11 @@ struct threadpool_test_args_t {
 	int pool_size;
 };
 
+struct workqueue_test_args_t {
+	int pool_size;
+	int n_work_items;
+};
+
 struct test_task {
 	struct task_struct task;
 	int n_threads;
@@ -141,6 +147,52 @@ static int __test__threadpool(void *_args)
 	return ret;
 }
 
+
+static int __workqueue__prepare(struct threadpool **pool,
+				struct workqueue_struct **wq,
+				int pool_size)
+{
+	int ret = __threadpool__prepare(pool, pool_size);
+
+	if (ret)
+		return ret;
+
+	*wq = create_workqueue(*pool);
+	TEST_ASSERT_VAL("workqueue creation failure", !IS_ERR(*wq));
+	TEST_ASSERT_VAL("workqueue wrong size", workqueue_nr_threads(*wq) == pool_size);
+	TEST_ASSERT_VAL("threadpool is not executing", threadpool__is_busy(*pool));
+
+	return TEST_OK;
+}
+
+static int __workqueue__teardown(struct threadpool *pool,
+				struct workqueue_struct *wq)
+{
+	int ret = destroy_workqueue(wq);
+
+	TEST_ASSERT_VAL("workqueue destruction failure", ret == 0);
+
+	return __threadpool__teardown(pool);
+}
+
+static int __test__workqueue(void *_args)
+{
+	struct workqueue_test_args_t *args = _args;
+	struct threadpool *pool;
+	struct workqueue_struct *wq;
+	int ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+
+	if (ret)
+		goto out;
+
+	ret = __workqueue__teardown(pool, wq);
+	if (ret)
+		goto out;
+
+out:
+	return ret;
+}
+
 static const struct threadpool_test_args_t threadpool_test_args[] = {
 	{
 		.pool_size = 1
@@ -159,6 +211,33 @@ static const struct threadpool_test_args_t threadpool_test_args[] = {
 	}
 };
 
+static const struct workqueue_test_args_t workqueue_test_args[] = {
+	{
+		.pool_size = 1,
+		.n_work_items = 1
+	},
+	{
+		.pool_size = 1,
+		.n_work_items = 10
+	},
+	{
+		.pool_size = 2,
+		.n_work_items = 1
+	},
+	{
+		.pool_size = 2,
+		.n_work_items = 100
+	},
+	{
+		.pool_size = 16,
+		.n_work_items = 7
+	},
+	{
+		.pool_size = 16,
+		.n_work_items = 2789
+	}
+};
+
 struct test_case {
 	const char *desc;
 	int (*func)(void *args);
@@ -174,6 +253,13 @@ static struct test_case workqueue_testcase_table[] = {
 		.args = (void *) threadpool_test_args,
 		.n_args = (int)ARRAY_SIZE(threadpool_test_args),
 		.arg_size = sizeof(struct threadpool_test_args_t)
+	},
+	{
+		.desc = "Workqueue",
+		.func = __test__workqueue,
+		.args = (void *) workqueue_test_args,
+		.n_args = (int)ARRAY_SIZE(workqueue_test_args),
+		.arg_size = sizeof(struct workqueue_test_args_t)
 	}
 };
 
diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
index 8b72a6cd4e2cba0d..4af721345c0a6bb7 100644
--- a/tools/perf/util/workqueue/Build
+++ b/tools/perf/util/workqueue/Build
@@ -1 +1,2 @@
 perf-y += threadpool.o
+perf-y += workqueue.o
diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
new file mode 100644
index 0000000000000000..d3c6d4c4e75944a5
--- /dev/null
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -0,0 +1,268 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <linux/list.h>
+#include <linux/err.h>
+#include <linux/string.h>
+#include "debug.h"
+#include <internal/lib.h>
+#include "workqueue.h"
+
+enum workqueue_status {
+	WORKQUEUE_STATUS__READY,	/* wq is ready to receive work */
+	WORKQUEUE_STATUS__ERROR,
+	WORKQUEUE_STATUS__MAX
+};
+
+static const char * const workqueue_status_tags[] = {
+	"ready",
+	"error"
+};
+
+struct workqueue_struct {
+	pthread_mutex_t		lock;		/* locking of the thread_pool */
+	pthread_cond_t		idle_cond;	/* all workers are idle cond */
+	struct threadpool	*pool;		/* underlying pool */
+	int			pool_errno;	/* latest pool error */
+	struct task_struct	task;		/* threadpool task */
+	struct list_head	busy_list;	/* busy workers */
+	struct list_head	idle_list;	/* idle workers */
+	struct list_head	pending;	/* pending work items */
+	int			msg_pipe[2];	/* main thread comm pipes */
+	enum workqueue_status	status;
+};
+
+static const char * const workqueue_errno_str[] = {
+	"",
+	"This operation is not allowed in this workqueue status",
+	"Error executing function in threadpool",
+	"Error waiting the threadpool",
+	"Error sending message to worker",
+	"Error receiving message from worker",
+	"Received unexpected message from worker",
+};
+
+/**
+ * worker_thread - worker function executed on threadpool
+ */
+static void worker_thread(int tidx, struct task_struct *task)
+{
+
+	pr_info("Hi from worker %d, executing task %p\n", tidx, task);
+}
+
+/**
+ * attach_threadpool_to_workqueue - start @wq workers on @pool
+ */
+static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
+					struct threadpool *pool)
+{
+	if (!threadpool__is_ready(pool)) {
+		pr_debug2("workqueue: cannot attach to pool: pool is not ready\n");
+		return -WORKQUEUE_ERROR__NOTALLOWED;
+	}
+
+	wq->pool = pool;
+
+	wq->pool_errno = threadpool__execute(pool, &wq->task);
+	if (wq->pool_errno)
+		return -WORKQUEUE_ERROR__POOLEXE;
+
+	return 0;
+}
+
+/**
+ * detach_threadpool_from_workqueue - stop @wq workers on @pool
+ */
+static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
+{
+	int ret, err = 0;
+
+	if (wq->status != WORKQUEUE_STATUS__READY) {
+		pr_debug2("workqueue: cannot attach to pool: wq is not ready\n");
+		return -WORKQUEUE_ERROR__NOTALLOWED;
+	}
+
+	ret = threadpool__wait(wq->pool);
+	if (ret) {
+		pr_debug2("workqueue: error waiting threadpool\n");
+		wq->pool_errno = ret;
+		err = -WORKQUEUE_ERROR__POOLWAIT;
+	}
+
+	wq->pool = NULL;
+	return err;
+}
+
+/**
+ * create_workqueue - create a workqueue associated to @pool
+ *
+ * Only one workqueue can execute on a pool at a time.
+ */
+struct workqueue_struct *create_workqueue(struct threadpool *pool)
+{
+	int ret, err = 0;
+	struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
+
+	if (!wq) {
+		err = -ENOMEM;
+		goto out_return;
+	}
+
+	ret = pthread_mutex_init(&wq->lock, NULL);
+	if (ret) {
+		err = -ret;
+		goto out_free_wq;
+	}
+
+	ret = pthread_cond_init(&wq->idle_cond, NULL);
+	if (ret) {
+		err = -ret;
+		goto out_destroy_mutex;
+	}
+
+	wq->pool = NULL;
+	INIT_LIST_HEAD(&wq->busy_list);
+	INIT_LIST_HEAD(&wq->idle_list);
+
+	INIT_LIST_HEAD(&wq->pending);
+
+	ret = pipe(wq->msg_pipe);
+	if (ret) {
+		err = -ENOMEM;
+		goto out_destroy_cond;
+	}
+
+	wq->task.fn = worker_thread;
+
+	ret = attach_threadpool_to_workqueue(wq, pool);
+	if (ret) {
+		err = ret;
+		goto out_destroy_cond;
+	}
+
+	wq->status = WORKQUEUE_STATUS__READY;
+
+	return wq;
+
+out_destroy_cond:
+	pthread_cond_destroy(&wq->idle_cond);
+out_destroy_mutex:
+	pthread_mutex_destroy(&wq->lock);
+out_free_wq:
+	free(wq);
+out_return:
+	return ERR_PTR(err);
+}
+
+/**
+ * destroy_workqueue - stop @wq workers and destroy @wq
+ */
+int destroy_workqueue(struct workqueue_struct *wq)
+{
+	int err = 0, ret;
+
+	if (IS_ERR_OR_NULL(wq))
+		return 0;
+
+	ret = detach_threadpool_from_workqueue(wq);
+	if (ret) {
+		pr_debug2("workqueue: error detaching from threadpool.\n");
+		err = ret;
+	}
+
+	ret = pthread_mutex_destroy(&wq->lock);
+	if (ret) {
+		err = -ret;
+		pr_debug2("workqueue: error pthread_mutex_destroy: %s\n",
+			strerror(errno));
+	}
+
+	ret = pthread_cond_destroy(&wq->idle_cond);
+	if (ret) {
+		err = -ret;
+		pr_debug2("workqueue: error pthread_cond_destroy: %s\n",
+			strerror(errno));
+	}
+
+	close(wq->msg_pipe[0]);
+	wq->msg_pipe[0] = -1;
+
+	close(wq->msg_pipe[1]);
+	wq->msg_pipe[1] = -1;
+
+	free(wq);
+	return err;
+}
+
+/**
+ * workqueue_strerror - print message regarding latest error in @wq
+ *
+ * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes.
+ */
+int workqueue_strerror(struct workqueue_struct *wq, int err, char *buf, size_t size)
+{
+	int ret;
+	char sbuf[THREADPOOL_STRERR_BUFSIZE], *emsg;
+	const char *status_str, *errno_str;
+
+	status_str = IS_ERR_OR_NULL(wq) ? "error" : workqueue_status_tags[wq->status];
+	errno_str = workqueue_errno_str[-err-WORKQUEUE_ERROR__OFFSET];
+
+	switch (err) {
+	case -WORKQUEUE_ERROR__NOTALLOWED:
+		return scnprintf(buf, size, "%s (%s).\n",
+			errno_str, status_str);
+	case -WORKQUEUE_ERROR__POOLEXE:
+	case -WORKQUEUE_ERROR__POOLWAIT:
+		if (IS_ERR_OR_NULL(wq))
+			return scnprintf(buf, size, "%s: unknown.\n",
+				errno_str);
+
+		ret = threadpool__strerror(wq->pool, wq->pool_errno, sbuf, sizeof(sbuf));
+		if (ret < 0)
+			return ret;
+		return scnprintf(buf, size, "%s: %s.\n", errno_str, sbuf);
+	case -WORKQUEUE_ERROR__WRITEPIPE:
+	case -WORKQUEUE_ERROR__READPIPE:
+		emsg = str_error_r(errno, sbuf, sizeof(sbuf));
+		return scnprintf(buf, size, "%s: %s.\n", errno_str, emsg);
+	case -WORKQUEUE_ERROR__INVALIDMSG:
+		return scnprintf(buf, size, "%s.\n", errno_str);
+	default:
+		emsg = str_error_r(err, sbuf, sizeof(sbuf));
+		return scnprintf(buf, size, "Error: %s", emsg);
+	}
+}
+
+/**
+ * create_workqueue_strerror - print message regarding @err_ptr
+ *
+ * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes.
+ */
+int create_workqueue_strerror(struct workqueue_struct *err_ptr, char *buf, size_t size)
+{
+	return workqueue_strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
+}
+
+/**
+ * destroy_workqueue_strerror - print message regarding @err
+ *
+ * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes.
+ */
+int destroy_workqueue_strerror(int err, char *buf, size_t size)
+{
+	return workqueue_strerror(NULL, err, buf, size);
+}
+
+/**
+ * workqueue_nr_threads - get size of threadpool underlying @wq
+ */
+int workqueue_nr_threads(struct workqueue_struct *wq)
+{
+	return threadpool__size(wq->pool);
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
new file mode 100644
index 0000000000000000..456cd8b6cb2a26d8
--- /dev/null
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -0,0 +1,38 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __WORKQUEUE_WORKQUEUE_H
+#define __WORKQUEUE_WORKQUEUE_H
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <linux/list.h>
+#include "threadpool.h"
+
+struct work_struct;
+typedef void (*work_func_t)(struct work_struct *work);
+
+struct work_struct {
+	struct list_head entry;
+	work_func_t func;
+};
+
+struct workqueue_struct;
+
+extern struct workqueue_struct *create_workqueue(struct threadpool *pool);
+extern int destroy_workqueue(struct workqueue_struct *wq);
+
+extern int workqueue_nr_threads(struct workqueue_struct *wq);
+
+#define WORKQUEUE_STRERR_BUFSIZE (128+THREADPOOL_STRERR_BUFSIZE)
+#define WORKQUEUE_ERROR__OFFSET 512
+enum {
+	WORKQUEUE_ERROR__NOTALLOWED = WORKQUEUE_ERROR__OFFSET,
+	WORKQUEUE_ERROR__POOLEXE,
+	WORKQUEUE_ERROR__POOLWAIT,
+	WORKQUEUE_ERROR__WRITEPIPE,
+	WORKQUEUE_ERROR__READPIPE,
+	WORKQUEUE_ERROR__INVALIDMSG,
+};
+extern int workqueue_strerror(struct workqueue_struct *wq, int err, char *buf, size_t size);
+extern int create_workqueue_strerror(struct workqueue_struct *err_ptr, char *buf, size_t size);
+extern int destroy_workqueue_strerror(int err, char *buf, size_t size);
+#endif /* __WORKQUEUE_WORKQUEUE_H */
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (5 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions Riccardo Mancini
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds the implementation of the worker thread that is executed
in the threadpool, and all management-related functions.

At startup, a worker registers itself with the workqueue by adding itself
to the idle_list, then it sends an ack back to the main thread. When
creating wotkers, the main thread will wait for the related acks.
Once there is work to do, threads are woken up to perform the work.
Threads will try to dequeue a new pending work before going to sleep.

This registering mechanism has been implemented to enable dynamic
growth of the workqueue in the future.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/util/workqueue/workqueue.c | 243 +++++++++++++++++++++++++-
 1 file changed, 242 insertions(+), 1 deletion(-)

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index d3c6d4c4e75944a5..16a55de25cf247d8 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -12,6 +12,15 @@
 #include <internal/lib.h>
 #include "workqueue.h"
 
+enum worker_msg {
+	WORKER_MSG__UNDEFINED,
+	WORKER_MSG__READY,                          /* from worker: ack */
+	WORKER_MSG__WAKE,                           /* to worker: wake up */
+	WORKER_MSG__STOP,                           /* to worker: exit */
+	WORKER_MSG__ERROR,
+	WORKER_MSG__MAX
+};
+
 enum workqueue_status {
 	WORKQUEUE_STATUS__READY,	/* wq is ready to receive work */
 	WORKQUEUE_STATUS__ERROR,
@@ -46,13 +55,216 @@ static const char * const workqueue_errno_str[] = {
 	"Received unexpected message from worker",
 };
 
+struct worker {
+	int				tidx;		/* idx of thread in pool */
+	struct list_head		entry;		/* in idle or busy list */
+	struct work_struct		*current_work;	/* work being processed */
+	int				msg_pipe[2];	/* main thread comm pipes*/
+};
+
+#define for_each_busy_worker(wq, m_worker) \
+	list_for_each_entry(m_worker, &wq->busy_list, entry)
+
+#define for_each_idle_worker(wq, m_worker) \
+	list_for_each_entry(m_worker, &wq->idle_list, entry)
+
+static inline int lock_workqueue(struct workqueue_struct *wq)
+__acquires(&wq->lock)
+{
+	__acquire(&wq->lock);
+	return pthread_mutex_lock(&wq->lock);
+}
+
+static inline int unlock_workqueue(struct workqueue_struct *wq)
+__releases(&wq->lock)
+{
+	__release(&wq->lock);
+	return pthread_mutex_unlock(&wq->lock);
+}
+
+/**
+ * available_work - check if @wq has work to do
+ */
+static int available_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+	return !list_empty(&wq->pending);
+}
+
+/**
+ * dequeue_work - retrieve the next work in @wq to be executed by the worker
+ *
+ * Called inside worker.
+ */
+static struct work_struct *dequeue_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+	struct work_struct *work = list_first_entry(&wq->pending, struct work_struct, entry);
+
+	list_del_init(&work->entry);
+	return work;
+}
+
+/**
+ * sleep_worker - worker @w of workqueue @wq goes to sleep
+ *
+ * Called inside worker.
+ * If this was the last idle thread, signal it to the main thread, in case it
+ * was flushing the workqueue.
+ */
+static void sleep_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_move(&w->entry, &wq->idle_list);
+	if (list_empty(&wq->busy_list))
+		pthread_cond_signal(&wq->idle_cond);
+}
+
+/**
+ * stop_worker - stop worker @w
+ *
+ * Called from main thread.
+ * Send stop message to worker @w.
+ */
+static int stop_worker(struct worker *w)
+{
+	int ret;
+	enum worker_msg msg;
+
+	msg = WORKER_MSG__STOP;
+	ret = writen(w->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug2("workqueue: error sending stop msg: %s\n",
+			strerror(errno));
+		return -WORKQUEUE_ERROR__WRITEPIPE;
+	}
+
+	return 0;
+}
+
+/**
+ * init_worker - init @w struct
+ * @w: the struct to init
+ * @tidx: index of the executing thread inside the threadpool
+ */
+static int init_worker(struct worker *w, int tidx)
+{
+	if (pipe(w->msg_pipe)) {
+		pr_debug2("worker[%d]: error opening pipe: %s\n", tidx, strerror(errno));
+		return -ENOMEM;
+	}
+
+	w->tidx = tidx;
+	w->current_work = NULL;
+	INIT_LIST_HEAD(&w->entry);
+
+	return 0;
+}
+
+/**
+ * fini_worker - deallocate resources used by @w struct
+ */
+static void fini_worker(struct worker *w)
+{
+	close(w->msg_pipe[0]);
+	w->msg_pipe[0] = -1;
+	close(w->msg_pipe[1]);
+	w->msg_pipe[1] = -1;
+}
+
+/**
+ * register_worker - add worker to @wq->idle_list
+ */
+static void register_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_move(&w->entry, &wq->idle_list);
+}
+
+/**
+ * unregister_worker - remove worker from @wq->idle_list
+ */
+static void unregister_worker(struct workqueue_struct *wq __maybe_unused,
+			struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_del_init(&w->entry);
+}
+
 /**
  * worker_thread - worker function executed on threadpool
  */
 static void worker_thread(int tidx, struct task_struct *task)
 {
+	struct workqueue_struct *wq = container_of(task, struct workqueue_struct, task);
+	struct worker this_worker;
+	enum worker_msg msg;
+	int ret, init_err = init_worker(&this_worker, tidx);
+
+	if (init_err) {
+		// send error message to main thread
+		msg = WORKER_MSG__ERROR;
+	} else {
+		lock_workqueue(wq);
+		register_worker(wq, &this_worker);
+		unlock_workqueue(wq);
+
+		// ack worker creation
+		msg = WORKER_MSG__READY;
+	}
+
+	ret = writen(wq->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug("worker[%d]: error sending msg: %s\n",
+			tidx, strerror(errno));
+
+		if (init_err)
+			return;
+		goto out;
+	}
 
-	pr_info("Hi from worker %d, executing task %p\n", tidx, task);
+	// stop if there have been errors in init
+	if (init_err)
+		return;
+
+	for (;;) {
+		msg = WORKER_MSG__UNDEFINED;
+		ret = readn(this_worker.msg_pipe[0], &msg, sizeof(msg));
+		if (ret < 0 || (msg != WORKER_MSG__WAKE && msg != WORKER_MSG__STOP)) {
+			pr_debug("worker[%d]: error receiving msg: %s\n",
+				tidx, strerror(errno));
+			break;
+		}
+
+		if (msg == WORKER_MSG__STOP)
+			break;
+
+		// main thread takes care of moving to busy list and assigning current_work
+
+		while (this_worker.current_work) {
+			this_worker.current_work->func(this_worker.current_work);
+
+			lock_workqueue(wq);
+			if (available_work(wq)) {
+				this_worker.current_work = dequeue_work(wq);
+				pr_debug2("worker[%d]: dequeued work\n",
+					tidx);
+			} else {
+				this_worker.current_work = NULL;
+				sleep_worker(wq, &this_worker);
+				pr_debug2("worker[%d]: going to sleep\n",
+					tidx);
+			}
+			unlock_workqueue(wq);
+		}
+	}
+
+out:
+	lock_workqueue(wq);
+	unregister_worker(wq, &this_worker);
+	unlock_workqueue(wq);
+
+	fini_worker(&this_worker);
 }
 
 /**
@@ -61,6 +273,9 @@ static void worker_thread(int tidx, struct task_struct *task)
 static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 					struct threadpool *pool)
 {
+	int ret, t;
+	enum worker_msg msg;
+
 	if (!threadpool__is_ready(pool)) {
 		pr_debug2("workqueue: cannot attach to pool: pool is not ready\n");
 		return -WORKQUEUE_ERROR__NOTALLOWED;
@@ -72,6 +287,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 	if (wq->pool_errno)
 		return -WORKQUEUE_ERROR__POOLEXE;
 
+
+	// wait ack from all threads
+	for (t = 0; t < threadpool__size(pool); t++) {
+		msg = WORKER_MSG__UNDEFINED;
+		ret = readn(wq->msg_pipe[0], &msg, sizeof(msg));
+		if (ret < 0) {
+			pr_debug("workqueue: error receiving ack: %s\n",
+				strerror(errno));
+			return -WORKQUEUE_ERROR__READPIPE;
+		}
+		if (msg != WORKER_MSG__READY) {
+			pr_debug2("workqueue: received error\n");
+			return -WORKQUEUE_ERROR__INVALIDMSG;
+		}
+	}
+
 	return 0;
 }
 
@@ -81,12 +312,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
 {
 	int ret, err = 0;
+	struct worker *w;
 
 	if (wq->status != WORKQUEUE_STATUS__READY) {
 		pr_debug2("workqueue: cannot attach to pool: wq is not ready\n");
 		return -WORKQUEUE_ERROR__NOTALLOWED;
 	}
 
+	lock_workqueue(wq);
+	for_each_idle_worker(wq, w) {
+		ret = stop_worker(w);
+		if (ret)
+			err = ret;
+	}
+	unlock_workqueue(wq);
+
+
 	ret = threadpool__wait(wq->pool);
 	if (ret) {
 		pr_debug2("workqueue: error waiting threadpool\n");
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (6 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
  9 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds functions to queue and wait work_structs, and
related tests.

When a new work item is added, the workqueue first checks if there
are idle threads to wake up. If so, it wakes one of them up with the
given work item; otherwise, it just adds the item to the list of
pending work items. A thread which completes a work item will check
this list before going to sleep.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c          |  70 ++++++++++++++++-
 tools/perf/util/workqueue/workqueue.c | 106 ++++++++++++++++++++++++++
 tools/perf/util/workqueue/workqueue.h |   6 ++
 3 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 2165a563e47bd6a5..2488ee971877b1f0 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -147,6 +147,27 @@ static int __test__threadpool(void *_args)
 	return ret;
 }
 
+struct test_work {
+	struct work_struct work;
+	int i;
+	int *array;
+};
+
+static void test_work_fn1(struct work_struct *work)
+{
+	struct test_work *mwork = container_of(work, struct test_work, work);
+
+	dummy_work(mwork->i);
+	mwork->array[mwork->i] = mwork->i+1;
+}
+
+static void test_work_fn2(struct work_struct *work)
+{
+	struct test_work *mwork = container_of(work, struct test_work, work);
+
+	dummy_work(mwork->i);
+	mwork->array[mwork->i] = mwork->i*2;
+}
 
 static int __workqueue__prepare(struct threadpool **pool,
 				struct workqueue_struct **wq,
@@ -175,21 +196,68 @@ static int __workqueue__teardown(struct threadpool *pool,
 	return __threadpool__teardown(pool);
 }
 
+static int __workqueue__exec_wait(struct workqueue_struct *wq,
+				int *array, struct test_work *works,
+				work_func_t func, int n_work_items)
+{
+	int ret, i;
+
+	for (i = 0; i < n_work_items; i++) {
+		works[i].array = array;
+		works[i].i = i;
+
+		init_work(&works[i].work);
+		works[i].work.func = func;
+		queue_work(wq, &works[i].work);
+	}
+
+	ret = flush_workqueue(wq);
+	TEST_ASSERT_VAL("workqueue flush failure", ret == 0);
+
+	return TEST_OK;
+}
+
+
 static int __test__workqueue(void *_args)
 {
 	struct workqueue_test_args_t *args = _args;
 	struct threadpool *pool;
 	struct workqueue_struct *wq;
-	int ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+	int *array;
+	struct test_work *works;
+	int i, ret = __workqueue__prepare(&pool, &wq, args->pool_size);
 
+	if (ret)
+		return ret;
+
+	array = calloc(args->n_work_items, sizeof(*array));
+	TEST_ASSERT_VAL("failed array calloc", array);
+	works = calloc(args->n_work_items, sizeof(*works));
+	TEST_ASSERT_VAL("failed works calloc", works);
+
+	ret = __workqueue__exec_wait(wq, array, works, test_work_fn1,
+					args->n_work_items);
 	if (ret)
 		goto out;
 
+	for (i = 0; i < args->n_work_items; i++)
+		TEST_ASSERT_VAL("failed array check (1)", array[i] == i+1);
+
+	ret = __workqueue__exec_wait(wq, array, works, test_work_fn2,
+					args->n_work_items);
+	if (ret)
+		goto out;
+
+	for (i = 0; i < args->n_work_items; i++)
+		TEST_ASSERT_VAL("failed array check (2)", array[i] == 2*i);
+
 	ret = __workqueue__teardown(pool, wq);
 	if (ret)
 		goto out;
 
 out:
+	free(array);
+	free(works);
 	return ret;
 }
 
diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 16a55de25cf247d8..a32d62dac1ec04a6 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -23,12 +23,14 @@ enum worker_msg {
 
 enum workqueue_status {
 	WORKQUEUE_STATUS__READY,	/* wq is ready to receive work */
+	WORKQUEUE_STATUS__STOPPING,	/* wq is being destructed */
 	WORKQUEUE_STATUS__ERROR,
 	WORKQUEUE_STATUS__MAX
 };
 
 static const char * const workqueue_status_tags[] = {
 	"ready",
+	"stopping",
 	"error"
 };
 
@@ -120,6 +122,39 @@ __must_hold(&wq->lock)
 		pthread_cond_signal(&wq->idle_cond);
 }
 
+/**
+ * wake_worker - wake worker @w of workqueue @wq assigning @work to do
+ *
+ * Called from main thread.
+ * Moves worker from idle to busy list, assigns @work to it and sends it a
+ * wake up message.
+ *
+ * NB: this function releases the lock to be able to send the notification
+ * outside the critical section.
+ */
+static int wake_worker(struct workqueue_struct *wq, struct worker *w,
+			struct work_struct *work)
+__must_hold(&wq->lock)
+__releases(&wq->lock)
+{
+	enum worker_msg msg = WORKER_MSG__WAKE;
+	int ret;
+
+	list_move(&w->entry, &wq->busy_list);
+	w->current_work = work;
+	unlock_workqueue(wq);
+
+	// send wake msg outside critical section to reduce time spent inside it
+	ret = writen(w->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug2("wake_worker[%d]: error sending msg: %s\n",
+			w->tidx, strerror(errno));
+		return -WORKQUEUE_ERROR__WRITEPIPE;
+	}
+
+	return 0;
+}
+
 /**
  * stop_worker - stop worker @w
  *
@@ -319,6 +354,11 @@ static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
 		return -WORKQUEUE_ERROR__NOTALLOWED;
 	}
 
+	wq->status = WORKQUEUE_STATUS__STOPPING;
+	ret = flush_workqueue(wq);
+	if (ret)
+		return ret;
+
 	lock_workqueue(wq);
 	for_each_idle_worker(wq, w) {
 		ret = stop_worker(w);
@@ -507,3 +547,69 @@ int workqueue_nr_threads(struct workqueue_struct *wq)
 {
 	return threadpool__size(wq->pool);
 }
+
+/**
+ * queue_work - add @work to @wq internal queue
+ *
+ * If there are idle threads, one of these will be woken up.
+ * Otherwise, the work is added to the pending list.
+ */
+int queue_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+	struct worker *chosen_worker;
+	int ret = 0;
+
+	// in particular, this can fail if workqueue is marked to be stopping
+	if (wq->status != WORKQUEUE_STATUS__READY) {
+		pr_debug2("workqueue: trying to queue but workqueue is not ready\n");
+		return -WORKQUEUE_ERROR__NOTALLOWED;
+	}
+
+	lock_workqueue(wq);
+	if (list_empty(&wq->idle_list)) {
+		list_add_tail(&work->entry, &wq->pending);
+		unlock_workqueue(wq);
+		pr_debug("workqueue: queued new work item\n");
+	} else {
+		chosen_worker = list_first_entry(&wq->idle_list, struct worker, entry);
+		ret = wake_worker(wq, chosen_worker, work);
+		pr_debug("workqueue: woke worker %d\n", chosen_worker->tidx);
+	}
+
+	if (ret) {
+		wq->status = WORKQUEUE_STATUS__ERROR;
+		return ret;
+	}
+	return 0;
+}
+
+/**
+ * flush_workqueue - wait for all currently executed and pending work to finish
+ *
+ * This function blocks until all threads become idle.
+ */
+int flush_workqueue(struct workqueue_struct *wq)
+{
+	int err = 0, ret;
+
+	lock_workqueue(wq);
+	while (!list_empty(&wq->busy_list)) {
+		ret = pthread_cond_wait(&wq->idle_cond, &wq->lock);
+		if (ret) {
+			pr_debug2("%s: error in pthread_cond_wait\n", __func__);
+			err = -ret;
+			break;
+		}
+	}
+	unlock_workqueue(wq);
+
+	return err;
+}
+
+/**
+ * init_work - initialize the @work struct
+ */
+void init_work(struct work_struct *work)
+{
+	INIT_LIST_HEAD(&work->entry);
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index 456cd8b6cb2a26d8..ec998291b6a0623d 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -22,6 +22,12 @@ extern int destroy_workqueue(struct workqueue_struct *wq);
 
 extern int workqueue_nr_threads(struct workqueue_struct *wq);
 
+extern int queue_work(struct workqueue_struct *wq, struct work_struct *work);
+
+extern int flush_workqueue(struct workqueue_struct *wq);
+
+extern void init_work(struct work_struct *work);
+
 #define WORKQUEUE_STRERR_BUFSIZE (128+THREADPOOL_STRERR_BUFSIZE)
 #define WORKQUEUE_ERROR__OFFSET 512
 enum {
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (7 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
  9 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

This patch adds the parallel_for which executes a given function inside
the workqueue, taking care of managing the work items.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c          |  85 ++++++++++++++++
 tools/perf/util/workqueue/workqueue.c | 135 ++++++++++++++++++++++++++
 tools/perf/util/workqueue/workqueue.h |   7 ++
 3 files changed, 227 insertions(+)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 2488ee971877b1f0..258fc0196a55c407 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -20,6 +20,12 @@ struct workqueue_test_args_t {
 	int n_work_items;
 };
 
+struct parallel_for_test_args_t {
+	int pool_size;
+	int n_work_items;
+	int work_size;
+};
+
 struct test_task {
 	struct task_struct task;
 	int n_threads;
@@ -261,6 +267,45 @@ static int __test__workqueue(void *_args)
 	return ret;
 }
 
+static void test_pfw_fn(int i, void *args)
+{
+	int *array = args;
+
+	dummy_work(i);
+	array[i] = i+1;
+}
+
+static int __test__parallel_for(void *_args)
+{
+	struct parallel_for_test_args_t *args = _args;
+	struct threadpool *pool;
+	struct workqueue_struct *wq;
+	int ret, i;
+	int *array = calloc(args->n_work_items, sizeof(*array));
+
+	TEST_ASSERT_VAL("calloc array failure", array);
+
+	ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+	if (ret)
+		goto out;
+
+	ret = parallel_for(wq, 0, args->n_work_items, args->work_size,
+				test_pfw_fn, array);
+	TEST_ASSERT_VAL("parallel_for failure", ret == 0);
+
+	for (i = 0; i < args->n_work_items; i++)
+		TEST_ASSERT_VAL("failed array check", array[i] == i+1);
+
+	ret = __workqueue__teardown(pool, wq);
+	if (ret)
+		goto out;
+
+out:
+	free(array);
+
+	return TEST_OK;
+}
+
 static const struct threadpool_test_args_t threadpool_test_args[] = {
 	{
 		.pool_size = 1
@@ -306,6 +351,39 @@ static const struct workqueue_test_args_t workqueue_test_args[] = {
 	}
 };
 
+static const struct parallel_for_test_args_t parallel_for_test_args[] = {
+	{
+		.pool_size = 1,
+		.n_work_items = 1,
+		.work_size = 1
+	},
+	{
+		.pool_size = 1,
+		.n_work_items = 10,
+		.work_size = 3
+	},
+	{
+		.pool_size = 2,
+		.n_work_items = 1,
+		.work_size = 1
+	},
+	{
+		.pool_size = 2,
+		.n_work_items = 100,
+		.work_size = 10
+	},
+	{
+		.pool_size = 16,
+		.n_work_items = 7,
+		.work_size = 2
+	},
+	{
+		.pool_size = 16,
+		.n_work_items = 2789,
+		.work_size = 16
+	}
+};
+
 struct test_case {
 	const char *desc;
 	int (*func)(void *args);
@@ -328,6 +406,13 @@ static struct test_case workqueue_testcase_table[] = {
 		.args = (void *) workqueue_test_args,
 		.n_args = (int)ARRAY_SIZE(workqueue_test_args),
 		.arg_size = sizeof(struct workqueue_test_args_t)
+	},
+	{
+		.desc = "Workqueue parallel-for",
+		.func = __test__parallel_for,
+		.args = (void *) parallel_for_test_args,
+		.n_args = (int)ARRAY_SIZE(parallel_for_test_args),
+		.arg_size = sizeof(struct parallel_for_test_args_t)
 	}
 };
 
diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index a32d62dac1ec04a6..7f45b1e264ed6145 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -8,6 +8,7 @@
 #include <linux/list.h>
 #include <linux/err.h>
 #include <linux/string.h>
+#include <linux/kernel.h>
 #include "debug.h"
 #include <internal/lib.h>
 #include "workqueue.h"
@@ -613,3 +614,137 @@ void init_work(struct work_struct *work)
 {
 	INIT_LIST_HEAD(&work->entry);
 }
+
+/* Parallel-for utility */
+
+struct parallel_for_work {
+	struct work_struct work;	/* work item that is queued */
+	parallel_for_func_t func;	/* function to execute for each item */
+	void *args;			/* additional args to pass to func */
+	int start;			/* first item to execute */
+	int num;			/* number of items to execute */
+};
+
+/**
+ * parallel_for_work_fn - execute parallel_for_work.func in parallel
+ *
+ * This function will be executed by workqueue's workers.
+ */
+static void parallel_for_work_fn(struct work_struct *work)
+{
+	struct parallel_for_work *pfw = container_of(work, struct parallel_for_work, work);
+	int i;
+
+	for (i = 0; i < pfw->num; i++)
+		pfw->func(pfw->start+i, pfw->args);
+}
+
+static inline void init_parallel_for_work(struct parallel_for_work *pfw,
+					parallel_for_func_t func, void *args,
+					int start, int num)
+{
+	init_work(&pfw->work);
+	pfw->work.func = parallel_for_work_fn;
+	pfw->func = func;
+	pfw->args = args;
+	pfw->start = start;
+	pfw->num = num;
+
+	pr_debug2("pfw: start=%d, num=%d\n", start, num);
+}
+
+/**
+ * parallel_for - execute @func in parallel over indexes between @from and @to
+ * @wq: workqueue that will run @func in parallel
+ * @from: first index
+ * @to: last index (excluded)
+ * @work_size: number of indexes to handle on the same work item.
+ *             ceil((to-from)/work_size) work items will be added to @wq
+ *             NB: this is only a hint. The function will reduce the size of
+ *                 the work items to fill all workers.
+ * @func: function to execute in parallel
+ * @args: additional arguments to @func
+ *
+ * This function is equivalent to:
+ * for (i = from; i < to; i++) {
+ *     // parallel
+ *     func(i, args);
+ * }
+ * // sync
+ *
+ * This function takes care of:
+ *  - creating balanced work items to submit to workqueue
+ *  - submitting the work items to the workqueue
+ *  - waiting for completion of the work items
+ *  - cleanup of the work items
+ */
+int parallel_for(struct workqueue_struct *wq, int from, int to, int work_size,
+		parallel_for_func_t func, void *args)
+{
+	int n = to-from;
+	int n_work_items;
+	int nr_threads = workqueue_nr_threads(wq);
+	int i, j, start, num, m, base, num_per_item;
+	struct parallel_for_work *pfw_array;
+	int ret, err = 0;
+
+	if (work_size <= 0) {
+		pr_debug("workqueue parallel-for: work_size must be >0\n");
+		return -EINVAL;
+	}
+
+	if (to < from) {
+		pr_debug("workqueue parallel-for: to must be >= from\n");
+		return -EINVAL;
+	} else if (to == from) {
+		pr_debug2("workqueue parallel-for: skip since from == to\n");
+		return 0;
+	}
+
+	n_work_items = DIV_ROUND_UP(n, work_size);
+	if (n_work_items < nr_threads)
+		n_work_items = min(n, nr_threads);
+
+	pfw_array = calloc(n_work_items, sizeof(*pfw_array));
+
+	if (!pfw_array) {
+		pr_debug2("%s: error allocating pfw_array\n", __func__);
+		return -ENOMEM;
+	}
+
+	num_per_item = n / n_work_items;
+	m = n % n_work_items;
+
+	for (i = 0; i < m; i++) {
+		num = num_per_item + 1;
+		start = i * num;
+		init_parallel_for_work(&pfw_array[i], func, args, start, num);
+		ret = queue_work(wq, &pfw_array[i].work);
+		if (ret) {
+			err = ret;
+			goto out;
+		}
+	}
+	if (i != 0)
+		base = pfw_array[i-1].start + pfw_array[i-1].num;
+	else
+		base = 0;
+	for (j = i; j < n_work_items; j++) {
+		num = num_per_item;
+		start = base + (j - i) * num;
+		init_parallel_for_work(&pfw_array[j], func, args, start, num);
+		ret = queue_work(wq, &pfw_array[j].work);
+		if (ret) {
+			err = ret;
+			goto out;
+		}
+	}
+
+out:
+	ret = flush_workqueue(wq);
+	if (ret)
+		err = ret;
+
+	free(pfw_array);
+	return err;
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index ec998291b6a0623d..6185f4ca0f662dcb 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -28,6 +28,13 @@ extern int flush_workqueue(struct workqueue_struct *wq);
 
 extern void init_work(struct work_struct *work);
 
+/* parallel_for utility */
+
+typedef void (*parallel_for_func_t)(int i, void *args);
+
+extern int parallel_for(struct workqueue_struct *wq, int from, int to, int work_size,
+			parallel_for_func_t func, void *args);
+
 #define WORKQUEUE_STRERR_BUFSIZE (128+THREADPOOL_STRERR_BUFSIZE)
 #define WORKQUEUE_ERROR__OFFSET 512
 enum {
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for
       [not found] <cover.1627657061.git.rickyman7@gmail.com>
                   ` (8 preceding siblings ...)
  2021-07-30 15:34 ` [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini
@ 2021-07-30 15:34 ` Riccardo Mancini
  2021-08-09 12:04   ` Jiri Olsa
  9 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-07-30 15:34 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo
  Cc: Ian Rogers, Namhyung Kim, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev, Riccardo Mancini

To generate synthetic events, perf has the option to use multiple
threads. These threads are created manually using pthread_create.

This patch replaces the manual pthread_create with a workqueue,
using the parallel_for utility.

Experimental results show that workqueue has a slightly higher overhead,
but this is repaid by the improved work balancing among threads.

Results of perf bench before and after are reported below:
Command: sudo ./perf bench internals synthesize -t
Average synthesis time in usec is reported.

Laptop (2 cores 4 threads i7), avg num events ~21500:
 N    pthread (before)        workqueue (after)
 1  121475.200 +- 2227.757  118882.900 +- 1389.398
 2   72834.100 +- 1860.677   67668.600 +- 2847.693
 3   70650.200 +-  540.096   55694.200 +-  496.155
 4   55554.300 +-  259.968   50901.400 +-  434.327

VM (16 vCPU over 16 cores 32 threads Xeon), avg num events ~2920:
 N    pthread (before)        workqueue (after)
 1  35182.400 +- 3561.189   37528.300 +- 2972.887
 2  29188.400 +- 2191.767   28250.300 +- 1694.575
 3  22172.200 +-  788.659   19062.400 +-  611.201
 4  21600.700 +-  728.941   16812.900 +- 1085.359
 5  19395.800 +- 1070.617   14764.600 +- 1339.113
 6  18553.000 +- 1272.486   12814.200 +-  408.462
 7  14691.400 +-  485.105   12382.200 +-  464.964
 8  16036.400 +-  842.728   15015.000 +- 1648.844
 9  15606.800 +-  470.100   13230.800 +- 1288.246
10  15527.000 +-  822.317   12661.800 +-  873.199
11  13097.400 +-  513.870   13082.700 +-  974.378
12  14053.700 +-  592.427   13123.400 +- 1054.939
13  15446.400 +-  765.850   12837.200 +-  770.646
14  14979.400 +- 1056.955   13695.400 +- 1066.302
15  12578.000 +-  846.142   15053.600 +-  992.118
16  12394.800 +-  602.295   13683.700 +-  911.517

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/util/synthetic-events.c | 155 +++++++++++++++--------------
 1 file changed, 81 insertions(+), 74 deletions(-)

diff --git a/tools/perf/util/synthetic-events.c b/tools/perf/util/synthetic-events.c
index 35aa0c0f7cd955b2..3fcda677e100b3ae 100644
--- a/tools/perf/util/synthetic-events.c
+++ b/tools/perf/util/synthetic-events.c
@@ -22,6 +22,7 @@
 #include <linux/string.h>
 #include <linux/zalloc.h>
 #include <linux/perf_event.h>
+#include <linux/err.h>
 #include <asm/bug.h>
 #include <perf/evsel.h>
 #include <perf/cpumap.h>
@@ -41,6 +42,7 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include "util/workqueue/workqueue.h"
 
 #define DEFAULT_PROC_MAP_PARSE_TIMEOUT 500
 
@@ -882,16 +884,13 @@ static int __perf_event__synthesize_threads(struct perf_tool *tool,
 					    perf_event__handler_t process,
 					    struct machine *machine,
 					    bool mmap_data,
-					    struct dirent **dirent,
-					    int start,
-					    int num)
+					    char *d_name)
 {
 	union perf_event *comm_event, *mmap_event, *fork_event;
 	union perf_event *namespaces_event;
 	int err = -1;
 	char *end;
 	pid_t pid;
-	int i;
 
 	comm_event = malloc(sizeof(comm_event->comm) + machine->id_hdr_size);
 	if (comm_event == NULL)
@@ -911,24 +910,22 @@ static int __perf_event__synthesize_threads(struct perf_tool *tool,
 	if (namespaces_event == NULL)
 		goto out_free_fork;
 
-	for (i = start; i < start + num; i++) {
-		if (!isdigit(dirent[i]->d_name[0]))
-			continue;
+	if (!isdigit(d_name[0]))
+		goto out_free_namespaces;
 
-		pid = (pid_t)strtol(dirent[i]->d_name, &end, 10);
-		/* only interested in proper numerical dirents */
-		if (*end)
-			continue;
-		/*
-		 * We may race with exiting thread, so don't stop just because
-		 * one thread couldn't be synthesized.
-		 */
-		__event__synthesize_thread(comm_event, mmap_event, fork_event,
-					   namespaces_event, pid, 1, process,
-					   tool, machine, mmap_data);
-	}
+	pid = (pid_t)strtol(d_name, &end, 10);
+	/* only interested in proper numerical dirents */
+	if (*end)
+		goto out_free_namespaces;
+	/*
+	 * We may race with exiting thread, so don't stop just because
+	 * one thread couldn't be synthesized.
+	 */
+	__event__synthesize_thread(comm_event, mmap_event, fork_event,
+					namespaces_event, pid, 1, process,
+					tool, machine, mmap_data);
 	err = 0;
-
+out_free_namespaces:
 	free(namespaces_event);
 out_free_fork:
 	free(fork_event);
@@ -946,19 +943,15 @@ struct synthesize_threads_arg {
 	struct machine *machine;
 	bool mmap_data;
 	struct dirent **dirent;
-	int num;
-	int start;
 };
 
-static void *synthesize_threads_worker(void *arg)
+static void synthesize_threads_worker(int i, void *arg)
 {
 	struct synthesize_threads_arg *args = arg;
 
 	__perf_event__synthesize_threads(args->tool, args->process,
 					 args->machine, args->mmap_data,
-					 args->dirent,
-					 args->start, args->num);
-	return NULL;
+					 args->dirent[i]->d_name);
 }
 
 int perf_event__synthesize_threads(struct perf_tool *tool,
@@ -967,15 +960,15 @@ int perf_event__synthesize_threads(struct perf_tool *tool,
 				   bool mmap_data,
 				   unsigned int nr_threads_synthesize)
 {
-	struct synthesize_threads_arg *args = NULL;
-	pthread_t *synthesize_threads = NULL;
+	struct synthesize_threads_arg args;
 	char proc_path[PATH_MAX];
 	struct dirent **dirent;
-	int num_per_thread;
-	int m, n, i, j;
+	int n, i;
 	int thread_nr;
-	int base = 0;
-	int err = -1;
+	int err = -1, ret;
+	struct threadpool *pool;
+	struct workqueue_struct *wq;
+	char err_buf[WORKQUEUE_STRERR_BUFSIZE];
 
 
 	if (machine__is_default_guest(machine))
@@ -992,54 +985,68 @@ int perf_event__synthesize_threads(struct perf_tool *tool,
 		thread_nr = nr_threads_synthesize;
 
 	if (thread_nr <= 1) {
-		err = __perf_event__synthesize_threads(tool, process,
-						       machine, mmap_data,
-						       dirent, base, n);
+		for (i = 0; i < n; i++)
+			err = __perf_event__synthesize_threads(tool, process,
+								machine, mmap_data,
+								dirent[i]->d_name);
 		goto free_dirent;
 	}
-	if (thread_nr > n)
-		thread_nr = n;
 
-	synthesize_threads = calloc(sizeof(pthread_t), thread_nr);
-	if (synthesize_threads == NULL)
+	pool = threadpool__new(thread_nr);
+	if (IS_ERR(pool)) {
+		ret = threadpool__new_strerror(pool, err_buf, sizeof(err_buf));
+		pr_err("threadpool__new: %s\n",
+			ret ? "Error generating error msg" : err_buf);
 		goto free_dirent;
-
-	args = calloc(sizeof(*args), thread_nr);
-	if (args == NULL)
-		goto free_threads;
-
-	num_per_thread = n / thread_nr;
-	m = n % thread_nr;
-	for (i = 0; i < thread_nr; i++) {
-		args[i].tool = tool;
-		args[i].process = process;
-		args[i].machine = machine;
-		args[i].mmap_data = mmap_data;
-		args[i].dirent = dirent;
-	}
-	for (i = 0; i < m; i++) {
-		args[i].num = num_per_thread + 1;
-		args[i].start = i * args[i].num;
-	}
-	if (i != 0)
-		base = args[i-1].start + args[i-1].num;
-	for (j = i; j < thread_nr; j++) {
-		args[j].num = num_per_thread;
-		args[j].start = base + (j - i) * args[i].num;
 	}
 
-	for (i = 0; i < thread_nr; i++) {
-		if (pthread_create(&synthesize_threads[i], NULL,
-				   synthesize_threads_worker, &args[i]))
-			goto out_join;
-	}
-	err = 0;
-out_join:
-	for (i = 0; i < thread_nr; i++)
-		pthread_join(synthesize_threads[i], NULL);
-	free(args);
-free_threads:
-	free(synthesize_threads);
+	err = threadpool__start(pool);
+	if (err) {
+		ret = threadpool__strerror(pool, err, err_buf, sizeof(err_buf));
+		pr_err("threadpool__start: %s\n",
+			ret ? "Error generating error msg" : err_buf);
+		goto free_pool;
+	}
+
+	wq = create_workqueue(pool);
+	if (IS_ERR(wq)) {
+		ret = create_workqueue_strerror(wq, err_buf, sizeof(err_buf));
+		pr_err("create_workqueue: %s\n",
+			ret ? "Error generating error msg" : err_buf);
+		goto stop_pool;
+	}
+
+	args.tool = tool;
+	args.process = process;
+	args.machine = machine;
+	args.mmap_data = mmap_data;
+	args.dirent = dirent;
+
+	ret = parallel_for(wq, 0, n, 1, synthesize_threads_worker, &args);
+	if (ret) {
+		ret = workqueue_strerror(wq, ret, err_buf, sizeof(err_buf));
+		pr_err("parallel_for: %s\n",
+			ret ? "Error generating error msg" : err_buf);
+		err = ret;
+	}
+
+	ret = destroy_workqueue(wq);
+	if (ret) {
+		ret = destroy_workqueue_strerror(ret, err_buf, sizeof(err_buf));
+		pr_err("destroy_workqueue: %s\n",
+			ret ? "Error generating error msg" : err_buf);
+		err = ret;
+	}
+stop_pool:
+	ret = threadpool__stop(pool);
+	if (ret) {
+		ret = threadpool__strerror(pool, ret, err_buf, sizeof(err_buf));
+		pr_err("threadpool__stop: %s\n",
+			ret ? "Error generating error msg" : err_buf);
+		err = ret;
+	}
+free_pool:
+	threadpool__delete(pool);
 free_dirent:
 	for (i = 0; i < n; i++)
 		zfree(&dirent[i]);
-- 
2.31.1


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
  2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
@ 2021-08-07  2:24   ` Namhyung Kim
  2021-08-09 10:30     ` Riccardo Mancini
  0 siblings, 1 reply; 21+ messages in thread
From: Namhyung Kim @ 2021-08-07  2:24 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

Hi Riccardo,

On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
>
> The workqueue library is made up by two components:
>  - threadpool: handles the lifetime of the threads
>  - workqueue: handles work distribution among the threads
>
> This first patch introduces the threadpool, starting from its creation
> and destruction functions.
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/
>
> Each thread in the threadpool executes the same function (aka task)
> with a different argument tidx.
> Threads use a pair of pipes to communicate with the main process.
> The threadpool is static (all threads will be spawned at the same time).
> Future work could include making it resizable and adding affinity support
> (as in Alexey prototype).
>
> Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
> ---
>  tools/perf/util/Build                  |   1 +
>  tools/perf/util/workqueue/Build        |   1 +
>  tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
>  tools/perf/util/workqueue/threadpool.h |  30 ++++
>  4 files changed, 240 insertions(+)
>  create mode 100644 tools/perf/util/workqueue/Build
>  create mode 100644 tools/perf/util/workqueue/threadpool.c
>  create mode 100644 tools/perf/util/workqueue/threadpool.h
>
> diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> index 2d4fa13041789cd6..c7b09701661c869d 100644
> --- a/tools/perf/util/Build
> +++ b/tools/perf/util/Build
> @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
>  perf-y += data-convert-json.o
>
>  perf-y += scripting-engines/
> +perf-y += workqueue/
>
>  perf-$(CONFIG_ZLIB) += zlib.o
>  perf-$(CONFIG_LZMA) += lzma.o
> diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> new file mode 100644
> index 0000000000000000..8b72a6cd4e2cba0d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/Build
> @@ -0,0 +1 @@
> +perf-y += threadpool.o
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> new file mode 100644
> index 0000000000000000..0004ce606d5fa73d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.c
> @@ -0,0 +1,208 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <string.h>
> +#include "debug.h"
> +#include <asm/bug.h>
> +#include <linux/zalloc.h>
> +#include <linux/string.h>
> +#include <linux/err.h>
> +#include <linux/kernel.h>
> +#include "threadpool.h"
> +
> +enum threadpool_status {
> +       THREADPOOL_STATUS__STOPPED,             /* no threads */
> +       THREADPOOL_STATUS__ERROR,               /* errors */
> +       THREADPOOL_STATUS__MAX
> +};
> +
> +struct threadpool {
> +       int                     nr_threads;     /* number of threads in the pool */
> +       struct threadpool_entry *threads;       /* array of threads in the pool */
> +       struct task_struct      *current_task;  /* current executing function */
> +       enum threadpool_status  status;         /* current status of the pool */

Better to move it below nr_threads for alignment.


> +};
> +
> +struct threadpool_entry {
> +       int                             idx;    /* idx of thread in pool->threads */
> +       pid_t                           tid;    /* tid of thread */
> +       struct threadpool               *pool;  /* parent threadpool */
> +       struct {
> +               int ack[2];                     /* messages from thread (acks) */
> +               int cmd[2];                     /* messages to thread (commands) */
> +       } pipes;
> +};
> +
> +/**
> + * threadpool_entry__init_pipes - initialize all pipes of @thread
> + */
> +static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
> +{
> +       thread->pipes.ack[0] = -1;
> +       thread->pipes.ack[1] = -1;
> +       thread->pipes.cmd[0] = -1;
> +       thread->pipes.cmd[1] = -1;
> +}
> +
> +/**
> + * threadpool_entry__open_pipes - open all pipes of @thread
> + */
> +static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
> +{
> +       if (pipe(thread->pipes.ack)) {
> +               pr_debug2("threadpool: failed to create comm pipe 'from': %s\n",
> +                       strerror(errno));
> +               return -ENOMEM;
> +       }
> +
> +       if (pipe(thread->pipes.cmd)) {
> +               pr_debug2("threadpool: failed to create comm pipe 'to': %s\n",
> +                       strerror(errno));
> +               close(thread->pipes.ack[0]);
> +               thread->pipes.ack[0] = -1;
> +               close(thread->pipes.ack[1]);
> +               thread->pipes.ack[1] = -1;

Maybe you don't need to do it here if the caller handles it already ...

> +               return -ENOMEM;
> +       }
> +
> +       return 0;
> +}
> +
> +/**
> + * threadpool_entry__close_pipes - close all communication pipes of @thread
> + */
> +static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
> +{
> +       if (thread->pipes.ack[0] != -1) {
> +               close(thread->pipes.ack[0]);
> +               thread->pipes.ack[0] = -1;
> +       }
> +       if (thread->pipes.ack[1] != -1) {
> +               close(thread->pipes.ack[1]);
> +               thread->pipes.ack[1] = -1;
> +       }
> +       if (thread->pipes.cmd[0] != -1) {
> +               close(thread->pipes.cmd[0]);
> +               thread->pipes.cmd[0] = -1;
> +       }
> +       if (thread->pipes.cmd[1] != -1) {
> +               close(thread->pipes.cmd[1]);
> +               thread->pipes.cmd[1] = -1;
> +       }
> +}
> +
> +/**
> + * threadpool__new - create a fixed threadpool with @n_threads threads
> + */
> +struct threadpool *threadpool__new(int n_threads)
> +{
> +       int ret, err, t;
> +       struct threadpool *pool = malloc(sizeof(*pool));
> +
> +       if (!pool) {
> +               pr_debug2("threadpool: cannot allocate pool: %s\n",
> +                       strerror(errno));
> +               err = -ENOMEM;
> +               goto out_return;
> +       }
> +
> +       if (n_threads <= 0) {
> +               pr_debug2("threadpool: invalid number of threads: %d\n",
> +                       n_threads);
> +               err = -EINVAL;
> +               goto out_free_pool;

Isn't it natural to check it before the allocation?

> +       }
> +
> +       pool->nr_threads = n_threads;
> +       pool->current_task = NULL;
> +
> +       pool->threads = calloc(n_threads, sizeof(*pool->threads));
> +       if (!pool->threads) {
> +               pr_debug2("threadpool: cannot allocate threads: %s\n",
> +                       strerror(errno));
> +               err = -ENOMEM;
> +               goto out_free_pool;
> +       }
> +
> +       for (t = 0; t < n_threads; t++) {
> +               pool->threads[t].idx = t;
> +               pool->threads[t].tid = -1;
> +               pool->threads[t].pool = pool;
> +               threadpool_entry__init_pipes(&pool->threads[t]);
> +       }
> +
> +       for (t = 0; t < n_threads; t++) {
> +               ret = threadpool_entry__open_pipes(&pool->threads[t]);
> +               if (ret) {
> +                       err = -ret;
> +                       goto out_close_pipes;

... like this.  But threadpool_entry__open_pipes() already
returns a negative.

> +               }
> +       }
> +
> +       pool->status = THREADPOOL_STATUS__STOPPED;
> +
> +       return pool;
> +
> +out_close_pipes:
> +       for (t = 0; t < n_threads; t++)
> +               threadpool_entry__close_pipes(&pool->threads[t]);
> +
> +       zfree(&pool->threads);
> +out_free_pool:
> +       free(pool);
> +out_return:
> +       return ERR_PTR(err);
> +}
> +
> +/**
> + * threadpool__strerror - print message regarding given @err in @pool
> + *
> + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> + */
> +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
> +{
> +       char sbuf[STRERR_BUFSIZE], *emsg;
> +
> +       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> +       return scnprintf(buf, size, "Error: %s.\n", emsg);
> +}
> +
> +/**
> + * threadpool__new_strerror - print message regarding @err_ptr
> + *
> + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> + */
> +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size)
> +{
> +       return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> +}

Why two different functions?

> +
> +/**
> + * threadpool__delete - free the @pool and all its resources
> + */
> +void threadpool__delete(struct threadpool *pool)
> +{
> +       int t;
> +
> +       if (IS_ERR_OR_NULL(pool))
> +               return;
> +
> +       WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> +               && pool->status != THREADPOOL_STATUS__ERROR);
> +
> +       for (t = 0; t < pool->nr_threads; t++)
> +               threadpool_entry__close_pipes(&pool->threads[t]);
> +
> +       zfree(&pool->threads);
> +       free(pool);
> +}
> +
> +/**
> + * threadpool__size - get number of threads in the threadpool
> + */
> +int threadpool__size(struct threadpool *pool)
> +{
> +       return pool->nr_threads;
> +}
> diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> new file mode 100644
> index 0000000000000000..fb18aa32fb64f671
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.h
> @@ -0,0 +1,30 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __WORKQUEUE_THREADPOOL_H
> +#define __WORKQUEUE_THREADPOOL_H
> +
> +struct threadpool;
> +struct task_struct;

You can just move the definition here.

> +
> +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> +
> +struct task_struct {
> +       task_func_t fn;
> +};
> +
> +extern struct threadpool *threadpool__new(int n_threads);
> +extern void threadpool__delete(struct threadpool *pool);
> +
> +extern int threadpool__start(struct threadpool *pool);
> +extern int threadpool__stop(struct threadpool *pool);
> +
> +extern int threadpool__execute(struct threadpool *pool, struct task_struct *task);
> +extern int threadpool__wait(struct threadpool *pool);

These are not defined yet.

> +
> +extern int threadpool__size(struct threadpool *pool);
> +
> +/* Error management */
> +#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
> +extern int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size);
> +extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t size);
> +
> +#endif /* __WORKQUEUE_THREADPOOL_H */
> --
> 2.31.1
>

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions
  2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
@ 2021-08-07  2:43   ` Namhyung Kim
  2021-08-09 10:35     ` Riccardo Mancini
  0 siblings, 1 reply; 21+ messages in thread
From: Namhyung Kim @ 2021-08-07  2:43 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
>
> This patch adds the start and stop functions, alongside the thread
> function.
> Each thread will run until a stop signal is received.
> Furthermore, start and stop are added to the test.
>
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/
>
> Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
> ---

[SNIP]
> @@ -93,6 +134,130 @@ static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
>         }
>  }
>
> +/**
> + * threadpool__wait_thread - receive ack from thread
> + *
> + * NB: call only from main thread!
> + */
> +static int threadpool__wait_thread(struct threadpool_entry *thread)

If you wanted to differentiate APIs for main thread, I think it's better
to pass the pool struct (according to the name) and the index like:

  int threadpool__wait_thread(struct threadpool *pool, int idx)

Then it can get a pointer to the entry easily.

> +{
> +       int res;
> +       enum threadpool_msg msg = THREADPOOL_MSG__UNDEFINED;
> +
> +       res = readn(thread->pipes.ack[0], &msg, sizeof(msg));
> +       if (res < 0) {
> +               pr_debug2("threadpool: failed to recv msg from tid=%d: %s\n",
> +                      thread->tid, strerror(errno));
> +               return -THREADPOOL_ERROR__READPIPE;
> +       }
> +       if (msg != THREADPOOL_MSG__ACK) {
> +               pr_debug2("threadpool: received unexpected msg from tid=%d: %s\n",
> +                      thread->tid, threadpool_msg_tags[msg]);
> +               return -THREADPOOL_ERROR__INVALIDMSG;
> +       }
> +
> +       pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> +
> +       return 0;
> +}
> +
> +/**
> + * threadpool__terminate_thread - send stop signal to thread and wait for ack
> + *
> + * NB: call only from main thread!
> + */
> +static int threadpool__terminate_thread(struct threadpool_entry *thread)

Ditto.

> +{
> +       int res;
> +       enum threadpool_msg msg = THREADPOOL_MSG__STOP;
> +
> +       res = writen(thread->pipes.cmd[1], &msg, sizeof(msg));
> +       if (res < 0) {
> +               pr_debug2("threadpool: error sending stop msg to tid=%d: %s\n",
> +                       thread->tid, strerror(errno));
> +               return -THREADPOOL_ERROR__WRITEPIPE;
> +       }
> +
> +       return threadpool__wait_thread(thread);
> +}
> +

[SNIP]
> @@ -161,12 +326,30 @@ struct threadpool *threadpool__new(int n_threads)
>   *
>   * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
>   */
> -int threadpool__strerror(struct threadpool *pool __maybe_unused, int err, char *buf, size_t size)
> +int threadpool__strerror(struct threadpool *pool, int err, char *buf, size_t size)
>  {
>         char sbuf[STRERR_BUFSIZE], *emsg;
> +       const char *status_str, *errno_str;
>
> -       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> -       return scnprintf(buf, size, "Error: %s.\n", emsg);
> +       status_str = IS_ERR_OR_NULL(pool) ? "error" : threadpool_status_tags[pool->status];
> +
> +       switch (err) {
> +       case -THREADPOOL_ERROR__SIGPROCMASK:
> +       case -THREADPOOL_ERROR__READPIPE:
> +       case -THREADPOOL_ERROR__WRITEPIPE:
> +               emsg = str_error_r(errno, sbuf, sizeof(sbuf));
> +               errno_str = threadpool_errno_str[-err-THREADPOOL_ERROR__OFFSET];
> +               return scnprintf(buf, size, "%s: %s.\n", errno_str, emsg);
> +       case -THREADPOOL_ERROR__INVALIDMSG:
> +               errno_str = threadpool_errno_str[-err-THREADPOOL_ERROR__OFFSET];
> +               return scnprintf(buf, size, "%s.\n", errno_str);
> +       case -THREADPOOL_ERROR__NOTALLOWED:
> +               return scnprintf(buf, size, "%s (%s).\n",
> +                       threadpool_errno_str[-err], status_str);

s/-err/-err-THREADPOOL_ERROR__OFFSET/ ?

It'd be nice if you calculate the index once.

> +       default:
> +               emsg = str_error_r(err, sbuf, sizeof(sbuf));

I'm confused whether the 'err' is negative or positive?

Thanks,
Namhyung


> +               return scnprintf(buf, size, "Error: %s", emsg);
> +       }
>  }
>

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions
  2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
@ 2021-08-07  2:56   ` Namhyung Kim
  0 siblings, 0 replies; 21+ messages in thread
From: Namhyung Kim @ 2021-08-07  2:56 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
>
> This patch adds:
>  - threadpool__execute: assigns a task to the threads to execute
>    asynchronously.
>  - threadpool__wait: waits for the task to complete on all threads.
> Furthermore, testing for these new functions is added.
>
> This patch completes the threadpool.
>
> Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
> ---

[SNIP]
> +/**
> + * threadpool__wake_thread - send wake msg to @thread
> + *
> + * This function does not wait for the thread to actually wake
> + * NB: call only from main thread!
> + */
> +static int threadpool__wake_thread(struct threadpool_entry *thread)

Same here.  You can pass pool and idx instead.

Thanks,
Namhyung


> +{
> +       int res;
> +       enum threadpool_msg msg = THREADPOOL_MSG__WAKE;
> +
> +       res = writen(thread->pipes.cmd[1], &msg, sizeof(msg));
> +       if (res < 0) {
> +               pr_debug2("threadpool: error sending wake msg: %s\n", strerror(errno));
> +               return -THREADPOOL_ERROR__WRITEPIPE;
> +       }
> +
> +       pr_debug2("threadpool: sent wake msg %s to tid=%d\n",
> +               threadpool_msg_tags[msg], thread->tid);
> +       return 0;
> +}
> +

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
  2021-08-07  2:24   ` Namhyung Kim
@ 2021-08-09 10:30     ` Riccardo Mancini
  2021-08-10 18:54       ` Namhyung Kim
  0 siblings, 1 reply; 21+ messages in thread
From: Riccardo Mancini @ 2021-08-09 10:30 UTC (permalink / raw)
  To: Namhyung Kim
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

Hi Namhyung,
thanks for the review!

On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> Hi Riccardo,
> 
> On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
> > 
> > The workqueue library is made up by two components:
> >  - threadpool: handles the lifetime of the threads
> >  - workqueue: handles work distribution among the threads
> > 
> > This first patch introduces the threadpool, starting from its creation
> > and destruction functions.
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/
> > 
> > Each thread in the threadpool executes the same function (aka task)
> > with a different argument tidx.
> > Threads use a pair of pipes to communicate with the main process.
> > The threadpool is static (all threads will be spawned at the same time).
> > Future work could include making it resizable and adding affinity support
> > (as in Alexey prototype).
> > 
> > Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> > Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
> > ---
> >  tools/perf/util/Build                  |   1 +
> >  tools/perf/util/workqueue/Build        |   1 +
> >  tools/perf/util/workqueue/threadpool.c | 208 +++++++++++++++++++++++++
> >  tools/perf/util/workqueue/threadpool.h |  30 ++++
> >  4 files changed, 240 insertions(+)
> >  create mode 100644 tools/perf/util/workqueue/Build
> >  create mode 100644 tools/perf/util/workqueue/threadpool.c
> >  create mode 100644 tools/perf/util/workqueue/threadpool.h
> > 
> > diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> > index 2d4fa13041789cd6..c7b09701661c869d 100644
> > --- a/tools/perf/util/Build
> > +++ b/tools/perf/util/Build
> > @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> >  perf-y += data-convert-json.o
> > 
> >  perf-y += scripting-engines/
> > +perf-y += workqueue/
> > 
> >  perf-$(CONFIG_ZLIB) += zlib.o
> >  perf-$(CONFIG_LZMA) += lzma.o
> > diff --git a/tools/perf/util/workqueue/Build
> > b/tools/perf/util/workqueue/Build
> > new file mode 100644
> > index 0000000000000000..8b72a6cd4e2cba0d
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/Build
> > @@ -0,0 +1 @@
> > +perf-y += threadpool.o
> > diff --git a/tools/perf/util/workqueue/threadpool.c
> > b/tools/perf/util/workqueue/threadpool.c
> > new file mode 100644
> > index 0000000000000000..0004ce606d5fa73d
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.c
> > @@ -0,0 +1,208 @@
> > +// SPDX-License-Identifier: GPL-2.0
> > +#include <stdlib.h>
> > +#include <stdio.h>
> > +#include <unistd.h>
> > +#include <errno.h>
> > +#include <string.h>
> > +#include "debug.h"
> > +#include <asm/bug.h>
> > +#include <linux/zalloc.h>
> > +#include <linux/string.h>
> > +#include <linux/err.h>
> > +#include <linux/kernel.h>
> > +#include "threadpool.h"
> > +
> > +enum threadpool_status {
> > +       THREADPOOL_STATUS__STOPPED,             /* no threads */
> > +       THREADPOOL_STATUS__ERROR,               /* errors */
> > +       THREADPOOL_STATUS__MAX
> > +};
> > +
> > +struct threadpool {
> > +       int                     nr_threads;     /* number of threads in the
> > pool */
> > +       struct threadpool_entry *threads;       /* array of threads in the
> > pool */
> > +       struct task_struct      *current_task;  /* current executing
> > function */
> > +       enum threadpool_status  status;         /* current status of the
> > pool */
> 
> Better to move to below the nr_threads for alignment.

ack

> 
> 
> > +};
> > +
> > +struct threadpool_entry {
> > +       int                             idx;    /* idx of thread in pool-
> > >threads */
> > +       pid_t                           tid;    /* tid of thread */
> > +       struct threadpool               *pool;  /* parent threadpool */
> > +       struct {
> > +               int ack[2];                     /* messages from thread
> > (acks) */
> > +               int cmd[2];                     /* messages to thread
> > (commands) */
> > +       } pipes;
> > +};
> > +
> > +/**
> > + * threadpool_entry__init_pipes - initialize all pipes of @thread
> > + */
> > +static void threadpool_entry__init_pipes(struct threadpool_entry *thread)
> > +{
> > +       thread->pipes.ack[0] = -1;
> > +       thread->pipes.ack[1] = -1;
> > +       thread->pipes.cmd[0] = -1;
> > +       thread->pipes.cmd[1] = -1;
> > +}
> > +
> > +/**
> > + * threadpool_entry__open_pipes - open all pipes of @thread
> > + */
> > +static int threadpool_entry__open_pipes(struct threadpool_entry *thread)
> > +{
> > +       if (pipe(thread->pipes.ack)) {
> > +               pr_debug2("threadpool: failed to create comm pipe 'from':
> > %s\n",
> > +                       strerror(errno));
> > +               return -ENOMEM;
> > +       }
> > +
> > +       if (pipe(thread->pipes.cmd)) {
> > +               pr_debug2("threadpool: failed to create comm pipe 'to':
> > %s\n",
> > +                       strerror(errno));
> > +               close(thread->pipes.ack[0]);
> > +               thread->pipes.ack[0] = -1;
> > +               close(thread->pipes.ack[1]);
> > +               thread->pipes.ack[1] = -1;
> 
> Maybe you don't need to do it here if the caller handles it already ...

oops, thanks.

> 
> > +               return -ENOMEM;
> > +       }
> > +
> > +       return 0;
> > +}
> > +
> > +/**
> > + * threadpool_entry__close_pipes - close all communication pipes of @thread
> > + */
> > +static void threadpool_entry__close_pipes(struct threadpool_entry *thread)
> > +{
> > +       if (thread->pipes.ack[0] != -1) {
> > +               close(thread->pipes.ack[0]);
> > +               thread->pipes.ack[0] = -1;
> > +       }
> > +       if (thread->pipes.ack[1] != -1) {
> > +               close(thread->pipes.ack[1]);
> > +               thread->pipes.ack[1] = -1;
> > +       }
> > +       if (thread->pipes.cmd[0] != -1) {
> > +               close(thread->pipes.cmd[0]);
> > +               thread->pipes.cmd[0] = -1;
> > +       }
> > +       if (thread->pipes.cmd[1] != -1) {
> > +               close(thread->pipes.cmd[1]);
> > +               thread->pipes.cmd[1] = -1;
> > +       }
> > +}
> > +
> > +/**
> > + * threadpool__new - create a fixed threadpool with @n_threads threads
> > + */
> > +struct threadpool *threadpool__new(int n_threads)
> > +{
> > +       int ret, err, t;
> > +       struct threadpool *pool = malloc(sizeof(*pool));
> > +
> > +       if (!pool) {
> > +               pr_debug2("threadpool: cannot allocate pool: %s\n",
> > +                       strerror(errno));
> > +               err = -ENOMEM;
> > +               goto out_return;
> > +       }
> > +
> > +       if (n_threads <= 0) {
> > +               pr_debug2("threadpool: invalid number of threads: %d\n",
> > +                       n_threads);
> > +               err = -EINVAL;
> > +               goto out_free_pool;
> 
> Isn't' it natural to check it before the allocation?

It sure is, thanks.

> 
> > +       }
> > +
> > +       pool->nr_threads = n_threads;
> > +       pool->current_task = NULL;
> > +
> > +       pool->threads = calloc(n_threads, sizeof(*pool->threads));
> > +       if (!pool->threads) {
> > +               pr_debug2("threadpool: cannot allocate threads: %s\n",
> > +                       strerror(errno));
> > +               err = -ENOMEM;
> > +               goto out_free_pool;
> > +       }
> > +
> > +       for (t = 0; t < n_threads; t++) {
> > +               pool->threads[t].idx = t;
> > +               pool->threads[t].tid = -1;
> > +               pool->threads[t].pool = pool;
> > +               threadpool_entry__init_pipes(&pool->threads[t]);
> > +       }
> > +
> > +       for (t = 0; t < n_threads; t++) {
> > +               ret = threadpool_entry__open_pipes(&pool->threads[t]);
> > +               if (ret) {
> > +                       err = -ret;
> > +                       goto out_close_pipes;
> 
> ... like this.  But threadpool_entry__open_pipes() already
> returns a negative.

Yeah, I got confused with the signs in this version: I first wanted to use
positive values for errnos and negative for custom errors, but that didn't seem
like a good idea, so I reverted to only negative errors, using the offset
for the custom errors.
I will take a closer look at all the return codes.

> 
> > +               }
> > +       }
> > +
> > +       pool->status = THREADPOOL_STATUS__STOPPED;
> > +
> > +       return pool;
> > +
> > +out_close_pipes:
> > +       for (t = 0; t < n_threads; t++)
> > +               threadpool_entry__close_pipes(&pool->threads[t]);
> > +
> > +       zfree(&pool->threads);
> > +out_free_pool:
> > +       free(pool);
> > +out_return:
> > +       return ERR_PTR(err);
> > +}
> > +
> > +/**
> > + * threadpool__strerror - print message regarding given @err in @pool
> > + *
> > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > + */
> > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > char *buf, size_t size)
> > +{
> > +       char sbuf[STRERR_BUFSIZE], *emsg;
> > +
> > +       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > +       return scnprintf(buf, size, "Error: %s.\n", emsg);
> > +}
> > +
> > +/**
> > + * threadpool__new_strerror - print message regarding @err_ptr
> > + *
> > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > + */
> > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > size)
> > +{
> > +       return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > +}
> 
> Why two different functions?

Since, when new fails, you don't have an err number, just an err_ptr, it's not
very clear how to call threadpool__strerror. Therefore I made a wrapper to
remove any ambiguity.
> 
> > +
> > +/**
> > + * threadpool__delete - free the @pool and all its resources
> > + */
> > +void threadpool__delete(struct threadpool *pool)
> > +{
> > +       int t;
> > +
> > +       if (IS_ERR_OR_NULL(pool))
> > +               return;
> > +
> > +       WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > +               && pool->status != THREADPOOL_STATUS__ERROR);
> > +
> > +       for (t = 0; t < pool->nr_threads; t++)
> > +               threadpool_entry__close_pipes(&pool->threads[t]);
> > +
> > +       zfree(&pool->threads);
> > +       free(pool);
> > +}
> > +
> > +/**
> > + * threadpool__size - get number of threads in the threadpool
> > + */
> > +int threadpool__size(struct threadpool *pool)
> > +{
> > +       return pool->nr_threads;
> > +}
> > diff --git a/tools/perf/util/workqueue/threadpool.h
> > b/tools/perf/util/workqueue/threadpool.h
> > new file mode 100644
> > index 0000000000000000..fb18aa32fb64f671
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/threadpool.h
> > @@ -0,0 +1,30 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +#ifndef __WORKQUEUE_THREADPOOL_H
> > +#define __WORKQUEUE_THREADPOOL_H
> > +
> > +struct threadpool;
> > +struct task_struct;
> 
> You can just move the definition here.
> 
> > +
> > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > +
> > +struct task_struct {
> > +       task_func_t fn;
> > +};

I thought it was not allowed, since task_func_t refers to task_struct and
vice versa.
I will try to remove it if possible.

> > +
> > +extern struct threadpool *threadpool__new(int n_threads);
> > +extern void threadpool__delete(struct threadpool *pool);
> > +
> > +extern int threadpool__start(struct threadpool *pool);
> > +extern int threadpool__stop(struct threadpool *pool);
> > +
> > +extern int threadpool__execute(struct threadpool *pool, struct task_struct
> > *task);
> > +extern int threadpool__wait(struct threadpool *pool);
> 
> These are not defined yet.

Oops, they must've leaked from the 3rd patch.

Thanks,
Riccardo

> 
> > +
> > +extern int threadpool__size(struct threadpool *pool);
> > +
> > +/* Error management */
> > +#define THREADPOOL_STRERR_BUFSIZE (128+STRERR_BUFSIZE)
> > +extern int threadpool__strerror(struct threadpool *pool, int err, char
> > *buf, size_t size);
> > +extern int threadpool__new_strerror(struct threadpool *err_ptr, char *buf,
> > size_t size);
> > +
> > +#endif /* __WORKQUEUE_THREADPOOL_H */
> > --
> > 2.31.1
> > 



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions
  2021-08-07  2:43   ` Namhyung Kim
@ 2021-08-09 10:35     ` Riccardo Mancini
  0 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-08-09 10:35 UTC (permalink / raw)
  To: Namhyung Kim
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

Hi,

On Fri, 2021-08-06 at 19:43 -0700, Namhyung Kim wrote:
> On Fri, Jul 30, 2021 at 8:34 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
> > 
> > This patch adds the start and stop functions, alongside the thread
> > function.
> > Each thread will run until a stop signal is received.
> > Furthermore, start and stop are added to the test.
> > 
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@linux.intel.com/
> > 
> > Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>
> > Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
> > ---
> 
> [SNIP]
> > @@ -93,6 +134,130 @@ static void threadpool_entry__close_pipes(struct
> > threadpool_entry *thread)
> >         }
> >  }
> > 
> > +/**
> > + * threadpool__wait_thread - receive ack from thread
> > + *
> > + * NB: call only from main thread!
> > + */
> > +static int threadpool__wait_thread(struct threadpool_entry *thread)
> 
> If you wanted to differentiate APIs for main thread, I think it's better
> to pass the pool struct (according to the name) and the index like:
> 
>   int threadpool__wait_thread(struct threadpool *pool, int idx)
> 
> Then it can get a pointer to the entry easily.

Agree. 
I also need to separate those APIs more clearly.

> 
> > +{
> > +       int res;
> > +       enum threadpool_msg msg = THREADPOOL_MSG__UNDEFINED;
> > +
> > +       res = readn(thread->pipes.ack[0], &msg, sizeof(msg));
> > +       if (res < 0) {
> > +               pr_debug2("threadpool: failed to recv msg from tid=%d:
> > %s\n",
> > +                      thread->tid, strerror(errno));
> > +               return -THREADPOOL_ERROR__READPIPE;
> > +       }
> > +       if (msg != THREADPOOL_MSG__ACK) {
> > +               pr_debug2("threadpool: received unexpected msg from tid=%d:
> > %s\n",
> > +                      thread->tid, threadpool_msg_tags[msg]);
> > +               return -THREADPOOL_ERROR__INVALIDMSG;
> > +       }
> > +
> > +       pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> > +
> > +       return 0;
> > +}
> > +
> > +/**
> > + * threadpool__terminate_thread - send stop signal to thread and wait for
> > ack
> > + *
> > + * NB: call only from main thread!
> > + */
> > +static int threadpool__terminate_thread(struct threadpool_entry *thread)
> 
> Ditto.

ack

> 
> > +{
> > +       int res;
> > +       enum threadpool_msg msg = THREADPOOL_MSG__STOP;
> > +
> > +       res = writen(thread->pipes.cmd[1], &msg, sizeof(msg));
> > +       if (res < 0) {
> > +               pr_debug2("threadpool: error sending stop msg to tid=%d:
> > %s\n",
> > +                       thread->tid, strerror(errno));
> > +               return -THREADPOOL_ERROR__WRITEPIPE;
> > +       }
> > +
> > +       return threadpool__wait_thread(thread);
> > +}
> > +
> 
> [SNIP]
> > @@ -161,12 +326,30 @@ struct threadpool *threadpool__new(int n_threads)
> >   *
> >   * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> >   */
> > -int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > char *buf, size_t size)
> > +int threadpool__strerror(struct threadpool *pool, int err, char *buf,
> > size_t size)
> >  {
> >         char sbuf[STRERR_BUFSIZE], *emsg;
> > +       const char *status_str, *errno_str;
> > 
> > -       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > -       return scnprintf(buf, size, "Error: %s.\n", emsg);
> > +       status_str = IS_ERR_OR_NULL(pool) ? "error" :
> > threadpool_status_tags[pool->status];
> > +
> > +       switch (err) {
> > +       case -THREADPOOL_ERROR__SIGPROCMASK:
> > +       case -THREADPOOL_ERROR__READPIPE:
> > +       case -THREADPOOL_ERROR__WRITEPIPE:
> > +               emsg = str_error_r(errno, sbuf, sizeof(sbuf));
> > +               errno_str = threadpool_errno_str[-err-
> > THREADPOOL_ERROR__OFFSET];
> > +               return scnprintf(buf, size, "%s: %s.\n", errno_str, emsg);
> > +       case -THREADPOOL_ERROR__INVALIDMSG:
> > +               errno_str = threadpool_errno_str[-err-
> > THREADPOOL_ERROR__OFFSET];
> > +               return scnprintf(buf, size, "%s.\n", errno_str);
> > +       case -THREADPOOL_ERROR__NOTALLOWED:
> > +               return scnprintf(buf, size, "%s (%s).\n",
> > +                       threadpool_errno_str[-err], status_str);
> 
> s/-err/-err-THREADPOOL_ERROR__OFFSET/ ?
> 
> It'd be nice if you calculate the index once.

agreed

> 
> > +       default:
> > +               emsg = str_error_r(err, sbuf, sizeof(sbuf));
> 
> I'm confused whether the 'err' is negative or positive?

It's negative, so it needs to be inverted in this case.

Thanks,
Riccardo

> 
> Thanks,
> Namhyung
> 
> 
> > +               return scnprintf(buf, size, "Error: %s", emsg);
> > +       }
> >  }
> > 



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for
  2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
@ 2021-08-09 12:04   ` Jiri Olsa
  2021-08-09 13:24     ` Riccardo Mancini
  0 siblings, 1 reply; 21+ messages in thread
From: Jiri Olsa @ 2021-08-09 12:04 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Namhyung Kim,
	Peter Zijlstra, Ingo Molnar, Mark Rutland, linux-kernel,
	linux-perf-users, Alexey Bayduraev

On Fri, Jul 30, 2021 at 05:34:17PM +0200, Riccardo Mancini wrote:
> To generate synthetic events, perf has the option to use multiple
> threads. These threads are created manually using pthread_created.
> 
> This patch replaces the manual pthread_create with a workqueue,
> using the parallel_for utility.

hi,
I really like this new interface

> 
> Experimental results show that workqueue has a slightly higher overhead,
> but this is repayed by the improved work balancing among threads.

how did you measure that balancing improvement?
is there less kernel cycles spent?

I ran the benchmark and if I'm reading the results correctly I see
performance drop for high cpu numbers (full list attached below).


old perf:                                                                 new perf:

[jolsa@dell-r440-01 perf]$ ./perf.old bench internals synthesize -t       [jolsa@dell-r440-01 perf]$ ./perf bench internals synthesize -t
...
  Number of synthesis threads: 40                                           Number of synthesis threads: 40
    Average synthesis took: 2489.400 usec (+- 49.832 usec)                    Average synthesis took: 4576.500 usec (+- 75.278 usec)
    Average num. events: 956.800 (+- 6.721)                                   Average num. events: 1020.000 (+- 0.000)
    Average time per event 2.602 usec                                         Average time per event 4.487 usec

maybe profiling will show what's going on?

thanks,
jirka


---
[jolsa@dell-r440-01 perf]$ ./perf.old bench internals synthesize -t       [jolsa@dell-r440-01 perf]$ ./perf bench internals synthesize -t
# Running 'internals/synthesize' benchmark:                               # Running 'internals/synthesize' benchmark:
Computing performance of multi threaded perf event synthesis by           Computing performance of multi threaded perf event synthesis by
synthesizing events on CPU 0:                                             synthesizing events on CPU 0:
  Number of synthesis threads: 1                                            Number of synthesis threads: 1 
    Average synthesis took: 7907.100 usec (+- 197.363 usec)                   Average synthesis took: 7972.900 usec (+- 198.158 usec)
    Average num. events: 956.000 (+- 0.000)                                   Average num. events: 936.000 (+- 0.000)
    Average time per event 8.271 usec                                         Average time per event 8.518 usec
  Number of synthesis threads: 2                                            Number of synthesis threads: 2 
    Average synthesis took: 5616.800 usec (+- 61.253 usec)                    Average synthesis took: 5844.700 usec (+- 87.219 usec)
    Average num. events: 958.800 (+- 0.327)                                   Average num. events: 940.000 (+- 0.000)
    Average time per event 5.858 usec                                         Average time per event 6.218 usec
  Number of synthesis threads: 3                                            Number of synthesis threads: 3 
    Average synthesis took: 4274.000 usec (+- 93.293 usec)                    Average synthesis took: 4019.700 usec (+- 67.354 usec)
    Average num. events: 962.000 (+- 0.000)                                   Average num. events: 942.000 (+- 0.000)
    Average time per event 4.443 usec                                         Average time per event 4.267 usec
  Number of synthesis threads: 4                                            Number of synthesis threads: 4 
    Average synthesis took: 3425.700 usec (+- 43.044 usec)                    Average synthesis took: 3382.200 usec (+- 74.652 usec)
    Average num. events: 959.600 (+- 0.933)                                   Average num. events: 944.000 (+- 0.000)
    Average time per event 3.570 usec                                         Average time per event 3.583 usec
  Number of synthesis threads: 5                                            Number of synthesis threads: 5 
    Average synthesis took: 2958.000 usec (+- 82.951 usec)                    Average synthesis took: 3086.500 usec (+- 48.213 usec)
    Average num. events: 966.000 (+- 0.000)                                   Average num. events: 946.000 (+- 0.000)
    Average time per event 3.062 usec                                         Average time per event 3.263 usec
  Number of synthesis threads: 6                                            Number of synthesis threads: 6 
    Average synthesis took: 2808.400 usec (+- 66.868 usec)                    Average synthesis took: 2752.200 usec (+- 56.411 usec)
    Average num. events: 956.800 (+- 0.327)                                   Average num. events: 948.000 (+- 0.000)
    Average time per event 2.935 usec                                         Average time per event 2.903 usec
  Number of synthesis threads: 7                                            Number of synthesis threads: 7 
    Average synthesis took: 2622.900 usec (+- 83.524 usec)                    Average synthesis took: 2548.200 usec (+- 48.042 usec)
    Average num. events: 958.400 (+- 0.267)                                   Average num. events: 950.000 (+- 0.000)
    Average time per event 2.737 usec                                         Average time per event 2.682 usec
  Number of synthesis threads: 8                                            Number of synthesis threads: 8 
    Average synthesis took: 2271.600 usec (+- 29.181 usec)                    Average synthesis took: 2486.600 usec (+- 47.862 usec)
    Average num. events: 972.000 (+- 0.000)                                   Average num. events: 952.000 (+- 0.000)
    Average time per event 2.337 usec                                         Average time per event 2.612 usec
  Number of synthesis threads: 9                                            Number of synthesis threads: 9 
    Average synthesis took: 2372.000 usec (+- 95.495 usec)                    Average synthesis took: 2347.300 usec (+- 23.959 usec)
    Average num. events: 959.200 (+- 0.952)                                   Average num. events: 954.000 (+- 0.000)
    Average time per event 2.473 usec                                         Average time per event 2.460 usec
  Number of synthesis threads: 10                                           Number of synthesis threads: 10
    Average synthesis took: 2544.600 usec (+- 107.569 usec)                   Average synthesis took: 2328.800 usec (+- 14.234 usec)
    Average num. events: 968.400 (+- 3.124)                                   Average num. events: 957.400 (+- 0.306)
    Average time per event 2.628 usec                                         Average time per event 2.432 usec
  Number of synthesis threads: 11                                           Number of synthesis threads: 11
    Average synthesis took: 2299.300 usec (+- 57.597 usec)                    Average synthesis took: 2340.300 usec (+- 34.638 usec)
    Average num. events: 956.000 (+- 0.000)                                   Average num. events: 960.000 (+- 0.000)
    Average time per event 2.405 usec                                         Average time per event 2.438 usec
  Number of synthesis threads: 12                                           Number of synthesis threads: 12
    Average synthesis took: 2545.500 usec (+- 69.557 usec)                    Average synthesis took: 2318.700 usec (+- 15.803 usec)
    Average num. events: 974.800 (+- 0.611)                                   Average num. events: 963.800 (+- 0.200)
    Average time per event 2.611 usec                                         Average time per event 2.406 usec
  Number of synthesis threads: 13                                           Number of synthesis threads: 13
    Average synthesis took: 2386.400 usec (+- 79.244 usec)                    Average synthesis took: 2408.700 usec (+- 27.071 usec)
    Average num. events: 950.500 (+- 5.726)                                   Average num. events: 966.000 (+- 0.000)
    Average time per event 2.511 usec                                         Average time per event 2.493 usec
  Number of synthesis threads: 14                                           Number of synthesis threads: 14 
    Average synthesis took: 2466.600 usec (+- 57.893 usec)                    Average synthesis took: 2547.200 usec (+- 53.445 usec)
    Average num. events: 957.600 (+- 0.718)                                   Average num. events: 968.000 (+- 0.000)
    Average time per event 2.576 usec                                         Average time per event 2.631 usec
  Number of synthesis threads: 15                                           Number of synthesis threads: 15 
    Average synthesis took: 2249.700 usec (+- 64.026 usec)                    Average synthesis took: 2647.900 usec (+- 79.014 usec)
    Average num. events: 956.000 (+- 0.000)                                   Average num. events: 970.000 (+- 0.000)
    Average time per event 2.353 usec                                         Average time per event 2.730 usec
  Number of synthesis threads: 16                                           Number of synthesis threads: 16 
    Average synthesis took: 2311.700 usec (+- 64.304 usec)                    Average synthesis took: 2676.200 usec (+- 34.824 usec)
    Average num. events: 955.000 (+- 0.907)                                   Average num. events: 972.000 (+- 0.000)
    Average time per event 2.421 usec                                         Average time per event 2.753 usec
  Number of synthesis threads: 17                                           Number of synthesis threads: 17 
    Average synthesis took: 2174.100 usec (+- 36.673 usec)                    Average synthesis took: 2580.100 usec (+- 45.414 usec)
    Average num. events: 971.600 (+- 3.124)                                   Average num. events: 974.000 (+- 0.000)
    Average time per event 2.238 usec                                         Average time per event 2.649 usec
  Number of synthesis threads: 18                                           Number of synthesis threads: 18 
    Average synthesis took: 2294.200 usec (+- 63.657 usec)                    Average synthesis took: 2810.200 usec (+- 49.113 usec)
    Average num. events: 953.200 (+- 0.611)                                   Average num. events: 976.000 (+- 0.000)
    Average time per event 2.407 usec                                         Average time per event 2.879 usec
  Number of synthesis threads: 19                                           Number of synthesis threads: 19 
    Average synthesis took: 2410.700 usec (+- 120.169 usec)                   Average synthesis took: 2862.400 usec (+- 36.982 usec)
    Average num. events: 953.400 (+- 0.306)                                   Average num. events: 978.000 (+- 0.000)
    Average time per event 2.529 usec                                         Average time per event 2.927 usec
  Number of synthesis threads: 20                                           Number of synthesis threads: 20 
    Average synthesis took: 2387.000 usec (+- 91.051 usec)                    Average synthesis took: 2908.800 usec (+- 36.404 usec)
    Average num. events: 952.800 (+- 0.800)                                   Average num. events: 978.600 (+- 0.306)
    Average time per event 2.505 usec                                         Average time per event 2.972 usec
  Number of synthesis threads: 21                                           Number of synthesis threads: 21 
    Average synthesis took: 2275.700 usec (+- 39.815 usec)                    Average synthesis took: 3141.100 usec (+- 30.896 usec)
    Average num. events: 954.600 (+- 0.306)                                   Average num. events: 980.000 (+- 0.000)
    Average time per event 2.384 usec                                         Average time per event 3.205 usec
  Number of synthesis threads: 22                                           Number of synthesis threads: 22 
    Average synthesis took: 2373.200 usec (+- 89.528 usec)                    Average synthesis took: 3342.400 usec (+- 112.115 usec)
    Average num. events: 949.100 (+- 5.843)                                   Average num. events: 982.000 (+- 0.000)
    Average time per event 2.500 usec                                         Average time per event 3.404 usec
  Number of synthesis threads: 23                                           Number of synthesis threads: 23 
    Average synthesis took: 2318.300 usec (+- 39.395 usec)                    Average synthesis took: 3269.700 usec (+- 55.215 usec)
    Average num. events: 954.600 (+- 0.427)                                   Average num. events: 984.000 (+- 0.000)
    Average time per event 2.429 usec                                         Average time per event 3.323 usec
  Number of synthesis threads: 24                                           Number of synthesis threads: 24
    Average synthesis took: 2241.900 usec (+- 52.577 usec)                    Average synthesis took: 3379.500 usec (+- 56.380 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 986.000 (+- 0.000)
    Average time per event 2.350 usec                                         Average time per event 3.427 usec
  Number of synthesis threads: 25                                           Number of synthesis threads: 25
    Average synthesis took: 2343.400 usec (+- 101.611 usec)                   Average synthesis took: 3382.500 usec (+- 51.535 usec)
    Average num. events: 956.200 (+- 1.009)                                   Average num. events: 988.000 (+- 0.000)
    Average time per event 2.451 usec                                         Average time per event 3.424 usec
  Number of synthesis threads: 26                                           Number of synthesis threads: 26
    Average synthesis took: 2260.700 usec (+- 18.863 usec)                    Average synthesis took: 3391.600 usec (+- 44.053 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 990.000 (+- 0.000)
    Average time per event 2.370 usec                                         Average time per event 3.426 usec
  Number of synthesis threads: 27                                           Number of synthesis threads: 27
    Average synthesis took: 2373.800 usec (+- 74.213 usec)                    Average synthesis took: 3659.200 usec (+- 113.176 usec)
    Average num. events: 955.000 (+- 0.803)                                   Average num. events: 992.000 (+- 0.000)
    Average time per event 2.486 usec                                         Average time per event 3.689 usec
  Number of synthesis threads: 28                                           Number of synthesis threads: 28
    Average synthesis took: 2335.500 usec (+- 49.480 usec)                    Average synthesis took: 3625.000 usec (+- 90.131 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 994.000 (+- 0.000)
    Average time per event 2.448 usec                                         Average time per event 3.647 usec
  Number of synthesis threads: 29                                           Number of synthesis threads: 29
    Average synthesis took: 2182.100 usec (+- 41.649 usec)                    Average synthesis took: 3708.400 usec (+- 103.717 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 996.000 (+- 0.000)
    Average time per event 2.287 usec                                         Average time per event 3.723 usec
  Number of synthesis threads: 30                                           Number of synthesis threads: 30
    Average synthesis took: 2246.100 usec (+- 58.252 usec)                    Average synthesis took: 3820.500 usec (+- 95.282 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 998.000 (+- 0.000)
    Average time per event 2.354 usec                                         Average time per event 3.828 usec
  Number of synthesis threads: 31                                           Number of synthesis threads: 31
    Average synthesis took: 2156.900 usec (+- 26.141 usec)                    Average synthesis took: 3881.400 usec (+- 36.277 usec)
    Average num. events: 948.300 (+- 5.700)                                   Average num. events: 1000.000 (+- 0.000)
    Average time per event 2.274 usec                                         Average time per event 3.881 usec
  Number of synthesis threads: 32                                           Number of synthesis threads: 32
    Average synthesis took: 2295.300 usec (+- 41.538 usec)                    Average synthesis took: 4191.700 usec (+- 149.780 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 1002.000 (+- 0.000)
    Average time per event 2.406 usec                                         Average time per event 4.183 usec
  Number of synthesis threads: 33                                           Number of synthesis threads: 33
    Average synthesis took: 2249.100 usec (+- 59.135 usec)                    Average synthesis took: 3988.200 usec (+- 25.015 usec)
    Average num. events: 948.500 (+- 5.726)                                   Average num. events: 1004.000 (+- 0.000)
    Average time per event 2.371 usec                                         Average time per event 3.972 usec
  Number of synthesis threads: 34                                           Number of synthesis threads: 34
    Average synthesis took: 2270.400 usec (+- 65.011 usec)                    Average synthesis took: 4064.600 usec (+- 44.158 usec)
    Average num. events: 954.200 (+- 0.200)                                   Average num. events: 1006.000 (+- 0.000)
    Average time per event 2.379 usec                                         Average time per event 4.040 usec
  Number of synthesis threads: 35                                           Number of synthesis threads: 35
    Average synthesis took: 2259.200 usec (+- 44.287 usec)                    Average synthesis took: 4145.700 usec (+- 37.297 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 1008.000 (+- 0.000)
    Average time per event 2.368 usec                                         Average time per event 4.113 usec
  Number of synthesis threads: 36                                           Number of synthesis threads: 36
    Average synthesis took: 2294.100 usec (+- 38.693 usec)                    Average synthesis took: 4234.900 usec (+- 81.904 usec)
    Average num. events: 954.000 (+- 0.000)                                   Average num. events: 1010.400 (+- 0.267)
    Average time per event 2.405 usec                                         Average time per event 4.191 usec
  Number of synthesis threads: 37                                           Number of synthesis threads: 37
    Average synthesis took: 2338.900 usec (+- 80.346 usec)                    Average synthesis took: 4337.900 usec (+- 30.071 usec)
    Average num. events: 954.400 (+- 0.267)                                   Average num. events: 1014.000 (+- 0.000)
    Average time per event 2.451 usec                                         Average time per event 4.278 usec
  Number of synthesis threads: 38                                           Number of synthesis threads: 38
    Average synthesis took: 2406.300 usec (+- 57.140 usec)                    Average synthesis took: 4426.600 usec (+- 27.035 usec)
    Average num. events: 938.400 (+- 7.730)                                   Average num. events: 1016.000 (+- 0.000)
    Average time per event 2.564 usec                                         Average time per event 4.357 usec
  Number of synthesis threads: 39                                           Number of synthesis threads: 39
    Average synthesis took: 2371.000 usec (+- 35.676 usec)                    Average synthesis took: 5979.000 usec (+- 1518.855 usec)
    Average num. events: 963.000 (+- 0.000)                                   Average num. events: 1018.000 (+- 0.000)
    Average time per event 2.462 usec                                         Average time per event 5.873 usec
  Number of synthesis threads: 40                                           Number of synthesis threads: 40
    Average synthesis took: 2489.400 usec (+- 49.832 usec)                    Average synthesis took: 4576.500 usec (+- 75.278 usec)
    Average num. events: 956.800 (+- 6.721)                                   Average num. events: 1020.000 (+- 0.000)
    Average time per event 2.602 usec                                         Average time per event 4.487 usec


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct
  2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
@ 2021-08-09 12:04   ` Jiri Olsa
  0 siblings, 0 replies; 21+ messages in thread
From: Jiri Olsa @ 2021-08-09 12:04 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Namhyung Kim,
	Peter Zijlstra, Ingo Molnar, Mark Rutland, linux-kernel,
	linux-perf-users, Alexey Bayduraev

On Fri, Jul 30, 2021 at 05:34:13PM +0200, Riccardo Mancini wrote:

SNIP

> +static void worker_thread(int tidx, struct task_struct *task)
> +{
> +
> +	pr_info("Hi from worker %d, executing task %p\n", tidx, task);
> +}
> +
> +/**
> + * attach_threadpool_to_workqueue - start @wq workers on @pool
> + */
> +static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
> +					struct threadpool *pool)
> +{
> +	if (!threadpool__is_ready(pool)) {
> +		pr_debug2("workqueue: cannot attach to pool: pool is not ready\n");
> +		return -WORKQUEUE_ERROR__NOTALLOWED;
> +	}
> +
> +	wq->pool = pool;
> +
> +	wq->pool_errno = threadpool__execute(pool, &wq->task);
> +	if (wq->pool_errno)
> +		return -WORKQUEUE_ERROR__POOLEXE;

SNIP

> +
> +/**
> + * create_workqueue - create a workqueue associated to @pool
> + *
> + * Only one workqueue can execute on a pool at a time.
> + */
> +struct workqueue_struct *create_workqueue(struct threadpool *pool)
> +{
> +	int ret, err = 0;
> +	struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
> +
> +	if (!wq) {
> +		err = -ENOMEM;
> +		goto out_return;
> +	}
> +
> +	ret = pthread_mutex_init(&wq->lock, NULL);
> +	if (ret) {
> +		err = -ret;
> +		goto out_free_wq;
> +	}
> +
> +	ret = pthread_cond_init(&wq->idle_cond, NULL);
> +	if (ret) {
> +		err = -ret;
> +		goto out_destroy_mutex;
> +	}
> +
> +	wq->pool = NULL;
> +	INIT_LIST_HEAD(&wq->busy_list);
> +	INIT_LIST_HEAD(&wq->idle_list);
> +
> +	INIT_LIST_HEAD(&wq->pending);
> +
> +	ret = pipe(wq->msg_pipe);
> +	if (ret) {
> +		err = -ENOMEM;
> +		goto out_destroy_cond;
> +	}
> +
> +	wq->task.fn = worker_thread;
> +
> +	ret = attach_threadpool_to_workqueue(wq, pool);
> +	if (ret) {
> +		err = ret;
> +		goto out_destroy_cond;
> +	}
> +
> +	wq->status = WORKQUEUE_STATUS__READY;
> +
> +	return wq;
> +
> +out_destroy_cond:
> +	pthread_cond_destroy(&wq->idle_cond);

leaking wq->msg_pipe?

thanks,
jirka

> +out_destroy_mutex:
> +	pthread_mutex_destroy(&wq->lock);
> +out_free_wq:
> +	free(wq);
> +out_return:
> +	return ERR_PTR(err);
> +}
> +


SNIP


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for
  2021-08-09 12:04   ` Jiri Olsa
@ 2021-08-09 13:24     ` Riccardo Mancini
  0 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-08-09 13:24 UTC (permalink / raw)
  To: Jiri Olsa
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Namhyung Kim,
	Peter Zijlstra, Ingo Molnar, Mark Rutland, linux-kernel,
	linux-perf-users, Alexey Bayduraev

Hi Jiri,
thanks for looking into this patchset.

On Mon, 2021-08-09 at 14:04 +0200, Jiri Olsa wrote:
> On Fri, Jul 30, 2021 at 05:34:17PM +0200, Riccardo Mancini wrote:
> > To generate synthetic events, perf has the option to use multiple
> > threads. These threads are created manually using pthread_created.
> > 
> > This patch replaces the manual pthread_create with a workqueue,
> > using the parallel_for utility.
> 
> hi,
> I really like this new interface
> 
> > 
> > Experimental results show that workqueue has a slightly higher overhead,
> > but this is repayed by the improved work balancing among threads.
> 
> how did you measure that balancing improvement?
> is there less kernel cycles spent?

I meant that the workqueue with the shared queue is able to balance work among
threads. This is particularly important in synthesize since low pids are
associated with kthreads, which require less processing (as far as I understand),
therefore the current work assignment is not great.

I think the goal of the workqueue is not to be faster than the current
implementation (which it is only due to the aforementioned issue), but to have a
better abstraction with a contained overhead.

> 
> I ran the benchmark and if I'm reading the results correctly I see
> performance drop for high cpu numbers (full list attached below).

The implementation with the shared queue suffers from this problem, but I hoped
it would hold up to more threads.
I'm working on having one queue per thread, in order to be able to scale better
on more cpus. I do not have any work stealing at the moment so there is no
rebalancing of work, but in our use cases it is not that important, at the
moment.
You can find it on https://github.com/Manciukic/linux.git in perf/workqueue/dev
branch. If you can run the same benchmark there, it would be really helpful for
me.

Thanks,
Riccardo

> 
> 
> old perf:                                                                 new
> perf:
> 
> [jolsa@dell-r440-01 perf]$ ./perf.old bench internals synthesize -t      
> [jolsa@dell-r440-01 perf]$ ./perf bench internals synthesize -t
> ...
>   Number of synthesis threads: 40                                          
> Number of synthesis threads: 40
>     Average synthesis took: 2489.400 usec (+- 49.832 usec)                   
> Average synthesis took: 4576.500 usec (+- 75.278 usec)
>     Average num. events: 956.800 (+- 6.721)                                  
> Average num. events: 1020.000 (+- 0.000)
>     Average time per event 2.602 usec                                        
> Average time per event 4.487 usec
> 
> maybe profiling will show what's going on?
> 
> thanks,
> jirka
> 
> 
> ---
> [jolsa@dell-r440-01 perf]$ ./perf.old bench internals synthesize -t      
> [jolsa@dell-r440-01 perf]$ ./perf bench internals synthesize -t
> # Running 'internals/synthesize' benchmark:                               #
> Running 'internals/synthesize' benchmark:
> Computing performance of multi threaded perf event synthesis by          
> Computing performance of multi threaded perf event synthesis by
> synthesizing events on CPU 0:                                            
> synthesizing events on CPU 0:
>   Number of synthesis threads: 1                                           
> Number of synthesis threads: 1 
>     Average synthesis took: 7907.100 usec (+- 197.363 usec)                  
> Average synthesis took: 7972.900 usec (+- 198.158 usec)
>     Average num. events: 956.000 (+- 0.000)                                  
> Average num. events: 936.000 (+- 0.000)
>     Average time per event 8.271 usec                                        
> Average time per event 8.518 usec
>   Number of synthesis threads: 2                                           
> Number of synthesis threads: 2 
>     Average synthesis took: 5616.800 usec (+- 61.253 usec)                   
> Average synthesis took: 5844.700 usec (+- 87.219 usec)
>     Average num. events: 958.800 (+- 0.327)                                  
> Average num. events: 940.000 (+- 0.000)
>     Average time per event 5.858 usec                                        
> Average time per event 6.218 usec
>   Number of synthesis threads: 3                                           
> Number of synthesis threads: 3 
>     Average synthesis took: 4274.000 usec (+- 93.293 usec)                   
> Average synthesis took: 4019.700 usec (+- 67.354 usec)
>     Average num. events: 962.000 (+- 0.000)                                  
> Average num. events: 942.000 (+- 0.000)
>     Average time per event 4.443 usec                                        
> Average time per event 4.267 usec
>   Number of synthesis threads: 4                                           
> Number of synthesis threads: 4 
>     Average synthesis took: 3425.700 usec (+- 43.044 usec)                   
> Average synthesis took: 3382.200 usec (+- 74.652 usec)
>     Average num. events: 959.600 (+- 0.933)                                  
> Average num. events: 944.000 (+- 0.000)
>     Average time per event 3.570 usec                                        
> Average time per event 3.583 usec
>   Number of synthesis threads: 5                                           
> Number of synthesis threads: 5 
>     Average synthesis took: 2958.000 usec (+- 82.951 usec)                   
> Average synthesis took: 3086.500 usec (+- 48.213 usec)
>     Average num. events: 966.000 (+- 0.000)                                  
> Average num. events: 946.000 (+- 0.000)
>     Average time per event 3.062 usec                                        
> Average time per event 3.263 usec
>   Number of synthesis threads: 6                                           
> Number of synthesis threads: 6 
>     Average synthesis took: 2808.400 usec (+- 66.868 usec)                   
> Average synthesis took: 2752.200 usec (+- 56.411 usec)
>     Average num. events: 956.800 (+- 0.327)                                  
> Average num. events: 948.000 (+- 0.000)
>     Average time per event 2.935 usec                                        
> Average time per event 2.903 usec
>   Number of synthesis threads: 7                                           
> Number of synthesis threads: 7 
>     Average synthesis took: 2622.900 usec (+- 83.524 usec)                   
> Average synthesis took: 2548.200 usec (+- 48.042 usec)
>     Average num. events: 958.400 (+- 0.267)                                  
> Average num. events: 950.000 (+- 0.000)
>     Average time per event 2.737 usec                                        
> Average time per event 2.682 usec
>   Number of synthesis threads: 8                                           
> Number of synthesis threads: 8 
>     Average synthesis took: 2271.600 usec (+- 29.181 usec)                   
> Average synthesis took: 2486.600 usec (+- 47.862 usec)
>     Average num. events: 972.000 (+- 0.000)                                  
> Average num. events: 952.000 (+- 0.000)
>     Average time per event 2.337 usec                                        
> Average time per event 2.612 usec
>   Number of synthesis threads: 9                                           
> Number of synthesis threads: 9 
>     Average synthesis took: 2372.000 usec (+- 95.495 usec)                   
> Average synthesis took: 2347.300 usec (+- 23.959 usec)
>     Average num. events: 959.200 (+- 0.952)                                  
> Average num. events: 954.000 (+- 0.000)
>     Average time per event 2.473 usec                                        
> Average time per event 2.460 usec
>   Number of synthesis threads: 10                                          
> Number of synthesis threads: 10
>     Average synthesis took: 2544.600 usec (+- 107.569 usec)                  
> Average synthesis took: 2328.800 usec (+- 14.234 usec)
>     Average num. events: 968.400 (+- 3.124)                                  
> Average num. events: 957.400 (+- 0.306)
>     Average time per event 2.628 usec                                        
> Average time per event 2.432 usec
>   Number of synthesis threads: 11                                          
> Number of synthesis threads: 11
>     Average synthesis took: 2299.300 usec (+- 57.597 usec)                   
> Average synthesis took: 2340.300 usec (+- 34.638 usec)
>     Average num. events: 956.000 (+- 0.000)                                  
> Average num. events: 960.000 (+- 0.000)
>     Average time per event 2.405 usec                                        
> Average time per event 2.438 usec
>   Number of synthesis threads: 12                                          
> Number of synthesis threads: 12
>     Average synthesis took: 2545.500 usec (+- 69.557 usec)                   
> Average synthesis took: 2318.700 usec (+- 15.803 usec)
>     Average num. events: 974.800 (+- 0.611)                                  
> Average num. events: 963.800 (+- 0.200)
>     Average time per event 2.611 usec                                        
> Average time per event 2.406 usec
>   Number of synthesis threads: 13                                          
> Number of synthesis threads: 13
>     Average synthesis took: 2386.400 usec (+- 79.244 usec)                   
> Average synthesis took: 2408.700 usec (+- 27.071 usec)
>     Average num. events: 950.500 (+- 5.726)                                  
> Average num. events: 966.000 (+- 0.000)
>     Average time per event 2.511 usec                                        
> Average time per event 2.493 usec
>   Number of synthesis threads: 14                                          
> Number of synthesis threads: 14 
>     Average synthesis took: 2466.600 usec (+- 57.893 usec)                   
> Average synthesis took: 2547.200 usec (+- 53.445 usec)
>     Average num. events: 957.600 (+- 0.718)                                  
> Average num. events: 968.000 (+- 0.000)
>     Average time per event 2.576 usec                                        
> Average time per event 2.631 usec
>   Number of synthesis threads: 15                                          
> Number of synthesis threads: 15 
>     Average synthesis took: 2249.700 usec (+- 64.026 usec)                   
> Average synthesis took: 2647.900 usec (+- 79.014 usec)
>     Average num. events: 956.000 (+- 0.000)                                  
> Average num. events: 970.000 (+- 0.000)
>     Average time per event 2.353 usec                                        
> Average time per event 2.730 usec
>   Number of synthesis threads: 16                                          
> Number of synthesis threads: 16 
>     Average synthesis took: 2311.700 usec (+- 64.304 usec)                   
> Average synthesis took: 2676.200 usec (+- 34.824 usec)
>     Average num. events: 955.000 (+- 0.907)                                  
> Average num. events: 972.000 (+- 0.000)
>     Average time per event 2.421 usec                                        
> Average time per event 2.753 usec
>   Number of synthesis threads: 17                                          
> Number of synthesis threads: 17 
>     Average synthesis took: 2174.100 usec (+- 36.673 usec)                   
> Average synthesis took: 2580.100 usec (+- 45.414 usec)
>     Average num. events: 971.600 (+- 3.124)                                  
> Average num. events: 974.000 (+- 0.000)
>     Average time per event 2.238 usec                                        
> Average time per event 2.649 usec
>   Number of synthesis threads: 18                                          
> Number of synthesis threads: 18 
>     Average synthesis took: 2294.200 usec (+- 63.657 usec)                   
> Average synthesis took: 2810.200 usec (+- 49.113 usec)
>     Average num. events: 953.200 (+- 0.611)                                  
> Average num. events: 976.000 (+- 0.000)
>     Average time per event 2.407 usec                                        
> Average time per event 2.879 usec
>   Number of synthesis threads: 19                                          
> Number of synthesis threads: 19 
>     Average synthesis took: 2410.700 usec (+- 120.169 usec)                  
> Average synthesis took: 2862.400 usec (+- 36.982 usec)
>     Average num. events: 953.400 (+- 0.306)                                  
> Average num. events: 978.000 (+- 0.000)
>     Average time per event 2.529 usec                                        
> Average time per event 2.927 usec
>   Number of synthesis threads: 20                                          
> Number of synthesis threads: 20 
>     Average synthesis took: 2387.000 usec (+- 91.051 usec)                   
> Average synthesis took: 2908.800 usec (+- 36.404 usec)
>     Average num. events: 952.800 (+- 0.800)                                  
> Average num. events: 978.600 (+- 0.306)
>     Average time per event 2.505 usec                                        
> Average time per event 2.972 usec
>   Number of synthesis threads: 21                                          
> Number of synthesis threads: 21 
>     Average synthesis took: 2275.700 usec (+- 39.815 usec)                   
> Average synthesis took: 3141.100 usec (+- 30.896 usec)
>     Average num. events: 954.600 (+- 0.306)                                  
> Average num. events: 980.000 (+- 0.000)
>     Average time per event 2.384 usec                                        
> Average time per event 3.205 usec
>   Number of synthesis threads: 22                                          
> Number of synthesis threads: 22 
>     Average synthesis took: 2373.200 usec (+- 89.528 usec)                   
> Average synthesis took: 3342.400 usec (+- 112.115 usec)
>     Average num. events: 949.100 (+- 5.843)                                  
> Average num. events: 982.000 (+- 0.000)
>     Average time per event 2.500 usec                                        
> Average time per event 3.404 usec
>   Number of synthesis threads: 23                                          
> Number of synthesis threads: 23 
>     Average synthesis took: 2318.300 usec (+- 39.395 usec)                   
> Average synthesis took: 3269.700 usec (+- 55.215 usec)
>     Average num. events: 954.600 (+- 0.427)                                  
> Average num. events: 984.000 (+- 0.000)
>     Average time per event 2.429 usec                                        
> Average time per event 3.323 usec
>   Number of synthesis threads: 24                                          
> Number of synthesis threads: 24
>     Average synthesis took: 2241.900 usec (+- 52.577 usec)                   
> Average synthesis took: 3379.500 usec (+- 56.380 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 986.000 (+- 0.000)
>     Average time per event 2.350 usec                                        
> Average time per event 3.427 usec
>   Number of synthesis threads: 25                                          
> Number of synthesis threads: 25
>     Average synthesis took: 2343.400 usec (+- 101.611 usec)                  
> Average synthesis took: 3382.500 usec (+- 51.535 usec)
>     Average num. events: 956.200 (+- 1.009)                                  
> Average num. events: 988.000 (+- 0.000)
>     Average time per event 2.451 usec                                        
> Average time per event 3.424 usec
>   Number of synthesis threads: 26                                          
> Number of synthesis threads: 26
>     Average synthesis took: 2260.700 usec (+- 18.863 usec)                   
> Average synthesis took: 3391.600 usec (+- 44.053 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 990.000 (+- 0.000)
>     Average time per event 2.370 usec                                        
> Average time per event 3.426 usec
>   Number of synthesis threads: 27                                          
> Number of synthesis threads: 27
>     Average synthesis took: 2373.800 usec (+- 74.213 usec)                   
> Average synthesis took: 3659.200 usec (+- 113.176 usec)
>     Average num. events: 955.000 (+- 0.803)                                  
> Average num. events: 992.000 (+- 0.000)
>     Average time per event 2.486 usec                                        
> Average time per event 3.689 usec
>   Number of synthesis threads: 28                                          
> Number of synthesis threads: 28
>     Average synthesis took: 2335.500 usec (+- 49.480 usec)                   
> Average synthesis took: 3625.000 usec (+- 90.131 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 994.000 (+- 0.000)
>     Average time per event 2.448 usec                                        
> Average time per event 3.647 usec
>   Number of synthesis threads: 29                                          
> Number of synthesis threads: 29
>     Average synthesis took: 2182.100 usec (+- 41.649 usec)                   
> Average synthesis took: 3708.400 usec (+- 103.717 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 996.000 (+- 0.000)
>     Average time per event 2.287 usec                                        
> Average time per event 3.723 usec
>   Number of synthesis threads: 30                                          
> Number of synthesis threads: 30
>     Average synthesis took: 2246.100 usec (+- 58.252 usec)                   
> Average synthesis took: 3820.500 usec (+- 95.282 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 998.000 (+- 0.000)
>     Average time per event 2.354 usec                                        
> Average time per event 3.828 usec
>   Number of synthesis threads: 31                                          
> Number of synthesis threads: 31
>     Average synthesis took: 2156.900 usec (+- 26.141 usec)                   
> Average synthesis took: 3881.400 usec (+- 36.277 usec)
>     Average num. events: 948.300 (+- 5.700)                                  
> Average num. events: 1000.000 (+- 0.000)
>     Average time per event 2.274 usec                                        
> Average time per event 3.881 usec
>   Number of synthesis threads: 32                                          
> Number of synthesis threads: 32
>     Average synthesis took: 2295.300 usec (+- 41.538 usec)                   
> Average synthesis took: 4191.700 usec (+- 149.780 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 1002.000 (+- 0.000)
>     Average time per event 2.406 usec                                        
> Average time per event 4.183 usec
>   Number of synthesis threads: 33                                          
> Number of synthesis threads: 33
>     Average synthesis took: 2249.100 usec (+- 59.135 usec)                   
> Average synthesis took: 3988.200 usec (+- 25.015 usec)
>     Average num. events: 948.500 (+- 5.726)                                  
> Average num. events: 1004.000 (+- 0.000)
>     Average time per event 2.371 usec                                        
> Average time per event 3.972 usec
>   Number of synthesis threads: 34                                          
> Number of synthesis threads: 34
>     Average synthesis took: 2270.400 usec (+- 65.011 usec)                   
> Average synthesis took: 4064.600 usec (+- 44.158 usec)
>     Average num. events: 954.200 (+- 0.200)                                  
> Average num. events: 1006.000 (+- 0.000)
>     Average time per event 2.379 usec                                        
> Average time per event 4.040 usec
>   Number of synthesis threads: 35                                          
> Number of synthesis threads: 35
>     Average synthesis took: 2259.200 usec (+- 44.287 usec)                   
> Average synthesis took: 4145.700 usec (+- 37.297 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 1008.000 (+- 0.000)
>     Average time per event 2.368 usec                                        
> Average time per event 4.113 usec
>   Number of synthesis threads: 36                                          
> Number of synthesis threads: 36
>     Average synthesis took: 2294.100 usec (+- 38.693 usec)                   
> Average synthesis took: 4234.900 usec (+- 81.904 usec)
>     Average num. events: 954.000 (+- 0.000)                                  
> Average num. events: 1010.400 (+- 0.267)
>     Average time per event 2.405 usec                                        
> Average time per event 4.191 usec
>   Number of synthesis threads: 37                                          
> Number of synthesis threads: 37
>     Average synthesis took: 2338.900 usec (+- 80.346 usec)                   
> Average synthesis took: 4337.900 usec (+- 30.071 usec)
>     Average num. events: 954.400 (+- 0.267)                                  
> Average num. events: 1014.000 (+- 0.000)
>     Average time per event 2.451 usec                                        
> Average time per event 4.278 usec
>   Number of synthesis threads: 38                                          
> Number of synthesis threads: 38
>     Average synthesis took: 2406.300 usec (+- 57.140 usec)                   
> Average synthesis took: 4426.600 usec (+- 27.035 usec)
>     Average num. events: 938.400 (+- 7.730)                                  
> Average num. events: 1016.000 (+- 0.000)
>     Average time per event 2.564 usec                                        
> Average time per event 4.357 usec
>   Number of synthesis threads: 39                                          
> Number of synthesis threads: 39
>     Average synthesis took: 2371.000 usec (+- 35.676 usec)                   
> Average synthesis took: 5979.000 usec (+- 1518.855 usec)
>     Average num. events: 963.000 (+- 0.000)                                  
> Average num. events: 1018.000 (+- 0.000)
>     Average time per event 2.462 usec                                        
> Average time per event 5.873 usec
>   Number of synthesis threads: 40                                          
> Number of synthesis threads: 40
>     Average synthesis took: 2489.400 usec (+- 49.832 usec)                   
> Average synthesis took: 4576.500 usec (+- 75.278 usec)
>     Average num. events: 956.800 (+- 6.721)                                  
> Average num. events: 1020.000 (+- 0.000)
>     Average time per event 2.602 usec                                        
> Average time per event 4.487 usec
> 



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
  2021-08-09 10:30     ` Riccardo Mancini
@ 2021-08-10 18:54       ` Namhyung Kim
  2021-08-10 20:24         ` Arnaldo Carvalho de Melo
  0 siblings, 1 reply; 21+ messages in thread
From: Namhyung Kim @ 2021-08-10 18:54 UTC (permalink / raw)
  To: Riccardo Mancini
  Cc: Arnaldo Carvalho de Melo, Ian Rogers, Peter Zijlstra,
	Ingo Molnar, Mark Rutland, Jiri Olsa, linux-kernel,
	linux-perf-users, Alexey Bayduraev

On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
> On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > +
> > > +/**
> > > + * threadpool__strerror - print message regarding given @err in @pool
> > > + *
> > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > + */
> > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > > char *buf, size_t size)
> > > +{
> > > +       char sbuf[STRERR_BUFSIZE], *emsg;
> > > +
> > > +       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > +       return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > +}
> > > +
> > > +/**
> > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > + *
> > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > + */
> > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > > size)
> > > +{
> > > +       return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > > +}
> >
> > Why two different functions?
>
> Since when new fails you don't have an err number, just an err_ptr so it's not
> very clear how to call threadpool__strerror. Therefore I made a wrapper to
> remove any ambiguity.

What do you mean by "when new fails"?

> >
> > > +
> > > +/**
> > > + * threadpool__delete - free the @pool and all its resources
> > > + */
> > > +void threadpool__delete(struct threadpool *pool)
> > > +{
> > > +       int t;
> > > +
> > > +       if (IS_ERR_OR_NULL(pool))
> > > +               return;
> > > +
> > > +       WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> > > +               && pool->status != THREADPOOL_STATUS__ERROR);
> > > +
> > > +       for (t = 0; t < pool->nr_threads; t++)
> > > +               threadpool_entry__close_pipes(&pool->threads[t]);
> > > +
> > > +       zfree(&pool->threads);
> > > +       free(pool);
> > > +}
> > > +
> > > +/**
> > > + * threadpool__size - get number of threads in the threadpool
> > > + */
> > > +int threadpool__size(struct threadpool *pool)
> > > +{
> > > +       return pool->nr_threads;
> > > +}
> > > diff --git a/tools/perf/util/workqueue/threadpool.h
> > > b/tools/perf/util/workqueue/threadpool.h
> > > new file mode 100644
> > > index 0000000000000000..fb18aa32fb64f671
> > > --- /dev/null
> > > +++ b/tools/perf/util/workqueue/threadpool.h
> > > @@ -0,0 +1,30 @@
> > > +/* SPDX-License-Identifier: GPL-2.0 */
> > > +#ifndef __WORKQUEUE_THREADPOOL_H
> > > +#define __WORKQUEUE_THREADPOOL_H
> > > +
> > > +struct threadpool;
> > > +struct task_struct;
> >
> > You can just move the definition here.
> >
> > > +
> > > +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> > > +
> > > +struct task_struct {
> > > +       task_func_t fn;
> > > +};
>
> I thought it was not allowed, since task_func_t refers to task_struct and
> viceversa.
> I will try to remove it if possible.

Oh, I missed that, sorry for the noise.

Thanks,
namhyung

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
  2021-08-10 18:54       ` Namhyung Kim
@ 2021-08-10 20:24         ` Arnaldo Carvalho de Melo
  2021-08-11 17:55           ` Riccardo Mancini
  0 siblings, 1 reply; 21+ messages in thread
From: Arnaldo Carvalho de Melo @ 2021-08-10 20:24 UTC (permalink / raw)
  To: Namhyung Kim
  Cc: Riccardo Mancini, Ian Rogers, Peter Zijlstra, Ingo Molnar,
	Mark Rutland, Jiri Olsa, linux-kernel, linux-perf-users,
	Alexey Bayduraev

Em Tue, Aug 10, 2021 at 11:54:19AM -0700, Namhyung Kim escreveu:
> On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
> > On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > > +/**
> > > > + * threadpool__strerror - print message regarding given @err in @pool
> > > > + *
> > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > + */
> > > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int err,
> > > > char *buf, size_t size)
> > > > +{
> > > > +       char sbuf[STRERR_BUFSIZE], *emsg;
> > > > +
> > > > +       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > > +       return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > > +}
> > > > +
> > > > +/**
> > > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > > + *
> > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > + */
> > > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf, size_t
> > > > size)
> > > > +{
> > > > +       return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf, size);
> > > > +}

> > > Why two different functions?

> > Since when new fails you don't have a err number, just an err_ptr so it's not
> > very clear how to call threadpool__strerror. Therefore I made a wrapper to
> > remove any ambiguity.
> 
> What do you mean by "when new fails"?

I think 'new' is 'constructor', i.e. something__new() returns a newly
created object and this is not an error number, so he uses ERR_PTR() and
then he needs to pass it to the 'strerror' specific to the
threadpool__new, which will use PTR_ERR() to get an integer, and then
map that to a proper error string, right?

- Arnaldo

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction
  2021-08-10 20:24         ` Arnaldo Carvalho de Melo
@ 2021-08-11 17:55           ` Riccardo Mancini
  0 siblings, 0 replies; 21+ messages in thread
From: Riccardo Mancini @ 2021-08-11 17:55 UTC (permalink / raw)
  To: Arnaldo Carvalho de Melo, Namhyung Kim
  Cc: Ian Rogers, Peter Zijlstra, Ingo Molnar, Mark Rutland, Jiri Olsa,
	linux-kernel, linux-perf-users, Alexey Bayduraev

On Tue, 2021-08-10 at 17:24 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Aug 10, 2021 at 11:54:19AM -0700, Namhyung Kim escreveu:
> > On Mon, Aug 9, 2021 at 3:30 AM Riccardo Mancini <rickyman7@gmail.com> wrote:
> > > On Fri, 2021-08-06 at 19:24 -0700, Namhyung Kim wrote:
> > > > > +/**
> > > > > + * threadpool__strerror - print message regarding given @err in @pool
> > > > > + *
> > > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > > + */
> > > > > +int threadpool__strerror(struct threadpool *pool __maybe_unused, int
> > > > > err,
> > > > > char *buf, size_t size)
> > > > > +{
> > > > > +       char sbuf[STRERR_BUFSIZE], *emsg;
> > > > > +
> > > > > +       emsg = str_error_r(err, sbuf, sizeof(sbuf));
> > > > > +       return scnprintf(buf, size, "Error: %s.\n", emsg);
> > > > > +}
> > > > > +
> > > > > +/**
> > > > > + * threadpool__new_strerror - print message regarding @err_ptr
> > > > > + *
> > > > > + * Buffer size should be at least THREADPOOL_STRERR_BUFSIZE bytes.
> > > > > + */
> > > > > +int threadpool__new_strerror(struct threadpool *err_ptr, char *buf,
> > > > > size_t
> > > > > size)
> > > > > +{
> > > > > +       return threadpool__strerror(err_ptr, PTR_ERR(err_ptr), buf,
> > > > > size);
> > > > > +}
> 
> > > > Why two different functions?
> 
> > > Since when new fails you don't have an error number, just an err_ptr so it's
> > > not
> > > very clear how to call threadpool__strerror. Therefore I made a wrapper to
> > > remove any ambiguity.
> > 
> > What do you mean by "when new fails"?
> 
> I think 'new' is 'constructor', i.e. something__new() returns a newly
> created object and this is not an error number, so he uses ERR_PTR() and
> then he needs to pass it to the 'strerror' specific to the
> threadpool__new, which will use PTR_ERR() to get an integer, and then
> map that to a proper error string, right?

Correct. 
threadpool__new_strerror is not really needed since one could use
threadpool__strerror directly, but then I would need to handle all possible ways
it could be called (e.g. (NULL, PTR_ERR(err_ptr)), (err_ptr, 0), (err_ptr,
PTR_ERR(err_ptr))), so I thought it was better to just provide a strerror that
only took the err_ptr.
Maybe I can provide it as an inline in the header, or as a macro.

Riccardo

> 
> - Arnaldo



^ permalink raw reply	[flat|nested] 21+ messages in thread

end of thread, other threads:[~2021-08-11 17:55 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <cover.1627657061.git.rickyman7@gmail.com>
2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
2021-08-07  2:24   ` Namhyung Kim
2021-08-09 10:30     ` Riccardo Mancini
2021-08-10 18:54       ` Namhyung Kim
2021-08-10 20:24         ` Arnaldo Carvalho de Melo
2021-08-11 17:55           ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 02/10] perf tests: add test for workqueue Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
2021-08-07  2:43   ` Namhyung Kim
2021-08-09 10:35     ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
2021-08-07  2:56   ` Namhyung Kim
2021-07-30 15:34 ` [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-07-30 15:34 ` [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-08-09 13:24     ` Riccardo Mancini

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).