* [PATCH 1/4] eal: add tailq for new distributor component
@ 2014-05-20 10:00 ` Bruce Richardson
2014-05-20 10:00 ` [PATCH 2/4] distributor: new packet distributor library Bruce Richardson
` (10 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-20 10:00 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add a new tailq to the EAL for the new distributor library component.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
lib/librte_eal/common/include/rte_tailq_elem.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
rte_tailq_end(RTE_TAILQ_NUM)
#undef rte_tailq_elem
--
1.9.0
* [PATCH 2/4] distributor: new packet distributor library
From: Bruce Richardson @ 2014-05-20 10:00 UTC (permalink / raw)
To: dev-VfR2kkLFssw
This adds the code for a new Intel DPDK library for packet distribution.
The distributor is a component designed to pass packets one at a time to
workers, with dynamic load balancing. Using the RSS field in the mbuf as
a tag, the distributor tracks which tag is being processed by which
worker and ensures that no two packets with the same tag are in flight
simultaneously. Once a tag is no longer in flight, the next packet with
that tag is sent to the next available core.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
lib/librte_distributor/Makefile | 50 ++++
lib/librte_distributor/rte_distributor.c | 417 +++++++++++++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 173 +++++++++++++
3 files changed, 640 insertions(+)
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..36699f8
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,50 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..cc8384e
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,417 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/* we will use the bottom four bits of the pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic right shift will then restore the original
+ * pointer value, with proper sign extension into the top bits. */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0
+#define RTE_DISTRIB_GET_BUF (1)
+#define RTE_DISTRIB_RETURN_BUF (2)
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+union rte_distributor_buffer {
+ volatile int64_t bufptr64;
+ char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+ unsigned start;
+ unsigned count;
+ int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
+};
+
+struct rte_distributor_returned_pkts {
+ unsigned start;
+ unsigned count;
+ struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+ TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
+
+ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the distributor. */
+ unsigned num_workers; /**< Number of workers polling */
+
+ uint32_t in_flight_tags[RTE_MAX_LCORE];
+ struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+ union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+
+ struct rte_distributor_returned_pkts returns;
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/**** APIs called by workers ****/
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt,
+ unsigned reserved __rte_unused)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | \
+ RTE_DISTRIB_GET_BUF;
+ while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+ rte_pause();
+ buf->bufptr64 = req;
+ while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+ rte_pause();
+ /* since bufptr64 is a signed value, this should be an arithmetic shift */
+ int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ uint64_t req = ((uintptr_t)oldpkt << RTE_DISTRIB_FLAG_BITS) | \
+ RTE_DISTRIB_RETURN_BUF;
+ buf->bufptr64 = req;
+ return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* as the name suggests, adds a packet to the backlog for a particular worker */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+ if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
+ return -1;
+
+ bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)] = item;
+ return 0;
+}
+
+/* takes the next packet for a worker off the backlog */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+ bl->count--;
+ return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
+}
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor *d,
+ unsigned *ret_start, unsigned *ret_count)
+{
+ /* store returns in a circular buffer - code is branch-free */
+ d->returns.mbufs[(*ret_start + *ret_count)
+ & RTE_DISTRIB_RETURNS_MASK] = (void *)oldbuf;
+ *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+ *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+}
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs)
+{
+ unsigned next_idx = 0;
+ unsigned worker = 0;
+ struct rte_mbuf *next_mb = NULL;
+ int64_t next_value = 0;
+ uint32_t new_tag = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ while (next_idx < num_mbufs || next_mb != NULL) {
+
+ int64_t data = d->bufs[worker].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (!next_mb) {
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb) << RTE_DISTRIB_FLAG_BITS);
+ new_tag = (next_mb->pkt.hash.rss | 1);
+
+ uint32_t match = 0;
+ unsigned i;
+ for (i = 0; i < d->num_workers; i++)
+ match |= (!(d->in_flight_tags[i] ^ new_tag) << i);
+
+ if (match) {
+ next_mb = NULL;
+ unsigned worker = __builtin_ctz(match);
+ if (add_to_backlog(&d->backlog[worker], next_value) < 0)
+ next_idx--;
+ }
+ }
+
+ if ((data & RTE_DISTRIB_GET_BUF) &&
+ (d->backlog[worker].count || next_mb)) {
+
+ if (d->backlog[worker].count)
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+
+ else {
+ d->bufs[worker].bufptr64 = next_value;
+ d->in_flight_tags[worker] = new_tag;
+ next_mb = NULL;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+ else if (data & RTE_DISTRIB_RETURN_BUF) {
+ d->in_flight_tags[worker] = 0;
+ d->bufs[worker].bufptr64 = 0;
+ if (unlikely(d->backlog[worker].count != 0)) {
+ /* On return of a packet, we need to move the queued packets
+ * for this core elsewhere.
+ * Easiest solution is to set things up for
+ * a recursive call. That will cause those packets to be queued
+ * up for the next free core, i.e. it will return as soon as a
+ * core becomes free to accept the first packet, as subsequent
+ * ones will be added to the backlog for that core.
+ */
+ struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
+ unsigned i;
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+
+ for (i = 0; i < bl->count; i++) {
+ unsigned idx = (bl->start + i) & RTE_DISTRIB_BACKLOG_MASK;
+ pkts[i] = (void *)((uintptr_t)
+ (bl->pkts[idx] >> RTE_DISTRIB_FLAG_BITS));
+ }
+ /* recursive call */
+ rte_distributor_process(d, pkts, i);
+ bl->count = bl->start = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ /* store returns in a circular buffer */
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ if (++worker == d->num_workers)
+ worker = 0;
+ }
+ /* to finish, check all workers for backlog and schedule work for them
+ * if they are ready */
+ for (worker = 0; worker < d->num_workers; worker++)
+ if (d->backlog[worker].count &&
+ (d->bufs[worker].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+
+ int64_t oldbuf = d->bufs[worker].bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+ return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs)
+{
+ struct rte_distributor_returned_pkts *returns = &d->returns;
+ unsigned retval = max_mbufs < returns->count ? max_mbufs : returns->count;
+ unsigned i;
+
+ for (i = 0; i < retval; i++)
+ mbufs[i] = returns->mbufs[(returns->start + i) &
+ RTE_DISTRIB_RETURNS_MASK];
+ returns->start += i;
+ returns->count -= i;
+
+ return retval;
+}
+
+/* local function used by the flush function only, to reassign a backlog for
+ * a shutdown core. The process function uses a recursive call for this, but
+ * that is not done in flush, as we need to track the outstanding packets count.
+ */
+static inline int
+move_worker_backlog(struct rte_distributor *d, unsigned worker)
+{
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+ unsigned i;
+
+ for (i = 0; i < d->num_workers; i++) {
+ if (i == worker)
+ continue;
+ /* check worker is active and then if backlog will fit */
+ if ((d->in_flight_tags[i] != 0 ||
+ (d->bufs[i].bufptr64 & RTE_DISTRIB_GET_BUF)) &&
+ (bl->count + d->backlog[i].count) <= RTE_DISTRIB_BACKLOG_SIZE) {
+ while (bl->count)
+ add_to_backlog(&d->backlog[i], backlog_pop(bl));
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/* flush the distributor, so that there are no outstanding packets in flight or
+ * queued up. */
+int
+rte_distributor_flush(struct rte_distributor *d)
+{
+ unsigned worker, total_outstanding = 0;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (worker = 0; worker < d->num_workers; worker++)
+ total_outstanding += d->backlog[worker].count +
+ !!(d->in_flight_tags[worker]);
+
+ worker = 0;
+ while (flushed < total_outstanding) {
+
+ if (d->in_flight_tags[worker] != 0 || d->backlog[worker].count) {
+ const int64_t data = d->bufs[worker].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed += (d->in_flight_tags[worker] != 0);
+ if (d->backlog[worker].count) {
+ d->bufs[worker].bufptr64 =
+ backlog_pop(&d->backlog[worker]);
+ /* we need to mark something as being in-flight, but it
+ * doesn't matter what as we never check it except
+ * to check for non-zero.
+ */
+ d->in_flight_tags[worker] = 1;
+ } else {
+ d->bufs[worker].bufptr64 = RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[worker] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+ else if (data & RTE_DISTRIB_RETURN_BUF) {
+ if (d->backlog[worker].count == 0 ||
+ move_worker_backlog(d, worker) == 0) {
+ /* only if we move backlog, process this packet */
+ d->bufs[worker].bufptr64 = 0;
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ flushed++;
+ d->in_flight_tags[worker] = 0;
+ }
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ if (++worker == d->num_workers)
+ worker = 0;
+ }
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns(struct rte_distributor *d)
+{
+ d->returns.start = d->returns.count = 0;
+#ifndef __OPTIMIZE__
+ memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
+#endif
+}
+
+/* creates a distributor instance */
+struct rte_distributor *
+rte_distributor_create(const char *name,
+ unsigned socket_id,
+ unsigned num_workers,
+ struct rte_distributor_extra_args *args __rte_unused)
+{
+ struct rte_distributor *d;
+ struct rte_distributor_list *distributor_list;
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ const struct rte_memzone *mz;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+ if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ rte_errno = EINVAL;
+ return NULL;
+ }
+ rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+ mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+ if (mz == NULL) {
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ /* check that we have an initialised tail queue */
+ if ((distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+ rte_distributor_list)) == NULL) {
+ rte_errno = E_RTE_NO_TAILQ;
+ return NULL;
+ }
+
+ d = mz->addr;
+ rte_snprintf(d->name, sizeof(d->name), "%s", name);
+ d->num_workers = num_workers;
+ TAILQ_INSERT_TAIL(distributor_list, d, next);
+
+ return d;
+}
+
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..d684ff9
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,173 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DISTRIBUTE_H_
+#define _RTE_DISTRIBUTE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+struct rte_distributor;
+
+struct rte_distributor_extra_args { }; /**< Reserved for future use. */
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ * The name to be given to the distributor instance.
+ * @param socket_id
+ * The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ * The maximum number of workers that will request packets from this
+ * distributor
+ * @param extra_args
+ * Reserved for future use, should be passed in as NULL
+ * @return
+ * The newly created distributor instance
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+ unsigned num_workers, struct rte_distributor_extra_args *extra_args);
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed at the same time.
+ *
+ * NOTE: This is not thread-safe; it should be called by only one thread at a time
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs to be distributed
+ * @param num_mbufs
+ * The number of mbufs in the mbufs array
+ * @return
+ * The number of mbufs processed.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs pointer array to be filled in
+ * @param max_mbufs
+ * The size of the mbufs array
+ * @return
+ * The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * @param d
+ * The distributor instance to be used
+ * @return
+ * The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush(struct rte_distributor *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * @param d
+ * The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns(struct rte_distributor *d);
+
+/**
+ * API called by a worker to get a new packet to process. Any previous packet
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param oldpkt
+ * The previous packet, if any, being processed by the worker
+ * @param reserved
+ * Reserved for future use, should be set to zero.
+ *
+ * @return
+ * A new packet to be processed by the worker thread.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt, unsigned reserved);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param mbuf
+ * The previous packet being processed by the worker
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+ struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--
1.9.0
* [PATCH 3/4] distributor: add distributor library to build
From: Bruce Richardson @ 2014-05-20 10:00 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add new configuration settings to enable/disable the distributor
library, and a makefile entry to compile it once enabled.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
config/defconfig_i686-default-linuxapp-gcc | 5 +++++
config/defconfig_i686-default-linuxapp-icc | 5 +++++
config/defconfig_x86_64-default-bsdapp-gcc | 6 ++++++
config/defconfig_x86_64-default-linuxapp-gcc | 5 +++++
config/defconfig_x86_64-default-linuxapp-icc | 5 +++++
lib/Makefile | 1 +
mk/rte.app.mk | 4 ++++
7 files changed, 31 insertions(+)
diff --git a/config/defconfig_i686-default-linuxapp-gcc b/config/defconfig_i686-default-linuxapp-gcc
index 14bd3d1..5b4261e 100644
--- a/config/defconfig_i686-default-linuxapp-gcc
+++ b/config/defconfig_i686-default-linuxapp-gcc
@@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_i686-default-linuxapp-icc b/config/defconfig_i686-default-linuxapp-icc
index ec3386e..d1d4aeb 100644
--- a/config/defconfig_i686-default-linuxapp-icc
+++ b/config/defconfig_i686-default-linuxapp-icc
@@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-bsdapp-gcc b/config/defconfig_x86_64-default-bsdapp-gcc
index d960e1d..329920e 100644
--- a/config/defconfig_x86_64-default-bsdapp-gcc
+++ b/config/defconfig_x86_64-default-bsdapp-gcc
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
CONFIG_RTE_TEST_PMD=y
CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-gcc b/config/defconfig_x86_64-default-linuxapp-gcc
index f11ffbf..772a6b3 100644
--- a/config/defconfig_x86_64-default-linuxapp-gcc
+++ b/config/defconfig_x86_64-default-linuxapp-gcc
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-icc b/config/defconfig_x86_64-default-linuxapp-icc
index 4eaca4c..04affc8 100644
--- a/config/defconfig_x86_64-default-linuxapp-icc
+++ b/config/defconfig_x86_64-default-linuxapp-icc
@@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index a2c60b6..82a160f 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -73,6 +73,10 @@ LDLIBS += -lrte_ivshmem
endif
endif
+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
LDLIBS += -lrte_pmd_e1000
endif
--
1.9.0
* [PATCH 4/4] distributor: add unit tests for distributor lib
From: Bruce Richardson @ 2014-05-20 10:00 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add a set of unit tests and a basic performance test for the
distributor library. These tests cover all the major functionality of
the library on both the distributor and worker sides.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 582 +++++++++++++++++++++++++++++++++++++++
app/test/test_distributor_perf.c | 274 ++++++++++++++++++
5 files changed, 866 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
diff --git a/app/test/Makefile b/app/test/Makefile
index b49785e..7c2d351 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -93,6 +93,8 @@ SRCS-$(CONFIG_RTE_APP_TEST) += test_power.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_common.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_timer_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_ivshmem.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_devargs.c
ifeq ($(CONFIG_RTE_APP_TEST),y)
diff --git a/app/test/commands.c b/app/test/commands.c
index efa8566..dfdbd37 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -179,6 +179,10 @@ static void cmd_autotest_parsed(void *parsed_result,
ret = test_common();
if (!strcmp(res->autotest, "ivshmem_autotest"))
ret = test_ivshmem();
+ if (!strcmp(res->autotest, "distributor_autotest"))
+ ret = test_distributor();
+ if (!strcmp(res->autotest, "distributor_perf_autotest"))
+ ret = test_distributor_perf();
if (!strcmp(res->autotest, "devargs_autotest"))
ret = test_devargs();
#ifdef RTE_LIBRTE_PMD_RING
@@ -238,7 +242,8 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
#ifdef RTE_LIBRTE_KVARGS
"kvargs_autotest#"
#endif
- "common_autotest");
+ "common_autotest#"
+ "distributor_autotest#distributor_perf_autotest");
cmdline_parse_inst_t cmd_autotest = {
.f = cmd_autotest_parsed, /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index 1945d29..9b83ade 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,8 @@ int test_power(void);
int test_common(void);
int test_pmd_ring(void);
int test_ivshmem(void);
+int test_distributor(void);
+int test_distributor_perf(void);
int test_kvargs(void);
int test_devargs(void);
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..3c6bef0
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,582 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_errno.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+static volatile int quit = 0; /**< general quit variable for all threads */
+static volatile int zero_quit = 0; /**< var for when we just want thr0 to quit*/
+static volatile unsigned worker_idx = 0;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* returns the total number of packets handled by the worker
+ * functions given below.
+ */
+static inline unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static inline void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for the sanity test.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* do basic sanity testing of the distributor. This test checks the following:
+ * - send 32 packets through the distributor with the same tag and ensure they
+ * all go to the one worker
+ * - send 32 packets through the distributor with two different tags and
+ * verify that they go equally to two different workers.
+ * - send 32 packets with different tags through the distributor and
+ * just verify we get all packets back.
+ * - send 1024 packets through the distributor, gathering the returned packets
+ * as we go. Then verify that we correctly got all 1024 pointers back again,
+ * not necessarily in the same order (as different flows).
+ */
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Basic distributor sanity tests ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with all zero hashes done.\n");
+ if (worker_stats[0].handled_packets != BURST)
+ return -1;
+
+ /* pick two flows and check they go correctly */
+ if (rte_lcore_count() >= 3) {
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = (i & 1) << 8;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with two hash values done\n");
+
+ if (worker_stats[0].handled_packets != 16 ||
+ worker_stats[1].handled_packets != 16)
+ return -1;
+ }
+
+ /* give a different hash value to each packet, so load gets distributed */
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with non-zero hashes done\n");
+
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ /* sanity test with BIG_BATCH packets to ensure they all arrived back from
+ * the returned packets function */
+ clear_packet_count();
+ struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
+ unsigned num_returned = 0;
+
+ /* flush out any remaining packets */
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BIG_BATCH; i++)
+ many_bufs[i]->pkt.hash.rss = i << 2;
+
+ for (i = 0; i < BIG_BATCH/BURST; i++) {
+ rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+ }
+ rte_distributor_flush(d);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+
+ if (num_returned != BIG_BATCH) {
+ printf("line %d: Number returned is not the same as number sent\n",
+ __LINE__);
+ return -1;
+ }
+ /* big check - make sure all packets made it back!! */
+ for (i = 0; i < BIG_BATCH; i++) {
+ unsigned j;
+ struct rte_mbuf *src = many_bufs[i];
+ for (j = 0; j < BIG_BATCH; j++)
+ if (return_bufs[j] == src) {
+ break;
+ }
+ if (j == BIG_BATCH) {
+ printf("Error: could not find source packet #%u\n", i);
+ return -1;
+ }
+ }
+ printf("Sanity test of returned packets done\n");
+
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
+
+ printf("\n");
+ return 0;
+}
+
+
+/* to test that the distributor does not lose packets, we use this worker
+ * function which frees mbufs when it gets them. The distributor thread does
+ * the mbuf allocation. If the distributor drops packets we'll eventually run out
+ * of mbufs.
+ */
+static int
+handle_work_with_free_mbufs(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* Perform a sanity test of the distributor with a large number of packets,
+ * where we allocate a new set of mbufs for each burst. The workers then
+ * free the mbufs. This ensures that we don't have any packet leaks in the
+ * library.
+ */
+static int
+sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ struct rte_mbuf *bufs[BURST];
+
+ printf("=== Sanity test with mbuf alloc/free ===\n");
+ clear_packet_count();
+ for (i = 0; i < (1 << ITER_POWER); i += BURST) {
+ unsigned j;
+ while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
+ rte_distributor_process(d, NULL, 0);
+ for (j = 0; j < BURST; j++) {
+ bufs[j]->pkt.hash.rss = (i+j) << 1;
+ bufs[j]->refcnt = 1;
+ }
+
+ rte_distributor_process(d, bufs, BURST);
+ }
+
+ rte_distributor_flush(d);
+ if (total_packet_count() < (1<<ITER_POWER)) {
+ printf("Line %u: Packet count is incorrect, %u, expected %u\n",
+ __LINE__, total_packet_count(), (1<<ITER_POWER));
+ return -1;
+ }
+
+ printf("Sanity test with mbuf alloc/free passed\n\n");
+ return 0;
+}
+
+static int
+handle_work_for_shutdown_test(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ /* wait for the quit signal globally or, for worker zero, for zero_quit */
+ while (!quit && !(id == 0 && zero_quit)) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+
+ if (id == 0) {
+ /* for worker zero, allow it to restart to pick up last packet
+ * when all workers are shutting down.
+ */
+ while (zero_quit)
+ usleep(100);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ }
+ rte_distributor_return_pkt(d, id, pkt);
+ }
+ return 0;
+}
+
+
+/* Perform a sanity test of the distributor where all packets belong to the
+ * same flow and worker zero is shut down partway through. This verifies
+ * that the packets backlogged for that worker are redistributed rather
+ * than lost.
+ */
+static int
+sanity_test_with_worker_shutdown(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Sanity test of worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full backlog
+ * for the other ones at worker 0.
+ */
+
+ /* get more buffers to queue up, again setting them to the same flow */
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+ rte_distributor_process(d, bufs, BURST);
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST * 2) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST * 2, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Sanity test with worker shutdown passed\n\n");
+ return 0;
+}
+
+/* Test that the flush function is able to move packets between workers when
+ * one worker shuts down.
+ */
+static int
+test_flush_with_worker_shutdown(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Test flush fn with worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full backlog
+ * for the other ones at worker 0.
+ */
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+
+ zero_quit = 0;
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
+static int
+test_error_distributor_create_name(void)
+{
+ struct rte_distributor *d = NULL;
+ char *name = NULL;
+
+ d = rte_distributor_create(name, rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: distributor_name is NULL, yet create call succeeded\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static int
+test_error_distributor_create_numworkers(void)
+{
+ struct rte_distributor *d = NULL;
+ d = rte_distributor_create("test_numworkers", rte_socket_id(),
+ RTE_MAX_LCORE + 10, NULL);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: num_workers >= RTE_MAX_LCORE yet create call succeeded\n");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ zero_quit = 0;
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_distributor_flush(d);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor(void)
+{
+ static struct rte_distributor *d = NULL;
+ static struct rte_mempool *p = NULL;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_distributor", rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ BIG_BATCH * 2 - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (sanity_test(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
+ if (sanity_test_with_mbuf_alloc(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ if (rte_lcore_count() > 2) {
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, SKIP_MASTER);
+ if (sanity_test_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d, SKIP_MASTER);
+ if (test_flush_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ } else {
+ printf("Not enough cores to run tests for worker shutdown\n");
+ }
+
+ if (test_error_distributor_create_numworkers() == -1 ||
+ test_error_distributor_create_name() == -1) {
+ printf("rte_distributor_create parameter check tests failed\n");
+ return -1;
+ }
+
+ return 0;
+
+err:
+ quit_workers(d, p);
+ return -1;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
new file mode 100644
index 0000000..29e4013
--- /dev/null
+++ b/app/test/test_distributor_perf.c
@@ -0,0 +1,274 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+static volatile int quit = 0;
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* worker thread used for testing the time to do a round-trip of a cache
+ * line between two cores and back again
+ */
+static void
+flip_bit(volatile uint64_t *arg)
+{
+ uint64_t old_val = 0;
+ while (old_val != 2) {
+ while (!*arg)
+ rte_pause();
+ old_val = *arg;
+ *arg = 0;
+ }
+}
+
+/* test case to time the number of cycles to round-trip a cache line between
+ * two cores and back again.
+ */
+static void
+time_cache_line_switch(void)
+{
+ /* allocate a full cache line for data; we use only the first byte of it */
+ uint64_t data[CACHE_LINE_SIZE*3 / sizeof(uint64_t)];
+
+ unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+ volatile uint64_t *pdata = &data[0];
+ *pdata = 1;
+ rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
+ while (*pdata)
+ rte_pause();
+
+ const uint64_t start_time = rte_rdtsc();
+ for (i = 0; i < (1 << ITER_POWER); i++) {
+ while (*pdata)
+ rte_pause();
+ *pdata = 1;
+ }
+ const uint64_t end_time = rte_rdtsc();
+
+ while (*pdata)
+ rte_pause();
+ *pdata = 2;
+ rte_eal_wait_lcore(slaveid);
+ printf("==== Cache line switch test ===\n");
+ printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+ end_time-start_time);
+ printf("Ticks per iteration = %"PRIu64"\n\n",
+ (end_time-start_time) >> ITER_POWER);
+}
+
+/* returns the total number of packets handled by the worker
+ * functions given below.
+ */
+static unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL, 0);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt, 0);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* this basic performance test repeatedly sends in 32 packets at a time to the
+ * distributor, verifies at the end that the workers received them all, and
+ * reports how long the processing took per packet.
+ */
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ uint64_t start, end;
+ struct rte_mbuf *bufs[BURST];
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("Error getting mbufs from pool\n");
+ return -1;
+ }
+ /* ensure we have different hash value for each pkt */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ start = rte_rdtsc();
+ for (i = 0; i < (1<<ITER_POWER); i++)
+ rte_distributor_process(d, bufs, BURST);
+ end = rte_rdtsc();
+
+ do {
+ usleep(100);
+ rte_distributor_process(d, NULL, 0);
+ } while (total_packet_count() < (BURST << ITER_POWER));
+
+ printf("=== Performance test of distributor ===\n");
+ printf("Time per burst: %"PRIu64"\n", (end - start) >> ITER_POWER);
+ printf("Time per packet: %"PRIu64"\n\n",
+ ((end - start) >> ITER_POWER)/BURST);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Total packets: %u (%x)\n", total_packet_count(),
+ total_packet_count());
+ printf("=== Perf test done ===\n\n");
+
+ return 0;
+}
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor_perf(void)
+{
+ static struct rte_distributor *d = NULL;
+ static struct rte_mempool *p = NULL;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ /* first time how long it takes to round-trip a cache line */
+ time_cache_line_switch();
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_perf", rte_socket_id(),
+ rte_lcore_count() - 1, NULL);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ BIG_BATCH * 2 - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DPT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (perf_test(d, p) < 0)
+ return -1;
+ quit_workers(d, p);
+
+ return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor_perf(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
--
1.9.0
^ permalink raw reply related [flat|nested] 29+ messages in thread
* Re: [PATCH 0/4] New library: rte_distributor
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (3 preceding siblings ...)
2014-05-20 10:00 ` [PATCH 4/4] distributor: add unit tests for distributor lib Bruce Richardson
@ 2014-05-20 10:38 ` Neil Horman
[not found] ` <20140520103845.GA6648-B26myB8xz7F8NnZeBjwnZQMhkBWG/bsMQH7oEaQurus@public.gmane.org>
2014-05-27 22:32 ` Thomas Monjalon
` (6 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Neil Horman @ 2014-05-20 10:38 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev-VfR2kkLFssw
On Tue, May 20, 2014 at 11:00:53AM +0100, Bruce Richardson wrote:
> This adds a new library to the Intel DPDK whereby a set of packets can be distributed one-at-a-time to a set of worker cores, with dynamic load balancing being done between those workers. Flows are identified by a tag within the mbuf (currently the RSS hash field, 32-bit value), which is used to ensure that no two packets of the same flow are processed in parallel, thereby preserving ordering.
>
> Bruce Richardson (4):
> eal: add tailq for new distributor component
> distributor: new packet distributor library
> distributor: add distributor library to build
> distributor: add unit tests for distributor lib
>
> app/test/Makefile | 2 +
> app/test/commands.c | 7 +-
> app/test/test.h | 2 +
> app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> app/test/test_distributor_perf.c | 274 ++++++++++++
> config/defconfig_i686-default-linuxapp-gcc | 5 +
> config/defconfig_i686-default-linuxapp-icc | 5 +
> config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> config/defconfig_x86_64-default-linuxapp-icc | 5 +
> lib/Makefile | 1 +
> lib/librte_distributor/Makefile | 50 +++
> lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> lib/librte_distributor/rte_distributor.h | 173 ++++++++
> lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> mk/rte.app.mk | 4 +
> 16 files changed, 1539 insertions(+), 1 deletion(-)
> create mode 100644 app/test/test_distributor.c
> create mode 100644 app/test/test_distributor_perf.c
> create mode 100644 lib/librte_distributor/Makefile
> create mode 100644 lib/librte_distributor/rte_distributor.c
> create mode 100644 lib/librte_distributor/rte_distributor.h
>
> --
> 1.9.0
>
>
This sounds an awful lot like the team and bonding drivers. Why implement this
as a separate application-accessible API, rather than a stacked PMD? If you do
the latter then existing applications could conceivably change their
configurations to use this technology and gain the benefit of load distribution
without having to alter the application to use a new API.
Neil
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [PATCH 0/4] New library: rte_distributor
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (4 preceding siblings ...)
2014-05-20 10:38 ` [PATCH 0/4] New library: rte_distributor Neil Horman
@ 2014-05-27 22:32 ` Thomas Monjalon
2014-05-28 8:48 ` Richardson, Bruce
2014-05-29 10:12 ` [PATCH v2 0/5] " Bruce Richardson
` (5 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Thomas Monjalon @ 2014-05-27 22:32 UTC (permalink / raw)
To: Bruce Richardson; +Cc: dev-VfR2kkLFssw
Hi Bruce,
As for rte_acl, I have some formatting comments.
2014-05-20 11:00, Bruce Richardson:
> This adds a new library to the Intel DPDK whereby a set of packets can be
> distributed one-at-a-time to a set of worker cores, with dynamic load
> balancing being done between those workers. Flows are identified by a tag
> within the mbuf (currently the RSS hash field, 32-bit value), which is used
> to ensure that no two packets of the same flow are processed in parallel,
> thereby preserving ordering.
>
> app/test/Makefile | 2 +
> app/test/commands.c | 7 +-
> app/test/test.h | 2 +
> app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> app/test/test_distributor_perf.c | 274 ++++++++++++
> config/defconfig_i686-default-linuxapp-gcc | 5 +
> config/defconfig_i686-default-linuxapp-icc | 5 +
> config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> config/defconfig_x86_64-default-linuxapp-icc | 5 +
> lib/Makefile | 1 +
> lib/librte_distributor/Makefile | 50 +++
> lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> lib/librte_distributor/rte_distributor.h | 173 ++++++++
> lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> mk/rte.app.mk | 4 +
> 16 files changed, 1539 insertions(+), 1 deletion(-)
As you are introducing a new library, you need to update
doxygen configuration and start page:
doc/doxy-api.conf
doc/doxy-api-index.md
I've run checkpatch.pl from kernel.org on these distributor patches
and it reports some code style issues.
Could you have a look at it please?
Thanks
--
Thomas
^ permalink raw reply [flat|nested] 29+ messages in thread
* Re: [PATCH 0/4] New library: rte_distributor
2014-05-27 22:32 ` Thomas Monjalon
@ 2014-05-28 8:48 ` Richardson, Bruce
0 siblings, 0 replies; 29+ messages in thread
From: Richardson, Bruce @ 2014-05-28 8:48 UTC (permalink / raw)
To: Thomas Monjalon; +Cc: dev-VfR2kkLFssw
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas.monjalon-pdR9zngts4EAvxtiuMwx3w@public.gmane.org]
> Sent: Tuesday, May 27, 2014 11:33 PM
> To: Richardson, Bruce
> Cc: dev-VfR2kkLFssw@public.gmane.org
> Subject: Re: [dpdk-dev] [PATCH 0/4] New library: rte_distributor
>
> Hi Bruce,
>
> As for rte_acl, I have some formatting comments.
>
> 2014-05-20 11:00, Bruce Richardson:
> > This adds a new library to the Intel DPDK whereby a set of packets can be
> > distributed one-at-a-time to a set of worker cores, with dynamic load
> > balancing being done between those workers. Flows are identified by a tag
> > within the mbuf (currently the RSS hash field, 32-bit value), which is used
> > to ensure that no two packets of the same flow are processed in parallel,
> > thereby preserving ordering.
> >
> > app/test/Makefile | 2 +
> > app/test/commands.c | 7 +-
> > app/test/test.h | 2 +
> > app/test/test_distributor.c | 582 +++++++++++++++++++++++++
> > app/test/test_distributor_perf.c | 274 ++++++++++++
> > config/defconfig_i686-default-linuxapp-gcc | 5 +
> > config/defconfig_i686-default-linuxapp-icc | 5 +
> > config/defconfig_x86_64-default-bsdapp-gcc | 6 +
> > config/defconfig_x86_64-default-linuxapp-gcc | 5 +
> > config/defconfig_x86_64-default-linuxapp-icc | 5 +
> > lib/Makefile | 1 +
> > lib/librte_distributor/Makefile | 50 +++
> > lib/librte_distributor/rte_distributor.c | 417 ++++++++++++++++++
> > lib/librte_distributor/rte_distributor.h | 173 ++++++++
> > lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
> > mk/rte.app.mk | 4 +
> > 16 files changed, 1539 insertions(+), 1 deletion(-)
>
> As you are introducing a new library, you need to update
> doxygen configuration and start page:
> doc/doxy-api.conf
> doc/doxy-api-index.md
Didn't know to update those, I'll add it in to the v2 patch set.
>
> I've run checkpatch.pl from kernel.org on these distributor patches
> and it reports some code style issues.
> Could you have a look at it please?
Yep. I've downloaded and run that script myself in preparation for a V2 patch set (due really soon), so hopefully all should be well second time round.
^ permalink raw reply [flat|nested] 29+ messages in thread
* [PATCH v2 0/5] New library: rte_distributor
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (5 preceding siblings ...)
2014-05-27 22:32 ` Thomas Monjalon
@ 2014-05-29 10:12 ` Bruce Richardson
[not found] ` <1401358338-23455-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
2014-05-29 10:12 ` [PATCH v2 1/5] eal: add tailq for new distributor component Bruce Richardson
` (4 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
This adds a new library to the Intel DPDK whereby a set of packets can be distributed one-at-a-time to a set of worker cores, with dynamic load balancing being done between those workers. Flows are identified by a tag within the mbuf (currently the RSS hash field, 32-bit value), which is used to ensure that no two packets of the same flow are processed in parallel, thereby preserving ordering.
Major changes in V2 set:
* Updates to take account of Neil's comments on original patch set
* Updates to fix issues highlighted by checkpatch.pl
* Additional handling in library for special case when process() is called with zero mbufs
Bruce Richardson (5):
eal: add tailq for new distributor component
distributor: new packet distributor library
distributor: add distributor library to build
distributor: add unit tests for distributor lib
docs: add distributor lib to API docs
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 595 +++++++++++++++++++++++++
app/test/test_distributor_perf.c | 275 ++++++++++++
config/common_bsdapp | 6 +
config/common_linuxapp | 5 +
doc/doxy-api-index.md | 1 +
doc/doxy-api.conf | 1 +
lib/Makefile | 1 +
lib/librte_distributor/Makefile | 50 +++
lib/librte_distributor/rte_distributor.c | 487 ++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 186 ++++++++
lib/librte_eal/common/include/rte_tailq_elem.h | 2 +
mk/rte.app.mk | 4 +
15 files changed, 1623 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
--
1.9.3
^ permalink raw reply [flat|nested] 29+ messages in thread
* [PATCH v2 1/5] eal: add tailq for new distributor component
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (6 preceding siblings ...)
2014-05-29 10:12 ` [PATCH v2 0/5] " Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
` (3 subsequent siblings)
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add a new tailq to the EAL for the new distributor library component.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
lib/librte_eal/common/include/rte_tailq_elem.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
rte_tailq_end(RTE_TAILQ_NUM)
#undef rte_tailq_elem
--
1.9.3
^ permalink raw reply related [flat|nested] 29+ messages in thread
* [PATCH v2 2/5] distributor: new packet distributor library
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (7 preceding siblings ...)
2014-05-29 10:12 ` [PATCH v2 1/5] eal: add tailq for new distributor component Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
[not found] ` <1401358338-23455-3-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
2014-05-29 10:12 ` [PATCH v2 3/5] distributor: add distributor library to build Bruce Richardson
` (2 subsequent siblings)
11 siblings, 1 reply; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
This adds the code for a new Intel DPDK library for packet distribution.
The distributor is a component which is designed to pass packets
one-at-a-time to workers, with dynamic load balancing. Using the RSS
field in the mbuf as a tag, the distributor tracks what packet tag is
being processed by what worker and then ensures that no two packets with
the same tag are in-flight simultaneously. Once a tag is not in-flight,
then the next packet with that tag will be sent to the next available
core.
Changes in V2 patch:
* Added support for a partial distributor flush when the process() API is
called without any new mbufs
* Removed unused "future use" parameters from functions
* Improved comments to be clearer about thread safety
* Added locks around the tailq add in the create() API fn
* Stylistic improvements for issues flagged by checkpatch
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
lib/librte_distributor/Makefile | 50 ++++
lib/librte_distributor/rte_distributor.c | 487 +++++++++++++++++++++++++++++++
lib/librte_distributor/rte_distributor.h | 186 ++++++++++++
3 files changed, 723 insertions(+)
create mode 100644 lib/librte_distributor/Makefile
create mode 100644 lib/librte_distributor/rte_distributor.c
create mode 100644 lib/librte_distributor/rte_distributor.h
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..36699f8
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,50 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..35b7da3
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,487 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/* we will use the bottom four bits of the pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits. */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0 /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1) /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64 bits of the memory are actually used, though.
+ */
+union rte_distributor_buffer {
+ volatile int64_t bufptr64;
+ char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+struct rte_distributor_backlog {
+ unsigned start;
+ unsigned count;
+ int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
+};
+
+struct rte_distributor_returned_pkts {
+ unsigned start;
+ unsigned count;
+ struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+ TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */
+
+ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the distributor. */
+ unsigned num_workers; /**< Number of workers polling */
+
+ uint32_t in_flight_tags[RTE_MAX_LCORE];
+ struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+ union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+
+ struct rte_distributor_returned_pkts returns;
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/**** APIs called by workers ****/
+
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_GET_BUF;
+ while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
+ rte_pause();
+ buf->bufptr64 = req;
+ while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+ rte_pause();
+ /* since bufptr64 is signed, this should be an arithmetic shift */
+ int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
+ return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+ union rte_distributor_buffer *buf = &d->bufs[worker_id];
+ int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
+ | RTE_DISTRIB_RETURN_BUF;
+ buf->bufptr64 = req;
+ return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* as the name suggests, adds a packet to the backlog for a particular worker */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+ if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
+ return -1;
+
+ bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
+ = item;
+ return 0;
+}
+
+/* takes the next packet for a worker off the backlog */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+ bl->count--;
+ return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
+}
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor *d,
+ unsigned *ret_start, unsigned *ret_count)
+{
+ /* store returns in a circular buffer - code is branch-free */
+ d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+ = (void *)oldbuf;
+ *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+ *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
+}
+
+static inline void
+handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
+{
+ d->in_flight_tags[wkr] = 0;
+ d->bufs[wkr].bufptr64 = 0;
+ if (unlikely(d->backlog[wkr].count != 0)) {
+ /* On return of a packet, we need to move the
+ * queued packets for this core elsewhere.
+ * Easiest solution is to set things up for
+ * a recursive call. That will cause those
+ * packets to be queued up for the next free
+ * core, i.e. it will return as soon as a
+ * core becomes free to accept the first
+ * packet, as subsequent ones will be added to
+ * the backlog for that core.
+ */
+ struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
+ unsigned i;
+ struct rte_distributor_backlog *bl = &d->backlog[wkr];
+
+ for (i = 0; i < bl->count; i++) {
+ unsigned idx = (bl->start + i) &
+ RTE_DISTRIB_BACKLOG_MASK;
+ pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
+ RTE_DISTRIB_FLAG_BITS));
+ }
+ /* recursive call */
+ rte_distributor_process(d, pkts, i);
+ bl->count = bl->start = 0;
+ }
+}
+
+/* this function is called when process() fn is called without any new
+ * packets. It goes through all the workers and clears any returned packets
+ * to do a partial flush.
+ */
+static int
+process_returns(struct rte_distributor *d)
+{
+ unsigned wkr;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++) {
+
+ const int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed++;
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+ else {
+ d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[wkr] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs)
+{
+ unsigned next_idx = 0;
+ unsigned wkr = 0;
+ struct rte_mbuf *next_mb = NULL;
+ int64_t next_value = 0;
+ uint32_t new_tag = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ if (unlikely(num_mbufs == 0))
+ return process_returns(d);
+
+ while (next_idx < num_mbufs || next_mb != NULL) {
+
+ int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (!next_mb) {
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb)
+ << RTE_DISTRIB_FLAG_BITS);
+ new_tag = (next_mb->pkt.hash.rss | 1);
+
+ uint32_t match = 0;
+ unsigned i;
+ for (i = 0; i < d->num_workers; i++)
+ match |= (!(d->in_flight_tags[i] ^ new_tag)
+ << i);
+
+ if (match) {
+ next_mb = NULL;
+ unsigned worker = __builtin_ctz(match);
+ if (add_to_backlog(&d->backlog[worker],
+ next_value) < 0)
+ next_idx--;
+ }
+ }
+
+ if ((data & RTE_DISTRIB_GET_BUF) &&
+ (d->backlog[wkr].count || next_mb)) {
+
+ if (d->backlog[wkr].count)
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+
+ else {
+ d->bufs[wkr].bufptr64 = next_value;
+ d->in_flight_tags[wkr] = new_tag;
+ next_mb = NULL;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ handle_worker_shutdown(d, wkr);
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ }
+
+ /* store returns in a circular buffer */
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ if (++wkr == d->num_workers)
+ wkr = 0;
+ }
+ /* to finish, check all workers for backlog and schedule work for them
+ * if they are ready */
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ if (d->backlog[wkr].count &&
+ (d->bufs[wkr].bufptr64 & RTE_DISTRIB_GET_BUF)) {
+
+ int64_t oldbuf = d->bufs[wkr].bufptr64 >>
+ RTE_DISTRIB_FLAG_BITS;
+ store_return(oldbuf, d, &ret_start, &ret_count);
+
+ d->bufs[wkr].bufptr64 = backlog_pop(&d->backlog[wkr]);
+ }
+
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+ return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs)
+{
+ struct rte_distributor_returned_pkts *returns = &d->returns;
+ unsigned retval = (max_mbufs < returns->count) ?
+ max_mbufs : returns->count;
+ unsigned i;
+
+ for (i = 0; i < retval; i++) {
+ unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
+ mbufs[i] = returns->mbufs[idx];
+ }
+ returns->start += i;
+ returns->count -= i;
+
+ return retval;
+}
+
+/* local function used by the flush function only, to reassign a backlog for
+ * a shutdown core. The process function uses a recursive call for this, but
+ * that is not done in flush, as we need to track the outstanding packets count.
+ */
+static inline int
+move_backlog(struct rte_distributor *d, unsigned worker)
+{
+ struct rte_distributor_backlog *bl = &d->backlog[worker];
+ unsigned i;
+
+ for (i = 0; i < d->num_workers; i++) {
+ if (i == worker)
+ continue;
+ /* check worker is active and then if backlog will fit */
+ if ((d->in_flight_tags[i] != 0 ||
+ (d->bufs[i].bufptr64 & RTE_DISTRIB_GET_BUF)) &&
+ (bl->count + d->backlog[i].count)
+ <= RTE_DISTRIB_BACKLOG_SIZE) {
+ while (bl->count)
+ add_to_backlog(&d->backlog[i], backlog_pop(bl));
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/* flush the distributor, so that there are no outstanding packets in flight or
+ * queued up. */
+int
+rte_distributor_flush(struct rte_distributor *d)
+{
+ unsigned wkr, total_outstanding = 0;
+ unsigned flushed = 0;
+ unsigned ret_start = d->returns.start,
+ ret_count = d->returns.count;
+
+ for (wkr = 0; wkr < d->num_workers; wkr++)
+ total_outstanding += d->backlog[wkr].count +
+ !!(d->in_flight_tags[wkr]);
+
+ wkr = 0;
+ while (flushed < total_outstanding) {
+
+ if (d->in_flight_tags[wkr] != 0 || d->backlog[wkr].count) {
+ const int64_t data = d->bufs[wkr].bufptr64;
+ uintptr_t oldbuf = 0;
+
+ if (data & RTE_DISTRIB_GET_BUF) {
+ flushed += (d->in_flight_tags[wkr] != 0);
+ if (d->backlog[wkr].count) {
+ d->bufs[wkr].bufptr64 =
+ backlog_pop(&d->backlog[wkr]);
+ /* we need to mark something as being
+ * in-flight, but it doesn't matter what
+ * as we never check it except
+ * to check for non-zero.
+ */
+ d->in_flight_tags[wkr] = 1;
+ } else {
+ d->bufs[wkr].bufptr64 =
+ RTE_DISTRIB_GET_BUF;
+ d->in_flight_tags[wkr] = 0;
+ }
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ } else if (data & RTE_DISTRIB_RETURN_BUF) {
+ if (d->backlog[wkr].count == 0 ||
+ move_backlog(d, wkr) == 0) {
+ /* only if we move backlog,
+ * process this packet */
+ d->bufs[wkr].bufptr64 = 0;
+ oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
+ flushed++;
+ d->in_flight_tags[wkr] = 0;
+ }
+ }
+
+ store_return(oldbuf, d, &ret_start, &ret_count);
+ }
+
+ if (++wkr == d->num_workers)
+ wkr = 0;
+ }
+ d->returns.start = ret_start;
+ d->returns.count = ret_count;
+
+ return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns(struct rte_distributor *d)
+{
+ d->returns.start = d->returns.count = 0;
+#ifndef __OPTIMIZE__
+ memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
+#endif
+}
+
+/* creates a distributor instance */
+struct rte_distributor *
+rte_distributor_create(const char *name,
+ unsigned socket_id,
+ unsigned num_workers)
+{
+ struct rte_distributor *d;
+ struct rte_distributor_list *distributor_list;
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ const struct rte_memzone *mz;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+ RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+ if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ rte_errno = EINVAL;
+ return NULL;
+ }
+
+ /* check that we have an initialised tail queue */
+ distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+ rte_distributor_list);
+ if (distributor_list == NULL) {
+ rte_errno = E_RTE_NO_TAILQ;
+ return NULL;
+ }
+
+ rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+ mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+ if (mz == NULL) {
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ d = mz->addr;
+ rte_snprintf(d->name, sizeof(d->name), "%s", name);
+ d->num_workers = num_workers;
+
+ rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+ TAILQ_INSERT_TAIL(distributor_list, d, next);
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ return d;
+}
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..d8e953f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,186 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DISTRIBUTOR_H_
+#define _RTE_DISTRIBUTOR_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+struct rte_distributor;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ * The name to be given to the distributor instance.
+ * @param socket_id
+ * The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ * The maximum number of workers that will request packets from this
+ * distributor
+ * @return
+ * The newly created distributor instance
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+ unsigned num_workers);
+
+/* *** APIs to be called on the distributor lcore *** */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed at the same time.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs to be distributed
+ * @param num_mbufs
+ * The number of mbufs in the mbufs array
+ * @return
+ * The number of mbufs processed.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param mbufs
+ * The mbufs pointer array to be filled in
+ * @param max_mbufs
+ * The size of the mbufs array
+ * @return
+ * The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts(struct rte_distributor *d,
+ struct rte_mbuf **mbufs, unsigned max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * @param d
+ * The distributor instance to be used
+ * @return
+ * The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush(struct rte_distributor *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts() API call.
+ *
+ * @param d
+ * The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns(struct rte_distributor *d);
+
+/* *** APIs to be called on the worker lcores *** */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get a new packet to process. Any previous packet
+ * given to the worker is assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param oldpkt
+ * The previous packet, if any, being processed by the worker
+ *
+ * @return
+ * A new packet to be processed by the worker thread.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+ unsigned worker_id, struct rte_mbuf *oldpkt);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ * The distributor instance to be used
+ * @param worker_id
+ * The worker instance number to use - must be less than num_workers passed
+ * at distributor creation time.
+ * @param mbuf
+ * The previous packet being processed by the worker
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+ struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
--
1.9.3
^ permalink raw reply related [flat|nested] 29+ messages in thread
* [PATCH v2 3/5] distributor: add distributor library to build
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (8 preceding siblings ...)
2014-05-29 10:12 ` [PATCH v2 2/5] distributor: new packet distributor library Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [PATCH v2 4/5] distributor: add unit tests for distributor lib Bruce Richardson
2014-05-29 10:12 ` [PATCH v2 5/5] docs: add distributor lib to API docs Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add new configuration settings to enable/disable the distributor library
and a makefile entry to compile it once enabled.
Changes in V2:
* Patch updated to use new common config files
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
config/common_bsdapp | 6 ++++++
config/common_linuxapp | 5 +++++
lib/Makefile | 1 +
mk/rte.app.mk | 4 ++++
4 files changed, 16 insertions(+)
diff --git a/config/common_bsdapp b/config/common_bsdapp
index 2cc7b80..2af6191 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
CONFIG_RTE_TEST_PMD=y
CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 62619c6..1663289 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
#
CONFIG_RTE_NIC_BYPASS=n
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index a836577..ec5fbd8 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -61,6 +61,10 @@ ifeq ($(NO_AUTOLIBS),)
LDLIBS += --whole-archive
+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
ifeq ($(CONFIG_RTE_LIBRTE_KNI),y)
ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
LDLIBS += -lrte_kni
--
1.9.3
^ permalink raw reply related [flat|nested] 29+ messages in thread
* [PATCH v2 4/5] distributor: add unit tests for distributor lib
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (9 preceding siblings ...)
2014-05-29 10:12 ` [PATCH v2 3/5] distributor: add distributor library to build Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
2014-05-29 10:12 ` [PATCH v2 5/5] docs: add distributor lib to API docs Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add a set of unit tests and a basic performance test for the
distributor library. These tests cover all the major functionality of
the library on both distributor and worker sides.
Changes in V2:
* Updated tests to work with APIs in distributor patch V2
* Stylistic changes to clear checkpatch.pl errors
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
app/test/Makefile | 2 +
app/test/commands.c | 7 +-
app/test/test.h | 2 +
app/test/test_distributor.c | 595 +++++++++++++++++++++++++++++++++++++++
app/test/test_distributor_perf.c | 275 ++++++++++++++++++
5 files changed, 880 insertions(+), 1 deletion(-)
create mode 100644 app/test/test_distributor.c
create mode 100644 app/test/test_distributor_perf.c
diff --git a/app/test/Makefile b/app/test/Makefile
index b49785e..7c2d351 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -93,6 +93,8 @@ SRCS-$(CONFIG_RTE_APP_TEST) += test_power.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_common.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_timer_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_ivshmem.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor.c
+SRCS-$(CONFIG_RTE_APP_TEST) += test_distributor_perf.c
SRCS-$(CONFIG_RTE_APP_TEST) += test_devargs.c
ifeq ($(CONFIG_RTE_APP_TEST),y)
diff --git a/app/test/commands.c b/app/test/commands.c
index efa8566..dfdbd37 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -179,6 +179,10 @@ static void cmd_autotest_parsed(void *parsed_result,
ret = test_common();
if (!strcmp(res->autotest, "ivshmem_autotest"))
ret = test_ivshmem();
+ if (!strcmp(res->autotest, "distributor_autotest"))
+ ret = test_distributor();
+ if (!strcmp(res->autotest, "distributor_perf_autotest"))
+ ret = test_distributor_perf();
if (!strcmp(res->autotest, "devargs_autotest"))
ret = test_devargs();
#ifdef RTE_LIBRTE_PMD_RING
@@ -238,7 +242,8 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
#ifdef RTE_LIBRTE_KVARGS
"kvargs_autotest#"
#endif
- "common_autotest");
+ "common_autotest#"
+ "distributor_autotest#distributor_perf_autotest");
cmdline_parse_inst_t cmd_autotest = {
.f = cmd_autotest_parsed, /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index 1945d29..9b83ade 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,8 @@ int test_power(void);
int test_common(void);
int test_pmd_ring(void);
int test_ivshmem(void);
+int test_distributor(void);
+int test_distributor_perf(void);
int test_kvargs(void);
int test_devargs(void);
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..e7dc1fb
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,595 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_errno.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+/* statics - all zero-initialized by default */
+static volatile int quit; /**< general quit variable for all threads */
+static volatile int zero_quit; /**< var for when we just want thr0 to quit */
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* returns the total count of the number of packets handled by the worker
+ * functions given below.
+ */
+static inline unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static inline void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for sanity test
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* do basic sanity testing of the distributor. This test checks the following:
+ * - send 32 packets through the distributor with the same tag and ensure they
+ * all go to the one worker
+ * - send 32 packets through the distributor with two different tags and
+ * verify that they go equally to two different workers.
+ * - send 32 packets with different tags through the distributor and
+ * just verify we get all packets back.
+ * - send 1024 packets through the distributor, gathering the returned packets
+ * as we go. Then verify that we correctly got all 1024 pointers back again,
+ * not necessarily in the same order (as different flows).
+ */
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Basic distributor sanity tests ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with all zero hashes done.\n");
+ if (worker_stats[0].handled_packets != BURST)
+ return -1;
+
+ /* pick two flows and check they go correctly */
+ if (rte_lcore_count() >= 3) {
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = (i & 1) << 8;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with two hash values done\n");
+
+ if (worker_stats[0].handled_packets != 16 ||
+ worker_stats[1].handled_packets != 16)
+ return -1;
+ }
+
+ /* give a different hash value to each packet,
+ * so load gets distributed */
+ clear_packet_count();
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ rte_distributor_process(d, bufs, BURST);
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Sanity test with non-zero hashes done\n");
+
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ /* sanity test with BIG_BATCH packets to ensure they all arrived back
+ * from the returned packets function */
+ clear_packet_count();
+ struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
+ unsigned num_returned = 0;
+
+ /* flush out any remaining packets */
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BIG_BATCH; i++)
+ many_bufs[i]->pkt.hash.rss = i << 2;
+
+ for (i = 0; i < BIG_BATCH/BURST; i++) {
+ rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned],
+ BIG_BATCH - num_returned);
+ }
+ rte_distributor_flush(d);
+ num_returned += rte_distributor_returned_pkts(d,
+ &return_bufs[num_returned], BIG_BATCH - num_returned);
+
+ if (num_returned != BIG_BATCH) {
+ printf("line %d: Number returned is not the same as "
+ "number sent\n", __LINE__);
+ return -1;
+ }
+ /* big check - make sure all packets made it back!! */
+ for (i = 0; i < BIG_BATCH; i++) {
+ unsigned j;
+ struct rte_mbuf *src = many_bufs[i];
+ for (j = 0; j < BIG_BATCH; j++)
+ if (return_bufs[j] == src)
+ break;
+
+ if (j == BIG_BATCH) {
+ printf("Error: could not find source packet #%u\n", i);
+ return -1;
+ }
+ }
+ printf("Sanity test of returned packets done\n");
+
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
+
+ printf("\n");
+ return 0;
+}
+
+
+/* to test that the distributor does not lose packets, we use this worker
+ * function which frees mbufs when it gets them. The distributor thread does
+ * the mbuf allocation. If the distributor drops packets we'll eventually
+ * run out of mbufs.
+ */
+static int
+handle_work_with_free_mbufs(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* Perform a sanity test of the distributor with a large number of packets,
+ * where we allocate a new set of mbufs for each burst. The workers then
+ * free the mbufs. This ensures that we don't have any packet leaks in the
+ * library.
+ */
+static int
+sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ struct rte_mbuf *bufs[BURST];
+
+ printf("=== Sanity test with mbuf alloc/free ===\n");
+ clear_packet_count();
+ for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
+ unsigned j;
+ while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
+ rte_distributor_process(d, NULL, 0);
+ for (j = 0; j < BURST; j++) {
+ bufs[j]->pkt.hash.rss = (i+j) << 1;
+ bufs[j]->refcnt = 1;
+ }
+
+ rte_distributor_process(d, bufs, BURST);
+ }
+
+ rte_distributor_flush(d);
+ if (total_packet_count() < (1<<ITER_POWER)) {
+ printf("Line %u: Packet count is incorrect, %u, expected %u\n",
+ __LINE__, total_packet_count(),
+ (1<<ITER_POWER));
+ return -1;
+ }
+
+ printf("Sanity test with mbuf alloc/free passed\n\n");
+ return 0;
+}
+
+static int
+handle_work_for_shutdown_test(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ /* wait for the global quit signal, or for worker zero, wait
+ * for zero_quit */
+ while (!quit && !(id == 0 && zero_quit)) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+
+ if (id == 0) {
+ /* for worker zero, allow it to restart to pick up last packet
+ * when all workers are shutting down.
+ */
+ while (zero_quit)
+ usleep(100);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ rte_pktmbuf_free(pkt);
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ }
+ rte_distributor_return_pkt(d, id, pkt);
+ }
+ return 0;
+}
+
+
+/* Sanity test of the distributor where one worker shuts down partway
+ * through processing. Packets backlogged for the departing worker must
+ * be redistributed to the remaining workers, ensuring that no packets
+ * are lost.
+ */
+static int
+sanity_test_with_worker_shutdown(struct rte_distributor *d,
+ struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Sanity test of worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full
+ * backlog for the other ones at worker 0.
+ */
+
+ /* get more buffers to queue up, again setting them to the same flow */
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+ rte_distributor_process(d, bufs, BURST);
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+ if (total_packet_count() != BURST * 2) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST * 2, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Sanity test with worker shutdown passed\n\n");
+ return 0;
+}
+
+/* Test that the flush function is able to move packets between workers when
+ * one worker shuts down.
+ */
+static int
+test_flush_with_worker_shutdown(struct rte_distributor *d,
+ struct rte_mempool *p)
+{
+ struct rte_mbuf *bufs[BURST];
+ unsigned i;
+
+ printf("=== Test flush fn with worker shutdown ===\n");
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* now set all hash values in all buffers to zero, so all pkts go to the
+ * one worker thread */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = 0;
+
+ rte_distributor_process(d, bufs, BURST);
+ /* at this point, we will have processed some packets and have a full
+ * backlog for the other ones at worker 0.
+ */
+
+ /* get worker zero to quit */
+ zero_quit = 1;
+
+ /* flush the distributor */
+ rte_distributor_flush(d);
+
+ zero_quit = 0;
+ if (total_packet_count() != BURST) {
+ printf("Line %d: Error, not all packets flushed. "
+ "Expected %u, got %u\n",
+ __LINE__, BURST, total_packet_count());
+ return -1;
+ }
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
+static
+int test_error_distributor_create_name(void)
+{
+ struct rte_distributor *d = NULL;
+ char *name = NULL;
+
+ d = rte_distributor_create(name, rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: No error on create() with NULL name param\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static
+int test_error_distributor_create_numworkers(void)
+{
+ struct rte_distributor *d = NULL;
+ d = rte_distributor_create("test_numworkers", rte_socket_id(),
+ RTE_MAX_LCORE + 10);
+ if (d != NULL || rte_errno != EINVAL) {
+ printf("ERROR: No error on create() with num_workers > MAX\n");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ zero_quit = 0;
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_distributor_flush(d);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor(void)
+{
+ static struct rte_distributor *d;
+ static struct rte_mempool *p;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_distributor", rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (sanity_test(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
+ if (sanity_test_with_mbuf_alloc(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ if (rte_lcore_count() > 2) {
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
+ SKIP_MASTER);
+ if (sanity_test_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
+ SKIP_MASTER);
+ if (test_flush_with_worker_shutdown(d, p) < 0)
+ goto err;
+ quit_workers(d, p);
+
+ } else {
+ printf("Not enough cores to run tests for worker shutdown\n");
+ }
+
+ if (test_error_distributor_create_numworkers() == -1 ||
+ test_error_distributor_create_name() == -1) {
+ printf("rte_distributor_create parameter check tests failed\n");
+ return -1;
+ }
+
+ return 0;
+
+err:
+ quit_workers(d, p);
+ return -1;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
new file mode 100644
index 0000000..1031baa
--- /dev/null
+++ b/app/test/test_distributor_perf.c
@@ -0,0 +1,275 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+#define BURST 32
+#define BIG_BATCH 1024
+
+/* static vars - zero initialized by default */
+static volatile int quit;
+static volatile unsigned worker_idx;
+
+struct worker_stats {
+ volatile unsigned handled_packets;
+} __rte_cache_aligned;
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/* worker thread used for timing the round trip of a cache line
+ * between two cores
+ */
+static void
+flip_bit(volatile uint64_t *arg)
+{
+ uint64_t old_val = 0;
+ while (old_val != 2) {
+ while (!*arg)
+ rte_pause();
+ old_val = *arg;
+ *arg = 0;
+ }
+}
+
+/* test case to time the number of cycles taken to round-trip a cache line
+ * between two cores.
+ */
+static void
+time_cache_line_switch(void)
+{
+ /* allocate three cache lines' worth of data; only the first word is used */
+ uint64_t data[CACHE_LINE_SIZE*3 / sizeof(uint64_t)];
+
+ unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+ volatile uint64_t *pdata = &data[0];
+ *pdata = 1;
+ rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
+ while (*pdata)
+ rte_pause();
+
+ const uint64_t start_time = rte_rdtsc();
+ for (i = 0; i < (1 << ITER_POWER); i++) {
+ while (*pdata)
+ rte_pause();
+ *pdata = 1;
+ }
+ const uint64_t end_time = rte_rdtsc();
+
+ while (*pdata)
+ rte_pause();
+ *pdata = 2;
+ rte_eal_wait_lcore(slaveid);
+ printf("==== Cache line switch test ===\n");
+ printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+ end_time-start_time);
+ printf("Ticks per iteration = %"PRIu64"\n\n",
+ (end_time-start_time) >> ITER_POWER);
+}
+
+/* returns the total count of the number of packets handled by the worker
+ * functions given below.
+ */
+static unsigned
+total_packet_count(void)
+{
+ unsigned i, count = 0;
+ for (i = 0; i < worker_idx; i++)
+ count += worker_stats[i].handled_packets;
+ return count;
+}
+
+/* resets the packet counts for a new test */
+static void
+clear_packet_count(void)
+{
+ memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/* this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work(void *arg)
+{
+ struct rte_mbuf *pkt = NULL;
+ struct rte_distributor *d = arg;
+ unsigned count = 0;
+ unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+ pkt = rte_distributor_get_pkt(d, id, NULL);
+ while (!quit) {
+ worker_stats[id].handled_packets++, count++;
+ pkt = rte_distributor_get_pkt(d, id, pkt);
+ }
+ worker_stats[id].handled_packets++, count++;
+ rte_distributor_return_pkt(d, id, pkt);
+ return 0;
+}
+
+/* this basic performance test repeatedly sends bursts of 32 packets to the
+ * distributor, verifies at the end that the worker threads received them
+ * all, and reports how long the processing took per packet.
+ */
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+ unsigned i;
+ uint64_t start, end;
+ struct rte_mbuf *bufs[BURST];
+
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ printf("Error getting mbufs from pool\n");
+ return -1;
+ }
+ /* ensure we have different hash value for each pkt */
+ for (i = 0; i < BURST; i++)
+ bufs[i]->pkt.hash.rss = i;
+
+ start = rte_rdtsc();
+ for (i = 0; i < (1<<ITER_POWER); i++)
+ rte_distributor_process(d, bufs, BURST);
+ end = rte_rdtsc();
+
+ do {
+ usleep(100);
+ rte_distributor_process(d, NULL, 0);
+ } while (total_packet_count() < (BURST << ITER_POWER));
+
+ printf("=== Performance test of distributor ===\n");
+ printf("Time per burst: %"PRIu64"\n", (end - start) >> ITER_POWER);
+ printf("Time per packet: %"PRIu64"\n\n",
+ ((end - start) >> ITER_POWER)/BURST);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ worker_stats[i].handled_packets);
+ printf("Total packets: %u (%x)\n", total_packet_count(),
+ total_packet_count());
+ printf("=== Perf test done ===\n\n");
+
+ return 0;
+}
+
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+{
+ const unsigned num_workers = rte_lcore_count() - 1;
+ unsigned i;
+ struct rte_mbuf *bufs[RTE_MAX_LCORE];
+ rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+ quit = 1;
+ for (i = 0; i < num_workers; i++)
+ bufs[i]->pkt.hash.rss = i << 1;
+ rte_distributor_process(d, bufs, num_workers);
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+ rte_distributor_process(d, NULL, 0);
+ rte_eal_mp_wait_lcore();
+ quit = 0;
+ worker_idx = 0;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+int
+test_distributor_perf(void)
+{
+ static struct rte_distributor *d;
+ static struct rte_mempool *p;
+
+ if (rte_lcore_count() < 2) {
+ printf("ERROR: not enough cores to test distributor\n");
+ return -1;
+ }
+
+ /* first time how long it takes to round-trip a cache line */
+ time_cache_line_switch();
+
+ if (d == NULL) {
+ d = rte_distributor_create("Test_perf", rte_socket_id(),
+ rte_lcore_count() - 1);
+ if (d == NULL) {
+ printf("Error creating distributor\n");
+ return -1;
+ }
+ } else {
+ rte_distributor_flush(d);
+ rte_distributor_clear_returns(d);
+ }
+
+ const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
+ (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
+ if (p == NULL) {
+ p = rte_mempool_create("DPT_MBUF_POOL", nb_bufs,
+ MBUF_SIZE, BURST,
+ sizeof(struct rte_pktmbuf_pool_private),
+ rte_pktmbuf_pool_init, NULL,
+ rte_pktmbuf_init, NULL,
+ rte_socket_id(), 0);
+ if (p == NULL) {
+ printf("Error creating mempool\n");
+ return -1;
+ }
+ }
+
+ rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+ if (perf_test(d, p) < 0)
+ return -1;
+ quit_workers(d, p);
+
+ return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+int
+test_distributor_perf(void)
+{
+ printf("Distributor is not enabled in configuration\n");
+ return 0;
+}
+
+#endif
--
1.9.3
^ permalink raw reply related [flat|nested] 29+ messages in thread
* [PATCH v2 5/5] docs: add distributor lib to API docs
[not found] ` <1400580057-30155-1-git-send-email-bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
` (10 preceding siblings ...)
2014-05-29 10:12 ` [PATCH v2 4/5] distributor: add unit tests for distributor lib Bruce Richardson
@ 2014-05-29 10:12 ` Bruce Richardson
11 siblings, 0 replies; 29+ messages in thread
From: Bruce Richardson @ 2014-05-29 10:12 UTC (permalink / raw)
To: dev-VfR2kkLFssw
Add entries into the API documentation for the new distributor library.
Signed-off-by: Bruce Richardson <bruce.richardson-ral2JQCrhuEAvxtiuMwx3w@public.gmane.org>
---
doc/doxy-api-index.md | 1 +
doc/doxy-api.conf | 1 +
2 files changed, 2 insertions(+)
diff --git a/doc/doxy-api-index.md b/doc/doxy-api-index.md
index 2825c08..6e75a6e 100644
--- a/doc/doxy-api-index.md
+++ b/doc/doxy-api-index.md
@@ -94,6 +94,7 @@ There are many libraries, so their headers may be grouped by topics:
- **containers**:
[mbuf] (@ref rte_mbuf.h),
[ring] (@ref rte_ring.h),
+ [distributor] (@ref rte_distributor.h),
[tailq] (@ref rte_tailq.h),
[bitmap] (@ref rte_bitmap.h)
diff --git a/doc/doxy-api.conf b/doc/doxy-api.conf
index 642f77a..9df7356 100644
--- a/doc/doxy-api.conf
+++ b/doc/doxy-api.conf
@@ -31,6 +31,7 @@
PROJECT_NAME = DPDK
INPUT = doc/doxy-api-index.md \
lib/librte_eal/common/include \
+ lib/librte_distributor \
lib/librte_ether \
lib/librte_hash \
lib/librte_kni \
--
1.9.3
^ permalink raw reply related [flat|nested] 29+ messages in thread