From mboxrd@z Thu Jan 1 00:00:00 1970 From: Yuanhan Liu Subject: Re: [PATCH v4 2/6] vhost: rewrite enqueue Date: Wed, 7 Sep 2016 13:33:10 +0800 Message-ID: <20160907053310.GC4357@yliu-dev.sh.intel.com> References: <1472528164-54296-1-git-send-email-zhihong.wang@intel.com> <1472528164-54296-3-git-send-email-zhihong.wang@intel.com> <20160905063925.GK30752@yliu-dev.sh.intel.com> Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Cc: dev@dpdk.org, maxime.coquelin@redhat.com, thomas.monjalon@6wind.com To: Zhihong Wang Return-path: Received: from mga07.intel.com (mga07.intel.com [134.134.136.100]) by dpdk.org (Postfix) with ESMTP id 95ECF7EEF for ; Wed, 7 Sep 2016 07:32:42 +0200 (CEST) Content-Disposition: inline In-Reply-To: <20160905063925.GK30752@yliu-dev.sh.intel.com> List-Id: patches and discussions about DPDK List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Hmmm, yet another email didn't send out successfully. Resend. BTW, please work out v5 on top of the latest next-virtio tree. Thanks. --yliu On Mon, Sep 05, 2016 at 02:39:25PM +0800, Yuanhan Liu wrote: ---- On Mon, Aug 29, 2016 at 11:36:00PM -0400, Zhihong Wang wrote: > This patch implements the vhost logic from scratch into a single function > designed for high performance and better maintainability. > > This is the baseline version of the new code, more optimization will be > added in the following patches in this patch set. > > --- > Changes in v4: > > 1. Refactor the code for clearer logic. > > 2. Add PRINT_PACKET for debugging. > > --- > Changes in v3: > > 1. Rewrite enqueue and delete the obsolete in the same patch. Change log should go ----> > Signed-off-by: Zhihong Wang > --- ... here, after the SoB. 
> lib/librte_vhost/vhost_rxtx.c | 525 ++++++++++++------------------------------ > 1 file changed, 145 insertions(+), 380 deletions(-) > > diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c > index 5806f99..629e8ae 100644 > --- a/lib/librte_vhost/vhost_rxtx.c > +++ b/lib/librte_vhost/vhost_rxtx.c > @@ -91,7 +91,7 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb) > return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM; > } > > -static void > +static inline void __attribute__((always_inline)) > virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr) > { > if (m_buf->ol_flags & PKT_TX_L4_MASK) { > @@ -112,6 +112,10 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr) > cksum)); > break; > } > + } else { > + net_hdr->flags = 0; > + net_hdr->csum_start = 0; > + net_hdr->csum_offset = 0; > } > > if (m_buf->ol_flags & PKT_TX_TCP_SEG) { > @@ -122,437 +126,198 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr) > net_hdr->gso_size = m_buf->tso_segsz; > net_hdr->hdr_len = m_buf->l2_len + m_buf->l3_len > + m_buf->l4_len; > + } else { > + net_hdr->gso_type = 0; > + net_hdr->hdr_len = 0; > + net_hdr->gso_size = 0; > } > } > > -static inline void > -copy_virtio_net_hdr(struct virtio_net *dev, uint64_t desc_addr, > - struct virtio_net_hdr_mrg_rxbuf hdr) > +static inline void __attribute__((always_inline)) > +update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq, > + uint32_t desc_chain_head, uint32_t desc_chain_len) > { > - if (dev->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) > - *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr; > - else > - *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr; > + uint32_t used_idx_round = vq->last_used_idx & (vq->size - 1); I'd suggest to use "used_idx", instead of "used_idx_round". 
> + > + vq->used->ring[used_idx_round].id = desc_chain_head; > + vq->used->ring[used_idx_round].len = desc_chain_len; > + vhost_log_used_vring(dev, vq, offsetof(struct vring_used, > + ring[used_idx_round]), > + sizeof(vq->used->ring[used_idx_round])); > } > > -static inline int __attribute__((always_inline)) > -copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq, > - struct rte_mbuf *m, uint16_t desc_idx) > +static inline uint32_t __attribute__((always_inline)) > +enqueue_packet(struct virtio_net *dev, struct vhost_virtqueue *vq, > + uint16_t avail_idx, struct rte_mbuf *mbuf, > + uint32_t is_mrg_rxbuf) > { > - uint32_t desc_avail, desc_offset; > - uint32_t mbuf_avail, mbuf_offset; > - uint32_t cpy_len; > + struct virtio_net_hdr_mrg_rxbuf *virtio_hdr; > struct vring_desc *desc; > uint64_t desc_addr; > - struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0}; > - > - desc = &vq->desc[desc_idx]; > + uint32_t desc_chain_head; > + uint32_t desc_chain_len; > + uint32_t desc_current; > + uint32_t desc_offset; > + uint32_t mbuf_len; > + uint32_t mbuf_avail; > + uint32_t copy_len; > + uint32_t extra_buffers = 0; I'd name it "num_buffers", to keep it consistent with the virtio hdr naming style. > + > + /* start with the first mbuf of the packet */ > + mbuf_len = rte_pktmbuf_data_len(mbuf); > + mbuf_avail = mbuf_len; > + > + /* get the current desc */ > + desc_current = vq->avail->ring[(vq->last_used_idx) & (vq->size - 1)]; > + desc_chain_head = desc_current; > + desc = &vq->desc[desc_current]; > desc_addr = gpa_to_vva(dev, desc->addr); > - /* > - * Checking of 'desc_addr' placed outside of 'unlikely' macro to avoid > - * performance issue with some versions of gcc (4.8.4 and 5.3.0) which > - * otherwise stores offset on the stack instead of in a register. 
> - */ > - if (unlikely(desc->len < dev->vhost_hlen) || !desc_addr) > - return -1; > + if (unlikely(!desc_addr)) > + goto error; > > - rte_prefetch0((void *)(uintptr_t)desc_addr); > + /* handle virtio header */ > + virtio_hdr = (struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr; > + virtio_enqueue_offload(mbuf, &(virtio_hdr->hdr)); > + if (is_mrg_rxbuf) > + virtio_hdr->num_buffers = extra_buffers + 1; > > - virtio_enqueue_offload(m, &virtio_hdr.hdr); > - copy_virtio_net_hdr(dev, desc_addr, virtio_hdr); > vhost_log_write(dev, desc->addr, dev->vhost_hlen); > PRINT_PACKET(dev, (uintptr_t)desc_addr, dev->vhost_hlen, 0); > - > desc_offset = dev->vhost_hlen; > - desc_avail = desc->len - dev->vhost_hlen; > - > - mbuf_avail = rte_pktmbuf_data_len(m); > - mbuf_offset = 0; > - while (mbuf_avail != 0 || m->next != NULL) { > - /* done with current mbuf, fetch next */ > - if (mbuf_avail == 0) { > - m = m->next; > - > - mbuf_offset = 0; > - mbuf_avail = rte_pktmbuf_data_len(m); > + desc_chain_len = desc_offset; > + desc_addr += desc_offset; > + > + /* start copy from mbuf to desc */ > + while (mbuf_avail || mbuf->next) { > + /* get the next mbuf if the current done */ > + if (!mbuf_avail) { > + mbuf = mbuf->next; > + mbuf_len = rte_pktmbuf_data_len(mbuf); > + mbuf_avail = mbuf_len; > } > > - /* done with current desc buf, fetch next */ > - if (desc_avail == 0) { > - if ((desc->flags & VRING_DESC_F_NEXT) == 0) { > - /* Room in vring buffer is not enough */ > - return -1; > - } > - if (unlikely(desc->next >= vq->size)) > - return -1; > + /* get the next desc if the current done */ > + if (desc->len <= desc_offset) { > + if (desc->flags & VRING_DESC_F_NEXT) { > + /* go on with the current desc chain */ > + desc_offset = 0; > + desc_current = desc->next; > + desc = &vq->desc[desc_current]; > + desc_addr = gpa_to_vva(dev, desc->addr); > + if (unlikely(!desc_addr)) > + goto error; > + } else if (is_mrg_rxbuf) { > + /* start with the next desc chain */ > + update_used_ring(dev, 
vq, desc_chain_head, > + desc_chain_len); > + vq->last_used_idx++; Why not put "vq->last_used_idx++" into update_used_ring()? > + extra_buffers++; > + virtio_hdr->num_buffers++; > + if (avail_idx == vq->last_used_idx) > + goto error; > + > + desc_current = > + vq->avail->ring[(vq->last_used_idx) & > + (vq->size - 1)]; > + desc_chain_head = desc_current; > + desc = &vq->desc[desc_current]; > + desc_addr = gpa_to_vva(dev, desc->addr); > + if (unlikely(!desc_addr)) > + goto error; > > - desc = &vq->desc[desc->next]; > - desc_addr = gpa_to_vva(dev, desc->addr); > - if (unlikely(!desc_addr)) > - return -1; > - > - desc_offset = 0; > - desc_avail = desc->len; > + desc_chain_len = 0; > + desc_offset = 0; > + } else > + goto error; > } > > - cpy_len = RTE_MIN(desc_avail, mbuf_avail); > - rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)), > - rte_pktmbuf_mtod_offset(m, void *, mbuf_offset), > - cpy_len); > - vhost_log_write(dev, desc->addr + desc_offset, cpy_len); > - PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset), > - cpy_len, 0); > - > - mbuf_avail -= cpy_len; > - mbuf_offset += cpy_len; > - desc_avail -= cpy_len; > - desc_offset += cpy_len; > + /* copy mbuf data */ > + copy_len = RTE_MIN(desc->len - desc_offset, mbuf_avail); TBH, I'm okay with copy_len (actually, I prefer it slightly). However, the old code uses cpy_len and the current dequeue function also uses cpy_len, so I see no good reason to use copy_len here. To me, it's really not a good idea to use two different naming styles in one source file. > + rte_memcpy((void *)(uintptr_t)desc_addr, > + rte_pktmbuf_mtod_offset(mbuf, void *, > + mbuf_len - mbuf_avail), I would keep the old var "mbuf_offset" and not introduce "mbuf_len". This would avoid the above calculation and make the code more straightforward. 
> + copy_len); > + vhost_log_write(dev, desc->addr + desc_offset, copy_len); > + PRINT_PACKET(dev, (uintptr_t)desc_addr, copy_len, 0); > + mbuf_avail -= copy_len; > + desc_offset += copy_len; > + desc_addr += copy_len; > + desc_chain_len += copy_len; Vertical alignment[0] is not a must, but as you can see, it's a style I prefer. Meaning, if possible, please follow it. [0]: https://en.wikipedia.org/wiki/Programming_style#Vertical_alignment > } > > - return 0; > -} > + update_used_ring(dev, vq, desc_chain_head, desc_chain_len); > + vq->last_used_idx++; > ... > - rte_prefetch0(&vq->desc[desc_indexes[0]]); > - for (i = 0; i < count; i++) { > - uint16_t desc_idx = desc_indexes[i]; > - int err; > + return 0; > > - err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx); > - if (unlikely(err)) { > - used_idx = (start_idx + i) & (vq->size - 1); > - vq->used->ring[used_idx].len = dev->vhost_hlen; > - vhost_log_used_vring(dev, vq, > - offsetof(struct vring_used, ring[used_idx]), > - sizeof(vq->used->ring[used_idx])); > - } > +error: > + /* rollback on any error if last_used_idx update on-the-fly */ > + vq->last_used_idx -= extra_buffers; > > - if (i + 1 < count) > - rte_prefetch0(&vq->desc[desc_indexes[i+1]]); > - } > + return 1; We normally return -1 (not 1) on error. > > +static inline void __attribute__((always_inline)) > +notify_guest(struct virtio_net *dev, struct vhost_virtqueue *vq) > +{ > rte_smp_wmb(); > - > - *(volatile uint16_t *)&vq->used->idx += count; > - vq->last_used_idx += count; > - vhost_log_used_vring(dev, vq, > - offsetof(struct vring_used, idx), > - sizeof(vq->used->idx)); > - > - /* flush used->idx update before we read avail->flags. */ > + vq->used->idx = vq->last_used_idx; I would not silently drop the "volatile" cast here. You know this kind of stuff is tricky and would be painful to debug if it causes any issue. Such a removal deserves its own patch, as well as some explanation. 
> + vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx), > + sizeof(vq->used->idx)); > rte_mb(); > - > - /* Kick the guest if necessary. */ > if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) > && (vq->callfd >= 0)) > eventfd_write(vq->callfd, (eventfd_t)1); > - return count; > } > ... > + /* start enqueuing packets 1 by 1 */ > + avail_idx = *((volatile uint16_t *)&vq->avail->idx); > + while (pkt_left && avail_idx != vq->last_used_idx) { > + if (enqueue_packet(dev, vq, avail_idx, pkts[pkt_idx], > + is_mrg_rxbuf)) > break; > - } > - > - nr_used = copy_mbuf_to_desc_mergeable(dev, vq, end, > - pkts[pkt_idx], buf_vec); > - rte_smp_wmb(); > - > - *(volatile uint16_t *)&vq->used->idx += nr_used; > - vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx), > - sizeof(vq->used->idx)); > - vq->last_used_idx += nr_used; > - } > - > - if (likely(pkt_idx)) { > - /* flush used->idx update before we read avail->flags. */ > - rte_mb(); > > - /* Kick the guest if necessary. */ > - if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) > - && (vq->callfd >= 0)) > - eventfd_write(vq->callfd, (eventfd_t)1); > + pkt_idx++; > + pkt_sent++; pkt_idx and pkt_sent are duplicates of each other here. --yliu > + pkt_left--; > } > > - return pkt_idx; > -} > - > -uint16_t > -rte_vhost_enqueue_burst(int vid, uint16_t queue_id, > - struct rte_mbuf **pkts, uint16_t count) > -{ > - struct virtio_net *dev = get_device(vid); > - > - if (!dev) > - return 0; > + /* update used idx and kick the guest if necessary */ > + if (pkt_sent) > + notify_guest(dev, vq); > > - if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)) > - return virtio_dev_merge_rx(dev, queue_id, pkts, count); > - else > - return virtio_dev_rx(dev, queue_id, pkts, count); > + return pkt_sent; > } > > static void > -- > 2.7.4