All of lore.kernel.org
 help / color / mirror / Atom feed
From: Qi Zhang <qi.z.zhang@intel.com>
To: dev@dpdk.org
Cc: magnus.karlsson@intel.com, bjorn.topel@intel.com,
	Qi Zhang <qi.z.zhang@intel.com>
Subject: [RFC v2 4/7] net/af_xdp: use mbuf mempool for buffer management
Date: Thu,  8 Mar 2018 21:52:46 +0800	[thread overview]
Message-ID: <20180308135249.28187-5-qi.z.zhang@intel.com> (raw)
In-Reply-To: <20180308135249.28187-1-qi.z.zhang@intel.com>

Now, af_xdp registered memory buffer is managed by rte_mempool.
mbuf be allocated from rte_mempool can be convert to descriptor
index and vice versa.

Signed-off-by: Qi Zhang <qi.z.zhang@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 166 +++++++++++++++++++++---------------
 1 file changed, 98 insertions(+), 68 deletions(-)

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 5c7c53aeb..65c4c37bf 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -39,7 +39,11 @@
 
 #define ETH_AF_XDP_FRAME_SIZE		2048
 #define ETH_AF_XDP_NUM_BUFFERS		131072
-#define ETH_AF_XDP_DATA_HEADROOM	0
+/* mempool hdrobj size (64 bytes) + sizeof(struct rte_mbuf) (128 bytes) */
+#define ETH_AF_XDP_MBUF_OVERHEAD	192
+/* data start from offset 320 (192 + 128) bytes */
+#define ETH_AF_XDP_DATA_HEADROOM \
+	(ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
 #define ETH_AF_XDP_DFLT_RING_SIZE	1024
 #define ETH_AF_XDP_DFLT_QUEUE_IDX	0
 
@@ -53,6 +57,7 @@ struct xdp_umem {
 	unsigned int frame_size_log2;
 	unsigned int nframes;
 	int mr_fd;
+	struct rte_mempool *mb_pool;
 };
 
 struct pmd_internals {
@@ -63,7 +68,7 @@ struct pmd_internals {
 	struct xdp_queue rx;
 	struct xdp_queue tx;
 	struct xdp_umem *umem;
-	struct rte_mempool *mb_pool;
+	struct rte_mempool *ext_mb_pool;
 
 	unsigned long rx_pkts;
 	unsigned long rx_bytes;
@@ -76,7 +81,6 @@ struct pmd_internals {
 	uint16_t port_id;
 	uint16_t queue_idx;
 	int ring_size;
-	struct rte_ring *buf_ring;
 };
 
 static const char * const valid_arguments[] = {
@@ -101,6 +105,22 @@ static void *get_pkt_data(struct pmd_internals *internals,
 			(index << internals->umem->frame_size_log2) + offset);
 }
 
+static uint32_t
+mbuf_to_idx(struct pmd_internals *internals, struct rte_mbuf *mbuf)
+{
+	return (uint32_t)(((uint64_t)mbuf->buf_addr -
+			  (uint64_t)internals->umem->buffer) >>
+			  internals->umem->frame_size_log2);
+}
+
+static struct rte_mbuf *
+idx_to_mbuf(struct pmd_internals *internals, uint32_t idx)
+{
+	return (struct rte_mbuf *)(internals->umem->buffer +
+				   (idx << internals->umem->frame_size_log2) +
+				    0x40);
+}
+
 static uint16_t
 eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
@@ -115,18 +135,19 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		  nb_pkts : ETH_AF_XDP_RX_BATCH_SIZE;
 
 	struct xdp_desc descs[ETH_AF_XDP_RX_BATCH_SIZE];
-	void *indexes[ETH_AF_XDP_RX_BATCH_SIZE];
+	struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 	int rcvd, i;
 
 	/* fill rx ring */
 	if (rxq->num_free >= ETH_AF_XDP_RX_BATCH_SIZE) {
-		int n = rte_ring_dequeue_bulk(internals->buf_ring,
-					      indexes,
-					      ETH_AF_XDP_RX_BATCH_SIZE,
-					      NULL);
-		for (i = 0; i < n; i++)
-			descs[i].idx = (uint32_t)((long int)indexes[i]);
-		xq_enq(rxq, descs, n);
+		int ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+					       (void *)mbufs,
+					       ETH_AF_XDP_RX_BATCH_SIZE);
+		if (!ret) {
+			for (i = 0; i < ETH_AF_XDP_RX_BATCH_SIZE; i++)
+				descs[i].idx = mbuf_to_idx(internals, mbufs[i]);
+			xq_enq(rxq, descs, ETH_AF_XDP_RX_BATCH_SIZE);
+		}
 	}
 
 	/* read data */
@@ -138,7 +159,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		char *pkt;
 		uint32_t idx = descs[i].idx;
 
-		mbuf = rte_pktmbuf_alloc(internals->mb_pool);
+		mbuf = rte_pktmbuf_alloc(internals->ext_mb_pool);
 		rte_pktmbuf_pkt_len(mbuf) =
 			rte_pktmbuf_data_len(mbuf) =
 			descs[i].len;
@@ -151,11 +172,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		} else {
 			dropped++;
 		}
-		indexes[i] = (void *)((long int)idx);
+		rte_pktmbuf_free(idx_to_mbuf(internals, idx));
 	}
 
