linux-kernel.vger.kernel.org archive mirror
* [RFC PATCH 0/6] fast queue spinlocks
@ 2013-01-22 23:13 Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 1/6] kernel: implement queue spinlock API Michel Lespinasse
                   ` (6 more replies)
  0 siblings, 7 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

I would like to propose a fast queue spinlock implementation, which could
be used on selected spinlocks instead of the default ticket spinlock.

I understand that in the past, such proposals have been defeated by
two main arguments:
- That it is generally preferable to use smart algorithms that can avoid
  lock contention altogether, rather than spending effort on the lock
  itself to deal with the contention, and
- That the lightly contended case matters more to real workloads than the
  highly contended case, and that the most well known scalable spinlock
  algorithms tend to be relatively slow in the lightly contended case.

I am hoping for a different result this time around based on the following
counter-arguments:
- There are situations where the lock contention is directly driven by user
  events, with little opportunity to reduce it in-kernel. One example might
  be when the user requests that the kernel acquire or release semaphores
  on its behalf using sysv IPC APIs - it is hard to imagine how the kernel
  could mitigate spinlock contention issues for the user.
- More importantly, the queue spinlock implementation I am proposing
  seems to behave well under both light and heavy contention. It uses a
  single atomic operation on the lock side, and (on x86) only a single
  memory store on the unlock side, with fairly short code sections on
  both sides, and it compares well with the ticket spinlock on the
  benchmarks I have tried.
To be clear, I am not proposing replacing every ticket spinlock usage with
queue spinlocks; I am only proposing to have both APIs available and pick
them as appropriate for each use case.

In order to have concrete examples, I have converted two spinlocks to
the proposed queue spinlock API:
- The IPC object spinlock, which protects each individually allocated
  IPC message queue, IPC shared memory segment or IPC semaphore array.
- The network qdisc busylock, which, if I understand correctly, is a way
  to prevent multiple network writers from starving the thread that
  wants to actually dequeue packets to send them on the wire.
I picked these two locks because I knew of situations where they get
contended; additionally I wanted to have examples of one lock that gets
taken in process context and one that can be taken in softirq context,
in order to demonstrate that the proposed spinlock can deal with the
resulting reentrancy issues. I am not very familiar with either the
sysv IPC or the networking code; however, I know that these areas have
been looked at by competent people, so I assume there are no obvious
ways to avoid contention on these two spinlocks (but feel free to
propose better examples if I'm wrong :)

The first 3 patches are a bit of a strawman, as they introduce a classic
MCS queue lock implementation and convert the IPC and qdisc spinlocks
to use the corresponding locking API. I expect this might be rejected
since MCS spinlocks have a somewhat higher cost than ticket-based ones under
light contention; however, I consider this a useful step, as it lets us
implement most of the mechanical changes to convert our two spinlocks
to the proposed queue spinlock API.

Patch 4 implements my proposed fast queue spinlock algorithm, which
seems to perform better than MCS at all contention levels and is
competitive with ticket spinlocks at low contention levels. This comes
with a few limitations, mostly due to the fact that spinlocks must point
to a dynamically allocated ticket structure - so they can't have a static
initializer, they may be inappropriate for short-lived objects, the init
function may fail, and they need to be explicitly destroyed before the
object they're in is freed.
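To make these constraints concrete, usage with the faster algorithm ends up
looking roughly like the following (the init/destroy names here are only
indicative; see patch 4 for the actual API):

  if (q_spin_lock_init(&obj->lock))	/* may fail, e.g. with -ENOMEM */
	goto err_free_obj;
  ...
  q_spin_lock(&obj->lock, &node);
  ...
  q_spin_unlock(&obj->lock, &node);
  ...
  q_spin_lock_destroy(&obj->lock);	/* before obj itself is freed */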

Patches 5-6 convert the IPC and qdisc spinlocks to deal with the API
limitations introduced by the fast queue spinlock algorithm in patch 4.

Michel Lespinasse (6):
  kernel: implement queue spinlock API
  net: convert qdisc busylock to use queue spinlock API
  ipc: convert ipc objects to use queue spinlock API
  kernel: faster queue spinlock implementation
  net: qdisc busylock updates to account for queue spinlock api change
  ipc: object locking updates to account for queue spinlock api change

 arch/x86/include/asm/queue_spinlock.h |   20 +++++
 include/asm-generic/queue_spinlock.h  |    7 ++
 include/linux/ipc.h                   |    9 +--
 include/linux/msg.h                   |    2 +-
 include/linux/queue_spinlock.h        |  113 +++++++++++++++++++++++++++++
 include/linux/shm.h                   |    2 +-
 include/net/sch_generic.h             |    3 +-
 init/main.c                           |    2 +
 ipc/msg.c                             |   61 +++++++++-------
 ipc/namespace.c                       |    8 ++-
 ipc/sem.c                             |  115 +++++++++++++++++-------------
 ipc/shm.c                             |   95 ++++++++++++++-----------
 ipc/util.c                            |  122 +++++++++++++++++++++-----------
 ipc/util.h                            |   25 ++++---
 kernel/Makefile                       |    2 +-
 kernel/queue_spinlock.c               |  125 +++++++++++++++++++++++++++++++++
 net/core/dev.c                        |    9 ++-
 net/sched/sch_generic.c               |   19 +++--
 18 files changed, 546 insertions(+), 193 deletions(-)
 create mode 100644 arch/x86/include/asm/queue_spinlock.h
 create mode 100644 include/asm-generic/queue_spinlock.h
 create mode 100644 include/linux/queue_spinlock.h
 create mode 100644 kernel/queue_spinlock.c


Now for the obligatory benchmarks:


First, testing IPC locking using rik's semop_test benchmark:

On a 4 socket AMD Barcelona system (16 cores total):
1 thread (no contention): ~5.38 Mop/s queue spinlock vs ~5.45 baseline (-1%)
2 threads:                ~1.47 Mop/s queue spinlock vs ~1.48 baseline
4 threads:                ~1.57 Mop/s queue spinlock vs ~1.21 baseline (+30%)
8 threads:                ~1.41 Mop/s queue spinlock vs ~0.83 baseline (+70%)
16 threads:               ~1.40 Mop/s queue spinlock vs ~0.47 baseline (+200%)
For comparison, rik's proportional backoff (v3) gets ~1.18 Mop/s at 16 threads

On a 2 socket Intel Sandybridge system (16 cores / 32 threads total):
1 thread (no contention): ~8.36 Mop/s queue spinlock vs ~8.25 baseline (+1%)
2 threads: results are noisy there, from ~5 to ~6.5 Mops/s with either tree
4 threads: ~4.5 to ~5.8 Mops/s queue spinlock (noisy), ~5.5 baseline
8 threads: ~4.3 to ~5 Mops/s queue spinlock, ~4.1 to ~4.2 baseline
16 threads: ~3.2 to ~3.6 Mops/s queue spinlock, ~1.5 to ~2.4 baseline
32 threads: ~2.7 to ~3.2 Mops/s queue spinlock, ~1.5 to ~1.9 baseline
For comparison, rik's proposal gets ~1.75 to ~2.1 Mops/s at 32 threads

Basically for semop_test, the data is noisy on one of the test machines but
the queue spinlock is never measurably slower than the ticket spinlock,
except for a 1% deficit in the uncontended case on the AMD system (and it's
1% faster
in the uncontended case on the other system, so I'd call it a draw :)


32 instances of netperf -t UDP_STREAM -H <server> -- -m 128 on a machine
with 32 hyperthreads (2 sandybridge sockets) and an htb root qdisc:
~650 to ~680 Mbps total with baseline 3.7
~895 to ~915 Mbps total with fast queue spinlock
~860 to ~890 Mbps total with rik's proportional backoff (v3)


Additionally I will attach (as a reply to this email) graphs showing total
spinlock throughput for a microbenchmark consisting of N threads doing
lock/unlock operations in a tight loop. We can see that the proposed
fast queue spinlock is comparable to ticket spinlocks at low thread
counts, scales much better at high thread counts, and is always
faster than the MCS strawman proposal (which did have the issue of being
kinda slow at around 2-3 threads).
mach1 is a 4 socket AMD Barcelona system (16 cores total)
mach2 is a 2 socket Intel Westmere system (12 cores / 24 threads total)
mach3 is a 2 socket Intel Sandybridge system (16 cores / 32 threads total)
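
For reference, the per-thread loop in that microbenchmark is essentially the
following (just a sketch; bench_lock and the bookkeeping are illustrative and
not the exact harness I used):

  static struct q_spinlock bench_lock;	/* q_spin_lock_init()ed at startup */

  static int bench_thread(void *unused)
  {
	struct q_spinlock_node node;
	unsigned long ops = 0;

	while (!kthread_should_stop()) {
		q_spin_lock(&bench_lock, &node);
		/* empty critical section */
		q_spin_unlock(&bench_lock, &node);
		ops++;
	}
	/* total throughput = sum of ops over all threads / elapsed time */
	return 0;
  }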

-- 
1.7.7.3


* [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-02-07 22:34   ` Paul E. McKenney
  2013-01-22 23:13 ` [RFC PATCH 2/6] net: convert qdisc busylock to use " Michel Lespinasse
                   ` (5 subsequent siblings)
  6 siblings, 1 reply; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

Introduce queue spinlocks, to be used in situations where good throughput
is desired even under occasional high contention.

This initial implementation is based on the classic MCS spinlock,
because I think this represents the nicest API we can hope for in a
fast queue spinlock algorithm. The MCS spinlock has a known limitation:
it performs very well under high contention, but it is not as
good as the ticket spinlock under low contention. I will address this
limitation in a later patch, which will propose an alternative,
higher-performance implementation using (mostly) the same API.

Sample use case acquiring mystruct->lock:

  struct q_spinlock_node node;

  q_spin_lock(&mystruct->lock, &node);
  ...
  q_spin_unlock(&mystruct->lock, &node);

The q_spinlock_node must be used by a single lock holder / waiter, and it must
stay allocated for the entire time the lock is held (or waited on).
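
As a slightly fuller illustration (mystruct here is just a made-up example
structure, not part of this patch), the node typically lives on the
caller's stack:

  struct mystruct {
	struct q_spinlock lock;
	int counter;
  };

  static int mystruct_increment(struct mystruct *m)
  {
	struct q_spinlock_node node;	/* must stay valid until the unlock */
	int val;

	q_spin_lock(&m->lock, &node);
	val = ++m->counter;		/* critical section */
	q_spin_unlock(&m->lock, &node);
	return val;
  }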

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 arch/x86/include/asm/queue_spinlock.h |   20 ++++++++
 include/asm-generic/queue_spinlock.h  |    7 +++
 include/linux/queue_spinlock.h        |   61 ++++++++++++++++++++++++
 kernel/Makefile                       |    2 +-
 kernel/queue_spinlock.c               |   82 +++++++++++++++++++++++++++++++++
 5 files changed, 171 insertions(+), 1 deletions(-)
 create mode 100644 arch/x86/include/asm/queue_spinlock.h
 create mode 100644 include/asm-generic/queue_spinlock.h
 create mode 100644 include/linux/queue_spinlock.h
 create mode 100644 kernel/queue_spinlock.c

diff --git a/arch/x86/include/asm/queue_spinlock.h b/arch/x86/include/asm/queue_spinlock.h
new file mode 100644
index 000000000000..655e01c1c2e8
--- /dev/null
+++ b/arch/x86/include/asm/queue_spinlock.h
@@ -0,0 +1,20 @@
+#ifndef _ASM_QUEUE_SPINLOCK_H
+#define _ASM_QUEUE_SPINLOCK_H
+
+#if defined(CONFIG_X86_PPRO_FENCE) || defined(CONFIG_X86_OOSTORE)
+
+#define q_spin_lock_mb() mb()
+#define q_spin_unlock_mb() mb()
+
+#else
+
+/*
+ * x86 memory ordering only needs compiler barriers to implement
+ * acquire load / release store semantics
+ */
+#define q_spin_lock_mb() barrier()
+#define q_spin_unlock_mb() barrier()
+
+#endif
+
+#endif /* _ASM_QUEUE_SPINLOCK_H */
diff --git a/include/asm-generic/queue_spinlock.h b/include/asm-generic/queue_spinlock.h
new file mode 100644
index 000000000000..01fb9fbfb8dc
--- /dev/null
+++ b/include/asm-generic/queue_spinlock.h
@@ -0,0 +1,7 @@
+#ifndef _ASM_GENERIC_QUEUE_SPINLOCK_H
+#define _ASM_GENERIC_QUEUE_SPINLOCK_H
+
+#define q_spin_lock_mb() mb()
+#define q_spin_unlock_mb() mb()
+
+#endif /* _ASM_GENERIC_QUEUE_SPINLOCK_H */
diff --git a/include/linux/queue_spinlock.h b/include/linux/queue_spinlock.h
new file mode 100644
index 000000000000..84b2470d232b
--- /dev/null
+++ b/include/linux/queue_spinlock.h
@@ -0,0 +1,61 @@
+#ifndef __LINUX_QUEUE_SPINLOCK_H
+#define __LINUX_QUEUE_SPINLOCK_H
+
+#include <linux/bug.h>
+
+struct q_spinlock_node {
+	struct q_spinlock_node *next;
+	bool wait;
+};
+
+struct q_spinlock {
+	struct q_spinlock_node *node;
+};
+
+#define __Q_SPIN_LOCK_UNLOCKED(name) { NULL }
+
+#ifdef CONFIG_SMP
+
+void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node);
+void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
+void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node);
+void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
+
+static inline void q_spin_lock_init(struct q_spinlock *lock)
+{
+	lock->node = NULL;
+}
+
+static inline void assert_q_spin_locked(struct q_spinlock *lock)
+{
+	BUG_ON(!lock->node);
+}
+
+static inline void assert_q_spin_unlocked(struct q_spinlock *lock)
+{
+	BUG_ON(lock->node);
+}
+
+#else /* !CONFIG_SMP */
+
+static inline void
+q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline void
+q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline void
+q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline void
+q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline void q_spin_lock_init(struct q_spinlock *lock) {}
+
+static inline void assert_q_spin_locked(struct q_spinlock *lock) {}
+
+static inline void assert_q_spin_unlocked(struct q_spinlock *lock) {}
+
+#endif /* CONFIG_SMP */
+
+#endif /* __LINUX_QUEUE_SPINLOCK_H */
diff --git a/kernel/Makefile b/kernel/Makefile
index 86e3285ae7e5..ec91cfef4d4e 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -49,7 +49,7 @@ obj-$(CONFIG_SMP) += smp.o
 ifneq ($(CONFIG_SMP),y)
 obj-y += up.o
 endif
-obj-$(CONFIG_SMP) += spinlock.o
+obj-$(CONFIG_SMP) += spinlock.o queue_spinlock.o
 obj-$(CONFIG_DEBUG_SPINLOCK) += spinlock.o
 obj-$(CONFIG_PROVE_LOCKING) += spinlock.o
 obj-$(CONFIG_UID16) += uid16.o
diff --git a/kernel/queue_spinlock.c b/kernel/queue_spinlock.c
new file mode 100644
index 000000000000..20304f0bc852
--- /dev/null
+++ b/kernel/queue_spinlock.c
@@ -0,0 +1,82 @@
+#include <linux/queue_spinlock.h>
+#include <linux/export.h>
+#include <linux/preempt.h>
+#include <linux/bottom_half.h>
+#include <asm/barrier.h>
+#include <asm/processor.h>	/* for cpu_relax() */
+#include <asm/queue_spinlock.h>
+
+static inline void
+__q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	struct q_spinlock_node *prev;
+
+	node->next = NULL;
+	prev = xchg(&lock->node, node);
+	if (likely(!prev))
+		/*
+		 * Acquired the lock.
+		 * xchg() already includes a full memory barrier.
+		 */
+		return;
+
+	node->wait = true;
+	smp_wmb();
+	ACCESS_ONCE(prev->next) = node;
+	while (ACCESS_ONCE(node->wait))
+		cpu_relax();
+	q_spin_lock_mb();	/* guarantee acquire load semantics */
+}
+
+static inline void
+__q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	struct q_spinlock_node *next;
+	if (likely(!(next = ACCESS_ONCE(node->next)))) {
+		/*
+		 * Try to release lock.
+		 * cmpxchg() already includes a full memory barrier.
+		 */
+		if (likely(cmpxchg(&lock->node, node, NULL) == node))
+			return;
+
+		/*
+		 * We raced with __q_spin_lock().
+		 * Wait for next thread to be known.
+		 */
+		while (!(next = ACCESS_ONCE(node->next)))
+			cpu_relax();
+	}
+	q_spin_unlock_mb();	/* guarantee release store semantics */
+	ACCESS_ONCE(next->wait) = false;
+}
+
+void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	preempt_disable();
+	__q_spin_lock(lock, node);
+}
+EXPORT_SYMBOL(q_spin_lock);
+
+void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	local_bh_disable();
+	preempt_disable();
+	__q_spin_lock(lock, node);
+}
+EXPORT_SYMBOL(q_spin_lock_bh);
+
+void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	__q_spin_unlock(lock, node);
+	preempt_enable();
+}
+EXPORT_SYMBOL(q_spin_unlock);
+
+void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	__q_spin_unlock(lock, node);
+	preempt_enable_no_resched();
+	local_bh_enable_ip((unsigned long)__builtin_return_address(0));
+}
+EXPORT_SYMBOL(q_spin_unlock_bh);
-- 
1.7.7.3


* [RFC PATCH 2/6] net: convert qdisc busylock to use queue spinlock API
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 1/6] kernel: implement queue spinlock API Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 3/6] ipc: convert ipc objects " Michel Lespinasse
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

Convert qdisc busylock to use the proposed queue spinlock API.

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 include/net/sch_generic.h |    3 ++-
 net/core/dev.c            |    9 +++++----
 net/sched/sch_generic.c   |   10 +++-------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 4616f468d599..456c06581a57 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -6,6 +6,7 @@
 #include <linux/rcupdate.h>
 #include <linux/pkt_sched.h>
 #include <linux/pkt_cls.h>
+#include <linux/queue_spinlock.h>
 #include <net/gen_stats.h>
 #include <net/rtnetlink.h>
 
@@ -81,7 +82,7 @@ struct Qdisc {
 	unsigned int		__state;
 	struct gnet_stats_queue	qstats;
 	struct rcu_head		rcu_head;
-	spinlock_t		busylock;
+	struct q_spinlock	busylock;
 	u32			limit;
 };
 
diff --git a/net/core/dev.c b/net/core/dev.c
index e5942bf45a6d..058c984cd024 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -2440,6 +2440,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 {
 	spinlock_t *root_lock = qdisc_lock(q);
 	bool contended;
+	struct q_spinlock_node busylock_node;
 	int rc;
 
 	qdisc_skb_cb(skb)->pkt_len = skb->len;
@@ -2452,7 +2453,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	 */
 	contended = qdisc_is_running(q);
 	if (unlikely(contended))
-		spin_lock(&q->busylock);
+		q_spin_lock(&q->busylock, &busylock_node);
 
 	spin_lock(root_lock);
 	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
@@ -2472,7 +2473,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 
 		if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {
 			if (unlikely(contended)) {
-				spin_unlock(&q->busylock);
+				q_spin_unlock(&q->busylock, &busylock_node);
 				contended = false;
 			}
 			__qdisc_run(q);
@@ -2485,7 +2486,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 		rc = q->enqueue(skb, q) & NET_XMIT_MASK;
 		if (qdisc_run_begin(q)) {
 			if (unlikely(contended)) {
-				spin_unlock(&q->busylock);
+				q_spin_unlock(&q->busylock, &busylock_node);
 				contended = false;
 			}
 			__qdisc_run(q);
@@ -2493,7 +2494,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	}
 	spin_unlock(root_lock);
 	if (unlikely(contended))
-		spin_unlock(&q->busylock);
+		q_spin_unlock(&q->busylock, &busylock_node);
 	return rc;
 }
 
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index aefc1504dc88..6675d30d526a 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -362,7 +362,7 @@ struct Qdisc noop_qdisc = {
 	.list		=	LIST_HEAD_INIT(noop_qdisc.list),
 	.q.lock		=	__SPIN_LOCK_UNLOCKED(noop_qdisc.q.lock),
 	.dev_queue	=	&noop_netdev_queue,
-	.busylock	=	__SPIN_LOCK_UNLOCKED(noop_qdisc.busylock),
+	.busylock	=	__Q_SPIN_LOCK_UNLOCKED(noop_qdisc.busylock),
 };
 EXPORT_SYMBOL(noop_qdisc);
 
@@ -389,7 +389,7 @@ static struct Qdisc noqueue_qdisc = {
 	.list		=	LIST_HEAD_INIT(noqueue_qdisc.list),
 	.q.lock		=	__SPIN_LOCK_UNLOCKED(noqueue_qdisc.q.lock),
 	.dev_queue	=	&noqueue_netdev_queue,
-	.busylock	=	__SPIN_LOCK_UNLOCKED(noqueue_qdisc.busylock),
+	.busylock	=	__Q_SPIN_LOCK_UNLOCKED(noqueue_qdisc.busylock),
 };
 
 
@@ -527,8 +527,6 @@ struct Qdisc_ops pfifo_fast_ops __read_mostly = {
 };
 EXPORT_SYMBOL(pfifo_fast_ops);
 
-static struct lock_class_key qdisc_tx_busylock;
-
 struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 			  struct Qdisc_ops *ops)
 {
@@ -557,9 +555,7 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 	INIT_LIST_HEAD(&sch->list);
 	skb_queue_head_init(&sch->q);
 
-	spin_lock_init(&sch->busylock);
-	lockdep_set_class(&sch->busylock,
-			  dev->qdisc_tx_busylock ?: &qdisc_tx_busylock);
+	q_spin_lock_init(&sch->busylock);
 
 	sch->ops = ops;
 	sch->enqueue = ops->enqueue;
-- 
1.7.7.3


* [RFC PATCH 3/6] ipc: convert ipc objects to use queue spinlock API
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 1/6] kernel: implement queue spinlock API Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 2/6] net: convert qdisc busylock to use " Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

This change converts the IPC message queues, semaphores and shm segments
to use the queue spinlock API.

The patch is relatively large but mostly mechanical: it mostly adds
struct q_spinlock_node variables before locking ipc objects and passes
them all the way down to the actual locking and unlocking functions.

In most cases the struct q_spinlock_node is allocated on the stack, but
there are exceptions, such as the sysvipc_proc_seqops, which allocate it
within their iterator structure.
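
Schematically, each converted call site ends up following the same pattern
(sketch only, using the msg queue helpers from this patch as the example):

  struct q_spinlock_node node;

  msq = msg_lock_check(ns, msqid, &node);
  if (IS_ERR(msq))
	return PTR_ERR(msq);
  ...
  msg_unlock(msq, &node);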

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 include/linux/ipc.h |    4 +-
 ipc/msg.c           |   61 ++++++++++++++++------------
 ipc/namespace.c     |    8 ++-
 ipc/sem.c           |  112 ++++++++++++++++++++++++++++----------------------
 ipc/shm.c           |   95 ++++++++++++++++++++++++-------------------
 ipc/util.c          |   55 ++++++++++++++----------
 ipc/util.h          |   25 +++++++----
 7 files changed, 207 insertions(+), 153 deletions(-)

diff --git a/include/linux/ipc.h b/include/linux/ipc.h
index 8d861b2651f7..81693a8a5177 100644
--- a/include/linux/ipc.h
+++ b/include/linux/ipc.h
@@ -1,7 +1,7 @@
 #ifndef _LINUX_IPC_H
 #define _LINUX_IPC_H
 
-#include <linux/spinlock.h>
+#include <linux/queue_spinlock.h>
 #include <linux/uidgid.h>
 #include <uapi/linux/ipc.h>
 
@@ -10,7 +10,7 @@
 /* used by in-kernel data structures */
 struct kern_ipc_perm
 {
-	spinlock_t	lock;
+	struct q_spinlock	lock;
 	int		deleted;
 	int		id;
 	key_t		key;
diff --git a/ipc/msg.c b/ipc/msg.c
index a71af5a65abf..aeb32d539f6e 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -69,9 +69,10 @@ struct msg_sender {
 
 #define msg_ids(ns)	((ns)->ids[IPC_MSG_IDS])
 
-#define msg_unlock(msq)		ipc_unlock(&(msq)->q_perm)
+#define msg_unlock(msq,node)		ipc_unlock(&(msq)->q_perm, node)
 
-static void freeque(struct ipc_namespace *, struct kern_ipc_perm *);
+static void freeque(struct ipc_namespace *, struct kern_ipc_perm *,
+		    struct q_spinlock_node *);
 static int newque(struct ipc_namespace *, struct ipc_params *);
 #ifdef CONFIG_PROC_FS
 static int sysvipc_msg_proc_show(struct seq_file *s, void *it);
@@ -144,9 +145,10 @@ void __init msg_init(void)
  * msg_lock_(check_) routines are called in the paths where the rw_mutex
  * is not held.
  */
-static inline struct msg_queue *msg_lock(struct ipc_namespace *ns, int id)
+static inline struct msg_queue *msg_lock(struct ipc_namespace *ns, int id,
+					 struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock(&msg_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock(&msg_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct msg_queue *)ipcp;
@@ -155,9 +157,10 @@ static inline struct msg_queue *msg_lock(struct ipc_namespace *ns, int id)
 }
 
 static inline struct msg_queue *msg_lock_check(struct ipc_namespace *ns,
-						int id)
+					       int id,
+					       struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock_check(&msg_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock_check(&msg_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct msg_queue *)ipcp;
@@ -183,6 +186,7 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	int id, retval;
 	key_t key = params->key;
 	int msgflg = params->flg;
+	struct q_spinlock_node node;
 
 	msq = ipc_rcu_alloc(sizeof(*msq));
 	if (!msq)
@@ -201,7 +205,7 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	/*
 	 * ipc_addid() locks msq
 	 */
-	id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
+	id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni, &node);
 	if (id < 0) {
 		security_msg_queue_free(msq);
 		ipc_rcu_putref(msq);
@@ -217,7 +221,7 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
 	INIT_LIST_HEAD(&msq->q_receivers);
 	INIT_LIST_HEAD(&msq->q_senders);
 
-	msg_unlock(msq);
+	msg_unlock(msq, &node);
 
 	return msq->q_perm.id;
 }
@@ -276,7 +280,8 @@ static void expunge_all(struct msg_queue *msq, int res)
  * msg_ids.rw_mutex (writer) and the spinlock for this message queue are held
  * before freeque() is called. msg_ids.rw_mutex remains locked on exit.
  */
-static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
+static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp,
+		    struct q_spinlock_node *node)
 {
 	struct list_head *tmp;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
@@ -284,7 +289,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	expunge_all(msq, -EIDRM);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
-	msg_unlock(msq);
+	msg_unlock(msq, node);
 
 	tmp = msq->q_messages.next;
 	while (tmp != &msq->q_messages) {
@@ -415,6 +420,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
 	int err;
+	struct q_spinlock_node node;
 
 	if (cmd == IPC_SET) {
 		if (copy_msqid_from_user(&msqid64, buf, version))
@@ -422,7 +428,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	}
 
 	ipcp = ipcctl_pre_down(ns, &msg_ids(ns), msqid, cmd,
-			       &msqid64.msg_perm, msqid64.msg_qbytes);
+			       &msqid64.msg_perm, msqid64.msg_qbytes, &node);
 	if (IS_ERR(ipcp))
 		return PTR_ERR(ipcp);
 
@@ -434,7 +440,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 	switch (cmd) {
 	case IPC_RMID:
-		freeque(ns, ipcp);
+		freeque(ns, ipcp, &node);
 		goto out_up;
 	case IPC_SET:
 		if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
@@ -463,7 +469,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		err = -EINVAL;
 	}
 out_unlock:
-	msg_unlock(msq);
+	msg_unlock(msq, &node);
 out_up:
 	up_write(&msg_ids(ns).rw_mutex);
 	return err;
@@ -474,6 +480,7 @@ SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
 	struct msg_queue *msq;
 	int err, version;
 	struct ipc_namespace *ns;
+	struct q_spinlock_node node;
 
 	if (msqid < 0 || cmd < 0)
 		return -EINVAL;
@@ -531,12 +538,12 @@ SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
 			return -EFAULT;
 
 		if (cmd == MSG_STAT) {
-			msq = msg_lock(ns, msqid);
+			msq = msg_lock(ns, msqid, &node);
 			if (IS_ERR(msq))
 				return PTR_ERR(msq);
 			success_return = msq->q_perm.id;
 		} else {
-			msq = msg_lock_check(ns, msqid);
+			msq = msg_lock_check(ns, msqid, &node);
 			if (IS_ERR(msq))
 				return PTR_ERR(msq);
 			success_return = 0;
@@ -560,7 +567,7 @@ SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
 		tbuf.msg_qbytes = msq->q_qbytes;
 		tbuf.msg_lspid  = msq->q_lspid;
 		tbuf.msg_lrpid  = msq->q_lrpid;
-		msg_unlock(msq);
+		msg_unlock(msq, &node);
 		if (copy_msqid_to_user(buf, &tbuf, version))
 			return -EFAULT;
 		return success_return;
@@ -574,7 +581,7 @@ SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
 	}
 
 out_unlock:
-	msg_unlock(msq);
+	msg_unlock(msq, &node);
 	return err;
 }
 
@@ -642,6 +649,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	struct q_spinlock_node node;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -657,7 +665,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msg->m_type = mtype;
 	msg->m_ts = msgsz;
 
-	msq = msg_lock_check(ns, msqid);
+	msq = msg_lock_check(ns, msqid, &node);
 	if (IS_ERR(msq)) {
 		err = PTR_ERR(msq);
 		goto out_free;
@@ -686,10 +694,10 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 		}
 		ss_add(msq, &s);
 		ipc_rcu_getref(msq);
-		msg_unlock(msq);
+		msg_unlock(msq, &node);
 		schedule();
 
-		ipc_lock_by_ptr(&msq->q_perm);
+		ipc_lock_by_ptr(&msq->q_perm, &node);
 		ipc_rcu_putref(msq);
 		if (msq->q_perm.deleted) {
 			err = -EIDRM;
@@ -719,7 +727,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msg = NULL;
 
 out_unlock_free:
-	msg_unlock(msq);
+	msg_unlock(msq, &node);
 out_free:
 	if (msg != NULL)
 		free_msg(msg);
@@ -762,13 +770,14 @@ long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
 	struct msg_msg *msg;
 	int mode;
 	struct ipc_namespace *ns;
+	struct q_spinlock_node node;
 
 	if (msqid < 0 || (long) msgsz < 0)
 		return -EINVAL;
 	mode = convert_mode(&msgtyp, msgflg);
 	ns = current->nsproxy->ipc_ns;
 
-	msq = msg_lock_check(ns, msqid);
+	msq = msg_lock_check(ns, msqid, &node);
 	if (IS_ERR(msq))
 		return PTR_ERR(msq);
 
@@ -819,7 +828,7 @@ long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
 			atomic_sub(msg->m_ts, &ns->msg_bytes);
 			atomic_dec(&ns->msg_hdrs);
 			ss_wakeup(&msq->q_senders, 0);
-			msg_unlock(msq);
+			msg_unlock(msq, &node);
 			break;
 		}
 		/* No message waiting. Wait for a message */
@@ -837,7 +846,7 @@ long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
 			msr_d.r_maxsize = msgsz;
 		msr_d.r_msg = ERR_PTR(-EAGAIN);
 		current->state = TASK_INTERRUPTIBLE;
-		msg_unlock(msq);
+		msg_unlock(msq, &node);
 
 		schedule();
 
@@ -876,7 +885,7 @@ long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
 		/* Lockless receive, part 3:
 		 * Acquire the queue spinlock.
 		 */
-		ipc_lock_by_ptr(&msq->q_perm);
+		ipc_lock_by_ptr(&msq->q_perm, &node);
 		rcu_read_unlock();
 
 		/* Lockless receive, part 4:
@@ -890,7 +899,7 @@ long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
 		if (signal_pending(current)) {
 			msg = ERR_PTR(-ERESTARTNOHAND);
 out_unlock:
-			msg_unlock(msq);
+			msg_unlock(msq, &node);
 			break;
 		}
 	}
diff --git a/ipc/namespace.c b/ipc/namespace.c
index f362298c5ce4..ba4f87c18870 100644
--- a/ipc/namespace.c
+++ b/ipc/namespace.c
@@ -70,11 +70,13 @@ struct ipc_namespace *copy_ipcs(unsigned long flags,
  * Called for each kind of ipc when an ipc_namespace exits.
  */
 void free_ipcs(struct ipc_namespace *ns, struct ipc_ids *ids,
-	       void (*free)(struct ipc_namespace *, struct kern_ipc_perm *))
+	       void (*free)(struct ipc_namespace *, struct kern_ipc_perm *,
+			    struct q_spinlock_node *))
 {
 	struct kern_ipc_perm *perm;
 	int next_id;
 	int total, in_use;
+	struct q_spinlock_node node;
 
 	down_write(&ids->rw_mutex);
 
@@ -84,8 +86,8 @@ void free_ipcs(struct ipc_namespace *ns, struct ipc_ids *ids,
 		perm = idr_find(&ids->ipcs_idr, next_id);
 		if (perm == NULL)
 			continue;
-		ipc_lock_by_ptr(perm);
-		free(ns, perm);
+		ipc_lock_by_ptr(perm, &node);
+		free(ns, perm, &node);
 		total++;
 	}
 	up_write(&ids->rw_mutex);
diff --git a/ipc/sem.c b/ipc/sem.c
index 58d31f1c1eb5..84b7f3b2c632 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -138,11 +138,12 @@ struct sem_undo_list {
 
 #define sem_ids(ns)	((ns)->ids[IPC_SEM_IDS])
 
-#define sem_unlock(sma)		ipc_unlock(&(sma)->sem_perm)
+#define sem_unlock(sma,node)		ipc_unlock(&(sma)->sem_perm, node)
 #define sem_checkid(sma, semid)	ipc_checkid(&sma->sem_perm, semid)
 
 static int newary(struct ipc_namespace *, struct ipc_params *);
-static void freeary(struct ipc_namespace *, struct kern_ipc_perm *);
+static void freeary(struct ipc_namespace *, struct kern_ipc_perm *,
+		    struct q_spinlock_node *);
 #ifdef CONFIG_PROC_FS
 static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
 #endif
@@ -194,9 +195,10 @@ void __init sem_init (void)
  * sem_lock_(check_) routines are called in the paths where the rw_mutex
  * is not held.
  */
-static inline struct sem_array *sem_lock(struct ipc_namespace *ns, int id)
+static inline struct sem_array *sem_lock(struct ipc_namespace *ns, int id,
+					 struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock(&sem_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock(&sem_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct sem_array *)ipcp;
@@ -205,9 +207,10 @@ static inline struct sem_array *sem_lock(struct ipc_namespace *ns, int id)
 }
 
 static inline struct sem_array *sem_lock_check(struct ipc_namespace *ns,
-						int id)
+					       int id,
+					       struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock_check(&sem_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock_check(&sem_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct sem_array *)ipcp;
@@ -215,23 +218,26 @@ static inline struct sem_array *sem_lock_check(struct ipc_namespace *ns,
 	return container_of(ipcp, struct sem_array, sem_perm);
 }
 
-static inline void sem_lock_and_putref(struct sem_array *sma)
+static inline void sem_lock_and_putref(struct sem_array *sma,
+				       struct q_spinlock_node *node)
 {
-	ipc_lock_by_ptr(&sma->sem_perm);
+	ipc_lock_by_ptr(&sma->sem_perm, node);
 	ipc_rcu_putref(sma);
 }
 
-static inline void sem_getref_and_unlock(struct sem_array *sma)
+static inline void sem_getref_and_unlock(struct sem_array *sma,
+					 struct q_spinlock_node *node)
 {
 	ipc_rcu_getref(sma);
-	ipc_unlock(&(sma)->sem_perm);
+	ipc_unlock(&(sma)->sem_perm, node);
 }
 
 static inline void sem_putref(struct sem_array *sma)
 {
-	ipc_lock_by_ptr(&sma->sem_perm);
+	struct q_spinlock_node node;
+	ipc_lock_by_ptr(&sma->sem_perm, &node);
 	ipc_rcu_putref(sma);
-	ipc_unlock(&(sma)->sem_perm);
+	ipc_unlock(&(sma)->sem_perm, &node);
 }
 
 static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
@@ -291,6 +297,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	int nsems = params->u.nsems;
 	int semflg = params->flg;
 	int i;
+	struct q_spinlock_node node;
 
 	if (!nsems)
 		return -EINVAL;
@@ -314,7 +321,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 		return retval;
 	}
 
-	id = ipc_addid(&sem_ids(ns), &sma->sem_perm, ns->sc_semmni);
+	id = ipc_addid(&sem_ids(ns), &sma->sem_perm, ns->sc_semmni, &node);
 	if (id < 0) {
 		security_sem_free(sma);
 		ipc_rcu_putref(sma);
@@ -332,7 +339,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	INIT_LIST_HEAD(&sma->list_id);
 	sma->sem_nsems = nsems;
 	sma->sem_ctime = get_seconds();
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 
 	return sma->sem_perm.id;
 }
@@ -739,7 +746,8 @@ static int count_semzcnt (struct sem_array * sma, ushort semnum)
  * as a writer and the spinlock for this semaphore set hold. sem_ids.rw_mutex
  * remains locked on exit.
  */
-static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
+static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp,
+		    struct q_spinlock_node *node)
 {
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
@@ -747,7 +755,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct list_head tasks;
 
 	/* Free the existing undo structures for this semaphore set.  */
-	assert_spin_locked(&sma->sem_perm.lock);
+	assert_q_spin_locked(&sma->sem_perm.lock);
 	list_for_each_entry_safe(un, tu, &sma->list_id, list_id) {
 		list_del(&un->list_id);
 		spin_lock(&un->ulp->lock);
@@ -766,7 +774,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 
 	/* Remove the semaphore set from the IDR */
 	sem_rmid(ns, sma);
-	sem_unlock(sma);
+	sem_unlock(sma, node);
 
 	wake_up_sem_queue_do(&tasks);
 	ns->used_sems -= sma->sem_nsems;
@@ -803,6 +811,7 @@ static int semctl_nolock(struct ipc_namespace *ns, int semid,
 {
 	int err;
 	struct sem_array *sma;
+	struct q_spinlock_node node;
 
 	switch(cmd) {
 	case IPC_INFO:
@@ -845,12 +854,12 @@ static int semctl_nolock(struct ipc_namespace *ns, int semid,
 		int id;
 
 		if (cmd == SEM_STAT) {
-			sma = sem_lock(ns, semid);
+			sma = sem_lock(ns, semid, &node);
 			if (IS_ERR(sma))
 				return PTR_ERR(sma);
 			id = sma->sem_perm.id;
 		} else {
-			sma = sem_lock_check(ns, semid);
+			sma = sem_lock_check(ns, semid, &node);
 			if (IS_ERR(sma))
 				return PTR_ERR(sma);
 			id = 0;
@@ -870,7 +879,7 @@ static int semctl_nolock(struct ipc_namespace *ns, int semid,
 		tbuf.sem_otime  = sma->sem_otime;
 		tbuf.sem_ctime  = sma->sem_ctime;
 		tbuf.sem_nsems  = sma->sem_nsems;
-		sem_unlock(sma);
+		sem_unlock(sma, &node);
 		if (copy_semid_to_user (arg.buf, &tbuf, version))
 			return -EFAULT;
 		return id;
@@ -879,7 +888,7 @@ static int semctl_nolock(struct ipc_namespace *ns, int semid,
 		return -EINVAL;
 	}
 out_unlock:
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 	return err;
 }
 
@@ -893,8 +902,9 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	ushort* sem_io = fast_sem_io;
 	int nsems;
 	struct list_head tasks;
+	struct q_spinlock_node node;
 
-	sma = sem_lock_check(ns, semid);
+	sma = sem_lock_check(ns, semid, &node);
 	if (IS_ERR(sma))
 		return PTR_ERR(sma);
 
@@ -918,7 +928,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		int i;
 
 		if(nsems > SEMMSL_FAST) {
-			sem_getref_and_unlock(sma);
+			sem_getref_and_unlock(sma, &node);
 
 			sem_io = ipc_alloc(sizeof(ushort)*nsems);
 			if(sem_io == NULL) {
@@ -926,9 +936,9 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 				return -ENOMEM;
 			}
 
-			sem_lock_and_putref(sma);
+			sem_lock_and_putref(sma, &node);
 			if (sma->sem_perm.deleted) {
-				sem_unlock(sma);
+				sem_unlock(sma, &node);
 				err = -EIDRM;
 				goto out_free;
 			}
@@ -936,7 +946,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 
 		for (i = 0; i < sma->sem_nsems; i++)
 			sem_io[i] = sma->sem_base[i].semval;
-		sem_unlock(sma);
+		sem_unlock(sma, &node);
 		err = 0;
 		if(copy_to_user(array, sem_io, nsems*sizeof(ushort)))
 			err = -EFAULT;
@@ -947,7 +957,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		int i;
 		struct sem_undo *un;
 
-		sem_getref_and_unlock(sma);
+		sem_getref_and_unlock(sma, &node);
 
 		if(nsems > SEMMSL_FAST) {
 			sem_io = ipc_alloc(sizeof(ushort)*nsems);
@@ -970,9 +980,9 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 				goto out_free;
 			}
 		}
-		sem_lock_and_putref(sma);
+		sem_lock_and_putref(sma, &node);
 		if (sma->sem_perm.deleted) {
-			sem_unlock(sma);
+			sem_unlock(sma, &node);
 			err = -EIDRM;
 			goto out_free;
 		}
@@ -980,7 +990,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		for (i = 0; i < nsems; i++)
 			sma->sem_base[i].semval = sem_io[i];
 
-		assert_spin_locked(&sma->sem_perm.lock);
+		assert_q_spin_locked(&sma->sem_perm.lock);
 		list_for_each_entry(un, &sma->list_id, list_id) {
 			for (i = 0; i < nsems; i++)
 				un->semadj[i] = 0;
@@ -1021,7 +1031,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		if (val > SEMVMX || val < 0)
 			goto out_unlock;
 
-		assert_spin_locked(&sma->sem_perm.lock);
+		assert_q_spin_locked(&sma->sem_perm.lock);
 		list_for_each_entry(un, &sma->list_id, list_id)
 			un->semadj[semnum] = 0;
 
@@ -1035,7 +1045,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	}
 	}
 out_unlock:
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 	wake_up_sem_queue_do(&tasks);
 
 out_free:
@@ -1082,6 +1092,7 @@ static int semctl_down(struct ipc_namespace *ns, int semid,
 	int err;
 	struct semid64_ds semid64;
 	struct kern_ipc_perm *ipcp;
+	struct q_spinlock_node node;
 
 	if(cmd == IPC_SET) {
 		if (copy_semid_from_user(&semid64, arg.buf, version))
@@ -1089,7 +1100,7 @@ static int semctl_down(struct ipc_namespace *ns, int semid,
 	}
 
 	ipcp = ipcctl_pre_down(ns, &sem_ids(ns), semid, cmd,
-			       &semid64.sem_perm, 0);
+			       &semid64.sem_perm, 0, &node);
 	if (IS_ERR(ipcp))
 		return PTR_ERR(ipcp);
 
@@ -1101,7 +1112,7 @@ static int semctl_down(struct ipc_namespace *ns, int semid,
 
 	switch(cmd){
 	case IPC_RMID:
-		freeary(ns, ipcp);
+		freeary(ns, ipcp, &node);
 		goto out_up;
 	case IPC_SET:
 		err = ipc_update_perm(&semid64.sem_perm, ipcp);
@@ -1114,7 +1125,7 @@ static int semctl_down(struct ipc_namespace *ns, int semid,
 	}
 
 out_unlock:
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 out_up:
 	up_write(&sem_ids(ns).rw_mutex);
 	return err;
@@ -1237,6 +1248,7 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
 	struct sem_undo *un, *new;
 	int nsems;
 	int error;
+	struct q_spinlock_node node;
 
 	error = get_undo_list(&ulp);
 	if (error)
@@ -1252,12 +1264,12 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
 
 	/* no undo structure around - allocate one. */
 	/* step 1: figure out the size of the semaphore array */
-	sma = sem_lock_check(ns, semid);
+	sma = sem_lock_check(ns, semid, &node);
 	if (IS_ERR(sma))
 		return ERR_CAST(sma);
 
 	nsems = sma->sem_nsems;
-	sem_getref_and_unlock(sma);
+	sem_getref_and_unlock(sma, &node);
 
 	/* step 2: allocate new undo structure */
 	new = kzalloc(sizeof(struct sem_undo) + sizeof(short)*nsems, GFP_KERNEL);
@@ -1267,9 +1279,9 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
 	}
 
 	/* step 3: Acquire the lock on semaphore array */
-	sem_lock_and_putref(sma);
+	sem_lock_and_putref(sma, &node);
 	if (sma->sem_perm.deleted) {
-		sem_unlock(sma);
+		sem_unlock(sma, &node);
 		kfree(new);
 		un = ERR_PTR(-EIDRM);
 		goto out;
@@ -1290,14 +1302,14 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
 	new->semid = semid;
 	assert_spin_locked(&ulp->lock);
 	list_add_rcu(&new->list_proc, &ulp->list_proc);
-	assert_spin_locked(&sma->sem_perm.lock);
+	assert_q_spin_locked(&sma->sem_perm.lock);
 	list_add(&new->list_id, &sma->list_id);
 	un = new;
 
 success:
 	spin_unlock(&ulp->lock);
 	rcu_read_lock();
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 out:
 	return un;
 }
@@ -1342,6 +1354,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
 	struct list_head tasks;
+	struct q_spinlock_node node;
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1392,7 +1405,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 
 	INIT_LIST_HEAD(&tasks);
 
-	sma = sem_lock_check(ns, semid);
+	sma = sem_lock_check(ns, semid, &node);
 	if (IS_ERR(sma)) {
 		if (un)
 			rcu_read_unlock();
@@ -1477,7 +1490,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 
 sleep_again:
 	current->state = TASK_INTERRUPTIBLE;
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 
 	if (timeout)
 		jiffies_left = schedule_timeout(jiffies_left);
@@ -1499,7 +1512,7 @@ sleep_again:
 		goto out_free;
 	}
 
-	sma = sem_lock(ns, semid);
+	sma = sem_lock(ns, semid, &node);
 
 	/*
 	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
@@ -1538,7 +1551,7 @@ sleep_again:
 	unlink_queue(sma, &queue);
 
 out_unlock_free:
-	sem_unlock(sma);
+	sem_unlock(sma, &node);
 
 	wake_up_sem_queue_do(&tasks);
 out_free:
@@ -1604,6 +1617,7 @@ void exit_sem(struct task_struct *tsk)
 		struct list_head tasks;
 		int semid;
 		int i;
+		struct q_spinlock_node node;
 
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
@@ -1617,7 +1631,7 @@ void exit_sem(struct task_struct *tsk)
 		if (semid == -1)
 			break;
 
-		sma = sem_lock_check(tsk->nsproxy->ipc_ns, un->semid);
+		sma = sem_lock_check(tsk->nsproxy->ipc_ns, un->semid, &node);
 
 		/* exit_sem raced with IPC_RMID, nothing to do */
 		if (IS_ERR(sma))
@@ -1628,12 +1642,12 @@ void exit_sem(struct task_struct *tsk)
 			/* exit_sem raced with IPC_RMID+semget() that created
 			 * exactly the same semid. Nothing to do.
 			 */
-			sem_unlock(sma);
+			sem_unlock(sma, &node);
 			continue;
 		}
 
 		/* remove un from the linked lists */
-		assert_spin_locked(&sma->sem_perm.lock);
+		assert_q_spin_locked(&sma->sem_perm.lock);
 		list_del(&un->list_id);
 
 		spin_lock(&ulp->lock);
@@ -1668,7 +1682,7 @@ void exit_sem(struct task_struct *tsk)
 		/* maybe some queued-up processes were waiting for this */
 		INIT_LIST_HEAD(&tasks);
 		do_smart_update(sma, NULL, 0, 1, &tasks);
-		sem_unlock(sma);
+		sem_unlock(sma, &node);
 		wake_up_sem_queue_do(&tasks);
 
 		kfree_rcu(un, rcu);
diff --git a/ipc/shm.c b/ipc/shm.c
index dff40c9f73c9..67645b4de436 100644
--- a/ipc/shm.c
+++ b/ipc/shm.c
@@ -58,13 +58,14 @@ static const struct vm_operations_struct shm_vm_ops;
 
 #define shm_ids(ns)	((ns)->ids[IPC_SHM_IDS])
 
-#define shm_unlock(shp)			\
-	ipc_unlock(&(shp)->shm_perm)
+#define shm_unlock(shp,node)			\
+	ipc_unlock(&(shp)->shm_perm,node)
 
 static int newseg(struct ipc_namespace *, struct ipc_params *);
 static void shm_open(struct vm_area_struct *vma);
 static void shm_close(struct vm_area_struct *vma);
-static void shm_destroy (struct ipc_namespace *ns, struct shmid_kernel *shp);
+static void shm_destroy (struct ipc_namespace *ns, struct shmid_kernel *shp,
+			 struct q_spinlock_node *node);
 #ifdef CONFIG_PROC_FS
 static int sysvipc_shm_proc_show(struct seq_file *s, void *it);
 #endif
@@ -83,7 +84,8 @@ void shm_init_ns(struct ipc_namespace *ns)
  * Called with shm_ids.rw_mutex (writer) and the shp structure locked.
  * Only shm_ids.rw_mutex remains locked on exit.
  */
-static void do_shm_rmid(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
+static void do_shm_rmid(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp,
+			struct q_spinlock_node *node)
 {
 	struct shmid_kernel *shp;
 	shp = container_of(ipcp, struct shmid_kernel, shm_perm);
@@ -92,9 +94,9 @@ static void do_shm_rmid(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 		shp->shm_perm.mode |= SHM_DEST;
 		/* Do not find it any more */
 		shp->shm_perm.key = IPC_PRIVATE;
-		shm_unlock(shp);
+		shm_unlock(shp, node);
 	} else
-		shm_destroy(ns, shp);
+		shm_destroy(ns, shp, node);
 }
 
 #ifdef CONFIG_IPC_NS
@@ -128,9 +130,10 @@ void __init shm_init (void)
  * shm_lock_(check_) routines are called in the paths where the rw_mutex
  * is not necessarily held.
  */
-static inline struct shmid_kernel *shm_lock(struct ipc_namespace *ns, int id)
+static inline struct shmid_kernel *shm_lock(struct ipc_namespace *ns, int id,
+					    struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock(&shm_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock(&shm_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct shmid_kernel *)ipcp;
@@ -138,16 +141,17 @@ static inline struct shmid_kernel *shm_lock(struct ipc_namespace *ns, int id)
 	return container_of(ipcp, struct shmid_kernel, shm_perm);
 }
 
-static inline void shm_lock_by_ptr(struct shmid_kernel *ipcp)
+static inline void shm_lock_by_ptr(struct shmid_kernel *ipcp,
+				   struct q_spinlock_node *node)
 {
-	rcu_read_lock();
-	spin_lock(&ipcp->shm_perm.lock);
+	ipc_lock_by_ptr(&ipcp->shm_perm, node);
 }
 
 static inline struct shmid_kernel *shm_lock_check(struct ipc_namespace *ns,
-						int id)
+						  int id,
+						  struct q_spinlock_node *node)
 {
-	struct kern_ipc_perm *ipcp = ipc_lock_check(&shm_ids(ns), id);
+	struct kern_ipc_perm *ipcp = ipc_lock_check(&shm_ids(ns), id, node);
 
 	if (IS_ERR(ipcp))
 		return (struct shmid_kernel *)ipcp;
@@ -167,13 +171,14 @@ static void shm_open(struct vm_area_struct *vma)
 	struct file *file = vma->vm_file;
 	struct shm_file_data *sfd = shm_file_data(file);
 	struct shmid_kernel *shp;
+	struct q_spinlock_node node;
 
-	shp = shm_lock(sfd->ns, sfd->id);
+	shp = shm_lock(sfd->ns, sfd->id, &node);
 	BUG_ON(IS_ERR(shp));
 	shp->shm_atim = get_seconds();
 	shp->shm_lprid = task_tgid_vnr(current);
 	shp->shm_nattch++;
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 }
 
 /*
@@ -185,11 +190,12 @@ static void shm_open(struct vm_area_struct *vma)
  * It has to be called with shp and shm_ids.rw_mutex (writer) locked,
  * but returns with shp unlocked and freed.
  */
-static void shm_destroy(struct ipc_namespace *ns, struct shmid_kernel *shp)
+static void shm_destroy(struct ipc_namespace *ns, struct shmid_kernel *shp,
+			struct q_spinlock_node *node)
 {
 	ns->shm_tot -= (shp->shm_segsz + PAGE_SIZE - 1) >> PAGE_SHIFT;
 	shm_rmid(ns, shp);
-	shm_unlock(shp);
+	shm_unlock(shp, node);
 	if (!is_file_hugepages(shp->shm_file))
 		shmem_lock(shp->shm_file, 0, shp->mlock_user);
 	else if (shp->mlock_user)
@@ -229,18 +235,19 @@ static void shm_close(struct vm_area_struct *vma)
 	struct shm_file_data *sfd = shm_file_data(file);
 	struct shmid_kernel *shp;
 	struct ipc_namespace *ns = sfd->ns;
+	struct q_spinlock_node node;
 
 	down_write(&shm_ids(ns).rw_mutex);
 	/* remove from the list of attaches of the shm segment */
-	shp = shm_lock(ns, sfd->id);
+	shp = shm_lock(ns, sfd->id, &node);
 	BUG_ON(IS_ERR(shp));
 	shp->shm_lprid = task_tgid_vnr(current);
 	shp->shm_dtim = get_seconds();
 	shp->shm_nattch--;
 	if (shm_may_destroy(ns, shp))
-		shm_destroy(ns, shp);
+		shm_destroy(ns, shp, &node);
 	else
-		shm_unlock(shp);
+		shm_unlock(shp, &node);
 	up_write(&shm_ids(ns).rw_mutex);
 }
 
@@ -269,8 +276,9 @@ static int shm_try_destroy_current(int id, void *p, void *data)
 		return 0;
 
 	if (shm_may_destroy(ns, shp)) {
-		shm_lock_by_ptr(shp);
-		shm_destroy(ns, shp);
+		struct q_spinlock_node node;
+		shm_lock_by_ptr(shp, &node);
+		shm_destroy(ns, shp, &node);
 	}
 	return 0;
 }
@@ -292,8 +300,9 @@ static int shm_try_destroy_orphaned(int id, void *p, void *data)
 		return 0;
 
 	if (shm_may_destroy(ns, shp)) {
-		shm_lock_by_ptr(shp);
-		shm_destroy(ns, shp);
+		struct q_spinlock_node node;
+		shm_lock_by_ptr(shp, &node);
+		shm_destroy(ns, shp, &node);
 	}
 	return 0;
 }
@@ -467,6 +476,7 @@ static int newseg(struct ipc_namespace *ns, struct ipc_params *params)
 	char name[13];
 	int id;
 	vm_flags_t acctflag = 0;
+	struct q_spinlock_node node;
 
 	if (size < SHMMIN || size > ns->shm_ctlmax)
 		return -EINVAL;
@@ -510,7 +520,7 @@ static int newseg(struct ipc_namespace *ns, struct ipc_params *params)
 	if (IS_ERR(file))
 		goto no_file;
 
-	id = ipc_addid(&shm_ids(ns), &shp->shm_perm, ns->shm_ctlmni);
+	id = ipc_addid(&shm_ids(ns), &shp->shm_perm, ns->shm_ctlmni, &node);
 	if (id < 0) {
 		error = id;
 		goto no_id;
@@ -532,7 +542,7 @@ static int newseg(struct ipc_namespace *ns, struct ipc_params *params)
 
 	ns->shm_tot += numpages;
 	error = shp->shm_perm.id;
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 	return error;
 
 no_id:
@@ -737,6 +747,7 @@ static int shmctl_down(struct ipc_namespace *ns, int shmid, int cmd,
 	struct shmid64_ds shmid64;
 	struct shmid_kernel *shp;
 	int err;
+	struct q_spinlock_node node;
 
 	if (cmd == IPC_SET) {
 		if (copy_shmid_from_user(&shmid64, buf, version))
@@ -744,7 +755,7 @@ static int shmctl_down(struct ipc_namespace *ns, int shmid, int cmd,
 	}
 
 	ipcp = ipcctl_pre_down(ns, &shm_ids(ns), shmid, cmd,
-			       &shmid64.shm_perm, 0);
+			       &shmid64.shm_perm, 0, &node);
 	if (IS_ERR(ipcp))
 		return PTR_ERR(ipcp);
 
@@ -755,7 +766,7 @@ static int shmctl_down(struct ipc_namespace *ns, int shmid, int cmd,
 		goto out_unlock;
 	switch (cmd) {
 	case IPC_RMID:
-		do_shm_rmid(ns, ipcp);
+		do_shm_rmid(ns, ipcp, &node);
 		goto out_up;
 	case IPC_SET:
 		err = ipc_update_perm(&shmid64.shm_perm, ipcp);
@@ -767,7 +778,7 @@ static int shmctl_down(struct ipc_namespace *ns, int shmid, int cmd,
 		err = -EINVAL;
 	}
 out_unlock:
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 out_up:
 	up_write(&shm_ids(ns).rw_mutex);
 	return err;
@@ -778,6 +789,7 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 	struct shmid_kernel *shp;
 	int err, version;
 	struct ipc_namespace *ns;
+	struct q_spinlock_node node;
 
 	if (cmd < 0 || shmid < 0) {
 		err = -EINVAL;
@@ -845,14 +857,14 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 		int result;
 
 		if (cmd == SHM_STAT) {
-			shp = shm_lock(ns, shmid);
+			shp = shm_lock(ns, shmid, &node);
 			if (IS_ERR(shp)) {
 				err = PTR_ERR(shp);
 				goto out;
 			}
 			result = shp->shm_perm.id;
 		} else {
-			shp = shm_lock_check(ns, shmid);
+			shp = shm_lock_check(ns, shmid, &node);
 			if (IS_ERR(shp)) {
 				err = PTR_ERR(shp);
 				goto out;
@@ -874,7 +886,7 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 		tbuf.shm_cpid	= shp->shm_cprid;
 		tbuf.shm_lpid	= shp->shm_lprid;
 		tbuf.shm_nattch	= shp->shm_nattch;
-		shm_unlock(shp);
+		shm_unlock(shp, &node);
 		if(copy_shmid_to_user (buf, &tbuf, version))
 			err = -EFAULT;
 		else
@@ -886,7 +898,7 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 	{
 		struct file *shm_file;
 
-		shp = shm_lock_check(ns, shmid);
+		shp = shm_lock_check(ns, shmid, &node);
 		if (IS_ERR(shp)) {
 			err = PTR_ERR(shp);
 			goto out;
@@ -929,7 +941,7 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 		shp->shm_perm.mode &= ~SHM_LOCKED;
 		shp->mlock_user = NULL;
 		get_file(shm_file);
-		shm_unlock(shp);
+		shm_unlock(shp, &node);
 		shmem_unlock_mapping(shm_file->f_mapping);
 		fput(shm_file);
 		goto out;
@@ -943,7 +955,7 @@ SYSCALL_DEFINE3(shmctl, int, shmid, int, cmd, struct shmid_ds __user *, buf)
 	}
 
 out_unlock:
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 out:
 	return err;
 }
@@ -971,6 +983,7 @@ long do_shmat(int shmid, char __user *shmaddr, int shmflg, ulong *raddr,
 	struct shm_file_data *sfd;
 	struct path path;
 	fmode_t f_mode;
+	struct q_spinlock_node node;
 
 	err = -EINVAL;
 	if (shmid < 0)
@@ -1012,7 +1025,7 @@ long do_shmat(int shmid, char __user *shmaddr, int shmflg, ulong *raddr,
 	 * additional creator id...
 	 */
 	ns = current->nsproxy->ipc_ns;
-	shp = shm_lock_check(ns, shmid);
+	shp = shm_lock_check(ns, shmid, &node);
 	if (IS_ERR(shp)) {
 		err = PTR_ERR(shp);
 		goto out;
@@ -1030,7 +1043,7 @@ long do_shmat(int shmid, char __user *shmaddr, int shmflg, ulong *raddr,
 	path_get(&path);
 	shp->shm_nattch++;
 	size = i_size_read(path.dentry->d_inode);
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 
 	err = -ENOMEM;
 	sfd = kzalloc(sizeof(*sfd), GFP_KERNEL);
@@ -1082,20 +1095,20 @@ out_fput:
 
 out_nattch:
 	down_write(&shm_ids(ns).rw_mutex);
-	shp = shm_lock(ns, shmid);
+	shp = shm_lock(ns, shmid, &node);
 	BUG_ON(IS_ERR(shp));
 	shp->shm_nattch--;
 	if (shm_may_destroy(ns, shp))
-		shm_destroy(ns, shp);
+		shm_destroy(ns, shp, &node);
 	else
-		shm_unlock(shp);
+		shm_unlock(shp, &node);
 	up_write(&shm_ids(ns).rw_mutex);
 
 out:
 	return err;
 
 out_unlock:
-	shm_unlock(shp);
+	shm_unlock(shp, &node);
 	goto out;
 
 out_free:
diff --git a/ipc/util.c b/ipc/util.c
index 72fd0785ac94..b4232f0cb473 100644
--- a/ipc/util.c
+++ b/ipc/util.c
@@ -178,7 +178,8 @@ void __init ipc_init_proc_interface(const char *path, const char *header,
  *	If key is found ipc points to the owning ipc structure
  */
  
-static struct kern_ipc_perm *ipc_findkey(struct ipc_ids *ids, key_t key)
+static struct kern_ipc_perm *ipc_findkey(struct ipc_ids *ids, key_t key,
+					 struct q_spinlock_node *node)
 {
 	struct kern_ipc_perm *ipc;
 	int next_id;
@@ -195,7 +196,7 @@ static struct kern_ipc_perm *ipc_findkey(struct ipc_ids *ids, key_t key)
 			continue;
 		}
 
-		ipc_lock_by_ptr(ipc);
+		ipc_lock_by_ptr(ipc, node);
 		return ipc;
 	}
 
@@ -247,7 +248,8 @@ int ipc_get_maxid(struct ipc_ids *ids)
  *	Called with ipc_ids.rw_mutex held as a writer.
  */
  
-int ipc_addid(struct ipc_ids* ids, struct kern_ipc_perm* new, int size)
+int ipc_addid(struct ipc_ids* ids, struct kern_ipc_perm* new, int size,
+	      struct q_spinlock_node *node)
 {
 	kuid_t euid;
 	kgid_t egid;
@@ -259,14 +261,14 @@ int ipc_addid(struct ipc_ids* ids, struct kern_ipc_perm* new, int size)
 	if (ids->in_use >= size)
 		return -ENOSPC;
 
-	spin_lock_init(&new->lock);
+	q_spin_lock_init(&new->lock);
 	new->deleted = 0;
 	rcu_read_lock();
-	spin_lock(&new->lock);
+	q_spin_lock(&new->lock, node);
 
 	err = idr_get_new(&ids->ipcs_idr, new, &id);
 	if (err) {
-		spin_unlock(&new->lock);
+		q_spin_unlock(&new->lock, node);
 		rcu_read_unlock();
 		return err;
 	}
@@ -368,6 +370,7 @@ static int ipcget_public(struct ipc_namespace *ns, struct ipc_ids *ids,
 	struct kern_ipc_perm *ipcp;
 	int flg = params->flg;
 	int err;
+	struct q_spinlock_node node;
 retry:
 	err = idr_pre_get(&ids->ipcs_idr, GFP_KERNEL);
 
@@ -376,7 +379,7 @@ retry:
 	 * a new entry + read locks are not "upgradable"
 	 */
 	down_write(&ids->rw_mutex);
-	ipcp = ipc_findkey(ids, params->key);
+	ipcp = ipc_findkey(ids, params->key, &node);
 	if (ipcp == NULL) {
 		/* key not used */
 		if (!(flg & IPC_CREAT))
@@ -401,7 +404,7 @@ retry:
 				 */
 				err = ipc_check_perms(ns, ipcp, ops, params);
 		}
-		ipc_unlock(ipcp);
+		ipc_unlock(ipcp, &node);
 	}
 	up_write(&ids->rw_mutex);
 
@@ -681,7 +684,8 @@ void ipc64_perm_to_ipc_perm (struct ipc64_perm *in, struct ipc_perm *out)
  * The ipc object is locked on exit.
  */
 
-struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id)
+struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id,
+			       struct q_spinlock_node *node)
 {
 	struct kern_ipc_perm *out;
 	int lid = ipcid_to_idx(id);
@@ -693,13 +697,13 @@ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id)
 		return ERR_PTR(-EINVAL);
 	}
 
-	spin_lock(&out->lock);
+	q_spin_lock(&out->lock, node);
 	
 	/* ipc_rmid() may have already freed the ID while ipc_lock
 	 * was spinning: here verify that the structure is still valid
 	 */
 	if (out->deleted) {
-		spin_unlock(&out->lock);
+		q_spin_unlock(&out->lock, node);
 		rcu_read_unlock();
 		return ERR_PTR(-EINVAL);
 	}
@@ -707,16 +711,17 @@ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id)
 	return out;
 }
 
-struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id)
+struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id,
+				     struct q_spinlock_node *node)
 {
 	struct kern_ipc_perm *out;
 
-	out = ipc_lock(ids, id);
+	out = ipc_lock(ids, id, node);
 	if (IS_ERR(out))
 		return out;
 
 	if (ipc_checkid(out, id)) {
-		ipc_unlock(out);
+		ipc_unlock(out, node);
 		return ERR_PTR(-EIDRM);
 	}
 
@@ -781,14 +786,15 @@ int ipc_update_perm(struct ipc64_perm *in, struct kern_ipc_perm *out)
  */
 struct kern_ipc_perm *ipcctl_pre_down(struct ipc_namespace *ns,
 				      struct ipc_ids *ids, int id, int cmd,
-				      struct ipc64_perm *perm, int extra_perm)
+				      struct ipc64_perm *perm, int extra_perm,
+				      struct q_spinlock_node *node)
 {
 	struct kern_ipc_perm *ipcp;
 	kuid_t euid;
 	int err;
 
 	down_write(&ids->rw_mutex);
-	ipcp = ipc_lock_check(ids, id);
+	ipcp = ipc_lock_check(ids, id, node);
 	if (IS_ERR(ipcp)) {
 		err = PTR_ERR(ipcp);
 		goto out_up;
@@ -805,7 +811,7 @@ struct kern_ipc_perm *ipcctl_pre_down(struct ipc_namespace *ns,
 		return ipcp;
 
 	err = -EPERM;
-	ipc_unlock(ipcp);
+	ipc_unlock(ipcp, node);
 out_up:
 	up_write(&ids->rw_mutex);
 	return ERR_PTR(err);
@@ -839,13 +845,15 @@ int ipc_parse_version (int *cmd)
 struct ipc_proc_iter {
 	struct ipc_namespace *ns;
 	struct ipc_proc_iface *iface;
+	struct q_spinlock_node node;
 };
 
 /*
  * This routine locks the ipc structure found at least at position pos.
  */
 static struct kern_ipc_perm *sysvipc_find_ipc(struct ipc_ids *ids, loff_t pos,
-					      loff_t *new_pos)
+					      loff_t *new_pos,
+					      struct q_spinlock_node *node)
 {
 	struct kern_ipc_perm *ipc;
 	int total, id;
@@ -864,7 +872,7 @@ static struct kern_ipc_perm *sysvipc_find_ipc(struct ipc_ids *ids, loff_t pos,
 		ipc = idr_find(&ids->ipcs_idr, pos);
 		if (ipc != NULL) {
 			*new_pos = pos + 1;
-			ipc_lock_by_ptr(ipc);
+			ipc_lock_by_ptr(ipc, node);
 			return ipc;
 		}
 	}
@@ -881,9 +889,10 @@ static void *sysvipc_proc_next(struct seq_file *s, void *it, loff_t *pos)
 
 	/* If we had an ipc id locked before, unlock it */
 	if (ipc && ipc != SEQ_START_TOKEN)
-		ipc_unlock(ipc);
+		ipc_unlock(ipc, &iter->node);
 
-	return sysvipc_find_ipc(&iter->ns->ids[iface->ids], *pos, pos);
+	return sysvipc_find_ipc(&iter->ns->ids[iface->ids], *pos, pos,
+				&iter->node);
 }
 
 /*
@@ -913,7 +922,7 @@ static void *sysvipc_proc_start(struct seq_file *s, loff_t *pos)
 		return SEQ_START_TOKEN;
 
 	/* Find the (pos-1)th ipc */
-	return sysvipc_find_ipc(ids, *pos - 1, pos);
+	return sysvipc_find_ipc(ids, *pos - 1, pos, &iter->node);
 }
 
 static void sysvipc_proc_stop(struct seq_file *s, void *it)
@@ -925,7 +934,7 @@ static void sysvipc_proc_stop(struct seq_file *s, void *it)
 
 	/* If we had a locked structure, release it */
 	if (ipc && ipc != SEQ_START_TOKEN)
-		ipc_unlock(ipc);
+		ipc_unlock(ipc, &iter->node);
 
 	ids = &iter->ns->ids[iface->ids];
 	/* Release the lock we took in start() */
diff --git a/ipc/util.h b/ipc/util.h
index c8fe2f7631e9..f03c36188cc3 100644
--- a/ipc/util.h
+++ b/ipc/util.h
@@ -94,7 +94,8 @@ void __init ipc_init_proc_interface(const char *path, const char *header,
 #define ipcid_to_idx(id) ((id) % SEQ_MULTIPLIER)
 
 /* must be called with ids->rw_mutex acquired for writing */
-int ipc_addid(struct ipc_ids *, struct kern_ipc_perm *, int);
+int ipc_addid(struct ipc_ids *, struct kern_ipc_perm *, int,
+	      struct q_spinlock_node *);
 
 /* must be called with ids->rw_mutex acquired for reading */
 int ipc_get_maxid(struct ipc_ids *);
@@ -121,14 +122,16 @@ void* ipc_rcu_alloc(int size);
 void ipc_rcu_getref(void *ptr);
 void ipc_rcu_putref(void *ptr);
 
-struct kern_ipc_perm *ipc_lock(struct ipc_ids *, int);
+struct kern_ipc_perm *ipc_lock(struct ipc_ids *, int,
+			       struct q_spinlock_node *);
 
 void kernel_to_ipc64_perm(struct kern_ipc_perm *in, struct ipc64_perm *out);
 void ipc64_perm_to_ipc_perm(struct ipc64_perm *in, struct ipc_perm *out);
 int ipc_update_perm(struct ipc64_perm *in, struct kern_ipc_perm *out);
 struct kern_ipc_perm *ipcctl_pre_down(struct ipc_namespace *ns,
 				      struct ipc_ids *ids, int id, int cmd,
-				      struct ipc64_perm *perm, int extra_perm);
+				      struct ipc64_perm *perm, int extra_perm,
+				      struct q_spinlock_node *node);
 
 #ifndef CONFIG_ARCH_WANT_IPC_PARSE_VERSION
   /* On IA-64, we always use the "64-bit version" of the IPC structures.  */ 
@@ -158,21 +161,25 @@ static inline int ipc_checkid(struct kern_ipc_perm *ipcp, int uid)
 	return 0;
 }
 
-static inline void ipc_lock_by_ptr(struct kern_ipc_perm *perm)
+static inline void ipc_lock_by_ptr(struct kern_ipc_perm *perm,
+				   struct q_spinlock_node *node)
 {
 	rcu_read_lock();
-	spin_lock(&perm->lock);
+	q_spin_lock(&perm->lock, node);
 }
 
-static inline void ipc_unlock(struct kern_ipc_perm *perm)
+static inline void ipc_unlock(struct kern_ipc_perm *perm,
+			      struct q_spinlock_node *node)
 {
-	spin_unlock(&perm->lock);
+	q_spin_unlock(&perm->lock, node);
 	rcu_read_unlock();
 }
 
-struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id);
+struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id,
+				     struct q_spinlock_node *node);
 int ipcget(struct ipc_namespace *ns, struct ipc_ids *ids,
 			struct ipc_ops *ops, struct ipc_params *params);
 void free_ipcs(struct ipc_namespace *ns, struct ipc_ids *ids,
-		void (*free)(struct ipc_namespace *, struct kern_ipc_perm *));
+	       void (*free)(struct ipc_namespace *, struct kern_ipc_perm *,
+			    struct q_spinlock_node *));
 #endif
-- 
1.7.7.3

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [RFC PATCH 4/6] kernel: faster queue spinlock implementation
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
                   ` (2 preceding siblings ...)
  2013-01-22 23:13 ` [RFC PATCH 3/6] ipc: convert ipc objects " Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-01-23 21:55   ` Rik van Riel
                     ` (2 more replies)
  2013-01-22 23:13 ` [RFC PATCH 5/6] net: qdisc busylock updates to account for queue spinlock api change Michel Lespinasse
                   ` (2 subsequent siblings)
  6 siblings, 3 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

The MCS based queue spinlock implementation was easy to use, but it has
a few annoying performance limitations under moderate contention.

- In the uncontended case, it uses atomic operations on both the acquire
  and the release paths, as opposed to the ticket spinlock which can use
  (on x86) a simple store on the release path.

- In the contended case, a thread will write both to its own
  q_spinlock_node (to initialize the wait field), to the previous
  node (to set the next field), and to the next node (to release the
  spinlock). It would be nice to reduce the number of locations accessed.

- When some thread's unlock operation races with the next thread's lock
  operation, there is a short window where the unlocker might have to spin
  waiting for the locker to fully register itself. This race is most likely
  to hit with a small number of threads and a short lock hold time, and it
  may reduce performance in that case.

Because of these limitations, the MCS queue spinlock implementation does
not always compare favorably to ticket spinlocks under moderate contention.

This alternative queue spinlock implementation has some nice properties:

- One single atomic operation (xchg) during acquire
- One single memory store for unlock. No busy looping either.
  Actually, the unlock is so short that we can just inline it.
- Same basic API as with the MCS spinlock

It does however have a couple downsides:

- Spinlock initialization makes use of kmalloc(), and can fail.
  This is easy enough to deal with for dynamically allocated objects,
  where the object allocation could have failed too; however this is
  slightly inconvenient for statically allocated objects.
- trylock() is not sanely implementable in this framework, while the MCS
  queue spinlock implementation could have supported it without issues.
- This proposal does not support acquiring queue spinlocks from hardirq
  or nmi contexts (I don't expect this to be a significant downside).

Each spinlock points to a q_spinlock_token structure, which consists of a
single 'wait' field. That value is true whenever the spinlock is locked.
Each CPU has a private stash of two tokens. To acquire the spinlock,
we must use one of the per-CPU stashed tokens, set the wait field to true,
and atomically exchange it with the spinlock's token, thus getting the
previous lock holder/waiter's token. Then we wait for the previous holder
to release the lock, by waiting for that previous token's wait field to
become false. At that point, the previous holder won't access that token
anymore so we can stash it back into our per-CPU reserve. To unlock,
we must simply remember the token we had exchanged into the spinlock
variable (it gets passed from the lock function to the unlock function
through the q_spinlock_node structure) and set its wait field to false.
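
Ignoring the preempt/BH handling and the memory barrier details, the two
fast paths are roughly as follows ('this_cpu_stash' is a stand-in for the
per-CPU token storage described above; simplified sketch only):

  /* acquire: trade our stashed (free) token for the lock's current one */
  token = this_cpu_stash;
  token->wait = true;
  prev = xchg(&lock->token, token);   /* previous holder/waiter's token */
  this_cpu_stash = prev;              /* becomes ours once prev->wait clears */
  while (ACCESS_ONCE(prev->wait))
          cpu_relax();
  /* lock acquired; 'token' is remembered for the unlock side */

  /* release: hand the lock over to the next waiter, if any */
  ACCESS_ONCE(token->wait) = false;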

Since the spinlock acquisition takes one token out of our per-CPU stash
at the start of the operation, and returns a new token to the per-CPU stash
only after it's done spinning for the previous lock owner to unlock,
we need to deal with the possibility of getting interrupted during that
process and needing to acquire a separate spinlock during that interrupt.
This situation is dealt with by having two stashed tokens per CPU, and
by not supporting queue spinlock acquisition in hardirq or nmi contexts,
so that we never need more than two levels of nesting here (for process
and softirq contexts).

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 include/linux/queue_spinlock.h |   80 ++++++++++++++++++----
 init/main.c                    |    2 +
 kernel/queue_spinlock.c        |  149 ++++++++++++++++++++++++++--------------
 3 files changed, 164 insertions(+), 67 deletions(-)

diff --git a/include/linux/queue_spinlock.h b/include/linux/queue_spinlock.h
index 84b2470d232b..992b72be27e3 100644
--- a/include/linux/queue_spinlock.h
+++ b/include/linux/queue_spinlock.h
@@ -2,38 +2,80 @@
 #define __LINUX_QUEUE_SPINLOCK_H
 
 #include <linux/bug.h>
+#include <linux/preempt.h>
+#include <linux/bottom_half.h>
+#include <asm/queue_spinlock.h>
 
-struct q_spinlock_node {
-	struct q_spinlock_node *next;
+struct q_spinlock_token {
 	bool wait;
 };
 
-struct q_spinlock {
-	struct q_spinlock_node *node;
+struct q_spinlock_node {
+	struct q_spinlock_token *token;
 };
 
-#define __Q_SPIN_LOCK_UNLOCKED(name) { NULL }
+struct q_spinlock {
+	struct q_spinlock_token *token;
+};
 
 #ifdef CONFIG_SMP
 
-void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node);
-void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
-void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node);
-void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
+struct q_spinlock_token *__q_spin_lock(struct q_spinlock *lock);
+struct q_spinlock_token *__q_spin_lock_bh(struct q_spinlock *lock);
+struct q_spinlock_token *__q_spin_lock_process(struct q_spinlock *lock);
+int q_spin_lock_init(struct q_spinlock *lock);
+void q_spin_lock_destroy(struct q_spinlock *lock);
+void q_spin_lock_percpu_init(void);
+
+static inline void
+q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	node->token = __q_spin_lock(lock);
+}
+
+static inline void
+q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	node->token = __q_spin_lock_bh(lock);
+}
+
+static inline void
+q_spin_lock_process(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	node->token = __q_spin_lock_process(lock);
+}
+
+static inline void
+q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	q_spin_unlock_mb();	/* guarantee release store semantics */
+	ACCESS_ONCE(node->token->wait) = false;
+	preempt_enable();
+}
+
+static inline void
+q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+{
+	q_spin_unlock_mb();	/* guarantee release store semantics */
+	ACCESS_ONCE(node->token->wait) = false;
+	preempt_enable_no_resched();
+	local_bh_enable_ip((unsigned long)__builtin_return_address(0));
+}
 
-static inline void q_spin_lock_init(struct q_spinlock *lock)
+static inline void
+q_spin_unlock_process(struct q_spinlock *lock, struct q_spinlock_node *node)
 {
-	lock->node = NULL;
+	q_spin_unlock(lock, node);
 }
 
 static inline void assert_q_spin_locked(struct q_spinlock *lock)
 {
-	BUG_ON(!lock->node);
+	BUG_ON(!lock->token->wait);
 }
 
 static inline void assert_q_spin_unlocked(struct q_spinlock *lock)
 {
-	BUG_ON(lock->node);
+	BUG_ON(lock->token->wait);
 }
 
 #else /* !CONFIG_SMP */
@@ -45,17 +87,27 @@ static inline void
 q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
 
 static inline void
+q_spin_lock_process(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline void
 q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node) {}
 
 static inline void
 q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
 
-static inline void q_spin_lock_init(struct q_spinlock *lock) {}
+static inline void
+q_spin_unlock_process(struct q_spinlock *lock, struct q_spinlock_node *node) {}
+
+static inline int q_spin_lock_init(struct q_spinlock *lock) { return 0; }
+
+static inline void q_spin_lock_destroy(struct q_spinlock *lock) {}
 
 static inline void assert_q_spin_locked(struct q_spinlock *lock) {}
 
 static inline void assert_q_spin_unlocked(struct q_spinlock *lock) {}
 
+static inline void q_spin_lock_percpu_init(void) {};
+
 #endif /* CONFIG_SMP */
 
 #endif /* __LINUX_QUEUE_SPINLOCK_H */
diff --git a/init/main.c b/init/main.c
index e33e09df3cbc..ebfb6bd4a819 100644
--- a/init/main.c
+++ b/init/main.c
@@ -70,6 +70,7 @@
 #include <linux/perf_event.h>
 #include <linux/file.h>
 #include <linux/ptrace.h>
+#include <linux/queue_spinlock.h>
 
 #include <asm/io.h>
 #include <asm/bugs.h>
@@ -525,6 +526,7 @@ asmlinkage void __init start_kernel(void)
 	sort_main_extable();
 	trap_init();
 	mm_init();
+	q_spin_lock_percpu_init();
 
 	/*
 	 * Set up the scheduler prior starting any interrupts (such as the
diff --git a/kernel/queue_spinlock.c b/kernel/queue_spinlock.c
index 20304f0bc852..b5715088e9cd 100644
--- a/kernel/queue_spinlock.c
+++ b/kernel/queue_spinlock.c
@@ -2,81 +2,124 @@
 #include <linux/export.h>
 #include <linux/preempt.h>
 #include <linux/bottom_half.h>
-#include <asm/barrier.h>
+#include <linux/percpu.h>
+#include <linux/slab.h>
+#include <linux/hardirq.h>
+#include <linux/cache.h>
 #include <asm/processor.h>	/* for cpu_relax() */
 #include <asm/queue_spinlock.h>
 
-static inline void
-__q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
+DEFINE_PER_CPU(struct q_spinlock_token *, q_spinlock_token[2]);
+
+static inline struct q_spinlock_token *
+____q_spin_lock(struct q_spinlock *lock,
+		struct q_spinlock_token **percpu_token)
 {
-	struct q_spinlock_node *prev;
-
-	node->next = NULL;
-	prev = xchg(&lock->node, node);
-	if (likely(!prev))
-		/*
-		 * Acquired the lock.
-		 * xchg() already includes a full memory barrier.
-		 */
-		return;
-
-	node->wait = true;
-	smp_wmb();
-	ACCESS_ONCE(prev->next) = node;
-	while (ACCESS_ONCE(node->wait))
+	/*
+	 * Careful about reentrancy here - if we are interrupted and the code
+	 * running in that interrupt tries to get another queue spinlock,
+	 * it must not use the same percpu_token that we're using here.
+	 */
+
+	struct q_spinlock_token *token, *prev;
+
+	token = __this_cpu_read(*percpu_token);
+	token->wait = true;
+	prev = xchg(&lock->token, token);
+	__this_cpu_write(*percpu_token, prev);
+	while (ACCESS_ONCE(prev->wait))
 		cpu_relax();
 	q_spin_lock_mb();	/* guarantee acquire load semantics */
+	return token;
 }
 
-static inline void
-__q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
+#ifdef CONFIG_DEBUG_SPINLOCK
+#define SPINLOCK_WARN_ON(x) WARN_ON_ONCE(x)
+#else
+#define SPINLOCK_WARN_ON(x)
+#endif
+
+/*
+ * Acquire queue spinlock from process context.  If an interrupt comes,
+ * the BH may call __q_spin_lock() or __q_spin_lock_bh() before we return.
+ */
+struct q_spinlock_token *__q_spin_lock_process(struct q_spinlock *lock)
 {
-	struct q_spinlock_node *next;
-	if (likely(!(next = ACCESS_ONCE(node->next)))) {
-		/*
-		 * Try to release lock.
-		 * cmpxchg() already includes a full memory barrier.
-		 */
-		if (likely(cmpxchg(&lock->node, node, NULL) == node))
-			return;
-
-		/*
-		 * We raced with __q_spin_lock().
-		 * Wait for next thread to be known.
-		 */
-		while (!(next = ACCESS_ONCE(node->next)))
-			cpu_relax();
-	}
-	q_spin_unlock_mb();	/* guarantee release store semantics */
-	ACCESS_ONCE(next->wait) = false;
+	/* Call only from process context */
+	SPINLOCK_WARN_ON(in_serving_softirq() || in_irq() || in_nmi());
+
+	preempt_disable();
+	return ____q_spin_lock(lock, q_spinlock_token);
 }
+EXPORT_SYMBOL(__q_spin_lock_process);
 
-void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
+/*
+ * Acquire queue spinlock with BH processing already known to be disabled
+ * (thus there can't be nested queue spinlock acquisitions before we return).
+ */
+struct q_spinlock_token *__q_spin_lock(struct q_spinlock *lock)
 {
+	/* Do not call from hardirq context */
+	SPINLOCK_WARN_ON(in_irq() || in_nmi());
+
+	/*
+	 * Do not call with BH processing enabled
+	 * (call q_spin_lock_process() instead)
+	 */
+	SPINLOCK_WARN_ON(!in_softirq());
+
 	preempt_disable();
-	__q_spin_lock(lock, node);
+	return ____q_spin_lock(lock, q_spinlock_token + 1);
 }
-EXPORT_SYMBOL(q_spin_lock);
+EXPORT_SYMBOL(__q_spin_lock);
 
-void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+/*
+ * Acquire queue spinlock and disable BH processing
+ * (thus there can't be nested queue spinlock acquisitions before we return).
+ */
+struct q_spinlock_token *__q_spin_lock_bh(struct q_spinlock *lock)
 {
+	/* Do not call from hardirq context */
+	SPINLOCK_WARN_ON(in_irq() || in_nmi());
+
 	local_bh_disable();
 	preempt_disable();
-	__q_spin_lock(lock, node);
+	return ____q_spin_lock(lock, q_spinlock_token + 1);
+}
+EXPORT_SYMBOL(__q_spin_lock_bh);
+
+struct q_spinlock_alloc_token {
+	struct q_spinlock_token ____cacheline_aligned t;
+};
+
+int q_spin_lock_init(struct q_spinlock *lock)
+{
+	struct q_spinlock_token *token;
+	token = kmalloc(sizeof(struct q_spinlock_alloc_token), GFP_KERNEL);
+	if (!token)
+		return -ENOMEM;
+	token->wait = false;
+	lock->token = token;
+	return 0;
 }
-EXPORT_SYMBOL(q_spin_lock_bh);
+EXPORT_SYMBOL(q_spin_lock_init);
 
-void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
+void q_spin_lock_destroy(struct q_spinlock *lock)
 {
-	__q_spin_unlock(lock, node);
-	preempt_enable();
+	kfree(lock->token);
 }
-EXPORT_SYMBOL(q_spin_unlock);
+EXPORT_SYMBOL(q_spin_lock_destroy);
 
-void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
+void q_spin_lock_percpu_init(void)
 {
-	__q_spin_unlock(lock, node);
-	preempt_enable_no_resched();
-	local_bh_enable_ip((unsigned long)__builtin_return_address(0));
+	unsigned int cpu, i;
+	struct q_spinlock_token *token;
+	for_each_possible_cpu(cpu)
+		for (i = 0; i < 2; i++) {
+			token = kmalloc(sizeof(struct q_spinlock_alloc_token),
+					GFP_KERNEL);
+			BUG_ON(!token);
+			token->wait = false;
+			per_cpu(q_spinlock_token[i], cpu) = token;
+		}
 }
-EXPORT_SYMBOL(q_spin_unlock_bh);
-- 
1.7.7.3

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [RFC PATCH 5/6] net: qdisc busylock updates to account for queue spinlock api change
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
                   ` (3 preceding siblings ...)
  2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-01-22 23:13 ` [RFC PATCH 6/6] ipc: object locking " Michel Lespinasse
  2013-01-22 23:17 ` [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
  6 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

Due to limitations of the second queue spinlock implementation,
a static spinlock initializer is not available anymore, so we must
use an init function instead. Additionally, for dynamically allocated
qdiscs we must handle the case of running out of memory as we try to
initialize the busylock spinlock.

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 net/sched/sch_generic.c |   15 +++++++++++----
 1 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index 6675d30d526a..1f9458b54ad6 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -362,7 +362,6 @@ struct Qdisc noop_qdisc = {
 	.list		=	LIST_HEAD_INIT(noop_qdisc.list),
 	.q.lock		=	__SPIN_LOCK_UNLOCKED(noop_qdisc.q.lock),
 	.dev_queue	=	&noop_netdev_queue,
-	.busylock	=	__Q_SPIN_LOCK_UNLOCKED(noop_qdisc.busylock),
 };
 EXPORT_SYMBOL(noop_qdisc);
 
@@ -389,9 +388,14 @@ static struct Qdisc noqueue_qdisc = {
 	.list		=	LIST_HEAD_INIT(noqueue_qdisc.list),
 	.q.lock		=	__SPIN_LOCK_UNLOCKED(noqueue_qdisc.q.lock),
 	.dev_queue	=	&noqueue_netdev_queue,
-	.busylock	=	__Q_SPIN_LOCK_UNLOCKED(noqueue_qdisc.busylock),
 };
 
+static int __init sch_generic_init(void)
+{
+	return q_spin_lock_init(&noop_qdisc.busylock) ||
+		q_spin_lock_init(&noqueue_qdisc.busylock);
+}
+module_init(sch_generic_init);
 
 static const u8 prio2band[TC_PRIO_MAX + 1] = {
 	1, 2, 2, 2, 1, 2, 0, 0 , 1, 1, 1, 1, 1, 1, 1, 1
@@ -552,11 +556,11 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 		sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
 		sch->padded = (char *) sch - (char *) p;
 	}
+	if (q_spin_lock_init(&sch->busylock))
+		goto freeout;
 	INIT_LIST_HEAD(&sch->list);
 	skb_queue_head_init(&sch->q);
 
-	q_spin_lock_init(&sch->busylock);
-
 	sch->ops = ops;
 	sch->enqueue = ops->enqueue;
 	sch->dequeue = ops->dequeue;
@@ -565,6 +569,8 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 	atomic_set(&sch->refcnt, 1);
 
 	return sch;
+freeout:
+	kfree(p);
 errout:
 	return ERR_PTR(err);
 }
@@ -609,6 +615,7 @@ static void qdisc_rcu_free(struct rcu_head *head)
 {
 	struct Qdisc *qdisc = container_of(head, struct Qdisc, rcu_head);
 
+	q_spin_lock_destroy(&qdisc->busylock);
 	kfree((char *) qdisc - qdisc->padded);
 }
 
-- 
1.7.7.3

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [RFC PATCH 6/6] ipc: object locking updates to account for queue spinlock api change
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
                   ` (4 preceding siblings ...)
  2013-01-22 23:13 ` [RFC PATCH 5/6] net: qdisc busylock updates to account for queue spinlock api change Michel Lespinasse
@ 2013-01-22 23:13 ` Michel Lespinasse
  2013-01-22 23:17 ` [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
  6 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:13 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

Compared to the previous change updating the ipc subsystem to use
queue spinlocks, this is very minimal. The changes are:

- As we run in process context without disabling BH processing, it is
  possible that we get interrupted in the middle of a spinlock acquisition.
  Use q_spin_lock_process() instead of q_spin_lock() in order to account
  for that by using the per-CPU token that is reserved for such process
  context processing.

- Deal with the possibility of running out of memory as we initialize
  a dynamically allocated IPC object's spinlock, and destroy the spinlock
  when the ipc object is freed. There is a bit of complexity here as IPC
  objects use a scheme where object deletion is delayed by one RCU grace
  period, during which the object lock may still be accessed. In order to
  address this we have to make the ipc rcu allocation/freeing system
  aware of where the object spinlocks are located within the allocated
  object. This is not too hard in practice, as all ipc objects start
  with the kern_ipc_perm structure, which is where the object's spinlock
  is located.

Signed-off-by: Michel Lespinasse <walken@google.com>

---
 include/linux/ipc.h |    5 +--
 include/linux/msg.h |    2 +-
 include/linux/shm.h |    2 +-
 ipc/sem.c           |    3 +-
 ipc/util.c          |   77 ++++++++++++++++++++++++++++++++++----------------
 ipc/util.h          |    4 +-
 6 files changed, 60 insertions(+), 33 deletions(-)

diff --git a/include/linux/ipc.h b/include/linux/ipc.h
index 81693a8a5177..4bc1751664fe 100644
--- a/include/linux/ipc.h
+++ b/include/linux/ipc.h
@@ -7,9 +7,8 @@
 
 #define IPCMNI 32768  /* <= MAX_INT limit for ipc arrays (including sysctl changes) */
 
-/* used by in-kernel data structures */
-struct kern_ipc_perm
-{
+/* used by in-kernel data structures, must be placed first */
+struct kern_ipc_perm {
 	struct q_spinlock	lock;
 	int		deleted;
 	int		id;
diff --git a/include/linux/msg.h b/include/linux/msg.h
index 7a4b9e97d29a..36a69d8d9a95 100644
--- a/include/linux/msg.h
+++ b/include/linux/msg.h
@@ -16,7 +16,7 @@ struct msg_msg {
 
 /* one msq_queue structure for each present queue on the system */
 struct msg_queue {
-	struct kern_ipc_perm q_perm;
+	struct kern_ipc_perm q_perm;	/* permissions, must be first */
 	time_t q_stime;			/* last msgsnd time */
 	time_t q_rtime;			/* last msgrcv time */
 	time_t q_ctime;			/* last change time */
diff --git a/include/linux/shm.h b/include/linux/shm.h
index bcf8a6a3ec00..2e74b96922df 100644
--- a/include/linux/shm.h
+++ b/include/linux/shm.h
@@ -8,7 +8,7 @@
 #include <asm/shmparam.h>
 struct shmid_kernel /* private to the kernel */
 {	
-	struct kern_ipc_perm	shm_perm;
+	struct kern_ipc_perm	shm_perm;	/* must be first */
 	struct file *		shm_file;
 	unsigned long		shm_nattch;
 	unsigned long		shm_segsz;
diff --git a/ipc/sem.c b/ipc/sem.c
index 84b7f3b2c632..e1f24942b294 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -309,7 +309,6 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	if (!sma) {
 		return -ENOMEM;
 	}
-	memset (sma, 0, size);
 
 	sma->sem_perm.mode = (semflg & S_IRWXUGO);
 	sma->sem_perm.key = key;
@@ -330,6 +329,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	ns->used_sems += nsems;
 
 	sma->sem_base = (struct sem *) &sma[1];
+	memset (sma->sem_base, 0, nsems * sizeof (struct sem));
 
 	for (i = 0; i < nsems; i++)
 		INIT_LIST_HEAD(&sma->sem_base[i].sem_pending);
@@ -338,6 +338,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	INIT_LIST_HEAD(&sma->sem_pending);
 	INIT_LIST_HEAD(&sma->list_id);
 	sma->sem_nsems = nsems;
+	sma->sem_otime = 0;
 	sma->sem_ctime = get_seconds();
 	sem_unlock(sma, &node);
 
diff --git a/ipc/util.c b/ipc/util.c
index b4232f0cb473..f4f94b0c2e1b 100644
--- a/ipc/util.c
+++ b/ipc/util.c
@@ -255,20 +255,21 @@ int ipc_addid(struct ipc_ids* ids, struct kern_ipc_perm* new, int size,
 	kgid_t egid;
 	int id, err;
 
+	assert_q_spin_unlocked(&new->lock);
+
 	if (size > IPCMNI)
 		size = IPCMNI;
 
 	if (ids->in_use >= size)
 		return -ENOSPC;
 
-	q_spin_lock_init(&new->lock);
 	new->deleted = 0;
 	rcu_read_lock();
-	q_spin_lock(&new->lock, node);
+	q_spin_lock_process(&new->lock, node);
 
 	err = idr_get_new(&ids->ipcs_idr, new, &id);
 	if (err) {
-		q_spin_unlock(&new->lock, node);
+		q_spin_unlock_process(&new->lock, node);
 		rcu_read_unlock();
 		return err;
 	}
@@ -480,28 +481,28 @@ void ipc_free(void* ptr, int size)
  * - [only if vmalloc]: ipc_rcu_sched.
  * Their lifetime doesn't overlap, thus the headers share the same memory.
  * Unlike a normal union, they are right-aligned, thus some container_of
- * forward/backward casting is necessary:
+ * forward/backward casting is necessary.
+ * Note that the first data field following these headers is always
+ * of type struct kern_ipc_perm.
  */
 struct ipc_rcu_hdr
 {
 	int refcount;
 	int is_vmalloc;
-	void *data[0];
+	struct kern_ipc_perm data[0];
 };
 
 
 struct ipc_rcu_grace
 {
 	struct rcu_head rcu;
-	/* "void *" makes sure alignment of following data is sane. */
-	void *data[0];
+	struct kern_ipc_perm data[0];
 };
 
 struct ipc_rcu_sched
 {
 	struct work_struct work;
-	/* "void *" makes sure alignment of following data is sane. */
-	void *data[0];
+	struct kern_ipc_perm data[0];
 };
 
 #define HDRLEN_KMALLOC		(sizeof(struct ipc_rcu_grace) > sizeof(struct ipc_rcu_hdr) ? \
@@ -528,24 +529,33 @@ static inline int rcu_use_vmalloc(int size)
  
 void* ipc_rcu_alloc(int size)
 {
-	void* out;
+	void* alloc;
+	struct kern_ipc_perm *out = NULL;
 	/* 
 	 * We prepend the allocation with the rcu struct, and
 	 * workqueue if necessary (for vmalloc). 
 	 */
 	if (rcu_use_vmalloc(size)) {
-		out = vmalloc(HDRLEN_VMALLOC + size);
-		if (out) {
-			out += HDRLEN_VMALLOC;
-			container_of(out, struct ipc_rcu_hdr, data)->is_vmalloc = 1;
-			container_of(out, struct ipc_rcu_hdr, data)->refcount = 1;
+		alloc = vmalloc(HDRLEN_VMALLOC + size);
+		if (alloc) {
+			out = alloc + HDRLEN_VMALLOC;
+			if (q_spin_lock_init(&out->lock)) {
+				vfree(alloc);
+				return NULL;
+			}
+			container_of(out, struct ipc_rcu_hdr, data[0])->is_vmalloc = 1;
+			container_of(out, struct ipc_rcu_hdr, data[0])->refcount = 1;
 		}
 	} else {
-		out = kmalloc(HDRLEN_KMALLOC + size, GFP_KERNEL);
-		if (out) {
-			out += HDRLEN_KMALLOC;
-			container_of(out, struct ipc_rcu_hdr, data)->is_vmalloc = 0;
-			container_of(out, struct ipc_rcu_hdr, data)->refcount = 1;
+		alloc = kmalloc(HDRLEN_KMALLOC + size, GFP_KERNEL);
+		if (alloc) {
+			out = alloc + HDRLEN_KMALLOC;
+			if (q_spin_lock_init(&out->lock)) {
+				kfree(alloc);
+				return NULL;
+			}
+			container_of(out, struct ipc_rcu_hdr, data[0])->is_vmalloc = 0;
+			container_of(out, struct ipc_rcu_hdr, data[0])->refcount = 1;
 		}
 	}
 
@@ -575,13 +585,29 @@ static void ipc_schedule_free(struct rcu_head *head)
 	struct ipc_rcu_sched *sched;
 
 	grace = container_of(head, struct ipc_rcu_grace, rcu);
-	sched = container_of(&(grace->data[0]), struct ipc_rcu_sched,
-				data[0]);
+	assert_q_spin_unlocked(&grace->data[0].lock);
+	q_spin_lock_destroy(&grace->data[0].lock);
 
+	sched = container_of(&(grace->data[0]), struct ipc_rcu_sched, data[0]);
 	INIT_WORK(&sched->work, ipc_do_vfree);
 	schedule_work(&sched->work);
 }
 
+/**
+ * ipc_immediate_free - free ipc + rcu space
+ * @head: RCU callback structure that contains pointer to be freed
+ *
+ * Free from the RCU callback context.
+ */
+static void ipc_immediate_free(struct rcu_head *head)
+{
+	struct ipc_rcu_grace *free =
+		container_of(head, struct ipc_rcu_grace, rcu);
+	assert_q_spin_unlocked(&free->data[0].lock);
+	q_spin_lock_destroy(&free->data[0].lock);
+	kfree(free);
+}
+
 void ipc_rcu_putref(void *ptr)
 {
 	if (--container_of(ptr, struct ipc_rcu_hdr, data)->refcount > 0)
@@ -591,7 +617,8 @@ void ipc_rcu_putref(void *ptr)
 		call_rcu(&container_of(ptr, struct ipc_rcu_grace, data)->rcu,
 				ipc_schedule_free);
 	} else {
-		kfree_rcu(container_of(ptr, struct ipc_rcu_grace, data), rcu);
+		call_rcu(&container_of(ptr, struct ipc_rcu_grace, data)->rcu,
+				ipc_immediate_free);
 	}
 }
 
@@ -697,13 +724,13 @@ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id,
 		return ERR_PTR(-EINVAL);
 	}
 
-	q_spin_lock(&out->lock, node);
+	q_spin_lock_process(&out->lock, node);
 	
 	/* ipc_rmid() may have already freed the ID while ipc_lock
 	 * was spinning: here verify that the structure is still valid
 	 */
 	if (out->deleted) {
-		q_spin_unlock(&out->lock, node);
+		q_spin_unlock_process(&out->lock, node);
 		rcu_read_unlock();
 		return ERR_PTR(-EINVAL);
 	}
diff --git a/ipc/util.h b/ipc/util.h
index f03c36188cc3..eb547e35ffbe 100644
--- a/ipc/util.h
+++ b/ipc/util.h
@@ -165,13 +165,13 @@ static inline void ipc_lock_by_ptr(struct kern_ipc_perm *perm,
 				   struct q_spinlock_node *node)
 {
 	rcu_read_lock();
-	q_spin_lock(&perm->lock, node);
+	q_spin_lock_process(&perm->lock, node);
 }
 
 static inline void ipc_unlock(struct kern_ipc_perm *perm,
 			      struct q_spinlock_node *node)
 {
-	q_spin_unlock(&perm->lock, node);
+	q_spin_unlock_process(&perm->lock, node);
 	rcu_read_unlock();
 }
 
-- 
1.7.7.3

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 0/6] fast queue spinlocks
  2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
                   ` (5 preceding siblings ...)
  2013-01-22 23:13 ` [RFC PATCH 6/6] ipc: object locking " Michel Lespinasse
@ 2013-01-22 23:17 ` Michel Lespinasse
  6 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-22 23:17 UTC (permalink / raw)
  To: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul
  Cc: linux-kernel

[-- Attachment #1: Type: text/plain, Size: 930 bytes --]

On Tue, Jan 22, 2013 at 3:13 PM, Michel Lespinasse <walken@google.com> wrote:
> Additionally I will attach (as a reply to this email) graphs showing total
> spinlock throughput for a microbenchmark consisting of N threads doing
> lock/unlock operations in a tight loop. We can see that the proposed
> fast queue spinlock is comparable to ticket spinlocks for low number
> of threads, scales much better for high number of threads, and is always
> faster than the MCS strawman proposal (which did have the issue of being
> kinda slow at around 2-3 threads).
> mach1 is a 4 socket AMD Barcelona system (16 cores total)
> mach2 is a 2 socket Intel Westmere system (12 cores / 24 threads total)
> mach3 is a 2 socket Intel Sandybridge system (16 cores / 32 threads total)

Graphs are attached (admittedly not the most interesting benchmark).

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

[-- Attachment #2: graph.png --]
[-- Type: image/png, Size: 5128 bytes --]

[-- Attachment #3: graph.png --]
[-- Type: image/png, Size: 5029 bytes --]

[-- Attachment #4: graph.png --]
[-- Type: image/png, Size: 4740 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 4/6] kernel: faster queue spinlock implementation
  2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
@ 2013-01-23 21:55   ` Rik van Riel
  2013-01-23 23:52     ` Michel Lespinasse
  2013-01-24  0:18   ` Eric Dumazet
  2013-01-25 20:30   ` [RFC PATCH 7/6] kernel: document fast queue spinlocks Rik van Riel
  2 siblings, 1 reply; 25+ messages in thread
From: Rik van Riel @ 2013-01-23 21:55 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Ingo Molnar, Paul E. McKenney, David Howells, Thomas Gleixner,
	Eric Dumazet, Eric W. Biederman, Manfred Spraul, linux-kernel

On 01/22/2013 06:13 PM, Michel Lespinasse wrote:

> Because of these limitations, the MCS queue spinlock implementation does
> not always compare favorably to ticket spinlocks under moderate contention.
>
> This alternative queue spinlock implementation has some nice properties:
>
> - One single atomic operation (xchg) during acquire
> - One single memory store for unlock. No busy looping either.
>    Actually, the unlock is so short that we can just inline it.
> - Same basic API as with the MCS spinlock

There is one thing I do not understand about these locks.

> +static inline void
> +q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	q_spin_unlock_mb();	/* guarantee release store semantics */
> +	ACCESS_ONCE(node->token->wait) = false;
> +	preempt_enable();
> +}

Here you set wait to false, in the CPU-local (on the current CPU)
queue lock token. Afterwards, the same CPU could try to lock another
lock, using the same token...

> +DEFINE_PER_CPU(struct q_spinlock_token *, q_spinlock_token[2]);
> +
> +static inline struct q_spinlock_token *
> +____q_spin_lock(struct q_spinlock *lock,
> +		struct q_spinlock_token **percpu_token)
>   {
> +	/*
> +	 * Careful about reentrancy here - if we are interrupted and the code
> +	 * running in that interrupt tries to get another queue spinlock,
> +	 * it must not use the same percpu_token that we're using here.
> +	 */
> +
> +	struct q_spinlock_token *token, *prev;
> +
> +	token = __this_cpu_read(*percpu_token);
> +	token->wait = true;
> +	prev = xchg(&lock->token, token);
> +	__this_cpu_write(*percpu_token, prev);
> +	while (ACCESS_ONCE(prev->wait))
>   		cpu_relax();
>   	q_spin_lock_mb();	/* guarantee acquire load semantics */
> +	return token;
>   }

Here a CPU trying to take the lock will spin on the previous
CPU's token.

However, the previous CPU can immediately re-use its token.

It looks like it might be possible for the CPU trying to
acquire the lock to miss prev->wait being set to false, and
continue spinning.

If this lock type is widely used, could that lead to a deadlock?

Is there something in your code that guarantees the scenario
I described cannot happen, and I just missed it?

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 4/6] kernel: faster queue spinlock implementation
  2013-01-23 21:55   ` Rik van Riel
@ 2013-01-23 23:52     ` Michel Lespinasse
  0 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-01-23 23:52 UTC (permalink / raw)
  To: Rik van Riel
  Cc: Ingo Molnar, Paul E. McKenney, David Howells, Thomas Gleixner,
	Eric Dumazet, Eric W. Biederman, Manfred Spraul, linux-kernel

On Wed, Jan 23, 2013 at 1:55 PM, Rik van Riel <riel@redhat.com> wrote:
> There is one thing I do not understand about these locks.

Ah, I need to explain it better then :)

> On 01/22/2013 06:13 PM, Michel Lespinasse wrote:
>> +static inline void
>> +q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
>> +{
>> +       q_spin_unlock_mb();     /* guarantee release store semantics */
>> +       ACCESS_ONCE(node->token->wait) = false;
>> +       preempt_enable();
>> +}
>
> Here you set wait to false, in the CPU-local (on the current CPU)
> queue lock token. Afterwards, the same CPU could try to lock another
> lock, using the same token...

No, it won't use the same token. Basically, setting token->wait =
false does two things:
- it indicates to the next waiter (if any) that it doesn't need to spin anymore;
- it releases ownership of the token so that the next thread to
acquire the spinlock (or that may be already spinning on it) can reuse
the token.

>> +DEFINE_PER_CPU(struct q_spinlock_token *, q_spinlock_token[2]);
>> +
>> +static inline struct q_spinlock_token *
>> +____q_spin_lock(struct q_spinlock *lock,
>> +               struct q_spinlock_token **percpu_token)
>>   {
>> +       /*
>> +        * Careful about reentrancy here - if we are interrupted and the
>> code
>> +        * running in that interrupt tries to get another queue spinlock,
>> +        * it must not use the same percpu_token that we're using here.
>> +        */
>> +
>> +       struct q_spinlock_token *token, *prev;
>> +
>> +       token = __this_cpu_read(*percpu_token);
>> +       token->wait = true;
>> +       prev = xchg(&lock->token, token);
>> +       __this_cpu_write(*percpu_token, prev);
>> +       while (ACCESS_ONCE(prev->wait))
>>                 cpu_relax();
>>         q_spin_lock_mb();       /* guarantee acquire load semantics */
>> +       return token;
>>   }
>
> Here a CPU trying to take the lock will spin on the previous
> CPU's token.
>
> However, the previous CPU can immediately re-use its token.

No, they'll use a different token every time.
I think what confused you is that in __q_spin_lock_process(),
percpu_token always points to q_spinlock_token[0]. But that in turns
points to a new token every time (actually points to the token that
was reclaimed on the last spinlock acquisition), so that consecutive
spinlock acquisitions on the same CPU won't keep reusing the same
token.
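
To make this concrete, here is a hypothetical trace of one CPU taking the
same (uncontended) lock twice, where T0 is the initially stashed per-cpu
token and L0 is the token initially installed in the lock:

  1st acquire: T0->wait = true; xchg yields prev = L0; stash becomes L0;
               spin until L0->wait is false (it already is); we hold T0
  1st release: T0->wait = false   /* T0 is now claimable by the next locker */
  2nd acquire: uses L0 from the stash (not T0), and gets T0 back as the
               new stash via the xchg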

>> +       token = __this_cpu_read(*percpu_token);

This gets the token that was reclaimed during the last spinlock
acquisition. We know the token is available to us, because the
previous spinlock acquisition waited for token->wait to become false,
and this condition got satisfied only when the previous spinlock
holder released ownership of the token.

>> +       __this_cpu_write(*percpu_token, prev);

This stores the token we're waiting on in our per-cpu stash so we can
reuse it on the next spinlock acquisition. It doesn't belong to us
yet, but it will as soon as we're done spinning.

We also have the __q_spin_lock_process() vs __q_spin_lock_bh()
distinction to ensure interrupts won't cause us to reuse per-cpu
tokens before we're done spinning on them.
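
For reference, the mapping between the lock entry points and the two
per-cpu token slots in this patch is:

  __q_spin_lock_process()              -> q_spinlock_token[0] (process context)
  __q_spin_lock(), __q_spin_lock_bh()  -> q_spinlock_token[1] (BH disabled)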

Hope this helps,

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 4/6] kernel: faster queue spinlock implementation
  2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
  2013-01-23 21:55   ` Rik van Riel
@ 2013-01-24  0:18   ` Eric Dumazet
  2013-01-25 20:30   ` [RFC PATCH 7/6] kernel: document fast queue spinlocks Rik van Riel
  2 siblings, 0 replies; 25+ messages in thread
From: Eric Dumazet @ 2013-01-24  0:18 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Rik van Riel, Ingo Molnar, Paul E. McKenney, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel

On Tue, 2013-01-22 at 15:13 -0800, Michel Lespinasse wrote:
>  {
> -	__q_spin_unlock(lock, node);
> -	preempt_enable_no_resched();
> -	local_bh_enable_ip((unsigned long)__builtin_return_address(0));
> +	unsigned int cpu, i;
> +	struct q_spinlock_token *token;
> +	for_each_possible_cpu(cpu)
> +		for (i = 0; i < 2; i++) {
> +			token = kmalloc(sizeof(struct q_spinlock_alloc_token),
> +					GFP_KERNEL);
> +			BUG_ON(!token);
> +			token->wait = false;
> +			per_cpu(q_spinlock_token[i], cpu) = token;
> +		}

You might use kmalloc_node(...   , cpu_to_node(cpu)) to spread these
tokens on all nodes.
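
i.e. something like (untested):

			token = kmalloc_node(sizeof(struct q_spinlock_alloc_token),
					     GFP_KERNEL, cpu_to_node(cpu));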



^ permalink raw reply	[flat|nested] 25+ messages in thread

* [RFC PATCH 7/6] kernel: document fast queue spinlocks
  2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
  2013-01-23 21:55   ` Rik van Riel
  2013-01-24  0:18   ` Eric Dumazet
@ 2013-01-25 20:30   ` Rik van Riel
  2 siblings, 0 replies; 25+ messages in thread
From: Rik van Riel @ 2013-01-25 20:30 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Ingo Molnar, Paul E. McKenney, David Howells, Thomas Gleixner,
	Eric Dumazet, Eric W. Biederman, Manfred Spraul, linux-kernel

Document the fast queue spinlocks in a way that I can understand.

Signed-off-by: Rik van Riel <riel@redhat.com>
---
This may still not be clear to others. Please let me know if you
would like me to change/enhance the documentation, so you can
understand it too.

 kernel/queue_spinlock.c |   20 ++++++++++++++++++++
 1 files changed, 20 insertions(+), 0 deletions(-)

diff --git a/kernel/queue_spinlock.c b/kernel/queue_spinlock.c
index b571508..dc740fe 100644
--- a/kernel/queue_spinlock.c
+++ b/kernel/queue_spinlock.c
@@ -9,6 +9,23 @@
 #include <asm/processor.h>	/* for cpu_relax() */
 #include <asm/queue_spinlock.h>
 
+/*
+ * Fast queue spinlocks use a pool of tokens, which contain the actual locks,
+ * and are continuously moved around. Every spinlock is associated with one
+ * token, and every CPU is associated with two tokens. 
+ *
+ * When taking a lock, one of the CPU's tokens is associated with the lock,
+ * and the lock's token is associated with the CPU.
+ *
+ * The token that gets associated with the spinlock at lock time will indicate
+ * the lock is busy. The token that was previously associated with the spinlock,
+ * and is now associated with the CPU taking the lock, will indicate whether
+ * the previous lock holder has already unlocked the lock.
+ *
+ * To unlock a fast queue spinlock, the CPU will unlock the token that it
+ * associated with the spinlock.
+ */
+
 DEFINE_PER_CPU(struct q_spinlock_token *, q_spinlock_token[2]);
 
 static inline struct q_spinlock_token *
@@ -25,8 +42,11 @@ ____q_spin_lock(struct q_spinlock *lock,
 
 	token = __this_cpu_read(*percpu_token);
 	token->wait = true;
+	/* Associate our (marked busy) token with the spinlock. */
 	prev = xchg(&lock->token, token);
+	/* The spinlock's old token is ours now. */
 	__this_cpu_write(*percpu_token, prev);
+	/* Wait for the spinlock's old token to be unlocked. */
 	while (ACCESS_ONCE(prev->wait))
 		cpu_relax();
 	q_spin_lock_mb();	/* guarantee acquire load semantics */


^ permalink raw reply related	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-01-22 23:13 ` [RFC PATCH 1/6] kernel: implement queue spinlock API Michel Lespinasse
@ 2013-02-07 22:34   ` Paul E. McKenney
  2013-02-07 22:56     ` Eric Dumazet
                       ` (2 more replies)
  0 siblings, 3 replies; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-07 22:34 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Rik van Riel, Ingo Molnar, David Howells, Thomas Gleixner,
	Eric Dumazet, Eric W. Biederman, Manfred Spraul, linux-kernel,
	john.stultz

On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
> Introduce queue spinlocks, to be used in situations where it is desired
> to have good throughput even under the occasional high-contention situation.
> 
> This initial implementation is based on the classic MCS spinlock,
> because I think this represents the nicest API we can hope for in a
> fast queue spinlock algorithm. The MCS spinlock has known limitations
> in that it performs very well under high contention, but is not as
> good as the ticket spinlock under low contention. I will address these
> limitations in a later patch, which will propose an alternative,
> higher performance implementation using (mostly) the same API.
> 
> Sample use case acquiring mystruct->lock:
> 
>   struct q_spinlock_node node;
> 
>   q_spin_lock(&mystruct->lock, &node);
>   ...
>   q_spin_unlock(&mystruct->lock, &node);

It is possible to keep the normal API for MCS locks by having the lock
holder remember the parameter in the lock word itself.  While spinning,
the node is on the stack but is not needed once the lock is acquired.
The pointer to the next node in the queue -is- needed, but this can be
stored in the lock word.
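
In other words the call sites could go back to something like this (sketch
of the intended API only, with the next-pointer stashing in the lock word
left out):

  q_spin_lock(&mystruct->lock);    /* node lives on q_spin_lock()'s stack */
  ...
  q_spin_unlock(&mystruct->lock);  /* next waiter found through the lock word */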

I believe that John Stultz worked on something like this some years back,
so added him to CC.

							Thanx, Paul

> The q_spinlock_node must be used by a single lock holder / waiter and it must
> stay allocated for the entire duration when the lock is held (or waited on).
> 
> Signed-off-by: Michel Lespinasse <walken@google.com>
> 
> ---
>  arch/x86/include/asm/queue_spinlock.h |   20 ++++++++
>  include/asm-generic/queue_spinlock.h  |    7 +++
>  include/linux/queue_spinlock.h        |   61 ++++++++++++++++++++++++
>  kernel/Makefile                       |    2 +-
>  kernel/queue_spinlock.c               |   82 +++++++++++++++++++++++++++++++++
>  5 files changed, 171 insertions(+), 1 deletions(-)
>  create mode 100644 arch/x86/include/asm/queue_spinlock.h
>  create mode 100644 include/asm-generic/queue_spinlock.h
>  create mode 100644 include/linux/queue_spinlock.h
>  create mode 100644 kernel/queue_spinlock.c
> 
> diff --git a/arch/x86/include/asm/queue_spinlock.h b/arch/x86/include/asm/queue_spinlock.h
> new file mode 100644
> index 000000000000..655e01c1c2e8
> --- /dev/null
> +++ b/arch/x86/include/asm/queue_spinlock.h
> @@ -0,0 +1,20 @@
> +#ifndef _ASM_QUEUE_SPINLOCK_H
> +#define _ASM_QUEUE_SPINLOCK_H
> +
> +#if defined(CONFIG_X86_PPRO_FENCE) || defined(CONFIG_X86_OOSTORE)
> +
> +#define q_spin_lock_mb() mb()
> +#define q_spin_unlock_mb() mb()
> +
> +#else
> +
> +/*
> + * x86 memory ordering only needs compiler barriers to implement
> + * acquire load / release store semantics
> + */
> +#define q_spin_lock_mb() barrier()
> +#define q_spin_unlock_mb() barrier()
> +
> +#endif
> +
> +#endif /* _ASM_QUEUE_SPINLOCK_H */
> diff --git a/include/asm-generic/queue_spinlock.h b/include/asm-generic/queue_spinlock.h
> new file mode 100644
> index 000000000000..01fb9fbfb8dc
> --- /dev/null
> +++ b/include/asm-generic/queue_spinlock.h
> @@ -0,0 +1,7 @@
> +#ifndef _ASM_GENERIC_QUEUE_SPINLOCK_H
> +#define _ASM_GENERIC_QUEUE_SPINLOCK_H
> +
> +#define q_spin_lock_mb() mb()
> +#define q_spin_unlock_mb() mb()
> +
> +#endif /* _ASM_GENERIC_QUEUE_SPINLOCK_H */
> diff --git a/include/linux/queue_spinlock.h b/include/linux/queue_spinlock.h
> new file mode 100644
> index 000000000000..84b2470d232b
> --- /dev/null
> +++ b/include/linux/queue_spinlock.h
> @@ -0,0 +1,61 @@
> +#ifndef __LINUX_QUEUE_SPINLOCK_H
> +#define __LINUX_QUEUE_SPINLOCK_H
> +
> +#include <linux/bug.h>
> +
> +struct q_spinlock_node {
> +	struct q_spinlock_node *next;
> +	bool wait;
> +};
> +
> +struct q_spinlock {
> +	struct q_spinlock_node *node;
> +};
> +
> +#define __Q_SPIN_LOCK_UNLOCKED(name) { NULL }
> +
> +#ifdef CONFIG_SMP
> +
> +void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node);
> +void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
> +void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node);
> +void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node);
> +
> +static inline void q_spin_lock_init(struct q_spinlock *lock)
> +{
> +	lock->node = NULL;
> +}
> +
> +static inline void assert_q_spin_locked(struct q_spinlock *lock)
> +{
> +	BUG_ON(!lock->node);
> +}
> +
> +static inline void assert_q_spin_unlocked(struct q_spinlock *lock)
> +{
> +	BUG_ON(lock->node);
> +}
> +
> +#else /* !CONFIG_SMP */
> +
> +static inline void
> +q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node) {}
> +
> +static inline void
> +q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
> +
> +static inline void
> +q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node) {}
> +
> +static inline void
> +q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node) {}
> +
> +static inline void q_spin_lock_init(struct q_spinlock *lock) {}
> +
> +static inline void assert_q_spin_locked(struct q_spinlock *lock) {}
> +
> +static inline void assert_q_spin_unlocked(struct q_spinlock *lock) {}
> +
> +#endif /* CONFIG_SMP */
> +
> +#endif /* __LINUX_QUEUE_SPINLOCK_H */
> diff --git a/kernel/Makefile b/kernel/Makefile
> index 86e3285ae7e5..ec91cfef4d4e 100644
> --- a/kernel/Makefile
> +++ b/kernel/Makefile
> @@ -49,7 +49,7 @@ obj-$(CONFIG_SMP) += smp.o
>  ifneq ($(CONFIG_SMP),y)
>  obj-y += up.o
>  endif
> -obj-$(CONFIG_SMP) += spinlock.o
> +obj-$(CONFIG_SMP) += spinlock.o queue_spinlock.o
>  obj-$(CONFIG_DEBUG_SPINLOCK) += spinlock.o
>  obj-$(CONFIG_PROVE_LOCKING) += spinlock.o
>  obj-$(CONFIG_UID16) += uid16.o
> diff --git a/kernel/queue_spinlock.c b/kernel/queue_spinlock.c
> new file mode 100644
> index 000000000000..20304f0bc852
> --- /dev/null
> +++ b/kernel/queue_spinlock.c
> @@ -0,0 +1,82 @@
> +#include <linux/queue_spinlock.h>
> +#include <linux/export.h>
> +#include <linux/preempt.h>
> +#include <linux/bottom_half.h>
> +#include <asm/barrier.h>
> +#include <asm/processor.h>	/* for cpu_relax() */
> +#include <asm/queue_spinlock.h>
> +
> +static inline void
> +__q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	struct q_spinlock_node *prev;
> +
> +	node->next = NULL;
> +	prev = xchg(&lock->node, node);
> +	if (likely(!prev))
> +		/*
> +		 * Acquired the lock.
> +		 * xchg() already includes a full memory barrier.
> +		 */
> +		return;
> +
> +	node->wait = true;
> +	smp_wmb();
> +	ACCESS_ONCE(prev->next) = node;
> +	while (ACCESS_ONCE(node->wait))
> +		cpu_relax();
> +	q_spin_lock_mb();	/* guarantee acquire load semantics */
> +}
> +
> +static inline void
> +__q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	struct q_spinlock_node *next;
> +	if (likely(!(next = ACCESS_ONCE(node->next)))) {
> +		/*
> +		 * Try to release lock.
> +		 * cmpxchg() already includes a full memory barrier.
> +		 */
> +		if (likely(cmpxchg(&lock->node, node, NULL) == node))
> +			return;
> +
> +		/*
> +		 * We raced with __q_spin_lock().
> +		 * Wait for next thread to be known.
> +		 */
> +		while (!(next = ACCESS_ONCE(node->next)))
> +			cpu_relax();
> +	}
> +	q_spin_unlock_mb();	/* guarantee release store semantics */
> +	ACCESS_ONCE(next->wait) = false;
> +}
> +
> +void q_spin_lock(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	preempt_disable();
> +	__q_spin_lock(lock, node);
> +}
> +EXPORT_SYMBOL(q_spin_lock);
> +
> +void q_spin_lock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	local_bh_disable();
> +	preempt_disable();
> +	__q_spin_lock(lock, node);
> +}
> +EXPORT_SYMBOL(q_spin_lock_bh);
> +
> +void q_spin_unlock(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	__q_spin_unlock(lock, node);
> +	preempt_enable();
> +}
> +EXPORT_SYMBOL(q_spin_unlock);
> +
> +void q_spin_unlock_bh(struct q_spinlock *lock, struct q_spinlock_node *node)
> +{
> +	__q_spin_unlock(lock, node);
> +	preempt_enable_no_resched();
> +	local_bh_enable_ip((unsigned long)__builtin_return_address(0));
> +}
> +EXPORT_SYMBOL(q_spin_unlock_bh);
> -- 
> 1.7.7.3
> 


^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 22:34   ` Paul E. McKenney
@ 2013-02-07 22:56     ` Eric Dumazet
  2013-02-07 23:53       ` Paul E. McKenney
  2013-02-07 23:58       ` Michel Lespinasse
  2013-02-07 23:14     ` John Stultz
  2013-02-08  0:35     ` Michel Lespinasse
  2 siblings, 2 replies; 25+ messages in thread
From: Eric Dumazet @ 2013-02-07 22:56 UTC (permalink / raw)
  To: paulmck
  Cc: Michel Lespinasse, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, 2013-02-07 at 14:34 -0800, Paul E. McKenney wrote:
> On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
> > Introduce queue spinlocks, to be used in situations where it is desired
> > to have good throughput even under the occasional high-contention situation.
> > 
> > This initial implementation is based on the classic MCS spinlock,
> > because I think this represents the nicest API we can hope for in a
> > fast queue spinlock algorithm. The MCS spinlock has known limitations
> > in that it performs very well under high contention, but is not as
> > good as the ticket spinlock under low contention. I will address these
> > limitations in a later patch, which will propose an alternative,
> > higher performance implementation using (mostly) the same API.
> > 
> > Sample use case acquiring mystruct->lock:
> > 
> >   struct q_spinlock_node node;
> > 
> >   q_spin_lock(&mystruct->lock, &node);
> >   ...
> >   q_spin_unlock(&mystruct->lock, &node);
> 
> It is possible to keep the normal API for MCS locks by having the lock
> holder remember the parameter in the lock word itself.  While spinning,
> the node is on the stack, is not needed once the lock is acquired.
> The pointer to the next node in the queue -is- needed, but this can be
> stored in the lock word.
> 
> I believe that John Stultz worked on something like this some years back,
> so added him to CC.
> 

Hmm...

This could easily break if the spin_lock() is embedded in one function
and the unlock is done in another one.

(storage for the node would disappear at the function epilogue)
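
As an invented illustration of that pattern, assuming a hypothetical
node-less q_spin_unlock() of the kind being discussed: the node lives
on foo_lock()'s stack, and that storage is gone by the time
foo_unlock() runs.

  struct foo {
          struct q_spinlock lock;
  };

  static void foo_lock(struct foo *f)
  {
          struct q_spinlock_node node;    /* on foo_lock()'s stack */

          q_spin_lock(&f->lock, &node);   /* returns with the lock held... */
  }                                       /* ...and the node storage gone */

  static void foo_unlock(struct foo *f)
  {
          q_spin_unlock(&f->lock);        /* hypothetical node-less unlock */
  }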




^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 22:34   ` Paul E. McKenney
  2013-02-07 22:56     ` Eric Dumazet
@ 2013-02-07 23:14     ` John Stultz
  2013-02-08  0:35     ` Michel Lespinasse
  2 siblings, 0 replies; 25+ messages in thread
From: John Stultz @ 2013-02-07 23:14 UTC (permalink / raw)
  To: paulmck
  Cc: Michel Lespinasse, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel

On 02/07/2013 02:34 PM, Paul E. McKenney wrote:
> On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
>> Introduce queue spinlocks, to be used in situations where it is desired
>> to have good throughput even under the occasional high-contention situation.
>>
>> This initial implementation is based on the classic MCS spinlock,
>> because I think this represents the nicest API we can hope for in a
>> fast queue spinlock algorithm. The MCS spinlock has known limitations
>> in that it performs very well under high contention, but is not as
>> good as the ticket spinlock under low contention. I will address these
>> limitations in a later patch, which will propose an alternative,
>> higher performance implementation using (mostly) the same API.
>>
>> Sample use case acquiring mystruct->lock:
>>
>>    struct q_spinlock_node node;
>>
>>    q_spin_lock(&mystruct->lock, &node);
>>    ...
>>    q_spin_unlock(&mystruct->lock, &node);
> It is possible to keep the normal API for MCS locks by having the lock
> holder remember the parameter in the lock word itself.  While spinning,
> the node is on the stack, is not needed once the lock is acquired.
> The pointer to the next node in the queue -is- needed, but this can be
> stored in the lock word.
>
> I believe that John Stultz worked on something like this some years back,
> so added him to CC.
Oh yeah, it's been quite a while.

Here are some of the discussion threads google remembers for me:

http://marc.info/?l=lse-tech&m=101227079817027&w=2

http://marc.info/?l=lse-tech&m=101380783015065&w=2  (missing the patch)
     http://marc.info/?l=lse-tech&m=101380783615084&w=2 (patch for above)


thanks
-john



^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 22:56     ` Eric Dumazet
@ 2013-02-07 23:53       ` Paul E. McKenney
  2013-02-07 23:58       ` Michel Lespinasse
  1 sibling, 0 replies; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-07 23:53 UTC (permalink / raw)
  To: Eric Dumazet
  Cc: Michel Lespinasse, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 07, 2013 at 02:56:49PM -0800, Eric Dumazet wrote:
> On Thu, 2013-02-07 at 14:34 -0800, Paul E. McKenney wrote:
> > On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
> > > Introduce queue spinlocks, to be used in situations where it is desired
> > > to have good throughput even under the occasional high-contention situation.
> > > 
> > > This initial implementation is based on the classic MCS spinlock,
> > > because I think this represents the nicest API we can hope for in a
> > > fast queue spinlock algorithm. The MCS spinlock has known limitations
> > > in that it performs very well under high contention, but is not as
> > > good as the ticket spinlock under low contention. I will address these
> > > limitations in a later patch, which will propose an alternative,
> > > higher performance implementation using (mostly) the same API.
> > > 
> > > Sample use case acquiring mystruct->lock:
> > > 
> > >   struct q_spinlock_node node;
> > > 
> > >   q_spin_lock(&mystruct->lock, &node);
> > >   ...
> > >   q_spin_unlock(&mystruct->lock, &node);
> > 
> > It is possible to keep the normal API for MCS locks by having the lock
> > holder remember the parameter in the lock word itself.  While spinning,
> > the node is on the stack, is not needed once the lock is acquired.
> > The pointer to the next node in the queue -is- needed, but this can be
> > stored in the lock word.
> > 
> > I believe that John Stultz worked on something like this some years back,
> > so added him to CC.
> > 
> 
> Hmm...
> 
> This could easily break if the spin_lock() is embedded in a function,
> and the unlock done in another one.
> 
> (storage for the node would disappear at function epilogue )

But that is OK -- the storage is used only for spinning on.  Once a given
task has actually acquired the lock, that storage is no longer needed.
What -is- needed is the pointer to the next CPU's node, and that node
is guaranteed to persist until the next CPU acquires the lock, which
cannot happen until this CPU releases that lock.

							Thanx, Paul


^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 22:56     ` Eric Dumazet
  2013-02-07 23:53       ` Paul E. McKenney
@ 2013-02-07 23:58       ` Michel Lespinasse
  2013-02-08  0:03         ` Eric Dumazet
  1 sibling, 1 reply; 25+ messages in thread
From: Michel Lespinasse @ 2013-02-07 23:58 UTC (permalink / raw)
  To: Eric Dumazet
  Cc: paulmck, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 7, 2013 at 2:56 PM, Eric Dumazet <eric.dumazet@gmail.com> wrote:
> On Thu, 2013-02-07 at 14:34 -0800, Paul E. McKenney wrote:
>> On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
>> > Introduce queue spinlocks, to be used in situations where it is desired
>> > to have good throughput even under the occasional high-contention situation.
>> >
>> > This initial implementation is based on the classic MCS spinlock,
>> > because I think this represents the nicest API we can hope for in a
>> > fast queue spinlock algorithm. The MCS spinlock has known limitations
>> > in that it performs very well under high contention, but is not as
>> > good as the ticket spinlock under low contention. I will address these
>> > limitations in a later patch, which will propose an alternative,
>> > higher performance implementation using (mostly) the same API.
>> >
>> > Sample use case acquiring mystruct->lock:
>> >
>> >   struct q_spinlock_node node;
>> >
>> >   q_spin_lock(&mystruct->lock, &node);
>> >   ...
>> >   q_spin_unlock(&mystruct->lock, &node);
>>
>> It is possible to keep the normal API for MCS locks by having the lock
>> holder remember the parameter in the lock word itself.  While spinning,
>> the node is on the stack, is not needed once the lock is acquired.
>> The pointer to the next node in the queue -is- needed, but this can be
>> stored in the lock word.
>>
>> I believe that John Stultz worked on something like this some years back,
>> so added him to CC.
>>
>
> Hmm...
>
> This could easily break if the spin_lock() is embedded in a function,
> and the unlock done in another one.
>
> (storage for the node would disappear at function epilogue )

No, I think that's doable. The trick would be that once a thread
acquires the lock, the only remaining use of the node is to receive
the 'next' pointer if/when another thread starts contending for the
lock. So the lock state would need to distinguish between a lock
that is currently locked but not contended (the next value would then
be NULL), and a lock that is currently locked and contended (the
lock->next value is the node that goes after the current lock owner).
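
For concreteness, one possible (invented) layout for such a lock; this
only pins down the states being described, not the actual acquire and
release sequences:

  /*
   * Possible double-word lock for the scheme discussed above:
   *
   *   tail == NULL                 unlocked
   *   tail != NULL, next == NULL   locked, nobody queued behind the owner
   *                                (tail could point at the lock itself as
   *                                a sentinel, since the owner's on-stack
   *                                node is no longer in use)
   *   tail != NULL, next != NULL   locked and contended; 'next' is the
   *                                node that goes after the current owner
   */
  struct q_spinlock {
          struct q_spinlock_node *tail;   /* last node in the waiter queue */
          struct q_spinlock_node *next;   /* successor of the lock owner */
  };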

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 23:58       ` Michel Lespinasse
@ 2013-02-08  0:03         ` Eric Dumazet
  2013-02-08  0:40           ` Paul E. McKenney
  0 siblings, 1 reply; 25+ messages in thread
From: Eric Dumazet @ 2013-02-08  0:03 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: paulmck, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, 2013-02-07 at 15:58 -0800, Michel Lespinasse wrote:

> No, I think that's doable. The trick would be that once a thread
> acquires the lock, the only remaining use of the node is to receive
> the 'next' pointer if/when another thread starts contending for the
> lock. So if the lock state would need to distinguish between a lock
> that is currently locked but not contended (the next value would then
> be NULL), and a lock that is currently locked and contended (the
> lock->next value is the node that goes after the current lock owner)
> 

It adds yet another memory write to store the node pointer in the
lock...

I suspect it's going to increase false sharing.



^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-07 22:34   ` Paul E. McKenney
  2013-02-07 22:56     ` Eric Dumazet
  2013-02-07 23:14     ` John Stultz
@ 2013-02-08  0:35     ` Michel Lespinasse
  2 siblings, 0 replies; 25+ messages in thread
From: Michel Lespinasse @ 2013-02-08  0:35 UTC (permalink / raw)
  To: paulmck
  Cc: Rik van Riel, Ingo Molnar, David Howells, Thomas Gleixner,
	Eric Dumazet, Eric W. Biederman, Manfred Spraul, linux-kernel,
	john.stultz

On Thu, Feb 7, 2013 at 2:34 PM, Paul E. McKenney
<paulmck@linux.vnet.ibm.com> wrote:
> On Tue, Jan 22, 2013 at 03:13:30PM -0800, Michel Lespinasse wrote:
>> Introduce queue spinlocks, to be used in situations where it is desired
>> to have good throughput even under the occasional high-contention situation.
>>
>> This initial implementation is based on the classic MCS spinlock,
>> because I think this represents the nicest API we can hope for in a
>> fast queue spinlock algorithm. The MCS spinlock has known limitations
>> in that it performs very well under high contention, but is not as
>> good as the ticket spinlock under low contention. I will address these
>> limitations in a later patch, which will propose an alternative,
>> higher performance implementation using (mostly) the same API.

While we're talking about MCS spinlocks, I should point out a
potential issue I found with my proposed 'faster queue spinlock' MCS
alternative. It is nice in that it avoids an atomic operation in the
unlock path; however, each unlocked lock must have a 'token' allocated,
and this token is accessed by the lock function. The issue with this
is that if two threads don't actually block on each other, but they
both access a lock that bounces between them, then the token will have
to bounce between these threads as well, which is going to be more
expensive than the atomic operation that was saved.

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  0:03         ` Eric Dumazet
@ 2013-02-08  0:40           ` Paul E. McKenney
  2013-02-08  3:48             ` Michel Lespinasse
  0 siblings, 1 reply; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-08  0:40 UTC (permalink / raw)
  To: Eric Dumazet
  Cc: Michel Lespinasse, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 07, 2013 at 04:03:54PM -0800, Eric Dumazet wrote:
> On Thu, 2013-02-07 at 15:58 -0800, Michel Lespinasse wrote:
> 
> > No, I think that's doable. The trick would be that once a thread
> > acquires the lock, the only remaining use of the node is to receive
> > the 'next' pointer if/when another thread starts contending for the
> > lock. So if the lock state would need to distinguish between a lock
> > that is currently locked but not contended (the next value would then
> > be NULL), and a lock that is currently locked and contended (the
> > lock->next value is the node that goes after the current lock owner)
> > 
> 
> It adds yet another memory write to store the node pointer in the
> lock...
> 
> I suspect it's going to increase false sharing.

On the other hand, compared to straight MCS, it reduces the need to
pass the node address around.  Furthermore, the node pointer is likely
to be in the same cache line as the lock word itself, and finally
some architectures can do a double-pointer store.

Of course, it might well be slower, but it seems like it is worth
giving it a try.

							Thanx, Paul


^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  0:40           ` Paul E. McKenney
@ 2013-02-08  3:48             ` Michel Lespinasse
  2013-02-08  4:36               ` Paul E. McKenney
  0 siblings, 1 reply; 25+ messages in thread
From: Michel Lespinasse @ 2013-02-08  3:48 UTC (permalink / raw)
  To: paulmck
  Cc: Eric Dumazet, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 7, 2013 at 4:40 PM, Paul E. McKenney
<paulmck@linux.vnet.ibm.com> wrote:
> On Thu, Feb 07, 2013 at 04:03:54PM -0800, Eric Dumazet wrote:
>> It adds yet another memory write to store the node pointer in the
>> lock...
>>
>> I suspect it's going to increase false sharing.
>
> On the other hand, compared to straight MCS, it reduces the need to
> pass the node address around.  Furthermore, the node pointer is likely
> to be in the same cache line as the lock word itself, and finally
> some architectures can do a double-pointer store.
>
> Of course, it might well be slower, but it seems like it is worth
> giving it a try.

Right. Another nice point about this approach is that there needs to
be only one node per spinning CPU, so the node pointers (both tail and
next) might be replaced with CPU identifiers, which would bring the
spinlock size down to that of the ticket spinlock (which in turn
makes it that much more likely that we'll have atomic stores of
that size).
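
A rough sketch of the CPU-identifier idea (invented names, reusing the
same helpers as the posted patch; lock->tail_cpu starts out as -1,
standing in for a NULL link):

  #include <linux/percpu.h>

  struct qsl_percpu_node {
          int next_cpu;           /* CPU id of our successor, or -1 */
          bool wait;
  };

  static DEFINE_PER_CPU(struct qsl_percpu_node, qsl_node);

  struct q_spinlock_cpuid {
          int tail_cpu;           /* CPU id of the last queued CPU, or -1 */
  };

  static inline void q_spin_lock_cpuid(struct q_spinlock_cpuid *lock)
  {
          struct qsl_percpu_node *node;
          int cpu, prev_cpu;

          preempt_disable();      /* released again in the unlock below */
          cpu = smp_processor_id();
          node = this_cpu_ptr(&qsl_node);
          node->next_cpu = -1;
          prev_cpu = xchg(&lock->tail_cpu, cpu);
          if (prev_cpu == -1)
                  return;         /* uncontended; xchg() is a full barrier */

          node->wait = true;
          smp_wmb();
          ACCESS_ONCE(per_cpu(qsl_node, prev_cpu).next_cpu) = cpu;
          while (ACCESS_ONCE(node->wait))
                  cpu_relax();
          smp_mb();               /* acquire semantics */
  }

  static inline void q_spin_unlock_cpuid(struct q_spinlock_cpuid *lock)
  {
          struct qsl_percpu_node *node = this_cpu_ptr(&qsl_node);
          int cpu = smp_processor_id();
          int next_cpu = ACCESS_ONCE(node->next_cpu);

          if (next_cpu == -1) {
                  /* No known successor: try to mark the lock free. */
                  if (cmpxchg(&lock->tail_cpu, cpu, -1) == cpu)
                          goto out;
                  /* Raced with a new locker; wait for it to link in. */
                  while ((next_cpu = ACCESS_ONCE(node->next_cpu)) == -1)
                          cpu_relax();
          }
          smp_mb();               /* release semantics */
          ACCESS_ONCE(per_cpu(qsl_node, next_cpu).wait) = false;
  out:
          preempt_enable();
  }

Note that written this way, as a plain MCS lock, the per-CPU node stays
in use for the whole critical section, so a CPU could only be queued on
or hold one such lock at a time (and taking it from softirq context
would need the extra per-context nodes discussed later in the thread).
In the lock-word variant above, the node is only needed while spinning,
which is what makes a single node per spinning CPU sufficient.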

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  3:48             ` Michel Lespinasse
@ 2013-02-08  4:36               ` Paul E. McKenney
  2013-02-08  5:03                 ` Paul E. McKenney
  0 siblings, 1 reply; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-08  4:36 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Eric Dumazet, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 07, 2013 at 07:48:33PM -0800, Michel Lespinasse wrote:
> On Thu, Feb 7, 2013 at 4:40 PM, Paul E. McKenney
> <paulmck@linux.vnet.ibm.com> wrote:
> > On Thu, Feb 07, 2013 at 04:03:54PM -0800, Eric Dumazet wrote:
> >> It adds yet another memory write to store the node pointer in the
> >> lock...
> >>
> >> I suspect it's going to increase false sharing.
> >
> > On the other hand, compared to straight MCS, it reduces the need to
> > pass the node address around.  Furthermore, the node pointer is likely
> > to be in the same cache line as the lock word itself, and finally
> > some architectures can do a double-pointer store.
> >
> > Of course, it might well be slower, but it seems like it is worth
> > giving it a try.
> 
> Right. Another nice point about this approach is that there needs to
> be only one node per spinning CPU, so the node pointers (both tail and
> next) might be replaced with CPU identifiers, which would bring the
> spinlock size down to the same as with the ticket spinlock (which in
> turns makes it that much more likely that we'll have atomic stores of
> that size).

Good point!  I must admit that this is one advantage of having the
various _irq spinlock acquisition primitives disable irqs before
spinning.  ;-)

							Thanx, Paul


^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  4:36               ` Paul E. McKenney
@ 2013-02-08  5:03                 ` Paul E. McKenney
  2013-02-08  5:11                   ` Michel Lespinasse
  0 siblings, 1 reply; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-08  5:03 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Eric Dumazet, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 07, 2013 at 08:36:43PM -0800, Paul E. McKenney wrote:
> On Thu, Feb 07, 2013 at 07:48:33PM -0800, Michel Lespinasse wrote:
> > On Thu, Feb 7, 2013 at 4:40 PM, Paul E. McKenney
> > <paulmck@linux.vnet.ibm.com> wrote:
> > > On Thu, Feb 07, 2013 at 04:03:54PM -0800, Eric Dumazet wrote:
> > >> It adds yet another memory write to store the node pointer in the
> > >> lock...
> > >>
> > >> I suspect it's going to increase false sharing.
> > >
> > > On the other hand, compared to straight MCS, it reduces the need to
> > > pass the node address around.  Furthermore, the node pointer is likely
> > > to be in the same cache line as the lock word itself, and finally
> > > some architectures can do a double-pointer store.
> > >
> > > Of course, it might well be slower, but it seems like it is worth
> > > giving it a try.
> > 
> > Right. Another nice point about this approach is that there needs to
> > be only one node per spinning CPU, so the node pointers (both tail and
> > next) might be replaced with CPU identifiers, which would bring the
> > spinlock size down to the same as with the ticket spinlock (which in
> > turns makes it that much more likely that we'll have atomic stores of
> > that size).
> 
> Good point!  I must admit that this is one advantage of having the
> various _irq spinlock acquisition primitives disable irqs before
> spinning.  ;-)

Right...  For spinlocks that -don't- disable irqs, you need to deal with
the possibility that a CPU gets interrupted while spinning, and the
interrupt handler also tries to acquire a queued lock.  One way to deal
with this is to have a node per CPUxirq.  Of course, if interrupt
handlers always disable irqs when acquiring a spinlock, then you only
need CPUx2.
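
As an invented sketch of "a node per CPU x context", setting NMI aside:
pick a different per-CPU node depending on whether we are running in
task, softirq or hardirq context, so an interrupt that hits a spinning
task never reuses that task's node.

  #include <linux/percpu.h>
  #include <linux/hardirq.h>

  enum qsl_ctx { QSL_CTX_TASK, QSL_CTX_SOFTIRQ, QSL_CTX_HARDIRQ, QSL_CTX_NR };

  struct qsl_ctx_node {
          struct qsl_ctx_node *next;
          bool wait;
  };

  struct qsl_cpu_nodes {
          struct qsl_ctx_node node[QSL_CTX_NR];
  };

  static DEFINE_PER_CPU(struct qsl_cpu_nodes, qsl_cpu_nodes);

  /* Caller must have preemption disabled so the CPU cannot change. */
  static inline struct qsl_ctx_node *qsl_this_node(void)
  {
          enum qsl_ctx ctx = QSL_CTX_TASK;

          if (in_irq())
                  ctx = QSL_CTX_HARDIRQ;
          else if (in_serving_softirq())
                  ctx = QSL_CTX_SOFTIRQ;
          return &this_cpu_ptr(&qsl_cpu_nodes)->node[ctx];
  }

If the queue links were CPU identifiers rather than pointers, the
identifier would then also need to encode the context, e.g.
cpu * QSL_CTX_NR + ctx.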

							Thanx, Paul


^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  5:03                 ` Paul E. McKenney
@ 2013-02-08  5:11                   ` Michel Lespinasse
  2013-02-08 16:17                     ` Paul E. McKenney
  0 siblings, 1 reply; 25+ messages in thread
From: Michel Lespinasse @ 2013-02-08  5:11 UTC (permalink / raw)
  To: paulmck
  Cc: Eric Dumazet, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 7, 2013 at 9:03 PM, Paul E. McKenney
<paulmck@linux.vnet.ibm.com> wrote:
> Right...  For spinlocks that -don't- disable irqs, you need to deal with
> the possibility that a CPU gets interrupted while spinning, and the
> interrupt handler also tries to acquire a queued lock.  One way to deal
> with this is to have a node per CPUxirq.  Of course, if interrupts
> handlers always disable irqs when acquiring a spinlock, then you only
> need CPUx2.

The simple solution would be to do what I proposed in my faster queue
spinlock proposal: have one function for process context lock
acquisitions, another for bh-disabled acquisitions, and just say that
hardirqs can't use the queue spinlocks (I don't expect we have any
locks taken from hardirq context where contention might be an issue?).

-- 
Michel "Walken" Lespinasse
A program is never fully debugged until the last user dies.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [RFC PATCH 1/6] kernel: implement queue spinlock API
  2013-02-08  5:11                   ` Michel Lespinasse
@ 2013-02-08 16:17                     ` Paul E. McKenney
  0 siblings, 0 replies; 25+ messages in thread
From: Paul E. McKenney @ 2013-02-08 16:17 UTC (permalink / raw)
  To: Michel Lespinasse
  Cc: Eric Dumazet, Rik van Riel, Ingo Molnar, David Howells,
	Thomas Gleixner, Eric Dumazet, Eric W. Biederman, Manfred Spraul,
	linux-kernel, john.stultz

On Thu, Feb 07, 2013 at 09:11:14PM -0800, Michel Lespinasse wrote:
> On Thu, Feb 7, 2013 at 9:03 PM, Paul E. McKenney
> <paulmck@linux.vnet.ibm.com> wrote:
> > Right...  For spinlocks that -don't- disable irqs, you need to deal with
> > the possibility that a CPU gets interrupted while spinning, and the
> > interrupt handler also tries to acquire a queued lock.  One way to deal
> > with this is to have a node per CPUxirq.  Of course, if interrupts
> > handlers always disable irqs when acquiring a spinlock, then you only
> > need CPUx2.
> 
> The simple solution would be to do like I proposed in my faster queue
> spinlock proposal, have one function for process context lock
> acquisitions, another for bh-disabled acquisitions, and just say that
> hardirqs can't use the queue spinlocks (I don't expect we have any
> locks taken from hardirq context where contention might be an issue ?)

Makes sense!  The spinlocks that disable hardirqs should get extra
contention-reduction attention, after which the main benefit for
queued spinlocks is process-level spinlocks.

							Thanx, Paul


^ permalink raw reply	[flat|nested] 25+ messages in thread

end of thread, other threads:[~2013-02-08 16:17 UTC | newest]

Thread overview: 25+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-01-22 23:13 [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
2013-01-22 23:13 ` [RFC PATCH 1/6] kernel: implement queue spinlock API Michel Lespinasse
2013-02-07 22:34   ` Paul E. McKenney
2013-02-07 22:56     ` Eric Dumazet
2013-02-07 23:53       ` Paul E. McKenney
2013-02-07 23:58       ` Michel Lespinasse
2013-02-08  0:03         ` Eric Dumazet
2013-02-08  0:40           ` Paul E. McKenney
2013-02-08  3:48             ` Michel Lespinasse
2013-02-08  4:36               ` Paul E. McKenney
2013-02-08  5:03                 ` Paul E. McKenney
2013-02-08  5:11                   ` Michel Lespinasse
2013-02-08 16:17                     ` Paul E. McKenney
2013-02-07 23:14     ` John Stultz
2013-02-08  0:35     ` Michel Lespinasse
2013-01-22 23:13 ` [RFC PATCH 2/6] net: convert qdisc busylock to use " Michel Lespinasse
2013-01-22 23:13 ` [RFC PATCH 3/6] ipc: convert ipc objects " Michel Lespinasse
2013-01-22 23:13 ` [RFC PATCH 4/6] kernel: faster queue spinlock implementation Michel Lespinasse
2013-01-23 21:55   ` Rik van Riel
2013-01-23 23:52     ` Michel Lespinasse
2013-01-24  0:18   ` Eric Dumazet
2013-01-25 20:30   ` [RFC PATCH 7/6] kernel: document fast queue spinlocks Rik van Riel
2013-01-22 23:13 ` [RFC PATCH 5/6] net: qdisc busylock updates to account for queue spinlock api change Michel Lespinasse
2013-01-22 23:13 ` [RFC PATCH 6/6] ipc: object locking " Michel Lespinasse
2013-01-22 23:17 ` [RFC PATCH 0/6] fast queue spinlocks Michel Lespinasse
