From mboxrd@z Thu Jan 1 00:00:00 1970 From: Pavan Nikhilesh Subject: [PATCH v2 10/15] examples/eventdev: add all type queue option Date: Wed, 10 Jan 2018 16:40:08 +0530 Message-ID: <20180110111013.14644-10-pbhagavatula@caviumnetworks.com> References: <20171207203705.25020-1-pbhagavatula@caviumnetworks.com> <20180110111013.14644-1-pbhagavatula@caviumnetworks.com> Mime-Version: 1.0 Content-Type: text/plain Cc: dev@dpdk.org, Pavan Nikhilesh To: gage.eads@intel.com, jerin.jacobkollanukkaran@cavium.com, harry.van.haaren@intel.com, hemant.agrawal@nxp.com, liang.j.ma@intel.com, santosh.shukla@caviumnetworks.com Return-path: Received: from NAM03-BY2-obe.outbound.protection.outlook.com (mail-by2nam03on0064.outbound.protection.outlook.com [104.47.42.64]) by dpdk.org (Postfix) with ESMTP id EAEBE1B169 for ; Wed, 10 Jan 2018 12:11:09 +0100 (CET) In-Reply-To: <20180110111013.14644-1-pbhagavatula@caviumnetworks.com> List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Add a configurable option to set up the event queues as all type queues, i.e. with RTE_EVENT_QUEUE_CFG_ALL_TYPES. This requires the event device to report the RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES capability; if it does not, the application exits with an error. The option is enabled by passing '-a' (or '--use-atq') as a command-line argument. 
Signed-off-by: Pavan Nikhilesh --- v2 Changes: - redo worker selection logic(Harry) examples/eventdev_pipeline_sw_pmd/main.c | 7 +- .../eventdev_pipeline_sw_pmd/pipeline_common.h | 1 + .../pipeline_worker_generic.c | 5 + .../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c | 162 +++++++++++++++++++-- 4 files changed, 164 insertions(+), 11 deletions(-) diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index f877e695b..2c7b02b86 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -148,6 +148,7 @@ static struct option long_options[] = { {"parallel", no_argument, 0, 'p'}, {"ordered", no_argument, 0, 'o'}, {"quiet", no_argument, 0, 'q'}, + {"use-atq", no_argument, 0, 'a'}, {"dump", no_argument, 0, 'D'}, {0, 0, 0, 0} }; @@ -171,6 +172,7 @@ usage(void) " -o, --ordered Use ordered scheduling\n" " -p, --parallel Use parallel scheduling\n" " -q, --quiet Minimize printed output\n" + " -a, --use-atq Use all type queues\n" " -D, --dump Print detailed statistics before exit" "\n"; fprintf(stderr, "%s", usage_str); @@ -191,7 +193,7 @@ parse_app_args(int argc, char **argv) int i; for (;;) { - c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:", + c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:paoPqDW:", long_options, &option_index); if (c == -1) break; @@ -224,6 +226,9 @@ parse_app_args(int argc, char **argv) case 'p': cdata.queue_type = RTE_SCHED_TYPE_PARALLEL; break; + case 'a': + cdata.all_type_queues = 1; + break; case 'q': cdata.quiet = 1; break; diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h index e06320050..66553038c 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h @@ -77,6 +77,7 @@ struct config_data { int quiet; int dump_dev; int dump_dev_signal; + int all_type_queues; unsigned int num_stages; unsigned int worker_cq_depth; unsigned 
int rx_stride; diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c index 90f87709c..2c51f4a30 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c @@ -516,6 +516,11 @@ generic_opt_check(void) memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); rte_event_dev_info_get(0, &eventdev_info); + if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) + rte_exit(EXIT_FAILURE, + "Event dev doesn't support all type queues\n"); + for (i = 0; i < rte_eth_dev_count(); i++) { ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); if (ret) diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c index 419d8d410..25eabbd6c 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c @@ -94,6 +94,52 @@ worker_do_tx(void *arg) return 0; } +static int +worker_do_tx_atq(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + const uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + } else { + ev.sub_event_type++; + worker_fwd_event(&ev, cdata.queue_type); + } + work(); + + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. 
RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + static int worker_do_tx_burst(void *arg) { @@ -149,17 +195,81 @@ worker_do_tx_burst(void *arg) return 0; } +static int +worker_do_tx_burst_atq(void *arg) +{ + struct rte_event ev[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev = data->dev_id; + uint8_t port = data->port_id; + uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + + const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, + ev, BATCH_SIZE, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + const uint8_t cq_id = ev[i].sub_event_type % + cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev[i].mbuf); + tx++; + ev[i].op = RTE_EVENT_OP_RELEASE; + continue; + } + + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } else { + ev[i].sub_event_type++; + worker_fwd_event(&ev[i], cdata.queue_type); + } + work(); + } + + worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + static int setup_eventdev_worker_tx(struct cons_data *cons_data, struct worker_data *worker_data) { RTE_SET_USED(cons_data); uint8_t i; + const uint8_t atq = cdata.all_type_queues ? 1 : 0; const uint8_t dev_id = 0; const uint8_t nb_ports = cdata.num_workers; uint8_t nb_slots = 0; - uint8_t nb_queues = rte_eth_dev_count() * cdata.num_stages; - nb_queues += rte_eth_dev_count(); + uint8_t nb_queues = rte_eth_dev_count(); + + /* + * In case where all type queues are not enabled, use queues equal to + * number of stages * eth_dev_count and one extra queue per pipeline + * for Tx. 
+ */ + if (!atq) { + nb_queues *= cdata.num_stages; + nb_queues += rte_eth_dev_count(); + } struct rte_event_dev_config config = { .nb_event_queues = nb_queues, @@ -211,12 +321,19 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, printf(" Stages:\n"); for (i = 0; i < nb_queues; i++) { - uint8_t slot; + if (atq) { - nb_slots = cdata.num_stages + 1; - slot = i % nb_slots; - wkr_q_conf.schedule_type = slot == cdata.num_stages ? - RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + nb_slots = cdata.num_stages; + wkr_q_conf.event_queue_cfg = + RTE_EVENT_QUEUE_CFG_ALL_TYPES; + } else { + uint8_t slot; + + nb_slots = cdata.num_stages + 1; + slot = i % nb_slots; + wkr_q_conf.schedule_type = slot == cdata.num_stages ? + RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + } if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { printf("%d: error creating qid %d\n", __LINE__, i); @@ -286,7 +403,7 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, * * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. */ - cdata.rx_stride = nb_slots; + cdata.rx_stride = atq ? 
1 : nb_slots; ret = rte_event_dev_service_id_get(dev_id, &fdata->evdev_service_id); if (ret != -ESRCH && ret != 0) { @@ -450,6 +567,11 @@ worker_tx_opt_check(void) memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); rte_event_dev_info_get(0, &eventdev_info); + if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) + rte_exit(EXIT_FAILURE, + "Event dev doesn't support all type queues\n"); + for (i = 0; i < rte_eth_dev_count(); i++) { ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); if (ret) @@ -477,13 +599,33 @@ worker_tx_opt_check(void) } } +static worker_loop +get_worker_loop_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_burst_atq; + + return worker_do_tx_burst; +} + +static worker_loop +get_worker_loop_non_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_atq; + + return worker_do_tx; +} + void set_worker_tx_setup_data(struct setup_data *caps, bool burst) { + uint8_t atq = cdata.all_type_queues ? 1 : 0; + if (burst) - caps->worker = worker_do_tx_burst; + caps->worker = get_worker_loop_burst(atq); else - caps->worker = worker_do_tx; + caps->worker = get_worker_loop_non_burst(atq); memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); -- 2.15.1