All of lore.kernel.org
 help / color / mirror / Atom feed
From: Bernard Iremonger <bernard.iremonger@intel.com>
To: dev@dpdk.org
Cc: declan.doherty@intel.com,
	Bernard Iremonger <bernard.iremonger@intel.com>
Subject: [PATCH 2/5] bonding: add read/write lock to rx/tx burst functions
Date: Thu,  5 May 2016 16:14:57 +0100	[thread overview]
Message-ID: <1462461300-9962-3-git-send-email-bernard.iremonger@intel.com> (raw)
In-Reply-To: <1462461300-9962-1-git-send-email-bernard.iremonger@intel.com>

Signed-off-by: Bernard Iremonger <bernard.iremonger@intel.com>
---
 drivers/net/bonding/rte_eth_bond_pmd.c | 112 +++++++++++++++++++++++++--------
 1 file changed, 85 insertions(+), 27 deletions(-)

diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index ed6245b..c3e772c 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -92,7 +92,7 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	internals = bd_rx_q->dev_private;
 
-
+	rte_rwlock_read_lock(&internals->rwlock);
 	for (i = 0; i < internals->active_slave_count && nb_pkts; i++) {
 		/* Offset of pointer to *bufs increases as packets are received
 		 * from other slaves */
@@ -103,6 +103,7 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 			nb_pkts -= num_rx_slave;
 		}
 	}
+	rte_rwlock_read_unlock(&internals->rwlock);
 
 	return num_rx_total;
 }
@@ -112,14 +113,20 @@ bond_ethdev_rx_burst_active_backup(void *queue, struct rte_mbuf **bufs,
 		uint16_t nb_pkts)
 {
 	struct bond_dev_private *internals;
+	uint16_t num_rx_total;
 
 	/* Cast to structure, containing bonded device's port id and queue id */
 	struct bond_rx_queue *bd_rx_q = (struct bond_rx_queue *)queue;
 
 	internals = bd_rx_q->dev_private;
+	rte_rwlock_read_lock(&internals->rwlock);
+
+	num_rx_total = rte_eth_rx_burst(internals->current_primary_port,
+					bd_rx_q->queue_id, bufs, nb_pkts);
 
-	return rte_eth_rx_burst(internals->current_primary_port,
-			bd_rx_q->queue_id, bufs, nb_pkts);
+	rte_rwlock_read_unlock(&internals->rwlock);
+
+	return num_rx_total;
 }
 
 static uint16_t
@@ -149,12 +156,17 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 	memcpy(slaves, internals->active_slaves,
 			sizeof(internals->active_slaves[0]) * slave_count);
 
-	for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
+	rte_rwlock_read_lock(&internals->rwlock);
+	for (i = 0; i < internals->active_slave_count && num_rx_total < nb_pkts;
+		 i++) {
 		j = num_rx_total;
-		collecting = ACTOR_STATE(&mode_8023ad_ports[slaves[i]], COLLECTING);
+		collecting = ACTOR_STATE(
+				&mode_8023ad_ports[internals->active_slaves[i]],
+				COLLECTING);
 
 		/* Read packets from this slave */
-		num_rx_total += rte_eth_rx_burst(slaves[i], bd_rx_q->queue_id,
+		num_rx_total += rte_eth_rx_burst(internals->active_slaves[i],
+				bd_rx_q->queue_id,
 				&bufs[num_rx_total], nb_pkts - num_rx_total);
 
 		for (k = j; k < 2 && k < num_rx_total; k++)
@@ -175,7 +187,9 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 					!is_same_ether_addr(&bond_mac, &hdr->d_addr)))) {
 
 				if (hdr->ether_type == ether_type_slow_be) {
-					bond_mode_8023ad_handle_slow_pkt(internals, slaves[i],
+					bond_mode_8023ad_handle_slow_pkt(
+						internals,
+						internals->active_slaves[i],
 						bufs[j]);
 				} else
 					rte_pktmbuf_free(bufs[j]);
@@ -190,6 +204,7 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 				j++;
 		}
 	}
+	rte_rwlock_read_unlock(&internals->rwlock);
 
 	return num_rx_total;
 }
@@ -408,12 +423,14 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
 
 	/* Copy slave list to protect against slave up/down changes during tx
 	 * bursting */
