All of lore.kernel.org
 help / color / mirror / Atom feed
From: David Vrabel <david.vrabel@citrix.com>
To: xen-devel@lists.xenproject.org
Cc: Jennifer Herbert <jennifer.herbert@citrix.com>,
	David Vrabel <david.vrabel@citrix.com>,
	Jan Beulich <jbeulich@suse.com>,
	Ian Campbell <ian.campbell@citrix.com>
Subject: [PATCHv1 4/4] spinlock: add fair read-write locks
Date: Fri, 18 Dec 2015 14:09:06 +0000	[thread overview]
Message-ID: <1450447746-9305-5-git-send-email-david.vrabel@citrix.com> (raw)
In-Reply-To: <1450447746-9305-1-git-send-email-david.vrabel@citrix.com>

From: Jennifer Herbert <jennifer.herbert@citrix.com>

The current rwlocks are write-biased and unfair.  This allows writers
to starve readers in situations where there are many writers (e.g.,
p2m type changes from log dirty updates during domain save).

Introduce queued read-write locks which use a fair spinlock (a ticket
lock in this case) to ensure fairness between readers and writers when
they are contended.

This implementation is from the Linux commit 70af2f8a4f48 by Waiman
Long and Peter Zijlstra.

    locking/rwlocks: Introduce 'qrwlocks' - fair, queued rwlocks

    This rwlock uses the arch_spin_lock_t as a waitqueue, and assuming
    the arch_spin_lock_t is a fair lock (ticket,mcs etc..) the
    resulting rwlock is a fair lock.

    It fits in the same 8 bytes as the regular rwlock_t by folding the
    reader and writer count into a single integer, using the remaining
    4 bytes for the arch_spinlock_t.

    Architectures that can single-copy adress bytes can optimize
    queue_write_unlock() with a 0 write to the LSB (the write count).

We do not yet make use of the architecture-specific optimization noted
above.

Signed-off-by: Jennifer Herbert <jennifer.herbert@citrix.com>
Signed-off-by: David Vrabel <david.vrabel@citrix.com>
---
 xen/common/spinlock.c      | 268 +++++++++++++++------------------------------
 xen/include/xen/spinlock.h | 195 +++++++++++++++++++++++++++++----
 2 files changed, 260 insertions(+), 203 deletions(-)

diff --git a/xen/common/spinlock.c b/xen/common/spinlock.c
index c129a88..9cf4136 100644
--- a/xen/common/spinlock.c
+++ b/xen/common/spinlock.c
@@ -288,208 +288,116 @@ void _spin_unlock_recursive(spinlock_t *lock)
     }
 }
 
-void _read_lock(rwlock_t *lock)
-{
-    uint32_t x;
-
-    check_lock(&lock->debug);
-    do {
-        while ( (x = lock->lock) & RW_WRITE_FLAG )
-            cpu_relax();
-    } while ( cmpxchg(&lock->lock, x, x+1) != x );
-    preempt_disable();
-}
-
-void _read_lock_irq(rwlock_t *lock)
-{
-    uint32_t x;
 
-    ASSERT(local_irq_is_enabled());
-    local_irq_disable();
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) & RW_WRITE_FLAG )
-        {
-            local_irq_enable();
-            while ( (x = lock->lock) & RW_WRITE_FLAG )
-                cpu_relax();
-            local_irq_disable();
-        }
-    } while ( cmpxchg(&lock->lock, x, x+1) != x );
-    preempt_disable();
+/**
+ * rspin_until_writer_unlock - inc reader count & spin until writer is gone
+ * @lock  : Pointer to queue rwlock structure
+ * @writer: Current queue rwlock writer status byte
+ *
+ * In interrupt context or at the head of the queue, the reader will just
+ * increment the reader count & wait until the writer releases the lock.
+ */
+static inline void rspin_until_writer_unlock(rwlock_t *lock, u32 cnts)
+{
+    while ( (cnts & _QW_WMASK) == _QW_LOCKED )
+    {
+        cpu_relax();
+        smp_rmb();
+        cnts = atomic_read(&lock->cnts);
+    }
 }
 
