linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH net-next] tuntap: introduce tx skb ring
@ 2016-05-16  1:17 Jason Wang
  2016-05-16  3:56 ` Eric Dumazet
  2016-05-16  4:23 ` Michael S. Tsirkin
  0 siblings, 2 replies; 21+ messages in thread
From: Jason Wang @ 2016-05-16  1:17 UTC (permalink / raw)
  To: davem, mst; +Cc: netdev, linux-kernel, Jason Wang

We used to queue tx packets in sk_receive_queue, this is less
efficient since it requires spinlocks to synchronize between producer
and consumer.

This patch tries to address this by using circular buffer which allows
lockless synchronization. This is done by switching from
sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
this is set:

- store pointer to skb in circular buffer in tun_net_xmit(), and read
  it from the circular buffer in tun_do_read().
- introduce a new proto_ops peek which could be implemented by
  specific socket which does not use sk_receive_queue.
- store skb length in circular buffer too, and implement a lockless
  peek for tuntap.
- change vhost_net to use proto_ops->peek() instead
- new spinlocks were introduced to synchronize among producers (and so
  did for consumers).

Pktgen test shows about 9% improvement on guest receiving pps:

Before: ~1480000pps
After : ~1610000pps

(I'm not sure noblocking read is still needed, so it was not included
 in this patch)
Signed-off-by: Jason Wang <jasowang@redhat.com>
---
---
 drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
 drivers/vhost/net.c         |  16 ++++-
 include/linux/net.h         |   1 +
 include/uapi/linux/if_tun.h |   1 +
 4 files changed, 165 insertions(+), 10 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 425e983..6001ece 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -71,6 +71,7 @@
 #include <net/sock.h>
 #include <linux/seq_file.h>
 #include <linux/uio.h>
+#include <linux/circ_buf.h>
 
 #include <asm/uaccess.h>
 
@@ -130,6 +131,8 @@ struct tap_filter {
 #define MAX_TAP_FLOWS  4096
 
 #define TUN_FLOW_EXPIRE (3 * HZ)
+#define TUN_RING_SIZE 256
+#define TUN_RING_MASK (TUN_RING_SIZE - 1)
 
 struct tun_pcpu_stats {
 	u64 rx_packets;
@@ -142,6 +145,11 @@ struct tun_pcpu_stats {
 	u32 rx_frame_errors;
 };
 
+struct tun_desc {
+	struct sk_buff *skb;
+	int len; /* Cached skb len for peeking */
+};
+
 /* A tun_file connects an open character device to a tuntap netdevice. It
  * also contains all socket related structures (except sock_fprog and tap_filter)
  * to serve as one transmit queue for tuntap device. The sock_fprog and
@@ -167,6 +175,13 @@ struct tun_file {
 	};
 	struct list_head next;
 	struct tun_struct *detached;
+	/* reader lock */
+	spinlock_t rlock;
+	unsigned long tail;
+	struct tun_desc tx_descs[TUN_RING_SIZE];
+	/* writer lock */
+	spinlock_t wlock;
+	unsigned long head;
 };
 
 struct tun_flow_entry {
@@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
 
 static void tun_queue_purge(struct tun_file *tfile)
 {
+	unsigned long head, tail;
+	struct tun_desc *desc;
+	struct sk_buff *skb;
 	skb_queue_purge(&tfile->sk.sk_receive_queue);
+	spin_lock(&tfile->rlock);
+
+	head = ACCESS_ONCE(tfile->head);
+	tail = tfile->tail;
+
+	/* read tail before reading descriptor at tail */
+	smp_rmb();
+
+	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
+		desc = &tfile->tx_descs[tail];
+		skb = desc->skb;
+		kfree_skb(skb);
+		tail = (tail + 1) & TUN_RING_MASK;
+		/* read descriptor before incrementing tail. */
+		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
+	}
+	spin_unlock(&tfile->rlock);
 	skb_queue_purge(&tfile->sk.sk_error_queue);
 }
 
@@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 	int txq = skb->queue_mapping;
 	struct tun_file *tfile;
 	u32 numqueues = 0;
+	unsigned long flags;
 
 	rcu_read_lock();
 	tfile = rcu_dereference(tun->tfiles[txq]);
@@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 
 	nf_reset(skb);
 
-	/* Enqueue packet */
-	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
+	if (tun->flags & IFF_TX_RING) {
+		unsigned long head, tail;
+
+		spin_lock_irqsave(&tfile->wlock, flags);
+
+		head = tfile->head;
+		tail = ACCESS_ONCE(tfile->tail);
+
+		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
+			struct tun_desc *desc = &tfile->tx_descs[head];
+
+			desc->skb = skb;
+			desc->len = skb->len;
+			if (skb_vlan_tag_present(skb))
+				desc->len += VLAN_HLEN;
+
+			/* read descriptor before incrementing head. */
+			smp_store_release(&tfile->head,
+					  (head + 1) & TUN_RING_MASK);
+		} else {
+			spin_unlock_irqrestore(&tfile->wlock, flags);
+			goto drop;
+		}
+
+		spin_unlock_irqrestore(&tfile->wlock, flags);
+	} else {
+		/* Enqueue packet */
+		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
+	}
 
 	/* Notify and wake up reader process */
 	if (tfile->flags & TUN_FASYNC)
@@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
 	}
 }
 
+static bool tun_queue_not_empty(struct tun_file *tfile)
+{
+	struct sock *sk = tfile->socket.sk;
+
+	return (!skb_queue_empty(&sk->sk_receive_queue) ||
+		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
+}
 /* Character device part */
 
 /* Poll */
@@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
 
 	poll_wait(file, sk_sleep(sk), wait);
 
-	if (!skb_queue_empty(&sk->sk_receive_queue))
+	if (tun_queue_not_empty(tfile))
 		mask |= POLLIN | POLLRDNORM;
 
 	if (sock_writeable(sk) ||
@@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
 	if (tun->dev->reg_state != NETREG_REGISTERED)
 		return -EIO;
 
-	/* Read frames from queue */
-	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
-				  &peeked, &off, &err);
-	if (!skb)
-		return err;
+	if (tun->flags & IFF_TX_RING) {
+		unsigned long head, tail;
+		struct tun_desc *desc;
+
+		spin_lock(&tfile->rlock);
+		head = ACCESS_ONCE(tfile->head);
+		tail = tfile->tail;
+
+		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
+			/* read tail before reading descriptor at tail */
+			smp_rmb();
+			desc = &tfile->tx_descs[tail];
+			skb = desc->skb;
+			/* read descriptor before incrementing tail. */
+			smp_store_release(&tfile->tail,
+					  (tail + 1) & TUN_RING_MASK);
+		} else {
+			spin_unlock(&tfile->rlock);
+			return -EAGAIN;
+		}
+
+		spin_unlock(&tfile->rlock);
+	} else {
+		/* Read frames from queue */
+		skb = __skb_recv_datagram(tfile->socket.sk,
+					  noblock ? MSG_DONTWAIT : 0,
+					  &peeked, &off, &err);
+		if (!skb)
+			return err;
+	}
 
 	ret = tun_put_user(tun, tfile, skb, to);
 	if (unlikely(ret < 0))
@@ -1629,8 +1724,47 @@ out:
 	return ret;
 }
 
+static int tun_peek(struct socket *sock, bool exact)
+{
+	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
+	struct sock *sk = sock->sk;
+	struct tun_struct *tun;
+	int ret = 0;
+
+	if (!exact)
+		return tun_queue_not_empty(tfile);
+
+	tun = __tun_get(tfile);
+	if (!tun)
+		return 0;
+
+	if (tun->flags & IFF_TX_RING) {
+		unsigned long head = ACCESS_ONCE(tfile->head);
+		unsigned long tail = ACCESS_ONCE(tfile->tail);
+
+		if (head != tail)
+			ret = tfile->tx_descs[tail].len;
+	} else {
+		struct sk_buff *head;
+		unsigned long flags;
+
+		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
+		head = skb_peek(&sk->sk_receive_queue);
+		if (likely(head)) {
+			ret = head->len;
+			if (skb_vlan_tag_present(head))
+				ret += VLAN_HLEN;
+		}
+		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
+	}
+
+	tun_put(tun);
+	return ret;
+}
+
 /* Ops structure to mimic raw sockets with tun */
 static const struct proto_ops tun_socket_ops = {
+	.peek    = tun_peek,
 	.sendmsg = tun_sendmsg,
 	.recvmsg = tun_recvmsg,
 };
@@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
 {
 	struct net *net = current->nsproxy->net_ns;
 	struct tun_file *tfile;
-
 	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
 
 	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
 					    &tun_proto, 0);
 	if (!tfile)
 		return -ENOMEM;
+
 	RCU_INIT_POINTER(tfile->tun, NULL);
 	tfile->flags = 0;
 	tfile->ifindex = 0;
@@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
 	INIT_LIST_HEAD(&tfile->next);
 
 	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