+	rte_rwlock_read_lock(&internals->rwlock);
 	num_of_slaves = internals->active_slave_count;
 	memcpy(slaves, internals->active_slaves,
 			sizeof(internals->active_slaves[0]) * num_of_slaves);
-
-	if (num_of_slaves < 1)
+	if (num_of_slaves < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return num_tx_total;
+	}
 
 	/* Populate slaves mbuf with which packets are to be sent on it  */
 	for (i = 0; i < nb_pkts; i++) {
@@ -428,8 +445,10 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
 	/* Send packet burst on each slave device */
 	for (i = 0; i < num_of_slaves; i++) {
 		if (slave_nb_pkts[i] > 0) {
-			num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
-					slave_bufs[i], slave_nb_pkts[i]);
+			num_tx_slave = rte_eth_tx_burst(
+					internals->active_slaves[i],
+					bd_tx_q->queue_id, slave_bufs[i],
+					slave_nb_pkts[i]);
 
 			/* if tx burst fails move packets to end of bufs */
 			if (unlikely(num_tx_slave < slave_nb_pkts[i])) {
@@ -444,6 +463,7 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct rte_mbuf **bufs,
 			num_tx_total += num_tx_slave;
 		}
 	}
+	rte_rwlock_read_unlock(&internals->rwlock);
 
 	return num_tx_total;
 }
@@ -454,15 +474,23 @@ bond_ethdev_tx_burst_active_backup(void *queue,
 {
 	struct bond_dev_private *internals;
 	struct bond_tx_queue *bd_tx_q;
+	uint16_t num_tx_total;
 
 	bd_tx_q = (struct bond_tx_queue *)queue;
 	internals = bd_tx_q->dev_private;
 
-	if (internals->active_slave_count < 1)
+	rte_rwlock_read_lock(&internals->rwlock);
+	if (internals->active_slave_count < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return 0;
+	}
+
+	num_tx_total = rte_eth_tx_burst(internals->current_primary_port,
+					bd_tx_q->queue_id, bufs, nb_pkts);
 
-	return rte_eth_tx_burst(internals->current_primary_port, bd_tx_q->queue_id,
-			bufs, nb_pkts);
+	rte_rwlock_read_unlock(&internals->rwlock);
+
+	return num_tx_total;
 }
 
 static inline uint16_t
@@ -693,16 +721,19 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 			&rte_eth_devices[internals->primary_port];
 	uint16_t num_tx_total = 0;
 	uint8_t i, j;
-
-	uint8_t num_of_slaves = internals->active_slave_count;
+	uint8_t num_of_slaves;
 	uint8_t slaves[RTE_MAX_ETHPORTS];
 
 	struct ether_hdr *ether_hdr;
 	struct ether_addr primary_slave_addr;
 	struct ether_addr active_slave_addr;
 
-	if (num_of_slaves < 1)
+	rte_rwlock_read_lock(&internals->rwlock);
+	num_of_slaves = internals->active_slave_count;
+	if (num_of_slaves < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return num_tx_total;
+	}
 
 	memcpy(slaves, internals->tlb_slaves_order,
 				sizeof(internals->tlb_slaves_order[0]) * num_of_slaves);
@@ -716,7 +747,8 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	}
 
 	for (i = 0; i < num_of_slaves; i++) {
-		rte_eth_macaddr_get(slaves[i], &active_slave_addr);
+		rte_eth_macaddr_get(internals->tlb_slaves_order[i],
+				&active_slave_addr);
 		for (j = num_tx_total; j < nb_pkts; j++) {
 			if (j + 3 < nb_pkts)
 				rte_prefetch0(rte_pktmbuf_mtod(bufs[j+3], void*));
@@ -729,12 +761,15 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 		}
 
-		num_tx_total += rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+		num_tx_total += rte_eth_tx_burst(
+				internals->tlb_slaves_order[i],
+				bd_tx_q->queue_id,
 				bufs + num_tx_total, nb_pkts - num_tx_total);
 
 		if (num_tx_total == nb_pkts)
 			break;
 	}
+	rte_rwlock_read_unlock(&internals->rwlock);
 
 	return num_tx_total;
 }
@@ -836,6 +871,8 @@ bond_ethdev_tx_burst_alb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		internals->mode6.ntt = 0;
 	}
 
