All of lore.kernel.org
 help / color / mirror / Atom feed
From: Dave Chinner <david@fromorbit.com>
To: linux-xfs@vger.kernel.org
Subject: [PATCH 4/7] workqueue: bound maximum queue depth
Date: Tue, 30 Oct 2018 22:20:40 +1100	[thread overview]
Message-ID: <20181030112043.6034-5-david@fromorbit.com> (raw)
In-Reply-To: <20181030112043.6034-1-david@fromorbit.com>

From: Dave Chinner <dchinner@redhat.com>

Existing users of workqueues bound their maximum queue depth in
their external algorithms (e.g. prefetch counts). For parallelising
work that doesn't have an external bound, allow workqueues to
throttle incoming requests at a maximum bound. Bounded workqueues
also need to distribute work over all worker threads themselves, as
there is no external bounding or worker function throttling
provided.

Existing callers are not throttled and retain direct control of
worker threads; only users of the new create interface,
workqueue_create_bound(), will be throttled and have their
concurrency managed.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 include/workqueue.h |  4 ++++
 libfrog/workqueue.c | 30 +++++++++++++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/include/workqueue.h b/include/workqueue.h
index c45dc4fbcf64..504da9403b85 100644
--- a/include/workqueue.h
+++ b/include/workqueue.h
@@ -30,10 +30,14 @@ struct workqueue {
 	unsigned int		item_count;
 	unsigned int		thread_count;
 	bool			terminate;
+	int			max_queued;
+	pthread_cond_t		queue_full;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
 		unsigned int nr_workers);
+int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
+		unsigned int nr_workers, int max_queue);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
 		uint32_t index, void *arg);
 void workqueue_destroy(struct workqueue *wq);
diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
index 7311477374b4..8fe0dc7249f5 100644
--- a/libfrog/workqueue.c
+++ b/libfrog/workqueue.c
@@ -40,13 +40,21 @@ workqueue_thread(void *arg)
 		}
 
 		/*
-		 *  Dequeue work from the head of the list.
+		 *  Dequeue work from the head of the list. If the queue was
+		 *  full then send a wakeup if we're configured to do so.
 		 */
 		assert(wq->item_count > 0);
+		if (wq->max_queued && wq->item_count == wq->max_queued)
+			pthread_cond_signal(&wq->queue_full);
+
 		wi = wq->next_item;
 		wq->next_item = wi->next;
 		wq->item_count--;
 
+		if (wq->max_queued && wq->next_item) {
+			/* more work, wake up another worker */
+			pthread_cond_signal(&wq->wakeup);
+		}
 		pthread_mutex_unlock(&wq->lock);
 
 		(wi->function)(wi->queue, wi->index, wi->arg);
@@ -58,22 +66,25 @@ workqueue_thread(void *arg)
 
 /* Allocate a work queue and threads. */
 int
-workqueue_create(
+workqueue_create_bound(
 	struct workqueue	*wq,
 	void			*wq_ctx,
-	unsigned int		nr_workers)
+	unsigned int		nr_workers,
+	int			max_queue)
 {
 	unsigned int		i;
 	int			err = 0;
 
 	memset(wq, 0, sizeof(*wq));
 	pthread_cond_init(&wq->wakeup, NULL);
+	pthread_cond_init(&wq->queue_full, NULL);
 	pthread_mutex_init(&wq->lock, NULL);
 
 	wq->wq_ctx = wq_ctx;
 	wq->thread_count = nr_workers;
 	wq->threads = malloc(nr_workers * sizeof(pthread_t));
 	wq->terminate = false;
+	wq->max_queued = max_queue;
 
 	for (i = 0; i < nr_workers; i++) {
 		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -87,6 +98,15 @@ workqueue_create(
 	return err;
 }
 
+int
+workqueue_create(
+	struct workqueue	*wq,
+	void			*wq_ctx,
+	unsigned int		nr_workers)
+{
+	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
+}
+
 /*
  * Create a work item consisting of a function and some arguments and
  * schedule the work item to be run via the thread pool.
@@ -122,6 +142,9 @@ workqueue_add(
 		assert(wq->item_count == 0);
 		pthread_cond_signal(&wq->wakeup);
 	} else {
+		/* throttle on a full queue if configured */
+		if (wq->max_queued && wq->item_count == wq->max_queued)
+			pthread_cond_wait(&wq->queue_full, &wq->lock);
 		wq->last_item->next = wi;
 	}
 	wq->last_item = wi;
@@ -153,5 +176,6 @@ workqueue_destroy(
 	free(wq->threads);
 	pthread_mutex_destroy(&wq->lock);
 	pthread_cond_destroy(&wq->wakeup);
+	pthread_cond_destroy(&wq->queue_full);
 	memset(wq, 0, sizeof(*wq));
 }
-- 
2.19.1

  parent reply	other threads:[~2018-10-30 20:13 UTC|newest]

Thread overview: 35+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-10-30 11:20 [PATCH 0/7] xfs_repair: scale to 150,000 iops Dave Chinner
2018-10-30 11:20 ` [PATCH 1/7] Revert "xfs_repair: treat zero da btree pointers as corruption" Dave Chinner
2018-10-30 17:20   ` Darrick J. Wong
2018-10-30 19:35     ` Eric Sandeen
2018-10-30 20:11       ` Dave Chinner
2018-10-30 11:20 ` [PATCH 2/7] repair: don't dirty inodes which are not unlinked Dave Chinner
2018-10-30 17:26   ` Darrick J. Wong
2018-10-30 20:03   ` Eric Sandeen
2018-10-30 20:09     ` Eric Sandeen
2018-10-30 20:34       ` Dave Chinner
2018-10-30 20:40         ` Eric Sandeen
2018-10-30 20:58           ` Dave Chinner
2018-10-30 11:20 ` [PATCH 3/7] cache: prevent expansion races Dave Chinner
2018-10-30 17:39   ` Darrick J. Wong
2018-10-30 20:35     ` Dave Chinner
2018-10-31 17:13   ` Brian Foster
2018-11-01  1:27     ` Dave Chinner
2018-11-01 13:17       ` Brian Foster
2018-11-01 21:23         ` Dave Chinner
2018-11-02 11:31           ` Brian Foster
2018-11-02 23:26             ` Dave Chinner
2018-10-30 11:20 ` Dave Chinner [this message]
2018-10-30 17:58   ` [PATCH 4/7] workqueue: bound maximum queue depth Darrick J. Wong
2018-10-30 20:53     ` Dave Chinner
2018-10-31 17:14       ` Brian Foster
2018-10-30 11:20 ` [PATCH 5/7] repair: Protect bad inode list with mutex Dave Chinner
2018-10-30 17:44   ` Darrick J. Wong
2018-10-30 20:54     ` Dave Chinner
2018-10-30 11:20 ` [PATCH 6/7] repair: protect inode chunk tree records with a mutex Dave Chinner
2018-10-30 17:46   ` Darrick J. Wong
2018-10-30 11:20 ` [PATCH 7/7] repair: parallelise phase 6 Dave Chinner
2018-10-30 17:51   ` Darrick J. Wong
2018-10-30 20:55     ` Dave Chinner
2018-11-07  5:44 ` [PATCH 0/7] xfs_repair: scale to 150,000 iops Arkadiusz Miśkiewicz
2018-11-07  6:48   ` Dave Chinner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20181030112043.6034-5-david@fromorbit.com \
    --to=david@fromorbit.com \
    --cc=linux-xfs@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.