From: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
To: gage.eads@intel.com, jerin.jacobkollanukkaran@cavium.com,
	harry.van.haaren@intel.com, hemant.agrawal@nxp.com,
	liang.j.ma@intel.com, santosh.shukla@caviumnetworks.com
Cc: dev@dpdk.org, Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Subject: [PATCH v2 04/15] examples/eventdev: add generic worker pipeline
Date: Wed, 10 Jan 2018 16:40:02 +0530	[thread overview]
Message-ID: <20180110111013.14644-4-pbhagavatula@caviumnetworks.com> (raw)
In-Reply-To: <20180110111013.14644-1-pbhagavatula@caviumnetworks.com>

Rename the existing pipeline as the generic worker pipeline. Move the worker,
consumer, Rx adapter and eventdev setup routines out of main.c into
pipeline_worker_generic.c, and invoke them through the capability function
pointers filled in by set_worker_generic_setup_data().

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
---

 v2 Changes:
 - Add SPDX license tags
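
 Note for reviewers: a rough sketch of the capability dispatch this patch
 relies on. struct setup_data itself is defined in pipeline_common.h (added
 by the earlier framework patch in this series); the field types below are
 only inferred from the call sites in main.c and pipeline_worker_generic.c
 and are not part of this patch:

	/*
	 * Hypothetical sketch; signatures inferred from usage, not copied
	 * from the actual header.
	 */
	struct setup_data {
		int  (*worker)(void *arg);         /* per-lcore worker loop */
		int  (*consumer)(void);            /* TX consumer, drains the
						    * SINGLE_LINK queue */
		void (*scheduler)(unsigned int lcore_id); /* run sw scheduler
							   * and adapter services */
		int  (*evdev_setup)(struct cons_data *cons_data,
				    struct worker_data *worker_data);
		void (*adptr_setup)(uint16_t nb_ports);   /* Rx adapter init */
		void (*check_opt)(void);           /* validate cmdline options
						    * (not set in this patch) */
	};

	/*
	 * main() then dispatches through these hooks instead of calling the
	 * sw-PMD-specific helpers directly, e.g.:
	 *
	 *   do_capability_setup(num_ports, 0);
	 *   fdata->cap.check_opt();
	 *   int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
	 *   fdata->cap.adptr_setup(num_ports);
	 *   rte_eal_remote_launch(fdata->cap.worker, &worker_data[i], lcore_id);
	 */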

 examples/eventdev_pipeline_sw_pmd/Makefile         |   1 +
 examples/eventdev_pipeline_sw_pmd/main.c           | 440 +--------------------
 .../eventdev_pipeline_sw_pmd/pipeline_common.h     |  53 +++
 .../pipeline_worker_generic.c                      | 398 +++++++++++++++++++
 4 files changed, 466 insertions(+), 426 deletions(-)
 create mode 100644 examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c

diff --git a/examples/eventdev_pipeline_sw_pmd/Makefile b/examples/eventdev_pipeline_sw_pmd/Makefile
index de4e22c88..5e30556fb 100644
--- a/examples/eventdev_pipeline_sw_pmd/Makefile
+++ b/examples/eventdev_pipeline_sw_pmd/Makefile
@@ -42,6 +42,7 @@ APP = eventdev_pipeline_sw_pmd

 # all source are stored in SRCS-y
 SRCS-y := main.c
+SRCS-y += pipeline_worker_generic.c

 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS)
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 2c919b7fa..295c8b692 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -68,179 +68,6 @@ eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
 	} while (_sent != unsent);
 }

-static int
-consumer(void)
-{
-	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-	struct rte_event packets[BATCH_SIZE];
-
-	static uint64_t received;
-	static uint64_t last_pkts;
-	static uint64_t last_time;
-	static uint64_t start_time;
-	unsigned int i, j;
-	uint8_t dev_id = cons_data.dev_id;
-	uint8_t port_id = cons_data.port_id;
-
-	uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-			packets, RTE_DIM(packets), 0);
-
-	if (n == 0) {
-		for (j = 0; j < rte_eth_dev_count(); j++)
-			rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
-		return 0;
-	}
-	if (start_time == 0)
-		last_time = start_time = rte_get_timer_cycles();
-
-	received += n;
-	for (i = 0; i < n; i++) {
-		uint8_t outport = packets[i].mbuf->port;
-		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-				packets[i].mbuf);
-
-		packets[i].op = RTE_EVENT_OP_RELEASE;
-	}
-
-	if (cons_data.release) {
-		uint16_t nb_tx;
-
-		nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
-		while (nb_tx < n)
-			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
-							 packets + nb_tx,
-							 n - nb_tx);
-	}
-
-	/* Print out mpps every 1<22 packets */
-	if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-		const uint64_t now = rte_get_timer_cycles();
-		const uint64_t total_ms = (now - start_time) / freq_khz;
-		const uint64_t delta_ms = (now - last_time) / freq_khz;
-		uint64_t delta_pkts = received - last_pkts;
-
-		printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
-			"avg %.3f mpps [current %.3f mpps]\n",
-				received,
-				total_ms,
-				received / (total_ms * 1000.0),
-				delta_pkts / (delta_ms * 1000.0));
-		last_pkts = received;
-		last_time = now;
-	}
-
-	cdata.num_packets -= n;
-	if (cdata.num_packets <= 0)
-		fdata->done = 1;
-
-	return 0;
-}
-
-static inline void
-schedule_devices(unsigned int lcore_id)
-{
-	if (fdata->rx_core[lcore_id]) {
-		rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
-				!fdata->rx_single);
-	}
-
-	if (fdata->sched_core[lcore_id]) {
-		rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
-				!fdata->sched_single);
-		if (cdata.dump_dev_signal) {
-			rte_event_dev_dump(0, stdout);
-			cdata.dump_dev_signal = 0;
-		}
-	}
-
-	if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
-	    rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
-		consumer();
-		rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
-	}
-}
-
-static inline void
-work(struct rte_mbuf *m)
-{
-	struct ether_hdr *eth;
-	struct ether_addr addr;
-
-	/* change mac addresses on packet (to use mbuf data) */
-	/*
-	 * FIXME Swap mac address properly and also handle the
-	 * case for both odd and even number of stages that the
-	 * addresses end up the same at the end of the pipeline
-	 */
-	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
-	ether_addr_copy(&eth->d_addr, &addr);
-	ether_addr_copy(&addr, &eth->d_addr);
-
-	/* do a number of cycles of work per packet */
-	volatile uint64_t start_tsc = rte_rdtsc();
-	while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
-		rte_pause();
-}
-
-static int
-worker(void *arg)
-{
-	struct rte_event events[BATCH_SIZE];
-
-	struct worker_data *data = (struct worker_data *)arg;
-	uint8_t dev_id = data->dev_id;
-	uint8_t port_id = data->port_id;
-	size_t sent = 0, received = 0;
-	unsigned int lcore_id = rte_lcore_id();
-
-	while (!fdata->done) {
-		uint16_t i;
-
-		schedule_devices(lcore_id);
-
-		if (!fdata->worker_core[lcore_id]) {
-			rte_pause();
-			continue;
-		}
-
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
-				events, RTE_DIM(events), 0);
-
-		if (nb_rx == 0) {
-			rte_pause();
-			continue;
-		}
-		received += nb_rx;
-
-		for (i = 0; i < nb_rx; i++) {
-
-			/* The first worker stage does classification */
-			if (events[i].queue_id == cdata.qid[0])
-				events[i].flow_id = events[i].mbuf->hash.rss
-							% cdata.num_fids;
-
-			events[i].queue_id = cdata.next_qid[events[i].queue_id];
-			events[i].op = RTE_EVENT_OP_FORWARD;
-			events[i].sched_type = cdata.queue_type;
-
-			work(events[i].mbuf);
-		}
-		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-				events, nb_rx);
-		while (nb_tx < nb_rx && !fdata->done)
-			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
-							events + nb_tx,
-							nb_rx - nb_tx);
-		sent += nb_tx;
-	}
-
-	if (!cdata.quiet)
-		printf("  worker %u thread done. RX=%zu TX=%zu\n",
-				rte_lcore_id(), received, sent);
-
-	return 0;
-}
-
 /*
  * Parse the coremask given as argument (hexadecimal string) and fill
  * the global configuration (core role and core count) with the parsed
@@ -453,70 +280,6 @@ parse_app_args(int argc, char **argv)
 	}
 }

-static inline void
-init_rx_adapter(uint16_t nb_ports)
-{
-	int i;
-	int ret;
-	uint8_t evdev_id = 0;
-	struct rte_event_dev_info dev_info;
-
-	ret = rte_event_dev_info_get(evdev_id, &dev_info);
-
-	struct rte_event_port_conf rx_p_conf = {
-		.dequeue_depth = 8,
-		.enqueue_depth = 8,
-		.new_event_threshold = 1200,
-	};
-
-	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
-	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
-			&rx_p_conf);
-	if (ret)
-		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
-				cdata.rx_adapter_id);
-
-	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
-		.ev.sched_type = cdata.queue_type,
-		.ev.queue_id = cdata.qid[0],
-	};
-
-	for (i = 0; i < nb_ports; i++) {
-		uint32_t cap;
-
-		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-		if (ret)
-			rte_exit(EXIT_FAILURE,
-					"failed to get event rx adapter "
-					"capabilities");
-
-		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
-				-1, &queue_conf);
-		if (ret)
-			rte_exit(EXIT_FAILURE,
-					"Failed to add queues to Rx adapter");
-	}
-
-	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
-				&fdata->rxadptr_service_id);
-	if (ret != -ESRCH && ret != 0) {
-		rte_exit(EXIT_FAILURE,
-			"Error getting the service ID for Rx adapter\n");
-	}
-	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
-	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
-
-	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
-	if (ret)
-		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-				cdata.rx_adapter_id);
-
-}
-
 /*
  * Initializes a given port using global settings and with the RX buffers
  * coming from the mbuf_pool passed as a parameter.
@@ -621,191 +384,12 @@ init_ports(unsigned int num_ports)
 	return 0;
 }

-struct port_link {
-	uint8_t queue_id;
-	uint8_t priority;
-};
-
-static int
-setup_eventdev(struct cons_data *cons_data,
-		struct worker_data *worker_data)
+static void
+do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
 {
-	const uint8_t dev_id = 0;
-	/* +1 stages is for a SINGLE_LINK TX stage */
-	const uint8_t nb_queues = cdata.num_stages + 1;
-	/* + 1 for consumer */
-	const uint8_t nb_ports = cdata.num_workers + 1;
-	struct rte_event_dev_config config = {
-			.nb_event_queues = nb_queues,
-			.nb_event_ports = nb_ports,
-			.nb_events_limit  = 4096,
-			.nb_event_queue_flows = 1024,
-			.nb_event_port_dequeue_depth = 128,
-			.nb_event_port_enqueue_depth = 128,
-	};
-	struct rte_event_port_conf wkr_p_conf = {
-			.dequeue_depth = cdata.worker_cq_depth,
-			.enqueue_depth = 64,
-			.new_event_threshold = 4096,
-	};
-	struct rte_event_queue_conf wkr_q_conf = {
-			.schedule_type = cdata.queue_type,
-			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
-			.nb_atomic_flows = 1024,
-			.nb_atomic_order_sequences = 1024,
-	};
-	struct rte_event_port_conf tx_p_conf = {
-			.dequeue_depth = 128,
-			.enqueue_depth = 128,
-			.new_event_threshold = 4096,
-	};
-	const struct rte_event_queue_conf tx_q_conf = {
-			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
-			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
-	};
-
-	struct port_link worker_queues[MAX_NUM_STAGES];
-	uint8_t disable_implicit_release;
-	struct port_link tx_queue;
-	unsigned int i;
-
-	int ret, ndev = rte_event_dev_count();
-	if (ndev < 1) {
-		printf("%d: No Eventdev Devices Found\n", __LINE__);
-		return -1;
-	}
-
-	struct rte_event_dev_info dev_info;
-	ret = rte_event_dev_info_get(dev_id, &dev_info);
-	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
-
-	disable_implicit_release = (dev_info.event_dev_cap &
-				    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
-
-	wkr_p_conf.disable_implicit_release = disable_implicit_release;
-	tx_p_conf.disable_implicit_release = disable_implicit_release;
-
-	if (dev_info.max_event_port_dequeue_depth <
-			config.nb_event_port_dequeue_depth)
-		config.nb_event_port_dequeue_depth =
-				dev_info.max_event_port_dequeue_depth;
-	if (dev_info.max_event_port_enqueue_depth <
-			config.nb_event_port_enqueue_depth)
-		config.nb_event_port_enqueue_depth =
-				dev_info.max_event_port_enqueue_depth;
-
-	ret = rte_event_dev_configure(dev_id, &config);
-	if (ret < 0) {
-		printf("%d: Error configuring device\n", __LINE__);
-		return -1;
-	}
-
-	/* Q creation - one load balanced per pipeline stage*/
-	printf("  Stages:\n");
-	for (i = 0; i < cdata.num_stages; i++) {
-		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
-			printf("%d: error creating qid %d\n", __LINE__, i);
-			return -1;
-		}
-		cdata.qid[i] = i;
-		cdata.next_qid[i] = i+1;
-		worker_queues[i].queue_id = i;
-		if (cdata.enable_queue_priorities) {
-			/* calculate priority stepping for each stage, leaving
-			 * headroom of 1 for the SINGLE_LINK TX below
-			 */
-			const uint32_t prio_delta =
-				(RTE_EVENT_DEV_PRIORITY_LOWEST-1) /  nb_queues;
-
-			/* higher priority for queues closer to tx */
-			wkr_q_conf.priority =
-				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
-		}
-
-		const char *type_str = "Atomic";
-		switch (wkr_q_conf.schedule_type) {
-		case RTE_SCHED_TYPE_ORDERED:
-			type_str = "Ordered";
-			break;
-		case RTE_SCHED_TYPE_PARALLEL:
-			type_str = "Parallel";
-			break;
-		}
-		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
-				wkr_q_conf.priority);
-	}
-	printf("\n");
-
-	/* final queue for sending to TX core */
-	if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
-		printf("%d: error creating qid %d\n", __LINE__, i);
-		return -1;
-	}
-	tx_queue.queue_id = i;
-	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
-
-	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-	/* set up one port per worker, linking to all stage queues */
-	for (i = 0; i < cdata.num_workers; i++) {
-		struct worker_data *w = &worker_data[i];
-		w->dev_id = dev_id;
-		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
-			printf("Error setting up port %d\n", i);
-			return -1;
-		}
-
-		uint32_t s;
-		for (s = 0; s < cdata.num_stages; s++) {
-			if (rte_event_port_link(dev_id, i,
-						&worker_queues[s].queue_id,
-						&worker_queues[s].priority,
-						1) != 1) {
-				printf("%d: error creating link for port %d\n",
-						__LINE__, i);
-				return -1;
-			}
-		}
-		w->port_id = i;
-	}
-
-	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-	/* port for consumer, linked to TX queue */
-	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
-		printf("Error setting up port %d\n", i);
-		return -1;
-	}
-	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
-				&tx_queue.priority, 1) != 1) {
-		printf("%d: error creating link for port %d\n",
-				__LINE__, i);
-		return -1;
-	}
-	*cons_data = (struct cons_data){.dev_id = dev_id,
-					.port_id = i,
-					.release = disable_implicit_release };
-
-	ret = rte_event_dev_service_id_get(dev_id,
-				&fdata->evdev_service_id);
-	if (ret != -ESRCH && ret != 0) {
-		printf("Error getting the service ID for sw eventdev\n");
-		return -1;
-	}
-	rte_service_runstate_set(fdata->evdev_service_id, 1);
-	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-	if (rte_event_dev_start(dev_id) < 0) {
-		printf("Error starting eventdev\n");
-		return -1;
-	}
-
-	return dev_id;
+	RTE_SET_USED(nb_ethdev);
+	RTE_SET_USED(eventdev_id);
+	set_worker_generic_setup_data(&fdata->cap, 1);
 }

 static void
@@ -886,17 +470,21 @@ main(int argc, char **argv)
 	if (ndevs > 1)
 		fprintf(stderr, "Warning: More than one eventdev, using idx 0");

+
+	do_capability_setup(num_ports, 0);
+	fdata->cap.check_opt();
+
 	worker_data = rte_calloc(0, cdata.num_workers,
 			sizeof(worker_data[0]), 0);
 	if (worker_data == NULL)
 		rte_panic("rte_calloc failed\n");

-	int dev_id = setup_eventdev(&cons_data, worker_data);
+	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
 	if (dev_id < 0)
 		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

 	init_ports(num_ports);
-	init_rx_adapter(num_ports);
+	fdata->cap.adptr_setup(num_ports);

 	int worker_idx = 0;
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
@@ -929,8 +517,8 @@ main(int argc, char **argv)
 				__func__, lcore_id,
 				worker_data[worker_idx].port_id);

-		err = rte_eal_remote_launch(worker, &worker_data[worker_idx],
-					    lcore_id);
+		err = rte_eal_remote_launch(fdata->cap.worker,
+				&worker_data[worker_idx], lcore_id);
 		if (err) {
 			rte_panic("Failed to launch worker on core %d\n",
 					lcore_id);
@@ -943,7 +531,7 @@ main(int argc, char **argv)
 	lcore_id = rte_lcore_id();

 	if (core_in_use(lcore_id))
-		worker(&worker_data[worker_idx++]);
+		fdata->cap.worker(&worker_data[worker_idx++]);

 	rte_eal_mp_wait_lcore();

diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h
index 00721ea94..379ba9d4b 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h
@@ -84,7 +84,60 @@ struct config_data {
 	uint8_t rx_adapter_id;
 };

+struct port_link {
+	uint8_t queue_id;
+	uint8_t priority;
+};
+
 struct cons_data cons_data;

 struct fastpath_data *fdata;
 struct config_data cdata;
+
+static __rte_always_inline void
+work(struct rte_mbuf *m)
+{
+	struct ether_hdr *eth;
+	struct ether_addr addr;
+
+	/* change mac addresses on packet (to use mbuf data) */
+	/*
+	 * FIXME Swap mac address properly and also handle the
+	 * case for both odd and even number of stages that the
+	 * addresses end up the same at the end of the pipeline
+	 */
+	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
+	ether_addr_copy(&eth->d_addr, &addr);
+	ether_addr_copy(&addr, &eth->d_addr);
+
+	/* do a number of cycles of work per packet */
+	volatile uint64_t start_tsc = rte_rdtsc();
+	while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
+		rte_pause();
+}
+
+static __rte_always_inline void
+schedule_devices(unsigned int lcore_id)
+{
+	if (fdata->rx_core[lcore_id]) {
+		rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
+				!fdata->rx_single);
+	}
+
+	if (fdata->sched_core[lcore_id]) {
+		rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
+				!fdata->sched_single);
+		if (cdata.dump_dev_signal) {
+			rte_event_dev_dump(0, stdout);
+			cdata.dump_dev_signal = 0;
+		}
+	}
+
+	if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
+			 rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
+		fdata->cap.consumer();
+		rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+	}
+}
+
+void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
new file mode 100644
index 000000000..d2bc6d355
--- /dev/null
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
@@ -0,0 +1,398 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2016 Intel Corporation.
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include "pipeline_common.h"
+
+static int
+worker_generic_burst(void *arg)
+{
+	struct rte_event events[BATCH_SIZE];
+
+	struct worker_data *data = (struct worker_data *)arg;
+	uint8_t dev_id = data->dev_id;
+	uint8_t port_id = data->port_id;
+	size_t sent = 0, received = 0;
+	unsigned int lcore_id = rte_lcore_id();
+
+	while (!fdata->done) {
+		uint16_t i;
+
+		if (fdata->cap.scheduler)
+			fdata->cap.scheduler(lcore_id);
+
+		if (!fdata->worker_core[lcore_id]) {
+			rte_pause();
+			continue;
+		}
+
+		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+				events, RTE_DIM(events), 0);
+
+		if (nb_rx == 0) {
+			rte_pause();
+			continue;
+		}
+		received += nb_rx;
+
+		for (i = 0; i < nb_rx; i++) {
+
+			/* The first worker stage does classification */
+			if (events[i].queue_id == cdata.qid[0])
+				events[i].flow_id = events[i].mbuf->hash.rss
+							% cdata.num_fids;
+
+			events[i].queue_id = cdata.next_qid[events[i].queue_id];
+			events[i].op = RTE_EVENT_OP_FORWARD;
+			events[i].sched_type = cdata.queue_type;
+
+			work(events[i].mbuf);
+		}
+		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+				events, nb_rx);
+		while (nb_tx < nb_rx && !fdata->done)
+			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+							events + nb_tx,
+							nb_rx - nb_tx);
+		sent += nb_tx;
+	}
+
+	if (!cdata.quiet)
+		printf("  worker %u thread done. RX=%zu TX=%zu\n",
+				rte_lcore_id(), received, sent);
+
+	return 0;
+}
+
+static __rte_always_inline int
+consumer_burst(void)
+{
+	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+	struct rte_event packets[BATCH_SIZE];
+
+	static uint64_t received;
+	static uint64_t last_pkts;
+	static uint64_t last_time;
+	static uint64_t start_time;
+	unsigned int i, j;
+	uint8_t dev_id = cons_data.dev_id;
+	uint8_t port_id = cons_data.port_id;
+	uint16_t nb_ports = rte_eth_dev_count();
+
+	do {
+		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+				packets, RTE_DIM(packets), 0);
+
+		if (n == 0) {
+			for (j = 0; j < nb_ports; j++)
+				rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
+			return 0;
+		}
+		if (start_time == 0)
+			last_time = start_time = rte_get_timer_cycles();
+
+		received += n;
+		for (i = 0; i < n; i++) {
+			uint8_t outport = packets[i].mbuf->port;
+			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+					packets[i].mbuf);
+
+			packets[i].op = RTE_EVENT_OP_RELEASE;
+		}
+
+		if (cons_data.release) {
+			uint16_t nb_tx;
+
+			nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+								packets, n);
+			while (nb_tx < n)
+				nb_tx += rte_event_enqueue_burst(dev_id,
+						port_id, packets + nb_tx,
+						n - nb_tx);
+		}
+
+		/* Print out mpps every 1<22 packets */
+		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+			const uint64_t now = rte_get_timer_cycles();
+			const uint64_t total_ms = (now - start_time) / freq_khz;
+			const uint64_t delta_ms = (now - last_time) / freq_khz;
+			uint64_t delta_pkts = received - last_pkts;
+
+			printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
+					"avg %.3f mpps [current %.3f mpps]\n",
+					received,
+					total_ms,
+					received / (total_ms * 1000.0),
+					delta_pkts / (delta_ms * 1000.0));
+			last_pkts = received;
+			last_time = now;
+		}
+
+		cdata.num_packets -= n;
+		if (cdata.num_packets <= 0)
+			fdata->done = 1;
+	/* Be stuck in this loop if single. */
+	} while (!fdata->done && fdata->tx_single);
+
+	return 0;
+}
+
+static int
+setup_eventdev_generic(struct cons_data *cons_data,
+		struct worker_data *worker_data)
+{
+	const uint8_t dev_id = 0;
+	/* +1 stages is for a SINGLE_LINK TX stage */
+	const uint8_t nb_queues = cdata.num_stages + 1;
+	/* + 1 is one port for consumer */
+	const uint8_t nb_ports = cdata.num_workers + 1;
+	struct rte_event_dev_config config = {
+			.nb_event_queues = nb_queues,
+			.nb_event_ports = nb_ports,
+			.nb_events_limit  = 4096,
+			.nb_event_queue_flows = 1024,
+			.nb_event_port_dequeue_depth = 128,
+			.nb_event_port_enqueue_depth = 128,
+	};
+	struct rte_event_port_conf wkr_p_conf = {
+			.dequeue_depth = cdata.worker_cq_depth,
+			.enqueue_depth = 64,
+			.new_event_threshold = 4096,
+	};
+	struct rte_event_queue_conf wkr_q_conf = {
+			.schedule_type = cdata.queue_type,
+			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+			.nb_atomic_flows = 1024,
+		.nb_atomic_order_sequences = 1024,
+	};
+	struct rte_event_port_conf tx_p_conf = {
+			.dequeue_depth = 128,
+			.enqueue_depth = 128,
+			.new_event_threshold = 4096,
+	};
+	struct rte_event_queue_conf tx_q_conf = {
+			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
+			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
+	};
+
+	struct port_link worker_queues[MAX_NUM_STAGES];
+	uint8_t disable_implicit_release;
+	struct port_link tx_queue;
+	unsigned int i;
+
+	int ret, ndev = rte_event_dev_count();
+	if (ndev < 1) {
+		printf("%d: No Eventdev Devices Found\n", __LINE__);
+		return -1;
+	}
+
+	struct rte_event_dev_info dev_info;
+	ret = rte_event_dev_info_get(dev_id, &dev_info);
+	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+	disable_implicit_release = (dev_info.event_dev_cap &
+			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
+	if (dev_info.max_event_port_dequeue_depth <
+			config.nb_event_port_dequeue_depth)
+		config.nb_event_port_dequeue_depth =
+				dev_info.max_event_port_dequeue_depth;
+	if (dev_info.max_event_port_enqueue_depth <
+			config.nb_event_port_enqueue_depth)
+		config.nb_event_port_enqueue_depth =
+				dev_info.max_event_port_enqueue_depth;
+
+	ret = rte_event_dev_configure(dev_id, &config);
+	if (ret < 0) {
+		printf("%d: Error configuring device\n", __LINE__);
+		return -1;
+	}
+
+	/* Q creation - one load balanced per pipeline stage*/
+	printf("  Stages:\n");
+	for (i = 0; i < cdata.num_stages; i++) {
+		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+			printf("%d: error creating qid %d\n", __LINE__, i);
+			return -1;
+		}
+		cdata.qid[i] = i;
+		cdata.next_qid[i] = i+1;
+		worker_queues[i].queue_id = i;
+		if (cdata.enable_queue_priorities) {
+			/* calculate priority stepping for each stage, leaving
+			 * headroom of 1 for the SINGLE_LINK TX below
+			 */
+			const uint32_t prio_delta =
+				(RTE_EVENT_DEV_PRIORITY_LOWEST-1) /  nb_queues;
+
+			/* higher priority for queues closer to tx */
+			wkr_q_conf.priority =
+				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
+		}
+
+		const char *type_str = "Atomic";
+		switch (wkr_q_conf.schedule_type) {
+		case RTE_SCHED_TYPE_ORDERED:
+			type_str = "Ordered";
+			break;
+		case RTE_SCHED_TYPE_PARALLEL:
+			type_str = "Parallel";
+			break;
+		}
+		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+				wkr_q_conf.priority);
+	}
+	printf("\n");
+
+	/* final queue for sending to TX core */
+	if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
+		printf("%d: error creating qid %d\n", __LINE__, i);
+		return -1;
+	}
+	tx_queue.queue_id = i;
+	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+
+	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	/* set up one port per worker, linking to all stage queues */
+	for (i = 0; i < cdata.num_workers; i++) {
+		struct worker_data *w = &worker_data[i];
+		w->dev_id = dev_id;
+		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+			printf("Error setting up port %d\n", i);
+			return -1;
+		}
+
+		uint32_t s;
+		for (s = 0; s < cdata.num_stages; s++) {
+			if (rte_event_port_link(dev_id, i,
+						&worker_queues[s].queue_id,
+						&worker_queues[s].priority,
+						1) != 1) {
+				printf("%d: error creating link for port %d\n",
+						__LINE__, i);
+				return -1;
+			}
+		}
+		w->port_id = i;
+	}
+
+	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+	/* port for consumer, linked to TX queue */
+	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
+		printf("Error setting up port %d\n", i);
+		return -1;
+	}
+	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
+				&tx_queue.priority, 1) != 1) {
+		printf("%d: error creating link for port %d\n",
+				__LINE__, i);
+		return -1;
+	}
+	*cons_data = (struct cons_data){.dev_id = dev_id,
+					.port_id = i,
+					.release = disable_implicit_release };
+
+	ret = rte_event_dev_service_id_get(dev_id,
+				&fdata->evdev_service_id);
+	if (ret != -ESRCH && ret != 0) {
+		printf("Error getting the service ID for sw eventdev\n");
+		return -1;
+	}
+	rte_service_runstate_set(fdata->evdev_service_id, 1);
+	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
+	if (rte_event_dev_start(dev_id) < 0) {
+		printf("Error starting eventdev\n");
+		return -1;
+	}
+
+	return dev_id;
+}
+
+static void
+init_rx_adapter(uint16_t nb_ports)
+{
+	int i;
+	int ret;
+	uint8_t evdev_id = 0;
+	struct rte_event_dev_info dev_info;
+
+	ret = rte_event_dev_info_get(evdev_id, &dev_info);
+
+	struct rte_event_port_conf rx_p_conf = {
+		.dequeue_depth = 8,
+		.enqueue_depth = 8,
+		.new_event_threshold = 1200,
+	};
+
+	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+	/* Create one adapter for all the ethernet ports. */
+	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
+			&rx_p_conf);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
+				cdata.rx_adapter_id);
+
+	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
+		.ev.sched_type = cdata.queue_type,
+		.ev.queue_id = cdata.qid[0],
+	};
+
+	for (i = 0; i < nb_ports; i++) {
+		uint32_t cap;
+
+		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"failed to get event rx adapter "
+					"capabilities");
+
+		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
+				-1, &queue_conf);
+		if (ret)
+			rte_exit(EXIT_FAILURE,
+					"Failed to add queues to Rx adapter");
+	}
+
+	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
+				&fdata->rxadptr_service_id);
+	if (ret != -ESRCH && ret != 0) {
+		rte_exit(EXIT_FAILURE,
+			"Error getting the service ID for sw eventdev\n");
+	}
+	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+
+	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
+	if (ret)
+		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+				cdata.rx_adapter_id);
+}
+
+void
+set_worker_generic_setup_data(struct setup_data *caps, bool burst)
+{
+	RTE_SET_USED(burst);
+	caps->consumer = consumer_burst;
+	caps->worker = worker_generic_burst;
+
+	caps->adptr_setup = init_rx_adapter;
+	caps->scheduler = schedule_devices;
+	caps->evdev_setup = setup_eventdev_generic;
+}
--
2.15.1

Thread overview: 48+ messages
2017-12-07 20:36 [PATCH 00/13] examples/eventdev: add capability based pipeline support Pavan Nikhilesh
2017-12-07 20:36 ` [PATCH 01/13] examples/eventdev: add Rx adapter support Pavan Nikhilesh
2017-12-11 16:15   ` Eads, Gage
2017-12-12  8:17     ` Pavan Nikhilesh Bhagavatula
2017-12-12 15:59       ` Eads, Gage
2017-12-07 20:36 ` [PATCH 02/13] examples/eventdev: move common data into pipeline common Pavan Nikhilesh
2017-12-11 16:15   ` Eads, Gage
2017-12-12  8:19     ` Pavan Nikhilesh Bhagavatula
2017-12-07 20:36 ` [PATCH 03/13] examples/eventdev: add framework for caps based pipeline Pavan Nikhilesh
2017-12-07 20:36 ` [PATCH 04/13] examples/eventdev: add generic worker pipeline Pavan Nikhilesh
2017-12-07 20:36 ` [PATCH 05/13] examples/eventdev: add ops to check cmdline args Pavan Nikhilesh
2017-12-19 11:23   ` Van Haaren, Harry
2017-12-07 20:36 ` [PATCH 06/13] examples/eventdev: add non burst mode generic worker Pavan Nikhilesh
2017-12-19 13:26   ` Van Haaren, Harry
2017-12-19 19:01     ` Pavan Nikhilesh
2017-12-07 20:36 ` [PATCH 07/13] examples/eventdev: add thread safe Tx worker pipeline Pavan Nikhilesh
2017-12-19 12:00   ` Van Haaren, Harry
2017-12-19 18:55     ` Pavan Nikhilesh
2017-12-07 20:37 ` [PATCH 08/13] examples/eventdev: add burst for thread safe pipeline Pavan Nikhilesh
2017-12-07 20:37 ` [PATCH 09/13] examples/eventdev: add all type queue option Pavan Nikhilesh
2017-12-19 13:18   ` Van Haaren, Harry
2017-12-19 19:05     ` Pavan Nikhilesh
2017-12-07 20:37 ` [PATCH 10/13] examples/eventdev: add single stage pipeline worker Pavan Nikhilesh
2017-12-11 16:45   ` Eads, Gage
2017-12-07 20:37 ` [PATCH 11/13] examples/eventdev: add atq " Pavan Nikhilesh
2017-12-19 13:34   ` Van Haaren, Harry
2017-12-07 20:37 ` [PATCH 12/13] examples/eventdev_pipeline_sw_pmd: rename example Pavan Nikhilesh
2017-12-07 20:37 ` [PATCH 13/13] doc: update example eventdev_pipeline Pavan Nikhilesh
2017-12-11 11:29   ` Laatz, Kevin
2018-01-10 11:09 ` [PATCH v2 01/15] examples/eventdev: add Rx adapter support Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 02/15] examples/eventdev: move common data into pipeline common Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 03/15] examples/eventdev: add framework for caps based pipeline Pavan Nikhilesh
2018-01-10 11:10   ` Pavan Nikhilesh [this message]
2018-01-10 11:10   ` [PATCH v2 05/15] examples/eventdev: add ops to check cmdline args Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 06/15] examples/eventdev: add non burst mode generic worker Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 07/15] examples/eventdev: modify work cycles Pavan Nikhilesh
2018-01-15 10:14     ` Van Haaren, Harry
2018-01-10 11:10   ` [PATCH v2 08/15] examples/eventdev: add thread safe Tx worker pipeline Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 09/15] examples/eventdev: add burst for thread safe pipeline Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 10/15] examples/eventdev: add all type queue option Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 11/15] examples/eventdev: add single stage pipeline worker Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 12/15] examples/eventdev: add atq " Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 13/15] examples/eventdev: add mempool size configuration Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 14/15] examples/eventdev_pipeline_sw_pmd: rename example Pavan Nikhilesh
2018-01-10 11:10   ` [PATCH v2 15/15] doc: update example eventdev pipeline Pavan Nikhilesh
2018-01-16 11:34     ` Kovacevic, Marko
2018-01-16 10:35   ` [PATCH v2 01/15] examples/eventdev: add Rx adapter support Van Haaren, Harry
2018-01-16 16:12     ` Jerin Jacob