+	rte_rwlock_read_lock(&internals->rwlock);
+
 	/* Send ARP packets on proper slaves */
 	for (i = 0; i < RTE_MAX_ETHPORTS; i++) {
 		if (slave_bufs_pkts[i] > 0) {
@@ -876,6 +913,8 @@ bond_ethdev_tx_burst_alb(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		}
 	}
 
+	rte_rwlock_read_unlock(&internals->rwlock);
+
 	/* Send non-ARP packets using tlb policy */
 	if (slave_bufs_pkts[RTE_MAX_ETHPORTS] > 0) {
 		num_send = bond_ethdev_tx_burst_tlb(queue,
@@ -916,12 +955,16 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
 
 	/* Copy slave list to protect against slave up/down changes during tx
 	 * bursting */
+	rte_rwlock_read_lock(&internals->rwlock);
 	num_of_slaves = internals->active_slave_count;
+
 	memcpy(slaves, internals->active_slaves,
 			sizeof(internals->active_slaves[0]) * num_of_slaves);
 
-	if (num_of_slaves < 1)
+	if (num_of_slaves < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return num_tx_total;
+	}
 
 	/* Populate slaves mbuf with the packets which are to be sent on it  */
 	for (i = 0; i < nb_pkts; i++) {
@@ -935,7 +978,9 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
 	/* Send packet burst on each slave device */
 	for (i = 0; i < num_of_slaves; i++) {
 		if (slave_nb_pkts[i] > 0) {
-			num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+			num_tx_slave = rte_eth_tx_burst(
+					internals->active_slaves[i],
+					bd_tx_q->queue_id,
 					slave_bufs[i], slave_nb_pkts[i]);
 
 			/* if tx burst fails move packets to end of bufs */
@@ -952,6 +997,7 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf **bufs,
 		}
 	}
 
+	rte_rwlock_read_unlock(&internals->rwlock);
 	return num_tx_total;
 }
 
@@ -986,15 +1032,20 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 
 	/* Copy slave list to protect against slave up/down changes during tx
 	 * bursting */
+	rte_rwlock_read_lock(&internals->rwlock);
 	num_of_slaves = internals->active_slave_count;
-	if (num_of_slaves < 1)
+	if (num_of_slaves < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return num_tx_total;
+	}
 
 	memcpy(slaves, internals->active_slaves, sizeof(slaves[0]) * num_of_slaves);
 
 	distributing_count = 0;
 	for (i = 0; i < num_of_slaves; i++) {
-		struct port *port = &mode_8023ad_ports[slaves[i]];
+		struct port *port;
+
+		port = &mode_8023ad_ports[internals->active_slaves[i]];
 
 		slave_slow_nb_pkts[i] = rte_ring_dequeue_burst(port->tx_ring,
 				slow_pkts, BOND_MODE_8023AX_SLAVE_TX_PKTS);
@@ -1026,7 +1077,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		if (slave_nb_pkts[i] == 0)
 			continue;
 
-		num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+		num_tx_slave = rte_eth_tx_burst(
+				internals->active_slaves[i], bd_tx_q->queue_id,
 				slave_bufs[i], slave_nb_pkts[i]);
 
 		/* If tx burst fails drop slow packets */
@@ -1044,6 +1096,7 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
 		}
 	}
 
+	rte_rwlock_read_unlock(&internals->rwlock);
 	return num_tx_total;
 }
 
@@ -1067,12 +1120,15 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
 
 	/* Copy slave list to protect against slave up/down changes during tx
 	 * bursting */
+	rte_rwlock_read_lock(&internals->rwlock);
 	num_of_slaves = internals->active_slave_count;
 	memcpy(slaves, internals->active_slaves,
 			sizeof(internals->active_slaves[0]) * num_of_slaves);
 
-	if (num_of_slaves < 1)
+	if (num_of_slaves < 1) {
+		rte_rwlock_read_unlock(&internals->rwlock);
 		return 0;
+	}
 
 	/* Increment reference count on mbufs */
 	for (i = 0; i < nb_pkts; i++)
@@ -1080,8 +1136,9 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
 
 	/* Transmit burst on each active slave */
 	for (i = 0; i < num_of_slaves; i++) {
-		slave_tx_total[i] = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
-					bufs, nb_pkts);
+		slave_tx_total[i] = rte_eth_tx_burst(
+				internals->active_slaves[i],
+				bd_tx_q->queue_id, bufs, nb_pkts);
 
 		if (unlikely(slave_tx_total[i] < nb_pkts))
 			tx_failed_flag = 1;
@@ -1104,6 +1161,7 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct rte_mbuf **bufs,
 				while (slave_tx_total[i] < nb_pkts)
 					rte_pktmbuf_free(bufs[slave_tx_total[i]++]);
 
+	rte_rwlock_read_unlock(&internals->rwlock);
 	return max_nb_of_tx_pkts;
 }
 
