From: Harry van Haaren <harry.van.haaren@intel.com>
To: dev@dpdk.org
Cc: jerin.jacob@caviumnetworks.com,
	Bruce Richardson <bruce.richardson@intel.com>,
	Gage Eads <gage.eads@intel.com>,
	Harry van Haaren <harry.van.haaren@intel.com>
Subject: [PATCH v5 10/20] event/sw: add scheduling logic
Date: Fri, 24 Mar 2017 16:53:05 +0000
Message-ID: <1490374395-149320-11-git-send-email-harry.van.haaren@intel.com>
In-Reply-To: <1490374395-149320-1-git-send-email-harry.van.haaren@intel.com>

From: Bruce Richardson <bruce.richardson@intel.com>

Add the scheduling function, which takes events from the producer
queues and buffers them before scheduling them to the consumer
queues. The scheduling logic supports atomic, ordered and parallel
scheduling of flows.
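
A minimal sketch of how this entry point is driven in practice,
assuming the rte_event_schedule() helper present in the eventdev API
at this stage of the series (it dispatches to the dev->schedule
callback registered below); the schedule_loop()/done names are
illustrative only:

  #include <stdbool.h>
  #include <rte_eventdev.h>

  static volatile bool done; /* set by the application at shutdown */

  /* run the scheduler on one dedicated lcore,
   * e.g. launched with rte_eal_remote_launch()
   */
  static int
  schedule_loop(void *arg)
  {
          const uint8_t dev_id = *(const uint8_t *)arg;

          while (!done)
                  rte_event_schedule(dev_id); /* calls sw_event_schedule() */
          return 0;
  }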

Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Gage Eads <gage.eads@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
 drivers/event/sw/Makefile             |   1 +
 drivers/event/sw/sw_evdev.c           |   1 +
 drivers/event/sw/sw_evdev.h           |  11 +
 drivers/event/sw/sw_evdev_scheduler.c | 602 ++++++++++++++++++++++++++++++++++
 4 files changed, 615 insertions(+)
 create mode 100644 drivers/event/sw/sw_evdev_scheduler.c

diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
index b6ecd91..a7f5b3d 100644
--- a/drivers/event/sw/Makefile
+++ b/drivers/event/sw/Makefile
@@ -54,6 +54,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
 # library source files
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_scheduler.c
 
 # export include files
 SYMLINK-y-include +=
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 9b2816d..b1ae2b6 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -555,6 +555,7 @@ sw_probe(const char *name, const char *params)
 	dev->enqueue_burst = sw_event_enqueue_burst;
 	dev->dequeue = sw_event_dequeue;
 	dev->dequeue_burst = sw_event_dequeue_burst;
+	dev->schedule = sw_event_schedule;
 
 	sw = dev->data->dev_private;
 	sw->data = dev->data;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index ab372fd..7c157c7 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -248,8 +248,18 @@ struct sw_evdev {
 	/* Cache how many packets are in each cq */
 	uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
 
+	/* Array of pointers to load-balanced QIDs sorted by priority level */
+	struct sw_qid *qids_prioritized[RTE_EVENT_MAX_QUEUES_PER_DEV];
+
+	/* Stats */
+	struct sw_point_stats stats __rte_cache_aligned;
+	uint64_t sched_called;
 	int32_t sched_quanta;
+	uint64_t sched_no_iq_enqueues;
+	uint64_t sched_no_cq_enqueues;
+	uint64_t sched_cq_qid_called;
 
+	uint8_t started;
 	uint32_t credit_update_quanta;
 };
 
@@ -272,5 +282,6 @@ uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[],
 uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
 uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 			uint64_t wait);
+void sw_event_schedule(struct rte_eventdev *dev);
 
 #endif /* _SW_EVDEV_H_ */
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
new file mode 100644
index 0000000..2aecc95
--- /dev/null
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -0,0 +1,602 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_ring.h>
+#include <rte_hash_crc.h>
+#include "sw_evdev.h"
+#include "iq_ring.h"
+#include "event_ring.h"
+
+#define SW_IQS_MASK (SW_IQS_MAX-1)
+
+/* Retrieve the highest-priority non-empty IQ, or SW_IQS_MAX if no pkts are
+ * available. Doing the ctz each time is faster than caching the value due to
+ * data dependencies
+ */
+#define PKT_MASK_TO_IQ(pkts) \
+	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
+
+/* Map an event priority to an IQ index, clamping values above the mask.
+ * Assumes SW_IQS_MASK is (powerOfTwo - 1). Priority 0 (highest) maps to
+ * IQ 0, which ctz() finds first on dequeue
+ */
+#define PRIO_TO_IQ(prio) (prio > SW_IQS_MASK ? SW_IQS_MASK : prio)
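+/* Worked example, assuming SW_IQS_MAX == 4 (so SW_IQS_MASK == 0x3):
+ * PKT_MASK_TO_IQ(0x6) == ctz(0x6 | 0x10) == 1, i.e. IQ 1 is the
+ * highest-priority non-empty IQ, while PKT_MASK_TO_IQ(0x0) == 4, which
+ * callers treat as "nothing to schedule". PRIO_TO_IQ(0) == 0 (highest)
+ * and any priority above 3 clamps to IQ 3 (lowest).
+ */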
+
+#define MAX_PER_IQ_DEQUEUE 48
+#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
+
+static inline uint32_t
+sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+		uint32_t iq_num, unsigned int count)
+{
+	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
+	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
+	uint32_t nb_blocked = 0;
+	uint32_t i;
+
+	if (count > MAX_PER_IQ_DEQUEUE)
+		count = MAX_PER_IQ_DEQUEUE;
+
+	/* This is the QID ID. The QID ID is static, hence it can be
+	 * used to identify the stage of processing in history lists etc
+	 */
+	uint32_t qid_id = qid->id;
+
+	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
+	for (i = 0; i < count; i++) {
+		const struct rte_event *qe = &qes[i];
+		/* use cheap bit mixing, we only need to lose a few bits */
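+		/* the xor folds the upper flow_id bits down onto the low bits
+		 * before masking, so flows that differ only in high bits still
+		 * spread across the FID table (FLOWID_MASK assumes
+		 * SW_QID_NUM_FIDS is a power of two)
+		 */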
+		uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
+		const uint16_t flow_id = FLOWID_MASK & flow_id32;
+		struct sw_fid_t *fid = &qid->fids[flow_id];
+		int cq = fid->cq;
+
+		if (cq < 0) {
+			uint32_t cq_idx = qid->cq_next_tx++;
+			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
+				qid->cq_next_tx = 0;
+			cq = qid->cq_map[cq_idx];
+
+			/* find least used */
+			int cq_free_cnt = sw->cq_ring_space[cq];
+			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
+					cq_idx++) {
+				int test_cq = qid->cq_map[cq_idx];
+				int test_cq_free = sw->cq_ring_space[test_cq];
+				if (test_cq_free > cq_free_cnt) {
+					cq = test_cq;
+					cq_free_cnt = test_cq_free;
+				}
+			}
+
+			fid->cq = cq; /* this pins early */
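+			/* the flow stays pinned to this CQ (even if the QE is
+			 * blocked below) until its inflight count, fid->pcount,
+			 * drops back to zero in the worker-pull path
+			 */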
+		}
+
+		if (sw->cq_ring_space[cq] == 0 ||
+				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
+			blocked_qes[nb_blocked++] = *qe;
+			continue;
+		}
+
+		struct sw_port *p = &sw->ports[cq];
+
+		/* at this point we can queue up the packet on the cq_buf */
+		fid->pcount++;
+		p->cq_buf[p->cq_buf_count++] = *qe;
+		p->inflights++;
+		sw->cq_ring_space[cq]--;
+
+		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
+		p->hist_list[head].fid = flow_id;
+		p->hist_list[head].qid = qid_id;
+
+		p->stats.tx_pkts++;
+		qid->stats.tx_pkts++;
+
+		/* if we just filled in the last slot, flush the buffer */
+		if (sw->cq_ring_space[cq] == 0) {
+			struct qe_ring *worker = p->cq_worker_ring;
+			qe_ring_enqueue_burst(worker, p->cq_buf,
+					p->cq_buf_count,
+					&sw->cq_ring_space[cq]);
+			p->cq_buf_count = 0;
+		}
+	}
+	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
+
+	return count - nb_blocked;
+}
+
+static inline uint32_t
+sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+		uint32_t iq_num, unsigned int count, int keep_order)
+{
+	uint32_t i;
+	uint32_t cq_idx = qid->cq_next_tx;
+
+	/* This is the QID ID. The QID ID is static, hence it can be
+	 * used to identify the stage of processing in history lists etc
+	 */
+	uint32_t qid_id = qid->id;
+
+	if (count > MAX_PER_IQ_DEQUEUE)
+		count = MAX_PER_IQ_DEQUEUE;
+
+	if (keep_order)
+		/* only schedule as many as we have reorder buffer entries */
+		count = RTE_MIN(count,
+				rte_ring_count(qid->reorder_buffer_freelist));
+
+	for (i = 0; i < count; i++) {
+		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+		uint32_t cq_check_count = 0;
+		uint32_t cq;
+
+		/*
+		 *  for parallel, just send to next available CQ in round-robin
+		 * fashion. So scan for an available CQ. If all CQs are full
+		 * just return and move on to next QID
+		 */
+		do {
+			if (++cq_check_count > qid->cq_num_mapped_cqs)
+				goto exit;
+			cq = qid->cq_map[cq_idx];
+			if (++cq_idx == qid->cq_num_mapped_cqs)
+				cq_idx = 0;
+		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+
+		struct sw_port *p = &sw->ports[cq];
+		if (sw->cq_ring_space[cq] == 0 ||
+				p->inflights == SW_PORT_HIST_LIST)
+			break;
+
+		sw->cq_ring_space[cq]--;
+
+		qid->stats.tx_pkts++;
+
+		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
+
+		p->hist_list[head].fid = qe->flow_id;
+		p->hist_list[head].qid = qid_id;
+
+		if (keep_order)
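+			/* reserve a reorder-buffer entry for this event;
+			 * sw_schedule_reorder() later drains completed
+			 * entries back to the IQs in original order
+			 */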
+			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+					(void *)&p->hist_list[head].rob_entry);
+
+		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
+		iq_ring_pop(qid->iq[iq_num]);
+
+		rte_compiler_barrier();
+		p->inflights++;
+		p->stats.tx_pkts++;
+		p->hist_head++;
+	}
+exit:
+	qid->cq_next_tx = cq_idx;
+	return i;
+}
+
+static uint32_t
+sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
+		uint32_t iq_num, unsigned int count __rte_unused)
+{
+	uint32_t cq_id = qid->cq_map[0];
+	struct sw_port *port = &sw->ports[cq_id];
+
+	/* get max burst enq size for cq_ring */
+	uint32_t count_free = sw->cq_ring_space[cq_id];
+	if (count_free == 0)
+		return 0;
+
+	/* burst dequeue from the QID IQ ring */
+	struct iq_ring *ring = qid->iq[iq_num];
+	uint32_t ret = iq_ring_dequeue_burst(ring,
+			&port->cq_buf[port->cq_buf_count], count_free);
+	port->cq_buf_count += ret;
+
+	/* Update QID, Port and Total TX stats */
+	qid->stats.tx_pkts += ret;
+	port->stats.tx_pkts += ret;
+
+	/* Subtract credits from cached value */
+	sw->cq_ring_space[cq_id] -= ret;
+
+	return ret;
+}
+
+static uint32_t
+sw_schedule_qid_to_cq(struct sw_evdev *sw)
+{
+	uint32_t pkts = 0;
+	uint32_t qid_idx;
+
+	sw->sched_cq_qid_called++;
+
+	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
+		struct sw_qid *qid = sw->qids_prioritized[qid_idx];
+
+		int type = qid->type;
+		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
+
+		/* no packets in any IQ of this QID, skip to the next one */
+		if (iq_num >= SW_IQS_MAX)
+			continue;
+
+		uint32_t pkts_done = 0;
+		uint32_t count = iq_ring_count(qid->iq[iq_num]);
+
+		if (count > 0) {
+			if (type == SW_SCHED_TYPE_DIRECT)
+				pkts_done += sw_schedule_dir_to_cq(sw, qid,
+						iq_num, count);
+			else if (type == RTE_SCHED_TYPE_ATOMIC)
+				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
+						iq_num, count);
+			else
+				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
+						iq_num, count,
+						type == RTE_SCHED_TYPE_ORDERED);
+		}
+
+		/* Check if the IQ that was polled is now empty, and unset it
+		 * in the IQ mask if it is empty.
+		 */
+		int all_done = (pkts_done == count);
+
+		qid->iq_pkt_mask &= ~(all_done << (iq_num));
+		pkts += pkts_done;
+	}
+
+	return pkts;
+}
+
+/* This function performs re-ordering of packets, injecting them into the
+ * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
+ * contiguous in that array, this function accepts a "range" of QIDs to scan.
+ */
+static uint16_t
+sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
+{
+	/* Perform egress reordering */
+	struct rte_event *qe;
+	uint32_t pkts_iter = 0;
+
+	for (; qid_start < qid_end; qid_start++) {
+		struct sw_qid *qid = &sw->qids[qid_start];
+		int i, num_entries_in_use;
+
+		if (qid->type != RTE_SCHED_TYPE_ORDERED)
+			continue;
+
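+		/* the freelist ring holds only unused ROB entries, so its
+		 * free count equals the number of entries currently in use
+		 */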
+		num_entries_in_use = rte_ring_free_count(
+					qid->reorder_buffer_freelist);
+
+		for (i = 0; i < num_entries_in_use; i++) {
+			struct reorder_buffer_entry *entry;
+			int j;
+
+			entry = &qid->reorder_buffer[qid->reorder_buffer_index];
+
+			if (!entry->ready)
+				break;
+
+			for (j = 0; j < entry->num_fragments; j++) {
+				uint16_t dest_qid;
+				uint16_t dest_iq;
+
+				int idx = entry->fragment_index + j;
+				qe = &entry->fragments[idx];
+
+				dest_qid = qe->queue_id;
+				dest_iq  = PRIO_TO_IQ(qe->priority);
+
+				if (dest_qid >= sw->qid_count) {
+					sw->stats.rx_dropped++;
+					continue;
+				}
+
+				struct sw_qid *dest_qid_ptr =
+					&sw->qids[dest_qid];
+				const struct iq_ring *dest_iq_ptr =
+					dest_qid_ptr->iq[dest_iq];
+				if (iq_ring_free_count(dest_iq_ptr) == 0)
+					break;
+
+				pkts_iter++;
+
+				struct sw_qid *q = &sw->qids[dest_qid];
+				struct iq_ring *r = q->iq[dest_iq];
+
+				/* we checked for space above, so enqueue must
+				 * succeed
+				 */
+				iq_ring_enqueue(r, qe);
+				q->iq_pkt_mask |= (1 << (dest_iq));
+				q->iq_pkt_count[dest_iq]++;
+				q->stats.rx_pkts++;
+			}
+
+			entry->ready = (j != entry->num_fragments);
+			entry->num_fragments -= j;
+			entry->fragment_index += j;
+
+			if (!entry->ready) {
+				entry->fragment_index = 0;
+
+				rte_ring_sp_enqueue(
+						qid->reorder_buffer_freelist,
+						entry);
+
+				qid->reorder_buffer_index++;
+				qid->reorder_buffer_index %= qid->window_size;
+			}
+		}
+	}
+	return pkts_iter;
+}
+
+static inline void __attribute__((always_inline))
+sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
+{
+	RTE_SET_USED(sw);
+	struct qe_ring *worker = port->rx_worker_ring;
+	port->pp_buf_start = 0;
+	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
+			RTE_DIM(port->pp_buf));
+}
+
+static inline uint32_t __attribute__((always_inline))
+__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
+{
+	/* written through (with a zero) when a history entry has no real ROB
+	 * entry, so this must not be const
+	 */
+	static struct reorder_buffer_entry dummy_rob;
+	uint32_t pkts_iter = 0;
+	struct sw_port *port = &sw->ports[port_id];
+
+	/* If shadow ring has 0 pkts, pull from worker ring */
+	if (port->pp_buf_count == 0)
+		sw_refill_pp_buf(sw, port);
+
+	while (port->pp_buf_count) {
+		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
+		struct sw_hist_list_entry *hist_entry = NULL;
+		uint8_t flags = qe->op;
+		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
+		int needs_reorder = 0;
+		/* without reordering, treat a partial (non-EOP) event as NEW */
+		if (!allow_reorder && !eop)
+			flags = QE_FLAG_VALID;
+
+		/*
+		 * if we don't have space for this packet in an IQ,
+		 * then move on to next queue. Technically, for a
+		 * packet that needs reordering, we don't need to check
+		 * here, but it simplifies things not to special-case
+		 */
+		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
+		struct sw_qid *qid = &sw->qids[qe->queue_id];
+
+		if ((flags & QE_FLAG_VALID) &&
+				iq_ring_free_count(qid->iq[iq_num]) == 0)
+			break;
+
+		/* now process based on flags. Note that for directed
+		 * queues, the enqueue_flush masks off all but the
+		 * valid flag. This makes FWD and PARTIAL enqueues just
+		 * NEW type, and makes DROPS no-op calls.
+		 */
+		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
+			const uint32_t hist_tail = port->hist_tail &
+					(SW_PORT_HIST_LIST - 1);
+
+			hist_entry = &port->hist_list[hist_tail];
+			const uint32_t hist_qid = hist_entry->qid;
+			const uint32_t hist_fid = hist_entry->fid;
+
+			struct sw_fid_t *fid =
+				&sw->qids[hist_qid].fids[hist_fid];
+			fid->pcount -= eop;
+			if (fid->pcount == 0)
+				fid->cq = -1;
+
+			if (allow_reorder) {
+				/* set reorder ready if an ordered QID */
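+				/* branchless select: rob_ptr becomes either
+				 * the real ROB entry (ordered QID) or
+				 * &dummy_rob (rob_entry == NULL), so the
+				 * unconditional ->ready store below never
+				 * dereferences NULL
+				 */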
+				uintptr_t rob_ptr =
+					(uintptr_t)hist_entry->rob_entry;
+				const uintptr_t valid = (rob_ptr != 0);
+				needs_reorder = valid;
+				rob_ptr |=
+					((valid - 1) & (uintptr_t)&dummy_rob);
+				struct reorder_buffer_entry *tmp_rob_ptr =
+					(struct reorder_buffer_entry *)rob_ptr;
+				tmp_rob_ptr->ready = eop * needs_reorder;
+			}
+
+			port->inflights -= eop;
+			port->hist_tail += eop;
+		}
+		if (flags & QE_FLAG_VALID) {
+			port->stats.rx_pkts++;
+
+			if (allow_reorder && needs_reorder) {
+				struct reorder_buffer_entry *rob_entry =
+						hist_entry->rob_entry;
+
+				/* Although fragmentation is not currently
+				 * supported by the eventdev API, we support it
+				 * here. Open: how do we alert the user that
+				 * they've exceeded max frags?
+				 */
+				int num_frag = rob_entry->num_fragments;
+				if (num_frag == SW_FRAGMENTS_MAX)
+					sw->stats.rx_dropped++;
+				else {
+					int idx = rob_entry->num_fragments++;
+					rob_entry->fragments[idx] = *qe;
+				}
+				goto end_qe;
+			}
+
+			/* Use the iq_num from above to push the QE
+			 * into the qid at the right priority
+			 */
+
+			qid->iq_pkt_mask |= (1 << (iq_num));
+			iq_ring_enqueue(qid->iq[iq_num], qe);
+			qid->iq_pkt_count[iq_num]++;
+			qid->stats.rx_pkts++;
+			pkts_iter++;
+		}
+
+end_qe:
+		port->pp_buf_start++;
+		port->pp_buf_count--;
+	} /* while (avail_qes) */
+
+	return pkts_iter;
+}
+
+static uint32_t
+sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
+{
+	return __pull_port_lb(sw, port_id, 1);
+}
+
+static uint32_t
+sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
+{
+	return __pull_port_lb(sw, port_id, 0);
+}
+
+static uint32_t
+sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
+{
+	uint32_t pkts_iter = 0;
+	struct sw_port *port = &sw->ports[port_id];
+
+	/* If shadow ring has 0 pkts, pull from worker ring */
+	if (port->pp_buf_count == 0)
+		sw_refill_pp_buf(sw, port);
+
+	while (port->pp_buf_count) {
+		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
+		uint8_t flags = qe->op;
+
+		if ((flags & QE_FLAG_VALID) == 0)
+			goto end_qe;
+
+		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
+		struct sw_qid *qid = &sw->qids[qe->queue_id];
+		struct iq_ring *iq_ring = qid->iq[iq_num];
+
+		if (iq_ring_free_count(iq_ring) == 0)
+			break; /* move to next port */
+
+		port->stats.rx_pkts++;
+
+		/* Use the iq_num from above to push the QE
+		 * into the qid at the right priority
+		 */
+		qid->iq_pkt_mask |= (1 << (iq_num));
+		iq_ring_enqueue(iq_ring, qe);
+		qid->iq_pkt_count[iq_num]++;
+		qid->stats.rx_pkts++;
+		pkts_iter++;
+
+end_qe:
+		port->pp_buf_start++;
+		port->pp_buf_count--;
+	} /* while port->pp_buf_count */
+
+	return pkts_iter;
+}
+
+void
+sw_event_schedule(struct rte_eventdev *dev)
+{
+	struct sw_evdev *sw = sw_pmd_priv(dev);
+	uint32_t in_pkts, out_pkts;
+	uint32_t out_pkts_total = 0, in_pkts_total = 0;
+	int32_t sched_quanta = sw->sched_quanta;
+	uint32_t i;
+
+	sw->sched_called++;
+	if (!sw->started)
+		return;
+
+	do {
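+		/* inner loop: pull events in from the ports (and re-ordered
+		 * QIDs) until the pull rate drops off or the quanta is hit,
+		 * then push the buffered IQ events out to the CQs
+		 */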
+		uint32_t in_pkts_this_iteration = 0;
+
+		/* Pull from rx_ring for ports */
+		do {
+			in_pkts = 0;
+			for (i = 0; i < sw->port_count; i++)
+				if (sw->ports[i].is_directed)
+					in_pkts += sw_schedule_pull_port_dir(sw, i);
+				else if (sw->ports[i].num_ordered_qids > 0)
+					in_pkts += sw_schedule_pull_port_lb(sw, i);
+				else
+					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+
+			/* QID scan for re-ordered */
+			in_pkts += sw_schedule_reorder(sw, 0,
+					sw->qid_count);
+			in_pkts_this_iteration += in_pkts;
+		} while (in_pkts > 4 &&
+				(int)in_pkts_this_iteration < sched_quanta);
+
+		out_pkts = 0;
+		out_pkts += sw_schedule_qid_to_cq(sw);
+		out_pkts_total += out_pkts;
+		in_pkts_total += in_pkts_this_iteration;
+
+		if (in_pkts == 0 && out_pkts == 0)
+			break;
+	} while ((int)out_pkts_total < sched_quanta);
+
+	/* push all the internal buffered QEs in port->cq_ring to the
+	 * worker cores: aka, do the ring transfers batched.
+	 */
+	for (i = 0; i < sw->port_count; i++) {
+		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
+		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+				sw->ports[i].cq_buf_count,
+				&sw->cq_ring_space[i]);
+		sw->ports[i].cq_buf_count = 0;
+	}
+
+	sw->stats.tx_pkts += out_pkts_total;
+	sw->stats.rx_pkts += in_pkts_total;
+
+	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
+	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
+
+}
-- 
2.7.4
