On x86 we cannot do fetch_or with a single instruction and end up using a cmpxchg loop, this reduces determinism. Replace the fetch_or with a very tricky composite xchg8 + load. The basic idea is that we use xchg8 to test-and-set the pending bit (when it is a byte) and then a load to fetch the whole word. Using two instructions of course opens a window we previously did not have. In particular the ordering between pending and tail is of interrest, because that is where the split happens. The claim is that if we order them, it all works out just fine. There are two specific cases where the pending,tail state changes: - when the 3rd lock(er) comes in and finds pending set, it'll queue and set tail; since we set tail while pending is set, the ordering is split is not important (and not fundamentally different form fetch_or). [*] - when the last queued lock holder acquires the lock (uncontended), we clear the tail and set the lock byte. By first setting the pending bit this cmpxchg will fail and the later load must then see the remaining tail. Another interesting scenario is where there are only 2 threads: lock := (0,0,0) CPU 0 CPU 1 lock() lock() trylock(-> 0,0,1) trylock() /* fail */ return; xchg_relaxed(pending, 1) (-> 0,1,1) mb() val = smp_load_acquire(*lock); Where, without the mb() the load would've been allowed to return 0 for the locked byte. [*] there is a fun scenario where the 3rd locker observes the pending bit, but before it sets the tail, the 1st lock (owner) unlocks and the 2nd locker acquires the lock, leaving us with a temporary 0,0,1 state. But this is not different between fetch_or or this new method. Suggested-by: Will Deacon Signed-off-by: Peter Zijlstra (Intel) --- arch/x86/include/asm/qspinlock.h | 2 + kernel/locking/qspinlock.c | 56 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) --- a/arch/x86/include/asm/qspinlock.h +++ b/arch/x86/include/asm/qspinlock.h @@ -9,6 +9,8 @@ #define _Q_PENDING_LOOPS (1 << 9) +#define _Q_NO_FETCH_OR + #ifdef CONFIG_PARAVIRT_SPINLOCKS extern void native_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val); extern void __pv_init_lock_hash(void); --- a/kernel/locking/qspinlock.c +++ b/kernel/locking/qspinlock.c @@ -232,6 +232,60 @@ static __always_inline u32 xchg_tail(str #endif /* _Q_PENDING_BITS == 8 */ /** + * set_pending_fetch_acquire - fetch the whole lock value and set pending and locked + * @lock : Pointer to queued spinlock structure + * Return: The previous lock value + * + * *,*,* -> *,1,* + */ +static __always_inline u32 set_pending_fetch_acquire(struct qspinlock *lock) +{ +#if defined(_Q_NO_FETCH_OR) && _Q_PENDING_BITS == 8 + u32 val; + + /* + * For the !LL/SC archs that do not have a native atomic_fetch_or + * but do have a native xchg (x86) we can do trickery and avoid the + * cmpxchg-loop based fetch-or to improve determinism. + * + * We first xchg the pending byte to set PENDING, and then issue a load + * for the rest of the word and mask out the pending byte to compute a + * 'full' value. + */ + val = xchg_relaxed(&lock->pending, 1) << _Q_PENDING_OFFSET; + /* + * Ensures the tail load happens after the xchg(). + * + * lock unlock (a) + * xchg ---------------. + * (b) lock unlock +----- fetch_or + * load ---------------' + * lock unlock (c) + * + * For both lock and unlock, (a) and (c) are the same as fetch_or(), + * since either are fully before or after. The only new case is (b), + * where a concurrent lock or unlock can interleave with the composite + * operation. + * + * In either case, it is the queueing case that is of interrest, otherwise + * the whole operation is covered by the xchg() and the tail will be 0. + * + * For lock-(b); we only care if we set the PENDING bit or not. If we lost + * the PENDING race, we queue. Otherwise we'd observe the tail anyway. + * + * For unlock-(b); since we'll have set PENDING, the uncontended claim + * will fail and we'll observe a !0 tail. + */ + smp_mb__after_atomic(); + val |= atomic_read_acquire(&lock->val) & ~_Q_PENDING_MASK; + + return val; +#else + return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); +#endif +} + +/** * set_locked - Set the lock bit and own the lock * @lock: Pointer to queued spinlock structure * @@ -328,7 +382,7 @@ void queued_spin_lock_slowpath(struct qs * * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock */ - val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); + val = set_pending_fetch_acquire(lock); /* * If we observe contention, there was a concurrent lock.