BPF Archive on lore.kernel.org
* [PATCH net-next 0/9] XDP for NXP ENETC
@ 2021-03-31 20:08 Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function Vladimir Oltean
                   ` (9 more replies)
  0 siblings, 10 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

This series adds support to the enetc driver for the basic XDP primitives.
The ENETC is a network controller found inside the NXP LS1028A SoC,
which is a dual-core Cortex A72 device for industrial networking,
with the CPUs clocked at up to 1.3 GHz. On this platform, there are 4
ENETC ports and a 6-port embedded DSA switch, in a topology that looks
like this:

  +-------------------------------------------------------------------------+
  |                    +--------+ 1 Gbps (typically disabled)               |
  | ENETC PCI          |  ENETC |--------------------------+                |
  | Root Complex       | port 3 |-----------------------+  |                |
  | Integrated         +--------+                       |  |                |
  | Endpoint                                            |  |                |
  |                    +--------+ 2.5 Gbps              |  |                |
  |                    |  ENETC |--------------+        |  |                |
  |                    | port 2 |-----------+  |        |  |                |
  |                    +--------+           |  |        |  |                |
  |                                         |  |        |  |                |
  |                        +------------------------------------------------+
  |                        |             |  Felix |  |  Felix |             |
  |                        | Switch      | port 4 |  | port 5 |             |
  |                        |             +--------+  +--------+             |
  |                        |                                                |
  | +--------+  +--------+ | +--------+  +--------+  +--------+  +--------+ |
  | |  ENETC |  |  ENETC | | |  Felix |  |  Felix |  |  Felix |  |  Felix | |
  | | port 0 |  | port 1 | | | port 0 |  | port 1 |  | port 2 |  | port 3 | |
  +-------------------------------------------------------------------------+
         |          |             |           |            |          |
         v          v             v           v            v          v
       Up to      Up to                      Up to 4x 2.5Gbps
      2.5Gbps     1Gbps

The ENETC ports 2 and 3 can act as DSA masters for the embedded switch.
Because 4 out of the 6 externally-facing ports of the SoC are switch
ports, the most interesting use case for XDP on this device is in fact
XDP_TX on the 2.5Gbps DSA master.

Nonetheless, the results presented below are for IPv4 forwarding between
ENETC port 0 (eno0) and port 1 (eno1) both configured for 1Gbps.
There are two streams of IPv4/UDP datagrams with a frame length of 64
octets delivered at 100% port load to eno0 and to eno1. eno0 has a flow
steering rule to process the traffic on RX ring 0 (CPU 0), and eno1 has
a flow steering rule towards RX ring 1 (CPU 1).

For the IPFWD test, standard IP routing was enabled in the netns.
For the XDP_DROP test, the samples/bpf/xdp1 program was attached to both
eno0 and to eno1.
For the XDP_TX test, the samples/bpf/xdp2 program was attached to both
eno0 and to eno1.
For the XDP_REDIRECT test, the samples/bpf/xdp_redirect program was
attached once to the input of eno0/output of eno1, and twice to the
input of eno1/output of eno0.

Finally, the preliminary results are as follows:

        | IPFWD | XDP_TX | XDP_REDIRECT | XDP_DROP
--------+-------+--------+--------------+---------
fps     | 761   | 2535   | 1735         | 2783
Gbps    | 0.51  | 1.71   | 1.17         | n/a

There is a strange phenomenon on my test system where one CPU appears to
be doing more processing than the other. I have not investigated this
too deeply. Also, the code might not be very well optimized (for example,
dma_sync_for_device is called with the full ENETC_RXB_DMA_SIZE_XDP).

Design wise, the ENETC is a PCI device with BD rings, so it uses the
MEM_TYPE_PAGE_SHARED memory model, as can typically be seen in Intel
devices. The strategy was to build upon the existing model that the
driver uses, and not change it too much. So you will see things like a
separate NAPI poll function for XDP.

I have only tested with PAGE_SIZE=4096, and since we split pages in
half, it means that MTU-sized frames are scatter/gather (the XDP
headroom + skb_shared_info only leaves us 1476 bytes of data per
buffer). This is sub-optimal, but I would rather keep it this way and
help speed up Lorenzo's series for S/G support through testing, rather
than change the enetc driver to use some other memory model like page_pool.
My code is already structured for S/G, and that works fine for XDP_DROP
and XDP_TX, just not for XDP_REDIRECT, even between two enetc ports.
So the S/G XDP_REDIRECT is stubbed out (the frames are dropped), but
obviously I would like to remove that limitation soon.

Please note that I am rather new to this kind of work; I am more of a
control path person, so I would appreciate feedback.

Enough talking, on to the patches.

Vladimir Oltean (9):
  net: enetc: consume the error RX buffer descriptors in a dedicated
    function
  net: enetc: move skb creation into enetc_build_skb
  net: enetc: add a dedicated is_eof bit in the TX software BD
  net: enetc: clean the TX software BD on the TX confirmation path
  net: enetc: move up enetc_reuse_page and enetc_page_reusable
  net: enetc: add support for XDP_DROP and XDP_PASS
  net: enetc: add support for XDP_TX
  net: enetc: increase RX ring default size
  net: enetc: add support for XDP_REDIRECT

 drivers/net/ethernet/freescale/enetc/enetc.c  | 826 +++++++++++++++---
 drivers/net/ethernet/freescale/enetc/enetc.h  |  53 +-
 .../ethernet/freescale/enetc/enetc_ethtool.c  |  19 +-
 .../net/ethernet/freescale/enetc/enetc_pf.c   |   2 +
 4 files changed, 796 insertions(+), 104 deletions(-)

-- 
2.25.1


^ permalink raw reply	[flat|nested] 23+ messages in thread

* [PATCH net-next 1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 2/9] net: enetc: move skb creation into enetc_build_skb Vladimir Oltean
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

We can and should check the RX BD errors before starting to build the
skb. The only apparent reason why things are done in this backwards
order is to spare one call to enetc_rxbd_next.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c | 43 ++++++++++++--------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index 5a54976e6a28..362cfba7ce14 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -605,6 +605,28 @@ static void enetc_add_rx_buff_to_skb(struct enetc_bdr *rx_ring, int i,
 	enetc_put_rx_buff(rx_ring, rx_swbd);
 }
 
+static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
+					      u32 bd_status,
+					      union enetc_rx_bd **rxbd, int *i)
+{
+	if (likely(!(bd_status & ENETC_RXBD_LSTATUS(ENETC_RXBD_ERR_MASK))))
+		return false;
+
+	enetc_rxbd_next(rx_ring, rxbd, i);
+
+	while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
+		dma_rmb();
+		bd_status = le32_to_cpu((*rxbd)->r.lstatus);
+
+		enetc_rxbd_next(rx_ring, rxbd, i);
+	}
+
+	rx_ring->ndev->stats.rx_dropped++;
+	rx_ring->ndev->stats.rx_errors++;
+
+	return true;
+}
+
 #define ENETC_RXBD_BUNDLE 16 /* # of BDs to update at once */
 
 static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
@@ -634,6 +656,11 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 
 		enetc_wr_reg_hot(rx_ring->idr, BIT(rx_ring->index));
 		dma_rmb(); /* for reading other rxbd fields */
+
+		if (enetc_check_bd_errors_and_consume(rx_ring, bd_status,
+						      &rxbd, &i))
+			break;
+
 		size = le16_to_cpu(rxbd->r.buf_len);
 		skb = enetc_map_rx_buff_to_skb(rx_ring, i, size);
 		if (!skb)
@@ -645,22 +672,6 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 
 		enetc_rxbd_next(rx_ring, &rxbd, &i);
 
-		if (unlikely(bd_status &
-			     ENETC_RXBD_LSTATUS(ENETC_RXBD_ERR_MASK))) {
-			dev_kfree_skb(skb);
-			while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
-				dma_rmb();
-				bd_status = le32_to_cpu(rxbd->r.lstatus);
-
-				enetc_rxbd_next(rx_ring, &rxbd, &i);
-			}
-
-			rx_ring->ndev->stats.rx_dropped++;
-			rx_ring->ndev->stats.rx_errors++;
-
-			break;
-		}
-
 		/* not last BD in frame? */
 		while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
 			bd_status = le32_to_cpu(rxbd->r.lstatus);
-- 
2.25.1



* [PATCH net-next 2/9] net: enetc: move skb creation into enetc_build_skb
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 3/9] net: enetc: add a dedicated is_eof bit in the TX software BD Vladimir Oltean
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

We need to build an skb from two code paths now: from the plain RX data
path and from the XDP data path when the verdict is XDP_PASS.

Create a new enetc_build_skb function which contains the essential steps
for building an skb based on the first and last positions of buffer
descriptors within the RX ring.

We also squash the enetc_process_skb function into enetc_build_skb,
because what that function did wasn't very meaningful on its own.

The "rx_frm_cnt++" instruction has been moved before napi_gro_receive
for cosmetic reasons, to be in the same spot as rx_byte_cnt++, which
itself must be before napi_gro_receive, because that is when we lose
ownership of the skb.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c | 81 +++++++++++---------
 1 file changed, 44 insertions(+), 37 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index 362cfba7ce14..b2071b8dc316 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -513,13 +513,6 @@ static void enetc_get_offloads(struct enetc_bdr *rx_ring,
 #endif
 }
 
-static void enetc_process_skb(struct enetc_bdr *rx_ring,
-			      struct sk_buff *skb)
-{
-	skb_record_rx_queue(skb, rx_ring->index);
-	skb->protocol = eth_type_trans(skb, rx_ring->ndev);
-}
-
 static bool enetc_page_reusable(struct page *page)
 {
 	return (!page_is_pfmemalloc(page) && page_ref_count(page) == 1);
@@ -627,6 +620,47 @@ static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
 	return true;
 }
 
+static struct sk_buff *enetc_build_skb(struct enetc_bdr *rx_ring,
+				       u32 bd_status, union enetc_rx_bd **rxbd,
+				       int *i, int *cleaned_cnt)
+{
+	struct sk_buff *skb;
+	u16 size;
+
+	size = le16_to_cpu((*rxbd)->r.buf_len);
+	skb = enetc_map_rx_buff_to_skb(rx_ring, *i, size);
+	if (!skb)
+		return NULL;
+
+	enetc_get_offloads(rx_ring, *rxbd, skb);
+
+	(*cleaned_cnt)++;
+
+	enetc_rxbd_next(rx_ring, rxbd, i);
+
+	/* not last BD in frame? */
+	while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
+		bd_status = le32_to_cpu((*rxbd)->r.lstatus);
+		size = ENETC_RXB_DMA_SIZE;
+
+		if (bd_status & ENETC_RXBD_LSTATUS_F) {
+			dma_rmb();
+			size = le16_to_cpu((*rxbd)->r.buf_len);
+		}
+
+		enetc_add_rx_buff_to_skb(rx_ring, *i, size, skb);
+
+		(*cleaned_cnt)++;
+
+		enetc_rxbd_next(rx_ring, rxbd, i);
+	}
+
+	skb_record_rx_queue(skb, rx_ring->index);
+	skb->protocol = eth_type_trans(skb, rx_ring->ndev);
+
+	return skb;
+}
+
 #define ENETC_RXBD_BUNDLE 16 /* # of BDs to update at once */
 
 static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
@@ -643,7 +677,6 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 		union enetc_rx_bd *rxbd;
 		struct sk_buff *skb;
 		u32 bd_status;
-		u16 size;
 
 		if (cleaned_cnt >= ENETC_RXBD_BUNDLE)
 			cleaned_cnt -= enetc_refill_rx_ring(rx_ring,
@@ -661,41 +694,15 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 						      &rxbd, &i))
 			break;
 
-		size = le16_to_cpu(rxbd->r.buf_len);
-		skb = enetc_map_rx_buff_to_skb(rx_ring, i, size);
+		skb = enetc_build_skb(rx_ring, bd_status, &rxbd, &i,
+				      &cleaned_cnt);
 		if (!skb)
 			break;
 
-		enetc_get_offloads(rx_ring, rxbd, skb);
-
-		cleaned_cnt++;
-
-		enetc_rxbd_next(rx_ring, &rxbd, &i);
-
-		/* not last BD in frame? */
-		while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
-			bd_status = le32_to_cpu(rxbd->r.lstatus);
-			size = ENETC_RXB_DMA_SIZE;
-
-			if (bd_status & ENETC_RXBD_LSTATUS_F) {
-				dma_rmb();
-				size = le16_to_cpu(rxbd->r.buf_len);
-			}
-
-			enetc_add_rx_buff_to_skb(rx_ring, i, size, skb);
-
-			cleaned_cnt++;
-
-			enetc_rxbd_next(rx_ring, &rxbd, &i);
-		}
-
 		rx_byte_cnt += skb->len;
-
-		enetc_process_skb(rx_ring, skb);
+		rx_frm_cnt++;
 
 		napi_gro_receive(napi, skb);
-
-		rx_frm_cnt++;
 	}
 
 	rx_ring->next_to_clean = i;
-- 
2.25.1



* [PATCH net-next 3/9] net: enetc: add a dedicated is_eof bit in the TX software BD
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 2/9] net: enetc: move skb creation into enetc_build_skb Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 4/9] net: enetc: clean the TX software BD on the TX confirmation path Vladimir Oltean
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

In the transmit path, if we have a scatter/gather frame, it is put into
multiple software buffer descriptors, the last of which has the skb
pointer populated (which is necessary for rearming the TX MSI vector and
for collecting the two-step TX timestamp from the TX confirmation path).

At the moment this is sufficient, but with XDP_TX we'll need to service
TX software buffer descriptors that don't have an skb pointer, yet may
nonetheless be final. So add a dedicated bit for final software BDs,
which we populate and check explicitly. We keep looking just for an skb
when doing TX timestamping, because we don't want/need that for XDP.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c | 7 +++----
 drivers/net/ethernet/freescale/enetc/enetc.h | 1 +
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index b2071b8dc316..37d2d142a744 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -157,6 +157,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 	temp_bd.flags = flags;
 	*txbd = temp_bd;
 
+	tx_ring->tx_swbd[i].is_eof = true;
 	tx_ring->tx_swbd[i].skb = skb;
 
 	enetc_bdr_idx_inc(tx_ring, &i);
@@ -316,8 +317,6 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 	do_tstamp = false;
 
 	while (bds_to_clean && tx_frm_cnt < ENETC_DEFAULT_TX_WORK) {
-		bool is_eof = !!tx_swbd->skb;
-
 		if (unlikely(tx_swbd->check_wb)) {
 			struct enetc_ndev_priv *priv = netdev_priv(ndev);
 			union enetc_tx_bd *txbd;
@@ -335,7 +334,7 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 		if (likely(tx_swbd->dma))
 			enetc_unmap_tx_buff(tx_ring, tx_swbd);
 
-		if (is_eof) {
+		if (tx_swbd->skb) {
 			if (unlikely(do_tstamp)) {
 				enetc_tstamp_tx(tx_swbd->skb, tstamp);
 				do_tstamp = false;
@@ -355,7 +354,7 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 		}
 
 		/* BD iteration loop end */
-		if (is_eof) {
+		if (tx_swbd->is_eof) {
 			tx_frm_cnt++;
 			/* re-arm interrupt source */
 			enetc_wr_reg_hot(tx_ring->idr, BIT(tx_ring->index) |
diff --git a/drivers/net/ethernet/freescale/enetc/enetc.h b/drivers/net/ethernet/freescale/enetc/enetc.h
index 773e412b9f4e..d9e75644b89c 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.h
+++ b/drivers/net/ethernet/freescale/enetc/enetc.h
@@ -25,6 +25,7 @@ struct enetc_tx_swbd {
 	u8 is_dma_page:1;
 	u8 check_wb:1;
 	u8 do_tstamp:1;
+	u8 is_eof:1;
 };
 
 #define ENETC_RX_MAXFRM_SIZE	ENETC_MAC_MAXFRM_SIZE
-- 
2.25.1



* [PATCH net-next 4/9] net: enetc: clean the TX software BD on the TX confirmation path
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (2 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 3/9] net: enetc: add a dedicated is_eof bit in the TX software BD Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable Vladimir Oltean
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

With the future introduction of some new fields into enetc_tx_swbd such
as is_xdp_tx, is_xdp_redirect etc, we need not only to set these bits
to true from the XDP_TX/XDP_REDIRECT code path, but also to false from
the old code paths.

This is because TX software buffer descriptors are kept in a ring that
is a shadow of the hardware TX ring, so these structures keep getting
reused, and there is always the possibility that when a software BD is
reused (after we ran a full circle through the TX ring), the old user of
the tx_swbd had set is_xdp_tx = true, and now we are sending a regular
skb, which would need to set is_xdp_tx = false.

To be minimally invasive to the old code paths, let's just scrub the
software TX BD in the TX confirmation path (enetc_clean_tx_ring), once
we know that nobody uses this software TX BD (tx_ring->next_to_clean
hasn't yet been updated, and the TX paths check enetc_bd_unused which
tells them if there's any more space in the TX ring for a new enqueue).

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index 37d2d142a744..ade05518b496 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -344,6 +344,10 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 		}
 
 		tx_byte_cnt += tx_swbd->len;
+		/* Scrub the swbd here so we don't have to do that
+		 * when we reuse it during xmit
+		 */
+		memset(tx_swbd, 0, sizeof(*tx_swbd));
 
 		bds_to_clean--;
 		tx_swbd++;
-- 
2.25.1



* [PATCH net-next 5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (3 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 4/9] net: enetc: clean the TX software BD on the TX confirmation path Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 6/9] net: enetc: add support for XDP_DROP and XDP_PASS Vladimir Oltean
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

For XDP_TX, we need to call enetc_reuse_page from enetc_clean_tx_ring,
so we need to avoid a forward declaration.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index ade05518b496..38301d0d7f0c 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -275,6 +275,25 @@ static int enetc_bd_ready_count(struct enetc_bdr *tx_ring, int ci)
 	return pi >= ci ? pi - ci : tx_ring->bd_count - ci + pi;
 }
 
+static bool enetc_page_reusable(struct page *page)
+{
+	return (!page_is_pfmemalloc(page) && page_ref_count(page) == 1);
+}
+
+static void enetc_reuse_page(struct enetc_bdr *rx_ring,
+			     struct enetc_rx_swbd *old)
+{
+	struct enetc_rx_swbd *new;
+
+	new = &rx_ring->rx_swbd[rx_ring->next_to_alloc];
+
+	/* next buf that may reuse a page */
+	enetc_bdr_idx_inc(rx_ring, &rx_ring->next_to_alloc);
+
+	/* copy page reference */
+	*new = *old;
+}
+
 static void enetc_get_tx_tstamp(struct enetc_hw *hw, union enetc_tx_bd *txbd,
 				u64 *tstamp)
 {
@@ -516,25 +535,6 @@ static void enetc_get_offloads(struct enetc_bdr *rx_ring,
 #endif
 }
 
-static bool enetc_page_reusable(struct page *page)
-{
-	return (!page_is_pfmemalloc(page) && page_ref_count(page) == 1);
-}
-
-static void enetc_reuse_page(struct enetc_bdr *rx_ring,
-			     struct enetc_rx_swbd *old)
-{
-	struct enetc_rx_swbd *new;
-
-	new = &rx_ring->rx_swbd[rx_ring->next_to_alloc];
-
-	/* next buf that may reuse a page */
-	enetc_bdr_idx_inc(rx_ring, &rx_ring->next_to_alloc);
-
-	/* copy page reference */
-	*new = *old;
-}
-
 static struct enetc_rx_swbd *enetc_get_rx_buff(struct enetc_bdr *rx_ring,
 					       int i, u16 size)
 {
-- 
2.25.1



* [PATCH net-next 6/9] net: enetc: add support for XDP_DROP and XDP_PASS
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (4 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 7/9] net: enetc: add support for XDP_TX Vladimir Oltean
                   ` (3 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

For the RX ring, enetc uses an allocation scheme based on pages split
into two buffers, which is already very efficient in terms of preventing
reallocations / maximizing reuse, so I see no reason why I would change
that.

 +--------+--------+--------+--------+--------+--------+--------+
 |        |        |        |        |        |        |        |
 | half B | half B | half B | half B | half B | half B | half B |
 |        |        |        |        |        |        |        |
 +--------+--------+--------+--------+--------+--------+--------+
 |        |        |        |        |        |        |        |
 | half A | half A | half A | half A | half A | half A | half A | RX ring
 |        |        |        |        |        |        |        |
 +--------+--------+--------+--------+--------+--------+--------+
     ^                                                     ^
     |                                                     |
 next_to_clean                                       next_to_alloc
                                                      next_to_use

                   +--------+--------+--------+--------+--------+
                   |        |        |        |        |        |
                   | half B | half B | half B | half B | half B |
                   |        |        |        |        |        |
 +--------+--------+--------+--------+--------+--------+--------+
 |        |        |        |        |        |        |        |
 | half B | half B | half A | half A | half A | half A | half A | RX ring
 |        |        |        |        |        |        |        |
 +--------+--------+--------+--------+--------+--------+--------+
 |        |        |   ^                                   ^
 | half A | half A |   |                                   |
 |        |        | next_to_clean                   next_to_use
 +--------+--------+
              ^
              |
         next_to_alloc

Then, when enetc_refill_rx_ring is called, whose purpose is to advance
next_to_use, it sees that it can take buffers up to next_to_alloc, and
it says "oh, hey, rx_swbd->page isn't NULL, I don't need to allocate
one!".

The only problem is that for default PAGE_SIZE values of 4096, buffer
sizes are 2048 bytes. While this is enough for normal skb allocations at
an MTU of 1500 bytes, for XDP it isn't, because the XDP headroom is 256
bytes, and including skb_shared_info and alignment, we end up being able
to make use of only 1472 bytes, which is insufficient for the default
MTU.

To solve that problem, we implement scatter/gather processing in the
driver, because we would really like to keep the existing allocation
scheme. A packet of 1500 bytes is received in a buffer of 1472 bytes and
another one of 28 bytes.

Because the headroom required by XDP is different (and much larger) than
the one required by the network stack, whenever a BPF program is added
or deleted on the port, we drain the existing RX buffers and seed new
ones with the required headroom. We also keep the required headroom in
rx_ring->buffer_offset.

The simplest way to implement XDP_PASS, where an skb must be created, is
to create an xdp_buff based on the next_to_clean RX BDs, but not clear
those BDs from the RX ring yet, just keep the original index at which
the BDs for this frame started. Then, if the verdict is XDP_PASS,
instead of converting the xdp_buff to an skb, we replay a call to
enetc_build_skb (just as in the normal enetc_clean_rx_ring case),
starting from the original BD index.

We would also like to be minimally invasive to the regular RX data path,
and not check whether there is a BPF program attached to the ring on
every packet. So we create a separate RX ring processing function for
XDP.

Because we only install/remove the BPF program while the interface is
down, we forgo the rcu_read_lock() in enetc_clean_rx_ring, since there
shouldn't be any circumstance in which we are processing packets and
there is a potentially freed BPF program attached to the RX ring.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c  | 284 ++++++++++++++++--
 drivers/net/ethernet/freescale/enetc/enetc.h  |  14 +
 .../ethernet/freescale/enetc/enetc_ethtool.c  |   2 +
 .../net/ethernet/freescale/enetc/enetc_pf.c   |   1 +
 4 files changed, 281 insertions(+), 20 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index 38301d0d7f0c..58bb0b78585a 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -2,6 +2,7 @@
 /* Copyright 2017-2019 NXP */
 
 #include "enetc.h"
+#include <linux/bpf_trace.h>
 #include <linux/tcp.h>
 #include <linux/udp.h>
 #include <linux/vmalloc.h>
@@ -420,7 +421,7 @@ static bool enetc_new_page(struct enetc_bdr *rx_ring,
 
 	rx_swbd->dma = addr;
 	rx_swbd->page = page;
-	rx_swbd->page_offset = ENETC_RXB_PAD;
+	rx_swbd->page_offset = rx_ring->buffer_offset;
 
 	return true;
 }
@@ -550,6 +551,8 @@ static void enetc_put_rx_buff(struct enetc_bdr *rx_ring,
 			      struct enetc_rx_swbd *rx_swbd)
 {
 	if (likely(enetc_page_reusable(rx_swbd->page))) {
+		size_t buffer_size = ENETC_RXB_TRUESIZE - rx_ring->buffer_offset;
+
 		rx_swbd->page_offset ^= ENETC_RXB_TRUESIZE;
 		page_ref_inc(rx_swbd->page);
 
@@ -558,8 +561,7 @@ static void enetc_put_rx_buff(struct enetc_bdr *rx_ring,
 		/* sync for use by the device */
 		dma_sync_single_range_for_device(rx_ring->dev, rx_swbd->dma,
 						 rx_swbd->page_offset,
-						 ENETC_RXB_DMA_SIZE,
-						 DMA_FROM_DEVICE);
+						 buffer_size, DMA_FROM_DEVICE);
 	} else {
 		dma_unmap_page(rx_ring->dev, rx_swbd->dma,
 			       PAGE_SIZE, DMA_FROM_DEVICE);
@@ -576,13 +578,13 @@ static struct sk_buff *enetc_map_rx_buff_to_skb(struct enetc_bdr *rx_ring,
 	void *ba;
 
 	ba = page_address(rx_swbd->page) + rx_swbd->page_offset;
-	skb = build_skb(ba - ENETC_RXB_PAD, ENETC_RXB_TRUESIZE);
+	skb = build_skb(ba - rx_ring->buffer_offset, ENETC_RXB_TRUESIZE);
 	if (unlikely(!skb)) {
 		rx_ring->stats.rx_alloc_errs++;
 		return NULL;
 	}
 
-	skb_reserve(skb, ENETC_RXB_PAD);
+	skb_reserve(skb, rx_ring->buffer_offset);
 	__skb_put(skb, size);
 
 	enetc_put_rx_buff(rx_ring, rx_swbd);
@@ -625,7 +627,7 @@ static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
 
 static struct sk_buff *enetc_build_skb(struct enetc_bdr *rx_ring,
 				       u32 bd_status, union enetc_rx_bd **rxbd,
-				       int *i, int *cleaned_cnt)
+				       int *i, int *cleaned_cnt, int buffer_size)
 {
 	struct sk_buff *skb;
 	u16 size;
@@ -644,7 +646,7 @@ static struct sk_buff *enetc_build_skb(struct enetc_bdr *rx_ring,
 	/* not last BD in frame? */
 	while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
 		bd_status = le32_to_cpu((*rxbd)->r.lstatus);
-		size = ENETC_RXB_DMA_SIZE;
+		size = buffer_size;
 
 		if (bd_status & ENETC_RXBD_LSTATUS_F) {
 			dma_rmb();
@@ -698,7 +700,7 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 			break;
 
 		skb = enetc_build_skb(rx_ring, bd_status, &rxbd, &i,
-				      &cleaned_cnt);
+				      &cleaned_cnt, ENETC_RXB_DMA_SIZE);
 		if (!skb)
 			break;
 
@@ -716,10 +718,174 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 	return rx_frm_cnt;
 }
 
+static void enetc_map_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
+				     struct xdp_buff *xdp_buff, u16 size)
+{
+	struct enetc_rx_swbd *rx_swbd = enetc_get_rx_buff(rx_ring, i, size);
+	void *hard_start = page_address(rx_swbd->page) + rx_swbd->page_offset;
+	struct skb_shared_info *shinfo;
+
+	xdp_prepare_buff(xdp_buff, hard_start - rx_ring->buffer_offset,
+			 rx_ring->buffer_offset, size, false);
+
+	shinfo = xdp_get_shared_info_from_buff(xdp_buff);
+	shinfo->nr_frags = 0;
+}
+
+static void enetc_add_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
+				     u16 size, struct xdp_buff *xdp_buff)
+{
+	struct skb_shared_info *shinfo = xdp_get_shared_info_from_buff(xdp_buff);
+	struct enetc_rx_swbd *rx_swbd = enetc_get_rx_buff(rx_ring, i, size);
+	skb_frag_t *frag = &shinfo->frags[shinfo->nr_frags];
+
+	skb_frag_off_set(frag, rx_swbd->page_offset);
+	skb_frag_size_set(frag, size);
+	__skb_frag_set_page(frag, rx_swbd->page);
+
+	shinfo->nr_frags++;
+}
+
+static void enetc_build_xdp_buff(struct enetc_bdr *rx_ring, u32 bd_status,
+				 union enetc_rx_bd **rxbd, int *i,
+				 int *cleaned_cnt, struct xdp_buff *xdp_buff)
+{
+	u16 size = le16_to_cpu((*rxbd)->r.buf_len);
+
+	xdp_init_buff(xdp_buff, ENETC_RXB_TRUESIZE, &rx_ring->xdp.rxq);
+
+	enetc_map_rx_buff_to_xdp(rx_ring, *i, xdp_buff, size);
+	(*cleaned_cnt)++;
+	enetc_rxbd_next(rx_ring, rxbd, i);
+
+	/* not last BD in frame? */
+	while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
+		bd_status = le32_to_cpu((*rxbd)->r.lstatus);
+		size = ENETC_RXB_DMA_SIZE_XDP;
+
+		if (bd_status & ENETC_RXBD_LSTATUS_F) {
+			dma_rmb();
+			size = le16_to_cpu((*rxbd)->r.buf_len);
+		}
+
+		enetc_add_rx_buff_to_xdp(rx_ring, *i, size, xdp_buff);
+		(*cleaned_cnt)++;
+		enetc_rxbd_next(rx_ring, rxbd, i);
+	}
+}
+
+/* Reuse the current page without performing half-page buffer flipping */
+static void enetc_put_xdp_buff(struct enetc_bdr *rx_ring,
+			       struct enetc_rx_swbd *rx_swbd)
+{
+	enetc_reuse_page(rx_ring, rx_swbd);
+
+	/* sync for use by the device */
+	dma_sync_single_range_for_device(rx_ring->dev, rx_swbd->dma,
+					 rx_swbd->page_offset,
+					 ENETC_RXB_DMA_SIZE_XDP,
+					 DMA_FROM_DEVICE);
+
+	rx_swbd->page = NULL;
+}
+
+static void enetc_xdp_drop(struct enetc_bdr *rx_ring, int rx_ring_first,
+			   int rx_ring_last)
+{
+	while (rx_ring_first != rx_ring_last) {
+		enetc_put_xdp_buff(rx_ring,
+				   &rx_ring->rx_swbd[rx_ring_first]);
+		enetc_bdr_idx_inc(rx_ring, &rx_ring_first);
+	}
+	rx_ring->stats.xdp_drops++;
+}
+
+static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
+				   struct napi_struct *napi, int work_limit,
+				   struct bpf_prog *prog)
+{
+	int rx_frm_cnt = 0, rx_byte_cnt = 0;
+	int cleaned_cnt, i;
+	u32 xdp_act;
+
+	cleaned_cnt = enetc_bd_unused(rx_ring);
+	/* next descriptor to process */
+	i = rx_ring->next_to_clean;
+
+	while (likely(rx_frm_cnt < work_limit)) {
+		union enetc_rx_bd *rxbd, *orig_rxbd;
+		int orig_i, orig_cleaned_cnt;
+		struct xdp_buff xdp_buff;
+		struct sk_buff *skb;
+		u32 bd_status;
+
+		if (cleaned_cnt >= ENETC_RXBD_BUNDLE)
+			cleaned_cnt -= enetc_refill_rx_ring(rx_ring,
+							    cleaned_cnt);
+
+		rxbd = enetc_rxbd(rx_ring, i);
+		bd_status = le32_to_cpu(rxbd->r.lstatus);
+		if (!bd_status)
+			break;
+
+		enetc_wr_reg_hot(rx_ring->idr, BIT(rx_ring->index));
+		dma_rmb(); /* for reading other rxbd fields */
+
+		if (enetc_check_bd_errors_and_consume(rx_ring, bd_status,
+						      &rxbd, &i))
+			break;
+
+		orig_rxbd = rxbd;
+		orig_cleaned_cnt = cleaned_cnt;
+		orig_i = i;
+
+		enetc_build_xdp_buff(rx_ring, bd_status, &rxbd, &i,
+				     &cleaned_cnt, &xdp_buff);
+
+		xdp_act = bpf_prog_run_xdp(prog, &xdp_buff);
+
+		switch (xdp_act) {
+		case XDP_ABORTED:
+			trace_xdp_exception(rx_ring->ndev, prog, xdp_act);
+			fallthrough;
+		case XDP_DROP:
+			enetc_xdp_drop(rx_ring, orig_i, i);
+			break;
+		case XDP_PASS:
+			rxbd = orig_rxbd;
+			cleaned_cnt = orig_cleaned_cnt;
+			i = orig_i;
+
+			skb = enetc_build_skb(rx_ring, bd_status, &rxbd,
+					      &i, &cleaned_cnt,
+					      ENETC_RXB_DMA_SIZE_XDP);
+			if (unlikely(!skb))
+				/* Exit the switch/case, not the loop */
+				break;
+
+			napi_gro_receive(napi, skb);
+			break;
+		default:
+			bpf_warn_invalid_xdp_action(xdp_act);
+		}
+
+		rx_frm_cnt++;
+	}
+
+	rx_ring->next_to_clean = i;
+
+	rx_ring->stats.packets += rx_frm_cnt;
+	rx_ring->stats.bytes += rx_byte_cnt;
+
+	return rx_frm_cnt;
+}
+
 static int enetc_poll(struct napi_struct *napi, int budget)
 {
 	struct enetc_int_vector
 		*v = container_of(napi, struct enetc_int_vector, napi);
+	struct enetc_bdr *rx_ring = &v->rx_ring;
+	struct bpf_prog *prog;
 	bool complete = true;
 	int work_done;
 	int i;
@@ -730,7 +896,11 @@ static int enetc_poll(struct napi_struct *napi, int budget)
 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
 			complete = false;
 
-	work_done = enetc_clean_rx_ring(&v->rx_ring, napi, budget);
+	prog = rx_ring->xdp.prog;
+	if (prog)
+		work_done = enetc_clean_rx_ring_xdp(rx_ring, napi, budget, prog);
+	else
+		work_done = enetc_clean_rx_ring(rx_ring, napi, budget);
 	if (work_done == budget)
 		complete = false;
 	if (work_done)
@@ -1120,7 +1290,10 @@ static void enetc_setup_rxbdr(struct enetc_hw *hw, struct enetc_bdr *rx_ring)
 	enetc_rxbdr_wr(hw, idx, ENETC_RBLENR,
 		       ENETC_RTBLENR_LEN(rx_ring->bd_count));
 
-	enetc_rxbdr_wr(hw, idx, ENETC_RBBSR, ENETC_RXB_DMA_SIZE);
+	if (rx_ring->xdp.prog)
+		enetc_rxbdr_wr(hw, idx, ENETC_RBBSR, ENETC_RXB_DMA_SIZE_XDP);
+	else
+		enetc_rxbdr_wr(hw, idx, ENETC_RBBSR, ENETC_RXB_DMA_SIZE);
 
 	enetc_rxbdr_wr(hw, idx, ENETC_RBPIR, 0);
 
@@ -1511,6 +1684,54 @@ int enetc_setup_tc(struct net_device *ndev, enum tc_setup_type type,
 	}
 }
 