+	tfile->head = 0;
+	tfile->tail = 0;
+
+	spin_lock_init(&tfile->rlock);
+	spin_lock_init(&tfile->wlock);
 
 	return 0;
 }
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index f744eeb..10ff494 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -455,10 +455,14 @@ out:
 
 static int peek_head_len(struct sock *sk)
 {
+	struct socket *sock = sk->sk_socket;
 	struct sk_buff *head;
 	int len = 0;
 	unsigned long flags;
 
+	if (sock->ops->peek)
+		return sock->ops->peek(sock, true);
+
 	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
 	head = skb_peek(&sk->sk_receive_queue);
 	if (likely(head)) {
@@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
 	return len;
 }
 
+static int sk_has_rx_data(struct sock *sk)
+{
+	struct socket *sock = sk->sk_socket;
+
+	if (sock->ops->peek)
+		return sock->ops->peek(sock, false);
+
+	return skb_queue_empty(&sk->sk_receive_queue);
+}
+
 static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
 {
 	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
@@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
 		endtime = busy_clock() + vq->busyloop_timeout;
 
 		while (vhost_can_busy_poll(&net->dev, endtime) &&
-		       skb_queue_empty(&sk->sk_receive_queue) &&
+		       !sk_has_rx_data(sk) &&
 		       vhost_vq_avail_empty(&net->dev, vq))
 			cpu_relax_lowlatency();
 
diff --git a/include/linux/net.h b/include/linux/net.h
index 72c1e06..3c4ecd5 100644
--- a/include/linux/net.h
+++ b/include/linux/net.h
@@ -132,6 +132,7 @@ struct module;
 struct proto_ops {
 	int		family;
 	struct module	*owner;
+	int		(*peek) (struct socket *sock, bool exact);
 	int		(*release)   (struct socket *sock);
 	int		(*bind)	     (struct socket *sock,
 				      struct sockaddr *myaddr,
diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
index 3cb5e1d..d64ddc1 100644
--- a/include/uapi/linux/if_tun.h
+++ b/include/uapi/linux/if_tun.h
@@ -61,6 +61,7 @@
 #define IFF_TUN		0x0001
 #define IFF_TAP		0x0002
 #define IFF_NO_PI	0x1000
+#define IFF_TX_RING	0x0010
 /* This flag has no real effect */
 #define IFF_ONE_QUEUE	0x2000
 #define IFF_VNET_HDR	0x4000
-- 
2.7.4

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  1:17 [PATCH net-next] tuntap: introduce tx skb ring Jason Wang
@ 2016-05-16  3:56 ` Eric Dumazet
  2016-05-16  7:51   ` Jason Wang
  2016-05-16  4:23 ` Michael S. Tsirkin
  1 sibling, 1 reply; 21+ messages in thread
From: Eric Dumazet @ 2016-05-16  3:56 UTC (permalink / raw)
  To: Jason Wang; +Cc: davem, mst, netdev, linux-kernel

On Mon, 2016-05-16 at 09:17 +0800, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.

...

>  	struct tun_struct *detached;
> +	/* reader lock */
> +	spinlock_t rlock;
> +	unsigned long tail;
> +	struct tun_desc tx_descs[TUN_RING_SIZE];
> +	/* writer lock */
> +	spinlock_t wlock;
> +	unsigned long head;
>  };
>  

Ok, we had these kind of ideas floating around for many other cases,
like qdisc, UDP or af_packet sockets...

I believe we should have a common set of helpers, not hidden in
drivers/net/tun.c but in net/core/skb_ring.c or something, with more
flexibility (like the number of slots)


BTW, why are you using spin_lock_irqsave() in tun_net_xmit() and
tun_peek() ?

BH should be disabled already (in tun_next_xmit()), and we can not
transmit from hard irq.

Thanks.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  1:17 [PATCH net-next] tuntap: introduce tx skb ring Jason Wang
  2016-05-16  3:56 ` Eric Dumazet
@ 2016-05-16  4:23 ` Michael S. Tsirkin
  2016-05-16  7:52   ` Jason Wang
  1 sibling, 1 reply; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-16  4:23 UTC (permalink / raw)
  To: Jason Wang; +Cc: davem, netdev, linux-kernel

On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
> 
> This patch tries to address this by using circular buffer which allows
> lockless synchronization. This is done by switching from
> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
> this is set:

Why do we need a new flag? Is there a userspace-visible
behaviour change?

> 
> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>   it from the circular buffer in tun_do_read().
> - introduce a new proto_ops peek which could be implemented by
>   specific socket which does not use sk_receive_queue.
> - store skb length in circular buffer too, and implement a lockless
>   peek for tuntap.
> - change vhost_net to use proto_ops->peek() instead
> - new spinlocks were introduced to synchronize among producers (and so
>   did for consumers).
> 
> Pktgen test shows about 9% improvement on guest receiving pps:
> 
> Before: ~1480000pps
> After : ~1610000pps
> 
> (I'm not sure noblocking read is still needed, so it was not included
>  in this patch)

How do you mean? Of course we must support blocking and non-blocking
read - userspace uses it.

> Signed-off-by: Jason Wang <jasowang@redhat.com>
> ---
> ---
>  drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
>  drivers/vhost/net.c         |  16 ++++-
>  include/linux/net.h         |   1 +
>  include/uapi/linux/if_tun.h |   1 +
>  4 files changed, 165 insertions(+), 10 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index 425e983..6001ece 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -71,6 +71,7 @@
>  #include <net/sock.h>
>  #include <linux/seq_file.h>
>  #include <linux/uio.h>
> +#include <linux/circ_buf.h>
>  
>  #include <asm/uaccess.h>
>  
> @@ -130,6 +131,8 @@ struct tap_filter {
>  #define MAX_TAP_FLOWS  4096
>  
>  #define TUN_FLOW_EXPIRE (3 * HZ)
> +#define TUN_RING_SIZE 256

Can we resize this according to tx_queue_len set by user?

> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>  
>  struct tun_pcpu_stats {
>  	u64 rx_packets;
> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>  	u32 rx_frame_errors;
>  };
>  
> +struct tun_desc {
> +	struct sk_buff *skb;
> +	int len; /* Cached skb len for peeking */
> +};
> +
>  /* A tun_file connects an open character device to a tuntap netdevice. It
>   * also contains all socket related structures (except sock_fprog and tap_filter)
>   * to serve as one transmit queue for tuntap device. The sock_fprog and
> @@ -167,6 +175,13 @@ struct tun_file {
>  	};
>  	struct list_head next;
>  	struct tun_struct *detached;
> +	/* reader lock */
> +	spinlock_t rlock;
> +	unsigned long tail;
> +	struct tun_desc tx_descs[TUN_RING_SIZE];
> +	/* writer lock */
> +	spinlock_t wlock;
> +	unsigned long head;
>  };
>  
>  struct tun_flow_entry {
> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>  
>  static void tun_queue_purge(struct tun_file *tfile)
>  {
> +	unsigned long head, tail;
> +	struct tun_desc *desc;
> +	struct sk_buff *skb;
>  	skb_queue_purge(&tfile->sk.sk_receive_queue);
> +	spin_lock(&tfile->rlock);
> +
> +	head = ACCESS_ONCE(tfile->head);
> +	tail = tfile->tail;
> +
> +	/* read tail before reading descriptor at tail */
> +	smp_rmb();

I think you mean read *head* here


> +
> +	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> +		desc = &tfile->tx_descs[tail];
> +		skb = desc->skb;
> +		kfree_skb(skb);
> +		tail = (tail + 1) & TUN_RING_MASK;
> +		/* read descriptor before incrementing tail. */
> +		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
> +	}
> +	spin_unlock(&tfile->rlock);
>  	skb_queue_purge(&tfile->sk.sk_error_queue);
>  }
>

Barrier pairing seems messed up. Could you tag
each barrier with its pair pls?
E.g. add /* Barrier A for pairing */ Before barrier and
its pair.
  
> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  	int txq = skb->queue_mapping;
>  	struct tun_file *tfile;
>  	u32 numqueues = 0;
> +	unsigned long flags;
>  
>  	rcu_read_lock();
>  	tfile = rcu_dereference(tun->tfiles[txq]);
> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset(skb);
>  
> -	/* Enqueue packet */
> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head, tail;
> +
> +		spin_lock_irqsave(&tfile->wlock, flags);
> +
> +		head = tfile->head;
> +		tail = ACCESS_ONCE(tfile->tail);

this should be acquire

> +
> +		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
> +			struct tun_desc *desc = &tfile->tx_descs[head];
> +
> +			desc->skb = skb;
> +			desc->len = skb->len;
> +			if (skb_vlan_tag_present(skb))
> +				desc->len += VLAN_HLEN;
> +
> +			/* read descriptor before incrementing head. */

write descriptor. so smp_wmb is enough.

> +			smp_store_release(&tfile->head,
> +					  (head + 1) & TUN_RING_MASK);
> +		} else {
> +			spin_unlock_irqrestore(&tfile->wlock, flags);
> +			goto drop;
> +		}
> +
> +		spin_unlock_irqrestore(&tfile->wlock, flags);
> +	} else {
> +		/* Enqueue packet */
> +		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	}
>  
>  	/* Notify and wake up reader process */
>  	if (tfile->flags & TUN_FASYNC)
> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
>  	}
>  }
>  
> +static bool tun_queue_not_empty(struct tun_file *tfile)
> +{
> +	struct sock *sk = tfile->socket.sk;
> +
> +	return (!skb_queue_empty(&sk->sk_receive_queue) ||
> +		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
> +}
>  /* Character device part */
>  
>  /* Poll */
> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>  
>  	poll_wait(file, sk_sleep(sk), wait);
>  
> -	if (!skb_queue_empty(&sk->sk_receive_queue))
> +	if (tun_queue_not_empty(tfile))
>  		mask |= POLLIN | POLLRDNORM;
>  
>  	if (sock_writeable(sk) ||
> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>  	if (tun->dev->reg_state != NETREG_REGISTERED)
>  		return -EIO;
>  
> -	/* Read frames from queue */
> -	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
> -				  &peeked, &off, &err);
> -	if (!skb)
> -		return err;
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head, tail;
> +		struct tun_desc *desc;
> +
> +		spin_lock(&tfile->rlock);
> +		head = ACCESS_ONCE(tfile->head);
> +		tail = tfile->tail;
> +
> +		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> +			/* read tail before reading descriptor at tail */
> +			smp_rmb();
> +			desc = &tfile->tx_descs[tail];
> +			skb = desc->skb;
> +			/* read descriptor before incrementing tail. */
> +			smp_store_release(&tfile->tail,
> +					  (tail + 1) & TUN_RING_MASK);

same comments as purge, also - pls order code consistently.

> +		} else {
> +			spin_unlock(&tfile->rlock);
> +			return -EAGAIN;
> +		}
> +
> +		spin_unlock(&tfile->rlock);
> +	} else {
> +		/* Read frames from queue */
> +		skb = __skb_recv_datagram(tfile->socket.sk,
> +					  noblock ? MSG_DONTWAIT : 0,
> +					  &peeked, &off, &err);
> +		if (!skb)
> +			return err;
> +	}
>  
>  	ret = tun_put_user(tun, tfile, skb, to);
>  	if (unlikely(ret < 0))
> @@ -1629,8 +1724,47 @@ out:
>  	return ret;
>  }
>  
> +static int tun_peek(struct socket *sock, bool exact)
> +{
> +	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
> +	struct sock *sk = sock->sk;
> +	struct tun_struct *tun;
> +	int ret = 0;
> +
> +	if (!exact)
> +		return tun_queue_not_empty(tfile);
> +
> +	tun = __tun_get(tfile);
> +	if (!tun)
> +		return 0;
> +
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head = ACCESS_ONCE(tfile->head);
> +		unsigned long tail = ACCESS_ONCE(tfile->tail);
> +
> +		if (head != tail)
> +			ret = tfile->tx_descs[tail].len;

no ordering at all here?

> +	} else {
> +		struct sk_buff *head;
> +		unsigned long flags;
> +
> +		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> +		head = skb_peek(&sk->sk_receive_queue);
> +		if (likely(head)) {
> +			ret = head->len;
> +			if (skb_vlan_tag_present(head))
> +				ret += VLAN_HLEN;
> +		}
> +		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
> +	}
> +
> +	tun_put(tun);
> +	return ret;
> +}
> +
>  /* Ops structure to mimic raw sockets with tun */
>  static const struct proto_ops tun_socket_ops = {
> +	.peek    = tun_peek,
>  	.sendmsg = tun_sendmsg,
>  	.recvmsg = tun_recvmsg,
>  };
> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>  {
>  	struct net *net = current->nsproxy->net_ns;
>  	struct tun_file *tfile;
> -
>  	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>  
>  	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
>  					    &tun_proto, 0);
>  	if (!tfile)
>  		return -ENOMEM;
> +
>  	RCU_INIT_POINTER(tfile->tun, NULL);
>  	tfile->flags = 0;
>  	tfile->ifindex = 0;
> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>  	INIT_LIST_HEAD(&tfile->next);
>  
>  	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
> +	tfile->head = 0;
> +	tfile->tail = 0;
> +
> +	spin_lock_init(&tfile->rlock);
> +	spin_lock_init(&tfile->wlock);
>  
>  	return 0;
>  }
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index f744eeb..10ff494 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -455,10 +455,14 @@ out:
>  
>  static int peek_head_len(struct sock *sk)
>  {
> +	struct socket *sock = sk->sk_socket;
>  	struct sk_buff *head;
>  	int len = 0;
>  	unsigned long flags;
>  
> +	if (sock->ops->peek)
> +		return sock->ops->peek(sock, true);
> +
>  	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>  	head = skb_peek(&sk->sk_receive_queue);
>  	if (likely(head)) {
> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>  	return len;
>  }
>  
> +static int sk_has_rx_data(struct sock *sk)
> +{
> +	struct socket *sock = sk->sk_socket;
> +
> +	if (sock->ops->peek)
> +		return sock->ops->peek(sock, false);
> +
> +	return skb_queue_empty(&sk->sk_receive_queue);
> +}
> +
>  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  {
>  	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  		endtime = busy_clock() + vq->busyloop_timeout;
>  
>  		while (vhost_can_busy_poll(&net->dev, endtime) &&
> -		       skb_queue_empty(&sk->sk_receive_queue) &&
> +		       !sk_has_rx_data(sk) &&
>  		       vhost_vq_avail_empty(&net->dev, vq))
>  			cpu_relax_lowlatency();
>  
> diff --git a/include/linux/net.h b/include/linux/net.h
> index 72c1e06..3c4ecd5 100644
> --- a/include/linux/net.h
> +++ b/include/linux/net.h
> @@ -132,6 +132,7 @@ struct module;
>  struct proto_ops {
>  	int		family;
>  	struct module	*owner;
> +	int		(*peek) (struct socket *sock, bool exact);
>  	int		(*release)   (struct socket *sock);
>  	int		(*bind)	     (struct socket *sock,
>  				      struct sockaddr *myaddr,
> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
> index 3cb5e1d..d64ddc1 100644
> --- a/include/uapi/linux/if_tun.h
> +++ b/include/uapi/linux/if_tun.h
> @@ -61,6 +61,7 @@
>  #define IFF_TUN		0x0001
>  #define IFF_TAP		0x0002
>  #define IFF_NO_PI	0x1000
> +#define IFF_TX_RING	0x0010
>  /* This flag has no real effect */
>  #define IFF_ONE_QUEUE	0x2000
>  #define IFF_VNET_HDR	0x4000
> -- 
> 2.7.4

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  3:56 ` Eric Dumazet
@ 2016-05-16  7:51   ` Jason Wang
  2016-05-18  8:13     ` Jesper Dangaard Brouer
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-05-16  7:51 UTC (permalink / raw)
  To: Eric Dumazet; +Cc: davem, mst, netdev, linux-kernel



On 2016年05月16日 11:56, Eric Dumazet wrote:
> On Mon, 2016-05-16 at 09:17 +0800, Jason Wang wrote:
>> We used to queue tx packets in sk_receive_queue, this is less
>> efficient since it requires spinlocks to synchronize between producer
>> and consumer.
> ...
>
>>   	struct tun_struct *detached;
>> +	/* reader lock */
>> +	spinlock_t rlock;
>> +	unsigned long tail;
>> +	struct tun_desc tx_descs[TUN_RING_SIZE];
>> +	/* writer lock */
>> +	spinlock_t wlock;
>> +	unsigned long head;
>>   };
>>   
> Ok, we had these kind of ideas floating around for many other cases,
> like qdisc, UDP or af_packet sockets...
>
> I believe we should have a common set of helpers, not hidden in
> drivers/net/tun.c but in net/core/skb_ring.c or something, with more
> flexibility (like the number of slots)
>

Yes, this sounds good.

> BTW, why are you using spin_lock_irqsave() in tun_net_xmit() and
> tun_peek() ?
>
> BH should be disabled already (in tun_next_xmit()), and we can not
> transmit from hard irq.
>
> Thanks.

Right, no need. But for tun_peek() we need spin_lock_bh() since it was 
called by vhost-net.

Thanks

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  4:23 ` Michael S. Tsirkin
@ 2016-05-16  7:52   ` Jason Wang
  2016-05-16  8:08     ` Michael S. Tsirkin
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-05-16  7:52 UTC (permalink / raw)
  To: Michael S. Tsirkin; +Cc: davem, netdev, linux-kernel



On 2016年05月16日 12:23, Michael S. Tsirkin wrote:
> On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
>> We used to queue tx packets in sk_receive_queue, this is less
>> efficient since it requires spinlocks to synchronize between producer
>> and consumer.
>>
>> This patch tries to address this by using circular buffer which allows
>> lockless synchronization. This is done by switching from
>> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
>> this is set:
> Why do we need a new flag? Is there a userspace-visible
> behaviour change?

Probably yes since tx_queue_length does not work.

>
>> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>>    it from the circular buffer in tun_do_read().
>> - introduce a new proto_ops peek which could be implemented by
>>    specific socket which does not use sk_receive_queue.
>> - store skb length in circular buffer too, and implement a lockless
>>    peek for tuntap.
>> - change vhost_net to use proto_ops->peek() instead
>> - new spinlocks were introduced to synchronize among producers (and so
>>    did for consumers).
>>
>> Pktgen test shows about 9% improvement on guest receiving pps:
>>
>> Before: ~1480000pps
>> After : ~1610000pps
>>
>> (I'm not sure noblocking read is still needed, so it was not included
>>   in this patch)
> How do you mean? Of course we must support blocking and non-blocking
> read - userspace uses it.

Ok, will add this.

>
>> Signed-off-by: Jason Wang <jasowang@redhat.com>
>> ---
>> ---
>>   drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
>>   drivers/vhost/net.c         |  16 ++++-
>>   include/linux/net.h         |   1 +
>>   include/uapi/linux/if_tun.h |   1 +
>>   4 files changed, 165 insertions(+), 10 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index 425e983..6001ece 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -71,6 +71,7 @@
>>   #include <net/sock.h>
>>   #include <linux/seq_file.h>
>>   #include <linux/uio.h>
>> +#include <linux/circ_buf.h>
>>   
>>   #include <asm/uaccess.h>
>>   
>> @@ -130,6 +131,8 @@ struct tap_filter {
>>   #define MAX_TAP_FLOWS  4096
>>   
>>   #define TUN_FLOW_EXPIRE (3 * HZ)
>> +#define TUN_RING_SIZE 256
> Can we resize this according to tx_queue_len set by user?

We can, but it needs lots of other changes, e.g being notified when 
tx_queue_len was changed by user. And if tx_queue_length is not power of 
2, we probably need modulus to calculate the capacity.

>
>> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>>   
>>   struct tun_pcpu_stats {
>>   	u64 rx_packets;
>> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>>   	u32 rx_frame_errors;
>>   };
>>   
>> +struct tun_desc {
>> +	struct sk_buff *skb;
>> +	int len; /* Cached skb len for peeking */
>> +};
>> +
>>   /* A tun_file connects an open character device to a tuntap netdevice. It
>>    * also contains all socket related structures (except sock_fprog and tap_filter)
>>    * to serve as one transmit queue for tuntap device. The sock_fprog and
>> @@ -167,6 +175,13 @@ struct tun_file {
>>   	};
>>   	struct list_head next;
>>   	struct tun_struct *detached;
>> +	/* reader lock */
>> +	spinlock_t rlock;
>> +	unsigned long tail;
>> +	struct tun_desc tx_descs[TUN_RING_SIZE];
>> +	/* writer lock */
>> +	spinlock_t wlock;
>> +	unsigned long head;
>>   };
>>   
>>   struct tun_flow_entry {
>> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>>   
>>   static void tun_queue_purge(struct tun_file *tfile)
>>   {
>> +	unsigned long head, tail;
>> +	struct tun_desc *desc;
>> +	struct sk_buff *skb;
>>   	skb_queue_purge(&tfile->sk.sk_receive_queue);
>> +	spin_lock(&tfile->rlock);
>> +
>> +	head = ACCESS_ONCE(tfile->head);
>> +	tail = tfile->tail;
>> +
>> +	/* read tail before reading descriptor at tail */
>> +	smp_rmb();
> I think you mean read *head* here

Right.

>
>
>> +
>> +	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> +		desc = &tfile->tx_descs[tail];
>> +		skb = desc->skb;
>> +		kfree_skb(skb);
>> +		tail = (tail + 1) & TUN_RING_MASK;
>> +		/* read descriptor before incrementing tail. */
>> +		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
>> +	}
>> +	spin_unlock(&tfile->rlock);
>>   	skb_queue_purge(&tfile->sk.sk_error_queue);
>>   }
>>
> Barrier pairing seems messed up. Could you tag
> each barrier with its pair pls?
> E.g. add /* Barrier A for pairing */ Before barrier and
> its pair.

Ok.

for both tun_queue_purge() and tun_do_read():

smp_rmb() is paired with smp_store_release() in tun_net_xmit().
smp_store_release() is paired with spin_unlock()/spin_lock() in 
tun_net_xmit().

>    
>> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>   	int txq = skb->queue_mapping;
>>   	struct tun_file *tfile;
>>   	u32 numqueues = 0;
>> +	unsigned long flags;
>>   
>>   	rcu_read_lock();
>>   	tfile = rcu_dereference(tun->tfiles[txq]);
>> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>   
>>   	nf_reset(skb);
>>   
>> -	/* Enqueue packet */
>> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head, tail;
>> +
>> +		spin_lock_irqsave(&tfile->wlock, flags);
>> +
>> +		head = tfile->head;
>> +		tail = ACCESS_ONCE(tfile->tail);
> this should be acquire

Consider there's always one slot left empty, so we need to produced two 
skbs here before we could corrupt consumer. So the 
spin_unlock()/spin_lock() provides ordering here?

>
>> +
>> +		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
>> +			struct tun_desc *desc = &tfile->tx_descs[head];
>> +
>> +			desc->skb = skb;
>> +			desc->len = skb->len;
>> +			if (skb_vlan_tag_present(skb))
>> +				desc->len += VLAN_HLEN;
>> +
>> +			/* read descriptor before incrementing head. */
> write descriptor.

Right.

>   so smp_wmb is enough.

I thought smp_store_release() was more lightweight than smp_wmb()?

>
>> +			smp_store_release(&tfile->head,
>> +					  (head + 1) & TUN_RING_MASK);
>> +		} else {
>> +			spin_unlock_irqrestore(&tfile->wlock, flags);
>> +			goto drop;
>> +		}
>> +
>> +		spin_unlock_irqrestore(&tfile->wlock, flags);
>> +	} else {
>> +		/* Enqueue packet */
>> +		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> +	}
>>   
>>   	/* Notify and wake up reader process */
>>   	if (tfile->flags & TUN_FASYNC)
>> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
>>   	}
>>   }
>>   
>> +static bool tun_queue_not_empty(struct tun_file *tfile)
>> +{
>> +	struct sock *sk = tfile->socket.sk;
>> +
>> +	return (!skb_queue_empty(&sk->sk_receive_queue) ||
>> +		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
>> +}
>>   /* Character device part */
>>   
>>   /* Poll */
>> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>>   
>>   	poll_wait(file, sk_sleep(sk), wait);
>>   
>> -	if (!skb_queue_empty(&sk->sk_receive_queue))
>> +	if (tun_queue_not_empty(tfile))
>>   		mask |= POLLIN | POLLRDNORM;
>>   
>>   	if (sock_writeable(sk) ||
>> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>>   	if (tun->dev->reg_state != NETREG_REGISTERED)
>>   		return -EIO;
>>   
>> -	/* Read frames from queue */
>> -	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
>> -				  &peeked, &off, &err);
>> -	if (!skb)
>> -		return err;
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head, tail;
>> +		struct tun_desc *desc;
>> +
>> +		spin_lock(&tfile->rlock);
>> +		head = ACCESS_ONCE(tfile->head);
>> +		tail = tfile->tail;
>> +
>> +		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> +			/* read tail before reading descriptor at tail */
>> +			smp_rmb();
>> +			desc = &tfile->tx_descs[tail];
>> +			skb = desc->skb;
>> +			/* read descriptor before incrementing tail. */
>> +			smp_store_release(&tfile->tail,
>> +					  (tail + 1) & TUN_RING_MASK);
> same comments as purge, also - pls order code consistently.

Ok.

>
>> +		} else {
>> +			spin_unlock(&tfile->rlock);
>> +			return -EAGAIN;
>> +		}
>> +
>> +		spin_unlock(&tfile->rlock);
>> +	} else {
>> +		/* Read frames from queue */
>> +		skb = __skb_recv_datagram(tfile->socket.sk,
>> +					  noblock ? MSG_DONTWAIT : 0,
>> +					  &peeked, &off, &err);
>> +		if (!skb)
>> +			return err;
>> +	}
>>   
>>   	ret = tun_put_user(tun, tfile, skb, to);
>>   	if (unlikely(ret < 0))
>> @@ -1629,8 +1724,47 @@ out:
>>   	return ret;
>>   }
>>   
>> +static int tun_peek(struct socket *sock, bool exact)
>> +{
>> +	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
>> +	struct sock *sk = sock->sk;
>> +	struct tun_struct *tun;
>> +	int ret = 0;
>> +
>> +	if (!exact)
>> +		return tun_queue_not_empty(tfile);
>> +
>> +	tun = __tun_get(tfile);
>> +	if (!tun)
>> +		return 0;
>> +
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head = ACCESS_ONCE(tfile->head);
>> +		unsigned long tail = ACCESS_ONCE(tfile->tail);
>> +
>> +		if (head != tail)
>> +			ret = tfile->tx_descs[tail].len;
> no ordering at all here?

Seems yes, we can't be accurate if there's are more consumers.

>
>> +	} else {
>> +		struct sk_buff *head;
>> +		unsigned long flags;
>> +
>> +		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>> +		head = skb_peek(&sk->sk_receive_queue);
>> +		if (likely(head)) {
>> +			ret = head->len;
>> +			if (skb_vlan_tag_present(head))
>> +				ret += VLAN_HLEN;
>> +		}
>> +		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
>> +	}
>> +
>> +	tun_put(tun);
>> +	return ret;
>> +}
>> +
>>   /* Ops structure to mimic raw sockets with tun */
>>   static const struct proto_ops tun_socket_ops = {
>> +	.peek    = tun_peek,
>>   	.sendmsg = tun_sendmsg,
>>   	.recvmsg = tun_recvmsg,
>>   };
>> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>>   {
>>   	struct net *net = current->nsproxy->net_ns;
>>   	struct tun_file *tfile;
>> -
>>   	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>>   
>>   	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
>>   					    &tun_proto, 0);
>>   	if (!tfile)
>>   		return -ENOMEM;
>> +
>>   	RCU_INIT_POINTER(tfile->tun, NULL);
>>   	tfile->flags = 0;
>>   	tfile->ifindex = 0;
>> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>>   	INIT_LIST_HEAD(&tfile->next);
>>   
>>   	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
>> +	tfile->head = 0;
>> +	tfile->tail = 0;
>> +
>> +	spin_lock_init(&tfile->rlock);
>> +	spin_lock_init(&tfile->wlock);
>>   
>>   	return 0;
>>   }
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index f744eeb..10ff494 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -455,10 +455,14 @@ out:
>>   
>>   static int peek_head_len(struct sock *sk)
>>   {
>> +	struct socket *sock = sk->sk_socket;
>>   	struct sk_buff *head;
>>   	int len = 0;
>>   	unsigned long flags;
>>   
>> +	if (sock->ops->peek)
>> +		return sock->ops->peek(sock, true);
>> +
>>   	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>>   	head = skb_peek(&sk->sk_receive_queue);
>>   	if (likely(head)) {
>> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>>   	return len;
>>   }
>>   
>> +static int sk_has_rx_data(struct sock *sk)
>> +{
>> +	struct socket *sock = sk->sk_socket;
>> +
>> +	if (sock->ops->peek)
>> +		return sock->ops->peek(sock, false);
>> +
>> +	return skb_queue_empty(&sk->sk_receive_queue);
>> +}
>> +
>>   static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>>   {
>>   	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
>> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>>   		endtime = busy_clock() + vq->busyloop_timeout;
>>   
>>   		while (vhost_can_busy_poll(&net->dev, endtime) &&
>> -		       skb_queue_empty(&sk->sk_receive_queue) &&
>> +		       !sk_has_rx_data(sk) &&
>>   		       vhost_vq_avail_empty(&net->dev, vq))
>>   			cpu_relax_lowlatency();
>>   
>> diff --git a/include/linux/net.h b/include/linux/net.h
>> index 72c1e06..3c4ecd5 100644
>> --- a/include/linux/net.h
>> +++ b/include/linux/net.h
>> @@ -132,6 +132,7 @@ struct module;
>>   struct proto_ops {
>>   	int		family;
>>   	struct module	*owner;
>> +	int		(*peek) (struct socket *sock, bool exact);
>>   	int		(*release)   (struct socket *sock);
>>   	int		(*bind)	     (struct socket *sock,
>>   				      struct sockaddr *myaddr,
>> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
>> index 3cb5e1d..d64ddc1 100644
>> --- a/include/uapi/linux/if_tun.h
>> +++ b/include/uapi/linux/if_tun.h
>> @@ -61,6 +61,7 @@
>>   #define IFF_TUN		0x0001
>>   #define IFF_TAP		0x0002
>>   #define IFF_NO_PI	0x1000
>> +#define IFF_TX_RING	0x0010
>>   /* This flag has no real effect */
>>   #define IFF_ONE_QUEUE	0x2000
>>   #define IFF_VNET_HDR	0x4000
>> -- 
>> 2.7.4

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  7:52   ` Jason Wang
@ 2016-05-16  8:08     ` Michael S. Tsirkin
  2016-05-17  1:38       ` Jason Wang
  0 siblings, 1 reply; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-16  8:08 UTC (permalink / raw)
  To: Jason Wang; +Cc: davem, netdev, linux-kernel

On Mon, May 16, 2016 at 03:52:11PM +0800, Jason Wang wrote:
> 
> 
> On 2016年05月16日 12:23, Michael S. Tsirkin wrote:
> >On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
> >>We used to queue tx packets in sk_receive_queue, this is less
> >>efficient since it requires spinlocks to synchronize between producer
> >>and consumer.
> >>
> >>This patch tries to address this by using circular buffer which allows
> >>lockless synchronization. This is done by switching from
> >>sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
> >>this is set:
> >Why do we need a new flag? Is there a userspace-visible
> >behaviour change?
> 
> Probably yes since tx_queue_length does not work.

So the flag name should reflect the behaviour somehow, not
the implementation.

> >
> >>- store pointer to skb in circular buffer in tun_net_xmit(), and read
> >>   it from the circular buffer in tun_do_read().
> >>- introduce a new proto_ops peek which could be implemented by
> >>   specific socket which does not use sk_receive_queue.
> >>- store skb length in circular buffer too, and implement a lockless
> >>   peek for tuntap.
> >>- change vhost_net to use proto_ops->peek() instead
> >>- new spinlocks were introduced to synchronize among producers (and so
> >>   did for consumers).
> >>
> >>Pktgen test shows about 9% improvement on guest receiving pps:
> >>
> >>Before: ~1480000pps
> >>After : ~1610000pps
> >>
> >>(I'm not sure noblocking read is still needed, so it was not included
> >>  in this patch)
> >How do you mean? Of course we must support blocking and non-blocking
> >read - userspace uses it.
> 
> Ok, will add this.
> 
> >
> >>Signed-off-by: Jason Wang <jasowang@redhat.com>
> >>---
> >>---
> >>  drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
> >>  drivers/vhost/net.c         |  16 ++++-
> >>  include/linux/net.h         |   1 +
> >>  include/uapi/linux/if_tun.h |   1 +
> >>  4 files changed, 165 insertions(+), 10 deletions(-)
> >>
> >>diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >>index 425e983..6001ece 100644
> >>--- a/drivers/net/tun.c
> >>+++ b/drivers/net/tun.c
> >>@@ -71,6 +71,7 @@
> >>  #include <net/sock.h>
> >>  #include <linux/seq_file.h>
> >>  #include <linux/uio.h>
> >>+#include <linux/circ_buf.h>
> >>  #include <asm/uaccess.h>
> >>@@ -130,6 +131,8 @@ struct tap_filter {
> >>  #define MAX_TAP_FLOWS  4096
> >>  #define TUN_FLOW_EXPIRE (3 * HZ)
> >>+#define TUN_RING_SIZE 256
> >Can we resize this according to tx_queue_len set by user?
> 
> We can, but it needs lots of other changes, e.g being notified when
> tx_queue_len was changed by user.

Some kind of notifier?
Probably better than a new user interface.

> And if tx_queue_length is not power of 2,
> we probably need modulus to calculate the capacity.

Is that really that important for speed?
If yes, round it up to next power of two.
You can also probably wrap it with a conditional instead.

> >
> >>+#define TUN_RING_MASK (TUN_RING_SIZE - 1)
> >>  struct tun_pcpu_stats {
> >>  	u64 rx_packets;
> >>@@ -142,6 +145,11 @@ struct tun_pcpu_stats {
> >>  	u32 rx_frame_errors;
> >>  };
> >>+struct tun_desc {
> >>+	struct sk_buff *skb;
> >>+	int len; /* Cached skb len for peeking */
> >>+};
> >>+
> >>  /* A tun_file connects an open character device to a tuntap netdevice. It
> >>   * also contains all socket related structures (except sock_fprog and tap_filter)
> >>   * to serve as one transmit queue for tuntap device. The sock_fprog and
> >>@@ -167,6 +175,13 @@ struct tun_file {
> >>  	};
> >>  	struct list_head next;
> >>  	struct tun_struct *detached;
> >>+	/* reader lock */
> >>+	spinlock_t rlock;
> >>+	unsigned long tail;
> >>+	struct tun_desc tx_descs[TUN_RING_SIZE];
> >>+	/* writer lock */
> >>+	spinlock_t wlock;
> >>+	unsigned long head;
> >>  };
> >>  struct tun_flow_entry {
> >>@@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
> >>  static void tun_queue_purge(struct tun_file *tfile)
> >>  {
> >>+	unsigned long head, tail;
> >>+	struct tun_desc *desc;
> >>+	struct sk_buff *skb;
> >>  	skb_queue_purge(&tfile->sk.sk_receive_queue);
> >>+	spin_lock(&tfile->rlock);
> >>+
> >>+	head = ACCESS_ONCE(tfile->head);
> >>+	tail = tfile->tail;
> >>+
> >>+	/* read tail before reading descriptor at tail */
> >>+	smp_rmb();
> >I think you mean read *head* here
> 
> Right.
> 
> >
> >
> >>+
> >>+	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> >>+		desc = &tfile->tx_descs[tail];
> >>+		skb = desc->skb;
> >>+		kfree_skb(skb);
> >>+		tail = (tail + 1) & TUN_RING_MASK;
> >>+		/* read descriptor before incrementing tail. */
> >>+		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
> >>+	}
> >>+	spin_unlock(&tfile->rlock);
> >>  	skb_queue_purge(&tfile->sk.sk_error_queue);
> >>  }
> >>
> >Barrier pairing seems messed up. Could you tag
> >each barrier with its pair pls?
> >E.g. add /* Barrier A for pairing */ Before barrier and
> >its pair.
> 
> Ok.
> 
> for both tun_queue_purge() and tun_do_read():
> 
> smp_rmb() is paired with smp_store_release() in tun_net_xmit().

this seems at least an overkill. rmb would normally be paired with wmb,
not a full mb within release.

> smp_store_release() is paired with spin_unlock()/spin_lock() in
> tun_net_xmit().

release can't be paired with unlock since that's also a release.
lock is an acquire but from what I have seen you keep it around
operations not in the middle.

> 
> >>@@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
> >>  	int txq = skb->queue_mapping;
> >>  	struct tun_file *tfile;
> >>  	u32 numqueues = 0;
> >>+	unsigned long flags;
> >>  	rcu_read_lock();
> >>  	tfile = rcu_dereference(tun->tfiles[txq]);
> >>@@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
> >>  	nf_reset(skb);
> >>-	/* Enqueue packet */
> >>-	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> >>+	if (tun->flags & IFF_TX_RING) {
> >>+		unsigned long head, tail;
> >>+
> >>+		spin_lock_irqsave(&tfile->wlock, flags);
> >>+
> >>+		head = tfile->head;
> >>+		tail = ACCESS_ONCE(tfile->tail);
> >this should be acquire
> 
> Consider there's always one slot left empty, so we need to produced two skbs
> here before we could corrupt consumer. So the spin_unlock()/spin_lock()
> provides ordering here?

It's better to just follow memory barrier rules.


> >
> >>+ > >>+		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
> >>+			struct tun_desc *desc = &tfile->tx_descs[head];
> >>+
> >>+			desc->skb = skb;
> >>+			desc->len = skb->len;
> >>+			if (skb_vlan_tag_present(skb))
> >>+				desc->len += VLAN_HLEN;
> >>+
> >>+			/* read descriptor before incrementing head. */
> >write descriptor.
> 
> Right.
> 
> >  so smp_wmb is enough.
> 
> I thought smp_store_release() was more lightweight than smp_wmb()?

Why do you think this?  On which architecture?

> >
> >>+			smp_store_release(&tfile->head,
> >>+					  (head + 1) & TUN_RING_MASK);
> >>+		} else {
> >>+			spin_unlock_irqrestore(&tfile->wlock, flags);
> >>+			goto drop;
> >>+		}
> >>+
> >>+		spin_unlock_irqrestore(&tfile->wlock, flags);
> >>+	} else {
> >>+		/* Enqueue packet */
> >>+		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> >>+	}
> >>  	/* Notify and wake up reader process */
> >>  	if (tfile->flags & TUN_FASYNC)
> >>@@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
> >>  	}
> >>  }
> >>+static bool tun_queue_not_empty(struct tun_file *tfile)
> >>+{
> >>+	struct sock *sk = tfile->socket.sk;
> >>+
> >>+	return (!skb_queue_empty(&sk->sk_receive_queue) ||
> >>+		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
> >>+}
> >>  /* Character device part */
> >>  /* Poll */
> >>@@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
> >>  	poll_wait(file, sk_sleep(sk), wait);
> >>-	if (!skb_queue_empty(&sk->sk_receive_queue))
> >>+	if (tun_queue_not_empty(tfile))
> >>  		mask |= POLLIN | POLLRDNORM;
> >>  	if (sock_writeable(sk) ||
> >>@@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
> >>  	if (tun->dev->reg_state != NETREG_REGISTERED)
> >>  		return -EIO;
> >>-	/* Read frames from queue */
> >>-	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
> >>-				  &peeked, &off, &err);
> >>-	if (!skb)
> >>-		return err;
> >>+	if (tun->flags & IFF_TX_RING) {
> >>+		unsigned long head, tail;
> >>+		struct tun_desc *desc;
> >>+
> >>+		spin_lock(&tfile->rlock);
> >>+		head = ACCESS_ONCE(tfile->head);
> >>+		tail = tfile->tail;
> >>+
> >>+		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> >>+			/* read tail before reading descriptor at tail */
> >>+			smp_rmb();
> >>+			desc = &tfile->tx_descs[tail];
> >>+			skb = desc->skb;
> >>+			/* read descriptor before incrementing tail. */
> >>+			smp_store_release(&tfile->tail,
> >>+					  (tail + 1) & TUN_RING_MASK);
> >same comments as purge, also - pls order code consistently.
> 
> Ok.
> 
> >
> >>+		} else {
> >>+			spin_unlock(&tfile->rlock);
> >>+			return -EAGAIN;
> >>+		}
> >>+
> >>+		spin_unlock(&tfile->rlock);
> >>+	} else {
> >>+		/* Read frames from queue */
> >>+		skb = __skb_recv_datagram(tfile->socket.sk,
> >>+					  noblock ? MSG_DONTWAIT : 0,
> >>+					  &peeked, &off, &err);
> >>+		if (!skb)
> >>+			return err;
> >>+	}
> >>  	ret = tun_put_user(tun, tfile, skb, to);
> >>  	if (unlikely(ret < 0))
> >>@@ -1629,8 +1724,47 @@ out:
> >>  	return ret;
> >>  }
> >>+static int tun_peek(struct socket *sock, bool exact)
> >>+{
> >>+	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
> >>+	struct sock *sk = sock->sk;
> >>+	struct tun_struct *tun;
> >>+	int ret = 0;
> >>+
> >>+	if (!exact)
> >>+		return tun_queue_not_empty(tfile);
> >>+
> >>+	tun = __tun_get(tfile);
> >>+	if (!tun)
> >>+		return 0;
> >>+
> >>+	if (tun->flags & IFF_TX_RING) {
> >>+		unsigned long head = ACCESS_ONCE(tfile->head);
> >>+		unsigned long tail = ACCESS_ONCE(tfile->tail);
> >>+
> >>+		if (head != tail)
> >>+			ret = tfile->tx_descs[tail].len;
> >no ordering at all here?
> 
> Seems yes, we can't be accurate if there's are more consumers.
> 
> >
> >>+	} else {
> >>+		struct sk_buff *head;
> >>+		unsigned long flags;
> >>+
> >>+		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> >>+		head = skb_peek(&sk->sk_receive_queue);
> >>+		if (likely(head)) {
> >>+			ret = head->len;
> >>+			if (skb_vlan_tag_present(head))
> >>+				ret += VLAN_HLEN;
> >>+		}
> >>+		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
> >>+	}
> >>+
> >>+	tun_put(tun);
> >>+	return ret;
> >>+}
> >>+
> >>  /* Ops structure to mimic raw sockets with tun */
> >>  static const struct proto_ops tun_socket_ops = {
> >>+	.peek    = tun_peek,
> >>  	.sendmsg = tun_sendmsg,
> >>  	.recvmsg = tun_recvmsg,
> >>  };
> >>@@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
> >>  {
> >>  	struct net *net = current->nsproxy->net_ns;
> >>  	struct tun_file *tfile;
> >>-
> >>  	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
> >>  	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
> >>  					    &tun_proto, 0);
> >>  	if (!tfile)
> >>  		return -ENOMEM;
> >>+
> >>  	RCU_INIT_POINTER(tfile->tun, NULL);
> >>  	tfile->flags = 0;
> >>  	tfile->ifindex = 0;
> >>@@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
> >>  	INIT_LIST_HEAD(&tfile->next);
> >>  	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
> >>+	tfile->head = 0;
> >>+	tfile->tail = 0;
> >>+
> >>+	spin_lock_init(&tfile->rlock);
> >>+	spin_lock_init(&tfile->wlock);
> >>  	return 0;
> >>  }
> >>diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> >>index f744eeb..10ff494 100644
> >>--- a/drivers/vhost/net.c
> >>+++ b/drivers/vhost/net.c
> >>@@ -455,10 +455,14 @@ out:
> >>  static int peek_head_len(struct sock *sk)
> >>  {
> >>+	struct socket *sock = sk->sk_socket;
> >>  	struct sk_buff *head;
> >>  	int len = 0;
> >>  	unsigned long flags;
> >>+	if (sock->ops->peek)
> >>+		return sock->ops->peek(sock, true);
> >>+
> >>  	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> >>  	head = skb_peek(&sk->sk_receive_queue);
> >>  	if (likely(head)) {
> >>@@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
> >>  	return len;
> >>  }
> >>+static int sk_has_rx_data(struct sock *sk)
> >>+{
> >>+	struct socket *sock = sk->sk_socket;
> >>+
> >>+	if (sock->ops->peek)
> >>+		return sock->ops->peek(sock, false);
> >>+
> >>+	return skb_queue_empty(&sk->sk_receive_queue);
> >>+}
> >>+
> >>  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> >>  {
> >>  	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> >>@@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> >>  		endtime = busy_clock() + vq->busyloop_timeout;
> >>  		while (vhost_can_busy_poll(&net->dev, endtime) &&
> >>-		       skb_queue_empty(&sk->sk_receive_queue) &&
> >>+		       !sk_has_rx_data(sk) &&
> >>  		       vhost_vq_avail_empty(&net->dev, vq))
> >>  			cpu_relax_lowlatency();
> >>diff --git a/include/linux/net.h b/include/linux/net.h
> >>index 72c1e06..3c4ecd5 100644
> >>--- a/include/linux/net.h
> >>+++ b/include/linux/net.h
> >>@@ -132,6 +132,7 @@ struct module;
> >>  struct proto_ops {
> >>  	int		family;
> >>  	struct module	*owner;
> >>+	int		(*peek) (struct socket *sock, bool exact);
> >>  	int		(*release)   (struct socket *sock);
> >>  	int		(*bind)	     (struct socket *sock,
> >>  				      struct sockaddr *myaddr,
> >>diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
> >>index 3cb5e1d..d64ddc1 100644
> >>--- a/include/uapi/linux/if_tun.h
> >>+++ b/include/uapi/linux/if_tun.h
> >>@@ -61,6 +61,7 @@
> >>  #define IFF_TUN		0x0001
> >>  #define IFF_TAP		0x0002
> >>  #define IFF_NO_PI	0x1000
> >>+#define IFF_TX_RING	0x0010
> >>  /* This flag has no real effect */
> >>  #define IFF_ONE_QUEUE	0x2000
> >>  #define IFF_VNET_HDR	0x4000
> >>-- 
> >>2.7.4

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  8:08     ` Michael S. Tsirkin
@ 2016-05-17  1:38       ` Jason Wang
  2016-05-18  8:16         ` Jesper Dangaard Brouer
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-05-17  1:38 UTC (permalink / raw)
  To: Michael S. Tsirkin; +Cc: davem, netdev, linux-kernel



On 2016年05月16日 16:08, Michael S. Tsirkin wrote:
> On Mon, May 16, 2016 at 03:52:11PM +0800, Jason Wang wrote:
>>
>> On 2016年05月16日 12:23, Michael S. Tsirkin wrote:
>>> On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
>>>> We used to queue tx packets in sk_receive_queue, this is less
>>>> efficient since it requires spinlocks to synchronize between producer
>>>> and consumer.
>>>>
>>>> This patch tries to address this by using circular buffer which allows
>>>> lockless synchronization. This is done by switching from
>>>> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
>>>> this is set:
>>> Why do we need a new flag? Is there a userspace-visible
>>> behaviour change?
>> Probably yes since tx_queue_length does not work.
> So the flag name should reflect the behaviour somehow, not
> the implementation.
>
>>>> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>>>>    it from the circular buffer in tun_do_read().
>>>> - introduce a new proto_ops peek which could be implemented by
>>>>    specific socket which does not use sk_receive_queue.
>>>> - store skb length in circular buffer too, and implement a lockless
>>>>    peek for tuntap.
>>>> - change vhost_net to use proto_ops->peek() instead
>>>> - new spinlocks were introduced to synchronize among producers (and so
>>>>    did for consumers).
>>>>
>>>> Pktgen test shows about 9% improvement on guest receiving pps:
>>>>
>>>> Before: ~1480000pps
>>>> After : ~1610000pps
>>>>
>>>> (I'm not sure noblocking read is still needed, so it was not included
>>>>   in this patch)
>>> How do you mean? Of course we must support blocking and non-blocking
>>> read - userspace uses it.
>> Ok, will add this.
>>
>>>> Signed-off-by: Jason Wang <jasowang@redhat.com>
>>>> ---
>>>> ---
>>>>   drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
>>>>   drivers/vhost/net.c         |  16 ++++-
>>>>   include/linux/net.h         |   1 +
>>>>   include/uapi/linux/if_tun.h |   1 +
>>>>   4 files changed, 165 insertions(+), 10 deletions(-)
>>>>
>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>>>> index 425e983..6001ece 100644
>>>> --- a/drivers/net/tun.c
>>>> +++ b/drivers/net/tun.c
>>>> @@ -71,6 +71,7 @@
>>>>   #include <net/sock.h>
>>>>   #include <linux/seq_file.h>
>>>>   #include <linux/uio.h>
>>>> +#include <linux/circ_buf.h>
>>>>   #include <asm/uaccess.h>
>>>> @@ -130,6 +131,8 @@ struct tap_filter {
>>>>   #define MAX_TAP_FLOWS  4096
>>>>   #define TUN_FLOW_EXPIRE (3 * HZ)
>>>> +#define TUN_RING_SIZE 256
>>> Can we resize this according to tx_queue_len set by user?
>> We can, but it needs lots of other changes, e.g being notified when
>> tx_queue_len was changed by user.
> Some kind of notifier?

Yes, maybe.

> Probably better than a new user interface.

Ok.

>
>> And if tx_queue_length is not power of 2,
>> we probably need modulus to calculate the capacity.
> Is that really that important for speed?

Not sure, I can test.

> If yes, round it up to next power of two.

Right, this sounds a good solution.

> You can also probably wrap it with a conditional instead.
>
>>>> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>>>>   struct tun_pcpu_stats {
>>>>   	u64 rx_packets;
>>>> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>>>>   	u32 rx_frame_errors;
>>>>   };
>>>> +struct tun_desc {
>>>> +	struct sk_buff *skb;
>>>> +	int len; /* Cached skb len for peeking */
>>>> +};
>>>> +
>>>>   /* A tun_file connects an open character device to a tuntap netdevice. It
>>>>    * also contains all socket related structures (except sock_fprog and tap_filter)
>>>>    * to serve as one transmit queue for tuntap device. The sock_fprog and
>>>> @@ -167,6 +175,13 @@ struct tun_file {
>>>>   	};
>>>>   	struct list_head next;
>>>>   	struct tun_struct *detached;
>>>> +	/* reader lock */
>>>> +	spinlock_t rlock;
>>>> +	unsigned long tail;
>>>> +	struct tun_desc tx_descs[TUN_RING_SIZE];
>>>> +	/* writer lock */
>>>> +	spinlock_t wlock;
>>>> +	unsigned long head;
>>>>   };
>>>>   struct tun_flow_entry {
>>>> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>>>>   static void tun_queue_purge(struct tun_file *tfile)
>>>>   {
>>>> +	unsigned long head, tail;
>>>> +	struct tun_desc *desc;
>>>> +	struct sk_buff *skb;
>>>>   	skb_queue_purge(&tfile->sk.sk_receive_queue);
>>>> +	spin_lock(&tfile->rlock);
>>>> +
>>>> +	head = ACCESS_ONCE(tfile->head);
>>>> +	tail = tfile->tail;
>>>> +
>>>> +	/* read tail before reading descriptor at tail */
>>>> +	smp_rmb();
>>> I think you mean read *head* here
>> Right.
>>
>>>
>>>> +
>>>> +	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>>>> +		desc = &tfile->tx_descs[tail];
>>>> +		skb = desc->skb;
>>>> +		kfree_skb(skb);
>>>> +		tail = (tail + 1) & TUN_RING_MASK;
>>>> +		/* read descriptor before incrementing tail. */
>>>> +		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
>>>> +	}
>>>> +	spin_unlock(&tfile->rlock);
>>>>   	skb_queue_purge(&tfile->sk.sk_error_queue);
>>>>   }
>>>>
>>> Barrier pairing seems messed up. Could you tag
>>> each barrier with its pair pls?
>>> E.g. add /* Barrier A for pairing */ Before barrier and
>>> its pair.
>> Ok.
>>
>> for both tun_queue_purge() and tun_do_read():
>>
>> smp_rmb() is paired with smp_store_release() in tun_net_xmit().
> this seems at least an overkill. rmb would normally be paired with wmb,
> not a full mb within release.

wmb is not enough here. We need guarantee the descriptor is read before 
we increase head. And full mb is more heavyweight that 
smp_store_release(). For paring, I can change to use 
smp_load_acquire(tfile->head) in tun_do_read().

>
>> smp_store_release() is paired with spin_unlock()/spin_lock() in
>> tun_net_xmit().
> release can't be paired with unlock since that's also a release.
> lock is an acquire but from what I have seen you keep it around
> operations not in the middle.

Right, so I will switch to use load_acquire() in this case.

>
>>>> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>>>   	int txq = skb->queue_mapping;
>>>>   	struct tun_file *tfile;
>>>>   	u32 numqueues = 0;
>>>> +	unsigned long flags;
>>>>   	rcu_read_lock();
>>>>   	tfile = rcu_dereference(tun->tfiles[txq]);
>>>> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>>>   	nf_reset(skb);
>>>> -	/* Enqueue packet */
>>>> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>>>> +	if (tun->flags & IFF_TX_RING) {
>>>> +		unsigned long head, tail;
>>>> +
>>>> +		spin_lock_irqsave(&tfile->wlock, flags);
>>>> +
>>>> +		head = tfile->head;
>>>> +		tail = ACCESS_ONCE(tfile->tail);
>>> this should be acquire
>> Consider there's always one slot left empty, so we need to produced two skbs
>> here before we could corrupt consumer. So the spin_unlock()/spin_lock()
>> provides ordering here?
> It's better to just follow memory barrier rules.

Ok.

>
>
>>>> + > >>+		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
>>>> +			struct tun_desc *desc = &tfile->tx_descs[head];
>>>> +
>>>> +			desc->skb = skb;
>>>> +			desc->len = skb->len;
>>>> +			if (skb_vlan_tag_present(skb))
>>>> +				desc->len += VLAN_HLEN;
>>>> +
>>>> +			/* read descriptor before incrementing head. */
>>> write descriptor.
>> Right.
>>
>>>   so smp_wmb is enough.
>> I thought smp_store_release() was more lightweight than smp_wmb()?
> Why do you think this?  On which architecture?

Sorry, I was wrong here.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-16  7:51   ` Jason Wang
@ 2016-05-18  8:13     ` Jesper Dangaard Brouer
  2016-05-18  8:23       ` Michael S. Tsirkin
                         ` (2 more replies)
  0 siblings, 3 replies; 21+ messages in thread
From: Jesper Dangaard Brouer @ 2016-05-18  8:13 UTC (permalink / raw)
  To: Jason Wang
  Cc: brouer, Eric Dumazet, davem, mst, netdev, linux-kernel, Steven Rostedt

On Mon, 16 May 2016 15:51:48 +0800
Jason Wang <jasowang@redhat.com> wrote:

> On 2016年05月16日 11:56, Eric Dumazet wrote:
> > On Mon, 2016-05-16 at 09:17 +0800, Jason Wang wrote:  
> >> We used to queue tx packets in sk_receive_queue, this is less
> >> efficient since it requires spinlocks to synchronize between producer
> >> and consumer.  
> > ...
> >  
> >>   	struct tun_struct *detached;
> >> +	/* reader lock */
> >> +	spinlock_t rlock;
> >> +	unsigned long tail;
> >> +	struct tun_desc tx_descs[TUN_RING_SIZE];
> >> +	/* writer lock */
> >> +	spinlock_t wlock;
> >> +	unsigned long head;
> >>   };
> >>     
> > Ok, we had these kind of ideas floating around for many other cases,
> > like qdisc, UDP or af_packet sockets...
> >
> > I believe we should have a common set of helpers, not hidden in
> > drivers/net/tun.c but in net/core/skb_ring.c or something, with more
> > flexibility (like the number of slots)
> >  
> 
> Yes, this sounds good.

I agree. It is sad to see everybody is implementing the same thing,
open coding an array/circular based ring buffer.  This kind of code is
hard to maintain and get right with barriers etc.  We can achieve the
same performance with a generic implementation, by inlining the help
function calls.

I implemented an array based Lock-Free/cmpxchg based queue, that you
could be inspired by, see:
 https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue.h

The main idea behind my implementation is bulking, to amortize the
locked cmpxchg operation. You might not need it now, but I expect we
need it in the future.

You cannot use my alf_queue directly as your "struct tun_desc" is
larger than one-pointer (which the alf_queue works with).  But it
should be possible to extend to handle larger "objects".


Maybe Steven Rostedt have an even better ring queue implementation
already avail in the kernel?

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-17  1:38       ` Jason Wang
@ 2016-05-18  8:16         ` Jesper Dangaard Brouer
  2016-05-18  8:21           ` Michael S. Tsirkin
  0 siblings, 1 reply; 21+ messages in thread
From: Jesper Dangaard Brouer @ 2016-05-18  8:16 UTC (permalink / raw)
  To: Jason Wang; +Cc: brouer, Michael S. Tsirkin, davem, netdev, linux-kernel


On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:

> >> And if tx_queue_length is not power of 2,
> >> we probably need modulus to calculate the capacity.  
> > Is that really that important for speed?  
> 
> Not sure, I can test.

In my experience, yes, adding a modulus does affect performance.

> > If yes, round it up to next power of two.  
> 
> Right, this sounds a good solution.

Good idea.

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  8:16         ` Jesper Dangaard Brouer
@ 2016-05-18  8:21           ` Michael S. Tsirkin
  2016-05-18  9:21             ` Jesper Dangaard Brouer
  0 siblings, 1 reply; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18  8:21 UTC (permalink / raw)
  To: Jesper Dangaard Brouer; +Cc: Jason Wang, davem, netdev, linux-kernel

On Wed, May 18, 2016 at 10:16:31AM +0200, Jesper Dangaard Brouer wrote:
> 
> On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:
> 
> > >> And if tx_queue_length is not power of 2,
> > >> we probably need modulus to calculate the capacity.  
> > > Is that really that important for speed?  
> > 
> > Not sure, I can test.
> 
> In my experience, yes, adding a modulus does affect performance.

How about simple
	if (unlikely(++idx > size))
		idx = 0;


> > 
> > Right, this sounds a good solution.
> 
> Good idea.

I'm not that sure - it's clearly wasting memory.

> -- 
> Best regards,
>   Jesper Dangaard Brouer
>   MSc.CS, Principal Kernel Engineer at Red Hat
>   Author of http://www.iptv-analyzer.org
>   LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  8:13     ` Jesper Dangaard Brouer
@ 2016-05-18  8:23       ` Michael S. Tsirkin
  2016-05-18 10:23       ` Jason Wang
  2016-05-18 16:26       ` Michael S. Tsirkin
  2 siblings, 0 replies; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18  8:23 UTC (permalink / raw)
  To: Jesper Dangaard Brouer
  Cc: Jason Wang, Eric Dumazet, davem, netdev, linux-kernel, Steven Rostedt

On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> On Mon, 16 May 2016 15:51:48 +0800
> Jason Wang <jasowang@redhat.com> wrote:
> 
> > On 2016年05月16日 11:56, Eric Dumazet wrote:
> > > On Mon, 2016-05-16 at 09:17 +0800, Jason Wang wrote:  
> > >> We used to queue tx packets in sk_receive_queue, this is less
> > >> efficient since it requires spinlocks to synchronize between producer
> > >> and consumer.  
> > > ...
> > >  
> > >>   	struct tun_struct *detached;
> > >> +	/* reader lock */
> > >> +	spinlock_t rlock;
> > >> +	unsigned long tail;
> > >> +	struct tun_desc tx_descs[TUN_RING_SIZE];
> > >> +	/* writer lock */
> > >> +	spinlock_t wlock;
> > >> +	unsigned long head;
> > >>   };
> > >>     
> > > Ok, we had these kind of ideas floating around for many other cases,
> > > like qdisc, UDP or af_packet sockets...
> > >
> > > I believe we should have a common set of helpers, not hidden in
> > > drivers/net/tun.c but in net/core/skb_ring.c or something, with more
> > > flexibility (like the number of slots)
> > >  
> > 
> > Yes, this sounds good.
> 
> I agree. It is sad to see everybody is implementing the same thing,
> open coding an array/circular based ring buffer.  This kind of code is
> hard to maintain and get right with barriers etc.  We can achieve the
> same performance with a generic implementation, by inlining the help
> function calls.
> 
> I implemented an array based Lock-Free/cmpxchg based queue, that you
> could be inspired by, see:
>  https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue.h
> 
> The main idea behind my implementation is bulking, to amortize the
> locked cmpxchg operation. You might not need it now, but I expect we
> need it in the future.
> 
> You cannot use my alf_queue directly as your "struct tun_desc" is
> larger than one-pointer (which the alf_queue works with).  But it
> should be possible to extend to handle larger "objects".
> 
> 
> Maybe Steven Rostedt have an even better ring queue implementation
> already avail in the kernel?

BTW at least for tun, index based isn't really needed.
A simple array seems to be more readable, faster and use less memory.
I have implemented this and it seems to work OK, will
post shortly.



> -- 
> Best regards,
>   Jesper Dangaard Brouer
>   MSc.CS, Principal Kernel Engineer at Red Hat
>   Author of http://www.iptv-analyzer.org
>   LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  8:21           ` Michael S. Tsirkin
@ 2016-05-18  9:21             ` Jesper Dangaard Brouer
  2016-05-18  9:55               ` Michael S. Tsirkin
  0 siblings, 1 reply; 21+ messages in thread
From: Jesper Dangaard Brouer @ 2016-05-18  9:21 UTC (permalink / raw)
  To: Michael S. Tsirkin; +Cc: Jason Wang, davem, netdev, linux-kernel, brouer

On Wed, 18 May 2016 11:21:59 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> On Wed, May 18, 2016 at 10:16:31AM +0200, Jesper Dangaard Brouer wrote:
> > 
> > On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:
> >   
> > > >> And if tx_queue_length is not power of 2,
> > > >> we probably need modulus to calculate the capacity.    
> > > > Is that really that important for speed?    
> > > 
> > > Not sure, I can test.  
> > 
> > In my experience, yes, adding a modulus does affect performance.  
> 
> How about simple
> 	if (unlikely(++idx > size))
> 		idx = 0;

So, you are exchanging an AND-operation with a mask, for a
branch-operation.  If the branch predictor is good enough in the CPU
and code-"size" use-case, then I could be just as fast.

I've actually played with a lot of different approaches:
 https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue_helpers.h

I cannot remember the exact results. I do remember micro benchmarking
showed good results with the advanced "unroll" approach, but IPv4
forwarding, where I know I-cache is getting evicted, showed best
results with the more simpler implementations.


> > > 
> > > Right, this sounds a good solution.  
> > 
> > Good idea.  
> 
> I'm not that sure - it's clearly wasting memory.

Rounding up to power of two.  In this case I don't think the memory
wast is too high.  As we are talking about max 16 bytes elements.

I am concerned about memory in another way. We need to keep these
arrays/rings small, due to data cache usage.  A 4096 ring queue is bad
because e.g. 16*4096=65536 bytes, and typical L1 cache is 32K-64K. As
this is a circular buffer, we walk over this memory all the time, thus
evicting the L1 cache.

-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  9:21             ` Jesper Dangaard Brouer
@ 2016-05-18  9:55               ` Michael S. Tsirkin
  2016-05-18 10:42                 ` Jason Wang
  0 siblings, 1 reply; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18  9:55 UTC (permalink / raw)
  To: Jesper Dangaard Brouer; +Cc: Jason Wang, davem, netdev, linux-kernel

On Wed, May 18, 2016 at 11:21:29AM +0200, Jesper Dangaard Brouer wrote:
> On Wed, 18 May 2016 11:21:59 +0300
> "Michael S. Tsirkin" <mst@redhat.com> wrote:
> 
> > On Wed, May 18, 2016 at 10:16:31AM +0200, Jesper Dangaard Brouer wrote:
> > > 
> > > On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:
> > >   
> > > > >> And if tx_queue_length is not power of 2,
> > > > >> we probably need modulus to calculate the capacity.    
> > > > > Is that really that important for speed?    
> > > > 
> > > > Not sure, I can test.  
> > > 
> > > In my experience, yes, adding a modulus does affect performance.  
> > 
> > How about simple
> > 	if (unlikely(++idx > size))
> > 		idx = 0;
> 
> So, you are exchanging an AND-operation with a mask, for a
> branch-operation.  If the branch predictor is good enough in the CPU
> and code-"size" use-case, then I could be just as fast.
> 
> I've actually played with a lot of different approaches:
>  https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue_helpers.h
> 
> I cannot remember the exact results. I do remember micro benchmarking
> showed good results with the advanced "unroll" approach, but IPv4
> forwarding, where I know I-cache is getting evicted, showed best
> results with the more simpler implementations.

This is all assuming you can somehow batch operations.
We can do this for transmit sometimes (when linux
is the source of the packets) but not always.

> 
> > > > 
> > > > Right, this sounds a good solution.  
> > > 
> > > Good idea.  
> > 
> > I'm not that sure - it's clearly wasting memory.
> 
> Rounding up to power of two.  In this case I don't think the memory
> wast is too high.  As we are talking about max 16 bytes elements.

It almost doubles it.
E.g. queue size of 10000 (rather common) will become 16K, wasting 6K.

> I am concerned about memory in another way. We need to keep these
> arrays/rings small, due to data cache usage.  A 4096 ring queue is bad
> because e.g. 16*4096=65536 bytes, and typical L1 cache is 32K-64K. As
> this is a circular buffer, we walk over this memory all the time, thus
> evicting the L1 cache.

Depends on the usage I guess.
Entries pointed to are much bigger, and you are
going to access them - is this really an issue?
If yes this shouldn't be that hard to fix ...

> -- 
> Best regards,
>   Jesper Dangaard Brouer
>   MSc.CS, Principal Kernel Engineer at Red Hat
>   Author of http://www.iptv-analyzer.org
>   LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  8:13     ` Jesper Dangaard Brouer
  2016-05-18  8:23       ` Michael S. Tsirkin
@ 2016-05-18 10:23       ` Jason Wang
  2016-05-18 11:52         ` Steven Rostedt
  2016-05-18 16:26       ` Michael S. Tsirkin
  2 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-05-18 10:23 UTC (permalink / raw)
  To: Jesper Dangaard Brouer
  Cc: Eric Dumazet, davem, mst, netdev, linux-kernel, Steven Rostedt



On 2016年05月18日 16:13, Jesper Dangaard Brouer wrote:
> On Mon, 16 May 2016 15:51:48 +0800
> Jason Wang <jasowang@redhat.com> wrote:
>
>> On 2016年05月16日 11:56, Eric Dumazet wrote:
>>> On Mon, 2016-05-16 at 09:17 +0800, Jason Wang wrote:
>>>> We used to queue tx packets in sk_receive_queue, this is less
>>>> efficient since it requires spinlocks to synchronize between producer
>>>> and consumer.
>>> ...
>>>   
>>>>    	struct tun_struct *detached;
>>>> +	/* reader lock */
>>>> +	spinlock_t rlock;
>>>> +	unsigned long tail;
>>>> +	struct tun_desc tx_descs[TUN_RING_SIZE];
>>>> +	/* writer lock */
>>>> +	spinlock_t wlock;
>>>> +	unsigned long head;
>>>>    };
>>>>      
>>> Ok, we had these kind of ideas floating around for many other cases,
>>> like qdisc, UDP or af_packet sockets...
>>>
>>> I believe we should have a common set of helpers, not hidden in
>>> drivers/net/tun.c but in net/core/skb_ring.c or something, with more
>>> flexibility (like the number of slots)
>>>   
>> Yes, this sounds good.
> I agree. It is sad to see everybody is implementing the same thing,
> open coding an array/circular based ring buffer.  This kind of code is
> hard to maintain and get right with barriers etc.  We can achieve the
> same performance with a generic implementation, by inlining the help
> function calls.
>
> I implemented an array based Lock-Free/cmpxchg based queue, that you
> could be inspired by, see:
>   https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue.h

This looks really interesting, thanks.

>
> The main idea behind my implementation is bulking, to amortize the
> locked cmpxchg operation. You might not need it now, but I expect we
> need it in the future.

Right, we need change APIs which can read or write multiple buffers at 
one time for tun (and for others). I agree this will be a good 
optimization in the future.

>
> You cannot use my alf_queue directly as your "struct tun_desc" is
> larger than one-pointer (which the alf_queue works with).  But it
> should be possible to extend to handle larger "objects".

Yes, and for more generic usage, maybe one more void * is sufficient.

>
>
> Maybe Steven Rostedt have an even better ring queue implementation
> already avail in the kernel?
>

You mean ring buffer in tracing? Not sure, but it looks rather complex 
at first glance.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  9:55               ` Michael S. Tsirkin
@ 2016-05-18 10:42                 ` Jason Wang
  2016-05-18 10:58                   ` Michael S. Tsirkin
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-05-18 10:42 UTC (permalink / raw)
  To: Michael S. Tsirkin, Jesper Dangaard Brouer; +Cc: davem, netdev, linux-kernel



On 2016年05月18日 17:55, Michael S. Tsirkin wrote:
> On Wed, May 18, 2016 at 11:21:29AM +0200, Jesper Dangaard Brouer wrote:
>> On Wed, 18 May 2016 11:21:59 +0300
>> "Michael S. Tsirkin" <mst@redhat.com> wrote:
>>
>>> On Wed, May 18, 2016 at 10:16:31AM +0200, Jesper Dangaard Brouer wrote:
>>>> On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:
>>>>    
>>>>>>> And if tx_queue_length is not power of 2,
>>>>>>> we probably need modulus to calculate the capacity.
>>>>>> Is that really that important for speed?
>>>>> Not sure, I can test.
>>>> In my experience, yes, adding a modulus does affect performance.
>>> How about simple
>>> 	if (unlikely(++idx > size))
>>> 		idx = 0;
>> So, you are exchanging an AND-operation with a mask, for a
>> branch-operation.  If the branch predictor is good enough in the CPU
>> and code-"size" use-case, then I could be just as fast.
>>
>> I've actually played with a lot of different approaches:
>>   https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue_helpers.h
>>
>> I cannot remember the exact results. I do remember micro benchmarking
>> showed good results with the advanced "unroll" approach, but IPv4
>> forwarding, where I know I-cache is getting evicted, showed best
>> results with the more simpler implementations.
> This is all assuming you can somehow batch operations.
> We can do this for transmit sometimes (when linux
> is the source of the packets) but not always.
>
>>>>> Right, this sounds a good solution.
>>>> Good idea.
>>> I'm not that sure - it's clearly wasting memory.
>> Rounding up to power of two.  In this case I don't think the memory
>> wast is too high.  As we are talking about max 16 bytes elements.
> It almost doubles it.
> E.g. queue size of 10000 (rather common) will become 16K, wasting 6K.

It depends on the user, e.g default tx_queue_len is around 1000 for real 
cards. If we really care about the wasting, we can add a threshold and 
fall back to normal linked list during resizing.

>
>> I am concerned about memory in another way. We need to keep these
>> arrays/rings small, due to data cache usage.  A 4096 ring queue is bad
>> because e.g. 16*4096=65536 bytes, and typical L1 cache is 32K-64K. As
>> this is a circular buffer, we walk over this memory all the time, thus
>> evicting the L1 cache.
> Depends on the usage I guess.
> Entries pointed to are much bigger, and you are
> going to access them - is this really an issue?
> If yes this shouldn't be that hard to fix ...
>
>> -- 
>> Best regards,
>>    Jesper Dangaard Brouer
>>    MSc.CS, Principal Kernel Engineer at Red Hat
>>    Author of http://www.iptv-analyzer.org
>>    LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18 10:42                 ` Jason Wang
@ 2016-05-18 10:58                   ` Michael S. Tsirkin
  0 siblings, 0 replies; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18 10:58 UTC (permalink / raw)
  To: Jason Wang; +Cc: Jesper Dangaard Brouer, davem, netdev, linux-kernel

On Wed, May 18, 2016 at 06:42:10PM +0800, Jason Wang wrote:
> 
> 
> On 2016年05月18日 17:55, Michael S. Tsirkin wrote:
> >On Wed, May 18, 2016 at 11:21:29AM +0200, Jesper Dangaard Brouer wrote:
> >>On Wed, 18 May 2016 11:21:59 +0300
> >>"Michael S. Tsirkin" <mst@redhat.com> wrote:
> >>
> >>>On Wed, May 18, 2016 at 10:16:31AM +0200, Jesper Dangaard Brouer wrote:
> >>>>On Tue, 17 May 2016 09:38:37 +0800 Jason Wang <jasowang@redhat.com> wrote:
> >>>>>>>And if tx_queue_length is not power of 2,
> >>>>>>>we probably need modulus to calculate the capacity.
> >>>>>>Is that really that important for speed?
> >>>>>Not sure, I can test.
> >>>>In my experience, yes, adding a modulus does affect performance.
> >>>How about simple
> >>>	if (unlikely(++idx > size))
> >>>		idx = 0;
> >>So, you are exchanging an AND-operation with a mask, for a
> >>branch-operation.  If the branch predictor is good enough in the CPU
> >>and code-"size" use-case, then I could be just as fast.
> >>
> >>I've actually played with a lot of different approaches:
> >>  https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/include/linux/alf_queue_helpers.h
> >>
> >>I cannot remember the exact results. I do remember micro benchmarking
> >>showed good results with the advanced "unroll" approach, but IPv4
> >>forwarding, where I know I-cache is getting evicted, showed best
> >>results with the more simpler implementations.
> >This is all assuming you can somehow batch operations.
> >We can do this for transmit sometimes (when linux
> >is the source of the packets) but not always.
> >
> >>>>>Right, this sounds a good solution.
> >>>>Good idea.
> >>>I'm not that sure - it's clearly wasting memory.
> >>Rounding up to power of two.  In this case I don't think the memory
> >>wast is too high.  As we are talking about max 16 bytes elements.
> >It almost doubles it.
> >E.g. queue size of 10000 (rather common) will become 16K, wasting 6K.
> 
> It depends on the user, e.g default tx_queue_len is around 1000 for real
> cards. If we really care about the wasting, we can add a threshold and fall
> back to normal linked list during resizing.

That looks like a lot of complexity.

> >
> >>I am concerned about memory in another way. We need to keep these
> >>arrays/rings small, due to data cache usage.  A 4096 ring queue is bad
> >>because e.g. 16*4096=65536 bytes, and typical L1 cache is 32K-64K. As
> >>this is a circular buffer, we walk over this memory all the time, thus
> >>evicting the L1 cache.
> >Depends on the usage I guess.
> >Entries pointed to are much bigger, and you are
> >going to access them - is this really an issue?
> >If yes this shouldn't be that hard to fix ...
> >
> >>-- 
> >>Best regards,
> >>   Jesper Dangaard Brouer
> >>   MSc.CS, Principal Kernel Engineer at Red Hat
> >>   Author of http://www.iptv-analyzer.org
> >>   LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18 10:23       ` Jason Wang
@ 2016-05-18 11:52         ` Steven Rostedt
  0 siblings, 0 replies; 21+ messages in thread
From: Steven Rostedt @ 2016-05-18 11:52 UTC (permalink / raw)
  To: Jason Wang
  Cc: Jesper Dangaard Brouer, Eric Dumazet, davem, mst, netdev,
	linux-kernel, Peter Zijlstra

On Wed, 18 May 2016 18:23:48 +0800
Jason Wang <jasowang@redhat.com> wrote:

>
> >
> >
> > Maybe Steven Rostedt have an even better ring queue implementation
> > already avail in the kernel?
> >  
> 
> You mean ring buffer in tracing? Not sure, but it looks rather complex 
> at first glance.

Yes it is, and I'm not sure it would be appropriate here or not. The
tracing ring buffer is highly tuned to be lockless and per cpu
(allocates a buffer per cpu), it also does not need to disable
interrupts and can be used in NMI context.

The complexity comes from being able to locklessly swap out pages from
the ring buffer to send across the network or to disk while a record is
happening.

Perf has a simpler ring buffer made for mmapping, but it's still rather
coupled with the perf core. It would be nice to make that ring buffer a
bit more generic and not so tied to perf itself.

-- Steve

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18  8:13     ` Jesper Dangaard Brouer
  2016-05-18  8:23       ` Michael S. Tsirkin
  2016-05-18 10:23       ` Jason Wang
@ 2016-05-18 16:26       ` Michael S. Tsirkin
  2016-05-18 16:41         ` Eric Dumazet
  2016-05-19 11:59         ` Jesper Dangaard Brouer
  2 siblings, 2 replies; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18 16:26 UTC (permalink / raw)
  To: Jesper Dangaard Brouer
  Cc: Jason Wang, Eric Dumazet, davem, netdev, linux-kernel, Steven Rostedt

On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> I agree. It is sad to see everybody is implementing the same thing,
> open coding an array/circular based ring buffer.  This kind of code is
> hard to maintain and get right with barriers etc.  We can achieve the
> same performance with a generic implementation, by inlining the help
> function calls.

So my testing seems to show that at least for the common usecase
in networking, which isn't lockless, circular buffer
with indices does not perform that well, because
each index access causes a cache line to bounce between
CPUs, and index access causes stalls due to the dependency.

By comparison, an array of pointers where NULL means invalid
and !NULL means valid, can be updated without messing up barriers
at all and does not have this issue.

You also mentioned cache pressure caused by using large queues, and I
think it's a significant issue. tun has a queue of 1000 entries by
default and that's 8K already.

So, I had an idea: with an array of pointers we could actually use
only part of the ring as long as it stays mostly empty.
We do want to fill at least two cache lines to prevent producer
and consumer from writing over the same cache line all the time.
This is SKB_ARRAY_MIN_SIZE logic below.

Pls take a look at the implementation below.  It's a straight port from virtio
unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
I added.  Today I run out of time for testing this.  Posting for early
flames/feedback.

It's using skb pointers but we switching to void * would be easy at cost
of type safety, though it appears that people want lockless  push
etc so I'm not sure of the value.

--->
skb_array: array based FIFO for skbs

A simple array based FIFO of pointers.
Intended for net stack so uses skbs for type
safety, but we can replace with with void *
if others find it useful outside of net stack.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

---

diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
new file mode 100644
index 0000000..a67cc8b
--- /dev/null
+++ b/include/linux/skb_array.h
@@ -0,0 +1,116 @@
+/*
+ * See Documentation/skbular-buffers.txt for more information.
+ */
+
+#ifndef _LINUX_SKB_ARRAY_H
+#define _LINUX_SKB_ARRAY_H 1
+
+#include <linux/spinlock.h>
+#include <linux/cache.h>
+#include <linux/types.h>
+#include <linux/compiler.h>
+#include <linux/cache.h>
+#include <linux/slab.h>
+#include <asm/errno.h>
+
+struct sk_buff;
+
+struct skb_array {
+	int producer ____cacheline_aligned_in_smp;
+	spinlock_t producer_lock;
+	int consumer ____cacheline_aligned_in_smp;
+	spinlock_t consumer_lock;
+	/* Shared consumer/producer data */
+	int size ____cacheline_aligned_in_smp; /* max entries in queue */
+	struct sk_buff **queue;
+};
+
+#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
+			    sizeof (struct sk_buff *))
+
+static inline int __skb_array_produce(struct skb_array *a,
+				       struct sk_buff *skb)
+{
+	/* Try to start from beginning: good for cache utilization as we'll
+	 * keep reusing the same cache line.
+	 * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
+	 * to reduce bouncing cache lines between them.
+	 */
+	if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])
+		a->producer = 0;
+	if (a->queue[a->producer])
+		return -ENOSPC;
+	a->queue[a->producer] = skb;
+	if (unlikely(++a->producer > a->size))
+		a->producer = 0;
+	return 0;
+}
+
+static inline int skb_array_produce_bh(struct skb_array *a,
+				       struct sk_buff *skb)
+{
+	int ret;
+
+	spin_lock_bh(&a->producer_lock);
+	ret = __skb_array_produce(a, skb);
+	spin_unlock_bh(&a->producer_lock);
+
+	return ret;
+}
+
+static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
+{
+	if (a->queue[a->consumer])
+		return a->queue[a->consumer];
+
+	/* Check whether producer started at the beginning. */
+	if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
+		a->consumer = 0;
+		return a->queue[0];
+	}
+
+	return NULL;
+}
+
+static inline void __skb_array_consume(struct skb_array *a)
+{
+	a->queue[a->consumer++] = NULL;
+	if (unlikely(++a->consumer > a->size))
+		a->consumer = 0;
+}
+
+static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
+{
+	struct sk_buff *skb;
+
+	spin_lock_bh(&a->producer_lock);
+	skb = __skb_array_peek(a);
+	if (skb)
+		__skb_array_consume(a);
+	spin_unlock_bh(&a->producer_lock);
+
+	return skb;
+}
+
+static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
+{
+	a->queue = kmalloc(ALIGN(size * sizeof *(a->queue), SMP_CACHE_BYTES),
+			   gfp);
+	if (!a->queue)
+		return -ENOMEM;
+
+	a->size = size;
+	a->producer = a->consumer = 0;
+	spin_lock_init(&a->producer_lock);
+	spin_lock_init(&a->consumer_lock);
+
+	return 0;
+}
+
+static inline void skb_array_cleanup(struct skb_array *a)
+{
+	kfree(a->queue);
+}
+
+#endif /* _LINUX_SKB_ARRAY_H  */

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18 16:26       ` Michael S. Tsirkin
@ 2016-05-18 16:41         ` Eric Dumazet
  2016-05-18 16:46           ` Michael S. Tsirkin
  2016-05-19 11:59         ` Jesper Dangaard Brouer
  1 sibling, 1 reply; 21+ messages in thread
