From: "Hunt, David"
Subject: Re: [PATCH 1/3] examples/eventdev_pipeline: added sample app
Date: Mon, 26 Jun 2017 15:46:47 +0100
Message-ID: <6c53f05d-2dd2-5b83-2eab-bcecd93bea82@intel.com>
References: <1492768299-84016-1-git-send-email-harry.van.haaren@intel.com>
 <1492768299-84016-2-git-send-email-harry.van.haaren@intel.com>
 <20170510141202.GA8431@jerin>
In-Reply-To: <20170510141202.GA8431@jerin>
To: Jerin Jacob, Harry van Haaren
Cc: dev@dpdk.org, Gage Eads, Bruce Richardson

Hi Jerin,

I'm assisting Harry on the sample app, and have just pushed up a V2 patch
based on your feedback. I've addressed most of your suggestions; comments
below. There may still be a couple of outstanding questions that need
further discussion.

Regards,
Dave

On 10/5/2017 3:12 PM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Fri, 21 Apr 2017 10:51:37 +0100
>> From: Harry van Haaren
>> To: dev@dpdk.org
>> CC: jerin.jacob@caviumnetworks.com, Harry van Haaren, Gage Eads,
>> Bruce Richardson
>> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>> X-Mailer: git-send-email 2.7.4
>>
>> This commit adds a sample app for the eventdev library.
>> The app has been tested with DPDK 17.05-rc2, hence this
>> release (or later) is recommended.
>>
>> The sample app showcases a pipeline processing use-case,
>> with event scheduling and processing defined per stage.
>> The application receives traffic as normal, with each
>> packet traversing the pipeline. Once the packet has
>> been processed by each of the pipeline stages, it is
>> transmitted again.
>>
>> The app provides a framework to utilize cores for a single
>> role or multiple roles. Examples of roles are the RX core,
>> TX core, Scheduling core (in the case of the event/sw PMD),
>> and worker cores.
>>
>> Various flags are available to configure numbers of stages,
>> cycles of work at each stage, type of scheduling, number of
>> worker cores, queue depths etc. For a full explanation,
>> please refer to the documentation.
>>
>> Signed-off-by: Gage Eads
>> Signed-off-by: Bruce Richardson
>> Signed-off-by: Harry van Haaren
>
> Thanks for the example application to share the SW view.
> I could make it run on HW after some tweaking (not optimized though).
>
> [...]
>> +#define MAX_NUM_STAGES 8
>> +#define BATCH_SIZE 16
>> +#define MAX_NUM_CORE 64
>
> How about RTE_MAX_LCORE?

Core usage in the sample app is held in a uint64_t. Adding arrays would be
possible, but I feel that the extra effort would not give that much benefit.
I've left it as is for the moment, unless you see any strong requirement to
go beyond 64 cores?

>
>> +
>> +static unsigned int active_cores;
>> +static unsigned int num_workers;
>> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
>> +static unsigned int num_fids = 512;
>> +static unsigned int num_priorities = 1;
>
> looks like it's not used.

Yes, removed.
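For reference on the 64-core point: supporting more than 64 cores would mean
replacing each per-role uint64_t mask with an array plus a small lookup
helper, something like the sketch below (hypothetical, not in the patch;
names are made up). That's the extra plumbing I'd prefer to avoid unless
there's a real need.

/* hypothetical replacement for the single uint64_t per-role masks */
#define CORE_MASK_WORDS ((RTE_MAX_LCORE + 63) / 64)
static uint64_t worker_mask[CORE_MASK_WORDS];

static inline bool
lcore_in_mask(const uint64_t *mask, unsigned int lcore)
{
	return (mask[lcore / 64] >> (lcore % 64)) & 1;
}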
>
>> +static unsigned int num_stages = 1;
>> +static unsigned int worker_cq_depth = 16;
>> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
>> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
>> +static int16_t qid[MAX_NUM_STAGES] = {-1};
>
> Moving all fastpath related variables under a structure with cache
> aligned will help.

I tried a few different combinations of this, and saw no gains and some
losses, so I'll leave it as is for the moment, if that's OK.

>
>> +static int worker_cycles;
>> +static int enable_queue_priorities;
>> +
>> +struct prod_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +	int32_t qid;
>> +	unsigned num_nic_ports;
>> +};

Yes, I saw a percent or two of gain when this and the following two data
structs were cache aligned.

>
> cache aligned ?
>
>> +
>> +struct cons_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +};
>> +
>
> cache aligned ?

Yes, see comment above.

>
>> +static struct prod_data prod_data;
>> +static struct cons_data cons_data;
>> +
>> +struct worker_data {
>> +	uint8_t dev_id;
>> +	uint8_t port_id;
>> +};
>
> cache aligned ?

Yes, see comment above.

>
>> +
>> +static unsigned *enqueue_cnt;
>> +static unsigned *dequeue_cnt;
>> +
>> +static volatile int done;
>> +static volatile int prod_stop;
>
> No one updating the prod_stop.

Old var, removed.

>
>> +static int quiet;
>> +static int dump_dev;
>> +static int dump_dev_signal;
>> +
>> +static uint32_t rx_lock;
>> +static uint32_t tx_lock;
>> +static uint32_t sched_lock;
>> +static bool rx_single;
>> +static bool tx_single;
>> +static bool sched_single;
>> +
>> +static unsigned rx_core[MAX_NUM_CORE];
>> +static unsigned tx_core[MAX_NUM_CORE];
>> +static unsigned sched_core[MAX_NUM_CORE];
>> +static unsigned worker_core[MAX_NUM_CORE];
>> +
>> +static bool
>> +core_in_use(unsigned lcore_id) {
>> +	return (rx_core[lcore_id] || sched_core[lcore_id] ||
>> +		tx_core[lcore_id] || worker_core[lcore_id]);
>> +}
>> +
>> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
>> +
>> +static void
>> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
>> +			void *userdata)
>
> IMO, It is better to not use rte_eth_* for application functions.

Sure, removed the 'rte_' part of the function name.

>
>> +{
>> +	int port_id = (uintptr_t) userdata;
>> +	unsigned _sent = 0;
>> +
>> +	do {
>> +		/* Note: hard-coded TX queue */
>> +		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
>> +					unsent - _sent);
>> +	} while (_sent != unsent);
>> +}
>> +
>> +static int
>> +consumer(void)
>> +{
>> +	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
>> +	struct rte_event packets[BATCH_SIZE];
>> +
>> +	static uint64_t npackets;
>> +	static uint64_t received;
>> +	static uint64_t received_printed;
>> +	static uint64_t time_printed;
>> +	static uint64_t start_time;
>> +	unsigned i, j;
>> +	uint8_t dev_id = cons_data.dev_id;
>> +	uint8_t port_id = cons_data.port_id;
>> +
>> +	if (!npackets)
>> +		npackets = num_packets;
>> +
>> +	do {
>> +		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
>> +				packets, RTE_DIM(packets), 0);
>
> const uint16_t n =

Sure.
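On the cache-alignment point above, the V2 change is essentially the
following (sketch; __rte_cache_aligned is the standard DPDK attribute for
this):

struct prod_data {
	uint8_t dev_id;
	uint8_t port_id;
	int32_t qid;
	unsigned num_nic_ports;
} __rte_cache_aligned;

struct cons_data {
	uint8_t dev_id;
	uint8_t port_id;
} __rte_cache_aligned;

struct worker_data {
	uint8_t dev_id;
	uint8_t port_id;
} __rte_cache_aligned;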
> >> + >> + if (n == 0) { >> + for (j = 0; j < rte_eth_dev_count(); j++) >> + rte_eth_tx_buffer_flush(j, 0, tx_buf[j]); >> + return 0; >> + } >> + if (start_time == 0) >> + time_printed = start_time = rte_get_timer_cycles(); >> + >> + received += n; >> + for (i = 0; i < n; i++) { >> + uint8_t outport = packets[i].mbuf->port; >> + rte_eth_tx_buffer(outport, 0, tx_buf[outport], >> + packets[i].mbuf); >> + } >> + >> + if (!quiet && received >= received_printed + (1<<22)) { >> + const uint64_t now = rte_get_timer_cycles(); >> + const uint64_t delta_cycles = now - start_time; >> + const uint64_t elapsed_ms = delta_cycles / freq_khz; >> + const uint64_t interval_ms = >> + (now - time_printed) / freq_khz; >> + >> + uint64_t rx_noprint = received - received_printed; >> + printf("# consumer RX=%"PRIu64", time %"PRIu64 >> + "ms, avg %.3f mpps [current %.3f mpps]\n", >> + received, elapsed_ms, >> + (received) / (elapsed_ms * 1000.0), >> + rx_noprint / (interval_ms * 1000.0)); >> + received_printed = received; >> + time_printed = now; >> + } >> + >> + dequeue_cnt[0] += n; >> + >> + if (num_packets > 0 && npackets > 0) { >> + npackets -= n; >> + if (npackets == 0 || npackets > num_packets) >> + done = 1; >> + } > > Looks like very complicated logic.I think we can simplify it. I've simplified this. > >> + } while (0); > > do while(0); really required here? Removed. > >> + >> + return 0; >> +} >> + >> +static int >> +producer(void) >> +{ >> + static uint8_t eth_port; >> + struct rte_mbuf *mbufs[BATCH_SIZE]; >> + struct rte_event ev[BATCH_SIZE]; >> + uint32_t i, num_ports = prod_data.num_nic_ports; >> + int32_t qid = prod_data.qid; >> + uint8_t dev_id = prod_data.dev_id; >> + uint8_t port_id = prod_data.port_id; >> + uint32_t prio_idx = 0; >> + >> + const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE); >> + if (++eth_port == num_ports) >> + eth_port = 0; >> + if (nb_rx == 0) { >> + rte_pause(); >> + return 0; >> + } >> + >> + for (i = 0; i < nb_rx; i++) { >> + ev[i].flow_id = mbufs[i]->hash.rss; > > prefetching the buff[i+1] may help here? I tried, didn't make much difference. > >> + ev[i].op = RTE_EVENT_OP_NEW; >> + ev[i].sched_type = queue_type; > > The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED. So, we > cannot assign .sched_type as queue_type. > > I think, one option could be to avoid translation in application is to > - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY > - Introduce a new RTE_EVENT_DEV_CAP_ to denote RTE_EVENT_QUEUE_CFG_ALL_TYPES cap > ability > - add sched_type in struct rte_event_queue_conf. If capability flag is > not set then implementation takes sched_type value for the queue. > > Any thoughts? Not sure here, would it be ok for the moment, and we can work on a patch in the future? > > >> + ev[i].queue_id = qid; >> + ev[i].event_type = RTE_EVENT_TYPE_CPU; > > IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is > producing the Ethernet packets/events. Changed to RTE_EVENT_TYPE_ETHDEV. 
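For clarity, the producer's event initialisation in V2 looks roughly like
this (sketch, not the exact code; the sched_type/queue_type mapping is
still the open question above):

for (i = 0; i < nb_rx; i++) {
	ev[i].flow_id = mbufs[i]->hash.rss;
	ev[i].op = RTE_EVENT_OP_NEW;
	ev[i].sched_type = queue_type;	/* mapping still under discussion */
	ev[i].queue_id = qid;
	ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;	/* was RTE_EVENT_TYPE_CPU */
	ev[i].sub_event_type = 0;
	ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
	ev[i].mbuf = mbufs[i];
}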
> >> + ev[i].sub_event_type = 0; >> + ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL; >> + ev[i].mbuf = mbufs[i]; >> + RTE_SET_USED(prio_idx); >> + } >> + >> + const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx); > > For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do burst > operation unlike FORWARD case(which is one event at a time).Earlier, I > thought I can abstract the producer pattern in PMD, but it looks like we > are going with application driven producer model based on latest RFC.So I think, > we can add one flag to rte_event_enqueue_burst to denote all the events > are of type RTE_EVENT_OP_NEW as hint.SW driver can ignore this. > > I can send a patch for the same. > > Any thoughts? I think this comment is closed now, as your patch is upstreamed, afaik? > > >> + if (nb_tx != nb_rx) { >> + for (i = nb_tx; i < nb_rx; i++) >> + rte_pktmbuf_free(mbufs[i]); >> + } >> + enqueue_cnt[0] += nb_tx; >> + >> + if (unlikely(prod_stop)) > > I think, No one updating the prod_stop Removed. > >> + done = 1; >> + >> + return 0; >> +} >> + >> +static inline void >> +schedule_devices(uint8_t dev_id, unsigned lcore_id) >> +{ >> + if (rx_core[lcore_id] && (rx_single || >> + rte_atomic32_cmpset(&rx_lock, 0, 1))) { > > This pattern(rte_atomic32_cmpset) makes application can inject only > "one core" worth of packets. Not enough for low-end cores. May be we need > multiple producer options. I think, new RFC is addressing it. OK, will leave this to the RFC. > >> + producer(); >> + rte_atomic32_clear((rte_atomic32_t *)&rx_lock); >> + } >> + >> + if (sched_core[lcore_id] && (sched_single || >> + rte_atomic32_cmpset(&sched_lock, 0, 1))) { >> + rte_event_schedule(dev_id); >> + if (dump_dev_signal) { >> + rte_event_dev_dump(0, stdout); >> + dump_dev_signal = 0; >> + } >> + rte_atomic32_clear((rte_atomic32_t *)&sched_lock); >> + } > > Lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED set. > > I think, We can make common code with compile time aware and make > runtime workers based on the flag.. > i.e > rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id); > rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id); > > May we can improve after initial version. Yes, we can clean up after initial version. > >> + >> + if (tx_core[lcore_id] && (tx_single || >> + rte_atomic32_cmpset(&tx_lock, 0, 1))) { >> + consumer(); > > Should consumer() need to come in this pattern? I am thinking like > if events is from last stage then call consumer() in worker() > > I think, above scheme works better when the _same_ worker code need to run the > case where > 1) ethdev HW is capable to enqueuing the packets to same txq from > multiple thread > 2) ethdev is not capable to do so. > > So, The above cases can be addressed in configuration time where we link > the queues to port > case 1) Link all workers to last queue > case 2) Link only worker to last queue > > and keeping the common worker code. > > HW implementation has functional and performance issue if "two" ports are > assigned to one lcore for dequeue. The above scheme fixes that problem too. Can we have a bit more discussion on this item? Is this needed for this sample app, or can we perhaps work a patch for this later? Harry? 
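One observation on schedule_devices() while we're here: the rx, sched and
tx branches all follow the same "single designated core, or take the
try-lock" shape, so they could be factored into a small helper along these
lines (purely illustrative; run_guarded() is not in the patch):

static inline void
run_guarded(int (*fn)(void), bool single, uint32_t *lock)
{
	/* one designated core needs no lock; otherwise run only if we win the try-lock */
	if (!single && !rte_atomic32_cmpset(lock, 0, 1))
		return;
	fn();
	rte_atomic32_clear((rte_atomic32_t *)lock);
}

e.g. the RX branch would then become:

	if (rx_core[lcore_id])
		run_guarded(producer, rx_single, &rx_lock);

Not proposing to change it for V2, just noting the repetition.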
> >> + rte_atomic32_clear((rte_atomic32_t *)&tx_lock); >> + } >> +} >> + >> +static int >> +worker(void *arg) >> +{ >> + struct rte_event events[BATCH_SIZE]; >> + >> + struct worker_data *data = (struct worker_data *)arg; >> + uint8_t dev_id = data->dev_id; >> + uint8_t port_id = data->port_id; >> + size_t sent = 0, received = 0; >> + unsigned lcore_id = rte_lcore_id(); >> + >> + while (!done) { >> + uint16_t i; >> + >> + schedule_devices(dev_id, lcore_id); >> + >> + if (!worker_core[lcore_id]) { >> + rte_pause(); >> + continue; >> + } >> + >> + uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id, >> + events, RTE_DIM(events), 0); >> + >> + if (nb_rx == 0) { >> + rte_pause(); >> + continue; >> + } >> + received += nb_rx; >> + >> + for (i = 0; i < nb_rx; i++) { >> + struct ether_hdr *eth; >> + struct ether_addr addr; >> + struct rte_mbuf *m = events[i].mbuf; >> + >> + /* The first worker stage does classification */ >> + if (events[i].queue_id == qid[0]) >> + events[i].flow_id = m->hash.rss % num_fids; > > Not sure why we need do(shrinking the flows) this in worker() in queue based pipeline. > If an PMD has any specific requirement on num_fids,I think, we > can move this configuration stage or PMD can choose optimum fid internally to > avoid modulus operation tax in fastpath in all PMD. > > Does struct rte_event_queue_conf.nb_atomic_flows help here? In my tests the modulus makes very little difference in the throughput. And I think it's good to have a way of varying the number of flows for testing different scenarios, even if it's not the most performant. > >> + >> + events[i].queue_id = next_qid[events[i].queue_id]; >> + events[i].op = RTE_EVENT_OP_FORWARD; > > missing events[i].sched_type.HW PMD does not work with this. > I think, we can use similar scheme like next_qid for next_sched_type. Done. added events[i].sched_type = queue_type. > >> + >> + /* change mac addresses on packet (to use mbuf data) */ >> + eth = rte_pktmbuf_mtod(m, struct ether_hdr *); >> + ether_addr_copy(ð->d_addr, &addr); >> + ether_addr_copy(ð->s_addr, ð->d_addr); >> + ether_addr_copy(&addr, ð->s_addr); > > IMO, We can make packet processing code code as "static inline function" so > different worker types can reuse. Done. moved out to a work() function. > >> + >> + /* do a number of cycles of work per packet */ >> + volatile uint64_t start_tsc = rte_rdtsc(); >> + while (rte_rdtsc() < start_tsc + worker_cycles) >> + rte_pause(); > > Ditto. Done. moved out to a work() function. > > I think, All worker specific variables like "worker_cycles" can moved into > one structure and use. > >> + } >> + uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id, >> + events, nb_rx); >> + while (nb_tx < nb_rx && !done) >> + nb_tx += rte_event_enqueue_burst(dev_id, port_id, >> + events + nb_tx, >> + nb_rx - nb_tx); >> + sent += nb_tx; >> + } >> + >> + if (!quiet) >> + printf(" worker %u thread done. RX=%zu TX=%zu\n", >> + rte_lcore_id(), received, sent); >> + >> + return 0; >> +} >> + >> +/* >> + * Parse the coremask given as argument (hexadecimal string) and fill >> + * the global configuration (core role and core count) with the parsed >> + * value. >> + */ >> +static int xdigit2val(unsigned char c) > > multiple instance of "xdigit2val" in DPDK repo. May be we can push this > as common code. Sure, that's something we can look at in a separate patch, now that it's being used more and more. 
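Going back to the per-packet work in the worker loop above: the helper it
moved into is roughly the following (sketch, not the exact V2 code):

static inline void
work(struct rte_mbuf *m)
{
	struct ether_hdr *eth;
	struct ether_addr addr;

	/* change mac addresses on packet (to use mbuf data) */
	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
	ether_addr_copy(&eth->d_addr, &addr);
	ether_addr_copy(&eth->s_addr, &eth->d_addr);
	ether_addr_copy(&addr, &eth->s_addr);

	/* do a number of cycles of work per packet */
	if (worker_cycles > 0) {
		uint64_t start_tsc = rte_rdtsc();
		while (rte_rdtsc() < start_tsc + worker_cycles)
			rte_pause();
	}
}

The worker loop then just calls work(events[i].mbuf) for each dequeued event.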
>
>> +{
>> +	int val;
>> +
>> +	if (isdigit(c))
>> +		val = c - '0';
>> +	else if (isupper(c))
>> +		val = c - 'A' + 10;
>> +	else
>> +		val = c - 'a' + 10;
>> +	return val;
>> +}
>> +
>> +
>> +static void
>> +usage(void)
>> +{
>> +	const char *usage_str =
>> +		" Usage: eventdev_demo [options]\n"
>> +		" Options:\n"
>> +		" -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
>> +		" -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
>
> I think, this parameter now affects the application fast path code. I think
> it should be an eventdev configuration parameter.

See above comment on num_fids.

>
>> +		" -s, --num_stages=N           Use N atomic stages (default 1)\n"
>> +		" -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
>> +		" -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
>> +		" -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
>> +		" -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
>> +		" -c  --cq-depth=N             Worker CQ depth (default 16)\n"
>> +		" -W  --work-cycles=N          Worker cycles (default 0)\n"
>> +		" -P  --queue-priority         Enable scheduler queue prioritization\n"
>> +		" -o, --ordered                Use ordered scheduling\n"
>> +		" -p, --parallel               Use parallel scheduling\n"
>
> IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
> operation. It is valid to have any combination. We need to express that on
> the command line, for example:
> 3 stages with
> O->A->P

How about we add an option that specifies the mode of operation for each
stage in a string? Maybe have a '-m' option (modes), e.g. '-m appo' for
four stages with atomic, parallel, parallel, ordered. Or maybe reuse your
test-eventdev parameter style?

>
>> +		" -q, --quiet                  Minimize printed output\n"
>> +		" -D, --dump                   Print detailed statistics before exit"
>> +		"\n";
>> +	fprintf(stderr, "%s", usage_str);
>> +	exit(1);
>> +}
>> +
>
> [...]
>
>> +			rx_single = (popcnt == 1);
>> +			break;
>> +		case 't':
>> +			tx_lcore_mask = parse_coremask(optarg);
>> +			popcnt = __builtin_popcountll(tx_lcore_mask);
>> +			tx_single = (popcnt == 1);
>> +			break;
>> +		case 'e':
>> +			sched_lcore_mask = parse_coremask(optarg);
>> +			popcnt = __builtin_popcountll(sched_lcore_mask);
>> +			sched_single = (popcnt == 1);
>> +			break;
>> +		default:
>> +			usage();
>> +		}
>> +	}
>> +
>> +	if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
>> +	    sched_lcore_mask == 0 || tx_lcore_mask == 0) {
>
> Need to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED, i.e. sched_lcore_mask
> being zero can be a valid case.

I'll separate this out into a check for RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED.
It needs to be done later, in setup_eventdev(), after the eventdev instance
is created.

>
>> +		printf("Core part of pipeline was not assigned any cores. 
" >> + "This will stall the pipeline, please check core masks " >> + "(use -h for details on setting core masks):\n" >> + "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64 >> + "\n\tworkers: %"PRIu64"\n", >> + rx_lcore_mask, tx_lcore_mask, sched_lcore_mask, >> + worker_lcore_mask); >> + rte_exit(-1, "Fix core masks\n"); >> + } >> + if (num_stages == 0 || num_stages > MAX_NUM_STAGES) >> + usage(); >> + >> + for (i = 0; i < MAX_NUM_CORE; i++) { >> + rx_core[i] = !!(rx_lcore_mask & (1UL << i)); >> + tx_core[i] = !!(tx_lcore_mask & (1UL << i)); >> + sched_core[i] = !!(sched_lcore_mask & (1UL << i)); >> + worker_core[i] = !!(worker_lcore_mask & (1UL << i)); >> + >> + if (worker_core[i]) >> + num_workers++; >> + if (core_in_use(i)) >> + active_cores++; >> + } >> +} >> + >> + >> +struct port_link { >> + uint8_t queue_id; >> + uint8_t priority; >> +}; >> + >> +static int >> +setup_eventdev(struct prod_data *prod_data, >> + struct cons_data *cons_data, >> + struct worker_data *worker_data) >> +{ >> + const uint8_t dev_id = 0; >> + /* +1 stages is for a SINGLE_LINK TX stage */ >> + const uint8_t nb_queues = num_stages + 1; >> + /* + 2 is one port for producer and one for consumer */ >> + const uint8_t nb_ports = num_workers + 2; > > selection of number of ports is a function of rte_event_has_producer(). > I think, it will be addressed with RFC. > >> + const struct rte_event_dev_config config = { >> + .nb_event_queues = nb_queues, >> + .nb_event_ports = nb_ports, >> + .nb_events_limit = 4096, >> + .nb_event_queue_flows = 1024, >> + .nb_event_port_dequeue_depth = 128, >> + .nb_event_port_enqueue_depth = 128, > > OCTEONTX PMD driver has .nb_event_port_dequeue_depth = 1 and > .nb_event_port_enqueue_depth = 1 and struct rte_event_dev_info.min_dequeue_timeout_ns > = 853 value. > I think, we need to check the rte_event_dev_info_get() first to get the sane > values and take RTE_MIN or RTE_MAX based on the use case. > > or > > I can ignore this value in OCTEONTX PMD. But I am not sure NXP case, > Any thoughts from NXP folks. > > Added in code to do the relevant checks. It should now limit the enqueue/dequeue depths to the configured depth for the port if it's less. >> + }; >> + const struct rte_event_port_conf wkr_p_conf = { >> + .dequeue_depth = worker_cq_depth, > > Same as above See previous comment. > >> + .enqueue_depth = 64, > > Same as above See previous comment. > >> + .new_event_threshold = 4096, >> + }; >> + struct rte_event_queue_conf wkr_q_conf = { >> + .event_queue_cfg = queue_type, >> + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, >> + .nb_atomic_flows = 1024, >> + .nb_atomic_order_sequences = 1024, >> + }; >> + const struct rte_event_port_conf tx_p_conf = { >> + .dequeue_depth = 128, > > Same as above See previous comment. >> + .enqueue_depth = 128, > > Same as above See previous comment. 
>> + .new_event_threshold = 4096, >> + }; >> + const struct rte_event_queue_conf tx_q_conf = { >> + .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST, >> + .event_queue_cfg = >> + RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY | >> + RTE_EVENT_QUEUE_CFG_SINGLE_LINK, >> + .nb_atomic_flows = 1024, >> + .nb_atomic_order_sequences = 1024, >> + }; >> + >> + struct port_link worker_queues[MAX_NUM_STAGES]; >> + struct port_link tx_queue; >> + unsigned i; >> + >> + int ret, ndev = rte_event_dev_count(); >> + if (ndev < 1) { >> + printf("%d: No Eventdev Devices Found\n", __LINE__); >> + return -1; >> + } >> + >> + struct rte_event_dev_info dev_info; >> + ret = rte_event_dev_info_get(dev_id, &dev_info); >> + printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); >> + >> + ret = rte_event_dev_configure(dev_id, &config); >> + if (ret < 0) >> + printf("%d: Error configuring device\n", __LINE__) > > Don't process further with failed configure. > Done. >> + >> + /* Q creation - one load balanced per pipeline stage*/ >> + >> + /* set up one port per worker, linking to all stage queues */ >> + for (i = 0; i < num_workers; i++) { >> + struct worker_data *w = &worker_data[i]; >> + w->dev_id = dev_id; >> + if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { >> + printf("Error setting up port %d\n", i); >> + return -1; >> + } >> + >> + uint32_t s; >> + for (s = 0; s < num_stages; s++) { >> + if (rte_event_port_link(dev_id, i, >> + &worker_queues[s].queue_id, >> + &worker_queues[s].priority, >> + 1) != 1) { >> + printf("%d: error creating link for port %d\n", >> + __LINE__, i); >> + return -1; >> + } >> + } >> + w->port_id = i; >> + } >> + /* port for consumer, linked to TX queue */ >> + if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) { > > If ethdev supports MT txq queue support then this port can be linked to > worker too. something to consider for future. > Sure. No change for now. >> + printf("Error setting up port %d\n", i); >> + return -1; >> + } >> + if (rte_event_port_link(dev_id, i, &tx_queue.queue_id, >> + &tx_queue.priority, 1) != 1) { >> + printf("%d: error creating link for port %d\n", >> + __LINE__, i); >> + return -1; >> + } >> + /* port for producer, no links */ >> + const struct rte_event_port_conf rx_p_conf = { >> + .dequeue_depth = 8, >> + .enqueue_depth = 8, > > same as above issue.You could get default config first and configure. > Done. Checking config.nb_event_port_dequeue_depth and reducing if less. >> + .new_event_threshold = 1200, >> + }; >> + if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) { >> + printf("Error setting up port %d\n", i); >> + return -1; >> + } >> + >> + *prod_data = (struct prod_data){.dev_id = dev_id, >> + .port_id = i + 1, >> + .qid = qid[0] }; >> + *cons_data = (struct cons_data){.dev_id = dev_id, >> + .port_id = i }; >> + >> + enqueue_cnt = rte_calloc(0, >> + RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])), >> + sizeof(enqueue_cnt[0]), 0); >> + dequeue_cnt = rte_calloc(0, >> + RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])), >> + sizeof(dequeue_cnt[0]), 0); > > Why array? looks like enqueue_cnt[1] and dequeue_cnt[1] not used anywhere. > Looks like there was an intention to extend this more. And all the app does is increment without using. I've removed these two vars to clean up. >> + >> + if (rte_event_dev_start(dev_id) < 0) { >> + printf("Error starting eventdev\n"); >> + return -1; >> + } >> + >> + return dev_id; >> +} >> + >> +static void >> +signal_handler(int signum) >> +{ >> + if (done || prod_stop) > > I think, No one updating the prod_stop > Removed. 
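On the port depth checks mentioned above, what V2 does is roughly the
following, for each of the worker, TX and RX port configs (sketch; 'config'
is the rte_event_dev_config passed to rte_event_dev_configure(), and const
is dropped from the port conf so the depths can be adjusted):

struct rte_event_port_conf wkr_p_conf = {
	.dequeue_depth = worker_cq_depth,
	.enqueue_depth = 64,
	.new_event_threshold = 4096,
};

/* never ask for more than the device was configured with */
if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
	wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
	wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;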
>> + rte_exit(1, "Exiting on signal %d\n", signum); >> + if (signum == SIGINT || signum == SIGTERM) { >> + printf("\n\nSignal %d received, preparing to exit...\n", >> + signum); >> + done = 1; >> + } >> + if (signum == SIGTSTP) >> + rte_event_dev_dump(0, stdout); >> +} >> + >> +int >> +main(int argc, char **argv) > > [...] > >> + RTE_LCORE_FOREACH_SLAVE(lcore_id) { >> + if (lcore_id >= MAX_NUM_CORE) >> + break; >> + >> + if (!rx_core[lcore_id] && !worker_core[lcore_id] && >> + !tx_core[lcore_id] && !sched_core[lcore_id]) >> + continue; >> + >> + if (rx_core[lcore_id]) >> + printf( >> + "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n", >> + __func__, lcore_id, prod_data.port_id); > > These prints wont show if rx,tx, scheduler running on master core(as we > are browsing through RTE_LCORE_FOREACH_SLAVE) OK, changed to RTE_LCORE_FOREACH, which also includes the master core. > >> + >> + if (!quiet) { >> + printf("\nPort Workload distribution:\n"); >> + uint32_t i; >> + uint64_t tot_pkts = 0; >> + uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0}; >> + for (i = 0; i < num_workers; i++) { >> + char statname[64]; >> + snprintf(statname, sizeof(statname), "port_%u_rx", >> + worker_data[i].port_id); > > Please check "port_%u_rx" xstat availability with PMD first. Added a check after rte_event_dev_xstats_by_name_get() to see if it's not -ENOTSUP. >> + pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get( >> + dev_id, statname, NULL); >> + tot_pkts += pkts_per_wkr[i]; >> + } >> + for (i = 0; i < num_workers; i++) { >> + float pc = pkts_per_wkr[i] * 100 / >> + ((float)tot_pkts); >> + printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n", >> + i, pc, pkts_per_wkr[i]); >> + } >> + >> + } >> + >> + return 0; >> +} > > As final note, considering the different options in fastpath, I was > thinking like introducing app/test-eventdev like app/testpmd and have > set of function pointers# for different modes like "macswap", "txonly" > in testpmd to exercise different options and framework for adding new use > cases.I will work on that to check the feasibility. > > ## > struct fwd_engine { > const char *fwd_mode_name; /**< Forwarding mode name. */ > port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */ > port_fwd_end_t port_fwd_end; /**< NULL if nothing special to do. */ > packet_fwd_t packet_fwd; /**< Mandatory. */ > }; > >> -- >> 2.7.4 >>
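And going back to the xstats check above: the guard around the workload
summary is along these lines (sketch; as far as I can see the API returns
-ENOTSUP cast to uint64_t when the stat isn't supported by the PMD):

uint64_t val = rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
if (val != (uint64_t)-ENOTSUP) {
	pkts_per_wkr[i] = val;
	tot_pkts += val;
}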