+static int enetc_setup_xdp_prog(struct net_device *dev, struct bpf_prog *prog,
+				struct netlink_ext_ack *extack)
+{
+	struct enetc_ndev_priv *priv = netdev_priv(dev);
+	struct bpf_prog *old_prog;
+	bool is_up;
+	int i;
+
+	/* The buffer layout is changing, so we need to drain the old
+	 * RX buffers and seed new ones.
+	 */
+	is_up = netif_running(dev);
+	if (is_up)
+		dev_close(dev);
+
+	old_prog = xchg(&priv->xdp_prog, prog);
+	if (old_prog)
+		bpf_prog_put(old_prog);
+
+	for (i = 0; i < priv->num_rx_rings; i++) {
+		struct enetc_bdr *rx_ring = priv->rx_ring[i];
+
+		rx_ring->xdp.prog = prog;
+
+		if (prog)
+			rx_ring->buffer_offset = XDP_PACKET_HEADROOM;
+		else
+			rx_ring->buffer_offset = ENETC_RXB_PAD;
+	}
+
+	if (is_up)
+		return dev_open(dev, extack);
+
+	return 0;
+}
+
+int enetc_setup_bpf(struct net_device *dev, struct netdev_bpf *xdp)
+{
+	switch (xdp->command) {
+	case XDP_SETUP_PROG:
+		return enetc_setup_xdp_prog(dev, xdp->prog, xdp->extack);
+	default:
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
 struct net_device_stats *enetc_get_stats(struct net_device *ndev)
 {
 	struct enetc_ndev_priv *priv = netdev_priv(ndev);
@@ -1727,6 +1948,28 @@ int enetc_alloc_msix(struct enetc_ndev_priv *priv)
 
 		priv->int_vector[i] = v;
 
+		bdr = &v->rx_ring;
+		bdr->index = i;
+		bdr->ndev = priv->ndev;
+		bdr->dev = priv->dev;
+		bdr->bd_count = priv->rx_bd_count;
+		bdr->buffer_offset = ENETC_RXB_PAD;
+		priv->rx_ring[i] = bdr;
+
+		err = xdp_rxq_info_reg(&bdr->xdp.rxq, priv->ndev, i, 0);
+		if (err) {
+			kfree(v);
+			goto fail;
+		}
+
+		err = xdp_rxq_info_reg_mem_model(&bdr->xdp.rxq,
+						 MEM_TYPE_PAGE_SHARED, NULL);
+		if (err) {
+			xdp_rxq_info_unreg(&bdr->xdp.rxq);
+			kfree(v);
+			goto fail;
+		}
+
 		/* init defaults for adaptive IC */
 		if (priv->ic_mode & ENETC_IC_RX_ADAPTIVE) {
 			v->rx_ictt = 0x1;
@@ -1754,22 +1997,20 @@ int enetc_alloc_msix(struct enetc_ndev_priv *priv)
 			bdr->bd_count = priv->tx_bd_count;
 			priv->tx_ring[idx] = bdr;
 		}
-
-		bdr = &v->rx_ring;
-		bdr->index = i;
-		bdr->ndev = priv->ndev;
-		bdr->dev = priv->dev;
-		bdr->bd_count = priv->rx_bd_count;
-		priv->rx_ring[i] = bdr;
 	}
 
 	return 0;
 
 fail:
 	while (i--) {
-		netif_napi_del(&priv->int_vector[i]->napi);
-		cancel_work_sync(&priv->int_vector[i]->rx_dim.work);
-		kfree(priv->int_vector[i]);
+		struct enetc_int_vector *v = priv->int_vector[i];
+		struct enetc_bdr *rx_ring = &v->rx_ring;
+
+		xdp_rxq_info_unreg_mem_model(&rx_ring->xdp.rxq);
+		xdp_rxq_info_unreg(&rx_ring->xdp.rxq);
+		netif_napi_del(&v->napi);
+		cancel_work_sync(&v->rx_dim.work);
+		kfree(v);
 	}
 
 	pci_free_irq_vectors(pdev);
@@ -1783,7 +2024,10 @@ void enetc_free_msix(struct enetc_ndev_priv *priv)
 
 	for (i = 0; i < priv->bdr_int_num; i++) {
 		struct enetc_int_vector *v = priv->int_vector[i];
+		struct enetc_bdr *rx_ring = &v->rx_ring;
 
+		xdp_rxq_info_unreg_mem_model(&rx_ring->xdp.rxq);
+		xdp_rxq_info_unreg(&rx_ring->xdp.rxq);
 		netif_napi_del(&v->napi);
 		cancel_work_sync(&v->rx_dim.work);
 	}
diff --git a/drivers/net/ethernet/freescale/enetc/enetc.h b/drivers/net/ethernet/freescale/enetc/enetc.h
index d9e75644b89c..5815addfe966 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.h
+++ b/drivers/net/ethernet/freescale/enetc/enetc.h
@@ -33,6 +33,8 @@ struct enetc_tx_swbd {
 #define ENETC_RXB_PAD		NET_SKB_PAD /* add extra space if needed */
 #define ENETC_RXB_DMA_SIZE	\
 	(SKB_WITH_OVERHEAD(ENETC_RXB_TRUESIZE) - ENETC_RXB_PAD)
+#define ENETC_RXB_DMA_SIZE_XDP	\
+	(SKB_WITH_OVERHEAD(ENETC_RXB_TRUESIZE) - XDP_PACKET_HEADROOM)
 
 struct enetc_rx_swbd {
 	dma_addr_t dma;
@@ -44,6 +46,12 @@ struct enetc_ring_stats {
 	unsigned int packets;
 	unsigned int bytes;
 	unsigned int rx_alloc_errs;
+	unsigned int xdp_drops;
+};
+
+struct enetc_xdp_data {
+	struct xdp_rxq_info rxq;
+	struct bpf_prog *prog;
 };
 
 #define ENETC_RX_RING_DEFAULT_SIZE	512
@@ -72,6 +80,9 @@ struct enetc_bdr {
 	};
 	void __iomem *idr; /* Interrupt Detect Register pointer */
 
+	int buffer_offset;
+	struct enetc_xdp_data xdp;
+
 	struct enetc_ring_stats stats;
 
 	dma_addr_t bd_dma_base;
@@ -276,6 +287,8 @@ struct enetc_ndev_priv {
 	struct phylink *phylink;
 	int ic_mode;
 	u32 tx_ictt;
+
+	struct bpf_prog *xdp_prog;
 };
 
 /* Messaging */
@@ -315,6 +328,7 @@ int enetc_set_features(struct net_device *ndev,
 int enetc_ioctl(struct net_device *ndev, struct ifreq *rq, int cmd);
 int enetc_setup_tc(struct net_device *ndev, enum tc_setup_type type,
 		   void *type_data);
+int enetc_setup_bpf(struct net_device *dev, struct netdev_bpf *xdp);
 
 /* ethtool */
 void enetc_set_ethtool_ops(struct net_device *ndev);
diff --git a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
index 89e558135432..0183c13f8c1e 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
@@ -192,6 +192,7 @@ static const struct {
 static const char rx_ring_stats[][ETH_GSTRING_LEN] = {
 	"Rx ring %2d frames",
 	"Rx ring %2d alloc errors",
+	"Rx ring %2d XDP drops",
 };
 
 static const char tx_ring_stats[][ETH_GSTRING_LEN] = {
@@ -273,6 +274,7 @@ static void enetc_get_ethtool_stats(struct net_device *ndev,
 	for (i = 0; i < priv->num_rx_rings; i++) {
 		data[o++] = priv->rx_ring[i]->stats.packets;
 		data[o++] = priv->rx_ring[i]->stats.rx_alloc_errs;
+		data[o++] = priv->rx_ring[i]->stats.xdp_drops;
 	}
 
 	if (!enetc_si_is_pf(priv->si))
diff --git a/drivers/net/ethernet/freescale/enetc/enetc_pf.c b/drivers/net/ethernet/freescale/enetc/enetc_pf.c
index 5e95afd61c87..0484dbe13422 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc_pf.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc_pf.c
@@ -707,6 +707,7 @@ static const struct net_device_ops enetc_ndev_ops = {
 	.ndo_set_features	= enetc_pf_set_features,
 	.ndo_do_ioctl		= enetc_ioctl,
 	.ndo_setup_tc		= enetc_setup_tc,
+	.ndo_bpf		= enetc_setup_bpf,
 };
 
 static void enetc_pf_netdev_setup(struct enetc_si *si, struct net_device *ndev,
-- 
2.25.1


^ permalink raw reply	[flat|nested] 23+ messages in thread

* [PATCH net-next 7/9] net: enetc: add support for XDP_TX
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (5 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 6/9] net: enetc: add support for XDP_DROP and XDP_PASS Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 8/9] net: enetc: increase RX ring default size Vladimir Oltean
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

To reflect packets back into the interface they came from, we create an
array of TX software BDs derived from the RX software BDs. This requires
extending the TX software BD structure to contain most of the fields
already present in the RX software BD structure, for reasons that will
become evident in a moment.

For a frame with the XDP_TX verdict, we don't reuse any buffer right
away as we do for XDP_DROP (the same page half) or XDP_PASS (the other
page half, same as the skb code path).

Because the buffer transfers ownership from the RX ring to the TX ring,
reusing any page half right away is very dangerous. So what we can do is
we can recycle the same page half as soon as TX is complete.

The code path is:
enetc_poll
-> enetc_clean_rx_ring_xdp
   -> enetc_xdp_tx
   -> enetc_refill_rx_ring
(time passes, another MSI interrupt is raised)
enetc_poll
-> enetc_clean_tx_ring
   -> enetc_recycle_xdp_tx_buff

But that creates a problem, because there is a potentially large time
window between enetc_xdp_tx and enetc_recycle_xdp_tx_buff, during which
we'll have fewer and fewer RX buffers.

Basically, when the ship starts sinking, the knee-jerk reaction is to
let enetc_refill_rx_ring do what it does for the standard skb code path
(refill every 16 consumed buffers), but that turns out to be very
inefficient. The problem is that we have no rx_swbd->page at our
disposal from the enetc_reuse_page path, so enetc_refill_rx_ring would
have to call enetc_new_page for every buffer that we refill (if we
choose to refill at this early stage). That is very inefficient and only
makes the problem worse, because page allocation is an expensive process,
and CPU time is exactly what we're lacking.

Additionally, there is an even bigger problem: if we let
enetc_refill_rx_ring top up the ring's buffers again from the RX path,
remember that the buffers sent for transmission haven't disappeared
anywhere. They will eventually be sent and processed in
enetc_clean_tx_ring, and an attempt will be made to recycle them.
But surprise: the RX ring is already full of new buffers, because we
were premature in deciding to refill. So not only did we take the
expensive decision of allocating new pages, but now we must also throw
away perfectly good, reusable buffers.

So what we do is we implement an elastic refill mechanism, which keeps
track of the number of in-flight XDP_TX buffer descriptors. We top up
the RX ring only up to the total ring capacity minus the number of BDs
that are in flight (because we know that those BDs will return to us
eventually).

The enetc driver manages one RX ring per CPU, and the default TX ring
management is the same. So we do XDP_TX towards the TX ring of the same
index, because it is affined to the same CPU. This will probably not
produce great results when a tc-taprio/tc-mqprio qdisc is installed on
the interface, because in that case the number of TX rings might be
greater, but I didn't add any checks for that yet (mostly because I
didn't know what checks to add).

It should also be noted that we need to change the DMA mapping direction
for RX buffers, since they may now be reflected into the TX ring of the
same device. We choose to use DMA_BIDIRECTIONAL instead of unmapping and
remapping as DMA_TO_DEVICE, because performance is better this way.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c  | 217 ++++++++++++++++--
 drivers/net/ethernet/freescale/enetc/enetc.h  |  25 ++
 .../ethernet/freescale/enetc/enetc_ethtool.c  |  11 +-
 3 files changed, 228 insertions(+), 25 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index 58bb0b78585a..ba5313a5d7a4 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -8,21 +8,20 @@
 #include <linux/vmalloc.h>
 #include <net/pkt_sched.h>
 
-/* ENETC overhead: optional extension BD + 1 BD gap */
-#define ENETC_TXBDS_NEEDED(val)	((val) + 2)
-/* max # of chained Tx BDs is 15, including head and extension BD */
-#define ENETC_MAX_SKB_FRAGS	13
-#define ENETC_TXBDS_MAX_NEEDED	ENETC_TXBDS_NEEDED(ENETC_MAX_SKB_FRAGS + 1)
-
 static void enetc_unmap_tx_buff(struct enetc_bdr *tx_ring,
 				struct enetc_tx_swbd *tx_swbd)
 {
+	/* For XDP_TX, pages come from RX, whereas for the other contexts where
+	 * we have is_dma_page_set, those come from skb_frag_dma_map. We need
+	 * to match the DMA mapping length, so we need to differentiate those.
+	 */
 	if (tx_swbd->is_dma_page)
 		dma_unmap_page(tx_ring->dev, tx_swbd->dma,
-			       tx_swbd->len, DMA_TO_DEVICE);
+			       tx_swbd->is_xdp_tx ? PAGE_SIZE : tx_swbd->len,
+			       tx_swbd->dir);
 	else
 		dma_unmap_single(tx_ring->dev, tx_swbd->dma,
-				 tx_swbd->len, DMA_TO_DEVICE);
+				 tx_swbd->len, tx_swbd->dir);
 	tx_swbd->dma = 0;
 }
 
@@ -38,6 +37,13 @@ static void enetc_free_tx_skb(struct enetc_bdr *tx_ring,
 	}
 }
 
+/* Let H/W know BD ring has been updated */
+static void enetc_update_tx_ring_tail(struct enetc_bdr *tx_ring)
+{
+	/* includes wmb() */
+	enetc_wr_reg_hot(tx_ring->tpir, tx_ring->next_to_use);
+}
+
 static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 			      int active_offloads)
 {
@@ -68,6 +74,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 	tx_swbd->dma = dma;
 	tx_swbd->len = len;
 	tx_swbd->is_dma_page = 0;
+	tx_swbd->dir = DMA_TO_DEVICE;
 	count++;
 
 	do_vlan = skb_vlan_tag_present(skb);
@@ -150,6 +157,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 		tx_swbd->dma = dma;
 		tx_swbd->len = len;
 		tx_swbd->is_dma_page = 1;
+		tx_swbd->dir = DMA_TO_DEVICE;
 		count++;
 	}
 
@@ -166,8 +174,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 
 	skb_tx_timestamp(skb);
 
-	/* let H/W know BD ring has been updated */
-	enetc_wr_reg_hot(tx_ring->tpir, i); /* includes wmb() */
+	enetc_update_tx_ring_tail(tx_ring);
 
 	return count;
 
@@ -320,6 +327,43 @@ static void enetc_tstamp_tx(struct sk_buff *skb, u64 tstamp)
 	}
 }
 
+static void enetc_recycle_xdp_tx_buff(struct enetc_bdr *tx_ring,
+				      struct enetc_tx_swbd *tx_swbd)
+{
+	struct enetc_ndev_priv *priv = netdev_priv(tx_ring->ndev);
+	struct enetc_bdr *rx_ring = priv->rx_ring[tx_ring->index];
+	struct enetc_rx_swbd rx_swbd = {
+		.dma = tx_swbd->dma,
+		.page = tx_swbd->page,
+		.page_offset = tx_swbd->page_offset,
+		.dir = tx_swbd->dir,
+		.len = tx_swbd->len,
+	};
+
+	if (likely(enetc_swbd_unused(rx_ring))) {
+		enetc_reuse_page(rx_ring, &rx_swbd);
+
+		/* sync for use by the device */
+		dma_sync_single_range_for_device(rx_ring->dev, rx_swbd.dma,
+						 rx_swbd.page_offset,
+						 ENETC_RXB_DMA_SIZE_XDP,
+						 rx_swbd.dir);
+
+		rx_ring->stats.recycles++;
+	} else {
+		/* RX ring is already full, we need to unmap and free the
+		 * page, since there's nothing useful we can do with it.
+		 */
+		rx_ring->stats.recycle_failures++;
+
+		dma_unmap_page(rx_ring->dev, rx_swbd.dma, PAGE_SIZE,
+			       rx_swbd.dir);
+		__free_page(rx_swbd.page);
+	}
+
+	rx_ring->xdp.xdp_tx_in_flight--;
+}
+
 static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 {
 	struct net_device *ndev = tx_ring->ndev;
@@ -351,7 +395,9 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 			}
 		}
 
-		if (likely(tx_swbd->dma))
+		if (tx_swbd->is_xdp_tx)
+			enetc_recycle_xdp_tx_buff(tx_ring, tx_swbd);
+		else if (likely(tx_swbd->dma))
 			enetc_unmap_tx_buff(tx_ring, tx_swbd);
 
 		if (tx_swbd->skb) {
@@ -405,6 +451,7 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 static bool enetc_new_page(struct enetc_bdr *rx_ring,
 			   struct enetc_rx_swbd *rx_swbd)
 {
+	bool xdp = !!(rx_ring->xdp.prog);
 	struct page *page;
 	dma_addr_t addr;
 
@@ -412,7 +459,10 @@ static bool enetc_new_page(struct enetc_bdr *rx_ring,
 	if (unlikely(!page))
 		return false;
 
-	addr = dma_map_page(rx_ring->dev, page, 0, PAGE_SIZE, DMA_FROM_DEVICE);
+	/* For XDP_TX, we forgo dma_unmap -> dma_map */
+	rx_swbd->dir = xdp ? DMA_BIDIRECTIONAL : DMA_FROM_DEVICE;
+
+	addr = dma_map_page(rx_ring->dev, page, 0, PAGE_SIZE, rx_swbd->dir);
 	if (unlikely(dma_mapping_error(rx_ring->dev, addr))) {
 		__free_page(page);
 
@@ -536,6 +586,10 @@ static void enetc_get_offloads(struct enetc_bdr *rx_ring,
 #endif
 }
 
+/* This gets called during the non-XDP NAPI poll cycle as well as on XDP_PASS,
+ * so it needs to work with both DMA_FROM_DEVICE as well as DMA_BIDIRECTIONAL
+ * mapped buffers.
+ */
 static struct enetc_rx_swbd *enetc_get_rx_buff(struct enetc_bdr *rx_ring,
 					       int i, u16 size)
 {
@@ -543,7 +597,7 @@ static struct enetc_rx_swbd *enetc_get_rx_buff(struct enetc_bdr *rx_ring,
 
 	dma_sync_single_range_for_cpu(rx_ring->dev, rx_swbd->dma,
 				      rx_swbd->page_offset,
-				      size, DMA_FROM_DEVICE);
+				      size, rx_swbd->dir);
 	return rx_swbd;
 }
 
@@ -561,10 +615,10 @@ static void enetc_put_rx_buff(struct enetc_bdr *rx_ring,
 		/* sync for use by the device */
 		dma_sync_single_range_for_device(rx_ring->dev, rx_swbd->dma,
 						 rx_swbd->page_offset,
-						 buffer_size, DMA_FROM_DEVICE);
+						 buffer_size, rx_swbd->dir);
 	} else {
-		dma_unmap_page(rx_ring->dev, rx_swbd->dma,
-			       PAGE_SIZE, DMA_FROM_DEVICE);
+		dma_unmap_page(rx_ring->dev, rx_swbd->dma, PAGE_SIZE,
+			       rx_swbd->dir);
 	}
 
 	rx_swbd->page = NULL;
@@ -718,6 +772,61 @@ static int enetc_clean_rx_ring(struct enetc_bdr *rx_ring,
 	return rx_frm_cnt;
 }
 
+static void enetc_xdp_map_tx_buff(struct enetc_bdr *tx_ring, int i,
+				  struct enetc_tx_swbd *tx_swbd,
+				  int frm_len)
+{
+	union enetc_tx_bd *txbd = ENETC_TXBD(*tx_ring, i);
+
+	prefetchw(txbd);
+
+	enetc_clear_tx_bd(txbd);
+	txbd->addr = cpu_to_le64(tx_swbd->dma + tx_swbd->page_offset);
+	txbd->buf_len = cpu_to_le16(tx_swbd->len);
+	txbd->frm_len = cpu_to_le16(frm_len);
+
+	memcpy(&tx_ring->tx_swbd[i], tx_swbd, sizeof(*tx_swbd));
+}
+
+/* Puts in the TX ring one XDP frame, mapped as an array of TX software buffer
+ * descriptors.
+ */
+static bool enetc_xdp_tx(struct enetc_bdr *tx_ring,
+			 struct enetc_tx_swbd *xdp_tx_arr, int num_tx_swbd)
+{
+	struct enetc_tx_swbd *tmp_tx_swbd = xdp_tx_arr;
+	int i, k, frm_len = tmp_tx_swbd->len;
+
+	if (unlikely(enetc_bd_unused(tx_ring) < ENETC_TXBDS_NEEDED(num_tx_swbd)))
+		return false;
+
+	while (unlikely(!tmp_tx_swbd->is_eof)) {
+		tmp_tx_swbd++;
+		frm_len += tmp_tx_swbd->len;
+	}
+
+	i = tx_ring->next_to_use;
+
+	for (k = 0; k < num_tx_swbd; k++) {
+		struct enetc_tx_swbd *xdp_tx_swbd = &xdp_tx_arr[k];
+
+		enetc_xdp_map_tx_buff(tx_ring, i, xdp_tx_swbd, frm_len);
+
+		/* last BD needs 'F' bit set */
+		if (xdp_tx_swbd->is_eof) {
+			union enetc_tx_bd *txbd = ENETC_TXBD(*tx_ring, i);
+
+			txbd->flags = ENETC_TXBD_FLAGS_F;
+		}
+
+		enetc_bdr_idx_inc(tx_ring, &i);
+	}
+
+	tx_ring->next_to_use = i;
+
+	return true;
+}
+
 static void enetc_map_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
 				     struct xdp_buff *xdp_buff, u16 size)
 {
@@ -725,6 +834,9 @@ static void enetc_map_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
 	void *hard_start = page_address(rx_swbd->page) + rx_swbd->page_offset;
 	struct skb_shared_info *shinfo;
 
+	/* To be used for XDP_TX */
+	rx_swbd->len = size;
+
 	xdp_prepare_buff(xdp_buff, hard_start - rx_ring->buffer_offset,
 			 rx_ring->buffer_offset, size, false);
 
@@ -739,6 +851,9 @@ static void enetc_add_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
 	struct enetc_rx_swbd *rx_swbd = enetc_get_rx_buff(rx_ring, i, size);
 	skb_frag_t *frag = &shinfo->frags[shinfo->nr_frags];
 
+	/* To be used for XDP_TX */
+	rx_swbd->len = size;
+
 	skb_frag_off_set(frag, rx_swbd->page_offset);
 	skb_frag_size_set(frag, size);
 	__skb_frag_set_page(frag, rx_swbd->page);
@@ -780,15 +895,48 @@ static void enetc_put_xdp_buff(struct enetc_bdr *rx_ring,
 {
 	enetc_reuse_page(rx_ring, rx_swbd);
 
-	/* sync for use by the device */
 	dma_sync_single_range_for_device(rx_ring->dev, rx_swbd->dma,
 					 rx_swbd->page_offset,
 					 ENETC_RXB_DMA_SIZE_XDP,
-					 DMA_FROM_DEVICE);
+					 rx_swbd->dir);
 
 	rx_swbd->page = NULL;
 }
 
+/* Convert RX buffer descriptors to TX buffer descriptors. These will be
+ * recycled back into the RX ring in enetc_clean_tx_ring. We need to scrub the
+ * RX software BDs because the ownership of the buffer no longer belongs to the
+ * RX ring, so enetc_refill_rx_ring may not reuse rx_swbd->page.
+ */
+static int enetc_rx_swbd_to_xdp_tx_swbd(struct enetc_tx_swbd *xdp_tx_arr,
+					struct enetc_bdr *rx_ring,
+					int rx_ring_first, int rx_ring_last)
+{
+	int n = 0;
+
+	for (; rx_ring_first != rx_ring_last;
+	     n++, enetc_bdr_idx_inc(rx_ring, &rx_ring_first)) {
+		struct enetc_rx_swbd *rx_swbd = &rx_ring->rx_swbd[rx_ring_first];
+		struct enetc_tx_swbd *tx_swbd = &xdp_tx_arr[n];
+
+		/* No need to dma_map, we already have DMA_BIDIRECTIONAL */
+		tx_swbd->dma = rx_swbd->dma;
+		tx_swbd->dir = rx_swbd->dir;
+		tx_swbd->page = rx_swbd->page;
+		tx_swbd->page_offset = rx_swbd->page_offset;
+		tx_swbd->len = rx_swbd->len;
+		tx_swbd->is_dma_page = true;
+		tx_swbd->is_xdp_tx = true;
+		tx_swbd->is_eof = false;
+		memset(rx_swbd, 0, sizeof(*rx_swbd));
+	}
+
+	/* We rely on caller providing an rx_ring_last > rx_ring_first */
+	xdp_tx_arr[n - 1].is_eof = true;
+
+	return n;
+}
+
 static void enetc_xdp_drop(struct enetc_bdr *rx_ring, int rx_ring_first,
 			   int rx_ring_last)
 {
@@ -804,6 +952,10 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 				   struct napi_struct *napi, int work_limit,
 				   struct bpf_prog *prog)
 {
+	struct enetc_tx_swbd xdp_tx_arr[ENETC_MAX_SKB_FRAGS] = {0};
+	struct enetc_ndev_priv *priv = netdev_priv(rx_ring->ndev);
+	struct enetc_bdr *tx_ring = priv->tx_ring[rx_ring->index];
+	int xdp_tx_bd_cnt, xdp_tx_frm_cnt = 0;
 	int rx_frm_cnt = 0, rx_byte_cnt = 0;
 	int cleaned_cnt, i;
 	u32 xdp_act;
@@ -819,10 +971,6 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 		struct sk_buff *skb;
 		u32 bd_status;
 
-		if (cleaned_cnt >= ENETC_RXBD_BUNDLE)
-			cleaned_cnt -= enetc_refill_rx_ring(rx_ring,
-							    cleaned_cnt);
-
 		rxbd = enetc_rxbd(rx_ring, i);
 		bd_status = le32_to_cpu(rxbd->r.lstatus);
 		if (!bd_status)
@@ -865,6 +1013,20 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 
 			napi_gro_receive(napi, skb);
 			break;
+		case XDP_TX:
+			xdp_tx_bd_cnt = enetc_rx_swbd_to_xdp_tx_swbd(xdp_tx_arr,
+								     rx_ring,
+								     orig_i, i);
+
+			if (!enetc_xdp_tx(tx_ring, xdp_tx_arr, xdp_tx_bd_cnt)) {
+				enetc_xdp_drop(rx_ring, orig_i, i);
+				tx_ring->stats.xdp_tx_drops++;
+			} else {
+				tx_ring->stats.xdp_tx += xdp_tx_bd_cnt;
+				rx_ring->xdp.xdp_tx_in_flight += xdp_tx_bd_cnt;
+				xdp_tx_frm_cnt++;
+			}
+			break;
 		default:
 			bpf_warn_invalid_xdp_action(xdp_act);
 		}
@@ -877,6 +1039,13 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 	rx_ring->stats.packets += rx_frm_cnt;
 	rx_ring->stats.bytes += rx_byte_cnt;
 
+	if (xdp_tx_frm_cnt)
+		enetc_update_tx_ring_tail(tx_ring);
+
+	if (cleaned_cnt > rx_ring->xdp.xdp_tx_in_flight)
+		enetc_refill_rx_ring(rx_ring, enetc_bd_unused(rx_ring) -
+				     rx_ring->xdp.xdp_tx_in_flight);
+
 	return rx_frm_cnt;
 }
 
@@ -1141,8 +1310,8 @@ static void enetc_free_rx_ring(struct enetc_bdr *rx_ring)
 		if (!rx_swbd->page)
 			continue;
 
-		dma_unmap_page(rx_ring->dev, rx_swbd->dma,
-			       PAGE_SIZE, DMA_FROM_DEVICE);
+		dma_unmap_page(rx_ring->dev, rx_swbd->dma, PAGE_SIZE,
+			       rx_swbd->dir);
 		__free_page(rx_swbd->page);
 		rx_swbd->page = NULL;
 	}
diff --git a/drivers/net/ethernet/freescale/enetc/enetc.h b/drivers/net/ethernet/freescale/enetc/enetc.h
index 5815addfe966..864da962ae21 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.h
+++ b/drivers/net/ethernet/freescale/enetc/enetc.h
@@ -21,11 +21,15 @@
 struct enetc_tx_swbd {
 	struct sk_buff *skb;
 	dma_addr_t dma;
+	struct page *page;	/* valid only if is_xdp_tx */
+	u16 page_offset;	/* valid only if is_xdp_tx */
 	u16 len;
+	enum dma_data_direction dir;
 	u8 is_dma_page:1;
 	u8 check_wb:1;
 	u8 do_tstamp:1;
 	u8 is_eof:1;
+	u8 is_xdp_tx:1;
 };
 
 #define ENETC_RX_MAXFRM_SIZE	ENETC_MAC_MAXFRM_SIZE
@@ -40,18 +44,31 @@ struct enetc_rx_swbd {
 	dma_addr_t dma;
 	struct page *page;
 	u16 page_offset;
+	enum dma_data_direction dir;
+	u16 len;
 };
 
+/* ENETC overhead: optional extension BD + 1 BD gap */
+#define ENETC_TXBDS_NEEDED(val)	((val) + 2)
+/* max # of chained Tx BDs is 15, including head and extension BD */
+#define ENETC_MAX_SKB_FRAGS	13
+#define ENETC_TXBDS_MAX_NEEDED	ENETC_TXBDS_NEEDED(ENETC_MAX_SKB_FRAGS + 1)
+
 struct enetc_ring_stats {
 	unsigned int packets;
 	unsigned int bytes;
 	unsigned int rx_alloc_errs;
 	unsigned int xdp_drops;
+	unsigned int xdp_tx;
+	unsigned int xdp_tx_drops;
+	unsigned int recycles;
+	unsigned int recycle_failures;
 };
 
 struct enetc_xdp_data {
 	struct xdp_rxq_info rxq;
 	struct bpf_prog *prog;
+	int xdp_tx_in_flight;
 };
 
 #define ENETC_RX_RING_DEFAULT_SIZE	512
@@ -104,6 +121,14 @@ static inline int enetc_bd_unused(struct enetc_bdr *bdr)
 	return bdr->bd_count + bdr->next_to_clean - bdr->next_to_use - 1;
 }
 
+static inline int enetc_swbd_unused(struct enetc_bdr *bdr)
+{
+	if (bdr->next_to_clean > bdr->next_to_alloc)
+		return bdr->next_to_clean - bdr->next_to_alloc - 1;
+
+	return bdr->bd_count + bdr->next_to_clean - bdr->next_to_alloc - 1;
+}
+
 /* Control BD ring */
 #define ENETC_CBDR_DEFAULT_SIZE	64
 struct enetc_cbdr {
diff --git a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
index 0183c13f8c1e..37821a8b225e 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
@@ -193,10 +193,14 @@ static const char rx_ring_stats[][ETH_GSTRING_LEN] = {
 	"Rx ring %2d frames",
 	"Rx ring %2d alloc errors",
 	"Rx ring %2d XDP drops",
+	"Rx ring %2d recycles",
+	"Rx ring %2d recycle failures",
 };
 
 static const char tx_ring_stats[][ETH_GSTRING_LEN] = {
 	"Tx ring %2d frames",
+	"Tx ring %2d XDP frames",
+	"Tx ring %2d XDP drops",
 };
 
 static int enetc_get_sset_count(struct net_device *ndev, int sset)
@@ -268,13 +272,18 @@ static void enetc_get_ethtool_stats(struct net_device *ndev,
 	for (i = 0; i < ARRAY_SIZE(enetc_si_counters); i++)
 		data[o++] = enetc_rd64(hw, enetc_si_counters[i].reg);
 
-	for (i = 0; i < priv->num_tx_rings; i++)
+	for (i = 0; i < priv->num_tx_rings; i++) {
 		data[o++] = priv->tx_ring[i]->stats.packets;
+		data[o++] = priv->tx_ring[i]->stats.xdp_tx;
+		data[o++] = priv->tx_ring[i]->stats.xdp_tx_drops;
+	}
 
 	for (i = 0; i < priv->num_rx_rings; i++) {
 		data[o++] = priv->rx_ring[i]->stats.packets;
 		data[o++] = priv->rx_ring[i]->stats.rx_alloc_errs;
 		data[o++] = priv->rx_ring[i]->stats.xdp_drops;
+		data[o++] = priv->rx_ring[i]->stats.recycles;
+		data[o++] = priv->rx_ring[i]->stats.recycle_failures;
 	}
 
 	if (!enetc_si_is_pf(priv->si))
-- 
2.25.1


^ permalink raw reply	[flat|nested] 23+ messages in thread

* [PATCH net-next 8/9] net: enetc: increase RX ring default size
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (6 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 7/9] net: enetc: add support for XDP_TX Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-03-31 20:08 ` [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT Vladimir Oltean
  2021-03-31 22:20 ` [PATCH net-next 0/9] XDP for NXP ENETC patchwork-bot+netdevbpf
  9 siblings, 0 replies; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

As explained in the XDP_TX patch, when receiving a burst of frames with
the XDP_TX verdict, there is a momentary dip in the number of available
RX buffers. The system will eventually recover as TX completions will
start kicking in and refilling our RX BD ring again. But until that
happens, we need to survive with as few out-of-buffer discards as
possible.

This increases the memory footprint of the driver in order to avoid
discards at 2.5 Gbps line rate with 64-byte packets, the maximum speed
available for testing on one port of the NXP LS1028A.

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.h b/drivers/net/ethernet/freescale/enetc/enetc.h
index 864da962ae21..d0619fcbbe97 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.h
+++ b/drivers/net/ethernet/freescale/enetc/enetc.h
@@ -71,7 +71,7 @@ struct enetc_xdp_data {
 	int xdp_tx_in_flight;
 };
 
-#define ENETC_RX_RING_DEFAULT_SIZE	512
+#define ENETC_RX_RING_DEFAULT_SIZE	2048
 #define ENETC_TX_RING_DEFAULT_SIZE	256
 #define ENETC_DEFAULT_TX_WORK		(ENETC_TX_RING_DEFAULT_SIZE / 2)
 
-- 
2.25.1


^ permalink raw reply	[flat|nested] 23+ messages in thread

* [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (7 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 8/9] net: enetc: increase RX ring default size Vladimir Oltean
@ 2021-03-31 20:08 ` Vladimir Oltean
  2021-04-01 11:26   ` Toke Høiland-Jørgensen
  2021-03-31 22:20 ` [PATCH net-next 0/9] XDP for NXP ENETC patchwork-bot+netdevbpf
  9 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 20:08 UTC (permalink / raw)
  To: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

From: Vladimir Oltean <vladimir.oltean@nxp.com>

The driver implementation of the XDP_REDIRECT action reuses parts from
XDP_TX, most notably the enetc_xdp_tx function which transmits an array
of TX software BDs. Only this time the buffers don't have DMA mappings
yet, so we need to create them.

When a BPF program reaches the XDP_REDIRECT verdict for a frame, we can
employ the same buffer reuse strategy as for the normal processing path
and for XDP_PASS: we can flip to the other page half and seed that to
the RX ring.

Note that scatter/gather support is there, but disabled due to the lack
of multi-buffer support in XDP (which is being added by the series
below):
https://patchwork.kernel.org/project/netdevbpf/cover/cover.1616179034.git.lorenzo@kernel.org/

Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
---
 drivers/net/ethernet/freescale/enetc/enetc.c  | 212 +++++++++++++++++-
 drivers/net/ethernet/freescale/enetc/enetc.h  |  11 +-
 .../ethernet/freescale/enetc/enetc_ethtool.c  |   6 +
 .../net/ethernet/freescale/enetc/enetc_pf.c   |   1 +
 4 files changed, 218 insertions(+), 12 deletions(-)

diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
index ba5313a5d7a4..57049ae97201 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc.c
@@ -8,6 +8,23 @@
 #include <linux/vmalloc.h>
 #include <net/pkt_sched.h>
 
+static struct sk_buff *enetc_tx_swbd_get_skb(struct enetc_tx_swbd *tx_swbd)
+{
+	if (tx_swbd->is_xdp_tx || tx_swbd->is_xdp_redirect)
+		return NULL;
+
+	return tx_swbd->skb;
+}
+
+static struct xdp_frame *
+enetc_tx_swbd_get_xdp_frame(struct enetc_tx_swbd *tx_swbd)
+{
+	if (tx_swbd->is_xdp_redirect)
+		return tx_swbd->xdp_frame;
+
+	return NULL;
+}
+
 static void enetc_unmap_tx_buff(struct enetc_bdr *tx_ring,
 				struct enetc_tx_swbd *tx_swbd)
 {
@@ -25,14 +42,20 @@ static void enetc_unmap_tx_buff(struct enetc_bdr *tx_ring,
 	tx_swbd->dma = 0;
 }
 
-static void enetc_free_tx_skb(struct enetc_bdr *tx_ring,
-			      struct enetc_tx_swbd *tx_swbd)
+static void enetc_free_tx_frame(struct enetc_bdr *tx_ring,
+				struct enetc_tx_swbd *tx_swbd)
 {
+	struct xdp_frame *xdp_frame = enetc_tx_swbd_get_xdp_frame(tx_swbd);
+	struct sk_buff *skb = enetc_tx_swbd_get_skb(tx_swbd);
+
 	if (tx_swbd->dma)
 		enetc_unmap_tx_buff(tx_ring, tx_swbd);
 
-	if (tx_swbd->skb) {
-		dev_kfree_skb_any(tx_swbd->skb);
+	if (xdp_frame) {
+		xdp_return_frame(tx_swbd->xdp_frame);
+		tx_swbd->xdp_frame = NULL;
+	} else if (skb) {
+		dev_kfree_skb_any(skb);
 		tx_swbd->skb = NULL;
 	}
 }
@@ -183,7 +206,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
 
 	do {
 		tx_swbd = &tx_ring->tx_swbd[i];
-		enetc_free_tx_skb(tx_ring, tx_swbd);
+		enetc_free_tx_frame(tx_ring, tx_swbd);
 		if (i == 0)
 			i = tx_ring->bd_count;
 		i--;
@@ -381,6 +404,9 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 	do_tstamp = false;
 
 	while (bds_to_clean && tx_frm_cnt < ENETC_DEFAULT_TX_WORK) {
+		struct xdp_frame *xdp_frame = enetc_tx_swbd_get_xdp_frame(tx_swbd);
+		struct sk_buff *skb = enetc_tx_swbd_get_skb(tx_swbd);
+
 		if (unlikely(tx_swbd->check_wb)) {
 			struct enetc_ndev_priv *priv = netdev_priv(ndev);
 			union enetc_tx_bd *txbd;
@@ -400,12 +426,15 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
 		else if (likely(tx_swbd->dma))
 			enetc_unmap_tx_buff(tx_ring, tx_swbd);
 
-		if (tx_swbd->skb) {
+		if (xdp_frame) {
+			xdp_return_frame(xdp_frame);
+			tx_swbd->xdp_frame = NULL;
+		} else if (skb) {
 			if (unlikely(do_tstamp)) {
-				enetc_tstamp_tx(tx_swbd->skb, tstamp);
+				enetc_tstamp_tx(skb, tstamp);
 				do_tstamp = false;
 			}
-			napi_consume_skb(tx_swbd->skb, napi_budget);
+			napi_consume_skb(skb, napi_budget);
 			tx_swbd->skb = NULL;
 		}
 
@@ -827,6 +856,109 @@ static bool enetc_xdp_tx(struct enetc_bdr *tx_ring,
 	return true;
 }
 
+static int enetc_xdp_frame_to_xdp_tx_swbd(struct enetc_bdr *tx_ring,
+					  struct enetc_tx_swbd *xdp_tx_arr,
+					  struct xdp_frame *xdp_frame)
+{
+	struct enetc_tx_swbd *xdp_tx_swbd = &xdp_tx_arr[0];
+	struct skb_shared_info *shinfo;
+	void *data = xdp_frame->data;
+	int len = xdp_frame->len;
+	skb_frag_t *frag;
+	dma_addr_t dma;
+	unsigned int f;
+	int n = 0;
+
+	dma = dma_map_single(tx_ring->dev, data, len, DMA_TO_DEVICE);
+	if (unlikely(dma_mapping_error(tx_ring->dev, dma))) {
+		netdev_err(tx_ring->ndev, "DMA map error\n");
+		return -1;
+	}
+
+	xdp_tx_swbd->dma = dma;
+	xdp_tx_swbd->dir = DMA_TO_DEVICE;
+	xdp_tx_swbd->len = len;
+	xdp_tx_swbd->is_xdp_redirect = true;
+	xdp_tx_swbd->is_eof = false;
+	xdp_tx_swbd->xdp_frame = NULL;
+
+	n++;
+	xdp_tx_swbd = &xdp_tx_arr[n];
+
+	shinfo = xdp_get_shared_info_from_frame(xdp_frame);
+
+	for (f = 0, frag = &shinfo->frags[0]; f < shinfo->nr_frags;
+	     f++, frag++) {
+		data = skb_frag_address(frag);
+		len = skb_frag_size(frag);
+
+		dma = dma_map_single(tx_ring->dev, data, len, DMA_TO_DEVICE);
+		if (unlikely(dma_mapping_error(tx_ring->dev, dma))) {
+			/* Undo the DMA mapping for all fragments */
+			while (--n >= 0)
+				enetc_unmap_tx_buff(tx_ring, &xdp_tx_arr[n]);
+
+			netdev_err(tx_ring->ndev, "DMA map error\n");
+			return -1;
+		}
+
+		xdp_tx_swbd->dma = dma;
+		xdp_tx_swbd->dir = DMA_TO_DEVICE;
+		xdp_tx_swbd->len = len;
+		xdp_tx_swbd->is_xdp_redirect = true;
+		xdp_tx_swbd->is_eof = false;
+		xdp_tx_swbd->xdp_frame = NULL;
+
+		n++;
+		xdp_tx_swbd = &xdp_tx_arr[n];
+	}
+
+	xdp_tx_arr[n - 1].is_eof = true;
+	xdp_tx_arr[n - 1].xdp_frame = xdp_frame;
+
+	return n;
+}
+
+int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
+		   struct xdp_frame **frames, u32 flags)
+{
+	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
+	struct enetc_ndev_priv *priv = netdev_priv(ndev);
+	struct enetc_bdr *tx_ring;
+	int xdp_tx_bd_cnt, i, k;
+	int xdp_tx_frm_cnt = 0;
+
+	tx_ring = priv->tx_ring[smp_processor_id()];
+
+	prefetchw(ENETC_TXBD(*tx_ring, tx_ring->next_to_use));
+
+	for (k = 0; k < num_frames; k++) {
+		xdp_tx_bd_cnt = enetc_xdp_frame_to_xdp_tx_swbd(tx_ring,
+							       xdp_redirect_arr,
+							       frames[k]);
+		if (unlikely(xdp_tx_bd_cnt < 0))
+			break;
+
+		if (unlikely(!enetc_xdp_tx(tx_ring, xdp_redirect_arr,
+					   xdp_tx_bd_cnt))) {
+			for (i = 0; i < xdp_tx_bd_cnt; i++)
+				enetc_unmap_tx_buff(tx_ring,
+						    &xdp_redirect_arr[i]);
+			tx_ring->stats.xdp_tx_drops++;
+			break;
+		}
+
+		xdp_tx_frm_cnt++;
+	}
+
+	if (unlikely((flags & XDP_XMIT_FLUSH) || k != xdp_tx_frm_cnt))
+		enetc_update_tx_ring_tail(tx_ring);
+
+	tx_ring->stats.xdp_tx += xdp_tx_frm_cnt;
+
+	return xdp_tx_frm_cnt;
+}
+
 static void enetc_map_rx_buff_to_xdp(struct enetc_bdr *rx_ring, int i,
 				     struct xdp_buff *xdp_buff, u16 size)
 {
@@ -948,14 +1080,31 @@ static void enetc_xdp_drop(struct enetc_bdr *rx_ring, int rx_ring_first,
 	rx_ring->stats.xdp_drops++;
 }
 
+static void enetc_xdp_free(struct enetc_bdr *rx_ring, int rx_ring_first,
+			   int rx_ring_last)
+{
+	while (rx_ring_first != rx_ring_last) {
+		struct enetc_rx_swbd *rx_swbd = &rx_ring->rx_swbd[rx_ring_first];
+
+		if (rx_swbd->page) {
+			dma_unmap_page(rx_ring->dev, rx_swbd->dma, PAGE_SIZE,
+				       rx_swbd->dir);
+			__free_page(rx_swbd->page);
+			rx_swbd->page = NULL;
+		}
+		enetc_bdr_idx_inc(rx_ring, &rx_ring_first);
+	}
+	rx_ring->stats.xdp_redirect_failures++;
+}
+
 static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 				   struct napi_struct *napi, int work_limit,
 				   struct bpf_prog *prog)
 {
+	int xdp_tx_bd_cnt, xdp_tx_frm_cnt = 0, xdp_redirect_frm_cnt = 0;
 	struct enetc_tx_swbd xdp_tx_arr[ENETC_MAX_SKB_FRAGS] = {0};
 	struct enetc_ndev_priv *priv = netdev_priv(rx_ring->ndev);
 	struct enetc_bdr *tx_ring = priv->tx_ring[rx_ring->index];
-	int xdp_tx_bd_cnt, xdp_tx_frm_cnt = 0;
 	int rx_frm_cnt = 0, rx_byte_cnt = 0;
 	int cleaned_cnt, i;
 	u32 xdp_act;
@@ -969,6 +1118,7 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 		int orig_i, orig_cleaned_cnt;
 		struct xdp_buff xdp_buff;
 		struct sk_buff *skb;
+		int tmp_orig_i, err;
 		u32 bd_status;
 
 		rxbd = enetc_rxbd(rx_ring, i);
@@ -1026,6 +1176,43 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 				rx_ring->xdp.xdp_tx_in_flight += xdp_tx_bd_cnt;
 				xdp_tx_frm_cnt++;
 			}
+			break;
+		case XDP_REDIRECT:
+			/* xdp_return_frame does not support S/G in the sense
+			 * that it leaks the fragments (__xdp_return should not
+			 * call page_frag_free only for the initial buffer).
+			 * Until XDP_REDIRECT gains support for S/G let's keep
+			 * the code structure in place, but dead. We drop the
+			 * S/G frames ourselves to avoid memory leaks which
+			 * would otherwise leave the kernel OOM.
+			 */
+			if (unlikely(cleaned_cnt - orig_cleaned_cnt != 1)) {
+				enetc_xdp_drop(rx_ring, orig_i, i);
+				rx_ring->stats.xdp_redirect_sg++;
+				break;
+			}
+
+			tmp_orig_i = orig_i;
+
+			while (orig_i != i) {
+				enetc_put_rx_buff(rx_ring,
+						  &rx_ring->rx_swbd[orig_i]);
+				enetc_bdr_idx_inc(rx_ring, &orig_i);
+			}
+
+			err = xdp_do_redirect(rx_ring->ndev, &xdp_buff, prog);
+			if (unlikely(err)) {
+				enetc_xdp_free(rx_ring, tmp_orig_i, i);
+			} else {
+				xdp_redirect_frm_cnt++;
+				rx_ring->stats.xdp_redirect++;
+			}
+
+			if (unlikely(xdp_redirect_frm_cnt > ENETC_DEFAULT_TX_WORK)) {
+				xdp_do_flush_map();
+				xdp_redirect_frm_cnt = 0;
+			}
+
 			break;
 		default:
 			bpf_warn_invalid_xdp_action(xdp_act);
@@ -1039,6 +1226,9 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
 	rx_ring->stats.packets += rx_frm_cnt;
 	rx_ring->stats.bytes += rx_byte_cnt;
 
+	if (xdp_redirect_frm_cnt)
+		xdp_do_flush_map();
+
 	if (xdp_tx_frm_cnt)
 		enetc_update_tx_ring_tail(tx_ring);
 
@@ -1173,7 +1363,7 @@ static void enetc_free_txbdr(struct enetc_bdr *txr)
 	int size, i;
 
 	for (i = 0; i < txr->bd_count; i++)
-		enetc_free_tx_skb(txr, &txr->tx_swbd[i]);
+		enetc_free_tx_frame(txr, &txr->tx_swbd[i]);
 
 	size = txr->bd_count * sizeof(union enetc_tx_bd);
 
@@ -1290,7 +1480,7 @@ static void enetc_free_tx_ring(struct enetc_bdr *tx_ring)
 	for (i = 0; i < tx_ring->bd_count; i++) {
 		struct enetc_tx_swbd *tx_swbd = &tx_ring->tx_swbd[i];
 
-		enetc_free_tx_skb(tx_ring, tx_swbd);
+		enetc_free_tx_frame(tx_ring, tx_swbd);
 	}
 
 	tx_ring->next_to_clean = 0;
diff --git a/drivers/net/ethernet/freescale/enetc/enetc.h b/drivers/net/ethernet/freescale/enetc/enetc.h
index d0619fcbbe97..05474f46b0d9 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc.h
+++ b/drivers/net/ethernet/freescale/enetc/enetc.h
@@ -19,7 +19,10 @@
 				(ETH_FCS_LEN + ETH_HLEN + VLAN_HLEN))
 
 struct enetc_tx_swbd {
-	struct sk_buff *skb;
+	union {
+		struct sk_buff *skb;
+		struct xdp_frame *xdp_frame;
+	};
 	dma_addr_t dma;
 	struct page *page;	/* valid only if is_xdp_tx */
 	u16 page_offset;	/* valid only if is_xdp_tx */
@@ -30,6 +33,7 @@ struct enetc_tx_swbd {
 	u8 do_tstamp:1;
 	u8 is_eof:1;
 	u8 is_xdp_tx:1;
+	u8 is_xdp_redirect:1;
 };
 
 #define ENETC_RX_MAXFRM_SIZE	ENETC_MAC_MAXFRM_SIZE
@@ -61,6 +65,9 @@ struct enetc_ring_stats {
 	unsigned int xdp_drops;
 	unsigned int xdp_tx;
 	unsigned int xdp_tx_drops;
+	unsigned int xdp_redirect;
+	unsigned int xdp_redirect_failures;
+	unsigned int xdp_redirect_sg;
 	unsigned int recycles;
 	unsigned int recycle_failures;
 };
@@ -354,6 +361,8 @@ int enetc_ioctl(struct net_device *ndev, struct ifreq *rq, int cmd);
 int enetc_setup_tc(struct net_device *ndev, enum tc_setup_type type,
 		   void *type_data);
 int enetc_setup_bpf(struct net_device *dev, struct netdev_bpf *xdp);
+int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
+		   struct xdp_frame **frames, u32 flags);
 
 /* ethtool */
 void enetc_set_ethtool_ops(struct net_device *ndev);
diff --git a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
index 37821a8b225e..7cc81b453bd7 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc_ethtool.c
@@ -195,6 +195,9 @@ static const char rx_ring_stats[][ETH_GSTRING_LEN] = {
 	"Rx ring %2d XDP drops",
 	"Rx ring %2d recycles",
 	"Rx ring %2d recycle failures",
+	"Rx ring %2d redirects",
+	"Rx ring %2d redirect failures",
+	"Rx ring %2d redirect S/G",
 };
 
 static const char tx_ring_stats[][ETH_GSTRING_LEN] = {
@@ -284,6 +287,9 @@ static void enetc_get_ethtool_stats(struct net_device *ndev,
 		data[o++] = priv->rx_ring[i]->stats.xdp_drops;
 		data[o++] = priv->rx_ring[i]->stats.recycles;
 		data[o++] = priv->rx_ring[i]->stats.recycle_failures;
+		data[o++] = priv->rx_ring[i]->stats.xdp_redirect;
+		data[o++] = priv->rx_ring[i]->stats.xdp_redirect_failures;
+		data[o++] = priv->rx_ring[i]->stats.xdp_redirect_sg;
 	}
 
 	if (!enetc_si_is_pf(priv->si))
diff --git a/drivers/net/ethernet/freescale/enetc/enetc_pf.c b/drivers/net/ethernet/freescale/enetc/enetc_pf.c
index 0484dbe13422..f61fedf462e5 100644
--- a/drivers/net/ethernet/freescale/enetc/enetc_pf.c
+++ b/drivers/net/ethernet/freescale/enetc/enetc_pf.c
@@ -708,6 +708,7 @@ static const struct net_device_ops enetc_ndev_ops = {
 	.ndo_do_ioctl		= enetc_ioctl,
 	.ndo_setup_tc		= enetc_setup_tc,
 	.ndo_bpf		= enetc_setup_bpf,
+	.ndo_xdp_xmit		= enetc_xdp_xmit,
 };
 
 static void enetc_pf_netdev_setup(struct enetc_si *si, struct net_device *ndev,
-- 
2.25.1


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH net-next 0/9] XDP for NXP ENETC
  2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
                   ` (8 preceding siblings ...)
  2021-03-31 20:08 ` [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT Vladimir Oltean
@ 2021-03-31 22:20 ` patchwork-bot+netdevbpf
  2021-03-31 22:55   ` Vladimir Oltean
  9 siblings, 1 reply; 23+ messages in thread
From: patchwork-bot+netdevbpf @ 2021-03-31 22:20 UTC (permalink / raw)
  To: Vladimir Oltean
  Cc: kuba, ast, daniel, hawk, john.fastabend, andrii, kafai,
	songliubraving, yhs, kpsingh, davem, netdev, bpf,
	alexander.duyck, ioana.ciornei, alexandru.marginean,
	claudiu.manoil, ilias.apalodimas, vladimir.oltean

Hello:

This series was applied to netdev/net-next.git (refs/heads/master):

On Wed, 31 Mar 2021 23:08:48 +0300 you wrote:
> From: Vladimir Oltean <vladimir.oltean@nxp.com>
> 
> This series adds support to the enetc driver for the basic XDP primitives.
> The ENETC is a network controller found inside the NXP LS1028A SoC,
> which is a dual-core Cortex A72 device for industrial networking,
> with the CPUs clocked at up to 1.3 GHz. On this platform, there are 4
> ENETC ports and a 6-port embedded DSA switch, in a topology that looks
> like this:
> 
> [...]

Here is the summary with links:
  - [net-next,1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function
    https://git.kernel.org/netdev/net-next/c/2fa423f5f0c6
  - [net-next,2/9] net: enetc: move skb creation into enetc_build_skb
    https://git.kernel.org/netdev/net-next/c/a800abd3ecb9
  - [net-next,3/9] net: enetc: add a dedicated is_eof bit in the TX software BD
    https://git.kernel.org/netdev/net-next/c/d504498d2eb3
  - [net-next,4/9] net: enetc: clean the TX software BD on the TX confirmation path
    https://git.kernel.org/netdev/net-next/c/1ee8d6f3bebb
  - [net-next,5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable
    https://git.kernel.org/netdev/net-next/c/65d0cbb414ce
  - [net-next,6/9] net: enetc: add support for XDP_DROP and XDP_PASS
    https://git.kernel.org/netdev/net-next/c/d1b15102dd16
  - [net-next,7/9] net: enetc: add support for XDP_TX
    https://git.kernel.org/netdev/net-next/c/7ed2bc80074e
  - [net-next,8/9] net: enetc: increase RX ring default size
    https://git.kernel.org/netdev/net-next/c/d6a2829e82cf
  - [net-next,9/9] net: enetc: add support for XDP_REDIRECT
    https://git.kernel.org/netdev/net-next/c/9d2b68cc108d

You are awesome, thank you!
--
Deet-doot-dot, I am a bot.
https://korg.docs.kernel.org/patchwork/pwbot.html



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH net-next 0/9] XDP for NXP ENETC
  2021-03-31 22:20 ` [PATCH net-next 0/9] XDP for NXP ENETC patchwork-bot+netdevbpf
@ 2021-03-31 22:55   ` Vladimir Oltean
  2021-04-01 11:28     ` Toke Høiland-Jørgensen
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-03-31 22:55 UTC (permalink / raw)
  To: patchwork-bot+netdevbpf
  Cc: kuba, ast, daniel, hawk, john.fastabend, andrii, kafai,
	songliubraving, yhs, kpsingh, davem, netdev, bpf,
	alexander.duyck, ioana.ciornei, alexandru.marginean,
	claudiu.manoil, ilias.apalodimas, vladimir.oltean

On Wed, Mar 31, 2021 at 10:20:18PM +0000, patchwork-bot+netdevbpf@kernel.org wrote:
> Hello:
>
> This series was applied to netdev/net-next.git (refs/heads/master):
>
> On Wed, 31 Mar 2021 23:08:48 +0300 you wrote:
> > From: Vladimir Oltean <vladimir.oltean@nxp.com>
> >
> > This series adds support to the enetc driver for the basic XDP primitives.
> > The ENETC is a network controller found inside the NXP LS1028A SoC,
> > which is a dual-core Cortex A72 device for industrial networking,
> > with the CPUs clocked at up to 1.3 GHz. On this platform, there are 4
> > ENETC ports and a 6-port embedded DSA switch, in a topology that looks
> > like this:
> >
> > [...]
>
> Here is the summary with links:
>   - [net-next,1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function
>     https://git.kernel.org/netdev/net-next/c/2fa423f5f0c6
>   - [net-next,2/9] net: enetc: move skb creation into enetc_build_skb
>     https://git.kernel.org/netdev/net-next/c/a800abd3ecb9
>   - [net-next,3/9] net: enetc: add a dedicated is_eof bit in the TX software BD
>     https://git.kernel.org/netdev/net-next/c/d504498d2eb3
>   - [net-next,4/9] net: enetc: clean the TX software BD on the TX confirmation path
>     https://git.kernel.org/netdev/net-next/c/1ee8d6f3bebb
>   - [net-next,5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable
>     https://git.kernel.org/netdev/net-next/c/65d0cbb414ce
>   - [net-next,6/9] net: enetc: add support for XDP_DROP and XDP_PASS
>     https://git.kernel.org/netdev/net-next/c/d1b15102dd16
>   - [net-next,7/9] net: enetc: add support for XDP_TX
>     https://git.kernel.org/netdev/net-next/c/7ed2bc80074e
>   - [net-next,8/9] net: enetc: increase RX ring default size
>     https://git.kernel.org/netdev/net-next/c/d6a2829e82cf
>   - [net-next,9/9] net: enetc: add support for XDP_REDIRECT
>     https://git.kernel.org/netdev/net-next/c/9d2b68cc108d
>
> You are awesome, thank you!
> --
> Deet-doot-dot, I am a bot.
> https://korg.docs.kernel.org/patchwork/pwbot.html

Let's play a drinking game: the winner is whoever doesn't get drunk
every time Dave merges a 9-patch series with no review less than two
hours after it was posted :D

Now in all seriousness, I'm very much open to receiving feedback still.
In a way, I appreciate seeing the patches merged as-is, since further
fixups made based on review will add more nuance and color to the git
log, and it will be easier to understand why things were done or
modified in a certain way, as opposed to the alternative, which is to
keep amending the same blank 'add support for XDP_TX / XDP_REDIRECT /
whatever' commit, with never enough attention given to the finer points.

^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-03-31 20:08 ` [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT Vladimir Oltean
@ 2021-04-01 11:26   ` Toke Høiland-Jørgensen
  2021-04-01 11:31     ` Vladimir Oltean
  0 siblings, 1 reply; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-01 11:26 UTC (permalink / raw)
  To: Vladimir Oltean, Jakub Kicinski, Alexei Starovoitov,
	Daniel Borkmann, Jesper Dangaard Brouer, John Fastabend,
	Andrii Nakryiko, Martin KaFai Lau, Song Liu, Yonghong Song,
	KP Singh, David S. Miller, netdev, bpf
  Cc: Alexander Duyck, Ioana Ciornei, Alex Marginean, Claudiu Manoil,
	Ilias Apalodimas, Vladimir Oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> From: Vladimir Oltean <vladimir.oltean@nxp.com>
>
> The driver implementation of the XDP_REDIRECT action reuses parts from
> XDP_TX, most notably the enetc_xdp_tx function which transmits an array
> of TX software BDs. Only this time, the buffers don't have DMA mappings,
> we need to create them.
>
> When a BPF program reaches the XDP_REDIRECT verdict for a frame, we can
> employ the same buffer reuse strategy as for the normal processing path
> and for XDP_PASS: we can flip to the other page half and seed that to
> the RX ring.
>
> Note that scatter/gather support is there, but disabled due to lack of
> multi-buffer support in XDP (which is added by this series):
> https://patchwork.kernel.org/project/netdevbpf/cover/cover.1616179034.git.lorenzo@kernel.org/
>
> Signed-off-by: Vladimir Oltean <vladimir.oltean@nxp.com>
> ---
>  drivers/net/ethernet/freescale/enetc/enetc.c  | 212 +++++++++++++++++-
>  drivers/net/ethernet/freescale/enetc/enetc.h  |  11 +-
>  .../ethernet/freescale/enetc/enetc_ethtool.c  |   6 +
>  .../net/ethernet/freescale/enetc/enetc_pf.c   |   1 +
>  4 files changed, 218 insertions(+), 12 deletions(-)
>
> diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
> index ba5313a5d7a4..57049ae97201 100644
> --- a/drivers/net/ethernet/freescale/enetc/enetc.c
> +++ b/drivers/net/ethernet/freescale/enetc/enetc.c
> @@ -8,6 +8,23 @@
>  #include <linux/vmalloc.h>
>  #include <net/pkt_sched.h>
>  
> +static struct sk_buff *enetc_tx_swbd_get_skb(struct enetc_tx_swbd *tx_swbd)
> +{
> +	if (tx_swbd->is_xdp_tx || tx_swbd->is_xdp_redirect)
> +		return NULL;
> +
> +	return tx_swbd->skb;
> +}
> +
> +static struct xdp_frame *
> +enetc_tx_swbd_get_xdp_frame(struct enetc_tx_swbd *tx_swbd)
> +{
> +	if (tx_swbd->is_xdp_redirect)
> +		return tx_swbd->xdp_frame;
> +
> +	return NULL;
> +}
> +
>  static void enetc_unmap_tx_buff(struct enetc_bdr *tx_ring,
>  				struct enetc_tx_swbd *tx_swbd)
>  {
> @@ -25,14 +42,20 @@ static void enetc_unmap_tx_buff(struct enetc_bdr *tx_ring,
>  	tx_swbd->dma = 0;
>  }
>  
> -static void enetc_free_tx_skb(struct enetc_bdr *tx_ring,
> -			      struct enetc_tx_swbd *tx_swbd)
> +static void enetc_free_tx_frame(struct enetc_bdr *tx_ring,
> +				struct enetc_tx_swbd *tx_swbd)
>  {
> +	struct xdp_frame *xdp_frame = enetc_tx_swbd_get_xdp_frame(tx_swbd);
> +	struct sk_buff *skb = enetc_tx_swbd_get_skb(tx_swbd);
> +
>  	if (tx_swbd->dma)
>  		enetc_unmap_tx_buff(tx_ring, tx_swbd);
>  
> -	if (tx_swbd->skb) {
> -		dev_kfree_skb_any(tx_swbd->skb);
> +	if (xdp_frame) {
> +		xdp_return_frame(tx_swbd->xdp_frame);
> +		tx_swbd->xdp_frame = NULL;
> +	} else if (skb) {
> +		dev_kfree_skb_any(skb);
>  		tx_swbd->skb = NULL;
>  	}
>  }
> @@ -183,7 +206,7 @@ static int enetc_map_tx_buffs(struct enetc_bdr *tx_ring, struct sk_buff *skb,
>  
>  	do {
>  		tx_swbd = &tx_ring->tx_swbd[i];
> -		enetc_free_tx_skb(tx_ring, tx_swbd);
> +		enetc_free_tx_frame(tx_ring, tx_swbd);
>  		if (i == 0)
>  			i = tx_ring->bd_count;
>  		i--;
> @@ -381,6 +404,9 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
>  	do_tstamp = false;
>  
>  	while (bds_to_clean && tx_frm_cnt < ENETC_DEFAULT_TX_WORK) {
> +		struct xdp_frame *xdp_frame = enetc_tx_swbd_get_xdp_frame(tx_swbd);
> +		struct sk_buff *skb = enetc_tx_swbd_get_skb(tx_swbd);
> +
>  		if (unlikely(tx_swbd->check_wb)) {
>  			struct enetc_ndev_priv *priv = netdev_priv(ndev);
>  			union enetc_tx_bd *txbd;
> @@ -400,12 +426,15 @@ static bool enetc_clean_tx_ring(struct enetc_bdr *tx_ring, int napi_budget)
>  		else if (likely(tx_swbd->dma))
>  			enetc_unmap_tx_buff(tx_ring, tx_swbd);
>  
> -		if (tx_swbd->skb) {
> +		if (xdp_frame) {
> +			xdp_return_frame(xdp_frame);
> +			tx_swbd->xdp_frame = NULL;
> +		} else if (skb) {
>  			if (unlikely(do_tstamp)) {
> -				enetc_tstamp_tx(tx_swbd->skb, tstamp);
> +				enetc_tstamp_tx(skb, tstamp);
>  				do_tstamp = false;
>  			}
> -			napi_consume_skb(tx_swbd->skb, napi_budget);
> +			napi_consume_skb(skb, napi_budget);
>  			tx_swbd->skb = NULL;
>  		}
>  
> @@ -827,6 +856,109 @@ static bool enetc_xdp_tx(struct enetc_bdr *tx_ring,
>  	return true;
>  }
>  
> +static int enetc_xdp_frame_to_xdp_tx_swbd(struct enetc_bdr *tx_ring,
> +					  struct enetc_tx_swbd *xdp_tx_arr,
> +					  struct xdp_frame *xdp_frame)
> +{
> +	struct enetc_tx_swbd *xdp_tx_swbd = &xdp_tx_arr[0];
> +	struct skb_shared_info *shinfo;
> +	void *data = xdp_frame->data;
> +	int len = xdp_frame->len;
> +	skb_frag_t *frag;
> +	dma_addr_t dma;
> +	unsigned int f;
> +	int n = 0;
> +
> +	dma = dma_map_single(tx_ring->dev, data, len, DMA_TO_DEVICE);
> +	if (unlikely(dma_mapping_error(tx_ring->dev, dma))) {
> +		netdev_err(tx_ring->ndev, "DMA map error\n");
> +		return -1;
> +	}
> +
> +	xdp_tx_swbd->dma = dma;
> +	xdp_tx_swbd->dir = DMA_TO_DEVICE;
> +	xdp_tx_swbd->len = len;
> +	xdp_tx_swbd->is_xdp_redirect = true;
> +	xdp_tx_swbd->is_eof = false;
> +	xdp_tx_swbd->xdp_frame = NULL;
> +
> +	n++;
> +	xdp_tx_swbd = &xdp_tx_arr[n];
> +
> +	shinfo = xdp_get_shared_info_from_frame(xdp_frame);
> +
> +	for (f = 0, frag = &shinfo->frags[0]; f < shinfo->nr_frags;
> +	     f++, frag++) {
> +		data = skb_frag_address(frag);
> +		len = skb_frag_size(frag);
> +
> +		dma = dma_map_single(tx_ring->dev, data, len, DMA_TO_DEVICE);
> +		if (unlikely(dma_mapping_error(tx_ring->dev, dma))) {
> +			/* Undo the DMA mapping for all fragments */
> +			while (--n >= 0)
> +				enetc_unmap_tx_buff(tx_ring, &xdp_tx_arr[n]);
> +
> +			netdev_err(tx_ring->ndev, "DMA map error\n");
> +			return -1;
> +		}
> +
> +		xdp_tx_swbd->dma = dma;
> +		xdp_tx_swbd->dir = DMA_TO_DEVICE;
> +		xdp_tx_swbd->len = len;
> +		xdp_tx_swbd->is_xdp_redirect = true;
> +		xdp_tx_swbd->is_eof = false;
> +		xdp_tx_swbd->xdp_frame = NULL;
> +
> +		n++;
> +		xdp_tx_swbd = &xdp_tx_arr[n];
> +	}
> +
> +	xdp_tx_arr[n - 1].is_eof = true;
> +	xdp_tx_arr[n - 1].xdp_frame = xdp_frame;
> +
> +	return n;
> +}
> +
> +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> +		   struct xdp_frame **frames, u32 flags)
> +{
> +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> +	struct enetc_bdr *tx_ring;
> +	int xdp_tx_bd_cnt, i, k;
> +	int xdp_tx_frm_cnt = 0;
> +
> +	tx_ring = priv->tx_ring[smp_processor_id()];

What mechanism guarantees that this won't overflow the array? :)

-Toke


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH net-next 0/9] XDP for NXP ENETC
  2021-03-31 22:55   ` Vladimir Oltean
@ 2021-04-01 11:28     ` Toke Høiland-Jørgensen
  0 siblings, 0 replies; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-01 11:28 UTC (permalink / raw)
  To: Vladimir Oltean, patchwork-bot+netdevbpf
  Cc: kuba, ast, daniel, hawk, john.fastabend, andrii, kafai,
	songliubraving, yhs, kpsingh, davem, netdev, bpf,
	alexander.duyck, ioana.ciornei, alexandru.marginean,
	claudiu.manoil, ilias.apalodimas, vladimir.oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> On Wed, Mar 31, 2021 at 10:20:18PM +0000, patchwork-bot+netdevbpf@kernel.org wrote:
>> Hello:
>>
>> This series was applied to netdev/net-next.git (refs/heads/master):
>>
>> On Wed, 31 Mar 2021 23:08:48 +0300 you wrote:
>> > From: Vladimir Oltean <vladimir.oltean@nxp.com>
>> >
>> > This series adds support to the enetc driver for the basic XDP primitives.
>> > The ENETC is a network controller found inside the NXP LS1028A SoC,
>> > which is a dual-core Cortex A72 device for industrial networking,
>> > with the CPUs clocked at up to 1.3 GHz. On this platform, there are 4
>> > ENETC ports and a 6-port embedded DSA switch, in a topology that looks
>> > like this:
>> >
>> > [...]
>>
>> Here is the summary with links:
>>   - [net-next,1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function
>>     https://git.kernel.org/netdev/net-next/c/2fa423f5f0c6
>>   - [net-next,2/9] net: enetc: move skb creation into enetc_build_skb
>>     https://git.kernel.org/netdev/net-next/c/a800abd3ecb9
>>   - [net-next,3/9] net: enetc: add a dedicated is_eof bit in the TX software BD
>>     https://git.kernel.org/netdev/net-next/c/d504498d2eb3
>>   - [net-next,4/9] net: enetc: clean the TX software BD on the TX confirmation path
>>     https://git.kernel.org/netdev/net-next/c/1ee8d6f3bebb
>>   - [net-next,5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable
>>     https://git.kernel.org/netdev/net-next/c/65d0cbb414ce
>>   - [net-next,6/9] net: enetc: add support for XDP_DROP and XDP_PASS
>>     https://git.kernel.org/netdev/net-next/c/d1b15102dd16
>>   - [net-next,7/9] net: enetc: add support for XDP_TX
>>     https://git.kernel.org/netdev/net-next/c/7ed2bc80074e
>>   - [net-next,8/9] net: enetc: increase RX ring default size
>>     https://git.kernel.org/netdev/net-next/c/d6a2829e82cf
>>   - [net-next,9/9] net: enetc: add support for XDP_REDIRECT
>>     https://git.kernel.org/netdev/net-next/c/9d2b68cc108d
>>
>> You are awesome, thank you!
>> --
>> Deet-doot-dot, I am a bot.
>> https://korg.docs.kernel.org/patchwork/pwbot.html
>
> Let's play a drinking game: the winner is whoever doesn't get drunk
> every time Dave merges a 9-patch series with no review in less than two
> hours after it was posted :D

No thanks! I value my liver too much for that ;)

(I was wondering about whether there was some black magic going on here
that I had missed; glad to see it's not just me)

> Now in all seriousness, I'm very much open to receiving feedback
> still.

Good - sent you one comment on the REDIRECT support to start with :)

-Toke



* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 11:26   ` Toke Høiland-Jørgensen
@ 2021-04-01 11:31     ` Vladimir Oltean
  2021-04-01 11:39       ` Toke Høiland-Jørgensen
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-04-01 11:31 UTC (permalink / raw)
  To: Toke Høiland-Jørgensen
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> > +		   struct xdp_frame **frames, u32 flags)
> > +{
> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> > +	struct enetc_bdr *tx_ring;
> > +	int xdp_tx_bd_cnt, i, k;
> > +	int xdp_tx_frm_cnt = 0;
> > +
> > +	tx_ring = priv->tx_ring[smp_processor_id()];
> 
> What mechanism guarantees that this won't overflow the array? :)

Which array, the array of TX rings?
You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
Well, yeah, I don't know what's the proper way to deal with that. Ideas?


* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 11:31     ` Vladimir Oltean
@ 2021-04-01 11:39       ` Toke Høiland-Jørgensen
  2021-04-01 16:09         ` Vladimir Oltean
  0 siblings, 1 reply; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-01 11:39 UTC (permalink / raw)
  To: Vladimir Oltean
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
>> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
>> > +		   struct xdp_frame **frames, u32 flags)
>> > +{
>> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
>> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
>> > +	struct enetc_bdr *tx_ring;
>> > +	int xdp_tx_bd_cnt, i, k;
>> > +	int xdp_tx_frm_cnt = 0;
>> > +
>> > +	tx_ring = priv->tx_ring[smp_processor_id()];
>> 
>> What mechanism guarantees that this won't overflow the array? :)
>
> Which array, the array of TX rings?

Yes.

> You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
> TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?

Not just that, this ndo can be called on arbitrary CPUs after a
redirect. The code just calls through from the XDP receive path so which
CPU it ends up on depends on the RSS+IRQ config of the other device,
which may not even be the same driver; i.e., you have no control over
that... :)

> Well, yeah, I don't know what's the proper way to deal with that. Ideas?

Well the obvious one is just:

tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];

and then some kind of locking to deal with multiple CPUs accessing the
same TX ring...

-Toke



* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 11:39       ` Toke Høiland-Jørgensen
@ 2021-04-01 16:09         ` Vladimir Oltean
  2021-04-01 18:01           ` Toke Høiland-Jørgensen
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-04-01 16:09 UTC (permalink / raw)
  To: Toke Høiland-Jørgensen
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
> Vladimir Oltean <olteanv@gmail.com> writes:
>
> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> >> > +		   struct xdp_frame **frames, u32 flags)
> >> > +{
> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> >> > +	struct enetc_bdr *tx_ring;
> >> > +	int xdp_tx_bd_cnt, i, k;
> >> > +	int xdp_tx_frm_cnt = 0;
> >> > +
> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
> >>
> >> What mechanism guarantees that this won't overflow the array? :)
> >
> > Which array, the array of TX rings?
>
> Yes.
>

The problem isn't even accessing an out-of-bounds element in the TX ring array.

As it turns out, I had a relatively superficial understanding of how
things are organized, but let me try to explain.

The number of TX rings is a configurable resource (between PFs and VFs)
and we read the capability at probe time:

enetc_get_si_caps:
	val = enetc_rd(hw, ENETC_SICAPR0);
	si->num_rx_rings = (val >> 16) & 0xff;
	si->num_tx_rings = val & 0xff;

enetc_init_si_rings_params:
	priv->num_tx_rings = si->num_tx_rings;

In any case, the TX array is declared as:

struct enetc_ndev_priv {
	struct enetc_bdr *tx_ring[16];
	struct enetc_bdr *rx_ring[16];
};

because that's the maximum hardware capability.

The priv->tx_ring array is populated in:

enetc_alloc_msix:
	/* # of tx rings per int vector */
	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;

	for (i = 0; i < priv->bdr_int_num; i++) {
		for (j = 0; j < v_tx_rings; j++) {
			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
				idx = 2 * j + i; /* 2 CPUs */
			else
				idx = j + i * v_tx_rings; /* default */

			priv->tx_ring[idx] = bdr;
		}
	}

priv->bdr_int_num is set to "num_online_cpus()".
On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
equal to 2).

Otherwise said, the convoluted logic above does the following:
- It affines an MSI interrupt vector per CPU
- It affines an RX ring per MSI vector, hence per CPU
- It balances the fixed number of TX rings (say 8) among the available
  MSI vectors, hence CPUs (say 2). It does this by iterating with i
  through the RX MSI interrupt vectors, and with j through the number of
  TX rings per MSI vector.

This logic maps:
- the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
  are used
- all TX rings to CPU 0, if 1 CPU is used

This is done because we have this logic in enetc_poll:

	for (i = 0; i < v->count_tx_rings; i++)
		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
			complete = false;

for processing the TX completions of a given group of TX rings in the RX
MSI interrupt handler of a certain CPU.

Otherwise said, priv->tx_ring[i] is always BD ring i, and that mapping
never changes. All 8 TX rings are enabled and available for use.

What I knew about tc-taprio and tc-mqprio is that they only enqueue to
TX queues [0, num_tc-1] because of this, as it turns out:

enetc_xmit:
	tx_ring = priv->tx_ring[skb->queue_mapping];

where skb->queue_mapping is given by:
	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
and by this, respectively, from the mqprio code path:
	netif_set_real_num_tx_queues(ndev, num_tc);

As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
- TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
- TX ring 0, if 1 CPU is used

The TX completions in the first case are handled by:
- CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
  (because it is odd), if 2 CPUs are used, due to the mapping I talked
  about earlier
- CPU 0 if only 1 CPU is used

> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
>
> Not just that, this ndo can be called on arbitrary CPUs after a
> redirect. The code just calls through from the XDP receive path so which
> CPU it ends up on depends on the RSS+IRQ config of the other device,
> which may not even be the same driver; i.e., you have no control over
> that... :)
>

What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
core system... Why does the source ifindex matter at all? I'm using the
TX ring affined to the CPU that ndo_xdp_xmit is currently running on.

> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
>
> Well the obvious one is just:
>
> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
>
> and then some kind of locking to deal with multiple CPUs accessing the
> same TX ring...

By multiple CPUs accessing the same TX ring, you mean locking between
ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
architecture is to have at least as many TX rings as CPUs?

Because otherwise, I see that ndo_xdp_xmit is only called from
xdp_do_flush, which is in softirq context, which to my very rudimentary
knowledge run with bottom halves, thus preemption, disabled? So I don't
think it's possible for ndo_xdp_xmit and ndo_xmit, or even two
ndo_xdp_xmit instances, to access the same TX ring?

Sorry, I'm sure these are trivial questions, but I would like to really
understand what I need to change and why :D


* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 16:09         ` Vladimir Oltean
@ 2021-04-01 18:01           ` Toke Høiland-Jørgensen
  2021-04-01 19:38             ` Vladimir Oltean
  0 siblings, 1 reply; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-01 18:01 UTC (permalink / raw)
  To: Vladimir Oltean
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
>> Vladimir Oltean <olteanv@gmail.com> writes:
>>
>> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
>> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
>> >> > +		   struct xdp_frame **frames, u32 flags)
>> >> > +{
>> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
>> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
>> >> > +	struct enetc_bdr *tx_ring;
>> >> > +	int xdp_tx_bd_cnt, i, k;
>> >> > +	int xdp_tx_frm_cnt = 0;
>> >> > +
>> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
>> >>
>> >> What mechanism guarantees that this won't overflow the array? :)
>> >
>> > Which array, the array of TX rings?
>>
>> Yes.
>>
>
> The problem isn't even accessing an out-of-bounds element in the TX ring array.
>
> As it turns out, I had a relatively superficial understanding of how
> things are organized, but let me try to explain.
>
> The number of TX rings is a configurable resource (between PFs and VFs)
> and we read the capability at probe time:
>
> enetc_get_si_caps:
> 	val = enetc_rd(hw, ENETC_SICAPR0);
> 	si->num_rx_rings = (val >> 16) & 0xff;
> 	si->num_tx_rings = val & 0xff;
>
> enetc_init_si_rings_params:
> 	priv->num_tx_rings = si->num_tx_rings;
>
> In any case, the TX array is declared as:
>
> struct enetc_ndev_priv {
> 	struct enetc_bdr *tx_ring[16];
> 	struct enetc_bdr *rx_ring[16];
> };
>
> because that's the maximum hardware capability.
>
> The priv->tx_ring array is populated in:
>
> enetc_alloc_msix:
> 	/* # of tx rings per int vector */
> 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
>
> 	for (i = 0; i < priv->bdr_int_num; i++) {
> 		for (j = 0; j < v_tx_rings; j++) {
> 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
> 				idx = 2 * j + i; /* 2 CPUs */
> 			else
> 				idx = j + i * v_tx_rings; /* default */
>
> 			priv->tx_ring[idx] = bdr;
> 		}
> 	}
>
> priv->bdr_int_num is set to "num_online_cpus()".
> On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
> equal to 2).
>
> Otherwise said, the convoluted logic above does the following:
> - It affines an MSI interrupt vector per CPU
> - It affines an RX ring per MSI vector, hence per CPU
> - It balances the fixed number of TX rings (say 8) among the available
>   MSI vectors, hence CPUs (say 2). It does this by iterating with i
>   through the RX MSI interrupt vectors, and with j through the number of
>   TX rings per MSI vector.
>
> This logic maps:
> - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
>   are used
> - all TX rings to CPU 0, if 1 CPU is used
>
> This is done because we have this logic in enetc_poll:
>
> 	for (i = 0; i < v->count_tx_rings; i++)
> 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
> 			complete = false;
>
> for processing the TX completions of a given group of TX rings in the RX
> MSI interrupt handler of a certain CPU.
>
> Otherwise said, priv->tx_ring[i] is always BD ring i, and that mapping
> never changes. All 8 TX rings are enabled and available for use.
>
> What I knew about tc-taprio and tc-mqprio is that they only enqueue to
> TX queues [0, num_tc-1] because of this, as it turns out:
>
> enetc_xmit:
> 	tx_ring = priv->tx_ring[skb->queue_mapping];
>
> where skb->queue_mapping is given by:
> 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
> and by this, respectively, from the mqprio code path:
> 	netif_set_real_num_tx_queues(ndev, num_tc);
>
> As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
> - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
> - TX ring 0, if 1 CPU is used
>
> The TX completions in the first case are handled by:
> - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
>   (because it is odd), if 2 CPUs are used, due to the mapping I talked
>   about earlier
> - CPU 0 if only 1 CPU is used

Right - thank you for the details! So what are the constraints on the
configuration? Specifically, given two netdevs on the same device, is it
possible that the system can ever end up in a situation where one device
has two *RXQs* configured, and the other only one *TXQ*. Because then
you could get a redirect from RXQ 1 on one device, which would also end
up trying to transmit on TXQ 1 on the other device; and that would break
if that other device only has TXQ 0 configured... Same thing if a single
device has 2 RXQs but only one TXQ (it can redirect to itself).

>> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
>> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
>>
>> Not just that, this ndo can be called on arbitrary CPUs after a
>> redirect. The code just calls through from the XDP receive path so which
>> CPU it ends up on depends on the RSS+IRQ config of the other device,
>> which may not even be the same driver; i.e., you have no control over
>> that... :)
>>
>
> What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
> core system... Why does the source ifindex matter at all? I'm using the
> TX ring affined to the CPU that ndo_xdp_xmit is currently running on.

See, this is why I asked 'what mechanism ensures'. Because if that
mechanism is 'this driver is only ever used on a system with fewer CPUs
than TXQs', then that's of course fine :)

But there are drivers that do basically the same thing as what you've
done here, *without* having such an assurance, and just looking at that
function it's not obvious that there's an out-of-band reason why it's
safe. And I literally just came from looking at such a case when I
replied to your initial patch...

>> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
>>
>> Well the obvious one is just:
>>
>> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
>>
>> and then some kind of locking to deal with multiple CPUs accessing the
>> same TX ring...
>
> By multiple CPUs accessing the same TX ring, you mean locking between
> ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
> architecture is to have at least as many TX rings as CPUs?
>
> Because otherwise, I see that ndo_xdp_xmit is only called from
> xdp_do_flush, which is in softirq context, which to my very rudimentary
> knowledge runs with bottom halves, and thus preemption, disabled? So I
> don't think it's possible for ndo_xdp_xmit and ndo_start_xmit, or even
> two ndo_xdp_xmit instances, to access the same TX ring?

Yup, I think you're right about that. The "we always have more TXQs than
CPUs" condition was the bit I was missing (and of course you're *sure*
that this would never change sometime in the future, right? ;)).

> Sorry, I'm sure these are trivial questions, but I would like to really
> understand what I need to change and why :D

Given the above I think the only potentially breaking thing is the
#RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
the tx_ring array by smp_processor_id() is safe would be nice? :)

-Toke



* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 18:01           ` Toke Høiland-Jørgensen
@ 2021-04-01 19:38             ` Vladimir Oltean
  2021-04-02 10:56               ` Vladimir Oltean
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-04-01 19:38 UTC (permalink / raw)
  To: Toke Høiland-Jørgensen
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

On Thu, Apr 01, 2021 at 08:01:42PM +0200, Toke Høiland-Jørgensen wrote:
> Vladimir Oltean <olteanv@gmail.com> writes:
> 
> > On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
> >> Vladimir Oltean <olteanv@gmail.com> writes:
> >>
> >> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
> >> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> >> >> > +		   struct xdp_frame **frames, u32 flags)
> >> >> > +{
> >> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> >> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> >> >> > +	struct enetc_bdr *tx_ring;
> >> >> > +	int xdp_tx_bd_cnt, i, k;
> >> >> > +	int xdp_tx_frm_cnt = 0;
> >> >> > +
> >> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
> >> >>
> >> >> What mechanism guarantees that this won't overflow the array? :)
> >> >
> >> > Which array, the array of TX rings?
> >>
> >> Yes.
> >>
> >
> > The problem isn't even accessing an out-of-bounds element in the TX ring array.
> >
> > As it turns out, I had a relatively superficial understanding of how
> > things are organized, but let me try to explain.
> >
> > The number of TX rings is a configurable resource (between PFs and VFs)
> > and we read the capability at probe time:
> >
> > enetc_get_si_caps:
> > 	val = enetc_rd(hw, ENETC_SICAPR0);
> > 	si->num_rx_rings = (val >> 16) & 0xff;
> > 	si->num_tx_rings = val & 0xff;
> >
> > enetc_init_si_rings_params:
> > 	priv->num_tx_rings = si->num_tx_rings;
> >
> > In any case, the TX array is declared as:
> >
> > struct enetc_ndev_priv {
> > 	struct enetc_bdr *tx_ring[16];
> > 	struct enetc_bdr *rx_ring[16];
> > };
> >
> > because that's the maximum hardware capability.
> >
> > The priv->tx_ring array is populated in:
> >
> > enetc_alloc_msix:
> > 	/* # of tx rings per int vector */
> > 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
> >
> > 	for (i = 0; i < priv->bdr_int_num; i++) {
> > 		for (j = 0; j < v_tx_rings; j++) {
> > 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
> > 				idx = 2 * j + i; /* 2 CPUs */
> > 			else
> > 				idx = j + i * v_tx_rings; /* default */
> >
> > 			priv->tx_ring[idx] = bdr;
> > 		}
> > 	}
> >
> > priv->bdr_int_num is set to "num_online_cpus()".
> > On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
> > equal to 2).
> >
> > Otherwise said, the convoluted logic above does the following:
> > - It affines an MSI interrupt vector per CPU
> > - It affines an RX ring per MSI vector, hence per CPU
> > - It balances the fixed number of TX rings (say 8) among the available
> >   MSI vectors, hence CPUs (say 2). It does this by iterating with i
> >   through the RX MSI interrupt vectors, and with j through the number of
> >   TX rings per MSI vector.
> >
> > This logic maps:
> > - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
> >   are used
> > - all TX rings to CPU 0, if 1 CPU is used
> >
> > This is done because we have this logic in enetc_poll:
> >
> > 	for (i = 0; i < v->count_tx_rings; i++)
> > 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
> > 			complete = false;
> >
> > for processing the TX completions of a given group of TX rings in the RX
> > MSI interrupt handler of a certain CPU.
> >
> > Otherwise said, priv->tx_ring[i] is always BD ring i, and that mapping
> > never changes. All 8 TX rings are enabled and available for use.
> >
> > What I knew about tc-taprio and tc-mqprio is that they only enqueue to
> > TX queues [0, num_tc-1] because of this, as it turns out:
> >
> > enetc_xmit:
> > 	tx_ring = priv->tx_ring[skb->queue_mapping];
> >
> > where skb->queue_mapping is given by:
> > 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
> > and by this, respectively, from the mqprio code path:
> > 	netif_set_real_num_tx_queues(ndev, num_tc);
> >
> > As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
> > - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
> > - TX ring 0, if 1 CPU is used
> >
> > The TX completions in the first case are handled by:
> > - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
> >   (because it is odd), if 2 CPUs are used, due to the mapping I talked
> >   about earlier
> > - CPU 0 if only 1 CPU is used
> 
> Right - thank you for the details! So what are the constraints on the
> configuration? Specifically, given two netdevs on the same device, is it
> possible that the system can ever end up in a situation where one device
> has two *RXQs* configured, and the other only one *TXQ*. Because then
> you could get a redirect from RXQ 1 on one device, which would also end
> up trying to transmit on TXQ 1 on the other device; and that would break
> if that other device only has TXQ 0 configured... Same thing if a single
> device has 2 RXQs but only one TXQ (it can redirect to itself).

I discover more and more of the driver as I talk to you, I like it :D

So I said that there is a maximum number of RX and TX rings splittable
between the PF and its VFs, but I wasn't exactly sure where that
configuration is done. I found it now.

enetc_port_si_configure: (SI == station interface)
	- read Port capability register 0 (PCAPR0) to determine how many
	  RX rings and TX rings the hardware has for this port (PFs + VFs)
	  in total.
	- assign num_rings = min(TX rings, RX rings)
	- try to assign 8 TX rings and 8 RX rings to the PF
	  - if this fails, just assign ${num_rings} TX rings and
	    ${num_rings} RX rings to the PF
	- split the remaining RX and TX rings to the number of
	  configured VFs (example: if there are 16 RX rings and 16 TX
	  rings for a port with 2 VFs, the driver assigns 8RX/8TX rings
	  for the PF, and 4RX/4TX rings for each VF).
	   - if we couldn't assign 8RX/8TX rings for the PF in the
	     previous step, we don't assign any ring to the VF

So yeah, we have an equal number of RX and TX rings. The driver,
however, only uses 2 RX rings _actively_: one per CPU. As for the other
6, I don't know; I guess I could use them for AF_XDP (I haven't looked
very closely at that yet). At the moment they're pretty much unused,
even though they're reserved and not given to VFs.

> >> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
> >> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
> >>
> >> Not just that, this ndo can be called on arbitrary CPUs after a
> >> redirect. The code just calls through from the XDP receive path so which
> >> CPU it ends up on depends on the RSS+IRQ config of the other device,
> >> which may not even be the same driver; i.e., you have no control over
> >> that... :)
> >>
> >
> > What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
> > core system... Why does the source ifindex matter at all? I'm using the
> > TX ring affined to the CPU that ndo_xdp_xmit is currently running on.
> 
> See, this is why I asked 'what mechanism ensures'. Because if that
> mechanism is 'this driver is only ever used on a system with fewer CPUs
> than TXQs', then that's of course fine :)
> 
> But there are drivers that do basically the same thing as what you've
> done here, *without* having such an assurance, and just looking at that
> function it's not obvious that there's an out-of-band reason why it's
> safe. And I literally just came from looking at such a case when I
> replied to your initial patch...