From: Eric Dumazet @ 2016-05-18 16:41 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: Jesper Dangaard Brouer, Jason Wang, davem, netdev, linux-kernel,
	Steven Rostedt

On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote:
> On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > I agree. It is sad to see everybody is implementing the same thing,
> > open coding an array/circular based ring buffer.  This kind of code is
> > hard to maintain and get right with barriers etc.  We can achieve the
> > same performance with a generic implementation, by inlining the help
> > function calls.
> 
> So my testing seems to show that at least for the common usecase
> in networking, which isn't lockless, circular buffer
> with indices does not perform that well, because
> each index access causes a cache line to bounce between
> CPUs, and index access causes stalls due to the dependency.


Yes.

> 
> By comparison, an array of pointers where NULL means invalid
> and !NULL means valid, can be updated without messing up barriers
> at all and does not have this issue.

Right but then you need appropriate barriers.

> 
> You also mentioned cache pressure caused by using large queues, and I
> think it's a significant issue. tun has a queue of 1000 entries by
> default and that's 8K already.
> 
> So, I had an idea: with an array of pointers we could actually use
> only part of the ring as long as it stays mostly empty.
> We do want to fill at least two cache lines to prevent producer
> and consumer from writing over the same cache line all the time.
> This is SKB_ARRAY_MIN_SIZE logic below.
> 
> Pls take a look at the implementation below.  It's a straight port from virtio
> unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
> I added.  Today I run out of time for testing this.  Posting for early
> flames/feedback.
> 
> It's using skb pointers but we switching to void * would be easy at cost
> of type safety, though it appears that people want lockless  push
> etc so I'm not sure of the value.
> 
> --->
> skb_array: array based FIFO for skbs
> 
> A simple array based FIFO of pointers.
> Intended for net stack so uses skbs for type
> safety, but we can replace with with void *
> if others find it useful outside of net stack.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> 
> ---
> 
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> new file mode 100644
> index 0000000..a67cc8b
> --- /dev/null
> +++ b/include/linux/skb_array.h
> @@ -0,0 +1,116 @@
> +/*
> + * See Documentation/skbular-buffers.txt for more information.
> + */
> +
> +#ifndef _LINUX_SKB_ARRAY_H
> +#define _LINUX_SKB_ARRAY_H 1
> +
> +#include <linux/spinlock.h>
> +#include <linux/cache.h>
> +#include <linux/types.h>
> +#include <linux/compiler.h>
> +#include <linux/cache.h>
> +#include <linux/slab.h>
> +#include <asm/errno.h>
> +
> +struct sk_buff;
> +
> +struct skb_array {
> +	int producer ____cacheline_aligned_in_smp;
> +	spinlock_t producer_lock;
> +	int consumer ____cacheline_aligned_in_smp;
> +	spinlock_t consumer_lock;
> +	/* Shared consumer/producer data */
> +	int size ____cacheline_aligned_in_smp; /* max entries in queue */
> +	struct sk_buff **queue;
> +};
> +
> +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> +			    sizeof (struct sk_buff *))
> +
> +static inline int __skb_array_produce(struct skb_array *a,
> +				       struct sk_buff *skb)
> +{
> +	/* Try to start from beginning: good for cache utilization as we'll
> +	 * keep reusing the same cache line.
> +	 * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> +	 * to reduce bouncing cache lines between them.
> +	 */
> +	if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])

