dev.dpdk.org archive mirror
 help / color / mirror / Atom feed
From: Ruifeng Wang <ruifeng.wang@arm.com>
To: david.hunt@intel.com
Cc: dev@dpdk.org, hkalra@marvell.com, gavin.hu@arm.com,
	honnappa.nagarahalli@arm.com, nd@arm.com,
	Ruifeng Wang <ruifeng.wang@arm.com>,
	stable@dpdk.org
Subject: [dpdk-dev] [PATCH v3 1/2] lib/distributor: fix deadlock issue for aarch64
Date: Tue, 15 Oct 2019 17:28:25 +0800	[thread overview]
Message-ID: <20191015092826.13002-2-ruifeng.wang@arm.com> (raw)
In-Reply-To: <20191015092826.13002-1-ruifeng.wang@arm.com>

Distributor and worker threads rely on data structs in cache line
for synchronization. The shared data structs were not protected.
This caused deadlock issue on weaker memory ordering platforms as
aarch64.
Fix this issue by adding memory barriers to ensure synchronization
among cores.

Bugzilla ID: 342
Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: stable@dpdk.org

Signed-off-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
---
 lib/librte_distributor/meson.build           |  5 ++
 lib/librte_distributor/rte_distributor.c     | 68 ++++++++++++++------
 lib/librte_distributor/rte_distributor_v20.c | 59 ++++++++++++-----
 3 files changed, 97 insertions(+), 35 deletions(-)

diff --git a/lib/librte_distributor/meson.build b/lib/librte_distributor/meson.build
index dba7e3b2a..26577dbc1 100644
--- a/lib/librte_distributor/meson.build
+++ b/lib/librte_distributor/meson.build
@@ -9,3 +9,8 @@ else
 endif
 headers = files('rte_distributor.h')
 deps += ['mbuf']
+
+# for clang 32-bit compiles we need libatomic for 64-bit atomic ops
+if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
+	ext_deps += cc.find_library('atomic')
+endif
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 21eb1fb0a..0a03625c9 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -49,8 +49,11 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	}
 
 	retptr64 = &(buf->retptr64[0]);
-	/* Spin while handshake bits are set (scheduler clears it) */
-	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+	/* Spin while handshake bits are set (scheduler clears it).
+	 * Sync with worker on GET_BUF flag.
+	 */
+	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+			& RTE_DISTRIB_GET_BUF)) {
 		rte_pause();
 		uint64_t t = rte_rdtsc()+100;
 
@@ -75,8 +78,10 @@ rte_distributor_request_pkt_v1705(struct rte_distributor *d,
 	/*
 	 * Finally, set the GET_BUF  to signal to distributor that cache
 	 * line is ready for processing
+	 * Sync with distributor to release retptrs
 	 */
-	*retptr64 |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+			__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
@@ -98,8 +103,11 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 		return (pkts[0]) ? 1 : 0;
 	}
 
-	/* If bit is set, return */
-	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+	/* If bit is set, return
+	 * Sync with distributor to acquire bufptrs
+	 */
+	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return -1;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -114,8 +122,10 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
 	 * so now we've got the contents of the cacheline into an  array of
 	 * mbuf pointers, so toggle the bit so scheduler can start working
 	 * on the next cacheline while we're working.
+	 * Sync with distributor on GET_BUF flag. Release bufptrs.
 	 */
-	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return count;
 }
@@ -174,6 +184,8 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 			return -EINVAL;
 	}
 
+	/* Sync with distributor to acquire retptrs */
+	__atomic_thread_fence(__ATOMIC_ACQUIRE);
 	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
 		/* Switch off the return bit first */
 		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
@@ -182,8 +194,11 @@ rte_distributor_return_pkt_v1705(struct rte_distributor *d,
 		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
 			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
 
-	/* set the GET_BUF but even if we got no returns */
-	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+	/* set the GET_BUF but even if we got no returns.
+	 * Sync with distributor on GET_BUF flag. Release retptrs.
+	 */
+	__atomic_store_n(&(buf->retptr64[0]),
+		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 
 	return 0;
 }
@@ -273,7 +288,9 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 	unsigned int count = 0;
 	unsigned int i;
 
-	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+	/* Sync on GET_BUF flag. Acquire retptrs. */
+	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF) {
 		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
 			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
 				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
@@ -286,8 +303,10 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 		}
 		d->returns.start = ret_start;
 		d->returns.count = ret_count;
