netdev.vger.kernel.org archive mirror
* Re: [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit
       [not found] <1618482470.2631352-3-xuanzhuo@linux.alibaba.com>
@ 2021-04-16  5:35 ` Jason Wang
  0 siblings, 0 replies; 4+ messages in thread
From: Jason Wang @ 2021-04-16  5:35 UTC (permalink / raw)
  To: Xuan Zhuo
  Cc: Michael S. Tsirkin, David S. Miller, Jakub Kicinski,
	Björn Töpel, Magnus Karlsson, Jonathan Lemon,
	Alexei Starovoitov, Daniel Borkmann, Jesper Dangaard Brouer,
	John Fastabend, virtualization, bpf, dust.li, netdev


On 2021/4/15 6:27 PM, Xuan Zhuo wrote:
> On Wed, 14 Apr 2021 13:46:45 +0800, Jason Wang <jasowang@redhat.com> wrote:
>> On 2021/4/13 11:15 AM, Xuan Zhuo wrote:
>>> This patch implements the core part of xsk zerocopy xmit.
>>>
>>> When the user calls sendto to consume the data in the xsk tx queue,
>>> virtnet_xsk_wakeup() will be called.
>>>
>>> In wakeup, it will try to send a part of the data directly. There are
>>> two purposes for this realization:
>>>
>>> 1. Send part of the data quickly to reduce the transmission delay of the
>>>      first packet.
>>> 2. Trigger tx interrupt, start napi to consume xsk tx data.
>>>
>>> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
>>> needs to support csum and other functions later, consider assigning xsk
>>> hdr separately for each sent packet.
>>>
>>> There are now three situations in free_old_xmit(): skb, xdp frame, xsk
>>> desc.  Based on the last two bits of ptr returned by virtqueue_get_buf():
>>>       00 is skb by default.
>>>       01 represents the packet sent by xdp
>>>       10 is the packet sent by xsk
>>>
>>> If the xmit work of xsk has not been completed, but the ring is full,
>>> napi must first exit and wait for the ring to be available, so
>>> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
>>> we can quickly wake up napi to execute xsk xmit task.
>>>
>>> When recycling, we need to count the number of bytes sent, so put xsk
>>> desc->len into the ptr pointer. Because ptr does not point to meaningful
>>> objects in xsk.
>>>
>>> Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
>>> Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
>>> ---
>>>    drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
>>>    1 file changed, 292 insertions(+), 4 deletions(-)
>>>
>>> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
>>> index 8242a9e9f17d..c441d6bf1510 100644
>>> --- a/drivers/net/virtio_net.c
>>> +++ b/drivers/net/virtio_net.c
>>> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
>>>    #define VIRTIO_XDP_REDIR	BIT(1)
>>>
>>>    #define VIRTIO_XDP_FLAG	BIT(0)
>>> +#define VIRTIO_XSK_FLAG	BIT(1)
>>> +
>>> +#define VIRTIO_XSK_PTR_SHIFT       4
>>> +
>>> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
>>>
>>>    /* RX packet size EWMA. The average packet size is used to determine the packet
>>>     * buffer size when refilling RX rings. As the entire RX ring may be refilled
>>> @@ -138,6 +143,12 @@ struct send_queue {
>>>    	struct {
>>>    		/* xsk pool */
>>>    		struct xsk_buff_pool __rcu *pool;
>>> +
>>> +		/* save the desc for next xmit, when xmit fail. */
>>> +		struct xdp_desc last_desc;
>>
>> As replied in the previous version this looks tricky. I think we need to
>> make sure to reserve some slots as the skb path did.
>>
>> This looks exactly like what stmmac did, which also shares XDP and skb
>> for the same ring.
>>
>>
>>> +
>>> +		/* xsk wait for tx inter or softirq */
>>> +		bool need_wakeup;
>>>    	} xsk;
>>>    };
>>>
>>> @@ -255,6 +266,15 @@ struct padded_vnet_hdr {
>>>    	char padding[4];
>>>    };
>>>
>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			   int budget, bool in_napi);
>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
>>> +
>>> +static bool is_skb_ptr(void *ptr)
>>> +{
>>> +	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
>>> +}
>>> +
>>>    static bool is_xdp_frame(void *ptr)
>>>    {
>>>    	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
>>> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
>>>    	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
>>>    }
>>>
>>> +static void *xsk_to_ptr(struct xdp_desc *desc)
>>> +{
>>> +	/* save the desc len to ptr */
>>> +	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
>>> +
>>> +	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
>>> +}
>>> +
>>> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
>>> +{
>>> +	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
>>> +}
>>> +
>>>    static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>    {
>>>    	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
>>> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>    static void __free_old_xmit(struct send_queue *sq, bool in_napi,
>>>    			    struct virtnet_sq_stats *stats)
>>>    {
>>> +	unsigned int xsknum = 0;
>>>    	unsigned int len;
>>>    	void *ptr;
>>>
>>>    	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
>>> -		if (likely(!is_xdp_frame(ptr))) {
>>> +		if (is_skb_ptr(ptr)) {
>>>    			struct sk_buff *skb = ptr;
>>>
>>>    			pr_debug("Sent skb %p\n", skb);
>>>
>>>    			stats->bytes += skb->len;
>>>    			napi_consume_skb(skb, in_napi);
>>> -		} else {
>>> +		} else if (is_xdp_frame(ptr)) {
>>>    			struct xdp_frame *frame = ptr_to_xdp(ptr);
>>>
>>>    			stats->bytes += frame->len;
>>>    			xdp_return_frame(frame);
>>> +		} else {
>>> +			struct xdp_desc desc;
>>> +
>>> +			ptr_to_xsk(ptr, &desc);
>>> +			stats->bytes += desc.len;
>>> +			++xsknum;
>>>    		}
>>>    		stats->packets++;
>>>    	}
>>> +
>>> +	if (xsknum)
>>> +		virtnet_xsk_complete(sq, xsknum);
>>>    }
>>>
>>>    /* Converting between virtqueue no. and kernel tx/rx queue no.
>>> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
>>>    	return 0;
>>>    }
>>>
>>> +static int virtnet_poll_xsk(struct send_queue *sq, int budget)
>>> +{
>>> +	struct xsk_buff_pool *pool;
>>> +	int work_done = 0;
>>> +
>>> +	rcu_read_lock();
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (pool)
>>> +		work_done = virtnet_xsk_run(sq, pool, budget, true);
>>> +	rcu_read_unlock();
>>> +	return work_done;
>>> +}
>>> +
>>>    static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>    {
>>>    	struct send_queue *sq = container_of(napi, struct send_queue, napi);
>>> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>
>>>    	txq = netdev_get_tx_queue(vi->dev, index);
>>>    	__netif_tx_lock(txq, raw_smp_processor_id());
>>> +	work_done += virtnet_poll_xsk(sq, budget);
>>>    	free_old_xmit(sq, true);
>>>    	__netif_tx_unlock(txq);
>>>
>>> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>>>    	return err;
>>>    }
>>>
>>> +static void virtnet_xsk_check_queue(struct send_queue *sq)
>>> +{
>>> +	struct virtnet_info *vi = sq->vq->vdev->priv;
>>> +	struct net_device *dev = vi->dev;
>>> +	int qnum = sq - vi->sq;
>>> +
>>> +	/* If this sq is not the exclusive queue of the current cpu,
>>> +	 * then it may be called by start_xmit, so check it running out
>>> +	 * of space.
>>> +	 *
>>
>> I think it's better to move this check after is_xdp_raw_buffer_queue().
> Sorry, do not understand.


So what I meant is:


/* If it is a raw buffer queue, ... */
     if (is_xdp_raw_buffer_queue())

/* if it is not the exclusive queue */
     if (sq->vq->num_free ...)


>>
>>> +	 * And if it is a raw buffer queue, it does not check whether the status
>>> +	 * of the queue is stopped when sending. So there is no need to check
>>> +	 * the situation of the raw buffer queue.
>>> +	 */
>>> +	if (is_xdp_raw_buffer_queue(vi, qnum))
>>> +		return;
>>> +
>>> +	/* Stop the queue to avoid getting packets that we are
>>> +	 * then unable to transmit. Then wait the tx interrupt.
>>> +	 */
>>> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>> +		netif_stop_subqueue(dev, qnum);
>>
>> Is there any way to stop xsk TX here?
>
> xsk tx is driven by tx napi, so stop has no meaning.


So NAPI can be stopped.


>
>
>>
>>> +}
>>> +
>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
>>> +{
>>> +	struct xsk_buff_pool *pool;
>>> +
>>> +	rcu_read_lock();
>>> +
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (!pool) {
>>> +		rcu_read_unlock();
>>> +		return;
>>> +	}
>>> +	xsk_tx_completed(pool, num);
>>> +	rcu_read_unlock();
>>> +
>>> +	if (sq->xsk.need_wakeup) {
>>> +		sq->xsk.need_wakeup = false;
>>> +		virtqueue_napi_schedule(&sq->napi, sq->vq);
>>> +	}
>>> +}
>>> +
>>> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			    struct xdp_desc *desc)
>>> +{
>>> +	struct virtnet_info *vi;
>>> +	u32 offset, n, len;
>>> +	struct page *page;
>>> +	void *data;
>>> +	u64 addr;
>>> +	int err;
>>> +
>>> +	vi = sq->vq->vdev->priv;
>>> +	addr = desc->addr;
>>> +
>>> +	data = xsk_buff_raw_get_data(pool, addr);
>>> +	offset = offset_in_page(data);
>>> +
>>> +	/* xsk unaligned mode, desc may use two pages */
>>> +	if (desc->len > PAGE_SIZE - offset)
>>> +		n = 3;
>>> +	else
>>> +		n = 2;
>>> +
>>> +	sg_init_table(sq->sg, n);
>>> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
>>> +
>>> +	/* handle for xsk first page */
>>> +	len = min_t(int, desc->len, PAGE_SIZE - offset);
>>> +	page = xsk_buff_xdp_get_page(pool, addr);
>>> +	sg_set_page(sq->sg + 1, page, len, offset);
>>> +
>>> +	/* xsk unaligned mode, handle for the second page */
>>> +	if (len < desc->len) {
>>> +		page = xsk_buff_xdp_get_page(pool, addr + len);
>>> +		len = min_t(int, desc->len - len, PAGE_SIZE);
>>> +		sg_set_page(sq->sg + 2, page, len, 0);
>>> +	}
>>> +
>>> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
>>> +				   GFP_ATOMIC);
>>> +	if (unlikely(err))
>>> +		sq->xsk.last_desc = *desc;
>>> +
>>> +	return err;
>>> +}
>>> +
>>> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
>>> +				  struct xsk_buff_pool *pool,
>>> +				  unsigned int budget,
>>> +				  bool in_napi, int *done,
>>> +				  struct virtnet_sq_stats *stats)
>>> +{
>>> +	struct xdp_desc desc;
>>> +	int err, packet = 0;
>>> +	int ret = -EAGAIN;
>>> +
>>> +	if (sq->xsk.last_desc.addr) {
>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>> +			return -EBUSY;
>>> +
>>> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
>>> +		if (unlikely(err))
>>> +			return -EBUSY;
>>> +
>>> +		++packet;
>>> +		--budget;
>>> +		sq->xsk.last_desc.addr = 0;
>>
>> So I think we don't need to do this since we always try to reserve 2 +
>> MAX_SKB_FRAGS; then it means we get -EIO/-ENOMEM, which is basically a
>> broken device or dma map.
>>
> Does it mean that when there are enough slots, a virtqueue_add_outbuf()
> failure (-EIO/-ENOMEM) means that the network card is no longer working?


Or it could be a bug in the driver. And you don't need to check num_free
beforehand if you always reserve sufficient slots.
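
Roughly, dropping last_desc would leave virtnet_xsk_xmit_batch() with just
the loop below (a sketch; the comment states the assumption):

	while (budget-- > 0) {
		/* With 2 + MAX_SKB_FRAGS reserved here, a later
		 * virtnet_xsk_xmit() failure can only mean a broken device
		 * or DMA mapping, so there is no desc worth saving/retrying.
		 */
		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
			ret = -EBUSY;
			break;
		}

		if (!xsk_tx_peek_desc(pool, &desc)) {
			/* done */
			ret = 0;
			break;
		}

		err = virtnet_xsk_xmit(sq, pool, &desc);
		if (unlikely(err)) {
			ret = -EBUSY;
			break;
		}

		++packet;
	}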


>
> I originally thought that even if an error was returned, it might still work
> normally the next time. That was my mistake.
>
> Thank you very much, I learned it. I will delete the related code.
>
>>> +	}
>>> +
>>> +	while (budget-- > 0) {
>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
>>> +			ret = -EBUSY;
>>> +			break;
>>> +		}
>>> +
>>> +		if (!xsk_tx_peek_desc(pool, &desc)) {
>>> +			/* done */
>>> +			ret = 0;
>>> +			break;
>>> +		}
>>> +
>>> +		err = virtnet_xsk_xmit(sq, pool, &desc);
>>> +		if (unlikely(err)) {
>>> +			ret = -EBUSY;
>>
>> Since the function will be called by NAPI, I think we need to report the
>> number of packets that were transmitted as well.
>
> A 'desc' refers to one packet, so virtnet_xsk_xmit() sends only one packet.
> That count is kept in the "packet" variable below and finally passed to the
> upper layer through the "done" parameter.


I meant you did:

virtnet_poll_tx()
     work_done += virtnet_poll_xsk()
     virtnet_xsk_run()
         virtnet_xsk_xmit_batch()

If there's no sufficient slot you still need to return the number of
packets that have been sent.



>
>>
>>> +			break;
>>> +		}
>>> +
>>> +		++packet;
>>> +	}
>>> +
>>> +	if (packet) {
>>> +		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
>>> +			++stats->kicks;
>>> +
>>> +		*done = packet;
>>> +		stats->xdp_tx += packet;
>>> +
>>> +		xsk_tx_release(pool);
>>> +	}
>>> +
>>> +	return ret;
>>> +}
>>> +
>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			   int budget, bool in_napi)
>>> +{
>>> +	struct virtnet_sq_stats stats = {};
>>> +	int done = 0;
>>> +	int err;
>>> +
>>> +	sq->xsk.need_wakeup = false;
>>> +	__free_old_xmit(sq, in_napi, &stats);
>>> +
>>> +	/* return err:
>>> +	 * -EAGAIN: done == budget
>>> +	 * -EBUSY:  done < budget
>>> +	 *  0    :  done < budget
>>> +	 */
>>> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
>>> +	if (err == -EBUSY) {
>>> +		__free_old_xmit(sq, in_napi, &stats);
>>> +
>>> +		/* If the space is enough, let napi run again. */
>>> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
>>> +			done = budget;
>>
>> Why do you need to run NAPI instead of a netif_tx_wake()?
>
> When virtnet_xsk_run() is called by virtnet_xsk_wakeup(), the return value is
> not used.
>
> virtnet_xsk_run() is already in napi when it is called by poll_tx(), so if there
> are more slots, I try to return "budget" and let napi call poll_tx() again


Well, my question is why you don't need to wake up the qdisc in this
case. Note that the queue could be shared with ndo_start_xmit().


>
>>
>>> +		else
>>> +			sq->xsk.need_wakeup = true;
>>
>> So done is 0, is this intended?
>
> When err == 0, it indicates that the xsk queue has been exhausted. At this time,
> done < budget, return done directly, and napi can be stopped.
>
> When err == -EAGAIN, it indicates that a budget's worth of packets has been sent,
> and done should be returned to the upper layer. Wait for napi to call poll_tx()
> again.


So the code is only for -EBUSY if I read the code correctly. And in this 
case you return 0.


>
> In both cases, done is directly returned to the upper layer without special
> processing.
>
>>
>>> +	}
>>> +
>>> +	virtnet_xsk_check_queue(sq);
>>> +
>>> +	u64_stats_update_begin(&sq->stats.syncp);
>>> +	sq->stats.packets += stats.packets;
>>> +	sq->stats.bytes += stats.bytes;
>>> +	sq->stats.kicks += stats.kicks;
>>> +	sq->stats.xdp_tx += stats.xdp_tx;
>>> +	u64_stats_update_end(&sq->stats.syncp);
>>> +
>>> +	return done;
>>> +}
>>> +
>>> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
>>> +{
>>> +	struct virtnet_info *vi = netdev_priv(dev);
>>> +	struct xsk_buff_pool *pool;
>>> +	struct netdev_queue *txq;
>>> +	struct send_queue *sq;
>>> +
>>> +	if (!netif_running(dev))
>>> +		return -ENETDOWN;
>>> +
>>> +	if (qid >= vi->curr_queue_pairs)
>>> +		return -EINVAL;
>>> +
>>> +	sq = &vi->sq[qid];
>>> +
>>> +	rcu_read_lock();
>>> +
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (!pool)
>>> +		goto end;
>>> +
>>> +	if (napi_if_scheduled_mark_missed(&sq->napi))
>>> +		goto end;
>>> +
>>> +	txq = netdev_get_tx_queue(dev, qid);
>>> +
>>> +	__netif_tx_lock_bh(txq);
>>> +
>>> +	/* Send part of the packet directly to reduce the delay in sending the
>>> +	 * packet, and this can actively trigger the tx interrupts.
>>> +	 *
>>> +	 * If no packet is sent out, the ring of the device is full. In this
>>> +	 * case, we will still get a tx interrupt response. Then we will deal
>>> +	 * with the subsequent packet sending work.
>>
>> So stmmac schedules NAPI here; do you have perf numbers for this improvement?
>
> virtnet_xsk_wakeup() is called by sendto(). The purpose is to start consuming
> the data in the xsk tx queue, i.e. to get napi running.
>
> When napi is not running, I try to send a certain amount of data:
> 1. Reduce the delay of the first packet
> 2. Trigger hardware to generate tx interrupt


I don't see the code that triggers a hardware interrupt, and I don't believe
virtio can do this.

So the point is we need

1) schedule NAPI when napi_if_scheduled_mark_missed() returns false
2) introduce this optimization on top with perf numbers
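
For 1), a minimal sketch (it only schedules NAPI and leaves all the xsk
xmit work to virtnet_poll_tx()):

static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
{
	struct virtnet_info *vi = netdev_priv(dev);
	struct send_queue *sq;

	if (!netif_running(dev))
		return -ENETDOWN;

	if (qid >= vi->curr_queue_pairs)
		return -EINVAL;

	sq = &vi->sq[qid];

	/* Only kick NAPI; the actual xsk xmit happens in virtnet_poll_tx(). */
	if (!napi_if_scheduled_mark_missed(&sq->napi))
		virtqueue_napi_schedule(&sq->napi, sq->vq);

	return 0;
}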

Thanks

>
> Thanks.
>
>> Thanks
>>
>>
>>> +	 */
>>> +	virtnet_xsk_run(sq, pool, napi_weight, false);
>>> +
>>> +	__netif_tx_unlock_bh(txq);
>>> +
>>> +end:
>>> +	rcu_read_unlock();
>>> +	return 0;
>>> +}
>>> +
>>>    static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>    				   struct xsk_buff_pool *pool,
>>>    				   u16 qid)
>>> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>    		return -EPERM;
>>>
>>>    	rcu_read_lock();
>>> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
>>> +
>>>    	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>>>    	 * safe.
>>>    	 */
>>> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
>>>    	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>>>    	.ndo_bpf		= virtnet_xdp,
>>>    	.ndo_xdp_xmit		= virtnet_xdp_xmit,
>>> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>>>    	.ndo_features_check	= passthru_features_check,
>>>    	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>>>    	.ndo_set_features	= virtnet_set_features,
>>> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
>>>    	for (i = 0; i < vi->max_queue_pairs; i++) {
>>>    		struct virtqueue *vq = vi->sq[i].vq;
>>>    		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
>>> -			if (!is_xdp_frame(buf))
>>> +			if (is_skb_ptr(buf))
>>>    				dev_kfree_skb(buf);
>>> -			else
>>> +			else if (is_xdp_frame(buf))
>>>    				xdp_return_frame(ptr_to_xdp(buf));
>>>    		}
>>>    	}



* Re: [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit
       [not found] <1618554586.8830593-2-xuanzhuo@linux.alibaba.com>
@ 2021-04-19  2:46 ` Jason Wang
  0 siblings, 0 replies; 4+ messages in thread
From: Jason Wang @ 2021-04-19  2:46 UTC (permalink / raw)
  To: Xuan Zhuo
  Cc: Michael S. Tsirkin, David S. Miller, Jakub Kicinski,
	Björn Töpel, Magnus Karlsson, Jonathan Lemon,
	Alexei Starovoitov, Daniel Borkmann, Jesper Dangaard Brouer,
	John Fastabend, virtualization, bpf, dust.li, netdev


On 2021/4/16 2:29 PM, Xuan Zhuo wrote:
> On Fri, 16 Apr 2021 13:35:33 +0800, Jason Wang <jasowang@redhat.com> wrote:
>> On 2021/4/15 6:27 PM, Xuan Zhuo wrote:
>>> On Wed, 14 Apr 2021 13:46:45 +0800, Jason Wang <jasowang@redhat.com> wrote:
>>>> On 2021/4/13 11:15 AM, Xuan Zhuo wrote:
>>>>> This patch implements the core part of xsk zerocopy xmit.
>>>>>
>>>>> When the user calls sendto to consume the data in the xsk tx queue,
>>>>> virtnet_xsk_wakeup() will be called.
>>>>>
>>>>> In wakeup, it will try to send a part of the data directly. There are
>>>>> two purposes for this realization:
>>>>>
>>>>> 1. Send part of the data quickly to reduce the transmission delay of the
>>>>>       first packet.
>>>>> 2. Trigger tx interrupt, start napi to consume xsk tx data.
>>>>>
>>>>> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
>>>>> needs to support csum and other functions later, consider assigning xsk
>>>>> hdr separately for each sent packet.
>>>>>
>>>>> There are now three situations in free_old_xmit(): skb, xdp frame, xsk
>>>>> desc.  Based on the last two bits of ptr returned by virtqueue_get_buf():
>>>>>        00 is skb by default.
>>>>>        01 represents the packet sent by xdp
>>>>>        10 is the packet sent by xsk
>>>>>
>>>>> If the xmit work of xsk has not been completed, but the ring is full,
>>>>> napi must first exit and wait for the ring to be available, so
>>>>> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
>>>>> we can quickly wake up napi to execute xsk xmit task.
>>>>>
>>>>> When recycling, we need to count the number of bytes sent, so put xsk
>>>>> desc->len into the ptr pointer. Because ptr does not point to meaningful
>>>>> objects in xsk.
>>>>>
>>>>> Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
>>>>> Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
>>>>> ---
>>>>>     drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
>>>>>     1 file changed, 292 insertions(+), 4 deletions(-)
>>>>>
>>>>> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
>>>>> index 8242a9e9f17d..c441d6bf1510 100644
>>>>> --- a/drivers/net/virtio_net.c
>>>>> +++ b/drivers/net/virtio_net.c
>>>>> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
>>>>>     #define VIRTIO_XDP_REDIR	BIT(1)
>>>>>
>>>>>     #define VIRTIO_XDP_FLAG	BIT(0)
>>>>> +#define VIRTIO_XSK_FLAG	BIT(1)
>>>>> +
>>>>> +#define VIRTIO_XSK_PTR_SHIFT       4
>>>>> +
>>>>> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
>>>>>
>>>>>     /* RX packet size EWMA. The average packet size is used to determine the packet
>>>>>      * buffer size when refilling RX rings. As the entire RX ring may be refilled
>>>>> @@ -138,6 +143,12 @@ struct send_queue {
>>>>>     	struct {
>>>>>     		/* xsk pool */
>>>>>     		struct xsk_buff_pool __rcu *pool;
>>>>> +
>>>>> +		/* save the desc for next xmit, when xmit fail. */
>>>>> +		struct xdp_desc last_desc;
>>>> As replied in the previous version this looks tricky. I think we need to
>>>> make sure to reserve some slots as the skb path did.
>>>>
>>>> This looks exactly like what stmmac did, which also shares XDP and skb
>>>> for the same ring.
>>>>
>>>>
>>>>> +
>>>>> +		/* xsk wait for tx inter or softirq */
>>>>> +		bool need_wakeup;
>>>>>     	} xsk;
>>>>>     };
>>>>>
>>>>> @@ -255,6 +266,15 @@ struct padded_vnet_hdr {
>>>>>     	char padding[4];
>>>>>     };
>>>>>
>>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			   int budget, bool in_napi);
>>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
>>>>> +
>>>>> +static bool is_skb_ptr(void *ptr)
>>>>> +{
>>>>> +	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
>>>>> +}
>>>>> +
>>>>>     static bool is_xdp_frame(void *ptr)
>>>>>     {
>>>>>     	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
>>>>> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
>>>>>     	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
>>>>>     }
>>>>>
>>>>> +static void *xsk_to_ptr(struct xdp_desc *desc)
>>>>> +{
>>>>> +	/* save the desc len to ptr */
>>>>> +	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
>>>>> +
>>>>> +	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
>>>>> +}
>>>>> +
>>>>> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
>>>>> +{
>>>>> +	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
>>>>> +}
>>>>> +
>>>>>     static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>>>     {
>>>>>     	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
>>>>> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>>>     static void __free_old_xmit(struct send_queue *sq, bool in_napi,
>>>>>     			    struct virtnet_sq_stats *stats)
>>>>>     {
>>>>> +	unsigned int xsknum = 0;
>>>>>     	unsigned int len;
>>>>>     	void *ptr;
>>>>>
>>>>>     	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
>>>>> -		if (likely(!is_xdp_frame(ptr))) {
>>>>> +		if (is_skb_ptr(ptr)) {
>>>>>     			struct sk_buff *skb = ptr;
>>>>>
>>>>>     			pr_debug("Sent skb %p\n", skb);
>>>>>
>>>>>     			stats->bytes += skb->len;
>>>>>     			napi_consume_skb(skb, in_napi);
>>>>> -		} else {
>>>>> +		} else if (is_xdp_frame(ptr)) {
>>>>>     			struct xdp_frame *frame = ptr_to_xdp(ptr);
>>>>>
>>>>>     			stats->bytes += frame->len;
>>>>>     			xdp_return_frame(frame);
>>>>> +		} else {
>>>>> +			struct xdp_desc desc;
>>>>> +
>>>>> +			ptr_to_xsk(ptr, &desc);
>>>>> +			stats->bytes += desc.len;
>>>>> +			++xsknum;
>>>>>     		}
>>>>>     		stats->packets++;
>>>>>     	}
>>>>> +
>>>>> +	if (xsknum)
>>>>> +		virtnet_xsk_complete(sq, xsknum);
>>>>>     }
>>>>>
>>>>>     /* Converting between virtqueue no. and kernel tx/rx queue no.
>>>>> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
>>>>>     	return 0;
>>>>>     }
>>>>>
>>>>> +static int virtnet_poll_xsk(struct send_queue *sq, int budget)
>>>>> +{
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +	int work_done = 0;
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (pool)
>>>>> +		work_done = virtnet_xsk_run(sq, pool, budget, true);
>>>>> +	rcu_read_unlock();
>>>>> +	return work_done;
>>>>> +}
>>>>> +
>>>>>     static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>>>     {
>>>>>     	struct send_queue *sq = container_of(napi, struct send_queue, napi);
>>>>> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>>>
>>>>>     	txq = netdev_get_tx_queue(vi->dev, index);
>>>>>     	__netif_tx_lock(txq, raw_smp_processor_id());
>>>>> +	work_done += virtnet_poll_xsk(sq, budget);
>>>>>     	free_old_xmit(sq, true);
>>>>>     	__netif_tx_unlock(txq);
>>>>>
>>>>> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>>>>>     	return err;
>>>>>     }
>>>>>
>>>>> +static void virtnet_xsk_check_queue(struct send_queue *sq)
>>>>> +{
>>>>> +	struct virtnet_info *vi = sq->vq->vdev->priv;
>>>>> +	struct net_device *dev = vi->dev;
>>>>> +	int qnum = sq - vi->sq;
>>>>> +
>>>>> +	/* If this sq is not the exclusive queue of the current cpu,
>>>>> +	 * then it may be called by start_xmit, so check it running out
>>>>> +	 * of space.
>>>>> +	 *
>>>> I think it's better to move this check after is_xdp_raw_buffer_queue().
>>> Sorry, do not understand.
>>
>> So what I meant is:
>>
>>
>> /* If it is a raw buffer queue, ... */
>>       if (is_xdp_raw_buffer_queue())
>>
>> /* if it is not the exclusive queue */
>>       if (sq->vq->num_free ...)
> I understand.
>
>>
>>>>> +	 * And if it is a raw buffer queue, it does not check whether the status
>>>>> +	 * of the queue is stopped when sending. So there is no need to check
>>>>> +	 * the situation of the raw buffer queue.
>>>>> +	 */
>>>>> +	if (is_xdp_raw_buffer_queue(vi, qnum))
>>>>> +		return;
>>>>> +
>>>>> +	/* Stop the queue to avoid getting packets that we are
>>>>> +	 * then unable to transmit. Then wait the tx interrupt.
>>>>> +	 */
>>>>> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>>>> +		netif_stop_subqueue(dev, qnum);
>>>> Is there any way to stop xsk TX here?
>>> xsk tx is driven by tx napi, so stop has no meaning.
>>
>> So NAPI can be stopped.
>>
>>
>>>
>>>>> +}
>>>>> +
>>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
>>>>> +{
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (!pool) {
>>>>> +		rcu_read_unlock();
>>>>> +		return;
>>>>> +	}
>>>>> +	xsk_tx_completed(pool, num);
>>>>> +	rcu_read_unlock();
>>>>> +
>>>>> +	if (sq->xsk.need_wakeup) {
>>>>> +		sq->xsk.need_wakeup = false;
>>>>> +		virtqueue_napi_schedule(&sq->napi, sq->vq);
>>>>> +	}
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			    struct xdp_desc *desc)
>>>>> +{
>>>>> +	struct virtnet_info *vi;
>>>>> +	u32 offset, n, len;
>>>>> +	struct page *page;
>>>>> +	void *data;
>>>>> +	u64 addr;
>>>>> +	int err;
>>>>> +
>>>>> +	vi = sq->vq->vdev->priv;
>>>>> +	addr = desc->addr;
>>>>> +
>>>>> +	data = xsk_buff_raw_get_data(pool, addr);
>>>>> +	offset = offset_in_page(data);
>>>>> +
>>>>> +	/* xsk unaligned mode, desc may use two pages */
>>>>> +	if (desc->len > PAGE_SIZE - offset)
>>>>> +		n = 3;
>>>>> +	else
>>>>> +		n = 2;
>>>>> +
>>>>> +	sg_init_table(sq->sg, n);
>>>>> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
>>>>> +
>>>>> +	/* handle for xsk first page */
>>>>> +	len = min_t(int, desc->len, PAGE_SIZE - offset);
>>>>> +	page = xsk_buff_xdp_get_page(pool, addr);
>>>>> +	sg_set_page(sq->sg + 1, page, len, offset);
>>>>> +
>>>>> +	/* xsk unaligned mode, handle for the second page */
>>>>> +	if (len < desc->len) {
>>>>> +		page = xsk_buff_xdp_get_page(pool, addr + len);
>>>>> +		len = min_t(int, desc->len - len, PAGE_SIZE);
>>>>> +		sg_set_page(sq->sg + 2, page, len, 0);
>>>>> +	}
>>>>> +
>>>>> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
>>>>> +				   GFP_ATOMIC);
>>>>> +	if (unlikely(err))
>>>>> +		sq->xsk.last_desc = *desc;
>>>>> +
>>>>> +	return err;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
>>>>> +				  struct xsk_buff_pool *pool,
>>>>> +				  unsigned int budget,
>>>>> +				  bool in_napi, int *done,
>>>>> +				  struct virtnet_sq_stats *stats)
>>>>> +{
>>>>> +	struct xdp_desc desc;
>>>>> +	int err, packet = 0;
>>>>> +	int ret = -EAGAIN;
>>>>> +
>>>>> +	if (sq->xsk.last_desc.addr) {
>>>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>>>> +			return -EBUSY;
>>>>> +
>>>>> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
>>>>> +		if (unlikely(err))
>>>>> +			return -EBUSY;
>>>>> +
>>>>> +		++packet;
>>>>> +		--budget;
>>>>> +		sq->xsk.last_desc.addr = 0;
>>>> So I think we don't need to do this since we always try to reserve 2 +
>>>> MAX_SKB_FRAGS; then it means we get -EIO/-ENOMEM, which is basically a
>>>> broken device or dma map.
>>>>
>>> Does it mean that when there are enough slots, a virtqueue_add_outbuf()
>>> failure (-EIO/-ENOMEM) means that the network card is no longer working?
>>
>> Or it could be a bug in the driver. And you don't need to check num_free
>> beforehand if you always reserve sufficient slots.
>>
>>
>>> I originally thought that even if an error was returned, it might still work
>>> normally the next time. That was my mistake.
>>>
>>> Thank you very much, I learned it. I will delete the related code.
>>>
>>>>> +	}
>>>>> +
>>>>> +	while (budget-- > 0) {
>>>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
>>>>> +			ret = -EBUSY;
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		if (!xsk_tx_peek_desc(pool, &desc)) {
>>>>> +			/* done */
>>>>> +			ret = 0;
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		err = virtnet_xsk_xmit(sq, pool, &desc);
>>>>> +		if (unlikely(err)) {
>>>>> +			ret = -EBUSY;
>>>> Since the function will be called by NAPI, I think we need to report the
>>>> number of packets that were transmitted as well.
>>> A 'desc' refers to one packet, so virtnet_xsk_xmit() sends only one packet.
>>> That count is kept in the "packet" variable below and finally passed to the
>>> upper layer through the "done" parameter.
>>
>> I meant you did:
>>
>> virtnet_poll_tx()
>>       work_done += virtnet_poll_xsk()
>>       virtnet_xsk_run()
>>           virtnet_xsk_xmit_batch()
>>
>> If there's no sufficient slot you still need to return the number of
>> packets that have been sent.
> I don’t understand what you mean.
>
> As long as packets are sent, "++packet" counts them, and the count is finally
> returned to virtnet_xsk_run() through "done", regardless of whether we ran out
> of slots.


Right, I misread the code.


>
>>
>>
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		++packet;
>>>>> +	}
>>>>> +
>>>>> +	if (packet) {
>>>>> +		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
>>>>> +			++stats->kicks;
>>>>> +
>>>>> +		*done = packet;
>>>>> +		stats->xdp_tx += packet;
>>>>> +
>>>>> +		xsk_tx_release(pool);
>>>>> +	}
>>>>> +
>>>>> +	return ret;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			   int budget, bool in_napi)
>>>>> +{
>>>>> +	struct virtnet_sq_stats stats = {};
>>>>> +	int done = 0;
>>>>> +	int err;
>>>>> +
>>>>> +	sq->xsk.need_wakeup = false;
>>>>> +	__free_old_xmit(sq, in_napi, &stats);
>>>>> +
>>>>> +	/* return err:
>>>>> +	 * -EAGAIN: done == budget
>>>>> +	 * -EBUSY:  done < budget
>>>>> +	 *  0    :  done < budget
>>>>> +	 */
>>>>> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
>>>>> +	if (err == -EBUSY) {
>>>>> +		__free_old_xmit(sq, in_napi, &stats);
>>>>> +
>>>>> +		/* If the space is enough, let napi run again. */
>>>>> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
>>>>> +			done = budget;
>>>> Why do you need to run NAPI instead of a netif_tx_wake()?
>>> When virtnet_xsk_run() is called by virtnet_xsk_wakeup(), the return value is
>>> not used.
>>>
>>> virtnet_xsk_run() is already in napi when it is called by poll_tx(), so if there
>>> are more slots, I try to return "budget" and let napi call poll_tx() again
>>
>> Well, my question is why you don't need to wake up the qdisc in this
>> case. Note that the queue could be shared with ndo_start_xmit().
> virtnet_xsk_run() will be called by virtnet_poll_tx() or virtnet_xsk_wakeup().
>
> 1. virtnet_poll_tx() will call netif_tx_wake_queue()
>
> static int virtnet_poll_tx(struct napi_struct *napi, int budget)
> {
> 	.....
>
> 	if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
> 		netif_tx_wake_queue(txq);
>
> 2. virtnet_xsk_wakeup()
>       You are right. We can indeed call virtnet_xsk_wakeup() here.
>
>>
>>>>> +		else
>>>>> +			sq->xsk.need_wakeup = true;
>>>> So done is 0, is this intended?
>>> When err == 0, it indicates that the xsk queue has been exhausted. At this time,
>>> done < budget, return done directly, and napi can be stopped.
>>>
>>> When err == -EAGAIN, it indicates that a budget's worth of packets has been sent,
>>> and done should be returned to the upper layer. Wait for napi to call poll_tx()
>>> again.
>>
>> So the code is only for -EBUSY if I read the code correctly. And in this
>> case you return 0.
> In this case, I return 'done' and done < budget.  'done' may not be 0.
>
> done < budget, so napi will stop.


So the question is about this:

                /* If the space is enough, let napi run again. */
                if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
                        done = budget;
                else
                        sq->xsk.need_wakeup = true;

So done is 0 when num_free < 2 + MAX_SKB_FRAGS, I think you should 
return done here.

Note that done is still less than budget in this case.

Another question: when num_free >= 2 + MAX_SKB_FRAGS, why not simply
retry virtnet_xsk_xmit_batch() here?
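
E.g. something like this (just a sketch; 'sent' is a new local so the retry
does not clobber the count from the first call):

	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
	if (err == -EBUSY) {
		int sent = 0;

		__free_old_xmit(sq, in_napi, &stats);

		/* If reclaiming freed enough space, retry the batch directly
		 * instead of forcing another NAPI round with done = budget.
		 */
		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) {
			err = virtnet_xsk_xmit_batch(sq, pool, budget - done,
						     in_napi, &sent, &stats);
			done += sent;
		}

		if (err == -EBUSY)
			sq->xsk.need_wakeup = true;
	}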


>
>
>>
>>> In both cases, done is directly returned to the upper layer without special
>>> processing.
>>>
>>>>> +	}
>>>>> +
>>>>> +	virtnet_xsk_check_queue(sq);
>>>>> +
>>>>> +	u64_stats_update_begin(&sq->stats.syncp);
>>>>> +	sq->stats.packets += stats.packets;
>>>>> +	sq->stats.bytes += stats.bytes;
>>>>> +	sq->stats.kicks += stats.kicks;
>>>>> +	sq->stats.xdp_tx += stats.xdp_tx;
>>>>> +	u64_stats_update_end(&sq->stats.syncp);
>>>>> +
>>>>> +	return done;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
>>>>> +{
>>>>> +	struct virtnet_info *vi = netdev_priv(dev);
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +	struct netdev_queue *txq;
>>>>> +	struct send_queue *sq;
>>>>> +
>>>>> +	if (!netif_running(dev))
>>>>> +		return -ENETDOWN;
>>>>> +
>>>>> +	if (qid >= vi->curr_queue_pairs)
>>>>> +		return -EINVAL;
>>>>> +
>>>>> +	sq = &vi->sq[qid];
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (!pool)
>>>>> +		goto end;
>>>>> +
>>>>> +	if (napi_if_scheduled_mark_missed(&sq->napi))
>>>>> +		goto end;
>>>>> +
>>>>> +	txq = netdev_get_tx_queue(dev, qid);
>>>>> +
>>>>> +	__netif_tx_lock_bh(txq);
>>>>> +
>>>>> +	/* Send part of the packet directly to reduce the delay in sending the
>>>>> +	 * packet, and this can actively trigger the tx interrupts.
>>>>> +	 *
>>>>> +	 * If no packet is sent out, the ring of the device is full. In this
>>>>> +	 * case, we will still get a tx interrupt response. Then we will deal
>>>>> +	 * with the subsequent packet sending work.
>>>> So stmmac schedules NAPI here; do you have perf numbers for this improvement?
>>> virtnet_xsk_wakeup() is called by sendto(). The purpose is to start consuming
>>> the data in the xsk tx queue, i.e. to get napi running.
>>>
>>> When napi is not running, I try to send a certain amount of data:
>>> 1. Reduce the delay of the first packet
>>> 2. Trigger hardware to generate tx interrupt
>>
>> I don't see the code that triggers a hardware interrupt, and I don't believe
>> virtio can do this.
>>
>> So the point is we need
>>
>> 1) schedule NAPI when napi_if_scheduled_mark_missed() returns false
>> 2) introduce this optimization on top with perf numbers
> It is true that virtio-net cannot directly make the hardware generate a tx
> interrupt. So I try to send data here and then wait for the tx interrupt
> notification from the hardware to recycle the sent packets.
>
> Is this method not feasible?


I think the answer is yes. But you need to benchmark this optimization
to make sure it gives us the improvement you expect.

So I suggest doing something like the following:

1) start with a simple napi schedule here
2) add your optimization on top and benchmark the difference


>
> Do you mean the performance improvement brought by xsk zerocopy? I should put it
> in the patch set cover letter. Indeed, I should also put one in the commit log of
> this patch.
>
> There is a problem with directly scheduling NAPI: it has to wait for NAPI to run
> on the current cpu, which causes an obvious delay, especially when the current
> user process is occupying the cpu.


Yes, but can you simply do:

local_bh_disable();
netif_napi_schedule();
local_bh_enable();

?
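
In this driver that would presumably map to the existing helper, e.g.
(a sketch):

	local_bh_disable();
	virtqueue_napi_schedule(&sq->napi, sq->vq);
	local_bh_enable();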

Thanks


>
> Thanks.
>
>> Thanks
>>
>>> Thanks.
>>>
>>>> Thanks
>>>>
>>>>
>>>>> +	 */
>>>>> +	virtnet_xsk_run(sq, pool, napi_weight, false);
>>>>> +
>>>>> +	__netif_tx_unlock_bh(txq);
>>>>> +
>>>>> +end:
>>>>> +	rcu_read_unlock();
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>>     static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>>>     				   struct xsk_buff_pool *pool,
>>>>>     				   u16 qid)
>>>>> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>>>     		return -EPERM;
>>>>>
>>>>>     	rcu_read_lock();
>>>>> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
>>>>> +
>>>>>     	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>>>>>     	 * safe.
>>>>>     	 */
>>>>> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
>>>>>     	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>>>>>     	.ndo_bpf		= virtnet_xdp,
>>>>>     	.ndo_xdp_xmit		= virtnet_xdp_xmit,
>>>>> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>>>>>     	.ndo_features_check	= passthru_features_check,
>>>>>     	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>>>>>     	.ndo_set_features	= virtnet_set_features,
>>>>> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
>>>>>     	for (i = 0; i < vi->max_queue_pairs; i++) {
>>>>>     		struct virtqueue *vq = vi->sq[i].vq;
>>>>>     		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
>>>>> -			if (!is_xdp_frame(buf))
>>>>> +			if (is_skb_ptr(buf))
>>>>>     				dev_kfree_skb(buf);
>>>>> -			else
>>>>> +			else if (is_xdp_frame(buf))
>>>>>     				xdp_return_frame(ptr_to_xdp(buf));
>>>>>     		}
>>>>>     	}



* Re: [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit
  2021-04-13  3:15 ` [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit Xuan Zhuo
@ 2021-04-14  5:46   ` Jason Wang
  0 siblings, 0 replies; 4+ messages in thread
From: Jason Wang @ 2021-04-14  5:46 UTC (permalink / raw)
  To: Xuan Zhuo, netdev
  Cc: Michael S. Tsirkin, David S. Miller, Jakub Kicinski,
	Björn Töpel, Magnus Karlsson, Jonathan Lemon,
	Alexei Starovoitov, Daniel Borkmann, Jesper Dangaard Brouer,
	John Fastabend, virtualization, bpf, dust . li


On 2021/4/13 11:15 AM, Xuan Zhuo wrote:
> This patch implements the core part of xsk zerocopy xmit.
>
> When the user calls sendto to consume the data in the xsk tx queue,
> virtnet_xsk_wakeup() will be called.
>
> In wakeup, it will try to send a part of the data directly. There are
> two purposes for this realization:
>
> 1. Send part of the data quickly to reduce the transmission delay of the
>     first packet.
> 2. Trigger tx interrupt, start napi to consume xsk tx data.
>
> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
> needs to support csum and other functions later, consider assigning xsk
> hdr separately for each sent packet.
>
> There are now three situations in free_old_xmit(): skb, xdp frame, xsk
> desc.  Based on the last two bits of ptr returned by virtqueue_get_buf():
>      00 is skb by default.
>      01 represents the packet sent by xdp
>      10 is the packet sent by xsk
>
> If the xmit work of xsk has not been completed, but the ring is full,
> napi must first exit and wait for the ring to be available, so
> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
> we can quickly wake up napi to execute xsk xmit task.
>
> When recycling, we need to count the number of bytes sent, so put xsk
> desc->len into the ptr pointer. Because ptr does not point to meaningful
> objects in xsk.
>
> Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
> Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
> ---
>   drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
>   1 file changed, 292 insertions(+), 4 deletions(-)
>
> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
> index 8242a9e9f17d..c441d6bf1510 100644
> --- a/drivers/net/virtio_net.c
> +++ b/drivers/net/virtio_net.c
> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
>   #define VIRTIO_XDP_REDIR	BIT(1)
>   
>   #define VIRTIO_XDP_FLAG	BIT(0)
> +#define VIRTIO_XSK_FLAG	BIT(1)
> +
> +#define VIRTIO_XSK_PTR_SHIFT       4
> +
> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
>   
>   /* RX packet size EWMA. The average packet size is used to determine the packet
>    * buffer size when refilling RX rings. As the entire RX ring may be refilled
> @@ -138,6 +143,12 @@ struct send_queue {
>   	struct {
>   		/* xsk pool */
>   		struct xsk_buff_pool __rcu *pool;
> +
> +		/* save the desc for next xmit, when xmit fail. */
> +		struct xdp_desc last_desc;


As replied in the previous version this looks tricky. I think we need to
make sure to reserve some slots as the skb path did.

This looks exactly like what stmmac did, which also shares XDP and skb
for the same ring.


> +
> +		/* xsk wait for tx inter or softirq */
> +		bool need_wakeup;
>   	} xsk;
>   };
>   
> @@ -255,6 +266,15 @@ struct padded_vnet_hdr {
>   	char padding[4];
>   };
>   
> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
> +			   int budget, bool in_napi);
> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
> +
> +static bool is_skb_ptr(void *ptr)
> +{
> +	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
> +}
> +
>   static bool is_xdp_frame(void *ptr)
>   {
>   	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
>   	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
>   }
>   
> +static void *xsk_to_ptr(struct xdp_desc *desc)
> +{
> +	/* save the desc len to ptr */
> +	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
> +
> +	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
> +}
> +
> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
> +{
> +	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
> +}
> +
>   static struct xdp_frame *ptr_to_xdp(void *ptr)
>   {
>   	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
>   static void __free_old_xmit(struct send_queue *sq, bool in_napi,
>   			    struct virtnet_sq_stats *stats)
>   {
> +	unsigned int xsknum = 0;
>   	unsigned int len;
>   	void *ptr;
>   
>   	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
> -		if (likely(!is_xdp_frame(ptr))) {
> +		if (is_skb_ptr(ptr)) {
>   			struct sk_buff *skb = ptr;
>   
>   			pr_debug("Sent skb %p\n", skb);
>   
>   			stats->bytes += skb->len;
>   			napi_consume_skb(skb, in_napi);
> -		} else {
> +		} else if (is_xdp_frame(ptr)) {
>   			struct xdp_frame *frame = ptr_to_xdp(ptr);
>   
>   			stats->bytes += frame->len;
>   			xdp_return_frame(frame);
> +		} else {
> +			struct xdp_desc desc;
> +
> +			ptr_to_xsk(ptr, &desc);
> +			stats->bytes += desc.len;
> +			++xsknum;
>   		}
>   		stats->packets++;
>   	}
> +
> +	if (xsknum)
> +		virtnet_xsk_complete(sq, xsknum);
>   }
>   
>   /* Converting between virtqueue no. and kernel tx/rx queue no.
> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
>   	return 0;
>   }
>   
> +static int virtnet_poll_xsk(struct send_queue *sq, int budget)
> +{
> +	struct xsk_buff_pool *pool;
> +	int work_done = 0;
> +
> +	rcu_read_lock();
> +	pool = rcu_dereference(sq->xsk.pool);
> +	if (pool)
> +		work_done = virtnet_xsk_run(sq, pool, budget, true);
> +	rcu_read_unlock();
> +	return work_done;
> +}
> +
>   static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>   {
>   	struct send_queue *sq = container_of(napi, struct send_queue, napi);
> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>   
>   	txq = netdev_get_tx_queue(vi->dev, index);
>   	__netif_tx_lock(txq, raw_smp_processor_id());
> +	work_done += virtnet_poll_xsk(sq, budget);
>   	free_old_xmit(sq, true);
>   	__netif_tx_unlock(txq);
>   
> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>   	return err;
>   }
>   
> +static void virtnet_xsk_check_queue(struct send_queue *sq)
> +{
> +	struct virtnet_info *vi = sq->vq->vdev->priv;
> +	struct net_device *dev = vi->dev;
> +	int qnum = sq - vi->sq;
> +
> +	/* If this sq is not the exclusive queue of the current cpu,
> +	 * then it may be called by start_xmit, so check it running out
> +	 * of space.
> +	 *


I think it's better to move this check after is_xdp_raw_buffer_queue().


> +	 * And if it is a raw buffer queue, it does not check whether the status
> +	 * of the queue is stopped when sending. So there is no need to check
> +	 * the situation of the raw buffer queue.
> +	 */
> +	if (is_xdp_raw_buffer_queue(vi, qnum))
> +		return;
> +
> +	/* Stop the queue to avoid getting packets that we are
> +	 * then unable to transmit. Then wait the tx interrupt.
> +	 */
> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
> +		netif_stop_subqueue(dev, qnum);


Is there any way to stop xsk TX here?


> +}
> +
> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
> +{
> +	struct xsk_buff_pool *pool;
> +
> +	rcu_read_lock();
> +
> +	pool = rcu_dereference(sq->xsk.pool);
> +	if (!pool) {
> +		rcu_read_unlock();
> +		return;
> +	}
> +	xsk_tx_completed(pool, num);
> +	rcu_read_unlock();
> +
> +	if (sq->xsk.need_wakeup) {
> +		sq->xsk.need_wakeup = false;
> +		virtqueue_napi_schedule(&sq->napi, sq->vq);
> +	}
> +}
> +
> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
> +			    struct xdp_desc *desc)
> +{
> +	struct virtnet_info *vi;
> +	u32 offset, n, len;
> +	struct page *page;
> +	void *data;
> +	u64 addr;
> +	int err;
> +
> +	vi = sq->vq->vdev->priv;
> +	addr = desc->addr;
> +
> +	data = xsk_buff_raw_get_data(pool, addr);
> +	offset = offset_in_page(data);
> +
> +	/* xsk unaligned mode, desc may use two pages */
> +	if (desc->len > PAGE_SIZE - offset)
> +		n = 3;
> +	else
> +		n = 2;
> +
> +	sg_init_table(sq->sg, n);
> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
> +
> +	/* handle for xsk first page */
> +	len = min_t(int, desc->len, PAGE_SIZE - offset);
> +	page = xsk_buff_xdp_get_page(pool, addr);
> +	sg_set_page(sq->sg + 1, page, len, offset);
> +
> +	/* xsk unaligned mode, handle for the second page */
> +	if (len < desc->len) {
> +		page = xsk_buff_xdp_get_page(pool, addr + len);
> +		len = min_t(int, desc->len - len, PAGE_SIZE);
> +		sg_set_page(sq->sg + 2, page, len, 0);
> +	}
> +
> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
> +				   GFP_ATOMIC);
> +	if (unlikely(err))
> +		sq->xsk.last_desc = *desc;
> +
> +	return err;
> +}
> +
> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
> +				  struct xsk_buff_pool *pool,
> +				  unsigned int budget,
> +				  bool in_napi, int *done,
> +				  struct virtnet_sq_stats *stats)
> +{
> +	struct xdp_desc desc;
> +	int err, packet = 0;
> +	int ret = -EAGAIN;
> +
> +	if (sq->xsk.last_desc.addr) {
> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
> +			return -EBUSY;
> +
> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
> +		if (unlikely(err))
> +			return -EBUSY;
> +
> +		++packet;
> +		--budget;
> +		sq->xsk.last_desc.addr = 0;


So I think we don't need to do this since we always try to reserve 2 +
MAX_SKB_FRAGS; then it means we get -EIO/-ENOMEM, which is basically a
broken device or dma map.


> +	}
> +
> +	while (budget-- > 0) {
> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
> +			ret = -EBUSY;
> +			break;
> +		}
> +
> +		if (!xsk_tx_peek_desc(pool, &desc)) {
> +			/* done */
> +			ret = 0;
> +			break;
> +		}
> +
> +		err = virtnet_xsk_xmit(sq, pool, &desc);
> +		if (unlikely(err)) {
> +			ret = -EBUSY;


Since the function will be called by NAPI, I think we need to report the
number of packets that were transmitted as well.


> +			break;
> +		}
> +
> +		++packet;
> +	}
> +
> +	if (packet) {
> +		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
> +			++stats->kicks;
> +
> +		*done = packet;
> +		stats->xdp_tx += packet;
> +
> +		xsk_tx_release(pool);
> +	}
> +
> +	return ret;
> +}
> +
> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
> +			   int budget, bool in_napi)
> +{
> +	struct virtnet_sq_stats stats = {};
> +	int done = 0;
> +	int err;
> +
> +	sq->xsk.need_wakeup = false;
> +	__free_old_xmit(sq, in_napi, &stats);
> +
> +	/* return err:
> +	 * -EAGAIN: done == budget
> +	 * -EBUSY:  done < budget
> +	 *  0    :  done < budget
> +	 */
> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
> +	if (err == -EBUSY) {
> +		__free_old_xmit(sq, in_napi, &stats);
> +
> +		/* If the space is enough, let napi run again. */
> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
> +			done = budget;


Why do you need to run NAPI instead of a netif_tx_wake()?


> +		else
> +			sq->xsk.need_wakeup = true;


So done is 0, is this intended?


> +	}
> +
> +	virtnet_xsk_check_queue(sq);
> +
> +	u64_stats_update_begin(&sq->stats.syncp);
> +	sq->stats.packets += stats.packets;
> +	sq->stats.bytes += stats.bytes;
> +	sq->stats.kicks += stats.kicks;
> +	sq->stats.xdp_tx += stats.xdp_tx;
> +	u64_stats_update_end(&sq->stats.syncp);
> +
> +	return done;
> +}
> +
> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
> +{
> +	struct virtnet_info *vi = netdev_priv(dev);
> +	struct xsk_buff_pool *pool;
> +	struct netdev_queue *txq;
> +	struct send_queue *sq;
> +
> +	if (!netif_running(dev))
> +		return -ENETDOWN;
> +
> +	if (qid >= vi->curr_queue_pairs)
> +		return -EINVAL;
> +
> +	sq = &vi->sq[qid];
> +
> +	rcu_read_lock();
> +
> +	pool = rcu_dereference(sq->xsk.pool);
> +	if (!pool)
> +		goto end;
> +
> +	if (napi_if_scheduled_mark_missed(&sq->napi))
> +		goto end;
> +
> +	txq = netdev_get_tx_queue(dev, qid);
> +
> +	__netif_tx_lock_bh(txq);
> +
> +	/* Send part of the packet directly to reduce the delay in sending the
> +	 * packet, and this can actively trigger the tx interrupts.
> +	 *
> +	 * If no packet is sent out, the ring of the device is full. In this
> +	 * case, we will still get a tx interrupt response. Then we will deal
> +	 * with the subsequent packet sending work.


So stmmac schedules NAPI here; do you have perf numbers for this improvement?

Thanks


> +	 */
> +	virtnet_xsk_run(sq, pool, napi_weight, false);
> +
> +	__netif_tx_unlock_bh(txq);
> +
> +end:
> +	rcu_read_unlock();
> +	return 0;
> +}
> +
>   static int virtnet_xsk_pool_enable(struct net_device *dev,
>   				   struct xsk_buff_pool *pool,
>   				   u16 qid)
> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>   		return -EPERM;
>   
>   	rcu_read_lock();
> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
> +
>   	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>   	 * safe.
>   	 */
> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
>   	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>   	.ndo_bpf		= virtnet_xdp,
>   	.ndo_xdp_xmit		= virtnet_xdp_xmit,
> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>   	.ndo_features_check	= passthru_features_check,
>   	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>   	.ndo_set_features	= virtnet_set_features,
> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
>   	for (i = 0; i < vi->max_queue_pairs; i++) {
>   		struct virtqueue *vq = vi->sq[i].vq;
>   		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
> -			if (!is_xdp_frame(buf))
> +			if (is_skb_ptr(buf))
>   				dev_kfree_skb(buf);
> -			else
> +			else if (is_xdp_frame(buf))
>   				xdp_return_frame(ptr_to_xdp(buf));
>   		}
>   	}



* [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit
  2021-04-13  3:15 [PATCH net-next v4 00/10] virtio-net support xdp socket zero copy xmit Xuan Zhuo
@ 2021-04-13  3:15 ` Xuan Zhuo
  2021-04-14  5:46   ` Jason Wang
  0 siblings, 1 reply; 4+ messages in thread
From: Xuan Zhuo @ 2021-04-13  3:15 UTC (permalink / raw)
  To: netdev
  Cc: Michael S. Tsirkin, Jason Wang, David S. Miller, Jakub Kicinski,
	Björn Töpel, Magnus Karlsson, Jonathan Lemon,
	Alexei Starovoitov, Daniel Borkmann, Jesper Dangaard Brouer,
	John Fastabend, virtualization, bpf, dust . li

This patch implements the core part of xsk zerocopy xmit.

When the user calls sendto to consume the data in the xsk tx queue,
virtnet_xsk_wakeup() will be called.

In wakeup, it will try to send a part of the data directly. There are
two purposes for this realization:

1. Send part of the data quickly to reduce the transmission delay of the
   first packet.
2. Trigger tx interrupt, start napi to consume xsk tx data.

All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
needs to support csum and other functions later, consider assigning xsk
hdr separately for each sent packet.

There are now three situations in free_old_xmit(): skb, xdp frame, xsk
desc.  Based on the last two bits of ptr returned by virtqueue_get_buf():
    00 is skb by default.
    01 represents the packet sent by xdp
    10 is the packet sent by xsk

If the xmit work of xsk has not been completed, but the ring is full,
napi must first exit and wait for the ring to be available, so
need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
we can quickly wake up napi to execute xsk xmit task.

When recycling, we need to count the number of bytes sent, so put xsk
desc->len into the ptr pointer. Because ptr does not point to meaningful
objects in xsk.

Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
---
 drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 292 insertions(+), 4 deletions(-)

diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
index 8242a9e9f17d..c441d6bf1510 100644
--- a/drivers/net/virtio_net.c
+++ b/drivers/net/virtio_net.c
@@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
 #define VIRTIO_XDP_REDIR	BIT(1)
 
 #define VIRTIO_XDP_FLAG	BIT(0)
+#define VIRTIO_XSK_FLAG	BIT(1)
+
+#define VIRTIO_XSK_PTR_SHIFT       4
+
+static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
 
 /* RX packet size EWMA. The average packet size is used to determine the packet
  * buffer size when refilling RX rings. As the entire RX ring may be refilled
@@ -138,6 +143,12 @@ struct send_queue {
 	struct {
 		/* xsk pool */
 		struct xsk_buff_pool __rcu *pool;
+
+		/* save the desc for next xmit, when xmit fail. */
+		struct xdp_desc last_desc;
+
+		/* xsk wait for tx inter or softirq */
+		bool need_wakeup;
 	} xsk;
 };
 
@@ -255,6 +266,15 @@ struct padded_vnet_hdr {
 	char padding[4];
 };
 
+static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
+			   int budget, bool in_napi);
+static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
+
+static bool is_skb_ptr(void *ptr)
+{
+	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
+}
+
 static bool is_xdp_frame(void *ptr)
 {
 	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
@@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
 	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
 }
 
+static void *xsk_to_ptr(struct xdp_desc *desc)
+{
+	/* save the desc len to ptr */
+	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
+
+	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
+}
+
+static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
+{
+	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
+}
+
 static struct xdp_frame *ptr_to_xdp(void *ptr)
 {
 	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
@@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
 static void __free_old_xmit(struct send_queue *sq, bool in_napi,
 			    struct virtnet_sq_stats *stats)
 {
+	unsigned int xsknum = 0;
 	unsigned int len;
 	void *ptr;
 
 	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
-		if (likely(!is_xdp_frame(ptr))) {
+		if (is_skb_ptr(ptr)) {
 			struct sk_buff *skb = ptr;
 
 			pr_debug("Sent skb %p\n", skb);
 
 			stats->bytes += skb->len;
 			napi_consume_skb(skb, in_napi);
-		} else {
+		} else if (is_xdp_frame(ptr)) {
 			struct xdp_frame *frame = ptr_to_xdp(ptr);
 
 			stats->bytes += frame->len;
 			xdp_return_frame(frame);
+		} else {
+			struct xdp_desc desc;
+
+			ptr_to_xsk(ptr, &desc);
+			stats->bytes += desc.len;
+			++xsknum;
 		}
 		stats->packets++;
 	}
+
+	if (xsknum)
+		virtnet_xsk_complete(sq, xsknum);
 }
 
 /* Converting between virtqueue no. and kernel tx/rx queue no.
@@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
 	return 0;
 }
 
+static int virtnet_poll_xsk(struct send_queue *sq, int budget)
+{
+	struct xsk_buff_pool *pool;
+	int work_done = 0;
+
+	rcu_read_lock();
+	pool = rcu_dereference(sq->xsk.pool);
+	if (pool)
+		work_done = virtnet_xsk_run(sq, pool, budget, true);
+	rcu_read_unlock();
+	return work_done;
+}
+
 static int virtnet_poll_tx(struct napi_struct *napi, int budget)
 {
 	struct send_queue *sq = container_of(napi, struct send_queue, napi);
@@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
 
 	txq = netdev_get_tx_queue(vi->dev, index);
 	__netif_tx_lock(txq, raw_smp_processor_id());
+	work_done += virtnet_poll_xsk(sq, budget);
 	free_old_xmit(sq, true);
 	__netif_tx_unlock(txq);
 
@@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
 	return err;
 }
 
+static void virtnet_xsk_check_queue(struct send_queue *sq)
+{
+	struct virtnet_info *vi = sq->vq->vdev->priv;
+	struct net_device *dev = vi->dev;
+	int qnum = sq - vi->sq;
+
+	/* If this sq is not used exclusively by the current cpu, it may
+	 * also be used by start_xmit(), so check whether it is running
+	 * out of space.
+	 *
+	 * An xdp raw buffer queue does not check the stopped state of the
+	 * queue when sending, so there is no need to perform this check
+	 * for a raw buffer queue.
+	 */
+	if (is_xdp_raw_buffer_queue(vi, qnum))
+		return;
+
+	/* Stop the queue to avoid getting packets that we are
+	 * then unable to transmit, then wait for the tx interrupt.
+	 */
+	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
+		netif_stop_subqueue(dev, qnum);
+}
+
+static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
+{
+	struct xsk_buff_pool *pool;
+
+	rcu_read_lock();
+
+	pool = rcu_dereference(sq->xsk.pool);
+	if (!pool) {
+		rcu_read_unlock();
+		return;
+	}
+	xsk_tx_completed(pool, num);
+	rcu_read_unlock();
+
+	if (sq->xsk.need_wakeup) {
+		sq->xsk.need_wakeup = false;
+		virtqueue_napi_schedule(&sq->napi, sq->vq);
+	}
+}
+
+static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
+			    struct xdp_desc *desc)
+{
+	struct virtnet_info *vi;
+	u32 offset, n, len;
+	struct page *page;
+	void *data;
+	u64 addr;
+	int err;
+
+	vi = sq->vq->vdev->priv;
+	addr = desc->addr;
+
+	data = xsk_buff_raw_get_data(pool, addr);
+	offset = offset_in_page(data);
+
+	/* xsk unaligned mode, desc may use two pages */
+	if (desc->len > PAGE_SIZE - offset)
+		n = 3;
+	else
+		n = 2;
+
+	sg_init_table(sq->sg, n);
+	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
+
+	/* handle for xsk first page */
+	len = min_t(int, desc->len, PAGE_SIZE - offset);
+	page = xsk_buff_xdp_get_page(pool, addr);
+	sg_set_page(sq->sg + 1, page, len, offset);
+
+	/* xsk unaligned mode, handle for the second page */
+	if (len < desc->len) {
+		page = xsk_buff_xdp_get_page(pool, addr + len);
+		len = min_t(int, desc->len - len, PAGE_SIZE);
+		sg_set_page(sq->sg + 2, page, len, 0);
+	}
+
+	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
+				   GFP_ATOMIC);
+	if (unlikely(err))
+		sq->xsk.last_desc = *desc;
+
+	return err;
+}
+
+static int virtnet_xsk_xmit_batch(struct send_queue *sq,
+				  struct xsk_buff_pool *pool,
+				  unsigned int budget,
+				  bool in_napi, int *done,
+				  struct virtnet_sq_stats *stats)
+{
+	struct xdp_desc desc;
+	int err, packet = 0;
+	int ret = -EAGAIN;
+
+	if (sq->xsk.last_desc.addr) {
+		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
+			return -EBUSY;
+
+		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
+		if (unlikely(err))
+			return -EBUSY;
+
+		++packet;
+		--budget;
+		sq->xsk.last_desc.addr = 0;
+	}
+
+	while (budget-- > 0) {
+		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
+			ret = -EBUSY;
+			break;
+		}
+
+		if (!xsk_tx_peek_desc(pool, &desc)) {
+			/* done */
+			ret = 0;
+			break;
+		}
+
+		err = virtnet_xsk_xmit(sq, pool, &desc);
+		if (unlikely(err)) {
+			ret = -EBUSY;
+			break;
+		}
+
+		++packet;
+	}
+
+	if (packet) {
+		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
+			++stats->kicks;
+
+		*done = packet;
+		stats->xdp_tx += packet;
+
+		xsk_tx_release(pool);
+	}
+
+	return ret;
+}
+
+static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
+			   int budget, bool in_napi)
+{
+	struct virtnet_sq_stats stats = {};
+	int done = 0;
+	int err;
+
+	sq->xsk.need_wakeup = false;
+	__free_old_xmit(sq, in_napi, &stats);
+
+	/* return err:
+	 * -EAGAIN: done == budget
+	 * -EBUSY:  done < budget
+	 *  0    :  done < budget
+	 */
+	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
+	if (err == -EBUSY) {
+		__free_old_xmit(sq, in_napi, &stats);
+
+		/* If the space is enough, let napi run again. */
+		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
+			done = budget;
+		else
+			sq->xsk.need_wakeup = true;
+	}
+
+	virtnet_xsk_check_queue(sq);
+
+	u64_stats_update_begin(&sq->stats.syncp);
+	sq->stats.packets += stats.packets;
+	sq->stats.bytes += stats.bytes;
+	sq->stats.kicks += stats.kicks;
+	sq->stats.xdp_tx += stats.xdp_tx;
+	u64_stats_update_end(&sq->stats.syncp);
+
+	return done;
+}
+
+static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
+{
+	struct virtnet_info *vi = netdev_priv(dev);
+	struct xsk_buff_pool *pool;
+	struct netdev_queue *txq;
+	struct send_queue *sq;
+
+	if (!netif_running(dev))
+		return -ENETDOWN;
+
+	if (qid >= vi->curr_queue_pairs)
+		return -EINVAL;
+
+	sq = &vi->sq[qid];
+
+	rcu_read_lock();
+
+	pool = rcu_dereference(sq->xsk.pool);
+	if (!pool)
+		goto end;
+
+	if (napi_if_scheduled_mark_missed(&sq->napi))
+		goto end;
+
+	txq = netdev_get_tx_queue(dev, qid);
+
+	__netif_tx_lock_bh(txq);
+
+	/* Send some packets directly to reduce the transmit delay and to
+	 * actively trigger the tx interrupt.
+	 *
+	 * If no packet is sent out, the ring of the device is full; in that
+	 * case we will still get a tx interrupt and can handle the remaining
+	 * packets from there.
+	 */
+	virtnet_xsk_run(sq, pool, napi_weight, false);
+
+	__netif_tx_unlock_bh(txq);
+
+end:
+	rcu_read_unlock();
+	return 0;
+}
+
 static int virtnet_xsk_pool_enable(struct net_device *dev,
 				   struct xsk_buff_pool *pool,
 				   u16 qid)
@@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
 		return -EPERM;
 
 	rcu_read_lock();
+	memset(&sq->xsk, 0, sizeof(sq->xsk));
+
 	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
 	 * safe.
 	 */
@@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
 	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
 	.ndo_bpf		= virtnet_xdp,
 	.ndo_xdp_xmit		= virtnet_xdp_xmit,
+	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
 	.ndo_features_check	= passthru_features_check,
 	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
 	.ndo_set_features	= virtnet_set_features,
@@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
 	for (i = 0; i < vi->max_queue_pairs; i++) {
 		struct virtqueue *vq = vi->sq[i].vq;
 		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
-			if (!is_xdp_frame(buf))
+			if (is_skb_ptr(buf))
 				dev_kfree_skb(buf);
-			else
+			else if (is_xdp_frame(buf))
 				xdp_return_frame(ptr_to_xdp(buf));
 		}
 	}
-- 
2.31.0


^ permalink raw reply related	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2021-04-19  2:46 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <1618482470.2631352-3-xuanzhuo@linux.alibaba.com>
2021-04-16  5:35 ` [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit Jason Wang
     [not found] <1618554586.8830593-2-xuanzhuo@linux.alibaba.com>
2021-04-19  2:46 ` Jason Wang
2021-04-13  3:15 [PATCH net-next v4 00/10] virtio-net support xdp socket zero copy xmit Xuan Zhuo
2021-04-13  3:15 ` [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit Xuan Zhuo
2021-04-14  5:46   ` Jason Wang

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).