* [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
@ 2017-11-02  8:43 Jia He
  2017-11-02 13:26 ` Ananyev, Konstantin
  2017-11-02 17:23 ` Jerin Jacob
  0 siblings, 2 replies; 13+ messages in thread
From: Jia He @ 2017-11-02  8:43 UTC (permalink / raw)
  To: jerin.jacob, dev, olivier.matz
  Cc: konstantin.ananyev, bruce.richardson, jianbo.liu, hemant.agrawal,
	Jia He, jie2.liu, bing.zhao, jia.he

We observed an rte panic from mbuf_autotest on our Qualcomm arm64 server.
For the possible race condition, please refer to [1].

Furthermore, there are 2 options, as suggested by Jerin:
1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2])
CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER selects between them; by
default it is n.

The reason for providing 2 options is the difference in performance
benchmark results on different arm machines; please refer to [3].

Already functionally tested on the following machines:
on x86 (compile-tested only)
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n

[1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
[2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
[3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
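
For illustration, a minimal sketch of the two options on the producer side
(not part of the patch; the helper names below are invented purely to show
the difference):

/* Option 1: explicit read barrier between the two loads */
static inline uint32_t
load_heads_rmb(struct rte_ring *r, uint32_t *old_head)
{
	*old_head = r->prod.head;
	/* keep the cons.tail load below from being reordered before the
	 * prod.head load above */
	rte_smp_rmb();
	return r->cons.tail;
}

/* Option 2: one-direction barrier, load-acquire on prod.head */
static inline uint32_t
load_heads_acq(struct rte_ring *r, uint32_t *old_head)
{
	/* later loads cannot be hoisted above this acquire load */
	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
	return r->cons.tail;
}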

---
Changelog:
V2: let users choose whether using load_acquire/store_release
V1: rte_smp_rmb() between 2 loads

Signed-off-by: Jia He <hejianet@gmail.com>
Signed-off-by: jie2.liu@hxt-semitech.com
Signed-off-by: bing.zhao@hxt-semitech.com
Signed-off-by: jia.he@hxt-semitech.com
Suggested-by: jerin.jacob@caviumnetworks.com
---
 lib/librte_ring/Makefile           |  4 +++-
 lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
 lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
 4 files changed, 127 insertions(+), 8 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_arm64.h
 create mode 100644 lib/librte_ring/rte_ring_generic.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index e34d9d9..fa57a86 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -45,6 +45,8 @@ LIBABIVER := 1
 SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 
 # install includes
-SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+					rte_ring_arm64.h \
+					rte_ring_generic.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7..943b1f9 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -103,6 +103,18 @@ extern "C" {
 #include <rte_memzone.h>
 #include <rte_pause.h>
 
+/* On strong memory models (e.g. x86), there is no need to add an rmb()
+ * between two loads.
+ * On weak models (powerpc/arm), there are 2 choices for the user:
+ * 1. use the rmb() memory barrier
+ * 2. use one-direction load_acquire/store_release barriers
+ * The choice depends on performance test results. */
+#ifdef RTE_ARCH_ARM64
+#include "rte_ring_arm64.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
 #define RTE_TAILQ_RING_NAME "RTE_RING"
 
 enum rte_ring_queue_behavior {
@@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
 		while (unlikely(ht->tail != old_val))
 			rte_pause();
 
-	ht->tail = new_val;
+	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
 }
 
 /**
@@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 		/* Reset n to the initial burst count */
 		n = max;
 
-		*old_head = r->prod.head;
+		*old_head = arch_rte_atomic_load(&r->prod.head,
+						__ATOMIC_ACQUIRE);
 		const uint32_t cons_tail = r->cons.tail;
 		/*
 		 *  The subtraction is done between two unsigned 32bits value
@@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
 		if (is_sp)
 			r->prod.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->prod.head,
-					*old_head, *new_head);
+			success = arch_rte_atomic32_cmpset(&r->prod.head,
+					old_head, *new_head,
+					0, __ATOMIC_ACQUIRE,
+					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
@@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
 		goto end;
 
 	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
 	rte_smp_wmb();
+#endif
 
 	update_tail(&r->prod, prod_head, prod_next, is_sp);
 end:
@@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		/* Restore n as it may change every loop */
 		n = max;
 
-		*old_head = r->cons.head;
+		*old_head = arch_rte_atomic_load(&r->cons.head,
+						__ATOMIC_ACQUIRE);
 		const uint32_t prod_tail = r->prod.tail;
 		/* The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
@@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 		if (is_sc)
 			r->cons.head = *new_head, success = 1;
 		else
-			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
-					*new_head);
+			success = arch_rte_atomic32_cmpset(&r->cons.head,
+					old_head, *new_head,
+					0, __ATOMIC_ACQUIRE,
+					__ATOMIC_RELAXED);
 	} while (unlikely(success == 0));
 	return n;
 }
@@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
 		goto end;
 
 	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
 	rte_smp_rmb();
+#endif
 
 	update_tail(&r->cons, cons_head, cons_next, is_sc);
 
diff --git a/lib/librte_ring/rte_ring_arm64.h b/lib/librte_ring/rte_ring_arm64.h
new file mode 100644
index 0000000..09438e5
--- /dev/null
+++ b/lib/librte_ring/rte_ring_arm64.h
@@ -0,0 +1,48 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_ARM64_H_
+#define _RTE_RING_ARM64_H_
+
+#define arch_rte_atomic_store __atomic_store_n
+#define arch_rte_atomic32_cmpset __atomic_compare_exchange_n
+#ifdef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
+#define arch_rte_atomic_load __atomic_load_n
+#else
+#define arch_rte_atomic_load(a, b) \
+	*(a); \
+	rte_smp_rmb()
+
+#endif
+
+#endif /* _RTE_RING_ARM64_H_ */
+
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 0000000..4fb777c
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,45 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_GENERIC_H_
+#define _RTE_RING_GENERIC_H_
+
+#define arch_rte_atomic_load(a, b) *(a)
+#define arch_rte_atomic_store(a, b, c) \
+do { \
+	*(a) = b; \
+} while(0)
+#define arch_rte_atomic32_cmpset(a, b, c, d, e, f)  \
+	rte_atomic32_cmpset(a, *(b), c)
+
+#endif /* _RTE_RING_GENERIC_H_ */
+
-- 
2.7.4

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02  8:43 [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
@ 2017-11-02 13:26 ` Ananyev, Konstantin
  2017-11-02 15:42   ` Jia He
  2017-11-02 17:23 ` Jerin Jacob
  1 sibling, 1 reply; 13+ messages in thread
From: Ananyev, Konstantin @ 2017-11-02 13:26 UTC (permalink / raw)
  To: Jia He, jerin.jacob, dev, olivier.matz
  Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
	bing.zhao, jia.he

Hi Jia,

> -----Original Message-----
> From: Jia He [mailto:hejianet@gmail.com]
> Sent: Thursday, November 2, 2017 8:44 AM
> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com;
> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt-
> semitech.com
> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> 
> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
> As for the possible race condition, please refer to [1].
> 
> Furthermore, there are 2 options as suggested by Jerin:
> 1. use rte_smp_rmb
> 2. use load_acquire/store_release(refer to [2]).
> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> default it is n;
> 
> The reason why providing 2 options is due to the performance benchmark
> difference in different arm machines, please refer to [3].
> 
> Already fuctionally tested on the machines as follows:
> on X86(passed the compilation)
> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n
> 
> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
> 
> ---
> Changelog:
> V2: let users choose whether using load_acquire/store_release
> V1: rte_smp_rmb() between 2 loads
> 
> Signed-off-by: Jia He <hejianet@gmail.com>
> Signed-off-by: jie2.liu@hxt-semitech.com
> Signed-off-by: bing.zhao@hxt-semitech.com
> Signed-off-by: jia.he@hxt-semitech.com
> Suggested-by: jerin.jacob@caviumnetworks.com
> ---
>  lib/librte_ring/Makefile           |  4 +++-
>  lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
>  lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
>  4 files changed, 127 insertions(+), 8 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_arm64.h
>  create mode 100644 lib/librte_ring/rte_ring_generic.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> index e34d9d9..fa57a86 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -45,6 +45,8 @@ LIBABIVER := 1
>  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> 
>  # install includes
> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> +					rte_ring_arm64.h \
> +					rte_ring_generic.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7..943b1f9 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -103,6 +103,18 @@ extern "C" {
>  #include <rte_memzone.h>
>  #include <rte_pause.h>
> 
> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
> + * between load and load.
> + * In those weak models(powerpc/arm), there are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direcion load_acquire/store_release barrier
> + * It depends on performance test results. */
> +#ifdef RTE_ARCH_ARM64
> +#include "rte_ring_arm64.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
>  #define RTE_TAILQ_RING_NAME "RTE_RING"
> 
>  enum rte_ring_queue_behavior {
> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>  		while (unlikely(ht->tail != old_val))
>  			rte_pause();
> 
> -	ht->tail = new_val;
> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
>  }
> 
>  /**
> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  		/* Reset n to the initial burst count */
>  		n = max;
> 
> -		*old_head = r->prod.head;
> +		*old_head = arch_rte_atomic_load(&r->prod.head,
> +						__ATOMIC_ACQUIRE);
>  		const uint32_t cons_tail = r->cons.tail;

The code starts to look a bit messy with all these arch-specific macros...
So I wonder, wouldn't it be cleaner to:

1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
into rte_ring_generic.h
2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head 
(as was in v1 of your patch).
3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
in the rte_ring_arm64.h

That way we will keep the generic code simple and clean, while still allowing arch-specific optimizations.
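
For reference, a rough sketch of what the arm64/C11 flavour of
__rte_ring_move_prod_head could then look like (illustrative only; it
mirrors the generic loop but uses the C11 builtins directly, and is not a
tested implementation):

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	do {
		/* Reset n to the initial burst count */
		n = max;

		/* load-acquire pairs with the store-release of the tail
		 * in update_tail() */
		*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
		const uint32_t cons_tail = r->cons.tail;

		*free_entries = capacity + cons_tail - *old_head;
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;
		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
			/* CAS with acquire on success, relaxed on failure */
			success = __atomic_compare_exchange_n(&r->prod.head,
					old_head, *new_head, 0,
					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
	} while (unlikely(success == 0));
	return n;
}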

>  		/*
>  		 *  The subtraction is done between two unsigned 32bits value
> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  		if (is_sp)
>  			r->prod.head = *new_head, success = 1;
>  		else
> -			success = rte_atomic32_cmpset(&r->prod.head,
> -					*old_head, *new_head);
> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
> +					old_head, *new_head,
> +					0, __ATOMIC_ACQUIRE,
> +					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>  		goto end;
> 
>  	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER

I wonder why we need that macro?
Would there be situations when smp_wmb() is not needed here?
Konstantin

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02 13:26 ` Ananyev, Konstantin
@ 2017-11-02 15:42   ` Jia He
  2017-11-02 16:16     ` Ananyev, Konstantin
  0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-02 15:42 UTC (permalink / raw)
  To: Ananyev, Konstantin, jerin.jacob, dev, olivier.matz
  Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
	bing.zhao, jia.he

Hi Ananyev


On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote:
> Hi Jia,
>
>> -----Original Message-----
>> From: Jia He [mailto:hejianet@gmail.com]
>> Sent: Thursday, November 2, 2017 8:44 AM
>> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
>> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com;
>> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt-
>> semitech.com
>> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>>
>> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
>> As for the possible race condition, please refer to [1].
>>
>> Furthermore, there are 2 options as suggested by Jerin:
>> 1. use rte_smp_rmb
>> 2. use load_acquire/store_release(refer to [2]).
>> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
>> default it is n;
>>
>> The reason why providing 2 options is due to the performance benchmark
>> difference in different arm machines, please refer to [3].
>>
>> Already fuctionally tested on the machines as follows:
>> on X86(passed the compilation)
>> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
>> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n
>>
>> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
>> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
>> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
>>
>> ---
>> Changelog:
>> V2: let users choose whether using load_acquire/store_release
>> V1: rte_smp_rmb() between 2 loads
>>
>> Signed-off-by: Jia He <hejianet@gmail.com>
>> Signed-off-by: jie2.liu@hxt-semitech.com
>> Signed-off-by: bing.zhao@hxt-semitech.com
>> Signed-off-by: jia.he@hxt-semitech.com
>> Suggested-by: jerin.jacob@caviumnetworks.com
>> ---
>>   lib/librte_ring/Makefile           |  4 +++-
>>   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
>>   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
>>   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
>>   4 files changed, 127 insertions(+), 8 deletions(-)
>>   create mode 100644 lib/librte_ring/rte_ring_arm64.h
>>   create mode 100644 lib/librte_ring/rte_ring_generic.h
>>
>> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
>> index e34d9d9..fa57a86 100644
>> --- a/lib/librte_ring/Makefile
>> +++ b/lib/librte_ring/Makefile
>> @@ -45,6 +45,8 @@ LIBABIVER := 1
>>   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
>>
>>   # install includes
>> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
>> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
>> +					rte_ring_arm64.h \
>> +					rte_ring_generic.h
>>
>>   include $(RTE_SDK)/mk/rte.lib.mk
>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>> index 5e9b3b7..943b1f9 100644
>> --- a/lib/librte_ring/rte_ring.h
>> +++ b/lib/librte_ring/rte_ring.h
>> @@ -103,6 +103,18 @@ extern "C" {
>>   #include <rte_memzone.h>
>>   #include <rte_pause.h>
>>
>> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
>> + * between load and load.
>> + * In those weak models(powerpc/arm), there are 2 choices for the users
>> + * 1.use rmb() memory barrier
>> + * 2.use one-direcion load_acquire/store_release barrier
>> + * It depends on performance test results. */
>> +#ifdef RTE_ARCH_ARM64
>> +#include "rte_ring_arm64.h"
>> +#else
>> +#include "rte_ring_generic.h"
>> +#endif
>> +
>>   #define RTE_TAILQ_RING_NAME "RTE_RING"
>>
>>   enum rte_ring_queue_behavior {
>> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>>   		while (unlikely(ht->tail != old_val))
>>   			rte_pause();
>>
>> -	ht->tail = new_val;
>> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
>>   }
>>
>>   /**
>> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		/* Reset n to the initial burst count */
>>   		n = max;
>>
>> -		*old_head = r->prod.head;
>> +		*old_head = arch_rte_atomic_load(&r->prod.head,
>> +						__ATOMIC_ACQUIRE);
>>   		const uint32_t cons_tail = r->cons.tail;
> The code starts to look a bit messy with all these arch specific macros...
> So I wonder wouldn't it be more cleaner to:
>
> 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> into rte_ring_generic.h
> 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> (as was in v1 of your patch).
> 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> in the rte_ring_arm64.h
>
> That way we will keep ogneric code simple and clean, while still allowing arch specific optimizations.
Thanks for your review.
But as per your suggestion, there will be at least 2 copies of
__rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
Thus, if there are any bugs in the future, both copies will have to be
changed, right?
>
>>   		/*
>>   		 *  The subtraction is done between two unsigned 32bits value
>> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		if (is_sp)
>>   			r->prod.head = *new_head, success = 1;
>>   		else
>> -			success = rte_atomic32_cmpset(&r->prod.head,
>> -					*old_head, *new_head);
>> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
>> +					old_head, *new_head,
>> +					0, __ATOMIC_ACQUIRE,
>> +					__ATOMIC_RELAXED);
>>   	} while (unlikely(success == 0));
>>   	return n;
>>   }
>> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>>   		goto end;
>>
>>   	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> I wonder why do we need that macro?
> Would be there situations when smp_wmb() are not needed here?
If the dpdk user chooses the acquire/release config, the store_release
barrier in update_tail, together with the paired load_acquire barrier in
__rte_ring_move_{prod,cons}_head, guarantees the ordering. So smp_wmb() is
not required here. Please refer to the freebsd ring implementation and
Jerin's debug patch.
https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch
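
To make the pairing concrete, here is a toy model of the idea (not the DPDK
code, just the release/acquire relationship that replaces the explicit
wmb/rmb):

#include <stdint.h>

struct toy_ring {
	uint32_t tail;
	void *slots[256];
};

/* producer: the store-release on tail orders the slot write before the
 * tail update, so no separate rte_smp_wmb() is needed */
static inline void
toy_enqueue_one(struct toy_ring *r, uint32_t head, void *obj)
{
	r->slots[head & 255] = obj;
	__atomic_store_n(&r->tail, head + 1, __ATOMIC_RELEASE);
}

/* consumer: the load-acquire on tail keeps the slot read below it, so a
 * consumer that observes the new tail also observes the slot contents */
static inline int
toy_dequeue_one(struct toy_ring *r, uint32_t head, void **obj)
{
	uint32_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);

	if (head == tail)
		return 0;
	*obj = r->slots[head & 255];
	return 1;
}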

---
Cheers,
Jia
> Konstantin
>
>

-- 
Cheers,
Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02 15:42   ` Jia He
@ 2017-11-02 16:16     ` Ananyev, Konstantin
  2017-11-02 17:00       ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Ananyev, Konstantin @ 2017-11-02 16:16 UTC (permalink / raw)
  To: Jia He, jerin.jacob, dev, olivier.matz
  Cc: Richardson, Bruce, jianbo.liu, hemant.agrawal, jie2.liu,
	bing.zhao, jia.he



> -----Original Message-----
> From: Jia He [mailto:hejianet@gmail.com]
> Sent: Thursday, November 2, 2017 3:43 PM
> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> Cc: Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; hemant.agrawal@nxp.com; jie2.liu@hxt-semitech.com;
> bing.zhao@hxt-semitech.com; jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> 
> Hi Ananyev
> 
> 
> On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote:
> > Hi Jia,
> >
> >> -----Original Message-----
> >> From: Jia He [mailto:hejianet@gmail.com]
> >> Sent: Thursday, November 2, 2017 8:44 AM
> >> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> >> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com;
> >> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt-
> >> semitech.com
> >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> >>
> >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
> >> As for the possible race condition, please refer to [1].
> >>
> >> Furthermore, there are 2 options as suggested by Jerin:
> >> 1. use rte_smp_rmb
> >> 2. use load_acquire/store_release(refer to [2]).
> >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> >> default it is n;
> >>
> >> The reason why providing 2 options is due to the performance benchmark
> >> difference in different arm machines, please refer to [3].
> >>
> >> Already fuctionally tested on the machines as follows:
> >> on X86(passed the compilation)
> >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
> >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n
> >>
> >> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
> >> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
> >> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
> >>
> >> ---
> >> Changelog:
> >> V2: let users choose whether using load_acquire/store_release
> >> V1: rte_smp_rmb() between 2 loads
> >>
> >> Signed-off-by: Jia He <hejianet@gmail.com>
> >> Signed-off-by: jie2.liu@hxt-semitech.com
> >> Signed-off-by: bing.zhao@hxt-semitech.com
> >> Signed-off-by: jia.he@hxt-semitech.com
> >> Suggested-by: jerin.jacob@caviumnetworks.com
> >> ---
> >>   lib/librte_ring/Makefile           |  4 +++-
> >>   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
> >>   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
> >>   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
> >>   4 files changed, 127 insertions(+), 8 deletions(-)
> >>   create mode 100644 lib/librte_ring/rte_ring_arm64.h
> >>   create mode 100644 lib/librte_ring/rte_ring_generic.h
> >>
> >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> >> index e34d9d9..fa57a86 100644
> >> --- a/lib/librte_ring/Makefile
> >> +++ b/lib/librte_ring/Makefile
> >> @@ -45,6 +45,8 @@ LIBABIVER := 1
> >>   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> >>
> >>   # install includes
> >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> >> +					rte_ring_arm64.h \
> >> +					rte_ring_generic.h
> >>
> >>   include $(RTE_SDK)/mk/rte.lib.mk
> >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> >> index 5e9b3b7..943b1f9 100644
> >> --- a/lib/librte_ring/rte_ring.h
> >> +++ b/lib/librte_ring/rte_ring.h
> >> @@ -103,6 +103,18 @@ extern "C" {
> >>   #include <rte_memzone.h>
> >>   #include <rte_pause.h>
> >>
> >> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
> >> + * between load and load.
> >> + * In those weak models(powerpc/arm), there are 2 choices for the users
> >> + * 1.use rmb() memory barrier
> >> + * 2.use one-direcion load_acquire/store_release barrier
> >> + * It depends on performance test results. */
> >> +#ifdef RTE_ARCH_ARM64
> >> +#include "rte_ring_arm64.h"
> >> +#else
> >> +#include "rte_ring_generic.h"
> >> +#endif
> >> +
> >>   #define RTE_TAILQ_RING_NAME "RTE_RING"
> >>
> >>   enum rte_ring_queue_behavior {
> >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> >>   		while (unlikely(ht->tail != old_val))
> >>   			rte_pause();
> >>
> >> -	ht->tail = new_val;
> >> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> >>   }
> >>
> >>   /**
> >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> >>   		/* Reset n to the initial burst count */
> >>   		n = max;
> >>
> >> -		*old_head = r->prod.head;
> >> +		*old_head = arch_rte_atomic_load(&r->prod.head,
> >> +						__ATOMIC_ACQUIRE);
> >>   		const uint32_t cons_tail = r->cons.tail;
> > The code starts to look a bit messy with all these arch specific macros...
> > So I wonder wouldn't it be more cleaner to:
> >
> > 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > into rte_ring_generic.h
> > 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> > (as was in v1 of your patch).
> > 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > in the rte_ring_arm64.h
> >
> > That way we will keep ogneric code simple and clean, while still allowing arch specific optimizations.
> Thanks for your review.
> But as per your suggestion, there will be at least 2 copies of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
> Thus, if there are any bugs in the future, both 2 copies have to be
> changed, right?

Yes, there would be some code duplication.
Though the generic code would be much cleaner and simpler to read in that
case, which I think is worth some duplication.

> >
> >>   		/*
> >>   		 *  The subtraction is done between two unsigned 32bits value
> >> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> >>   		if (is_sp)
> >>   			r->prod.head = *new_head, success = 1;
> >>   		else
> >> -			success = rte_atomic32_cmpset(&r->prod.head,
> >> -					*old_head, *new_head);
> >> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
> >> +					old_head, *new_head,
> >> +					0, __ATOMIC_ACQUIRE,
> >> +					__ATOMIC_RELAXED);
> >>   	} while (unlikely(success == 0));
> >>   	return n;
> >>   }
> >> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> >>   		goto end;
> >>
> >>   	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> >> +
> >> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > I wonder why do we need that macro?
> > Would be there situations when smp_wmb() are not needed here?
> If the dpdk user chooses the config acquire/release, the store_release
> barrier in update_tail together with
> the load_acquire barrier pair in __rte_ring_move_{prod,cons}_head
> guarantee the order. So smp_wmb() is not required here. Please refer to
> the freebsd ring implementation and Jerin's debug patch.
> https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
> https://github.com/jerinjacobk/mytests/blob/master/ring/0001-ring-using-c11-memory-model.patch

Ok, so because you are doing
arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
inside update_tail(), you don't need the mb() here anymore...
In that case, can we perhaps hide all that logic inside update_tail()?
I.e. move smp_rmb/smp_wmb into update_tail(), and then for arm64 you'll
have your special version with the atomic store.

Something like this for the generic version:
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
-                uint32_t single)
+                uint32_t single, uint32_t enqueue)
{

+       if (enqueue)
+            rte_smp_wmb();
+       else
+           rte_smp_rmb();
        /*
         * If there are other enqueues/dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
        if (!single)
                while (unlikely(ht->tail != old_val))
                        rte_pause();

        ht->tail = new_val;
}

....

static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
                 unsigned int n, enum rte_ring_queue_behavior behavior,
                 int is_sp, unsigned int *free_space)
{
        .....

        ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
-       rte_smp_wmb();

-       update_tail(&r->prod, prod_head, prod_next, is_sp);
+       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
....

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
                 unsigned int n, enum rte_ring_queue_behavior behavior,
                 int is_sc, unsigned int *available)
{
        ....

        DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
-       rte_smp_rmb();

-       update_tail(&r->cons, cons_head, cons_next, is_sc);
+       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
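
And for the C11/arm64 side, update_tail() would presumably carry the
release itself, something like (a sketch only, assuming the same signature):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	RTE_SET_USED(enqueue); /* the store-release below orders both
				* prior loads and prior stores */

	/*
	 * If there are other enqueues/dequeues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}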

Konstantin

> 
> ---
> Cheers,
> Jia
> > Konstantin
> >
> >
> 
> --
> Cheers,
> Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02 16:16     ` Ananyev, Konstantin
@ 2017-11-02 17:00       ` Jerin Jacob
  0 siblings, 0 replies; 13+ messages in thread
From: Jerin Jacob @ 2017-11-02 17:00 UTC (permalink / raw)
  To: Ananyev, Konstantin
  Cc: Jia He, dev, olivier.matz, Richardson, Bruce, jianbo.liu,
	hemant.agrawal, jie2.liu, bing.zhao, jia.he

-----Original Message-----
> Date: Thu, 2 Nov 2017 16:16:33 +0000
> From: "Ananyev, Konstantin" <konstantin.ananyev@intel.com>
> To: Jia He <hejianet@gmail.com>, "jerin.jacob@caviumnetworks.com"
>  <jerin.jacob@caviumnetworks.com>, "dev@dpdk.org" <dev@dpdk.org>,
>  "olivier.matz@6wind.com" <olivier.matz@6wind.com>
> CC: "Richardson, Bruce" <bruce.richardson@intel.com>, "jianbo.liu@arm.com"
>  <jianbo.liu@arm.com>, "hemant.agrawal@nxp.com" <hemant.agrawal@nxp.com>,
>  "jie2.liu@hxt-semitech.com" <jie2.liu@hxt-semitech.com>,
>  "bing.zhao@hxt-semitech.com" <bing.zhao@hxt-semitech.com>,
>  "jia.he@hxt-semitech.com" <jia.he@hxt-semitech.com>
> Subject: RE: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> 
> 
> 
> > -----Original Message-----
> > From: Jia He [mailto:hejianet@gmail.com]
> > Sent: Thursday, November 2, 2017 3:43 PM
> > To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> > Cc: Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com; hemant.agrawal@nxp.com; jie2.liu@hxt-semitech.com;
> > bing.zhao@hxt-semitech.com; jia.he@hxt-semitech.com
> > Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> > 
> > Hi Ananyev
> > 
> > 
> > On 11/2/2017 9:26 PM, Ananyev, Konstantin Wrote:
> > > Hi Jia,
> > >
> > >> -----Original Message-----
> > >> From: Jia He [mailto:hejianet@gmail.com]
> > >> Sent: Thursday, November 2, 2017 8:44 AM
> > >> To: jerin.jacob@caviumnetworks.com; dev@dpdk.org; olivier.matz@6wind.com
> > >> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; Richardson, Bruce <bruce.richardson@intel.com>; jianbo.liu@arm.com;
> > >> hemant.agrawal@nxp.com; Jia He <hejianet@gmail.com>; jie2.liu@hxt-semitech.com; bing.zhao@hxt-semitech.com; jia.he@hxt-
> > >> semitech.com
> > >> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> > >>
> > >> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
> > >> As for the possible race condition, please refer to [1].
> > >>
> > >> Furthermore, there are 2 options as suggested by Jerin:
> > >> 1. use rte_smp_rmb
> > >> 2. use load_acquire/store_release(refer to [2]).
> > >> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> > >> default it is n;
> > >>
> > >> The reason why providing 2 options is due to the performance benchmark
> > >> difference in different arm machines, please refer to [3].
> > >>
> > >> Already fuctionally tested on the machines as follows:
> > >> on X86(passed the compilation)
> > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=y
> > >> on arm64 with CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER=n
> > >>
> > >> [1] http://dpdk.org/ml/archives/dev/2017-October/078366.html
> > >> [2] https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h#L170
> > >> [3] http://dpdk.org/ml/archives/dev/2017-October/080861.html
> > >>
> > >> ---
> > >> Changelog:
> > >> V2: let users choose whether using load_acquire/store_release
> > >> V1: rte_smp_rmb() between 2 loads
> > >>
> > >> Signed-off-by: Jia He <hejianet@gmail.com>
> > >> Signed-off-by: jie2.liu@hxt-semitech.com
> > >> Signed-off-by: bing.zhao@hxt-semitech.com
> > >> Signed-off-by: jia.he@hxt-semitech.com
> > >> Suggested-by: jerin.jacob@caviumnetworks.com
> > >> ---
> > >>   lib/librte_ring/Makefile           |  4 +++-
> > >>   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
> > >>   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
> > >>   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
> > >>   4 files changed, 127 insertions(+), 8 deletions(-)
> > >>   create mode 100644 lib/librte_ring/rte_ring_arm64.h
> > >>   create mode 100644 lib/librte_ring/rte_ring_generic.h
> > >>
> > >> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > >> index e34d9d9..fa57a86 100644
> > >> --- a/lib/librte_ring/Makefile
> > >> +++ b/lib/librte_ring/Makefile
> > >> @@ -45,6 +45,8 @@ LIBABIVER := 1
> > >>   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > >>
> > >>   # install includes
> > >> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> > >> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > >> +					rte_ring_arm64.h \
> > >> +					rte_ring_generic.h
> > >>
> > >>   include $(RTE_SDK)/mk/rte.lib.mk
> > >> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > >> index 5e9b3b7..943b1f9 100644
> > >> --- a/lib/librte_ring/rte_ring.h
> > >> +++ b/lib/librte_ring/rte_ring.h
> > >> @@ -103,6 +103,18 @@ extern "C" {
> > >>   #include <rte_memzone.h>
> > >>   #include <rte_pause.h>
> > >>
> > >> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
> > >> + * between load and load.
> > >> + * In those weak models(powerpc/arm), there are 2 choices for the users
> > >> + * 1.use rmb() memory barrier
> > >> + * 2.use one-direcion load_acquire/store_release barrier
> > >> + * It depends on performance test results. */
> > >> +#ifdef RTE_ARCH_ARM64
> > >> +#include "rte_ring_arm64.h"
> > >> +#else
> > >> +#include "rte_ring_generic.h"
> > >> +#endif
> > >> +
> > >>   #define RTE_TAILQ_RING_NAME "RTE_RING"
> > >>
> > >>   enum rte_ring_queue_behavior {
> > >> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> > >>   		while (unlikely(ht->tail != old_val))
> > >>   			rte_pause();
> > >>
> > >> -	ht->tail = new_val;
> > >> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> > >>   }
> > >>
> > >>   /**
> > >> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > >>   		/* Reset n to the initial burst count */
> > >>   		n = max;
> > >>
> > >> -		*old_head = r->prod.head;
> > >> +		*old_head = arch_rte_atomic_load(&r->prod.head,
> > >> +						__ATOMIC_ACQUIRE);
> > >>   		const uint32_t cons_tail = r->cons.tail;
> > > The code starts to look a bit messy with all these arch specific macros...
> > > So I wonder wouldn't it be more cleaner to:
> > >
> > > 1. move existing __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > > into rte_ring_generic.h
> > > 2. Add rte_smp_rmb into generic __rte_ring_move_prod_head/__rte_ring_move_cons_head
> > > (as was in v1 of your patch).
> > > 3. Introduce ARM specific versions of __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> > > in the rte_ring_arm64.h
> > >
> > > That way we will keep ogneric code simple and clean, while still allowing arch specific optimizations.
> > Thanks for your review.
> > But as per your suggestion, there will be at least 2 copies of
> > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail.
> > Thus, if there are any bugs in the future, both 2 copies have to be
> > changed, right?
> 
> Yes, there would be some code duplication.
> Though generic code would be much cleaner and simpler to read in that case.
> Which is worth some dups I think.

I agree with Konstantin here.

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02  8:43 [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
  2017-11-02 13:26 ` Ananyev, Konstantin
@ 2017-11-02 17:23 ` Jerin Jacob
  2017-11-03  1:46   ` Jia He
  1 sibling, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-02 17:23 UTC (permalink / raw)
  To: Jia He
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

-----Original Message-----
> Date: Thu,  2 Nov 2017 08:43:30 +0000
> From: Jia He <hejianet@gmail.com>
> To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com
> Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com,
>  jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> X-Mailer: git-send-email 2.7.4
> 
> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
> As for the possible race condition, please refer to [1].

Hi Jia,

In addition to Konstantin's comments, please find some review
comments below.
> 
> Furthermore, there are 2 options as suggested by Jerin:
> 1. use rte_smp_rmb
> 2. use load_acquire/store_release(refer to [2]).
> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by

I think the better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
or something like that.

> default it is n;
> 
> ---
> Changelog:
> V2: let users choose whether using load_acquire/store_release
> V1: rte_smp_rmb() between 2 loads
> 
> Signed-off-by: Jia He <hejianet@gmail.com>
> Signed-off-by: jie2.liu@hxt-semitech.com
> Signed-off-by: bing.zhao@hxt-semitech.com
> Signed-off-by: jia.he@hxt-semitech.com
> Suggested-by: jerin.jacob@caviumnetworks.com
> ---
>  lib/librte_ring/Makefile           |  4 +++-
>  lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
>  lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
>  4 files changed, 127 insertions(+), 8 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_arm64.h
>  create mode 100644 lib/librte_ring/rte_ring_generic.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> index e34d9d9..fa57a86 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -45,6 +45,8 @@ LIBABIVER := 1
>  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
>  
>  # install includes
> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> +					rte_ring_arm64.h \

It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h
or something like that to reflect that the implementation is based on the
C11 memory model.


> +					rte_ring_generic.h
>  
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7..943b1f9 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -103,6 +103,18 @@ extern "C" {
>  #include <rte_memzone.h>
>  #include <rte_pause.h>
>  
> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
> + * between load and load.
> + * In those weak models(powerpc/arm), there are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direcion load_acquire/store_release barrier
> + * It depends on performance test results. */
> +#ifdef RTE_ARCH_ARM64

s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL/ and update the generic arm64 config.
That way it can be used by another architecture like ppc if they choose to do so.
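
i.e. the include selection in rte_ring.h would then become something like
(sketch):

#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif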


> +#include "rte_ring_arm64.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
>  #define RTE_TAILQ_RING_NAME "RTE_RING"
>  
>  enum rte_ring_queue_behavior {
> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>  		while (unlikely(ht->tail != old_val))
>  			rte_pause();
>  
> -	ht->tail = new_val;
> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
>  }
>  
>  /**
> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  		/* Reset n to the initial burst count */
>  		n = max;
>  
> -		*old_head = r->prod.head;
> +		*old_head = arch_rte_atomic_load(&r->prod.head,
> +						__ATOMIC_ACQUIRE);

Same as Konstantin's comments, i.e. move this function to the c11 memory model
header file.

>  		const uint32_t cons_tail = r->cons.tail;
>  		/*
>  		 *  The subtraction is done between two unsigned 32bits value
> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>  		if (is_sp)
>  			r->prod.head = *new_head, success = 1;
>  		else
> -			success = rte_atomic32_cmpset(&r->prod.head,
> -					*old_head, *new_head);
> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
> +					old_head, *new_head,
> +					0, __ATOMIC_ACQUIRE,
> +					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>  		goto end;
>  
>  	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER

This #define will be removed if the function moves.

>  	rte_smp_wmb();
> +#endif
>  
>  	update_tail(&r->prod, prod_head, prod_next, is_sp);
>  end:
> @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  		/* Restore n as it may change every loop */
>  		n = max;
>  
> -		*old_head = r->cons.head;
> +		*old_head = arch_rte_atomic_load(&r->cons.head,
> +						__ATOMIC_ACQUIRE);
>  		const uint32_t prod_tail = r->prod.tail;
>  		/* The subtraction is done between two unsigned 32bits value
>  		 * (the result is always modulo 32 bits even if we have
> @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>  		if (is_sc)
>  			r->cons.head = *new_head, success = 1;
>  		else
> -			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> -					*new_head);
> +			success = arch_rte_atomic32_cmpset(&r->cons.head,
> +					old_head, *new_head,
> +					0, __ATOMIC_ACQUIRE,
> +					__ATOMIC_RELAXED);
>  	} while (unlikely(success == 0));
>  	return n;
>  }
> @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>  		goto end;
>  
>  	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> +
> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER

This #define will be removed if the function moves.

>  	rte_smp_rmb();
> +#endif

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-02 17:23 ` Jerin Jacob
@ 2017-11-03  1:46   ` Jia He
  2017-11-03 12:56     ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-03  1:46 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

Hi Jerin


On 11/3/2017 1:23 AM, Jerin Jacob Wrote:
> -----Original Message-----
>> Date: Thu,  2 Nov 2017 08:43:30 +0000
>> From: Jia He <hejianet@gmail.com>
>> To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com
>> Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com,
>>   jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>,
>>   jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>>   jia.he@hxt-semitech.com
>> Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
>> X-Mailer: git-send-email 2.7.4
>>
>> We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
>> As for the possible race condition, please refer to [1].
> Hi Jia,
>
> In addition to Konstantin comments. Please find below some review
> comments.
>> Furthermore, there are 2 options as suggested by Jerin:
>> 1. use rte_smp_rmb
>> 2. use load_acquire/store_release(refer to [2]).
>> CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
> or something like that.
Ok, but how to distinguish between the following 2 options?

CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough:

1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2])

>> default it is n;
>>
>> ---
>> Changelog:
>> V2: let users choose whether using load_acquire/store_release
>> V1: rte_smp_rmb() between 2 loads
>>
>> Signed-off-by: Jia He <hejianet@gmail.com>
>> Signed-off-by: jie2.liu@hxt-semitech.com
>> Signed-off-by: bing.zhao@hxt-semitech.com
>> Signed-off-by: jia.he@hxt-semitech.com
>> Suggested-by: jerin.jacob@caviumnetworks.com
>> ---
>>   lib/librte_ring/Makefile           |  4 +++-
>>   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
>>   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
>>   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
>>   4 files changed, 127 insertions(+), 8 deletions(-)
>>   create mode 100644 lib/librte_ring/rte_ring_arm64.h
>>   create mode 100644 lib/librte_ring/rte_ring_generic.h
>>
>> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
>> index e34d9d9..fa57a86 100644
>> --- a/lib/librte_ring/Makefile
>> +++ b/lib/librte_ring/Makefile
>> @@ -45,6 +45,8 @@ LIBABIVER := 1
>>   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
>>   
>>   # install includes
>> -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
>> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
>> +					rte_ring_arm64.h \
> It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
> something like that to reflect the implementation based on c11 memory
> model.
>
>
>> +					rte_ring_generic.h
>>   
>>   include $(RTE_SDK)/mk/rte.lib.mk
>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>> index 5e9b3b7..943b1f9 100644
>> --- a/lib/librte_ring/rte_ring.h
>> +++ b/lib/librte_ring/rte_ring.h
>> @@ -103,6 +103,18 @@ extern "C" {
>>   #include <rte_memzone.h>
>>   #include <rte_pause.h>
>>   
>> +/* In those strong memory models (e.g. x86), there is no need to add rmb()
>> + * between load and load.
>> + * In those weak models(powerpc/arm), there are 2 choices for the users
>> + * 1.use rmb() memory barrier
>> + * 2.use one-direcion load_acquire/store_release barrier
>> + * It depends on performance test results. */
>> +#ifdef RTE_ARCH_ARM64
> s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config.
> By that way it can used by another architecture like ppc if they choose to do so.
>
>
>> +#include "rte_ring_arm64.h"
>> +#else
>> +#include "rte_ring_generic.h"
>> +#endif
>> +
>>   #define RTE_TAILQ_RING_NAME "RTE_RING"
>>   
>>   enum rte_ring_queue_behavior {
>> @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
>>   		while (unlikely(ht->tail != old_val))
>>   			rte_pause();
>>   
>> -	ht->tail = new_val;
>> +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
>>   }
>>   
>>   /**
>> @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		/* Reset n to the initial burst count */
>>   		n = max;
>>   
>> -		*old_head = r->prod.head;
>> +		*old_head = arch_rte_atomic_load(&r->prod.head,
>> +						__ATOMIC_ACQUIRE);
> Same as Konstantin comments, i.e move to this function to c11 memory model
> header file
>
>>   		const uint32_t cons_tail = r->cons.tail;
>>   		/*
>>   		 *  The subtraction is done between two unsigned 32bits value
>> @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>>   		if (is_sp)
>>   			r->prod.head = *new_head, success = 1;
>>   		else
>> -			success = rte_atomic32_cmpset(&r->prod.head,
>> -					*old_head, *new_head);
>> +			success = arch_rte_atomic32_cmpset(&r->prod.head,
>> +					old_head, *new_head,
>> +					0, __ATOMIC_ACQUIRE,
>> +					__ATOMIC_RELAXED);
>>   	} while (unlikely(success == 0));
>>   	return n;
>>   }
>> @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>>   		goto end;
>>   
>>   	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> This #define will be removed if the function moves.
>
>>   	rte_smp_wmb();
>> +#endif
>>   
>>   	update_tail(&r->prod, prod_head, prod_next, is_sp);
>>   end:
>> @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>>   		/* Restore n as it may change every loop */
>>   		n = max;
>>   
>> -		*old_head = r->cons.head;
>> +		*old_head = arch_rte_atomic_load(&r->cons.head,
>> +						__ATOMIC_ACQUIRE);
>>   		const uint32_t prod_tail = r->prod.tail;
>>   		/* The subtraction is done between two unsigned 32bits value
>>   		 * (the result is always modulo 32 bits even if we have
>> @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>>   		if (is_sc)
>>   			r->cons.head = *new_head, success = 1;
>>   		else
>> -			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> -					*new_head);
>> +			success = arch_rte_atomic32_cmpset(&r->cons.head,
>> +					old_head, *new_head,
>> +					0, __ATOMIC_ACQUIRE,
>> +					__ATOMIC_RELAXED);
>>   	} while (unlikely(success == 0));
>>   	return n;
>>   }
>> @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>>   		goto end;
>>   
>>   	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>> +
>> +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> This #define will be removed if the function moves.
>
>>   	rte_smp_rmb();
>> +#endif

-- 
Cheers,
Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-03  1:46   ` Jia He
@ 2017-11-03 12:56     ` Jerin Jacob
  2017-11-06  7:25       ` Jia He
  0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-03 12:56 UTC (permalink / raw)
  To: Jia He
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

-----Original Message-----
> Date: Fri, 3 Nov 2017 09:46:40 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>  bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>  Thunderbird/52.4.0
> 
> Hi Jerin
> 
> 
> On 11/3/2017 1:23 AM, Jerin Jacob Wrote:
> > -----Original Message-----
> > > Date: Thu,  2 Nov 2017 08:43:30 +0000
> > > From: Jia He <hejianet@gmail.com>
> > > To: jerin.jacob@caviumnetworks.com, dev@dpdk.org, olivier.matz@6wind.com
> > > Cc: konstantin.ananyev@intel.com, bruce.richardson@intel.com,
> > >   jianbo.liu@arm.com, hemant.agrawal@nxp.com, Jia He <hejianet@gmail.com>,
> > >   jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
> > >   jia.he@hxt-semitech.com
> > > Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
> > > X-Mailer: git-send-email 2.7.4
> > > 
> > > We watched a rte panic of mbuf_autotest in our qualcomm arm64 server.
> > > As for the possible race condition, please refer to [1].
> > Hi Jia,
> > 
> > In addition to Konstantin comments. Please find below some review
> > comments.
> > > Furthermore, there are 2 options as suggested by Jerin:
> > > 1. use rte_smp_rmb
> > > 2. use load_acquire/store_release(refer to [2]).
> > > CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by
> > I think, The better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
> > or something like that.
> Ok, but how to distinguish following 2 options?

Not sure I clearly understood this question. For the arm64 case, you can add
CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
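
For example (illustrative; one of the files matched by that glob):

  # config/defconfig_arm64-armv8a-linuxapp-gcc
  CONFIG_RTE_RING_USE_C11_MEM_MODEL=y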


> 
> CONFIG_RTE_RING_USE_C11_MEM_MODEL doesn't seem to be enough
> 
> 1. use rte_smp_rmb
> 2. use load_acquire/store_release(refer to [2]).
> 
> > > default it is n;
> > > 
> > > ---
> > > Changelog:
> > > V2: let users choose whether using load_acquire/store_release
> > > V1: rte_smp_rmb() between 2 loads
> > > 
> > > Signed-off-by: Jia He <hejianet@gmail.com>
> > > Signed-off-by: jie2.liu@hxt-semitech.com
> > > Signed-off-by: bing.zhao@hxt-semitech.com
> > > Signed-off-by: jia.he@hxt-semitech.com
> > > Suggested-by: jerin.jacob@caviumnetworks.com
> > > ---
> > >   lib/librte_ring/Makefile           |  4 +++-
> > >   lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
> > >   lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
> > >   lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
> > >   4 files changed, 127 insertions(+), 8 deletions(-)
> > >   create mode 100644 lib/librte_ring/rte_ring_arm64.h
> > >   create mode 100644 lib/librte_ring/rte_ring_generic.h
> > > 
> > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> > > index e34d9d9..fa57a86 100644
> > > --- a/lib/librte_ring/Makefile
> > > +++ b/lib/librte_ring/Makefile
> > > @@ -45,6 +45,8 @@ LIBABIVER := 1
> > >   SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> > >   # install includes
> > > -SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
> > > +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> > > +					rte_ring_arm64.h \
> > It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
> > something like that to reflect the implementation based on c11 memory
> > model.
> > 
> > 
> > > +					rte_ring_generic.h
> > >   include $(RTE_SDK)/mk/rte.lib.mk
> > > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > > index 5e9b3b7..943b1f9 100644
> > > --- a/lib/librte_ring/rte_ring.h
> > > +++ b/lib/librte_ring/rte_ring.h
> > > @@ -103,6 +103,18 @@ extern "C" {
> > >   #include <rte_memzone.h>
> > >   #include <rte_pause.h>
> > > +/* In those strong memory models (e.g. x86), there is no need to add rmb()
> > > + * between load and load.
> > > + * In those weak models(powerpc/arm), there are 2 choices for the users
> > > + * 1.use rmb() memory barrier
> > > + * 2.use one-direcion load_acquire/store_release barrier
> > > + * It depends on performance test results. */
> > > +#ifdef RTE_ARCH_ARM64
> > s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL and update the generic arm64 config.
> > By that way it can used by another architecture like ppc if they choose to do so.
> > 
> > 
> > > +#include "rte_ring_arm64.h"
> > > +#else
> > > +#include "rte_ring_generic.h"
> > > +#endif
> > > +
> > >   #define RTE_TAILQ_RING_NAME "RTE_RING"
> > >   enum rte_ring_queue_behavior {
> > > @@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
> > >   		while (unlikely(ht->tail != old_val))
> > >   			rte_pause();
> > > -	ht->tail = new_val;
> > > +	arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
> > >   }
> > >   /**
> > > @@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > >   		/* Reset n to the initial burst count */
> > >   		n = max;
> > > -		*old_head = r->prod.head;
> > > +		*old_head = arch_rte_atomic_load(&r->prod.head,
> > > +						__ATOMIC_ACQUIRE);
> > Same as Konstantin comments, i.e move to this function to c11 memory model
> > header file
> > 
> > >   		const uint32_t cons_tail = r->cons.tail;
> > >   		/*
> > >   		 *  The subtraction is done between two unsigned 32bits value
> > > @@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> > >   		if (is_sp)
> > >   			r->prod.head = *new_head, success = 1;
> > >   		else
> > > -			success = rte_atomic32_cmpset(&r->prod.head,
> > > -					*old_head, *new_head);
> > > +			success = arch_rte_atomic32_cmpset(&r->prod.head,
> > > +					old_head, *new_head,
> > > +					0, __ATOMIC_ACQUIRE,
> > > +					__ATOMIC_RELAXED);
> > >   	} while (unlikely(success == 0));
> > >   	return n;
> > >   }
> > > @@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> > >   		goto end;
> > >   	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> > 
> > >   	rte_smp_wmb();
> > > +#endif
> > >   	update_tail(&r->prod, prod_head, prod_next, is_sp);
> > >   end:
> > > @@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >   		/* Restore n as it may change every loop */
> > >   		n = max;
> > > -		*old_head = r->cons.head;
> > > +		*old_head = arch_rte_atomic_load(&r->cons.head,
> > > +						__ATOMIC_ACQUIRE);
> > >   		const uint32_t prod_tail = r->prod.tail;
> > >   		/* The subtraction is done between two unsigned 32bits value
> > >   		 * (the result is always modulo 32 bits even if we have
> > > @@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> > >   		if (is_sc)
> > >   			r->cons.head = *new_head, success = 1;
> > >   		else
> > > -			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> > > -					*new_head);
> > > +			success = arch_rte_atomic32_cmpset(&r->cons.head,
> > > +					old_head, *new_head,
> > > +					0, __ATOMIC_ACQUIRE,
> > > +					__ATOMIC_RELAXED);
> > >   	} while (unlikely(success == 0));
> > >   	return n;
> > >   }
> > > @@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> > >   		goto end;
> > >   	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > > +
> > > +#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
> > This #define will be removed if the function moves.
> > 
> > >   	rte_smp_rmb();
> > > +#endif
> 
> -- 
> Cheers,
> Jia
> 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-03 12:56     ` Jerin Jacob
@ 2017-11-06  7:25       ` Jia He
  2017-11-07  4:36         ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-06  7:25 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

Hi Jerin


On 11/3/2017 8:56 PM, Jerin Jacob Wrote:
> -----Original Message-----
>>
[...]
>> g like that.
>> Ok, but how to distinguish following 2 options?
> No clearly understood this question. For arm64 case, you can add
> CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
Sorry for my unclear expression.
I mean: should there be one additional config macro besides 
CONFIG_RTE_RING_USE_C11_MEM_MODEL
for users to choose between the two options?

i.e.
- On x86: CONFIG_RTE_RING_USE_C11_MEM_MODEL=n,
  include rte_ring_generic.h, no changes.
- On arm64: CONFIG_RTE_RING_USE_C11_MEM_MODEL=y,
  include rte_ring_c11_mem.h by default.
  In rte_ring_c11_mem.h, implement new versions of
  __rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail().

Then, how do we distinguish between the option of using rte_smp_rmb() and 
the option of using __atomic_load_n()/__atomic_store_n()?
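With that config enabled, the include selection in rte_ring.h would look 
roughly like this (just a sketch; the header and config names follow the 
suggestion above):

#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif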

Thanks for the clarification.

-- 
Cheers,
Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-06  7:25       ` Jia He
@ 2017-11-07  4:36         ` Jerin Jacob
  2017-11-07  8:34           ` Jia He
  0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-07  4:36 UTC (permalink / raw)
  To: Jia He
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

-----Original Message-----
> Date: Mon, 6 Nov 2017 15:25:12 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>  bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>  Thunderbird/52.4.0
> 
> Hi Jerin
> 
> 
> On 11/3/2017 8:56 PM, Jerin Jacob Wrote:
> > -----Original Message-----
> > > 
> [...]
> > > g like that.
> > > Ok, but how to distinguish following 2 options?
> > No clearly understood this question. For arm64 case, you can add
> > CONFIG_RTE_RING_USE_C11_MEM_MODEL=y in config/defconfig_arm64-armv8a-*
> Sorry for my unclear expressions.
> I mean there should be one additional config macro besides
> CONFIG_RTE_RING_USE_C11_MEM_MODEL
> for users to choose?
> 
> i.e.
>  - On X86:CONFIG_RTE_RING_USE_C11_MEM_MODEL=n
> include rte_ring_generic.h, no changes
> - On arm64,CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
> include rte_ring_c11_mem.h by default.
> In rte_ring_c11_mem.h, implement new version of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail
> 
> Then, how to distinguish the option of using rte_smp_rmb() or
> __atomic_load/store_n()?

One option could be to change the prototype of update_tail() and let the
compiler accommodate it at zero cost for arm64 (which I think is the
case, but you can check the generated instructions).
If not, move __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
__rte_ring_move_prod_head()/__rte_ring_move_cons_head()/update_tail().


➜ [master][dpdk.org] $ git diff
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7b4..b32648825 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
 
 static __rte_always_inline void
 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
-               uint32_t single)
+               uint32_t single, const uint32_t enqueue)
 {
+       if (enqueue)
+               rte_smp_wmb();
+       else
+               rte_smp_rmb();
        /*
         * If there are other enqueues/dequeues in progress that
         * preceded us,
         * we need to wait for them to complete
@@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
                goto end;
 
        ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
-       rte_smp_wmb();
 
-       update_tail(&r->prod, prod_head, prod_next, is_sp);
+       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
 end:
        if (free_space != NULL)
                *free_space = free_entries - n;
@@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
                goto end;
 
        DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
-       rte_smp_rmb();
 
-       update_tail(&r->cons, cons_head, cons_next, is_sc);
+       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
 
 end:
        if (available != NULL)


> 
> Thanks for the clarification.
> 
> -- 
> Cheers,
> Jia
> 

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-07  4:36         ` Jerin Jacob
@ 2017-11-07  8:34           ` Jia He
  2017-11-07  9:57             ` Jerin Jacob
  0 siblings, 1 reply; 13+ messages in thread
From: Jia He @ 2017-11-07  8:34 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he



On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
> -----Original Message-----
>
> On option could be to change the prototype of update_tail() and make
> compiler accommodate it for zero cost for arm64(Which I think, it it the
> case. But you can check the generated instructions)
> If not, move, __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
>
>
> ➜ [master][dpdk.org] $ git diff
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index 5e9b3b7b4..b32648825 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> *r);
>   
>   static __rte_always_inline void
>   update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> -               uint32_t single)
> +               uint32_t single, const uint32_t enqueue)
>   {
> +       if (enqueue)
> +               rte_smp_wmb();
> +       else
> +               rte_smp_rmb();
>          /*
>           * If there are other enqueues/dequeues in progress that
>           * preceded us,
>           * we need to wait for them to complete
> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
> const *obj_table,
>                  goto end;
>   
>          ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> -       rte_smp_wmb();
>   
> -       update_tail(&r->prod, prod_head, prod_next, is_sp);
> +       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
>   end:
>          if (free_space != NULL)
>                  *free_space = free_entries - n;
> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
> **obj_table,
>                  goto end;
>   
>          DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> -       rte_smp_rmb();
>   
> -       update_tail(&r->cons, cons_head, cons_next, is_sc);
> +       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>   
>   end:
>          if (available != NULL)
>
>
>
Hi Jerin, yes, I know about this suggestion for update_tail().
But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and 
__rte_ring_move_prod_head:
[option 1]
+        *old_head = r->cons.head;
+        rte_smp_rmb();
+        const uint32_t prod_tail = r->prod.tail;

[option 2]
+        *old_head = __atomic_load_n(&r->cons.head,
+                    __ATOMIC_ACQUIRE);

i.e. I wonder what the suitable new config name would be to distinguish the 
above 2 options?
Thanks for your patience :-)

See my draft patch below, note the macro "PREFER":
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
new file mode 100644
index 0000000..22fe887
--- /dev/null
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -0,0 +1,305 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_C11_MEM_H_
+#define _RTE_RING_C11_MEM_H_
+
+static __rte_always_inline void
+update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
+        uint32_t single, uint32_t enqueue)
+{
+    /* Don't need wmb/rmb when we prefer to use load_acquire/
+     * store_release barrier */
+#ifndef PREFER
+    if (enqueue)
+        rte_smp_wmb();
+    else
+        rte_smp_rmb();
+#endif
+
+    /*
+     * If there are other enqueues/dequeues in progress that preceded us,
+     * we need to wait for them to complete
+     */
+    if (!single)
+        while (unlikely(ht->tail != old_val))
+            rte_pause();
+
+#ifdef PREFER
+    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
+#else
+    ht->tail = new_val;
+#endif
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far 
should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue 
starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
+        unsigned int n, enum rte_ring_queue_behavior behavior,
+        uint32_t *old_head, uint32_t *new_head,
+        uint32_t *free_entries)
+{
+    const uint32_t capacity = r->capacity;
+    unsigned int max = n;
+    int success;
+
+    do {
+        /* Reset n to the initial burst count */
+        n = max;
+
+#ifdef PREFER
+        *old_head = __atomic_load_n(&r->prod.head,
+                    __ATOMIC_ACQUIRE);
+#else
+        *old_head = r->prod.head;
+        /* prevent reorder of load/load */
+        rte_smp_rmb();
+#endif
+        const uint32_t cons_tail = r->cons.tail;
+        /*
+         *  The subtraction is done between two unsigned 32bits value
+         * (the result is always modulo 32 bits even if we have
+         * *old_head > cons_tail). So 'free_entries' is always between 0
+         * and capacity (which is < size).
+         */
+        *free_entries = (capacity + cons_tail - *old_head);
+
+        /* check that we have enough room in ring */
+        if (unlikely(n > *free_entries))
+            n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                    0 : *free_entries;
+
+        if (n == 0)
+            return 0;
+
+        *new_head = *old_head + n;
+        if (is_sp)
+            r->prod.head = *new_head, success = 1;
+        else
+#ifdef PREFER
+            success = arch_rte_atomic32_cmpset(&r->prod.head,
+                    old_head, *new_head,
+                    0, __ATOMIC_ACQUIRE,
+                    __ATOMIC_RELAXED);
+#else
+            success = rte_atomic32_cmpset(&r->prod.head,
+                    *old_head, *new_head);
+#endif
+    } while (unlikely(success == 0));
+    return n;
+}
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+  * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
+         unsigned int n, enum rte_ring_queue_behavior behavior,
+         int is_sp, unsigned int *free_space)
+{
+    uint32_t prod_head, prod_next;
+    uint32_t free_entries;
+
+    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+            &prod_head, &prod_next, &free_entries);
+    if (n == 0)
+        goto end;
+
+    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+    if (free_space != NULL)
+        *free_space = free_entries - n;
+    return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far 
should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue 
starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
+        unsigned int n, enum rte_ring_queue_behavior behavior,
+        uint32_t *old_head, uint32_t *new_head,
+        uint32_t *entries)
+{
+    unsigned int max = n;
+    int success;
+
+    /* move cons.head atomically */
+    do {
+        /* Restore n as it may change every loop */
+        n = max;
+#ifdef PREFER
+        *old_head = __atomic_load_n(&r->cons.head,
+                    __ATOMIC_ACQUIRE);
+#else
+        *old_head = r->cons.head;
+        /*  prevent reorder of load/load */
+        rte_smp_rmb();
+#endif
+
+        const uint32_t prod_tail = r->prod.tail;
+        /* The subtraction is done between two unsigned 32bits value
+         * (the result is always modulo 32 bits even if we have
+         * cons_head > prod_tail). So 'entries' is always between 0
+         * and size(ring)-1. */
+        *entries = (prod_tail - *old_head);
+
+        /* Set the actual entries for dequeue */
+        if (n > *entries)
+            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+        if (unlikely(n == 0))
+            return 0;
+
+        *new_head = *old_head + n;
+        if (is_sc)
+            r->cons.head = *new_head, success = 1;
+        else
+#ifdef PREFER
+            success = arch_rte_atomic32_cmpset(&r->cons.head,
+                            old_head, *new_head,
+                            0, __ATOMIC_ACQUIRE,
+                            __ATOMIC_RELAXED);
+#else
+            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+                    *new_head);
+#endif
+    } while (unlikely(success == 0));
+    return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
+         unsigned int n, enum rte_ring_queue_behavior behavior,
+         int is_sc, unsigned int *available)
+{
+    uint32_t cons_head, cons_next;
+    uint32_t entries;
+
+    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
+            &cons_head, &cons_next, &entries);
+    if (n == 0)
+        goto end;
+
+    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+    if (available != NULL)
+        *available = entries - n;
+    return n;
+}
+
+#endif /* _RTE_RING_C11_MEM_H_ */
+
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 0000000..0ce6d57
--- /dev/null
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,268 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 hxt-semitech. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of hxt-semitech nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_RING_GENERIC_H_
+#define _RTE_RING_GENERIC_H_
+
+static __rte_always_inline void
+update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
+        uint32_t single, uint32_t enqueue)
+{
+    if (enqueue)
+        rte_smp_wmb();
+    else
+        rte_smp_rmb();
+    /*
+     * If there are other enqueues/dequeues in progress that preceded us,
+     * we need to wait for them to complete
+     */
+    if (!single)
+        while (unlikely(ht->tail != old_val))
+            rte_pause();
+
+    ht->tail = new_val;
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sp
+ *   Indicates whether multi-producer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far 
should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue 
starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where enqueue finishes
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
+        unsigned int n, enum rte_ring_queue_behavior behavior,
+        uint32_t *old_head, uint32_t *new_head,
+        uint32_t *free_entries)
+{
+    const uint32_t capacity = r->capacity;
+    unsigned int max = n;
+    int success;
+
+    do {
+        /* Reset n to the initial burst count */
+        n = max;
+
+        *old_head = r->prod.head;
+        const uint32_t cons_tail = r->cons.tail;
+        /*
+         *  The subtraction is done between two unsigned 32bits value
+         * (the result is always modulo 32 bits even if we have
+         * *old_head > cons_tail). So 'free_entries' is always between 0
+         * and capacity (which is < size).
+         */
+        *free_entries = (capacity + cons_tail - *old_head);
+
+        /* check that we have enough room in ring */
+        if (unlikely(n > *free_entries))
+            n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                    0 : *free_entries;
+
+        if (n == 0)
+            return 0;
+
+        *new_head = *old_head + n;
+        if (is_sp)
+            r->prod.head = *new_head, success = 1;
+        else
+            success = rte_atomic32_cmpset(&r->prod.head,
+                    *old_head, *new_head);
+    } while (unlikely(success == 0));
+    return n;
+}
+
+/**
+ * @internal Enqueue several objects on the ring
+ *
+  * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
+         unsigned int n, enum rte_ring_queue_behavior behavior,
+         int is_sp, unsigned int *free_space)
+{
+    uint32_t prod_head, prod_next;
+    uint32_t free_entries;
+
+    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+            &prod_head, &prod_next, &free_entries);
+    if (n == 0)
+        goto end;
+
+    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+end:
+    if (free_space != NULL)
+        *free_space = free_entries - n;
+    return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to enqueue, i.e. how far 
should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue 
starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
+        unsigned int n, enum rte_ring_queue_behavior behavior,
+        uint32_t *old_head, uint32_t *new_head,
+        uint32_t *entries)
+{
+    unsigned int max = n;
+    int success;
+
+    /* move cons.head atomically */
+    do {
+        /* Restore n as it may change every loop */
+        n = max;
+
+        *old_head = r->cons.head;
+        const uint32_t prod_tail = r->prod.tail;
+        /* The subtraction is done between two unsigned 32bits value
+         * (the result is always modulo 32 bits even if we have
+         * cons_head > prod_tail). So 'entries' is always between 0
+         * and size(ring)-1. */
+        *entries = (prod_tail - *old_head);
+
+        /* Set the actual entries for dequeue */
+        if (n > *entries)
+            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+        if (unlikely(n == 0))
+            return 0;
+
+        *new_head = *old_head + n;
+        if (is_sc)
+            r->cons.head = *new_head, success = 1;
+        else
+            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+                    *new_head);
+    } while (unlikely(success == 0));
+    return n;
+}
+
+/**
+ * @internal Dequeue several objects from the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has 
finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
+         unsigned int n, enum rte_ring_queue_behavior behavior,
+         int is_sc, unsigned int *available)
+{
+    uint32_t cons_head, cons_next;
+    uint32_t entries;
+
+    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
+            &cons_head, &cons_next, &entries);
+    if (n == 0)
+        goto end;
+
+    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+
+end:
+    if (available != NULL)
+        *available = entries - n;
+    return n;
+}
+
+#endif /* _RTE_RING_GENERIC_H_ */
+
-- 
2.7.4


-- 
Cheers,
Jia

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-07  8:34           ` Jia He
@ 2017-11-07  9:57             ` Jerin Jacob
  2017-11-08  2:31               ` Jia He
  0 siblings, 1 reply; 13+ messages in thread
From: Jerin Jacob @ 2017-11-07  9:57 UTC (permalink / raw)
  To: Jia He
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

-----Original Message-----
> Date: Tue, 7 Nov 2017 16:34:30 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>  bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>  Thunderbird/52.4.0
> 
> 
> 
> On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
> > -----Original Message-----
> > 
> > On option could be to change the prototype of update_tail() and make
> > compiler accommodate it for zero cost for arm64(Which I think, it it the
> > case. But you can check the generated instructions)
> > If not, move, __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
> > 
> > 
> > ➜ [master][dpdk.org] $ git diff
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 5e9b3b7b4..b32648825 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> > *r);
> >   static __rte_always_inline void
> >   update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> > new_val,
> > -               uint32_t single)
> > +               uint32_t single, const uint32_t enqueue)
> >   {
> > +       if (enqueue)
> > +               rte_smp_wmb();
> > +       else
> > +               rte_smp_rmb();
> >          /*
> >           * If there are other enqueues/dequeues in progress that
> >           * preceded us,
> >           * we need to wait for them to complete
> > @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
> > const *obj_table,
> >                  goto end;
> >          ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > -       rte_smp_wmb();
> > -       update_tail(&r->prod, prod_head, prod_next, is_sp);
> > +       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> >   end:
> >          if (free_space != NULL)
> >                  *free_space = free_entries - n;
> > @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
> > **obj_table,
> >                  goto end;
> >          DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > -       rte_smp_rmb();
> > -       update_tail(&r->cons, cons_head, cons_next, is_sc);
> > +       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> >   end:
> >          if (available != NULL)
> > 
> > 
> > 
> Hi Jerin, yes I knew this suggestion in update_tail.
> But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
> __rte_ring_move_pros_head:
> [option 1]
> +        *old_head = r->cons.head;
> +        rte_smp_rmb();
> +        const uint32_t prod_tail = r->prod.tail;
> 
> [option 2]
> +        *old_head = __atomic_load_n(&r->cons.head,
> +                    __ATOMIC_ACQUIRE);
> 
> ie.I wonder what is the suitable new config name to distinguish the above 2
> options?

Why?
If you fix the generic version with rte_smp_rmb(), then we only need
one config option to differentiate between the c11 and generic versions. See comments below.

> Thanks for the patience :-)
> 
> see my drafted patch below, the marcro "PREFER":
> + */
> +
> +#ifndef _RTE_RING_C11_MEM_H_
> +#define _RTE_RING_C11_MEM_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> +        uint32_t single, uint32_t enqueue)
> +{
> +    /* Don't need wmb/rmb when we prefer to use load_acquire/
> +     * store_release barrier */
> +#ifndef PREFER
> +    if (enqueue)
> +        rte_smp_wmb();
> +    else
> +        rte_smp_rmb();
> +#endif

You can remove PREFER and let the "generic" version have this. For x86,
rte_smp_?mb() will be a NOOP, so there is no issue.

> +
> +    /*
> +     * If there are other enqueues/dequeues in progress that preceded us,
> +     * we need to wait for them to complete
> +     */
> +    if (!single)
> +        while (unlikely(ht->tail != old_val))
> +            rte_pause();
> +
> +#ifdef PREFER
> +    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);

For the c11 memory model version, only the __atomic_store_n() version is needed.

> +#else
> +    ht->tail = new_val;
> +#endif
> +}
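i.e. the c11 version of update_tail() would reduce to something like below
(only a sketch, untested; RTE_SET_USED() just marks the now-unused parameter):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single, uint32_t enqueue)
{
    RTE_SET_USED(enqueue);

    /*
     * If there are other enqueues/dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    if (!single)
        while (unlikely(ht->tail != old_val))
            rte_pause();

    /* the release store orders the ring writes before the tail update */
    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}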
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *free_entries)
> +{
> +    const uint32_t capacity = r->capacity;
> +    unsigned int max = n;
> +    int success;
> +
> +    do {
> +        /* Reset n to the initial burst count */
> +        n = max;
> +
> +#ifdef PREFER
> +        *old_head = __atomic_load_n(&r->prod.head,
> +                    __ATOMIC_ACQUIRE);
> +#else
> +        *old_head = r->prod.head;
> +        /* prevent reorder of load/load */
> +        rte_smp_rmb();
> +#endif

Same as above comment.

> +        const uint32_t cons_tail = r->cons.tail;
> +        /*
> +         *  The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * *old_head > cons_tail). So 'free_entries' is always between 0
> +         * and capacity (which is < size).
> +         */
> +        *free_entries = (capacity + cons_tail - *old_head);
> +
> +        /* check that we have enough room in ring */
> +        if (unlikely(n > *free_entries))

> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sp, unsigned int *free_space)
> +{


Duplicate function; no need to replicate it in both versions.


> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *entries)
> +{
> +    unsigned int max = n;
> +    int success;
> +
> +    /* move cons.head atomically */
> +    do {
> +        /* Restore n as it may change every loop */
> +        n = max;
> +#ifdef PREFER
> +        *old_head = __atomic_load_n(&r->cons.head,
> +                    __ATOMIC_ACQUIRE);
> +#else
> +        *old_head = r->cons.head;
> +        /*  prevent reorder of load/load */
> +        rte_smp_rmb();
> +#endif

Same as above comment

> +
> +        const uint32_t prod_tail = r->prod.tail;
> +        /* The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * cons_head > prod_tail). So 'entries' is always between 0
> +         * and size(ring)-1. */
> +        *entries = (prod_tail - *old_head);
> +
> +        /* Set the actual entries for dequeue */
> +        if (n > *entries)
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +        if (unlikely(n == 0))
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sc)
> +            r->cons.head = *new_head, success = 1;
> +        else
> +#ifdef PREFER
> +            success = arch_rte_atomic32_cmpset(&r->cons.head,
> +                            old_head, *new_head,
> +                            0, __ATOMIC_ACQUIRE,
> +                            __ATOMIC_RELAXED);
> +#else
> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +                    *new_head);
> +#endif

Same as above comment

> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sc, unsigned int *available)
> +{

Duplicate function; no need to replicate it in both versions.


> +    uint32_t cons_head, cons_next;
> +    uint32_t entries;
> +
> +    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
> +            &cons_head, &cons_next, &entries);
> +    if (n == 0)
> +        goto end;
> +
> +    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> +
> +    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> +
> +end:
> +    if (available != NULL)
> +        *available = entries - n;
> +    return n;
> +}
> +
> +#endif /* _RTE_RING_C11_MEM_H_ */
> +
> diff --git a/lib/librte_ring/rte_ring_generic.h
> b/lib/librte_ring/rte_ring_generic.h
> new file mode 100644
> index 0000000..0ce6d57
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -0,0 +1,268 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2017 hxt-semitech. All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of hxt-semitech nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#ifndef _RTE_RING_GENERIC_H_
> +#define _RTE_RING_GENERIC_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> +        uint32_t single, uint32_t enqueue)
> +{
> +    if (enqueue)
> +        rte_smp_wmb();
> +    else
> +        rte_smp_rmb();
> +    /*
> +     * If there are other enqueues/dequeues in progress that preceded us,
> +     * we need to wait for them to complete
> +     */
> +    if (!single)
> +        while (unlikely(ht->tail != old_val))
> +            rte_pause();
> +
> +    ht->tail = new_val;
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *free_entries)
> +{
> +    const uint32_t capacity = r->capacity;
> +    unsigned int max = n;
> +    int success;
> +
> +    do {
> +        /* Reset n to the initial burst count */
> +        n = max;
> +
> +        *old_head = r->prod.head;

Adding rte_smp_rmb() does no harm here, as it is a NOOP for x86, and it is
semantically correct too.
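i.e. (sketch):

        *old_head = r->prod.head;
        /* prevent reordering of the cons.tail load before the head load;
         * emits no CPU barrier on x86, only a compiler barrier */
        rte_smp_rmb();

        const uint32_t cons_tail = r->cons.tail;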


> +        const uint32_t cons_tail = r->cons.tail;
> +        /*
> +         *  The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * *old_head > cons_tail). So 'free_entries' is always between 0
> +         * and capacity (which is < size).
> +         */
> +        *free_entries = (capacity + cons_tail - *old_head);
> +
> +        /* check that we have enough room in ring */
> +        if (unlikely(n > *free_entries))
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +                    0 : *free_entries;
> +
> +        if (n == 0)
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sp)
> +            r->prod.head = *new_head, success = 1;
> +        else
> +            success = rte_atomic32_cmpset(&r->prod.head,
> +                    *old_head, *new_head);
> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Enqueue several objects on the ring
> + *
> +  * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sp, unsigned int *free_space)
> +{

Duplicate function; no need to replicate it in both versions.

> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *entries)
> +{
> +    unsigned int max = n;
> +    int success;
> +
> +    /* move cons.head atomically */
> +    do {
> +        /* Restore n as it may change every loop */
> +        n = max;
> +
> +        *old_head = r->cons.head;
> +        const uint32_t prod_tail = r->prod.tail;

Same as above comment.

> +        /* The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * cons_head > prod_tail). So 'entries' is always between 0
> +         * and size(ring)-1. */
> +        *entries = (prod_tail - *old_head);
> +
> +        /* Set the actual entries for dequeue */
> +        if (n > *entries)
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +        if (unlikely(n == 0))
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sc)
> +            r->cons.head = *new_head, success = 1;
> +        else
> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +                    *new_head);
> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sc, unsigned int *available)
> +{
Duplicate function; no need to replicate it in both versions.
> 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
  2017-11-07  9:57             ` Jerin Jacob
@ 2017-11-08  2:31               ` Jia He
  0 siblings, 0 replies; 13+ messages in thread
From: Jia He @ 2017-11-08  2:31 UTC (permalink / raw)
  To: Jerin Jacob
  Cc: dev, olivier.matz, konstantin.ananyev, bruce.richardson,
	jianbo.liu, hemant.agrawal, jie2.liu, bing.zhao, jia.he

Hi Jerin

Thank you,

I mistakenly thought x86 doesn't need rte_smp_rmb().

Since rte_smp_rmb() only impacts x86's compiler optimization, I will 
simplify the code as you suggest.
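For reference, the x86 mapping is roughly the following (paraphrased from the
x86 rte_atomic headers, so treat it only as a sketch):

#define rte_compiler_barrier() do { asm volatile ("" : : : "memory"); } while (0)
#define rte_smp_rmb() rte_compiler_barrier()
#define rte_smp_wmb() rte_compiler_barrier()

i.e. only compiler reordering is prevented; no CPU barrier instruction is
emitted.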

Cheers,

Jia


On 11/7/2017 5:57 PM, Jerin Jacob Wrote:
> -----Original Message-----
>> Date: Tue, 7 Nov 2017 16:34:30 +0800
>> From: Jia He <hejianet@gmail.com>
>> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
>> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>>   bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>>   jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>>   jia.he@hxt-semitech.com
>> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>>   doing
>> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>>   Thunderbird/52.4.0
>>
>>
>>
>> On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
>>> -----Original Message-----
>>>
>>> On option could be to change the prototype of update_tail() and make
>>> compiler accommodate it for zero cost for arm64(Which I think, it it the
>>> case. But you can check the generated instructions)
>>> If not, move, __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
>>> __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
>>>
>>>
>>> ➜ [master][dpdk.org] $ git diff
>>> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
>>> index 5e9b3b7b4..b32648825 100644
>>> --- a/lib/librte_ring/rte_ring.h
>>> +++ b/lib/librte_ring/rte_ring.h
>>> @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
>>> *r);
>>>    static __rte_always_inline void
>>>    update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
>>> new_val,
>>> -               uint32_t single)
>>> +               uint32_t single, const uint32_t enqueue)
>>>    {
>>> +       if (enqueue)
>>> +               rte_smp_wmb();
>>> +       else
>>> +               rte_smp_rmb();
>>>           /*
>>>            * If there are other enqueues/dequeues in progress that
>>>            * preceded us,
>>>            * we need to wait for them to complete
>>> @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
>>> const *obj_table,
>>>                   goto end;
>>>           ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
>>> -       rte_smp_wmb();
>>> -       update_tail(&r->prod, prod_head, prod_next, is_sp);
>>> +       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
>>>    end:
>>>           if (free_space != NULL)
>>>                   *free_space = free_entries - n;
>>> @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
>>> **obj_table,
>>>                   goto end;
>>>           DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>>> -       rte_smp_rmb();
>>> -       update_tail(&r->cons, cons_head, cons_next, is_sc);
>>> +       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>>>    end:
>>>           if (available != NULL)
>>>
>>>
>>>
>> Hi Jerin, yes, I knew about this suggestion for update_tail.
>> But what I meant is the rte_smp_rmb() in __rte_ring_move_cons_head and
>> __rte_ring_move_prod_head:
>> [option 1]
>> +        *old_head = r->cons.head;
>> +        rte_smp_rmb();
>> +        const uint32_t prod_tail = r->prod.tail;
>>
>> [option 2]
>> +        *old_head = __atomic_load_n(&r->cons.head,
>> +                    __ATOMIC_ACQUIRE);
>>
>> i.e. I wonder what a suitable new config name would be to distinguish the above 2
>> options?
> Why?
> If you fix the generic version with rte_smp_rmb() then we need only
> one config to differentiate between the c11 and generic versions. See comments below.
>
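
(So something like the single switch sketched below, then; the config name
here is only a placeholder for illustration, while the two header names are
taken from the drafts that follow.)

/* Sketch of a single-config header selection in rte_ring.h. The option
 * name RTE_RING_USE_C11_MEM_MODEL is a placeholder, not part of this patch. */
#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"	/* __atomic acquire/release flavour     */
#else
#include "rte_ring_generic.h"	/* rte_smp_rmb()/rte_smp_wmb() flavour  */
#endif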
>> Thanks for the patience :-)
>>
>> See my drafted patch below, with the macro "PREFER":
>> + */
>> +
>> +#ifndef _RTE_RING_C11_MEM_H_
>> +#define _RTE_RING_C11_MEM_H_
>> +
>> +static __rte_always_inline void
>> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
>> new_val,
>> +        uint32_t single, uint32_t enqueue)
>> +{
>> +    /* Don't need wmb/rmb when we prefer to use load_acquire/
>> +     * store_release barrier */
>> +#ifndef PREFER
>> +    if (enqueue)
>> +        rte_smp_wmb();
>> +    else
>> +        rte_smp_rmb();
>> +#endif
> You can remove PREFER and let the "generic" version have this. For x86,
> rte_smp_?mb() will be a NOOP, so there is no issue.
>
>> +
>> +    /*
>> +     * If there are other enqueues/dequeues in progress that preceded us,
>> +     * we need to wait for them to complete
>> +     */
>> +    if (!single)
>> +        while (unlikely(ht->tail != old_val))
>> +            rte_pause();
>> +
>> +#ifdef PREFER
>> +    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
> The c11 memory model version needs only the __atomic_store_n() variant.
>
>> +#else
>> +    ht->tail = new_val;
>> +#endif
>> +}
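
(Taking both comments together, the c11 header's update_tail() presumably
collapses to something like the sketch below; the wmb/rmb pair stays only in
the generic header. This assumes the same struct rte_ring_headtail and
rte_pause() as above, and it is not the final merged code.)

/* Sketch: c11-only update_tail(), with PREFER removed. */
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
		uint32_t new_val, uint32_t single, uint32_t enqueue)
{
	RTE_SET_USED(enqueue);	/* no wmb/rmb needed on this path */

	/* Wait for preceding enqueues/dequeues to publish their tails. */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	/* Release store: the ENQUEUE_PTRS/DEQUEUE_PTRS copies become
	 * visible before the new tail value does. */
	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}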
>> +
>> +/**
>> + * @internal This function updates the producer head for enqueue
>> + *
>> + * @param r
>> + *   A pointer to the ring structure
>> + * @param is_sp
>> + *   Indicates whether multi-producer path is needed or not
>> + * @param n
>> + *   The number of elements we will want to enqueue, i.e. how far should
>> the
>> + *   head be moved
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param old_head
>> + *   Returns head value as it was before the move, i.e. where enqueue
>> starts
>> + * @param new_head
>> + *   Returns the current/new head value i.e. where enqueue finishes
>> + * @param free_entries
>> + *   Returns the amount of free space in the ring BEFORE head was moved
>> + * @return
>> + *   Actual number of objects enqueued.
>> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> +        unsigned int n, enum rte_ring_queue_behavior behavior,
>> +        uint32_t *old_head, uint32_t *new_head,
>> +        uint32_t *free_entries)
>> +{
>> +    const uint32_t capacity = r->capacity;
>> +    unsigned int max = n;
>> +    int success;
>> +
>> +    do {
>> +        /* Reset n to the initial burst count */
>> +        n = max;
>> +
>> +#ifdef PREFER
>> +        *old_head = __atomic_load_n(&r->prod.head,
>> +                    __ATOMIC_ACQUIRE);
>> +#else
>> +        *old_head = r->prod.head;
>> +        /* prevent reorder of load/load */
>> +        rte_smp_rmb();
>> +#endif
> Same as above comment.
>
>> +        const uint32_t cons_tail = r->cons.tail;
>> +        /*
>> +         *  The subtraction is done between two unsigned 32bits value
>> +         * (the result is always modulo 32 bits even if we have
>> +         * *old_head > cons_tail). So 'free_entries' is always between 0
>> +         * and capacity (which is < size).
>> +         */
>> +        *free_entries = (capacity + cons_tail - *old_head);
>> +
>> +        /* check that we have enough room in ring */
>> +        if (unlikely(n > *free_entries))
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> +         unsigned int n, enum rte_ring_queue_behavior behavior,
>> +         int is_sp, unsigned int *free_space)
>> +{
>
> Duplicate function; no need to replicate it in both versions.
>
>
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>> +        unsigned int n, enum rte_ring_queue_behavior behavior,
>> +        uint32_t *old_head, uint32_t *new_head,
>> +        uint32_t *entries)
>> +{
>> +    unsigned int max = n;
>> +    int success;
>> +
>> +    /* move cons.head atomically */
>> +    do {
>> +        /* Restore n as it may change every loop */
>> +        n = max;
>> +#ifdef PREFER
>> +        *old_head = __atomic_load_n(&r->cons.head,
>> +                    __ATOMIC_ACQUIRE);
>> +#else
>> +        *old_head = r->cons.head;
>> +        /*  prevent reorder of load/load */
>> +        rte_smp_rmb();
>> +#endif
> Same as above comment
>
>> +
>> +        const uint32_t prod_tail = r->prod.tail;
>> +        /* The subtraction is done between two unsigned 32bits value
>> +         * (the result is always modulo 32 bits even if we have
>> +         * cons_head > prod_tail). So 'entries' is always between 0
>> +         * and size(ring)-1. */
>> +        *entries = (prod_tail - *old_head);
>> +
>> +        /* Set the actual entries for dequeue */
>> +        if (n > *entries)
>> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
>> +
>> +        if (unlikely(n == 0))
>> +            return 0;
>> +
>> +        *new_head = *old_head + n;
>> +        if (is_sc)
>> +            r->cons.head = *new_head, success = 1;
>> +        else
>> +#ifdef PREFER
>> +            success = arch_rte_atomic32_cmpset(&r->cons.head,
>> +                            old_head, *new_head,
>> +                            0, __ATOMIC_ACQUIRE,
>> +                            __ATOMIC_RELAXED);
>> +#else
>> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> +                    *new_head);
>> +#endif
> Same as above comment
>
>> +    } while (unlikely(success == 0));
>> +    return n;
>> +}
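
(For reference, the acquire-ordered compare-and-set in the PREFER branch
above is meant to map onto the GCC/Clang builtin roughly as below;
arch_rte_atomic32_cmpset() is only this draft's wrapper name, not an
existing DPDK API.)

/* Sketch of the c11-only CAS, using the __atomic builtin directly.
 * The builtin takes a pointer to the expected value and updates it
 * when the comparison fails. */
success = __atomic_compare_exchange_n(&r->cons.head,
		old_head,		/* expected, updated on failure */
		*new_head,		/* desired new head value       */
		0,			/* strong CAS                   */
		__ATOMIC_ACQUIRE,	/* ordering on success          */
		__ATOMIC_RELAXED);	/* ordering on failure          */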
>> +
>> +/**
>> + * @internal Dequeue several objects from the ring
>> + *
>> + * @param r
>> + *   A pointer to the ring structure.
>> + * @param obj_table
>> + *   A pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   The number of objects to pull from the ring.
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param is_sc
>> + *   Indicates whether to use single consumer or multi-consumer head update
>> + * @param available
>> + *   returns the number of remaining ring entries after the dequeue has
>> finished
>> + * @return
>> + *   - Actual number of objects dequeued.
>> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>> +         unsigned int n, enum rte_ring_queue_behavior behavior,
>> +         int is_sc, unsigned int *available)
>> +{
> Duplicate function; no need to replicate it in both versions.
>
>
>> +    uint32_t cons_head, cons_next;
>> +    uint32_t entries;
>> +
>> +    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
>> +            &cons_head, &cons_next, &entries);
>> +    if (n == 0)
>> +        goto end;
>> +
>> +    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
>> +
>> +    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
>> +
>> +end:
>> +    if (available != NULL)
>> +        *available = entries - n;
>> +    return n;
>> +}
>> +
>> +#endif /* _RTE_RING_C11_MEM_H_ */
>> +
>> diff --git a/lib/librte_ring/rte_ring_generic.h
>> b/lib/librte_ring/rte_ring_generic.h
>> new file mode 100644
>> index 0000000..0ce6d57
>> --- /dev/null
>> +++ b/lib/librte_ring/rte_ring_generic.h
>> @@ -0,0 +1,268 @@
>> +/*-
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2017 hxt-semitech. All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above copyright
>> + *       notice, this list of conditions and the following disclaimer in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of hxt-semitech nor the names of its
>> + *       contributors may be used to endorse or promote products derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + */
>> +
>> +#ifndef _RTE_RING_GENERIC_H_
>> +#define _RTE_RING_GENERIC_H_
>> +
>> +static __rte_always_inline void
>> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
>> new_val,
>> +        uint32_t single, uint32_t enqueue)
>> +{
>> +    if (enqueue)
>> +        rte_smp_wmb();
>> +    else
>> +        rte_smp_rmb();
>> +    /*
>> +     * If there are other enqueues/dequeues in progress that preceded us,
>> +     * we need to wait for them to complete
>> +     */
>> +    if (!single)
>> +        while (unlikely(ht->tail != old_val))
>> +            rte_pause();
>> +
>> +    ht->tail = new_val;
>> +}
>> +
>> +/**
>> + * @internal This function updates the producer head for enqueue
>> + *
>> + * @param r
>> + *   A pointer to the ring structure
>> + * @param is_sp
>> + *   Indicates whether multi-producer path is needed or not
>> + * @param n
>> + *   The number of elements we will want to enqueue, i.e. how far should
>> the
>> + *   head be moved
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param old_head
>> + *   Returns head value as it was before the move, i.e. where enqueue
>> starts
>> + * @param new_head
>> + *   Returns the current/new head value i.e. where enqueue finishes
>> + * @param free_entries
>> + *   Returns the amount of free space in the ring BEFORE head was moved
>> + * @return
>> + *   Actual number of objects enqueued.
>> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
>> +        unsigned int n, enum rte_ring_queue_behavior behavior,
>> +        uint32_t *old_head, uint32_t *new_head,
>> +        uint32_t *free_entries)
>> +{
>> +    const uint32_t capacity = r->capacity;
>> +    unsigned int max = n;
>> +    int success;
>> +
>> +    do {
>> +        /* Reset n to the initial burst count */
>> +        n = max;
>> +
>> +        *old_head = r->prod.head;
> Adding rte_smp_rmb() here does no harm, as it is a NOOP for x86, and it is
> semantically correct too.
>
>
>> +        const uint32_t cons_tail = r->cons.tail;
>> +        /*
>> +         *  The subtraction is done between two unsigned 32bits value
>> +         * (the result is always modulo 32 bits even if we have
>> +         * *old_head > cons_tail). So 'free_entries' is always between 0
>> +         * and capacity (which is < size).
>> +         */
>> +        *free_entries = (capacity + cons_tail - *old_head);
>> +
>> +        /* check that we have enough room in ring */
>> +        if (unlikely(n > *free_entries))
>> +            n = (behavior == RTE_RING_QUEUE_FIXED) ?
>> +                    0 : *free_entries;
>> +
>> +        if (n == 0)
>> +            return 0;
>> +
>> +        *new_head = *old_head + n;
>> +        if (is_sp)
>> +            r->prod.head = *new_head, success = 1;
>> +        else
>> +            success = rte_atomic32_cmpset(&r->prod.head,
>> +                    *old_head, *new_head);
>> +    } while (unlikely(success == 0));
>> +    return n;
>> +}
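
(A quick standalone check of the unsigned-wraparound comment in the body
above; the concrete numbers are only illustrative.)

/* Minimal sketch: free_entries stays correct across 32-bit wraparound. */
#include <assert.h>
#include <stdint.h>

int main(void)
{
	const uint32_t capacity = 1024;
	uint32_t cons_tail = 0xFFFFFFFBu;	/* consumer has not wrapped yet */
	uint32_t old_head  = 10;		/* producer head has wrapped    */

	/* 15 entries are in use: 10 - 0xFFFFFFFB wraps around to 15. */
	uint32_t free_entries = capacity + cons_tail - old_head;

	assert(free_entries == 1009);		/* 1024 - 15, despite the wrap  */
	return 0;
}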
>> +
>> +/**
>> + * @internal Enqueue several objects on the ring
>> + *
>> + * @param r
>> + *   A pointer to the ring structure.
>> + * @param obj_table
>> + *   A pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   The number of objects to add in the ring from the obj_table.
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
>> + * @param is_sp
>> + *   Indicates whether to use single producer or multi-producer head update
>> + * @param free_space
>> + *   returns the amount of space after the enqueue operation has finished
>> + * @return
>> + *   Actual number of objects enqueued.
>> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
>> +         unsigned int n, enum rte_ring_queue_behavior behavior,
>> +         int is_sp, unsigned int *free_space)
>> +{
> Duplicate function; no need to replicate it in both versions.
>
>> +
>> +/**
>> + * @internal This function updates the consumer head for dequeue
>> + *
>> + * @param r
>> + *   A pointer to the ring structure
>> + * @param is_sc
>> + *   Indicates whether multi-consumer path is needed or not
>> + * @param n
>> + *   The number of elements we will want to enqueue, i.e. how far should
>> the
>> + *   head be moved
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param old_head
>> + *   Returns head value as it was before the move, i.e. where dequeue
>> starts
>> + * @param new_head
>> + *   Returns the current/new head value i.e. where dequeue finishes
>> + * @param entries
>> + *   Returns the number of entries in the ring BEFORE head was moved
>> + * @return
>> + *   - Actual number of objects dequeued.
>> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
>> +        unsigned int n, enum rte_ring_queue_behavior behavior,
>> +        uint32_t *old_head, uint32_t *new_head,
>> +        uint32_t *entries)
>> +{
>> +    unsigned int max = n;
>> +    int success;
>> +
>> +    /* move cons.head atomically */
>> +    do {
>> +        /* Restore n as it may change every loop */
>> +        n = max;
>> +
>> +        *old_head = r->cons.head;
>> +        const uint32_t prod_tail = r->prod.tail;
> Same as above comment.
>
>> +        /* The subtraction is done between two unsigned 32bits value
>> +         * (the result is always modulo 32 bits even if we have
>> +         * cons_head > prod_tail). So 'entries' is always between 0
>> +         * and size(ring)-1. */
>> +        *entries = (prod_tail - *old_head);
>> +
>> +        /* Set the actual entries for dequeue */
>> +        if (n > *entries)
>> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
>> +
>> +        if (unlikely(n == 0))
>> +            return 0;
>> +
>> +        *new_head = *old_head + n;
>> +        if (is_sc)
>> +            r->cons.head = *new_head, success = 1;
>> +        else
>> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
>> +                    *new_head);
>> +    } while (unlikely(success == 0));
>> +    return n;
>> +}
>> +
>> +/**
>> + * @internal Dequeue several objects from the ring
>> + *
>> + * @param r
>> + *   A pointer to the ring structure.
>> + * @param obj_table
>> + *   A pointer to a table of void * pointers (objects).
>> + * @param n
>> + *   The number of objects to pull from the ring.
>> + * @param behavior
>> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
>> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
>> + * @param is_sc
>> + *   Indicates whether to use single consumer or multi-consumer head update
>> + * @param available
>> + *   returns the number of remaining ring entries after the dequeue has
>> finished
>> + * @return
>> + *   - Actual number of objects dequeued.
>> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
>> + */
>> +static __rte_always_inline unsigned int
>> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
>> +         unsigned int n, enum rte_ring_queue_behavior behavior,
>> +         int is_sc, unsigned int *available)
>> +{
> Duplicate function; no need to replicate it in both versions.

-- 
Cheers,
Jia

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2017-11-08  2:31 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-11-02  8:43 [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
2017-11-02 13:26 ` Ananyev, Konstantin
2017-11-02 15:42   ` Jia He
2017-11-02 16:16     ` Ananyev, Konstantin
2017-11-02 17:00       ` Jerin Jacob
2017-11-02 17:23 ` Jerin Jacob
2017-11-03  1:46   ` Jia He
2017-11-03 12:56     ` Jerin Jacob
2017-11-06  7:25       ` Jia He
2017-11-07  4:36         ` Jerin Jacob
2017-11-07  8:34           ` Jia He
2017-11-07  9:57             ` Jerin Jacob
2017-11-08  2:31               ` Jia He
