All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Darrick J. Wong" <darrick.wong@oracle.com>
To: sandeen@redhat.com
Cc: linux-xfs@vger.kernel.org
Subject: [PATCH v2 06/12] libfrog: create a threaded workqueue
Date: Mon, 27 Nov 2017 22:00:40 -0800	[thread overview]
Message-ID: <20171128060040.GG21412@magnolia> (raw)
In-Reply-To: <151094964785.29763.1548614345004906846.stgit@magnolia>

From: Darrick J. Wong <darrick.wong@oracle.com>

Create a thread pool that queues and runs discrete work items.  This
will be a namespaced version of the pool in repair/threads.c; a
subsequent patch will switch repair over.  xfs_scrub will use the
generic thread pool.

Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
---
v2: updated copyright information
---
 include/workqueue.h |   55 ++++++++++++++++
 libfrog/Makefile    |    3 +
 libfrog/workqueue.c |  174 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 231 insertions(+), 1 deletion(-)
 create mode 100644 include/workqueue.h
 create mode 100644 libfrog/workqueue.c

diff --git a/include/workqueue.h b/include/workqueue.h
new file mode 100644
index 0000000..b4b3541
--- /dev/null
+++ b/include/workqueue.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ *
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
+ *
+ * This code was adapted from repair/threads.h.
+ */
+#ifndef	_WORKQUEUE_H_
+#define	_WORKQUEUE_H_
+
+struct workqueue;
+
+typedef void workqueue_func_t(struct workqueue *wq, uint32_t index, void *arg);
+
+struct workqueue_item {
+	struct workqueue	*queue;
+	struct workqueue_item	*next;
+	workqueue_func_t	*function;
+	void			*arg;
+	uint32_t		index;
+};
+
+struct workqueue {
+	void			*wq_ctx;
+	pthread_t		*threads;
+	struct workqueue_item	*next_item;
+	struct workqueue_item	*last_item;
+	pthread_mutex_t		lock;
+	pthread_cond_t		wakeup;
+	unsigned int		item_count;
+	unsigned int		thread_count;
+	bool			terminate;
+};
+
+int workqueue_create(struct workqueue *wq, void *wq_ctx,
+		unsigned int nr_workers);
+int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
+		uint32_t index, void *arg);
+void workqueue_destroy(struct workqueue *wq);
+
+#endif	/* _WORKQUEUE_H_ */
diff --git a/libfrog/Makefile b/libfrog/Makefile
index 3fd42a4..9a43621 100644
--- a/libfrog/Makefile
+++ b/libfrog/Makefile
@@ -14,7 +14,8 @@ CFILES = \
 avl64.c \
 list_sort.c \
 radix-tree.c \
-util.c
+util.c \
+workqueue.c
 
 default: ltdepend $(LTLIBRARY)
 
diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
new file mode 100644
index 0000000..66e8075
--- /dev/null
+++ b/libfrog/workqueue.c
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ *
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
+ *
+ * This code was adapted from repair/threads.c, which (at the time)
+ * did not contain a copyright statement.
+ */
+#include <pthread.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <errno.h>
+#include <assert.h>
+#include "workqueue.h"
+
+/* Main processing thread */
+static void *
+workqueue_thread(void *arg)
+{
+	struct workqueue	*wq = arg;
+	struct workqueue_item	*wi;
+
+	/*
+	 * Loop pulling work from the passed in work queue.
+	 * Check for notification to exit after every chunk of work.
+	 */
+	while (1) {
+		pthread_mutex_lock(&wq->lock);
+
+		/*
+		 * Wait for work.
+		 */
+		while (wq->next_item == NULL && !wq->terminate) {
+			assert(wq->item_count == 0);
+			pthread_cond_wait(&wq->wakeup, &wq->lock);
+		}
+		if (wq->next_item == NULL && wq->terminate) {
+			pthread_mutex_unlock(&wq->lock);
+			break;
+		}
+
+		/*
+		 *  Dequeue work from the head of the list.
+		 */
+		assert(wq->item_count > 0);
+		wi = wq->next_item;
+		wq->next_item = wi->next;
+		wq->item_count--;
+
+		pthread_mutex_unlock(&wq->lock);
+
+		(wi->function)(wi->queue, wi->index, wi->arg);
+		free(wi);
+	}
+
+	return NULL;
+}
+
+/* Allocate a work queue and threads. */
+int
+workqueue_create(
+	struct workqueue	*wq,
+	void			*wq_ctx,
+	unsigned int		nr_workers)
+{
+	unsigned int		i;
+	int			err = 0;
+
+	memset(wq, 0, sizeof(*wq));
+	pthread_cond_init(&wq->wakeup, NULL);
+	pthread_mutex_init(&wq->lock, NULL);
+
+	wq->wq_ctx = wq_ctx;
+	wq->thread_count = nr_workers;
+	wq->threads = malloc(nr_workers * sizeof(pthread_t));
+	wq->terminate = false;
+
+	for (i = 0; i < nr_workers; i++) {
+		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
+				wq);
+		if (err)
+			break;
+	}
+
+	if (err)
+		workqueue_destroy(wq);
+	return err;
+}
+
+/*
+ * Create a work item consisting of a function and some arguments and
+ * schedule the work item to be run via the thread pool.
+ */
+int
+workqueue_add(
+	struct workqueue	*wq,
+	workqueue_func_t	func,
+	uint32_t		index,
+	void			*arg)
+{
+	struct workqueue_item	*wi;
+
+	if (wq->thread_count == 0) {
+		func(wq, index, arg);
+		return 0;
+	}
+
+	wi = malloc(sizeof(struct workqueue_item));
+	if (wi == NULL)
+		return ENOMEM;
+
+	wi->function = func;
+	wi->index = index;
+	wi->arg = arg;
+	wi->queue = wq;
+	wi->next = NULL;
+
+	/* Now queue the new work structure to the work queue. */
+	pthread_mutex_lock(&wq->lock);
+	if (wq->next_item == NULL) {
+		wq->next_item = wi;
+		assert(wq->item_count == 0);
+		pthread_cond_signal(&wq->wakeup);
+	} else {
+		wq->last_item->next = wi;
+	}
+	wq->last_item = wi;
+	wq->item_count++;
+	pthread_mutex_unlock(&wq->lock);
+
+	return 0;
+}
+
+/*
+ * Wait for all pending work items to be processed and tear down the
+ * workqueue.
+ */
+void
+workqueue_destroy(
+	struct workqueue	*wq)
+{
+	unsigned int		i;
+
+	pthread_mutex_lock(&wq->lock);
+	wq->terminate = 1;
+	pthread_mutex_unlock(&wq->lock);
+
+	pthread_cond_broadcast(&wq->wakeup);
+
+	for (i = 0; i < wq->thread_count; i++)
+		pthread_join(wq->threads[i], NULL);
+
+	free(wq->threads);
+	pthread_mutex_destroy(&wq->lock);
+	pthread_cond_destroy(&wq->wakeup);
+	memset(wq, 0, sizeof(*wq));
+}

  reply	other threads:[~2017-11-28  6:00 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-11-17 20:13 [PATCH 00/12] xfsprogs-4.15: common library for misc. routines Darrick J. Wong
2017-11-17 20:13 ` [PATCH 01/12] libfrog: move all the userspace support stuff into a new library Darrick J. Wong
2017-11-17 20:13 ` [PATCH 02/12] libfrog: move libxfs_log2_roundup to libfrog Darrick J. Wong
2017-12-04 22:01   ` [PATCH v2 " Darrick J. Wong
2017-11-17 20:13 ` [PATCH 03/12] libfrog: add bit manipulation functions Darrick J. Wong
2017-11-17 20:13 ` [PATCH 04/12] libfrog: move list_sort out of libxfs Darrick J. Wong
2017-11-17 20:13 ` [PATCH 05/12] libfrog: promote avl64 code from xfs_repair Darrick J. Wong
2017-11-17 20:14 ` [PATCH 06/12] libfrog: create a threaded workqueue Darrick J. Wong
2017-11-28  6:00   ` Darrick J. Wong [this message]
2017-11-17 20:14 ` [PATCH 07/12] libfrog: move topology code out of libxcmd Darrick J. Wong
2017-11-17 20:14 ` [PATCH 08/12] libfrog: move conversion factors " Darrick J. Wong
2017-11-27 21:47   ` Eric Sandeen
2017-11-28  0:31     ` Darrick J. Wong
2017-11-28  0:45   ` [PATCH v2 " Darrick J. Wong
2017-11-28  5:59   ` [PATCH v3 " Darrick J. Wong
2017-11-17 20:14 ` [PATCH 09/12] libfrog: move paths.c " Darrick J. Wong
2017-11-17 20:14 ` [PATCH 10/12] libfrog: add missing function fs_table_destroy Darrick J. Wong
2017-11-17 20:14 ` [PATCH 11/12] libhandle: add missing destructor Darrick J. Wong
2017-11-17 20:14 ` [PATCH 12/12] xfs_repair: remove old workqueue implementation in favor of libfrog code Darrick J. Wong
2017-12-04 23:44 ` [PATCH 00/12] xfsprogs-4.15: common library for misc. routines Eric Sandeen

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20171128060040.GG21412@magnolia \
    --to=darrick.wong@oracle.com \
    --cc=linux-xfs@vger.kernel.org \
    --cc=sandeen@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.