a->queue[0] might be set by consumer, you probably need a barrier.

> +		a->producer = 0;
> +	if (a->queue[a->producer])
> +		return -ENOSPC;
> +	a->queue[a->producer] = skb;
> +	if (unlikely(++a->producer > a->size))
> +		a->producer = 0;
> +	return 0;
> +}
> +
> +static inline int skb_array_produce_bh(struct skb_array *a,
> +				       struct sk_buff *skb)
> +{
> +	int ret;
> +
> +	spin_lock_bh(&a->producer_lock);
> +	ret = __skb_array_produce(a, skb);
> +	spin_unlock_bh(&a->producer_lock);
> +
> +	return ret;
> +}
> +
> +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> +{
> +	if (a->queue[a->consumer])
> +		return a->queue[a->consumer];
> +
> +	/* Check whether producer started at the beginning. */
> +	if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> +		a->consumer = 0;
> +		return a->queue[0];
> +	}
> +
> +	return NULL;
> +}
> +
> +static inline void __skb_array_consume(struct skb_array *a)
> +{
> +	a->queue[a->consumer++] = NULL;
> +	if (unlikely(++a->consumer > a->size))

a->consumer is incremented twice ?

> +		a->consumer = 0;
> +}
> +

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18 16:41         ` Eric Dumazet
@ 2016-05-18 16:46           ` Michael S. Tsirkin
  0 siblings, 0 replies; 21+ messages in thread
