From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753102AbcEPHwS (ORCPT ); Mon, 16 May 2016 03:52:18 -0400 Received: from mx1.redhat.com ([209.132.183.28]:38635 "EHLO mx1.redhat.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752542AbcEPHwQ (ORCPT ); Mon, 16 May 2016 03:52:16 -0400 Subject: Re: [PATCH net-next] tuntap: introduce tx skb ring To: "Michael S. Tsirkin" References: <1463361421-4397-1-git-send-email-jasowang@redhat.com> <20160516070012-mutt-send-email-mst@redhat.com> Cc: davem@davemloft.net, netdev@vger.kernel.org, linux-kernel@vger.kernel.org From: Jason Wang Message-ID: <57397C2B.7000603@redhat.com> Date: Mon, 16 May 2016 15:52:11 +0800 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.7.2 MIME-Version: 1.0 In-Reply-To: <20160516070012-mutt-send-email-mst@redhat.com> Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: 8bit X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-4.5.16 (mx1.redhat.com [10.5.110.31]); Mon, 16 May 2016 07:52:16 +0000 (UTC) Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org On 2016年05月16日 12:23, Michael S. Tsirkin wrote: > On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote: >> We used to queue tx packets in sk_receive_queue, this is less >> efficient since it requires spinlocks to synchronize between producer >> and consumer. >> >> This patch tries to address this by using circular buffer which allows >> lockless synchronization. This is done by switching from >> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when >> this is set: > Why do we need a new flag? Is there a userspace-visible > behaviour change? Probably yes since tx_queue_length does not work. > >> - store pointer to skb in circular buffer in tun_net_xmit(), and read >> it from the circular buffer in tun_do_read(). 
>> - introduce a new proto_ops peek which could be implemented by >> specific socket which does not use sk_receive_queue. >> - store skb length in circular buffer too, and implement a lockless >> peek for tuntap. >> - change vhost_net to use proto_ops->peek() instead >> - new spinlocks were introduced to synchronize among producers (and so >> did for consumers). >> >> Pktgen test shows about 9% improvement on guest receiving pps: >> >> Before: ~1480000pps >> After : ~1610000pps >> >> (I'm not sure noblocking read is still needed, so it was not included >> in this patch) > How do you mean? Of course we must support blocking and non-blocking > read - userspace uses it. Ok, will add this. > >> Signed-off-by: Jason Wang >> --- >> --- >> drivers/net/tun.c | 157 +++++++++++++++++++++++++++++++++++++++++--- >> drivers/vhost/net.c | 16 ++++- >> include/linux/net.h | 1 + >> include/uapi/linux/if_tun.h | 1 + >> 4 files changed, 165 insertions(+), 10 deletions(-) >> >> diff --git a/drivers/net/tun.c b/drivers/net/tun.c >> index 425e983..6001ece 100644 >> --- a/drivers/net/tun.c >> +++ b/drivers/net/tun.c >> @@ -71,6 +71,7 @@ >> #include >> #include >> #include >> +#include >> >> #include >> >> @@ -130,6 +131,8 @@ struct tap_filter { >> #define MAX_TAP_FLOWS 4096 >> >> #define TUN_FLOW_EXPIRE (3 * HZ) >> +#define TUN_RING_SIZE 256 > Can we resize this according to tx_queue_len set by user? We can, but it needs lots of other changes, e.g. being notified when tx_queue_len is changed by the user. And if tx_queue_len is not a power of 2, we would probably need a modulus to calculate the capacity. > >> +#define TUN_RING_MASK (TUN_RING_SIZE - 1) >> >> struct tun_pcpu_stats { >> u64 rx_packets; >> @@ -142,6 +145,11 @@ struct tun_pcpu_stats { >> u32 rx_frame_errors; >> }; >> >> +struct tun_desc { >> + struct sk_buff *skb; >> + int len; /* Cached skb len for peeking */ >> +}; >> + >> /* A tun_file connects an open character device to a tuntap netdevice. 
It >> * also contains all socket related structures (except sock_fprog and tap_filter) >> * to serve as one transmit queue for tuntap device. The sock_fprog and >> @@ -167,6 +175,13 @@ struct tun_file { >> }; >> struct list_head next; >> struct tun_struct *detached; >> + /* reader lock */ >> + spinlock_t rlock; >> + unsigned long tail; >> + struct tun_desc tx_descs[TUN_RING_SIZE]; >> + /* writer lock */ >> + spinlock_t wlock; >> + unsigned long head; >> }; >> >> struct tun_flow_entry { >> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile) >> >> static void tun_queue_purge(struct tun_file *tfile) >> { >> + unsigned long head, tail; >> + struct tun_desc *desc; >> + struct sk_buff *skb; >> skb_queue_purge(&tfile->sk.sk_receive_queue); >> + spin_lock(&tfile->rlock); >> + >> + head = ACCESS_ONCE(tfile->head); >> + tail = tfile->tail; >> + >> + /* read tail before reading descriptor at tail */ >> + smp_rmb(); > I think you mean read *head* here Right. > > >> + >> + while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) { >> + desc = &tfile->tx_descs[tail]; >> + skb = desc->skb; >> + kfree_skb(skb); >> + tail = (tail + 1) & TUN_RING_MASK; >> + /* read descriptor before incrementing tail. */ >> + smp_store_release(&tfile->tail, tail & TUN_RING_MASK); >> + } >> + spin_unlock(&tfile->rlock); >> skb_queue_purge(&tfile->sk.sk_error_queue); >> } >> > Barrier pairing seems messed up. Could you tag > each barrier with its pair pls? > E.g. add /* Barrier A for pairing */ Before barrier and > its pair. Ok. for both tun_queue_purge() and tun_do_read(): smp_rmb() is paired with smp_store_release() in tun_net_xmit(). smp_store_release() is paired with spin_unlock()/spin_lock() in tun_net_xmit(). 
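The pairing described above can be modeled as a userspace single-producer/single-consumer ring. This is a sketch, not the patch's code: C11 release/acquire atomics stand in for smp_store_release()/smp_rmb() and the spinlock-provided ordering, the CIRC_CNT()/CIRC_SPACE() macros are re-derived from include/linux/circ_buf.h, and all other names are invented for illustration:

```c
#include <stdatomic.h>
#include <stddef.h>

#define RING_SIZE 256
#define RING_MASK (RING_SIZE - 1)

/* Userspace stand-ins for the kernel's CIRC_CNT/CIRC_SPACE macros. */
#define CIRC_CNT(head, tail, size)   (((head) - (tail)) & ((size) - 1))
#define CIRC_SPACE(head, tail, size) CIRC_CNT((tail), ((head) + 1), (size))

struct ring {
    void *slot[RING_SIZE];
    _Atomic unsigned long head;   /* written only by the producer */
    _Atomic unsigned long tail;   /* written only by the consumer */
};

/* Producer: returns 0 on success, -1 if full (one slot stays empty). */
static int ring_produce(struct ring *r, void *item)
{
    unsigned long head = atomic_load_explicit(&r->head, memory_order_relaxed);
    unsigned long tail = atomic_load_explicit(&r->tail, memory_order_acquire);

    if (CIRC_SPACE(head, tail, RING_SIZE) < 1)
        return -1;
    r->slot[head] = item;
    /* Release pairs with the acquire load of head in ring_consume():
     * the slot write must be visible before the new head. */
    atomic_store_explicit(&r->head, (head + 1) & RING_MASK,
                          memory_order_release);
    return 0;
}

/* Consumer: returns the oldest item, or NULL if the ring is empty. */
static void *ring_consume(struct ring *r)
{
    unsigned long tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    unsigned long head = atomic_load_explicit(&r->head, memory_order_acquire);
    void *item;

    if (CIRC_CNT(head, tail, RING_SIZE) < 1)
        return NULL;
    item = r->slot[tail];
    /* Release pairs with the acquire load of tail in ring_produce():
     * the slot read must complete before the slot is reusable. */
    atomic_store_explicit(&r->tail, (tail + 1) & RING_MASK,
                          memory_order_release);
    return item;
}
```

The one-slot-kept-empty convention is why CIRC_SPACE() compares against head + 1: a full ring holds RING_SIZE - 1 entries, so head == tail always means empty, never full.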
> >> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev) >> int txq = skb->queue_mapping; >> struct tun_file *tfile; >> u32 numqueues = 0; >> + unsigned long flags; >> >> rcu_read_lock(); >> tfile = rcu_dereference(tun->tfiles[txq]); >> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev) >> >> nf_reset(skb); >> >> - /* Enqueue packet */ >> - skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb); >> + if (tun->flags & IFF_TX_RING) { >> + unsigned long head, tail; >> + >> + spin_lock_irqsave(&tfile->wlock, flags); >> + >> + head = tfile->head; >> + tail = ACCESS_ONCE(tfile->tail); > this should be acquire Consider that there's always one slot left empty, so we would need to produce two skbs here before we could corrupt the consumer. So the spin_unlock()/spin_lock() provides ordering here? > >> + >> + if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) { >> + struct tun_desc *desc = &tfile->tx_descs[head]; >> + >> + desc->skb = skb; >> + desc->len = skb->len; >> + if (skb_vlan_tag_present(skb)) >> + desc->len += VLAN_HLEN; >> + >> + /* read descriptor before incrementing head. */ > write descriptor. Right. > so smp_wmb is enough. I thought smp_store_release() was more lightweight than smp_wmb()? 
> >> + smp_store_release(&tfile->head, >> + (head + 1) & TUN_RING_MASK); >> + } else { >> + spin_unlock_irqrestore(&tfile->wlock, flags); >> + goto drop; >> + } >> + >> + spin_unlock_irqrestore(&tfile->wlock, flags); >> + } else { >> + /* Enqueue packet */ >> + skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb); >> + } >> >> /* Notify and wake up reader process */ >> if (tfile->flags & TUN_FASYNC) >> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev) >> } >> } >> >> +static bool tun_queue_not_empty(struct tun_file *tfile) >> +{ >> + struct sock *sk = tfile->socket.sk; >> + >> + return (!skb_queue_empty(&sk->sk_receive_queue) || >> + ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail)); >> +} >> /* Character device part */ >> >> /* Poll */ >> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait) >> >> poll_wait(file, sk_sleep(sk), wait); >> >> - if (!skb_queue_empty(&sk->sk_receive_queue)) >> + if (tun_queue_not_empty(tfile)) >> mask |= POLLIN | POLLRDNORM; >> >> if (sock_writeable(sk) || >> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile, >> if (tun->dev->reg_state != NETREG_REGISTERED) >> return -EIO; >> >> - /* Read frames from queue */ >> - skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0, >> - &peeked, &off, &err); >> - if (!skb) >> - return err; >> + if (tun->flags & IFF_TX_RING) { >> + unsigned long head, tail; >> + struct tun_desc *desc; >> + >> + spin_lock(&tfile->rlock); >> + head = ACCESS_ONCE(tfile->head); >> + tail = tfile->tail; >> + >> + if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) { >> + /* read tail before reading descriptor at tail */ >> + smp_rmb(); >> + desc = &tfile->tx_descs[tail]; >> + skb = desc->skb; >> + /* read descriptor before incrementing tail. */ >> + smp_store_release(&tfile->tail, >> + (tail + 1) & TUN_RING_MASK); > same comments as purge, also - pls order code consistently. Ok. 
> >> + } else { >> + spin_unlock(&tfile->rlock); >> + return -EAGAIN; >> + } >> + >> + spin_unlock(&tfile->rlock); >> + } else { >> + /* Read frames from queue */ >> + skb = __skb_recv_datagram(tfile->socket.sk, >> + noblock ? MSG_DONTWAIT : 0, >> + &peeked, &off, &err); >> + if (!skb) >> + return err; >> + } >> >> ret = tun_put_user(tun, tfile, skb, to); >> if (unlikely(ret < 0)) >> @@ -1629,8 +1724,47 @@ out: >> return ret; >> } >> >> +static int tun_peek(struct socket *sock, bool exact) >> +{ >> + struct tun_file *tfile = container_of(sock, struct tun_file, socket); >> + struct sock *sk = sock->sk; >> + struct tun_struct *tun; >> + int ret = 0; >> + >> + if (!exact) >> + return tun_queue_not_empty(tfile); >> + >> + tun = __tun_get(tfile); >> + if (!tun) >> + return 0; >> + >> + if (tun->flags & IFF_TX_RING) { >> + unsigned long head = ACCESS_ONCE(tfile->head); >> + unsigned long tail = ACCESS_ONCE(tfile->tail); >> + >> + if (head != tail) >> + ret = tfile->tx_descs[tail].len; > no ordering at all here? Seems so; we can't be accurate if there are more consumers. 
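tun_peek() above reads head, tail, and the cached descriptor length without taking rlock, accepting a possibly stale answer. A userspace sketch of that best-effort pattern (illustrative names; an acquire load is used here to make the descriptor read safe against a concurrent producer, where the patch itself omits ordering and accepts imprecision):

```c
#include <stdatomic.h>

#define RING_SIZE 256
#define RING_MASK (RING_SIZE - 1)

struct desc {
    int len;                      /* cached length, written by the producer */
};

struct peek_ring {
    struct desc descs[RING_SIZE];
    _Atomic unsigned long head;   /* producer index */
    _Atomic unsigned long tail;   /* consumer index */
};

/* exact == 0: report non-emptiness only.
 * exact != 0: report the cached length of the head-of-queue entry.
 * Either way this is a snapshot: a concurrent consumer may advance
 * tail immediately after we read it, so the answer can be stale. */
static int ring_peek(struct peek_ring *r, int exact)
{
    unsigned long head = atomic_load_explicit(&r->head, memory_order_acquire);
    unsigned long tail = atomic_load_explicit(&r->tail, memory_order_relaxed);

    if (!exact)
        return head != tail;
    return head != tail ? r->descs[tail & RING_MASK].len : 0;
}
```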
> >> + } else { >> + struct sk_buff *head; >> + unsigned long flags; >> + >> + spin_lock_irqsave(&sk->sk_receive_queue.lock, flags); >> + head = skb_peek(&sk->sk_receive_queue); >> + if (likely(head)) { >> + ret = head->len; >> + if (skb_vlan_tag_present(head)) >> + ret += VLAN_HLEN; >> + } >> + spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags); >> + } >> + >> + tun_put(tun); >> + return ret; >> +} >> + >> /* Ops structure to mimic raw sockets with tun */ >> static const struct proto_ops tun_socket_ops = { >> + .peek = tun_peek, >> .sendmsg = tun_sendmsg, >> .recvmsg = tun_recvmsg, >> }; >> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file) >> { >> struct net *net = current->nsproxy->net_ns; >> struct tun_file *tfile; >> - >> DBG1(KERN_INFO, "tunX: tun_chr_open\n"); >> >> tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL, >> &tun_proto, 0); >> if (!tfile) >> return -ENOMEM; >> + >> RCU_INIT_POINTER(tfile->tun, NULL); >> tfile->flags = 0; >> tfile->ifindex = 0; >> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file) >> INIT_LIST_HEAD(&tfile->next); >> >> sock_set_flag(&tfile->sk, SOCK_ZEROCOPY); >> + tfile->head = 0; >> + tfile->tail = 0; >> + >> + spin_lock_init(&tfile->rlock); >> + spin_lock_init(&tfile->wlock); >> >> return 0; >> } >> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c >> index f744eeb..10ff494 100644 >> --- a/drivers/vhost/net.c >> +++ b/drivers/vhost/net.c >> @@ -455,10 +455,14 @@ out: >> >> static int peek_head_len(struct sock *sk) >> { >> + struct socket *sock = sk->sk_socket; >> struct sk_buff *head; >> int len = 0; >> unsigned long flags; >> >> + if (sock->ops->peek) >> + return sock->ops->peek(sock, true); >> + >> spin_lock_irqsave(&sk->sk_receive_queue.lock, flags); >> head = skb_peek(&sk->sk_receive_queue); >> if (likely(head)) { >> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk) >> return len; >> } >> >> +static int 
sk_has_rx_data(struct sock *sk) >> +{ >> + struct socket *sock = sk->sk_socket; >> + >> + if (sock->ops->peek) >> + return sock->ops->peek(sock, false); >> + >> + return skb_queue_empty(&sk->sk_receive_queue); >> +} >> + >> static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) >> { >> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX]; >> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) >> endtime = busy_clock() + vq->busyloop_timeout; >> >> while (vhost_can_busy_poll(&net->dev, endtime) && >> - skb_queue_empty(&sk->sk_receive_queue) && >> + !sk_has_rx_data(sk) && >> vhost_vq_avail_empty(&net->dev, vq)) >> cpu_relax_lowlatency(); >> >> diff --git a/include/linux/net.h b/include/linux/net.h >> index 72c1e06..3c4ecd5 100644 >> --- a/include/linux/net.h >> +++ b/include/linux/net.h >> @@ -132,6 +132,7 @@ struct module; >> struct proto_ops { >> int family; >> struct module *owner; >> + int (*peek) (struct socket *sock, bool exact); >> int (*release) (struct socket *sock); >> int (*bind) (struct socket *sock, >> struct sockaddr *myaddr, >> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h >> index 3cb5e1d..d64ddc1 100644 >> --- a/include/uapi/linux/if_tun.h >> +++ b/include/uapi/linux/if_tun.h >> @@ -61,6 +61,7 @@ >> #define IFF_TUN 0x0001 >> #define IFF_TAP 0x0002 >> #define IFF_NO_PI 0x1000 >> +#define IFF_TX_RING 0x0010 >> /* This flag has no real effect */ >> #define IFF_ONE_QUEUE 0x2000 >> #define IFF_VNET_HDR 0x4000 >> -- >> 2.7.4
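The vhost_net hunks above dispatch through the new proto_ops->peek when the socket provides one and fall back to inspecting sk_receive_queue otherwise. A minimal userspace model of that optional-callback-with-fallback pattern (all names here are invented for illustration, not the kernel structures):

```c
#include <stddef.h>

struct fake_sock;

struct fake_ops {
    /* Optional: exact != 0 returns queued byte count, else non-emptiness. */
    int (*peek)(struct fake_sock *s, int exact);
};

struct fake_sock {
    const struct fake_ops *ops;
    int queued_len;               /* fallback state for plain queue sockets */
};

/* A ring-backed socket would supply its own peek implementation. */
static int tx_peek(struct fake_sock *s, int exact)
{
    return exact ? s->queued_len : (s->queued_len > 0);
}

/* Mirrors peek_head_len(): prefer ->peek, else read the queue directly. */
static int peek_head_len(struct fake_sock *s)
{
    if (s->ops && s->ops->peek)
        return s->ops->peek(s, 1);
    return s->queued_len;
}
```

The same shape covers sk_has_rx_data(): one exact query for the head length, one cheap non-exact query for busy-polling.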