Date: Sat, 16 Jul 2016 19:10:07 +0200
From: Oleg Nesterov
To: "Paul E. McKenney", Peter Zijlstra
Cc: mingo@kernel.org, linux-kernel@vger.kernel.org, tj@kernel.org,
	john.stultz@linaro.org, dimitrysh@google.com, romlem@google.com,
	ccross@google.com, tkjos@google.com
Subject: [PATCH] rcu_sync: simplify the state machine, introduce __rcu_sync_enter()
Message-ID: <20160716171007.GA30000@redhat.com>
In-Reply-To: <20160715180140.GM7094@linux.vnet.ibm.com>

On 07/15, Paul E. McKenney wrote:
>
> On Fri, Jul 15, 2016 at 06:49:39PM +0200, Oleg Nesterov wrote:
> >
> > IOW, please ignore 2/2 which adds PERCPU_RWSEM_READER, the new version
> > just adds rcu_sync_sabotage() which should be renamed (and use GP_PASSED).
>
> OK, then move the checks out into the callers that would have used
> __rcu_sync_enter().
;-) Cough, now I can't understand which check you mean ;) OK, let me show
the code, then we will hopefully understand each other.

----------------------------------------------------------------------
So, as you can see I have fooled you ;) I'll send the patch on top of
Peter's changes; this is the (UNTESTED) code with the patch applied.
Peter, Paul, could you review? Do you see any hole?

Why. Firstly, note that the state machine was greatly simplified, and
rsp->cb_state has gone, we have a single "state" variable, gp_state.
Note also the ->sync() op has gone (and actually ->wait() too, see the
"TODO" comment).

	GP_IDLE   - owned by __rcu_sync_enter() which can only move this state to
	GP_ENTER  - owned by rcu-callback which moves it to
	GP_PASSED - owned by the last rcu_sync_exit() which moves it to
	GP_EXIT   - owned by rcu-callback which moves it back to GP_IDLE.

Yes, this is a bit simplified, we also have GP_REPLAY, but hopefully it is
clear. And there is another possible transition, GP_ENTER -> GP_IDLE,
because now it is possible to call __rcu_sync_enter() and rcu_sync_exit()
in any state (except obviously they should be balanced), and they do not
block.

The only blocking call is __rcu_sync_wait() which actually waits for a GP.
Obviously it should only be called if gp_count != 0, iow after
__rcu_sync_enter().

--------------------------------------------------------------------------
Now, cgroup_init() can simply call __rcu_sync_enter(&cgroup_threadgroup_rwsem)
and switch this sem into the slow mode. Compared to "sabotage" from Peter
this implies an unnecessary call_rcu_sched(), but I hope we can tolerate
this.

And we can even add a runtime knob to switch between "fast" and "slow aka
writer-biased" modes for cgroup_threadgroup_rwsem.

--------------------------------------------------------------------------
And I think __rcu_sync_enter() can have more users. Let's look at
freeze_super(). It calls percpu_down_write() 3 times, and waits for 3 GP's
sequentially.
Now we can add 3 __rcu_sync_enter's at the start and 3 rcu_sync_exit's at
the end (actually we can do better, just to simplify). And again, note
that rcu_sync_exit() will work correctly even if we (say) return -EBUSY,
so rcu_sync_wait and/or percpu_down_write() was not called in between,
and in this case we won't block waiting for a GP.

What do you think?

Oleg.
---

// rcu_sync.h: ----------------------------------------------------------------

struct rcu_sync {
	int			gp_state;
	int			gp_count;
	wait_queue_head_t	gp_wait;

	struct rcu_head		cb_head;
	enum rcu_sync_type	gp_type;
};

// sync.c ---------------------------------------------------------------------

#include <linux/rcu_sync.h>
#include <linux/sched.h>

#ifdef CONFIG_PROVE_RCU
#define __INIT_HELD(func)	.held = func,
#else
#define __INIT_HELD(func)
#endif

static const struct {
	void (*call)(struct rcu_head *, void (*)(struct rcu_head *));
	void (*wait)(void);	// TODO: remove this, see the comment in dtor
#ifdef CONFIG_PROVE_RCU
	int  (*held)(void);
#endif
} gp_ops[] = {
	[RCU_SYNC] = {
		.call = call_rcu,
		.wait = rcu_barrier,
		__INIT_HELD(rcu_read_lock_held)
	},
	[RCU_SCHED_SYNC] = {
		.call = call_rcu_sched,
		.wait = rcu_barrier_sched,
		__INIT_HELD(rcu_read_lock_sched_held)
	},
	[RCU_BH_SYNC] = {
		.call = call_rcu_bh,
		.wait = rcu_barrier_bh,
		__INIT_HELD(rcu_read_lock_bh_held)
	},
};