From: Michael S. Tsirkin @ 2016-05-18 16:46 UTC (permalink / raw)
  To: Eric Dumazet
  Cc: Jesper Dangaard Brouer, Jason Wang, davem, netdev, linux-kernel,
	Steven Rostedt

On Wed, May 18, 2016 at 09:41:03AM -0700, Eric Dumazet wrote:
> On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote:
> > On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > > I agree. It is sad to see everybody is implementing the same thing,
> > > open coding an array/circular based ring buffer.  This kind of code is
> > > hard to maintain and get right with barriers etc.  We can achieve the
> > > same performance with a generic implementation, by inlining the help
> > > function calls.
> > 
> > So my testing seems to show that at least for the common usecase
> > in networking, which isn't lockless, circular buffer
> > with indices does not perform that well, because
> > each index access causes a cache line to bounce between
> > CPUs, and index access causes stalls due to the dependency.
> 
> 
> Yes.
> 
> > 
> > By comparison, an array of pointers where NULL means invalid
> > and !NULL means valid, can be updated without messing up barriers
> > at all and does not have this issue.
> 
> Right but then you need appropriate barriers.
> 
> > 
> > You also mentioned cache pressure caused by using large queues, and I
> > think it's a significant issue. tun has a queue of 1000 entries by
> > default and that's 8K already.
> > 
> > So, I had an idea: with an array of pointers we could actually use
> > only part of the ring as long as it stays mostly empty.
> > We do want to fill at least two cache lines to prevent producer
> > and consumer from writing over the same cache line all the time.
> > This is SKB_ARRAY_MIN_SIZE logic below.
> > 
> > Pls take a look at the implementation below.  It's a straight port from virtio
> > unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
> > I added.  Today I run out of time for testing this.  Posting for early
> > flames/feedback.
> > 
> > It's using skb pointers but we switching to void * would be easy at cost
> > of type safety, though it appears that people want lockless  push
> > etc so I'm not sure of the value.
> > 
> > --->
> > skb_array: array based FIFO for skbs
> > 
> > A simple array based FIFO of pointers.
> > Intended for net stack so uses skbs for type
> > safety, but we can replace with with void *
> > if others find it useful outside of net stack.
> > 
> > Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> > 
> > ---
> > 
> > diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> > new file mode 100644
> > index 0000000..a67cc8b
> > --- /dev/null
> > +++ b/include/linux/skb_array.h
> > @@ -0,0 +1,116 @@
> > +/*
> > + * See Documentation/skbular-buffers.txt for more information.
> > + */
> > +
> > +#ifndef _LINUX_SKB_ARRAY_H
> > +#define _LINUX_SKB_ARRAY_H 1
> > +
> > +#include <linux/spinlock.h>
> > +#include <linux/cache.h>
> > +#include <linux/types.h>
> > +#include <linux/compiler.h>
> > +#include <linux/cache.h>
> > +#include <linux/slab.h>
> > +#include <asm/errno.h>
> > +
> > +struct sk_buff;
> > +
> > +struct skb_array {
> > +	int producer ____cacheline_aligned_in_smp;
> > +	spinlock_t producer_lock;
> > +	int consumer ____cacheline_aligned_in_smp;
> > +	spinlock_t consumer_lock;
> > +	/* Shared consumer/producer data */
> > +	int size ____cacheline_aligned_in_smp; /* max entries in queue */
> > +	struct sk_buff **queue;
> > +};
> > +
> > +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> > +			    sizeof (struct sk_buff *))
> > +
> > +static inline int __skb_array_produce(struct skb_array *a,
> > +				       struct sk_buff *skb)
> > +{
> > +	/* Try to start from beginning: good for cache utilization as we'll
> > +	 * keep reusing the same cache line.
> > +	 * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> > +	 * to reduce bouncing cache lines between them.
> > +	 */
> > +	if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])
> 
> a->queue[0] might be set by consumer, you probably need a barrier.