-		/* Clear for the worker to populate with more returns */
-		buf->retptr64[0] = 0;
+		/* Clear for the worker to populate with more returns.
+		 * Sync with distributor on GET_BUF flag. Release retptrs.
+		 */
+		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
 	}
 	return count;
 }
@@ -307,7 +326,9 @@ release(struct rte_distributor *d, unsigned int wkr)
 	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
 	unsigned int i;
 
-	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+	/* Sync with worker on GET_BUF flag */
+	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF))
 		rte_pause();
 
 	handle_returns(d, wkr);
@@ -327,8 +348,11 @@ release(struct rte_distributor *d, unsigned int wkr)
 
 	d->backlog[wkr].count = 0;
 
-	/* Clear the GET bit */
-	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	/* Clear the GET bit.
+	 * Sync with worker on GET_BUF flag. Release bufptrs.
+	 */
+	__atomic_store_n(&(buf->bufptr64[0]),
+		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
 	return  buf->count;
 
 }
@@ -355,7 +379,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 	if (unlikely(num_mbufs == 0)) {
 		/* Flush out all non-full cache-lines to workers. */
 		for (wid = 0 ; wid < d->num_workers; wid++) {
-			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
+			/* Sync with worker on GET_BUF flag. */
+			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
 				release(d, wid);
 				handle_returns(d, wid);
 			}
@@ -367,7 +393,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 		uint16_t matches[RTE_DIST_BURST_SIZE];
 		unsigned int pkts;
 
-		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		/* Sync with worker on GET_BUF flag. */
+		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
 			d->bufs[wkr].count = 0;
 
 		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
@@ -465,7 +493,9 @@ rte_distributor_process_v1705(struct rte_distributor *d,
 
 	/* Flush out all non-full cache-lines to workers. */
 	for (wid = 0 ; wid < d->num_workers; wid++)
-		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		/* Sync with worker on GET_BUF flag. */
+		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
+			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
 			release(d, wid);
 
 	return num_mbufs;
@@ -574,7 +604,9 @@ rte_distributor_clear_returns_v1705(struct rte_distributor *d)
 
 	/* throw away returns, so workers can exit */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
-		d->bufs[wkr].retptr64[0] = 0;
+		/* Sync with worker. Release retptrs. */
+		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
+				__ATOMIC_RELEASE);
 }
 BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
 MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c
index cdc0969a8..ef6d5cb4b 100644
--- a/lib/librte_distributor/rte_distributor_v20.c
+++ b/lib/librte_distributor/rte_distributor_v20.c
@@ -34,9 +34,12 @@ rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_GET_BUF;
-	while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+	while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED)
+			& RTE_DISTRIB_FLAGS_MASK))
 		rte_pause();
-	buf->bufptr64 = req;
+
+	/* Sync with distributor on GET_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 }
 VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);
 
@@ -45,7 +48,9 @@ rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
 		unsigned worker_id)
 {
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
-	if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+	/* Sync with distributor. Acquire bufptr64. */
+	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
+		& RTE_DISTRIB_GET_BUF)
 		return NULL;
 
 	/* since bufptr64 is signed, this should be an arithmetic shift */
@@ -73,7 +78,8 @@ rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
 	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
 	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
 			| RTE_DISTRIB_RETURN_BUF;
-	buf->bufptr64 = req;
+	/* Sync with distributor on RETURN_BUF flag. */
+	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
 	return 0;
 }
 VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
