* [PATCH v6 0/3] skb_array: array based FIFO for skbs
From: Michael S. Tsirkin @ 2016-06-01 12:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Michael S. Tsirkin, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, brouer, kvm

This is in response to the proposal by Jason to make the tun
rx packet queue lockless using a circular buffer.
My testing seems to show that, at least for the common use case
in networking, which isn't lockless, a circular buffer
with indices does not perform that well: each index access
causes a cache line to bounce between CPUs, and index accesses
cause stalls due to the data dependency.

By comparison, an array of pointers where NULL means invalid
and !NULL means valid can be updated without messing with
barriers at all and does not have this issue.

On the flip side, large queues may cause cache pressure:
tun has a queue of 1000 entries by default, and at 8 bytes per
pointer that's 8K of memory (125 cache lines at 64 bytes each).
At this point I'm not sure this can be solved efficiently.
The correct solution might be sizing the queues appropriately.

Here's an implementation of this idea: it can be used more
or less whenever sk_buff_head can be used, except you need
to know the queue size in advance.

As this might be useful outside of networking, I implemented
a generic array of void pointers, with a type-safe wrapper for skbs.
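
For example, a minimal user of the skb wrapper might look like this
(a rough sketch; the queue size and function names are made up):

#include <linux/skb_array.h>

static struct skb_array q;

static int example_init(void)
{
	/* the queue size must be known in advance */
	return skb_array_init(&q, 256, GFP_KERNEL);
}

/* producer side: returns -ENOSPC when the ring is full */
static int example_queue(struct sk_buff *skb)
{
	return skb_array_produce(&q, skb);
}

/* consumer side: returns NULL when the ring is empty */
static struct sk_buff *example_dequeue(void)
{
	return skb_array_consume(&q);
}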

I didn't implement resizing, but it's possible to resize the ring by
holding both the consumer and producer locks.
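
Roughly like this untested sketch (not part of this series; entries
that don't fit in the new ring are simply dropped here, so a real
implementation would have to free them):

static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp)
{
	void **queue = kzalloc(ALIGN(size * sizeof(*queue), SMP_CACHE_BYTES),
			       gfp);
	void **old;
	void *ptr;
	int n = 0;

	if (!queue)
		return -ENOMEM;

	/* freeze both ends; plain spin_lock assumes process context */
	spin_lock(&r->consumer_lock);
	spin_lock(&r->producer_lock);

	/* move entries over in FIFO order, filling slots 0..n-1 */
	while (n < size && (ptr = __ptr_ring_consume(r)))
		queue[n++] = ptr;

	old = r->queue;
	r->queue = queue;
	r->size = size;
	r->producer = n < size ? n : 0;
	r->consumer = 0;

	spin_unlock(&r->producer_lock);
	spin_unlock(&r->consumer_lock);

	kfree(old);
	return 0;
}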

I think this code works fine without any extra memory barriers since we
always read and write the same location, so the accesses cannot be
reordered.
Multiple writes of the same value into memory would mess things up
for us; I don't think compilers would do that, though.
But if people feel it's better to be safe wrt compiler optimizations,
specifying the queue as volatile would probably do it in a cleaner way
than converting all accesses to READ_ONCE/WRITE_ONCE. Thoughts?
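
For reference, the READ_ONCE/WRITE_ONCE conversion being discussed
would look something like this for the produce side (a sketch, not
part of this series):

static inline bool __ptr_ring_full_once(struct ptr_ring *r)
{
	/* READ_ONCE forces a fresh load of the slot */
	return READ_ONCE(r->queue[r->producer]);
}

static inline int __ptr_ring_produce_once(struct ptr_ring *r, void *ptr)
{
	if (__ptr_ring_full_once(r))
		return -ENOSPC;

	/* WRITE_ONCE keeps the compiler from tearing the store */
	WRITE_ONCE(r->queue[r->producer], ptr);
	if (unlikely(++r->producer >= r->size))
		r->producer = 0;
	return 0;
}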

The only issue is with calls within a loop using the __ptr_ring_XXX
accessors - in theory the compiler could hoist accesses out of the loop.

Following volatile-considered-harmful.txt, I merely
documented that callers that busy-poll should invoke cpu_relax().
Most people will use the external skb_array_XXX APIs with a spinlock,
so this should not be an issue for them.
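
For illustration, a busy-polling consumer using the lockless helpers
would be expected to look like this (sketch; assumes a true single
consumer):

static struct sk_buff *example_poll_consume(struct skb_array *a)
{
	/* cpu_relax() is the compiler barrier forcing the slot re-read */
	while (__skb_array_empty(a))
		cpu_relax();

	return skb_array_consume(a);
}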

Eric Dumazet suggested adding an extra pointer to skb for when
we have a single outstanding packet. I could not figure out a way to
implement this without a shared consumer/producer lock, though, and
such a lock would cause cache line bounces by itself.

changes since v5:
	implemented a generic ptr_ring API, and
		made skb_array a type-safe wrapper
	added APIs for taking the spinlock in different contexts,
		following the expected use case in tun

changes since v4 (v3 was never posted):
	documentation
	dropped the SKB_ARRAY_MIN_SIZE heuristic
	unit test (in userspace, included as patch 2)

changes since v2:
	fixed an integer overflow pointed out by Eric
	added some comments

changes since v1:
	fixed a bug pointed out by Eric


Michael S. Tsirkin (3):
  ptr_ring: array based FIFO for pointers
  ptr_ring: ring test
  skb_array: array based FIFO for skbs

 include/linux/ptr_ring.h         | 267 +++++++++++++++++++++++++++++++++++++++
 include/linux/skb_array.h        | 129 +++++++++++++++++++
 tools/virtio/ringtest/ptr_ring.c | 192 ++++++++++++++++++++++++++++
 tools/virtio/ringtest/Makefile   |   5 +-
 4 files changed, 592 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/ptr_ring.h
 create mode 100644 include/linux/skb_array.h
 create mode 100644 tools/virtio/ringtest/ptr_ring.c

-- 
MST


* [PATCH v6 1/3] ptr_ring: array based FIFO for pointers
From: Michael S. Tsirkin @ 2016-06-01 12:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Michael S. Tsirkin, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, brouer, kvm

A simple array based FIFO of pointers.  Intended for the net stack,
which commonly has a single consumer and a single producer.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/ptr_ring.h | 267 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 267 insertions(+)
 create mode 100644 include/linux/ptr_ring.h

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
new file mode 100644
index 0000000..b40af9a
--- /dev/null
+++ b/include/linux/ptr_ring.h
@@ -0,0 +1,267 @@
+/*
+ *	Definitions for the 'struct ptr_ring' data structure.
+ *
+ *	Author:
+ *		Michael S. Tsirkin <mst@redhat.com>
+ *
+ *	Copyright (C) 2016 Red Hat, Inc.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	This is a limited-size FIFO maintaining pointers in FIFO order, with
+ *	one CPU producing entries and another CPU consuming them.
+ *
+ *	This implementation tries to minimize cache contention when there is
+ *	a single producer and a single consumer CPU.
+ */
+
+#ifndef _LINUX_PTR_RING_H
+#define _LINUX_PTR_RING_H 1
+
+#ifdef __KERNEL__
+#include <linux/spinlock.h>
+#include <linux/cache.h>
+#include <linux/types.h>
+#include <linux/compiler.h>
+#include <linux/cache.h>
+#include <linux/slab.h>
+#include <asm/errno.h>
+#endif
+
+struct ptr_ring {
+	int producer ____cacheline_aligned_in_smp;
+	spinlock_t producer_lock;
+	int consumer ____cacheline_aligned_in_smp;
+	spinlock_t consumer_lock;
+	/* Shared consumer/producer data */
+	/* Read-only by both the producer and the consumer */
+	int size ____cacheline_aligned_in_smp; /* max entries in queue */
+	void **queue;
+};
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ * Callers don't need to take the producer lock - but without it the
+ * result may be stale, so a later __ptr_ring_produce may still fail.
+ */
+static inline bool __ptr_ring_full(struct ptr_ring *r)
+{
+	return r->queue[r->producer];
+}
+
+static inline bool ptr_ring_full(struct ptr_ring *r)
+{
+	barrier();
+	return __ptr_ring_full(r);
+}
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	if (__ptr_ring_full(r))
+		return -ENOSPC;
+
+	r->queue[r->producer++] = ptr;
+	if (unlikely(r->producer >= r->size))
+		r->producer = 0;
+	return 0;
+}
+
+static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock(&r->producer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock_irq(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_irq(&r->producer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&r->producer_lock, flags);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_irqrestore(&r->producer_lock, flags);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock_bh(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_bh(&r->producer_lock);
+
+	return ret;
+}
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must take consumer_lock
+ * if they dereference the pointer - see e.g. PTR_RING_PEEK_FIELD.
+ * There's no need for a lock if the pointer is merely tested - see e.g.
+ * ptr_ring_empty.
+ */
+static inline void *__ptr_ring_peek(struct ptr_ring *r)
+{
+	return r->queue[r->consumer];
+}
+
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+	barrier();
+	return !__ptr_ring_peek(r);
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+{
+	r->queue[r->consumer++] = NULL;
+	if (unlikely(r->consumer >= r->size))
+		r->consumer = 0;
+}
+
+static inline void *__ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	ptr = __ptr_ring_peek(r);
+	if (ptr)
+		__ptr_ring_discard_one(r);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock(&r->consumer_lock);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock_irq(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_irq(&r->consumer_lock);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_any(struct ptr_ring *r)
+{
+	unsigned long flags;
+	void *ptr;
+
+	spin_lock_irqsave(&r->consumer_lock, flags);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock_bh(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_bh(&r->consumer_lock);
+
+	return ptr;
+}
+
+/* Cast to structure type and peek at a field without discarding from FIFO.
+ * Callers must take consumer_lock.
+ */
+#define __PTR_RING_PEEK_FIELD(r, type, field, dflt) ({ \
+       type *__PTR_RING_PEEK_FIELD_p = __ptr_ring_peek(r); \
+	\
+       __PTR_RING_PEEK_FIELD_p ? __PTR_RING_PEEK_FIELD_p->field : (dflt); \
+})
+
+#define PTR_RING_PEEK_FIELD(r, type, field, dflt) ({ \
+	typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \
+	\
+	spin_lock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \
+	spin_unlock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v; \
+})
+
+#define PTR_RING_PEEK_FIELD_IRQ(r, type, field, dflt) ({ \
+	typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \
+	\
+	spin_lock_irq(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \
+	spin_unlock_irq(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v; \
+})
+
+#define PTR_RING_PEEK_FIELD_BH(r, type, field, dflt) ({ \
+	typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \
+	\
+	spin_lock_bh(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \
+	spin_unlock_bh(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_FIELD_v; \
+})
+
+#define PTR_RING_PEEK_FIELD_ANY(r, type, field, dflt) ({ \
+	typeof(((type *)0)->field) __PTR_RING_PEEK_FIELD_v; \
+	unsigned long __PTR_RING_PEEK_FIELD_f;\
+	\
+	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_FIELD_f); \
+	__PTR_RING_PEEK_FIELD_v = __PTR_RING_PEEK_FIELD(r, type, field, dflt); \
+	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_FIELD_f); \
+	__PTR_RING_PEEK_FIELD_v; \
+})
+
+static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
+{
+	r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES),
+			   gfp);
+	if (!r->queue)
+		return -ENOMEM;
+
+	r->size = size;
+	r->producer = r->consumer = 0;
+	spin_lock_init(&r->producer_lock);
+	spin_lock_init(&r->consumer_lock);
+
+	return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r)
+{
+	kfree(r->queue);
+}
+
+#endif /* _LINUX_PTR_RING_H  */
-- 
MST


* [PATCH v6 2/3] ptr_ring: ring test
From: Michael S. Tsirkin @ 2016-06-01 12:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Michael S. Tsirkin, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, brouer, kvm

Add a ringtest-based unit test for ptr_ring.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 tools/virtio/ringtest/ptr_ring.c | 192 +++++++++++++++++++++++++++++++++++++++
 tools/virtio/ringtest/Makefile   |   5 +-
 2 files changed, 196 insertions(+), 1 deletion(-)
 create mode 100644 tools/virtio/ringtest/ptr_ring.c

diff --git a/tools/virtio/ringtest/ptr_ring.c b/tools/virtio/ringtest/ptr_ring.c
new file mode 100644
index 0000000..74abd74
--- /dev/null
+++ b/tools/virtio/ringtest/ptr_ring.c
@@ -0,0 +1,192 @@
+#define _GNU_SOURCE
+#include "main.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <malloc.h>
+#include <assert.h>
+#include <errno.h>
+#include <limits.h>
+
+#define SMP_CACHE_BYTES 64
+#define cache_line_size() SMP_CACHE_BYTES
+#define ____cacheline_aligned_in_smp __attribute__ ((aligned (SMP_CACHE_BYTES)))
+#define unlikely(x)    (__builtin_expect(!!(x), 0))
+#define ALIGN(x, a) (((x) + (a) - 1) / (a) * (a))
+typedef pthread_spinlock_t  spinlock_t;
+
+typedef int gfp_t;
+static void *kzalloc(unsigned size, gfp_t gfp)
+{
+	void *p = memalign(64, size);
+	if (!p)
+		return p;
+	memset(p, 0, size);
+
+	return p;
+}
+
+static void kfree(void *p)
+{
+	if (p)
+		free(p);
+}
+
+static void spin_lock_init(spinlock_t *lock)
+{
+	int r = pthread_spin_init(lock, 0);
+	assert(!r);
+}
+
+static void spin_lock(spinlock_t *lock)
+{
+	int ret = pthread_spin_lock(lock);
+	assert(!ret);
+}
+
+static void spin_unlock(spinlock_t *lock)
+{
+	int ret = pthread_spin_unlock(lock);
+	assert(!ret);
+}
+
+static void spin_lock_bh(spinlock_t *lock)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_bh(spinlock_t *lock)
+{
+	spin_unlock(lock);
+}
+
+static void spin_lock_irq(spinlock_t *lock)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_irq(spinlock_t *lock)
+{
+	spin_unlock(lock);
+}
+
+static void spin_lock_irqsave(spinlock_t *lock, unsigned long f)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_irqrestore(spinlock_t *lock, unsigned long f)
+{
+	spin_unlock(lock);
+}
+
+#include "../../../include/linux/ptr_ring.h"
+
+static unsigned long long headcnt, tailcnt;
+static struct ptr_ring array ____cacheline_aligned_in_smp;
+
+/* implemented by ring */
+void alloc_ring(void)
+{
+	int ret = ptr_ring_init(&array, ring_size, 0);
+	assert(!ret);
+}
+
+/* guest side */
+int add_inbuf(unsigned len, void *buf, void *datap)
+{
+	int ret;
+
+	ret = __ptr_ring_produce(&array, buf);
+	if (ret >= 0) {
+		ret = 0;
+		headcnt++;
+	}
+
+	return ret;
+}
+
+/*
+ * The ptr_ring API provides no way for the producer to find out whether
+ * a given buffer was consumed.  Our tests merely require that a successful
+ * get_buf implies add_inbuf succeeded in the past, and that the next
+ * add_inbuf will succeed; fake it accordingly.
+ */
+void *get_buf(unsigned *lenp, void **bufp)
+{
+	void *datap;
+
+	if (tailcnt == headcnt || __ptr_ring_full(&array))
+		datap = NULL;
+	else {
+		datap = "Buffer\n";
+		++tailcnt;
+	}
+
+	return datap;
+}
+
+void poll_used(void)
+{
+	void *b;
+
+	do {
+		if (tailcnt == headcnt || __ptr_ring_full(&array)) {
+			b = NULL;
+			barrier();
+		} else {
+			b = "Buffer\n";
+		}
+	} while (!b);
+}
+
+void disable_call()
+{
+	assert(0);
+}
+
+bool enable_call()
+{
+	assert(0);
+}
+
+void kick_available(void)
+{
+	assert(0);
+}
+
+/* host side */
+void disable_kick()
+{
+	assert(0);
+}
+
+bool enable_kick()
+{
+	assert(0);
+}
+
+void poll_avail(void)
+{
+	void *b;
+
+	do {
+		barrier();
+		b = __ptr_ring_peek(&array);
+	} while (!b);
+}
+
+bool use_buf(unsigned *lenp, void **bufp)
+{
+	void *ptr;
+
+	ptr = __ptr_ring_consume(&array);
+
+	return ptr;
+}
+
+void call_used(void)
+{
+	assert(0);
+}
diff --git a/tools/virtio/ringtest/Makefile b/tools/virtio/ringtest/Makefile
index 6173ada..877a8a4 100644
--- a/tools/virtio/ringtest/Makefile
+++ b/tools/virtio/ringtest/Makefile
@@ -1,6 +1,6 @@
 all:
 
-all: ring virtio_ring_0_9 virtio_ring_poll virtio_ring_inorder noring
+all: ring virtio_ring_0_9 virtio_ring_poll virtio_ring_inorder ptr_ring noring
 
 CFLAGS += -Wall
 CFLAGS += -pthread -O2 -ggdb
@@ -8,6 +8,7 @@ LDFLAGS += -pthread -O2 -ggdb
 
 main.o: main.c main.h
 ring.o: ring.c main.h
+ptr_ring.o: ptr_ring.c main.h ../../../include/linux/ptr_ring.h
 virtio_ring_0_9.o: virtio_ring_0_9.c main.h
 virtio_ring_poll.o: virtio_ring_poll.c virtio_ring_0_9.c main.h
 virtio_ring_inorder.o: virtio_ring_inorder.c virtio_ring_0_9.c main.h
@@ -15,6 +16,7 @@ ring: ring.o main.o
 virtio_ring_0_9: virtio_ring_0_9.o main.o
 virtio_ring_poll: virtio_ring_poll.o main.o
 virtio_ring_inorder: virtio_ring_inorder.o main.o
+ptr_ring: ptr_ring.o main.o
 noring: noring.o main.o
 clean:
 	-rm main.o
@@ -22,6 +24,7 @@ clean:
 	-rm virtio_ring_0_9.o virtio_ring_0_9
 	-rm virtio_ring_poll.o virtio_ring_poll
 	-rm virtio_ring_inorder.o virtio_ring_inorder
+	-rm ptr_ring.o ptr_ring
 	-rm noring.o noring
 
 .PHONY: all clean
-- 
MST


* [PATCH v6 3/3] skb_array: array based FIFO for skbs
From: Michael S. Tsirkin @ 2016-06-01 12:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Michael S. Tsirkin, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, brouer, kvm

A simple array based FIFO of pointers.  Intended for the net stack, so
it uses skbs for type safety. Implemented as a set of wrappers around
ptr_ring.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/skb_array.h | 129 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
 create mode 100644 include/linux/skb_array.h

diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
new file mode 100644
index 0000000..cb67076
--- /dev/null
+++ b/include/linux/skb_array.h
@@ -0,0 +1,129 @@
+/*
+ *	Definitions for the 'struct skb_array' data structure.
+ *
+ *	Author:
+ *		Michael S. Tsirkin <mst@redhat.com>
+ *
+ *	Copyright (C) 2016 Red Hat, Inc.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	Limited-size FIFO of skbs. Can be used more or less whenever
+ *	sk_buff_head can be used, except you need to know the queue size in
+ *	advance.
+ *	Implemented as a type-safe wrapper around ptr_ring.
+ */
+
+#ifndef _LINUX_SKB_ARRAY_H
+#define _LINUX_SKB_ARRAY_H 1
+
+#ifdef __KERNEL__
+#include <linux/ptr_ring.h>
+#include <linux/skbuff.h>
+#endif
+
+struct skb_array {
+	struct ptr_ring ring;
+};
+
+/* Might be slightly faster than skb_array_full below, but callers invoking
+ * this in a loop must use a compiler barrier, for example cpu_relax().
+ */
+static inline bool __skb_array_full(struct skb_array *a)
+{
+	return __ptr_ring_full(&a->ring);
+}
+
+static inline bool skb_array_full(struct skb_array *a)
+{
+	return ptr_ring_full(&a->ring);
+}
+
+static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce(&a->ring, skb);
+}
+
+static inline int skb_array_produce_irq(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_irq(&a->ring, skb);
+}
+
+static inline int skb_array_produce_bh(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_bh(&a->ring, skb);
+}
+
+static inline int skb_array_produce_any(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_any(&a->ring, skb);
+}
+
+/* Might be slightly faster than skb_array_empty below, but callers invoking
+ * this in a loop must take care to use a compiler barrier, for example
+ * cpu_relax().
+ */
+static inline bool __skb_array_empty(struct skb_array *a)
+{
+	return !__ptr_ring_peek(&a->ring);
+}
+
+static inline bool skb_array_empty(struct skb_array *a)
+{
+	return ptr_ring_empty(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume(struct skb_array *a)
+{
+	return ptr_ring_consume(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
+{
+	return ptr_ring_consume_irq(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
+{
+	return ptr_ring_consume_any(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
+{
+	return ptr_ring_consume_bh(&a->ring);
+}
+
+static inline int skb_array_peek_len(struct skb_array *a)
+{
+	return PTR_RING_PEEK_FIELD(&a->ring, struct sk_buff, len, 0);
+}
+
+static inline int skb_array_peek_len_irq(struct skb_array *a)
+{
+	return PTR_RING_PEEK_FIELD_IRQ(&a->ring, struct sk_buff, len, 0);
+}
+
+static inline int skb_array_peek_len_bh(struct skb_array *a)
+{
+	return PTR_RING_PEEK_FIELD_BH(&a->ring, struct sk_buff, len, 0);
+}
+
+static inline int skb_array_peek_len_any(struct skb_array *a)
+{
+	return PTR_RING_PEEK_FIELD_ANY(&a->ring, struct sk_buff, len, 0);
+}
+
+static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
+{
+	return ptr_ring_init(&a->ring, size, gfp);
+}
+
+static inline void skb_array_cleanup(struct skb_array *a)
+{
+	ptr_ring_cleanup(&a->ring);
+}
+
+#endif /* _LINUX_SKB_ARRAY_H  */
-- 
MST


* Re: [PATCH v6 0/3] skb_array: array based FIFO for skbs
From: David Miller @ 2016-06-02  4:51 UTC (permalink / raw)
  To: mst; +Cc: linux-kernel, jasowang, eric.dumazet, netdev, rostedt, brouer, kvm

From: "Michael S. Tsirkin" <mst@redhat.com>
Date: Wed, 1 Jun 2016 15:54:34 +0300

> This is in response to the proposal by Jason to make the tun
> rx packet queue lockless using a circular buffer.
> My testing seems to show that, at least for the common use case
> in networking, which isn't lockless, a circular buffer
> with indices does not perform that well: each index access
> causes a cache line to bounce between CPUs, and index accesses
> cause stalls due to the data dependency.
> 
> By comparison, an array of pointers where NULL means invalid
> and !NULL means valid can be updated without messing with
> barriers at all and does not have this issue.
> 
> On the flip side, large queues may cause cache pressure:
> tun has a queue of 1000 entries by default, and at 8 bytes per
> pointer that's 8K of memory (125 cache lines at 64 bytes each).
> At this point I'm not sure this can be solved efficiently.
> The correct solution might be sizing the queues appropriately.
> 
> Here's an implementation of this idea: it can be used more
> or less whenever sk_buff_head can be used, except you need
> to know the queue size in advance.
 ...

I have no fundamental issues with this piece of infrastructure, but when
it gets included I want this series to include at least one use case.

This can be an adaptation of Jason's tun rx packet queue changes, or
similar.

Thanks.


* Re: [PATCH v6 0/3] skb_array: array based FIFO for skbs
From: Jason Wang @ 2016-06-02  8:13 UTC (permalink / raw)
  To: David Miller, mst
  Cc: linux-kernel, eric.dumazet, netdev, rostedt, brouer, kvm



On 2016-06-02 12:51, David Miller wrote:
> From: "Michael S. Tsirkin" <mst@redhat.com>
> Date: Wed, 1 Jun 2016 15:54:34 +0300
>
>> This is in response to the proposal by Jason to make the tun
>> rx packet queue lockless using a circular buffer.
>> My testing seems to show that, at least for the common use case
>> in networking, which isn't lockless, a circular buffer
>> with indices does not perform that well: each index access
>> causes a cache line to bounce between CPUs, and index accesses
>> cause stalls due to the data dependency.
>>
>> By comparison, an array of pointers where NULL means invalid
>> and !NULL means valid can be updated without messing with
>> barriers at all and does not have this issue.
>>
>> On the flip side, large queues may cause cache pressure:
>> tun has a queue of 1000 entries by default, and at 8 bytes per
>> pointer that's 8K of memory (125 cache lines at 64 bytes each).
>> At this point I'm not sure this can be solved efficiently.
>> The correct solution might be sizing the queues appropriately.
>>
>> Here's an implementation of this idea: it can be used more
>> or less whenever sk_buff_head can be used, except you need
>> to know the queue size in advance.
>   ...
>
> I have no fundamental issues with this piece of infrastructure, but when
> it gets included I want this series to include at least one use case.
>
> This can be an adaptation of Jason's tun rx packet queue changes, or
> similar.
>
> Thanks.

Right, I'm working on using skb_array for tun; I will post the patch
in the next few days.

Thanks


* Re: [PATCH v6 3/3] skb_array: array based FIFO for skbs
From: Jason Wang @ 2016-06-02  8:17 UTC (permalink / raw)
  To: Michael S. Tsirkin, linux-kernel
  Cc: Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm



On 2016-06-01 20:54, Michael S. Tsirkin wrote:
> A simple array based FIFO of pointers.  Intended for the net stack,
> so it uses skbs for type safety. Implemented as a set of wrappers
> around ptr_ring.
>
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> ---
>   include/linux/skb_array.h | 129 ++++++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 129 insertions(+)
>   create mode 100644 include/linux/skb_array.h
>
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> new file mode 100644
> index 0000000..cb67076
> --- /dev/null
> +++ b/include/linux/skb_array.h
> @@ -0,0 +1,129 @@
> +/*
> + *	Definitions for the 'struct skb_array' data structure.
> + *
> + *	Author:
> + *		Michael S. Tsirkin <mst@redhat.com>
> + *
> + *	Copyright (C) 2016 Red Hat, Inc.
> + *
> + *	This program is free software; you can redistribute it and/or modify it
> + *	under the terms of the GNU General Public License as published by the
> + *	Free Software Foundation; either version 2 of the License, or (at your
> + *	option) any later version.
> + *
> + *	Limited-size FIFO of skbs. Can be used more or less whenever
> + *	sk_buff_head can be used, except you need to know the queue size in
> + *	advance.
> + *	Implemented as a type-safe wrapper around ptr_ring.
> + */
> +
> +#ifndef _LINUX_SKB_ARRAY_H
> +#define _LINUX_SKB_ARRAY_H 1
> +
> +#ifdef __KERNEL__
> +#include <linux/ptr_ring.h>
> +#include <linux/skbuff.h>
> +#endif
> +
> +struct skb_array {
> +	struct ptr_ring ring;
> +};
> +
> +/* Might be slightly faster than skb_array_full below, but callers invoking
> + * this in a loop must use a compiler barrier, for example cpu_relax().
> + */
> +static inline bool __skb_array_full(struct skb_array *a)
> +{
> +	return __ptr_ring_full(&a->ring);
> +}
> +
> +static inline bool skb_array_full(struct skb_array *a)
> +{
> +	return ptr_ring_full(&a->ring);
> +}
> +
> +static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
> +{
> +	return ptr_ring_produce(&a->ring, skb);
> +}
> +
> +static inline int skb_array_produce_irq(struct skb_array *a, struct sk_buff *skb)
> +{
> +	return ptr_ring_produce_irq(&a->ring, skb);
> +}
> +
> +static inline int skb_array_produce_bh(struct skb_array *a, struct sk_buff *skb)
> +{
> +	return ptr_ring_produce_bh(&a->ring, skb);
> +}
> +
> +static inline int skb_array_produce_any(struct skb_array *a, struct sk_buff *skb)
> +{
> +	return ptr_ring_produce_any(&a->ring, skb);
> +}
> +
> +/* Might be slightly faster than skb_array_empty below, but callers invoking
> + * this in a loop must take care to use a compiler barrier, for example
> + * cpu_relax().
> + */
> +static inline bool __skb_array_empty(struct skb_array *a)
> +{
> +	return !__ptr_ring_peek(&a->ring);
> +}
> +
> +static inline bool skb_array_empty(struct skb_array *a)
> +{
> +	return ptr_ring_empty(&a->ring);
> +}
> +
> +static inline struct sk_buff *skb_array_consume(struct skb_array *a)
> +{
> +	return ptr_ring_consume(&a->ring);
> +}
> +
> +static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
> +{
> +	return ptr_ring_consume_irq(&a->ring);
> +}
> +
> +static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
> +{
> +	return ptr_ring_consume_any(&a->ring);
> +}
> +
> +static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
> +{
> +	return ptr_ring_consume_bh(&a->ring);
> +}
> +
> +static inline int skb_array_peek_len(struct skb_array *a)
> +{
> +	return PTR_RING_PEEK_FIELD(&a->ring, struct sk_buff, len, 0);
> +}

An interesting case for tun is peeking at the length of a tagged
packet. In that case, len should be increased accordingly, so this
simple field peeking won't work.

Do we need a callback here?

> +
> +static inline int skb_array_peek_len_irq(struct skb_array *a)
> +{
> +	return PTR_RING_PEEK_FIELD_IRQ(&a->ring, struct sk_buff, len, 0);
> +}
> +
> +static inline int skb_array_peek_len_bh(struct skb_array *a)
> +{
> +	return PTR_RING_PEEK_FIELD_BH(&a->ring, struct sk_buff, len, 0);
> +}
> +
> +static inline int skb_array_peek_len_any(struct skb_array *a)
> +{
> +	return PTR_RING_PEEK_FIELD_ANY(&a->ring, struct sk_buff, len, 0);
> +}
> +
> +static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
> +{
> +	return ptr_ring_init(&a->ring, size, gfp);
> +}
> +
> +static inline void skb_array_cleanup(struct skb_array *a)
> +{
> +	ptr_ring_cleanup(&a->ring);
> +}
> +
> +#endif /* _LINUX_SKB_ARRAY_H  */


* Re: [PATCH v6 3/3] skb_array: array based FIFO for skbs
From: Michael S. Tsirkin @ 2016-06-02  9:27 UTC (permalink / raw)
  To: Jason Wang
  Cc: linux-kernel, Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm

On Thu, Jun 02, 2016 at 04:17:02PM +0800, Jason Wang wrote:
> 
> 
> On 2016-06-01 20:54, Michael S. Tsirkin wrote:
> >A simple array based FIFO of pointers.  Intended for the net stack,
> >so it uses skbs for type safety. Implemented as a set of wrappers
> >around ptr_ring.
> >
> >Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> >---
> >  include/linux/skb_array.h | 129 ++++++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 129 insertions(+)
> >  create mode 100644 include/linux/skb_array.h
> >
> >diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> >new file mode 100644
> >index 0000000..cb67076
> >--- /dev/null
> >+++ b/include/linux/skb_array.h
> >@@ -0,0 +1,129 @@
> >+/*
> >+ *	Definitions for the 'struct skb_array' data structure.
> >+ *
> >+ *	Author:
> >+ *		Michael S. Tsirkin <mst@redhat.com>
> >+ *
> >+ *	Copyright (C) 2016 Red Hat, Inc.
> >+ *
> >+ *	This program is free software; you can redistribute it and/or modify it
> >+ *	under the terms of the GNU General Public License as published by the
> >+ *	Free Software Foundation; either version 2 of the License, or (at your
> >+ *	option) any later version.
> >+ *
> >+ *	Limited-size FIFO of skbs. Can be used more or less whenever
> >+ *	sk_buff_head can be used, except you need to know the queue size in
> >+ *	advance.
> >+ *	Implemented as a type-safe wrapper around ptr_ring.
> >+ */
> >+
> >+#ifndef _LINUX_SKB_ARRAY_H
> >+#define _LINUX_SKB_ARRAY_H 1
> >+
> >+#ifdef __KERNEL__
> >+#include <linux/ptr_ring.h>
> >+#include <linux/skbuff.h>
> >+#endif
> >+
> >+struct skb_array {
> >+	struct ptr_ring ring;
> >+};
> >+
> >+/* Might be slightly faster than skb_array_full below, but callers invoking
> >+ * this in a loop must use a compiler barrier, for example cpu_relax().
> >+ */
> >+static inline bool __skb_array_full(struct skb_array *a)
> >+{
> >+	return __ptr_ring_full(&a->ring);
> >+}
> >+
> >+static inline bool skb_array_full(struct skb_array *a)
> >+{
> >+	return ptr_ring_full(&a->ring);
> >+}
> >+
> >+static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
> >+{
> >+	return ptr_ring_produce(&a->ring, skb);
> >+}
> >+
> >+static inline int skb_array_produce_irq(struct skb_array *a, struct sk_buff *skb)
> >+{
> >+	return ptr_ring_produce_irq(&a->ring, skb);
> >+}
> >+
> >+static inline int skb_array_produce_bh(struct skb_array *a, struct sk_buff *skb)
> >+{
> >+	return ptr_ring_produce_bh(&a->ring, skb);
> >+}
> >+
> >+static inline int skb_array_produce_any(struct skb_array *a, struct sk_buff *skb)
> >+{
> >+	return ptr_ring_produce_any(&a->ring, skb);
> >+}
> >+
> >+/* Might be slightly faster than skb_array_empty below, but callers invoking
> >+ * this in a loop must take care to use a compiler barrier, for example
> >+ * cpu_relax().
> >+ */
> >+static inline bool __skb_array_empty(struct skb_array *a)
> >+{
> >+	return !__ptr_ring_peek(&a->ring);
> >+}
> >+
> >+static inline bool skb_array_empty(struct skb_array *a)
> >+{
> >+	return ptr_ring_empty(&a->ring);
> >+}
> >+
> >+static inline struct sk_buff *skb_array_consume(struct skb_array *a)
> >+{
> >+	return ptr_ring_consume(&a->ring);
> >+}
> >+
> >+static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
> >+{
> >+	return ptr_ring_consume_irq(&a->ring);
> >+}
> >+
> >+static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
> >+{
> >+	return ptr_ring_consume_any(&a->ring);
> >+}
> >+
> >+static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
> >+{
> >+	return ptr_ring_consume_bh(&a->ring);
> >+}
> >+
> >+static inline int skb_array_peek_len(struct skb_array *a)
> >+{
> >+	return PTR_RING_PEEK_FIELD(&a->ring, struct sk_buff, len, 0);
> >+}
> 
> An interesting case for tun is peeking at the length of a tagged
> packet. In that case, len should be increased accordingly, so this
> simple field peeking won't work.
> 
> Do we need a callback here?

I'd say let's just make it do the right thing wrt vlans.
I'll post v7 with this change.
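
Roughly like this, I think (untested; needs linux/if_vlan.h for
skb_vlan_tag_present and VLAN_HLEN):

static inline int __skb_array_len_with_tag(struct sk_buff *skb)
{
	int len = skb->len;

	/* an out-of-band vlan tag adds VLAN_HLEN bytes on the wire */
	if (skb_vlan_tag_present(skb))
		len += VLAN_HLEN;

	return len;
}

plus peek_len variants that apply this helper instead of reading
->len directly.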



> >+
> >+static inline int skb_array_peek_len_irq(struct skb_array *a)
> >+{
> >+	return PTR_RING_PEEK_FIELD_IRQ(&a->ring, struct sk_buff, len, 0);
> >+}
> >+
> >+static inline int skb_array_peek_len_bh(struct skb_array *a)
> >+{
> >+	return PTR_RING_PEEK_FIELD_BH(&a->ring, struct sk_buff, len, 0);
> >+}
> >+
> >+static inline int skb_array_peek_len_any(struct skb_array *a)
> >+{
> >+	return PTR_RING_PEEK_FIELD_ANY(&a->ring, struct sk_buff, len, 0);
> >+}
> >+
> >+static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
> >+{
> >+	return ptr_ring_init(&a->ring, size, gfp);
> >+}
> >+
> >+static inline void skb_array_cleanup(struct skb_array *a)
> >+{
> >+	ptr_ring_cleanup(&a->ring);
> >+}
> >+
> >+#endif /* _LINUX_SKB_ARRAY_H  */



Thread overview: 8+ messages
2016-06-01 12:54 [PATCH v6 0/3] skb_array: array based FIFO for skbs Michael S. Tsirkin
2016-06-01 12:54 ` [PATCH v6 1/3] ptr_ring: array based FIFO for pointers Michael S. Tsirkin
2016-06-01 12:54 ` [PATCH v6 2/3] ptr_ring: ring test Michael S. Tsirkin
2016-06-01 12:54 ` [PATCH v6 3/3] skb_array: array based FIFO for skbs Michael S. Tsirkin
2016-06-02  8:17   ` Jason Wang
2016-06-02  9:27     ` Michael S. Tsirkin
2016-06-02  4:51 ` [PATCH v6 0/3] " David Miller
2016-06-02  8:13   ` Jason Wang