-- 
2.6.3

  parent reply	other threads:[~2016-05-05 15:38 UTC|newest]

Thread overview: 42+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-05-05 15:14 [PATCH 0/5] bonding: locks Bernard Iremonger
2016-05-05 15:14 ` [PATCH 1/5] bonding: replace spinlock with read/write lock Bernard Iremonger
2016-05-05 17:12   ` Stephen Hemminger
2016-05-06 10:32     ` Declan Doherty
2016-05-06 15:55       ` Stephen Hemminger
2016-05-13 17:10         ` Ananyev, Konstantin
2016-05-13 17:18           ` Ananyev, Konstantin
2016-05-26 16:24             ` Iremonger, Bernard
2016-05-05 15:14 ` Bernard Iremonger [this message]
2016-05-05 15:14 ` [PATCH 3/5] bonding: remove memcopy of slaves from rx/tx burst function Bernard Iremonger
2016-05-05 15:14 ` [PATCH 4/5] bonding: add read/write lock to stop function Bernard Iremonger
2016-05-05 15:15 ` [PATCH 5/5] bonding: add read/write lock to the link_update function Bernard Iremonger
2016-05-26 16:38 ` [PATCH v2 0/6] bonding: locks Bernard Iremonger
2016-05-26 16:38   ` [PATCH v2 1/6] bonding: add spinlock to rx and tx queues Bernard Iremonger
2016-06-10 18:12     ` Ananyev, Konstantin
2016-06-12 17:11     ` [PATCH v3 0/4] bonding: locks Bernard Iremonger
2016-06-12 17:11       ` [PATCH v3 1/4] bonding: add spinlock to rx and tx queues Bernard Iremonger
2016-06-12 17:11       ` [PATCH v3 2/4] bonding: grab queue spinlocks in slave add and remove Bernard Iremonger
2016-06-12 17:11       ` [PATCH v3 3/4] bonding: take queue spinlock in rx/tx burst functions Bernard Iremonger
2016-06-13  9:18         ` Bruce Richardson
2016-06-13 12:28           ` Iremonger, Bernard
2016-06-16 14:32             ` Bruce Richardson
2016-06-16 15:00               ` Thomas Monjalon
2016-06-16 16:41                 ` Iremonger, Bernard
2016-06-16 18:38                   ` Thomas Monjalon
2017-02-15 18:01                     ` Ferruh Yigit
2017-02-16  9:13                       ` Bruce Richardson
2017-02-16 11:39                         ` Iremonger, Bernard
2017-02-20 11:15                           ` Ferruh Yigit
2016-09-09 11:29         ` Ferruh Yigit
2016-06-12 17:11       ` [PATCH v3 4/4] bonding: remove memcpy from " Bernard Iremonger
2016-09-11 12:39         ` Yuanhan Liu
2016-05-26 16:38   ` [PATCH v2 2/6] bonding: grab queue spinlocks in slave add and remove Bernard Iremonger
2016-06-10 18:14     ` Ananyev, Konstantin
2016-05-26 16:38   ` [PATCH v2 3/6] bonding: take queue spinlock in rx/tx burst functions Bernard Iremonger
2016-06-10 18:14     ` Ananyev, Konstantin
2016-05-26 16:38   ` [PATCH v2 4/6] bonding: add spinlock to stop function Bernard Iremonger
2016-05-26 16:38   ` [PATCH v2 5/6] bonding: add spinlock to link update function Bernard Iremonger
2016-05-26 16:38   ` [PATCH v2 6/6] bonding: remove memcpy from burst functions Bernard Iremonger
2016-06-10 18:15     ` Ananyev, Konstantin
2016-06-10 14:45   ` [PATCH v2 0/6] bonding: locks Bruce Richardson
2016-06-10 18:24     ` Iremonger, Bernard

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1462461300-9962-3-git-send-email-bernard.iremonger@intel.com \
    --to=bernard.iremonger@intel.com \
    --cc=declan.doherty@intel.com \
    --cc=dev@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.