Maybe you were confused by the fact that this is a PCI device, thinking
it's a plug-in card or something, and that we therefore don't get to
choose the number of CPUs that the host has. In hindsight, I don't know
why you didn't ask about this; it is pretty strange when you think about
it.

It is actually more like a platform device with a PCI front-end - we
found this loophole in the PCI standard where you can create a "root
complex/integrated endpoint" which is basically an ECAM where the config
space contains PFs corresponding to some platform devices in the SoC (in
our case, all 4 Ethernet ports have their own PF, the switch has its own
PF, same thing for the MDIO controller and the 1588 timer). Their
register map is exposed as a number of BARs which use Enhanced
Allocation, so the generic PCI ECAM driver doesn't need to create any
translation windows for these addresses, it just uses what's in there,
which, surprise, is the actual base address of the peripheral in the
SoC's memory space.

We do that because we gain a lot of cool stuff by appearing as PCI
devices to system software, like for example multiple interfaces on top
of a 'shared MAC' are simply mapped over SR-IOV.

So it just 'smells' like PCI, but they're regular memory-mapped devices,
there is no PCI transaction layer or physical layer. At the moment the
LS1028A is the only SoC running Linux that integrates the ENETC block,
so we fully control the environment.

> >> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
> >>
> >> Well the obvious one is just:
> >>
> >> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
> >>
> >> and then some kind of locking to deal with multiple CPUs accessing the
> >> same TX ring...
> >
> > By multiple CPUs accessing the same TX ring, you mean locking between
> > ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
> > architecture is to have at least as many TX rings as CPUs?
> >
> > Because otherwise, I see that ndo_xdp_xmit is only called from
> > xdp_do_flush, which is in softirq context, which to my very rudimentary
> > knowledge runs with bottom halves, and thus preemption, disabled? So I
> > don't think it's possible for ndo_xdp_xmit and ndo_start_xmit, or even
> > two ndo_xdp_xmit instances, to access the same TX ring?
> 
> Yup, I think you're right about that. The "we always have more TXQs than
> CPUs" condition was the bit I was missing (and of course you're *sure*
> that this would never change sometime in the future, right? ;)).

