From: Gage Eads <gage.eads@intel.com>
To: dev@dpdk.org
Cc: olivier.matz@6wind.com, arybchenko@solarflare.com,
	bruce.richardson@intel.com, konstantin.ananyev@intel.com,
	stephen@networkplumber.org, jerinj@marvell.com,
	mczekaj@marvell.com, nd@arm.com, Ola.Liljedahl@arm.com
Subject: [PATCH v6 3/6] ring: add a lock-free implementation
Date: Wed,  6 Mar 2019 09:03:39 -0600	[thread overview]
Message-ID: <20190306150342.2894-4-gage.eads@intel.com> (raw)
In-Reply-To: <20190306150342.2894-1-gage.eads@intel.com>

This commit adds support for lock-free circular ring enqueue and dequeue
functions. The ring is supported on 32- and 64-bit architectures; however,
it uses a 128-bit compare-and-swap instruction on 64-bit architectures,
and so is currently limited to x86_64 among them.

The algorithm is based on Ola Liljedahl's lfring, modified to fit within
the rte ring API. With no contention, an enqueue of n pointers uses (1 + n)
CAS operations and a dequeue of n pointers uses 1. This algorithm has worse
average-case performance than the regular rte ring (particularly for a
highly contended ring with large bulk accesses), however:
- For applications with preemptible pthreads, the regular rte ring's
  worst-case performance (i.e. one thread being preempted in the
  update_tail() critical section) is much worse than the lock-free ring's.
- Software caching can mitigate the average-case performance penalty of
  ring-based algorithms. For example, a lock-free ring based mempool (a
  likely use case for this ring) can use per-thread caching; a brief
  sketch follows this list.
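
As a rough sketch of the caching point above (illustrative only, not
part of this patch; the default handler used here is the regular ring,
and the lock-free mempool handler is the subject of a later patch in
this series), a ring-backed mempool can be given a per-lcore cache so
that most allocations and frees are absorbed locally and only cache
refills/flushes touch the ring:

    #include <rte_mempool.h>
    #include <rte_lcore.h>

    /* 8192 objects of 2048 bytes with a 256-object per-lcore cache;
     * the name and sizes are arbitrary example values.
     */
    struct rte_mempool *mp = rte_mempool_create("example_pool",
                    8192, 2048,
                    256,            /* per-lcore cache size */
                    0, NULL, NULL, NULL, NULL,
                    rte_socket_id(), 0);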

To avoid the ABA problem, each ring entry contains a modification counter.
On a 64-bit architecture, the chance of ABA occurring is effectively zero;
a 64-bit counter will take many years to wrap at current CPU frequencies.
On 32-bit architectures, a lock-free ring must be at least 1024 entries
deep; assuming 100 cycles per ring entry access, this guarantees that the
ring's modification counters will take on the order of days to wrap (a
rough calculation follows).
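
As a back-of-the-envelope check of that bound (using the same
assumptions as the comment added to rte_ring.c below; the figures are
illustrative only): if a given ring entry is written every M cycles, a
ring of 'count' entries reuses each slot every M*count cycles, so a
32-bit per-entry counter can only repeat after

    M * count * 2^32 cycles
      = 100 * 1024 * 2^32
      ~= 4.4e14 cycles, i.e. ~2.2e5 seconds at 2 GHz,

which is on the order of days between possible counter collisions.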

The lock-free ring is enabled via a new flag, RING_F_LF. Because the ring's
memsize is now a function of its flags (the lock-free ring requires 128 bits
per entry), this commit adds a new argument ('flags') to
rte_ring_get_memsize(). An API deprecation notice will be sent in a
separate commit. A brief usage sketch follows.
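
A minimal usage sketch of the new flag and the updated memsize API (the
ring name and size below are illustrative, not part of this patch):

    #include <rte_ring.h>
    #include <rte_lcore.h>

    /* Size the memzone for a 4096-entry lock-free ring; the flags
     * argument matters because a RING_F_LF entry is two pointers wide
     * (data pointer plus modification counter).
     */
    ssize_t sz = rte_ring_get_memsize(4096, RING_F_LF);

    /* Or let rte_ring_create() allocate and initialize the ring;
     * rte_errno is set to EINVAL on platforms that lack the required
     * atomics or, on 32-bit, if the ring has fewer than 1024 entries.
     */
    struct rte_ring *r = rte_ring_create("lf_ring", 4096,
                                         rte_socket_id(), RING_F_LF);

The existing rte_ring_enqueue()/rte_ring_dequeue() calls then operate
on 'r' unchanged, taking the lock-free path internally.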

For ease of use, the existing ring enqueue and dequeue functions work on
both regular and lock-free rings. This introduces an additional branch in
the datapath, but it should be a highly predictable branch.
ring_perf_autotest shows a negligible performance impact; it is hard to
distinguish a real difference from system noise.

                                  | ring_perf_autotest cycles with branch -
             Test                 |   ring_perf_autotest cycles without
------------------------------------------------------------------
SP/SC single enq/dequeue          | 0.33
MP/MC single enq/dequeue          | -4.00
SP/SC burst enq/dequeue (size 8)  | 0.00
MP/MC burst enq/dequeue (size 8)  | 0.00
SP/SC burst enq/dequeue (size 32) | 0.00
MP/MC burst enq/dequeue (size 32) | 0.00
SC empty dequeue                  | 1.00
MC empty dequeue                  | 0.00

Single lcore:
SP/SC bulk enq/dequeue (size 8)   | 0.49
MP/MC bulk enq/dequeue (size 8)   | 0.08
SP/SC bulk enq/dequeue (size 32)  | 0.07
MP/MC bulk enq/dequeue (size 32)  | 0.09

Two physical cores:
SP/SC bulk enq/dequeue (size 8)   | 0.19
MP/MC bulk enq/dequeue (size 8)   | -0.37
SP/SC bulk enq/dequeue (size 32)  | 0.09
MP/MC bulk enq/dequeue (size 32)  | -0.05

Two NUMA nodes:
SP/SC bulk enq/dequeue (size 8)   | -1.96
MP/MC bulk enq/dequeue (size 8)   | 0.88
SP/SC bulk enq/dequeue (size 32)  | 0.10
MP/MC bulk enq/dequeue (size 32)  | 0.46

Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
running on isolcpus cores with a tickless scheduler. Each test was run
three times and the results were averaged.

Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 lib/librte_ring/rte_ring.c           |  92 +++++++--
 lib/librte_ring/rte_ring.h           | 308 ++++++++++++++++++++++++++---
 lib/librte_ring/rte_ring_c11_mem.h   | 366 ++++++++++++++++++++++++++++++++++-
 lib/librte_ring/rte_ring_generic.h   | 354 +++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_version.map |   7 +
 5 files changed, 1080 insertions(+), 47 deletions(-)

diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index d215acecc..d4a176f57 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -45,9 +45,9 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
 
 /* return the size of memory occupied by a ring */
 ssize_t
-rte_ring_get_memsize(unsigned count)
+rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags)
 {
-	ssize_t sz;
+	ssize_t sz, elt_sz;
 
 	/* count must be a power of 2 */
 	if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
@@ -57,10 +57,23 @@ rte_ring_get_memsize(unsigned count)
 		return -EINVAL;
 	}
 
-	sz = sizeof(struct rte_ring) + count * sizeof(void *);
+	elt_sz = (flags & RING_F_LF) ? 2 * sizeof(void *) : sizeof(void *);
+
+	sz = sizeof(struct rte_ring) + count * elt_sz;
 	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 	return sz;
 }
+BIND_DEFAULT_SYMBOL(rte_ring_get_memsize, _v1905, 19.05);
+MAP_STATIC_SYMBOL(ssize_t rte_ring_get_memsize(unsigned int count,
+					       unsigned int flags),
+		  rte_ring_get_memsize_v1905);
+
+ssize_t
+rte_ring_get_memsize_v20(unsigned int count)
+{
+	return rte_ring_get_memsize_v1905(count, 0);
+}
+VERSION_SYMBOL(rte_ring_get_memsize, _v20, 2.0);
 
 int
 rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
@@ -75,6 +88,8 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 			  RTE_CACHE_LINE_MASK) != 0);
 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
 			  RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON(sizeof(struct rte_ring_lf_entry) !=
+			 2 * sizeof(void *));
 
 	/* init the ring structure */
 	memset(r, 0, sizeof(*r));
@@ -82,8 +97,6 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	if (ret < 0 || ret >= (int)sizeof(r->name))
 		return -ENAMETOOLONG;
 	r->flags = flags;
-	r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
-	r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
 
 	if (flags & RING_F_EXACT_SZ) {
 		r->size = rte_align32pow2(count + 1);
@@ -100,12 +113,46 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 		r->mask = count - 1;
 		r->capacity = r->mask;
 	}
-	r->prod.head = r->cons.head = 0;
-	r->prod.tail = r->cons.tail = 0;
+
+	r->log2_size = rte_log2_u64(r->size);
+
+	if (flags & RING_F_LF) {
+		uint32_t i;
+
+		r->prod_ptr.single =
+			(flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons_ptr.single =
+			(flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod_ptr.head = r->cons_ptr.head = 0;
+		r->prod_ptr.tail = r->cons_ptr.tail = 0;
+
+		for (i = 0; i < r->size; i++) {
+			struct rte_ring_lf_entry *ring_ptr, *base;
+
+			base = (struct rte_ring_lf_entry *)&r->ring;
+
+			ring_ptr = &base[i & r->mask];
+
+			ring_ptr->cnt = 0;
+		}
+	} else {
+		r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
+		r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+		r->prod.head = r->cons.head = 0;
+		r->prod.tail = r->cons.tail = 0;
+	}
 
 	return 0;
 }
 
+/* If a ring entry is written on average every M cycles, then a ring entry is
+ * reused every M*count cycles, and a ring entry's counter repeats every
+ * M*count*2^32 cycles. If M=100 on a 2GHz system, then a 1024-entry ring's
+ * counters would repeat every 2.37 days. The likelihood of ABA occurring is
+ * considered sufficiently low for 1024-entry and larger rings.
+ */
+#define MIN_32_BIT_LF_RING_SIZE 1024
+
 /* create the ring */
 struct rte_ring *
 rte_ring_create(const char *name, unsigned count, int socket_id,
@@ -123,11 +170,25 @@ rte_ring_create(const char *name, unsigned count, int socket_id,
 
 	ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
 
+#ifdef RTE_ARCH_64
+#if !defined(RTE_ARCH_X86_64)
+	printf("This platform does not support the atomic operation required for RING_F_LF\n");
+	rte_errno = EINVAL;
+	return NULL;
+#endif
+#else
+	if ((flags & RING_F_LF) && count < MIN_32_BIT_LF_RING_SIZE) {
+		printf("RING_F_LF is only supported on 32-bit platforms for rings with at least 1024 entries.\n");
+		rte_errno = EINVAL;
+		return NULL;
+	}
+#endif
+
 	/* for an exact size ring, round up from count to a power of two */
 	if (flags & RING_F_EXACT_SZ)
 		count = rte_align32pow2(count + 1);
 
-	ring_size = rte_ring_get_memsize(count);
+	ring_size = rte_ring_get_memsize(count, flags);
 	if (ring_size < 0) {
 		rte_errno = ring_size;
 		return NULL;
@@ -227,10 +288,17 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  flags=%x\n", r->flags);
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
-	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
-	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	if (r->flags & RING_F_LF) {
+		fprintf(f, "  ct=%"PRIuPTR"\n", r->cons_ptr.tail);
+		fprintf(f, "  ch=%"PRIuPTR"\n", r->cons_ptr.head);
+		fprintf(f, "  pt=%"PRIuPTR"\n", r->prod_ptr.tail);
+		fprintf(f, "  ph=%"PRIuPTR"\n", r->prod_ptr.head);
+	} else {
+		fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
+		fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+		fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
+		fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	}
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index f16d77b8a..200d7b2a0 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -20,7 +20,7 @@
  *
  * - FIFO (First In First Out)
  * - Maximum size is fixed; the pointers are stored in a table.
- * - Lockless implementation.
+ * - Lockless (and optionally, non-blocking/lock-free) implementation.
  * - Multi- or single-consumer dequeue.
  * - Multi- or single-producer enqueue.
  * - Bulk dequeue.
@@ -98,6 +98,7 @@ struct rte_ring {
 	const struct rte_memzone *memzone;
 			/**< Memzone, if any, containing the rte_ring */
 	uint32_t size;           /**< Size of ring. */
+	uint32_t log2_size;      /**< log2(size of ring) */
 	uint32_t mask;           /**< Mask (size-1) of ring. */
 	uint32_t capacity;       /**< Usable size of ring */
 
@@ -133,6 +134,18 @@ struct rte_ring {
  */
 #define RING_F_EXACT_SZ 0x0004
 #define RTE_RING_SZ_MASK  (0x7fffffffU) /**< Ring size mask */
+/**
+ * The ring uses lock-free enqueue and dequeue functions. These functions
+ * do not have the "non-preemptive" constraint of a regular rte ring, and thus
+ * are suited for applications using preemptible pthreads. However, the
+ * lock-free functions have worse average-case performance than their regular
+ * rte ring counterparts. When used as the handler for a mempool, per-thread
+ * caching can mitigate the performance difference by reducing the number (and
+ * contention) of ring accesses.
+ *
+ * This flag is only supported on 32-bit and x86_64 platforms.
+ */
+#define RING_F_LF 0x0008
 
 /* @internal defines for passing to the enqueue dequeue worker functions */
 #define __IS_SP 1
@@ -150,11 +163,15 @@ struct rte_ring {
  *
  * @param count
  *   The number of elements in the ring (must be a power of 2).
+ * @param flags
+ *   The flags the ring will be created with.
  * @return
  *   - The memory size needed for the ring on success.
  *   - -EINVAL if count is not a power of 2.
  */
-ssize_t rte_ring_get_memsize(unsigned count);
+ssize_t rte_ring_get_memsize(unsigned int count, unsigned int flags);
+ssize_t rte_ring_get_memsize_v20(unsigned int count);
+ssize_t rte_ring_get_memsize_v1905(unsigned int count, unsigned int flags);
 
 /**
  * Initialize a ring structure.
@@ -187,6 +204,10 @@ ssize_t rte_ring_get_memsize(unsigned count);
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   0 on success, or a negative value on error.
  */
@@ -222,12 +243,17 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
  *      is "single-consumer". Otherwise, it is "multi-consumers".
+ *    - RING_F_EXACT_SZ: If this flag is set, count can be a non-power-of-2
+ *      number, but up to half the ring space may be wasted.
+ *    - RING_F_LF: If this flag is set, the ring uses lock-free variants of the
+ *      dequeue and enqueue functions.
  * @return
  *   On success, the pointer to the new allocated ring. NULL on error with
  *    rte_errno set appropriately. Possible errno values include:
  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
  *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - EINVAL - count provided is not a power of 2
+ *    - EINVAL - count provided is not a power of 2, or RING_F_LF is used on an
+ *      unsupported platform
  *    - ENOSPC - the maximum number of memzones has already been allocated
  *    - EEXIST - a memzone with the same name already exists
  *    - ENOMEM - no appropriate memory area found in which to create memzone
@@ -283,6 +309,50 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual enqueue of pointers on the lock-free ring, used by the
+ * single-producer lock-free enqueue function.
+ */
+#define ENQUEUE_PTRS_LF(r, base, prod_head, obj_table, n) do { \
+	unsigned int i; \
+	const uint32_t size = (r)->size; \
+	size_t idx = prod_head & (r)->mask; \
+	size_t new_cnt = prod_head + size; \
+	struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+	unsigned int mask = ~0x3; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & mask); i += 4, idx += 4) { \
+			ring[idx].ptr = obj_table[i]; \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx + 1].ptr = obj_table[i + 1]; \
+			ring[idx + 1].cnt = (new_cnt + i + 1) >> r->log2_size; \
+			ring[idx + 2].ptr = obj_table[i + 2]; \
+			ring[idx + 2].cnt = (new_cnt + i + 2) >> r->log2_size; \
+			ring[idx + 3].ptr = obj_table[i + 3]; \
+			ring[idx + 3].cnt = (new_cnt + i + 3) >> r->log2_size; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 2: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; /* fallthrough */ \
+		case 1: \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx++].ptr = obj_table[i++]; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) { \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+		for (idx = 0; i < n; i++, idx++) {    \
+			ring[idx].cnt = (new_cnt + i) >> r->log2_size; \
+			ring[idx].ptr = obj_table[i]; \
+		} \
+	} \
+} while (0)
+
 /* the actual copy of pointers on the ring to obj_table.
  * Placed here since identical code needed in both
  * single and multi consumer dequeue functions */
@@ -314,6 +384,43 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 	} \
 } while (0)
 
+/* The actual copy of pointers on the lock-free ring to obj_table. */
+#define DEQUEUE_PTRS_LF(r, base, cons_head, obj_table, n) do { \
+	unsigned int i; \
+	size_t idx = cons_head & (r)->mask; \
+	const uint32_t size = (r)->size; \
+	struct rte_ring_lf_entry *ring = (struct rte_ring_lf_entry *)base; \
+	unsigned int mask = ~0x3; \
+	if (likely(idx + n < size)) { \
+		for (i = 0; i < (n & mask); i += 4, idx += 4) {\
+			obj_table[i] = ring[idx].ptr; \
+			obj_table[i + 1] = ring[idx + 1].ptr; \
+			obj_table[i + 2] = ring[idx + 2].ptr; \
+			obj_table[i + 3] = ring[idx + 3].ptr; \
+		} \
+		switch (n & 0x3) { \
+		case 3: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 2: \
+			obj_table[i++] = ring[idx++].ptr; /* fallthrough */ \
+		case 1: \
+			obj_table[i++] = ring[idx++].ptr; \
+		} \
+	} else { \
+		for (i = 0; idx < size; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+		for (idx = 0; i < n; i++, idx++) \
+			obj_table[i] = ring[idx].ptr; \
+	} \
+} while (0)
+
+
+/* @internal 128-bit structure used by the lock-free ring */
+struct rte_ring_lf_entry {
+	void *ptr; /**< Data pointer */
+	uintptr_t cnt; /**< Modification counter */
+};
+
 /* Between load and load. there might be cpu reorder in weak model
  * (powerpc/arm).
  * There are 2 choices for the users
@@ -330,6 +437,70 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 #endif
 
 /**
+ * @internal Enqueue several objects on the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, enum rte_ring_queue_behavior behavior,
+			 unsigned int is_sp, unsigned int *free_space)
+{
+	if (is_sp)
+		return __rte_ring_do_lf_enqueue_sp(r, obj_table, n,
+						   behavior, free_space);
+	else
+		return __rte_ring_do_lf_enqueue_mp(r, obj_table, n,
+						   behavior, free_space);
+}
+
+/**
+ * @internal Dequeue several objects from the lock-free ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue(struct rte_ring *r, void **obj_table,
+		 unsigned int n, enum rte_ring_queue_behavior behavior,
+		 unsigned int is_sc, unsigned int *available)
+{
+	if (is_sc)
+		return __rte_ring_do_lf_dequeue_sc(r, obj_table, n,
+						   behavior, available);
+	else
+		return __rte_ring_do_lf_dequeue_mc(r, obj_table, n,
+						   behavior, available);
+}
+
+/**
  * @internal Enqueue several objects on the ring
  *
   * @param r
@@ -436,8 +607,14 @@ static __rte_always_inline unsigned int
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MP,
+					     free_space);
 }
 
 /**
@@ -459,8 +636,14 @@ static __rte_always_inline unsigned int
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SP,
+						free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SP,
+					     free_space);
 }
 
 /**
@@ -486,8 +669,14 @@ static __rte_always_inline unsigned int
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->prod_ptr.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -570,8 +759,14 @@ static __rte_always_inline unsigned int
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_MC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_MC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_MC,
+					     available);
 }
 
 /**
@@ -594,8 +789,14 @@ static __rte_always_inline unsigned int
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-			__IS_SC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED, __IS_SC,
+						available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED, __IS_SC,
+					     available);
 }
 
 /**
@@ -621,8 +822,14 @@ static __rte_always_inline unsigned int
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
-				r->cons.single, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_FIXED,
+						r->cons_ptr.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_FIXED,
+					     r->cons.single, available);
 }
 
 /**
@@ -697,9 +904,13 @@ rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 static inline unsigned
 rte_ring_count(const struct rte_ring *r)
 {
-	uint32_t prod_tail = r->prod.tail;
-	uint32_t cons_tail = r->cons.tail;
-	uint32_t count = (prod_tail - cons_tail) & r->mask;
+	uint32_t count;
+
+	if (r->flags & RING_F_LF)
+		count = (r->prod_ptr.tail - r->cons_ptr.tail) & r->mask;
+	else
+		count = (r->prod.tail - r->cons.tail) & r->mask;
+
 	return (count > r->capacity) ? r->capacity : count;
 }
 
@@ -819,8 +1030,14 @@ static __rte_always_inline unsigned
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MP, free_space);
 }
 
 /**
@@ -842,8 +1059,14 @@ static __rte_always_inline unsigned
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			 unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SP, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SP, free_space);
 }
 
 /**
@@ -869,8 +1092,14 @@ static __rte_always_inline unsigned
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		      unsigned int n, unsigned int *free_space)
 {
-	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
-			r->prod.single, free_space);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_enqueue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->prod_ptr.single, free_space);
+	else
+		return __rte_ring_do_enqueue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->prod.single, free_space);
 }
 
 /**
@@ -897,8 +1126,14 @@ static __rte_always_inline unsigned
 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_MC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_MC, available);
 }
 
 /**
@@ -922,8 +1157,14 @@ static __rte_always_inline unsigned
 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						__IS_SC, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     __IS_SC, available);
 }
 
 /**
@@ -949,9 +1190,14 @@ static __rte_always_inline unsigned
 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		unsigned int n, unsigned int *available)
 {
-	return __rte_ring_do_dequeue(r, obj_table, n,
-				RTE_RING_QUEUE_VARIABLE,
-				r->cons.single, available);
+	if (r->flags & RING_F_LF)
+		return __rte_ring_do_lf_dequeue(r, obj_table, n,
+						RTE_RING_QUEUE_VARIABLE,
+						r->cons_ptr.single, available);
+	else
+		return __rte_ring_do_dequeue(r, obj_table, n,
+					     RTE_RING_QUEUE_VARIABLE,
+					     r->cons.single, available);
 }
 
 #ifdef __cplusplus
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 545caf257..a672d161e 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -221,8 +221,8 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
 		/* Ensure the head is read before tail */
 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-		/* load-acquire synchronize with store-release of ht->tail
-		 * in update_tail.
+		/* load-acquire synchronize with store-release of tail in
+		 * __rte_ring_do_lf_dequeue_{sc, mc}.
 		 */
 		cons_tail = __atomic_load_n(&r->cons_ptr.tail,
 					__ATOMIC_ACQUIRE);
@@ -247,6 +247,7 @@ __rte_ring_move_prod_head_ptr(struct rte_ring *r, unsigned int is_sp,
 					0, __ATOMIC_RELAXED,
 					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
+
 	return n;
 }
 
@@ -293,8 +294,8 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 		/* Ensure the head is read before tail */
 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-		/* this load-acquire synchronize with store-release of ht->tail
-		 * in update_tail.
+		/* load-acquire synchronize with store-release of tail in
+		 * __rte_ring_do_lf_enqueue_{sp, mp}.
 		 */
 		prod_tail = __atomic_load_n(&r->prod_ptr.tail,
 					__ATOMIC_ACQUIRE);
@@ -318,6 +319,363 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 							0, __ATOMIC_RELAXED,
 							__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+	uint32_t free_entries;
+	uintptr_t head, next;
+
+	n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+					  &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+	__atomic_store_n(&r->prod_ptr.tail,
+			 r->prod_ptr.tail + n,
+			 __ATOMIC_RELEASE);
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+	uintptr_t fresh = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+
+	if ((intptr_t)(idx - fresh) < 0)
+		idx = fresh; /* fresh is after idx, use it instead */
+	else
+		idx++; /* Continue with next slot */
+
+	return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+	volatile uintptr_t *loc = &r->prod_ptr.tail;
+	uintptr_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+
+	do {
+		/* Check if the tail has already been updated. */
+		if ((intptr_t)(val - old) < 0)
+			return old;
+
+		/* Else val >= old, need to update *loc */
+	} while (!__atomic_compare_exchange_n(loc, &old, val,
+					      1, __ATOMIC_RELEASE,
+					      __ATOMIC_RELAXED));
+
+	return val;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+	RTE_SET_USED(r);
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(n);
+	RTE_SET_USED(behavior);
+	RTE_SET_USED(free_space);
+	printf("[%s()] RING_F_LF requires an experimental API."
+	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+	       , __func__);
+	return 0;
+#else
+	struct rte_ring_lf_entry *base;
+	uintptr_t head, next, tail;
+	unsigned int i;
+	uint32_t avail;
+
+	/* Atomically update the prod head to reserve n slots. The prod tail
+	 * is modified at the end of the function.
+	 */
+	n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+					  &head, &next, &avail);
+
+	tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_RELAXED);
+	head = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_ACQUIRE);
+
+	if (unlikely(n == 0))
+		goto end;
+
+	base = (struct rte_ring_lf_entry *)&r->ring;
+
+	for (i = 0; i < n; i++) {
+		unsigned int retries = 0;
+		int success = 0;
+
+		/* Enqueue to the tail entry. If another thread wins the race,
+		 * retry with the new tail.
+		 */
+		do {
+			struct rte_ring_lf_entry old_value, new_value;
+			struct rte_ring_lf_entry *ring_ptr;
+
+			ring_ptr = &base[tail & r->mask];
+
+			old_value = *ring_ptr;
+
+			if (old_value.cnt != (tail >> r->log2_size)) {
+				/* This slot has already been used. Depending
+				 * on how far behind this thread is, either go
+				 * to the next slot or reload the tail.
+				 */
+				uintptr_t prev_tail;
+
+				prev_tail = (tail + r->size) >> r->log2_size;
+
+				if (old_value.cnt != prev_tail ||
+				    ++retries == ENQ_RETRY_LIMIT) {
+					/* This thread either fell 2+ laps
+					 * behind or hit the retry limit, so
+					 * reload the tail index.
+					 */
+					tail = __rte_ring_reload_tail(r, tail);
+					retries = 0;
+				} else {
+					/* Slot already used, try the next. */
+					tail++;
+
+				}
+
+				continue;
+			}
+
+			/* Found a free slot, try to enqueue next element. */
+			new_value.ptr = obj_table[i];
+			new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+			success = rte_atomic128_cmp_exchange(
+					(rte_int128_t *)ring_ptr,
+					(rte_int128_t *)&old_value,
+					(rte_int128_t *)&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#else
+			success = __atomic_compare_exchange(
+					(uint64_t *)ring_ptr,
+					&old_value,
+					&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#endif
+		} while (success == 0);
+
+		/* Only increment tail if the CAS succeeds, since it can
+		 * spuriously fail on some architectures.
+		 */
+		tail++;
+	}
+
+end:
+	tail = __rte_ring_lf_update_tail(r, tail);
+
+	if (free_space != NULL)
+		*free_space = avail - n;
+	return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+	prod_tail = __atomic_load_n(&r->prod_ptr.tail, __ATOMIC_ACQUIRE);
+
+	avail = prod_tail - cons_tail;
+
+	/* Set the actual entries for dequeue */
+	if (unlikely(avail < n))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	/* Use a read barrier and store-relaxed so we don't unnecessarily order
+	 * writes.
+	 */
+	rte_smp_rmb();
+
+	__atomic_store_n(&r->cons_ptr.tail, cons_tail + n, __ATOMIC_RELAXED);
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = __atomic_load_n(&r->cons_ptr.tail, __ATOMIC_RELAXED);
+
+	do {
+		/* Load tail on every iteration to avoid spurious queue empty
+		 * situations.
+		 */
+		prod_tail = __atomic_load_n(&r->prod_ptr.tail,
+					    __ATOMIC_ACQUIRE);
+
+		avail = prod_tail - cons_tail;
+
+		/* Set the actual entries for dequeue */
+		if (unlikely(avail < n))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+		if (unlikely(n == 0))
+			goto end;
+
+		DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+		/* Use a read barrier and store-relaxed so we don't
+		 * unnecessarily order writes.
+		 */
+		rte_smp_rmb();
+
+	} while (!__atomic_compare_exchange_n(&r->cons_ptr.tail,
+					      &cons_tail, cons_tail + n,
+					      0, __ATOMIC_RELAXED,
+					      __ATOMIC_RELAXED));
+
+end:
+	if (available != NULL)
+		*available = avail - n;
+
 	return n;
 }
 
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 6a0e1bbfb..944b353f4 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -297,4 +297,358 @@ __rte_ring_move_cons_head_ptr(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (single-producer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_sp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+	uint32_t free_entries;
+	uintptr_t head, next;
+
+	n = __rte_ring_move_prod_head_ptr(r, 1, n, behavior,
+					  &head, &next, &free_entries);
+	if (n == 0)
+		goto end;
+
+	ENQUEUE_PTRS_LF(r, &r->ring, head, obj_table, n);
+
+	rte_smp_wmb();
+
+	r->prod_ptr.tail += n;
+end:
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+	return n;
+}
+
+/* This macro defines the number of times an enqueueing thread can fail to find
+ * a free ring slot before reloading its producer tail index.
+ */
+#define ENQ_RETRY_LIMIT 32
+
+/**
+ * @internal
+ *   Get the next producer tail index.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the ring's tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_reload_tail(struct rte_ring *r, uintptr_t idx)
+{
+	uintptr_t fresh = r->prod_ptr.tail;
+
+	if ((intptr_t)(idx - fresh) < 0)
+		/* fresh is after idx, use it instead */
+		idx = fresh;
+	else
+		/* Continue with next slot */
+		idx++;
+
+	return idx;
+}
+
+/**
+ * @internal
+ *   Update the ring's producer tail index. If another thread already updated
+ *   the index beyond the caller's tail value, do nothing.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param idx
+ *   The local tail index
+ * @return
+ *   If the shared tail is ahead of the local tail, return the shared tail.
+ *   Else, return tail + 1.
+ */
+static __rte_always_inline uintptr_t
+__rte_ring_lf_update_tail(struct rte_ring *r, uintptr_t val)
+{
+	volatile uintptr_t *loc = &r->prod_ptr.tail;
+	uintptr_t old = *loc;
+
+	do {
+		/* Check if the tail has already been updated. */
+		if ((intptr_t)(val - old) < 0)
+			return old;
+
+		/* Else val >= old, need to update *loc */
+	} while (!__sync_bool_compare_and_swap(loc, old, val));
+
+	return val;
+}
+
+/**
+ * @internal
+ *   Enqueue several objects on the lock-free ring (multi-producer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_enqueue_mp(struct rte_ring *r, void * const *obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *free_space)
+{
+#if !defined(ALLOW_EXPERIMENTAL_API)
+	RTE_SET_USED(r);
+	RTE_SET_USED(obj_table);
+	RTE_SET_USED(n);
+	RTE_SET_USED(behavior);
+	RTE_SET_USED(free_space);
+	printf("[%s()] RING_F_LF requires an experimental API."
+	       " Recompile with ALLOW_EXPERIMENTAL_API to use it.\n"
+	       , __func__);
+	return 0;
+#else
+	struct rte_ring_lf_entry *base;
+	uintptr_t head, next, tail;
+	unsigned int i;
+	uint32_t avail;
+
+	/* Atomically update the prod head to reserve n slots. The prod tail
+	 * is modified at the end of the function.
+	 */
+	n = __rte_ring_move_prod_head_ptr(r, 0, n, behavior,
+					  &head, &next, &avail);
+
+	tail = r->prod_ptr.tail;
+
+	rte_smp_rmb();
+
+	head = r->cons_ptr.tail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	base = (struct rte_ring_lf_entry *)&r->ring;
+
+	for (i = 0; i < n; i++) {
+		unsigned int retries = 0;
+		int success = 0;
+
+		/* Enqueue to the tail entry. If another thread wins the race,
+		 * retry with the new tail.
+		 */
+		do {
+			struct rte_ring_lf_entry old_value, new_value;
+			struct rte_ring_lf_entry *ring_ptr;
+
+			ring_ptr = &base[tail & r->mask];
+
+			old_value = *ring_ptr;
+
+			if (old_value.cnt != (tail >> r->log2_size)) {
+				/* This slot has already been used. Depending
+				 * on how far behind this thread is, either go
+				 * to the next slot or reload the tail.
+				 */
+				uintptr_t prev_tail;
+
+				prev_tail = (tail + r->size) >> r->log2_size;
+
+				if (old_value.cnt != prev_tail ||
+				    ++retries == ENQ_RETRY_LIMIT) {
+					/* This thread either fell 2+ laps
+					 * behind or hit the retry limit, so
+					 * reload the tail index.
+					 */
+					tail = __rte_ring_reload_tail(r, tail);
+					retries = 0;
+				} else {
+					/* Slot already used, try the next. */
+					tail++;
+
+				}
+
+				continue;
+			}
+
+			/* Found a free slot, try to enqueue next element. */
+			new_value.ptr = obj_table[i];
+			new_value.cnt = (tail + r->size) >> r->log2_size;
+
+#ifdef RTE_ARCH_64
+			success = rte_atomic128_cmp_exchange(
+					(rte_int128_t *)ring_ptr,
+					(rte_int128_t *)&old_value,
+					(rte_int128_t *)&new_value,
+					1, __ATOMIC_RELEASE,
+					__ATOMIC_RELAXED);
+#else
+			uint64_t *old_ptr = (uint64_t *)&old_value;
+			uint64_t *new_ptr = (uint64_t *)&new_value;
+
+			success = rte_atomic64_cmpset(
+					(volatile uint64_t *)ring_ptr,
+					*old_ptr, *new_ptr);
+#endif
+		} while (success == 0);
+
+		/* Only increment tail if the CAS succeeds, since it can
+		 * spuriously fail on some architectures.
+		 */
+		tail++;
+	}
+
+end:
+
+	tail = __rte_ring_lf_update_tail(r, tail);
+
+	if (free_space != NULL)
+		*free_space = avail - n;
+	return n;
+#endif
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (single-consumer only)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_sc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = r->cons_ptr.tail;
+
+	rte_smp_rmb();
+
+	prod_tail = r->prod_ptr.tail;
+
+	avail = prod_tail - cons_tail;
+
+	/* Set the actual entries for dequeue */
+	if (unlikely(avail < n))
+		n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+	if (unlikely(n == 0))
+		goto end;
+
+	DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	rte_smp_rmb();
+
+	r->cons_ptr.tail += n;
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
+/**
+ * @internal
+ *   Dequeue several objects from the lock-free ring (multi-consumer safe)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from the ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_lf_dequeue_mc(struct rte_ring *r, void **obj_table,
+			    unsigned int n,
+			    enum rte_ring_queue_behavior behavior,
+			    unsigned int *available)
+{
+	uintptr_t cons_tail, prod_tail, avail;
+
+	cons_tail = r->cons_ptr.tail;
+
+	do {
+		rte_smp_rmb();
+
+		/* Load tail on every iteration to avoid spurious queue empty
+		 * situations.
+		 */
+		prod_tail = r->prod_ptr.tail;
+
+		avail = prod_tail - cons_tail;
+
+		/* Set the actual entries for dequeue */
+		if (unlikely(avail < n))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : avail;
+
+		if (unlikely(n == 0))
+			goto end;
+
+		DEQUEUE_PTRS_LF(r, &r->ring, cons_tail, obj_table, n);
+
+	} while (!__sync_bool_compare_and_swap(&r->cons_ptr.tail,
+					       cons_tail, cons_tail + n));
+
+end:
+	if (available != NULL)
+		*available = avail - n;
+
+	return n;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map
index d935efd0d..8969467af 100644
--- a/lib/librte_ring/rte_ring_version.map
+++ b/lib/librte_ring/rte_ring_version.map
@@ -17,3 +17,10 @@ DPDK_2.2 {
 	rte_ring_free;
 
 } DPDK_2.0;
+
+DPDK_19.05 {
+	global:
+
+	rte_ring_get_memsize;
+
+} DPDK_2.2;
-- 
2.13.6