-	rte_ring_enqueue_bulk(internals->buf_ring, indexes, rcvd, NULL);
-
 	internals->rx_pkts += (rcvd - dropped);
 	internals->rx_bytes += rx_bytes;
 	internals->rx_dropped += dropped;
@@ -183,9 +202,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xdp_queue *txq = &internals->tx;
 	struct rte_mbuf *mbuf;
 	struct xdp_desc descs[ETH_AF_XDP_TX_BATCH_SIZE];
-	void *indexes[ETH_AF_XDP_TX_BATCH_SIZE];
+	struct rte_mbuf *mbufs[ETH_AF_XDP_TX_BATCH_SIZE];
 	uint16_t i, valid;
 	unsigned long tx_bytes = 0;
+	int ret;
 
 	nb_pkts = nb_pkts < ETH_AF_XDP_TX_BATCH_SIZE ?
 		  nb_pkts : ETH_AF_XDP_TX_BATCH_SIZE;
@@ -194,13 +214,15 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 		int n = xq_deq(txq, descs, ETH_AF_XDP_TX_BATCH_SIZE);
 
 		for (i = 0; i < n; i++)
-			indexes[i] = (void *)((long int)descs[i].idx);
-		rte_ring_enqueue_bulk(internals->buf_ring, indexes, n, NULL);
+			rte_pktmbuf_free(idx_to_mbuf(internals, descs[i].idx));
 	}
 
 	nb_pkts = nb_pkts > txq->num_free ? txq->num_free : nb_pkts;
-	nb_pkts = rte_ring_dequeue_bulk(internals->buf_ring, indexes,
-					nb_pkts, NULL);
+	ret = rte_mempool_get_bulk(internals->umem->mb_pool,
+				   (void *)mbufs,
+				   nb_pkts);
+	if (ret)
+		return 0;
 
 	valid = 0;
 	for (i = 0; i < nb_pkts; i++) {
@@ -209,14 +231,14 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 			internals->umem->frame_size - ETH_AF_XDP_DATA_HEADROOM;
 		mbuf = bufs[i];
 		if (mbuf->pkt_len <= buf_len) {
-			descs[valid].idx = (uint32_t)((long int)indexes[valid]);
+			descs[valid].idx = mbuf_to_idx(internals, mbufs[i]);
 			descs[valid].offset = ETH_AF_XDP_DATA_HEADROOM;
 			descs[valid].flags = 0;
 			descs[valid].len = mbuf->pkt_len;
 			pkt = get_pkt_data(internals, descs[i].idx,
 					   descs[i].offset);
 			memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-			       descs[i].len);
+					   descs[i].len);
 			valid++;
 			tx_bytes += mbuf->pkt_len;
 		}
@@ -227,9 +249,10 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	xq_enq(txq, descs, valid);
 	kick_tx(internals->sfd);
 
