* [PATCH tip/core/rcu 0/3] rcu: simplify rcu_barrier() interaction with CPU hotplug
@ 2009-09-29  4:49 Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
                   ` (2 more replies)
  0 siblings, 3 replies; 13+ messages in thread
From: Paul E. McKenney @ 2009-09-29  4:49 UTC (permalink / raw)
  To: linux-kernel
  Cc: mingo, laijs, dipankar, akpm, mathieu.desnoyers, josh, dvhltc,
	niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells

This patchset simplifies the interaction of rcu_barrier() with CPU
hotplug operations.  The main point of this patchset is to impose an
invariant: offline CPUs never have any RCU callbacks queued.  However,
rcutiny doesn't permit CPU hotplug (yes, I have worked in environments
where the last CPU could be offlined, but Linux is thankfully not one of
them), so the patchset goes as follows:

o	Replace the rcu_barrier enum with a pointer to the relevant
	call_rcu() function, thus eliminating any confusion about
	which .h file this enum should reside in.

o	Move the rcu_barrier() code to rcutree, and create a lightweight
	variant of rcu_barrier() that is suitable for rcutiny.  This
	lightweight variant is identical to rcutree's implementation of
	synchronize_rcu() and friends.

o	Create a list in the rcu_state structure that holds RCU
	callbacks that were "orphaned" by CPUs that just went offline.
	The CPU_DYING notifier moves all RCU callbacks from the outgoing
	CPU to the rcu_state lists, and the CPU_DEAD notifier and
	_rcu_barrier() function "adopt" these orphans.  (A user-space
	sketch of this list handling appears just below.)
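
For concreteness, here is a stand-alone user-space sketch of the
tail-pointer list splicing that the orphan handling relies on.  This is
not the kernel code; the names (cb_list, cb_list_splice, and so on) are
invented for illustration, and the locking and per-CPU details are
omitted:

#include <stdio.h>

struct cb {
	struct cb *next;
	int id;
};

struct cb_list {
	struct cb *head;
	struct cb **tail;	/* points at the final ->next pointer */
};

static void cb_list_init(struct cb_list *l)
{
	l->head = NULL;
	l->tail = &l->head;
}

static void cb_list_enqueue(struct cb_list *l, struct cb *c)
{
	c->next = NULL;
	*l->tail = c;
	l->tail = &c->next;
}

/* Move every callback from @from to the end of @to, leaving @from empty. */
static void cb_list_splice(struct cb_list *to, struct cb_list *from)
{
	if (from->head == NULL)
		return;
	*to->tail = from->head;
	to->tail = from->tail;
	cb_list_init(from);
}

int main(void)
{
	struct cb cbs[3] = { { .id = 1 }, { .id = 2 }, { .id = 3 } };
	struct cb_list dying_cpu, orphanage, survivor;
	struct cb *c;

	cb_list_init(&dying_cpu);
	cb_list_init(&orphanage);
	cb_list_init(&survivor);

	for (int i = 0; i < 3; i++)
		cb_list_enqueue(&dying_cpu, &cbs[i]);

	/* CPU_DYING: the outgoing CPU relinquishes its callbacks. */
	cb_list_splice(&orphanage, &dying_cpu);

	/* CPU_DEAD or _rcu_barrier(): a surviving CPU adopts them. */
	cb_list_splice(&survivor, &orphanage);

	for (c = survivor.head; c != NULL; c = c->next)
		printf("adopted callback %d\n", c->id);
	return 0;
}

Note that splicing preserves callback order, which is what lets
rcu_barrier() continue to work across these moves.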

 b/kernel/rcupdate.c       |   33 +++-------
 b/kernel/rcutiny.c        |   36 ++++++++++
 b/kernel/rcutree.c        |  120 ++++++++++++++++++++++++++++++++++++
 b/kernel/rcutree.h        |   11 +++
 b/kernel/rcutree_plugin.h |   34 ++++++++++
 b/kernel/rcutree_trace.c  |    5 -
 kernel/rcupdate.c         |  120 ------------------------------------
 kernel/rcutree.c          |  151 ++++++++++++++++++++++++----------------------
 8 files changed, 291 insertions(+), 219 deletions(-)


* [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function
  2009-09-29  4:49 [PATCH tip/core/rcu 0/3] rcu: simplify rcu_barrier() interaction with CPU hotplug Paul E. McKenney
@ 2009-09-29  4:50 ` Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Replace " tip-bot for Paul E. McKenney
  2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks Paul E. McKenney
  2 siblings, 2 replies; 13+ messages in thread
From: Paul E. McKenney @ 2009-09-29  4:50 UTC (permalink / raw)
  To: linux-kernel
  Cc: mingo, laijs, dipankar, akpm, mathieu.desnoyers, josh, dvhltc,
	niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
	Paul E. McKenney

The rcu_barrier enum causes several problems: (1) you have to define
the enum somewhere, and there is no convenient place, (2) the difference
between TREE_RCU and TREE_PREEMPT_RCU causes problems when you need to
map from the rcu_barrier enum to struct rcu_state, (3) the switch
statements are large, and (4) TINY_RCU really needs a different
rcu_barrier() than do the treercu implementations.
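
For readers who want the shape of the change in isolation, here is a
minimal user-space sketch of the technique.  The fake_rcu_head type and
the fake_call_rcu*() names are invented for this example; passing a
function pointer through a void * is the same idiom the patch itself
uses (a common extension rather than strict ISO C):

#include <stdio.h>

struct fake_rcu_head {
	void (*func)(struct fake_rcu_head *head);
};

typedef void (*call_rcu_fn)(struct fake_rcu_head *head,
			    void (*func)(struct fake_rcu_head *head));

static void fake_call_rcu(struct fake_rcu_head *head,
			  void (*func)(struct fake_rcu_head *head))
{
	printf("queued on the standard flavor\n");
	head->func = func;
}

static void fake_call_rcu_bh(struct fake_rcu_head *head,
			     void (*func)(struct fake_rcu_head *head))
{
	printf("queued on the bh flavor\n");
	head->func = func;
}

static void barrier_callback(struct fake_rcu_head *head)
{
	printf("barrier callback invoked\n");
}

/* Analogous to rcu_barrier_func(): the flavor arrives as a void *. */
static void barrier_func(void *type)
{
	struct fake_rcu_head head;
	call_rcu_fn call_rcu_func = type;

	call_rcu_func(&head, barrier_callback);
	head.func(&head);	/* pretend the grace period has elapsed */
}

int main(void)
{
	barrier_func((void *)fake_call_rcu);	/* was RCU_BARRIER_STD */
	barrier_func((void *)fake_call_rcu_bh);	/* was RCU_BARRIER_BH */
	return 0;
}

The enum, the switch statement, and the enum-to-rcu_state mapping all
collapse into a single indirect call.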

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---
 kernel/rcupdate.c |   32 ++++++++++----------------------
 1 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index 2480534..fd3ec49 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,12 +53,6 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-enum rcu_barrier {
-	RCU_BARRIER_STD,
-	RCU_BARRIER_BH,
-	RCU_BARRIER_SCHED,
-};
-
 static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
@@ -188,19 +182,12 @@ static void rcu_barrier_func(void *type)
 {
 	int cpu = smp_processor_id();
 	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
 
 	atomic_inc(&rcu_barrier_cpu_count);
-	switch ((enum rcu_barrier)type) {
-	case RCU_BARRIER_STD:
-		call_rcu(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_BH:
-		call_rcu_bh(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_SCHED:
-		call_rcu_sched(head, rcu_barrier_callback);
-		break;
-	}
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
 }
 
 static inline void wait_migrated_callbacks(void)
@@ -213,7 +200,8 @@ static inline void wait_migrated_callbacks(void)
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(enum rcu_barrier type)
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
 	/* Take cpucontrol mutex to protect against CPU hotplug */
@@ -229,7 +217,7 @@ static void _rcu_barrier(enum rcu_barrier type)
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)type, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
@@ -242,7 +230,7 @@ static void _rcu_barrier(enum rcu_barrier type)
  */
 void rcu_barrier(void)
 {
-	_rcu_barrier(RCU_BARRIER_STD);
+	_rcu_barrier(call_rcu);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier);
 
@@ -251,7 +239,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier);
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(RCU_BARRIER_BH);
+	_rcu_barrier(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -260,7 +248,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(RCU_BARRIER_SCHED);
+	_rcu_barrier(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 
-- 
1.5.2.5



* [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny
  2009-09-29  4:49 [PATCH tip/core/rcu 0/3] rcu: simplify rcu_barrier() interaction with CPU hotplug Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
@ 2009-09-29  4:50 ` Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Move " tip-bot for Paul E. McKenney
  2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks Paul E. McKenney
  2 siblings, 2 replies; 13+ messages in thread
From: Paul E. McKenney @ 2009-09-29  4:50 UTC (permalink / raw)
  To: linux-kernel
  Cc: mingo, laijs, dipankar, akpm, mathieu.desnoyers, josh, dvhltc,
	niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
	Paul E. McKenney

From: Paul E. McKenney <paulmck@linux.vnet.ibm.com>

Move the existing rcu_barrier() implementation to rcutree.c, permitting
creation of a smaller and lighter-weight implementation for rcutiny.c
(which is equivalent to rcutree.c's synchronize_rcu() because rcutiny.c
supports but one CPU).  This opens the way to simplify rcutree.c's
rcu_barrier() implementation in a later patch.
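
As an aside, the reason a single-CPU rcu_barrier() can be this simple is
that callbacks are invoked in order on the sole CPU, so queueing one more
callback and waiting for it guarantees that every earlier callback has
already run.  A rough user-space sketch of that reasoning follows; the
names are invented and this is not the rcutiny code:

#include <stdio.h>

#define MAX_CBS 16

typedef void (*cb_fn)(void *arg);

static struct { cb_fn fn; void *arg; } queue[MAX_CBS];
static int nr_cbs;

static void queue_callback(cb_fn fn, void *arg)
{
	if (nr_cbs >= MAX_CBS)
		return;
	queue[nr_cbs].fn = fn;
	queue[nr_cbs].arg = arg;
	nr_cbs++;
}

/* Stand-in for the end of a grace period: run callbacks in FIFO order. */
static void run_callbacks(void)
{
	for (int i = 0; i < nr_cbs; i++)
		queue[i].fn(queue[i].arg);
	nr_cbs = 0;
}

static void some_callback(void *arg)
{
	printf("earlier callback %s ran\n", (const char *)arg);
}

static void wake_waiter(void *arg)
{
	*(int *)arg = 1;	/* analogous to complete(&rcu.completion) */
}

static void tiny_barrier(void)
{
	int done = 0;

	queue_callback(wake_waiter, &done);
	run_callbacks();	/* in the kernel: wait_for_completion() */
	if (done)
		printf("barrier done: all earlier callbacks have run\n");
}

int main(void)
{
	queue_callback(some_callback, "A");
	queue_callback(some_callback, "B");
	tiny_barrier();
	return 0;
}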

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---
 kernel/rcupdate.c |  120 +----------------------------------------------------
 kernel/rcutiny.c  |   36 ++++++++++++++++
 kernel/rcutree.c  |  119 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 156 insertions(+), 119 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index fd3ec49..7625f20 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,16 +53,8 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
 int rcu_scheduler_active __read_mostly;
 
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
-
 /*
  * Awaken the corresponding synchronize_rcu() instance now that a
  * grace period has elapsed.
@@ -169,120 +161,10 @@ EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
 #endif /* #ifndef CONFIG_TINY_RCU */
 
-static void rcu_barrier_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-}
-
-/*
- * Called with preemption disabled, and from cross-cpu IRQ context.
- */
-static void rcu_barrier_func(void *type)
-{
-	int cpu = smp_processor_id();
-	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
-	void (*call_rcu_func)(struct rcu_head *head,
-			      void (*func)(struct rcu_head *head));
-
-	atomic_inc(&rcu_barrier_cpu_count);
-	call_rcu_func = type;
-	call_rcu_func(head, rcu_barrier_callback);
-}
-
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
-/*
- * Orchestrate the specified type of RCU barrier, waiting for all
- * RCU callbacks of the specified type to complete.
- */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
-					       void (*func)(struct rcu_head *head)))
-{
-	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
-	mutex_lock(&rcu_barrier_mutex);
-	init_completion(&rcu_barrier_completion);
-	/*
-	 * Initialize rcu_barrier_cpu_count to 1, then invoke
-	 * rcu_barrier_func() on each CPU, so that each CPU also has
-	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
-	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-	 * might complete its grace period before all of the other CPUs
-	 * did their increment, causing this function to return too
-	 * early.
-	 */
-	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-	wait_for_completion(&rcu_barrier_completion);
-	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier);
-
-/**
- * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
- */
-void rcu_barrier_bh(void)
-{
-	_rcu_barrier(call_rcu_bh);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_bh);
-
-/**
- * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
- */
-void rcu_barrier_sched(void)
-{
-	_rcu_barrier(call_rcu_sched);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_sched);
-
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
 		unsigned long action, void *hcpu)
 {
-	rcu_cpu_notify(self, action, hcpu);
-	if (action == CPU_DYING) {
-		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
-		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
-		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
-	} else if (action == CPU_DOWN_PREPARE) {
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-	}
-
-	return NOTIFY_OK;
+	return rcu_cpu_notify(self, action, hcpu);
 }
 
 void __init rcu_init(void)
diff --git a/kernel/rcutiny.c b/kernel/rcutiny.c
index 070c65f..89124b0 100644
--- a/kernel/rcutiny.c
+++ b/kernel/rcutiny.c
@@ -239,6 +239,42 @@ void call_rcu_bh(struct rcu_head *head,
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
+void rcu_barrier(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+void rcu_barrier_bh(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_bh(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+void rcu_barrier_sched(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_sched(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
 void __rcu_init(void)
 {
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index bb313fd..678b2e2 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -1358,6 +1358,103 @@ int rcu_needs_cpu(int cpu)
 	       rcu_preempt_needs_cpu(cpu);
 }
 
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static struct completion rcu_barrier_completion;
+static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
+static struct rcu_head rcu_migrate_head[3];
+static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
+
+static void rcu_barrier_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+}
+
+/*
+ * Called with preemption disabled, and from cross-cpu IRQ context.
+ */
+static void rcu_barrier_func(void *type)
+{
+	int cpu = smp_processor_id();
+	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
+
+	atomic_inc(&rcu_barrier_cpu_count);
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
+}
+
+static inline void wait_migrated_callbacks(void)
+{
+	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
+	smp_mb(); /* In case we didn't sleep. */
+}
+
+/*
+ * Orchestrate the specified type of RCU barrier, waiting for all
+ * RCU callbacks of the specified type to complete.
+ */
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
+{
+	BUG_ON(in_interrupt());
+	/* Take cpucontrol mutex to protect against CPU hotplug */
+	mutex_lock(&rcu_barrier_mutex);
+	init_completion(&rcu_barrier_completion);
+	/*
+	 * Initialize rcu_barrier_cpu_count to 1, then invoke
+	 * rcu_barrier_func() on each CPU, so that each CPU also has
+	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
+	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
+	 * might complete its grace period before all of the other CPUs
+	 * did their increment, causing this function to return too
+	 * early.
+	 */
+	atomic_set(&rcu_barrier_cpu_count, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+	wait_for_completion(&rcu_barrier_completion);
+	mutex_unlock(&rcu_barrier_mutex);
+	wait_migrated_callbacks();
+}
+
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/**
+ * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
+ */
+void rcu_barrier_bh(void)
+{
+	_rcu_barrier(call_rcu_bh);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+/**
+ * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
+ */
+void rcu_barrier_sched(void)
+{
+	_rcu_barrier(call_rcu_sched);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
+static void rcu_migrate_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_migrate_type_count))
+		wake_up(&rcu_migrate_wq);
+}
+
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1454,6 +1551,28 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
+	case CPU_DOWN_PREPARE:
+	case CPU_DOWN_PREPARE_FROZEN:
+		/* Don't need to wait until next removal operation. */
+		/* rcu_migrate_head is protected by cpu_add_remove_lock */
+		wait_migrated_callbacks();
+		break;
+	case CPU_DYING:
+	case CPU_DYING_FROZEN:
+		/*
+		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
+		 * returns, all online cpus have queued rcu_barrier_func(),
+		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
+		 *
+		 * These callbacks ensure _rcu_barrier() waits for all
+		 * RCU callbacks of the specified type to complete.
+		 */
+		atomic_set(&rcu_migrate_type_count, 3);
+		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
+		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
+		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
 	case CPU_UP_CANCELED:
-- 
1.5.2.5



* [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks
  2009-09-29  4:49 [PATCH tip/core/rcu 0/3] rcu: simplify rcu_barrier() interaction with CPU hotplug Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
  2009-09-29  4:50 ` [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny Paul E. McKenney
@ 2009-09-29  4:50 ` Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Make " tip-bot for Paul E. McKenney
  2009-10-05 19:11   ` tip-bot for Paul E. McKenney
  2 siblings, 2 replies; 13+ messages in thread
From: Paul E. McKenney @ 2009-09-29  4:50 UTC (permalink / raw)
  To: linux-kernel
  Cc: mingo, laijs, dipankar, akpm, mathieu.desnoyers, josh, dvhltc,
	niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
	Paul E. McKenney

From: Paul E. McKenney <paulmck@linux.vnet.ibm.com>

The current interaction between RCU and CPU hotplug requires that
RCU block in CPU notifiers waiting for callbacks to drain.  This can
be greatly simplified by having each CPU relinquish its own callbacks
and by having both _rcu_barrier() and the CPU_DEAD notifiers adopt all
callbacks that were previously relinquished.  This change also
eliminates the possibility of certain types of hangs due to the
previous practice of waiting for callbacks to be invoked from within
CPU notifiers.  If you don't ever wait, you cannot hang.
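
To show why no notifier ever needs to wait, here is a rough user-space
sketch of the adoption step.  The names are hypothetical, a pthread
mutex stands in for ->onofflock, and this is not the kernel code (build
with -lpthread on older toolchains):

#include <pthread.h>
#include <stdio.h>

struct head {
	struct head *next;
};

struct cpu_data {
	struct head *list;
	struct head **tail;
	long qlen;
};

static pthread_mutex_t onofflock = PTHREAD_MUTEX_INITIALIZER;
static struct head *orphan_list;
static struct head **orphan_tail = &orphan_list;
static long orphan_qlen;

/* Called by _rcu_barrier() or the CPU_DEAD notifier in the real design. */
static void adopt_orphans(struct cpu_data *me)
{
	pthread_mutex_lock(&onofflock);
	if (orphan_list != NULL) {
		*me->tail = orphan_list;
		me->tail = orphan_tail;
		me->qlen += orphan_qlen;
		orphan_list = NULL;
		orphan_tail = &orphan_list;
		orphan_qlen = 0;
	}
	pthread_mutex_unlock(&onofflock);
}

int main(void)
{
	static struct head orphans[2];
	struct cpu_data me = { .list = NULL, .tail = &me.list, .qlen = 0 };

	/* Pretend a dying CPU already pushed two callbacks here. */
	orphans[0].next = &orphans[1];
	orphans[1].next = NULL;
	orphan_list = &orphans[0];
	orphan_tail = &orphans[1].next;
	orphan_qlen = 2;

	adopt_orphans(&me);
	printf("adopted %ld callbacks, orphanage now %s\n",
	       me.qlen, orphan_list ? "non-empty" : "empty");
	return 0;
}

Adoption is a constant-time splice under a lock rather than a sleep
waiting for callbacks to be invoked, which is the whole point.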

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---
 kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
 kernel/rcutree.h        |   11 +++-
 kernel/rcutree_plugin.h |   34 +++++++++++
 kernel/rcutree_trace.c  |    4 +-
 4 files changed, 125 insertions(+), 75 deletions(-)

diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 678b2e2..13b016b 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -62,6 +62,9 @@
 	.gpnum = -300, \
 	.completed = -300, \
 	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
+	.orphan_cbs_list = NULL, \
+	.orphan_cbs_tail = &name.orphan_cbs_list, \
+	.orphan_qlen = 0, \
 	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
 	.n_force_qs = 0, \
 	.n_force_qs_ngp = 0, \
@@ -833,17 +836,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
+ * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
+ * specified flavor of RCU.  The callbacks will be adopted by the next
+ * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
+ * comes first.  Because this is invoked from the CPU_DYING notifier,
+ * irqs are already disabled.
+ */
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+	int i;
+	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
+
+	if (rdp->nxtlist == NULL)
+		return;  /* irqs disabled, so comparison is stable. */
+	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
+	*rsp->orphan_cbs_tail = rdp->nxtlist;
+	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
+	rdp->nxtlist = NULL;
+	for (i = 0; i < RCU_NEXT_SIZE; i++)
+		rdp->nxttail[i] = &rdp->nxtlist;
+	rsp->orphan_qlen += rdp->qlen;
+	rdp->qlen = 0;
+	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
+}
+
+/*
+ * Adopt previously orphaned RCU callbacks.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp;
+
+	spin_lock_irqsave(&rsp->onofflock, flags);
+	rdp = rsp->rda[smp_processor_id()];
+	if (rsp->orphan_cbs_list == NULL) {
+		spin_unlock_irqrestore(&rsp->onofflock, flags);
+		return;
+	}
+	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
+	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
+	rdp->qlen += rsp->orphan_qlen;
+	rsp->orphan_cbs_list = NULL;
+	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
+	rsp->orphan_qlen = 0;
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
+}
+
+/*
  * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
  * and move all callbacks from the outgoing CPU to the current one.
  */
 static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 {
-	int i;
 	unsigned long flags;
 	long lastcomp;
 	unsigned long mask;
 	struct rcu_data *rdp = rsp->rda[cpu];
-	struct rcu_data *rdp_me;
 	struct rcu_node *rnp;
 
 	/* Exclude any attempts to start a new grace period. */
@@ -866,32 +915,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 	} while (rnp != NULL);
 	lastcomp = rsp->completed;
 
-	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 
-	/*
-	 * Move callbacks from the outgoing CPU to the running CPU.
-	 * Note that the outgoing CPU is now quiescent, so it is now
-	 * (uncharacteristically) safe to access its rcu_data structure.
-	 * Note also that we must carefully retain the order of the
-	 * outgoing CPU's callbacks in order for rcu_barrier() to work
-	 * correctly.  Finally, note that we start all the callbacks
-	 * afresh, even those that have passed through a grace period
-	 * and are therefore ready to invoke.  The theory is that hotplug
-	 * events are rare, and that if they are frequent enough to
-	 * indefinitely delay callbacks, you have far worse things to
-	 * be worrying about.
-	 */
-	if (rdp->nxtlist != NULL) {
-		rdp_me = rsp->rda[smp_processor_id()];
-		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-		rdp->nxtlist = NULL;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			rdp->nxttail[i] = &rdp->nxtlist;
-		rdp_me->qlen += rdp->qlen;
-		rdp->qlen = 0;
-	}
-	local_irq_restore(flags);
+	rcu_adopt_orphan_cbs(rsp);
 }
 
 /*
@@ -909,6 +935,14 @@ static void rcu_offline_cpu(int cpu)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+}
+
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_offline_cpu(int cpu)
 {
 }
@@ -1362,9 +1396,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
 static struct completion rcu_barrier_completion;
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
 
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
@@ -1387,21 +1418,16 @@ static void rcu_barrier_func(void *type)
 	call_rcu_func(head, rcu_barrier_callback);
 }
 
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
 /*
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+static void _rcu_barrier(struct rcu_state *rsp,
+			 void (*call_rcu_func)(struct rcu_head *head,
 					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
+	/* Take mutex to serialize concurrent rcu_barrier() requests. */
 	mutex_lock(&rcu_barrier_mutex);
 	init_completion(&rcu_barrier_completion);
 	/*
@@ -1414,29 +1440,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
+	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
+	rcu_adopt_orphan_cbs(rsp);
 	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
 	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
 }
-EXPORT_SYMBOL_GPL(rcu_barrier);
 
 /**
  * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(call_rcu_bh);
+	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -1445,16 +1464,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(call_rcu_sched);
+	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1551,27 +1564,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
-	case CPU_DOWN_PREPARE:
-	case CPU_DOWN_PREPARE_FROZEN:
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-		break;
 	case CPU_DYING:
 	case CPU_DYING_FROZEN:
 		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
 		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
+		 * returns, all online cpus have queued rcu_barrier_func().
+		 * The dying CPU clears its cpu_online_mask bit and
+		 * moves all of its RCU callbacks to ->orphan_cbs_list
+		 * in the context of stop_machine(), so subsequent calls
+		 * to _rcu_barrier() will adopt these callbacks and only
+		 * then queue rcu_barrier_func() on all remaining CPUs.
 		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		rcu_send_cbs_to_orphanage(&rcu_bh_state);
+		rcu_send_cbs_to_orphanage(&rcu_sched_state);
+		rcu_preempt_send_cbs_to_orphanage();
 		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 676eecd..b40ac57 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -244,7 +244,15 @@ struct rcu_state {
 	/* End  of fields guarded by root rcu_node's lock. */
 
 	spinlock_t onofflock;			/* exclude on/offline and */
-						/*  starting new GP. */
+						/*  starting new GP.  Also */
+						/*  protects the following */
+						/*  orphan_cbs fields. */
+	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
+						/*  orphaned by all CPUs in */
+						/*  a given leaf rcu_node */
+						/*  going offline. */
+	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
+	long orphan_qlen;			/* Number of orphaned cbs. */
 	spinlock_t fqslock;			/* Only one task forcing */
 						/*  quiescent states. */
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
@@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
 static int rcu_preempt_pending(int cpu);
 static int rcu_preempt_needs_cpu(int cpu);
 static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
+static void rcu_preempt_send_cbs_to_orphanage(void);
 static void __init __rcu_init_preempt(void);
 
 #endif /* #else #ifdef RCU_TREE_NONCORE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index d88dfd3..2fa3f39 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -411,6 +411,15 @@ static int rcu_preempt_needs_cpu(int cpu)
 	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
 }
 
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(&rcu_preempt_state, call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
 /*
  * Initialize preemptable RCU's per-CPU data.
  */
@@ -420,6 +429,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Move preemptable RCU's callbacks to ->orphan_cbs_list.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
+}
+
+/*
  * Initialize preemptable RCU's state structures.
  */
 static void __init __rcu_init_preempt(void)
@@ -565,6 +582,16 @@ static int rcu_preempt_needs_cpu(int cpu)
 }
 
 /*
+ * Because preemptable RCU does not exist, rcu_barrier() is just
+ * another name for rcu_barrier_sched().
+ */
+void rcu_barrier(void)
+{
+	rcu_barrier_sched();
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/*
  * Because preemptable RCU does not exist, there is no per-CPU
  * data to initialize.
  */
@@ -573,6 +600,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Because there is no preemptable RCU, there are no callbacks to move.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+}
+
+/*
  * Because preemptable RCU does not exist, it need not be initialized.
  */
 static void __init __rcu_init_preempt(void)
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index f09af28..4b31c77 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	struct rcu_node *rnp;
 
 	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
-		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
 		   rsp->completed, rsp->gpnum, rsp->signaled,
 		   (long)(rsp->jiffies_force_qs - jiffies),
 		   (int)(jiffies & 0xffff),
 		   rsp->n_force_qs, rsp->n_force_qs_ngp,
 		   rsp->n_force_qs - rsp->n_force_qs_ngp,
-		   rsp->n_force_qs_lh);
+		   rsp->n_force_qs_lh, rsp->orphan_qlen);
 	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
 		if (rnp->level != level) {
 			seq_puts(m, "\n");
-- 
1.5.2.5



* [tip:core/rcu] rcu: Replace the rcu_barrier enum with pointer to call_rcu*() function
  2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
@ 2009-10-01  7:46   ` tip-bot for Paul E. McKenney
  2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-01  7:46 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  0e29f18291cb35b84d0a42c1a79886571b6e5409
Gitweb:     http://git.kernel.org/tip/0e29f18291cb35b84d0a42c1a79886571b6e5409
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:21 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Thu, 1 Oct 2009 09:18:31 +0200

rcu: Replace the rcu_barrier enum with pointer to call_rcu*() function

The rcu_barrier enum causes several problems:

  (1) you have to define the enum somewhere, and there is no
      convenient place,

  (2) the difference between TREE_RCU and TREE_PREEMPT_RCU causes
      problems when you need to map from rcu_barrier enum to struct
      rcu_state,

  (3) the switch statements are large, and

  (4) TINY_RCU really needs a different rcu_barrier() than do the
      treercu implementations.

So replace it with a functionally equivalent but cleaner function
pointer abstraction.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <12541998232366-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcupdate.c |   32 ++++++++++----------------------
 1 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index 2480534..fd3ec49 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,12 +53,6 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-enum rcu_barrier {
-	RCU_BARRIER_STD,
-	RCU_BARRIER_BH,
-	RCU_BARRIER_SCHED,
-};
-
 static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
@@ -188,19 +182,12 @@ static void rcu_barrier_func(void *type)
 {
 	int cpu = smp_processor_id();
 	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
 
 	atomic_inc(&rcu_barrier_cpu_count);
-	switch ((enum rcu_barrier)type) {
-	case RCU_BARRIER_STD:
-		call_rcu(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_BH:
-		call_rcu_bh(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_SCHED:
-		call_rcu_sched(head, rcu_barrier_callback);
-		break;
-	}
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
 }
 
 static inline void wait_migrated_callbacks(void)
@@ -213,7 +200,8 @@ static inline void wait_migrated_callbacks(void)
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(enum rcu_barrier type)
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
 	/* Take cpucontrol mutex to protect against CPU hotplug */
@@ -229,7 +217,7 @@ static void _rcu_barrier(enum rcu_barrier type)
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)type, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
@@ -242,7 +230,7 @@ static void _rcu_barrier(enum rcu_barrier type)
  */
 void rcu_barrier(void)
 {
-	_rcu_barrier(RCU_BARRIER_STD);
+	_rcu_barrier(call_rcu);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier);
 
@@ -251,7 +239,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier);
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(RCU_BARRIER_BH);
+	_rcu_barrier(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -260,7 +248,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(RCU_BARRIER_SCHED);
+	_rcu_barrier(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 


* [tip:core/rcu] rcu: Move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny
  2009-09-29  4:50 ` [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny Paul E. McKenney
@ 2009-10-01  7:46   ` tip-bot for Paul E. McKenney
  2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-01  7:46 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  8b681a85b34b23492a1b4832fd40b37e4679373a
Gitweb:     http://git.kernel.org/tip/8b681a85b34b23492a1b4832fd40b37e4679373a
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:22 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Thu, 1 Oct 2009 09:18:44 +0200

rcu: Move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny

Move the existing rcu_barrier() implementation to rcutree.c,
permitting creation of a smaller and lighter-weight implementation
for rcutiny.c (which is equivalent to rcutree.c's synchronize_rcu()
because rcutiny.c supports but one CPU).

This opens the way to simplify and fix rcutree.c's rcu_barrier()
implementation in a later patch.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <12541998233817-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcupdate.c |  120 +----------------------------------------------------
 kernel/rcutiny.c  |   36 ++++++++++++++++
 kernel/rcutree.c  |  119 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 156 insertions(+), 119 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index fd3ec49..7625f20 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,16 +53,8 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
 int rcu_scheduler_active __read_mostly;
 
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
-
 /*
  * Awaken the corresponding synchronize_rcu() instance now that a
  * grace period has elapsed.
@@ -169,120 +161,10 @@ EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
 #endif /* #ifndef CONFIG_TINY_RCU */
 
-static void rcu_barrier_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-}
-
-/*
- * Called with preemption disabled, and from cross-cpu IRQ context.
- */
-static void rcu_barrier_func(void *type)
-{
-	int cpu = smp_processor_id();
-	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
-	void (*call_rcu_func)(struct rcu_head *head,
-			      void (*func)(struct rcu_head *head));
-
-	atomic_inc(&rcu_barrier_cpu_count);
-	call_rcu_func = type;
-	call_rcu_func(head, rcu_barrier_callback);
-}
-
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
-/*
- * Orchestrate the specified type of RCU barrier, waiting for all
- * RCU callbacks of the specified type to complete.
- */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
-					       void (*func)(struct rcu_head *head)))
-{
-	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
-	mutex_lock(&rcu_barrier_mutex);
-	init_completion(&rcu_barrier_completion);
-	/*
-	 * Initialize rcu_barrier_cpu_count to 1, then invoke
-	 * rcu_barrier_func() on each CPU, so that each CPU also has
-	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
-	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-	 * might complete its grace period before all of the other CPUs
-	 * did their increment, causing this function to return too
-	 * early.
-	 */
-	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-	wait_for_completion(&rcu_barrier_completion);
-	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier);
-
-/**
- * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
- */
-void rcu_barrier_bh(void)
-{
-	_rcu_barrier(call_rcu_bh);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_bh);
-
-/**
- * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
- */
-void rcu_barrier_sched(void)
-{
-	_rcu_barrier(call_rcu_sched);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_sched);
-
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
 		unsigned long action, void *hcpu)
 {
-	rcu_cpu_notify(self, action, hcpu);
-	if (action == CPU_DYING) {
-		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
-		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
-		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
-	} else if (action == CPU_DOWN_PREPARE) {
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-	}
-
-	return NOTIFY_OK;
+	return rcu_cpu_notify(self, action, hcpu);
 }
 
 void __init rcu_init(void)
diff --git a/kernel/rcutiny.c b/kernel/rcutiny.c
index 070c65f..89124b0 100644
--- a/kernel/rcutiny.c
+++ b/kernel/rcutiny.c
@@ -239,6 +239,42 @@ void call_rcu_bh(struct rcu_head *head,
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
+void rcu_barrier(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+void rcu_barrier_bh(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_bh(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+void rcu_barrier_sched(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_sched(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
 void __rcu_init(void)
 {
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index e2e272b..0108570 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -1363,6 +1363,103 @@ int rcu_needs_cpu(int cpu)
 	       rcu_preempt_needs_cpu(cpu);
 }
 
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static struct completion rcu_barrier_completion;
+static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
+static struct rcu_head rcu_migrate_head[3];
+static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
+
+static void rcu_barrier_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+}
+
+/*
+ * Called with preemption disabled, and from cross-cpu IRQ context.
+ */
+static void rcu_barrier_func(void *type)
+{
+	int cpu = smp_processor_id();
+	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
+
+	atomic_inc(&rcu_barrier_cpu_count);
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
+}
+
+static inline void wait_migrated_callbacks(void)
+{
+	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
+	smp_mb(); /* In case we didn't sleep. */
+}
+
+/*
+ * Orchestrate the specified type of RCU barrier, waiting for all
+ * RCU callbacks of the specified type to complete.
+ */
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
+{
+	BUG_ON(in_interrupt());
+	/* Take cpucontrol mutex to protect against CPU hotplug */
+	mutex_lock(&rcu_barrier_mutex);
+	init_completion(&rcu_barrier_completion);
+	/*
+	 * Initialize rcu_barrier_cpu_count to 1, then invoke
+	 * rcu_barrier_func() on each CPU, so that each CPU also has
+	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
+	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
+	 * might complete its grace period before all of the other CPUs
+	 * did their increment, causing this function to return too
+	 * early.
+	 */
+	atomic_set(&rcu_barrier_cpu_count, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+	wait_for_completion(&rcu_barrier_completion);
+	mutex_unlock(&rcu_barrier_mutex);
+	wait_migrated_callbacks();
+}
+
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/**
+ * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
+ */
+void rcu_barrier_bh(void)
+{
+	_rcu_barrier(call_rcu_bh);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+/**
+ * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
+ */
+void rcu_barrier_sched(void)
+{
+	_rcu_barrier(call_rcu_sched);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
+static void rcu_migrate_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_migrate_type_count))
+		wake_up(&rcu_migrate_wq);
+}
+
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1459,6 +1556,28 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
+	case CPU_DOWN_PREPARE:
+	case CPU_DOWN_PREPARE_FROZEN:
+		/* Don't need to wait until next removal operation. */
+		/* rcu_migrate_head is protected by cpu_add_remove_lock */
+		wait_migrated_callbacks();
+		break;
+	case CPU_DYING:
+	case CPU_DYING_FROZEN:
+		/*
+		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
+		 * returns, all online cpus have queued rcu_barrier_func(),
+		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
+		 *
+		 * These callbacks ensure _rcu_barrier() waits for all
+		 * RCU callbacks of the specified type to complete.
+		 */
+		atomic_set(&rcu_migrate_type_count, 3);
+		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
+		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
+		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
 	case CPU_UP_CANCELED:


* [tip:core/rcu] rcu: Make hot-unplugged CPU relinquish its own RCU callbacks
  2009-09-29  4:50 ` [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks Paul E. McKenney
@ 2009-10-01  7:46   ` tip-bot for Paul E. McKenney
  2009-10-05 19:11   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-01  7:46 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  69e1a644f2ee46829fa3e8256c3e958a9ea1d5b8
Gitweb:     http://git.kernel.org/tip/69e1a644f2ee46829fa3e8256c3e958a9ea1d5b8
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:23 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Thu, 1 Oct 2009 09:18:55 +0200

rcu: Make hot-unplugged CPU relinquish its own RCU callbacks

The current interaction between RCU and CPU hotplug requires that
RCU block in CPU notifiers waiting for callbacks to drain.

This can be greatly simplified by having each CPU relinquish its
own callbacks, and for both _rcu_barrier() and CPU_DEAD notifiers
to adopt all callbacks that were previously relinquished.

This change also eliminates the possibility of certain types of
hangs due to the previous practice of waiting for callbacks to be
invoked from within CPU notifiers.

If you don't ever wait, you cannot hang.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <1254199823589-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
 kernel/rcutree.h        |   11 +++-
 kernel/rcutree_plugin.h |   34 +++++++++++
 kernel/rcutree_trace.c  |    4 +-
 4 files changed, 125 insertions(+), 75 deletions(-)

diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 0108570..d8d9865 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -63,6 +63,9 @@
 	.gpnum = -300, \
 	.completed = -300, \
 	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
+	.orphan_cbs_list = NULL, \
+	.orphan_cbs_tail = &name.orphan_cbs_list, \
+	.orphan_qlen = 0, \
 	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
 	.n_force_qs = 0, \
 	.n_force_qs_ngp = 0, \
@@ -838,17 +841,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
+ * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
+ * specified flavor of RCU.  The callbacks will be adopted by the next
+ * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
+ * comes first.  Because this is invoked from the CPU_DYING notifier,
+ * irqs are already disabled.
+ */
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+	int i;
+	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
+
+	if (rdp->nxtlist == NULL)
+		return;  /* irqs disabled, so comparison is stable. */
+	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
+	*rsp->orphan_cbs_tail = rdp->nxtlist;
+	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
+	rdp->nxtlist = NULL;
+	for (i = 0; i < RCU_NEXT_SIZE; i++)
+		rdp->nxttail[i] = &rdp->nxtlist;
+	rsp->orphan_qlen += rdp->qlen;
+	rdp->qlen = 0;
+	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
+}
+
+/*
+ * Adopt previously orphaned RCU callbacks.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp;
+
+	spin_lock_irqsave(&rsp->onofflock, flags);
+	rdp = rsp->rda[smp_processor_id()];
+	if (rsp->orphan_cbs_list == NULL) {
+		spin_unlock_irqrestore(&rsp->onofflock, flags);
+		return;
+	}
+	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
+	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
+	rdp->qlen += rsp->orphan_qlen;
+	rsp->orphan_cbs_list = NULL;
+	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
+	rsp->orphan_qlen = 0;
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
+}
+
+/*
  * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
  * and move all callbacks from the outgoing CPU to the current one.
  */
 static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 {
-	int i;
 	unsigned long flags;
 	long lastcomp;
 	unsigned long mask;
 	struct rcu_data *rdp = rsp->rda[cpu];
-	struct rcu_data *rdp_me;
 	struct rcu_node *rnp;
 
 	/* Exclude any attempts to start a new grace period. */
@@ -871,32 +920,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 	} while (rnp != NULL);
 	lastcomp = rsp->completed;
 
-	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 
-	/*
-	 * Move callbacks from the outgoing CPU to the running CPU.
-	 * Note that the outgoing CPU is now quiescent, so it is now
-	 * (uncharacteristically) safe to access its rcu_data structure.
-	 * Note also that we must carefully retain the order of the
-	 * outgoing CPU's callbacks in order for rcu_barrier() to work
-	 * correctly.  Finally, note that we start all the callbacks
-	 * afresh, even those that have passed through a grace period
-	 * and are therefore ready to invoke.  The theory is that hotplug
-	 * events are rare, and that if they are frequent enough to
-	 * indefinitely delay callbacks, you have far worse things to
-	 * be worrying about.
-	 */
-	if (rdp->nxtlist != NULL) {
-		rdp_me = rsp->rda[smp_processor_id()];
-		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-		rdp->nxtlist = NULL;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			rdp->nxttail[i] = &rdp->nxtlist;
-		rdp_me->qlen += rdp->qlen;
-		rdp->qlen = 0;
-	}
-	local_irq_restore(flags);
+	rcu_adopt_orphan_cbs(rsp);
 }
 
 /*
@@ -914,6 +940,14 @@ static void rcu_offline_cpu(int cpu)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+}
+
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_offline_cpu(int cpu)
 {
 }
@@ -1367,9 +1401,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
 static struct completion rcu_barrier_completion;
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
 
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
@@ -1392,21 +1423,16 @@ static void rcu_barrier_func(void *type)
 	call_rcu_func(head, rcu_barrier_callback);
 }
 
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
 /*
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+static void _rcu_barrier(struct rcu_state *rsp,
+			 void (*call_rcu_func)(struct rcu_head *head,
 					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
+	/* Take mutex to serialize concurrent rcu_barrier() requests. */
 	mutex_lock(&rcu_barrier_mutex);
 	init_completion(&rcu_barrier_completion);
 	/*
@@ -1419,29 +1445,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
+	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
+	rcu_adopt_orphan_cbs(rsp);
 	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
 	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
 }
-EXPORT_SYMBOL_GPL(rcu_barrier);
 
 /**
  * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(call_rcu_bh);
+	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -1450,16 +1469,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(call_rcu_sched);
+	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1556,27 +1569,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
-	case CPU_DOWN_PREPARE:
-	case CPU_DOWN_PREPARE_FROZEN:
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-		break;
 	case CPU_DYING:
 	case CPU_DYING_FROZEN:
 		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
 		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
+		 * returns, all online cpus have queued rcu_barrier_func().
+		 * The dying CPU clears its cpu_online_mask bit and
+		 * moves all of its RCU callbacks to ->orphan_cbs_list
+		 * in the context of stop_machine(), so subsequent calls
+		 * to _rcu_barrier() will adopt these callbacks and only
+		 * then queue rcu_barrier_func() on all remaining CPUs.
 		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		rcu_send_cbs_to_orphanage(&rcu_bh_state);
+		rcu_send_cbs_to_orphanage(&rcu_sched_state);
+		rcu_preempt_send_cbs_to_orphanage();
 		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 676eecd..b40ac57 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -244,7 +244,15 @@ struct rcu_state {
 	/* End  of fields guarded by root rcu_node's lock. */
 
 	spinlock_t onofflock;			/* exclude on/offline and */
-						/*  starting new GP. */
+						/*  starting new GP.  Also */
+						/*  protects the following */
+						/*  orphan_cbs fields. */
+	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
+						/*  orphaned by all CPUs in */
+						/*  a given leaf rcu_node */
+						/*  going offline. */
+	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
+	long orphan_qlen;			/* Number of orphaned cbs. */
 	spinlock_t fqslock;			/* Only one task forcing */
 						/*  quiescent states. */
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
@@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
 static int rcu_preempt_pending(int cpu);
 static int rcu_preempt_needs_cpu(int cpu);
 static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
+static void rcu_preempt_send_cbs_to_orphanage(void);
 static void __init __rcu_init_preempt(void);
 
 #endif /* #else #ifdef RCU_TREE_NONCORE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index 57200fe..c0cb783 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -410,6 +410,15 @@ static int rcu_preempt_needs_cpu(int cpu)
 	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
 }
 
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(&rcu_preempt_state, call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
 /*
  * Initialize preemptable RCU's per-CPU data.
  */
@@ -419,6 +428,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Move preemptable RCU's callbacks to ->orphan_cbs_list.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
+}
+
+/*
  * Initialize preemptable RCU's state structures.
  */
 static void __init __rcu_init_preempt(void)
@@ -564,6 +581,16 @@ static int rcu_preempt_needs_cpu(int cpu)
 }
 
 /*
+ * Because preemptable RCU does not exist, rcu_barrier() is just
+ * another name for rcu_barrier_sched().
+ */
+void rcu_barrier(void)
+{
+	rcu_barrier_sched();
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/*
  * Because preemptable RCU does not exist, there is no per-CPU
  * data to initialize.
  */
@@ -572,6 +599,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Because there is no preemptable RCU, there are no callbacks to move.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+}
+
+/*
  * Because preemptable RCU does not exist, it need not be initialized.
  */
 static void __init __rcu_init_preempt(void)
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index f09af28..4b31c77 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	struct rcu_node *rnp;
 
 	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
-		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
 		   rsp->completed, rsp->gpnum, rsp->signaled,
 		   (long)(rsp->jiffies_force_qs - jiffies),
 		   (int)(jiffies & 0xffff),
 		   rsp->n_force_qs, rsp->n_force_qs_ngp,
 		   rsp->n_force_qs - rsp->n_force_qs_ngp,
-		   rsp->n_force_qs_lh);
+		   rsp->n_force_qs_lh, rsp->orphan_qlen);
 	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
 		if (rnp->level != level) {
 			seq_puts(m, "\n");
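
To make the counting scheme in _rcu_barrier() above easier to follow, here
is a minimal userspace analogue in plain C.  The pthread workers, the
semaphore, and every name in it are hypothetical stand-ins for the per-CPU
rcu_barrier_func() invocations and the completion; it is a sketch of the
idea, not kernel code.

/*
 * Userspace analogue of the _rcu_barrier() counting scheme: the count
 * starts at 1 so it cannot reach zero before every callback has been
 * registered; the initiator drops its own reference last and waits.
 * All names are hypothetical; this is not kernel code.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>

#define NWORKERS 4

static atomic_int barrier_count;	/* plays the role of rcu_barrier_cpu_count */
static sem_t barrier_done;		/* plays the role of rcu_barrier_completion */

/* Analogue of rcu_barrier_callback(): the final decrement signals completion. */
static void barrier_callback(void)
{
	if (atomic_fetch_sub(&barrier_count, 1) == 1)
		sem_post(&barrier_done);
}

/* Analogue of one queued callback eventually being invoked on some CPU. */
static void *worker(void *arg)
{
	(void)arg;
	barrier_callback();
	return NULL;
}

int main(void)
{
	pthread_t tid[NWORKERS];
	int i;

	sem_init(&barrier_done, 0, 0);
	atomic_store(&barrier_count, 1);	/* the initiator's own reference */

	/*
	 * Each increment happens before the corresponding "callback" can run,
	 * so the count cannot reach zero until the initiator's own reference
	 * is dropped below -- the same ordering that on_each_cpu() provides
	 * for rcu_barrier_func() in the kernel.
	 */
	for (i = 0; i < NWORKERS; i++) {
		atomic_fetch_add(&barrier_count, 1);
		pthread_create(&tid[i], NULL, worker, NULL);
	}

	barrier_callback();		/* drop the initial reference */
	sem_wait(&barrier_done);	/* every callback has now run */
	for (i = 0; i < NWORKERS; i++)
		pthread_join(&tid[i], NULL);
	printf("barrier complete\n");
	return 0;
}

The initial count of 1 is exactly what keeps the completion from firing
before all of the registrations have taken place, which is the race that
the comment block in _rcu_barrier() warns about.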

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [tip:core/rcu] rcu: Replace the rcu_barrier enum with pointer to call_rcu*() function
  2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Replace " tip-bot for Paul E. McKenney
@ 2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-05 19:10 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  135c8aea557cf53abe6c8847e286d01442124193
Gitweb:     http://git.kernel.org/tip/135c8aea557cf53abe6c8847e286d01442124193
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:21 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Mon, 5 Oct 2009 21:02:05 +0200

rcu: Replace the rcu_barrier enum with pointer to call_rcu*() function

The rcu_barrier enum causes several problems:

  (1) you have to define the enum somewhere, and there is no
      convenient place,

  (2) the difference between TREE_RCU and TREE_PREEMPT_RCU causes
      problems when you need to map from rcu_barrier enum to struct
      rcu_state,

  (3) the switch statement is large, and

  (4) TINY_RCU really needs a different rcu_barrier() than do the
      treercu implementations.

So replace it with a functionally equivalent but cleaner function
pointer abstraction.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <12541998232366-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcupdate.c |   32 ++++++++++----------------------
 1 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index 4a189ea..e432422 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,12 +53,6 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-enum rcu_barrier {
-	RCU_BARRIER_STD,
-	RCU_BARRIER_BH,
-	RCU_BARRIER_SCHED,
-};
-
 static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
@@ -184,19 +178,12 @@ static void rcu_barrier_func(void *type)
 {
 	int cpu = smp_processor_id();
 	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
 
 	atomic_inc(&rcu_barrier_cpu_count);
-	switch ((enum rcu_barrier)type) {
-	case RCU_BARRIER_STD:
-		call_rcu(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_BH:
-		call_rcu_bh(head, rcu_barrier_callback);
-		break;
-	case RCU_BARRIER_SCHED:
-		call_rcu_sched(head, rcu_barrier_callback);
-		break;
-	}
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
 }
 
 static inline void wait_migrated_callbacks(void)
@@ -209,7 +196,8 @@ static inline void wait_migrated_callbacks(void)
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(enum rcu_barrier type)
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
 	/* Take cpucontrol mutex to protect against CPU hotplug */
@@ -225,7 +213,7 @@ static void _rcu_barrier(enum rcu_barrier type)
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)type, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
@@ -238,7 +226,7 @@ static void _rcu_barrier(enum rcu_barrier type)
  */
 void rcu_barrier(void)
 {
-	_rcu_barrier(RCU_BARRIER_STD);
+	_rcu_barrier(call_rcu);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier);
 
@@ -247,7 +235,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier);
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(RCU_BARRIER_BH);
+	_rcu_barrier(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -256,7 +244,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(RCU_BARRIER_SCHED);
+	_rcu_barrier(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [tip:core/rcu] rcu: Move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny
  2009-09-29  4:50 ` [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Move " tip-bot for Paul E. McKenney
@ 2009-10-05 19:10   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-05 19:10 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  eddd96296949009aa10a6f41ebf01d14420f6dec
Gitweb:     http://git.kernel.org/tip/eddd96296949009aa10a6f41ebf01d14420f6dec
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:22 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Mon, 5 Oct 2009 21:05:39 +0200

rcu: Move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny

Move the existing rcu_barrier() implementation to rcutree.c,
permitting creation of a smaller and lighter-weight implementation
for rcutiny.c (which is equivalent to rcutree.c's synchronize_rcu()
because rcutiny.c supports but one CPU).

This opens the way to simplify and fix rcutree.c's rcu_barrier()
implementation in a later patch.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <12541998233817-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcupdate.c |  120 +----------------------------------------------------
 kernel/rcutiny.c  |   36 ++++++++++++++++
 kernel/rcutree.c  |  119 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 156 insertions(+), 119 deletions(-)

diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index fd3ec49..7625f20 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -53,16 +53,8 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
 int rcu_scheduler_active __read_mostly;
 
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
-
 /*
  * Awaken the corresponding synchronize_rcu() instance now that a
  * grace period has elapsed.
@@ -169,120 +161,10 @@ EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
 #endif /* #ifndef CONFIG_TINY_RCU */
 
-static void rcu_barrier_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-}
-
-/*
- * Called with preemption disabled, and from cross-cpu IRQ context.
- */
-static void rcu_barrier_func(void *type)
-{
-	int cpu = smp_processor_id();
-	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
-	void (*call_rcu_func)(struct rcu_head *head,
-			      void (*func)(struct rcu_head *head));
-
-	atomic_inc(&rcu_barrier_cpu_count);
-	call_rcu_func = type;
-	call_rcu_func(head, rcu_barrier_callback);
-}
-
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
-/*
- * Orchestrate the specified type of RCU barrier, waiting for all
- * RCU callbacks of the specified type to complete.
- */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
-					       void (*func)(struct rcu_head *head)))
-{
-	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
-	mutex_lock(&rcu_barrier_mutex);
-	init_completion(&rcu_barrier_completion);
-	/*
-	 * Initialize rcu_barrier_cpu_count to 1, then invoke
-	 * rcu_barrier_func() on each CPU, so that each CPU also has
-	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
-	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-	 * might complete its grace period before all of the other CPUs
-	 * did their increment, causing this function to return too
-	 * early.
-	 */
-	atomic_set(&rcu_barrier_cpu_count, 1);
-	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
-	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-		complete(&rcu_barrier_completion);
-	wait_for_completion(&rcu_barrier_completion);
-	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier);
-
-/**
- * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
- */
-void rcu_barrier_bh(void)
-{
-	_rcu_barrier(call_rcu_bh);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_bh);
-
-/**
- * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
- */
-void rcu_barrier_sched(void)
-{
-	_rcu_barrier(call_rcu_sched);
-}
-EXPORT_SYMBOL_GPL(rcu_barrier_sched);
-
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
 		unsigned long action, void *hcpu)
 {
-	rcu_cpu_notify(self, action, hcpu);
-	if (action == CPU_DYING) {
-		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
-		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
-		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
-	} else if (action == CPU_DOWN_PREPARE) {
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-	}
-
-	return NOTIFY_OK;
+	return rcu_cpu_notify(self, action, hcpu);
 }
 
 void __init rcu_init(void)
diff --git a/kernel/rcutiny.c b/kernel/rcutiny.c
index 070c65f..89124b0 100644
--- a/kernel/rcutiny.c
+++ b/kernel/rcutiny.c
@@ -239,6 +239,42 @@ void call_rcu_bh(struct rcu_head *head,
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
+void rcu_barrier(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+void rcu_barrier_bh(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_bh(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+void rcu_barrier_sched(void)
+{
+	struct rcu_synchronize rcu;
+
+	init_completion(&rcu.completion);
+	/* Will wake me after RCU finished. */
+	call_rcu_sched(&rcu.head, wakeme_after_rcu);
+	/* Wait for it. */
+	wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
 void __rcu_init(void)
 {
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index e2e272b..0108570 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -1363,6 +1363,103 @@ int rcu_needs_cpu(int cpu)
 	       rcu_preempt_needs_cpu(cpu);
 }
 
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static struct completion rcu_barrier_completion;
+static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
+static struct rcu_head rcu_migrate_head[3];
+static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
+
+static void rcu_barrier_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+}
+
+/*
+ * Called with preemption disabled, and from cross-cpu IRQ context.
+ */
+static void rcu_barrier_func(void *type)
+{
+	int cpu = smp_processor_id();
+	struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
+	void (*call_rcu_func)(struct rcu_head *head,
+			      void (*func)(struct rcu_head *head));
+
+	atomic_inc(&rcu_barrier_cpu_count);
+	call_rcu_func = type;
+	call_rcu_func(head, rcu_barrier_callback);
+}
+
+static inline void wait_migrated_callbacks(void)
+{
+	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
+	smp_mb(); /* In case we didn't sleep. */
+}
+
+/*
+ * Orchestrate the specified type of RCU barrier, waiting for all
+ * RCU callbacks of the specified type to complete.
+ */
+static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+					       void (*func)(struct rcu_head *head)))
+{
+	BUG_ON(in_interrupt());
+	/* Take cpucontrol mutex to protect against CPU hotplug */
+	mutex_lock(&rcu_barrier_mutex);
+	init_completion(&rcu_barrier_completion);
+	/*
+	 * Initialize rcu_barrier_cpu_count to 1, then invoke
+	 * rcu_barrier_func() on each CPU, so that each CPU also has
+	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
+	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
+	 * might complete its grace period before all of the other CPUs
+	 * did their increment, causing this function to return too
+	 * early.
+	 */
+	atomic_set(&rcu_barrier_cpu_count, 1);
+	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
+		complete(&rcu_barrier_completion);
+	wait_for_completion(&rcu_barrier_completion);
+	mutex_unlock(&rcu_barrier_mutex);
+	wait_migrated_callbacks();
+}
+
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/**
+ * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
+ */
+void rcu_barrier_bh(void)
+{
+	_rcu_barrier(call_rcu_bh);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_bh);
+
+/**
+ * rcu_barrier_sched - Wait for in-flight call_rcu_sched() callbacks.
+ */
+void rcu_barrier_sched(void)
+{
+	_rcu_barrier(call_rcu_sched);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier_sched);
+
+static void rcu_migrate_callback(struct rcu_head *notused)
+{
+	if (atomic_dec_and_test(&rcu_migrate_type_count))
+		wake_up(&rcu_migrate_wq);
+}
+
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1459,6 +1556,28 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
+	case CPU_DOWN_PREPARE:
+	case CPU_DOWN_PREPARE_FROZEN:
+		/* Don't need to wait until next removal operation. */
+		/* rcu_migrate_head is protected by cpu_add_remove_lock */
+		wait_migrated_callbacks();
+		break;
+	case CPU_DYING:
+	case CPU_DYING_FROZEN:
+		/*
+		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
+		 * returns, all online cpus have queued rcu_barrier_func(),
+		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
+		 *
+		 * These callbacks ensure _rcu_barrier() waits for all
+		 * RCU callbacks of the specified type to complete.
+		 */
+		atomic_set(&rcu_migrate_type_count, 3);
+		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
+		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
+		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
 	case CPU_UP_CANCELED:
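
For context on what callers of these functions rely on, the sketch below
shows a hypothetical module (every name in it is invented for illustration)
that frees objects with call_rcu() and therefore must call rcu_barrier() on
its exit path before its callback function can safely disappear.  It is
illustrative only, not part of this series:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

struct demo_obj {
	int value;
	struct rcu_head rcu;
};

static struct demo_obj *demo;

static void demo_free_cb(struct rcu_head *head)
{
	kfree(container_of(head, struct demo_obj, rcu));
}

static int __init demo_init(void)
{
	demo = kzalloc(sizeof(*demo), GFP_KERNEL);
	return demo ? 0 : -ENOMEM;
}

static void __exit demo_exit(void)
{
	if (demo)
		call_rcu(&demo->rcu, demo_free_cb);
	/*
	 * Wait for every callback queued above, on whichever CPU it ended
	 * up on, to be invoked; otherwise demo_free_cb() could run after
	 * this module's text has been unloaded.
	 */
	rcu_barrier();
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");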

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [tip:core/rcu] rcu: Make hot-unplugged CPU relinquish its own RCU callbacks
  2009-09-29  4:50 ` [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks Paul E. McKenney
  2009-10-01  7:46   ` [tip:core/rcu] rcu: Make " tip-bot for Paul E. McKenney
@ 2009-10-05 19:11   ` tip-bot for Paul E. McKenney
  1 sibling, 0 replies; 13+ messages in thread
From: tip-bot for Paul E. McKenney @ 2009-10-05 19:11 UTC (permalink / raw)
  To: linux-tip-commits
  Cc: linux-kernel, mathieu.desnoyers, paulmck, hpa, mingo, tglx, mingo

Commit-ID:  3ffea791e80d295eb8f035b41703be80dfbc15b1
Gitweb:     http://git.kernel.org/tip/3ffea791e80d295eb8f035b41703be80dfbc15b1
Author:     Paul E. McKenney <paulmck@linux.vnet.ibm.com>
AuthorDate: Mon, 28 Sep 2009 21:50:23 -0700
Committer:  Ingo Molnar <mingo@elte.hu>
CommitDate: Mon, 5 Oct 2009 21:05:41 +0200

rcu: Make hot-unplugged CPU relinquish its own RCU callbacks

The current interaction between RCU and CPU hotplug requires that
RCU block in CPU notifiers waiting for callbacks to drain.

This can be greatly simplified by having each CPU relinquish its
own callbacks, and for both _rcu_barrier() and CPU_DEAD notifiers
to adopt all callbacks that were previously relinquished.

This change also eliminates the possibility of certain types of
hangs due to the previous practice of waiting for callbacks to be
invoked from within CPU notifiers.

If you don't ever wait, you cannot hang.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <1254199823589-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>


---
 kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
 kernel/rcutree.h        |   11 +++-
 kernel/rcutree_plugin.h |   34 +++++++++++
 kernel/rcutree_trace.c  |    4 +-
 4 files changed, 125 insertions(+), 75 deletions(-)

diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 0108570..d8d9865 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -63,6 +63,9 @@
 	.gpnum = -300, \
 	.completed = -300, \
 	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
+	.orphan_cbs_list = NULL, \
+	.orphan_cbs_tail = &name.orphan_cbs_list, \
+	.orphan_qlen = 0, \
 	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
 	.n_force_qs = 0, \
 	.n_force_qs_ngp = 0, \
@@ -838,17 +841,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
+ * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
+ * specified flavor of RCU.  The callbacks will be adopted by the next
+ * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
+ * comes first.  Because this is invoked from the CPU_DYING notifier,
+ * irqs are already disabled.
+ */
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+	int i;
+	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
+
+	if (rdp->nxtlist == NULL)
+		return;  /* irqs disabled, so comparison is stable. */
+	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
+	*rsp->orphan_cbs_tail = rdp->nxtlist;
+	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
+	rdp->nxtlist = NULL;
+	for (i = 0; i < RCU_NEXT_SIZE; i++)
+		rdp->nxttail[i] = &rdp->nxtlist;
+	rsp->orphan_qlen += rdp->qlen;
+	rdp->qlen = 0;
+	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
+}
+
+/*
+ * Adopt previously orphaned RCU callbacks.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp;
+
+	spin_lock_irqsave(&rsp->onofflock, flags);
+	rdp = rsp->rda[smp_processor_id()];
+	if (rsp->orphan_cbs_list == NULL) {
+		spin_unlock_irqrestore(&rsp->onofflock, flags);
+		return;
+	}
+	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
+	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
+	rdp->qlen += rsp->orphan_qlen;
+	rsp->orphan_cbs_list = NULL;
+	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
+	rsp->orphan_qlen = 0;
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
+}
+
+/*
  * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
  * and move all callbacks from the outgoing CPU to the current one.
  */
 static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 {
-	int i;
 	unsigned long flags;
 	long lastcomp;
 	unsigned long mask;
 	struct rcu_data *rdp = rsp->rda[cpu];
-	struct rcu_data *rdp_me;
 	struct rcu_node *rnp;
 
 	/* Exclude any attempts to start a new grace period. */
@@ -871,32 +920,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 	} while (rnp != NULL);
 	lastcomp = rsp->completed;
 
-	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 
-	/*
-	 * Move callbacks from the outgoing CPU to the running CPU.
-	 * Note that the outgoing CPU is now quiescent, so it is now
-	 * (uncharacteristically) safe to access its rcu_data structure.
-	 * Note also that we must carefully retain the order of the
-	 * outgoing CPU's callbacks in order for rcu_barrier() to work
-	 * correctly.  Finally, note that we start all the callbacks
-	 * afresh, even those that have passed through a grace period
-	 * and are therefore ready to invoke.  The theory is that hotplug
-	 * events are rare, and that if they are frequent enough to
-	 * indefinitely delay callbacks, you have far worse things to
-	 * be worrying about.
-	 */
-	if (rdp->nxtlist != NULL) {
-		rdp_me = rsp->rda[smp_processor_id()];
-		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-		rdp->nxtlist = NULL;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			rdp->nxttail[i] = &rdp->nxtlist;
-		rdp_me->qlen += rdp->qlen;
-		rdp->qlen = 0;
-	}
-	local_irq_restore(flags);
+	rcu_adopt_orphan_cbs(rsp);
 }
 
 /*
@@ -914,6 +940,14 @@ static void rcu_offline_cpu(int cpu)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+}
+
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_offline_cpu(int cpu)
 {
 }
@@ -1367,9 +1401,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
 static struct completion rcu_barrier_completion;
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
 
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
@@ -1392,21 +1423,16 @@ static void rcu_barrier_func(void *type)
 	call_rcu_func(head, rcu_barrier_callback);
 }
 
-static inline void wait_migrated_callbacks(void)
-{
-	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-	smp_mb(); /* In case we didn't sleep. */
-}
-
 /*
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+static void _rcu_barrier(struct rcu_state *rsp,
+			 void (*call_rcu_func)(struct rcu_head *head,
 					       void (*func)(struct rcu_head *head)))
 {
 	BUG_ON(in_interrupt());
-	/* Take cpucontrol mutex to protect against CPU hotplug */
+	/* Take mutex to serialize concurrent rcu_barrier() requests. */
 	mutex_lock(&rcu_barrier_mutex);
 	init_completion(&rcu_barrier_completion);
 	/*
@@ -1419,29 +1445,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
 	 * early.
 	 */
 	atomic_set(&rcu_barrier_cpu_count, 1);
+	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
+	rcu_adopt_orphan_cbs(rsp);
 	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
 	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
 		complete(&rcu_barrier_completion);
 	wait_for_completion(&rcu_barrier_completion);
 	mutex_unlock(&rcu_barrier_mutex);
-	wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-	_rcu_barrier(call_rcu);
 }
-EXPORT_SYMBOL_GPL(rcu_barrier);
 
 /**
  * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
  */
 void rcu_barrier_bh(void)
 {
-	_rcu_barrier(call_rcu_bh);
+	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -1450,16 +1469,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-	_rcu_barrier(call_rcu_sched);
+	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-	if (atomic_dec_and_test(&rcu_migrate_type_count))
-		wake_up(&rcu_migrate_wq);
-}
-
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1556,27 +1569,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	case CPU_UP_PREPARE_FROZEN:
 		rcu_online_cpu(cpu);
 		break;
-	case CPU_DOWN_PREPARE:
-	case CPU_DOWN_PREPARE_FROZEN:
-		/* Don't need to wait until next removal operation. */
-		/* rcu_migrate_head is protected by cpu_add_remove_lock */
-		wait_migrated_callbacks();
-		break;
 	case CPU_DYING:
 	case CPU_DYING_FROZEN:
 		/*
-		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
 		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-		 * returns, all online cpus have queued rcu_barrier_func(),
-		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-		 *
-		 * These callbacks ensure _rcu_barrier() waits for all
-		 * RCU callbacks of the specified type to complete.
+		 * returns, all online cpus have queued rcu_barrier_func().
+		 * The dying CPU clears its cpu_online_mask bit and
+		 * moves all of its RCU callbacks to ->orphan_cbs_list
+		 * in the context of stop_machine(), so subsequent calls
+		 * to _rcu_barrier() will adopt these callbacks and only
+		 * then queue rcu_barrier_func() on all remaining CPUs.
 		 */
-		atomic_set(&rcu_migrate_type_count, 3);
-		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+		rcu_send_cbs_to_orphanage(&rcu_bh_state);
+		rcu_send_cbs_to_orphanage(&rcu_sched_state);
+		rcu_preempt_send_cbs_to_orphanage();
 		break;
 	case CPU_DEAD:
 	case CPU_DEAD_FROZEN:
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 676eecd..b40ac57 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -244,7 +244,15 @@ struct rcu_state {
 	/* End  of fields guarded by root rcu_node's lock. */
 
 	spinlock_t onofflock;			/* exclude on/offline and */
-						/*  starting new GP. */
+						/*  starting new GP.  Also */
+						/*  protects the following */
+						/*  orphan_cbs fields. */
+	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
+						/*  orphaned by all CPUs in */
+						/*  a given leaf rcu_node */
+						/*  going offline. */
+	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
+	long orphan_qlen;			/* Number of orphaned cbs. */
 	spinlock_t fqslock;			/* Only one task forcing */
 						/*  quiescent states. */
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
@@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
 static int rcu_preempt_pending(int cpu);
 static int rcu_preempt_needs_cpu(int cpu);
 static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
+static void rcu_preempt_send_cbs_to_orphanage(void);
 static void __init __rcu_init_preempt(void);
 
 #endif /* #else #ifdef RCU_TREE_NONCORE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index 57200fe..c0cb783 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -410,6 +410,15 @@ static int rcu_preempt_needs_cpu(int cpu)
 	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
 }
 
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+	_rcu_barrier(&rcu_preempt_state, call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
 /*
  * Initialize preemptable RCU's per-CPU data.
  */
@@ -419,6 +428,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Move preemptable RCU's callbacks to ->orphan_cbs_list.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
+}
+
+/*
  * Initialize preemptable RCU's state structures.
  */
 static void __init __rcu_init_preempt(void)
@@ -564,6 +581,16 @@ static int rcu_preempt_needs_cpu(int cpu)
 }
 
 /*
+ * Because preemptable RCU does not exist, rcu_barrier() is just
+ * another name for rcu_barrier_sched().
+ */
+void rcu_barrier(void)
+{
+	rcu_barrier_sched();
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/*
  * Because preemptable RCU does not exist, there is no per-CPU
  * data to initialize.
  */
@@ -572,6 +599,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Because there is no preemptable RCU, there are no callbacks to move.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+}
+
+/*
  * Because preemptable RCU does not exist, it need not be initialized.
  */
 static void __init __rcu_init_preempt(void)
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index f09af28..4b31c77 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	struct rcu_node *rnp;
 
 	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
-		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
 		   rsp->completed, rsp->gpnum, rsp->signaled,
 		   (long)(rsp->jiffies_force_qs - jiffies),
 		   (int)(jiffies & 0xffff),
 		   rsp->n_force_qs, rsp->n_force_qs_ngp,
 		   rsp->n_force_qs - rsp->n_force_qs_ngp,
-		   rsp->n_force_qs_lh);
+		   rsp->n_force_qs_lh, rsp->orphan_qlen);
 	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
 		if (rnp->level != level) {
 			seq_puts(m, "\n");
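
The splice performed by rcu_send_cbs_to_orphanage() and
rcu_adopt_orphan_cbs() relies on the pointer-to-pointer tail idiom that
also underlies ->nxttail[].  A small self-contained C sketch of that idiom,
with invented names and no locking, may help when reading the hunks above:

/*
 * Pointer-to-pointer tail idiom: "tail" always points at the location
 * holding the final NULL (the head pointer while the list is empty,
 * otherwise the last element's ->next), so a whole list can be spliced
 * onto another in O(1) while preserving order.  Names are hypothetical.
 */
#include <stdio.h>

struct cb_head {
	int id;
	struct cb_head *next;
};

struct cb_list {
	struct cb_head *list;	/* first element, or NULL */
	struct cb_head **tail;	/* &list while empty, else &last->next */
};

static void cb_list_init(struct cb_list *l)
{
	l->list = NULL;
	l->tail = &l->list;
}

static void cb_list_append(struct cb_list *l, struct cb_head *h)
{
	h->next = NULL;
	*l->tail = h;
	l->tail = &h->next;
}

/*
 * Move everything on "from" onto the end of "to" and leave "from" empty;
 * this is the shape of both the send-to-orphanage and the adopt step.
 */
static void cb_list_adopt(struct cb_list *to, struct cb_list *from)
{
	if (from->list == NULL)
		return;
	*to->tail = from->list;
	to->tail = from->tail;
	cb_list_init(from);
}

int main(void)
{
	struct cb_list cpu_cbs, orphans;
	struct cb_head a = { .id = 1 }, b = { .id = 2 }, c = { .id = 3 };
	struct cb_head *h;

	cb_list_init(&cpu_cbs);
	cb_list_init(&orphans);
	cb_list_append(&cpu_cbs, &a);
	cb_list_append(&cpu_cbs, &b);
	cb_list_append(&orphans, &c);

	cb_list_adopt(&orphans, &cpu_cbs);
	for (h = orphans.list; h != NULL; h = h->next)
		printf("callback %d\n", h->id);	/* prints 3, 1, 2 */
	return 0;
}

Keeping a tail pointer means that neither the length nor the contents of
the donor list affect the cost of the hand-off, which is what lets
CPU_DYING move an arbitrarily long callback list while irqs are disabled.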

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks
  2009-09-29 14:56 ` Paul E. McKenney
@ 2009-09-29 15:57   ` Mathieu Desnoyers
  0 siblings, 0 replies; 13+ messages in thread
From: Mathieu Desnoyers @ 2009-09-29 15:57 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, mingo, laijs, dipankar, akpm, josh, dvhltc, niv,
	tglx, peterz, rostedt, Valdis.Kletnieks, dhowells

* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> On Tue, Sep 29, 2009 at 09:50:17AM -0400, Mathieu Desnoyers wrote:
> > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > > From: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > > 
> > > The current interaction between RCU and CPU hotplug requires that
> > > RCU block in CPU notifiers waiting for callbacks to drain.  This can
> > > be greatly simplified by haing each CPU relinquish its own callbacks,
> > 
> > "having"
> 
> I plead jet lag.
> 
> > > and for both _rcu_barrier() and CPU_DEAD notifiers to adopt all callbacks
> > > that were previously relinquished.  This change also eliminates the
> > > possibility of certain types of hangs due to the previous practice of
> > > waiting for callbacks to be invoked from within CPU notifiers.  If you
> > > don't every wait, you cannot hang.
> > 
> > "ever"
> 
> Twice.  ;-)
> 
> > This idea reminds me of a discussion we had at Plumbers, good! ;)
> > 
> > Simplification of RCU vs CPU hotplug interaction will clearly be
> > welcome. How does it deal with many processors going offline at once
> > while the system is under large RCU callback queue loads?
> 
> Right now, the "orphan" variables are protected by the rcu_state
> structure's ->onofflock spinlock.  The code appends the CPU's callbacks
> to whatever is already on the orphan list, so it should work in this
> case.  If multiple CPUs go offline, they will serialize appending
> their callbacks onto the orphan list.
> 
> This single lock might well turn out to be a scalability bottleneck,
> but someone is going to have to demonstrate this to me before I would
> be willing to complicate the code to increase scalability.
> 
> > I guess the rationale for letting cpu hotplug wait for callback
> > execution before going offline is close to telling a CPU: "you asked for
> > this callback to be executed, well, _you_ do it before going to sleep.
> > And no, don't try to push this task on your little brothers."
> 
> Not only that, but I am getting really paranoid about RCU and CPU
> hotplug waiting on each other.  Getting some hangs that this patchset
> seems to fix.
> 
> > The only problem I see here is that a CPU could really load up the
> > system and, in the worst-case scenario, never execute its own callbacks
> > and let the others do the work. Have you considered this issue?
> 
> Hmmmm...  The only way I can see for this to happen is for the CPU to
> generate a bunch of callbacks, be taken offline, be brought online,
> generate more callbacks, and so on.  The thing to remember is that
> it typically takes longer than a grace period to take a CPU offline
> and bring it back online, so callbacks should not normally "pile up".
> 

Yes, this is also what I think: if someone brings a CPU offline, he
should expect an increased load on other cpus for a short while.
Simplicity is definitely something palatable in this area.

Thanks,

Mathieu

> > (haha ! just seen the "orphan" variables _after_ writing this up) ;)
> 
> ;-)
> 
> 							Thanx, Paul
> 
> > Thanks,
> > 
> > Mathieu
> > 
> > > 
> > > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > > ---
> > >  kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
> > >  kernel/rcutree.h        |   11 +++-
> > >  kernel/rcutree_plugin.h |   34 +++++++++++
> > >  kernel/rcutree_trace.c  |    4 +-
> > >  4 files changed, 125 insertions(+), 75 deletions(-)
> > > 
> > > diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> > > index 678b2e2..13b016b 100644
> > > --- a/kernel/rcutree.c
> > > +++ b/kernel/rcutree.c
> > > @@ -62,6 +62,9 @@
> > >  	.gpnum = -300, \
> > >  	.completed = -300, \
> > >  	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
> > > +	.orphan_cbs_list = NULL, \
> > > +	.orphan_cbs_tail = &name.orphan_cbs_list, \
> > > +	.orphan_qlen = 0, \
> > >  	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
> > >  	.n_force_qs = 0, \
> > >  	.n_force_qs_ngp = 0, \
> > > @@ -833,17 +836,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> > >  #ifdef CONFIG_HOTPLUG_CPU
> > >  
> > >  /*
> > > + * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
> > > + * specified flavor of RCU.  The callbacks will be adopted by the next
> > > + * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
> > > + * comes first.  Because this is invoked from the CPU_DYING notifier,
> > > + * irqs are already disabled.
> > > + */
> > > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > > +{
> > > +	int i;
> > > +	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
> > > +
> > > +	if (rdp->nxtlist == NULL)
> > > +		return;  /* irqs disabled, so comparison is stable. */
> > > +	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
> > > +	*rsp->orphan_cbs_tail = rdp->nxtlist;
> > > +	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
> > > +	rdp->nxtlist = NULL;
> > > +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> > > +		rdp->nxttail[i] = &rdp->nxtlist;
> > > +	rsp->orphan_qlen += rdp->qlen;
> > > +	rdp->qlen = 0;
> > > +	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
> > > +}
> > > +
> > > +/*
> > > + * Adopt previously orphaned RCU callbacks.
> > > + */
> > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > > +{
> > > +	unsigned long flags;
> > > +	struct rcu_data *rdp;
> > > +
> > > +	spin_lock_irqsave(&rsp->onofflock, flags);
> > > +	rdp = rsp->rda[smp_processor_id()];
> > > +	if (rsp->orphan_cbs_list == NULL) {
> > > +		spin_unlock_irqrestore(&rsp->onofflock, flags);
> > > +		return;
> > > +	}
> > > +	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
> > > +	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
> > > +	rdp->qlen += rsp->orphan_qlen;
> > > +	rsp->orphan_cbs_list = NULL;
> > > +	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
> > > +	rsp->orphan_qlen = 0;
> > > +	spin_unlock_irqrestore(&rsp->onofflock, flags);
> > > +}
> > > +
> > > +/*
> > >   * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
> > >   * and move all callbacks from the outgoing CPU to the current one.
> > >   */
> > >  static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> > >  {
> > > -	int i;
> > >  	unsigned long flags;
> > >  	long lastcomp;
> > >  	unsigned long mask;
> > >  	struct rcu_data *rdp = rsp->rda[cpu];
> > > -	struct rcu_data *rdp_me;
> > >  	struct rcu_node *rnp;
> > >  
> > >  	/* Exclude any attempts to start a new grace period. */
> > > @@ -866,32 +915,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> > >  	} while (rnp != NULL);
> > >  	lastcomp = rsp->completed;
> > >  
> > > -	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
> > > +	spin_unlock_irqrestore(&rsp->onofflock, flags);
> > >  
> > > -	/*
> > > -	 * Move callbacks from the outgoing CPU to the running CPU.
> > > -	 * Note that the outgoing CPU is now quiescent, so it is now
> > > -	 * (uncharacteristically) safe to access its rcu_data structure.
> > > -	 * Note also that we must carefully retain the order of the
> > > -	 * outgoing CPU's callbacks in order for rcu_barrier() to work
> > > -	 * correctly.  Finally, note that we start all the callbacks
> > > -	 * afresh, even those that have passed through a grace period
> > > -	 * and are therefore ready to invoke.  The theory is that hotplug
> > > -	 * events are rare, and that if they are frequent enough to
> > > -	 * indefinitely delay callbacks, you have far worse things to
> > > -	 * be worrying about.
> > > -	 */
> > > -	if (rdp->nxtlist != NULL) {
> > > -		rdp_me = rsp->rda[smp_processor_id()];
> > > -		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> > > -		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
> > > -		rdp->nxtlist = NULL;
> > > -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> > > -			rdp->nxttail[i] = &rdp->nxtlist;
> > > -		rdp_me->qlen += rdp->qlen;
> > > -		rdp->qlen = 0;
> > > -	}
> > > -	local_irq_restore(flags);
> > > +	rcu_adopt_orphan_cbs(rsp);
> > >  }
> > >  
> > >  /*
> > > @@ -909,6 +935,14 @@ static void rcu_offline_cpu(int cpu)
> > >  
> > >  #else /* #ifdef CONFIG_HOTPLUG_CPU */
> > >  
> > > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > > +{
> > > +}
> > > +
> > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > > +{
> > > +}
> > > +
> > >  static void rcu_offline_cpu(int cpu)
> > >  {
> > >  }
> > > @@ -1362,9 +1396,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > >  static atomic_t rcu_barrier_cpu_count;
> > >  static DEFINE_MUTEX(rcu_barrier_mutex);
> > >  static struct completion rcu_barrier_completion;
> > > -static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
> > > -static struct rcu_head rcu_migrate_head[3];
> > > -static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
> > >  
> > >  static void rcu_barrier_callback(struct rcu_head *notused)
> > >  {
> > > @@ -1387,21 +1418,16 @@ static void rcu_barrier_func(void *type)
> > >  	call_rcu_func(head, rcu_barrier_callback);
> > >  }
> > >  
> > > -static inline void wait_migrated_callbacks(void)
> > > -{
> > > -	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
> > > -	smp_mb(); /* In case we didn't sleep. */
> > > -}
> > > -
> > >  /*
> > >   * Orchestrate the specified type of RCU barrier, waiting for all
> > >   * RCU callbacks of the specified type to complete.
> > >   */
> > > -static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> > > +static void _rcu_barrier(struct rcu_state *rsp,
> > > +			 void (*call_rcu_func)(struct rcu_head *head,
> > >  					       void (*func)(struct rcu_head *head)))
> > >  {
> > >  	BUG_ON(in_interrupt());
> > > -	/* Take cpucontrol mutex to protect against CPU hotplug */
> > > +	/* Take mutex to serialize concurrent rcu_barrier() requests. */
> > >  	mutex_lock(&rcu_barrier_mutex);
> > >  	init_completion(&rcu_barrier_completion);
> > >  	/*
> > > @@ -1414,29 +1440,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> > >  	 * early.
> > >  	 */
> > >  	atomic_set(&rcu_barrier_cpu_count, 1);
> > > +	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
> > > +	rcu_adopt_orphan_cbs(rsp);
> > >  	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> > > +	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
> > >  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > >  		complete(&rcu_barrier_completion);
> > >  	wait_for_completion(&rcu_barrier_completion);
> > >  	mutex_unlock(&rcu_barrier_mutex);
> > > -	wait_migrated_callbacks();
> > > -}
> > > -
> > > -/**
> > > - * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > > - */
> > > -void rcu_barrier(void)
> > > -{
> > > -	_rcu_barrier(call_rcu);
> > >  }
> > > -EXPORT_SYMBOL_GPL(rcu_barrier);
> > >  
> > >  /**
> > >   * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
> > >   */
> > >  void rcu_barrier_bh(void)
> > >  {
> > > -	_rcu_barrier(call_rcu_bh);
> > > +	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
> > >  }
> > >  EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> > >  
> > > @@ -1445,16 +1464,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> > >   */
> > >  void rcu_barrier_sched(void)
> > >  {
> > > -	_rcu_barrier(call_rcu_sched);
> > > +	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
> > >  }
> > >  EXPORT_SYMBOL_GPL(rcu_barrier_sched);
> > >  
> > > -static void rcu_migrate_callback(struct rcu_head *notused)
> > > -{
> > > -	if (atomic_dec_and_test(&rcu_migrate_type_count))
> > > -		wake_up(&rcu_migrate_wq);
> > > -}
> > > -
> > >  /*
> > >   * Do boot-time initialization of a CPU's per-CPU RCU data.
> > >   */
> > > @@ -1551,27 +1564,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> > >  	case CPU_UP_PREPARE_FROZEN:
> > >  		rcu_online_cpu(cpu);
> > >  		break;
> > > -	case CPU_DOWN_PREPARE:
> > > -	case CPU_DOWN_PREPARE_FROZEN:
> > > -		/* Don't need to wait until next removal operation. */
> > > -		/* rcu_migrate_head is protected by cpu_add_remove_lock */
> > > -		wait_migrated_callbacks();
> > > -		break;
> > >  	case CPU_DYING:
> > >  	case CPU_DYING_FROZEN:
> > >  		/*
> > > -		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
> > > +		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
> > >  		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
> > > -		 * returns, all online cpus have queued rcu_barrier_func(),
> > > -		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
> > > -		 *
> > > -		 * These callbacks ensure _rcu_barrier() waits for all
> > > -		 * RCU callbacks of the specified type to complete.
> > > +		 * returns, all online cpus have queued rcu_barrier_func().
> > > +		 * The dying CPU clears its cpu_online_mask bit and
> > > +		 * moves all of its RCU callbacks to ->orphan_cbs_list
> > > +		 * in the context of stop_machine(), so subsequent calls
> > > +		 * to _rcu_barrier() will adopt these callbacks and only
> > > +		 * then queue rcu_barrier_func() on all remaining CPUs.
> > >  		 */
> > > -		atomic_set(&rcu_migrate_type_count, 3);
> > > -		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
> > > -		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
> > > -		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
> > > +		rcu_send_cbs_to_orphanage(&rcu_bh_state);
> > > +		rcu_send_cbs_to_orphanage(&rcu_sched_state);
> > > +		rcu_preempt_send_cbs_to_orphanage();
> > >  		break;
> > >  	case CPU_DEAD:
> > >  	case CPU_DEAD_FROZEN:
> > > diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> > > index 676eecd..b40ac57 100644
> > > --- a/kernel/rcutree.h
> > > +++ b/kernel/rcutree.h
> > > @@ -244,7 +244,15 @@ struct rcu_state {
> > >  	/* End  of fields guarded by root rcu_node's lock. */
> > >  
> > >  	spinlock_t onofflock;			/* exclude on/offline and */
> > > -						/*  starting new GP. */
> > > +						/*  starting new GP.  Also */
> > > +						/*  protects the following */
> > > +						/*  orphan_cbs fields. */
> > > +	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
> > > +						/*  orphaned by all CPUs in */
> > > +						/*  a given leaf rcu_node */
> > > +						/*  going offline. */
> > > +	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
> > > +	long orphan_qlen;			/* Number of orphaned cbs. */
> > >  	spinlock_t fqslock;			/* Only one task forcing */
> > >  						/*  quiescent states. */
> > >  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> > > @@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
> > >  static int rcu_preempt_pending(int cpu);
> > >  static int rcu_preempt_needs_cpu(int cpu);
> > >  static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
> > > +static void rcu_preempt_send_cbs_to_orphanage(void);
> > >  static void __init __rcu_init_preempt(void);
> > >  
> > >  #endif /* #else #ifdef RCU_TREE_NONCORE */
> > > diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
> > > index d88dfd3..2fa3f39 100644
> > > --- a/kernel/rcutree_plugin.h
> > > +++ b/kernel/rcutree_plugin.h
> > > @@ -411,6 +411,15 @@ static int rcu_preempt_needs_cpu(int cpu)
> > >  	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
> > >  }
> > >  
> > > +/**
> > > + * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > > + */
> > > +void rcu_barrier(void)
> > > +{
> > > +	_rcu_barrier(&rcu_preempt_state, call_rcu);
> > > +}
> > > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > > +
> > >  /*
> > >   * Initialize preemptable RCU's per-CPU data.
> > >   */
> > > @@ -420,6 +429,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> > >  }
> > >  
> > >  /*
> > > + * Move preemptable RCU's callbacks to ->orphan_cbs_list.
> > > + */
> > > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > > +{
> > > +	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
> > > +}
> > > +
> > > +/*
> > >   * Initialize preemptable RCU's state structures.
> > >   */
> > >  static void __init __rcu_init_preempt(void)
> > > @@ -565,6 +582,16 @@ static int rcu_preempt_needs_cpu(int cpu)
> > >  }
> > >  
> > >  /*
> > > + * Because preemptable RCU does not exist, rcu_barrier() is just
> > > + * another name for rcu_barrier_sched().
> > > + */
> > > +void rcu_barrier(void)
> > > +{
> > > +	rcu_barrier_sched();
> > > +}
> > > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > > +
> > > +/*
> > >   * Because preemptable RCU does not exist, there is no per-CPU
> > >   * data to initialize.
> > >   */
> > > @@ -573,6 +600,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> > >  }
> > >  
> > >  /*
> > > + * Because there is no preemptable RCU, there are no callbacks to move.
> > > + */
> > > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > > +{
> > > +}
> > > +
> > > +/*
> > >   * Because preemptable RCU does not exist, it need not be initialized.
> > >   */
> > >  static void __init __rcu_init_preempt(void)
> > > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> > > index f09af28..4b31c77 100644
> > > --- a/kernel/rcutree_trace.c
> > > +++ b/kernel/rcutree_trace.c
> > > @@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
> > >  	struct rcu_node *rnp;
> > >  
> > >  	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
> > > -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> > > +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
> > >  		   rsp->completed, rsp->gpnum, rsp->signaled,
> > >  		   (long)(rsp->jiffies_force_qs - jiffies),
> > >  		   (int)(jiffies & 0xffff),
> > >  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
> > >  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> > > -		   rsp->n_force_qs_lh);
> > > +		   rsp->n_force_qs_lh, rsp->orphan_qlen);
> > >  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
> > >  		if (rnp->level != level) {
> > >  			seq_puts(m, "\n");
> > > -- 
> > > 1.5.2.5
> > > 
> > 
> > -- 
> > Mathieu Desnoyers
> > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68
> > --
> > To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
> > the body of a message to majordomo@vger.kernel.org
> > More majordomo info at  http://vger.kernel.org/majordomo-info.html
> > Please read the FAQ at  http://www.tux.org/lkml/

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks
  2009-09-29 13:50 [PATCH tip/core/rcu 3/3] rcu: make " Mathieu Desnoyers
@ 2009-09-29 14:56 ` Paul E. McKenney
  2009-09-29 15:57   ` Mathieu Desnoyers
  0 siblings, 1 reply; 13+ messages in thread
From: Paul E. McKenney @ 2009-09-29 14:56 UTC (permalink / raw)
  To: Mathieu Desnoyers
  Cc: linux-kernel, mingo, laijs, dipankar, akpm, josh, dvhltc, niv,
	tglx, peterz, rostedt, Valdis.Kletnieks, dhowells

On Tue, Sep 29, 2009 at 09:50:17AM -0400, Mathieu Desnoyers wrote:
> * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > From: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > 
> > The current interaction between RCU and CPU hotplug requires that
> > RCU block in CPU notifiers waiting for callbacks to drain.  This can
> > be greatly simplified by haing each CPU relinquish its own callbacks,
> 
> "having"

I plead jet lag.

> > and for both _rcu_barrier() and CPU_DEAD notifiers to adopt all callbacks
> > that were previously relinquished.  This change also eliminates the
> > possibility of certain types of hangs due to the previous practice of
> > waiting for callbacks to be invoked from within CPU notifiers.  If you
> > don't every wait, you cannot hang.
> 
> "ever"

Twice.  ;-)

> This idea reminds me of a discussion we had at Plumbers, good! ;)
> 
> Simplification of RCU vs CPU hotplug interaction will clearly be
> welcome. How does it deal with many processors going offline at once
> while the system is under large RCU callback queue loads?

Right now, the "orphan" variables are protected by the rcu_state
structure's ->onofflock spinlock.  The code appends the CPU's callbacks
to whatever is already on the orphan list, so it should work in this
case.  If multiple CPUs go offline, they will serialize appending
their callbacks onto the orphan list.

This single lock might well turn out to be a scalability bottleneck,
but someone is going to have to demonstrate this to me before I would
be willing to complicate the code to increase scalability.
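
To make the append concrete, here is a minimal user-space sketch of the
tail-pointer splice (the names and the pthread mutex standing in for
->onofflock are illustrative only, not the actual rcutree code):

	#include <pthread.h>
	#include <stddef.h>

	struct cb {
		struct cb *next;
	};

	static struct orphanage {
		pthread_mutex_t lock;
		struct cb *list;	/* head of orphaned callbacks */
		struct cb **tail;	/* last ->next pointer in the list */
		long qlen;
	} orphans = {
		.lock	= PTHREAD_MUTEX_INITIALIZER,
		.list	= NULL,
		.tail	= &orphans.list,
		.qlen	= 0,
	};

	struct cpu_cbs {
		struct cb *list;
		struct cb **tail;
		long qlen;
	};

	/* Splice one CPU's entire callback list onto the orphan list. */
	static void send_cbs_to_orphanage(struct cpu_cbs *c)
	{
		if (c->list == NULL)
			return;
		pthread_mutex_lock(&orphans.lock);
		*orphans.tail = c->list;  /* append the whole list in O(1)... */
		orphans.tail = c->tail;   /* ...then advance the shared tail. */
		orphans.qlen += c->qlen;
		c->list = NULL;           /* the donor CPU now holds no callbacks */
		c->tail = &c->list;
		c->qlen = 0;
		pthread_mutex_unlock(&orphans.lock);
	}

However many CPUs go offline at once, each splice is constant-time, so
they only serialize briefly on the one lock.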

> I guess the rationale for making the CPU that is going offline wait for
> its own callbacks to execute is close to telling it: "you asked for this
> callback to be executed, well, _you_ do it before going to sleep.  And
> no, don't try to push this task on your little brothers."

Not only that, but I am getting really paranoid about RCU and CPU
hotplug waiting on each other.  I have been seeing some hangs that this
patchset seems to fix.

> The only problem I see here is that a CPU could really load up the
> system and, in the worst-case scenario, never execute its own callbacks
> and let the others do the work.  Have you considered this issue?

Hmmmm...  The only way I can see for this to happen is for the CPU to
generate a bunch of callbacks, be taken offline, be brought online,
generate more callbacks, and so on.  The thing to remember is that
it typically takes longer than a grace period to take a CPU offline
and bring it back online, so callbacks should not normally "pile up".

> (haha! just seen the "orphan" variables _after_ writing this up) ;)

;-)

							Thanx, Paul

> Thanks,
> 
> Mathieu
> 
> > 
> > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > ---
> >  kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
> >  kernel/rcutree.h        |   11 +++-
> >  kernel/rcutree_plugin.h |   34 +++++++++++
> >  kernel/rcutree_trace.c  |    4 +-
> >  4 files changed, 125 insertions(+), 75 deletions(-)
> > 
> > diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> > index 678b2e2..13b016b 100644
> > --- a/kernel/rcutree.c
> > +++ b/kernel/rcutree.c
> > @@ -62,6 +62,9 @@
> >  	.gpnum = -300, \
> >  	.completed = -300, \
> >  	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
> > +	.orphan_cbs_list = NULL, \
> > +	.orphan_cbs_tail = &name.orphan_cbs_list, \
> > +	.orphan_qlen = 0, \
> >  	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
> >  	.n_force_qs = 0, \
> >  	.n_force_qs_ngp = 0, \
> > @@ -833,17 +836,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> >  #ifdef CONFIG_HOTPLUG_CPU
> >  
> >  /*
> > + * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
> > + * specified flavor of RCU.  The callbacks will be adopted by the next
> > + * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
> > + * comes first.  Because this is invoked from the CPU_DYING notifier,
> > + * irqs are already disabled.
> > + */
> > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > +{
> > +	int i;
> > +	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
> > +
> > +	if (rdp->nxtlist == NULL)
> > +		return;  /* irqs disabled, so comparison is stable. */
> > +	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
> > +	*rsp->orphan_cbs_tail = rdp->nxtlist;
> > +	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
> > +	rdp->nxtlist = NULL;
> > +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> > +		rdp->nxttail[i] = &rdp->nxtlist;
> > +	rsp->orphan_qlen += rdp->qlen;
> > +	rdp->qlen = 0;
> > +	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
> > +}
> > +
> > +/*
> > + * Adopt previously orphaned RCU callbacks.
> > + */
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +	unsigned long flags;
> > +	struct rcu_data *rdp;
> > +
> > +	spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rdp = rsp->rda[smp_processor_id()];
> > +	if (rsp->orphan_cbs_list == NULL) {
> > +		spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +		return;
> > +	}
> > +	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
> > +	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
> > +	rdp->qlen += rsp->orphan_qlen;
> > +	rsp->orphan_cbs_list = NULL;
> > +	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
> > +	rsp->orphan_qlen = 0;
> > +	spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +}
> > +
> > +/*
> >   * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
> >   * and move all callbacks from the outgoing CPU to the current one.
> >   */
> >  static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> >  {
> > -	int i;
> >  	unsigned long flags;
> >  	long lastcomp;
> >  	unsigned long mask;
> >  	struct rcu_data *rdp = rsp->rda[cpu];
> > -	struct rcu_data *rdp_me;
> >  	struct rcu_node *rnp;
> >  
> >  	/* Exclude any attempts to start a new grace period. */
> > @@ -866,32 +915,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> >  	} while (rnp != NULL);
> >  	lastcomp = rsp->completed;
> >  
> > -	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
> > +	spin_unlock_irqrestore(&rsp->onofflock, flags);
> >  
> > -	/*
> > -	 * Move callbacks from the outgoing CPU to the running CPU.
> > -	 * Note that the outgoing CPU is now quiescent, so it is now
> > -	 * (uncharacteristically) safe to access its rcu_data structure.
> > -	 * Note also that we must carefully retain the order of the
> > -	 * outgoing CPU's callbacks in order for rcu_barrier() to work
> > -	 * correctly.  Finally, note that we start all the callbacks
> > -	 * afresh, even those that have passed through a grace period
> > -	 * and are therefore ready to invoke.  The theory is that hotplug
> > -	 * events are rare, and that if they are frequent enough to
> > -	 * indefinitely delay callbacks, you have far worse things to
> > -	 * be worrying about.
> > -	 */
> > -	if (rdp->nxtlist != NULL) {
> > -		rdp_me = rsp->rda[smp_processor_id()];
> > -		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> > -		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
> > -		rdp->nxtlist = NULL;
> > -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> > -			rdp->nxttail[i] = &rdp->nxtlist;
> > -		rdp_me->qlen += rdp->qlen;
> > -		rdp->qlen = 0;
> > -	}
> > -	local_irq_restore(flags);
> > +	rcu_adopt_orphan_cbs(rsp);
> >  }
> >  
> >  /*
> > @@ -909,6 +935,14 @@ static void rcu_offline_cpu(int cpu)
> >  
> >  #else /* #ifdef CONFIG_HOTPLUG_CPU */
> >  
> > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > +{
> > +}
> > +
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +}
> > +
> >  static void rcu_offline_cpu(int cpu)
> >  {
> >  }
> > @@ -1362,9 +1396,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> >  static atomic_t rcu_barrier_cpu_count;
> >  static DEFINE_MUTEX(rcu_barrier_mutex);
> >  static struct completion rcu_barrier_completion;
> > -static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
> > -static struct rcu_head rcu_migrate_head[3];
> > -static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
> >  
> >  static void rcu_barrier_callback(struct rcu_head *notused)
> >  {
> > @@ -1387,21 +1418,16 @@ static void rcu_barrier_func(void *type)
> >  	call_rcu_func(head, rcu_barrier_callback);
> >  }
> >  
> > -static inline void wait_migrated_callbacks(void)
> > -{
> > -	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
> > -	smp_mb(); /* In case we didn't sleep. */
> > -}
> > -
> >  /*
> >   * Orchestrate the specified type of RCU barrier, waiting for all
> >   * RCU callbacks of the specified type to complete.
> >   */
> > -static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> > +static void _rcu_barrier(struct rcu_state *rsp,
> > +			 void (*call_rcu_func)(struct rcu_head *head,
> >  					       void (*func)(struct rcu_head *head)))
> >  {
> >  	BUG_ON(in_interrupt());
> > -	/* Take cpucontrol mutex to protect against CPU hotplug */
> > +	/* Take mutex to serialize concurrent rcu_barrier() requests. */
> >  	mutex_lock(&rcu_barrier_mutex);
> >  	init_completion(&rcu_barrier_completion);
> >  	/*
> > @@ -1414,29 +1440,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> >  	 * early.
> >  	 */
> >  	atomic_set(&rcu_barrier_cpu_count, 1);
> > +	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
> > +	rcu_adopt_orphan_cbs(rsp);
> >  	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> > +	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
> >  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> >  		complete(&rcu_barrier_completion);
> >  	wait_for_completion(&rcu_barrier_completion);
> >  	mutex_unlock(&rcu_barrier_mutex);
> > -	wait_migrated_callbacks();
> > -}
> > -
> > -/**
> > - * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > - */
> > -void rcu_barrier(void)
> > -{
> > -	_rcu_barrier(call_rcu);
> >  }
> > -EXPORT_SYMBOL_GPL(rcu_barrier);
> >  
> >  /**
> >   * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
> >   */
> >  void rcu_barrier_bh(void)
> >  {
> > -	_rcu_barrier(call_rcu_bh);
> > +	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
> >  }
> >  EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> >  
> > @@ -1445,16 +1464,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> >   */
> >  void rcu_barrier_sched(void)
> >  {
> > -	_rcu_barrier(call_rcu_sched);
> > +	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
> >  }
> >  EXPORT_SYMBOL_GPL(rcu_barrier_sched);
> >  
> > -static void rcu_migrate_callback(struct rcu_head *notused)
> > -{
> > -	if (atomic_dec_and_test(&rcu_migrate_type_count))
> > -		wake_up(&rcu_migrate_wq);
> > -}
> > -
> >  /*
> >   * Do boot-time initialization of a CPU's per-CPU RCU data.
> >   */
> > @@ -1551,27 +1564,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> >  	case CPU_UP_PREPARE_FROZEN:
> >  		rcu_online_cpu(cpu);
> >  		break;
> > -	case CPU_DOWN_PREPARE:
> > -	case CPU_DOWN_PREPARE_FROZEN:
> > -		/* Don't need to wait until next removal operation. */
> > -		/* rcu_migrate_head is protected by cpu_add_remove_lock */
> > -		wait_migrated_callbacks();
> > -		break;
> >  	case CPU_DYING:
> >  	case CPU_DYING_FROZEN:
> >  		/*
> > -		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
> > +		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
> >  		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
> > -		 * returns, all online cpus have queued rcu_barrier_func(),
> > -		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
> > -		 *
> > -		 * These callbacks ensure _rcu_barrier() waits for all
> > -		 * RCU callbacks of the specified type to complete.
> > +		 * returns, all online cpus have queued rcu_barrier_func().
> > +		 * The dying CPU clears its cpu_online_mask bit and
> > +		 * moves all of its RCU callbacks to ->orphan_cbs_list
> > +		 * in the context of stop_machine(), so subsequent calls
> > +		 * to _rcu_barrier() will adopt these callbacks and only
> > +		 * then queue rcu_barrier_func() on all remaining CPUs.
> >  		 */
> > -		atomic_set(&rcu_migrate_type_count, 3);
> > -		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
> > -		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
> > -		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
> > +		rcu_send_cbs_to_orphanage(&rcu_bh_state);
> > +		rcu_send_cbs_to_orphanage(&rcu_sched_state);
> > +		rcu_preempt_send_cbs_to_orphanage();
> >  		break;
> >  	case CPU_DEAD:
> >  	case CPU_DEAD_FROZEN:
> > diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> > index 676eecd..b40ac57 100644
> > --- a/kernel/rcutree.h
> > +++ b/kernel/rcutree.h
> > @@ -244,7 +244,15 @@ struct rcu_state {
> >  	/* End  of fields guarded by root rcu_node's lock. */
> >  
> >  	spinlock_t onofflock;			/* exclude on/offline and */
> > -						/*  starting new GP. */
> > +						/*  starting new GP.  Also */
> > +						/*  protects the following */
> > +						/*  orphan_cbs fields. */
> > +	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
> > +						/*  orphaned by all CPUs in */
> > +						/*  a given leaf rcu_node */
> > +						/*  going offline. */
> > +	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
> > +	long orphan_qlen;			/* Number of orphaned cbs. */
> >  	spinlock_t fqslock;			/* Only one task forcing */
> >  						/*  quiescent states. */
> >  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> > @@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
> >  static int rcu_preempt_pending(int cpu);
> >  static int rcu_preempt_needs_cpu(int cpu);
> >  static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
> > +static void rcu_preempt_send_cbs_to_orphanage(void);
> >  static void __init __rcu_init_preempt(void);
> >  
> >  #endif /* #else #ifdef RCU_TREE_NONCORE */
> > diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
> > index d88dfd3..2fa3f39 100644
> > --- a/kernel/rcutree_plugin.h
> > +++ b/kernel/rcutree_plugin.h
> > @@ -411,6 +411,15 @@ static int rcu_preempt_needs_cpu(int cpu)
> >  	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
> >  }
> >  
> > +/**
> > + * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > + */
> > +void rcu_barrier(void)
> > +{
> > +	_rcu_barrier(&rcu_preempt_state, call_rcu);
> > +}
> > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > +
> >  /*
> >   * Initialize preemptable RCU's per-CPU data.
> >   */
> > @@ -420,6 +429,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> >  }
> >  
> >  /*
> > + * Move preemptable RCU's callbacks to ->orphan_cbs_list.
> > + */
> > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > +{
> > +	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
> > +}
> > +
> > +/*
> >   * Initialize preemptable RCU's state structures.
> >   */
> >  static void __init __rcu_init_preempt(void)
> > @@ -565,6 +582,16 @@ static int rcu_preempt_needs_cpu(int cpu)
> >  }
> >  
> >  /*
> > + * Because preemptable RCU does not exist, rcu_barrier() is just
> > + * another name for rcu_barrier_sched().
> > + */
> > +void rcu_barrier(void)
> > +{
> > +	rcu_barrier_sched();
> > +}
> > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > +
> > +/*
> >   * Because preemptable RCU does not exist, there is no per-CPU
> >   * data to initialize.
> >   */
> > @@ -573,6 +600,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> >  }
> >  
> >  /*
> > + * Because there is no preemptable RCU, there are no callbacks to move.
> > + */
> > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > +{
> > +}
> > +
> > +/*
> >   * Because preemptable RCU does not exist, it need not be initialized.
> >   */
> >  static void __init __rcu_init_preempt(void)
> > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> > index f09af28..4b31c77 100644
> > --- a/kernel/rcutree_trace.c
> > +++ b/kernel/rcutree_trace.c
> > @@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
> >  	struct rcu_node *rnp;
> >  
> >  	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
> > -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> > +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
> >  		   rsp->completed, rsp->gpnum, rsp->signaled,
> >  		   (long)(rsp->jiffies_force_qs - jiffies),
> >  		   (int)(jiffies & 0xffff),
> >  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
> >  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> > -		   rsp->n_force_qs_lh);
> > +		   rsp->n_force_qs_lh, rsp->orphan_qlen);
> >  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
> >  		if (rnp->level != level) {
> >  			seq_puts(m, "\n");
> > -- 
> > 1.5.2.5
> > 
> 
> -- 
> Mathieu Desnoyers
> OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks
@ 2009-09-29 13:50 Mathieu Desnoyers
  2009-09-29 14:56 ` Paul E. McKenney
  0 siblings, 1 reply; 13+ messages in thread
From: Mathieu Desnoyers @ 2009-09-29 13:50 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, mingo, laijs, dipankar, akpm, josh, dvhltc, niv,
	tglx, peterz, rostedt, Valdis.Kletnieks, dhowells

* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> From: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> 
> The current interaction between RCU and CPU hotplug requires that
> RCU block in CPU notifiers waiting for callbacks to drain.  This can
> be greatly simplified by haing each CPU relinquish its own callbacks,

"having"

> and for both _rcu_barrier() and CPU_DEAD notifiers to adopt all callbacks
> that were previously relinquished.  This change also eliminates the
> possibility of certain types of hangs due to the previous practice of
> waiting for callbacks to be invoked from within CPU notifiers.  If you
> don't every wait, you cannot hang.

"ever"

This idea reminds me of a discussion we had at Plumbers, good! ;)

Simplification of RCU vs CPU hotplug interaction will clearly be
welcome. How does it deal with many processors going offline at once
while the system is under large RCU callback queue loads?

I guess the rationale for making the CPU that is going offline wait for
its own callbacks to execute is close to telling it: "you asked for this
callback to be executed, well, _you_ do it before going to sleep.  And
no, don't try to push this task on your little brothers."

The only problem I see here is that a CPU could really load up the
system and, in the worst-case scenario, never execute its own callbacks
and let the others do the work.  Have you considered this issue?

(haha! just seen the "orphan" variables _after_ writing this up) ;)

Thanks,

Mathieu

> 
> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> ---
>  kernel/rcutree.c        |  151 ++++++++++++++++++++++++----------------------
>  kernel/rcutree.h        |   11 +++-
>  kernel/rcutree_plugin.h |   34 +++++++++++
>  kernel/rcutree_trace.c  |    4 +-
>  4 files changed, 125 insertions(+), 75 deletions(-)
> 
> diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> index 678b2e2..13b016b 100644
> --- a/kernel/rcutree.c
> +++ b/kernel/rcutree.c
> @@ -62,6 +62,9 @@
>  	.gpnum = -300, \
>  	.completed = -300, \
>  	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
> +	.orphan_cbs_list = NULL, \
> +	.orphan_cbs_tail = &name.orphan_cbs_list, \
> +	.orphan_qlen = 0, \
>  	.fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
>  	.n_force_qs = 0, \
>  	.n_force_qs_ngp = 0, \
> @@ -833,17 +836,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
>  #ifdef CONFIG_HOTPLUG_CPU
>  
>  /*
> + * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
> + * specified flavor of RCU.  The callbacks will be adopted by the next
> + * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
> + * comes first.  Because this is invoked from the CPU_DYING notifier,
> + * irqs are already disabled.
> + */
> +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> +{
> +	int i;
> +	struct rcu_data *rdp = rsp->rda[smp_processor_id()];
> +
> +	if (rdp->nxtlist == NULL)
> +		return;  /* irqs disabled, so comparison is stable. */
> +	spin_lock(&rsp->onofflock);  /* irqs already disabled. */
> +	*rsp->orphan_cbs_tail = rdp->nxtlist;
> +	rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
> +	rdp->nxtlist = NULL;
> +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> +		rdp->nxttail[i] = &rdp->nxtlist;
> +	rsp->orphan_qlen += rdp->qlen;
> +	rdp->qlen = 0;
> +	spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
> +}
> +
> +/*
> + * Adopt previously orphaned RCU callbacks.
> + */
> +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> +{
> +	unsigned long flags;
> +	struct rcu_data *rdp;
> +
> +	spin_lock_irqsave(&rsp->onofflock, flags);
> +	rdp = rsp->rda[smp_processor_id()];
> +	if (rsp->orphan_cbs_list == NULL) {
> +		spin_unlock_irqrestore(&rsp->onofflock, flags);
> +		return;
> +	}
> +	*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
> +	rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
> +	rdp->qlen += rsp->orphan_qlen;
> +	rsp->orphan_cbs_list = NULL;
> +	rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
> +	rsp->orphan_qlen = 0;
> +	spin_unlock_irqrestore(&rsp->onofflock, flags);
> +}
> +
> +/*
>   * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
>   * and move all callbacks from the outgoing CPU to the current one.
>   */
>  static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
>  {
> -	int i;
>  	unsigned long flags;
>  	long lastcomp;
>  	unsigned long mask;
>  	struct rcu_data *rdp = rsp->rda[cpu];
> -	struct rcu_data *rdp_me;
>  	struct rcu_node *rnp;
>  
>  	/* Exclude any attempts to start a new grace period. */
> @@ -866,32 +915,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
>  	} while (rnp != NULL);
>  	lastcomp = rsp->completed;
>  
> -	spin_unlock(&rsp->onofflock);		/* irqs remain disabled. */
> +	spin_unlock_irqrestore(&rsp->onofflock, flags);
>  
> -	/*
> -	 * Move callbacks from the outgoing CPU to the running CPU.
> -	 * Note that the outgoing CPU is now quiescent, so it is now
> -	 * (uncharacteristically) safe to access its rcu_data structure.
> -	 * Note also that we must carefully retain the order of the
> -	 * outgoing CPU's callbacks in order for rcu_barrier() to work
> -	 * correctly.  Finally, note that we start all the callbacks
> -	 * afresh, even those that have passed through a grace period
> -	 * and are therefore ready to invoke.  The theory is that hotplug
> -	 * events are rare, and that if they are frequent enough to
> -	 * indefinitely delay callbacks, you have far worse things to
> -	 * be worrying about.
> -	 */
> -	if (rdp->nxtlist != NULL) {
> -		rdp_me = rsp->rda[smp_processor_id()];
> -		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> -		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
> -		rdp->nxtlist = NULL;
> -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> -			rdp->nxttail[i] = &rdp->nxtlist;
> -		rdp_me->qlen += rdp->qlen;
> -		rdp->qlen = 0;
> -	}
> -	local_irq_restore(flags);
> +	rcu_adopt_orphan_cbs(rsp);
>  }
>  
>  /*
> @@ -909,6 +935,14 @@ static void rcu_offline_cpu(int cpu)
>  
>  #else /* #ifdef CONFIG_HOTPLUG_CPU */
>  
> +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> +{
> +}
> +
> +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> +{
> +}
> +
>  static void rcu_offline_cpu(int cpu)
>  {
>  }
> @@ -1362,9 +1396,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
>  static atomic_t rcu_barrier_cpu_count;
>  static DEFINE_MUTEX(rcu_barrier_mutex);
>  static struct completion rcu_barrier_completion;
> -static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
> -static struct rcu_head rcu_migrate_head[3];
> -static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
>  
>  static void rcu_barrier_callback(struct rcu_head *notused)
>  {
> @@ -1387,21 +1418,16 @@ static void rcu_barrier_func(void *type)
>  	call_rcu_func(head, rcu_barrier_callback);
>  }
>  
> -static inline void wait_migrated_callbacks(void)
> -{
> -	wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
> -	smp_mb(); /* In case we didn't sleep. */
> -}
> -
>  /*
>   * Orchestrate the specified type of RCU barrier, waiting for all
>   * RCU callbacks of the specified type to complete.
>   */
> -static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> +static void _rcu_barrier(struct rcu_state *rsp,
> +			 void (*call_rcu_func)(struct rcu_head *head,
>  					       void (*func)(struct rcu_head *head)))
>  {
>  	BUG_ON(in_interrupt());
> -	/* Take cpucontrol mutex to protect against CPU hotplug */
> +	/* Take mutex to serialize concurrent rcu_barrier() requests. */
>  	mutex_lock(&rcu_barrier_mutex);
>  	init_completion(&rcu_barrier_completion);
>  	/*
> @@ -1414,29 +1440,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
>  	 * early.
>  	 */
>  	atomic_set(&rcu_barrier_cpu_count, 1);
> +	preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
> +	rcu_adopt_orphan_cbs(rsp);
>  	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> +	preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
>  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
>  		complete(&rcu_barrier_completion);
>  	wait_for_completion(&rcu_barrier_completion);
>  	mutex_unlock(&rcu_barrier_mutex);
> -	wait_migrated_callbacks();
> -}
> -
> -/**
> - * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> - */
> -void rcu_barrier(void)
> -{
> -	_rcu_barrier(call_rcu);
>  }
> -EXPORT_SYMBOL_GPL(rcu_barrier);
>  
>  /**
>   * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
>   */
>  void rcu_barrier_bh(void)
>  {
> -	_rcu_barrier(call_rcu_bh);
> +	_rcu_barrier(&rcu_bh_state, call_rcu_bh);
>  }
>  EXPORT_SYMBOL_GPL(rcu_barrier_bh);
>  
> @@ -1445,16 +1464,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
>   */
>  void rcu_barrier_sched(void)
>  {
> -	_rcu_barrier(call_rcu_sched);
> +	_rcu_barrier(&rcu_sched_state, call_rcu_sched);
>  }
>  EXPORT_SYMBOL_GPL(rcu_barrier_sched);
>  
> -static void rcu_migrate_callback(struct rcu_head *notused)
> -{
> -	if (atomic_dec_and_test(&rcu_migrate_type_count))
> -		wake_up(&rcu_migrate_wq);
> -}
> -
>  /*
>   * Do boot-time initialization of a CPU's per-CPU RCU data.
>   */
> @@ -1551,27 +1564,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
>  	case CPU_UP_PREPARE_FROZEN:
>  		rcu_online_cpu(cpu);
>  		break;
> -	case CPU_DOWN_PREPARE:
> -	case CPU_DOWN_PREPARE_FROZEN:
> -		/* Don't need to wait until next removal operation. */
> -		/* rcu_migrate_head is protected by cpu_add_remove_lock */
> -		wait_migrated_callbacks();
> -		break;
>  	case CPU_DYING:
>  	case CPU_DYING_FROZEN:
>  		/*
> -		 * preempt_disable() in on_each_cpu() prevents stop_machine(),
> +		 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
>  		 * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
> -		 * returns, all online cpus have queued rcu_barrier_func(),
> -		 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
> -		 *
> -		 * These callbacks ensure _rcu_barrier() waits for all
> -		 * RCU callbacks of the specified type to complete.
> +		 * returns, all online cpus have queued rcu_barrier_func().
> +		 * The dying CPU clears its cpu_online_mask bit and
> +		 * moves all of its RCU callbacks to ->orphan_cbs_list
> +		 * in the context of stop_machine(), so subsequent calls
> +		 * to _rcu_barrier() will adopt these callbacks and only
> +		 * then queue rcu_barrier_func() on all remaining CPUs.
>  		 */
> -		atomic_set(&rcu_migrate_type_count, 3);
> -		call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
> -		call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
> -		call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
> +		rcu_send_cbs_to_orphanage(&rcu_bh_state);
> +		rcu_send_cbs_to_orphanage(&rcu_sched_state);
> +		rcu_preempt_send_cbs_to_orphanage();
>  		break;
>  	case CPU_DEAD:
>  	case CPU_DEAD_FROZEN:
> diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> index 676eecd..b40ac57 100644
> --- a/kernel/rcutree.h
> +++ b/kernel/rcutree.h
> @@ -244,7 +244,15 @@ struct rcu_state {
>  	/* End  of fields guarded by root rcu_node's lock. */
>  
>  	spinlock_t onofflock;			/* exclude on/offline and */
> -						/*  starting new GP. */
> +						/*  starting new GP.  Also */
> +						/*  protects the following */
> +						/*  orphan_cbs fields. */
> +	struct rcu_head *orphan_cbs_list;	/* list of rcu_head structs */
> +						/*  orphaned by all CPUs in */
> +						/*  a given leaf rcu_node */
> +						/*  going offline. */
> +	struct rcu_head **orphan_cbs_tail;	/* And tail pointer. */
> +	long orphan_qlen;			/* Number of orphaned cbs. */
>  	spinlock_t fqslock;			/* Only one task forcing */
>  						/*  quiescent states. */
>  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> @@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
>  static int rcu_preempt_pending(int cpu);
>  static int rcu_preempt_needs_cpu(int cpu);
>  static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
> +static void rcu_preempt_send_cbs_to_orphanage(void);
>  static void __init __rcu_init_preempt(void);
>  
>  #endif /* #else #ifdef RCU_TREE_NONCORE */
> diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
> index d88dfd3..2fa3f39 100644
> --- a/kernel/rcutree_plugin.h
> +++ b/kernel/rcutree_plugin.h
> @@ -411,6 +411,15 @@ static int rcu_preempt_needs_cpu(int cpu)
>  	return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
>  }
>  
> +/**
> + * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> + */
> +void rcu_barrier(void)
> +{
> +	_rcu_barrier(&rcu_preempt_state, call_rcu);
> +}
> +EXPORT_SYMBOL_GPL(rcu_barrier);
> +
>  /*
>   * Initialize preemptable RCU's per-CPU data.
>   */
> @@ -420,6 +429,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
>  }
>  
>  /*
> + * Move preemptable RCU's callbacks to ->orphan_cbs_list.
> + */
> +static void rcu_preempt_send_cbs_to_orphanage(void)
> +{
> +	rcu_send_cbs_to_orphanage(&rcu_preempt_state);
> +}
> +
> +/*
>   * Initialize preemptable RCU's state structures.
>   */
>  static void __init __rcu_init_preempt(void)
> @@ -565,6 +582,16 @@ static int rcu_preempt_needs_cpu(int cpu)
>  }
>  
>  /*
> + * Because preemptable RCU does not exist, rcu_barrier() is just
> + * another name for rcu_barrier_sched().
> + */
> +void rcu_barrier(void)
> +{
> +	rcu_barrier_sched();
> +}
> +EXPORT_SYMBOL_GPL(rcu_barrier);
> +
> +/*
>   * Because preemptable RCU does not exist, there is no per-CPU
>   * data to initialize.
>   */
> @@ -573,6 +600,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
>  }
>  
>  /*
> + * Because there is no preemptable RCU, there are no callbacks to move.
> + */
> +static void rcu_preempt_send_cbs_to_orphanage(void)
> +{
> +}
> +
> +/*
>   * Because preemptable RCU does not exist, it need not be initialized.
>   */
>  static void __init __rcu_init_preempt(void)
> diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> index f09af28..4b31c77 100644
> --- a/kernel/rcutree_trace.c
> +++ b/kernel/rcutree_trace.c
> @@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
>  	struct rcu_node *rnp;
>  
>  	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
> -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
>  		   rsp->completed, rsp->gpnum, rsp->signaled,
>  		   (long)(rsp->jiffies_force_qs - jiffies),
>  		   (int)(jiffies & 0xffff),
>  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
>  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> -		   rsp->n_force_qs_lh);
> +		   rsp->n_force_qs_lh, rsp->orphan_qlen);
>  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
>  		if (rnp->level != level) {
>  			seq_puts(m, "\n");
> -- 
> 1.5.2.5
> 

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2009-10-05 19:12 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2009-09-29  4:49 [PATCH tip/core/rcu 0/3] rcu: simplify rcu_barrier() interaction with CPU hotplug Paul E. McKenney
2009-09-29  4:50 ` [PATCH tip/core/rcu 1/3] rcu: replace the rcu_barrier enum with pointer to call_rcu*() function Paul E. McKenney
2009-10-01  7:46   ` [tip:core/rcu] rcu: Replace " tip-bot for Paul E. McKenney
2009-10-05 19:10   ` tip-bot for Paul E. McKenney
2009-09-29  4:50 ` [PATCH tip/core/rcu 2/3] rcu: move rcu_barrier() to rcutree, make lightweight rcu_barrier() for rcutiny Paul E. McKenney
2009-10-01  7:46   ` [tip:core/rcu] rcu: Move " tip-bot for Paul E. McKenney
2009-10-05 19:10   ` tip-bot for Paul E. McKenney
2009-09-29  4:50 ` [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks Paul E. McKenney
2009-10-01  7:46   ` [tip:core/rcu] rcu: Make " tip-bot for Paul E. McKenney
2009-10-05 19:11   ` tip-bot for Paul E. McKenney
2009-09-29 13:50 [PATCH tip/core/rcu 3/3] rcu: make " Mathieu Desnoyers
2009-09-29 14:56 ` Paul E. McKenney
2009-09-29 15:57   ` Mathieu Desnoyers