I'm pretty sure, yeah, we build the SoCs and one of the requirements we
have is that every ENETC PF has enough TX rings in order for every CPU
to have its own one. That helps a lot with avoiding contention and
simplifying the driver. Maybe I'll use this opportunity to talk again to
the hardware design guys and make sure that the next SoCs with Linux
follow the same pattern as LS1028A, although I see no reason why not.

> > Sorry, I'm sure these are trivial questions, but I would like to really
> > understand what I need to change and why :D
> 
> Given the above I think the only potentially breaking thing is the
> #RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
> the tx_ring array by smp_processor_id() is safe would be nice? :)

Sure, which part exactly do you think would explain it best? Should I
add a reference to enetc_port_si_configure?


* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-01 19:38             ` Vladimir Oltean
@ 2021-04-02 10:56               ` Vladimir Oltean
  2021-04-03 11:07                 ` Toke Høiland-Jørgensen
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-04-02 10:56 UTC (permalink / raw)
  To: Toke Høiland-Jørgensen
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

On Thu, Apr 01, 2021 at 10:38:21PM +0300, Vladimir Oltean wrote:
> On Thu, Apr 01, 2021 at 08:01:42PM +0200, Toke Høiland-Jørgensen wrote:
> > Vladimir Oltean <olteanv@gmail.com> writes:
> > 
> > > On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
> > >> Vladimir Oltean <olteanv@gmail.com> writes:
> > >>
> > >> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
> > >> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> > >> >> > +		   struct xdp_frame **frames, u32 flags)
> > >> >> > +{
> > >> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> > >> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> > >> >> > +	struct enetc_bdr *tx_ring;
> > >> >> > +	int xdp_tx_bd_cnt, i, k;
> > >> >> > +	int xdp_tx_frm_cnt = 0;
> > >> >> > +
> > >> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
> > >> >>
> > >> >> What mechanism guarantees that this won't overflow the array? :)
> > >> >
> > >> > Which array, the array of TX rings?
> > >>
> > >> Yes.
> > >>
> > >
> > > The problem isn't even accessing an out-of-bounds element in the TX ring array.
> > >
> > > As it turns out, I had a relatively superficial understanding of how
> > > things are organized, but let me try to explain.
> > >
> > > The number of TX rings is a configurable resource (between PFs and VFs)
> > > and we read the capability at probe time:
> > >
> > > enetc_get_si_caps:
> > > 	val = enetc_rd(hw, ENETC_SICAPR0);
> > > 	si->num_rx_rings = (val >> 16) & 0xff;
> > > 	si->num_tx_rings = val & 0xff;
> > >
> > > enetc_init_si_rings_params:
> > > 	priv->num_tx_rings = si->num_tx_rings;
> > >
> > > In any case, the TX array is declared as:
> > >
> > > struct enetc_ndev_priv {
> > > 	struct enetc_bdr *tx_ring[16];
> > > 	struct enetc_bdr *rx_ring[16];
> > > };
> > >
> > > because that's the maximum hardware capability.
> > >
> > > The priv->tx_ring array is populated in:
> > >
> > > enetc_alloc_msix:
> > > 	/* # of tx rings per int vector */
> > > 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
> > >
> > > 	for (i = 0; i < priv->bdr_int_num; i++) {
> > > 		for (j = 0; j < v_tx_rings; j++) {
> > > 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
> > > 				idx = 2 * j + i; /* 2 CPUs */
> > > 			else
> > > 				idx = j + i * v_tx_rings; /* default */
> > >
> > > 			priv->tx_ring[idx] = bdr;
> > > 		}
> > > 	}
> > >
> > > priv->bdr_int_num is set to "num_online_cpus()".
> > > On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
> > > equal to 2).
> > >
> > > Otherwise said, the convoluted logic above does the following:
> > > - It affines an MSI interrupt vector per CPU
> > > - It affines an RX ring per MSI vector, hence per CPU
> > > - It balances the fixed number of TX rings (say 8) among the available
> > >   MSI vectors, hence CPUs (say 2). It does this by iterating with i
> > >   through the RX MSI interrupt vectors, and with j through the number of
> > >   TX rings per MSI vector.
> > >
> > > This logic maps:
> > > - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
> > >   are used
> > > - all TX rings to CPU 0, if 1 CPU is used
> > >
> > > This is done because we have this logic in enetc_poll:
> > >
> > > 	for (i = 0; i < v->count_tx_rings; i++)
> > > 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
> > > 			complete = false;
> > >
> > > for processing the TX completions of a given group of TX rings in the RX
> > > MSI interrupt handler of a certain CPU.
> > >
> > > Otherwise said, priv->tx_ring[i] is always BD ring i, and that mapping
> > > never changes. All 8 TX rings are enabled and available for use.
> > >
> > > What I knew about tc-taprio and tc-mqprio is that they only enqueue to
> > > TX queues [0, num_tc-1] because of this, as it turns out:
> > >
> > > enetc_xmit:
> > > 	tx_ring = priv->tx_ring[skb->queue_mapping];
> > >
> > > where skb->queue_mapping is given by:
> > > 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
> > > and by this, respectively, from the mqprio code path:
> > > 	netif_set_real_num_tx_queues(ndev, num_tc);
> > >
> > > As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
> > > - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
> > > - TX ring 0, if 1 CPU is used
> > >
> > > The TX completions in the first case are handled by:
> > > - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
> > >   (because it is odd), if 2 CPUs are used, due to the mapping I talked
> > >   about earlier
> > > - CPU 0 if only 1 CPU is used
> > 
> > Right - thank you for the details! So what are the constraints on the
> > configuration. Specifically, given two netdevs on the same device, is it
> > possible that the system can ever end up in a situation where one device
> > has two *RXQs* configured, and the other only one *TXQ*. Because then
> > you could get a redirect from RXQ 1 on one device, which would also end
> > up trying to transmit on TXQ 1 on the other device; and that would break
> > if that other device only has TXQ 0 configured... Same thing if a single
> > device has 2 RXQs but only one TXQ (it can redirect to itself).
> 
> I discover more and more of the driver as I talk to you, I like it :D
> 
> So I said that there is a maximum number of RX and TX rings splittable
> between the PF and its VFs, but I wasn't exactly sure where that
> configuration is done. I found it now.
> 
> enetc_port_si_configure: (SI == station interface)
> 	- read Port capability register 0 (PCAPR0) to determine how many
> 	  RX rings and TX rings the hardware has for this port (PFs + VFs)
> 	  in total.
> 	- assign num_rings = min(TX rings, RX rings)
> 	- try to assign 8 TX rings and 8 RX rings to the PF
> 	  - if this fails, just assign ${num_rings} TX rings and
> 	    ${num_rings} RX rings to the PF
> 	- split the remaining RX and TX rings to the number of
> 	  configured VFs (example: if there are 16 RX rings and 16 TX
> 	  rings for a port with 2 VFs, the driver assigns 8RX/8TX rings
> 	  for the PF, and 4RX/4TX rings for each VF).
> 	   - if we couldn't assign 8RX/8TX rings for the PF in the
> 	     previous step, we don't assign any ring to the VF
> 
> So yeah, we have an equal number of RX and TX rings. The driver,
> however, only uses 2 RX rings _actively_: one per CPU. The other 6, I
> don't know, I guess I can use them for AF_XDP (I haven't looked very
> closely at that yet), at the moment they're pretty much unused, even if
> reserved and not given to VFs.
> 
> > >> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
> > >> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
> > >>
> > >> Not just that, this ndo can be called on arbitrary CPUs after a
> > >> redirect. The code just calls through from the XDP receive path so which
> > >> CPU it ends up on depends on the RSS+IRQ config of the other device,
> > >> which may not even be the same driver; i.e., you have no control over
> > >> that... :)
> > >>
> > >
> > > What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
> > > core system... Why does the source ifindex matter at all? I'm using the
> > > TX ring affined to the CPU that ndo_xdp_xmit is currently running on.
> > 
> > See, this is why I asked 'what mechanism ensures'. Because if that
> > mechanism is 'this driver is only ever used on a system with fewer CPUs
> > than TXQs', then that's of course fine :)
> > 
> > But there are drivers that do basically the same thing as what you've
> > done here, *without* having such an assurance, and just looking at that
> > function it's not obvious that there's an out-of-band reason why it's
> > safe. And I literally just came from looking at such a case when I
> > replied to your initial patch...
> 
> Maybe you were confused seeing that this is a PCI device, thinking it's
> a plug-in card or something, therefore we don't get to choose the number
> of CPUs that the host has. In hindsight, I don't know why you didn't ask
> about this, it is pretty strange when you think about it.
> 
> It is actually more like a platform device with a PCI front-end - we
> found this loophole in the PCI standard where you can create a "root
> complex/integrated endpoint" which is basically an ECAM where the config
> space contains PFs corresponding to some platform devices in the SoC (in
> our case, all 4 Ethernet ports have their own PF, the switch has its own
> PF, same thing for the MDIO controller and the 1588 timer). Their
> register map is exposed as a number of BARs which use Enhanced
> Allocation, so the generic PCI ECAM driver doesn't need to create any
> translation windows for these addresses, it just uses what's in there,
> which, surprise, is the actual base address of the peripheral in the
> SoC's memory space.
> 
> We do that because we gain a lot of cool stuff by appearing as PCI
> devices to system software, like for example multiple interfaces on top
> of a 'shared MAC' are simply mapped over SR-IOV.
> 
> So it just 'smells' like PCI, but they're regular memory-mapped devices,
> there is no PCI transaction layer or physical layer. At the moment the
> LS1028A is the only SoC running Linux that integrates the ENETC block,
> so we fully control the environment.
> 
> > >> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
> > >>
> > >> Well the obvious one is just:
> > >>
> > >> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
> > >>
> > >> and then some kind of locking to deal with multiple CPUs accessing the
> > >> same TX ring...
> > >
> > > By multiple CPUs accessing the same TX ring, you mean locking between
> > > ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
> > > architecture is to have at least as many TX rings as CPUs?
> > >
> > > Because otherwise, I see that ndo_xdp_xmit is only called from
> > > xdp_do_flush, which is in softirq context, which to my very rudimentary
> > > knowledge run with bottom halves, thus preemption, disabled? So I don't
> > > think it's possible for ndo_xdp_xmit and ndo_xmit, or even two
> > > ndo_xdp_xmit instances, to access the same TX ring?
> > 
> > Yup, I think you're right about that. The "we always have more TXQs than
> > CPUs" condition was the bit I was missing (and of course you're *sure*
> > that this would never change sometime in the future, right? ;)).
> 
> I'm pretty sure, yeah, we build the SoCs and one of the requirements we
> have is that every ENETC PF has enough TX rings in order for every CPU
> to have its own one. That helps a lot with avoiding contention and
> simplifying the driver. Maybe I'll use this opportunity to talk again to
> the hardware design guys and make sure that the next SoCs with Linux
> follow the same pattern as LS1028A, although I see no reason why not.
> 
> > > Sorry, I'm sure these are trivial questions, but I would like to really
> > > understand what I need to change and why :D
> > 
> > Given the above I think the only potentially breaking thing is the
> > #RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
> > the tx_ring array by smp_processor_id() is safe would be nice? :)
> 
> Sure, which part exactly do you think would explain it best? Should I
> add a reference to enetc_port_si_configure?

After discussing a bit more with Claudiu, I think we do have a problem,
and it has to do with concurrent ndo_xdp_xmit on one CPU and ndo_start_xmit
on another CPU.

See, even if we have 8 TX rings, they are not really affined to any CPU.
Instead, when we call netif_set_real_num_tx_queues, we allow netdev_pick_tx
to hash among the TX queues of the same priority. There are three consequences:
- Traffic with the same hash will be sent to the same TX queue, thus
  avoiding reordering for packets belonging to the same stream.
- Traffic with different hashes is distributed to different TX queues.
- If we have two CPUs sending traffic with the same hash, they will
  serialize on the TX lock of the same netdev queue.

The last one is a problem because our XDP_REDIRECT tries to associate
one TX ring with one CPU, and, as explained above, that TX ring might
already be used by our ndo_start_xmit on another CPU, selected by
netdev_pick_tx.

The first idea was to implement ndo_select_queue for the network stack,
and select the TX ring based on smp_processor_id(). But we know that
this will break the first two effects of netdev_pick_tx, which are very
much desirable. For example, if we have a user space process sending a
TCP stream, and the scheduler migrates that process from one CPU to
another, then the ndo_select_queue output for that TCP stream will
change, and we will have TX reordering for packets belonging to the same
stream. Not at all ideal.

Another idea is to just crop some TX queues from the network stack, and
basically call netif_set_real_num_tx_queues(6), leaving one TX ring per
CPU dedicated to XDP. This will work just fine for normal qdiscs, except
that with mqprio/taprio we have a problem. Our TX rings have a configurable
strict priority for the hardware egress scheduler. When we don't have
mqprio/taprio, all TX rings have the same priority of 0 (therefore it is
safe to allow hashing to select one at random), but when we have mqprio
or taprio, we enjoy the benefit of configuring the priority of each TX
ring using the "map" option. The problem, of course, is that if we crop
2 TX rings out of what the network stack sees, then we are no longer
able to configure their queue-to-traffic-class mapping through
mqprio/taprio, so we cannot change their prioritization relative to the
network stack queues. In a way, this seems to be in line with the XDP
design because that bypasses anything that has to do with qdiscs, but we
don't really like that. We also have some other qdisc-based offloads
such as Credit Based Shaper, and we would very much like to be able to
set bandwidth profiles for the XDP rings, for AVB/TSN use cases.

Finally there is the option of taking the network stack's TX lock in our
ndo_xdp_xmit, but frankly I would leave that option as a last resort.
Maybe we could make this less expensive by bulk-enqueuing into a
temporary array of buffer descriptors, and only taking the xmit lock
when flushing that array (since that is the only portion that strictly
needs to be protected against concurrency). But the problem with this
approach is that if you have a temporary array, it becomes a lot more
difficult and error-prone to avoid taking more frames than you can enqueue.
For example, the TX ring might have only 20 free entries, and you filled
your BD array with 32 frames, and you told the caller of ndo_xdp_xmit
that you processed all those frames. Now when push comes to shove and
you actually need to enqueue them, you end up in the position that you
must drop them yourself. This seems to be very much against the design
principle of commit fdc13979f91e ("bpf, devmap: Move drop error path to
devmap for XDP_REDIRECT") whose desire is to let XDP handle the dropping
of excess TX frames.

What do you think?


* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-02 10:56               ` Vladimir Oltean
@ 2021-04-03 11:07                 ` Toke Høiland-Jørgensen
  2021-04-03 12:12                   ` Vladimir Oltean
  0 siblings, 1 reply; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-03 11:07 UTC (permalink / raw)
  To: Vladimir Oltean
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> On Thu, Apr 01, 2021 at 10:38:21PM +0300, Vladimir Oltean wrote:
>> On Thu, Apr 01, 2021 at 08:01:42PM +0200, Toke Høiland-Jørgensen wrote:
>> > Vladimir Oltean <olteanv@gmail.com> writes:
>> > 
>> > > On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
>> > >> Vladimir Oltean <olteanv@gmail.com> writes:
>> > >>
>> > >> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
>> > >> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
>> > >> >> > +		   struct xdp_frame **frames, u32 flags)
>> > >> >> > +{
>> > >> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
>> > >> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
>> > >> >> > +	struct enetc_bdr *tx_ring;
>> > >> >> > +	int xdp_tx_bd_cnt, i, k;
>> > >> >> > +	int xdp_tx_frm_cnt = 0;
>> > >> >> > +
>> > >> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
>> > >> >>
>> > >> >> What mechanism guarantees that this won't overflow the array? :)
>> > >> >
>> > >> > Which array, the array of TX rings?
>> > >>
>> > >> Yes.
>> > >>
>> > >
>> > > The problem isn't even accessing an out-of-bounds element in the TX ring array.
>> > >
>> > > As it turns out, I had a relatively superficial understanding of how
>> > > things are organized, but let me try to explain.
>> > >
>> > > The number of TX rings is a configurable resource (between PFs and VFs)
>> > > and we read the capability at probe time:
>> > >
>> > > enetc_get_si_caps:
>> > > 	val = enetc_rd(hw, ENETC_SICAPR0);
>> > > 	si->num_rx_rings = (val >> 16) & 0xff;
>> > > 	si->num_tx_rings = val & 0xff;
>> > >
>> > > enetc_init_si_rings_params:
>> > > 	priv->num_tx_rings = si->num_tx_rings;
>> > >
>> > > In any case, the TX array is declared as:
>> > >
>> > > struct enetc_ndev_priv {
>> > > 	struct enetc_bdr *tx_ring[16];
>> > > 	struct enetc_bdr *rx_ring[16];
>> > > };
>> > >
>> > > because that's the maximum hardware capability.
>> > >
>> > > The priv->tx_ring array is populated in:
>> > >
>> > > enetc_alloc_msix:
>> > > 	/* # of tx rings per int vector */
>> > > 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
>> > >
>> > > 	for (i = 0; i < priv->bdr_int_num; i++) {
>> > > 		for (j = 0; j < v_tx_rings; j++) {
>> > > 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
>> > > 				idx = 2 * j + i; /* 2 CPUs */
>> > > 			else
>> > > 				idx = j + i * v_tx_rings; /* default */
>> > >
>> > > 			priv->tx_ring[idx] = bdr;
>> > > 		}
>> > > 	}
>> > >
>> > > priv->bdr_int_num is set to "num_online_cpus()".
>> > > On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
>> > > equal to 2).
>> > >
>> > > Otherwise said, the convoluted logic above does the following:
>> > > - It affines an MSI interrupt vector per CPU
>> > > - It affines an RX ring per MSI vector, hence per CPU
>> > > - It balances the fixed number of TX rings (say 8) among the available
>> > >   MSI vectors, hence CPUs (say 2). It does this by iterating with i
>> > >   through the RX MSI interrupt vectors, and with j through the number of
>> > >   TX rings per MSI vector.
>> > >
>> > > This logic maps:
>> > > - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
>> > >   are used
>> > > - all TX rings to CPU 0, if 1 CPU is used
>> > >
>> > > This is done because we have this logic in enetc_poll:
>> > >
>> > > 	for (i = 0; i < v->count_tx_rings; i++)
>> > > 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
>> > > 			complete = false;
>> > >
>> > > for processing the TX completions of a given group of TX rings in the RX
>> > > MSI interrupt handler of a certain CPU.
>> > >
>> > > Otherwise said, priv->tx_ring[i] is always BD ring i, and that mapping
>> > > never changes. All 8 TX rings are enabled and available for use.
>> > >
>> > > What I knew about tc-taprio and tc-mqprio is that they only enqueue to
>> > > TX queues [0, num_tc-1] because of this, as it turns out:
>> > >
>> > > enetc_xmit:
>> > > 	tx_ring = priv->tx_ring[skb->queue_mapping];
>> > >
>> > > where skb->queue_mapping is given by:
>> > > 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
>> > > and by this, respectively, from the mqprio code path:
>> > > 	netif_set_real_num_tx_queues(ndev, num_tc);
>> > >
>> > > As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
>> > > - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
>> > > - TX ring 0, if 1 CPU is used
>> > >
>> > > The TX completions in the first case are handled by:
>> > > - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
>> > >   (because it is odd), if 2 CPUs are used, due to the mapping I talked
>> > >   about earlier
>> > > - CPU 0 if only 1 CPU is used
>> > 
>> > Right - thank you for the details! So what are the constraints on the
>> > configuration. Specifically, given two netdevs on the same device, is it
>> > possible that the system can ever end up in a situation where one device
>> > has two *RXQs* configured, and the other only one *TXQ*. Because then
>> > you could get a redirect from RXQ 1 on one device, which would also end
>> > up trying to transmit on TXQ 1 on the other device; and that would break
>> > if that other device only has TXQ 0 configured... Same thing if a single
>> > device has 2 RXQs but only one TXQ (it can redirect to itself).
>> 
>> I discover more and more of the driver as I talk to you, I like it :D
>> 
>> So I said that there is a maximum number of RX and TX rings splittable
>> between the PF and its VFs, but I wasn't exactly sure where that
>> configuration is done. I found it now.
>> 
>> enetc_port_si_configure: (SI == station interface)
>> 	- read Port capability register 0 (PCAPR0) to determine how many
>> 	  RX rings and TX rings the hardware has for this port (PFs + VFs)
>> 	  in total.
>> 	- assign num_rings = min(TX rings, RX rings)
>> 	- try to assign 8 TX rings and 8 RX rings to the PF
>> 	  - if this fails, just assign ${num_rings} TX rings and
>> 	    ${num_rings} RX rings to the PF
>> 	- split the remaining RX and TX rings to the number of
>> 	  configured VFs (example: if there are 16 RX rings and 16 TX
>> 	  rings for a port with 2 VFs, the driver assigns 8RX/8TX rings
>> 	  for the PF, and 4RX/4TX rings for each VF).
>> 	   - if we couldn't assign 8RX/8TX rings for the PF in the
>> 	     previous step, we don't assign any ring to the VF
>> 
>> So yeah, we have an equal number of RX and TX rings. The driver,
>> however, only uses 2 RX rings _actively_: one per CPU. The other 6, I
>> don't know, I guess I can use them for AF_XDP (I haven't looked very
>> closely at that yet), at the moment they're pretty much unused, even if
>> reserved and not given to VFs.
>> 
>> > >> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
>> > >> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
>> > >>
>> > >> Not just that, this ndo can be called on arbitrary CPUs after a
>> > >> redirect. The code just calls through from the XDP receive path so which
>> > >> CPU it ends up on depends on the RSS+IRQ config of the other device,
>> > >> which may not even be the same driver; i.e., you have no control over
>> > >> that... :)
>> > >>
>> > >
>> > > What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
>> > > core system... Why does the source ifindex matter at all? I'm using the
>> > > TX ring affined to the CPU that ndo_xdp_xmit is currently running on.
>> > 
>> > See, this is why I asked 'what mechanism ensures'. Because if that
>> > mechanism is 'this driver is only ever used on a system with fewer CPUs
>> > than TXQs', then that's of course fine :)
>> > 
>> > But there are drivers that do basically the same thing as what you've
>> > done here, *without* having such an assurance, and just looking at that
>> > function it's not obvious that there's an out-of-band reason why it's
>> > safe. And I literally just came from looking at such a case when I
>> > replied to your initial patch...
>> 
>> Maybe you were confused seeing that this is a PCI device, thinking it's
>> a plug-in card or something, therefore we don't get to choose the number
>> of CPUs that the host has. In hindsight, I don't know why you didn't ask
>> about this, it is pretty strange when you think about it.
>> 
>> It is actually more like a platform device with a PCI front-end - we
>> found this loophole in the PCI standard where you can create a "root
>> complex/integrated endpoint" which is basically an ECAM where the config
>> space contains PFs corresponding to some platform devices in the SoC (in
>> our case, all 4 Ethernet ports have their own PF, the switch has its own
>> PF, same thing for the MDIO controller and the 1588 timer). Their
>> register map is exposed as a number of BARs which use Enhanced
>> Allocation, so the generic PCI ECAM driver doesn't need to create any
>> translation windows for these addresses, it just uses what's in there,
>> which, surprise, is the actual base address of the peripheral in the
>> SoC's memory space.
>> 
>> We do that because we gain a lot of cool stuff by appearing as PCI
>> devices to system software, like for example multiple interfaces on top
>> of a 'shared MAC' are simply mapped over SR-IOV.
>> 
>> So it just 'smells' like PCI, but they're regular memory-mapped devices,
>> there is no PCI transaction layer or physical layer. At the moment the
>> LS1028A is the only SoC running Linux that integrates the ENETC block,
>> so we fully control the environment.
>> 
>> > >> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
>> > >>
>> > >> Well the obvious one is just:
>> > >>
>> > >> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
>> > >>
>> > >> and then some kind of locking to deal with multiple CPUs accessing the
>> > >> same TX ring...
>> > >
>> > > By multiple CPUs accessing the same TX ring, you mean locking between
>> > > ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
>> > > architecture is to have at least as many TX rings as CPUs?
>> > >
>> > > Because otherwise, I see that ndo_xdp_xmit is only called from
>> > > xdp_do_flush, which is in softirq context, which to my very rudimentary
>> > > knowledge run with bottom halves, thus preemption, disabled? So I don't
>> > > think it's possible for ndo_xdp_xmit and ndo_xmit, or even two
>> > > ndo_xdp_xmit instances, to access the same TX ring?
>> > 
>> > Yup, I think you're right about that. The "we always have more TXQs than
>> > CPUs" condition was the bit I was missing (and of course you're *sure*
>> > that this would never change sometime in the future, right? ;)).
>> 
>> I'm pretty sure, yeah, we build the SoCs and one of the requirements we
>> have is that every ENETC PF has enough TX rings in order for every CPU
>> to have its own one. That helps a lot with avoiding contention and
>> simplifying the driver. Maybe I'll use this opportunity to talk again to
>> the hardware design guys and make sure that the next SoCs with Linux
>> follow the same pattern as LS1028A, although I see no reason why not.
>> 
>> > > Sorry, I'm sure these are trivial questions, but I would like to really
>> > > understand what I need to change and why :D
>> > 
>> > Given the above I think the only potentially breaking thing is the
>> > #RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
>> > the tx_ring array by smp_processor_id() is safe would be nice? :)
>> 
>> Sure, which part exactly do you think would explain it best? Should I
>> add a reference to enetc_port_si_configure?
>
> After discussing a bit more with Claudiu, I think we do have a problem,
> and it has to do with concurrent ndo_xdp_xmit on one CPU and ndo_start_xmit
> on another CPU.
>
> See, even if we have 8 TX rings, they are not really affined to any CPU.
> Instead, when we call netif_set_real_num_tx_queues, we allow netdev_pick_tx
> to hash among the TX queues of the same priority. There are three consequences:
> - Traffic with the same hash will be sent to the same TX queue, thus
>   avoiding reordering for packets belonging to the same stream.
> - Traffic with different hashes is distributed to different TX queues.
> - If we have two CPUs sending traffic with the same hash, they will
>   serialize on the TX lock of the same netdev queue.
>
> The last one is a problem because our XDP_REDIRECT tries to associate
> one TX ring with one CPU, and, as explained above, that TX ring might
> already be used by our ndo_start_xmit on another CPU, selected by
> netdev_pick_tx.
>
> The first idea was to implement ndo_select_queue for the network stack,
> and select the TX ring based on smp_processor_id(). But we know that
> this will break the first two effects of netdev_pick_tx, which are very
> much desirable. For example, if we have a user space process sending a
> TCP stream, and the scheduler migrates that process from one CPU to
> another, then the ndo_select_queue output for that TCP stream will
> change, and we will have TX reordering for packets belonging to the same
> stream. Not at all ideal.
>
> Another idea is to just crop some TX queues from the network stack, and
> basically call netif_set_real_num_tx_queues(6), leaving one TX ring per
> CPU dedicated to XDP. This will work just fine for normal qdiscs, except
> that with mqprio/taprio we have a problem. Our TX rings have a configurable
> strict priority for the hardware egress scheduler. When we don't have
> mqprio/taprio, all TX rings have the same priority of 0 (therefore it is
> safe to allow hashing to select one at random), but when we have mqprio
> or taprio, we enjoy the benefit of configuring the priority of each TX
> ring using the "map" option. The problem, of course, is that if we crop
> 2 TX rings out of what the network stack sees, then we are no longer
> able to configure their queue-to-traffic-class mapping through
> mqprio/taprio, so we cannot change their prioritization relative to the
> network stack queues. In a way, this seems to be in line with the XDP
> design because that bypasses anything that has to do with qdiscs, but we
> don't really like that. We also have some other qdisc-based offloads
> such as Credit Based Shaper, and we would very much like to be able to
> set bandwidth profiles for the XDP rings, for AVB/TSN use cases.

