It is functionally equivalent to struct rcu_sync_struct { atomic_t counter; }; static inline bool rcu_sync_is_idle(struct rcu_sync_struct *rss) { return atomic_read(&rss->counter) == 0; } static inline void rcu_sync_enter(struct rcu_sync_struct *rss) { atomic_inc(&rss->counter); synchronize_sched(); } static inline void rcu_sync_exit(struct rcu_sync_struct *rss) { synchronize_sched(); atomic_dec(&rss->counter); } except: it records the state and synchronize_sched() is only called by rcu_sync_enter() and only if necessary. Reviewed-by: Paul E. McKenney Signed-off-by: Oleg Nesterov Signed-off-by: Peter Zijlstra (Intel) --- include/linux/rcusync.h | 64 ++++++++++++++++++++++++++++ kernel/rcu/Makefile | 2 kernel/rcu/sync.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 1 deletion(-) --- /dev/null +++ b/include/linux/rcusync.h @@ -0,0 +1,64 @@ +#ifndef _LINUX_RCUSYNC_H_ +#define _LINUX_RCUSYNC_H_ + +#include +#include + +struct rcu_sync_struct { + int gp_state; + int gp_count; + wait_queue_head_t gp_wait; + + int cb_state; + struct rcu_head cb_head; + + void (*sync)(void); + void (*call)(struct rcu_head *, void (*)(struct rcu_head *)); +}; + +#define ___RCU_SYNC_INIT(name) \ + .gp_state = 0, \ + .gp_count = 0, \ + .gp_wait = __WAIT_QUEUE_HEAD_INITIALIZER(name.gp_wait), \ + .cb_state = 0 + +#define __RCU_SCHED_SYNC_INIT(name) { \ + ___RCU_SYNC_INIT(name), \ + .sync = synchronize_sched, \ + .call = call_rcu_sched, \ +} + +#define __RCU_BH_SYNC_INIT(name) { \ + ___RCU_SYNC_INIT(name), \ + .sync = synchronize_rcu_bh, \ + .call = call_rcu_bh, \ +} + +#define __RCU_SYNC_INIT(name) { \ + ___RCU_SYNC_INIT(name), \ + .sync = synchronize_rcu, \ + .call = call_rcu, \ +} + +#define DEFINE_RCU_SCHED_SYNC(name) \ + struct rcu_sync_struct name = __RCU_SCHED_SYNC_INIT(name) + +#define DEFINE_RCU_BH_SYNC(name) \ + struct rcu_sync_struct name = __RCU_BH_SYNC_INIT(name) + +#define DEFINE_RCU_SYNC(name) \ + struct rcu_sync_struct name = __RCU_SYNC_INIT(name) + +static inline bool rcu_sync_is_idle(struct rcu_sync_struct *rss) +{ + return !rss->gp_state; /* GP_IDLE */ +} + +enum rcu_sync_type { RCU_SYNC, RCU_SCHED_SYNC, RCU_BH_SYNC }; + +extern void rcu_sync_init(struct rcu_sync_struct *, enum rcu_sync_type); +extern void rcu_sync_enter(struct rcu_sync_struct *); +extern void rcu_sync_exit(struct rcu_sync_struct *); + +#endif /* _LINUX_RCUSYNC_H_ */ + --- a/kernel/rcu/Makefile +++ b/kernel/rcu/Makefile @@ -1,4 +1,4 @@ -obj-y += update.o +obj-y += update.o sync.o obj-$(CONFIG_SRCU) += srcu.o obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o obj-$(CONFIG_TREE_RCU) += tree.o --- /dev/null +++ b/kernel/rcu/sync.c @@ -0,0 +1,108 @@ + +#include +#include + +enum { GP_IDLE = 0, GP_PENDING, GP_PASSED }; +enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY }; + +#define rss_lock gp_wait.lock + +void rcu_sync_init(struct rcu_sync_struct *rss, enum rcu_sync_type type) +{ + memset(rss, 0, sizeof(*rss)); + init_waitqueue_head(&rss->gp_wait); + + switch (type) { + case RCU_SYNC: + rss->sync = synchronize_rcu; + rss->call = call_rcu; + break; + + case RCU_SCHED_SYNC: + rss->sync = synchronize_sched; + rss->call = call_rcu_sched; + break; + + case RCU_BH_SYNC: + rss->sync = synchronize_rcu_bh; + rss->call = call_rcu_bh; + break; + } +} + +void rcu_sync_enter(struct rcu_sync_struct *rss) +{ + bool need_wait, need_sync; + + spin_lock_irq(&rss->rss_lock); + need_wait = rss->gp_count++; + need_sync = rss->gp_state == GP_IDLE; + if (need_sync) + rss->gp_state = GP_PENDING; + spin_unlock_irq(&rss->rss_lock); + + BUG_ON(need_wait && need_sync); + + if (need_sync) { + rss->sync(); + rss->gp_state = GP_PASSED; + wake_up_all(&rss->gp_wait); + } else if (need_wait) { + wait_event(rss->gp_wait, rss->gp_state == GP_PASSED); + } else { + /* + * Possible when there's a pending CB from a rcu_sync_exit(). + * Nobody has yet been allowed the 'fast' path and thus we can + * avoid doing any sync(). The callback will get 'dropped'. + */ + BUG_ON(rss->gp_state != GP_PASSED); + } +} + +static void rcu_sync_func(struct rcu_head *rcu) +{ + struct rcu_sync_struct *rss = + container_of(rcu, struct rcu_sync_struct, cb_head); + unsigned long flags; + + + BUG_ON(rss->gp_state != GP_PASSED); + BUG_ON(rss->cb_state == CB_IDLE); + + spin_lock_irqsave(&rss->rss_lock, flags); + if (rss->gp_count) { + /* + * A new rcu_sync_begin() has happened; drop the callback. + */ + rss->cb_state = CB_IDLE; + } else if (rss->cb_state == CB_REPLAY) { + /* + * A new rcu_sync_exit() has happened; requeue the callback + * to catch a later GP. + */ + rss->cb_state = CB_PENDING; + rss->call(&rss->cb_head, rcu_sync_func); + } else { + /* + * We're at least a GP after rcu_sync_exit(); eveybody will now + * have observed the write side critical section. Let 'em rip!. + */ + rss->cb_state = CB_IDLE; + rss->gp_state = GP_IDLE; + } + spin_unlock_irqrestore(&rss->rss_lock, flags); +} + +void rcu_sync_exit(struct rcu_sync_struct *rss) +{ + spin_lock_irq(&rss->rss_lock); + if (!--rss->gp_count) { + if (rss->cb_state == CB_IDLE) { + rss->cb_state = CB_PENDING; + rss->call(&rss->cb_head, rcu_sync_func); + } else if (rss->cb_state == CB_PENDING) { + rss->cb_state = CB_REPLAY; + } + } + spin_unlock_irq(&rss->rss_lock); +} -- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in Please read the FAQ at http://www.tux.org/lkml/