From: Tejun Heo <tj@kernel.org>
To: torvalds@linux-foundation.org, awalls@radix.net,
	linux-kernel@vger.kernel.org, jeff@garzik.org, mingo@elte.hu,
	akpm@linux-foundation.org, jens.axboe@oracle.com,
	rusty@rustcorp.com.au, cl@linux-foundation.org,
	dhowells@redhat.com, arjan@linux.intel.com, avi@redhat.com,
	peterz@infradead.org, johannes@sipsolutions.net,
	andi@firstfloor.org
Cc: Tejun Heo <tj@kernel.org>
Subject: [PATCH 24/27] workqueue: make single thread workqueue shared worker pool friendly
Date: Fri, 18 Dec 2009 21:58:05 +0900
Message-ID: <1261141088-2014-25-git-send-email-tj@kernel.org>
In-Reply-To: <1261141088-2014-1-git-send-email-tj@kernel.org>

Reimplement st (single thread) workqueues so that they play well
with the shared worker pool.  The original implementation confined
each st workqueue to the cwq of a fixed cpu and always kept a worker
for that cpu.  This isn't friendly to a shared worker pool and is
suboptimal in that it often ends up crossing cpu boundaries.

Reimplement st workqueues using dynamic single cpu binding and
cwq->limit.  WQ_SINGLE_THREAD is replaced with WQ_SINGLE_CPU.  In a
single cpu workqueue, at most one cwq is bound to the wq at any given
time.  Arbitration is done using atomic accesses to wq->single_cpu
when a work is queued.  Once bound, the binding stays until the
workqueue is drained.
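
The arbitration reduces to a compare-and-swap against a "no cpu"
sentinel.  Purely for illustration, here is a minimal userspace
sketch of the pattern using C11 atomics (names and setup are mine;
the kernel version additionally rechecks the binding under
gcwq->lock, as the patch below shows):

	#include <stdatomic.h>
	#include <stdio.h>

	#define NR_CPUS	8			/* sentinel: no cpu bound */

	static atomic_uint single_cpu = NR_CPUS;

	/* return the cpu this work must be queued on */
	static unsigned int bind_single_cpu(unsigned int req_cpu)
	{
		for (;;) {
			unsigned int cpu = atomic_load(&single_cpu);

			if (cpu != NR_CPUS)
				return cpu;	/* already bound, follow it */

			/* unbound: race to install req_cpu as the binding */
			if (atomic_compare_exchange_strong(&single_cpu,
							   &cpu, req_cpu))
				return req_cpu;
			/* lost the race: retry and follow the winner */
		}
	}

	int main(void)
	{
		printf("queued on cpu %u\n", bind_single_cpu(3)); /* binds: 3 */
		printf("queued on cpu %u\n", bind_single_cpu(5)); /* follows: 3 */
		return 0;
	}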

Note that the binding is never broken while a workqueue is frozen:
idle cwqs may still have works sitting on their delayed_works queues
while frozen.  On thaw, a cwq is restarted if it has any delayed
works and unbound otherwise.
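
Two hunks below cooperate to implement this.  Condensed from the
patch itself:

	/* cwq_unbind_single_cpu(): never break the binding while frozen */
	if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
		smp_wmb();	/* paired with cmpxchg() in __queue_work() */
		wq->single_cpu = NR_CPUS;
	}

	/* thaw_workqueues(): perform the delayed unbind once empty */
	if (wq->single_cpu == gcwq->cpu &&
	    !cwq->nr_active && list_empty(&cwq->delayed_works))
		cwq_unbind_single_cpu(cwq);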

When combined with a max_active limit of 1, a single cpu workqueue
has exactly the same execution properties as the original single
thread workqueue while allowing the per-cpu workers to be shared.
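
User-visible semantics are unchanged; create_singlethread_workqueue()
keeps its guarantees.  A hypothetical user for reference (the my_*
names are made up, the APIs are the existing workqueue ones):

	#include <linux/module.h>
	#include <linux/workqueue.h>

	static void my_func(struct work_struct *work)
	{
		/* never runs concurrently with itself: max_active is 1
		 * and the wq is bound to one cpu at any given time */
	}

	static DECLARE_WORK(my_work, my_func);
	static struct workqueue_struct *my_wq;

	static int __init my_init(void)
	{
		/* now expands to __create_workqueue("my_wq", WQ_SINGLE_CPU, 1) */
		my_wq = create_singlethread_workqueue("my_wq");
		if (!my_wq)
			return -ENOMEM;
		queue_work(my_wq, &my_work);
		return 0;
	}

	static void __exit my_exit(void)
	{
		destroy_workqueue(my_wq);	/* flushes remaining works */
	}

	module_init(my_init);
	module_exit(my_exit);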

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 include/linux/workqueue.h |    6 +-
 kernel/workqueue.c        |  139 ++++++++++++++++++++++++++++++++-------------
 2 files changed, 103 insertions(+), 42 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 7a260df..b012da7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -212,7 +212,7 @@ static inline bool work_static(struct work_struct *work) { return false; }
 
 enum {
 	WQ_FREEZEABLE		= 1 << 0, /* freeze during suspend */
-	WQ_SINGLE_THREAD	= 1 << 1, /* no per-cpu worker */
+	WQ_SINGLE_CPU		= 1 << 1, /* only single cpu at a time */
 };
 
 extern struct workqueue_struct *
@@ -241,9 +241,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active,
 #define create_workqueue(name)					\
 	__create_workqueue((name), 0, 1)
 #define create_freezeable_workqueue(name)			\
-	__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1)
+	__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1)
 #define create_singlethread_workqueue(name)			\
-	__create_workqueue((name), WQ_SINGLE_THREAD, 1)
+	__create_workqueue((name), WQ_SINGLE_CPU, 1)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 265f480..19cfa12 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -114,8 +114,7 @@ struct global_cwq {
 } ____cacheline_aligned_in_smp;
 
 /*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
+ * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
  * work_struct->data are used for flags and thus cwqs need to be
  * aligned at two's power of the number of flag bits.
  */
@@ -159,6 +158,8 @@ struct workqueue_struct {
 	struct list_head	flusher_queue;	/* F: flush waiters */
 	struct list_head	flusher_overflow; /* F: flush overflow list */
 
+	unsigned long		single_cpu;	/* cpu for single cpu wq */
+
 	int			saved_max_active; /* I: saved cwq max_active */
 	const char		*name;		/* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);
 
 static int worker_thread(void *__worker);
 
-static int singlethread_cpu __read_mostly;
-
 static struct global_cwq *get_gcwq(unsigned int cpu)
 {
 	return &per_cpu(global_cwq, cpu);
@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
 	return per_cpu_ptr(wq->cpu_wq, cpu);
 }
 
-static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
-					       struct workqueue_struct *wq)
-{
-	if (unlikely(wq->flags & WQ_SINGLE_THREAD))
-		cpu = singlethread_cpu;
-	return get_cwq(cpu, wq);
-}
-
 static unsigned int work_color_to_flags(int color)
 {
 	return color << WORK_STRUCT_COLOR_SHIFT;
@@ -403,17 +394,84 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	wake_up_process(cwq->worker->task);
 }
 
-static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
-			 struct work_struct *work)
+/**
+ * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
+ * @cwq: cwq to unbind
+ *
+ * Try to unbind @cwq from single cpu workqueue processing.  If
+ * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
 {
-	struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
+	struct workqueue_struct *wq = cwq->wq;
 	struct global_cwq *gcwq = cwq->gcwq;
+
+	BUG_ON(wq->single_cpu != gcwq->cpu);
+	/*
+	 * Unbind from workqueue if @cwq is not frozen.  If frozen,
+	 * thaw_workqueues() will either restart processing on this
+	 * cpu or unbind if empty.  This keeps works queued while
+	 * frozen fully ordered and flushable.
+	 */
+	if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
+		smp_wmb();	/* paired with cmpxchg() in __queue_work() */
+		wq->single_cpu = NR_CPUS;
+	}
+}
+
+static void __queue_work(unsigned int req_cpu, struct workqueue_struct *wq,
+			 struct work_struct *work)
+{
+	struct global_cwq *gcwq;
+	struct cpu_workqueue_struct *cwq;
 	struct list_head *worklist;
+	unsigned int cpu;
 	unsigned long flags;
+	bool arbitrate;
 
 	debug_work_activate(work);
 
-	spin_lock_irqsave(&gcwq->lock, flags);
+	if (!(wq->flags & WQ_SINGLE_CPU)) {
+		/* just use the requested cpu for multicpu workqueues */
+		cwq = get_cwq(req_cpu, wq);
+		gcwq = cwq->gcwq;
+		spin_lock_irqsave(&gcwq->lock, flags);
+	} else {
+		/*
+		 * It's a bit more complex for single cpu workqueues.
+		 * We first need to determine which cpu is going to be
+		 * used.  If no cpu is currently serving this
+		 * workqueue, arbitrate using atomic accesses to
+		 * wq->single_cpu; otherwise, use the current one.
+		 */
+	retry:
+		cpu = wq->single_cpu;
+		arbitrate = cpu == NR_CPUS;
+		if (arbitrate)
+			cpu = req_cpu;
+
+		cwq = get_cwq(cpu, wq);
+		gcwq = cwq->gcwq;
+		spin_lock_irqsave(&gcwq->lock, flags);
+
+		/*
+		 * The following cmpxchg() is full barrier paired with
+		 * smp_wmb() in cwq_unbind_single_cpu() and guarantees
+		 * that all changes to wq->st_* fields are visible on
+		 * the new cpu after this point.
+		 */
+		if (arbitrate)
+			cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+
+		if (unlikely(wq->single_cpu != cpu)) {
+			spin_unlock_irqrestore(&gcwq->lock, flags);
+			goto retry;
+		}
+	}
+
 	BUG_ON(!list_empty(&work->entry));
 
 	cwq->nr_in_flight[cwq->work_color]++;
@@ -523,7 +581,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 		timer_stats_timer_set_start_info(&dwork->timer);
 
 		/* This stores cwq for the moment, for the timer_fn */
-		set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
+		set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
 		timer->expires = jiffies + delay;
 		timer->data = (unsigned long)dwork;
 		timer->function = delayed_work_timer_fn;
@@ -784,10 +842,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
 	cwq->nr_in_flight[color]--;
 	cwq->nr_active--;
 
-	/* one down, submit a delayed one */
-	if (!list_empty(&cwq->delayed_works) &&
-	    cwq->nr_active < cwq->max_active)
-		cwq_activate_first_delayed(cwq);
+	if (!list_empty(&cwq->delayed_works)) {
+		/* one down, submit a delayed one */
+		if (cwq->nr_active < cwq->max_active)
+			cwq_activate_first_delayed(cwq);
+	} else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
+		/* this was the last work, unbind from single cpu */
+		cwq_unbind_single_cpu(cwq);
+	}
 
 	/* is flush in progress and are we at the flushing tip? */
 	if (likely(cwq->flush_color != color))
@@ -1667,7 +1729,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 						struct lock_class_key *key,
 						const char *lock_name)
 {
-	bool singlethread = flags & WQ_SINGLE_THREAD;
 	struct workqueue_struct *wq;
 	bool failed = false;
 	unsigned int cpu;
@@ -1688,6 +1749,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 	atomic_set(&wq->nr_cwqs_to_flush, 0);
 	INIT_LIST_HEAD(&wq->flusher_queue);
 	INIT_LIST_HEAD(&wq->flusher_overflow);
+	wq->single_cpu = NR_CPUS;
+
 	wq->name = name;
 	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
 	INIT_LIST_HEAD(&wq->list);
@@ -1713,8 +1776,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 
 		if (failed)
 			continue;
-		cwq->worker = create_worker(cwq,
-					    cpu_online(cpu) && !singlethread);
+		cwq->worker = create_worker(cwq, cpu_online(cpu));
 		if (cwq->worker)
 			start_worker(cwq->worker);
 		else
@@ -1912,18 +1974,16 @@ static int __cpuinit trustee_thread(void *__gcwq)
 
 	spin_lock_irq(&gcwq->lock);
 	/*
-	 * Make all multithread workers rogue.  Trustee must be bound
-	 * to the target cpu and can't be cancelled.
+	 * Make all workers rogue.  Trustee must be bound to the
+	 * target cpu and can't be cancelled.
 	 */
 	BUG_ON(gcwq->cpu != smp_processor_id());
 
 	list_for_each_entry(worker, &gcwq->idle_list, entry)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	for_each_busy_worker(worker, i, pos, gcwq)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	/*
 	 * We're now in charge.  Notify and proceed to drain.  We need
@@ -2027,20 +2087,17 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		}
 
 		/*
-		 * Clear ROGUE from and rebind all multithread
-		 * workers.  Unsetting ROGUE and rebinding require
-		 * dropping gcwq->lock.  Restart loop after each
-		 * successful release.
+		 * Clear ROGUE from and rebind all workers.  Unsetting
+		 * ROGUE and rebinding require dropping gcwq->lock.
+		 * Restart loop after each successful release.
 		 */
 	recheck:
 		list_for_each_entry(worker, &gcwq->idle_list, entry)
-			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD) &&
-			    trustee_unset_rogue(worker))
+			if (trustee_unset_rogue(worker))
 				goto recheck;
 
 		for_each_busy_worker(worker, i, pos, gcwq)
-			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD) &&
-			    trustee_unset_rogue(worker))
+			if (trustee_unset_rogue(worker))
 				goto recheck;
 		break;
 	}
@@ -2226,6 +2283,11 @@ void thaw_workqueues(void)
 			       cwq->nr_active < cwq->max_active)
 				cwq_activate_first_delayed(cwq);
 
+			/* perform delayed unbind from single cpu if empty */
+			if (wq->single_cpu == gcwq->cpu &&
+			    !cwq->nr_active && list_empty(&cwq->delayed_works))
+				cwq_unbind_single_cpu(cwq);
+
 			wake_up_process(cwq->worker->task);
 		}
 
@@ -2251,7 +2313,6 @@ void __init init_workqueues(void)
 	BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
 		     __alignof__(unsigned long long));
 
-	singlethread_cpu = cpumask_first(cpu_possible_mask);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
 
 	/* initialize gcwqs */
-- 
1.6.4.2