-unsigned long _read_lock_irqsave(rwlock_t *lock)
+/**
+ * queue_read_lock_slowpath - acquire read lock of a queue rwlock
+ * @lock: Pointer to queue rwlock structure
+ */
+void queue_read_lock_slowpath(rwlock_t *lock)
 {
-    uint32_t x;
-    unsigned long flags;
-
-    local_irq_save(flags);
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) & RW_WRITE_FLAG )
-        {
-            local_irq_restore(flags);
-            while ( (x = lock->lock) & RW_WRITE_FLAG )
-                cpu_relax();
-            local_irq_disable();
-        }
-    } while ( cmpxchg(&lock->lock, x, x+1) != x );
-    preempt_disable();
-    return flags;
-}
+    u32 cnts;
+    /*
+     * Readers come here when they cannot get the lock without waiting
+     */
+    if ( unlikely(in_irq()) )
+    {
+        /*
+         * Readers in interrupt context will spin until the lock is
+         * available without waiting in the queue.
+         */
+        smp_rmb();
+        cnts = atomic_read(&lock->cnts);
+        rspin_until_writer_unlock(lock, cnts);
+        return;
+    }
+    atomic_sub(_QR_BIAS, &lock->cnts);
 
-int _read_trylock(rwlock_t *lock)
-{
-    uint32_t x;
+    /*
+     * Put the reader into the wait queue
+     */
+    spin_lock(&lock->lock);
 
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) & RW_WRITE_FLAG )
-            return 0;
-    } while ( cmpxchg(&lock->lock, x, x+1) != x );
-    preempt_disable();
-    return 1;
-}
+    /*
+     * At the head of the wait queue now, wait until the writer state
+     * goes to 0 and then try to increment the reader count and get
+     * the lock. It is possible that an incoming writer may steal the
+     * lock in the interim, so it is necessary to check the writer byte
+     * to make sure that the write lock isn't taken.
+     */
+    while ( atomic_read(&lock->cnts) & _QW_WMASK )
+        cpu_relax();
 
-#ifndef _raw_read_unlock
-# define _raw_read_unlock(l) do {                      \
-    uint32_t x = (l)->lock, y;                         \
-    while ( (y = cmpxchg(&(l)->lock, x, x - 1)) != x ) \
-        x = y;                                         \
-} while (0)
-#endif
+    cnts = atomic_add_return(_QR_BIAS, &lock->cnts) - _QR_BIAS;
+    rspin_until_writer_unlock(lock, cnts);
 
-inline void _read_unlock(rwlock_t *lock)
-{
-    preempt_enable();
-    _raw_read_unlock(lock);
+    /*
+     * Signal the next one in queue to become queue head
+     */
+    spin_unlock(&lock->lock);
 }
 
-void _read_unlock_irq(rwlock_t *lock)
+/**
+ * queue_write_lock_slowpath - acquire write lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ */
+void queue_write_lock_slowpath(rwlock_t *lock)
 {
-    _read_unlock(lock);
-    local_irq_enable();
-}
+    u32 cnts;
 
-void _read_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
-{
-    _read_unlock(lock);
-    local_irq_restore(flags);
-}
+    /* Put the writer into the wait queue */
+    spin_lock(&lock->lock);
 
-void _write_lock(rwlock_t *lock)
-{
-    uint32_t x;
+    /* Try to acquire the lock directly if no reader is present */
+    if ( !atomic_read(&lock->cnts) &&
+         (atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0) )
+        goto unlock;
 
-    check_lock(&lock->debug);
-    do {
-        while ( (x = lock->lock) & RW_WRITE_FLAG )
-            cpu_relax();
-    } while ( cmpxchg(&lock->lock, x, x|RW_WRITE_FLAG) != x );
-    while ( x != 0 )
+    /*
+     * Set the waiting flag to notify readers that a writer is pending,
+     * or wait for a previous writer to go away.
+     */
+    for (;;)
     {
-        cpu_relax();
-        x = lock->lock & ~RW_WRITE_FLAG;
-    }
-    preempt_disable();
-}
-
-void _write_lock_irq(rwlock_t *lock)
-{
-    uint32_t x;
+        cnts = atomic_read(&lock->cnts);
+        if ( !(cnts & _QW_WMASK) &&
+             (atomic_cmpxchg(&lock->cnts, cnts,
+                             cnts | _QW_WAITING) == cnts) )
+            break;
 
-    ASSERT(local_irq_is_enabled());
-    local_irq_disable();
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) & RW_WRITE_FLAG )
-        {
-            local_irq_enable();
-            while ( (x = lock->lock) & RW_WRITE_FLAG )
-                cpu_relax();
-            local_irq_disable();
-        }
-    } while ( cmpxchg(&lock->lock, x, x|RW_WRITE_FLAG) != x );
-    while ( x != 0 )
-    {
         cpu_relax();
-        x = lock->lock & ~RW_WRITE_FLAG;
     }