I think not - we write to the same place below and two accesses to
same address are never reordered.

> > +		a->producer = 0;
> > +	if (a->queue[a->producer])
> > +		return -ENOSPC;
> > +	a->queue[a->producer] = skb;
> > +	if (unlikely(++a->producer > a->size))
> > +		a->producer = 0;
> > +	return 0;
> > +}
> > +
> > +static inline int skb_array_produce_bh(struct skb_array *a,
> > +				       struct sk_buff *skb)
> > +{
> > +	int ret;
> > +
> > +	spin_lock_bh(&a->producer_lock);
> > +	ret = __skb_array_produce(a, skb);
> > +	spin_unlock_bh(&a->producer_lock);
> > +
> > +	return ret;
> > +}
> > +
> > +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> > +{
> > +	if (a->queue[a->consumer])
> > +		return a->queue[a->consumer];
> > +
> > +	/* Check whether producer started at the beginning. */
> > +	if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> > +		a->consumer = 0;
> > +		return a->queue[0];
> > +	}
> > +
> > +	return NULL;
> > +}
> > +
> > +static inline void __skb_array_consume(struct skb_array *a)
> > +{
> > +	a->queue[a->consumer++] = NULL;
> > +	if (unlikely(++a->consumer > a->size))
> 
> a->consumer is incremented twice ?

Oops.

> > +		a->consumer = 0;
> > +}
> > +
> 
> 
> 

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH net-next] tuntap: introduce tx skb ring
  2016-05-18 16:26       ` Michael S. Tsirkin
  2016-05-18 16:41         ` Eric Dumazet
