linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v8 1/5] ptr_ring: array based FIFO for pointers
       [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
@ 2016-06-13 20:54 ` Michael S. Tsirkin
  2016-06-14 12:02   ` Jesper Dangaard Brouer
  2016-06-13 20:54 ` [PATCH v8 2/5] ptr_ring: ring test Michael S. Tsirkin
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-06-13 20:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Jason Wang, Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm

A simple array based FIFO of pointers.  Intended for net stack which
commonly has a single consumer/producer.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/ptr_ring.h | 264 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 264 insertions(+)
 create mode 100644 include/linux/ptr_ring.h

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
new file mode 100644
index 0000000..633406f
--- /dev/null
+++ b/include/linux/ptr_ring.h
@@ -0,0 +1,264 @@
+/*
+ *	Definitions for the 'struct ptr_ring' datastructure.
+ *
+ *	Author:
+ *		Michael S. Tsirkin <mst@redhat.com>
+ *
+ *	Copyright (C) 2016 Red Hat, Inc.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	This is a limited-size FIFO maintaining pointers in FIFO order, with
+ *	one CPU producing entries and another consuming entries from a FIFO.
+ *
+ *	This implementation tries to minimize cache-contention when there is a
+ *	single producer and a single consumer CPU.
+ */
+
+#ifndef _LINUX_PTR_RING_H
+#define _LINUX_PTR_RING_H 1
+
+#ifdef __KERNEL__
+#include <linux/spinlock.h>
+#include <linux/cache.h>
+#include <linux/types.h>
+#include <linux/compiler.h>
+#include <linux/cache.h>
+#include <linux/slab.h>
+#include <asm/errno.h>
+#endif
+
+struct ptr_ring {
+	int producer ____cacheline_aligned_in_smp;
+	spinlock_t producer_lock;
+	int consumer ____cacheline_aligned_in_smp;
+	spinlock_t consumer_lock;
+	/* Shared consumer/producer data */
+	/* Read-only by both the producer and the consumer */
+	int size ____cacheline_aligned_in_smp; /* max entries in queue */
+	void **queue;
+};
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ * Callers don't need to take producer lock - if they don't
+ * the next call to __ptr_ring_produce may fail.
+ */
+static inline bool __ptr_ring_full(struct ptr_ring *r)
+{
+	return r->queue[r->producer];
+}
+
+static inline bool ptr_ring_full(struct ptr_ring *r)
+{
+	barrier();
+	return __ptr_ring_full(r);
+}
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	if (__ptr_ring_full(r))
+		return -ENOSPC;
+
+	r->queue[r->producer++] = ptr;
+	if (unlikely(r->producer >= r->size))
+		r->producer = 0;
+	return 0;
+}
+
+static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock(&r->producer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock_irq(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_irq(&r->producer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&r->producer_lock, flags);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_irqrestore(&r->producer_lock, flags);
+
+	return ret;
+}
+
+static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	spin_lock_bh(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	spin_unlock_bh(&r->producer_lock);
+
+	return ret;
+}
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must take consumer_lock
+ * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
+ * There's no need for a lock if pointer is merely tested - see e.g.
+ * ptr_ring_empty.
+ */
+static inline void *__ptr_ring_peek(struct ptr_ring *r)
+{
+	return r->queue[r->consumer];
+}
+
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+	barrier();
+	return !__ptr_ring_peek(r);
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+{
+	r->queue[r->consumer++] = NULL;
+	if (unlikely(r->consumer >= r->size))
+		r->consumer = 0;
+}
+
+static inline void *__ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	ptr = __ptr_ring_peek(r);
+	if (ptr)
+		__ptr_ring_discard_one(r);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock(&r->consumer_lock);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock_irq(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_irq(&r->consumer_lock);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_any(struct ptr_ring *r)
+{
+	unsigned long flags;
+	void *ptr;
+
+	spin_lock_irqsave(&r->consumer_lock, flags);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+	return ptr;
+}
+
+static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
+{
+	void *ptr;
+
+	spin_lock_bh(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	spin_unlock_bh(&r->consumer_lock);
+
+	return ptr;
+}
+
+/* Cast to structure type and call a function without discarding from FIFO.
+ * Function must return a value.
+ * Callers must take consumer_lock.
+ */
+#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
+
+#define PTR_RING_PEEK_CALL(r, f) ({ \
+	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
+	\
+	spin_lock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
+	spin_unlock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v; \
+})
+
+#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
+	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
+	\
+	spin_lock_irq(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
+	spin_unlock_irq(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v; \
+})
+
+#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
+	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
+	\
+	spin_lock_bh(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
+	spin_unlock_bh(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v; \
+})
+
+#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
+	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
+	unsigned long __PTR_RING_PEEK_CALL_f;\
+	\
+	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
+	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
+	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
+	__PTR_RING_PEEK_CALL_v; \
+})
+
+static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
+{
+	r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES),
+			   gfp);
+	if (!r->queue)
+		return -ENOMEM;
+
+	r->size = size;
+	r->producer = r->consumer = 0;
+	spin_lock_init(&r->producer_lock);
+	spin_lock_init(&r->consumer_lock);
+
+	return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r)
+{
+	kfree(r->queue);
+}
+
+#endif /* _LINUX_PTR_RING_H  */
-- 
MST

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v8 2/5] ptr_ring: ring test
       [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
  2016-06-13 20:54 ` [PATCH v8 1/5] ptr_ring: array based FIFO for pointers Michael S. Tsirkin
@ 2016-06-13 20:54 ` Michael S. Tsirkin
  2016-06-13 20:54 ` [PATCH v8 3/5] skb_array: array based FIFO for skbs Michael S. Tsirkin
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-06-13 20:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Jason Wang, Eric Dumazet, davem, netdev, Steven Rostedt, brouer,
	kvm, virtualization

Add ringtest based unit test for ptr ring.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 tools/virtio/ringtest/ptr_ring.c | 192 +++++++++++++++++++++++++++++++++++++++
 tools/virtio/ringtest/Makefile   |   5 +-
 2 files changed, 196 insertions(+), 1 deletion(-)
 create mode 100644 tools/virtio/ringtest/ptr_ring.c

diff --git a/tools/virtio/ringtest/ptr_ring.c b/tools/virtio/ringtest/ptr_ring.c
new file mode 100644
index 0000000..74abd74
--- /dev/null
+++ b/tools/virtio/ringtest/ptr_ring.c
@@ -0,0 +1,192 @@
+#define _GNU_SOURCE
+#include "main.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <malloc.h>
+#include <assert.h>
+#include <errno.h>
+#include <limits.h>
+
+#define SMP_CACHE_BYTES 64
+#define cache_line_size() SMP_CACHE_BYTES
+#define ____cacheline_aligned_in_smp __attribute__ ((aligned (SMP_CACHE_BYTES)))
+#define unlikely(x)    (__builtin_expect(!!(x), 0))
+#define ALIGN(x, a) (((x) + (a) - 1) / (a) * (a))
+typedef pthread_spinlock_t  spinlock_t;
+
+typedef int gfp_t;
+static void *kzalloc(unsigned size, gfp_t gfp)
+{
+	void *p = memalign(64, size);
+	if (!p)
+		return p;
+	memset(p, 0, size);
+
+	return p;
+}
+
+static void kfree(void *p)
+{
+	if (p)
+		free(p);
+}
+
+static void spin_lock_init(spinlock_t *lock)
+{
+	int r = pthread_spin_init(lock, 0);
+	assert(!r);
+}
+
+static void spin_lock(spinlock_t *lock)
+{
+	int ret = pthread_spin_lock(lock);
+	assert(!ret);
+}
+
+static void spin_unlock(spinlock_t *lock)
+{
+	int ret = pthread_spin_unlock(lock);
+	assert(!ret);
+}
+
+static void spin_lock_bh(spinlock_t *lock)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_bh(spinlock_t *lock)
+{
+	spin_unlock(lock);
+}
+
+static void spin_lock_irq(spinlock_t *lock)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_irq(spinlock_t *lock)
+{
+	spin_unlock(lock);
+}
+
+static void spin_lock_irqsave(spinlock_t *lock, unsigned long f)
+{
+	spin_lock(lock);
+}
+
+static void spin_unlock_irqrestore(spinlock_t *lock, unsigned long f)
+{
+	spin_unlock(lock);
+}
+
+#include "../../../include/linux/ptr_ring.h"
+
+static unsigned long long headcnt, tailcnt;
+static struct ptr_ring array ____cacheline_aligned_in_smp;
+
+/* implemented by ring */
+void alloc_ring(void)
+{
+	int ret = ptr_ring_init(&array, ring_size, 0);
+	assert(!ret);
+}
+
+/* guest side */
+int add_inbuf(unsigned len, void *buf, void *datap)
+{
+	int ret;
+
+	ret = __ptr_ring_produce(&array, buf);
+	if (ret >= 0) {
+		ret = 0;
+		headcnt++;
+	}
+
+	return ret;
+}
+
+/*
+ * ptr_ring API provides no way for producer to find out whether a given
+ * buffer was consumed.  Our tests merely require that a successful get_buf
+ * implies that add_inbuf succeeded in the past and that a later add_inbuf
+ * will succeed, so fake it accordingly.
+ */
+void *get_buf(unsigned *lenp, void **bufp)
+{
+	void *datap;
+
+	if (tailcnt == headcnt || __ptr_ring_full(&array))
+		datap = NULL;
+	else {
+		datap = "Buffer\n";
+		++tailcnt;
+	}
+
+	return datap;
+}
+
+void poll_used(void)
+{
+	void *b;
+
+	do {
+		if (tailcnt == headcnt || __ptr_ring_full(&array)) {
+			b = NULL;
+			barrier();
+		} else {
+			b = "Buffer\n";
+		}
+	} while (!b);
+}
+
+void disable_call()
+{
+	assert(0);
+}
+
+bool enable_call()
+{
+	assert(0);
+}
+
+void kick_available(void)
+{
+	assert(0);
+}
+
+/* host side */
+void disable_kick()
+{
+	assert(0);
+}
+
+bool enable_kick()
+{
+	assert(0);
+}
+
+void poll_avail(void)
+{
+	void *b;
+
+	do {
+		barrier();
+		b = __ptr_ring_peek(&array);
+	} while (!b);
+}
+
+bool use_buf(unsigned *lenp, void **bufp)
+{
+	void *ptr;
+
+	ptr = __ptr_ring_consume(&array);
+
+	return ptr;
+}
+
+void call_used(void)
+{
+	assert(0);
+}
diff --git a/tools/virtio/ringtest/Makefile b/tools/virtio/ringtest/Makefile
index 6173ada..877a8a4 100644
--- a/tools/virtio/ringtest/Makefile
+++ b/tools/virtio/ringtest/Makefile
@@ -1,6 +1,6 @@
 all:
 
-all: ring virtio_ring_0_9 virtio_ring_poll virtio_ring_inorder noring
+all: ring virtio_ring_0_9 virtio_ring_poll virtio_ring_inorder ptr_ring noring
 
 CFLAGS += -Wall
 CFLAGS += -pthread -O2 -ggdb
@@ -8,6 +8,7 @@ LDFLAGS += -pthread -O2 -ggdb
 
 main.o: main.c main.h
 ring.o: ring.c main.h
+ptr_ring.o: ptr_ring.c main.h ../../../include/linux/ptr_ring.h
 virtio_ring_0_9.o: virtio_ring_0_9.c main.h
 virtio_ring_poll.o: virtio_ring_poll.c virtio_ring_0_9.c main.h
 virtio_ring_inorder.o: virtio_ring_inorder.c virtio_ring_0_9.c main.h
@@ -15,6 +16,7 @@ ring: ring.o main.o
 virtio_ring_0_9: virtio_ring_0_9.o main.o
 virtio_ring_poll: virtio_ring_poll.o main.o
 virtio_ring_inorder: virtio_ring_inorder.o main.o
+ptr_ring: ptr_ring.o main.o
 noring: noring.o main.o
 clean:
 	-rm main.o
@@ -22,6 +24,7 @@ clean:
 	-rm virtio_ring_0_9.o virtio_ring_0_9
 	-rm virtio_ring_poll.o virtio_ring_poll
 	-rm virtio_ring_inorder.o virtio_ring_inorder
+	-rm ptr_ring.o ptr_ring
 	-rm noring.o noring
 
 .PHONY: all clean
-- 
MST

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v8 3/5] skb_array: array based FIFO for skbs
       [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
  2016-06-13 20:54 ` [PATCH v8 1/5] ptr_ring: array based FIFO for pointers Michael S. Tsirkin
  2016-06-13 20:54 ` [PATCH v8 2/5] ptr_ring: ring test Michael S. Tsirkin
@ 2016-06-13 20:54 ` Michael S. Tsirkin
  2016-06-14 12:03   ` Jesper Dangaard Brouer
  2016-06-13 20:54 ` [PATCH v8 4/5] ptr_ring: resize support Michael S. Tsirkin
  2016-06-13 20:54 ` [PATCH v8 5/5] skb_array: " Michael S. Tsirkin
  4 siblings, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-06-13 20:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Jason Wang, Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm

A simple array based FIFO of pointers.  Intended for net stack so uses
skbs for type safety. Implemented as a set of wrappers around ptr_ring.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/skb_array.h | 144 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)
 create mode 100644 include/linux/skb_array.h

diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
new file mode 100644
index 0000000..c4c0902
--- /dev/null
+++ b/include/linux/skb_array.h
@@ -0,0 +1,144 @@
+/*
+ *	Definitions for the 'struct skb_array' datastructure.
+ *
+ *	Author:
+ *		Michael S. Tsirkin <mst@redhat.com>
+ *
+ *	Copyright (C) 2016 Red Hat, Inc.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	Limited-size FIFO of skbs. Can be used more or less whenever
+ *	sk_buff_head can be used, except you need to know the queue size in
+ *	advance.
+ *	Implemented as a type-safe wrapper around ptr_ring.
+ */
+
+#ifndef _LINUX_SKB_ARRAY_H
+#define _LINUX_SKB_ARRAY_H 1
+
+#ifdef __KERNEL__
+#include <linux/ptr_ring.h>
+#include <linux/skbuff.h>
+#include <linux/if_vlan.h>
+#endif
+
+struct skb_array {
+	struct ptr_ring ring;
+};
+
+/* Might be slightly faster than skb_array_full below, but callers invoking
+ * this in a loop must use a compiler barrier, for example cpu_relax().
+ */
+static inline bool __skb_array_full(struct skb_array *a)
+{
+	return __ptr_ring_full(&a->ring);
+}
+
+static inline bool skb_array_full(struct skb_array *a)
+{
+	return ptr_ring_full(&a->ring);
+}
+
+static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce(&a->ring, skb);
+}
+
+static inline int skb_array_produce_irq(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_irq(&a->ring, skb);
+}
+
+static inline int skb_array_produce_bh(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_bh(&a->ring, skb);
+}
+
+static inline int skb_array_produce_any(struct skb_array *a, struct sk_buff *skb)
+{
+	return ptr_ring_produce_any(&a->ring, skb);
+}
+
+/* Might be slightly faster than skb_array_empty below, but callers invoking
+ * this in a loop must take care to use a compiler barrier, for example
+ * cpu_relax().
+ */
+static inline bool __skb_array_empty(struct skb_array *a)
+{
+	return !__ptr_ring_peek(&a->ring);
+}
+
+static inline bool skb_array_empty(struct skb_array *a)
+{
+	return ptr_ring_empty(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume(struct skb_array *a)
+{
+	return ptr_ring_consume(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
+{
+	return ptr_ring_consume_irq(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
+{
+	return ptr_ring_consume_any(&a->ring);
+}
+
+static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
+{
+	return ptr_ring_consume_bh(&a->ring);
+}
+
+static inline int __skb_array_len_with_tag(struct sk_buff *skb)
+{
+	if (likely(skb)) {
+		int len = skb->len;
+
+		if (skb_vlan_tag_present(skb))
+			len += VLAN_HLEN;
+
+		return len;
+	} else {
+		return 0;
+	}
+}
+
+static inline int skb_array_peek_len(struct skb_array *a)
+{
+	return PTR_RING_PEEK_CALL(&a->ring, __skb_array_len_with_tag);
+}
+
+static inline int skb_array_peek_len_irq(struct skb_array *a)
+{
+	return PTR_RING_PEEK_CALL_IRQ(&a->ring, __skb_array_len_with_tag);
+}
+
+static inline int skb_array_peek_len_bh(struct skb_array *a)
+{
+	return PTR_RING_PEEK_CALL_BH(&a->ring, __skb_array_len_with_tag);
+}
+
+static inline int skb_array_peek_len_any(struct skb_array *a)
+{
+	return PTR_RING_PEEK_CALL_ANY(&a->ring, __skb_array_len_with_tag);
+}
+
+static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
+{
+	return ptr_ring_init(&a->ring, size, gfp);
+}
+
+static inline void skb_array_cleanup(struct skb_array *a)
+{
+	ptr_ring_cleanup(&a->ring);
+}
+
+#endif /* _LINUX_SKB_ARRAY_H  */
-- 
MST

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v8 4/5] ptr_ring: resize support
       [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
                   ` (2 preceding siblings ...)
  2016-06-13 20:54 ` [PATCH v8 3/5] skb_array: array based FIFO for skbs Michael S. Tsirkin
@ 2016-06-13 20:54 ` Michael S. Tsirkin
  2016-06-14 12:04   ` Jesper Dangaard Brouer
  2016-06-13 20:54 ` [PATCH v8 5/5] skb_array: " Michael S. Tsirkin
  4 siblings, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-06-13 20:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Jason Wang, Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm

This adds ring resize support. Seems to be necessary as
users such as tun allow userspace control over queue size.

If resize is used, this costs us ability to peek at queue without
consumer lock - should not be a big deal as peek and consumer are
usually run on the same CPU.

If ring is made bigger, ring contents is preserved.  If ring is made
smaller, extra pointers are passed to an optional destructor callback.

Cleanup function also gains destructor callback such that
all pointers in queue can be cleaned up.

This changes some APIs but we don't have any users yet,
so it won't break bisect.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/ptr_ring.h | 157 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 143 insertions(+), 14 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 633406f..562a65e 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -43,9 +43,9 @@ struct ptr_ring {
 };
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax().
- * Callers don't need to take producer lock - if they don't
- * the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax().  If ring is ever resized, callers must hold
+ * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
+ * producer_lock, the next call to __ptr_ring_produce may fail.
  */
 static inline bool __ptr_ring_full(struct ptr_ring *r)
 {
@@ -54,16 +54,55 @@ static inline bool __ptr_ring_full(struct ptr_ring *r)
 
 static inline bool ptr_ring_full(struct ptr_ring *r)
 {
-	barrier();
-	return __ptr_ring_full(r);
+	bool ret;
+
+	spin_lock(&r->producer_lock);
+	ret = __ptr_ring_full(r);
+	spin_unlock(&r->producer_lock);
+
+	return ret;
+}
+
+static inline bool ptr_ring_full_irq(struct ptr_ring *r)
+{
+	bool ret;
+
+	spin_lock_irq(&r->producer_lock);
+	ret = __ptr_ring_full(r);
+	spin_unlock_irq(&r->producer_lock);
+
+	return ret;
+}
+
+static inline bool ptr_ring_full_any(struct ptr_ring *r)
+{
+	unsigned long flags;
+	bool ret;
+
+	spin_lock_irqsave(&r->producer_lock, flags);
+	ret = __ptr_ring_full(r);
+	spin_unlock_irqrestore(&r->producer_lock, flags);
+
+	return ret;
+}
+
+static inline bool ptr_ring_full_bh(struct ptr_ring *r)
+{
+	bool ret;
+
+	spin_lock_bh(&r->producer_lock);
+	ret = __ptr_ring_full(r);
+	spin_unlock_bh(&r->producer_lock);
+
+	return ret;
 }
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax().
+ * for example cpu_relax(). Callers must hold producer_lock.
  */
 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 {
-	if (__ptr_ring_full(r))
+	if (r->queue[r->producer])
 		return -ENOSPC;
 
 	r->queue[r->producer++] = ptr;
@@ -120,20 +159,68 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 /* Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax(). Callers must take consumer_lock
  * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * There's no need for a lock if pointer is merely tested - see e.g.
- * ptr_ring_empty.
+ * If ring is never resized, and if the pointer is merely
+ * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.
  */
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
 	return r->queue[r->consumer];
 }
 
-static inline bool ptr_ring_empty(struct ptr_ring *r)
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must take consumer_lock
+ * if the ring is ever resized - see e.g. ptr_ring_empty.
+ */
+static inline bool __ptr_ring_empty(struct ptr_ring *r)
 {
-	barrier();
 	return !__ptr_ring_peek(r);
 }
 
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+	bool ret;
+
+	spin_lock(&r->consumer_lock);
+	ret = __ptr_ring_empty(r);
+	spin_unlock(&r->consumer_lock);
+
+	return ret;
+}
+
+static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
+{
+	bool ret;
+
+	spin_lock_irq(&r->consumer_lock);
+	ret = __ptr_ring_empty(r);
+	spin_unlock_irq(&r->consumer_lock);
+
+	return ret;
+}
+
+static inline bool ptr_ring_empty_any(struct ptr_ring *r)
+{
+	unsigned long flags;
+	bool ret;
+
+	spin_lock_irqsave(&r->consumer_lock, flags);
+	ret = __ptr_ring_empty(r);
+	spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+	return ret;
+}
+
+static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
+{
+	bool ret;
+
+	spin_lock_bh(&r->consumer_lock);
+	ret = __ptr_ring_empty(r);
+	spin_unlock_bh(&r->consumer_lock);
+
+	return ret;
+}
+
 /* Must only be called after __ptr_ring_peek returned !NULL */
 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 {
@@ -241,10 +328,14 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
 	__PTR_RING_PEEK_CALL_v; \
 })
 
+static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
+{
+	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
+}
+
 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 {
-	r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES),
-			   gfp);
+	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
 	if (!r->queue)
 		return -ENOMEM;
 
@@ -256,8 +347,46 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 	return 0;
 }
 
-static inline void ptr_ring_cleanup(struct ptr_ring *r)
+static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
+				  void (*destroy)(void *))
+{
+	unsigned long flags;
+	int producer = 0;
+	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
+	void **old;
+	void *ptr;
+
+	if (!queue)
+		return -ENOMEM;
+	/* Take both locks so nobody can see a half-swapped ring */
+	spin_lock_irqsave(&(r)->consumer_lock, flags);
+	spin_lock(&(r)->producer_lock);
+	while ((ptr = __ptr_ring_consume(r)))
+		if (producer < size)
+			queue[producer++] = ptr;
+		else if (destroy)
+			destroy(ptr);
+	/* If the new ring is exactly full, the producer index wraps to 0 */
+	r->size = size;
+	r->producer = producer < size ? producer : 0;
+	r->consumer = 0;
+	old = r->queue;
+	r->queue = queue;
+
+	spin_unlock(&(r)->producer_lock);
+	spin_unlock_irqrestore(&(r)->consumer_lock, flags);
+	kfree(old);
+
+	return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
 {
+	void *ptr;
+
+	if (destroy)
+		while ((ptr = ptr_ring_consume(r)))
+			destroy(ptr);
 	kfree(r->queue);
 }
 
-- 
MST

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [PATCH v8 5/5] skb_array: resize support
       [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
                   ` (3 preceding siblings ...)
  2016-06-13 20:54 ` [PATCH v8 4/5] ptr_ring: resize support Michael S. Tsirkin
@ 2016-06-13 20:54 ` Michael S. Tsirkin
  2016-06-14 12:21   ` Jesper Dangaard Brouer
  4 siblings, 1 reply; 9+ messages in thread
From: Michael S. Tsirkin @ 2016-06-13 20:54 UTC (permalink / raw)
  To: linux-kernel
  Cc: Jason Wang, Eric Dumazet, davem, netdev, Steven Rostedt, brouer, kvm

Update skb_array after ptr_ring API changes.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 include/linux/skb_array.h | 33 +++++++++++++++++++++++++++++----
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
index c4c0902..678bfbf 100644
--- a/include/linux/skb_array.h
+++ b/include/linux/skb_array.h
@@ -63,9 +63,9 @@ static inline int skb_array_produce_any(struct skb_array *a, struct sk_buff *skb
 	return ptr_ring_produce_any(&a->ring, skb);
 }
 
-/* Might be slightly faster than skb_array_empty below, but callers invoking
- * this in a loop must take care to use a compiler barrier, for example
- * cpu_relax().
+/* Might be slightly faster than skb_array_empty below, but only safe if the
+ * array is never resized. Also, callers invoking this in a loop must take care
+ * to use a compiler barrier, for example cpu_relax().
  */
 static inline bool __skb_array_empty(struct skb_array *a)
 {
@@ -77,6 +77,21 @@ static inline bool skb_array_empty(struct skb_array *a)
 	return ptr_ring_empty(&a->ring);
 }
 
+static inline bool skb_array_empty_bh(struct skb_array *a)
+{
+	return ptr_ring_empty_bh(&a->ring);
+}
+
+static inline bool skb_array_empty_irq(struct skb_array *a)
+{
+	return ptr_ring_empty_irq(&a->ring);
+}
+
+static inline bool skb_array_empty_any(struct skb_array *a)
+{
+	return ptr_ring_empty_any(&a->ring);
+}
+
 static inline struct sk_buff *skb_array_consume(struct skb_array *a)
 {
 	return ptr_ring_consume(&a->ring);
@@ -136,9 +151,19 @@ static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
 	return ptr_ring_init(&a->ring, size, gfp);
 }
 
+static inline void __skb_array_destroy_skb(void *ptr)
+{
+	kfree_skb(ptr);
+}
+
+static inline int skb_array_resize(struct skb_array *a, int size, gfp_t gfp)
+{
+	return ptr_ring_resize(&a->ring, size, gfp, __skb_array_destroy_skb);
+}
+
 static inline void skb_array_cleanup(struct skb_array *a)
 {
-	ptr_ring_cleanup(&a->ring);
+	ptr_ring_cleanup(&a->ring, __skb_array_destroy_skb);
 }
 
 #endif /* _LINUX_SKB_ARRAY_H  */
-- 
MST

^ permalink raw reply related	[flat|nested] 9+ messages in thread

* Re: [PATCH v8 1/5] ptr_ring: array based FIFO for pointers
  2016-06-13 20:54 ` [PATCH v8 1/5] ptr_ring: array based FIFO for pointers Michael S. Tsirkin
@ 2016-06-14 12:02   ` Jesper Dangaard Brouer
  0 siblings, 0 replies; 9+ messages in thread
From: Jesper Dangaard Brouer @ 2016-06-14 12:02 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: linux-kernel, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, kvm, brouer

On Mon, 13 Jun 2016 23:54:31 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> A simple array based FIFO of pointers.  Intended for net stack which
> commonly has a single consumer/producer.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

Acked-by: Jesper Dangaard Brouer <brouer@redhat.com>

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v8 3/5] skb_array: array based FIFO for skbs
  2016-06-13 20:54 ` [PATCH v8 3/5] skb_array: array based FIFO for skbs Michael S. Tsirkin
@ 2016-06-14 12:03   ` Jesper Dangaard Brouer
  0 siblings, 0 replies; 9+ messages in thread
From: Jesper Dangaard Brouer @ 2016-06-14 12:03 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: linux-kernel, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, kvm, brouer

On Mon, 13 Jun 2016 23:54:41 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> A simple array based FIFO of pointers.  Intended for net stack so uses
> skbs for type safety. Implemented as a set of wrappers around ptr_ring.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

Acked-by: Jesper Dangaard Brouer <brouer@redhat.com>
Tested-by: Jesper Dangaard Brouer <brouer@redhat.com>

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v8 4/5] ptr_ring: resize support
  2016-06-13 20:54 ` [PATCH v8 4/5] ptr_ring: resize support Michael S. Tsirkin
@ 2016-06-14 12:04   ` Jesper Dangaard Brouer
  0 siblings, 0 replies; 9+ messages in thread
From: Jesper Dangaard Brouer @ 2016-06-14 12:04 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: linux-kernel, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, kvm, brouer

On Mon, 13 Jun 2016 23:54:45 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> This adds ring resize support. Seems to be necessary as
> users such as tun allow userspace control over queue size.
> 
> If resize is used, this costs us ability to peek at queue without
> consumer lock - should not be a big deal as peek and consumer are
> usually run on the same CPU.
> 
> If ring is made bigger, ring contents is preserved.  If ring is made
> smaller, extra pointers are passed to an optional destructor callback.
> 
> Cleanup function also gains destructor callback such that
> all pointers in queue can be cleaned up.
> 
> This changes some APIs but we don't have any users yet,
> so it won't break bisect.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

Acked-by: Jesper Dangaard Brouer <brouer@redhat.com>

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v8 5/5] skb_array: resize support
  2016-06-13 20:54 ` [PATCH v8 5/5] skb_array: " Michael S. Tsirkin
@ 2016-06-14 12:21   ` Jesper Dangaard Brouer
  0 siblings, 0 replies; 9+ messages in thread
From: Jesper Dangaard Brouer @ 2016-06-14 12:21 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: linux-kernel, Jason Wang, Eric Dumazet, davem, netdev,
	Steven Rostedt, kvm, brouer

On Mon, 13 Jun 2016 23:54:50 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> Update skb_array after ptr_ring API changes.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

Acked-by: Jesper Dangaard Brouer <brouer@redhat.com>
Tested-by: Jesper Dangaard Brouer <brouer@redhat.com>

Also did resize unit test:
 https://github.com/netoptimizer/prototype-kernel/commit/af0b4d7e7261e9

The parallel benchmark:
 https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/lib/skb_array_parallel01.c

has been adjusted to use the non-BH variant of the lock.  Below are the
parallel benchmark results, where a single producer CPU and a single
consumer CPU run concurrently and the queue is always partly full
(the optimal case for minimizing cache contention):

On CPU i7-4790K @ 4.00GHz:
 - Enqueue 32 cycles(tsc) 8.162 ns 
 - Dequeue 33 cycles(tsc) 8.417 ns

Notice these are extremely good concurrency results, as they are very
close to the optimal-case benchmark of 26 cycles, measured when running
enqueue+dequeue on the same CPU in a tight loop[2].

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

[2] https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/lib/skb_array_bench01.c

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2016-06-14 12:21 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <1465851234-13558-1-git-send-email-mst@redhat.com>
2016-06-13 20:54 ` [PATCH v8 1/5] ptr_ring: array based FIFO for pointers Michael S. Tsirkin
2016-06-14 12:02   ` Jesper Dangaard Brouer
2016-06-13 20:54 ` [PATCH v8 2/5] ptr_ring: ring test Michael S. Tsirkin
2016-06-13 20:54 ` [PATCH v8 3/5] skb_array: array based FIFO for skbs Michael S. Tsirkin
2016-06-14 12:03   ` Jesper Dangaard Brouer
2016-06-13 20:54 ` [PATCH v8 4/5] ptr_ring: resize support Michael S. Tsirkin
2016-06-14 12:04   ` Jesper Dangaard Brouer
2016-06-13 20:54 ` [PATCH v8 5/5] skb_array: " Michael S. Tsirkin
2016-06-14 12:21   ` Jesper Dangaard Brouer

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).