-    preempt_disable();
-}
 
-unsigned long _write_lock_irqsave(rwlock_t *lock)
-{
-    uint32_t x;
-    unsigned long flags;
-
-    local_irq_save(flags);
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) & RW_WRITE_FLAG )
-        {
-            local_irq_restore(flags);
-            while ( (x = lock->lock) & RW_WRITE_FLAG )
-                cpu_relax();
-            local_irq_disable();
-        }
-    } while ( cmpxchg(&lock->lock, x, x|RW_WRITE_FLAG) != x );
-    while ( x != 0 )
+    /* When no more readers, set the locked flag */
+    for (;;)
     {
+        cnts = atomic_read(&lock->cnts);
+        if ( (cnts == _QW_WAITING) &&
+             (atomic_cmpxchg(&lock->cnts, _QW_WAITING,
+                             _QW_LOCKED) == _QW_WAITING) )
+            break;
+
         cpu_relax();
-        x = lock->lock & ~RW_WRITE_FLAG;
     }
-    preempt_disable();
-    return flags;
-}
-
-int _write_trylock(rwlock_t *lock)
-{
-    uint32_t x;
-
-    check_lock(&lock->debug);
-    do {
-        if ( (x = lock->lock) != 0 )
-            return 0;
-    } while ( cmpxchg(&lock->lock, x, x|RW_WRITE_FLAG) != x );
-    preempt_disable();
-    return 1;
-}
-
-#ifndef _raw_write_unlock
-# define _raw_write_unlock(l) xchg(&(l)->lock, 0)
-#endif
-
-inline void _write_unlock(rwlock_t *lock)
-{
-    preempt_enable();
-    if ( _raw_write_unlock(lock) != RW_WRITE_FLAG )
-        BUG();
-}
-
-void _write_unlock_irq(rwlock_t *lock)
-{
-    _write_unlock(lock);
-    local_irq_enable();
-}
-
-void _write_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
-{
-    _write_unlock(lock);
-    local_irq_restore(flags);
-}
-
-int _rw_is_locked(rwlock_t *lock)
-{
-    check_lock(&lock->debug);
-    return (lock->lock != 0); /* anyone in critical section? */
-}
-
-int _rw_is_write_locked(rwlock_t *lock)
-{
-    check_lock(&lock->debug);
-    return (lock->lock == RW_WRITE_FLAG); /* writer in critical section? */
+unlock:
+    spin_unlock(&lock->lock);
 }
 
 #ifdef LOCK_PROFILE
diff --git a/xen/include/xen/spinlock.h b/xen/include/xen/spinlock.h
index 5e54407..98f242c 100644
--- a/xen/include/xen/spinlock.h
+++ b/xen/include/xen/spinlock.h
@@ -1,6 +1,7 @@
 #ifndef __SPINLOCK_H__
 #define __SPINLOCK_H__
 
+#include <xen/atomic.h>
 #include <asm/system.h>
 #include <asm/spinlock.h>
 
@@ -148,17 +149,31 @@ typedef struct spinlock {
 
 #define spin_lock_init(l) (*(l) = (spinlock_t)SPIN_LOCK_UNLOCKED)
 
+
 typedef struct {
-    volatile uint32_t lock;
-    struct lock_debug debug;
+    atomic_t cnts;
+    spinlock_t lock;
+    /* FIXME: struct lock_debug debug; */
 } rwlock_t;
 
-#define RW_WRITE_FLAG (1u<<31)
+#define    RW_LOCK_UNLOCKED {           \
+    .cnts = ATOMIC_INIT(0),             \
+    .lock = SPIN_LOCK_UNLOCKED,         \
+/*    debug  */                         \
+}
 