You'd not be the first driver to solve this by just carving out a couple
of TX rings for XDP :)

And while I get the desire for being able to configure these things for
XDP as well, I'm not sure that the qdisc interface is the right one to
use for that. There was a general TXQ allocation idea that unfortunately
stalled out, but there is also ongoing work on XDP+TSN - I'm hoping
Jesper can chime in with the details...

> Finally there is the option of taking the network stack's TX lock in our
> ndo_xdp_xmit, but frankly I would leave that option as a last resort.
> Maybe we could make this less expensive by bulk-enqueuing into a
> temporary array of buffer descriptors, and only taking the xmit lock
> when flushing that array (since that is the only portion that strictly
> needs to be protected against concurrency). But the problem with this
> approach is that if you have a temporary array, it becomes a lot more
> difficult and error-prone to avoid taking more frames than you can enqueue.
> For example, the TX ring might have only 20 free entries, and you filled
> your BD array with 32 frames, and you told the caller of ndo_xdp_xmit
> that you processed all those frames. Now when push comes to shove and
> you actually need to enqueue them, you end up in the position that you
> must drop them yourself. This seems to be very much against the design
> principle of commit fdc13979f91e ("bpf, devmap: Move drop error path to
> devmap for XDP_REDIRECT") whose desire is to let XDP handle the dropping
> of excess TX frames.

Note that there's already bulking in XDP_REDIRECT: after an XDP program
returns XDP_REDIRECT, the packets will actually be put on a bulk queue
(see bq_enqueue() in devmap.c), and that will be flushed to the TX
driver at the end of the (RX) NAPI cycle. So taking a lock in
ndo_xdp_xmit() may not be quite as much overhead as you think it is -
so maybe it would be worth benchmarking before ruling this out entirely? :)

