linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Riccardo Mancini <rickyman7@gmail.com>
To: Arnaldo Carvalho de Melo <acme@kernel.org>
Cc: Ian Rogers <irogers@google.com>,
	Namhyung Kim <namhyung@kernel.org>,
	Peter Zijlstra <peterz@infradead.org>,
	Ingo Molnar <mingo@redhat.com>,
	Mark Rutland <mark.rutland@arm.com>, Jiri Olsa <jolsa@redhat.com>,
	linux-kernel@vger.kernel.org, linux-perf-users@vger.kernel.org,
	Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>,
	Riccardo Mancini <rickyman7@gmail.com>
Subject: [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management
Date: Fri, 30 Jul 2021 17:34:14 +0200	[thread overview]
Message-ID: <c693ab79b15914ce2da05d223dd37943a16a1e70.1627657061.git.rickyman7@gmail.com> (raw)
In-Reply-To: <cover.1627657061.git.rickyman7@gmail.com>

This patch adds the implementation of the worker thread that is executed
in the threadpool, and all management-related functions.

At startup, a worker registers itself with the workqueue by adding itself
to the idle_list, then it sends an ack back to the main thread. When
creating wotkers, the main thread will wait for the related acks.
Once there is work to do, threads are woken up to perform the work.
Threads will try to dequeue a new pending work before going to sleep.

This registering mechanism has been implemented to enable for dynamic
growth of the workqueue in the future.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/util/workqueue/workqueue.c | 243 +++++++++++++++++++++++++-
 1 file changed, 242 insertions(+), 1 deletion(-)

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index d3c6d4c4e75944a5..16a55de25cf247d8 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -12,6 +12,15 @@
 #include <internal/lib.h>
 #include "workqueue.h"
 
+enum worker_msg {
+	WORKER_MSG__UNDEFINED,
+	WORKER_MSG__READY,                          /* from worker: ack */
+	WORKER_MSG__WAKE,                           /* to worker: wake up */
+	WORKER_MSG__STOP,                           /* to worker: exit */
+	WORKER_MSG__ERROR,
+	WORKER_MSG__MAX
+};
+
 enum workqueue_status {
 	WORKQUEUE_STATUS__READY,	/* wq is ready to receive work */
 	WORKQUEUE_STATUS__ERROR,
@@ -46,13 +55,216 @@ static const char * const workqueue_errno_str[] = {
 	"Received unexpected message from worker",
 };
 
+struct worker {
+	int				tidx;		/* idx of thread in pool */
+	struct list_head		entry;		/* in idle or busy list */
+	struct work_struct		*current_work;	/* work being processed */
+	int				msg_pipe[2];	/* main thread comm pipes*/
+};
+
+#define for_each_busy_worker(wq, m_worker) \
+	list_for_each_entry(m_worker, &wq->busy_list, entry)
+
+#define for_each_idle_worker(wq, m_worker) \
+	list_for_each_entry(m_worker, &wq->idle_list, entry)
+
+static inline int lock_workqueue(struct workqueue_struct *wq)
+__acquires(&wq->lock)
+{
+	__acquire(&wq->lock);
+	return pthread_mutex_lock(&wq->lock);
+}
+
+static inline int unlock_workqueue(struct workqueue_struct *wq)
+__releases(&wq->lock)
+{
+	__release(&wq->lock);
+	return pthread_mutex_unlock(&wq->lock);
+}
+
+/**
+ * available_work - check if @wq has work to do
+ */
+static int available_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+	return !list_empty(&wq->pending);
+}
+
+/**
+ * dequeue_work - retrieve the next work in @wq to be executed by the worker
+ *
+ * Called inside worker.
+ */
+static struct work_struct *dequeue_work(struct workqueue_struct *wq)
+__must_hold(&wq->lock)
+{
+	struct work_struct *work = list_first_entry(&wq->pending, struct work_struct, entry);
+
+	list_del_init(&work->entry);
+	return work;
+}
+
+/**
+ * sleep_worker - worker @w of workqueue @wq goes to sleep
+ *
+ * Called inside worker.
+ * If this was the last idle thread, signal it to the main thread, in case it
+ * was flushing the workqueue.
+ */
+static void sleep_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_move(&w->entry, &wq->idle_list);
+	if (list_empty(&wq->busy_list))
+		pthread_cond_signal(&wq->idle_cond);
+}
+
+/**
+ * stop_worker - stop worker @w
+ *
+ * Called from main thread.
+ * Send stop message to worker @w.
+ */
+static int stop_worker(struct worker *w)
+{
+	int ret;
+	enum worker_msg msg;
+
+	msg = WORKER_MSG__STOP;
+	ret = writen(w->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug2("workqueue: error sending stop msg: %s\n",
+			strerror(errno));
+		return -WORKQUEUE_ERROR__WRITEPIPE;
+	}
+
+	return 0;
+}
+
+/**
+ * init_worker - init @w struct
+ * @w: the struct to init
+ * @tidx: index of the executing thread inside the threadpool
+ */
+static int init_worker(struct worker *w, int tidx)
+{
+	if (pipe(w->msg_pipe)) {
+		pr_debug2("worker[%d]: error opening pipe: %s\n", tidx, strerror(errno));
+		return -ENOMEM;
+	}
+
+	w->tidx = tidx;
+	w->current_work = NULL;
+	INIT_LIST_HEAD(&w->entry);
+
+	return 0;
+}
+
+/**
+ * fini_worker - deallocate resources used by @w struct
+ */
+static void fini_worker(struct worker *w)
+{
+	close(w->msg_pipe[0]);
+	w->msg_pipe[0] = -1;
+	close(w->msg_pipe[1]);
+	w->msg_pipe[1] = -1;
+}
+
+/**
+ * register_worker - add worker to @wq->idle_list
+ */
+static void register_worker(struct workqueue_struct *wq, struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_move(&w->entry, &wq->idle_list);
+}
+
+/**
+ * unregister_worker - remove worker from @wq->idle_list
+ */
+static void unregister_worker(struct workqueue_struct *wq __maybe_unused,
+			struct worker *w)
+__must_hold(&wq->lock)
+{
+	list_del_init(&w->entry);
+}
+
 /**
  * worker_thread - worker function executed on threadpool
  */
 static void worker_thread(int tidx, struct task_struct *task)
 {
+	struct workqueue_struct *wq = container_of(task, struct workqueue_struct, task);
+	struct worker this_worker;
+	enum worker_msg msg;
+	int ret, init_err = init_worker(&this_worker, tidx);
+
+	if (init_err) {
+		// send error message to main thread
+		msg = WORKER_MSG__ERROR;
+	} else {
+		lock_workqueue(wq);
+		register_worker(wq, &this_worker);
+		unlock_workqueue(wq);
+
+		// ack worker creation
+		msg = WORKER_MSG__READY;
+	}
+
+	ret = writen(wq->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug("worker[%d]: error sending msg: %s\n",
+			tidx, strerror(errno));
+
+		if (init_err)
+			return;
+		goto out;
+	}
 
-	pr_info("Hi from worker %d, executing task %p\n", tidx, task);
+	// stop if there have been errors in init
+	if (init_err)
+		return;
+
+	for (;;) {
+		msg = WORKER_MSG__UNDEFINED;
+		ret = readn(this_worker.msg_pipe[0], &msg, sizeof(msg));
+		if (ret < 0 || (msg != WORKER_MSG__WAKE && msg != WORKER_MSG__STOP)) {
+			pr_debug("worker[%d]: error receiving msg: %s\n",
+				tidx, strerror(errno));
+			break;
+		}
+
+		if (msg == WORKER_MSG__STOP)
+			break;
+
+		// main thread takes care of moving to busy list and assigning current_work
+
+		while (this_worker.current_work) {
+			this_worker.current_work->func(this_worker.current_work);
+
+			lock_workqueue(wq);
+			if (available_work(wq)) {
+				this_worker.current_work = dequeue_work(wq);
+				pr_debug2("worker[%d]: dequeued work\n",
+					tidx);
+			} else {
+				this_worker.current_work = NULL;
+				sleep_worker(wq, &this_worker);
+				pr_debug2("worker[%d]: going to sleep\n",
+					tidx);
+			}
+			unlock_workqueue(wq);
+		}
+	}
+
+out:
+	lock_workqueue(wq);
+	unregister_worker(wq, &this_worker);
+	unlock_workqueue(wq);
+
+	fini_worker(&this_worker);
 }
 
 /**
@@ -61,6 +273,9 @@ static void worker_thread(int tidx, struct task_struct *task)
 static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 					struct threadpool *pool)
 {
+	int ret, t;
+	enum worker_msg msg;
+
 	if (!threadpool__is_ready(pool)) {
 		pr_debug2("workqueue: cannot attach to pool: pool is not ready\n");
 		return -WORKQUEUE_ERROR__NOTALLOWED;
@@ -72,6 +287,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 	if (wq->pool_errno)
 		return -WORKQUEUE_ERROR__POOLEXE;
 
+
+	// wait ack from all threads
+	for (t = 0; t < threadpool__size(pool); t++) {
+		msg = WORKER_MSG__UNDEFINED;
+		ret = readn(wq->msg_pipe[0], &msg, sizeof(msg));
+		if (ret < 0) {
+			pr_debug("workqueue: error receiving ack: %s\n",
+				strerror(errno));
+			return -WORKQUEUE_ERROR__READPIPE;
+		}
+		if (msg != WORKER_MSG__READY) {
+			pr_debug2("workqueue: received error\n");
+			return -WORKQUEUE_ERROR__INVALIDMSG;
+		}
+	}
+
 	return 0;
 }
 
@@ -81,12 +312,22 @@ static int attach_threadpool_to_workqueue(struct workqueue_struct *wq,
 static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
 {
 	int ret, err = 0;
+	struct worker *w;
 
 	if (wq->status != WORKQUEUE_STATUS__READY) {
 		pr_debug2("workqueue: cannot attach to pool: wq is not ready\n");
 		return -WORKQUEUE_ERROR__NOTALLOWED;
 	}
 
+	lock_workqueue(wq);
+	for_each_idle_worker(wq, w) {
+		ret = stop_worker(w);
+		if (ret)
+			err = ret;
+	}
+	unlock_workqueue(wq);
+
+
 	ret = threadpool__wait(wq->pool);
 	if (ret) {
 		pr_debug2("workqueue: error waiting threadpool\n");
-- 
2.31.1


  parent reply	other threads:[~2021-07-30 15:38 UTC|newest]

Thread overview: 21+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
     [not found] <cover.1627657061.git.rickyman7@gmail.com>
2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
2021-08-07  2:24   ` Namhyung Kim
2021-08-09 10:30     ` Riccardo Mancini
2021-08-10 18:54       ` Namhyung Kim
2021-08-10 20:24         ` Arnaldo Carvalho de Melo
2021-08-11 17:55           ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 02/10] perf tests: add test for workqueue Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
2021-08-07  2:43   ` Namhyung Kim
2021-08-09 10:35     ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
2021-08-07  2:56   ` Namhyung Kim
2021-07-30 15:34 ` [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-07-30 15:34 ` Riccardo Mancini [this message]
2021-07-30 15:34 ` [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-08-09 13:24     ` Riccardo Mancini

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=c693ab79b15914ce2da05d223dd37943a16a1e70.1627657061.git.rickyman7@gmail.com \
    --to=rickyman7@gmail.com \
    --cc=acme@kernel.org \
    --cc=alexey.v.bayduraev@linux.intel.com \
    --cc=irogers@google.com \
    --cc=jolsa@redhat.com \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-perf-users@vger.kernel.org \
    --cc=mark.rutland@arm.com \
    --cc=mingo@redhat.com \
    --cc=namhyung@kernel.org \
    --cc=peterz@infradead.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).