-#define RW_LOCK_UNLOCKED { 0, _LOCK_DEBUG }
 #define DEFINE_RWLOCK(l) rwlock_t l = RW_LOCK_UNLOCKED
 #define rwlock_init(l) (*(l) = (rwlock_t)RW_LOCK_UNLOCKED)
 
+/*
+ * Writer states & reader shift and bias
+ */
+#define    _QW_WAITING  1               /* A writer is waiting     */
+#define    _QW_LOCKED   0xff            /* A writer holds the lock */
+#define    _QW_WMASK    0xff            /* Writer mask             */
+#define    _QR_SHIFT    8               /* Reader count shift      */
+#define    _QR_BIAS     (1U << _QR_SHIFT)
+
 void _spin_lock(spinlock_t *lock);
 void _spin_lock_irq(spinlock_t *lock);
 unsigned long _spin_lock_irqsave(spinlock_t *lock);
@@ -175,26 +190,160 @@ int _spin_trylock_recursive(spinlock_t *lock);
 void _spin_lock_recursive(spinlock_t *lock);
 void _spin_unlock_recursive(spinlock_t *lock);
 
-void _read_lock(rwlock_t *lock);
-void _read_lock_irq(rwlock_t *lock);
-unsigned long _read_lock_irqsave(rwlock_t *lock);
-
-void _read_unlock(rwlock_t *lock);
-void _read_unlock_irq(rwlock_t *lock);
-void _read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
-int _read_trylock(rwlock_t *lock);
-
-void _write_lock(rwlock_t *lock);
-void _write_lock_irq(rwlock_t *lock);
-unsigned long _write_lock_irqsave(rwlock_t *lock);
-int _write_trylock(rwlock_t *lock);
+void queue_read_lock_slowpath(rwlock_t *lock);
+void queue_write_lock_slowpath(rwlock_t *lock);
 
