DPDK-dev Archive on lore.kernel.org
 help / color / Atom feed
* [RFC,v2] lfring: lock-free ring buffer
@ 2019-01-30 16:36 Ola Liljedahl
  2019-06-05 18:21 ` [dpdk-dev] " Eads, Gage
  0 siblings, 1 reply; 4+ messages in thread
From: Ola Liljedahl @ 2019-01-30 16:36 UTC (permalink / raw)
  To: dev, Gage Eads, Honnappa Nagarahalli, bruce.richardson; +Cc: nd, Ola Liljedahl

Lock-free MP/MC ring buffer with SP/SC options.
The ring buffer metadata only maintains one set of head and tail
indexes. Tail is updated on enqueue, head is updated on dequeue.
The ring slots between head and tail always contains valid (unconsumed)
slots.
Each ring slot consists of a struct of data pointer ("ptr") and lap
counter ("lap"). Slot.lap is only updated on enqueue with the new lap
counter (ring index / ring size). The slot lap conuter is used to ensure
that slots are written sequentially (important for FIFO ordering). It is
also used to synchronise multiple producers.

MP enqueue scans the ring from the tail until head+size, looking for an
empty slot as indicated by slot.lap equaling the lap counter of
the previous lap ((index - ring size) / ring size). The empty slot is
written (.ptr = data, .lap = index / ring size) using CAS. If CAS fails,
some other thread wrote the slot and the thread continues with the next slot.
There is some logic to detect that the thread has fallen behind and should
reload the shared tail index instead of just trying the next slot.

When all elements have been enqueued or if the ring buffer is full, the
thread updates the ring buffer tail index (unless this has not already
been updated by some other thread). The test is using "sequence number
arithmetic". Thus this thread will help other threads that have written
ring slots but not yet updated the shared tail index.

If a slot with a lap counter different from current or previous lap is
found, it is assumed that the thread has fallen behind (e.g. been preempted)
and the local tail index is (conditionally) updated from the ring buffer
metadata in order to quickly catch up.

MP dequeue ensures at least N elements are available and reads the slots
from the shared head index and attempts to commit the operation using
CAS on the shared head index.

Enqueue N elements: N+1 CAS operations (but the last CAS operation might
not be necessary during contention as producers will help each other)
Dequeue N elements: 1 CAS operation

Single-threaded (SP/SC) enqueue and dequeue operations will inter-operate
with MT-safe (MP/MC) operations but it is the responsibility of the user
to ensure single-threaded operations are safe to use.

128-bit lock-free atomic compare exchange operations for AArch64
(ARMv8.0) and x86-64 are provided in lockfree16.h. I expect these
functions to find a different home and possibly a different API.
Note that a 'frail' version atomic_compare_exchange is implemented,
this means that in the case of failure, the old value returned is
not guaranteed to be atomically read (as this is not possible on ARMv8.0
without also writing to the location). Atomically read values are
often not necessary, it is a successful compare and exchange operation
which provides atomicity.

Changes from v1:
* Fix check-patch warnings
* Use lap counter instead of ring index in ring slots
* Support for 32-bit architectures
* Prepend '_' to internal functions in rte_lfring.h
* Change name of public MP/MC/SP/SC bulk enqueue/dequeue functions to
  match rte_ring pattern
* Perform only fixed amount of retries in enqueue before reloading
  shared ring buffer tail index to (hopefully) quicker catch up
* Reload shared ring buffer tail on every iteration in dequeue in order
  to avoid spurious queue empty situations
* Dequeue requires nelems element to be available in order to do bulk
  dequeue.
* Use rte_smp_rmb() + atomic store (or cas)-relaxed instead of store
  (or cas)-release in dequeue functions as we only read shared data.
  This avoids ordering stores to private data.

Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
---
 config/common_base                       |   5 +
 lib/Makefile                             |   2 +
 lib/librte_lfring/Makefile               |  22 ++
 lib/librte_lfring/lockfree16.h           | 143 +++++++
 lib/librte_lfring/meson.build            |   6 +
 lib/librte_lfring/rte_lfring.c           | 264 +++++++++++++
 lib/librte_lfring/rte_lfring.h           | 621 +++++++++++++++++++++++++++++++
 lib/librte_lfring/rte_lfring_version.map |  19 +
 8 files changed, 1082 insertions(+)
 create mode 100644 lib/librte_lfring/Makefile
 create mode 100644 lib/librte_lfring/lockfree16.h
 create mode 100644 lib/librte_lfring/meson.build
 create mode 100644 lib/librte_lfring/rte_lfring.c
 create mode 100644 lib/librte_lfring/rte_lfring.h
 create mode 100644 lib/librte_lfring/rte_lfring_version.map

diff --git a/config/common_base b/config/common_base
index 7c6da51..375f9c3 100644
--- a/config/common_base
+++ b/config/common_base
@@ -719,6 +719,11 @@ CONFIG_RTE_LIBRTE_PMD_IFPGA_RAWDEV=y
 CONFIG_RTE_LIBRTE_RING=y
 
 #
+# Compile librte_lfring
+#
+CONFIG_RTE_LIBRTE_LFRING=y
+
+#
 # Compile librte_mempool
 #
 CONFIG_RTE_LIBRTE_MEMPOOL=y
diff --git a/lib/Makefile b/lib/Makefile
index d6239d2..cd46339 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
 DEPDIRS-librte_pci := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring
 DEPDIRS-librte_ring := librte_eal
+DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring
+DEPDIRS-librte_lfring := librte_eal
 DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool
 DEPDIRS-librte_mempool := librte_eal librte_ring
 DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf
diff --git a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile
new file mode 100644
index 0000000..c04d2e9
--- /dev/null
+++ b/lib/librte_lfring/Makefile
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_lfring.a
+
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_lfring_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
new file mode 100644
index 0000000..199add9
--- /dev/null
+++ b/lib/librte_lfring/lockfree16.h
@@ -0,0 +1,143 @@
+//Copyright (c) 2018-2019, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#ifndef _LOCKFREE16_H
+#define _LOCKFREE16_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE)
+#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || \
+		     (mo) == __ATOMIC_ACQ_REL || \
+		     (mo) == __ATOMIC_SEQ_CST)
+
+#define MO_LOAD(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED)
+#define MO_STORE(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED)
+
+#if defined __aarch64__
+
+static inline __int128 ldx128(const __int128 *var, int mm)
+{
+	__int128 old;
+	if (mm == __ATOMIC_ACQUIRE)
+		__asm volatile("ldaxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("ldxp %0, %H0, [%1]"
+				: "=&r" (old)
+				: "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t stx128(__int128 *var, __int128 neu, int mm)
+{
+	uint32_t ret;
+	if (mm == __ATOMIC_RELEASE)
+		__asm volatile("stlxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: "memory");
+	else if (mm == __ATOMIC_RELAXED)
+		__asm volatile("stxp %w0, %1, %H1, [%2]"
+				: "=&r" (ret)
+				: "r" (neu), "r" (var)
+				: );
+	else
+		__builtin_trap();
+	return ret;
+}
+
+//Weak compare-exchange and non-atomic load on failure
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_success,
+				   int mo_failure)
+{
+	(void)mo_failure;//Ignore memory ordering for failure, memory order for
+	//success must be stronger or equal
+	int ldx_mo = MO_LOAD(mo_success);
+	int stx_mo = MO_STORE(mo_success);
+	register __int128 expected = *exp;
+	__asm __volatile("" ::: "memory");
+	//Atomicity of LDXP is not guaranteed
+	register __int128 old = ldx128(var, ldx_mo);
+	if (old == expected && !stx128(var, neu, stx_mo)) {
+		//Right value and STX succeeded
+		__asm __volatile("" ::: "memory");
+		return 1;
+	}
+	__asm __volatile("" ::: "memory");
+	//Wrong value or STX failed
+	*exp = old;//Old possibly torn value (OK for 'frail' flavour)
+	return 0;//Failure, *exp updated
+}
+
+#elif defined __x86_64__
+
+union u128 {
+	struct {
+		uint64_t lo, hi;
+	} s;
+	__int128 ui;
+};
+
+static inline bool
+cmpxchg16b(__int128 *src, union u128 *cmp, union u128 with)
+{
+	bool result;
+	__asm__ __volatile__
+		("lock cmpxchg16b %1\n\tsetz %0"
+		 : "=q" (result), "+m" (*src), "+d" (cmp->s.hi), "+a" (cmp->s.lo)
+		 : "c" (with.s.hi), "b" (with.s.lo)
+		 : "cc", "memory"
+		);
+	return result;
+}
+
+static inline bool
+lockfree_compare_exchange_16(register __int128 *var,
+			     __int128 *exp,
+			     register __int128 neu,
+			     bool weak,
+			     int mo_success,
+			     int mo_failure)
+{
+	(void)weak;
+	(void)mo_success;
+	(void)mo_failure;
+	union u128 cmp, with;
+	cmp.ui = *exp;
+	with.ui = neu;
+	if (cmpxchg16b(var, &cmp, with))
+		return true;
+	*exp = cmp.ui;
+	return false;
+}
+
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+				   __int128 *exp,
+				   register __int128 neu,
+				   int mo_s,
+				   int mo_f)
+{
+	return lockfree_compare_exchange_16(var, exp, neu, true, mo_s, mo_f);
+}
+
+#else
+
+#error Unsupported architecture
+
+#endif
+
+#endif
diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build
new file mode 100644
index 0000000..c8b90cb
--- /dev/null
+++ b/lib/librte_lfring/meson.build
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+version = 1
+sources = files('rte_lfring.c')
+headers = files('rte_lfring.h','lockfree16.h')
diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
new file mode 100644
index 0000000..e130f71
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.c
@@ -0,0 +1,264 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_spinlock.h>
+
+#include "rte_lfring.h"
+
+TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_lfring_tailq = {
+	.name = RTE_TAILQ_LFRING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_lfring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_lfring_get_memsize(unsigned int count)
+{
+	ssize_t sz;
+
+	/* count must be a power of 2 */
+	if ((!POWEROF2(count)) || (count > 0x80000000U)) {
+		RTE_LOG(ERR, RING,
+			"Requested size is invalid, must be power of 2, and "
+			"do not exceed the size limit %u\n", 0x80000000U);
+		return -EINVAL;
+	}
+
+	sz = sizeof(struct rte_lfring) + count * sizeof(struct rte_lfr_element);
+	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+	return sz;
+}
+
+int
+rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+		unsigned int flags)
+{
+	int ret;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(struct rte_lfring) &
+			  RTE_CACHE_LINE_MASK) != 0);
+
+	/* init the ring structure */
+	memset(r, 0, sizeof(*r));
+	ret = snprintf(r->name, sizeof(r->name), "%s", name);
+	if (ret < 0 || ret >= (int)sizeof(r->name))
+		return -ENAMETOOLONG;
+	r->flags = flags;
+
+	r->size = rte_align32pow2(count);
+	r->mask = r->size - 1;
+	r->log2 = rte_log2_u64(r->size);
+	r->head = 0;
+	r->tail = 0;
+	for (uint64_t i = 0; i < r->size; i++) {
+		r->ring[i].ptr = NULL;
+		r->ring[i].lap = (i - r->size) >> r->log2;
+	}
+
+	return 0;
+}
+
+/* create the ring */
+struct rte_lfring *
+rte_lfring_create(const char *name, unsigned int count, int socket_id,
+		  unsigned int flags)
+{
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	struct rte_lfring *r;
+	struct rte_tailq_entry *te;
+	const struct rte_memzone *mz;
+	ssize_t ring_size;
+	int mz_flags = 0;
+	struct rte_lfring_list *ring_list = NULL;
+	const unsigned int requested_count = count;
+	int ret;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	count = rte_align32pow2(count);
+
+	ring_size = rte_lfring_get_memsize(count);
+	if (ring_size < 0) {
+		rte_errno = ring_size;
+		return NULL;
+	}
+
+	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+		RTE_LFRING_MZ_PREFIX, name);
+	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+		rte_errno = ENAMETOOLONG;
+		return NULL;
+	}
+
+	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
+	if (te == NULL) {
+		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* reserve a memory zone for this ring. If we can't get rte_config or
+	 * we are secondary process, the memzone_reserve function will set
+	 * rte_errno for us appropriately - hence no check in this this
+	 * function
+	 */
+	mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
+					 mz_flags, __alignof__(*r));
+	if (mz != NULL) {
+		r = mz->addr;
+		/* no need to check return value here, we already checked the
+		 * arguments above
+		 */
+		rte_lfring_init(r, name, requested_count, flags);
+
+		te->data = (void *) r;
+		r->memzone = mz;
+
+		TAILQ_INSERT_TAIL(ring_list, te, next);
+	} else {
+		r = NULL;
+		RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+		rte_free(te);
+	}
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return r;
+}
+
+/* free the ring */
+void
+rte_lfring_free(struct rte_lfring *r)
+{
+	struct rte_lfring_list *ring_list = NULL;
+	struct rte_tailq_entry *te;
+
+	if (r == NULL)
+		return;
+
+	/*
+	 * Ring was not created with rte_lfring_create,
+	 * therefore, there is no memzone to free.
+	 */
+	if (r->memzone == NULL) {
+		RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_lfring_create()");
+		return;
+	}
+
+	if (rte_memzone_free(r->memzone) != 0) {
+		RTE_LOG(ERR, RING, "Cannot free memory\n");
+		return;
+	}
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	/* find out tailq entry */
+	TAILQ_FOREACH(te, ring_list, next) {
+		if (te->data == (void *) r)
+			break;
+	}
+
+	if (te == NULL) {
+		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+		return;
+	}
+
+	TAILQ_REMOVE(ring_list, te, next);
+
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	rte_free(te);
+}
+
+/* dump the status of the ring on the console */
+void
+rte_lfring_dump(FILE *f, const struct rte_lfring *r)
+{
+	fprintf(f, "ring <%s>@%p\n", r->name, r);
+	fprintf(f, "  flags=%x\n", r->flags);
+	fprintf(f, "  size=%"PRIu64"\n", r->size);
+	fprintf(f, "  tail=%"PRIu64"\n", r->tail);
+	fprintf(f, "  head=%"PRIu64"\n", r->head);
+	fprintf(f, "  used=%u\n", rte_lfring_count(r));
+	fprintf(f, "  avail=%u\n", rte_lfring_free_count(r));
+}
+
+/* dump the status of all rings on the console */
+void
+rte_lfring_list_dump(FILE *f)
+{
+	const struct rte_tailq_entry *te;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		rte_lfring_dump(f, (struct rte_lfring *) te->data);
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/* search a ring from its name */
+struct rte_lfring *
+rte_lfring_lookup(const char *name)
+{
+	struct rte_tailq_entry *te;
+	struct rte_lfring *r = NULL;
+	struct rte_lfring_list *ring_list;
+
+	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+	TAILQ_FOREACH(te, ring_list, next) {
+		r = (struct rte_lfring *) te->data;
+		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
+			break;
+	}
+
+	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	if (te == NULL) {
+		rte_errno = ENOENT;
+		return NULL;
+	}
+
+	return r;
+}
diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
new file mode 100644
index 0000000..22d3e1c
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring.h
@@ -0,0 +1,621 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#ifndef _RTE_LFRING_H_
+#define _RTE_LFRING_H_
+
+/**
+ * @file
+ * RTE Lfring
+ *
+ * The Lfring Manager is a fixed-size queue, implemented as a table of
+ * (pointers + counters).
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lock-free implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "lockfree16.h"
+
+#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
+
+#define RTE_LFRING_MZ_PREFIX "RG_"
+/**< The maximum length of an lfring name. */
+#define RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+struct rte_lfr_element {
+	void *ptr;
+	uintptr_t lap;
+};
+
+#if __SIZEOF_POINTER__ == 8
+typedef __int128 dintptr_t;
+#elif __SIZEOF_POINTER__ == 4
+typedef uint64_t dintptr_t;
+#else
+#error
+#endif
+
+/**
+ * An RTE lfring structure.
+ *
+ */
+struct rte_lfring {
+	char name[RTE_LFRING_NAMESIZE] __rte_cache_aligned; /**< Name of the lfring. */
+	const struct rte_memzone *memzone;
+			/**< Memzone, if any, containing the rte_lfring */
+
+	char pad0 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by producer. */
+	uint64_t tail __rte_cache_aligned;
+	uint64_t size;	   /**< Size of ring. */
+	uint64_t mask;	   /**< Mask (size-1) of lfring. */
+	uint32_t log2;	   /**< log2 size. */
+	uint32_t flags;	   /**< Flags supplied at creation. */
+	char pad1 __rte_cache_aligned; /**< empty cache line */
+
+	/** Modified by consumer. */
+	uint64_t head __rte_cache_aligned;
+	char pad2 __rte_cache_aligned; /**< empty cache line */
+
+	struct rte_lfr_element ring[] __rte_cache_aligned;
+};
+
+#define LFRING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define LFRING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+
+/**
+ * Calculate the memory size needed for a lfring
+ *
+ * This function returns the number of bytes needed for a lfring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_lfring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @return
+ *   - The memory size needed for the lfring on success.
+ *   - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_lfring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a lfring structure.
+ *
+ * Initialize a lfring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the lfring structure and the
+ * object table. It is advised to use rte_lfring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The lfring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable lfring size is
+ * *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ *   The pointer to the lfring structure followed by the objects table.
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The number of elements in the lfring (must be a power of 2).
+ * @param flags
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+		    unsigned int flags);
+
+/**
+ * Create a new lfring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_lfring_init() to initialize an empty lfring.
+ *
+ * The new lfring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable lfring size
+ * is *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is added in RTE_TAILQ_LFRING list.
+ *
+ * @param name
+ *   The name of the lfring.
+ * @param count
+ *   The size of the lfring (must be a power of 2).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   An OR of the following:
+ *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
+ *      is "single-producer". Otherwise, it is "multi-producers".
+ *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
+ *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
+ *      is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ *   On success, the pointer to the new allocated lfring. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ *    - E_RTE_SECONDARY - function was called from a secondary process instance
+ *    - EINVAL - count provided is not a power of 2
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_lfring *rte_lfring_create(const char *name, unsigned int count,
+				     int socket_id, unsigned int flags);
+/**
+ * De-allocate all memory used by the lfring.
+ *
+ * @param r
+ *   Lfring to free
+ */
+void rte_lfring_free(struct rte_lfring *r);
+
+/**
+ * Dump the status of the lfring to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param r
+ *   A pointer to the lfring structure.
+ */
+void rte_lfring_dump(FILE *f, const struct rte_lfring *r);
+
+/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
+static __rte_always_inline bool
+_rte_lfring_before(uint64_t a, uint64_t b)
+{
+	return (int64_t)(a - b) < 0;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_update(uint64_t *loc, uint64_t neu)
+{
+	uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	do {
+		if (_rte_lfring_before(neu, old)) {
+			/* neu < old */
+			return old;
+		}
+		/* Else neu > old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc,
+					      &old,
+					      neu,
+					      /*weak=*/true,
+					      __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+	return neu;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_reload(const uint64_t *loc, uint64_t idx)
+{
+	uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
+	if (_rte_lfring_before(idx, fresh)) {
+		/* fresh is after idx, use this instead */
+		idx = fresh;
+	} else {
+		/* Continue with next slot */
+		idx++;
+	}
+	return idx;
+}
+
+static __rte_always_inline void
+_rte_lfring_enq_elems(void *const *elems,
+		      struct rte_lfr_element *ring,
+		      uint64_t mask,
+		      uint32_t log2,
+		      uint64_t idx,
+		      uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++) {
+		ring0[i].ptr = *elems++;
+		ring0[i].lap = (idx++) >> log2;
+	}
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++) {
+			ring[i].ptr = *elems++;
+			ring[i].lap = (idx++) >> log2;
+		}
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sp_enqueue_bulk(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Single-producer */
+	uint64_t size = r->mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
+	int64_t actual = RTE_MIN((int64_t)(head + size - tail), (int64_t)nelems);
+	if (actual <= 0)
+		return 0;
+	_rte_lfring_enq_elems(elems, r->ring, r->mask, r->log2, tail, actual);
+	__atomic_store_n(&r->tail, tail + actual, __ATOMIC_RELEASE);
+	if (free_space != NULL)
+		*free_space = head + size - (tail + actual);
+	return actual;
+}
+
+static inline bool
+lockfree_compare_exchange_frail(dintptr_t *loc,
+				dintptr_t *old,
+				dintptr_t neu,
+				int mo_success,
+				int mo_failure)
+{
+#if __SIZEOF_POINTER__ == 8
+	return lockfree_compare_exchange_16_frail(loc,
+						  old,
+						  neu,
+						  mo_success,
+						  mo_failure);
+#elif __SIZEOF_POINTER__ == 4
+	return lockfree_compare_exchange_8_frail(loc,
+						 old,
+						 neu,
+						 mo_success,
+						 mo_failure);
+#else
+#error
+#endif
+}
+
+#define ENQ_RETRY_LIMIT 32
+
+static __rte_always_inline unsigned int
+rte_lfring_mp_enqueue_bulk(struct rte_lfring *r,
+			   void * const *elems,
+			   unsigned int nelems,
+			   unsigned int *free_space)
+{
+	/* Lock-free multi-producer */
+	uint64_t mask = r->mask;
+	uint64_t size = mask + 1;
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t actual = 0;
+	uint32_t retries = 0;
+restart:
+	while (actual < nelems &&
+	       _rte_lfring_before(tail,
+			__atomic_load_n(&r->head, __ATOMIC_ACQUIRE)) + size) {
+		union {
+			struct rte_lfr_element e;
+			dintptr_t ui;
+		} old, neu;
+		struct rte_lfr_element *slot = &r->ring[tail & mask];
+		old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
+		old.e.lap = __atomic_load_n(&slot->lap, __ATOMIC_RELAXED);
+		do {
+			if (old.e.lap != (tail - size) >> r->log2) {
+				if (old.e.lap != tail >> r->log2 ||
+					++retries == ENQ_RETRY_LIMIT) {
+					/* We are far behind,
+					 * restart with fresh index
+					 */
+					tail = _rte_lfring_reload(&r->tail,
+								  tail);
+					retries = 0;
+					goto restart;
+				} else {
+					/* Slot already enqueued,
+					 * try next slot
+					 */
+					tail++;
+					goto restart;
+				}
+			}
+			/* Found slot that was used one lap back,
+			 * try to enqueue next element
+			 */
+			neu.e.ptr = elems[actual];
+			neu.e.lap = tail >> r->log2;/* Set lap on enqueue */
+		} while (!lockfree_compare_exchange_frail((dintptr_t *)slot,
+							  &old.ui,
+							  neu.ui,
+							  __ATOMIC_RELEASE,
+							  __ATOMIC_RELAXED));
+		/* Enqueue succeeded */
+		actual++;
+		/* Continue with next slot */
+		tail++;
+		retries = 0;
+	}
+	tail = _rte_lfring_update(&r->tail, tail);
+	if (free_space != NULL)
+		*free_space = __atomic_load_n(&r->head,
+					      __ATOMIC_RELAXED) + size - tail;
+	return actual;
+}
+
+/**
+ * Enqueue several objects on an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to enqueue on the lfring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the lfring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, between 0 and n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk(struct rte_lfring *r,
+			void * const *elems,
+			unsigned int nelems,
+			unsigned int *free_space)
+{
+	if (r->flags & LFRING_F_SP_ENQ)
+		return rte_lfring_sp_enqueue_bulk(r, elems, nelems, free_space);
+	else
+		return rte_lfring_mp_enqueue_bulk(r, elems, nelems, free_space);
+}
+
+static __rte_always_inline void
+_rte_lfring_deq_elems(void **elems,
+		      const struct rte_lfr_element *ring,
+		      uint64_t mask,
+		      uint64_t idx,
+		      uint64_t num)
+{
+	uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+	const struct rte_lfr_element *ring0 = &ring[idx & mask];
+	for (uint64_t i = 0; i < seg0; i++)
+		*elems++ = ring0[i].ptr;
+	if (num > seg0) {
+		uint64_t seg1 = num - seg0;
+		for (uint64_t i = 0; i < seg1; i++)
+			*elems++ = ring[i].ptr;
+	}
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sc_dequeue_bulk(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Single-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+	actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+	if (unlikely(actual < nelems))
+		return 0;
+	_rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+	/* Use RMB + STORE-RELAXED instead of STORE-RELEASE as we only
+	 * read shared data, no need to order writes.
+	 */
+	rte_smp_rmb();
+	__atomic_store_n(&r->head, head + actual, __ATOMIC_RELAXED);
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_mc_dequeue_bulk(struct rte_lfring *r,
+			   void **elems,
+			   unsigned int nelems,
+			   unsigned int *available)
+{
+	/* Lock-free multi-consumer */
+	int64_t actual;
+	uint64_t mask = r->mask;
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail;
+	do {
+		/* Load tail on every iteration to avoid spurious queue
+		 * empty situations
+		 */
+		tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+		actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+		if (unlikely(actual < nelems))
+			return 0;
+		_rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+		/* Use RMB + CAS-RELAXED instead of CAS-RELEASE as we only
+		 * read shared data, no need to order writes.
+		 */
+	} while (!__atomic_compare_exchange_n(&r->head,
+					      &head,
+					      head + actual,
+					      /*weak*/false,
+					      __ATOMIC_RELAXED,
+					      __ATOMIC_RELAXED));
+	if (available != NULL)
+		*available = tail - (head + actual);
+	return (unsigned int)actual;
+}
+
+/**
+ * Dequeue several objects from an lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to dequeue from the lfring to the obj_table.
+ * @param available
+ *   Returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   The number of objects dequeued, from 0 to n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk(struct rte_lfring *r,
+			void **elems,
+			unsigned int nelems,
+			unsigned int *available)
+{
+	if (r->flags & LFRING_F_SC_DEQ)
+		return rte_lfring_sc_dequeue_bulk(r, elems, nelems, available);
+	else
+		return rte_lfring_mc_dequeue_bulk(r, elems, nelems, available);
+}
+
+/**
+ * Return the number of entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_count(const struct rte_lfring *r)
+{
+	uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+	uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+	uint64_t avail = tail - head;
+	return (int64_t)avail > 0 ? avail : 0;
+}
+
+/**
+ * Return the number of free entries in a lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The number of free entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_free_count(const struct rte_lfring *r)
+{
+	return r->size - rte_lfring_count(r);
+}
+
+/**
+ * Test if a lfring is full.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is full.
+ *   - 0: The lfring is not full.
+ */
+static inline int
+rte_lfring_full(const struct rte_lfring *r)
+{
+	return rte_lfring_free_count(r) == 0;
+}
+
+/**
+ * Test if a lfring is empty.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   - 1: The lfring is empty.
+ *   - 0: The lfring is not empty.
+ */
+static inline int
+rte_lfring_empty(const struct rte_lfring *r)
+{
+	return rte_lfring_count(r) == 0;
+}
+
+/**
+ * Return the size of the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The size of the data store used by the lfring.
+ *   NOTE: this is not the same as the usable space in the lfring. To query that
+ *   use ``rte_lfring_get_capacity()``.
+ */
+static inline unsigned int
+rte_lfring_get_size(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the lfring.
+ *
+ * @param r
+ *   A pointer to the lfring structure.
+ * @return
+ *   The usable size of the lfring.
+ */
+static inline unsigned int
+rte_lfring_get_capacity(const struct rte_lfring *r)
+{
+	return r->size;
+}
+
+/**
+ * Dump the status of all lfrings on the console
+ *
+ * @param f
+ *   A pointer to a file for output
+ */
+void rte_lfring_list_dump(FILE *f);
+
+/**
+ * Search a lfring from its name
+ *
+ * @param name
+ *   The name of the lfring.
+ * @return
+ *   The pointer to the lfring matching the name, or NULL if not found,
+ *   with rte_errno set appropriately. Possible rte_errno values include:
+ *    - ENOENT - required entry not available to return.
+ */
+struct rte_lfring *rte_lfring_lookup(const char *name);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_LFRING_H_ */
diff --git a/lib/librte_lfring/rte_lfring_version.map b/lib/librte_lfring/rte_lfring_version.map
new file mode 100644
index 0000000..d935efd
--- /dev/null
+++ b/lib/librte_lfring/rte_lfring_version.map
@@ -0,0 +1,19 @@
+DPDK_2.0 {
+	global:
+
+	rte_ring_create;
+	rte_ring_dump;
+	rte_ring_get_memsize;
+	rte_ring_init;
+	rte_ring_list_dump;
+	rte_ring_lookup;
+
+	local: *;
+};
+
+DPDK_2.2 {
+	global:
+
+	rte_ring_free;
+
+} DPDK_2.0;
-- 
2.7.4

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [dpdk-dev] [RFC,v2] lfring: lock-free ring buffer
  2019-01-30 16:36 [RFC,v2] lfring: lock-free ring buffer Ola Liljedahl
@ 2019-06-05 18:21 ` " Eads, Gage
  2019-06-15 21:00   ` Ola Liljedahl
  0 siblings, 1 reply; 4+ messages in thread
From: Eads, Gage @ 2019-06-05 18:21 UTC (permalink / raw)
  To: Ola Liljedahl, dev, Honnappa Nagarahalli, Richardson, Bruce; +Cc: nd

Hi Ola,

Is it possible to add burst enqueue and dequeue functions as well, so we can plug this into a mempool handler? Besides the mempool handler, I think the all-or-nothing semantics would be useful for applications. Besides that, this RFC looks good at a high level.

For a complete submission, a few more changes are needed:
- Builds: Need to add 'lfring' to lib/meson.build and mk/rte.app.mk
- Documentation: need to update release notes, add a new section in the programmer's guide, and update the doxygen configuration files
- Tests: This patchset should add a set of lfring tests as well

Code comments follow.

<snip>
 
> diff --git a/lib/Makefile b/lib/Makefile index d6239d2..cd46339 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
> DEPDIRS-librte_pci := librte_eal
>  DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring  DEPDIRS-librte_ring :=
> librte_eal
> +DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring DEPDIRS-librte_lfring
> +:= librte_eal

LIBRTE_RING -> LIBRTE_LFRING

>  DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool  DEPDIRS-
> librte_mempool := librte_eal librte_ring
>  DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf diff --git
> a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile new file mode 100644
> index 0000000..c04d2e9
> --- /dev/null
> +++ b/lib/librte_lfring/Makefile
> @@ -0,0 +1,22 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel
> +Corporation

Need to correct the copyright

> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_lfring.a
> +
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal
> +
> +EXPORT_MAP := rte_lfring_version.map
> +
> +LIBABIVER := 1
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c

LIBRTE_RING -> LIBRTE_LFRING

> +
> +# install includes
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h

LIBRTE_RING -> LIBRTE_LFRING

> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h new
> file mode 100644 index 0000000..199add9
> --- /dev/null
> +++ b/lib/librte_lfring/lockfree16.h

This needs to be refactored to use the rte_atomic128_cmp_exchange interface.

<snip>

> diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build new
> file mode 100644 index 0000000..c8b90cb
> --- /dev/null
> +++ b/lib/librte_lfring/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel
> +Corporation
> +

Correct copyright

> +version = 1
> +sources = files('rte_lfring.c')
> +headers = files('rte_lfring.h','lockfree16.h')
> diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c new file
> mode 100644 index 0000000..e130f71
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.c
> @@ -0,0 +1,264 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2015 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <errno.h>
> +#include <sys/queue.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_memzone.h>
> +#include <rte_malloc.h>
> +#include <rte_launch.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_spinlock.h>
> +

You can reduce the system and DPDK includes down to:

#include <inttypes.h>
#include <string.h>
		
#include <rte_atomic.h>
#include <rte_eal.h>
#include <rte_eal_memconfig.h>
#include <rte_errno.h>
#include <rte_malloc.h>
#include <rte_memzone.h>
#include <rte_rwlock.h>
#include <rte_tailq.h>

> +#include "rte_lfring.h"
> +
> +TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
> +
> +static struct rte_tailq_elem rte_lfring_tailq = {
> +	.name = RTE_TAILQ_LFRING_NAME,
> +};
> +EAL_REGISTER_TAILQ(rte_lfring_tailq)
> +
> +/* true if x is a power of 2 */
> +#define POWEROF2(x) ((((x)-1) & (x)) == 0)

DPDK provides this functionality with RTE_IS_POWER_OF_2(), in rte_common.h

> +
> +/* return the size of memory occupied by a ring */ ssize_t
> +rte_lfring_get_memsize(unsigned int count) {
> +	ssize_t sz;
> +
> +	/* count must be a power of 2 */
> +	if ((!POWEROF2(count)) || (count > 0x80000000U)) {
> +		RTE_LOG(ERR, RING,

This library needs to replace the RING logging with its own LFRING logging.

<snip>

> +/* create the ring */

The comments before function definitions (e.g. "create the ring", "free the ring") probably aren't necessary, the names are self-explanatory.

> +struct rte_lfring *
> +rte_lfring_create(const char *name, unsigned int count, int socket_id,
> +		  unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_lfring *r;
> +	struct rte_tailq_entry *te;
> +	const struct rte_memzone *mz;
> +	ssize_t ring_size;
> +	int mz_flags = 0;
> +	struct rte_lfring_list *ring_list = NULL;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> +	count = rte_align32pow2(count);
> +
> +	ring_size = rte_lfring_get_memsize(count);
> +	if (ring_size < 0) {
> +		rte_errno = ring_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_LFRING_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
> +	if (te == NULL) {
> +		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
> +		rte_errno = ENOMEM;
> +		return NULL;
> +	}
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* reserve a memory zone for this ring. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this this
> +	 * function
> +	 */

The "or we are secondary process" comment is out-of-date; the memzone reserve functions can be called by a secondary process. (For that matter, the documentation in rte_memzone.h is out-of-date as well.) This restriction was removed in commit 916e4f4f4e45.

<snip>

> +/* free the ring */
> +void
> +rte_lfring_free(struct rte_lfring *r)
> +{
> +	struct rte_lfring_list *ring_list = NULL;
> +	struct rte_tailq_entry *te;
> +
> +	if (r == NULL)
> +		return;
> +
> +	/*
> +	 * Ring was not created with rte_lfring_create,
> +	 * therefore, there is no memzone to free.
> +	 */
> +	if (r->memzone == NULL) {
> +		RTE_LOG(ERR, RING, "Cannot free ring (not created with
> rte_lfring_create()");
> +		return;
> +	}
> +
> +	if (rte_memzone_free(r->memzone) != 0) {
> +		RTE_LOG(ERR, RING, "Cannot free memory\n");
> +		return;
> +	}

Better to free the ring's memzone after taking it off the list, in the unlikely event another thread looks it up at the same time and attempts to use it.

> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* find out tailq entry */
> +	TAILQ_FOREACH(te, ring_list, next) {
> +		if (te->data == (void *) r)
> +			break;
> +	}
> +
> +	if (te == NULL) {
> +		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +		return;
> +	}
> +
> +	TAILQ_REMOVE(ring_list, te, next);
> +
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	rte_free(te);
> +}

<snip>

> +/* search a ring from its name */
> +struct rte_lfring *
> +rte_lfring_lookup(const char *name)
> +{
> +	struct rte_tailq_entry *te;
> +	struct rte_lfring *r = NULL;
> +	struct rte_lfring_list *ring_list;
> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> +	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	TAILQ_FOREACH(te, ring_list, next) {
> +		r = (struct rte_lfring *) te->data;
> +		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)

Missing a NULL pointer check before dereferencing 'name'

> +			break;
> +	}
> +
> +	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	if (te == NULL) {
> +		rte_errno = ENOENT;
> +		return NULL;
> +	}
> +
> +	return r;
> +}
> diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h new file
> mode 100644 index 0000000..22d3e1c
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.h
> @@ -0,0 +1,621 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#ifndef _RTE_LFRING_H_
> +#define _RTE_LFRING_H_
> +
> +/**
> + * @file
> + * RTE Lfring
> + *
> + * The Lfring Manager is a fixed-size queue, implemented as a table of
> + * (pointers + counters).
> + *
> + * - FIFO (First In First Out)
> + * - Maximum size is fixed; the pointers are stored in a table.
> + * - Lock-free implementation.
> + * - Multi- or single-consumer dequeue.
> + * - Multi- or single-producer enqueue.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +

You should be able to clean up the includes here too, e.g. rte_pause.h doesn't appear to be used.

> +#include "lockfree16.h"
> +
> +#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
> +
> +#define RTE_LFRING_MZ_PREFIX "RG_"
> +/**< The maximum length of an lfring name. */ #define

Change "/**<" to "/**", otherwise doxygen applies the comment to RTE_LFRING_MZ_PREFIX

> +RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
> +
> +struct rte_memzone; /* forward declaration, so as not to require
> +memzone.h */

This forward declaration is no longer necessary

<snip>

> +/**
> + * Calculate the memory size needed for a lfring
> + *
> + * This function returns the number of bytes needed for a lfring, given
> + * the number of elements in it. This value is the sum of the size of
> + * the structure rte_lfring and the size of the memory needed by the
> + * objects pointers. The value is aligned to a cache line size.
> + *
> + * @param count
> + *   The number of elements in the lfring (must be a power of 2).
> + * @return
> + *   - The memory size needed for the lfring on success.
> + *   - -EINVAL if count is not a power of 2.

Missing error case: -EINVAL if size exceeds size limit

> + */
> +ssize_t rte_lfring_get_memsize(unsigned int count);
> +
> +/**
> + * Initialize a lfring structure.
> + *
> + * Initialize a lfring structure in memory pointed by "r". The size of
> +the
> + * memory area must be large enough to store the lfring structure and
> +the
> + * object table. It is advised to use rte_lfring_get_memsize() to get
> +the
> + * appropriate size.
> + *
> + * The lfring size is set to *count*, which must be a power of two.
> +Water
> + * marking is disabled by default. The real usable lfring size is
> + * *count-1* instead of *count* to differentiate a free lfring from an
> + * empty lfring.

"free lfring" -> should say "full lfring"? Same applies to rte_lfring_create().

> + *
> + * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
> + * memory given by the caller may not be shareable among dpdk
> + * processes.

Here and in rte_lfring_create(), the documentation mentions whether or not the lfring is added to the RTE_TAILQ_LFRING list. I don't think this makes sense in the documentation, as it pertains to the implementation and not the interface.

> + *
> + * @param r
> + *   The pointer to the lfring structure followed by the objects table.
> + * @param name
> + *   The name of the lfring.
> + * @param count
> + *   The number of elements in the lfring (must be a power of 2).
> + * @param flags
> + * @return
> + *   0 on success, or a negative value on error.
> + */
> +int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
> +		    unsigned int flags);
> +
> +/**
> + * Create a new lfring named *name* in memory.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_lfring_init() to initialize an empty lfring.
> + *
> + * The new lfring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable lfring
> +size
> + * is *count-1* instead of *count* to differentiate a free lfring from
> +an
> + * empty lfring.
> + *
> + * The lfring is added in RTE_TAILQ_LFRING list.
> + *
> + * @param name
> + *   The name of the lfring.
> + * @param count
> + *   The size of the lfring (must be a power of 2).
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the new allocated lfring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process
> instance

E_RTE_SECONDARY and E_RTE_NO_CONFIG are not possible error cases

<snip>

> +#define ENQ_RETRY_LIMIT 32

Per the coding guidelines (https://doc.dpdk.org/guides/contributing/coding_style.html#macros), macros should be prefixed with RTE_.

<snip>

> +/**
> + * Return the number of elements which can be stored in the lfring.
> + *
> + * @param r
> + *   A pointer to the lfring structure.
> + * @return
> + *   The usable size of the lfring.
> + */
> +static inline unsigned int
> +rte_lfring_get_capacity(const struct rte_lfring *r) {
> +	return r->size;

I believe this should return r->mask, to account for the one unusable ring entry.

<snip>

> diff --git a/lib/librte_lfring/rte_lfring_version.map
> b/lib/librte_lfring/rte_lfring_version.map
> new file mode 100644
> index 0000000..d935efd
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring_version.map
> @@ -0,0 +1,19 @@
> +DPDK_2.0 {
> +	global:
> +
> +	rte_ring_create;
> +	rte_ring_dump;
> +	rte_ring_get_memsize;
> +	rte_ring_init;
> +	rte_ring_list_dump;
> +	rte_ring_lookup;

Need to fix function names and DPDK version number

Thanks,
Gage

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [dpdk-dev] [RFC,v2] lfring: lock-free ring buffer
  2019-06-05 18:21 ` [dpdk-dev] " Eads, Gage