-Toke



* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-03 11:07                 ` Toke Høiland-Jørgensen
@ 2021-04-03 12:12                   ` Vladimir Oltean
  2021-04-06 11:23                     ` Toke Høiland-Jørgensen
  0 siblings, 1 reply; 23+ messages in thread
From: Vladimir Oltean @ 2021-04-03 12:12 UTC (permalink / raw)
  To: Toke Høiland-Jørgensen
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

On Sat, Apr 03, 2021 at 01:07:29PM +0200, Toke Høiland-Jørgensen wrote:
> Vladimir Oltean <olteanv@gmail.com> writes:
> 
> > On Thu, Apr 01, 2021 at 10:38:21PM +0300, Vladimir Oltean wrote:
> >> On Thu, Apr 01, 2021 at 08:01:42PM +0200, Toke Høiland-Jørgensen wrote:
> >> > Vladimir Oltean <olteanv@gmail.com> writes:
> >> > 
> >> > > On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
> >> > >> Vladimir Oltean <olteanv@gmail.com> writes:
> >> > >>
> >> > >> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
> >> > >> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
> >> > >> >> > +		   struct xdp_frame **frames, u32 flags)
> >> > >> >> > +{
> >> > >> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
> >> > >> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
> >> > >> >> > +	struct enetc_bdr *tx_ring;
> >> > >> >> > +	int xdp_tx_bd_cnt, i, k;
> >> > >> >> > +	int xdp_tx_frm_cnt = 0;
> >> > >> >> > +
> >> > >> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
> >> > >> >>
> >> > >> >> What mechanism guarantees that this won't overflow the array? :)
> >> > >> >
> >> > >> > Which array, the array of TX rings?
> >> > >>
> >> > >> Yes.
> >> > >>
> >> > >
> >> > > The problem isn't even accessing an out-of-bounds element in the TX ring array.
> >> > >
> >> > > As it turns out, I had a relatively superficial understanding of how
> >> > > things are organized, but let me try to explain.
> >> > >
> >> > > The number of TX rings is a configurable resource (between PFs and VFs)
> >> > > and we read the capability at probe time:
> >> > >
> >> > > enetc_get_si_caps:
> >> > > 	val = enetc_rd(hw, ENETC_SICAPR0);
> >> > > 	si->num_rx_rings = (val >> 16) & 0xff;
> >> > > 	si->num_tx_rings = val & 0xff;
> >> > >
> >> > > enetc_init_si_rings_params:
> >> > > 	priv->num_tx_rings = si->num_tx_rings;
> >> > >
> >> > > In any case, the TX array is declared as:
> >> > >
> >> > > struct enetc_ndev_priv {
> >> > > 	struct enetc_bdr *tx_ring[16];
> >> > > 	struct enetc_bdr *rx_ring[16];
> >> > > };
> >> > >
> >> > > because that's the maximum hardware capability.
> >> > >
> >> > > The priv->tx_ring array is populated in:
> >> > >
> >> > > enetc_alloc_msix:
> >> > > 	/* # of tx rings per int vector */
> >> > > 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
> >> > >
> >> > > 	for (i = 0; i < priv->bdr_int_num; i++) {
> >> > > 		for (j = 0; j < v_tx_rings; j++) {
> >> > > 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
> >> > > 				idx = 2 * j + i; /* 2 CPUs */
> >> > > 			else
> >> > > 				idx = j + i * v_tx_rings; /* default */
> >> > >
> >> > > 			priv->tx_ring[idx] = bdr;
> >> > > 		}
> >> > > 	}
> >> > >
> >> > > priv->bdr_int_num is set to "num_online_cpus()".
> >> > > On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
> >> > > equal to 2).
> >> > >
> >> > > In other words, the convoluted logic above does the following:
> >> > > - It affines an MSI interrupt vector per CPU
> >> > > - It affines an RX ring per MSI vector, hence per CPU
> >> > > - It balances the fixed number of TX rings (say 8) among the available
> >> > >   MSI vectors, hence CPUs (say 2). It does this by iterating with i
> >> > >   through the RX MSI interrupt vectors, and with j through the number of
> >> > >   TX rings per MSI vector.
> >> > >
> >> > > This logic maps:
> >> > > - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
> >> > >   are used
> >> > > - all TX rings to CPU 0, if 1 CPU is used
> >> > >
> >> > > This is done because we have this logic in enetc_poll:
> >> > >
> >> > > 	for (i = 0; i < v->count_tx_rings; i++)
> >> > > 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
> >> > > 			complete = false;
> >> > >
> >> > > for processing the TX completions of a given group of TX rings in the RX
> >> > > MSI interrupt handler of a certain CPU.
> >> > >
> >> > > In other words, priv->tx_ring[i] is always BD ring i, and that mapping
> >> > > never changes. All 8 TX rings are enabled and available for use.
> >> > >
> >> > > What I knew about tc-taprio and tc-mqprio is that they only enqueue to
> >> > > TX queues [0, num_tc-1] because of this, as it turns out:
> >> > >
> >> > > enetc_xmit:
> >> > > 	tx_ring = priv->tx_ring[skb->queue_mapping];
> >> > >
> >> > > where skb->queue_mapping is given by:
> >> > > 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
> >> > > and by this, respectively, from the mqprio code path:
> >> > > 	netif_set_real_num_tx_queues(ndev, num_tc);
> >> > >
> >> > > As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
> >> > > - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
> >> > > - TX ring 0, if 1 CPU is used
> >> > >
> >> > > The TX completions in the first case are handled by:
> >> > > - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
> >> > >   (because it is odd), if 2 CPUs are used, due to the mapping I talked
> >> > >   about earlier
> >> > > - CPU 0 if only 1 CPU is used
> >> > 
> >> > Right - thank you for the details! So what are the constraints on the
> >> > configuration? Specifically, given two netdevs on the same device, is it
> >> > possible that the system can ever end up in a situation where one device
> >> > has two *RXQs* configured, and the other only one *TXQ*. Because then
> >> > you could get a redirect from RXQ 1 on one device, which would also end
> >> > up trying to transmit on TXQ 1 on the other device; and that would break
> >> > if that other device only has TXQ 0 configured... Same thing if a single
> >> > device has 2 RXQs but only one TXQ (it can redirect to itself).
> >> 
> >> I discover more and more of the driver as I talk to you, I like it :D
> >> 
> >> So I said that there is a maximum number of RX and TX rings splittable
> >> between the PF and its VFs, but I wasn't exactly sure where that
> >> configuration is done. I found it now.
> >> 
> >> enetc_port_si_configure: (SI == station interface)
> >> 	- read Port capability register 0 (PCAPR0) to determine how many
> >> 	  RX rings and TX rings the hardware has for this port (PFs + VFs)
> >> 	  in total.
> >> 	- assign num_rings = min(TX rings, RX rings)
> >> 	- try to assign 8 TX rings and 8 RX rings to the PF
> >> 	  - if this fails, just assign ${num_rings} TX rings and
> >> 	    ${num_rings} RX rings to the PF
> >> 	- split the remaining RX and TX rings to the number of
> >> 	  configured VFs (example: if there are 16 RX rings and 16 TX
> >> 	  rings for a port with 2 VFs, the driver assigns 8RX/8TX rings
> >> 	  for the PF, and 4RX/4TX rings for each VF).
> >> 	   - if we couldn't assign 8RX/8TX rings for the PF in the
> >> 	     previous step, we don't assign any ring to the VF
> >> 
> >> So yeah, we have an equal number of RX and TX rings. The driver,
> >> however, only uses 2 RX rings _actively_: one per CPU. The other 6, I
> >> don't know, I guess I can use them for AF_XDP (I haven't looked very
> >> closely at that yet), at the moment they're pretty much unused, even if
> >> reserved and not given to VFs.
> >> 
> >> > >> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
> >> > >> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
> >> > >>
> >> > >> Not just that, this ndo can be called on arbitrary CPUs after a
> >> > >> redirect. The code just calls through from the XDP receive path so which
> >> > >> CPU it ends up on depends on the RSS+IRQ config of the other device,
> >> > >> which may not even be the same driver; i.e., you have no control over
> >> > >> that... :)
> >> > >>
> >> > >
> >> > > What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
> >> > > core system... Why does the source ifindex matter at all? I'm using the
> >> > > TX ring affined to the CPU that ndo_xdp_xmit is currently running on.
> >> > 
> >> > See, this is why I asked 'what mechanism ensures'. Because if that
> >> > mechanism is 'this driver is only ever used on a system with fewer CPUs
> >> > than TXQs', then that's of course fine :)
> >> > 
> >> > But there are drivers that do basically the same thing as what you've
> >> > done here, *without* having such an assurance, and just looking at that
> >> > function it's not obvious that there's an out-of-band reason why it's
> >> > safe. And I literally just came from looking at such a case when I
> >> > replied to your initial patch...
> >> 
> >> Maybe you were confused seeing that this is a PCI device, thinking it's
> >> a plug-in card or something, therefore we don't get to choose the number
> >> of CPUs that the host has. In hindsight, I don't know why you didn't ask
> >> about this, it is pretty strange when you think about it.
> >> 
> >> It is actually more like a platform device with a PCI front-end - we
> >> found this loophole in the PCI standard where you can create a "root
> >> complex/integrated endpoint" which is basically an ECAM where the config
> >> space contains PFs corresponding to some platform devices in the SoC (in
> >> our case, all 4 Ethernet ports have their own PF, the switch has its own
> >> PF, same thing for the MDIO controller and the 1588 timer). Their
> >> register map is exposed as a number of BARs which use Enhanced
> >> Allocation, so the generic PCI ECAM driver doesn't need to create any
> >> translation windows for these addresses, it just uses what's in there,
> >> which, surprise, is the actual base address of the peripheral in the
> >> SoC's memory space.
> >> 
> >> We do that because we gain a lot of cool stuff by appearing as PCI
> >> devices to system software, like for example multiple interfaces on top
> >> of a 'shared MAC' are simply mapped over SR-IOV.
> >> 
> >> So it just 'smells' like PCI, but they're regular memory-mapped devices,
> >> there is no PCI transaction layer or physical layer. At the moment the
> >> LS1028A is the only SoC running Linux that integrates the ENETC block,
> >> so we fully control the environment.
> >> 
> >> > >> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
> >> > >>
> >> > >> Well the obvious one is just:
> >> > >>
> >> > >> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
> >> > >>
> >> > >> and then some kind of locking to deal with multiple CPUs accessing the
> >> > >> same TX ring...
> >> > >
> >> > > By multiple CPUs accessing the same TX ring, you mean locking between
> >> > > ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
> >> > > architecture is to have at least as many TX rings as CPUs?
> >> > >
> >> > > Because otherwise, I see that ndo_xdp_xmit is only called from
> >> > > xdp_do_flush, which is in softirq context, which to my very rudimentary
> >> > > knowledge runs with bottom halves, thus preemption, disabled? So I don't
> >> > > think it's possible for ndo_xdp_xmit and ndo_xmit, or even two
> >> > > ndo_xdp_xmit instances, to access the same TX ring?
> >> > 
> >> > Yup, I think you're right about that. The "we always have more TXQs than
> >> > CPUs" condition was the bit I was missing (and of course you're *sure*
> >> > that this would never change sometime in the future, right? ;)).
> >> 
> >> I'm pretty sure, yeah, we build the SoCs and one of the requirements we
> >> have is that every ENETC PF has enough TX rings in order for every CPU
> >> to have one of its own. That helps a lot with avoiding contention and
> >> simplifying the driver. Maybe I'll use this opportunity to talk again to
> >> the hardware design guys and make sure that the next SoCs with Linux
> >> follow the same pattern as LS1028A, although I see no reason why not.
> >> 
> >> > > Sorry, I'm sure these are trivial questions, but I would like to really
> >> > > understand what I need to change and why :D
> >> > 
> >> > Given the above I think the only potentially breaking thing is the
> >> > #RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
> >> > the tx_ring array by smp_processor_id() is safe would be nice? :)
> >> 
> >> Sure, which part exactly do you think would explain it best? Should I
> >> add a reference to enetc_port_si_configure?
> >
> > After discussing a bit more with Claudiu, I think we do have a problem,
> > and it has to do with concurrent ndo_xdp_xmit on one CPU and ndo_start_xmit
> > on another CPU.
> >
> > See, even if we have 8 TX rings, they are not really affined to any CPU.
> > Instead, when we call netif_set_real_num_tx_queues, we allow netdev_pick_tx
> > to hash among the TX queues of the same priority. There are three consequences:
> > - Traffic with the same hash will be sent to the same TX queue, thus
> >   avoiding reordering for packets belonging to the same stream.
> > - Traffic with different hashes is distributed to different TX queues.
> > - If we have two CPUs sending traffic with the same hash, they will
> >   serialize on the TX lock of the same netdev queue.
> >
> > The last one is a problem because our XDP_REDIRECT tries to associate
> > one TX ring with one CPU, and, as explained above, that TX ring might
> > already be used by our ndo_start_xmit on another CPU, selected by
> > netdev_pick_tx.
> >
> > The first idea was to implement ndo_select_queue for the network stack,
> > and select the TX ring based on smp_processor_id(). But we know that
> > this will break the first two effects of netdev_pick_tx, which are very
> > much desirable. For example, if we have a user space process sending a
> > TCP stream, and the scheduler migrates that process from one CPU to
> > another, then the ndo_select_queue output for that TCP stream will
> > change, and we will have TX reordering for packets belonging to the same
> > stream. Not at all ideal.
> >
> > Another idea is to just crop some TX queues from the network stack, and
> > basically call netif_set_real_num_tx_queues(6), leaving one TX ring per
> > CPU dedicated to XDP. This will work just fine for normal qdiscs, except
> > that with mqprio/taprio we have a problem. Our TX rings have a configurable
> > strict priority for the hardware egress scheduler. When we don't have
> > mqprio/taprio, all TX rings have the same priority of 0 (therefore it is
> > safe to allow hashing to select one at random), but when we have mqprio
> > or taprio, we enjoy the benefit of configuring the priority of each TX
> > ring using the "map" option. The problem, of course, is that if we crop
> > 2 TX rings out of what the network stack sees, then we are no longer
> > able to configure their queue-to-traffic-class mapping through
> > mqprio/taprio, so we cannot change their prioritization relative to the
> > network stack queues. In a way, this seems to be in line with the XDP
> > design because that bypasses anything that has to do with qdiscs, but we
> > don't really like that. We also have some other qdisc-based offloads
> > such as Credit Based Shaper, and we would very much like to be able to
> > set bandwidth profiles for the XDP rings, for AVB/TSN use cases.
> 
> You'd not be the first driver to solve this by just carving out a couple
> of TX rings for XDP :)
> 
> And while I get the desire for being able to configure these things for
> XDP as well, I'm not sure that the qdisc interface is the right one to
> use for that. There was a general TXQ allocation idea that unfortunately
> stalled out, but there is also ongoing work on XDP+TSN - I'm hoping
> Jesper can chime in with the details...

See, the reason why I don't like this answer is because when we tried to
upstream our genetlink-based TSN configuration:
https://patchwork.ozlabs.org/project/netdev/patch/1545968945-7290-1-git-send-email-Po.Liu@nxp.com/
we were told that it's a QoS feature and QoS belongs to the qdisc layer.

I get the impression that XDP is largely incompatible with QoS by design,
which sounds to me like a bit of a foot gun. For example, we have some
customers interested in building an AVB application stack on top of AF_XDP,
and for the endpoints (talker/listener) they really need to be able to
configure bandwidth profiles for Stream Reservation classes A and B on
the AF_XDP rings.

To us, tc is mostly just a configuration interface for hardware features;
the deal was that this is fine as long as they have a software counterpart
with identical semantics. I think I understand the basic problem in that
a software shaper would be bypassed by XDP, and therefore, the bandwidth
profile would not be observed properly by the AVB talker if we were to
rely on that. So that sounds indeed like we shouldn't even attempt to
manage any TX queues on which XDP traffic is possible with tc, unless
we're willing to pass XDP_REDIRECT through the qdisc layer (which I'm
not suggesting is a good idea). But with the hardware offload that
wouldn't be the case, so it's almost as if what would work for us would
be to have some 'dummy' TX queues for XDP manageable by tc qdiscs where
we could attach our offloadable filters and shapers and policers. I just
don't want them to be completely invisible as far as tc is concerned.
Managing which TX queues go to XDP, and not letting the driver choose
that, would be even nicer.

> > Finally there is the option of taking the network stack's TX lock in our
> > ndo_xdp_xmit, but frankly I would leave that option as a last resort.
> > Maybe we could make this less expensive by bulk-enqueuing into a
> > temporary array of buffer descriptors, and only taking the xmit lock
> > when flushing that array (since that is the only portion that strictly
> > needs to be protected against concurrency). But the problem with this
> > approach is that if you have a temporary array, it becomes a lot more
> > difficult and error-prone to not take more frames than you can enqueue.
> > For example, the TX ring might have only 20 free entries, and you filled
> > your BD array with 32 frames, and you told the caller of ndo_xdp_xmit
> > that you processed all those frames. Now when push comes to shove and
> > you actually need to enqueue them, you end up in the position that you
> > must drop them yourself. This seems to be very much against the design
> > principle of commit fdc13979f91e ("bpf, devmap: Move drop error path to
> > devmap for XDP_REDIRECT") whose desire is to let XDP handle the dropping
> > of excess TX frames.
> 
> Note that there's already bulking in XDP_REDIRECT: after an XDP program
> returns XDP_REDIRECT, the packets will actually be put on a bulk queue
> (see bq_enqueue() in devmap.c), and that will be flushed to the TX
> driver at the end of the (RX) NAPI cycle. So taking a lock in
> ndo_xdp_xmit() may not be quite as much overhead as you think it is -
> so maybe it would be worth benchmarking before ruling this out entirely? :)

If shared TX queues do turn out to be the best alternative - which I'm
not convinced it is - then I'll benchmark it, sure.

^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT
  2021-04-03 12:12                   ` Vladimir Oltean
