From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S3001240AbdDZOb6 (ORCPT ); Wed, 26 Apr 2017 10:31:58 -0400 Received: from mx0a-001b2d01.pphosted.com ([148.163.156.1]:37313 "EHLO mx0a-001b2d01.pphosted.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S3001222AbdDZObt (ORCPT ); Wed, 26 Apr 2017 10:31:49 -0400 Date: Wed, 26 Apr 2017 07:31:45 -0700 From: "Paul E. McKenney" To: Mike Galbraith Cc: LKML Subject: Re: TREE_SRCU slows hotplug by factor ~16 Reply-To: paulmck@linux.vnet.ibm.com References: <1493002089.4145.7.camel@gmx.de> <20170424033250.GX3956@linux.vnet.ibm.com> <1493011484.4166.13.camel@gmx.de> <20170424062220.GZ3956@linux.vnet.ibm.com> <1493019303.4617.6.camel@gmx.de> <20170424162442.GI3956@linux.vnet.ibm.com> <20170425223642.GA10931@linux.vnet.ibm.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20170425223642.GA10931@linux.vnet.ibm.com> User-Agent: Mutt/1.5.21 (2010-09-15) X-TM-AS-GCONF: 00 x-cbid: 17042614-0040-0000-0000-00000326AC39 X-IBM-SpamModules-Scores: X-IBM-SpamModules-Versions: BY=3.00006977; HX=3.00000240; KW=3.00000007; PH=3.00000004; SC=3.00000208; SDB=6.00852759; UDB=6.00421552; IPR=6.00631597; BA=6.00005313; NDR=6.00000001; ZLA=6.00000005; ZF=6.00000009; ZB=6.00000000; ZP=6.00000000; ZH=6.00000000; ZU=6.00000002; MB=3.00015187; XFM=3.00000013; UTC=2017-04-26 14:31:46 X-IBM-AV-DETECTION: SAVI=unused REMOTE=unused XFE=unused x-cbparentid: 17042614-0041-0000-0000-0000071AC603 Message-Id: <20170426143145.GA5683@linux.vnet.ibm.com> X-Proofpoint-Virus-Version: vendor=fsecure engine=2.50.10432:,, definitions=2017-04-26_10:,, signatures=0 X-Proofpoint-Spam-Details: rule=outbound_notspam policy=outbound score=0 spamscore=0 suspectscore=0 malwarescore=0 phishscore=0 adultscore=0 bulkscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.0.1-1703280000 definitions=main-1704260253 Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org On Tue, Apr 25, 2017 at 03:36:42PM -0700, Paul E. McKenney wrote: > On Mon, Apr 24, 2017 at 09:24:42AM -0700, Paul E. McKenney wrote: > > On Mon, Apr 24, 2017 at 09:35:03AM +0200, Mike Galbraith wrote: > > > On Sun, 2017-04-23 at 23:22 -0700, Paul E. McKenney wrote: > > > > > > > Could you please collect an ftrace (or whatever) showing the timestamp > > > > sequence of calls to synchronize_srcu(), synchronize_srcu_expedited(), > > > > and call_srcu() during the execution of the stress script? If it is easy > > > > to do, also the timestamp sequence of returns from synchronize_srcu() > > > > and synchronize_srcu_expedited()? > > > > > > The first two minutes below config bits, trace_printk([enter/exit]) in > > > each function. If you (unlikely, but..) want the whole trace, holler. > > > > > > # RCU Subsystem > > > CONFIG_TREE_RCU=y > > > CONFIG_RCU_EXPERT=y > > > CONFIG_SRCU=y > > > # CONFIG_CLASSIC_SRCU is not set > > > CONFIG_TREE_SRCU=y > > > # CONFIG_TASKS_RCU is not set > > > CONFIG_RCU_STALL_COMMON=y > > > CONFIG_RCU_FANOUT=64 > > > CONFIG_RCU_FANOUT_LEAF=16 > > > # CONFIG_RCU_FAST_NO_HZ is not set > > > # CONFIG_TREE_RCU_TRACE is not set > > > CONFIG_RCU_KTHREAD_PRIO=0 > > > CONFIG_RCU_NOCB_CPU=y > > > CONFIG_RCU_NOCB_CPU_NONE=y > > > # CONFIG_RCU_NOCB_CPU_ZERO is not set > > > # CONFIG_RCU_NOCB_CPU_ALL is not set > > > # RCU Debugging > > > # CONFIG_PROVE_RCU is not set > > > # CONFIG_SPARSE_RCU_POINTER is not set > > > # CONFIG_RCU_PERF_TEST is not set > > > # CONFIG_RCU_TORTURE_TEST is not set > > > CONFIG_RCU_CPU_STALL_TIMEOUT=60 > > > # CONFIG_RCU_TRACE is not set > > > # CONFIG_RCU_EQS_DEBUG is not set > > > > Thank you for the data, extremely helpful! I now know that I must > > immediately fix this the hard way rather than trying several simpler > > possibilities that, according to your trace data, would be complete > > wastes of time. > > > > My initial calculation was actually optimistic. There was one > > exit-to-enter interval at about 80 milliseconds, but the second > > longest is 342 microseconds, and the vast majority are under 200 > > microseconds. > > > > Back to the drawing board! > > > > This will take some time, but I should have a patch for you in a few days, > > hopefully sooner. > > Making progress, might have something late tomorrow (Wednesday) Pacific > Time, will definitely have something Thursday Pacific Time. And a sneak preview, semi-tested. If you get a chance to run this, please let me know now it goes. This should apply on -tip or on -rcu. Probably on recent -next as well. Thanx, Paul ------------------------------------------------------------------------ diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt index facc20a3f962..4a4b9266c4de 100644 --- a/Documentation/admin-guide/kernel-parameters.txt +++ b/Documentation/admin-guide/kernel-parameters.txt @@ -3779,6 +3779,14 @@ spia_pedr= spia_peddr= + srcutree.exp_holdoff [KNL] + Specifies how many nanoseconds must elapse + since the end of the last SRCU grace period for + a given srcu_struct until the next normal SRCU + grace period will be considered for automatic + expediting. Set to zero to disable automatic + expediting. + stacktrace [FTRACE] Enabled the stack tracer on boot up. diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h index 41cf99930f34..5753f7322262 100644 --- a/include/linux/srcuclassic.h +++ b/include/linux/srcuclassic.h @@ -98,4 +98,18 @@ void synchronize_srcu_expedited(struct srcu_struct *sp); void srcu_barrier(struct srcu_struct *sp); unsigned long srcu_batches_completed(struct srcu_struct *sp); +static inline void srcutorture_get_gp_data(enum rcutorture_type test_type, + struct srcu_struct *sp, int *flags, + unsigned long *gpnum, + unsigned long *completed) +{ + if (test_type != SRCU_FLAVOR) + return; + *flags = 0; + *completed = sp->completed; + *gpnum = *completed; + if (sp->batch_queue.head || sp->batch_check0.head || sp->batch_check0.head) + (*gpnum)++; +} + #endif diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h index 4f284e4f4d8c..42311ee0334f 100644 --- a/include/linux/srcutiny.h +++ b/include/linux/srcutiny.h @@ -78,4 +78,16 @@ static inline unsigned long srcu_batches_completed(struct srcu_struct *sp) return 0; } +static inline void srcutorture_get_gp_data(enum rcutorture_type test_type, + struct srcu_struct *sp, int *flags, + unsigned long *gpnum, + unsigned long *completed) +{ + if (test_type != SRCU_FLAVOR) + return; + *flags = 0; + *completed = sp->srcu_gp_seq; + *gpnum = *completed; +} + #endif diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h index 0400e211aa44..32e86d85fd11 100644 --- a/include/linux/srcutree.h +++ b/include/linux/srcutree.h @@ -43,10 +43,13 @@ struct srcu_data { spinlock_t lock ____cacheline_internodealigned_in_smp; struct rcu_segcblist srcu_cblist; /* List of callbacks.*/ unsigned long srcu_gp_seq_needed; /* Furthest future GP needed. */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ bool srcu_cblist_invoking; /* Invoking these CBs? */ struct delayed_work work; /* Context for CB invoking. */ struct rcu_head srcu_barrier_head; /* For srcu_barrier() use. */ struct srcu_node *mynode; /* Leaf srcu_node. */ + unsigned long grpmask; /* Mask for leaf srcu_node */ + /* ->srcu_data_have_cbs[]. */ int cpu; struct srcu_struct *sp; }; @@ -59,6 +62,9 @@ struct srcu_node { unsigned long srcu_have_cbs[4]; /* GP seq for children */ /* having CBs, but only */ /* is > ->srcu_gq_seq. */ + unsigned long srcu_data_have_cbs[4]; /* Which srcu_data structs */ + /* have CBs for given GP? */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ struct srcu_node *srcu_parent; /* Next up in tree. */ int grplo; /* Least CPU for node. */ int grphi; /* Biggest CPU for node. */ @@ -77,7 +83,8 @@ struct srcu_struct { unsigned int srcu_idx; /* Current rdr array element. */ unsigned long srcu_gp_seq; /* Grace-period seq #. */ unsigned long srcu_gp_seq_needed; /* Latest gp_seq needed. */ - atomic_t srcu_exp_cnt; /* # ongoing expedited GPs. */ + unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */ + unsigned long srcu_last_gp_end; /* Last GP end timestamp (ns) */ struct srcu_data __percpu *sda; /* Per-CPU srcu_data array. */ unsigned long srcu_barrier_seq; /* srcu_barrier seq #. */ struct mutex srcu_barrier_mutex; /* Serialize barrier ops. */ @@ -136,4 +143,8 @@ void synchronize_srcu_expedited(struct srcu_struct *sp); void srcu_barrier(struct srcu_struct *sp); unsigned long srcu_batches_completed(struct srcu_struct *sp); +void srcutorture_get_gp_data(enum rcutorture_type test_type, + struct srcu_struct *sp, int *flags, + unsigned long *gpnum, unsigned long *completed); + #endif diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c index e9d4527cdd43..ae6e574d4cf5 100644 --- a/kernel/rcu/rcutorture.c +++ b/kernel/rcu/rcutorture.c @@ -1360,12 +1360,14 @@ rcu_torture_stats_print(void) cur_ops->stats(); if (rtcv_snap == rcu_torture_current_version && rcu_torture_current != NULL) { - int __maybe_unused flags; - unsigned long __maybe_unused gpnum; - unsigned long __maybe_unused completed; + int __maybe_unused flags = 0; + unsigned long __maybe_unused gpnum = 0; + unsigned long __maybe_unused completed = 0; rcutorture_get_gp_data(cur_ops->ttype, &flags, &gpnum, &completed); + srcutorture_get_gp_data(cur_ops->ttype, srcu_ctlp, + &flags, &gpnum, &completed); wtp = READ_ONCE(writer_task); pr_alert("??? Writer stall state %s(%d) g%lu c%lu f%#x ->state %#lx\n", rcu_torture_writer_state_getname(), diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c index 9ecf0acc18eb..b09221912f48 100644 --- a/kernel/rcu/srcutree.c +++ b/kernel/rcu/srcutree.c @@ -34,10 +34,14 @@ #include #include #include +#include #include #include "rcu.h" +ulong exp_holdoff = 50 * 1000; /* Holdoff (ns) for auto-expediting. */ +module_param(exp_holdoff, ulong, 0444); + static void srcu_invoke_callbacks(struct work_struct *work); static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay); @@ -66,8 +70,13 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static) /* Each pass through this loop initializes one srcu_node structure. */ rcu_for_each_node_breadth_first(sp, snp) { spin_lock_init(&snp->lock); - for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) + WARN_ON_ONCE(ARRAY_SIZE(snp->srcu_have_cbs) != + ARRAY_SIZE(snp->srcu_data_have_cbs)); + for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) { snp->srcu_have_cbs[i] = 0; + snp->srcu_data_have_cbs[i] = 0; + } + snp->srcu_gp_seq_needed_exp = 0; snp->grplo = -1; snp->grphi = -1; if (snp == &sp->node[0]) { @@ -98,6 +107,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static) rcu_segcblist_init(&sdp->srcu_cblist); sdp->srcu_cblist_invoking = false; sdp->srcu_gp_seq_needed = sp->srcu_gp_seq; + sdp->srcu_gp_seq_needed_exp = sp->srcu_gp_seq; sdp->mynode = &snp_first[cpu / levelspread[level]]; for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) { if (snp->grplo < 0) @@ -107,6 +117,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static) sdp->cpu = cpu; INIT_DELAYED_WORK(&sdp->work, srcu_invoke_callbacks); sdp->sp = sp; + sdp->grpmask = 1 << (cpu - sdp->mynode->grplo); if (is_static) continue; @@ -130,7 +141,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static) mutex_init(&sp->srcu_gp_mutex); sp->srcu_idx = 0; sp->srcu_gp_seq = 0; - atomic_set(&sp->srcu_exp_cnt, 0); sp->srcu_barrier_seq = 0; mutex_init(&sp->srcu_barrier_mutex); atomic_set(&sp->srcu_barrier_cpu_cnt, 0); @@ -138,6 +148,8 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static) if (!is_static) sp->sda = alloc_percpu(struct srcu_data); init_srcu_struct_nodes(sp, is_static); + sp->srcu_gp_seq_needed_exp = 0; + sp->srcu_last_gp_end = ktime_get_mono_fast_ns(); smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */ return sp->sda ? 0 : -ENOMEM; } @@ -302,6 +314,18 @@ static bool srcu_readers_active(struct srcu_struct *sp) #define SRCU_INTERVAL 1 +/* + * Return grace-period delay, zero if there are expedited grace + * periods pending, SRCU_INTERVAL otherwise. + */ +static unsigned long srcu_get_delay(struct srcu_struct *sp) +{ + if (ULONG_CMP_LT(READ_ONCE(sp->srcu_gp_seq), + READ_ONCE(sp->srcu_gp_seq_needed_exp))) + return 0; + return SRCU_INTERVAL; +} + /** * cleanup_srcu_struct - deconstruct a sleep-RCU structure * @sp: structure to clean up. @@ -313,7 +337,8 @@ void cleanup_srcu_struct(struct srcu_struct *sp) { int cpu; - WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); + if (WARN_ON(!srcu_get_delay(sp))) + return; /* Leakage unless caller handles error. */ if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ flush_delayed_work(&sp->work); @@ -382,6 +407,7 @@ static void srcu_gp_start(struct srcu_struct *sp) rcu_seq_current(&sp->srcu_gp_seq)); (void)rcu_segcblist_accelerate(&sdp->srcu_cblist, rcu_seq_snap(&sp->srcu_gp_seq)); + smp_mb(); /* Order prior store to ->srcu_gp_seq_needed vs. GP start. */ rcu_seq_start(&sp->srcu_gp_seq); state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); WARN_ON_ONCE(state != SRCU_STATE_SCAN1); @@ -434,16 +460,20 @@ static void srcu_schedule_cbs_sdp(struct srcu_data *sdp, unsigned long delay) /* * Schedule callback invocation for all srcu_data structures associated - * with the specified srcu_node structure, if possible, on the corresponding - * CPUs. + * with the specified srcu_node structure that have callbacks for the + * just-completed grace period, the one corresponding to idx. If possible, + * schedule this invocation on the corresponding CPUs. */ -static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp) +static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp, + unsigned long mask, unsigned long delay) { int cpu; - for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) - srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), - atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) { + if (!(mask & (1 << (cpu - snp->grplo)))) + continue; + srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), delay); + } } /* @@ -457,10 +487,12 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp) */ static void srcu_gp_end(struct srcu_struct *sp) { + unsigned long cbdelay; bool cbs; unsigned long gpseq; int idx; int idxnext; + unsigned long mask; struct srcu_node *snp; /* Prevent more than one additional grace period. */ @@ -470,8 +502,12 @@ static void srcu_gp_end(struct srcu_struct *sp) spin_lock_irq(&sp->gp_lock); idx = rcu_seq_state(sp->srcu_gp_seq); WARN_ON_ONCE(idx != SRCU_STATE_SCAN2); + cbdelay = srcu_get_delay(sp); + sp->srcu_last_gp_end = ktime_get_mono_fast_ns(); rcu_seq_end(&sp->srcu_gp_seq); gpseq = rcu_seq_current(&sp->srcu_gp_seq); + if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq)) + sp->srcu_gp_seq_needed_exp = gpseq; spin_unlock_irq(&sp->gp_lock); mutex_unlock(&sp->srcu_gp_mutex); /* A new grace period can start at this point. But only one. */ @@ -486,10 +522,14 @@ static void srcu_gp_end(struct srcu_struct *sp) cbs = snp->srcu_have_cbs[idx] == gpseq; snp->srcu_have_cbs[idx] = gpseq; rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1); + if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq)) + snp->srcu_gp_seq_needed_exp = gpseq; + mask = snp->srcu_data_have_cbs[idx]; + snp->srcu_data_have_cbs[idx] = 0; spin_unlock_irq(&snp->lock); if (cbs) { smp_mb(); /* GP end before CB invocation. */ - srcu_schedule_cbs_snp(sp, snp); + srcu_schedule_cbs_snp(sp, snp, mask, cbdelay); } } @@ -504,25 +544,52 @@ static void srcu_gp_end(struct srcu_struct *sp) srcu_gp_start(sp); spin_unlock_irq(&sp->gp_lock); /* Throttle expedited grace periods: Should be rare! */ - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) && - rcu_seq_ctr(gpseq) & 0xf - ? 0 - : SRCU_INTERVAL); + srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff + ? 0 : SRCU_INTERVAL); } else { spin_unlock_irq(&sp->gp_lock); } } /* + * Funnel-locking scheme to scalably mediate many concurrent expedited + * grace-period requests. This function is invoked for the first known + * expedited request for a grace period that has already been requested, + * but without expediting. To start a completely new grace period, + * whether expedited or not, use srcu_funnel_gp_start() instead. + */ +static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp, + unsigned long s) +{ + unsigned long flags; + + for (; snp != NULL; snp = snp->srcu_parent) { + if (rcu_seq_done(&sp->srcu_gp_seq, s) || + ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s)) + return; + spin_lock_irqsave(&snp->lock, flags); + if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) { + spin_unlock_irqrestore(&snp->lock, flags); + return; + } + WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s); + spin_unlock_irqrestore(&snp->lock, flags); + } + spin_lock_irqsave(&sp->gp_lock, flags); + if (!ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s)) + sp->srcu_gp_seq_needed_exp = s; + spin_unlock_irqrestore(&sp->gp_lock, flags); +} + +/* * Funnel-locking scheme to scalably mediate many concurrent grace-period * requests. The winner has to do the work of actually starting grace * period s. Losers must either ensure that their desired grace-period * number is recorded on at least their leaf srcu_node structure, or they * must take steps to invoke their own callbacks. */ -static void srcu_funnel_gp_start(struct srcu_struct *sp, - struct srcu_data *sdp, - unsigned long s) +static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp, + unsigned long s, bool do_norm) { unsigned long flags; int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs); @@ -536,14 +603,25 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, spin_lock_irqsave(&snp->lock, flags); if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) { snp_seq = snp->srcu_have_cbs[idx]; + if (snp == sdp->mynode && snp_seq == s) + snp->srcu_data_have_cbs[idx] |= sdp->grpmask; spin_unlock_irqrestore(&snp->lock, flags); if (snp == sdp->mynode && snp_seq != s) { smp_mb(); /* CBs after GP! */ - srcu_schedule_cbs_sdp(sdp, 0); + srcu_schedule_cbs_sdp(sdp, do_norm + ? SRCU_INTERVAL + : 0); + return; } + if (!do_norm) + srcu_funnel_exp_start(sp, snp, s); return; } snp->srcu_have_cbs[idx] = s; + if (snp == sdp->mynode) + snp->srcu_data_have_cbs[idx] |= sdp->grpmask; + if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s)) + snp->srcu_gp_seq_needed_exp = s; spin_unlock_irqrestore(&snp->lock, flags); } @@ -556,6 +634,8 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, */ smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/ } + if (!do_norm && ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s)) + sp->srcu_gp_seq_needed_exp = s; /* If grace period not already done and none in progress, start it. */ if (!rcu_seq_done(&sp->srcu_gp_seq, s) && @@ -563,9 +643,7 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)); srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, - atomic_read(&sp->srcu_exp_cnt) - ? 0 - : SRCU_INTERVAL); + srcu_get_delay(sp)); } spin_unlock_irqrestore(&sp->gp_lock, flags); } @@ -580,7 +658,7 @@ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) for (;;) { if (srcu_readers_active_idx_check(sp, idx)) return true; - if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) + if (--trycount + !srcu_get_delay(sp) <= 0) return false; udelay(SRCU_RETRY_CHECK_DELAY); } @@ -606,6 +684,67 @@ static void srcu_flip(struct srcu_struct *sp) } /* + * If SRCU is likely idle, return true, otherwise return false. + * + * Note that it is OK for several current from-idle requests for a new + * grace period from idle to specify expediting because they will all end + * up requesting the same grace period anyhow. So no loss. + * + * Note also that if any CPU (including the current one) is still invoking + * callbacks, this function will nevertheless say "idle". This is not + * ideal, but the overhead of checking all CPUs' callback lists is even + * less ideal, especially on large systems. Furthermore, the wakeup + * can happen before the callback is fully removed, so we have no choice + * but to accept this type of error. + * + * This function is also subject to counter-wrap errors, but let's face + * it, if this function was preempted for enough time for the counters + * to wrap, it really doesn't matter whether or not we expedite the grace + * period. The extra overhead of a needlessly expedited grace period is + * negligible when amoritized over that time period, and the extra latency + * of a needlessly non-expedited grace period is similarly negligible. + */ +static bool srcu_might_be_idle(struct srcu_struct *sp) +{ + unsigned long curseq; + unsigned long flags; + struct srcu_data *sdp; + unsigned long t; + + /* If the local srcu_data structure has callbacks, not idle. */ + local_irq_save(flags); + sdp = this_cpu_ptr(sp->sda); + if (rcu_segcblist_pend_cbs(&sdp->srcu_cblist)) { + local_irq_restore(flags); + return false; /* Callbacks already present, so not idle. */ + } + local_irq_restore(flags); + + /* + * No local callbacks, so probabalistically probe global state. + * Exact information would require acquiring locks, which would + * kill scalability, hence the probabalistic nature of the probe. + */ + + /* First, see if enough time has passed since the last GP. */ + t = ktime_get_mono_fast_ns(); + if (exp_holdoff == 0 || + time_in_range_open(t, sp->srcu_last_gp_end, + sp->srcu_last_gp_end + exp_holdoff)) + return false; /* Too soon after last GP. */ + + /* Next, check for probable idleness. */ + curseq = rcu_seq_current(&sp->srcu_gp_seq); + smp_mb(); /* Order ->srcu_gp_seq with ->srcu_gp_seq_needed. */ + if (ULONG_CMP_LT(curseq, READ_ONCE(sp->srcu_gp_seq_needed))) + return false; /* Grace period in progress, so not idle. */ + smp_mb(); /* Order ->srcu_gp_seq with prior access. */ + if (curseq != rcu_seq_current(&sp->srcu_gp_seq)) + return false; /* GP # changed, so not idle. */ + return true; /* With reasonable probability, idle! */ +} + +/* * Enqueue an SRCU callback on the srcu_data structure associated with * the current CPU and the specified srcu_struct structure, initiating * grace-period processing if it is not already running. @@ -633,10 +772,11 @@ static void srcu_flip(struct srcu_struct *sp) * srcu_read_lock(), and srcu_read_unlock() that are all passed the same * srcu_struct structure. */ -void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, - rcu_callback_t func) +void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, + rcu_callback_t func, bool do_norm) { unsigned long flags; + bool needexp = false; bool needgp = false; unsigned long s; struct srcu_data *sdp; @@ -655,16 +795,28 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, sdp->srcu_gp_seq_needed = s; needgp = true; } + if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) { + sdp->srcu_gp_seq_needed_exp = s; + needexp = true; + } spin_unlock_irqrestore(&sdp->lock, flags); if (needgp) - srcu_funnel_gp_start(sp, sdp, s); + srcu_funnel_gp_start(sp, sdp, s, do_norm); + else + srcu_funnel_exp_start(sp, sdp->mynode, s); +} + +void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp, + rcu_callback_t func) +{ + __call_srcu(sp, rhp, func, true); } EXPORT_SYMBOL_GPL(call_srcu); /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp) +static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm) { struct rcu_synchronize rcu; @@ -680,7 +832,7 @@ static void __synchronize_srcu(struct srcu_struct *sp) check_init_srcu_struct(sp); init_completion(&rcu.completion); init_rcu_head_on_stack(&rcu.head); - call_srcu(sp, &rcu.head, wakeme_after_rcu); + __call_srcu(sp, &rcu.head, wakeme_after_rcu, do_norm); wait_for_completion(&rcu.completion); destroy_rcu_head_on_stack(&rcu.head); } @@ -697,18 +849,7 @@ static void __synchronize_srcu(struct srcu_struct *sp) */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - bool do_norm = rcu_gp_is_normal(); - - check_init_srcu_struct(sp); - if (!do_norm) { - atomic_inc(&sp->srcu_exp_cnt); - smp_mb__after_atomic(); /* increment before GP. */ - } - __synchronize_srcu(sp); - if (!do_norm) { - smp_mb__before_atomic(); /* GP before decrement. */ - WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0); - } + __synchronize_srcu(sp, rcu_gp_is_normal()); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); @@ -750,13 +891,18 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); * Of course, these memory-ordering guarantees apply only when * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are * passed the same srcu_struct structure. + * + * If SRCU is likely idle, expedite the first request. This semantic + * was provided by Classic SRCU, and is relied upon by its users, so TREE + * SRCU must also provide it. Note that detecting idleness is heuristic + * and subject to both false positives and negatives. */ void synchronize_srcu(struct srcu_struct *sp) { - if (rcu_gp_is_expedited()) + if (srcu_might_be_idle(sp) || rcu_gp_is_expedited()) synchronize_srcu_expedited(sp); else - __synchronize_srcu(sp); + __synchronize_srcu(sp, true); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -991,6 +1137,18 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); srcu_advance_state(sp); - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + srcu_reschedule(sp, srcu_get_delay(sp)); } EXPORT_SYMBOL_GPL(process_srcu); + +void srcutorture_get_gp_data(enum rcutorture_type test_type, + struct srcu_struct *sp, int *flags, + unsigned long *gpnum, unsigned long *completed) +{ + if (test_type != SRCU_FLAVOR) + return; + *flags = 0; + *completed = rcu_seq_ctr(sp->srcu_gp_seq); + *gpnum = rcu_seq_ctr(sp->srcu_gp_seq_needed); +} +EXPORT_SYMBOL_GPL(srcutorture_get_gp_data); diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index 23aa02587d0f..91fff49d5869 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -704,15 +704,11 @@ void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags, default: break; } - if (rsp != NULL) { - *flags = READ_ONCE(rsp->gp_flags); - *gpnum = READ_ONCE(rsp->gpnum); - *completed = READ_ONCE(rsp->completed); + if (rsp == NULL) return; - } - *flags = 0; - *gpnum = 0; - *completed = 0; + *flags = READ_ONCE(rsp->gp_flags); + *gpnum = READ_ONCE(rsp->gpnum); + *completed = READ_ONCE(rsp->completed); } EXPORT_SYMBOL_GPL(rcutorture_get_gp_data);