#define	rss_lock	gp_wait.lock

enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY };

static void rcu_sync_func(struct rcu_head *rcu);

static void rcu_sync_call(struct rcu_sync *rsp)
{
	// TODO: THIS IS SUBOPTIMAL. We want to call it directly
	// if rcu_blocking_is_gp() == T, but it has might_sleep().
	gp_ops[rsp->gp_type].call(&rsp->cb_head, rcu_sync_func);
}

static void rcu_sync_func(struct rcu_head *rcu)
{
	struct rcu_sync *rsp = container_of(rcu, struct rcu_sync, cb_head);
	unsigned long flags;

	BUG_ON(rsp->gp_state == GP_IDLE);
	BUG_ON(rsp->gp_state == GP_PASSED);

	spin_lock_irqsave(&rsp->rss_lock, flags);
	if (rsp->gp_count) {
		/*
		 * We're at least a GP after the first __rcu_sync_enter().
		 */
		rsp->gp_state = GP_PASSED;
		wake_up_locked(&rsp->gp_wait);	/* unblock __rcu_sync_wait() */
	} else if (rsp->gp_state == GP_REPLAY) {
		/*
		 * A new rcu_sync_exit() has happened; requeue the callback
		 * to catch a later GP.
		 */
		rsp->gp_state = GP_EXIT;
		rcu_sync_call(rsp);
	} else {
		/*
		 * We're at least a GP after the last rcu_sync_exit();
		 * everybody will now have observed the write side critical
		 * section. Let 'em rip!
		 *
		 * OR. ->gp_state can still be GP_ENTER if __rcu_sync_wait()
		 * wasn't called after __rcu_sync_enter(), abort.
		 */
		rsp->gp_state = GP_IDLE;
	}
	spin_unlock_irqrestore(&rsp->rss_lock, flags);
}

bool __rcu_sync_enter(struct rcu_sync *rsp)
{
	int gp_count, gp_state;

	spin_lock_irq(&rsp->rss_lock);
	gp_count = rsp->gp_count++;
	gp_state = rsp->gp_state;
	if (gp_state == GP_IDLE) {
		rsp->gp_state = GP_ENTER;
		rcu_sync_call(rsp);
	}
	spin_unlock_irq(&rsp->rss_lock);

	BUG_ON(gp_count != 0 && gp_state == GP_IDLE);
	BUG_ON(gp_count == 0 && gp_state == GP_PASSED);

	return gp_state < GP_PASSED;
}

void __rcu_sync_wait(struct rcu_sync *rsp)
{
	BUG_ON(rsp->gp_state == GP_IDLE);
	BUG_ON(rsp->gp_count == 0);

	wait_event(rsp->gp_wait, rsp->gp_state >= GP_PASSED);
}

void rcu_sync_enter(struct rcu_sync *rsp)
{
	if (__rcu_sync_enter(rsp))
		__rcu_sync_wait(rsp);
}

void rcu_sync_exit(struct rcu_sync *rsp)
{
	BUG_ON(rsp->gp_state == GP_IDLE);
	BUG_ON(rsp->gp_count == 0);

	spin_lock_irq(&rsp->rss_lock);
	if (!--rsp->gp_count) {
		if (rsp->gp_state == GP_PASSED) {
			rsp->gp_state = GP_EXIT;
			rcu_sync_call(rsp);
		} else if (rsp->gp_state == GP_EXIT) {
			rsp->gp_state = GP_REPLAY;
		}
	}
	spin_unlock_irq(&rsp->rss_lock);
}

void rcu_sync_dtor(struct rcu_sync *rsp)
{
	int gp_state;

	BUG_ON(rsp->gp_count);
	BUG_ON(rsp->gp_state == GP_PASSED);

	spin_lock_irq(&rsp->rss_lock);
	if (rsp->gp_state == GP_REPLAY)
		rsp->gp_state = GP_EXIT;
	gp_state = rsp->gp_state;
	spin_unlock_irq(&rsp->rss_lock);

	// TODO: add another wake_up_locked() into rcu_sync_func(),
	// use wait_event + spin_lock_wait, remove gp_ops->wait().
	if (gp_state != GP_IDLE) {
		gp_ops[rsp->gp_type].wait();
		BUG_ON(rsp->gp_state != GP_IDLE);
	}
}