@ 2021-04-06 11:23                     ` Toke Høiland-Jørgensen
  0 siblings, 0 replies; 23+ messages in thread
From: Toke Høiland-Jørgensen @ 2021-04-06 11:23 UTC (permalink / raw)
  To: Vladimir Oltean
  Cc: Jakub Kicinski, Alexei Starovoitov, Daniel Borkmann,
	Jesper Dangaard Brouer, John Fastabend, Andrii Nakryiko,
	Martin KaFai Lau, Song Liu, Yonghong Song, KP Singh,
	David S. Miller, netdev, bpf, Alexander Duyck, Ioana Ciornei,
	Alex Marginean, Claudiu Manoil, Ilias Apalodimas,
	Vladimir Oltean

Vladimir Oltean <olteanv@gmail.com> writes:

> On Sat, Apr 03, 2021 at 01:07:29PM +0200, Toke Høiland-Jørgensen wrote:
>> Vladimir Oltean <olteanv@gmail.com> writes:
>> 
>> > On Thu, Apr 01, 2021 at 10:38:21PM +0300, Vladimir Oltean wrote:
>> >> On Thu, Apr 01, 2021 at 08:01:42PM +0200, Toke Høiland-Jørgensen wrote:
>> >> > Vladimir Oltean <olteanv@gmail.com> writes:
>> >> > 
>> >> > > On Thu, Apr 01, 2021 at 01:39:05PM +0200, Toke Høiland-Jørgensen wrote:
>> >> > >> Vladimir Oltean <olteanv@gmail.com> writes:
>> >> > >>
>> >> > >> > On Thu, Apr 01, 2021 at 01:26:02PM +0200, Toke Høiland-Jørgensen wrote:
>> >> > >> >> > +int enetc_xdp_xmit(struct net_device *ndev, int num_frames,
>> >> > >> >> > +		   struct xdp_frame **frames, u32 flags)
>> >> > >> >> > +{
>> >> > >> >> > +	struct enetc_tx_swbd xdp_redirect_arr[ENETC_MAX_SKB_FRAGS] = {0};
>> >> > >> >> > +	struct enetc_ndev_priv *priv = netdev_priv(ndev);
>> >> > >> >> > +	struct enetc_bdr *tx_ring;
>> >> > >> >> > +	int xdp_tx_bd_cnt, i, k;
>> >> > >> >> > +	int xdp_tx_frm_cnt = 0;
>> >> > >> >> > +
>> >> > >> >> > +	tx_ring = priv->tx_ring[smp_processor_id()];
>> >> > >> >>
>> >> > >> >> What mechanism guarantees that this won't overflow the array? :)
>> >> > >> >
>> >> > >> > Which array, the array of TX rings?
>> >> > >>
>> >> > >> Yes.
>> >> > >>
>> >> > >
>> >> > > The problem isn't even accessing an out-of-bounds element in the TX ring array.
>> >> > >
>> >> > > As it turns out, I had a relatively superficial understanding of how
>> >> > > things are organized, but let me try to explain.
>> >> > >
>> >> > > The number of TX rings is a configurable resource (between PFs and VFs)
>> >> > > and we read the capability at probe time:
>> >> > >
>> >> > > enetc_get_si_caps:
>> >> > > 	val = enetc_rd(hw, ENETC_SICAPR0);
>> >> > > 	si->num_rx_rings = (val >> 16) & 0xff;
>> >> > > 	si->num_tx_rings = val & 0xff;
>> >> > >
>> >> > > enetc_init_si_rings_params:
>> >> > > 	priv->num_tx_rings = si->num_tx_rings;
>> >> > >
>> >> > > In any case, the TX array is declared as:
>> >> > >
>> >> > > struct enetc_ndev_priv {
>> >> > > 	struct enetc_bdr *tx_ring[16];
>> >> > > 	struct enetc_bdr *rx_ring[16];
>> >> > > };
>> >> > >
>> >> > > because that's the maximum hardware capability.
>> >> > >
>> >> > > The priv->tx_ring array is populated in:
>> >> > >
>> >> > > enetc_alloc_msix:
>> >> > > 	/* # of tx rings per int vector */
>> >> > > 	v_tx_rings = priv->num_tx_rings / priv->bdr_int_num;
>> >> > >
>> >> > > 	for (i = 0; i < priv->bdr_int_num; i++) {
>> >> > > 		for (j = 0; j < v_tx_rings; j++) {
>> >> > > 			if (priv->bdr_int_num == ENETC_MAX_BDR_INT)
>> >> > > 				idx = 2 * j + i; /* 2 CPUs */
>> >> > > 			else
>> >> > > 				idx = j + i * v_tx_rings; /* default */
>> >> > >
>> >> > > 			priv->tx_ring[idx] = bdr;
>> >> > > 		}
>> >> > > 	}
>> >> > >
>> >> > > priv->bdr_int_num is set to "num_online_cpus()".
>> >> > > On LS1028A, it can be either 1 or 2 (and the ENETC_MAX_BDR_INT macro is
>> >> > > equal to 2).
>> >> > >
>> >> > > In other words, the convoluted logic above does the following:
>> >> > > - It affines an MSI interrupt vector per CPU
>> >> > > - It affines an RX ring per MSI vector, hence per CPU
>> >> > > - It balances the fixed number of TX rings (say 8) among the available
>> >> > >   MSI vectors, hence CPUs (say 2). It does this by iterating with i
>> >> > >   through the RX MSI interrupt vectors, and with j through the number of
>> >> > >   TX rings per MSI vector.
>> >> > >
>> >> > > This logic maps:
>> >> > > - the even TX rings to CPU 0 and the odd TX rings to CPU 1, if 2 CPUs
>> >> > >   are used
>> >> > > - all TX rings to CPU 0, if 1 CPU is used
>> >> > >
>> >> > > This is done because we have this logic in enetc_poll:
>> >> > >
>> >> > > 	for (i = 0; i < v->count_tx_rings; i++)
>> >> > > 		if (!enetc_clean_tx_ring(&v->tx_ring[i], budget))
>> >> > > 			complete = false;
>> >> > >
>> >> > > for processing the TX completions of a given group of TX rings in the RX
>> >> > > MSI interrupt handler of a certain CPU.
>> >> > >
>> >> > > In other words, priv->tx_ring[i] is always BD ring i, and that mapping
>> >> > > never changes. All 8 TX rings are enabled and available for use.
>> >> > >
>> >> > > What I knew about tc-taprio and tc-mqprio is that they only enqueue to
>> >> > > TX queues [0, num_tc-1] because of this, as it turns out:
>> >> > >
>> >> > > enetc_xmit:
>> >> > > 	tx_ring = priv->tx_ring[skb->queue_mapping];
>> >> > >
>> >> > > where skb->queue_mapping is given by:
>> >> > > 	err = netif_set_real_num_tx_queues(ndev, priv->num_tx_rings);
>> >> > > and by this, respectively, from the mqprio code path:
>> >> > > 	netif_set_real_num_tx_queues(ndev, num_tc);
>> >> > >
>> >> > > As for why XDP works, and priv->tx_ring[smp_processor_id()] is:
>> >> > > - TX ring 0 for CPU 0 and TX ring 1 for CPU 1, if 2 CPUs are used
>> >> > > - TX ring 0, if 1 CPU is used
>> >> > >
>> >> > > The TX completions in the first case are handled by:
>> >> > > - CPU 0 for TX ring 0 (because it is even) and CPU 1 for TX ring 1
>> >> > >   (because it is odd), if 2 CPUs are used, due to the mapping I talked
>> >> > >   about earlier
>> >> > > - CPU 0 if only 1 CPU is used
>> >> > 
>> >> > Right - thank you for the details! So what are the constraints on the
>> >> > configuration? Specifically, given two netdevs on the same device, is it
>> >> > possible that the system can ever end up in a situation where one device
>> >> > has two *RXQs* configured, and the other only one *TXQ*. Because then
>> >> > you could get a redirect from RXQ 1 on one device, which would also end
>> >> > up trying to transmit on TXQ 1 on the other device; and that would break
>> >> > if that other device only has TXQ 0 configured... Same thing if a single
>> >> > device has 2 RXQs but only one TXQ (it can redirect to itself).
>> >> 
>> >> I discover more and more of the driver as I talk to you, I like it :D
>> >> 
>> >> So I said that there is a maximum number of RX and TX rings splittable
>> >> between the PF and its VFs, but I wasn't exactly sure where that
>> >> configuration is done. I found it now.
>> >> 
>> >> enetc_port_si_configure: (SI == station interface)
>> >> 	- read Port capability register 0 (PCAPR0) to determine how many
>> >> 	  RX rings and TX rings the hardware has for this port (PFs + VFs)
>> >> 	  in total.
>> >> 	- assign num_rings = min(TX rings, RX rings)
>> >> 	- try to assign 8 TX rings and 8 RX rings to the PF
>> >> 	  - if this fails, just assign ${num_rings} TX rings and
>> >> 	    ${num_rings} RX rings to the PF
>> >> 	- split the remaining RX and TX rings to the number of
>> >> 	  configured VFs (example: if there are 16 RX rings and 16 TX
>> >> 	  rings for a port with 2 VFs, the driver assigns 8RX/8TX rings
>> >> 	  for the PF, and 4RX/4TX rings for each VF).
>> >> 	   - if we couldn't assign 8RX/8TX rings for the PF in the
>> >> 	     previous step, we don't assign any ring to the VF
>> >> 
>> >> So yeah, we have an equal number of RX and TX rings. The driver,
>> >> however, only uses 2 RX rings _actively_: one per CPU. The other 6, I
>> >> don't know, I guess I can use them for AF_XDP (I haven't looked very
>> >> closely at that yet), at the moment they're pretty much unused, even if
>> >> reserved and not given to VFs.
>> >> 
>> >> > >> > You mean that it's possible to receive a TC_SETUP_QDISC_MQPRIO or
>> >> > >> > TC_SETUP_QDISC_TAPRIO with num_tc == 1, and we have 2 CPUs?
>> >> > >>
>> >> > >> Not just that, this ndo can be called on arbitrary CPUs after a
>> >> > >> redirect. The code just calls through from the XDP receive path so which
>> >> > >> CPU it ends up on depends on the RSS+IRQ config of the other device,
>> >> > >> which may not even be the same driver; i.e., you have no control over
>> >> > >> that... :)
>> >> > >>
>> >> > >
>> >> > > What do you mean by "arbitrary" CPU? You can't plug CPUs in, it's a dual
>> >> > > core system... Why does the source ifindex matter at all? I'm using the
>> >> > > TX ring affined to the CPU that ndo_xdp_xmit is currently running on.
>> >> > 
>> >> > See, this is why I asked 'what mechanism ensures'. Because if that
>> >> > mechanism is 'this driver is only ever used on a system with fewer CPUs
>> >> > than TXQs', then that's of course fine :)
>> >> > 
>> >> > But there are drivers that do basically the same thing as what you've
>> >> > done here, *without* having such an assurance, and just looking at that
>> >> > function it's not obvious that there's an out-of-band reason why it's
>> >> > safe. And I literally just came from looking at such a case when I
>> >> > replied to your initial patch...
>> >> 
>> >> Maybe you were confused seeing that this is a PCI device, thinking it's
>> >> a plug-in card or something, therefore we don't get to choose the number
>> >> of CPUs that the host has. In hindsight, I don't know why you didn't ask
>> >> about this, it is pretty strange when you think about it.
>> >> 
>> >> It is actually more like a platform device with a PCI front-end - we
>> >> found this loophole in the PCI standard where you can create a "root
>> >> complex/integrated endpoint" which is basically an ECAM where the config
>> >> space contains PFs corresponding to some platform devices in the SoC (in
>> >> our case, all 4 Ethernet ports have their own PF, the switch has its own
>> >> PF, same thing for the MDIO controller and the 1588 timer). Their
>> >> register map is exposed as a number of BARs which use Enhanced
>> >> Allocation, so the generic PCI ECAM driver doesn't need to create any
>> >> translation windows for these addresses, it just uses what's in there,
>> >> which, surprise, is the actual base address of the peripheral in the
>> >> SoC's memory space.
>> >> 
>> >> We do that because we gain a lot of cool stuff by appearing as PCI
>> >> devices to system software, like for example multiple interfaces on top
>> >> of a 'shared MAC' are simply mapped over SR-IOV.
>> >> 
>> >> So it just 'smells' like PCI, but they're regular memory-mapped devices,
>> >> there is no PCI transaction layer or physical layer. At the moment the
>> >> LS1028A is the only SoC running Linux that integrates the ENETC block,
>> >> so we fully control the environment.
>> >> 
>> >> > >> > Well, yeah, I don't know what's the proper way to deal with that. Ideas?
>> >> > >>
>> >> > >> Well the obvious one is just:
>> >> > >>
>> >> > >> tx_ring = priv->tx_ring[smp_processor_id() % num_ring_ids];
>> >> > >>
>> >> > >> and then some kind of locking to deal with multiple CPUs accessing the
>> >> > >> same TX ring...
>> >> > >
>> >> > > By multiple CPUs accessing the same TX ring, you mean locking between
>> >> > > ndo_xdp_xmit and ndo_start_xmit? Can that even happen if the hardware
>> >> > > architecture is to have at least as many TX rings as CPUs?
>> >> > >
>> >> > > Because otherwise, I see that ndo_xdp_xmit is only called from
>> >> > > xdp_do_flush, which is in softirq context, which to my very rudimentary
>> >> > > knowledge runs with bottom halves, thus preemption, disabled? So I don't
>> >> > > think it's possible for ndo_xdp_xmit and ndo_xmit, or even two
>> >> > > ndo_xdp_xmit instances, to access the same TX ring?
>> >> > 
>> >> > Yup, I think you're right about that. The "we always have more TXQs than
>> >> > CPUs" condition was the bit I was missing (and of course you're *sure*
>> >> > that this would never change sometime in the future, right? ;)).
>> >> 
>> >> I'm pretty sure, yeah, we build the SoCs and one of the requirements we
>> >> have is that every ENETC PF has enough TX rings in order for every CPU
>> >> to have one of its own. That helps a lot with avoiding contention and
>> >> simplifying the driver. Maybe I'll use this opportunity to talk again to
>> >> the hardware design guys and make sure that the next SoCs with Linux
>> >> follow the same pattern as LS1028A, although I see no reason why not.
>> >> 
>> >> > > Sorry, I'm sure these are trivial questions, but I would like to really
>> >> > > understand what I need to change and why :D
>> >> > 
>> >> > Given the above I think the only potentially breaking thing is the
>> >> > #RXQ > #TXQ case I outlined. And maybe a comment documenting why indexing
>> >> > the tx_ring array by smp_processor_id() is safe would be nice? :)
>> >> 
>> >> Sure, which part exactly do you think would explain it best? Should I
>> >> add a reference to enetc_port_si_configure?
>> >
>> > After discussing a bit more with Claudiu, I think we do have a problem,
>> > and it has to do with concurrent ndo_xdp_xmit on one CPU and ndo_start_xmit
>> > on another CPU.
>> >
>> > See, even if we have 8 TX rings, they are not really affined to any CPU.
>> > Instead, when we call netif_set_real_num_tx_queues, we allow netdev_pick_tx
>> > to hash among the TX queues of the same priority. There are three consequences:
>> > - Traffic with the same hash will be sent to the same TX queue, thus
>> >   avoiding reordering for packets belonging to the same stream.
>> > - Traffic with different hashes is distributed to different TX queues.
>> > - If we have two CPUs sending traffic with the same hash, they will
>> >   serialize on the TX lock of the same netdev queue.
>> >
>> > The last one is a problem because our XDP_REDIRECT tries to associate
>> > one TX ring with one CPU, and, as explained above, that TX ring might
>> > already be used by our ndo_start_xmit on another CPU, selected by
>> > netdev_pick_tx.
>> >
>> > The first idea was to implement ndo_select_queue for the network stack,
>> > and select the TX ring based on smp_processor_id(). But we know that
>> > this will break the first two effects of netdev_pick_tx, which are very
>> > much desirable. For example, if we have a user space process sending a
>> > TCP stream, and the scheduler migrates that process from one CPU to
>> > another, then the ndo_select_queue output for that TCP stream will
>> > change, and we will have TX reordering for packets belonging to the same
>> > stream. Not at all ideal.
>> >
>> > Another idea is to just crop some TX queues from the network stack, and
>> > basically call netif_set_real_num_tx_queues(6), leaving one TX ring per
>> > CPU dedicated to XDP. This will work just fine for normal qdiscs, except
>> > that with mqprio/taprio we have a problem. Our TX rings have a configurable
>> > strict priority for the hardware egress scheduler. When we don't have
>> > mqprio/taprio, all TX rings have the same priority of 0 (therefore it is
>> > safe to allow hashing to select one at random), but when we have mqprio
>> > or taprio, we enjoy the benefit of configuring the priority of each TX
>> > ring using the "map" option. The problem, of course, is that if we crop
>> > 2 TX rings out of what the network stack sees, then we are no longer
>> > able to configure their queue-to-traffic-class mapping through
>> > mqprio/taprio, so we cannot change their prioritization relative to the
>> > network stack queues. In a way, this seems to be in line with the XDP
>> > design because that bypasses anything that has to do with qdiscs, but we
>> > don't really like that. We also have some other qdisc-based offloads
>> > such as Credit Based Shaper, and we would very much like to be able to
>> > set bandwidth profiles for the XDP rings, for AVB/TSN use cases.
>> 
>> You'd not be the first driver to solve this by just carving out a couple
>> of TX rings for XDP :)
>> 
>> And while I get the desire for being able to configure these things for
>> XDP as well, I'm not sure that the qdisc interface is the right one to
>> use for that. There was a general TXQ allocation idea that unfortunately
>> stalled out, but there is also ongoing work on XDP+TSN - I'm hoping
>> Jesper can chime in with the details...
>
> See, the reason why I don't like this answer is because when we tried to
> upstream our genetlink-based TSN configuration:
> https://patchwork.ozlabs.org/project/netdev/patch/1545968945-7290-1-git-send-email-Po.Liu@nxp.com/
> we were told that it's a QoS feature and QoS belongs to the qdisc layer.
>
> I get the impression that XDP is largely incompatible with QoS by design,
> which sounds to me like a bit of a foot gun. For example, we have some
> customers interested in building an AVB application stack on top of AF_XDP,
> and for the endpoints (talker/listener) they really need to be able to
> configure bandwidth profiles for Stream Reservation classes A and B on
> the AF_XDP rings.
>
> To us, tc is mostly just a configuration interface for hardware features;
> the deal was that this is fine as long as the offloads have a software
> counterpart with identical semantics. I think I understand the basic
> problem in that
> a software shaper would be bypassed by XDP, and therefore, the bandwidth
> profile would not be observed properly by the AVB talker if we were to
> rely on that. So that sounds indeed like we shouldn't even attempt to
> manage any TX queues on which XDP traffic is possible with tc, unless
> we're willing to pass XDP_REDIRECT through the qdisc layer (which I'm
> not suggesting is a good idea). But with the hardware offload that
> wouldn't be the case, so it's almost as if what would work for us would
> be to have some 'dummy' TX queues for XDP manageable by tc qdiscs where
> we could attach our offloadable filters and shapers and policers. I just
> don't want them to be completely invisible as far as tc is concerned.
> Managing which TX queues go to XDP, and not letting the driver choose
> that, would be even nicer.

I'm not objecting to being able to configure the hardware queues that
will be used for XDP, I'm just saying that doing so via TC is not a very
good interface for it... Rather, we need an interface for configuring
hardware queues that can be used by *both* XDP and TC.

And yeah, the lack of queueing and bandwidth management is a major
footgun in XDP, which we do want to fix (also for regular XDP_REDIRECT,
not just AF_XDP).

-Toke




Thread overview: 23+ messages
2021-03-31 20:08 [PATCH net-next 0/9] XDP for NXP ENETC Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 1/9] net: enetc: consume the error RX buffer descriptors in a dedicated function Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 2/9] net: enetc: move skb creation into enetc_build_skb Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 3/9] net: enetc: add a dedicated is_eof bit in the TX software BD Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 4/9] net: enetc: clean the TX software BD on the TX confirmation path Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 5/9] net: enetc: move up enetc_reuse_page and enetc_page_reusable Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 6/9] net: enetc: add support for XDP_DROP and XDP_PASS Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 7/9] net: enetc: add support for XDP_TX Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 8/9] net: enetc: increase RX ring default size Vladimir Oltean
2021-03-31 20:08 ` [PATCH net-next 9/9] net: enetc: add support for XDP_REDIRECT Vladimir Oltean
2021-04-01 11:26   ` Toke Høiland-Jørgensen
2021-04-01 11:31     ` Vladimir Oltean
2021-04-01 11:39       ` Toke Høiland-Jørgensen
2021-04-01 16:09         ` Vladimir Oltean
2021-04-01 18:01           ` Toke Høiland-Jørgensen
2021-04-01 19:38             ` Vladimir Oltean
2021-04-02 10:56               ` Vladimir Oltean
2021-04-03 11:07                 ` Toke Høiland-Jørgensen
2021-04-03 12:12                   ` Vladimir Oltean
2021-04-06 11:23                     ` Toke Høiland-Jørgensen
2021-03-31 22:20 ` [PATCH net-next 0/9] XDP for NXP ENETC patchwork-bot+netdevbpf
2021-03-31 22:55   ` Vladimir Oltean
2021-04-01 11:28     ` Toke Høiland-Jørgensen