-void _write_unlock(rwlock_t *lock);
-void _write_unlock_irq(rwlock_t *lock);
-void _write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
-
-int _rw_is_locked(rwlock_t *lock);
-int _rw_is_write_locked(rwlock_t *lock);
+/**
+ * queue_read_trylock - try to acquire read lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ * Return: 1 if lock acquired, 0 if failed
+ */
+static inline int _read_trylock(rwlock_t *lock)
+{
+    u32 cnts;
+
+    cnts = atomic_read(&lock->cnts);
+    if ( likely(!(cnts & _QW_WMASK)) )
+    {
+        cnts = (u32)atomic_add_return(_QR_BIAS, &lock->cnts);
+        if ( likely(!(cnts & _QW_WMASK)) )
+            return 1;
+        atomic_sub(_QR_BIAS, &lock->cnts);
+    }
+    return 0;
+}
+
+/**
+ * queue_read_lock - acquire read lock of a queue rwlock
+ * @lock: Pointer to queue rwlock structure
+ */
+static inline void _read_lock(rwlock_t *lock)
+{
+    u32 cnts;
+
+    cnts = atomic_add_return(_QR_BIAS, &lock->cnts);
+    if ( likely(!(cnts & _QW_WMASK)) )
+        return;
+
+    /* The slowpath will decrement the reader count, if necessary. */
+    queue_read_lock_slowpath(lock);
+}
+
+static inline void _read_lock_irq(rwlock_t *lock)
+{
+    ASSERT(local_irq_is_enabled());
+    local_irq_disable();
+    _read_lock(lock);
+}
+
+static inline unsigned long _read_lock_irqsave(rwlock_t *lock)
+{
+    unsigned long flags;
+    local_irq_save(flags);
+    _read_lock(lock);
+    return flags;
+}
+
+/**
+ * queue_read_unlock - release read lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ */
+static inline void _read_unlock(rwlock_t *lock)
+{
+    /*
+     * Atomically decrement the reader count
+     */
+    atomic_sub(_QR_BIAS, &lock->cnts);
+}
+
+static inline void _read_unlock_irq(rwlock_t *lock)
+{
+    _read_unlock(lock);
+    local_irq_enable();
+}
+
+static inline void _read_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
+{
+    _read_unlock(lock);
+    local_irq_restore(flags);
+}
+
+static inline int _rw_is_locked(rwlock_t *lock)
+{
+    return atomic_read(&lock->cnts);
+}
+
+/**
+ * queue_write_lock - acquire write lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ */
+static inline void _write_lock(rwlock_t *lock)
+{
+    /* Optimize for the unfair lock case where the fair flag is 0. */
+    if ( atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0 )
+        return;
+
+    queue_write_lock_slowpath(lock);
+}
+
+static inline void _write_lock_irq(rwlock_t *lock)
+{
+    ASSERT(local_irq_is_enabled());
+    local_irq_disable();
+    _write_lock(lock);
+}
+
+static inline unsigned long _write_lock_irqsave(rwlock_t *lock)
+{
+    unsigned long flags;
+
+    local_irq_save(flags);
+    _write_lock(lock);
+    return flags;
+}
+
+/**
+ * queue_write_trylock - try to acquire write lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ * Return: 1 if lock acquired, 0 if failed
+ */
+static inline int _write_trylock(rwlock_t *lock)
+{
+    u32 cnts;
+
+    cnts = atomic_read(&lock->cnts);
+    if ( unlikely(cnts) )
+        return 0;
+
+    return likely(atomic_cmpxchg(&lock->cnts,
+                                 cnts, cnts | _QW_LOCKED) == cnts);
+}
+
+static inline void _write_unlock(rwlock_t *lock)
+{
+    /*
+     * If the writer field is atomic, it can be cleared directly.
+     * Otherwise, an atomic subtraction will be used to clear it.
+     */
+    atomic_sub(_QW_LOCKED, &lock->cnts);
+}
+
+static inline void _write_unlock_irq(rwlock_t *lock)
+{
+    _write_unlock(lock);
+    local_irq_enable();
+}
+
+static inline void _write_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
+{
+    _write_unlock(lock);
+    local_irq_restore(flags);
+}
+
+static inline int _rw_is_write_locked(rwlock_t *lock)
+{
+    return atomic_read(&lock->cnts) & _QW_WMASK;
+}
 
 #define spin_lock(l)                  _spin_lock(l)
 #define spin_lock_irq(l)              _spin_lock_irq(l)
-- 
2.1.4

  parent reply	other threads:[~2015-12-18 14:09 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-12-18 14:09 [PATCHv1 0/4] spinlock: add qrwlocks David Vrabel
2015-12-18 14:09 ` [PATCHv1 1/4] atomic: replace atomic_compareandswap() with atomic_cmpxchg() David Vrabel
2015-12-18 17:01   ` Jan Beulich
2016-01-21 15:19   ` Ian Campbell
2015-12-18 14:09 ` [PATCHv1 2/4] x86/domain: Compile with lock_profile=y enabled David Vrabel
2015-12-18 14:09 ` [PATCHv1 3/4] spinlock: shrink struct lock_debug David Vrabel
2015-12-18 16:58   ` Jan Beulich
2016-01-19 12:31     ` Jennifer Herbert
2016-01-19 13:04       ` Jan Beulich
2015-12-18 14:09 ` David Vrabel [this message]
2015-12-22 13:54   ` [PATCHv1 4/4] spinlock: add fair read-write locks Jan Beulich
2016-01-18 16:44     ` Jennifer Herbert
2015-12-18 15:47 ` [PATCHv1 0/4] spinlock: add qrwlocks Ian Campbell
2015-12-18 15:49   ` David Vrabel

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1450447746-9305-5-git-send-email-david.vrabel@citrix.com \
    --to=david.vrabel@citrix.com \
    --cc=ian.campbell@citrix.com \
    --cc=jbeulich@suse.com \
    --cc=jennifer.herbert@citrix.com \
    --cc=xen-devel@lists.xenproject.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.