From: Riccardo Mancini <rickyman7@gmail.com> To: Arnaldo Carvalho de Melo <acme@kernel.org> Cc: Ian Rogers <irogers@google.com>, Namhyung Kim <namhyung@kernel.org>, Peter Zijlstra <peterz@infradead.org>, Ingo Molnar <mingo@redhat.com>, Mark Rutland <mark.rutland@arm.com>, Jiri Olsa <jolsa@redhat.com>, linux-kernel@vger.kernel.org, linux-perf-users@vger.kernel.org, Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>, Riccardo Mancini <rickyman7@gmail.com> Subject: [RFC PATCH v3 06/15] perf workqueue: introduce workqueue struct Date: Fri, 20 Aug 2021 12:53:52 +0200 [thread overview] Message-ID: <9dee672538967a3cf7e82ef194ebccc709b1af41.1629454773.git.rickyman7@gmail.com> (raw) In-Reply-To: <cover.1629454773.git.rickyman7@gmail.com> This patch adds the workqueue definition, along with simple creation and destruction functions. Furthermore, a simple subtest is added. A workqueue is attached to a pool, on which it executes its workers. Next patches will introduce workers. Signed-off-by: Riccardo Mancini <rickyman7@gmail.com> --- tools/perf/tests/workqueue.c | 81 +++++++++++ tools/perf/util/workqueue/Build | 1 + tools/perf/util/workqueue/workqueue.c | 196 ++++++++++++++++++++++++++ tools/perf/util/workqueue/workqueue.h | 39 +++++ 4 files changed, 317 insertions(+) create mode 100644 tools/perf/util/workqueue/workqueue.c create mode 100644 tools/perf/util/workqueue/workqueue.h diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c index b145a5155089497f..1aa6ee788b0b1c32 100644 --- a/tools/perf/tests/workqueue.c +++ b/tools/perf/tests/workqueue.c @@ -7,6 +7,7 @@ #include "tests.h" #include "util/debug.h" #include "util/workqueue/threadpool.h" +#include "util/workqueue/workqueue.h" #define DUMMY_FACTOR 100000 #define N_DUMMY_WORK_SIZES 7 @@ -15,6 +16,11 @@ struct threadpool_test_args_t { int pool_size; }; +struct workqueue_test_args_t { + int pool_size; + int n_work_items; +}; + struct test_task { struct task_struct task; int n_threads; @@ -141,6 
+147,43 @@ static int __test__threadpool(void *_args) return ret; } +static int __workqueue__prepare(struct workqueue_struct **wq, + int pool_size) +{ + *wq = create_workqueue(pool_size); + TEST_ASSERT_VAL("workqueue creation failure", !IS_ERR(*wq)); + TEST_ASSERT_VAL("workqueue wrong size", workqueue_nr_threads(*wq) == pool_size); + + return TEST_OK; +} + +static int __workqueue__teardown(struct workqueue_struct *wq) +{ + int ret = destroy_workqueue(wq); + + TEST_ASSERT_VAL("workqueue detruction failure", ret == 0); + + return 0; +} + +static int __test__workqueue(void *_args) +{ + struct workqueue_test_args_t *args = _args; + struct workqueue_struct *wq; + int pool_size = args->pool_size ?: sysconf(_SC_NPROCESSORS_ONLN); + int ret = __workqueue__prepare(&wq, pool_size); + + if (ret) + goto out; + + ret = __workqueue__teardown(wq); + if (ret) + goto out; + +out: + return ret; +} + static const struct threadpool_test_args_t threadpool_test_args[] = { { .pool_size = 1 @@ -162,6 +205,37 @@ static const struct threadpool_test_args_t threadpool_test_args[] = { } }; +static const struct workqueue_test_args_t workqueue_test_args[] = { + { + .pool_size = 1, + .n_work_items = 1 + }, + { + .pool_size = 1, + .n_work_items = 10 + }, + { + .pool_size = 2, + .n_work_items = 1 + }, + { + .pool_size = 2, + .n_work_items = 100 + }, + { + .pool_size = 16, + .n_work_items = 7 + }, + { + .pool_size = 16, + .n_work_items = 2789 + }, + { + .pool_size = 0, // sysconf(_SC_NPROCESSORS_ONLN) + .n_work_items = 8191 + } +}; + struct test_case { const char *desc; int (*func)(void *args); @@ -177,6 +251,13 @@ static struct test_case workqueue_testcase_table[] = { .args = (void *) threadpool_test_args, .n_args = (int)ARRAY_SIZE(threadpool_test_args), .arg_size = sizeof(struct threadpool_test_args_t) + }, + { + .desc = "Workqueue", + .func = __test__workqueue, + .args = (void *) workqueue_test_args, + .n_args = (int)ARRAY_SIZE(workqueue_test_args), + .arg_size = sizeof(struct 
workqueue_test_args_t) } }; diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build index 8b72a6cd4e2cba0d..4af721345c0a6bb7 100644 --- a/tools/perf/util/workqueue/Build +++ b/tools/perf/util/workqueue/Build @@ -1 +1,2 @@ perf-y += threadpool.o +perf-y += workqueue.o diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c new file mode 100644 index 0000000000000000..053aac43e038f0b7 --- /dev/null +++ b/tools/perf/util/workqueue/workqueue.c @@ -0,0 +1,196 @@ +// SPDX-License-Identifier: GPL-2.0 +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> +#include <pthread.h> +#include <linux/list.h> +#include <linux/err.h> +#include <linux/string.h> +#include <linux/zalloc.h> +#include "debug.h" +#include <internal/lib.h> +#include "workqueue.h" + +struct workqueue_struct { + pthread_mutex_t lock; /* locking of the workqueue */ + pthread_cond_t idle_cond; /* all workers are idle cond */ + struct threadpool *pool; /* underlying pool */ + int pool_errno; /* latest pool error */ + struct task_struct task; /* threadpool task */ + struct list_head busy_list; /* busy workers */ + struct list_head idle_list; /* idle workers */ + int msg_pipe[2]; /* main thread comm pipes */ +}; + +static const char * const workqueue_errno_str[] = { + "Error creating threadpool", + "Error executing function in threadpool", + "Error stopping the threadpool", + "Error starting thread in the threadpool", + "Error sending message to worker", + "Error receiving message from worker", + "Received unexpected message from worker", +}; + +/** + * create_workqueue - create a workqueue backed by a new threadpool of @nr_threads threads + * + * The workqueue will create a threadpool on which to execute.
+ */ +struct workqueue_struct *create_workqueue(int nr_threads) +{ + int ret, err = 0; + struct workqueue_struct *wq = zalloc(sizeof(struct workqueue_struct)); + + if (!wq) { + err = -ENOMEM; + goto out_return; + } + + wq->pool = threadpool__new(nr_threads); + if (IS_ERR(wq->pool)) { + err = -WORKQUEUE_ERROR__POOLNEW; + wq->pool_errno = PTR_ERR(wq->pool); + goto out_free_wq; + } + + ret = pthread_mutex_init(&wq->lock, NULL); + if (ret) { + err = -ret; + goto out_delete_pool; + } + + ret = pthread_cond_init(&wq->idle_cond, NULL); + if (ret) { + err = -ret; + goto out_destroy_mutex; + } + + INIT_LIST_HEAD(&wq->busy_list); + INIT_LIST_HEAD(&wq->idle_list); + + ret = pipe(wq->msg_pipe); + if (ret) { + err = -ENOMEM; + goto out_destroy_cond; + } + + return wq; + +out_destroy_cond: + pthread_cond_destroy(&wq->idle_cond); +out_destroy_mutex: + pthread_mutex_destroy(&wq->lock); +out_delete_pool: + threadpool__delete(wq->pool); +out_free_wq: + free(wq); +out_return: + return ERR_PTR(err); +} + +/** + * destroy_workqueue - stop @wq workers and destroy @wq + */ +int destroy_workqueue(struct workqueue_struct *wq) +{ + int err = 0, ret; + char sbuf[STRERR_BUFSIZE]; + + if (IS_ERR_OR_NULL(wq)) + return 0; + + threadpool__delete(wq->pool); + wq->pool = NULL; + + ret = pthread_mutex_destroy(&wq->lock); + if (ret) { + err = -ret; + pr_debug2("workqueue: error pthread_mutex_destroy: %s\n", + str_error_r(ret, sbuf, sizeof(sbuf))); + } + + ret = pthread_cond_destroy(&wq->idle_cond); + if (ret) { + err = -ret; + pr_debug2("workqueue: error pthread_cond_destroy: %s\n", + str_error_r(ret, sbuf, sizeof(sbuf))); + } + + close(wq->msg_pipe[0]); + wq->msg_pipe[0] = -1; + + close(wq->msg_pipe[1]); + wq->msg_pipe[1] = -1; + + free(wq); + return err; +} + +/** + * workqueue_strerror - print message regarding latest error in @wq + * + * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes.
+ */ +int workqueue_strerror(struct workqueue_struct *wq, int err, char *buf, size_t size) +{ + int ret; + char sbuf[THREADPOOL_STRERR_BUFSIZE], *emsg; + const char *errno_str; + + errno_str = workqueue_errno_str[-err-WORKQUEUE_ERROR__OFFSET]; + + switch (err) { + case -WORKQUEUE_ERROR__POOLNEW: + case -WORKQUEUE_ERROR__POOLEXE: + case -WORKQUEUE_ERROR__POOLSTOP: + case -WORKQUEUE_ERROR__POOLSTARTTHREAD: + if (IS_ERR_OR_NULL(wq)) + return scnprintf(buf, size, "%s: unknown.\n", + errno_str); + + ret = threadpool__strerror(wq->pool, wq->pool_errno, sbuf, sizeof(sbuf)); + if (ret < 0) + return ret; + return scnprintf(buf, size, "%s: %s.\n", errno_str, sbuf); + case -WORKQUEUE_ERROR__WRITEPIPE: + case -WORKQUEUE_ERROR__READPIPE: + emsg = str_error_r(errno, sbuf, sizeof(sbuf)); + return scnprintf(buf, size, "%s: %s.\n", errno_str, emsg); + case -WORKQUEUE_ERROR__INVALIDMSG: + return scnprintf(buf, size, "%s.\n", errno_str); + default: + emsg = str_error_r(err, sbuf, sizeof(sbuf)); + return scnprintf(buf, size, "Error: %s", emsg); + } +} + +/** + * create_workqueue_strerror - print message regarding @err_ptr + * + * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes. + */ +int create_workqueue_strerror(struct workqueue_struct *err_ptr, char *buf, size_t size) +{ + return workqueue_strerror(err_ptr, PTR_ERR(err_ptr), buf, size); +} + +/** + * destroy_workqueue_strerror - print message regarding @err + * + * Buffer size should be at least WORKQUEUE_STRERR_BUFSIZE bytes. 
+ */ +int destroy_workqueue_strerror(int err, char *buf, size_t size) +{ + return workqueue_strerror(NULL, err, buf, size); +} + +/** + * workqueue_nr_threads - get size of threadpool underlying @wq + */ +int workqueue_nr_threads(struct workqueue_struct *wq) +{ + return threadpool__size(wq->pool); +} diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h new file mode 100644 index 0000000000000000..100841cc035fde1d --- /dev/null +++ b/tools/perf/util/workqueue/workqueue.h @@ -0,0 +1,39 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +#ifndef __WORKQUEUE_WORKQUEUE_H +#define __WORKQUEUE_WORKQUEUE_H + +#include <stdlib.h> +#include <sys/types.h> +#include <linux/list.h> +#include "threadpool.h" + +struct work_struct; +typedef void (*work_func_t)(struct work_struct *work); + +struct work_struct { + struct list_head entry; + work_func_t func; +}; + +struct workqueue_struct; + +extern struct workqueue_struct *create_workqueue(int nr_threads); +extern int destroy_workqueue(struct workqueue_struct *wq); + +extern int workqueue_nr_threads(struct workqueue_struct *wq); + +#define WORKQUEUE_STRERR_BUFSIZE (128+THREADPOOL_STRERR_BUFSIZE) +#define WORKQUEUE_ERROR__OFFSET 512 +enum { + WORKQUEUE_ERROR__POOLNEW = WORKQUEUE_ERROR__OFFSET, + WORKQUEUE_ERROR__POOLEXE, + WORKQUEUE_ERROR__POOLSTOP, + WORKQUEUE_ERROR__POOLSTARTTHREAD, + WORKQUEUE_ERROR__WRITEPIPE, + WORKQUEUE_ERROR__READPIPE, + WORKQUEUE_ERROR__INVALIDMSG, +}; +extern int workqueue_strerror(struct workqueue_struct *wq, int err, char *buf, size_t size); +extern int create_workqueue_strerror(struct workqueue_struct *err_ptr, char *buf, size_t size); +extern int destroy_workqueue_strerror(int err, char *buf, size_t size); +#endif /* __WORKQUEUE_WORKQUEUE_H */ -- 2.31.1
next prev parent reply other threads:[~2021-08-20 10:54 UTC|newest] Thread overview: 24+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-08-20 10:53 [RFC PATCH v3 00/15] perf: add workqueue library and use it in synthetic-events Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 01/15] perf workqueue: threadpool creation and destruction Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 02/15] perf tests: add test for workqueue Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 03/15] perf workqueue: add threadpool start and stop functions Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 04/15] perf workqueue: add threadpool execute and wait functions Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 05/15] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini 2021-08-20 10:53 ` Riccardo Mancini [this message] 2021-08-24 19:27 ` [RFC PATCH v3 06/15] perf workqueue: introduce workqueue struct Namhyung Kim 2021-08-31 16:13 ` Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 07/15] perf workqueue: implement worker thread and management Riccardo Mancini 2021-08-30 7:22 ` Jiri Olsa 2021-08-20 10:53 ` [RFC PATCH v3 08/15] perf workqueue: add queue_work and flush_workqueue functions Riccardo Mancini 2021-08-24 19:40 ` Namhyung Kim 2021-08-31 16:23 ` Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 09/15] perf workqueue: spinup threads when needed Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 10/15] perf workqueue: create global workqueue Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 11/15] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 12/15] perf record: setup global workqueue Riccardo Mancini 2021-08-20 10:53 ` [RFC PATCH v3 13/15] perf top: " Riccardo Mancini 2021-08-20 10:54 ` [RFC PATCH v3 14/15] perf test/synthesis: " Riccardo Mancini 2021-08-20 10:54 ` [RFC PATCH v3 15/15] perf synthetic-events: use workqueue parallel_for Riccardo Mancini 2021-08-29 21:59 
` [RFC PATCH v3 00/15] perf: add workqueue library and use it in synthetic-events Jiri Olsa 2021-08-31 15:46 ` Jiri Olsa 2021-08-31 16:57 ` Riccardo Mancini
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=9dee672538967a3cf7e82ef194ebccc709b1af41.1629454773.git.rickyman7@gmail.com \ --to=rickyman7@gmail.com \ --cc=acme@kernel.org \ --cc=alexey.v.bayduraev@linux.intel.com \ --cc=irogers@google.com \ --cc=jolsa@redhat.com \ --cc=linux-kernel@vger.kernel.org \ --cc=linux-perf-users@vger.kernel.org \ --cc=mark.rutland@arm.com \ --cc=mingo@redhat.com \ --cc=namhyung@kernel.org \ --cc=peterz@infradead.org \ --subject='Re: [RFC PATCH v3 06/15] perf workqueue: introduce workqueue struct' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox, as well as URLs for NNTP newsgroup(s).