* [RFC 0/2] vhost: support async dequeue data path
@ 2022-01-01  0:12 xuan.ding
  2022-01-01  0:12 ` [RFC 1/2] vhost: support async dequeue for split ring xuan.ding
                   ` (3 more replies)
  0 siblings, 4 replies; 11+ messages in thread
From: xuan.ding @ 2022-01-01  0:12 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, sunil.pai.g, liangma, yuanx.wang, cheng1.jiang,
	wenwux.ma, Xuan Ding

From: Xuan Ding <xuan.ding@intel.com>

Hi everyone,

The presence of an asynchronous path allows applications to offload memory
copies to the DMA engine, so as to save CPU cycles and improve copy
performance. This patch set is a draft implementation of the asynchronous
dequeue data path for vhost split ring. The code is based on the latest
enqueue changes [1].

This patch set is a new design and implementation of [2]. Since dmadev
was introduced in DPDK 21.11, to simplify application logic, this patch
integrates dmadev in vhost. With dmadev integrated, vhost supports M:N
mapping between vrings and DMA virtual channels. Specifically, one vring
can use multiple different DMA channels and one DMA channel can be
shared by multiple vrings at the same time.
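
For example, with the sample application's --dmas syntax from patch 2/2,
one DMA channel can be assigned to several vrings at once (the addresses
below are hypothetical):

	--dmas [txd0@00:04.0,rxd0@00:04.0,txd1@00:04.0]

Here channel 00:04.0 serves vhost device 0 enqueue and dequeue as well
as vhost device 1 enqueue.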

A new asynchronous dequeue function is introduced:
	1) rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
       		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
		uint16_t count, int *nr_inflight,
		uint16_t dma_id, uint16_t dma_vchan)

	Receives packets from the guest and offloads the copies to a DMA
virtual channel.
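
For illustration, here is a minimal polling sketch of this API, assuming
"vid", "mbuf_pool", "dma_id" and "dma_vchan" are already set up. It
mirrors the sample application's usage and is not part of the patch:

	struct rte_mbuf *pkts[MAX_PKT_BURST];
	int nr_inflight;
	uint16_t n;

	/* Poll the guest Tx ring; completed copies are returned in pkts. */
	n = rte_vhost_async_try_dequeue_burst(vid, VIRTIO_TXQ, mbuf_pool,
			pkts, MAX_PKT_BURST, &nr_inflight, dma_id, dma_vchan);
	/* nr_inflight is -1 on error; otherwise it is the number of
	 * packets whose copies are still owned by the DMA engine. */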

[1] https://mails.dpdk.org/archives/dev/2021-December/231889.html
[2] https://mails.dpdk.org/archives/dev/2021-September/218591.html

Your comments are welcome and appreciated!

Thanks!
Xuan

Xuan Ding (2):
  vhost: support async dequeue for split ring
  examples/vhost: support async dequeue data path

 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 305 ++++++++++++------
 examples/vhost/main.h              |  35 +-
 examples/vhost/virtio_net.c        |  16 +-
 lib/vhost/rte_vhost_async.h        |  29 ++
 lib/vhost/version.map              |   1 +
 lib/vhost/vhost.h                  |   1 +
 lib/vhost/virtio_net.c             | 493 +++++++++++++++++++++++++++++
 8 files changed, 783 insertions(+), 106 deletions(-)

-- 
2.17.1



* [RFC 1/2] vhost: support async dequeue for split ring
  2022-01-01  0:12 [RFC 0/2] vhost: support async dequeue data path xuan.ding
@ 2022-01-01  0:12 ` xuan.ding
  2022-01-01  0:12 ` [RFC 2/2] examples/vhost: support async dequeue data path xuan.ding
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 11+ messages in thread
From: xuan.ding @ 2022-01-01  0:12 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, sunil.pai.g, liangma, yuanx.wang, cheng1.jiang,
	wenwux.ma, Xuan Ding

From: Xuan Ding <xuan.ding@intel.com>

This patch implements asynchronous dequeue data path for vhost split
ring, with dmadev library integrated.

Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Xuan Ding <xuan.ding@intel.com>
---
 lib/vhost/rte_vhost_async.h |  29 +++
 lib/vhost/version.map       |   1 +
 lib/vhost/vhost.h           |   1 +
 lib/vhost/virtio_net.c      | 494 ++++++++++++++++++++++++++++++++++++
 4 files changed, 525 insertions(+)

diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index 23a7a2d8b3..b1249382cd 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -203,4 +203,33 @@ __rte_experimental
 int rte_vhost_async_dma_configure(struct rte_vhost_async_dma_info *dmas,
 		uint16_t count);
 
+/**
+ * This function tries to receive packets from the guest with offloading
+ * copies to the async channel. Packets whose copies have completed are
+ * returned in "pkts". Packets whose copies have been submitted to the
+ * async channel but are not yet completed are called "in-flight packets".
+ * This function will not return in-flight packets until their copies are
+ * completed by the async channel.
+ *
+ * @param vid
+ *  ID of vhost device to dequeue data
+ * @param queue_id
+ *  ID of virtqueue to dequeue data
+ * @param mbuf_pool
+ *  Mbuf_pool where host mbuf is allocated.
+ * @param pkts
+ *  Blank array to keep successfully dequeued packets
+ * @param count
+ *  Size of the packet array
+ * @param nr_inflight
+ *  The number of in-flight packets. If an error occurs, its value is set to -1.
+ * @return
+ *  Number of successfully dequeued packets
+ */
+__rte_experimental
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t dma_vchan);
+
 #endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index 1202ba9c1a..816a6dc942 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -87,6 +87,7 @@ EXPERIMENTAL {
 
 	# added in 22.03
 	rte_vhost_async_dma_configure;
+	rte_vhost_async_try_dequeue_burst;
 };
 
 INTERNAL {
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index d9bda34e11..17166607ea 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -161,6 +161,7 @@ extern struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
  */
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
+	struct virtio_net_hdr nethdr;
 	uint16_t descs; /* num of descs inflight */
 	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index 9f81fc9733..148709f2c5 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -3092,3 +3092,497 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 
 	return count;
 }
+
+static __rte_always_inline int
+async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct rte_mbuf *m, uint32_t mbuf_offset,
+			uint64_t buf_iova, uint32_t cpy_len)
+{
+	uint64_t mapped_len;
+	uint32_t buf_offset = 0;
+	void *hpa;
+
+	while (cpy_len) {
+		hpa = (void *)(uintptr_t)gpa_to_first_hpa(dev,
+					buf_iova + buf_offset, cpy_len,
+					&mapped_len);
+		if (unlikely(!hpa)) {
+			VHOST_LOG_DATA(ERR, "(%d) %s: failed to get hpa.\n",
+				dev->vid, __func__);
+			return -1;
+		}
+
+		if (unlikely(async_iter_add_iovec(vq->async, hpa,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset),
+				(size_t)mapped_len)))
+			return -1;
+
+		cpy_len -= (uint32_t)mapped_len;
+		mbuf_offset += (uint32_t)mapped_len;
+		buf_offset += (uint32_t)mapped_len;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
+		  struct virtio_net_hdr *nethdr)
+{
+	uint64_t buf_addr, buf_iova;
+	uint32_t buf_avail, buf_offset, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	/* A counter to avoid an endless loop in the desc chain */
+	uint16_t vec_idx = 0;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	struct vhost_async *async = vq->async;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_len = buf_vec[vec_idx].buf_len;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
+		return -1;
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/*
+			 * No luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/*
+	 * A virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->vhost_hlen) {
+		if (unlikely(++vec_idx >= nr_vec))
+			return -1;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+	}
+
+	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0);
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+
+	if (async_iter_initialize(async))
+		return -1;
+
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset, buf_iova + buf_offset,
+					   cpy_len) < 0)
+			goto error;
+
+		mbuf_avail -= cpy_len;
+		buf_avail -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_offset += cpy_len;
+
+		/* This buf reaches its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail = buf_len;
+
+			PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0);
+		}
+
+		/*
+		 * This mbuf reaches its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG_DATA(ERR,
+					"(%d) %s: failed to allocate memory for mbuf.\n",
+					dev->vid, __func__);
+				goto error;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	async_iter_finalize(async);
+	if (hdr)
+		*nethdr = *hdr;
+
+	return 0;
+
+error:
+	async_iter_cancel(async);
+	return -1;
+}
+
+static __rte_always_inline uint16_t
+async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id, uint16_t dma_vchan,
+		bool legacy_ol_flags)
+{
+	uint16_t start_idx, from, i;
+	uint16_t nr_cpl_pkts = 0;
+	struct async_inflight_info *pkts_info;
+	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
+
+	pkts_info = vq->async->pkts_info;
+
+	vhost_async_dma_check_completed(dma_id, dma_vchan, count);
+
+	start_idx = async_get_first_inflight_pkt_idx(vq);
+
+	from = start_idx;
+	while (vq->async->pkts_cmpl_flag[from] && count--) {
+		vq->async->pkts_cmpl_flag[from] = false;
+		from = (from + 1) & (vq->size - 1);
+		nr_cpl_pkts++;
+	}
+
+	for (i = 0; i < nr_cpl_pkts; i++) {
+		from = (start_idx + i) & (vq->size - 1);
+		pkts[i] = pkts_info[from].mbuf;
+
+		if (virtio_net_with_host_offload(dev))
+			vhost_dequeue_offload(&pkts_info[from].nethdr, pkts[i], legacy_ol_flags);
+	}
+
+	/* write back completed descs to used ring and update used idx */
+	write_back_completed_descs_split(vq, nr_cpl_pkts);
+	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
+	vhost_vring_call_split(dev, vq);
+
+	vq->async->pkts_inflight_n -= nr_cpl_pkts;
+
+	return nr_cpl_pkts;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_split(struct virtio_net *dev,	struct vhost_virtqueue *vq,
+		uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t dma_vchan, bool legacy_ol_flags)
+{
+	static bool allocerr_warned;
+	bool dropped = false;
+	uint16_t free_entries;
+	uint16_t pkt_idx, slot_idx = 0;
+	uint16_t nr_done_pkts = 0;
+	uint16_t pkt_err = 0;
+	int32_t n_xfer;
+	struct vhost_async *async = vq->async;
+	struct async_inflight_info *pkts_info = async->pkts_info;
+	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+	uint16_t pkts_size = count;
+
+	/**
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx;
+	if (free_entries == 0)
+		goto out;
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	async_iter_reset(async);
+
+	count = RTE_MIN(count, MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG_DATA(DEBUG, "(%d) about to dequeue %u buffers\n", dev->vid, count);
+
+	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+		goto out;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint16_t head_idx = 0;
+		uint16_t nr_vec = 0;
+		uint16_t to;
+		uint32_t buf_len;
+		int err;
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
+						&nr_vec, buf_vec,
+						&head_idx, &buf_len,
+						VHOST_ACCESS_RO) < 0)) {
+			dropped = true;
+			break;
+		}
+
+		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
+		if (unlikely(err)) {
+			/**
+			 * mbuf allocation fails for jumbo packets when external
+			 * buffer allocation is not allowed and linear buffer
+			 * is required. Drop this packet.
+			 */
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%d) %s: Failed mbuf alloc of size %d from %s on %s.\n",
+					dev->vid, __func__, buf_len, mbuf_pool->name, dev->ifname);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
+		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
+					&pkts_info[slot_idx].nethdr);
+		if (unlikely(err)) {
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%d) %s: Failed to offload copies to async channel %s.\n",
+					dev->vid, __func__, dev->ifname);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		pkts_info[slot_idx].mbuf = pkt;
+
+		/* store used descs */
+		to = async->desc_idx_split & (vq->size - 1);
+		async->descs_split[to].id = head_idx;
+		async->descs_split[to].len = 0;
+		async->desc_idx_split++;
+
+		vq->last_avail_idx++;
+	}
+
+	if (unlikely(dropped))
+		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+
+	n_xfer = vhost_async_dma_transfer(vq, dma_id, dma_vchan, async->pkts_idx, async->iov_iter,
+			pkt_idx);
+
+	async->pkts_inflight_n += n_xfer;
+
+	pkt_err = pkt_idx - n_xfer;
+	if (unlikely(pkt_err)) {
+		VHOST_LOG_DATA(DEBUG,
+			"(%d) %s: failed to transfer data for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+
+		pkt_idx = n_xfer;
+		/* recover available ring */
+		vq->last_avail_idx -= pkt_err;
+
+		/**
+		 * recover async channel copy related structures and free pktmbufs
+		 * for error pkts.
+		 */
+		async->desc_idx_split -= pkt_err;
+		while (pkt_err-- > 0) {
+			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
+			slot_idx--;
+		}
+	}
+
+	async->pkts_idx += pkt_idx;
+	if (async->pkts_idx >= vq->size)
+		async->pkts_idx -= vq->size;
+
+out:
+	if (async->pkts_inflight_n > 0) {
+		nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id, pkts, pkts_size,
+					dma_id, dma_vchan, legacy_ol_flags);
+	}
+
+	return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t dma_vchan)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, dma_vchan, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t dma_vchan)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, dma_vchan, false);
+}
+
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t dma_vchan)
+{
+	struct virtio_net *dev;
+	struct rte_mbuf *rarp_mbuf = NULL;
+	struct vhost_virtqueue *vq;
+	int16_t success = 1;
+
+	*nr_inflight = -1;
+
+	dev = get_device(vid);
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: built-in vhost net backend is disabled.\n",
+			dev->vid, __func__);
+		return 0;
+	}
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR,
+			"(%d) %s: invalid virtqueue idx %d.\n",
+			dev->vid, __func__, queue_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
+		return 0;
+
+	if (unlikely(vq->enabled == 0)) {
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (unlikely(!vq->async)) {
+		VHOST_LOG_DATA(ERR, "(%d) %s: async not registered for queue id %d.\n",
+			dev->vid, __func__, queue_id);
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0)) {
+			count = 0;
+			goto out;
+		}
+
+	/*
+	 * Construct a RARP broadcast packet, and inject it into the "pkts"
+	 * array, so it looks like the guest actually sent such a packet.
+	 *
+	 * Check user_send_rarp() for more information.
+	 *
+	 * broadcast_rarp shares a cacheline in the virtio_net structure
+	 * with some fields that are accessed during enqueue and
+	 * __atomic_compare_exchange_n causes a write if performed compare
+	 * and exchange. This could result in false sharing between enqueue
+	 * and dequeue.
+	 *
+	 * Prevent unnecessary false sharing by reading broadcast_rarp first
+	 * and only performing compare and exchange if the read indicates it
+	 * is likely to be set.
+	 */
+	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
+			__atomic_compare_exchange_n(&dev->broadcast_rarp,
+			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
+
+		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
+		if (rarp_mbuf == NULL) {
+			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
+			count = 0;
+			goto out;
+		}
+		count -= 1;
+	}
+
+	if (unlikely(vq_is_packed(dev))) {
+		static bool not_support_pack_log;
+		if (!not_support_pack_log) {
+			VHOST_LOG_DATA(ERR,
+				"(%d) %s: async dequeue does not support packed ring.\n",
+				dev->vid, __func__);
+			not_support_pack_log = true;
+		}
+		count = 0;
+		goto out;
+	}
+
+	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, dma_vchan);
+	else
+		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, dma_vchan);
+
+	*nr_inflight = vq->async->pkts_inflight_n;
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (unlikely(rarp_mbuf != NULL)) {
+		/*
+		 * Inject it at the head of the "pkts" array, so that the
+		 * switch's MAC learning table gets updated first.
+		 */
+		memmove(&pkts[1], pkts, count * sizeof(struct rte_mbuf *));
+		pkts[0] = rarp_mbuf;
+		count += 1;
+	}
+
+	return count;
+}
-- 
2.17.1



* [RFC 2/2] examples/vhost: support async dequeue data path
  2022-01-01  0:12 [RFC 0/2] vhost: support async dequeue data path xuan.ding
  2022-01-01  0:12 ` [RFC 1/2] vhost: support async dequeue for split ring xuan.ding
@ 2022-01-01  0:12 ` xuan.ding
  2022-02-24 11:03 ` [RFC,v2 0/2] vhost: " xuan.ding
  2022-03-10  6:54 ` [RFC,v3 0/2] vhost: " xuan.ding
  3 siblings, 0 replies; 11+ messages in thread
From: xuan.ding @ 2022-01-01  0:12 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, sunil.pai.g, liangma, yuanx.wang, cheng1.jiang,
	wenwux.ma, Xuan Ding, Yuan Wang

From: Xuan Ding <xuan.ding@intel.com>

This patch adds a use case for the async dequeue API. A vswitch can
leverage a DMA device to accelerate the vhost async dequeue path.

Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Xuan Ding <xuan.ding@intel.com>
---
 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 301 +++++++++++++++++++----------
 examples/vhost/main.h              |  35 +++-
 examples/vhost/virtio_net.c        |  16 +-
 4 files changed, 255 insertions(+), 106 deletions(-)

diff --git a/doc/guides/sample_app_ug/vhost.rst b/doc/guides/sample_app_ug/vhost.rst
index a6ce4bc8ac..09db965e70 100644
--- a/doc/guides/sample_app_ug/vhost.rst
+++ b/doc/guides/sample_app_ug/vhost.rst
@@ -169,9 +169,12 @@ demonstrates how to use the async vhost APIs. It's used in combination with dmas
 **--dmas**
 This parameter is used to specify the assigned DMA device of a vhost device.
 Async vhost-user net driver will be used if --dmas is set. For example
---dmas [txd0@00:04.0,txd1@00:04.1] means use DMA channel 00:04.0 for vhost
-device 0 enqueue operation and use DMA channel 00:04.1 for vhost device 1
-enqueue operation.
+--dmas [txd0@00:04.0,txd1@00:04.1,rxd0@00:04.2,rxd1@00:04.3] means use
+DMA channels 00:04.0/00:04.2 for vhost device 0 enqueue/dequeue operations
+and DMA channels 00:04.1/00:04.3 for vhost device 1 enqueue/dequeue
+operations. The index of the device corresponds to the socket file in
+order; that is, vhost device 0 is created through the first socket file,
+vhost device 1 through the second socket file, and so on.
 
 Common Issues
 -------------
diff --git a/examples/vhost/main.c b/examples/vhost/main.c
index 44073499bc..8c952d3874 100644
--- a/examples/vhost/main.c
+++ b/examples/vhost/main.c
@@ -62,6 +62,9 @@
 #define MAX_VHOST_DEVICE 1024
 #define DMA_RING_SIZE 4096
 
+#define ASYNC_ENQUEUE_VHOST 1
+#define ASYNC_DEQUEUE_VHOST 2
+
 struct dma_for_vhost dma_bind[MAX_VHOST_DEVICE];
 struct rte_vhost_async_dma_info dma_config[RTE_DMADEV_DEFAULT_MAX];
 static int dma_count;
@@ -103,8 +106,6 @@ static int client_mode;
 
 static int builtin_net_driver;
 
-static int async_vhost_driver;
-
 /* Specify timeout (in useconds) between retries on RX. */
 static uint32_t burst_rx_delay_time = BURST_RX_WAIT_US;
 /* Specify the number of retries on RX. */
@@ -114,6 +115,8 @@ static uint32_t burst_rx_retry_num = BURST_RX_RETRIES;
 static char *socket_files;
 static int nb_sockets;
 
+static struct vhost_queue_ops vdev_queue_ops[MAX_VHOST_DEVICE];
+
 /* empty vmdq configuration structure. Filled in programmatically */
 static struct rte_eth_conf vmdq_conf_default = {
 	.rxmode = {
@@ -203,6 +206,18 @@ struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * MAX_VHOST_DEVICE];
 #define MBUF_TABLE_DRAIN_TSC	((rte_get_tsc_hz() + US_PER_S - 1) \
 				 / US_PER_S * BURST_TX_DRAIN_US)
 
+static int vid2socketid[MAX_VHOST_DEVICE];
+
+static uint32_t get_async_flag_by_socketid(int socketid)
+{
+	return dma_bind[socketid].async_flag;
+}
+
+static void init_vid2socketid_array(int vid, int socketid)
+{
+	vid2socketid[vid] = socketid;
+}
+
 static inline bool
 is_dma_configured(int16_t dev_id)
 {
@@ -224,7 +239,7 @@ open_dma(const char *value)
 	char *addrs = input;
 	char *ptrs[2];
 	char *start, *end, *substr;
-	int64_t vid, vring_id;
+	int64_t socketid, vring_id;
 
 	struct rte_dma_info info;
 	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
@@ -263,7 +278,9 @@ open_dma(const char *value)
 
 	while (i < args_nr) {
 		char *arg_temp = dma_arg[i];
+		char *txd, *rxd;
 		uint8_t sub_nr;
+		int async_flag;
 
 		sub_nr = rte_strsplit(arg_temp, strlen(arg_temp), ptrs, 2, '@');
 		if (sub_nr != 2) {
@@ -271,21 +288,28 @@ open_dma(const char *value)
 			goto out;
 		}
 
-		start = strstr(ptrs[0], "txd");
-		if (start == NULL) {
+		txd = strstr(ptrs[0], "txd");
+		rxd = strstr(ptrs[0], "rxd");
+		if (txd) {
+			start = txd;
+			vring_id = VIRTIO_RXQ;
+			async_flag = ASYNC_ENQUEUE_VHOST;
+		} else if (rxd) {
+			start = rxd;
+			vring_id = VIRTIO_TXQ;
+			async_flag = ASYNC_DEQUEUE_VHOST;
+		} else {
 			ret = -1;
 			goto out;
 		}
 
 		start += 3;
-		vid = strtol(start, &end, 0);
+		socketid = strtol(start, &end, 0);
 		if (end == start) {
 			ret = -1;
 			goto out;
 		}
 
-		vring_id = 0 + VIRTIO_RXQ;
-
 		dev_id = rte_dma_get_dev_id_by_name(ptrs[1]);
 		if (dev_id < 0) {
 			RTE_LOG(ERR, VHOST_CONFIG, "Fail to find DMA %s.\n", ptrs[1]);
@@ -325,7 +349,8 @@ open_dma(const char *value)
 		dma_config[dma_count++].max_desc = DMA_RING_SIZE;
 
 done:
-		(dma_info + vid)->dmas[vring_id].dev_id = dev_id;
+		(dma_info + socketid)->dmas[vring_id].dev_id = dev_id;
+		(dma_info + socketid)->async_flag |= async_flag;
 		i++;
 	}
 out:
@@ -792,7 +817,6 @@ us_vhost_parse_args(int argc, char **argv)
 				us_vhost_usage(prgname);
 				return -1;
 			}
-			async_vhost_driver = 1;
 			break;
 
 		case OPT_CLIENT_NUM:
@@ -961,13 +985,13 @@ complete_async_pkts(struct vhost_dev *vdev)
 {
 	struct rte_mbuf *p_cpl[MAX_PKT_BURST];
 	uint16_t complete_count;
-	int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
+	int16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].dev_id;
 
 	complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
 					VIRTIO_RXQ, p_cpl, MAX_PKT_BURST, dma_id, 0);
 	if (complete_count) {
 		free_pkts(p_cpl, complete_count);
-		__atomic_sub_fetch(&vdev->pkts_inflight, complete_count, __ATOMIC_SEQ_CST);
+		__atomic_sub_fetch(&vdev->pkts_enq_inflight, complete_count, __ATOMIC_SEQ_CST);
 	}
 
 }
@@ -1002,23 +1026,7 @@ drain_vhost(struct vhost_dev *vdev)
 	uint16_t nr_xmit = vhost_txbuff[buff_idx]->len;
 	struct rte_mbuf **m = vhost_txbuff[buff_idx]->m_table;
 
-	if (builtin_net_driver) {
-		ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ, m, nr_xmit, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, ret, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = nr_xmit - ret;
-		if (enqueue_fail)
-			free_pkts(&m[ret], nr_xmit - ret);
-	} else {
-		ret = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						m, nr_xmit);
-	}
+	ret = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev, VIRTIO_RXQ, m, nr_xmit);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, nr_xmit,
@@ -1027,7 +1035,7 @@ drain_vhost(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(m, nr_xmit);
 }
 
@@ -1300,6 +1308,33 @@ drain_mbuf_table(struct mbuf_table *tx_q)
 	}
 }
 
+uint16_t
+async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	uint16_t enqueue_count;
+	uint16_t enqueue_fail = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_RXQ].dev_id;
+
+	complete_async_pkts(dev);
+	enqueue_count = rte_vhost_submit_enqueue_burst(dev->vid, queue_id,
+					pkts, rx_count, dma_id, 0);
+	__atomic_add_fetch(&dev->pkts_enq_inflight, enqueue_count, __ATOMIC_SEQ_CST);
+
+	enqueue_fail = rx_count - enqueue_count;
+	if (enqueue_fail)
+		free_pkts(&pkts[enqueue_count], enqueue_fail);
+
+	return enqueue_count;
+}
+
+uint16_t
+sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, rx_count);
+}
+
 static __rte_always_inline void
 drain_eth_rx(struct vhost_dev *vdev)
 {
@@ -1330,26 +1365,8 @@ drain_eth_rx(struct vhost_dev *vdev)
 		}
 	}
 
-	if (builtin_net_driver) {
-		enqueue_count = vs_enqueue_pkts(vdev, VIRTIO_RXQ,
-						pkts, rx_count);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
-					VIRTIO_RXQ, pkts, rx_count, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, enqueue_count, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = rx_count - enqueue_count;
-		if (enqueue_fail)
-			free_pkts(&pkts[enqueue_count], enqueue_fail);
-
-	} else {
-		enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						pkts, rx_count);
-	}
+	enqueue_count = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev,
+					VIRTIO_RXQ, pkts, rx_count);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, rx_count,
@@ -1358,10 +1375,33 @@ drain_eth_rx(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(pkts, rx_count);
 }
 
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			    struct rte_mempool *mbuf_pool,
+			    struct rte_mbuf **pkts, uint16_t count)
+{
+	int nr_inflight;
+	uint16_t dequeue_count;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_TXQ].dev_id;
+
+	dequeue_count = rte_vhost_async_try_dequeue_burst(dev->vid, queue_id,
+			mbuf_pool, pkts, count, &nr_inflight, dma_id, 0);
+	if (likely(nr_inflight != -1))
+		dev->pkts_deq_inflight = nr_inflight;
+
+	return dequeue_count;
+}
+
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			   struct rte_mempool *mbuf_pool,
+			   struct rte_mbuf **pkts, uint16_t count)
+{
+	return rte_vhost_dequeue_burst(dev->vid, queue_id, mbuf_pool, pkts, count);
+}
+
 static __rte_always_inline void
 drain_virtio_tx(struct vhost_dev *vdev)
 {
@@ -1369,13 +1409,8 @@ drain_virtio_tx(struct vhost_dev *vdev)
 	uint16_t count;
 	uint16_t i;
 
-	if (builtin_net_driver) {
-		count = vs_dequeue_pkts(vdev, VIRTIO_TXQ, mbuf_pool,
-					pkts, MAX_PKT_BURST);
-	} else {
-		count = rte_vhost_dequeue_burst(vdev->vid, VIRTIO_TXQ,
-					mbuf_pool, pkts, MAX_PKT_BURST);
-	}
+	count = vdev_queue_ops[vdev->vid].dequeue_pkt_burst(vdev,
+				VIRTIO_TXQ, mbuf_pool, pkts, MAX_PKT_BURST);
 
 	/* setup VMDq for the first packet */
 	if (unlikely(vdev->ready == DEVICE_MAC_LEARNING) && count) {
@@ -1454,6 +1489,31 @@ switch_worker(void *arg __rte_unused)
 	return 0;
 }
 
+static void
+vhost_clear_queue_thread_unsafe(struct vhost_dev *vdev, uint16_t queue_id)
+{
+	uint16_t n_pkt = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[queue_id].dev_id;
+	struct rte_mbuf *m_enq_cpl[vdev->pkts_enq_inflight];
+	struct rte_mbuf *m_deq_cpl[vdev->pkts_deq_inflight];
+
+	if (queue_id % 2 == 0) {
+		while (vdev->pkts_enq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_enq_cpl, vdev->pkts_enq_inflight, dma_id, 0);
+			free_pkts(m_enq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_enq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	} else {
+		while (vdev->pkts_deq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_deq_cpl, vdev->pkts_deq_inflight, dma_id, 0);
+			free_pkts(m_deq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_deq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	}
+}
+
 /*
  * Remove a device from the specific data core linked list and from the
 * main linked list. Synchronization occurs through the use of the
@@ -1510,25 +1570,83 @@ destroy_device(int vid)
 		"(%d) device has been removed from data core\n",
 		vdev->vid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t n_pkt = 0;
-		int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-		struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-		while (vdev->pkts_inflight) {
-			n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, VIRTIO_RXQ,
-						m_cpl, vdev->pkts_inflight, dma_id, 0);
-			free_pkts(m_cpl, n_pkt);
-			__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-		}
-
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_RXQ);
 		rte_vhost_async_channel_unregister(vid, VIRTIO_RXQ);
-		dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = false;
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = false;
+	}
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_TXQ);
+		rte_vhost_async_channel_unregister(vid, VIRTIO_TXQ);
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = false;
 	}
 
 	rte_free(vdev);
 }
 
+static int
+get_socketid_by_vid(int vid)
+{
+	int i;
+	char ifname[PATH_MAX];
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+
+	for (i = 0; i < nb_sockets; i++) {
+		char *file = socket_files + i * PATH_MAX;
+		if (strcmp(file, ifname) == 0)
+			return i;
+	}
+
+	return -1;
+}
+
+static int
+init_vhost_queue_ops(int vid)
+{
+	int socketid = get_socketid_by_vid(vid);
+	if (socketid == -1)
+		return -1;
+
+	init_vid2socketid_array(vid, socketid);
+	if (builtin_net_driver) {
+		vdev_queue_ops[vid].enqueue_pkt_burst = builtin_enqueue_pkts;
+		vdev_queue_ops[vid].dequeue_pkt_burst = builtin_dequeue_pkts;
+	} else {
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled)
+			vdev_queue_ops[vid].enqueue_pkt_burst = async_enqueue_pkts;
+		else
+			vdev_queue_ops[vid].enqueue_pkt_burst = sync_enqueue_pkts;
+
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled)
+			vdev_queue_ops[vid].dequeue_pkt_burst = async_dequeue_pkts;
+		else
+			vdev_queue_ops[vid].dequeue_pkt_burst = sync_dequeue_pkts;
+	}
+
+	return 0;
+}
+
+static int
+vhost_async_channel_register(int vid)
+{
+	int rx_ret = 0, tx_ret = 0;
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
+		rx_ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
+		if (rx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = true;
+	}
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].dev_id != INVALID_DMA_ID) {
+		tx_ret = rte_vhost_async_channel_register(vid, VIRTIO_TXQ);
+		if (tx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = true;
+	}
+
+	return rx_ret | tx_ret;
+}
+
+
 /*
  * A new device is added to a data core. First the device is added to the main linked list
  * and then allocated to a specific data core.
@@ -1540,6 +1658,8 @@ new_device(int vid)
 	uint16_t i;
 	uint32_t device_num_min = num_devices;
 	struct vhost_dev *vdev;
+	int ret;
+
 	vdev = rte_zmalloc("vhost device", sizeof(*vdev), RTE_CACHE_LINE_SIZE);
 	if (vdev == NULL) {
 		RTE_LOG(INFO, VHOST_DATA,
@@ -1593,17 +1713,12 @@ new_device(int vid)
 		"(%d) device has been added to data core %d\n",
 		vid, vdev->coreid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
-		int ret;
+	ret = vhost_async_channel_register(vid);
 
-		ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
-		if (ret == 0) {
-			dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = true;
-		}
-		return ret;
-	}
+	if (init_vhost_queue_ops(vid) != 0)
+		return -1;
 
-	return 0;
+	return ret;
 }
 
 static int
@@ -1621,19 +1736,9 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
 	if (queue_id != VIRTIO_RXQ)
 		return 0;
 
-	if (dma_bind[vid].dmas[queue_id].async_enabled) {
-		if (!enable) {
-			uint16_t n_pkt = 0;
-			int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-			struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-			while (vdev->pkts_inflight) {
-				n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, queue_id,
-							m_cpl, vdev->pkts_inflight, dma_id, 0);
-				free_pkts(m_cpl, n_pkt);
-				__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-			}
-		}
+	if (dma_bind[vid2socketid[vid]].dmas[queue_id].async_enabled) {
+		if (!enable)
+			vhost_clear_queue_thread_unsafe(vdev, queue_id);
 	}
 
 	return 0;
@@ -1896,7 +2001,7 @@ main(int argc, char *argv[])
 	if (client_mode)
 		flags |= RTE_VHOST_USER_CLIENT;
 
-	if (async_vhost_driver) {
+	if (dma_count > 0) {
 		if (rte_vhost_async_dma_configure(dma_config, dma_count) < 0) {
 			RTE_LOG(ERR, VHOST_PORT, "Failed to configure DMA in vhost.\n");
 			for (i = 0; i < dma_count; i++) {
@@ -1906,18 +2011,18 @@ main(int argc, char *argv[])
 				}
 			}
 			dma_count = 0;
-			async_vhost_driver = false;
 		}
 	}
 
 	/* Register vhost user driver to handle vhost messages. */
 	for (i = 0; i < nb_sockets; i++) {
 		char *file = socket_files + i * PATH_MAX;
+		uint64_t flag = flags;
 
-		if (async_vhost_driver)
-			flags = flags | RTE_VHOST_USER_ASYNC_COPY;
+		if (dma_count > 0 && get_async_flag_by_socketid(i) != 0)
+			flag |= RTE_VHOST_USER_ASYNC_COPY;
 
-		ret = rte_vhost_driver_register(file, flags);
+		ret = rte_vhost_driver_register(file, flag);
 		if (ret != 0) {
 			unregister_drivers(i);
 			rte_exit(EXIT_FAILURE,
diff --git a/examples/vhost/main.h b/examples/vhost/main.h
index b4a453e77e..40ac2841d1 100644
--- a/examples/vhost/main.h
+++ b/examples/vhost/main.h
@@ -52,7 +52,8 @@ struct vhost_dev {
 	uint64_t features;
 	size_t hdr_len;
 	uint16_t nr_vrings;
-	uint16_t pkts_inflight;
+	uint16_t pkts_enq_inflight;
+	uint16_t pkts_deq_inflight;
 	struct rte_vhost_memory *mem;
 	struct device_statistics stats;
 	TAILQ_ENTRY(vhost_dev) global_vdev_entry;
@@ -62,6 +63,19 @@ struct vhost_dev {
 	struct vhost_queue queues[MAX_QUEUE_PAIRS * 2];
 } __rte_cache_aligned;
 
+typedef uint16_t (*vhost_enqueue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mbuf **pkts,
+			uint32_t count);
+
+typedef uint16_t (*vhost_dequeue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+
+struct vhost_queue_ops {
+	vhost_enqueue_burst_t enqueue_pkt_burst;
+	vhost_dequeue_burst_t dequeue_pkt_burst;
+};
+
 TAILQ_HEAD(vhost_dev_tailq_list, vhost_dev);
 
 
@@ -88,6 +102,7 @@ struct dma_info {
 
 struct dma_for_vhost {
 	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint32_t async_flag;
 };
 
 /* we implement non-extra virtio net features */
@@ -98,7 +113,19 @@ void vs_vhost_net_remove(struct vhost_dev *dev);
 uint16_t vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 			 struct rte_mbuf **pkts, uint32_t count);
 
-uint16_t vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
-			 struct rte_mempool *mbuf_pool,
-			 struct rte_mbuf **pkts, uint16_t count);
+uint16_t builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mbuf **pkts, uint32_t count);
+uint16_t builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
 #endif /* _MAIN_H_ */
diff --git a/examples/vhost/virtio_net.c b/examples/vhost/virtio_net.c
index 9064fc3a82..2432a96566 100644
--- a/examples/vhost/virtio_net.c
+++ b/examples/vhost/virtio_net.c
@@ -238,6 +238,13 @@ vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	return count;
 }
 
+uint16_t
+builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t count)
+{
+	return vs_enqueue_pkts(dev, queue_id, pkts, count);
+}
+
 static __rte_always_inline int
 dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	    struct rte_mbuf *m, uint16_t desc_idx,
@@ -363,7 +370,7 @@ dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	return 0;
 }
 
-uint16_t
+static uint16_t
 vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
 {
@@ -440,3 +447,10 @@ vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 
 	return i;
 }
+
+uint16_t
+builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+{
+	return vs_dequeue_pkts(dev, queue_id, mbuf_pool, pkts, count);
+}
-- 
2.17.1



* [RFC,v2 0/2] vhost: support async dequeue data path
  2022-01-01  0:12 [RFC 0/2] vhost: support async dequeue data path xuan.ding
  2022-01-01  0:12 ` [RFC 1/2] vhost: support async dequeue for split ring xuan.ding
  2022-01-01  0:12 ` [RFC 2/2] examples/vhost: support async dequeue data path xuan.ding
@ 2022-02-24 11:03 ` xuan.ding
  2022-02-24 11:03   ` [RFC,v2 1/2] vhost: support async dequeue for split ring xuan.ding
  2022-02-24 11:04   ` [RFC,v2 2/2] examples/vhost: support async dequeue data path xuan.ding
  2022-03-10  6:54 ` [RFC,v3 0/2] vhost: " xuan.ding
  3 siblings, 2 replies; 11+ messages in thread
From: xuan.ding @ 2022-02-24 11:03 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding

From: Xuan Ding <xuan.ding@intel.com>

The presence of an asynchronous path allows applications to offload
memory copies to the DMA engine, so as to save CPU cycles and improve
copy performance. This patch set is a draft implementation of the
asynchronous dequeue data path for vhost split ring. The code is based
on the latest enqueue changes [1].

This patch set is a new design and implementation of [2]. Since dmadev
was introduced in DPDK 21.11, to simplify application logic, this patch
integrates dmadev in vhost. With dmadev integrated, vhost supports M:N
mapping between vrings and DMA virtual channels. Specifically, one vring
can use multiple different DMA channels and one DMA channel can be
shared by multiple vrings at the same time.

A new asynchronous dequeue function is introduced:
	1) rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
       		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
		uint16_t count, int *nr_inflight,
		uint16_t dma_id, uint16_t vchan_id)

	Receives packets from the guest and offloads the copies to a DMA
virtual channel.
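
Because in-flight packets are only returned once their DMA copies
complete, an application must drain them before tearing a queue down.
Below is a minimal sketch following the sample application's use of
rte_vhost_clear_queue_thread_unsafe(); the in-flight counter and the
free_pkts() helper come from the sample app and are illustrative only:

	struct rte_mbuf *m_cpl[pkts_deq_inflight];
	uint16_t n;

	while (pkts_deq_inflight) {
		n = rte_vhost_clear_queue_thread_unsafe(vid, VIRTIO_TXQ,
				m_cpl, pkts_deq_inflight, dma_id, vchan_id);
		/* free mbufs whose DMA copies have completed */
		free_pkts(m_cpl, n);
		pkts_deq_inflight -= n;
	}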

[1] https://mails.dpdk.org/archives/dev/2022-February/234555.html
[2] https://mails.dpdk.org/archives/dev/2021-September/218591.html

RFC v1 -> v2:
* fix one bug in the example
* rename vchan to vchan_id
* check that dma_id and vchan_id are valid
* rework all logs to the new standard

Xuan Ding (2):
  vhost: support async dequeue for split ring
  examples/vhost: support async dequeue data path

 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 292 +++++++++++------
 examples/vhost/main.h              |  35 +-
 examples/vhost/virtio_net.c        |  16 +-
 lib/vhost/rte_vhost_async.h        |  33 ++
 lib/vhost/version.map              |   1 +
 lib/vhost/vhost.h                  |   1 +
 lib/vhost/virtio_net.c             | 504 +++++++++++++++++++++++++++++
 8 files changed, 793 insertions(+), 98 deletions(-)

-- 
2.17.1



* [RFC,v2 1/2] vhost: support async dequeue for split ring
  2022-02-24 11:03 ` [RFC,v2 0/2] vhost: " xuan.ding
@ 2022-02-24 11:03   ` xuan.ding
  2022-02-24 11:04   ` [RFC,v2 2/2] examples/vhost: support async dequeue data path xuan.ding
  1 sibling, 0 replies; 11+ messages in thread
From: xuan.ding @ 2022-02-24 11:03 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding, Yuan Wang

From: Xuan Ding <xuan.ding@intel.com>

This patch implements asynchronous dequeue data path for vhost split
ring, with dmadev library integrated.

Signed-off-by: Xuan Ding <xuan.ding@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
---
 lib/vhost/rte_vhost_async.h |  37 ++-
 lib/vhost/version.map       |   1 +
 lib/vhost/vhost.h           |   1 +
 lib/vhost/virtio_net.c      | 504 ++++++++++++++++++++++++++++++++++++
 4 files changed, 541 insertions(+), 2 deletions(-)

diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index 838c4778cc..a84f7cde9f 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -151,9 +151,9 @@ int rte_vhost_async_get_inflight(int vid, uint16_t queue_id);
  * @param count
  *  Size of the packet array
  * @param dma_id
- *  the identifier of DMA device
+ *  The identifier of DMA device
  * @param vchan_id
- *  the identifier of virtual DMA channel
+ *  The identifier of virtual DMA channel
  * @return
  *  Number of packets returned
  */
@@ -183,4 +183,37 @@ uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
 __rte_experimental
 int rte_vhost_async_dma_configure(int16_t dma_id, uint16_t vchan_id);
 
+/**
+ * This function tries to receive packets from the guest with offloading
+ * copies to the async channel. Packets whose copies have completed are
+ * returned in "pkts". Packets whose copies have been submitted to the
+ * async channel but are not yet completed are called "in-flight packets".
+ * This function will not return in-flight packets until their copies are
+ * completed by the async channel.
+ *
+ * @param vid
+ *  ID of vhost device to dequeue data
+ * @param queue_id
+ *  ID of virtqueue to dequeue data
+ * @param mbuf_pool
+ *  Mbuf_pool where host mbuf is allocated
+ * @param pkts
+ *  Blank array to keep successfully dequeued packets
+ * @param count
+ *  Size of the packet array
+ * @param nr_inflight
+ *  The number of in-flight packets. If an error occurs, its value is set to -1.
+ * @param dma_id
+ *  The identifier of DMA device
+ * @param vchan_id
+ *  The identifier of virtual DMA channel
+ * @return
+ *  Number of successfully dequeued packets
+ */
+__rte_experimental
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id);
+
 #endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index 1202ba9c1a..816a6dc942 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -87,6 +87,7 @@ EXPERIMENTAL {
 
 	# added in 22.03
 	rte_vhost_async_dma_configure;
+	rte_vhost_async_try_dequeue_burst;
 };
 
 INTERNAL {
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index 21e1866a52..ba9d1f487d 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -180,6 +180,7 @@ extern struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
  */
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
+	struct virtio_net_hdr nethdr;
 	uint16_t descs; /* num of descs inflight */
 	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index 5f432b0d77..3816caca79 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -3141,3 +3141,507 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 
 	return count;
 }
+
+static __rte_always_inline int
+async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct rte_mbuf *m, uint32_t mbuf_offset,
+			uint64_t buf_iova, uint32_t cpy_len)
+{
+	struct vhost_async *async = vq->async;
+	uint64_t mapped_len;
+	uint32_t buf_offset = 0;
+	void *host_iova;
+
+	while (cpy_len) {
+		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
+					buf_iova + buf_offset, cpy_len,
+					&mapped_len);
+		if (unlikely(!host_iova)) {
+			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host_iova.\n",
+				dev->ifname, __func__);
+			return -1;
+		}
+
+		if (unlikely(async_iter_add_iovec(dev, async, host_iova,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset),
+				(size_t)mapped_len)))
+			return -1;
+
+		cpy_len -= (uint32_t)mapped_len;
+		mbuf_offset += (uint32_t)mapped_len;
+		buf_offset += (uint32_t)mapped_len;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
+		  struct virtio_net_hdr *nethdr)
+{
+	uint64_t buf_addr, buf_iova;
+	uint32_t buf_avail, buf_offset, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	/* A counter to avoid an endless loop in the desc chain */
+	uint16_t vec_idx = 0;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	struct vhost_async *async = vq->async;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_len = buf_vec[vec_idx].buf_len;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
+		return -1;
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/*
+			 * No luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/*
+	 * A virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->vhost_hlen) {
+		if (unlikely(++vec_idx >= nr_vec))
+			return -1;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+	}
+
+	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0);
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+
+	if (async_iter_initialize(dev, async))
+		return -1;
+
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset, buf_iova + buf_offset,
+					   cpy_len) < 0)
+			goto error;
+
+		mbuf_avail -= cpy_len;
+		buf_avail -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_offset += cpy_len;
+
+		/* This buf reaches its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail = buf_len;
+
+			PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0);
+		}
+
+		/*
+		 * This mbuf reaches its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: failed to allocate memory for mbuf.\n",
+					dev->ifname, __func__);
+				goto error;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	async_iter_finalize(async);
+	if (hdr)
+		*nethdr = *hdr;
+
+	return 0;
+
+error:
+	async_iter_cancel(async);
+	return -1;
+}
+
+static __rte_always_inline uint16_t
+async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+		uint16_t vchan_id, bool legacy_ol_flags)
+{
+	uint16_t start_idx, from, i;
+	uint16_t nr_cpl_pkts = 0;
+	struct async_inflight_info *pkts_info;
+	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
+
+	pkts_info = vq->async->pkts_info;
+
+	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
+
+	start_idx = async_get_first_inflight_pkt_idx(vq);
+
+	from = start_idx;
+	while (vq->async->pkts_cmpl_flag[from] && count--) {
+		vq->async->pkts_cmpl_flag[from] = false;
+		from = (from + 1) & (vq->size - 1);
+		nr_cpl_pkts++;
+	}
+
+	if (nr_cpl_pkts == 0)
+		return 0;
+
+	for (i = 0; i < nr_cpl_pkts; i++) {
+		from = (start_idx + i) & (vq->size - 1);
+		pkts[i] = pkts_info[from].mbuf;
+
+		if (virtio_net_with_host_offload(dev))
+			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
+					      legacy_ol_flags);
+	}
+
+	/* write back completed descs to used ring and update used idx */
+	write_back_completed_descs_split(vq, nr_cpl_pkts);
+	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
+	vhost_vring_call_split(dev, vq);
+
+	vq->async->pkts_inflight_n -= nr_cpl_pkts;
+
+	return nr_cpl_pkts;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
+{
+	static bool allocerr_warned;
+	bool dropped = false;
+	uint16_t free_entries;
+	uint16_t pkt_idx, slot_idx = 0;
+	uint16_t nr_done_pkts = 0;
+	uint16_t pkt_err = 0;
+	uint16_t n_xfer;
+	struct vhost_async *async = vq->async;
+	struct async_inflight_info *pkts_info = async->pkts_info;
+	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+	uint16_t pkts_size = count;
+
+	/**
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx;
+	if (free_entries == 0)
+		goto out;
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	async_iter_reset(async);
+
+	count = RTE_MIN(count, MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n", dev->ifname, count);
+
+	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+		goto out;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint16_t head_idx = 0;
+		uint16_t nr_vec = 0;
+		uint16_t to;
+		uint32_t buf_len;
+		int err;
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
+						&nr_vec, buf_vec,
+						&head_idx, &buf_len,
+						VHOST_ACCESS_RO) < 0)) {
+			dropped = true;
+			break;
+		}
+
+		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
+		if (unlikely(err)) {
+			/**
+			 * mbuf allocation fails for jumbo packets when external
+			 * buffer allocation is not allowed and linear buffer
+			 * is required. Drop this packet.
+			 */
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: Failed mbuf alloc of size %d from %s\n",
+					dev->ifname, __func__, buf_len, mbuf_pool->name);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
+		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
+					&pkts_info[slot_idx].nethdr);
+		if (unlikely(err)) {
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: Failed to offload copies to async channel.\n",
+					dev->ifname, __func__);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		pkts_info[slot_idx].mbuf = pkt;
+
+		/* store used descs */
+		to = async->desc_idx_split & (vq->size - 1);
+		async->descs_split[to].id = head_idx;
+		async->descs_split[to].len = 0;
+		async->desc_idx_split++;
+
+		vq->last_avail_idx++;
+	}
+
+	if (unlikely(dropped))
+		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+
+	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
+					  async->iov_iter, pkt_idx);
+
+	async->pkts_inflight_n += n_xfer;
+
+	pkt_err = pkt_idx - n_xfer;
+	if (unlikely(pkt_err)) {
+		VHOST_LOG_DATA(DEBUG,
+			"(%s) %s: failed to transfer data for queue id %d.\n",
+			dev->ifname, __func__, queue_id);
+
+		pkt_idx = n_xfer;
+		/* recover available ring */
+		vq->last_avail_idx -= pkt_err;
+
+		/**
+		 * recover async channel copy related structures and free pktmbufs
+		 * for error pkts.
+		 */
+		async->desc_idx_split -= pkt_err;
+		while (pkt_err-- > 0) {
+			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
+			slot_idx--;
+		}
+	}
+
+	async->pkts_idx += pkt_idx;
+	if (async->pkts_idx >= vq->size)
+		async->pkts_idx -= vq->size;
+
+out:
+	/* The DMA device may serve other queues; unconditionally check for completions. */
+	nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id, pkts, pkts_size,
+							  dma_id, vchan_id, legacy_ol_flags);
+
+	return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, vchan_id, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, vchan_id, false);
+}
+
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id)
+{
+	struct virtio_net *dev;
+	struct rte_mbuf *rarp_mbuf = NULL;
+	struct vhost_virtqueue *vq;
+	int16_t success = 1;
+
+	*nr_inflight = -1;
+
+	dev = get_device(vid);
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%s) %s: built-in vhost net backend is disabled.\n",
+			dev->ifname, __func__);
+		return 0;
+	}
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR,
+			"(%s) %s: invalid virtqueue idx %d.\n",
+			dev->ifname, __func__, queue_id);
+		return 0;
+	}
+
+	if (unlikely(!dma_copy_track[dma_id].vchans ||
+				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
+		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
+			       dma_id, vchan_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
+		return 0;
+
+	if (unlikely(vq->enabled == 0)) {
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (unlikely(!vq->async)) {
+		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
+			dev->ifname, __func__, queue_id);
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0)) {
+			count = 0;
+			goto out;
+		}
+
+	/*
+	 * Construct a RARP broadcast packet and inject it into the "pkts"
+	 * array, to make it look like the guest actually sent such a packet.
+	 *
+	 * Check user_send_rarp() for more information.
+	 *
+	 * broadcast_rarp shares a cacheline in the virtio_net structure
+	 * with some fields that are accessed during enqueue and
+	 * __atomic_compare_exchange_n causes a write when it performs the
+	 * compare and exchange. This could result in false sharing between enqueue
+	 * and dequeue.
+	 *
+	 * Prevent unnecessary false sharing by reading broadcast_rarp first
+	 * and only performing compare and exchange if the read indicates it
+	 * is likely to be set.
+	 */
+	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
+			__atomic_compare_exchange_n(&dev->broadcast_rarp,
+			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
+
+		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
+		if (rarp_mbuf == NULL) {
+			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
+			count = 0;
+			goto out;
+		}
+		/*
+		 * Inject it at the head of the "pkts" array, so that the
+		 * switch's MAC learning table gets updated first.
+		 */
+		pkts[0] = rarp_mbuf;
+		pkts++;
+		count -= 1;
+	}
+
+	if (unlikely(vq_is_packed(dev))) {
+		static bool not_support_pack_log;
+		if (!not_support_pack_log) {
+			VHOST_LOG_DATA(ERR,
+				"(%s) %s: async dequeue does not support packed ring.\n",
+				dev->ifname, __func__);
+			not_support_pack_log = true;
+		}
+		count = 0;
+		goto out;
+	}
+
+	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, vchan_id);
+	else
+		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, vchan_id);
+
+	*nr_inflight = vq->async->pkts_inflight_n;
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (unlikely(rarp_mbuf != NULL))
+		count += 1;
+
+	return count;
+}
-- 
2.17.1


^ permalink raw reply related	[flat|nested] 11+ messages in thread
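
As a usage illustration for the API introduced above, a receive loop on
the application side could look roughly like the sketch below. This is a
minimal sketch, assuming an already-registered async channel and a
configured dmadev; the queue index, burst size and the forward_pkts()
helper are hypothetical placeholders, not part of the patch.

	#include <rte_mbuf.h>
	#include <rte_vhost_async.h>

	#define BURST_SZ 32

	void forward_pkts(struct rte_mbuf **pkts, uint16_t n); /* hypothetical */

	static void
	poll_dequeue(int vid, struct rte_mempool *pool, uint16_t dma_id)
	{
		struct rte_mbuf *pkts[BURST_SZ];
		int nr_inflight;
		uint16_t n;

		/* Completed copies are returned in pkts[]; packets whose
		 * DMA copies are still outstanding stay in flight. */
		n = rte_vhost_async_try_dequeue_burst(vid, 1 /* TX vring */,
				pool, pkts, BURST_SZ, &nr_inflight, dma_id, 0);

		/* nr_inflight is -1 on error, otherwise the number of
		 * packets still in flight on this virtqueue. */
		if (n > 0)
			forward_pkts(pkts, n);
	}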

* [RFC,v2 2/2] examples/vhost: support async dequeue data path
  2022-02-24 11:03 ` [RFC,v2 0/2] vhost: " xuan.ding
  2022-02-24 11:03   ` [RFC,v2 1/2] vhost: support async dequeue for split ring xuan.ding
@ 2022-02-24 11:04   ` xuan.ding
  1 sibling, 0 replies; 11+ messages in thread
From: xuan.ding @ 2022-02-24 11:04 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding,
	Wenwu Ma, Yuan Wang

From: Xuan Ding <xuan.ding@intel.com>

This patch adds a use case for the async dequeue API. A vswitch can
leverage a DMA device to accelerate the vhost async dequeue path.

Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Xuan Ding <xuan.ding@intel.com>
---
 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 292 ++++++++++++++++++++---------
 examples/vhost/main.h              |  35 +++-
 examples/vhost/virtio_net.c        |  16 +-
 4 files changed, 254 insertions(+), 98 deletions(-)

diff --git a/doc/guides/sample_app_ug/vhost.rst b/doc/guides/sample_app_ug/vhost.rst
index a6ce4bc8ac..09db965e70 100644
--- a/doc/guides/sample_app_ug/vhost.rst
+++ b/doc/guides/sample_app_ug/vhost.rst
@@ -169,9 +169,12 @@ demonstrates how to use the async vhost APIs. It's used in combination with dmas
 **--dmas**
 This parameter is used to specify the assigned DMA device of a vhost device.
 Async vhost-user net driver will be used if --dmas is set. For example
---dmas [txd0@00:04.0,txd1@00:04.1] means use DMA channel 00:04.0 for vhost
-device 0 enqueue operation and use DMA channel 00:04.1 for vhost device 1
-enqueue operation.
+--dmas [txd0@00:04.0,txd1@00:04.1,rxd0@00:04.2,rxd1@00:04.3] means using
+DMA channels 00:04.0/00:04.2 for vhost device 0 enqueue/dequeue operations
+and DMA channels 00:04.1/00:04.3 for vhost device 1 enqueue/dequeue
+operations. The device index corresponds to the socket file order, which
+means vhost device 0 is created through the first socket file, vhost
+device 1 through the second socket file, and so on.
 
 Common Issues
 -------------
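
For illustration only, an invocation following the documented format
might look as follows; the binary name, socket paths, core list and
port mask are placeholders rather than values taken from this patch:

	./dpdk-vhost -l 2-4 -n 4 -- -p 0x1 \
		--socket-file /tmp/vhost0 --socket-file /tmp/vhost1 \
		--dmas [txd0@00:04.0,rxd0@00:04.2,txd1@00:04.1,rxd1@00:04.3]
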
diff --git a/examples/vhost/main.c b/examples/vhost/main.c
index 68afd398bb..8417ac0f81 100644
--- a/examples/vhost/main.c
+++ b/examples/vhost/main.c
@@ -61,6 +61,9 @@
 
 #define DMA_RING_SIZE 4096
 
+#define ASYNC_ENQUEUE_VHOST 1
+#define ASYNC_DEQUEUE_VHOST 2
+
 struct dma_for_vhost dma_bind[RTE_MAX_VHOST_DEVICE];
 int16_t dmas_id[RTE_DMADEV_DEFAULT_MAX];
 static int dma_count;
@@ -111,6 +114,8 @@ static uint32_t burst_rx_retry_num = BURST_RX_RETRIES;
 static char *socket_files;
 static int nb_sockets;
 
+static struct vhost_queue_ops vdev_queue_ops[RTE_MAX_VHOST_DEVICE];
+
 /* empty VMDq configuration structure. Filled in programmatically */
 static struct rte_eth_conf vmdq_conf_default = {
 	.rxmode = {
@@ -200,6 +205,18 @@ struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * RTE_MAX_VHOST_DEVICE];
 #define MBUF_TABLE_DRAIN_TSC	((rte_get_tsc_hz() + US_PER_S - 1) \
 				 / US_PER_S * BURST_TX_DRAIN_US)
 
+static int vid2socketid[RTE_MAX_VHOST_DEVICE];
+
+static uint32_t get_async_flag_by_socketid(int socketid)
+{
+	return dma_bind[socketid].async_flag;
+}
+
+static void init_vid2socketid_array(int vid, int socketid)
+{
+	vid2socketid[vid] = socketid;
+}
+
 static inline bool
 is_dma_configured(int16_t dev_id)
 {
@@ -219,7 +236,7 @@ open_dma(const char *value)
 	char *addrs = input;
 	char *ptrs[2];
 	char *start, *end, *substr;
-	int64_t vid;
+	int64_t socketid, vring_id;
 
 	struct rte_dma_info info;
 	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
@@ -257,7 +274,9 @@ open_dma(const char *value)
 
 	while (i < args_nr) {
 		char *arg_temp = dma_arg[i];
+		char *txd, *rxd;
 		uint8_t sub_nr;
+		int async_flag;
 
 		sub_nr = rte_strsplit(arg_temp, strlen(arg_temp), ptrs, 2, '@');
 		if (sub_nr != 2) {
@@ -265,14 +284,23 @@ open_dma(const char *value)
 			goto out;
 		}
 
-		start = strstr(ptrs[0], "txd");
-		if (start == NULL) {
+		txd = strstr(ptrs[0], "txd");
+		rxd = strstr(ptrs[0], "rxd");
+		if (txd) {
+			start = txd;
+			vring_id = VIRTIO_RXQ;
+			async_flag = ASYNC_ENQUEUE_VHOST;
+		} else if (rxd) {
+			start = rxd;
+			vring_id = VIRTIO_TXQ;
+			async_flag = ASYNC_DEQUEUE_VHOST;
+		} else {
 			ret = -1;
 			goto out;
 		}
 
 		start += 3;
-		vid = strtol(start, &end, 0);
+		socketid = strtol(start, &end, 0);
 		if (end == start) {
 			ret = -1;
 			goto out;
@@ -333,7 +361,8 @@ open_dma(const char *value)
 		dmas_id[dma_count++] = dev_id;
 
 done:
-		(dma_info + vid)->dmas[VIRTIO_RXQ].dev_id = dev_id;
+		(dma_info + socketid)->dmas[vring_id].dev_id = dev_id;
+		(dma_info + socketid)->async_flag |= async_flag;
 		i++;
 	}
 out:
@@ -967,13 +996,13 @@ complete_async_pkts(struct vhost_dev *vdev)
 {
 	struct rte_mbuf *p_cpl[MAX_PKT_BURST];
 	uint16_t complete_count;
-	int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
+	int16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].dev_id;
 
 	complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
 					VIRTIO_RXQ, p_cpl, MAX_PKT_BURST, dma_id, 0);
 	if (complete_count) {
 		free_pkts(p_cpl, complete_count);
-		__atomic_sub_fetch(&vdev->pkts_inflight, complete_count, __ATOMIC_SEQ_CST);
+		__atomic_sub_fetch(&vdev->pkts_enq_inflight, complete_count, __ATOMIC_SEQ_CST);
 	}
 
 }
@@ -1008,23 +1037,7 @@ drain_vhost(struct vhost_dev *vdev)
 	uint16_t nr_xmit = vhost_txbuff[buff_idx]->len;
 	struct rte_mbuf **m = vhost_txbuff[buff_idx]->m_table;
 
-	if (builtin_net_driver) {
-		ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ, m, nr_xmit, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, ret, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = nr_xmit - ret;
-		if (enqueue_fail)
-			free_pkts(&m[ret], nr_xmit - ret);
-	} else {
-		ret = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						m, nr_xmit);
-	}
+	ret = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev, VIRTIO_RXQ, m, nr_xmit);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, nr_xmit,
@@ -1033,7 +1046,7 @@ drain_vhost(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(m, nr_xmit);
 }
 
@@ -1305,6 +1318,33 @@ drain_mbuf_table(struct mbuf_table *tx_q)
 	}
 }
 
+uint16_t
+async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	uint16_t enqueue_count;
+	uint16_t enqueue_fail = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_RXQ].dev_id;
+
+	complete_async_pkts(dev);
+	enqueue_count = rte_vhost_submit_enqueue_burst(dev->vid, queue_id,
+					pkts, rx_count, dma_id, 0);
+	__atomic_add_fetch(&dev->pkts_enq_inflight, enqueue_count, __ATOMIC_SEQ_CST);
+
+	enqueue_fail = rx_count - enqueue_count;
+	if (enqueue_fail)
+		free_pkts(&pkts[enqueue_count], enqueue_fail);
+
+	return enqueue_count;
+}
+
+uint16_t
+sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, rx_count);
+}
+
 static __rte_always_inline void
 drain_eth_rx(struct vhost_dev *vdev)
 {
@@ -1335,26 +1375,8 @@ drain_eth_rx(struct vhost_dev *vdev)
 		}
 	}
 
-	if (builtin_net_driver) {
-		enqueue_count = vs_enqueue_pkts(vdev, VIRTIO_RXQ,
-						pkts, rx_count);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
-					VIRTIO_RXQ, pkts, rx_count, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, enqueue_count, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = rx_count - enqueue_count;
-		if (enqueue_fail)
-			free_pkts(&pkts[enqueue_count], enqueue_fail);
-
-	} else {
-		enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						pkts, rx_count);
-	}
+	enqueue_count = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev,
+					VIRTIO_RXQ, pkts, rx_count);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, rx_count,
@@ -1363,10 +1385,33 @@ drain_eth_rx(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(pkts, rx_count);
 }
 
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			    struct rte_mempool *mbuf_pool,
+			    struct rte_mbuf **pkts, uint16_t count)
+{
+	int nr_inflight;
+	uint16_t dequeue_count;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_TXQ].dev_id;
+
+	dequeue_count = rte_vhost_async_try_dequeue_burst(dev->vid, queue_id,
+			mbuf_pool, pkts, count, &nr_inflight, dma_id, 0);
+	if (likely(nr_inflight != -1))
+		dev->pkts_deq_inflight = nr_inflight;
+
+	return dequeue_count;
+}
+
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			   struct rte_mempool *mbuf_pool,
+			   struct rte_mbuf **pkts, uint16_t count)
+{
+	return rte_vhost_dequeue_burst(dev->vid, queue_id, mbuf_pool, pkts, count);
+}
+
 static __rte_always_inline void
 drain_virtio_tx(struct vhost_dev *vdev)
 {
@@ -1374,13 +1419,8 @@ drain_virtio_tx(struct vhost_dev *vdev)
 	uint16_t count;
 	uint16_t i;
 
-	if (builtin_net_driver) {
-		count = vs_dequeue_pkts(vdev, VIRTIO_TXQ, mbuf_pool,
-					pkts, MAX_PKT_BURST);
-	} else {
-		count = rte_vhost_dequeue_burst(vdev->vid, VIRTIO_TXQ,
-					mbuf_pool, pkts, MAX_PKT_BURST);
-	}
+	count = vdev_queue_ops[vdev->vid].dequeue_pkt_burst(vdev,
+				VIRTIO_TXQ, mbuf_pool, pkts, MAX_PKT_BURST);
 
 	/* setup VMDq for the first packet */
 	if (unlikely(vdev->ready == DEVICE_MAC_LEARNING) && count) {
@@ -1459,6 +1499,31 @@ switch_worker(void *arg __rte_unused)
 	return 0;
 }
 
+static void
+vhost_clear_queue_thread_unsafe(struct vhost_dev *vdev, uint16_t queue_id)
+{
+	uint16_t n_pkt = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[queue_id].dev_id;
+	struct rte_mbuf *m_enq_cpl[vdev->pkts_enq_inflight];
+	struct rte_mbuf *m_deq_cpl[vdev->pkts_deq_inflight];
+
+	if (queue_id % 2 == 0) {
+		while (vdev->pkts_enq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_enq_cpl, vdev->pkts_enq_inflight, dma_id, 0);
+			free_pkts(m_enq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_enq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	} else {
+		while (vdev->pkts_deq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_deq_cpl, vdev->pkts_deq_inflight, dma_id, 0);
+			free_pkts(m_deq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_deq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	}
+}
+
 /*
  * Remove a device from the specific data core linked list and from the
 * main linked list. Synchronization occurs through the use of the
@@ -1515,25 +1580,78 @@ destroy_device(int vid)
 		"(%d) device has been removed from data core\n",
 		vdev->vid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t n_pkt = 0;
-		int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-		struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-		while (vdev->pkts_inflight) {
-			n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, VIRTIO_RXQ,
-						m_cpl, vdev->pkts_inflight, dma_id, 0);
-			free_pkts(m_cpl, n_pkt);
-			__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-		}
-
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_RXQ);
 		rte_vhost_async_channel_unregister(vid, VIRTIO_RXQ);
-		dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = false;
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = false;
+	}
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_TXQ);
+		rte_vhost_async_channel_unregister(vid, VIRTIO_TXQ);
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = false;
 	}
 
 	rte_free(vdev);
 }
 
+static int
+get_socketid_by_vid(int vid)
+{
+	int i;
+	char ifname[PATH_MAX];
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+
+	for (i = 0; i < nb_sockets; i++) {
+		char *file = socket_files + i * PATH_MAX;
+		if (strcmp(file, ifname) == 0)
+			return i;
+	}
+
+	return -1;
+}
+
+static int
+init_vhost_queue_ops(int vid)
+{
+	if (builtin_net_driver) {
+		vdev_queue_ops[vid].enqueue_pkt_burst = builtin_enqueue_pkts;
+		vdev_queue_ops[vid].dequeue_pkt_burst = builtin_dequeue_pkts;
+	} else {
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled)
+			vdev_queue_ops[vid].enqueue_pkt_burst = async_enqueue_pkts;
+		else
+			vdev_queue_ops[vid].enqueue_pkt_burst = sync_enqueue_pkts;
+
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled)
+			vdev_queue_ops[vid].dequeue_pkt_burst = async_dequeue_pkts;
+		else
+			vdev_queue_ops[vid].dequeue_pkt_burst = sync_dequeue_pkts;
+	}
+
+	return 0;
+}
+
+static int
+vhost_async_channel_register(int vid)
+{
+	int rx_ret = 0, tx_ret = 0;
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
+		rx_ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
+		if (rx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = true;
+	}
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].dev_id != INVALID_DMA_ID) {
+		tx_ret = rte_vhost_async_channel_register(vid, VIRTIO_TXQ);
+		if (tx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = true;
+	}
+
+	return rx_ret | tx_ret;
+}
+
+
 /*
  * A new device is added to a data core. First the device is added to the main linked list
  * and then allocated to a specific data core.
@@ -1545,6 +1663,8 @@ new_device(int vid)
 	uint16_t i;
 	uint32_t device_num_min = num_devices;
 	struct vhost_dev *vdev;
+	int ret;
+
 	vdev = rte_zmalloc("vhost device", sizeof(*vdev), RTE_CACHE_LINE_SIZE);
 	if (vdev == NULL) {
 		RTE_LOG(INFO, VHOST_DATA,
@@ -1567,6 +1687,17 @@ new_device(int vid)
 		}
 	}
 
+	int socketid = get_socketid_by_vid(vid);
+	if (socketid == -1)
+		return -1;
+
+	init_vid2socketid_array(vid, socketid);
+
+	ret = vhost_async_channel_register(vid);
+
+	if (init_vhost_queue_ops(vid) != 0)
+		return -1;
+
 	if (builtin_net_driver)
 		vs_vhost_net_setup(vdev);
 
@@ -1598,16 +1729,7 @@ new_device(int vid)
 		"(%d) device has been added to data core %d\n",
 		vid, vdev->coreid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
-		int ret;
-
-		ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
-		if (ret == 0)
-			dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = true;
-		return ret;
-	}
-
-	return 0;
+	return ret;
 }
 
 static int
@@ -1625,19 +1747,9 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
 	if (queue_id != VIRTIO_RXQ)
 		return 0;
 
-	if (dma_bind[vid].dmas[queue_id].async_enabled) {
-		if (!enable) {
-			uint16_t n_pkt = 0;
-			int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-			struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-			while (vdev->pkts_inflight) {
-				n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, queue_id,
-							m_cpl, vdev->pkts_inflight, dma_id, 0);
-				free_pkts(m_cpl, n_pkt);
-				__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-			}
-		}
+	if (dma_bind[vid2socketid[vid]].dmas[queue_id].async_enabled) {
+		if (!enable)
+			vhost_clear_queue_thread_unsafe(vdev, queue_id);
 	}
 
 	return 0;
@@ -1910,7 +2022,7 @@ main(int argc, char *argv[])
 	for (i = 0; i < nb_sockets; i++) {
 		char *file = socket_files + i * PATH_MAX;
 
-		if (dma_count)
+		if (dma_count && get_async_flag_by_socketid(i) != 0)
 			flags = flags | RTE_VHOST_USER_ASYNC_COPY;
 
 		ret = rte_vhost_driver_register(file, flags);
diff --git a/examples/vhost/main.h b/examples/vhost/main.h
index b4a453e77e..40ac2841d1 100644
--- a/examples/vhost/main.h
+++ b/examples/vhost/main.h
@@ -52,7 +52,8 @@ struct vhost_dev {
 	uint64_t features;
 	size_t hdr_len;
 	uint16_t nr_vrings;
-	uint16_t pkts_inflight;
+	uint16_t pkts_enq_inflight;
+	uint16_t pkts_deq_inflight;
 	struct rte_vhost_memory *mem;
 	struct device_statistics stats;
 	TAILQ_ENTRY(vhost_dev) global_vdev_entry;
@@ -62,6 +63,19 @@ struct vhost_dev {
 	struct vhost_queue queues[MAX_QUEUE_PAIRS * 2];
 } __rte_cache_aligned;
 
+typedef uint16_t (*vhost_enqueue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mbuf **pkts,
+			uint32_t count);
+
+typedef uint16_t (*vhost_dequeue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+
+struct vhost_queue_ops {
+	vhost_enqueue_burst_t enqueue_pkt_burst;
+	vhost_dequeue_burst_t dequeue_pkt_burst;
+};
+
 TAILQ_HEAD(vhost_dev_tailq_list, vhost_dev);
 
 
@@ -88,6 +102,7 @@ struct dma_info {
 
 struct dma_for_vhost {
 	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint32_t async_flag;
 };
 
 /* we implement non-extra virtio net features */
@@ -98,7 +113,19 @@ void vs_vhost_net_remove(struct vhost_dev *dev);
 uint16_t vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 			 struct rte_mbuf **pkts, uint32_t count);
 
-uint16_t vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
-			 struct rte_mempool *mbuf_pool,
-			 struct rte_mbuf **pkts, uint16_t count);
+uint16_t builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mbuf **pkts, uint32_t count);
+uint16_t builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
 #endif /* _MAIN_H_ */
diff --git a/examples/vhost/virtio_net.c b/examples/vhost/virtio_net.c
index 9064fc3a82..2432a96566 100644
--- a/examples/vhost/virtio_net.c
+++ b/examples/vhost/virtio_net.c
@@ -238,6 +238,13 @@ vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	return count;
 }
 
+uint16_t
+builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t count)
+{
+	return vs_enqueue_pkts(dev, queue_id, pkts, count);
+}
+
 static __rte_always_inline int
 dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	    struct rte_mbuf *m, uint16_t desc_idx,
@@ -363,7 +370,7 @@ dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	return 0;
 }
 
-uint16_t
+static uint16_t
 vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
 {
@@ -440,3 +447,10 @@ vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 
 	return i;
 }
+
+uint16_t
+builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+{
+	return vs_dequeue_pkts(dev, queue_id, mbuf_pool, pkts, count);
+}
-- 
2.17.1


^ permalink raw reply related	[flat|nested] 11+ messages in thread
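
Stripped of the vhost specifics, the per-device ops table introduced in
this patch reduces to the dispatch pattern sketched below. The names and
signatures are illustrative, not the patch's exact ones; the point is
that the sync/async choice is made once per device, so the datapath pays
a single indirect call per burst instead of re-testing the configuration
for every packet.

	#include <stdbool.h>
	#include <stdint.h>

	struct rte_mbuf;

	uint16_t sync_enqueue(int vid, struct rte_mbuf **pkts, uint16_t n);
	uint16_t async_enqueue(int vid, struct rte_mbuf **pkts, uint16_t n);
	uint16_t sync_dequeue(int vid, struct rte_mbuf **pkts, uint16_t n);
	uint16_t async_dequeue(int vid, struct rte_mbuf **pkts, uint16_t n);

	struct queue_ops {
		uint16_t (*enqueue)(int vid, struct rte_mbuf **pkts, uint16_t n);
		uint16_t (*dequeue)(int vid, struct rte_mbuf **pkts, uint16_t n);
	};

	static struct queue_ops ops[1024];	/* indexed by vid */

	static void
	bind_queue_ops(int vid, bool async_enq, bool async_deq)
	{
		/* Bound once when the device appears (cf. new_device()). */
		ops[vid].enqueue = async_enq ? async_enqueue : sync_enqueue;
		ops[vid].dequeue = async_deq ? async_dequeue : sync_dequeue;
	}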

* [RFC,v3 0/2] vhost: support async dequeue data path
  2022-01-01  0:12 [RFC 0/2] vhost: support async dequeue data path xuan.ding
                   ` (2 preceding siblings ...)
  2022-02-24 11:03 ` [RFC,v2 0/2] vhost: " xuan.ding
@ 2022-03-10  6:54 ` xuan.ding
  2022-03-10  6:54   ` [RFC,v3 1/2] vhost: support async dequeue for split ring xuan.ding
  2022-03-10  6:54   ` [RFC,v3 2/2] examples/vhost: support async dequeue data path xuan.ding
  3 siblings, 2 replies; 11+ messages in thread
From: xuan.ding @ 2022-03-10  6:54 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding

From: Xuan Ding <xuan.ding@intel.com>

The presence of an asynchronous path allows applications to offload
memory copies to the DMA engine, so as to save CPU cycles and improve
copy performance.
split ring in vhost async dequeue data path. The code is based on
latest enqueue changes [1].

This patch set is a new design and implementation of [2]. Since dmadev
was introduced in DPDK 21.11, to simplify application logic, this patch
integrates dmadev in vhost. With dmadev integrated, vhost supports M:N
mapping between vrings and DMA virtual channels. Specifically, one vring
can use multiple different DMA channels and one DMA channel can be
shared by multiple vrings at the same time.

A new asynchronous dequeue function is introduced:
        1) rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
                struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
                uint16_t count, int *nr_inflight,
                uint16_t dma_id, uint16_t vchan_id)

        Receives packets from the guest and offloads copies to the DMA
virtual channel.
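
To make the M:N mapping concrete: a virtual channel is not tied to a
single vring, so one vchan can, for instance, serve both directions of
the same device. A rough setup sketch, assuming the dmadev itself has
already been configured and started through the dmadev API, and with
"vid" standing for an already-connected vhost device:

	#include <rte_vhost_async.h>

	static int
	setup_shared_vchan(int vid)
	{
		/* Make DMA device 0, virtual channel 0 usable by vhost. */
		if (rte_vhost_async_dma_configure(0, 0) < 0)
			return -1;

		/* Register both directions; enqueue and dequeue bursts may
		 * later pass the very same (dma_id, vchan_id) = (0, 0). */
		if (rte_vhost_async_channel_register(vid, 0 /* RX vring */) ||
		    rte_vhost_async_channel_register(vid, 1 /* TX vring */))
			return -1;

		return 0;
	}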

[1] https://mails.dpdk.org/archives/dev/2022-February/234555.html
[2] https://mails.dpdk.org/archives/dev/2021-September/218591.html

RFC v2 -> v3:
* rebase to the latest DPDK version

RFC v1 -> v2:
* fix one bug in the example
* rename vchan to vchan_id
* check that dma_id and vchan_id are valid
* rework all the logs to the new standard

Xuan Ding (2):
  vhost: support async dequeue for split ring
  examples/vhost: support async dequeue data path

 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 292 +++++++++++------
 examples/vhost/main.h              |  35 +-
 examples/vhost/virtio_net.c        |  16 +-
 lib/vhost/rte_vhost_async.h        |  37 ++-
 lib/vhost/version.map              |   1 +
 lib/vhost/vhost.h                  |   1 +
 lib/vhost/virtio_net.c             | 504 +++++++++++++++++++++++++++++
 8 files changed, 795 insertions(+), 100 deletions(-)

-- 
2.17.1


^ permalink raw reply	[flat|nested] 11+ messages in thread
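
One consequence of the in-flight notion above: before a virtqueue is
torn down or disabled, outstanding copies are expected to be drained.
The example patch in this series does this with a loop of roughly the
following shape -- a sketch only, where the inflight bookkeeping and
any locking are the application's responsibility:

	#include <rte_mbuf.h>
	#include <rte_vhost_async.h>

	static void
	drain_queue(int vid, uint16_t queue_id, int16_t dma_id,
		    uint16_t inflight)
	{
		struct rte_mbuf *cpl[inflight ? inflight : 1];
		uint16_t n;

		while (inflight) {
			n = rte_vhost_clear_queue_thread_unsafe(vid, queue_id,
					cpl, inflight, dma_id, 0);
			rte_pktmbuf_free_bulk(cpl, n);
			inflight -= n;
		}
	}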

* [RFC,v3 1/2] vhost: support async dequeue for split ring
  2022-03-10  6:54 ` [RFC,v3 0/2] vhost: " xuan.ding
@ 2022-03-10  6:54   ` xuan.ding
  2022-03-31  9:15     ` Maxime Coquelin
  2022-03-10  6:54   ` [RFC,v3 2/2] examples/vhost: support async dequeue data path xuan.ding
  1 sibling, 1 reply; 11+ messages in thread
From: xuan.ding @ 2022-03-10  6:54 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding, Yuan Wang

From: Xuan Ding <xuan.ding@intel.com>

This patch implements the asynchronous dequeue data path for vhost
split ring, with the dmadev library integrated.

Signed-off-by: Xuan Ding <xuan.ding@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
---
 lib/vhost/rte_vhost_async.h |  37 ++-
 lib/vhost/version.map       |   1 +
 lib/vhost/vhost.h           |   1 +
 lib/vhost/virtio_net.c      | 504 ++++++++++++++++++++++++++++++++++++
 4 files changed, 541 insertions(+), 2 deletions(-)

diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index f1293c6a9d..b6ab0b06a2 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -155,9 +155,9 @@ int rte_vhost_async_get_inflight(int vid, uint16_t queue_id);
  * @param count
  *  Size of the packet array
  * @param dma_id
- *  the identifier of DMA device
+ *  The identifier of DMA device
  * @param vchan_id
- *  the identifier of virtual DMA channel
+ *  The identifier of virtual DMA channel
  * @return
  *  Number of packets returned
  */
@@ -187,6 +187,39 @@ uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
 __rte_experimental
 int rte_vhost_async_dma_configure(int16_t dma_id, uint16_t vchan_id);
 
+/**
+ * This function tries to receive packets from the guest by offloading
+ * copies to the async channel. The packets whose copies are completed
+ * are returned in "pkts". The other packets, whose copies have been
+ * submitted to the async channel but not yet completed, are called
+ * "in-flight packets". This function does not return in-flight packets
+ * until their copies are completed by the async channel.
+ *
+ * @param vid
+ *  ID of vhost device to dequeue data
+ * @param queue_id
+ *  ID of virtqueue to dequeue data
+ * @param mbuf_pool
+ *  Mbuf pool from which host mbufs are allocated
+ * @param pkts
+ *  Blank array to keep successfully dequeued packets
+ * @param count
+ *  Size of the packet array
+ * @param nr_inflight
+ *  The number of in-flight packets. If an error occurs, it is set to -1.
+ * @param dma_id
+ *  The identifier of DMA device
+ * @param vchan_id
+ *  The identifier of virtual DMA channel
+ * @return
+ *  Number of successfully dequeued packets
+ */
+__rte_experimental
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/vhost/version.map b/lib/vhost/version.map
index 0a66c5840c..968d6d4290 100644
--- a/lib/vhost/version.map
+++ b/lib/vhost/version.map
@@ -87,6 +87,7 @@ EXPERIMENTAL {
 
 	# added in 22.03
 	rte_vhost_async_dma_configure;
+	rte_vhost_async_try_dequeue_burst;
 };
 
 INTERNAL {
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index a9edc271aa..3799d41089 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -178,6 +178,7 @@ extern struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
  */
 struct async_inflight_info {
 	struct rte_mbuf *mbuf;
+	struct virtio_net_hdr nethdr;
 	uint16_t descs; /* num of descs inflight */
 	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
 };
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index 5f432b0d77..3816caca79 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -3141,3 +3141,507 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
 
 	return count;
 }
+
+static __rte_always_inline int
+async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
+			struct rte_mbuf *m, uint32_t mbuf_offset,
+			uint64_t buf_iova, uint32_t cpy_len)
+{
+	struct vhost_async *async = vq->async;
+	uint64_t mapped_len;
+	uint32_t buf_offset = 0;
+	void *host_iova;
+
+	while (cpy_len) {
+		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
+					buf_iova + buf_offset, cpy_len,
+					&mapped_len);
+		if (unlikely(!host_iova)) {
+			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host_iova.\n",
+				dev->ifname, __func__);
+			return -1;
+		}
+
+		if (unlikely(async_iter_add_iovec(dev, async, host_iova,
+				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset),
+				(size_t)mapped_len)))
+			return -1;
+
+		cpy_len -= (uint32_t)mapped_len;
+		mbuf_offset += (uint32_t)mapped_len;
+		buf_offset += (uint32_t)mapped_len;
+	}
+
+	return 0;
+}
+
+static __rte_always_inline int
+async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		  struct buf_vector *buf_vec, uint16_t nr_vec,
+		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
+		  struct virtio_net_hdr *nethdr)
+{
+	uint64_t buf_addr, buf_iova;
+	uint32_t buf_avail, buf_offset, buf_len;
+	uint32_t mbuf_avail, mbuf_offset;
+	uint32_t cpy_len;
+	/* A counter to avoid an endless descriptor chain loop */
+	uint16_t vec_idx = 0;
+	struct rte_mbuf *cur = m, *prev = m;
+	struct virtio_net_hdr tmp_hdr;
+	struct virtio_net_hdr *hdr = NULL;
+	struct vhost_async *async = vq->async;
+
+	buf_addr = buf_vec[vec_idx].buf_addr;
+	buf_len = buf_vec[vec_idx].buf_len;
+	buf_iova = buf_vec[vec_idx].buf_iova;
+
+	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
+		return -1;
+
+	if (virtio_net_with_host_offload(dev)) {
+		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
+			/*
+			 * No luck, the virtio-net header doesn't fit
+			 * in a contiguous virtual area.
+			 */
+			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
+			hdr = &tmp_hdr;
+		} else {
+			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
+		}
+	}
+
+	/*
+	 * A virtio driver normally uses at least 2 desc buffers
+	 * for Tx: the first for storing the header, and others
+	 * for storing the data.
+	 */
+	if (unlikely(buf_len < dev->vhost_hlen)) {
+		buf_offset = dev->vhost_hlen - buf_len;
+		vec_idx++;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+		buf_avail  = buf_len - buf_offset;
+	} else if (buf_len == dev->vhost_hlen) {
+		if (unlikely(++vec_idx >= nr_vec))
+			return -1;
+		buf_addr = buf_vec[vec_idx].buf_addr;
+		buf_iova = buf_vec[vec_idx].buf_iova;
+		buf_len = buf_vec[vec_idx].buf_len;
+
+		buf_offset = 0;
+		buf_avail = buf_len;
+	} else {
+		buf_offset = dev->vhost_hlen;
+		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
+	}
+
+	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0);
+
+	mbuf_offset = 0;
+	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+
+	if (async_iter_initialize(dev, async))
+		return -1;
+
+	while (1) {
+		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
+
+		if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset, buf_iova + buf_offset,
+					   cpy_len) < 0)
+			goto error;
+
+		mbuf_avail -= cpy_len;
+		buf_avail -= cpy_len;
+		mbuf_offset += cpy_len;
+		buf_offset += cpy_len;
+
+		/* This buf reaches its end, get the next one */
+		if (buf_avail == 0) {
+			if (++vec_idx >= nr_vec)
+				break;
+
+			buf_addr = buf_vec[vec_idx].buf_addr;
+			buf_iova = buf_vec[vec_idx].buf_iova;
+			buf_len = buf_vec[vec_idx].buf_len;
+
+			buf_offset = 0;
+			buf_avail = buf_len;
+
+			PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0);
+		}
+
+		/*
+		 * This mbuf reaches its end, get a new one
+		 * to hold more data.
+		 */
+		if (mbuf_avail == 0) {
+			cur = rte_pktmbuf_alloc(mbuf_pool);
+			if (unlikely(cur == NULL)) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: failed to allocate memory for mbuf.\n",
+					dev->ifname, __func__);
+				goto error;
+			}
+
+			prev->next = cur;
+			prev->data_len = mbuf_offset;
+			m->nb_segs += 1;
+			m->pkt_len += mbuf_offset;
+			prev = cur;
+
+			mbuf_offset = 0;
+			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+		}
+	}
+
+	prev->data_len = mbuf_offset;
+	m->pkt_len += mbuf_offset;
+
+	async_iter_finalize(async);
+	if (hdr)
+		*nethdr = *hdr;
+
+	return 0;
+
+error:
+	async_iter_cancel(async);
+	return -1;
+}
+
+static __rte_always_inline uint16_t
+async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+		uint16_t vchan_id, bool legacy_ol_flags)
+{
+	uint16_t start_idx, from, i;
+	uint16_t nr_cpl_pkts = 0;
+	struct async_inflight_info *pkts_info;
+	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
+
+	pkts_info = vq->async->pkts_info;
+
+	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
+
+	start_idx = async_get_first_inflight_pkt_idx(vq);
+
+	from = start_idx;
+	while (vq->async->pkts_cmpl_flag[from] && count--) {
+		vq->async->pkts_cmpl_flag[from] = false;
+		from = (from + 1) & (vq->size - 1);
+		nr_cpl_pkts++;
+	}
+
+	if (nr_cpl_pkts == 0)
+		return 0;
+
+	for (i = 0; i < nr_cpl_pkts; i++) {
+		from = (start_idx + i) & (vq->size - 1);
+		pkts[i] = pkts_info[from].mbuf;
+
+		if (virtio_net_with_host_offload(dev))
+			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
+					      legacy_ol_flags);
+	}
+
+	/* write back completed descs to used ring and update used idx */
+	write_back_completed_descs_split(vq, nr_cpl_pkts);
+	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
+	vhost_vring_call_split(dev, vq);
+
+	vq->async->pkts_inflight_n -= nr_cpl_pkts;
+
+	return nr_cpl_pkts;
+}
+
+static __rte_always_inline uint16_t
+virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
+		uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
+{
+	static bool allocerr_warned;
+	bool dropped = false;
+	uint16_t free_entries;
+	uint16_t pkt_idx, slot_idx = 0;
+	uint16_t nr_done_pkts = 0;
+	uint16_t pkt_err = 0;
+	uint16_t n_xfer;
+	struct vhost_async *async = vq->async;
+	struct async_inflight_info *pkts_info = async->pkts_info;
+	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
+	uint16_t pkts_size = count;
+
+	/**
+	 * The ordering between avail index and
+	 * desc reads needs to be enforced.
+	 */
+	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx;
+	if (free_entries == 0)
+		goto out;
+
+	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
+
+	async_iter_reset(async);
+
+	count = RTE_MIN(count, MAX_PKT_BURST);
+	count = RTE_MIN(count, free_entries);
+	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n", dev->ifname, count);
+
+	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
+		goto out;
+
+	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
+		uint16_t head_idx = 0;
+		uint16_t nr_vec = 0;
+		uint16_t to;
+		uint32_t buf_len;
+		int err;
+		struct buf_vector buf_vec[BUF_VECTOR_MAX];
+		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
+
+		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
+						&nr_vec, buf_vec,
+						&head_idx, &buf_len,
+						VHOST_ACCESS_RO) < 0)) {
+			dropped = true;
+			break;
+		}
+
+		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
+		if (unlikely(err)) {
+			/**
+			 * mbuf allocation fails for jumbo packets when external
+			 * buffer allocation is not allowed and linear buffer
+			 * is required. Drop this packet.
+			 */
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: Failed mbuf alloc of size %d from %s\n",
+					dev->ifname, __func__, buf_len, mbuf_pool->name);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
+		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
+					&pkts_info[slot_idx].nethdr);
+		if (unlikely(err)) {
+			if (!allocerr_warned) {
+				VHOST_LOG_DATA(ERR,
+					"(%s) %s: Failed to offload copies to async channel.\n",
+					dev->ifname, __func__);
+				allocerr_warned = true;
+			}
+			dropped = true;
+			break;
+		}
+
+		pkts_info[slot_idx].mbuf = pkt;
+
+		/* store used descs */
+		to = async->desc_idx_split & (vq->size - 1);
+		async->descs_split[to].id = head_idx;
+		async->descs_split[to].len = 0;
+		async->desc_idx_split++;
+
+		vq->last_avail_idx++;
+	}
+
+	if (unlikely(dropped))
+		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
+
+	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
+					  async->iov_iter, pkt_idx);
+
+	async->pkts_inflight_n += n_xfer;
+
+	pkt_err = pkt_idx - n_xfer;
+	if (unlikely(pkt_err)) {
+		VHOST_LOG_DATA(DEBUG,
+			"(%s) %s: failed to transfer data for queue id %d.\n",
+			dev->ifname, __func__, queue_id);
+
+		pkt_idx = n_xfer;
+		/* recover available ring */
+		vq->last_avail_idx -= pkt_err;
+
+		/**
+		 * recover the async channel's copy-related structures and
+		 * free the pktmbufs of the error packets.
+		 */
+		async->desc_idx_split -= pkt_err;
+		while (pkt_err-- > 0) {
+			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
+			slot_idx--;
+		}
+	}
+
+	async->pkts_idx += pkt_idx;
+	if (async->pkts_idx >= vq->size)
+		async->pkts_idx -= vq->size;
+
+out:
+	/* The DMA device may serve other queues; unconditionally check for completions. */
+	nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id, pkts, pkts_size,
+							  dma_id, vchan_id, legacy_ol_flags);
+
+	return nr_done_pkts;
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, vchan_id, true);
+}
+
+__rte_noinline
+static uint16_t
+virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
+		struct vhost_virtqueue *vq, uint16_t queue_id,
+		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
+{
+	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
+				pkts, count, dma_id, vchan_id, false);
+}
+
+uint16_t
+rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
+	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id)
+{
+	struct virtio_net *dev;
+	struct rte_mbuf *rarp_mbuf = NULL;
+	struct vhost_virtqueue *vq;
+	int16_t success = 1;
+
+	*nr_inflight = -1;
+
+	dev = get_device(vid);
+	if (!dev)
+		return 0;
+
+	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
+		VHOST_LOG_DATA(ERR,
+			"(%s) %s: built-in vhost net backend is disabled.\n",
+			dev->ifname, __func__);
+		return 0;
+	}
+
+	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
+		VHOST_LOG_DATA(ERR,
+			"(%s) %s: invalid virtqueue idx %d.\n",
+			dev->ifname, __func__, queue_id);
+		return 0;
+	}
+
+	if (unlikely(!dma_copy_track[dma_id].vchans ||
+				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
+		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
+			       dma_id, vchan_id);
+		return 0;
+	}
+
+	vq = dev->virtqueue[queue_id];
+
+	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
+		return 0;
+
+	if (unlikely(vq->enabled == 0)) {
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (unlikely(!vq->async)) {
+		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
+			dev->ifname, __func__, queue_id);
+		count = 0;
+		goto out_access_unlock;
+	}
+
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_lock(vq);
+
+	if (unlikely(vq->access_ok == 0))
+		if (unlikely(vring_translate(dev, vq) < 0)) {
+			count = 0;
+			goto out;
+		}
+
+	/*
+	 * Construct a RARP broadcast packet and inject it into the "pkts"
+	 * array, to make it look like the guest actually sent such a packet.
+	 *
+	 * Check user_send_rarp() for more information.
+	 *
+	 * broadcast_rarp shares a cacheline in the virtio_net structure
+	 * with some fields that are accessed during enqueue and
+	 * __atomic_compare_exchange_n causes a write when it performs the
+	 * compare and exchange. This could result in false sharing between enqueue
+	 * and dequeue.
+	 *
+	 * Prevent unnecessary false sharing by reading broadcast_rarp first
+	 * and only performing compare and exchange if the read indicates it
+	 * is likely to be set.
+	 */
+	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
+			__atomic_compare_exchange_n(&dev->broadcast_rarp,
+			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
+
+		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
+		if (rarp_mbuf == NULL) {
+			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
+			count = 0;
+			goto out;
+		}
+		/*
+		 * Inject it at the head of the "pkts" array, so that the
+		 * switch's MAC learning table gets updated first.
+		 */
+		pkts[0] = rarp_mbuf;
+		pkts++;
+		count -= 1;
+	}
+
+	if (unlikely(vq_is_packed(dev))) {
+		static bool not_support_pack_log;
+		if (!not_support_pack_log) {
+			VHOST_LOG_DATA(ERR,
+				"(%s) %s: async dequeue does not support packed ring.\n",
+				dev->ifname, __func__);
+			not_support_pack_log = true;
+		}
+		count = 0;
+		goto out;
+	}
+
+	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
+		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, vchan_id);
+	else
+		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
+				mbuf_pool, pkts, count, dma_id, vchan_id);
+
+	*nr_inflight = vq->async->pkts_inflight_n;
+
+out:
+	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
+		vhost_user_iotlb_rd_unlock(vq);
+
+out_access_unlock:
+	rte_spinlock_unlock(&vq->access_lock);
+
+	if (unlikely(rarp_mbuf != NULL))
+		count += 1;
+
+	return count;
+}
-- 
2.17.1


^ permalink raw reply related	[flat|nested] 11+ messages in thread
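
A small aside on the index arithmetic used throughout the completion
path above: slot_idx, pkts_idx and friends are free-running counters
that are mapped onto ring slots with "& (vq->size - 1)". This only
works because a split virtqueue's size is a power of two; a standalone
demonstration of the wrap-around behaviour, in plain C:

	#include <stdint.h>
	#include <stdio.h>

	int main(void)
	{
		const uint16_t size = 8;	/* stand-in for vq->size */
		uint16_t idx = 65534;		/* counter about to overflow */
		int i;

		for (i = 0; i < 4; i++, idx++)
			printf("%u -> slot %u\n", idx, idx & (size - 1));
		/* Prints slots 6, 7, 0, 1: the masked index stays
		 * contiguous even across the uint16_t overflow. */
		return 0;
	}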

* [RFC,v3 2/2] examples/vhost: support async dequeue data path
  2022-03-10  6:54 ` [RFC,v3 0/2] vhost: " xuan.ding
  2022-03-10  6:54   ` [RFC,v3 1/2] vhost: support async dequeue for split ring xuan.ding
@ 2022-03-10  6:54   ` xuan.ding
  1 sibling, 0 replies; 11+ messages in thread
From: xuan.ding @ 2022-03-10  6:54 UTC (permalink / raw)
  To: maxime.coquelin, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Xuan Ding,
	Wenwu Ma, Yuan Wang

From: Xuan Ding <xuan.ding@intel.com>

This patch adds a use case for the async dequeue API. A vswitch can
leverage a DMA device to accelerate the vhost async dequeue path.

Signed-off-by: Wenwu Ma <wenwux.ma@intel.com>
Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
Signed-off-by: Xuan Ding <xuan.ding@intel.com>
---
 doc/guides/sample_app_ug/vhost.rst |   9 +-
 examples/vhost/main.c              | 292 ++++++++++++++++++++---------
 examples/vhost/main.h              |  35 +++-
 examples/vhost/virtio_net.c        |  16 +-
 4 files changed, 254 insertions(+), 98 deletions(-)

diff --git a/doc/guides/sample_app_ug/vhost.rst b/doc/guides/sample_app_ug/vhost.rst
index a6ce4bc8ac..09db965e70 100644
--- a/doc/guides/sample_app_ug/vhost.rst
+++ b/doc/guides/sample_app_ug/vhost.rst
@@ -169,9 +169,12 @@ demonstrates how to use the async vhost APIs. It's used in combination with dmas
 **--dmas**
 This parameter is used to specify the assigned DMA device of a vhost device.
 Async vhost-user net driver will be used if --dmas is set. For example
---dmas [txd0@00:04.0,txd1@00:04.1] means use DMA channel 00:04.0 for vhost
-device 0 enqueue operation and use DMA channel 00:04.1 for vhost device 1
-enqueue operation.
+--dmas [txd0@00:04.0,txd1@00:04.1,rxd0@00:04.2,rxd1@00:04.3] means using
+DMA channels 00:04.0/00:04.2 for vhost device 0 enqueue/dequeue operations
+and DMA channels 00:04.1/00:04.3 for vhost device 1 enqueue/dequeue
+operations. The device index corresponds to the socket file order, which
+means vhost device 0 is created through the first socket file, vhost
+device 1 through the second socket file, and so on.
 
 Common Issues
 -------------
diff --git a/examples/vhost/main.c b/examples/vhost/main.c
index d94fabb060..d26e40ab73 100644
--- a/examples/vhost/main.c
+++ b/examples/vhost/main.c
@@ -63,6 +63,9 @@
 
 #define DMA_RING_SIZE 4096
 
+#define ASYNC_ENQUEUE_VHOST 1
+#define ASYNC_DEQUEUE_VHOST 2
+
 /* number of mbufs in all pools - if specified on command-line. */
 static int total_num_mbufs = NUM_MBUFS_DEFAULT;
 
@@ -116,6 +119,8 @@ static uint32_t burst_rx_retry_num = BURST_RX_RETRIES;
 static char *socket_files;
 static int nb_sockets;
 
+static struct vhost_queue_ops vdev_queue_ops[RTE_MAX_VHOST_DEVICE];
+
 /* empty VMDq configuration structure. Filled in programmatically */
 static struct rte_eth_conf vmdq_conf_default = {
 	.rxmode = {
@@ -205,6 +210,18 @@ struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * RTE_MAX_VHOST_DEVICE];
 #define MBUF_TABLE_DRAIN_TSC	((rte_get_tsc_hz() + US_PER_S - 1) \
 				 / US_PER_S * BURST_TX_DRAIN_US)
 
+static int vid2socketid[RTE_MAX_VHOST_DEVICE];
+
+static uint32_t get_async_flag_by_socketid(int socketid)
+{
+	return dma_bind[socketid].async_flag;
+}
+
+static void init_vid2socketid_array(int vid, int socketid)
+{
+	vid2socketid[vid] = socketid;
+}
+
 static inline bool
 is_dma_configured(int16_t dev_id)
 {
@@ -224,7 +241,7 @@ open_dma(const char *value)
 	char *addrs = input;
 	char *ptrs[2];
 	char *start, *end, *substr;
-	int64_t vid;
+	int64_t socketid, vring_id;
 
 	struct rte_dma_info info;
 	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
@@ -262,7 +279,9 @@ open_dma(const char *value)
 
 	while (i < args_nr) {
 		char *arg_temp = dma_arg[i];
+		char *txd, *rxd;
 		uint8_t sub_nr;
+		int async_flag;
 
 		sub_nr = rte_strsplit(arg_temp, strlen(arg_temp), ptrs, 2, '@');
 		if (sub_nr != 2) {
@@ -270,14 +289,23 @@ open_dma(const char *value)
 			goto out;
 		}
 
-		start = strstr(ptrs[0], "txd");
-		if (start == NULL) {
+		txd = strstr(ptrs[0], "txd");
+		rxd = strstr(ptrs[0], "rxd");
+		if (txd) {
+			start = txd;
+			vring_id = VIRTIO_RXQ;
+			async_flag = ASYNC_ENQUEUE_VHOST;
+		} else if (rxd) {
+			start = rxd;
+			vring_id = VIRTIO_TXQ;
+			async_flag = ASYNC_DEQUEUE_VHOST;
+		} else {
 			ret = -1;
 			goto out;
 		}
 
 		start += 3;
-		vid = strtol(start, &end, 0);
+		socketid = strtol(start, &end, 0);
 		if (end == start) {
 			ret = -1;
 			goto out;
@@ -338,7 +366,8 @@ open_dma(const char *value)
 		dmas_id[dma_count++] = dev_id;
 
 done:
-		(dma_info + vid)->dmas[VIRTIO_RXQ].dev_id = dev_id;
+		(dma_info + socketid)->dmas[vring_id].dev_id = dev_id;
+		(dma_info + socketid)->async_flag |= async_flag;
 		i++;
 	}
 out:
@@ -990,13 +1019,13 @@ complete_async_pkts(struct vhost_dev *vdev)
 {
 	struct rte_mbuf *p_cpl[MAX_PKT_BURST];
 	uint16_t complete_count;
-	int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
+	int16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].dev_id;
 
 	complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
 					VIRTIO_RXQ, p_cpl, MAX_PKT_BURST, dma_id, 0);
 	if (complete_count) {
 		free_pkts(p_cpl, complete_count);
-		__atomic_sub_fetch(&vdev->pkts_inflight, complete_count, __ATOMIC_SEQ_CST);
+		__atomic_sub_fetch(&vdev->pkts_enq_inflight, complete_count, __ATOMIC_SEQ_CST);
 	}
 
 }
@@ -1031,23 +1060,7 @@ drain_vhost(struct vhost_dev *vdev)
 	uint16_t nr_xmit = vhost_txbuff[buff_idx]->len;
 	struct rte_mbuf **m = vhost_txbuff[buff_idx]->m_table;
 
-	if (builtin_net_driver) {
-		ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ, m, nr_xmit, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, ret, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = nr_xmit - ret;
-		if (enqueue_fail)
-			free_pkts(&m[ret], nr_xmit - ret);
-	} else {
-		ret = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						m, nr_xmit);
-	}
+	ret = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev, VIRTIO_RXQ, m, nr_xmit);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, nr_xmit,
@@ -1056,7 +1069,7 @@ drain_vhost(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(m, nr_xmit);
 }
 
@@ -1328,6 +1341,33 @@ drain_mbuf_table(struct mbuf_table *tx_q)
 	}
 }
 
+uint16_t
+async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	uint16_t enqueue_count;
+	uint16_t enqueue_fail = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_RXQ].dev_id;
+
+	complete_async_pkts(dev);
+	enqueue_count = rte_vhost_submit_enqueue_burst(dev->vid, queue_id,
+					pkts, rx_count, dma_id, 0);
+	__atomic_add_fetch(&dev->pkts_enq_inflight, enqueue_count, __ATOMIC_SEQ_CST);
+
+	enqueue_fail = rx_count - enqueue_count;
+	if (enqueue_fail)
+		free_pkts(&pkts[enqueue_count], enqueue_fail);
+
+	return enqueue_count;
+}
+
+uint16_t
+sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t rx_count)
+{
+	return rte_vhost_enqueue_burst(dev->vid, queue_id, pkts, rx_count);
+}
+
 static __rte_always_inline void
 drain_eth_rx(struct vhost_dev *vdev)
 {
@@ -1358,26 +1398,8 @@ drain_eth_rx(struct vhost_dev *vdev)
 		}
 	}
 
-	if (builtin_net_driver) {
-		enqueue_count = vs_enqueue_pkts(vdev, VIRTIO_RXQ,
-						pkts, rx_count);
-	} else if (dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t enqueue_fail = 0;
-		int16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
-
-		complete_async_pkts(vdev);
-		enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
-					VIRTIO_RXQ, pkts, rx_count, dma_id, 0);
-		__atomic_add_fetch(&vdev->pkts_inflight, enqueue_count, __ATOMIC_SEQ_CST);
-
-		enqueue_fail = rx_count - enqueue_count;
-		if (enqueue_fail)
-			free_pkts(&pkts[enqueue_count], enqueue_fail);
-
-	} else {
-		enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
-						pkts, rx_count);
-	}
+	enqueue_count = vdev_queue_ops[vdev->vid].enqueue_pkt_burst(vdev,
+					VIRTIO_RXQ, pkts, rx_count);
 
 	if (enable_stats) {
 		__atomic_add_fetch(&vdev->stats.rx_total_atomic, rx_count,
@@ -1386,10 +1408,33 @@ drain_eth_rx(struct vhost_dev *vdev)
 				__ATOMIC_SEQ_CST);
 	}
 
-	if (!dma_bind[vdev->vid].dmas[VIRTIO_RXQ].async_enabled)
+	if (!dma_bind[vid2socketid[vdev->vid]].dmas[VIRTIO_RXQ].async_enabled)
 		free_pkts(pkts, rx_count);
 }
 
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			    struct rte_mempool *mbuf_pool,
+			    struct rte_mbuf **pkts, uint16_t count)
+{
+	int nr_inflight;
+	uint16_t dequeue_count;
+	uint16_t dma_id = dma_bind[vid2socketid[dev->vid]].dmas[VIRTIO_TXQ].dev_id;
+
+	dequeue_count = rte_vhost_async_try_dequeue_burst(dev->vid, queue_id,
+			mbuf_pool, pkts, count, &nr_inflight, dma_id, 0);
+	if (likely(nr_inflight != -1))
+		dev->pkts_deq_inflight = nr_inflight;
+
+	return dequeue_count;
+}
+
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			   struct rte_mempool *mbuf_pool,
+			   struct rte_mbuf **pkts, uint16_t count)
+{
+	return rte_vhost_dequeue_burst(dev->vid, queue_id, mbuf_pool, pkts, count);
+}
+
 static __rte_always_inline void
 drain_virtio_tx(struct vhost_dev *vdev)
 {
@@ -1397,13 +1442,8 @@ drain_virtio_tx(struct vhost_dev *vdev)
 	uint16_t count;
 	uint16_t i;
 
-	if (builtin_net_driver) {
-		count = vs_dequeue_pkts(vdev, VIRTIO_TXQ, mbuf_pool,
-					pkts, MAX_PKT_BURST);
-	} else {
-		count = rte_vhost_dequeue_burst(vdev->vid, VIRTIO_TXQ,
-					mbuf_pool, pkts, MAX_PKT_BURST);
-	}
+	count = vdev_queue_ops[vdev->vid].dequeue_pkt_burst(vdev,
+				VIRTIO_TXQ, mbuf_pool, pkts, MAX_PKT_BURST);
 
 	/* setup VMDq for the first packet */
 	if (unlikely(vdev->ready == DEVICE_MAC_LEARNING) && count) {
@@ -1482,6 +1522,31 @@ switch_worker(void *arg __rte_unused)
 	return 0;
 }
 
+static void
+vhost_clear_queue_thread_unsafe(struct vhost_dev *vdev, uint16_t queue_id)
+{
+	uint16_t n_pkt = 0;
+	uint16_t dma_id = dma_bind[vid2socketid[vdev->vid]].dmas[queue_id].dev_id;
+	struct rte_mbuf *m_enq_cpl[vdev->pkts_enq_inflight];
+	struct rte_mbuf *m_deq_cpl[vdev->pkts_deq_inflight];
+
+	if (queue_id % 2 == 0) {
+		while (vdev->pkts_enq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_enq_cpl, vdev->pkts_enq_inflight, dma_id, 0);
+			free_pkts(m_enq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_enq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	} else {
+		while (vdev->pkts_deq_inflight) {
+			n_pkt = rte_vhost_clear_queue_thread_unsafe(vdev->vid,
+				queue_id, m_deq_cpl, vdev->pkts_deq_inflight, dma_id, 0);
+			free_pkts(m_deq_cpl, n_pkt);
+			__atomic_sub_fetch(&vdev->pkts_deq_inflight, n_pkt, __ATOMIC_SEQ_CST);
+		}
+	}
+}
+
 /*
  * Remove a device from the specific data core linked list and from the
  * main linked list. Synchronization occurs through the use of the
@@ -1538,25 +1603,78 @@ destroy_device(int vid)
 		"(%d) device has been removed from data core\n",
 		vdev->vid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled) {
-		uint16_t n_pkt = 0;
-		int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-		struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-		while (vdev->pkts_inflight) {
-			n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, VIRTIO_RXQ,
-						m_cpl, vdev->pkts_inflight, dma_id, 0);
-			free_pkts(m_cpl, n_pkt);
-			__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-		}
-
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_RXQ);
 		rte_vhost_async_channel_unregister(vid, VIRTIO_RXQ);
-		dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = false;
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = false;
+	}
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled) {
+		vhost_clear_queue_thread_unsafe(vdev, VIRTIO_TXQ);
+		rte_vhost_async_channel_unregister(vid, VIRTIO_TXQ);
+		dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = false;
 	}
 
 	rte_free(vdev);
 }
 
+static int
+get_socketid_by_vid(int vid)
+{
+	int i;
+	char ifname[PATH_MAX];
+	rte_vhost_get_ifname(vid, ifname, sizeof(ifname));
+
+	for (i = 0; i < nb_sockets; i++) {
+		char *file = socket_files + i * PATH_MAX;
+		if (strcmp(file, ifname) == 0)
+			return i;
+	}
+
+	return -1;
+}
+
+static int
+init_vhost_queue_ops(int vid)
+{
+	if (builtin_net_driver) {
+		vdev_queue_ops[vid].enqueue_pkt_burst = builtin_enqueue_pkts;
+		vdev_queue_ops[vid].dequeue_pkt_burst = builtin_dequeue_pkts;
+	} else {
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled)
+			vdev_queue_ops[vid].enqueue_pkt_burst = async_enqueue_pkts;
+		else
+			vdev_queue_ops[vid].enqueue_pkt_burst = sync_enqueue_pkts;
+
+		if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled)
+			vdev_queue_ops[vid].dequeue_pkt_burst = async_dequeue_pkts;
+		else
+			vdev_queue_ops[vid].dequeue_pkt_burst = sync_dequeue_pkts;
+	}
+
+	return 0;
+}
+
+static int
+vhost_async_channel_register(int vid)
+{
+	int rx_ret = 0, tx_ret = 0;
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
+		rx_ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
+		if (rx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_RXQ].async_enabled = true;
+	}
+
+	if (dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].dev_id != INVALID_DMA_ID) {
+		tx_ret = rte_vhost_async_channel_register(vid, VIRTIO_TXQ);
+		if (tx_ret == 0)
+			dma_bind[vid2socketid[vid]].dmas[VIRTIO_TXQ].async_enabled = true;
+	}
+
+	return rx_ret | tx_ret;
+}
+
 /*
  * A new device is added to a data core. First the device is added to the main linked list
  * and then allocated to a specific data core.
@@ -1568,6 +1686,8 @@ new_device(int vid)
 	uint16_t i;
 	uint32_t device_num_min = num_devices;
 	struct vhost_dev *vdev;
+	int ret;
+
 	vdev = rte_zmalloc("vhost device", sizeof(*vdev), RTE_CACHE_LINE_SIZE);
 	if (vdev == NULL) {
 		RTE_LOG(INFO, VHOST_DATA,
@@ -1590,6 +1710,17 @@ new_device(int vid)
 		}
 	}
 
+	int socketid = get_socketid_by_vid(vid);
+	if (socketid == -1)
+		return -1;
+
+	init_vid2socketid_array(vid, socketid);
+
+	ret = vhost_async_channel_register(vid);
+
+	if (init_vhost_queue_ops(vid) != 0)
+		return -1;
+
 	if (builtin_net_driver)
 		vs_vhost_net_setup(vdev);
 
@@ -1621,16 +1752,7 @@ new_device(int vid)
 		"(%d) device has been added to data core %d\n",
 		vid, vdev->coreid);
 
-	if (dma_bind[vid].dmas[VIRTIO_RXQ].dev_id != INVALID_DMA_ID) {
-		int ret;
-
-		ret = rte_vhost_async_channel_register(vid, VIRTIO_RXQ);
-		if (ret == 0)
-			dma_bind[vid].dmas[VIRTIO_RXQ].async_enabled = true;
-		return ret;
-	}
-
-	return 0;
+	return ret;
 }
 
 static int
@@ -1648,19 +1770,9 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
 	if (queue_id != VIRTIO_RXQ)
 		return 0;
 
-	if (dma_bind[vid].dmas[queue_id].async_enabled) {
-		if (!enable) {
-			uint16_t n_pkt = 0;
-			int16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
-			struct rte_mbuf *m_cpl[vdev->pkts_inflight];
-
-			while (vdev->pkts_inflight) {
-				n_pkt = rte_vhost_clear_queue_thread_unsafe(vid, queue_id,
-							m_cpl, vdev->pkts_inflight, dma_id, 0);
-				free_pkts(m_cpl, n_pkt);
-				__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt, __ATOMIC_SEQ_CST);
-			}
-		}
+	if (dma_bind[vid2socketid[vid]].dmas[queue_id].async_enabled) {
+		if (!enable)
+			vhost_clear_queue_thread_unsafe(vdev, queue_id);
 	}
 
 	return 0;
@@ -1885,7 +1997,7 @@ main(int argc, char *argv[])
 	for (i = 0; i < nb_sockets; i++) {
 		char *file = socket_files + i * PATH_MAX;
 
-		if (dma_count)
+		if (dma_count && get_async_flag_by_socketid(i) != 0)
 			flags = flags | RTE_VHOST_USER_ASYNC_COPY;
 
 		ret = rte_vhost_driver_register(file, flags);
diff --git a/examples/vhost/main.h b/examples/vhost/main.h
index b4a453e77e..40ac2841d1 100644
--- a/examples/vhost/main.h
+++ b/examples/vhost/main.h
@@ -52,7 +52,8 @@ struct vhost_dev {
 	uint64_t features;
 	size_t hdr_len;
 	uint16_t nr_vrings;
-	uint16_t pkts_inflight;
+	uint16_t pkts_enq_inflight;
+	uint16_t pkts_deq_inflight;
 	struct rte_vhost_memory *mem;
 	struct device_statistics stats;
 	TAILQ_ENTRY(vhost_dev) global_vdev_entry;
@@ -62,6 +63,19 @@ struct vhost_dev {
 	struct vhost_queue queues[MAX_QUEUE_PAIRS * 2];
 } __rte_cache_aligned;
 
+typedef uint16_t (*vhost_enqueue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mbuf **pkts,
+			uint32_t count);
+
+typedef uint16_t (*vhost_dequeue_burst_t)(struct vhost_dev *dev,
+			uint16_t queue_id, struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+
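+/* Per-device datapath callbacks, chosen once in new_device(): builtin, sync or async. */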
+struct vhost_queue_ops {
+	vhost_enqueue_burst_t enqueue_pkt_burst;
+	vhost_dequeue_burst_t dequeue_pkt_burst;
+};
+
 TAILQ_HEAD(vhost_dev_tailq_list, vhost_dev);
 
 
@@ -88,6 +102,7 @@ struct dma_info {
 
 struct dma_for_vhost {
 	struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+	uint32_t async_flag;
 };
 
 /* we implement non-extra virtio net features */
@@ -98,7 +113,19 @@ void vs_vhost_net_remove(struct vhost_dev *dev);
 uint16_t vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 			 struct rte_mbuf **pkts, uint32_t count);
 
-uint16_t vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
-			 struct rte_mempool *mbuf_pool,
-			 struct rte_mbuf **pkts, uint16_t count);
+uint16_t builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mbuf **pkts, uint32_t count);
+uint16_t builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t sync_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t sync_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
+uint16_t async_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			 struct rte_mbuf **pkts, uint32_t count);
+uint16_t async_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+			struct rte_mempool *mbuf_pool,
+			struct rte_mbuf **pkts, uint16_t count);
 #endif /* _MAIN_H_ */
diff --git a/examples/vhost/virtio_net.c b/examples/vhost/virtio_net.c
index 9064fc3a82..2432a96566 100644
--- a/examples/vhost/virtio_net.c
+++ b/examples/vhost/virtio_net.c
@@ -238,6 +238,13 @@ vs_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	return count;
 }
 
+uint16_t
+builtin_enqueue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+		struct rte_mbuf **pkts, uint32_t count)
+{
+	return vs_enqueue_pkts(dev, queue_id, pkts, count);
+}
+
 static __rte_always_inline int
 dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	    struct rte_mbuf *m, uint16_t desc_idx,
@@ -363,7 +370,7 @@ dequeue_pkt(struct vhost_dev *dev, struct rte_vhost_vring *vr,
 	return 0;
 }
 
-uint16_t
+static uint16_t
 vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
 {
@@ -440,3 +447,10 @@ vs_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
 
 	return i;
 }
+
+uint16_t
+builtin_dequeue_pkts(struct vhost_dev *dev, uint16_t queue_id,
+	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+{
+	return vs_dequeue_pkts(dev, queue_id, mbuf_pool, pkts, count);
+}
-- 
2.17.1


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [RFC,v3 1/2] vhost: support async dequeue for split ring
  2022-03-10  6:54   ` [RFC,v3 1/2] vhost: support async dequeue for split ring xuan.ding
@ 2022-03-31  9:15     ` Maxime Coquelin
  2022-03-31 11:20       ` Ding, Xuan
  0 siblings, 1 reply; 11+ messages in thread
From: Maxime Coquelin @ 2022-03-31  9:15 UTC (permalink / raw)
  To: xuan.ding, chenbo.xia
  Cc: dev, jiayu.hu, cheng1.jiang, sunil.pai.g, liangma, Yuan Wang



On 3/10/22 07:54, xuan.ding@intel.com wrote:
> From: Xuan Ding <xuan.ding@intel.com>
> 
> This patch implements asynchronous dequeue data path for vhost split
> ring, with dmadev library integrated.
> 
> Signed-off-by: Xuan Ding <xuan.ding@intel.com>
> Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> ---
>   lib/vhost/rte_vhost_async.h |  37 ++-
>   lib/vhost/version.map       |   1 +
>   lib/vhost/vhost.h           |   1 +
>   lib/vhost/virtio_net.c      | 504 ++++++++++++++++++++++++++++++++++++
>   4 files changed, 541 insertions(+), 2 deletions(-)
> 
> diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> index f1293c6a9d..b6ab0b06a2 100644
> --- a/lib/vhost/rte_vhost_async.h
> +++ b/lib/vhost/rte_vhost_async.h
> @@ -155,9 +155,9 @@ int rte_vhost_async_get_inflight(int vid, uint16_t queue_id);
>    * @param count
>    *  Size of the packet array
>    * @param dma_id
> - *  the identifier of DMA device
> + *  The identifier of DMA device
>    * @param vchan_id
> - *  the identifier of virtual DMA channel
> + *  The identifier of virtual DMA channel

This is unrelated to the purpose of this patch, it can be moved in a
dedicated trivial patch.

>    * @return
>    *  Number of packets returned
>    */
> @@ -187,6 +187,39 @@ uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
>   __rte_experimental
>   int rte_vhost_async_dma_configure(int16_t dma_id, uint16_t vchan_id);
>   
> +/**
> + * This function tries to receive packets from the guest with offloading
> + * copies to the async channel. The packets that are transfer completed
> + * are returned in "pkts". The other packets that their copies are submitted to
> + * the async channel but not completed are called "in-flight packets".
> + * This function will not return in-flight packets until their copies are
> + * completed by the async channel.
> + *
> + * @param vid
> + *  ID of vhost device to dequeue data
> + * @param queue_id
> + *  ID of virtqueue to dequeue data
> + * @param mbuf_pool
> + *  Mbuf_pool where host mbuf is allocated
> + * @param pkts
> + *  Blank array to keep successfully dequeued packets
> + * @param count
> + *  Size of the packet array
> + * @param nr_inflight
> + *  The number of in-flight packets. If an error occurred, it is set to -1.
> + * @param dma_id
> + *  The identifier of DMA device
> + * @param vchan_id
> + *  The identifier of virtual DMA channel
> + * @return
> + *  Number of successfully dequeued packets
> + */
> +__rte_experimental
> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id);
> +
>   #ifdef __cplusplus
>   }
>   #endif
> diff --git a/lib/vhost/version.map b/lib/vhost/version.map
> index 0a66c5840c..968d6d4290 100644
> --- a/lib/vhost/version.map
> +++ b/lib/vhost/version.map
> @@ -87,6 +87,7 @@ EXPERIMENTAL {
>   
>   	# added in 22.03
>   	rte_vhost_async_dma_configure;

# added in 22.07

> +	rte_vhost_async_try_dequeue_burst;
>   };
>   
>   INTERNAL {
> diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
> index a9edc271aa..3799d41089 100644
> --- a/lib/vhost/vhost.h
> +++ b/lib/vhost/vhost.h
> @@ -178,6 +178,7 @@ extern struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
>    */
>   struct async_inflight_info {
>   	struct rte_mbuf *mbuf;
> +	struct virtio_net_hdr nethdr;
>   	uint16_t descs; /* num of descs inflight */
>   	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
>   };
> diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
> index 5f432b0d77..3816caca79 100644
> --- a/lib/vhost/virtio_net.c
> +++ b/lib/vhost/virtio_net.c
> @@ -3141,3 +3141,507 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
>   
>   	return count;
>   }
> +
> +static __rte_always_inline int
> +async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +			struct rte_mbuf *m, uint32_t mbuf_offset,
> +			uint64_t buf_iova, uint32_t cpy_len)
> +{
> +	struct vhost_async *async = vq->async;
> +	uint64_t mapped_len;
> +	uint32_t buf_offset = 0;
> +	void *host_iova;
> +
> +	while (cpy_len) {
> +		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
> +					buf_iova + buf_offset, cpy_len,
> +					&mapped_len);
> +		if (unlikely(!host_iova)) {
> +			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host_iova.\n",
> +				dev->ifname, __func__);
> +			return -1;
> +		}
> +
> +		if (unlikely(async_iter_add_iovec(dev, async, host_iova,
> +				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset),
> +				(size_t)mapped_len)))
> +			return -1;
> +
> +		cpy_len -= (uint32_t)mapped_len;
> +		mbuf_offset += (uint32_t)mapped_len;
> +		buf_offset += (uint32_t)mapped_len;
> +	}
> +
> +	return 0;
> +}

It looks really similar to async_mbuf_to_desc_seg(), just the direction
of the DMA copy is changed.

Maybe we could refactor the two functions into a single one by adding a
parameter for the direction. Something like this:

static __rte_always_inline int
async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
		struct rte_mbuf *m, uint32_t mbuf_offset,
		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
{
	struct vhost_async *async = vq->async;
	uint64_t mapped_len;
	uint32_t buf_offset = 0;
	void *src, *dst;
	void *host_iova;

	while (cpy_len) {
		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
				buf_iova + buf_offset, cpy_len, &mapped_len);
		if (unlikely(!host_iova)) {
			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host iova.\n",
				       dev->ifname, __func__);
			return -1;
		}

		if (to_desc) {
			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
			dst = host_iova;
		} else {
			src = host_iova;
			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
		}

		if (unlikely(async_iter_add_iovec(dev, async, src, dst,
				(size_t)mapped_len)))
			return -1;

		cpy_len -= (uint32_t)mapped_len;
		mbuf_offset += (uint32_t)mapped_len;
		buf_offset += (uint32_t)mapped_len;
	}

	return 0;
}


Then, if you pass a constant true/false value for to_desc instead of a
variable, the compiler should generate the same code, so there is no
performance degradation.
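
For instance, the two existing helpers could then become trivial
wrappers around it (a minimal sketch, reusing the prototype above):

static __rte_always_inline int
async_mbuf_to_desc_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
		struct rte_mbuf *m, uint32_t mbuf_offset,
		uint64_t buf_iova, uint32_t cpy_len)
{
	/* Enqueue direction: the mbuf is the source, the desc the destination. */
	return async_fill_seg(dev, vq, m, mbuf_offset, buf_iova, cpy_len, true);
}

static __rte_always_inline int
async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
		struct rte_mbuf *m, uint32_t mbuf_offset,
		uint64_t buf_iova, uint32_t cpy_len)
{
	/* Dequeue direction: the desc is the source, the mbuf the destination. */
	return async_fill_seg(dev, vq, m, mbuf_offset, buf_iova, cpy_len, false);
}

As both wrappers are always inlined and to_desc is a compile-time
constant at each call site, the branch in async_fill_seg() folds away.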

> +
> +static __rte_always_inline int
> +async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		  struct buf_vector *buf_vec, uint16_t nr_vec,
> +		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
> +		  struct virtio_net_hdr *nethdr)
> +{
> +	uint64_t buf_addr, buf_iova;
> +	uint32_t buf_avail, buf_offset, buf_len;
> +	uint32_t mbuf_avail, mbuf_offset;
> +	uint32_t cpy_len;
> +	/* A counter to avoid a dead loop in the desc chain */
> +	uint16_t vec_idx = 0;
> +	struct rte_mbuf *cur = m, *prev = m;
> +	struct virtio_net_hdr tmp_hdr;
> +	struct virtio_net_hdr *hdr = NULL;
> +	struct vhost_async *async = vq->async;
> +
> +	buf_addr = buf_vec[vec_idx].buf_addr;
> +	buf_len = buf_vec[vec_idx].buf_len;
> +	buf_iova = buf_vec[vec_idx].buf_iova;
> +
> +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
> +		return -1;
> +
> +	if (virtio_net_with_host_offload(dev)) {
> +		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
> +			/*
> +			 * No luck, the virtio-net header doesn't fit
> +			 * in a contiguous virtual area.
> +			 */
> +			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
> +			hdr = &tmp_hdr;
> +		} else {
> +			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
> +		}
> +	}
> +
> +	/*
> +	 * A virtio driver normally uses at least 2 desc buffers
> +	 * for Tx: the first for storing the header, and others
> +	 * for storing the data.
> +	 */
> +	if (unlikely(buf_len < dev->vhost_hlen)) {
> +		buf_offset = dev->vhost_hlen - buf_len;
> +		vec_idx++;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_iova = buf_vec[vec_idx].buf_iova;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +		buf_avail  = buf_len - buf_offset;
> +	} else if (buf_len == dev->vhost_hlen) {
> +		if (unlikely(++vec_idx >= nr_vec))
> +			return -1;
> +		buf_addr = buf_vec[vec_idx].buf_addr;
> +		buf_iova = buf_vec[vec_idx].buf_iova;
> +		buf_len = buf_vec[vec_idx].buf_len;
> +
> +		buf_offset = 0;
> +		buf_avail = buf_len;
> +	} else {
> +		buf_offset = dev->vhost_hlen;
> +		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
> +	}
> +
> +	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0);
> +
> +	mbuf_offset = 0;
> +	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
> +
> +	if (async_iter_initialize(dev, async))
> +		return -1;
> +
> +	while (1) {
> +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> +
> +		if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset, buf_iova + buf_offset,
> +					   cpy_len) < 0)
> +			goto error;
> +
> +		mbuf_avail -= cpy_len;
> +		buf_avail -= cpy_len;
> +		mbuf_offset += cpy_len;
> +		buf_offset += cpy_len;
> +
> +		/* This buf reaches to its end, get the next one */
> +		if (buf_avail == 0) {
> +			if (++vec_idx >= nr_vec)
> +				break;
> +
> +			buf_addr = buf_vec[vec_idx].buf_addr;
> +			buf_iova = buf_vec[vec_idx].buf_iova;
> +			buf_len = buf_vec[vec_idx].buf_len;
> +
> +			buf_offset = 0;
> +			buf_avail = buf_len;
> +
> +			PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0);
> +		}
> +
> +		/*
> +		 * This mbuf reaches to its end, get a new one
> +		 * to hold more data.
> +		 */
> +		if (mbuf_avail == 0) {
> +			cur = rte_pktmbuf_alloc(mbuf_pool);
> +			if (unlikely(cur == NULL)) {
> +				VHOST_LOG_DATA(ERR,
> +					"(%s) %s: failed to allocate memory for mbuf.\n",
> +					dev->ifname, __func__);
> +				goto error;
> +			}
> +
> +			prev->next = cur;
> +			prev->data_len = mbuf_offset;
> +			m->nb_segs += 1;
> +			m->pkt_len += mbuf_offset;
> +			prev = cur;
> +
> +			mbuf_offset = 0;
> +			mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
> +		}
> +	}
> +
> +	prev->data_len = mbuf_offset;
> +	m->pkt_len += mbuf_offset;
> +
> +	async_iter_finalize(async);
> +	if (hdr)
> +		*nethdr = *hdr;
> +
> +	return 0;
> +
> +error:
> +	async_iter_cancel(async);
> +	return -1;
> +}


You can do the same refactoring here as I did for the enqueue path, i.e.
merge copy_desc_to_mbuf and async_desc_to_mbuf into a single function.
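
For illustration, the merged function could look roughly like this (the
name is just a suggestion, and it assumes a sync_fill_seg() counterpart
on the sync side, as in the enqueue refactoring); the common header
parsing and desc/mbuf walk stay shared, and only the innermost copy
step dispatches:

static __rte_always_inline int
desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
		struct buf_vector *buf_vec, uint16_t nr_vec,
		struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
		struct virtio_net_hdr *nethdr, bool is_async)
{
	/* ... same setup and buffer walk as copy_desc_to_mbuf() ... */

	while (1) {
		cpy_len = RTE_MIN(buf_avail, mbuf_avail);

		if (is_async) {
			/* Stage the segment for the DMA engine. */
			if (async_fill_seg(dev, vq, cur, mbuf_offset,
					buf_iova + buf_offset, cpy_len, false) < 0)
				goto error;
		} else {
			/* Copy the segment with the CPU right away. */
			sync_fill_seg(dev, vq, cur, mbuf_offset,
					buf_addr + buf_offset,
					buf_iova + buf_offset, cpy_len, false);
		}

		/* ... advance offsets, fetch next desc buffer / mbuf ... */
	}
	/* ... */
}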

> +static __rte_always_inline uint16_t
> +async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t queue_id,
> +		struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
> +		uint16_t vchan_id, bool legacy_ol_flags)
> +{
> +	uint16_t start_idx, from, i;
> +	uint16_t nr_cpl_pkts = 0;
> +	struct async_inflight_info *pkts_info;
> +	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
> +
> +	pkts_info = vq->async->pkts_info;
> +
> +	vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE);
> +
> +	start_idx = async_get_first_inflight_pkt_idx(vq);
> +
> +	from = start_idx;
> +	while (vq->async->pkts_cmpl_flag[from] && count--) {
> +		vq->async->pkts_cmpl_flag[from] = false;
> +		from = (from + 1) & (vq->size - 1);
> +		nr_cpl_pkts++;
> +	}
> +
> +	if (nr_cpl_pkts == 0)
> +		return 0;
> +
> +	for (i = 0; i < nr_cpl_pkts; i++) {
> +		from = (start_idx + i) & (vq->size - 1);
> +		pkts[i] = pkts_info[from].mbuf;
> +
> +		if (virtio_net_with_host_offload(dev))
> +			vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i],
> +					      legacy_ol_flags);
> +	}
> +
> +	/* write back completed descs to used ring and update used idx */
> +	write_back_completed_descs_split(vq, nr_cpl_pkts);
> +	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE);
> +	vhost_vring_call_split(dev, vq);
> +
> +	vq->async->pkts_inflight_n -= nr_cpl_pkts;
> +
> +	return nr_cpl_pkts;
> +}
> +
> +static __rte_always_inline uint16_t
> +virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
> +		uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags)
> +{
> +	static bool allocerr_warned;
> +	bool dropped = false;
> +	uint16_t free_entries;
> +	uint16_t pkt_idx, slot_idx = 0;
> +	uint16_t nr_done_pkts = 0;
> +	uint16_t pkt_err = 0;
> +	uint16_t n_xfer;
> +	struct vhost_async *async = vq->async;
> +	struct async_inflight_info *pkts_info = async->pkts_info;
> +	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
> +	uint16_t pkts_size = count;
> +
> +	/**
> +	 * The ordering between avail index and
> +	 * desc reads needs to be enforced.
> +	 */
> +	free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx;
> +	if (free_entries == 0)
> +		goto out;
> +
> +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
> +
> +	async_iter_reset(async);
> +
> +	count = RTE_MIN(count, MAX_PKT_BURST);
> +	count = RTE_MIN(count, free_entries);
> +	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n", dev->ifname, count);
> +
> +	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
> +		goto out;
> +
> +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> +		uint16_t head_idx = 0;
> +		uint16_t nr_vec = 0;
> +		uint16_t to;
> +		uint32_t buf_len;
> +		int err;
> +		struct buf_vector buf_vec[BUF_VECTOR_MAX];
> +		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
> +
> +		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
> +						&nr_vec, buf_vec,
> +						&head_idx, &buf_len,
> +						VHOST_ACCESS_RO) < 0)) {
> +			dropped = true;
> +			break;
> +		}
> +
> +		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
> +		if (unlikely(err)) {
> +			/**
> +			 * mbuf allocation fails for jumbo packets when external
> +			 * buffer allocation is not allowed and linear buffer
> +			 * is required. Drop this packet.
> +			 */
> +			if (!allocerr_warned) {
> +				VHOST_LOG_DATA(ERR,
> +					"(%s) %s: Failed mbuf alloc of size %d from %s\n",
> +					dev->ifname, __func__, buf_len, mbuf_pool->name);
> +				allocerr_warned = true;
> +			}
> +			dropped = true;
> +			break;
> +		}
> +
> +		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
> +		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool,
> +					&pkts_info[slot_idx].nethdr);
> +		if (unlikely(err)) {
> +			if (!allocerr_warned) {
> +				VHOST_LOG_DATA(ERR,
> +					"(%s) %s: Failed to offload copies to async channel.\n",
> +					dev->ifname, __func__);
> +				allocerr_warned = true;
> +			}
> +			dropped = true;
> +			break;
> +		}
> +
> +		pkts_info[slot_idx].mbuf = pkt;
> +
> +		/* store used descs */
> +		to = async->desc_idx_split & (vq->size - 1);
> +		async->descs_split[to].id = head_idx;
> +		async->descs_split[to].len = 0;
> +		async->desc_idx_split++;
> +
> +		vq->last_avail_idx++;
> +	}
> +
> +	if (unlikely(dropped))
> +		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx);
> +
> +	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx,
> +					  async->iov_iter, pkt_idx);
> +
> +	async->pkts_inflight_n += n_xfer;
> +
> +	pkt_err = pkt_idx - n_xfer;
> +	if (unlikely(pkt_err)) {
> +		VHOST_LOG_DATA(DEBUG,
> +			"(%s) %s: failed to transfer data for queue id %d.\n",
> +			dev->ifname, __func__, queue_id);
> +
> +		pkt_idx = n_xfer;
> +		/* recover available ring */
> +		vq->last_avail_idx -= pkt_err;
> +
> +		/**
> +		 * recover async channel copy related structures and free pktmbufs
> +		 * for error pkts.
> +		 */
> +		async->desc_idx_split -= pkt_err;
> +		while (pkt_err-- > 0) {
> +			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf);
> +			slot_idx--;
> +		}
> +	}
> +
> +	async->pkts_idx += pkt_idx;
> +	if (async->pkts_idx >= vq->size)
> +		async->pkts_idx -= vq->size;
> +
> +out:
> +	/* DMA device may serve other queues, unconditionally check completed. */
> +	nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id, pkts, pkts_size,
> +							  dma_id, vchan_id, legacy_ol_flags);
> +
> +	return nr_done_pkts;
> +}
> +
> +__rte_noinline
> +static uint16_t
> +virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
> +{
> +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> +				pkts, count, dma_id, vchan_id, true);
> +}
> +
> +__rte_noinline
> +static uint16_t
> +virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
> +		struct vhost_virtqueue *vq, uint16_t queue_id,
> +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> +		uint16_t count, uint16_t dma_id, uint16_t vchan_id)
> +{
> +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> +				pkts, count, dma_id, vchan_id, false);
> +}
> +
> +uint16_t
> +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> +	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id)
> +{
> +	struct virtio_net *dev;
> +	struct rte_mbuf *rarp_mbuf = NULL;
> +	struct vhost_virtqueue *vq;
> +	int16_t success = 1;
> +
> +	*nr_inflight = -1;
> +
> +	dev = get_device(vid);
> +	if (!dev)
> +		return 0;
> +
> +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%s) %s: built-in vhost net backend is disabled.\n",
> +			dev->ifname, __func__);
> +		return 0;
> +	}
> +
> +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
> +		VHOST_LOG_DATA(ERR,
> +			"(%s) %s: invalid virtqueue idx %d.\n",
> +			dev->ifname, __func__, queue_id);
> +		return 0;
> +	}
> +
> +	if (unlikely(!dma_copy_track[dma_id].vchans ||
> +				!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
> +		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__,
> +			       dma_id, vchan_id);
> +		return 0;
> +	}
> +
> +	vq = dev->virtqueue[queue_id];
> +
> +	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
> +		return 0;
> +
> +	if (unlikely(vq->enabled == 0)) {
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (unlikely(!vq->async)) {
> +		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n",
> +			dev->ifname, __func__, queue_id);
> +		count = 0;
> +		goto out_access_unlock;
> +	}
> +
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_lock(vq);
> +
> +	if (unlikely(vq->access_ok == 0))
> +		if (unlikely(vring_translate(dev, vq) < 0)) {
> +			count = 0;
> +			goto out;
> +		}
> +
> +	/*
> +	 * Construct a RARP broadcast packet, and inject it into the "pkts"
> +	 * array, to make it look like the guest actually sent such a packet.
> +	 *
> +	 * Check user_send_rarp() for more information.
> +	 *
> +	 * broadcast_rarp shares a cacheline in the virtio_net structure
> +	 * with some fields that are accessed during enqueue and
> +	 * __atomic_compare_exchange_n causes a write if performed compare
> +	 * and exchange. This could result in false sharing between enqueue
> +	 * and dequeue.
> +	 *
> +	 * Prevent unnecessary false sharing by reading broadcast_rarp first
> +	 * and only performing compare and exchange if the read indicates it
> +	 * is likely to be set.
> +	 */
> +	if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) &&
> +			__atomic_compare_exchange_n(&dev->broadcast_rarp,
> +			&success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) {
> +
> +		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac);
> +		if (rarp_mbuf == NULL) {
> +			VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n");
> +			count = 0;
> +			goto out;
> +		}
> +		/*
> +		 * Inject it to the head of "pkts" array, so that switch's mac
> +		 * learning table will get updated first.
> +		 */
> +		pkts[0] = rarp_mbuf;
> +		pkts++;
> +		count -= 1;
> +	}
> +
> +	if (unlikely(vq_is_packed(dev))) {
> +		static bool not_support_pack_log;
> +		if (!not_support_pack_log) {
> +			VHOST_LOG_DATA(ERR,
> +				"(%s) %s: async dequeue does not support packed ring.\n",
> +				dev->ifname, __func__);
> +			not_support_pack_log = true;
> +		}
> +		count = 0;
> +		goto out;
> +	}
> +
> +	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
> +		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
> +				mbuf_pool, pkts, count, dma_id, vchan_id);
> +	else
> +		count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id,
> +				mbuf_pool, pkts, count, dma_id, vchan_id);
> +
> +	*nr_inflight = vq->async->pkts_inflight_n;
> +
> +out:
> +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> +		vhost_user_iotlb_rd_unlock(vq);
> +
> +out_access_unlock:
> +	rte_spinlock_unlock(&vq->access_lock);
> +
> +	if (unlikely(rarp_mbuf != NULL))
> +		count += 1;
> +
> +	return count;
> +}


^ permalink raw reply	[flat|nested] 11+ messages in thread

* RE: [RFC,v3 1/2] vhost: support async dequeue for split ring
  2022-03-31  9:15     ` Maxime Coquelin
@ 2022-03-31 11:20       ` Ding, Xuan
  0 siblings, 0 replies; 11+ messages in thread
From: Ding, Xuan @ 2022-03-31 11:20 UTC (permalink / raw)
  To: Maxime Coquelin, Xia, Chenbo
  Cc: dev, Hu, Jiayu, Jiang, Cheng1, Pai G, Sunil, liangma, Wang, YuanX

Hi Maxime,

> -----Original Message-----
> From: Maxime Coquelin <maxime.coquelin@redhat.com>
> Sent: Thursday, March 31, 2022 5:15 PM
> To: Ding, Xuan <xuan.ding@intel.com>; Xia, Chenbo <chenbo.xia@intel.com>
> Cc: dev@dpdk.org; Hu, Jiayu <jiayu.hu@intel.com>; Jiang, Cheng1
> <cheng1.jiang@intel.com>; Pai G, Sunil <sunil.pai.g@intel.com>;
> liangma@liangbit.com; Wang, YuanX <yuanx.wang@intel.com>
> Subject: Re: [RFC,v3 1/2] vhost: support async dequeue for split ring
> 
> 
> 
> On 3/10/22 07:54, xuan.ding@intel.com wrote:
> > From: Xuan Ding <xuan.ding@intel.com>
> >
> > This patch implements asynchronous dequeue data path for vhost split
> > ring, with dmadev library integrated.
> >
> > Signed-off-by: Xuan Ding <xuan.ding@intel.com>
> > Signed-off-by: Yuan Wang <yuanx.wang@intel.com>
> > ---
> >   lib/vhost/rte_vhost_async.h |  37 ++-
> >   lib/vhost/version.map       |   1 +
> >   lib/vhost/vhost.h           |   1 +
> >   lib/vhost/virtio_net.c      | 504 ++++++++++++++++++++++++++++++++++++
> >   4 files changed, 541 insertions(+), 2 deletions(-)
> >
> > diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
> > index f1293c6a9d..b6ab0b06a2 100644
> > --- a/lib/vhost/rte_vhost_async.h
> > +++ b/lib/vhost/rte_vhost_async.h
> > @@ -155,9 +155,9 @@ int rte_vhost_async_get_inflight(int vid, uint16_t
> queue_id);
> >    * @param count
> >    *  Size of the packet array
> >    * @param dma_id
> > - *  the identifier of DMA device
> > + *  The identifier of DMA device
> >    * @param vchan_id
> > - *  the identifier of virtual DMA channel
> > + *  The identifier of virtual DMA channel
> 
> This is unrelated to the purpose of this patch, it can be moved in a dedicated
> trivial patch.

Okay, I will remove it from this patch set.

> 
> >    * @return
> >    *  Number of packets returned
> >    */
> > @@ -187,6 +187,39 @@ uint16_t
> rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
> >   __rte_experimental
> >   int rte_vhost_async_dma_configure(int16_t dma_id, uint16_t
> > vchan_id);
> >
> > +/**
> > + * This function tries to receive packets from the guest with offloading
> > + * copies to the async channel. The packets whose copies are completed
> > + * are returned in "pkts". The other packets, whose copies are submitted to
> > + * the async channel but not yet completed, are called "in-flight packets".
> > + * This function will not return in-flight packets until their copies are
> > + * completed by the async channel.
> > + *
> > + * @param vid
> > + *  ID of vhost device to dequeue data
> > + * @param queue_id
> > + *  ID of virtqueue to dequeue data
> > + * @param mbuf_pool
> > + *  Mbuf_pool where host mbuf is allocated
> > + * @param pkts
> > + *  Blank array to keep successfully dequeued packets
> > + * @param count
> > + *  Size of the packet array
> > + * @param nr_inflight
> > + *  The number of in-flight packets. If an error occurred, it is set to -1.
> > + * @param dma_id
> > + *  The identifier of DMA device
> > + * @param vchan_id
> > + *  The identifier of virtual DMA channel
> > + * @return
> > + *  Number of successfully dequeued packets
> > + */
> > +__rte_experimental
> > +uint16_t
> > +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> > +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count,
> > +	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id);
> > +
> >   #ifdef __cplusplus
> >   }
> >   #endif
> > diff --git a/lib/vhost/version.map b/lib/vhost/version.map index
> > 0a66c5840c..968d6d4290 100644
> > --- a/lib/vhost/version.map
> > +++ b/lib/vhost/version.map
> > @@ -87,6 +87,7 @@ EXPERIMENTAL {
> >
> >   	# added in 22.03
> >   	rte_vhost_async_dma_configure;
> 
> # added in 22.07

Sure, I will add it under "# added in 22.07" in the new version.

> 
> > +	rte_vhost_async_try_dequeue_burst;
> >   };
> >
> >   INTERNAL {
> > diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h index
> > a9edc271aa..3799d41089 100644
> > --- a/lib/vhost/vhost.h
> > +++ b/lib/vhost/vhost.h
> > @@ -178,6 +178,7 @@ extern struct async_dma_info
> dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
> >    */
> >   struct async_inflight_info {
> >   	struct rte_mbuf *mbuf;
> > +	struct virtio_net_hdr nethdr;
> >   	uint16_t descs; /* num of descs inflight */
> >   	uint16_t nr_buffers; /* num of buffers inflight for packed ring */
> >   };
> > diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c index
> > 5f432b0d77..3816caca79 100644
> > --- a/lib/vhost/virtio_net.c
> > +++ b/lib/vhost/virtio_net.c
> > @@ -3141,3 +3141,507 @@ rte_vhost_dequeue_burst(int vid, uint16_t
> > queue_id,
> >
> >   	return count;
> >   }
> > +
> > +static __rte_always_inline int
> > +async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
> > +			struct rte_mbuf *m, uint32_t mbuf_offset,
> > +			uint64_t buf_iova, uint32_t cpy_len) {
> > +	struct vhost_async *async = vq->async;
> > +	uint64_t mapped_len;
> > +	uint32_t buf_offset = 0;
> > +	void *host_iova;
> > +
> > +	while (cpy_len) {
> > +		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
> > +					buf_iova + buf_offset, cpy_len,
> > +					&mapped_len);
> > +		if (unlikely(!host_iova)) {
> > +			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get
> host_iova.\n",
> > +				dev->ifname, __func__);
> > +			return -1;
> > +		}
> > +
> > +		if (unlikely(async_iter_add_iovec(dev, async, host_iova,
> > +				(void *)(uintptr_t)rte_pktmbuf_iova_offset(m,
> mbuf_offset),
> > +				(size_t)mapped_len)))
> > +			return -1;
> > +
> > +		cpy_len -= (uint32_t)mapped_len;
> > +		mbuf_offset += (uint32_t)mapped_len;
> > +		buf_offset += (uint32_t)mapped_len;
> > +	}
> > +
> > +	return 0;
> > +}
> 
> It looks really similar to async_mbuf_to_desc_seg(), just the direction
> of the DMA copy is changed.
> 
> Maybe we could refactor the two functions into a single one by adding a
> parameter for the direction. Something like this:
> 
> static __rte_always_inline int
> async_fill_seg(struct virtio_net *dev, struct vhost_virtqueue *vq,
> 		struct rte_mbuf *m, uint32_t mbuf_offset,
> 		uint64_t buf_iova, uint32_t cpy_len, bool to_desc)
> {
> 	struct vhost_async *async = vq->async;
> 	uint64_t mapped_len;
> 	uint32_t buf_offset = 0;
> 	void *src, *dst;
> 	void *host_iova;
> 
> 	while (cpy_len) {
> 		host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev,
> 				buf_iova + buf_offset, cpy_len, &mapped_len);
> 		if (unlikely(!host_iova)) {
> 			VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host iova.\n",
> 				       dev->ifname, __func__);
> 			return -1;
> 		}
> 
> 		if (to_desc) {
> 			src = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
> 			dst = host_iova;
> 		} else {
> 			src = host_iova;
> 			dst = (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset);
> 		}
> 
> 		if (unlikely(async_iter_add_iovec(dev, async, src, dst,
> 				(size_t)mapped_len)))
> 			return -1;
> 
> 		cpy_len -= (uint32_t)mapped_len;
> 		mbuf_offset += (uint32_t)mapped_len;
> 		buf_offset += (uint32_t)mapped_len;
> 	}
> 
> 	return 0;
> }
> 
> 
> Then, if you pass a constant true/false value for to_desc instead of a
> variable, the compiler should generate the same code, so there is no
> performance degradation.

Thanks so much for the suggestion.
I noticed the async_mbuf_to_desc_seg() refactoring in the enqueue path,
so I refactored the previous dequeue code into async_desc_to_mbuf_seg().
These two functions can indeed be combined into one; it is a good idea.

> 
> > +
> > +static __rte_always_inline int
> > +async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +		  struct buf_vector *buf_vec, uint16_t nr_vec,
> > +		  struct rte_mbuf *m, struct rte_mempool *mbuf_pool,
> > +		  struct virtio_net_hdr *nethdr)
> > +{
> > +	uint64_t buf_addr, buf_iova;
> > +	uint32_t buf_avail, buf_offset, buf_len;
> > +	uint32_t mbuf_avail, mbuf_offset;
> > +	uint32_t cpy_len;
> > +	/* A counter to avoid a dead loop in the desc chain */
> > +	uint16_t vec_idx = 0;
> > +	struct rte_mbuf *cur = m, *prev = m;
> > +	struct virtio_net_hdr tmp_hdr;
> > +	struct virtio_net_hdr *hdr = NULL;
> > +	struct vhost_async *async = vq->async;
> > +
> > +	buf_addr = buf_vec[vec_idx].buf_addr;
> > +	buf_len = buf_vec[vec_idx].buf_len;
> > +	buf_iova = buf_vec[vec_idx].buf_iova;
> > +
> > +	if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1))
> > +		return -1;
> > +
> > +	if (virtio_net_with_host_offload(dev)) {
> > +		if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) {
> > +			/*
> > +			 * No luck, the virtio-net header doesn't fit
> > +			 * in a contiguous virtual area.
> > +			 */
> > +			copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec);
> > +			hdr = &tmp_hdr;
> > +		} else {
> > +			hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr);
> > +		}
> > +	}
> > +
> > +	/*
> > +	 * A virtio driver normally uses at least 2 desc buffers
> > +	 * for Tx: the first for storing the header, and others
> > +	 * for storing the data.
> > +	 */
> > +	if (unlikely(buf_len < dev->vhost_hlen)) {
> > +		buf_offset = dev->vhost_hlen - buf_len;
> > +		vec_idx++;
> > +		buf_addr = buf_vec[vec_idx].buf_addr;
> > +		buf_iova = buf_vec[vec_idx].buf_iova;
> > +		buf_len = buf_vec[vec_idx].buf_len;
> > +		buf_avail  = buf_len - buf_offset;
> > +	} else if (buf_len == dev->vhost_hlen) {
> > +		if (unlikely(++vec_idx >= nr_vec))
> > +			return -1;
> > +		buf_addr = buf_vec[vec_idx].buf_addr;
> > +		buf_iova = buf_vec[vec_idx].buf_iova;
> > +		buf_len = buf_vec[vec_idx].buf_len;
> > +
> > +		buf_offset = 0;
> > +		buf_avail = buf_len;
> > +	} else {
> > +		buf_offset = dev->vhost_hlen;
> > +		buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen;
> > +	}
> > +
> > +	PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset),
> > +(uint32_t)buf_avail, 0);
> > +
> > +	mbuf_offset = 0;
> > +	mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
> > +
> > +	if (async_iter_initialize(dev, async))
> > +		return -1;
> > +
> > +	while (1) {
> > +		cpy_len = RTE_MIN(buf_avail, mbuf_avail);
> > +
> > +		if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset,
> buf_iova + buf_offset,
> > +					   cpy_len) < 0)
> > +			goto error;
> > +
> > +		mbuf_avail -= cpy_len;
> > +		buf_avail -= cpy_len;
> > +		mbuf_offset += cpy_len;
> > +		buf_offset += cpy_len;
> > +
> > +		/* This buf reaches to its end, get the next one */
> > +		if (buf_avail == 0) {
> > +			if (++vec_idx >= nr_vec)
> > +				break;
> > +
> > +			buf_addr = buf_vec[vec_idx].buf_addr;
> > +			buf_iova = buf_vec[vec_idx].buf_iova;
> > +			buf_len = buf_vec[vec_idx].buf_len;
> > +
> > +			buf_offset = 0;
> > +			buf_avail = buf_len;
> > +
> > +			PRINT_PACKET(dev, (uintptr_t)buf_addr,
> (uint32_t)buf_avail, 0);
> > +		}
> > +
> > +		/*
> > +		 * This mbuf reaches to its end, get a new one
> > +		 * to hold more data.
> > +		 */
> > +		if (mbuf_avail == 0) {
> > +			cur = rte_pktmbuf_alloc(mbuf_pool);
> > +			if (unlikely(cur == NULL)) {
> > +				VHOST_LOG_DATA(ERR,
> > +					"(%s) %s: failed to allocate memory
> for mbuf.\n",
> > +					dev->ifname, __func__);
> > +				goto error;
> > +			}
> > +
> > +			prev->next = cur;
> > +			prev->data_len = mbuf_offset;
> > +			m->nb_segs += 1;
> > +			m->pkt_len += mbuf_offset;
> > +			prev = cur;
> > +
> > +			mbuf_offset = 0;
> > +			mbuf_avail = cur->buf_len -
> RTE_PKTMBUF_HEADROOM;
> > +		}
> > +	}
> > +
> > +	prev->data_len = mbuf_offset;
> > +	m->pkt_len += mbuf_offset;
> > +
> > +	async_iter_finalize(async);
> > +	if (hdr)
> > +		*nethdr = *hdr;
> > +
> > +	return 0;
> > +
> > +error:
> > +	async_iter_cancel(async);
> > +	return -1;
> > +}
> 
> 
> You can do the same refactoring here as I did for the enqueue path, i.e.
> merge copy_desc_to_mbuf and async_desc_to_mbuf into a single function.

Yes, I am preparing the v1 patch for 22.07, including this refactoring.
Please see the next version.

Thanks,
Xuan

> 
> > +static __rte_always_inline uint16_t
> > +async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t
> queue_id,
> > +		struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
> > +		uint16_t vchan_id, bool legacy_ol_flags) {
> > +	uint16_t start_idx, from, i;
> > +	uint16_t nr_cpl_pkts = 0;
> > +	struct async_inflight_info *pkts_info;
> > +	struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
> > +
> > +	pkts_info = vq->async->pkts_info;
> > +
> > +	vhost_async_dma_check_completed(dev, dma_id, vchan_id,
> > +VHOST_DMA_MAX_COPY_COMPLETE);
> > +
> > +	start_idx = async_get_first_inflight_pkt_idx(vq);
> > +
> > +	from = start_idx;
> > +	while (vq->async->pkts_cmpl_flag[from] && count--) {
> > +		vq->async->pkts_cmpl_flag[from] = false;
> > +		from = (from + 1) & (vq->size - 1);
> > +		nr_cpl_pkts++;
> > +	}
> > +
> > +	if (nr_cpl_pkts == 0)
> > +		return 0;
> > +
> > +	for (i = 0; i < nr_cpl_pkts; i++) {
> > +		from = (start_idx + i) & (vq->size - 1);
> > +		pkts[i] = pkts_info[from].mbuf;
> > +
> > +		if (virtio_net_with_host_offload(dev))
> > +			vhost_dequeue_offload(dev,
> &pkts_info[from].nethdr, pkts[i],
> > +					      legacy_ol_flags);
> > +	}
> > +
> > +	/* write back completed descs to used ring and update used idx */
> > +	write_back_completed_descs_split(vq, nr_cpl_pkts);
> > +	__atomic_add_fetch(&vq->used->idx, nr_cpl_pkts,
> __ATOMIC_RELEASE);
> > +	vhost_vring_call_split(dev, vq);
> > +
> > +	vq->async->pkts_inflight_n -= nr_cpl_pkts;
> > +
> > +	return nr_cpl_pkts;
> > +}
> > +
> > +static __rte_always_inline uint16_t
> > +virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue
> *vq,
> > +		uint16_t queue_id, struct rte_mempool *mbuf_pool, struct
> rte_mbuf **pkts,
> > +		uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool
> > +legacy_ol_flags) {
> > +	static bool allocerr_warned;
> > +	bool dropped = false;
> > +	uint16_t free_entries;
> > +	uint16_t pkt_idx, slot_idx = 0;
> > +	uint16_t nr_done_pkts = 0;
> > +	uint16_t pkt_err = 0;
> > +	uint16_t n_xfer;
> > +	struct vhost_async *async = vq->async;
> > +	struct async_inflight_info *pkts_info = async->pkts_info;
> > +	struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST];
> > +	uint16_t pkts_size = count;
> > +
> > +	/**
> > +	 * The ordering between avail index and
> > +	 * desc reads needs to be enforced.
> > +	 */
> > +	free_entries = __atomic_load_n(&vq->avail->idx,
> __ATOMIC_ACQUIRE) - vq->last_avail_idx;
> > +	if (free_entries == 0)
> > +		goto out;
> > +
> > +	rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size -
> > +1)]);
> > +
> > +	async_iter_reset(async);
> > +
> > +	count = RTE_MIN(count, MAX_PKT_BURST);
> > +	count = RTE_MIN(count, free_entries);
> > +	VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n",
> > +dev->ifname, count);
> > +
> > +	if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count))
> > +		goto out;
> > +
> > +	for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
> > +		uint16_t head_idx = 0;
> > +		uint16_t nr_vec = 0;
> > +		uint16_t to;
> > +		uint32_t buf_len;
> > +		int err;
> > +		struct buf_vector buf_vec[BUF_VECTOR_MAX];
> > +		struct rte_mbuf *pkt = pkts_prealloc[pkt_idx];
> > +
> > +		if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx,
> > +						&nr_vec, buf_vec,
> > +						&head_idx, &buf_len,
> > +						VHOST_ACCESS_RO) < 0)) {
> > +			dropped = true;
> > +			break;
> > +		}
> > +
> > +		err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len);
> > +		if (unlikely(err)) {
> > +			/**
> > +			 * mbuf allocation fails for jumbo packets when
> external
> > +			 * buffer allocation is not allowed and linear buffer
> > +			 * is required. Drop this packet.
> > +			 */
> > +			if (!allocerr_warned) {
> > +				VHOST_LOG_DATA(ERR,
> > +					"(%s) %s: Failed mbuf alloc of size %d
> from %s\n",
> > +					dev->ifname, __func__, buf_len,
> mbuf_pool->name);
> > +				allocerr_warned = true;
> > +			}
> > +			dropped = true;
> > +			break;
> > +		}
> > +
> > +		slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1);
> > +		err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt,
> mbuf_pool,
> > +					&pkts_info[slot_idx].nethdr);
> > +		if (unlikely(err)) {
> > +			if (!allocerr_warned) {
> > +				VHOST_LOG_DATA(ERR,
> > +					"(%s) %s: Failed to offload copies to
> async channel.\n",
> > +					dev->ifname, __func__);
> > +				allocerr_warned = true;
> > +			}
> > +			dropped = true;
> > +			break;
> > +		}
> > +
> > +		pkts_info[slot_idx].mbuf = pkt;
> > +
> > +		/* store used descs */
> > +		to = async->desc_idx_split & (vq->size - 1);
> > +		async->descs_split[to].id = head_idx;
> > +		async->descs_split[to].len = 0;
> > +		async->desc_idx_split++;
> > +
> > +		vq->last_avail_idx++;
> > +	}
> > +
> > +	if (unlikely(dropped))
> > +		rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count -
> pkt_idx);
> > +
> > +	n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id,
> async->pkts_idx,
> > +					  async->iov_iter, pkt_idx);
> > +
> > +	async->pkts_inflight_n += n_xfer;
> > +
> > +	pkt_err = pkt_idx - n_xfer;
> > +	if (unlikely(pkt_err)) {
> > +		VHOST_LOG_DATA(DEBUG,
> > +			"(%s) %s: failed to transfer data for queue id %d.\n",
> > +			dev->ifname, __func__, queue_id);
> > +
> > +		pkt_idx = n_xfer;
> > +		/* recover available ring */
> > +		vq->last_avail_idx -= pkt_err;
> > +
> > +		/**
> > +		 * recover async channel copy related structures and free
> pktmbufs
> > +		 * for error pkts.
> > +		 */
> > +		async->desc_idx_split -= pkt_err;
> > +		while (pkt_err-- > 0) {
> > +			rte_pktmbuf_free(pkts_info[slot_idx & (vq->size -
> 1)].mbuf);
> > +			slot_idx--;
> > +		}
> > +	}
> > +
> > +	async->pkts_idx += pkt_idx;
> > +	if (async->pkts_idx >= vq->size)
> > +		async->pkts_idx -= vq->size;
> > +
> > +out:
> > +	/* DMA device may serve other queues, unconditionally check
> completed. */
> > +	nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id,
> pkts, pkts_size,
> > +							  dma_id, vchan_id,
> legacy_ol_flags);
> > +
> > +	return nr_done_pkts;
> > +}
> > +
> > +__rte_noinline
> > +static uint16_t
> > +virtio_dev_tx_async_split_legacy(struct virtio_net *dev,
> > +		struct vhost_virtqueue *vq, uint16_t queue_id,
> > +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> > +		uint16_t count, uint16_t dma_id, uint16_t vchan_id) {
> > +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> > +				pkts, count, dma_id, vchan_id, true); }
> > +
> > +__rte_noinline
> > +static uint16_t
> > +virtio_dev_tx_async_split_compliant(struct virtio_net *dev,
> > +		struct vhost_virtqueue *vq, uint16_t queue_id,
> > +		struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
> > +		uint16_t count, uint16_t dma_id, uint16_t vchan_id) {
> > +	return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool,
> > +				pkts, count, dma_id, vchan_id, false); }
> > +
> > +uint16_t
> > +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id,
> > +	struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t
> count,
> > +	int *nr_inflight, uint16_t dma_id, uint16_t vchan_id) {
> > +	struct virtio_net *dev;
> > +	struct rte_mbuf *rarp_mbuf = NULL;
> > +	struct vhost_virtqueue *vq;
> > +	int16_t success = 1;
> > +
> > +	*nr_inflight = -1;
> > +
> > +	dev = get_device(vid);
> > +	if (!dev)
> > +		return 0;
> > +
> > +	if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
> > +		VHOST_LOG_DATA(ERR,
> > +			"(%s) %s: built-in vhost net backend is disabled.\n",
> > +			dev->ifname, __func__);
> > +		return 0;
> > +	}
> > +
> > +	if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
> > +		VHOST_LOG_DATA(ERR,
> > +			"(%s) %s: invalid virtqueue idx %d.\n",
> > +			dev->ifname, __func__, queue_id);
> > +		return 0;
> > +	}
> > +
> > +	if (unlikely(!dma_copy_track[dma_id].vchans ||
> > +
> 	!dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) {
> > +		VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n",
> dev->ifname, __func__,
> > +			       dma_id, vchan_id);
> > +		return 0;
> > +	}
> > +
> > +	vq = dev->virtqueue[queue_id];
> > +
> > +	if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))
> > +		return 0;
> > +
> > +	if (unlikely(vq->enabled == 0)) {
> > +		count = 0;
> > +		goto out_access_unlock;
> > +	}
> > +
> > +	if (unlikely(!vq->async)) {
> > +		VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for
> queue id %d.\n",
> > +			dev->ifname, __func__, queue_id);
> > +		count = 0;
> > +		goto out_access_unlock;
> > +	}
> > +
> > +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> > +		vhost_user_iotlb_rd_lock(vq);
> > +
> > +	if (unlikely(vq->access_ok == 0))
> > +		if (unlikely(vring_translate(dev, vq) < 0)) {
> > +			count = 0;
> > +			goto out;
> > +		}
> > +
> > +	/*
> > +	 * Construct a RARP broadcast packet, and inject it into the "pkts"
> > +	 * array, to make it look like the guest actually sent such a packet.
> > +	 *
> > +	 * Check user_send_rarp() for more information.
> > +	 *
> > +	 * broadcast_rarp shares a cacheline in the virtio_net structure
> > +	 * with some fields that are accessed during enqueue and
> > +	 * __atomic_compare_exchange_n causes a write if performed
> compare
> > +	 * and exchange. This could result in false sharing between enqueue
> > +	 * and dequeue.
> > +	 *
> > +	 * Prevent unnecessary false sharing by reading broadcast_rarp first
> > +	 * and only performing compare and exchange if the read indicates it
> > +	 * is likely to be set.
> > +	 */
> > +	if (unlikely(__atomic_load_n(&dev->broadcast_rarp,
> __ATOMIC_ACQUIRE) &&
> > +			__atomic_compare_exchange_n(&dev-
> >broadcast_rarp,
> > +			&success, 0, 0, __ATOMIC_RELEASE,
> __ATOMIC_RELAXED))) {
> > +
> > +		rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev-
> >mac);
> > +		if (rarp_mbuf == NULL) {
> > +			VHOST_LOG_DATA(ERR, "Failed to make RARP
> packet.\n");
> > +			count = 0;
> > +			goto out;
> > +		}
> > +		/*
> > +		 * Inject it to the head of "pkts" array, so that switch's mac
> > +		 * learning table will get updated first.
> > +		 */
> > +		pkts[0] = rarp_mbuf;
> > +		pkts++;
> > +		count -= 1;
> > +	}
> > +
> > +	if (unlikely(vq_is_packed(dev))) {
> > +		static bool not_support_pack_log;
> > +		if (!not_support_pack_log) {
> > +			VHOST_LOG_DATA(ERR,
> > +				"(%s) %s: async dequeue does not support
> packed ring.\n",
> > +				dev->ifname, __func__);
> > +			not_support_pack_log = true;
> > +		}
> > +		count = 0;
> > +		goto out;
> > +	}
> > +
> > +	if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS)
> > +		count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id,
> > +				mbuf_pool, pkts, count, dma_id, vchan_id);
> > +	else
> > +		count = virtio_dev_tx_async_split_compliant(dev, vq,
> queue_id,
> > +				mbuf_pool, pkts, count, dma_id, vchan_id);
> > +
> > +	*nr_inflight = vq->async->pkts_inflight_n;
> > +
> > +out:
> > +	if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
> > +		vhost_user_iotlb_rd_unlock(vq);
> > +
> > +out_access_unlock:
> > +	rte_spinlock_unlock(&vq->access_lock);
> > +
> > +	if (unlikely(rarp_mbuf != NULL))
> > +		count += 1;
> > +
> > +	return count;
> > +}


^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread (last update: 2022-03-31 11:20 UTC)

Thread overview: 11+ messages
2022-01-01  0:12 [RFC 0/2] vhost: support async dequeue data path xuan.ding
2022-01-01  0:12 ` [RFC 1/2] vhost: support async dequeue for split ring xuan.ding
2022-01-01  0:12 ` [RFC 2/2] examples/vhost: support async dequeue data path xuan.ding
2022-02-24 11:03 ` [RFC,v2 0/2] vhost: " xuan.ding
2022-02-24 11:03   ` [RFC,v2 1/2] vhost: support async dequeue for split ring xuan.ding
2022-02-24 11:04   ` [RFC,v2 2/2] examples/vhost: support async dequeue data path xuan.ding
2022-03-10  6:54 ` [RFC,v3 0/2] vhost: " xuan.ding
2022-03-10  6:54   ` [RFC,v3 1/2] vhost: support async dequeue for split ring xuan.ding
2022-03-31  9:15     ` Maxime Coquelin
2022-03-31 11:20       ` Ding, Xuan
2022-03-10  6:54   ` [RFC,v3 2/2] examples/vhost: support async dequeue data path xuan.ding