@@ -117,7 +123,8 @@ handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
 {
 	d->in_flight_tags[wkr] = 0;
 	d->in_flight_bitmask &= ~(1UL << wkr);
-	d->bufs[wkr].bufptr64 = 0;
+	/* Sync with worker. Release bufptr64. */
+	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
 	if (unlikely(d->backlog[wkr].count != 0)) {
 		/* On return of a packet, we need to move the
 		 * queued packets for this core elsewhere.
@@ -161,17 +168,23 @@ process_returns(struct rte_distributor_v20 *d)
 			ret_count = d->returns.count;
 
 	for (wkr = 0; wkr < d->num_workers; wkr++) {
-
-		const int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+							__ATOMIC_ACQUIRE);
 
 		if (data & RTE_DISTRIB_GET_BUF) {
 			flushed++;
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					backlog_pop(&d->backlog[wkr]),
+					__ATOMIC_RELEASE);
 			else {
-				d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+				/* Sync with worker on GET_BUF flag. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+					RTE_DISTRIB_GET_BUF,
+					__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = 0;
 				d->in_flight_bitmask &= ~(1UL << wkr);
 			}
@@ -207,9 +220,10 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 		return process_returns(d);
 
 	while (next_idx < num_mbufs || next_mb != NULL) {
-
-		int64_t data = d->bufs[wkr].bufptr64;
 		uintptr_t oldbuf = 0;
+		/* Sync with worker. Acquire bufptr64. */
+		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
+						__ATOMIC_ACQUIRE);
 
 		if (!next_mb) {
 			next_mb = mbufs[next_idx++];
@@ -255,11 +269,16 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 				(d->backlog[wkr].count || next_mb)) {
 
 			if (d->backlog[wkr].count)
-				d->bufs[wkr].bufptr64 =
-						backlog_pop(&d->backlog[wkr]);
+				/* Sync with worker. Release bufptr64. */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+						backlog_pop(&d->backlog[wkr]),
+						__ATOMIC_RELEASE);
 
 			else {
-				d->bufs[wkr].bufptr64 = next_value;
+				/* Sync with worker. Release bufptr64.  */
+				__atomic_store_n(&(d->bufs[wkr].bufptr64),
+						next_value,
+						__ATOMIC_RELEASE);
 				d->in_flight_tags[wkr] = new_tag;
 				d->in_flight_bitmask |= (1UL << wkr);
 				next_mb = NULL;
@@ -280,13 +299,19 @@ rte_distributor_process_v20(struct rte_distributor_v20 *d,
 	 * if they are ready */
 	for (wkr = 0; wkr < d->num_workers; wkr++)
 		if (d->backlog[wkr].count &&
-				(d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+				/* Sync with worker. Acquire bufptr64. */
+				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
+				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
 
 			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
 					RTE_DISTRIB_FLAG_BITS;
+
 			store_return(oldbuf, d, &ret_start, &ret_count);
 
-			d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+			/* Sync with worker. Release bufptr64. */
+			__atomic_store_n(&(d->bufs[wkr].bufptr64),
+				backlog_pop(&d->backlog[wkr]),
+				__ATOMIC_RELEASE);
 		}
 
 	d->returns.start = ret_start;
-- 
2.17.1


  reply	other threads:[~2019-10-15  9:29 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-10-08  9:55 [dpdk-dev] [PATCH] lib/distributor: fix deadlock issue for aarch64 Ruifeng Wang
2019-10-08 12:53 ` Hunt, David
2019-10-08 17:05 ` Aaron Conole
2019-10-08 19:46   ` [dpdk-dev] [dpdk-stable] " David Marchand
2019-10-08 20:08     ` Aaron Conole
2019-10-09  5:52     ` Ruifeng Wang (Arm Technology China)
2019-10-17 11:42       ` [dpdk-dev] [EXT] " Harman Kalra
2019-10-17 13:48         ` Ruifeng Wang (Arm Technology China)
2019-10-12  2:43 ` [dpdk-dev] [PATCH v2 0/2] fix distributor unit test Ruifeng Wang
2019-10-12  2:43   ` [dpdk-dev] [PATCH v2 1/2] lib/distributor: fix deadlock issue for aarch64 Ruifeng Wang
2019-10-13  2:31     ` Honnappa Nagarahalli
2019-10-14 10:00       ` Ruifeng Wang (Arm Technology China)
2019-10-12  2:43   ` [dpdk-dev] [PATCH v2 2/2] test/distributor: fix false unit test failure Ruifeng Wang
2019-10-15  9:28 ` [dpdk-dev] [PATCH v3 0/2] fix distributor unit test Ruifeng Wang
2019-10-15  9:28   ` Ruifeng Wang [this message]
2019-10-25  8:13     ` [dpdk-dev] [PATCH v3 1/2] lib/distributor: fix deadlock issue for aarch64 Hunt, David
2019-10-15  9:28   ` [dpdk-dev] [PATCH v3 2/2] test/distributor: fix false unit test failure Ruifeng Wang
2019-10-25  8:13     ` Hunt, David
2019-10-24 19:31   ` [dpdk-dev] [PATCH v3 0/2] fix distributor unit test David Marchand
2019-10-25  8:11     ` Hunt, David
2019-10-25  8:18       ` David Marchand
2019-10-25  8:20         ` Hunt, David
2019-10-25  8:33   ` David Marchand

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20191015092826.13002-2-ruifeng.wang@arm.com \
    --to=ruifeng.wang@arm.com \
    --cc=david.hunt@intel.com \
    --cc=dev@dpdk.org \
    --cc=gavin.hu@arm.com \
    --cc=hkalra@marvell.com \
    --cc=honnappa.nagarahalli@arm.com \
    --cc=nd@arm.com \
    --cc=stable@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).