From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:57304) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aSoTO-0006NJ-7H for qemu-devel@nongnu.org; Mon, 08 Feb 2016 11:15:36 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1aSoTL-0002SS-MZ for qemu-devel@nongnu.org; Mon, 08 Feb 2016 11:15:34 -0500 Received: from mail-wm0-x243.google.com ([2a00:1450:400c:c09::243]:32834) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aSoTL-0002SL-Bk for qemu-devel@nongnu.org; Mon, 08 Feb 2016 11:15:31 -0500 Received: by mail-wm0-x243.google.com with SMTP id c200so2288197wme.0 for ; Mon, 08 Feb 2016 08:15:31 -0800 (PST) Sender: Paolo Bonzini From: Paolo Bonzini Date: Mon, 8 Feb 2016 17:15:02 +0100 Message-Id: <1454948107-11844-12-git-send-email-pbonzini@redhat.com> In-Reply-To: <1454948107-11844-1-git-send-email-pbonzini@redhat.com> References: <1454948107-11844-1-git-send-email-pbonzini@redhat.com> Subject: [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: stefanha@redhat.com This is complex, but I think it is reasonably documented in the source. 
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> --- docs/lockcnt.txt | 9 +- include/qemu/futex.h | 36 ++++++ include/qemu/thread.h | 3 + trace-events | 10 ++ util/lockcnt.c | 282 +++++++++++++++++++++++++++++++++++++++++++++++ util/qemu-thread-posix.c | 25 +---- 6 files changed, 336 insertions(+), 29 deletions(-) create mode 100644 include/qemu/futex.h diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt index fc5d240..594764b 100644 --- a/docs/lockcnt.txt +++ b/docs/lockcnt.txt @@ -142,12 +142,11 @@ can also be more efficient in two ways: - it avoids taking the lock for many operations (for example incrementing the counter while it is non-zero); -- on some platforms, one could implement QemuLockCnt to hold the - lock and the mutex in a single word, making it no more expensive +- on some platforms, one can implement QemuLockCnt to hold the counter + and the mutex in a single word, making the fast path no more expensive than simply managing a counter using atomic operations (see - docs/atomics.txt). This is not implemented yet, but can be - very helpful if concurrent access to the data structure is - expected to be rare. + docs/atomics.txt). This can be very helpful if concurrent access to + the data structure is expected to be rare. Using the same mutex for frees and writes can still incur some small diff --git a/include/qemu/futex.h b/include/qemu/futex.h new file mode 100644 index 0000000..c3d1089 --- /dev/null +++ b/include/qemu/futex.h @@ -0,0 +1,36 @@ +/* + * Wrappers around Linux futex syscall + * + * Copyright Red Hat, Inc. 2015 + * + * Author: + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include <sys/syscall.h> +#include <linux/futex.h> + +#define futex(...) 
syscall(__NR_futex, __VA_ARGS__) + +static inline void futex_wake(void *f, int n) +{ + futex(f, FUTEX_WAKE, n, NULL, NULL, 0); +} + +static inline void futex_wait(void *f, unsigned val) +{ + while (futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { + switch (errno) { + case EWOULDBLOCK: + return; + case EINTR: + break; /* get out of switch and retry */ + default: + abort(); + } + } +} diff --git a/include/qemu/thread.h b/include/qemu/thread.h index 9fadca4..22d92d2 100644 --- a/include/qemu/thread.h +++ b/include/qemu/thread.h @@ -1,6 +1,7 @@ #ifndef __QEMU_THREAD_H #define __QEMU_THREAD_H 1 +#include "config-host.h" #include #include @@ -67,7 +68,9 @@ void qemu_thread_atexit_add(struct Notifier *notifier); void qemu_thread_atexit_remove(struct Notifier *notifier); struct QemuLockCnt { +#ifndef CONFIG_LINUX QemuMutex mutex; +#endif unsigned count; }; diff --git a/trace-events b/trace-events index c9ac144..daa091e 100644 --- a/trace-events +++ b/trace-events @@ -1434,6 +1434,16 @@ hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long c hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 +# util/lockcnt.c +lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d" +lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded" +lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d" +lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded" +lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d" +lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d" 
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d" +lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter" + # target-s390x/mmu_helper.c get_skeys_nonzero(int rc) "SKEY: Call to get_skeys unexpectedly returned %d" set_skeys_nonzero(int rc) "SKEY: Call to set_skeys unexpectedly returned %d" diff --git a/util/lockcnt.c b/util/lockcnt.c index 304f9d9..56eb29e 100644 --- a/util/lockcnt.c +++ b/util/lockcnt.c @@ -18,7 +18,288 @@ #include #include "qemu/thread.h" #include "qemu/atomic.h" +#include "trace.h" +#ifdef CONFIG_LINUX +#include "qemu/futex.h" + +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter. + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok, + * this is not the most relaxing citation I could make...). It is similar + * to mutex2 in the paper. + */ + +#define QEMU_LOCKCNT_STATE_MASK 3 +#define QEMU_LOCKCNT_STATE_FREE 0 +#define QEMU_LOCKCNT_STATE_LOCKED 1 +#define QEMU_LOCKCNT_STATE_WAITING 2 + +#define QEMU_LOCKCNT_COUNT_STEP 4 +#define QEMU_LOCKCNT_COUNT_SHIFT 2 + +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ +} + +/* *val is the current value of lockcnt->count. + * + * If the lock is free, try a cmpxchg from *val to new_if_free; return + * true and set *val to the old value found by the cmpxchg in + * lockcnt->count. + * + * If the lock is taken, wait for it to be released and return false + * *without trying again to take the lock*. Again, set *val to the + * new value of lockcnt->count. + * + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED + * if calling this function a second time after it has returned + * false. + */ +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val, + int new_if_free, bool *waited) +{ + /* Fast path for when the lock is free. 
*/ + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) { + int expected = *val; + + trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free); + *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free); + if (*val == expected) { + trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free); + *val = new_if_free; + return true; + } + } + + /* The slow path moves from locked to waiting if necessary, then + * does a futex wait. Both steps can be repeated ad nauseam, + * only getting out of the loop if we can have another shot at the + * fast path. Once we can, get out to compute the new destination + * value for the fast path. + */ + while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) { + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) { + int expected = *val; + int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING; + + trace_lockcnt_futex_wait_prepare(lockcnt, expected, new); + *val = atomic_cmpxchg(&lockcnt->count, expected, new); + if (*val == expected) { + *val = new; + } + continue; + } + + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) { + *waited = true; + trace_lockcnt_futex_wait(lockcnt, *val); + futex_wait(&lockcnt->count, *val); + *val = atomic_read(&lockcnt->count); + trace_lockcnt_futex_wait_resume(lockcnt, *val); + continue; + } + + abort(); + } + return false; +} + +static void lockcnt_wake(QemuLockCnt *lockcnt) +{ + trace_lockcnt_futex_wake(lockcnt); + futex_wake(&lockcnt->count, 1); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + bool waited = false; + + for (;;) { + if (val >= QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP); + if (val == expected) { + break; + } + } else { + /* The fast path is (0, unlocked)->(1, unlocked). 
*/ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP, + &waited)) { + break; + } + } + } + + /* If we were woken by another thread, we should also wake one because + * we are effectively releasing the lock that was given to us. This is + * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING + * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and + * wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + for (;;) { + if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + int new = val - QEMU_LOCKCNT_COUNT_STEP; + val = atomic_cmpxchg(&lockcnt->count, val, new); + if (val == expected) { + break; + } + } + + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_unlock would find it and wake someone. 
+ */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +/* If the counter is one, decrement it and return locked. Otherwise do + * nothing. + * + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) { + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int step = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + /* The third argument is only used if the low bits of val are 0 + * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired + * state. + */ + while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) { + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. 
+ */ + step = QEMU_LOCKCNT_STATE_WAITING; + } + } +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = val & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT; +} +#else void qemu_lockcnt_init(QemuLockCnt *lockcnt) { qemu_mutex_init(&lockcnt->mutex); @@ -120,3 +401,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) { return lockcnt->count; } +#endif diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index 1aec83f..b1e3cdb 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -11,10 +11,6 @@ * */ #include "qemu/osdep.h" -#ifdef __linux__ -#include -#include -#endif #include "qemu/thread.h" #include "qemu/atomic.h" #include "qemu/notify.h" @@ -293,26 +289,7 @@ void qemu_sem_wait(QemuSemaphore *sem) } #ifdef __linux__ -#define futex(...) 
syscall(__NR_futex, __VA_ARGS__) - -static inline void futex_wake(QemuEvent *ev, int n) -{ - futex(ev, FUTEX_WAKE, n, NULL, NULL, 0); -} - -static inline void futex_wait(QemuEvent *ev, unsigned val) -{ - while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { - switch (errno) { - case EWOULDBLOCK: - return; - case EINTR: - break; /* get out of switch and retry */ - default: - abort(); - } - } -} +#include "qemu/futex.h" #else static inline void futex_wake(QemuEvent *ev, int n) { -- 2.5.0