@ 2016-05-19 11:59         ` Jesper Dangaard Brouer
  1 sibling, 0 replies; 21+ messages in thread
From: Jesper Dangaard Brouer @ 2016-05-19 11:59 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: Jason Wang, Eric Dumazet, davem, netdev, linux-kernel,
	Steven Rostedt, brouer

On Wed, 18 May 2016 19:26:38 +0300
"Michael S. Tsirkin" <mst@redhat.com> wrote:

> On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > I agree. It is sad to see everybody is implementing the same thing,
> > open coding an array/circular based ring buffer.  This kind of code is
> > hard to maintain and get right with barriers etc.  We can achieve the
> > same performance with a generic implementation, by inlining the help
> > function calls.  
> 
> So my testing seems to show that at least for the common usecase
> in networking, which isn't lockless, circular buffer
> with indices does not perform that well, because
> each index access causes a cache line to bounce between
> CPUs, and index access causes stalls due to the dependency.

Yes, I have also noticed this.

For example my qmempool implementation which is based on alf_queue,
does not scale perfectly, likely because of this.

Concurrency benchmark:
 https://github.com/netoptimizer/prototype-kernel/blob/master/kernel/mm/qmempool_bench_parallel.c

for N in $(seq 1 8); do modprobe qmempool_bench_parallel parallel_cpus=$N run_flags=$((2#1000)) ; rmmod qmempool_bench_parallel && dmesg | tail -n 6 | grep parallel_qmempool_pattern_softirq_inline | awk '{print "Qmempool parallel "$8," ",$5," ",$6," ",$7}'; done 

Qmempool parallel CPUs:1   Average:   25   cycles(tsc)
Qmempool parallel CPUs:2   Average:   54   cycles(tsc)
Qmempool parallel CPUs:3   Average:   68   cycles(tsc)
Qmempool parallel CPUs:4   Average:   98   cycles(tsc)
Qmempool parallel CPUs:5   Average:   112   cycles(tsc)
Qmempool parallel CPUs:6   Average:   136   cycles(tsc)
Qmempool parallel CPUs:7   Average:   168   cycles(tsc)
Qmempool parallel CPUs:8   Average:   222   cycles(tsc)

The test above  does 1024 allocs followed by 1024 frees, to a qmempool,
which will cache 64 objects locally before accessing the shared
alf_queue pool (func run_bench_N_pattern_qmempool).


> By comparison, an array of pointers where NULL means invalid
> and !NULL means valid, can be updated without messing up barriers
> at all and does not have this issue.

We should verify this by benchmarking.  Once you have fixed the bug
Eric pointed out, I can try to benchmark this for you...


> You also mentioned cache pressure caused by using large queues, and I
> think it's a significant issue. tun has a queue of 1000 entries by
> default and that's 8K already.
> 
> So, I had an idea: with an array of pointers we could actually use
> only part of the ring as long as it stays mostly empty.
> We do want to fill at least two cache lines to prevent producer
> and consumer from writing over the same cache line all the time.
> This is SKB_ARRAY_MIN_SIZE logic below.

I really like this idea.  The only problem is that performance
characteristics will change according to load, which makes it harder to
benchmark, and verify that both situations are covered.  I guess, in a
micro-benchmark we could make sure that be cover both cases.  In
real-life scenarios it might be harder...


> Pls take a look at the implementation below.  It's a straight port from virtio
> unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
> I added.  Today I run out of time for testing this.  Posting for early
> flames/feedback.
> 
> It's using skb pointers but we switching to void * would be easy at cost
> of type safety, though it appears that people want lockless  push
> etc so I'm not sure of the value.
> 
> --->  
> skb_array: array based FIFO for skbs
> 
> A simple array based FIFO of pointers.
> Intended for net stack so uses skbs for type
> safety, but we can replace with with void *
> if others find it useful outside of net stack.
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> 
> ---
> 
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> new file mode 100644
> index 0000000..a67cc8b
> --- /dev/null
> +++ b/include/linux/skb_array.h
> @@ -0,0 +1,116 @@
> +/*
> + * See Documentation/skbular-buffers.txt for more information.
> + */
> +
> +#ifndef _LINUX_SKB_ARRAY_H
> +#define _LINUX_SKB_ARRAY_H 1
> +
> +#include <linux/spinlock.h>
> +#include <linux/cache.h>
> +#include <linux/types.h>
> +#include <linux/compiler.h>
> +#include <linux/cache.h>
> +#include <linux/slab.h>
> +#include <asm/errno.h>
> +
> +struct sk_buff;
> +
> +struct skb_array {
> +	int producer ____cacheline_aligned_in_smp;
> +	spinlock_t producer_lock;
> +	int consumer ____cacheline_aligned_in_smp;
> +	spinlock_t consumer_lock;
> +	/* Shared consumer/producer data */
> +	int size ____cacheline_aligned_in_smp; /* max entries in queue */
> +	struct sk_buff **queue;
> +};
> +
> +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> +			    sizeof (struct sk_buff *))
> +
> +static inline int __skb_array_produce(struct skb_array *a,
> +				       struct sk_buff *skb)
> +{
> +	/* Try to start from beginning: good for cache utilization as we'll
> +	 * keep reusing the same cache line.
> +	 * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> +	 * to reduce bouncing cache lines between them.
> +	 */
> +	if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])
> +		a->producer = 0;
> +	if (a->queue[a->producer])
> +		return -ENOSPC;
> +	a->queue[a->producer] = skb;
> +	if (unlikely(++a->producer > a->size))
> +		a->producer = 0;
> +	return 0;
> +}
> +
> +static inline int skb_array_produce_bh(struct skb_array *a,
> +				       struct sk_buff *skb)
> +{
> +	int ret;
> +
> +	spin_lock_bh(&a->producer_lock);
> +	ret = __skb_array_produce(a, skb);
> +	spin_unlock_bh(&a->producer_lock);
> +
> +	return ret;
> +}
> +
> +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> +{
> +	if (a->queue[a->consumer])
> +		return a->queue[a->consumer];
> +
> +	/* Check whether producer started at the beginning. */
> +	if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> +		a->consumer = 0;
> +		return a->queue[0];
> +	}
> +
> +	return NULL;
> +}
> +
> +static inline void __skb_array_consume(struct skb_array *a)
> +{
> +	a->queue[a->consumer++] = NULL;
> +	if (unlikely(++a->consumer > a->size))
> +		a->consumer = 0;
> +}
> +
> +static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
> +{
> +	struct sk_buff *skb;
> +
> +	spin_lock_bh(&a->producer_lock);
> +	skb = __skb_array_peek(a);
> +	if (skb)
> +		__skb_array_consume(a);
> +	spin_unlock_bh(&a->producer_lock);
> +
> +	return skb;
> +}
> +
> +static inline int skb_array_init(struct skb_array *a, int size, gfp_t gfp)
> +{
> +	a->queue = kmalloc(ALIGN(size * sizeof *(a->queue), SMP_CACHE_BYTES),
> +			   gfp);
> +	if (!a->queue)
> +		return -ENOMEM;
> +
> +	a->size = size;
> +	a->producer = a->consumer = 0;
> +	spin_lock_init(&a->producer_lock);
> +	spin_lock_init(&a->consumer_lock);
> +
> +	return 0;
> +}
> +
> +static inline void skb_array_cleanup(struct skb_array *a)
> +{
> +	kfree(a->queue);
> +}
> +
> +#endif /* _LINUX_SKB_ARRAY_H  */



-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  Author of http://www.iptv-analyzer.org
  LinkedIn: http://www.linkedin.com/in/brouer

^ permalink raw reply	[flat|nested] 21+ messages in thread

end of thread, other threads:[~2016-05-19 11:59 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-05-16  1:17 [PATCH net-next] tuntap: introduce tx skb ring Jason Wang
2016-05-16  3:56 ` Eric Dumazet
2016-05-16  7:51   ` Jason Wang
2016-05-18  8:13     ` Jesper Dangaard Brouer
2016-05-18  8:23       ` Michael S. Tsirkin
2016-05-18 10:23       ` Jason Wang
2016-05-18 11:52         ` Steven Rostedt
2016-05-18 16:26       ` Michael S. Tsirkin
2016-05-18 16:41         ` Eric Dumazet
2016-05-18 16:46           ` Michael S. Tsirkin
2016-05-19 11:59         ` Jesper Dangaard Brouer
2016-05-16  4:23 ` Michael S. Tsirkin
2016-05-16  7:52   ` Jason Wang
2016-05-16  8:08     ` Michael S. Tsirkin
2016-05-17  1:38       ` Jason Wang
2016-05-18  8:16         ` Jesper Dangaard Brouer
2016-05-18  8:21           ` Michael S. Tsirkin
2016-05-18  9:21             ` Jesper Dangaard Brouer
2016-05-18  9:55               ` Michael S. Tsirkin
2016-05-18 10:42                 ` Jason Wang
2016-05-18 10:58                   ` Michael S. Tsirkin

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).