-	if (valid < nb_pkts)
-		rte_ring_enqueue_bulk(internals->buf_ring, &indexes[valid],
-				      nb_pkts - valid, NULL);
+	if (valid < nb_pkts) {
+		for (i = valid; i < nb_pkts; i++)
+			rte_pktmbuf_free(mbufs[i]);
+	}
 
 	internals->err_pkts += (nb_pkts - valid);
 	internals->tx_pkts += valid;
@@ -242,14 +265,13 @@ static void
 fill_rx_desc(struct pmd_internals *internals)
 {
 	int num_free = internals->rx.num_free;
-	void *p = NULL;
 	int i;
-
 	for (i = 0; i < num_free; i++) {
 		struct xdp_desc desc = {};
+		struct rte_mbuf *mbuf =
+			rte_pktmbuf_alloc(internals->umem->mb_pool);
 
-		rte_ring_dequeue(internals->buf_ring, &p);
-		desc.idx = (uint32_t)((long int)p);
+		desc.idx = mbuf_to_idx(internals, mbuf);
 		xq_enq(&internals->rx, &desc, 1);
 	}
 }
@@ -344,33 +366,53 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused,
 	return 0;
 }
 
-static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd, size_t nbuffers)
+static void *get_base_addr(struct rte_mempool *mb_pool)
+{
+	struct rte_mempool_memhdr *memhdr;
+
+	STAILQ_FOREACH(memhdr, &mb_pool->mem_list, next) {
+		return memhdr->addr;
+	}
+	return NULL;
+}
+
+static struct xdp_umem *xsk_alloc_and_mem_reg_buffers(int sfd,
+						      size_t nbuffers,
+						      const char *pool_name)
 {
 	struct xdp_mr_req req = { .frame_size = ETH_AF_XDP_FRAME_SIZE,
 				  .data_headroom = ETH_AF_XDP_DATA_HEADROOM };
-	struct xdp_umem *umem;
-	void *bufs;
-	int ret;
+	struct xdp_umem *umem = calloc(1, sizeof(*umem));
 
-	ret = posix_memalign((void **)&bufs, getpagesize(),
-			     nbuffers * req.frame_size);
-	if (ret)
+	if (!umem)
+		return NULL;
+
+	umem->mb_pool =
+		rte_pktmbuf_pool_create_with_flags(
+			pool_name, nbuffers,
+			250, 0,
+			(ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_MBUF_OVERHEAD),
+			MEMPOOL_F_NO_SPREAD | MEMPOOL_F_PAGE_ALIGN,
+			SOCKET_ID_ANY);
+
+	if (!umem->mb_pool) {
+		free(umem);
 		return NULL;
+	}
 
-	umem = calloc(1, sizeof(*umem));
-	if (!umem) {
-		free(bufs);
+	if (umem->mb_pool->nb_mem_chunks > 1) {
+		rte_mempool_free(umem->mb_pool);
+		free(umem);
 		return NULL;
 	}
 
-	req.addr = (unsigned long)bufs;
+	req.addr = (uint64_t)get_base_addr(umem->mb_pool);
 	req.len = nbuffers * req.frame_size;
-	ret = setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
-	RTE_ASSERT(ret == 0);
+	setsockopt(sfd, SOL_XDP, XDP_MEM_REG, &req, sizeof(req));
 
 	umem->frame_size = ETH_AF_XDP_FRAME_SIZE;
 	umem->frame_size_log2 = 11;
-	umem->buffer = bufs;
+	umem->buffer = (char *)req.addr;
 	umem->size = nbuffers * req.frame_size;
 	umem->nframes = nbuffers;
 	umem->mr_fd = sfd;
@@ -383,38 +425,27 @@ xdp_configure(struct pmd_internals *internals)
 {
 	struct sockaddr_xdp sxdp;
 	struct xdp_ring_req req;
-	char ring_name[0x100];
+	char pool_name[0x100];
+
 	int ret = 0;
-	long int i;
 
-	snprintf(ring_name, 0x100, "%s_%s_%d", "af_xdp_ring",
+	snprintf(pool_name, 0x100, "%s_%s_%d", "af_xdp_pool",
 		 internals->if_name, internals->queue_idx);
-	internals->buf_ring = rte_ring_create(ring_name,
-					      ETH_AF_XDP_NUM_BUFFERS,
-					      SOCKET_ID_ANY,
-					      0x0);
-	if (!internals->buf_ring)
-		return -1;
-
-	for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
-		rte_ring_enqueue(internals->buf_ring, (void *)i);
-
 	internals->umem = xsk_alloc_and_mem_reg_buffers(internals->sfd,
-							ETH_AF_XDP_NUM_BUFFERS);
+							ETH_AF_XDP_NUM_BUFFERS,
+							pool_name);
 	if (!internals->umem)
-		goto error;
+		return -1;
 
 	req.mr_fd = internals->umem->mr_fd;
 	req.desc_nr = internals->ring_size;
 
 	ret = setsockopt(internals->sfd, SOL_XDP, XDP_RX_RING,
 			 &req, sizeof(req));
-
 	RTE_ASSERT(ret == 0);
 
 	ret = setsockopt(internals->sfd, SOL_XDP, XDP_TX_RING,
 			 &req, sizeof(req));
-
 	RTE_ASSERT(ret == 0);
 
 	internals->rx.ring = mmap(0, req.desc_nr * sizeof(struct xdp_desc),
@@ -445,10 +476,6 @@ xdp_configure(struct pmd_internals *internals)
 	RTE_ASSERT(ret == 0);
 
 	return ret;
-error:
-	rte_ring_free(internals->buf_ring);
-	internals->buf_ring = NULL;
-	return -1;
 }
 
 static int
@@ -463,11 +490,11 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
 	unsigned int buf_size, data_size;
 
 	RTE_ASSERT(rx_queue_id == 0);
-	internals->mb_pool = mb_pool;
+	internals->ext_mb_pool = mb_pool;
 	xdp_configure(internals);
 
 	/* Now get the space available for data in the mbuf */
-	buf_size = rte_pktmbuf_data_room_size(internals->mb_pool) -
+	buf_size = rte_pktmbuf_data_room_size(internals->ext_mb_pool) -
 		RTE_PKTMBUF_HEADROOM;
 	data_size = internals->umem->frame_size;
 
@@ -736,8 +763,11 @@ rte_pmd_af_xdp_remove(struct rte_vdev_device *dev)
 		return -1;
 
 	internals = eth_dev->data->dev_private;
-	rte_ring_free(internals->buf_ring);
-	rte_free(internals->umem);
+	if (internals->umem) {
+		if (internals->umem->mb_pool)
+			rte_mempool_free(internals->umem->mb_pool);
+		rte_free(internals->umem);
+	}
 	rte_free(eth_dev->data->dev_private);
 	rte_free(eth_dev->data);
 	close(internals->sfd);
-- 
2.13.6

  parent reply	other threads:[~2018-03-08 13:52 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-03-08 13:52 [RFC v2 0/7] PMD driver for AF_XDP Qi Zhang
2018-03-08 13:52 ` [RFC v2 1/7] net/af_xdp: new PMD driver Qi Zhang
2018-03-08 13:52 ` [RFC v2 2/7] lib/mbuf: enable parse flags when create mempool Qi Zhang
2018-03-08 13:52 ` [RFC v2 3/7] lib/mempool: allow page size aligned mempool Qi Zhang
2018-03-08 13:52 ` Qi Zhang [this message]
2018-03-08 13:52 ` [RFC v2 5/7] net/af_xdp: enable share mempool Qi Zhang
2018-03-08 13:52 ` [RFC v2 6/7] net/af_xdp: load BPF file Qi Zhang
2018-03-08 14:20   ` Zhang, Qi Z
2018-03-08 23:15   ` Stephen Hemminger
2018-05-09  7:02     ` Björn Töpel
2018-03-08 13:52 ` [RFC v2 7/7] app/testpmd: enable parameter for mempool flags Qi Zhang

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180308135249.28187-5-qi.z.zhang@intel.com \
    --to=qi.z.zhang@intel.com \
    --cc=bjorn.topel@intel.com \
    --cc=dev@dpdk.org \
    --cc=magnus.karlsson@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.