@ 2019-06-15 21:00   ` Ola Liljedahl
  2019-06-18 17:06     ` Eads, Gage
  0 siblings, 1 reply; 4+ messages in thread
From: Ola Liljedahl @ 2019-06-15 21:00 UTC (permalink / raw)
  To: Honnappa Nagarahalli, bruce.richardson, gage.eads, dev; +Cc: nd

On Wed, 2019-06-05 at 18:21 +0000, Eads, Gage wrote:
> Hi Ola,
> 
> Is it possible to add burst enqueue and dequeue functions as well, so we can
> plug this into a mempool handler?
Proper burst enqueue is difficult or at least not very efficient.

>  Besides the mempool handler, I think the all-or-nothing semantics would be
> useful for applications. Besides that, this RFC looks good at a high level.
> 
> For a complete submission, a few more changes are needed:
> - Builds: Need to add 'lfring' to lib/meson.build and mk/rte.app.mk
> - Documentation: need to update release notes, add a new section in the
> programmer's guide, and update the doxygen configuration files
> - Tests: This patchset should add a set of lfring tests as well
> 
> Code comments follow.
Thanks for the review comments, I only had time to look at a few of them. I will
return with more complete answers and a new version of the patch.

-- Ola

> 
> <snip>
>  
> 
> diff --git a/lib/Makefile b/lib/Makefile index d6239d2..cd46339 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
> DEPDIRS-librte_pci := librte_eal
>  DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring  DEPDIRS-librte_ring :=
> librte_eal
> +DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring DEPDIRS-librte_lfring
> +:= librte_eal
> 
> LIBRTE_RING -> LIBRTE_LFRING
> 
> 
>  DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool  DEPDIRS-
> librte_mempool := librte_eal librte_ring
>  DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf diff --git
> a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile new file mode 100644
> index 0000000..c04d2e9
> --- /dev/null
> +++ b/lib/librte_lfring/Makefile
> @@ -0,0 +1,22 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel
> +Corporation
> 
> Need to correct the copyright
> 
> 
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_lfring.a
> +
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal
> +
> +EXPORT_MAP := rte_lfring_version.map
> +
> +LIBABIVER := 1
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
> 
> LIBRTE_RING -> LIBRTE_LFRING
> 
> 
> +
> +# install includes
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
> 
> LIBRTE_RING -> LIBRTE_LFRING
> 
> 
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
> new
> file mode 100644 index 0000000..199add9
> --- /dev/null
> +++ b/lib/librte_lfring/lockfree16.h
> 
> This needs to be refactored to use the rte_atomic128_cmp_exchange interface.
> 
> <snip>
> 
> 
> diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build new
> file mode 100644 index 0000000..c8b90cb
> --- /dev/null
> +++ b/lib/librte_lfring/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel
> +Corporation
> +
> 
> Correct copyright
> 
> 
> +version = 1
> +sources = files('rte_lfring.c')
> +headers = files('rte_lfring.h','lockfree16.h')
> diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
> new file
> mode 100644 index 0000000..e130f71
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.c
> @@ -0,0 +1,264 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2015 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <errno.h>
> +#include <sys/queue.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_memzone.h>
> +#include <rte_malloc.h>
> +#include <rte_launch.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_spinlock.h>
> +
> 
> You can reduce the system and DPDK includes down to:
> 
> #include <inttypes.h>
> #include <string.h>
> 		
> #include <rte_atomic.h>
> #include <rte_eal.h>
> #include <rte_eal_memconfig.h>
> #include <rte_errno.h>
> #include <rte_malloc.h>
> #include <rte_memzone.h>
> #include <rte_rwlock.h>
> #include <rte_tailq.h>
> 
> 
> +#include "rte_lfring.h"
> +
> +TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
> +
> +static struct rte_tailq_elem rte_lfring_tailq = {
> +	.name = RTE_TAILQ_LFRING_NAME,
> +};
> +EAL_REGISTER_TAILQ(rte_lfring_tailq)
> +
> +/* true if x is a power of 2 */
> +#define POWEROF2(x) ((((x)-1) & (x)) == 0)
> 
> DPDK provides this functionality with RTE_IS_POWER_OF_2(), in rte_common.h
> 
> 
> +
> +/* return the size of memory occupied by a ring */ ssize_t
> +rte_lfring_get_memsize(unsigned int count) {
> +	ssize_t sz;
> +
> +	/* count must be a power of 2 */
> +	if ((!POWEROF2(count)) || (count > 0x80000000U)) {
> +		RTE_LOG(ERR, RING,
> 
> This library needs to replace the RING logging with its own LFRING logging.
> 
> <snip>
> 
> 
> +/* create the ring */
> 
> The comments before function definitions (e.g. "create the ring", "free the
> ring") probably aren't necessary, the names are self-explanatory.
> 
> 
> +struct rte_lfring *
> +rte_lfring_create(const char *name, unsigned int count, int socket_id,
> +		  unsigned int flags)
> +{
> +	char mz_name[RTE_MEMZONE_NAMESIZE];
> +	struct rte_lfring *r;
> +	struct rte_tailq_entry *te;
> +	const struct rte_memzone *mz;
> +	ssize_t ring_size;
> +	int mz_flags = 0;
> +	struct rte_lfring_list *ring_list = NULL;
> +	const unsigned int requested_count = count;
> +	int ret;
> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> +	count = rte_align32pow2(count);
> +
> +	ring_size = rte_lfring_get_memsize(count);
> +	if (ring_size < 0) {
> +		rte_errno = ring_size;
> +		return NULL;
> +	}
> +
> +	ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> +		RTE_LFRING_MZ_PREFIX, name);
> +	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> +		rte_errno = ENAMETOOLONG;
> +		return NULL;
> +	}
> +
> +	te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
> +	if (te == NULL) {
> +		RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
> +		rte_errno = ENOMEM;
> +		return NULL;
> +	}
> +
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* reserve a memory zone for this ring. If we can't get rte_config or
> +	 * we are secondary process, the memzone_reserve function will set
> +	 * rte_errno for us appropriately - hence no check in this this
> +	 * function
> +	 */
> 
> The "or we are secondary process" comment is out-of-date; the memzone reserve
> functions can be called by a secondary process. (For that matter, the
> documentation in rte_memzone.h is out-of-date as well.) This restriction was
> removed in commit 916e4f4f4e45.
> 
> <snip>
> 
> 
> +/* free the ring */
> +void
> +rte_lfring_free(struct rte_lfring *r)
> +{
> +	struct rte_lfring_list *ring_list = NULL;
> +	struct rte_tailq_entry *te;
> +
> +	if (r == NULL)
> +		return;
> +
> +	/*
> +	 * Ring was not created with rte_lfring_create,
> +	 * therefore, there is no memzone to free.
> +	 */
> +	if (r->memzone == NULL) {
> +		RTE_LOG(ERR, RING, "Cannot free ring (not created with
> rte_lfring_create()");
> +		return;
> +	}
> +
> +	if (rte_memzone_free(r->memzone) != 0) {
> +		RTE_LOG(ERR, RING, "Cannot free memory\n");
> +		return;
> +	}
> 
> Better to free the ring's memzone after taking it off the list, in the
> unlikely event another thread looks it up at the same time and attempts to use
> it.
> 
> 
> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	/* find out tailq entry */
> +	TAILQ_FOREACH(te, ring_list, next) {
> +		if (te->data == (void *) r)
> +			break;
> +	}
> +
> +	if (te == NULL) {
> +		rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +		return;
> +	}
> +
> +	TAILQ_REMOVE(ring_list, te, next);
> +
> +	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	rte_free(te);
> +}
> 
> <snip>
> 
> 
> +/* search a ring from its name */
> +struct rte_lfring *
> +rte_lfring_lookup(const char *name)
> +{
> +	struct rte_tailq_entry *te;
> +	struct rte_lfring *r = NULL;
> +	struct rte_lfring_list *ring_list;
> +
> +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> +	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	TAILQ_FOREACH(te, ring_list, next) {
> +		r = (struct rte_lfring *) te->data;
> +		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
> 
> Missing a NULL pointer check before dereferencing 'name'
Why shouldn't the program crash if someone passes a NULL pointer parameter?
Callers will be internal, external users should be able to control whether NULL
is passed instead of a valid pointer.
A crash and a core dump is the best way to detect and debug errors.

> 
> 
> +			break;
> +	}
> +
> +	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> +	if (te == NULL) {
> +		rte_errno = ENOENT;
> +		return NULL;
> +	}
> +
> +	return r;
> +}
> diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
> new file
> mode 100644 index 0000000..22d3e1c
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.h
> @@ -0,0 +1,621 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#ifndef _RTE_LFRING_H_
> +#define _RTE_LFRING_H_
> +
> +/**
> + * @file
> + * RTE Lfring
> + *
> + * The Lfring Manager is a fixed-size queue, implemented as a table of
> + * (pointers + counters).
> + *
> + * - FIFO (First In First Out)
> + * - Maximum size is fixed; the pointers are stored in a table.
> + * - Lock-free implementation.
> + * - Multi- or single-consumer dequeue.
> + * - Multi- or single-producer enqueue.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> 
> You should be able to clean up the includes here too, e.g. rte_pause.h doesn't
> appear to be used.
> 
> 
> +#include "lockfree16.h"
> +
> +#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
> +
> +#define RTE_LFRING_MZ_PREFIX "RG_"
> +/**< The maximum length of an lfring name. */ #define
> 
> Change "/**<" to "/**", otherwise doxygen applies the comment to
> RTE_LFRING_MZ_PREFIX
> 
> 
> +RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> +			   sizeof(RTE_LFRING_MZ_PREFIX) + 1)
> +
> +struct rte_memzone; /* forward declaration, so as not to require
> +memzone.h */
> 
> This forward declaration is no longer necessary
> 
> <snip>
> 
> 
> +/**
> + * Calculate the memory size needed for a lfring
> + *
> + * This function returns the number of bytes needed for a lfring, given
> + * the number of elements in it. This value is the sum of the size of
> + * the structure rte_lfring and the size of the memory needed by the
> + * objects pointers. The value is aligned to a cache line size.
> + *
> + * @param count
> + *   The number of elements in the lfring (must be a power of 2).
> + * @return
> + *   - The memory size needed for the lfring on success.
> + *   - -EINVAL if count is not a power of 2.
> 
> Missing error case: -EINVAL if size exceeds size limit
> 
> 
> + */
> +ssize_t rte_lfring_get_memsize(unsigned int count);
> +
> +/**
> + * Initialize a lfring structure.
> + *
> + * Initialize a lfring structure in memory pointed by "r". The size of
> +the
> + * memory area must be large enough to store the lfring structure and
> +the
> + * object table. It is advised to use rte_lfring_get_memsize() to get
> +the
> + * appropriate size.
> + *
> + * The lfring size is set to *count*, which must be a power of two.
> +Water
> + * marking is disabled by default. The real usable lfring size is
> + * *count-1* instead of *count* to differentiate a free lfring from an
> + * empty lfring.
> 
> "free lfring" -> should say "full lfring"? Same applies to
> rte_lfring_create().
> 
> 
> + *
> + * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
> + * memory given by the caller may not be shareable among dpdk
> + * processes.
> 
> Here and in rte_lfring_create(), the documentation mentions whether or not the
> lfring is added to the RTE_TAILQ_LFRING list. I don't think this makes sense
> in the documentation, as it pertains to the implementation and not the
> interface.
> 
> 
> + *
> + * @param r
> + *   The pointer to the lfring structure followed by the objects table.
> + * @param name
> + *   The name of the lfring.
> + * @param count
> + *   The number of elements in the lfring (must be a power of 2).
> + * @param flags
> + * @return
> + *   0 on success, or a negative value on error.
> + */
> +int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int
> count,
> +		    unsigned int flags);
> +
> +/**
> + * Create a new lfring named *name* in memory.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_lfring_init() to initialize an empty lfring.
> + *
> + * The new lfring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable lfring
> +size
> + * is *count-1* instead of *count* to differentiate a free lfring from
> +an
> + * empty lfring.
> + *
> + * The lfring is added in RTE_TAILQ_LFRING list.
> + *
> + * @param name
> + *   The name of the lfring.
> + * @param count
> + *   The size of the lfring (must be a power of 2).
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the new allocated lfring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process
> instance
> 
> E_RTE_SECONDARY and E_RTE_NO_CONFIG are not possible error cases
> 
> <snip>
> 
> 
> +#define ENQ_RETRY_LIMIT 32
> 
> Per the coding guidelines (
> https://doc.dpdk.org/guides/contributing/coding_style.html#macros), macros
> should be prefixed with RTE_.
> 
> <snip>
> 
> 
> +/**
> + * Return the number of elements which can be stored in the lfring.
> + *
> + * @param r
> + *   A pointer to the lfring structure.
> + * @return
> + *   The usable size of the lfring.
> + */
> +static inline unsigned int
> +rte_lfring_get_capacity(const struct rte_lfring *r) {
> +	return r->size;
> 
> I believe this should return r->mask, to account for the one unusable ring
> entry.

I think this is a mistake, all ring entries should be usable.
> 
> <snip>
> 
> 
> diff --git a/lib/librte_lfring/rte_lfring_version.map
> b/lib/librte_lfring/rte_lfring_version.map
> new file mode 100644
> index 0000000..d935efd
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring_version.map
> @@ -0,0 +1,19 @@
> +DPDK_2.0 {
> +	global:
> +
> +	rte_ring_create;
> +	rte_ring_dump;
> +	rte_ring_get_memsize;
> +	rte_ring_init;
> +	rte_ring_list_dump;
> +	rte_ring_lookup;
> 
> Need to fix function names and DPDK version number
> 
> Thanks,
> Gage
> 
> 
-- 
Ola Liljedahl, System Architect, Arm


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [dpdk-dev] [RFC,v2] lfring: lock-free ring buffer
  2019-06-15 21:00   ` Ola Liljedahl
@ 2019-06-18 17:06     ` Eads, Gage
  0 siblings, 0 replies; 4+ messages in thread
From: Eads, Gage @ 2019-06-18 17:06 UTC (permalink / raw)
  To: Ola Liljedahl, Honnappa Nagarahalli, Richardson, Bruce, dev; +Cc: nd



> -----Original Message-----
> From: Ola Liljedahl [mailto:Ola.Liljedahl@arm.com]
> Sent: Saturday, June 15, 2019 4:00 PM
> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Richardson,
> Bruce <bruce.richardson@intel.com>; Eads, Gage <gage.eads@intel.com>;
> dev@dpdk.org
> Cc: nd <nd@arm.com>
> Subject: Re: [RFC,v2] lfring: lock-free ring buffer
> 
> On Wed, 2019-06-05 at 18:21 +0000, Eads, Gage wrote:
> > Hi Ola,
> >
> > Is it possible to add burst enqueue and dequeue functions as well, so
> > we can plug this into a mempool handler?
> Proper burst enqueue is difficult or at least not very efficient.
> 
> >  Besides the mempool handler, I think the all-or-nothing semantics
> > would be useful for applications. Besides that, this RFC looks good at a high
> level.
> >
> > For a complete submission, a few more changes are needed:
> > - Builds: Need to add 'lfring' to lib/meson.build and mk/rte.app.mk
> > - Documentation: need to update release notes, add a new section in
> > the programmer's guide, and update the doxygen configuration files
> > - Tests: This patchset should add a set of lfring tests as well
> >
> > Code comments follow.
> Thanks for the review comments, I only had time to look at a few of them. I
> will return with more complete answers and a new version of the patch.
> 

Sounds good.

<snip>

> > +/* search a ring from its name */
> > +struct rte_lfring *
> > +rte_lfring_lookup(const char *name)
> > +{
> > +	struct rte_tailq_entry *te;
> > +	struct rte_lfring *r = NULL;
> > +	struct rte_lfring_list *ring_list;
> > +
> > +	ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> > +
> > +	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> > +
> > +	TAILQ_FOREACH(te, ring_list, next) {
> > +		r = (struct rte_lfring *) te->data;
> > +		if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
> >
> > Missing a NULL pointer check before dereferencing 'name'
> Why shouldn't the program crash if someone passes a NULL pointer
> parameter?
> Callers will be internal, external users should be able to control whether
> NULL is passed instead of a valid pointer.
> A crash and a core dump is the best way to detect and debug errors.

If you think crashing is the appropriate response, rte_panic() with a descriptive error string would be better than a segfault alone.

<snip>

> > +/**
> > + * Return the number of elements which can be stored in the lfring.
> > + *
> > + * @param r
> > + *   A pointer to the lfring structure.
> > + * @return
> > + *   The usable size of the lfring.
> > + */
> > +static inline unsigned int
> > +rte_lfring_get_capacity(const struct rte_lfring *r) {
> > +	return r->size;
> >
> > I believe this should return r->mask, to account for the one unusable
> > ring entry.
> 
> I think this is a mistake, all ring entries should be usable.

Ok, then do these comments from elsewhere in the header need to be corrected?

"The real usable lfring size is *count-1* instead of *count* to differentiate a free lfring from an empty lfring."

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, back to index

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-30 16:36 [RFC,v2] lfring: lock-free ring buffer Ola Liljedahl
2019-06-05 18:21 ` [dpdk-dev] " Eads, Gage
2019-06-15 21:00   ` Ola Liljedahl
2019-06-18 17:06     ` Eads, Gage

DPDK-dev Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/dpdk-dev/0 dpdk-dev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 dpdk-dev dpdk-dev/ https://lore.kernel.org/dpdk-dev \
		dev@dpdk.org
	public-inbox-index dpdk-dev

Example config snippet for mirrors

Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.dpdk.dev


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git