linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [RFC PATCH 0/2] illustrate cmpxchg ring for tap/tun and qdisc
@ 2016-11-11  4:43 John Fastabend
  2016-11-11  4:44 ` [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings John Fastabend
  2016-11-11  4:44 ` [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once John Fastabend
  0 siblings, 2 replies; 9+ messages in thread
From: John Fastabend @ 2016-11-11  4:43 UTC (permalink / raw)
  To: jasowang; +Cc: netdev, linux-kernel

This is a cmpxchg ring that could potentially be used to replace the spinlock
variant of skb_array.

Couple comments its missing a resize operator which would need to be added
before it could be a drop in replacement. And although I tested the first
patch with my latest set of qdisc updates the second patch I have not tested
for a bit so it may or may not work but it illustrates the idea at least.

This was derived from the DPDK documentation where there is a description
of the cmpxchg ring presumably used there. It may or may not actually align
with what is done in DPDK implementation I have no idea I didn't look.


---

John Fastabend (2):
      net: use cmpxchg instead of spinlock in ptr rings
      ptr_ring_ll: pop/push multiple objects at once


 include/linux/netdevice.h   |   12 ----
 include/linux/ptr_ring_ll.h |  146 +++++++++++++++++++++++++++++++++++++++++++
 include/linux/skb_array.h   |   32 +++++++++
 net/core/dev.c              |   18 +----
 net/sched/sch_generic.c     |   86 +++++++++++++------------
 5 files changed, 224 insertions(+), 70 deletions(-)
 create mode 100644 include/linux/ptr_ring_ll.h

--
Signature

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings
  2016-11-11  4:43 [RFC PATCH 0/2] illustrate cmpxchg ring for tap/tun and qdisc John Fastabend
@ 2016-11-11  4:44 ` John Fastabend
  2016-11-14 11:09   ` Jesper Dangaard Brouer
  2016-11-14 23:01   ` Michael S. Tsirkin
  2016-11-11  4:44 ` [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once John Fastabend
  1 sibling, 2 replies; 9+ messages in thread
From: John Fastabend @ 2016-11-11  4:44 UTC (permalink / raw)
  To: jasowang; +Cc: netdev, linux-kernel


---
 include/linux/ptr_ring_ll.h |  136 +++++++++++++++++++++++++++++++++++++++++++
 include/linux/skb_array.h   |   25 ++++++++
 2 files changed, 161 insertions(+)
 create mode 100644 include/linux/ptr_ring_ll.h

diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
new file mode 100644
index 0000000..bcb11f3
--- /dev/null
+++ b/include/linux/ptr_ring_ll.h
@@ -0,0 +1,136 @@
+/*
+ *	Definitions for the 'struct ptr_ring_ll' datastructure.
+ *
+ *	Author:
+ *		John Fastabend <john.r.fastabend@intel.com>
+ *
+ *	Copyright (C) 2016 Intel Corp.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	This is a limited-size FIFO maintaining pointers in FIFO order, with
+ *	one CPU producing entries and another consuming entries from a FIFO.
+ *	extended from ptr_ring_ll to use cmpxchg over spin lock.
+ */
+
+#ifndef _LINUX_PTR_RING_LL_H
+#define _LINUX_PTR_RING_LL_H 1
+
+#ifdef __KERNEL__
+#include <linux/spinlock.h>
+#include <linux/cache.h>
+#include <linux/types.h>
+#include <linux/compiler.h>
+#include <linux/cache.h>
+#include <linux/slab.h>
+#include <asm/errno.h>
+#endif
+
+struct ptr_ring_ll {
+	u32 prod_size;
+	u32 prod_mask;
+	u32 prod_head;
+	u32 prod_tail;
+	u32 cons_size;
+	u32 cons_mask;
+	u32 cons_head;
+	u32 cons_tail;
+
+	void **queue;
+};
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must hold producer_lock.
+ */
+static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
+{
+	u32 ret, head, tail, next, slots, mask;
+
+	do {
+		head = READ_ONCE(r->prod_head);
+		mask = READ_ONCE(r->prod_mask);
+		tail = READ_ONCE(r->cons_tail);
+
+		slots = mask + tail - head;
+		if (slots < 1)
+			return -ENOMEM;
+
+		next = head + 1;
+		ret = cmpxchg(&r->prod_head, head, next);
+	} while (ret != head);
+
+	r->queue[head & mask] = ptr;
+	smp_wmb();
+
+	while (r->prod_tail != head)
+		cpu_relax();
+
+	r->prod_tail = next;
+	return 0;
+}
+
+static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
+{
+	u32 ret, head, tail, next, slots, mask;
+	void *ptr;
+
+	do {
+		head = READ_ONCE(r->cons_head);
+		mask = READ_ONCE(r->cons_mask);
+		tail = READ_ONCE(r->prod_tail);
+
+		slots = tail - head;
+		if (slots < 1)
+			return ERR_PTR(-ENOMEM);
+
+		next = head + 1;
+		ret = cmpxchg(&r->cons_head, head, next);
+	} while (ret != head);
+
+	ptr = r->queue[head & mask];
+	smp_rmb();
+
+	while (r->cons_tail != head)
+		cpu_relax();
+
+	r->cons_tail = next;
+	return ptr;
+}
+
+static inline void **__ptr_ring_ll_init_queue_alloc(int size, gfp_t gfp)
+{
+	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
+}
+
+static inline int ptr_ring_ll_init(struct ptr_ring_ll *r, int size, gfp_t gfp)
+{
+	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
+	if (!r->queue)
+		return -ENOMEM;
+
+	r->prod_size = r->cons_size = size;
+	r->prod_mask = r->cons_mask = size - 1;
+	r->prod_tail = r->prod_head = 0;
+	r->cons_tail = r->prod_tail = 0;
+
+	return 0;
+}
+
+static inline void ptr_ring_ll_cleanup(struct ptr_ring_ll *r, void (*destroy)(void *))
+{
+	if (destroy) {
+		void *ptr;
+
+		ptr = __ptr_ring_ll_consume(r);
+		while (!IS_ERR_OR_NULL(ptr)) {
+			destroy(ptr);
+			ptr = __ptr_ring_ll_consume(r);
+		}
+	}
+	kfree(r->queue);
+}
+
+#endif /* _LINUX_PTR_RING_LL_H  */
diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
index f4dfade..9b43dfd 100644
--- a/include/linux/skb_array.h
+++ b/include/linux/skb_array.h
@@ -22,6 +22,7 @@
 
 #ifdef __KERNEL__
 #include <linux/ptr_ring.h>
+#include <linux/ptr_ring_ll.h>
 #include <linux/skbuff.h>
 #include <linux/if_vlan.h>
 #endif
@@ -30,6 +31,10 @@ struct skb_array {
 	struct ptr_ring ring;
 };
 
+struct skb_array_ll {
+	struct ptr_ring_ll ring;
+};
+
 /* Might be slightly faster than skb_array_full below, but callers invoking
  * this in a loop must use a compiler barrier, for example cpu_relax().
  */
@@ -43,6 +48,11 @@ static inline bool skb_array_full(struct skb_array *a)
 	return ptr_ring_full(&a->ring);
 }
 
+static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
+{
+	return __ptr_ring_ll_produce(&a->ring, skb);
+}
+
 static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
 {
 	return ptr_ring_produce(&a->ring, skb);
@@ -92,6 +102,11 @@ static inline bool skb_array_empty_any(struct skb_array *a)
 	return ptr_ring_empty_any(&a->ring);
 }
 
+static inline struct sk_buff *skb_array_ll_consume(struct skb_array_ll *a)
+{
+	return __ptr_ring_ll_consume(&a->ring);
+}
+
 static inline struct sk_buff *skb_array_consume(struct skb_array *a)
 {
 	return ptr_ring_consume(&a->ring);
@@ -146,6 +161,11 @@ static inline int skb_array_peek_len_any(struct skb_array *a)
 	return PTR_RING_PEEK_CALL_ANY(&a->ring, __skb_array_len_with_tag);
 }
 
+static inline int skb_array_ll_init(struct skb_array_ll *a, int size, gfp_t gfp)
+{
+	return ptr_ring_ll_init(&a->ring, size, gfp);
+}
+
 static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
 {
 	return ptr_ring_init(&a->ring, size, gfp);
@@ -170,6 +190,11 @@ static inline int skb_array_resize_multiple(struct skb_array **rings,
 					__skb_array_destroy_skb);
 }
 
+static inline void skb_array_ll_cleanup(struct skb_array_ll *a)
+{
+	ptr_ring_ll_cleanup(&a->ring, __skb_array_destroy_skb);
+}
+
 static inline void skb_array_cleanup(struct skb_array *a)
 {
 	ptr_ring_cleanup(&a->ring, __skb_array_destroy_skb);

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once
  2016-11-11  4:43 [RFC PATCH 0/2] illustrate cmpxchg ring for tap/tun and qdisc John Fastabend
  2016-11-11  4:44 ` [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings John Fastabend
@ 2016-11-11  4:44 ` John Fastabend
  2016-11-14 23:06   ` Michael S. Tsirkin
  1 sibling, 1 reply; 9+ messages in thread
From: John Fastabend @ 2016-11-11  4:44 UTC (permalink / raw)
  To: jasowang; +Cc: netdev, linux-kernel

Signed-off-by: John Fastabend <john.r.fastabend@intel.com>
---
 include/linux/ptr_ring_ll.h |   22 ++++++++++++++++------
 include/linux/skb_array.h   |   11 +++++++++--
 net/sched/sch_generic.c     |    2 +-
 3 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
index bcb11f3..5dc25f7 100644
--- a/include/linux/ptr_ring_ll.h
+++ b/include/linux/ptr_ring_ll.h
@@ -45,9 +45,10 @@ struct ptr_ring_ll {
 /* Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax(). Callers must hold producer_lock.
  */
-static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
+static inline int __ptr_ring_ll_produce_many(struct ptr_ring_ll *r,
+					     void **ptr, int num)
 {
-	u32 ret, head, tail, next, slots, mask;
+	u32 ret, head, tail, next, slots, mask, i;
 
 	do {
 		head = READ_ONCE(r->prod_head);
@@ -55,21 +56,30 @@ static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
 		tail = READ_ONCE(r->cons_tail);
 
 		slots = mask + tail - head;
-		if (slots < 1)
+		if (slots < num)
+			num = slots;
+
+		if (unlikely(!num))
 			return -ENOMEM;
 
-		next = head + 1;
+		next = head + num;
 		ret = cmpxchg(&r->prod_head, head, next);
 	} while (ret != head);
 
-	r->queue[head & mask] = ptr;
+	for (i = 0; i < num; i++)
+		r->queue[(head + i) & mask] = ptr[i];
 	smp_wmb();
 
 	while (r->prod_tail != head)
 		cpu_relax();
 
 	r->prod_tail = next;
-	return 0;
+	return num;
+}
+
+static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void **ptr)
+{
+	return __ptr_ring_ll_produce_many(r, ptr, 1);
 }
 
 static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
index 9b43dfd..de3c700 100644
--- a/include/linux/skb_array.h
+++ b/include/linux/skb_array.h
@@ -48,9 +48,16 @@ static inline bool skb_array_full(struct skb_array *a)
 	return ptr_ring_full(&a->ring);
 }
 
-static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
+static inline int skb_array_ll_produce_many(struct skb_array_ll *a,
+					    struct sk_buff **skb, int num)
 {
-	return __ptr_ring_ll_produce(&a->ring, skb);
+	return __ptr_ring_ll_produce_many(&a->ring, (void **)skb, num);
+}
+
+static inline int skb_array_ll_produce(struct skb_array_ll *a,
+				       struct sk_buff **skb)
+{
+	return __ptr_ring_ll_produce(&a->ring, (void **)skb);
 }
 
 static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index 4648ec8..58f2011 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -571,7 +571,7 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
 	struct skb_array_ll *q = band2list(priv, band);
 	int err;
 
-	err = skb_array_ll_produce(q, skb);
+	err = skb_array_ll_produce(q, &skb);
 
 	if (unlikely(err)) {
 		net_warn_ratelimited("drop a packet from fast enqueue\n");

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings
  2016-11-11  4:44 ` [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings John Fastabend
@ 2016-11-14 11:09   ` Jesper Dangaard Brouer
  2016-11-14 23:01   ` Michael S. Tsirkin
  1 sibling, 0 replies; 9+ messages in thread
From: Jesper Dangaard Brouer @ 2016-11-14 11:09 UTC (permalink / raw)
  To: John Fastabend
  Cc: jasowang, netdev, linux-kernel, Michael S. Tsirkin, Jason Wang,
	Mathieu Desnoyers


On Thu, 10 Nov 2016 20:44:08 -0800 John Fastabend <john.fastabend@gmail.com> wrote:

> ---
>  include/linux/ptr_ring_ll.h |  136 +++++++++++++++++++++++++++++++++++++++++++
>  include/linux/skb_array.h   |   25 ++++++++
>  2 files changed, 161 insertions(+)
>  create mode 100644 include/linux/ptr_ring_ll.h
> 
> diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
> new file mode 100644
> index 0000000..bcb11f3
> --- /dev/null
> +++ b/include/linux/ptr_ring_ll.h
> @@ -0,0 +1,136 @@
> +/*
> + *	Definitions for the 'struct ptr_ring_ll' datastructure.
> + *
> + *	Author:
> + *		John Fastabend <john.r.fastabend@intel.com>
[...]
> + *
> + *	This is a limited-size FIFO maintaining pointers in FIFO order, with
> + *	one CPU producing entries and another consuming entries from a FIFO.
> + *	extended from ptr_ring_ll to use cmpxchg over spin lock.

It sounds like this is Single Producer Single Consumer (SPSC)
implementation, but your implementation actually is Multi Producer
Multi Consumer (MPMC) capable.

The implementation looks a lot like my alf_queue[1] implementation:
 [1] https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue.h

If the primary use-case is one CPU producing and another consuming,
then the normal ptr_ring (skb_array) will actually be faster!

The reason is ptr_ring avoids bouncing a cache-line between the CPUs on
every ring access.  This is achieved by having the checks for full
(__ptr_ring_full) and empty (__ptr_ring_empty) use the contents of the
array (NULL value).

I actually implemented two micro-benchmarks to measure the difference
between skb_array[2] and alf_queue[3]:
 [2] https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/lib/skb_array_parallel01.c
 [3] https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/lib/alf_queue_parallel01.c


> + */
> +
> +#ifndef _LINUX_PTR_RING_LL_H
> +#define _LINUX_PTR_RING_LL_H 1
> +
[...]
> +
> +struct ptr_ring_ll {
> +	u32 prod_size;
> +	u32 prod_mask;
> +	u32 prod_head;
> +	u32 prod_tail;
> +	u32 cons_size;
> +	u32 cons_mask;
> +	u32 cons_head;
> +	u32 cons_tail;
> +
> +	void **queue;
> +};

Your implementation doesn't even split the consumer and producer into
different cachelines (which in practice doesn't help much due to how
the empty/full checks are performed).

> +
> +/* Note: callers invoking this in a loop must use a compiler barrier,
> + * for example cpu_relax(). Callers must hold producer_lock.
> + */
> +static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
> +{
> +	u32 ret, head, tail, next, slots, mask;
> +
> +	do {
> +		head = READ_ONCE(r->prod_head);
> +		mask = READ_ONCE(r->prod_mask);
> +		tail = READ_ONCE(r->cons_tail);

Problem occur here, as the producer need to access/read the consumers
tail, to determine if the queue is not already full (slots avail).
Thus, the next "consumer-CPU" will see the cacheline in wrong state
(Modified/Invalid or Shared).

> +
> +		slots = mask + tail - head;
> +		if (slots < 1)
> +			return -ENOMEM;
> +
> +		next = head + 1;
> +		ret = cmpxchg(&r->prod_head, head, next);
> +	} while (ret != head);
> +
> +	r->queue[head & mask] = ptr;
> +	smp_wmb();
> +
> +	while (r->prod_tail != head)
> +		cpu_relax();
> +
> +	r->prod_tail = next;
> +	return 0;
> +}
> +
> +static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
> +{
> +	u32 ret, head, tail, next, slots, mask;
> +	void *ptr;
> +
> +	do {
> +		head = READ_ONCE(r->cons_head);
> +		mask = READ_ONCE(r->cons_mask);
> +		tail = READ_ONCE(r->prod_tail);

Like wise the consumer is reading the producer tail (for the empty check).

> +
> +		slots = tail - head;
> +		if (slots < 1)
> +			return ERR_PTR(-ENOMEM);
> +
> +		next = head + 1;
> +		ret = cmpxchg(&r->cons_head, head, next);
> +	} while (ret != head);
> +
> +	ptr = r->queue[head & mask];
> +	smp_rmb();
> +
> +	while (r->cons_tail != head)
> +		cpu_relax();
> +
> +	r->cons_tail = next;
> +	return ptr;
> +}
> +
> +static inline void **__ptr_ring_ll_init_queue_alloc(int size, gfp_t gfp)
> +{
> +	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
> +}
> +
> +static inline int ptr_ring_ll_init(struct ptr_ring_ll *r, int size, gfp_t gfp)
> +{
> +	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
> +	if (!r->queue)
> +		return -ENOMEM;
> +
> +	r->prod_size = r->cons_size = size;
> +	r->prod_mask = r->cons_mask = size - 1;

Shouldn't we have some check like is_power_of_2(size), as this code
looks like it depend on this.

> +	r->prod_tail = r->prod_head = 0;
> +	r->cons_tail = r->prod_tail = 0;
> +
> +	return 0;
> +}
> +
[...]
> +#endif /* _LINUX_PTR_RING_LL_H  */
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> index f4dfade..9b43dfd 100644
> --- a/include/linux/skb_array.h
> +++ b/include/linux/skb_array.h
[...]
>  
> +static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
> +{
> +	return __ptr_ring_ll_produce(&a->ring, skb);
> +}
> +
[...]
>  
> +static inline struct sk_buff *skb_array_ll_consume(struct skb_array_ll *a)
> +{
> +	return __ptr_ring_ll_consume(&a->ring);
> +}
> +

Note in the Multi Producer Multi Consumer (MPMC) use-case this type of
queue can be faster than normal ptr_ring.  And in patch2 you implement
bulking, which is where the real benefit shows (in the MPMC case) for
this kind of queue.

What I would really like to see is a lock-free (locked cmpxchg) queue
implementation, what like ptr_ring use the array as empty/full check,
and still (somehow) support bulking.

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings
  2016-11-11  4:44 ` [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings John Fastabend
  2016-11-14 11:09   ` Jesper Dangaard Brouer
@ 2016-11-14 23:01   ` Michael S. Tsirkin
  2016-11-16  4:30     ` John Fastabend
  1 sibling, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-11-14 23:01 UTC (permalink / raw)
  To: John Fastabend; +Cc: jasowang, netdev, linux-kernel

On Thu, Nov 10, 2016 at 08:44:08PM -0800, John Fastabend wrote:
> 
> ---
>  include/linux/ptr_ring_ll.h |  136 +++++++++++++++++++++++++++++++++++++++++++
>  include/linux/skb_array.h   |   25 ++++++++
>  2 files changed, 161 insertions(+)
>  create mode 100644 include/linux/ptr_ring_ll.h
> 
> diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
> new file mode 100644
> index 0000000..bcb11f3
> --- /dev/null
> +++ b/include/linux/ptr_ring_ll.h
> @@ -0,0 +1,136 @@
> +/*
> + *	Definitions for the 'struct ptr_ring_ll' datastructure.
> + *
> + *	Author:
> + *		John Fastabend <john.r.fastabend@intel.com>
> + *
> + *	Copyright (C) 2016 Intel Corp.
> + *
> + *	This program is free software; you can redistribute it and/or modify it
> + *	under the terms of the GNU General Public License as published by the
> + *	Free Software Foundation; either version 2 of the License, or (at your
> + *	option) any later version.
> + *
> + *	This is a limited-size FIFO maintaining pointers in FIFO order, with
> + *	one CPU producing entries and another consuming entries from a FIFO.
> + *	extended from ptr_ring_ll to use cmpxchg over spin lock.

So when is each one (ptr-ring/ptr-ring-ll) a win? _ll suffix seems to
imply this gives a better latency, OTOH for a ping/pong I suspect
ptr-ring would be better as it avoids index cache line bounces.

> + */
> +
> +#ifndef _LINUX_PTR_RING_LL_H
> +#define _LINUX_PTR_RING_LL_H 1
> +
> +#ifdef __KERNEL__
> +#include <linux/spinlock.h>
> +#include <linux/cache.h>
> +#include <linux/types.h>
> +#include <linux/compiler.h>
> +#include <linux/cache.h>
> +#include <linux/slab.h>
> +#include <asm/errno.h>
> +#endif
> +
> +struct ptr_ring_ll {
> +	u32 prod_size;
> +	u32 prod_mask;
> +	u32 prod_head;
> +	u32 prod_tail;
> +	u32 cons_size;
> +	u32 cons_mask;
> +	u32 cons_head;
> +	u32 cons_tail;
> +
> +	void **queue;
> +};
> +
> +/* Note: callers invoking this in a loop must use a compiler barrier,
> + * for example cpu_relax(). Callers must hold producer_lock.
> + */
> +static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
> +{
> +	u32 ret, head, tail, next, slots, mask;
> +
> +	do {
> +		head = READ_ONCE(r->prod_head);
> +		mask = READ_ONCE(r->prod_mask);
> +		tail = READ_ONCE(r->cons_tail);
> +
> +		slots = mask + tail - head;
> +		if (slots < 1)
> +			return -ENOMEM;
> +
> +		next = head + 1;
> +		ret = cmpxchg(&r->prod_head, head, next);
> +	} while (ret != head);


So why is this preferable to a lock?

I suspect it's nothing else than the qspinlock fairness
and polling code complexity. It's all not very useful if you
1. are just doing a couple of instructions under the lock
and
2. use a finite FIFO which is unfair anyway


How about this hack (lifted from virt_spin_lock):

static inline void quick_spin_lock(struct qspinlock *lock)
{
        do {
                while (atomic_read(&lock->val) != 0)
                        cpu_relax();
        } while (atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL) != 0);
}

Or maybe we should even drop the atomic_read in the middle -
worth profiling and comparing:

static inline void quick_spin_lock(struct qspinlock *lock)
{
        while (atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL) != 0)
		cpu_relax();
}


Then, use quick_spin_lock instead of spin_lock everywhere in
ptr_ring - will that make it more efficient?


> +
> +	r->queue[head & mask] = ptr;
> +	smp_wmb();
> +
> +	while (r->prod_tail != head)
> +		cpu_relax();
> +
> +	r->prod_tail = next;
> +	return 0;
> +}
> +
> +static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
> +{
> +	u32 ret, head, tail, next, slots, mask;
> +	void *ptr;
> +
> +	do {
> +		head = READ_ONCE(r->cons_head);
> +		mask = READ_ONCE(r->cons_mask);
> +		tail = READ_ONCE(r->prod_tail);
> +
> +		slots = tail - head;
> +		if (slots < 1)
> +			return ERR_PTR(-ENOMEM);
> +
> +		next = head + 1;
> +		ret = cmpxchg(&r->cons_head, head, next);
> +	} while (ret != head);
> +
> +	ptr = r->queue[head & mask];
> +	smp_rmb();
> +
> +	while (r->cons_tail != head)
> +		cpu_relax();
> +
> +	r->cons_tail = next;
> +	return ptr;
> +}
> +
> +static inline void **__ptr_ring_ll_init_queue_alloc(int size, gfp_t gfp)
> +{
> +	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
> +}
> +
> +static inline int ptr_ring_ll_init(struct ptr_ring_ll *r, int size, gfp_t gfp)
> +{
> +	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
> +	if (!r->queue)
> +		return -ENOMEM;
> +
> +	r->prod_size = r->cons_size = size;
> +	r->prod_mask = r->cons_mask = size - 1;
> +	r->prod_tail = r->prod_head = 0;
> +	r->cons_tail = r->prod_tail = 0;
> +
> +	return 0;
> +}
> +
> +static inline void ptr_ring_ll_cleanup(struct ptr_ring_ll *r, void (*destroy)(void *))
> +{
> +	if (destroy) {
> +		void *ptr;
> +
> +		ptr = __ptr_ring_ll_consume(r);
> +		while (!IS_ERR_OR_NULL(ptr)) {
> +			destroy(ptr);
> +			ptr = __ptr_ring_ll_consume(r);
> +		}
> +	}
> +	kfree(r->queue);
> +}
> +
> +#endif /* _LINUX_PTR_RING_LL_H  */
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> index f4dfade..9b43dfd 100644
> --- a/include/linux/skb_array.h
> +++ b/include/linux/skb_array.h
> @@ -22,6 +22,7 @@
>  
>  #ifdef __KERNEL__
>  #include <linux/ptr_ring.h>
> +#include <linux/ptr_ring_ll.h>
>  #include <linux/skbuff.h>
>  #include <linux/if_vlan.h>
>  #endif
> @@ -30,6 +31,10 @@ struct skb_array {
>  	struct ptr_ring ring;
>  };
>  
> +struct skb_array_ll {
> +	struct ptr_ring_ll ring;
> +};
> +
>  /* Might be slightly faster than skb_array_full below, but callers invoking
>   * this in a loop must use a compiler barrier, for example cpu_relax().
>   */
> @@ -43,6 +48,11 @@ static inline bool skb_array_full(struct skb_array *a)
>  	return ptr_ring_full(&a->ring);
>  }
>  
> +static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
> +{
> +	return __ptr_ring_ll_produce(&a->ring, skb);
> +}
> +
>  static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
>  {
>  	return ptr_ring_produce(&a->ring, skb);
> @@ -92,6 +102,11 @@ static inline bool skb_array_empty_any(struct skb_array *a)
>  	return ptr_ring_empty_any(&a->ring);
>  }
>  
> +static inline struct sk_buff *skb_array_ll_consume(struct skb_array_ll *a)
> +{
> +	return __ptr_ring_ll_consume(&a->ring);
> +}
> +
>  static inline struct sk_buff *skb_array_consume(struct skb_array *a)
>  {
>  	return ptr_ring_consume(&a->ring);
> @@ -146,6 +161,11 @@ static inline int skb_array_peek_len_any(struct skb_array *a)
>  	return PTR_RING_PEEK_CALL_ANY(&a->ring, __skb_array_len_with_tag);
>  }
>  
> +static inline int skb_array_ll_init(struct skb_array_ll *a, int size, gfp_t gfp)
> +{
> +	return ptr_ring_ll_init(&a->ring, size, gfp);
> +}
> +
>  static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
>  {
>  	return ptr_ring_init(&a->ring, size, gfp);
> @@ -170,6 +190,11 @@ static inline int skb_array_resize_multiple(struct skb_array **rings,
>  					__skb_array_destroy_skb);
>  }
>  
> +static inline void skb_array_ll_cleanup(struct skb_array_ll *a)
> +{
> +	ptr_ring_ll_cleanup(&a->ring, __skb_array_destroy_skb);
> +}
> +
>  static inline void skb_array_cleanup(struct skb_array *a)
>  {
>  	ptr_ring_cleanup(&a->ring, __skb_array_destroy_skb);

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once
  2016-11-11  4:44 ` [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once John Fastabend
@ 2016-11-14 23:06   ` Michael S. Tsirkin
  2016-11-16  4:42     ` John Fastabend
  0 siblings, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-11-14 23:06 UTC (permalink / raw)
  To: John Fastabend; +Cc: jasowang, netdev, linux-kernel

On Thu, Nov 10, 2016 at 08:44:32PM -0800, John Fastabend wrote:
> Signed-off-by: John Fastabend <john.r.fastabend@intel.com>

This will naturally reduce the cache line bounce
costs, but so will a _many API for ptr-ring,
doing lock-add many-unlock.

the number of atomics also scales better with the lock:
one per push instead of one per queue.

Also, when can qdisc use a _many operation?


> ---
>  include/linux/ptr_ring_ll.h |   22 ++++++++++++++++------
>  include/linux/skb_array.h   |   11 +++++++++--
>  net/sched/sch_generic.c     |    2 +-
>  3 files changed, 26 insertions(+), 9 deletions(-)
> 
> diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
> index bcb11f3..5dc25f7 100644
> --- a/include/linux/ptr_ring_ll.h
> +++ b/include/linux/ptr_ring_ll.h
> @@ -45,9 +45,10 @@ struct ptr_ring_ll {
>  /* Note: callers invoking this in a loop must use a compiler barrier,
>   * for example cpu_relax(). Callers must hold producer_lock.
>   */
> -static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
> +static inline int __ptr_ring_ll_produce_many(struct ptr_ring_ll *r,
> +					     void **ptr, int num)
>  {
> -	u32 ret, head, tail, next, slots, mask;
> +	u32 ret, head, tail, next, slots, mask, i;
>  
>  	do {
>  		head = READ_ONCE(r->prod_head);
> @@ -55,21 +56,30 @@ static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
>  		tail = READ_ONCE(r->cons_tail);
>  
>  		slots = mask + tail - head;
> -		if (slots < 1)
> +		if (slots < num)
> +			num = slots;
> +
> +		if (unlikely(!num))
>  			return -ENOMEM;
>  
> -		next = head + 1;
> +		next = head + num;
>  		ret = cmpxchg(&r->prod_head, head, next);
>  	} while (ret != head);
>  
> -	r->queue[head & mask] = ptr;
> +	for (i = 0; i < num; i++)
> +		r->queue[(head + i) & mask] = ptr[i];
>  	smp_wmb();
>  
>  	while (r->prod_tail != head)
>  		cpu_relax();
>  
>  	r->prod_tail = next;
> -	return 0;
> +	return num;
> +}
> +
> +static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void **ptr)
> +{
> +	return __ptr_ring_ll_produce_many(r, ptr, 1);
>  }
>  
>  static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> index 9b43dfd..de3c700 100644
> --- a/include/linux/skb_array.h
> +++ b/include/linux/skb_array.h
> @@ -48,9 +48,16 @@ static inline bool skb_array_full(struct skb_array *a)
>  	return ptr_ring_full(&a->ring);
>  }
>  
> -static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
> +static inline int skb_array_ll_produce_many(struct skb_array_ll *a,
> +					    struct sk_buff **skb, int num)
>  {
> -	return __ptr_ring_ll_produce(&a->ring, skb);
> +	return __ptr_ring_ll_produce_many(&a->ring, (void **)skb, num);
> +}
> +
> +static inline int skb_array_ll_produce(struct skb_array_ll *a,
> +				       struct sk_buff **skb)
> +{
> +	return __ptr_ring_ll_produce(&a->ring, (void **)skb);
>  }
>  
>  static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
> diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
> index 4648ec8..58f2011 100644
> --- a/net/sched/sch_generic.c
> +++ b/net/sched/sch_generic.c
> @@ -571,7 +571,7 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>  	struct skb_array_ll *q = band2list(priv, band);
>  	int err;
>  
> -	err = skb_array_ll_produce(q, skb);
> +	err = skb_array_ll_produce(q, &skb);
>  
>  	if (unlikely(err)) {
>  		net_warn_ratelimited("drop a packet from fast enqueue\n");

I don't see a pop many operation here.

-- 
MST

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings
  2016-11-14 23:01   ` Michael S. Tsirkin
@ 2016-11-16  4:30     ` John Fastabend
  0 siblings, 0 replies; 9+ messages in thread
From: John Fastabend @ 2016-11-16  4:30 UTC (permalink / raw)
  To: Michael S. Tsirkin; +Cc: jasowang, netdev, linux-kernel

On 16-11-14 03:01 PM, Michael S. Tsirkin wrote:
> On Thu, Nov 10, 2016 at 08:44:08PM -0800, John Fastabend wrote:
>>
>> ---
>>  include/linux/ptr_ring_ll.h |  136 +++++++++++++++++++++++++++++++++++++++++++
>>  include/linux/skb_array.h   |   25 ++++++++
>>  2 files changed, 161 insertions(+)
>>  create mode 100644 include/linux/ptr_ring_ll.h
>>
>> diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
>> new file mode 100644
>> index 0000000..bcb11f3
>> --- /dev/null
>> +++ b/include/linux/ptr_ring_ll.h
>> @@ -0,0 +1,136 @@
>> +/*
>> + *	Definitions for the 'struct ptr_ring_ll' datastructure.
>> + *
>> + *	Author:
>> + *		John Fastabend <john.r.fastabend@intel.com>
>> + *
>> + *	Copyright (C) 2016 Intel Corp.
>> + *
>> + *	This program is free software; you can redistribute it and/or modify it
>> + *	under the terms of the GNU General Public License as published by the
>> + *	Free Software Foundation; either version 2 of the License, or (at your
>> + *	option) any later version.
>> + *
>> + *	This is a limited-size FIFO maintaining pointers in FIFO order, with
>> + *	one CPU producing entries and another consuming entries from a FIFO.
>> + *	extended from ptr_ring_ll to use cmpxchg over spin lock.
> 
> So when is each one (ptr-ring/ptr-ring-ll) a win? _ll suffix seems to
> imply this gives a better latency, OTOH for a ping/pong I suspect
> ptr-ring would be better as it avoids index cache line bounces.

My observation under qdisc testing with pktgen is that I get better pps
numbers with this code vs ptr_ring using spinlock. I actually wrote
this implementation before the skb_array code was around though and
haven't done a thorough analysis of the two yet only pktgen benchmarks.

In my pktgen benchmarks I test 1:1 producer/consumer and many to one
producer/consumer tests. I'll post some numbers later this week.

[...]

>> + */
>> +static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
>> +{
>> +	u32 ret, head, tail, next, slots, mask;
>> +
>> +	do {
>> +		head = READ_ONCE(r->prod_head);
>> +		mask = READ_ONCE(r->prod_mask);
>> +		tail = READ_ONCE(r->cons_tail);
>> +
>> +		slots = mask + tail - head;
>> +		if (slots < 1)
>> +			return -ENOMEM;
>> +
>> +		next = head + 1;
>> +		ret = cmpxchg(&r->prod_head, head, next);
>> +	} while (ret != head);
> 
> 
> So why is this preferable to a lock?
> 
> I suspect it's nothing else than the qspinlock fairness
> and polling code complexity. It's all not very useful if you
> 1. are just doing a couple of instructions under the lock
> and
> 2. use a finite FIFO which is unfair anyway
> 
> 
> How about this hack (lifted from virt_spin_lock):
> 
> static inline void quick_spin_lock(struct qspinlock *lock)
> {
>         do {
>                 while (atomic_read(&lock->val) != 0)
>                         cpu_relax();
>         } while (atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL) != 0);
> }
> 
> Or maybe we should even drop the atomic_read in the middle -
> worth profiling and comparing:
> 
> static inline void quick_spin_lock(struct qspinlock *lock)
> {
>         while (atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL) != 0)
> 		cpu_relax();
> }
> 
> 
> Then, use quick_spin_lock instead of spin_lock everywhere in
> ptr_ring - will that make it more efficient?
> 

I think this could be the case. I'll give it a test later this week I
am working on the xdp bits for virtio at the moment. To be honest though
for my qdisc patchset first I need to resolve a bug and then probably in
the first set just use the existing skb_array implementation. Its fun
to micro-optimize this stuff but really any implementation will show
improvement over existing code.

Thanks,
John

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once
  2016-11-14 23:06   ` Michael S. Tsirkin
@ 2016-11-16  4:42     ` John Fastabend
  2016-11-16  5:23       ` Michael S. Tsirkin
  0 siblings, 1 reply; 9+ messages in thread
From: John Fastabend @ 2016-11-16  4:42 UTC (permalink / raw)
  To: Michael S. Tsirkin; +Cc: jasowang, netdev, linux-kernel

On 16-11-14 03:06 PM, Michael S. Tsirkin wrote:
> On Thu, Nov 10, 2016 at 08:44:32PM -0800, John Fastabend wrote:
>> Signed-off-by: John Fastabend <john.r.fastabend@intel.com>
> 
> This will naturally reduce the cache line bounce
> costs, but so will a _many API for ptr-ring,
> doing lock-add many-unlock.
> 
> the number of atomics also scales better with the lock:
> one per push instead of one per queue.
> 
> Also, when can qdisc use a _many operation?
> 

On dequeue we can pull off many skbs instead of one at a time and
then either (a) pass them down as an array to the driver (I started
to write this on top of ixgbe and it seems like a win) or (b) pass
them one by one down to the driver and set the xmit_more bit correctly.

The pass one by one also seems like a win because we avoid the lock
per skb.

On enqueue qdisc side its a bit more evasive to start doing this.


[...]

>> +++ b/net/sched/sch_generic.c
>> @@ -571,7 +571,7 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>>  	struct skb_array_ll *q = band2list(priv, band);
>>  	int err;
>>  
>> -	err = skb_array_ll_produce(q, skb);
>> +	err = skb_array_ll_produce(q, &skb);
>>  
>>  	if (unlikely(err)) {
>>  		net_warn_ratelimited("drop a packet from fast enqueue\n");
> 
> I don't see a pop many operation here.
> 

Patches need a bit of cleanup looks like it was part of another patch.

.John

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once
  2016-11-16  4:42     ` John Fastabend
@ 2016-11-16  5:23       ` Michael S. Tsirkin
  0 siblings, 0 replies; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-11-16  5:23 UTC (permalink / raw)
  To: John Fastabend; +Cc: jasowang, netdev, linux-kernel

On Tue, Nov 15, 2016 at 08:42:03PM -0800, John Fastabend wrote:
> On 16-11-14 03:06 PM, Michael S. Tsirkin wrote:
> > On Thu, Nov 10, 2016 at 08:44:32PM -0800, John Fastabend wrote:
> >> Signed-off-by: John Fastabend <john.r.fastabend@intel.com>
> > 
> > This will naturally reduce the cache line bounce
> > costs, but so will a _many API for ptr-ring,
> > doing lock-add many-unlock.
> > 
> > the number of atomics also scales better with the lock:
> > one per push instead of one per queue.
> > 
> > Also, when can qdisc use a _many operation?
> > 
> 
> On dequeue we can pull off many skbs instead of one at a time and
> then either (a) pass them down as an array to the driver (I started
> to write this on top of ixgbe and it seems like a win) or (b) pass
> them one by one down to the driver and set the xmit_more bit correctly.
> 
> The pass one by one also seems like a win because we avoid the lock
> per skb.
> 
> On enqueue qdisc side its a bit more evasive to start doing this.
> 
> 
> [...]


I see. So we could wrap __ptr_ring_consume and
implement __skb_array_consume. You can call that
in a loop under a lock. I would limit it to something
small like 16 pointers, to make sure lock contention is
not an issue.

-- 
MST

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2016-11-16  5:23 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-11-11  4:43 [RFC PATCH 0/2] illustrate cmpxchg ring for tap/tun and qdisc John Fastabend
2016-11-11  4:44 ` [RFC PATCH 1/2] net: use cmpxchg instead of spinlock in ptr rings John Fastabend
2016-11-14 11:09   ` Jesper Dangaard Brouer
2016-11-14 23:01   ` Michael S. Tsirkin
2016-11-16  4:30     ` John Fastabend
2016-11-11  4:44 ` [RFC PATCH 2/2] ptr_ring_ll: pop/push multiple objects at once John Fastabend
2016-11-14 23:06   ` Michael S. Tsirkin
2016-11-16  4:42     ` John Fastabend
2016-11-16  5:23       ` Michael S. Tsirkin

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).