Implement lock handoff to avoid lock starvation. Lock starvation is possible because mutex_lock() allows lock stealing, where a running (or optimistic spinning) task beats the woken waiter to the acquire. Lock stealing is an important performance optimization because waiting for a waiter to wake up and get runtime can take a significant time, during which everyboy would stall on the lock. The down-side is of course that it allows for starvation. This patch has the waiter requesting a handoff if it fails to acquire the lock upon waking. This re-introduces some of the wait time, because once we do a handoff we have to wait for the waiter to wake up again. A future patch will add a round of optimistic spinning to attempt to alleviate this penalty, but if that turns out to not be enough, we can add a counter and only request handoff after multiple failed wakeups. There are a few tricky implementation details: - accepting a handoff must only be done in the wait-loop. Since the handoff condition is owner == current, it can easily cause recursive locking trouble. - accepting the handoff must be careful to provide the ACQUIRE semantics. - having the HANDOFF bit set on unlock requires care, we must not clear the owner. - we must be careful to not leave HANDOFF set after we've acquired the lock. An alternative approach is to combine the bit set and a trylock in one cmpxchg loop, which ensures we either steal/acquire the lock or unlock must observe and clear HANDOFF. Signed-off-by: Peter Zijlstra (Intel) --- kernel/locking/mutex.c | 139 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 117 insertions(+), 22 deletions(-) --- a/kernel/locking/mutex.c +++ b/kernel/locking/mutex.c @@ -54,8 +54,10 @@ EXPORT_SYMBOL(__mutex_init); * bits to store extra state. * * Bit0 indicates a non-empty waiter list; unlock must issue a wakeup. + * Bit1 indicates unlock needs to hand the lock to the top-waiter */ #define MUTEX_FLAG_WAITERS 0x01 +#define MUTEX_FLAG_HANDOFF 0x02 #define MUTEX_FLAGS 0x03 @@ -71,8 +73,13 @@ static inline unsigned long __owner_flag /* * Actual trylock that will work on any unlocked state. + * + * When setting the owner field, we must preserve the low flag bits. + * + * Be careful with @handoff, only set that in a wait-loop (where you set + * HANDOFF) to avoid recursive lock attempts. */ -static inline bool __mutex_trylock(struct mutex *lock) +static inline bool __mutex_trylock(struct mutex *lock, const bool handoff) { unsigned long owner, curr = (unsigned long)current; @@ -80,8 +87,24 @@ static inline bool __mutex_trylock(struc for (;;) { /* must loop, can race against a flag */ unsigned long old; - if (__owner_task(owner)) + if (__owner_task(owner)) { + if (handoff && unlikely(__owner_task(owner) == current)) { + /* + * Provide ACQUIRE semantics for the lock-handoff. + * + * We cannot easily use load-acquire here, since + * the actual load is a failed cmpxchg, which + * doesn't imply any barriers. + * + * Also, this is a fairly unlikely scenario, and + * this contains the cost. + */ + smp_mb(); /* ACQUIRE */ + return true; + } + return false; + } old = atomic_long_cmpxchg_acquire(&lock->owner, owner, curr | __owner_flags(owner)); @@ -134,6 +157,39 @@ static inline void __mutex_clear_flag(st atomic_long_andnot(flag, &lock->owner); } +static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_waiter *waiter) +{ + return list_first_entry(&lock->wait_list, struct mutex_waiter, list) == waiter; +} + +/* + * Give up ownership to a specific task, when @task = NULL, this is equivalent + * to a regular unlock. Clears HANDOFF, preserves WAITERS. Provides RELEASE + * semantics like a regular unlock, the __mutex_trylock() provides matching + * ACQUIRE semantics for the handoff. + */ +static void __mutex_handoff(struct mutex *lock, struct task_struct *task) +{ + unsigned long owner = atomic_long_read(&lock->owner); + + for (;;) { + unsigned long old, new; + +#ifdef CONFIG_DEBUG_MUTEXES + DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current); +#endif + + new = (owner & MUTEX_FLAG_WAITERS); + new |= (unsigned long)task; + + old = atomic_long_cmpxchg_release(&lock->owner, owner, new); + if (old == owner) + break; + + owner = old; + } +} + #ifndef CONFIG_DEBUG_LOCK_ALLOC /* * We split the mutex lock/unlock logic into separate fastpath and @@ -398,7 +454,7 @@ static bool mutex_optimistic_spin(struct break; /* Try to acquire the mutex if it is unlocked. */ - if (__mutex_trylock(lock)) { + if (__mutex_trylock(lock, false)) { osq_unlock(&lock->osq); return true; } @@ -523,6 +579,7 @@ __mutex_lock_common(struct mutex *lock, struct task_struct *task = current; struct mutex_waiter waiter; unsigned long flags; + bool first = false; int ret; if (use_ww_ctx) { @@ -534,7 +591,8 @@ __mutex_lock_common(struct mutex *lock, preempt_disable(); mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip); - if (__mutex_trylock(lock) || mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) { + if (__mutex_trylock(lock, false) || + mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) { /* got the lock, yay! */ lock_acquired(&lock->dep_map, ip); if (use_ww_ctx) { @@ -551,7 +609,7 @@ __mutex_lock_common(struct mutex *lock, /* * After waiting to acquire the wait_lock, try again. */ - if (__mutex_trylock(lock)) + if (__mutex_trylock(lock, false)) goto skip_wait; debug_mutex_lock_common(lock, &waiter); @@ -561,15 +619,15 @@ __mutex_lock_common(struct mutex *lock, list_add_tail(&waiter.list, &lock->wait_list); waiter.task = task; - if (list_first_entry(&lock->wait_list, struct mutex_waiter, list) == &waiter) + if (__mutex_waiter_is_first(lock, &waiter)) __mutex_set_flag(lock, MUTEX_FLAG_WAITERS); + if (__mutex_trylock(lock, false)) + goto remove_waiter; + lock_contended(&lock->dep_map, ip); for (;;) { - if (__mutex_trylock(lock)) - break; - /* * got a signal? (This code gets eliminated in the * TASK_UNINTERRUPTIBLE case.) @@ -586,17 +644,26 @@ __mutex_lock_common(struct mutex *lock, } __set_task_state(task, state); - - /* didn't get the lock, go to sleep: */ spin_unlock_mutex(&lock->wait_lock, flags); schedule_preempt_disabled(); spin_lock_mutex(&lock->wait_lock, flags); + + if (__mutex_waiter_is_first(lock, &waiter)) { + first = true; + __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF); + } + + if (__mutex_trylock(lock, true)) + break; } __set_task_state(task, TASK_RUNNING); +remove_waiter: mutex_remove_waiter(lock, &waiter, task); if (likely(list_empty(&lock->wait_list))) - __mutex_clear_flag(lock, MUTEX_FLAG_WAITERS); + __mutex_clear_flag(lock, MUTEX_FLAGS); + else if (first && (atomic_long_read(&lock->owner) & MUTEX_FLAG_HANDOFF)) + __mutex_clear_flag(lock, MUTEX_FLAG_HANDOFF); debug_mutex_free_waiter(&waiter); @@ -724,33 +791,61 @@ EXPORT_SYMBOL_GPL(__ww_mutex_lock_interr */ static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip) { + struct task_struct *next = NULL; unsigned long owner, flags; WAKE_Q(wake_q); mutex_release(&lock->dep_map, 1, ip); /* - * Release the lock before (potentially) taking the spinlock - * such that other contenders can get on with things ASAP. + * Release the lock before (potentially) taking the spinlock such that + * other contenders can get on with things ASAP. + * + * Except when HANDOFF, in that case we must not clear the owner field, + * but instead set it to the top waiter. */ - owner = atomic_long_fetch_and_release(MUTEX_FLAGS, &lock->owner); - if (!owner) - return; + owner = atomic_long_read(&lock->owner); + for (;;) { + unsigned long old; + +#ifdef CONFIG_DEBUG_MUTEXES + DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current); +#endif + + if (owner & MUTEX_FLAG_HANDOFF) + break; + + old = atomic_long_cmpxchg_release(&lock->owner, owner, + __owner_flags(owner)); + if (old == owner) { + if (owner & MUTEX_FLAG_WAITERS) + break; + + return; + } + + owner = old; + } spin_lock_mutex(&lock->wait_lock, flags); debug_mutex_unlock(lock); - if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = - list_entry(lock->wait_list.next, - struct mutex_waiter, list); + list_first_entry(&lock->wait_list, + struct mutex_waiter, list); + + next = waiter->task; debug_mutex_wake_waiter(lock, waiter); - wake_q_add(&wake_q, waiter->task); + wake_q_add(&wake_q, next); } + if (owner & MUTEX_FLAG_HANDOFF) + __mutex_handoff(lock, next); + spin_unlock_mutex(&lock->wait_lock, flags); + wake_up_q(&wake_q); } @@ -853,7 +948,7 @@ __ww_mutex_lock_interruptible_slowpath(s */ int __sched mutex_trylock(struct mutex *lock) { - bool locked = __mutex_trylock(lock); + bool locked = __mutex_trylock(lock, false); if (locked) mutex_acquire(&lock->dep_map, 0, 1, _RET_IP_);