All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter
@ 2018-06-08 18:15 Nikhil Rao
  2018-06-08 18:15 ` [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names Nikhil Rao
                   ` (5 more replies)
  0 siblings, 6 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

This patch series adds support for interrupt driven queues to the
ethernet Rx adapter. The first 3 patches prepare the code to
handle both poll and interrupt driven Rx queues, and the final
patch has code changes specific to interrupt driven queues.

This patch series is to be applied on top of the Rx adapter
patches posted previously:

https://dpdk.org/dev/patchwork/patch/40629/
https://dpdk.org/dev/patchwork/patch/40595/
https://dpdk.org/dev/patchwork/patch/40594/
https://dpdk.org/dev/patchwork/patch/40593/

Nikhil Rao (4):
  eventdev: standardize Rx adapter internal function names
  eventdev: improve err handling for Rx adapter queue add/del
  eventdev: move Rx adapter eth Rx to separate function
  eventdev: add interrupt driven queues in Rx event adapter

 lib/librte_eventdev/rte_event_eth_rx_adapter.h     |    5 +-
 lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 1630 +++++++++++++++++---
 test/test/test_event_eth_rx_adapter.c              |  261 +++-
 .../prog_guide/event_ethernet_rx_adapter.rst       |   24 +
 config/common_base                                 |    1 +
 lib/librte_eventdev/Makefile                       |    4 +-
 6 files changed, 1689 insertions(+), 236 deletions(-)

-- 
1.8.3.1

^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
@ 2018-06-08 18:15 ` Nikhil Rao
  2018-06-17 13:24   ` Jerin Jacob
  2018-06-08 18:15 ` [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

Add a common prefix to function names and rename
a few to better match functionality.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 167 ++++++++++++-------------
 1 file changed, 80 insertions(+), 87 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index ce1f62d..9361d48 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -129,30 +129,30 @@ struct eth_rx_queue_info {
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
 
 static inline int
-valid_id(uint8_t id)
+rxa_validate_id(uint8_t id)
 {
 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
 }
 
 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
-	if (!valid_id(id)) { \
+	if (!rxa_validate_id(id)) { \
 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
 		return retval; \
 	} \
 } while (0)
 
 static inline int
-sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	return rx_adapter->num_rx_polled;
 }
 
 /* Greatest common divisor */
-static uint16_t gcd_u16(uint16_t a, uint16_t b)
+static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 {
 	uint16_t r = a % b;
 
-	return r ? gcd_u16(b, r) : b;
+	return r ? rxa_gcd_u16(b, r) : b;
 }
 
 /* Returns the next queue in the polling sequence
@@ -160,7 +160,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
  */
 static int
-wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
 	 unsigned int n, int *cw,
 	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
 	 uint16_t gcd, int prev)
@@ -190,7 +190,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Precalculate WRR polling sequence for all queues in rx_adapter */
 static int
-eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint16_t d;
 	uint16_t q;
@@ -239,7 +239,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 				rx_poll[poll_q].eth_rx_qid = q;
 				max_wrr_pos += wt;
 				max_wt = RTE_MAX(max_wt, wt);
-				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
+				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
 				poll_q++;
 			}
 		}
@@ -259,7 +259,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		int prev = -1;
 		int cw = -1;
 		for (i = 0; i < max_wrr_pos; i++) {
-			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
+			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
 					     rx_poll, max_wt, gcd, prev);
 			prev = rx_wrr[i];
 		}
@@ -276,7 +276,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
+rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
 	struct ipv6_hdr **ipv6_hdr)
 {
 	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -315,7 +315,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Calculate RSS hash for IPv4/6 */
 static inline uint32_t
-do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
+rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
 {
 	uint32_t input_len;
 	void *tuple;
@@ -324,7 +324,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	struct ipv4_hdr *ipv4_hdr;
 	struct ipv6_hdr *ipv6_hdr;
 
-	mtoip(m, &ipv4_hdr, &ipv6_hdr);
+	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
 
 	if (ipv4_hdr) {
 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
@@ -343,13 +343,13 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline int
-rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	return !!rx_adapter->enq_block_count;
 }
 
 static inline void
-rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	if (rx_adapter->rx_enq_block_start_ts)
 		return;
@@ -362,13 +362,13 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
 		    struct rte_event_eth_rx_adapter_stats *stats)
 {
 	if (unlikely(!stats->rx_enq_start_ts))
 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
 
-	if (likely(!rx_enq_blocked(rx_adapter)))
+	if (likely(!rxa_enq_blocked(rx_adapter)))
 		return;
 
 	rx_adapter->enq_block_count = 0;
@@ -384,8 +384,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * this function
  */
 static inline void
-buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
-		  struct rte_event *ev)
+rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct rte_event *ev)
 {
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
@@ -394,7 +394,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Enqueue buffered events to event device */
 static inline uint16_t
-flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
@@ -411,8 +411,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		stats->rx_enq_retry++;
 	}
 
-	n ? rx_enq_block_end_ts(rx_adapter, stats) :
-		rx_enq_block_start_ts(rx_adapter);
+	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
+		rxa_enq_block_start_ts(rx_adapter);
 
 	buf->count -= n;
 	stats->rx_enq_count += n;
@@ -421,11 +421,11 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
-	uint16_t eth_dev_id,
-	uint16_t rx_queue_id,
-	struct rte_mbuf **mbufs,
-	uint16_t num)
+rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint16_t eth_dev_id,
+		uint16_t rx_queue_id,
+		struct rte_mbuf **mbufs,
+		uint16_t num)
 {
 	uint32_t i;
 	struct eth_device_info *eth_device_info =
@@ -463,7 +463,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		struct rte_event *ev = &events[i];
 
 		rss = do_rss ?
-			do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
+			rxa_do_softrss(m, rx_adapter->rss_key_be) :
+			m->hash.rss;
 		flow_id =
 		    eth_rx_queue_info->flow_id &
 				eth_rx_queue_info->flow_id_mask;
@@ -477,7 +478,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		ev->priority = priority;
 		ev->mbuf = m;
 
-		buf_event_enqueue(rx_adapter, ev);
+		rxa_buffer_event(rx_adapter, ev);
 	}
 }
 
@@ -495,7 +496,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * it.
  */
 static inline void
-eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
 	uint16_t n;
@@ -520,7 +521,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		 * enough space in the enqueue buffer.
 		 */
 		if (buf->count >= BATCH_SIZE)
-			flush_event_buffer(rx_adapter);
+			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return;
@@ -534,7 +535,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 			/* The check before rte_eth_rx_burst() ensures that
 			 * all n mbufs can be buffered
 			 */
-			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
+			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
 			nb_rx += n;
 			if (nb_rx > max_nb_rx) {
 				rx_adapter->wrr_pos =
@@ -548,11 +549,11 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	if (buf->count >= BATCH_SIZE)
-		flush_event_buffer(rx_adapter);
+		rxa_flush_event_buffer(rx_adapter);
 }
 
 static int
-event_eth_rx_adapter_service_func(void *args)
+rxa_service_func(void *args)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter = args;
 
@@ -562,7 +563,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		return 0;
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 	}
-	eth_rx_poll(rx_adapter);
+	rxa_poll(rx_adapter);
 	rte_spinlock_unlock(&rx_adapter->rx_lock);
 	return 0;
 }
@@ -594,14 +595,14 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline struct rte_event_eth_rx_adapter *
-id_to_rx_adapter(uint8_t id)
+rxa_id_to_adapter(uint8_t id)
 {
 	return event_eth_rx_adapter ?
 		event_eth_rx_adapter[id] : NULL;
 }
 
 static int
-default_conf_cb(uint8_t id, uint8_t dev_id,
+rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 {
 	int ret;
@@ -610,7 +611,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	int started;
 	uint8_t port_id;
 	struct rte_event_port_conf *port_conf = arg;
-	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
+	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
 
 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
 	dev_conf = dev->data->dev_conf;
@@ -647,7 +648,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
-init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
+rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
 {
 	int ret;
 	struct rte_service_spec service;
@@ -660,7 +661,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
 		"rte_event_eth_rx_adapter_%d", id);
 	service.socket_id = rx_adapter->socket_id;
-	service.callback = event_eth_rx_adapter_service_func;
+	service.callback = rxa_service_func;
 	service.callback_userdata = rx_adapter;
 	/* Service function handles locking for queue add/del updates */
 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
@@ -688,9 +689,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	return ret;
 }
 
-
 static void
-update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		struct eth_device_info *dev_info,
 		int32_t rx_queue_id,
 		uint8_t add)
@@ -704,7 +704,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	if (rx_queue_id == -1) {
 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			update_queue_info(rx_adapter, dev_info, i, add);
+			rxa_update_queue(rx_adapter, dev_info, i, add);
 	} else {
 		queue_info = &dev_info->rx_queue[rx_queue_id];
 		enabled = queue_info->queue_enabled;
@@ -720,9 +720,9 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
-event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
-			    struct eth_device_info *dev_info,
-			    uint16_t rx_queue_id)
+rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
 {
 	struct eth_rx_queue_info *queue_info;
 
@@ -731,15 +731,15 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
-	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
+	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
 	return 0;
 }
 
 static void
-event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
-		struct eth_device_info *dev_info,
-		uint16_t rx_queue_id,
-		const struct rte_event_eth_rx_adapter_queue_conf *conf)
+rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id,
+	const struct rte_event_eth_rx_adapter_queue_conf *conf)
 
 {
 	struct eth_rx_queue_info *queue_info;
@@ -759,10 +759,10 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	/* The same queue can be added more than once */
 	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
-	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
+	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
 }
 
-static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
 		int rx_queue_id,
 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
@@ -799,19 +799,15 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	if (rx_queue_id == -1) {
 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			event_eth_rx_adapter_queue_add(rx_adapter,
-						dev_info, i,
-						queue_conf);
+			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
 	} else {
-		event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
-					  (uint16_t)rx_queue_id,
-					  queue_conf);
+		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
+			queue_conf);
 	}
 
-	ret = eth_poll_wrr_calc(rx_adapter);
+	ret = rxa_calc_wrr_sequence(rx_adapter);
 	if (ret) {
-		event_eth_rx_adapter_queue_del(rx_adapter,
-					dev_info, rx_queue_id);
+		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
 		return ret;
 	}
 
@@ -819,7 +815,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 }
 
 static int
-rx_adapter_ctrl(uint8_t id, int start)
+rxa_ctrl(uint8_t id, int start)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
@@ -829,7 +825,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	int stop = !start;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -892,7 +888,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 			return ret;
 	}
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter != NULL) {
 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
 		return -EEXIST;
@@ -934,7 +930,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
 
 	event_eth_rx_adapter[id] = rx_adapter;
-	if (conf_cb == default_conf_cb)
+	if (conf_cb == rxa_default_conf_cb)
 		rx_adapter->default_cb_arg = 1;
 	return 0;
 }
@@ -955,7 +951,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		return -ENOMEM;
 	*pc = *port_config;
 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
-					default_conf_cb,
+					rxa_default_conf_cb,
 					pc);
 	if (ret)
 		rte_free(pc);
@@ -969,7 +965,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1004,7 +1000,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if ((rx_adapter == NULL) || (queue_conf == NULL))
 		return -EINVAL;
 
@@ -1063,7 +1059,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 				rx_queue_id, queue_conf);
 		if (ret == 0) {
 			dev_info->internal_event_port = 1;
-			update_queue_info(rx_adapter,
+			rxa_update_queue(rx_adapter,
 					&rx_adapter->eth_devices[eth_dev_id],
 					rx_queue_id,
 					1);
@@ -1071,13 +1067,14 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	} else {
 		rte_spinlock_lock(&rx_adapter->rx_lock);
 		dev_info->internal_event_port = 0;
-		ret = init_service(rx_adapter, id);
+		ret = rxa_init_service(rx_adapter, id);
 		if (ret == 0)
-			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
+			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
 					queue_conf);
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 		if (ret == 0)
-			start_service = !!sw_rx_adapter_queue_count(rx_adapter);
+			start_service =
+				!!rxa_sw_adapter_queue_count(rx_adapter);
 	}
 
 	if (ret)
@@ -1103,7 +1100,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1130,7 +1127,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 						&rte_eth_devices[eth_dev_id],
 						rx_queue_id);
 		if (ret == 0) {
-			update_queue_info(rx_adapter,
+			rxa_update_queue(rx_adapter,
 					&rx_adapter->eth_devices[eth_dev_id],
 					rx_queue_id,
 					0);
@@ -1144,16 +1141,12 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		rte_spinlock_lock(&rx_adapter->rx_lock);
 		if (rx_queue_id == -1) {
 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-				event_eth_rx_adapter_queue_del(rx_adapter,
-							dev_info,
-							i);
+				rxa_sw_del(rx_adapter, dev_info, i);
 		} else {
-			event_eth_rx_adapter_queue_del(rx_adapter,
-						dev_info,
-						(uint16_t)rx_queue_id);
+			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
 		}
 
-		rc = eth_poll_wrr_calc(rx_adapter);
+		rc = rxa_calc_wrr_sequence(rx_adapter);
 		if (rc)
 			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
 					rc);
@@ -1165,7 +1158,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 		rte_service_component_runstate_set(rx_adapter->service_id,
-				sw_rx_adapter_queue_count(rx_adapter));
+				rxa_sw_adapter_queue_count(rx_adapter));
 	}
 
 	return ret;
@@ -1175,13 +1168,13 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
-	return rx_adapter_ctrl(id, 1);
+	return rxa_ctrl(id, 1);
 }
 
 int
 rte_event_eth_rx_adapter_stop(uint8_t id)
 {
-	return rx_adapter_ctrl(id, 0);
+	return rxa_ctrl(id, 0);
 }
 
 int
@@ -1198,7 +1191,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter  == NULL || stats == NULL)
 		return -EINVAL;
 
@@ -1236,7 +1229,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1261,7 +1254,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL || service_id == NULL)
 		return -EINVAL;
 
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  2018-06-08 18:15 ` [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names Nikhil Rao
@ 2018-06-08 18:15 ` Nikhil Rao
  2018-06-17 13:31   ` Jerin Jacob
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function Nikhil Rao
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

The new WRR sequence applicable after queue add/del is set
up after setting the new queue state, so a memory allocation
failure will leave behind an incorrect state.

This change separates the memory sizing + allocation for the
Rx poll and WRR arrays from calculation of the WRR sequence.
If there is a memory allocation failure, the existing Rx queue
configuration remains unchanged.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 413 ++++++++++++++++++-------
 1 file changed, 299 insertions(+), 114 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 9361d48..c8db11b 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -109,10 +109,16 @@ struct eth_device_info {
 	 * rx_adapter_stop callback needs to be invoked
 	 */
 	uint8_t dev_rx_started;
-	/* If nb_dev_queues > 0, the start callback will
+	/* Number of queues added for this device */
+	uint16_t nb_dev_queues;
+	/* If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
-	uint16_t nb_dev_queues;
+	uint16_t nb_rx_poll;
+	/* sum(wrr(q)) for all queues within the device
+	 * useful when deleting all device queues
+	 */
+	uint32_t wrr_len;
 };
 
 /* Per Rx queue */
@@ -188,13 +194,170 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static inline int
+rxa_polled_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	return !dev_info->internal_event_port &&
+		dev_info->rx_queue &&
+		queue_info->queue_enabled && queue_info->wt != 0;
+}
+
+/* Calculate size of the eth_rx_poll and wrr_sched arrays
+ * after deleting poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		poll_diff = dev_info->nb_rx_poll;
+		wrr_len_diff = dev_info->wrr_len;
+	} else {
+		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+					0;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint16_t wt,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		poll_diff = dev_info->dev->data->nb_rx_queues -
+						dev_info->nb_rx_poll;
+		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
+				- dev_info->wrr_len;
+	} else {
+		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
+				wt - dev_info->rx_queue[rx_queue_id].wt :
+				wt;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding rx_queue_id */
+static void
+rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint16_t wt,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+				wt, nb_rx_poll, nb_wrr);
+}
+
+/* Calculate nb_rx_* after deleting rx_queue_id */
+static void
+rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
+				nb_wrr);
+}
+
+/*
+ * Allocate the rx_poll array
+ */
+static struct eth_rx_poll_entry *
+rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint32_t num_rx_polled)
+{
+	size_t len;
+
+	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
+							RTE_CACHE_LINE_SIZE);
+	return  rte_zmalloc_socket(rx_adapter->mem_name,
+				len,
+				RTE_CACHE_LINE_SIZE,
+				rx_adapter->socket_id);
+}
+
+/*
+ * Allocate the WRR array
+ */
+static uint32_t *
+rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
+{
+	size_t len;
+
+	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
+			RTE_CACHE_LINE_SIZE);
+	return  rte_zmalloc_socket(rx_adapter->mem_name,
+				len,
+				RTE_CACHE_LINE_SIZE,
+				rx_adapter->socket_id);
+}
+
 static int
-rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint32_t nb_poll,
+		uint32_t nb_wrr,
+		struct eth_rx_poll_entry **rx_poll,
+		uint32_t **wrr_sched)
+{
+
+	if (nb_poll == 0) {
+		*rx_poll = NULL;
+		*wrr_sched = NULL;
+		return 0;
+	}
+
+	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
+	if (*rx_poll == NULL) {
+		*wrr_sched = NULL;
+		return -ENOMEM;
+	}
+
+	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
+	if (*wrr_sched == NULL) {
+		rte_free(*rx_poll);
+		return -ENOMEM;
+	}
+	return 0;
+}
+
+/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static void
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_rx_poll_entry *rx_poll,
+		uint32_t *rx_wrr)
 {
 	uint16_t d;
 	uint16_t q;
 	unsigned int i;
+	int prev = -1;
+	int cw = -1;
 
 	/* Initialize variables for calculation of wrr schedule */
 	uint16_t max_wrr_pos = 0;
@@ -202,77 +365,48 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t max_wt = 0;
 	uint16_t gcd = 0;
 
-	struct eth_rx_poll_entry *rx_poll = NULL;
-	uint32_t *rx_wrr = NULL;
+	if (rx_poll == NULL)
+		return;
 
-	if (rx_adapter->num_rx_polled) {
-		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
-				sizeof(*rx_adapter->eth_rx_poll),
-				RTE_CACHE_LINE_SIZE);
-		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
-					     len,
-					     RTE_CACHE_LINE_SIZE,
-					     rx_adapter->socket_id);
-		if (rx_poll == NULL)
-			return -ENOMEM;
+	/* Generate array of all queues to poll, the size of this
+	 * array is poll_q
+	 */
+	RTE_ETH_FOREACH_DEV(d) {
+		uint16_t nb_rx_queues;
+		struct eth_device_info *dev_info =
+				&rx_adapter->eth_devices[d];
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		if (dev_info->rx_queue == NULL)
+			continue;
+		if (dev_info->internal_event_port)
+			continue;
+		dev_info->wrr_len = 0;
+		for (q = 0; q < nb_rx_queues; q++) {
+			struct eth_rx_queue_info *queue_info =
+				&dev_info->rx_queue[q];
+			uint16_t wt;
 
-		/* Generate array of all queues to poll, the size of this
-		 * array is poll_q
-		 */
-		RTE_ETH_FOREACH_DEV(d) {
-			uint16_t nb_rx_queues;
-			struct eth_device_info *dev_info =
-					&rx_adapter->eth_devices[d];
-			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
-			if (dev_info->rx_queue == NULL)
+			if (!rxa_polled_queue(dev_info, q))
 				continue;
-			if (dev_info->internal_event_port)
-				continue;
-			for (q = 0; q < nb_rx_queues; q++) {
-				struct eth_rx_queue_info *queue_info =
-					&dev_info->rx_queue[q];
-				if (queue_info->queue_enabled == 0)
-					continue;
-
-				uint16_t wt = queue_info->wt;
-				rx_poll[poll_q].eth_dev_id = d;
-				rx_poll[poll_q].eth_rx_qid = q;
-				max_wrr_pos += wt;
-				max_wt = RTE_MAX(max_wt, wt);
-				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
-				poll_q++;
-			}
-		}
-
-		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
-				RTE_CACHE_LINE_SIZE);
-		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
-					    len,
-					    RTE_CACHE_LINE_SIZE,
-					    rx_adapter->socket_id);
-		if (rx_wrr == NULL) {
-			rte_free(rx_poll);
-			return -ENOMEM;
-		}
-
-		/* Generate polling sequence based on weights */
-		int prev = -1;
-		int cw = -1;
-		for (i = 0; i < max_wrr_pos; i++) {
-			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
-					     rx_poll, max_wt, gcd, prev);
-			prev = rx_wrr[i];
+			wt = queue_info->wt;
+			rx_poll[poll_q].eth_dev_id = d;
+			rx_poll[poll_q].eth_rx_qid = q;
+			max_wrr_pos += wt;
+			dev_info->wrr_len += wt;
+			max_wt = RTE_MAX(max_wt, wt);
+			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
+			poll_q++;
 		}
 	}
 
-	rte_free(rx_adapter->eth_rx_poll);
-	rte_free(rx_adapter->wrr_sched);
-
-	rx_adapter->eth_rx_poll = rx_poll;
-	rx_adapter->wrr_sched = rx_wrr;
-	rx_adapter->wrr_len = max_wrr_pos;
-
-	return 0;
+	/* Generate polling sequence based on weights */
+	prev = -1;
+	cw = -1;
+	for (i = 0; i < max_wrr_pos; i++) {
+		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
+				     rx_poll, max_wt, gcd, prev);
+		prev = rx_wrr[i];
+	}
 }
 
 static inline void
@@ -719,31 +853,53 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-static int
+static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id)
+	int32_t rx_queue_id)
 {
-	struct eth_rx_queue_info *queue_info;
+	int pollq;
 
 	if (rx_adapter->nb_queues == 0)
-		return 0;
+		return;
 
-	queue_info = &dev_info->rx_queue[rx_queue_id];
-	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_sw_del(rx_adapter, dev_info, i);
+		return;
+	}
+
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
-	return 0;
+	rx_adapter->num_rx_polled -= pollq;
+	dev_info->nb_rx_poll -= pollq;
 }
 
 static void
 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id,
+	int32_t rx_queue_id,
 	const struct rte_event_eth_rx_adapter_queue_conf *conf)
-
 {
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
+	int pollq;
+
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_add_queue(rx_adapter, dev_info, i, conf);
+		return;
+	}
+
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -757,9 +913,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->flow_id_mask = ~0;
 	}
 
-	/* The same queue can be added more than once */
-	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
+	if (rxa_polled_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled += !pollq;
+		dev_info->nb_rx_poll += !pollq;
+	}
 }
 
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
@@ -769,8 +927,12 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 {
 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
-	uint32_t i;
 	int ret;
+	struct eth_rx_poll_entry *rx_poll;
+	struct eth_rx_queue_info *rx_queue;
+	uint32_t *rx_wrr;
+	uint16_t nb_rx_queues;
+	uint32_t nb_rx_poll, nb_wrr;
 
 	if (queue_conf->servicing_weight == 0) {
 
@@ -787,31 +949,51 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		queue_conf = &temp_conf;
 	}
 
+	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+	rx_queue = dev_info->rx_queue;
+
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
 		    rte_zmalloc_socket(rx_adapter->mem_name,
-				       dev_info->dev->data->nb_rx_queues *
+				       nb_rx_queues *
 				       sizeof(struct eth_rx_queue_info), 0,
 				       rx_adapter->socket_id);
 		if (dev_info->rx_queue == NULL)
 			return -ENOMEM;
 	}
+	rx_wrr = NULL;
+	rx_poll = NULL;
 
-	if (rx_queue_id == -1) {
-		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
-	} else {
-		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
-			queue_conf);
-	}
+	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
+			queue_conf->servicing_weight,
+			&nb_rx_poll, &nb_wrr);
 
-	ret = rxa_calc_wrr_sequence(rx_adapter);
-	if (ret) {
-		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
-		return ret;
+	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+				&rx_poll, &rx_wrr);
+	if (ret)
+		goto err_free_rxqueue;
+
+	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
+	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+	rte_free(rx_adapter->eth_rx_poll);
+	rte_free(rx_adapter->wrr_sched);
+
+	rx_adapter->eth_rx_poll = rx_poll;
+	rx_adapter->wrr_sched = rx_wrr;
+	rx_adapter->wrr_len = nb_wrr;
+	return 0;
+
+err_free_rxqueue:
+	if (rx_queue == NULL) {
+		rte_free(dev_info->rx_queue);
+		dev_info->rx_queue = NULL;
 	}
 
-	return ret;
+	rte_free(rx_poll);
+	rte_free(rx_wrr);
+
+	return 0;
 }
 
 static int
@@ -995,7 +1177,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
-	int start_service;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1038,7 +1219,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		return -EINVAL;
 	}
 
-	start_service = 0;
 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
 
 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
@@ -1072,16 +1252,13 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
 					queue_conf);
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
-		if (ret == 0)
-			start_service =
-				!!rxa_sw_adapter_queue_count(rx_adapter);
 	}
 
 	if (ret)
 		return ret;
 
-	if (start_service)
-		rte_service_component_runstate_set(rx_adapter->service_id, 1);
+	rte_service_component_runstate_set(rx_adapter->service_id,
+				rxa_sw_adapter_queue_count(rx_adapter));
 
 	return 0;
 }
@@ -1095,7 +1272,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct eth_device_info *dev_info;
 	uint32_t cap;
-	uint16_t i;
+	uint32_t nb_rx_poll = 0;
+	uint32_t nb_wrr = 0;
+	struct eth_rx_poll_entry *rx_poll = NULL;
+	uint32_t *rx_wrr = NULL;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1137,26 +1317,31 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 			}
 		}
 	} else {
-		int rc;
+		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
+			&nb_rx_poll, &nb_wrr);
+		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+			&rx_poll, &rx_wrr);
+		if (ret)
+			return ret;
+
 		rte_spinlock_lock(&rx_adapter->rx_lock);
-		if (rx_queue_id == -1) {
-			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-				rxa_sw_del(rx_adapter, dev_info, i);
-		} else {
-			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
-		}
+		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
+		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+		rte_free(rx_adapter->eth_rx_poll);
+		rte_free(rx_adapter->wrr_sched);
 
-		rc = rxa_calc_wrr_sequence(rx_adapter);
-		if (rc)
-			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
-					rc);
+		rx_adapter->eth_rx_poll = rx_poll;
+		rx_adapter->num_rx_polled = nb_rx_poll;
+		rx_adapter->wrr_sched = rx_wrr;
+		rx_adapter->wrr_len = nb_wrr;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
-
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
 	}
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  2018-06-08 18:15 ` [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names Nikhil Rao
  2018-06-08 18:15 ` [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
@ 2018-06-08 18:15 ` Nikhil Rao
  2018-06-17 13:35   ` Jerin Jacob
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth Rx " Nikhil Rao
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

Create a separate function that handles eth receive and
enqueue to event buffer. This function will also be called for
interrupt driven receive queues.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 67 ++++++++++++++++++--------
 1 file changed, 47 insertions(+), 20 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index c8db11b..40e9bc9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -616,6 +616,45 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
+/* Enqueue packets from  <port, q>  to event buffer */
+static inline uint32_t
+rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint16_t port_id,
+	uint16_t queue_id,
+	uint32_t rx_count,
+	uint32_t max_rx)
+{
+	struct rte_mbuf *mbufs[BATCH_SIZE];
+	struct rte_eth_event_enqueue_buffer *buf =
+					&rx_adapter->event_enqueue_buffer;
+	struct rte_event_eth_rx_adapter_stats *stats =
+					&rx_adapter->stats;
+	uint16_t n;
+	uint32_t nb_rx = 0;
+
+	/* Don't do a batch dequeue from the rx queue if there isn't
+	 * enough space in the enqueue buffer.
+	 */
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		if (buf->count >= BATCH_SIZE)
+			rxa_flush_event_buffer(rx_adapter);
+
+		stats->rx_poll_count++;
+		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
+		if (unlikely(!n))
+			break;
+		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
+		nb_rx += n;
+		if (rx_count + nb_rx > max_rx)
+			break;
+	}
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -633,17 +672,16 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
-	uint16_t n;
 	uint32_t nb_rx = 0;
-	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
 	buf = &rx_adapter->event_enqueue_buffer;
-	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	stats = &rx_adapter->stats;
 
 	/* Iterate through a WRR sequence */
 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
@@ -658,32 +696,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			return;
+			break;
 		}
 
-		stats->rx_poll_count++;
-		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
-
-		if (n) {
-			stats->rx_packets += n;
-			/* The check before rte_eth_rx_burst() ensures that
-			 * all n mbufs can be buffered
-			 */
-			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
-			nb_rx += n;
-			if (nb_rx > max_nb_rx) {
-				rx_adapter->wrr_pos =
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		if (nb_rx > max_nb_rx) {
+			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
-				break;
-			}
+			break;
 		}
 
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
 
-	if (buf->count >= BATCH_SIZE)
-		rxa_flush_event_buffer(rx_adapter);
+	stats->rx_packets += nb_rx;
 }
 
 static int
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v1 3/4] eventdev: move Rx adapter eth Rx to separate function
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
                   ` (2 preceding siblings ...)
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function Nikhil Rao
@ 2018-06-08 18:15 ` Nikhil Rao
  2018-06-17 13:36   ` Jerin Jacob
  2018-06-08 18:15 ` [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter Nikhil Rao
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  5 siblings, 1 reply; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

Create a separate function that handles eth receive and
enqueue to event buffer. This function will also be called for
interrupt driven receive queues.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 67 ++++++++++++++++++--------
 1 file changed, 47 insertions(+), 20 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index c8db11b..40e9bc9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -616,6 +616,45 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
+/* Enqueue packets from  <port, q>  to event buffer */
+static inline uint32_t
+rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint16_t port_id,
+	uint16_t queue_id,
+	uint32_t rx_count,
+	uint32_t max_rx)
+{
+	struct rte_mbuf *mbufs[BATCH_SIZE];
+	struct rte_eth_event_enqueue_buffer *buf =
+					&rx_adapter->event_enqueue_buffer;
+	struct rte_event_eth_rx_adapter_stats *stats =
+					&rx_adapter->stats;
+	uint16_t n;
+	uint32_t nb_rx = 0;
+
+	/* Don't do a batch dequeue from the rx queue if there isn't
+	 * enough space in the enqueue buffer.
+	 */
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		if (buf->count >= BATCH_SIZE)
+			rxa_flush_event_buffer(rx_adapter);
+
+		stats->rx_poll_count++;
+		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
+		if (unlikely(!n))
+			break;
+		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
+		nb_rx += n;
+		if (rx_count + nb_rx > max_rx)
+			break;
+	}
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -633,17 +672,16 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
-	uint16_t n;
 	uint32_t nb_rx = 0;
-	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
 	buf = &rx_adapter->event_enqueue_buffer;
-	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	stats = &rx_adapter->stats;
 
 	/* Iterate through a WRR sequence */
 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
@@ -658,32 +696,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			return;
+			break;
 		}
 
-		stats->rx_poll_count++;
-		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
-
-		if (n) {
-			stats->rx_packets += n;
-			/* The check before rte_eth_rx_burst() ensures that
-			 * all n mbufs can be buffered
-			 */
-			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
-			nb_rx += n;
-			if (nb_rx > max_nb_rx) {
-				rx_adapter->wrr_pos =
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		if (nb_rx > max_nb_rx) {
+			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
-				break;
-			}
+			break;
 		}
 
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
 
-	if (buf->count >= BATCH_SIZE)
-		rxa_flush_event_buffer(rx_adapter);
+	stats->rx_packets += nb_rx;
 }
 
 static int
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
                   ` (3 preceding siblings ...)
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth Rx " Nikhil Rao
@ 2018-06-08 18:15 ` Nikhil Rao
  2018-06-17 13:49   ` Jerin Jacob
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  5 siblings, 1 reply; 19+ messages in thread
From: Nikhil Rao @ 2018-06-08 18:15 UTC (permalink / raw)
  To: jerin.jacob; +Cc: dev, Nikhil Rao

Add support for interrupt driven queues when eth device is
configured for rxq interrupts and servicing weight for the
queue is configured to be zero.

An interrupt driven packet received counter has been added to
rte_event_eth_rx_adapter_stats.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.h     |    5 +-
 lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 1049 +++++++++++++++++++-
 test/test/test_event_eth_rx_adapter.c              |  261 ++++-
 .../prog_guide/event_ethernet_rx_adapter.rst       |   24 +
 config/common_base                                 |    1 +
 lib/librte_eventdev/Makefile                       |    4 +-
 6 files changed, 1296 insertions(+), 48 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 307b2b5..97f25e9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -64,8 +64,7 @@
  * the service function ID of the adapter in this case.
  *
  * Note:
- * 1) Interrupt driven receive queues are currently unimplemented.
- * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
+ * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
  *  should be added to a new instance of the rx adapter.
  */
 
@@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
 	 * block cycles can be used to compute the percentage of
 	 * cycles the service is blocked by the event device.
 	 */
+	uint64_t rx_intr_packets;
+	/**< Received packet count for interrupt mode Rx queues */
 };
 
 /**
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 40e9bc9..d038ee4 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -2,6 +2,8 @@
  * Copyright(c) 2017 Intel Corporation.
  * All rights reserved.
  */
+#include <unistd.h>
+#include <sys/epoll.h>
 #include <rte_cycles.h>
 #include <rte_common.h>
 #include <rte_dev.h>
@@ -11,6 +13,7 @@
 #include <rte_malloc.h>
 #include <rte_service_component.h>
 #include <rte_thash.h>
+#include <rte_interrupts.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -24,6 +27,36 @@
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
 
 #define RSS_KEY_SIZE	40
+/* value written to intr thread pipe to signal thread exit */
+#define ETH_BRIDGE_INTR_THREAD_EXIT	1
+/* Sentinel value to detect initialized file handle */
+#define INIT_FD		-1
+
+/*
+ * Used to communicate exit notification to interrupt thread
+ */
+union intr_pipefds {
+	RTE_STD_C11
+	struct {
+		int pipefd[2];
+	};
+	struct {
+		int readfd;
+		int writefd;
+	};
+};
+
+/*
+ * Used to store port and queue ID of interrupting Rx queue
+ */
+union queue_data {
+	RTE_STD_C11
+	void *ptr;
+	struct {
+		uint16_t port;
+		uint16_t queue;
+	};
+};
 
 /*
  * There is an instance of this struct per polled Rx queue added to the
@@ -75,6 +108,34 @@ struct rte_event_eth_rx_adapter {
 	uint16_t enq_block_count;
 	/* Block start ts */
 	uint64_t rx_enq_block_start_ts;
+	/* epoll fd used to wait for Rx interrupts */
+	int epd;
+	/* Num of interrupt driven Rx queues */
+	uint32_t num_rx_intr;
+	/* Used to send <dev id, queue id> of interrupting Rx queues from
+	 * the interrupt thread to the Rx thread
+	 */
+	struct rte_ring *intr_ring;
+	/* Rx Queue data (dev id, queue id) for the last non-empty
+	 * queue polled
+	 */
+	union queue_data qd;
+	/* queue_data is valid */
+	int qd_valid;
+	/* Interrupt ring lock, synchronizes Rx thread
+	 * and interrupt thread
+	 */
+	rte_spinlock_t intr_ring_lock;
+	/* event array passed to rte_epoll_wait */
+	struct rte_epoll_event *epoll_events;
+	/* Count of interrupt vectors in use */
+	uint32_t num_intr_vec;
+	/* fd used to send intr thread an exit notification */
+	union intr_pipefds intr_pipe;
+	/* Event used in exit notification for intr thread */
+	struct rte_epoll_event exit_ev;
+	/* Thread blocked on Rx interrupts */
+	pthread_t rx_intr_thread;
 	/* Configuration callback for rte_service configuration */
 	rte_event_eth_rx_adapter_conf_cb conf_cb;
 	/* Configuration callback argument */
@@ -111,19 +172,40 @@ struct eth_device_info {
 	uint8_t dev_rx_started;
 	/* Number of queues added for this device */
 	uint16_t nb_dev_queues;
-	/* If nb_rx_poll > 0, the start callback will
+	/* Number of poll based queues
+	 * If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
 	uint16_t nb_rx_poll;
+	/* Number of interrupt based queues
+	 * If nb_rx_intr > 0, the start callback will
+	 * be invoked if not already invoked.
+	 */
+	uint16_t nb_rx_intr;
+	/* Number of queues that use the shared interrupt */
+	uint16_t nb_shared_intr;
 	/* sum(wrr(q)) for all queues within the device
 	 * useful when deleting all device queues
 	 */
 	uint32_t wrr_len;
+	/* Intr based queue index to start polling from, this is used
+	 * if the number of shared interrupts is non-zero
+	 */
+	uint16_t next_q_idx;
+	/* Intr based queue indices */
+	uint16_t *intr_queue;
+	/* device generates a separate interrupt per Rx queue
+	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
+	 */
+	int multi_intr_cap;
+	/* shared interrupt enabled */
+	int shared_intr_enabled;
 };
 
 /* Per Rx queue */
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
+	int intr_enabled;
 	uint16_t wt;		/* Polling weight */
 	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
 	uint8_t sched_type;	/* Sched type for events */
@@ -150,7 +232,7 @@ struct eth_rx_queue_info {
 static inline int
 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
 {
-	return rx_adapter->num_rx_polled;
+	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
 }
 
 /* Greatest common divisor */
@@ -195,6 +277,28 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline int
+rxa_shared_intr(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	int multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
+	return !multi_intr_cap ||
+		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
+}
+
+static inline int
+rxa_intr_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	return dev_info->rx_queue &&
+		!dev_info->internal_event_port &&
+		queue_info->queue_enabled && queue_info->wt == 0;
+}
+
+static inline int
 rxa_polled_queue(struct eth_device_info *dev_info,
 	int rx_queue_id)
 {
@@ -206,6 +310,95 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->queue_enabled && queue_info->wt != 0;
 }
 
+/* Calculate change in number of vectors after Rx queue ID is added/deleted */
+static int
+rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
+{
+	uint16_t i;
+	int n, s;
+	uint16_t nbq;
+
+	nbq = dev_info->dev->data->nb_rx_queues;
+	n = 0; /* non shared count */
+	s = 0; /* shared count */
+
+	if (rx_queue_id == -1) {
+		for (i = 0; i < nbq; i++) {
+			if (!rxa_shared_intr(dev_info, i))
+				n += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+			else
+				s += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+		}
+
+		if (s > 0) {
+			if ((add && dev_info->nb_shared_intr == 0) ||
+				(!add && dev_info->nb_shared_intr))
+				n += 1;
+		}
+	} else {
+		if (!rxa_shared_intr(dev_info, rx_queue_id))
+			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
+				rxa_intr_queue(dev_info, rx_queue_id);
+		else
+			n = add ? !dev_info->nb_shared_intr :
+				dev_info->nb_shared_intr == 1;
+	}
+
+	return add ? n : -n;
+}
+
+/* Calculate nb_rx_intr after deleting interrupt mode rx queues
+ */
+static void
+rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_intr)
+{
+	uint32_t intr_diff;
+
+	if (rx_queue_id == -1)
+		intr_diff = dev_info->nb_rx_intr;
+	else
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
+
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
+}
+
+/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
+ * interrupt queues could currently be poll mode Rx queues
+ */
+static void
+rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
+			uint32_t *nb_wrr)
+{
+	uint32_t intr_diff;
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		intr_diff = dev_info->dev->data->nb_rx_queues -
+						dev_info->nb_rx_intr;
+		poll_diff = dev_info->nb_rx_poll;
+		wrr_len_diff = dev_info->wrr_len;
+	} else {
+		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
+		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+					0;
+	}
+
+	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
+	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
 /* Calculate size of the eth_rx_poll and wrr_sched arrays
  * after deleting poll mode rx queues
  */
@@ -240,17 +433,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			int rx_queue_id,
 			uint16_t wt,
 			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
 			uint32_t *nb_wrr)
 {
+	uint32_t intr_diff;
 	uint32_t poll_diff;
 	uint32_t wrr_len_diff;
 
 	if (rx_queue_id == -1) {
+		intr_diff = dev_info->nb_rx_intr;
 		poll_diff = dev_info->dev->data->nb_rx_queues -
 						dev_info->nb_rx_poll;
 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
 				- dev_info->wrr_len;
 	} else {
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
 				wt - dev_info->rx_queue[rx_queue_id].wt :
@@ -258,6 +455,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
 }
 
@@ -268,10 +466,15 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		int rx_queue_id,
 		uint16_t wt,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
-	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
-				wt, nb_rx_poll, nb_wrr);
+	if (wt != 0)
+		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
+	else
+		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
+					nb_rx_poll, nb_rx_intr, nb_wrr);
 }
 
 /* Calculate nb_rx_* after deleting rx_queue_id */
@@ -280,10 +483,13 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		struct eth_device_info *dev_info,
 		int rx_queue_id,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
 				nb_wrr);
+	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
+				nb_rx_intr);
 }
 
 /*
@@ -622,7 +828,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t port_id,
 	uint16_t queue_id,
 	uint32_t rx_count,
-	uint32_t max_rx)
+	uint32_t max_rx,
+	int *rxq_empty)
 {
 	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf =
@@ -632,6 +839,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t n;
 	uint32_t nb_rx = 0;
 
+	if (rxq_empty)
+		*rxq_empty = 0;
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
@@ -641,8 +850,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 
 		stats->rx_poll_count++;
 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
-		if (unlikely(!n))
+		if (unlikely(!n)) {
+			if (rxq_empty)
+				*rxq_empty = 1;
 			break;
+		}
 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
 		nb_rx += n;
 		if (rx_count + nb_rx > max_rx)
@@ -655,6 +867,237 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	return nb_rx;
 }
 
+static inline void
+rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
+		void *data)
+{
+	uint16_t port_id;
+	uint16_t queue;
+	int err;
+	union queue_data qd;
+	struct eth_device_info *dev_info;
+	struct eth_rx_queue_info *queue_info;
+	int *intr_enabled;
+
+	qd.ptr = data;
+	port_id = qd.port;
+	queue = qd.queue;
+
+	dev_info = &rx_adapter->eth_devices[port_id];
+	queue_info = &dev_info->rx_queue[queue];
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+	if (rxa_shared_intr(dev_info, queue))
+		intr_enabled = &dev_info->shared_intr_enabled;
+	else
+		intr_enabled = &queue_info->intr_enabled;
+
+	if (*intr_enabled) {
+		*intr_enabled = 0;
+		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
+		/* Entry should always be available.
+		 * The ring size equals the maximum number of interrupt
+		 * vectors supported (an interrupt vector is shared in
+		 * case of shared interrupts)
+		 */
+		if (err)
+			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
+				" to ring: %s", strerror(err));
+		else
+			rte_eth_dev_rx_intr_disable(port_id, queue);
+	}
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+static int
+rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
+			uint32_t num_intr_vec)
+{
+	if (rx_adapter->num_intr_vec + num_intr_vec >
+				RTE_EVENT_ETH_INTR_RING_SIZE) {
+		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
+		" %d needed %d limit %d", rx_adapter->num_intr_vec,
+		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+/* Delete entries for (dev, queue) from the interrupt ring */
+static void
+rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			uint16_t rx_queue_id)
+{
+	int i, n;
+	union queue_data qd;
+
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+
+	n = rte_ring_count(rx_adapter->intr_ring);
+	for (i = 0; i < n; i++) {
+		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
+			if (qd.port == dev_info->dev->data->port_id &&
+				qd.queue == rx_queue_id)
+				continue;
+		} else {
+			if (qd.port == dev_info->dev->data->port_id)
+				continue;
+		}
+		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
+	}
+
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+/* pthread callback handling interrupt mode receive queues
+ * After receiving an Rx interrupt, it enqueues the port id and queue id of the
+ * interrupting queue to the adapter's ring buffer for interrupt events.
+ * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
+ * the adapter service function.
+ */
+static void *
+rxa_intr_thread(void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
+	int n, i;
+	uint8_t val;
+	ssize_t bytes_read;
+
+	while (1) {
+		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
+				   RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
+		if (unlikely(n < 0))
+			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
+					n);
+		for (i = 0; i < n; i++) {
+			if (epoll_events[i].fd == rx_adapter->intr_pipe.readfd)
+				goto done;
+			rxa_intr_ring_enqueue(rx_adapter,
+					epoll_events[i].epdata.data);
+		}
+	}
+
+done:
+	bytes_read = read(rx_adapter->intr_pipe.readfd, &val, sizeof(val));
+	if (bytes_read != sizeof(val))
+		RTE_EDEV_LOG_ERR("Failed to read from pipe %s",
+				strerror(errno));
+	return NULL;
+}
+
+/* Dequeue <port, q> from interrupt ring and enqueue received
+ * mbufs to eventdev
+ */
+static inline uint32_t
+rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	uint32_t n;
+	uint32_t nb_rx = 0;
+	int rxq_empty;
+	struct rte_eth_event_enqueue_buffer *buf;
+	rte_spinlock_t *ring_lock;
+	uint8_t max_done = 0;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	if (rte_ring_count(rx_adapter->intr_ring) == 0
+		&& !rx_adapter->qd_valid)
+		return 0;
+
+	buf = &rx_adapter->event_enqueue_buffer;
+	ring_lock = &rx_adapter->intr_ring_lock;
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		struct eth_device_info *dev_info;
+		uint16_t port;
+		uint16_t queue;
+		union queue_data qd  = rx_adapter->qd;
+		int err;
+
+		if (!rx_adapter->qd_valid) {
+			struct eth_rx_queue_info *queue_info;
+
+			rte_spinlock_lock(ring_lock);
+			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+			if (err) {
+				rte_spinlock_unlock(ring_lock);
+				break;
+			}
+
+			port = qd.port;
+			queue = qd.queue;
+			rx_adapter->qd = qd;
+			rx_adapter->qd_valid = 1;
+			dev_info = &rx_adapter->eth_devices[port];
+			if (rxa_shared_intr(dev_info, queue))
+				dev_info->shared_intr_enabled = 1;
+			else {
+				queue_info = &dev_info->rx_queue[queue];
+				queue_info->intr_enabled = 1;
+			}
+			rte_eth_dev_rx_intr_enable(port, queue);
+			rte_spinlock_unlock(ring_lock);
+		} else {
+			port = qd.port;
+			queue = qd.queue;
+
+			dev_info = &rx_adapter->eth_devices[port];
+		}
+
+		if (rxa_shared_intr(dev_info, queue)) {
+			uint16_t i;
+			uint16_t nb_queues;
+
+			nb_queues = dev_info->dev->data->nb_rx_queues;
+			n = 0;
+			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
+				uint8_t enq_buffer_full;
+
+				if (!rxa_intr_queue(dev_info, i))
+					continue;
+				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
+					rx_adapter->max_nb_rx,
+					&rxq_empty);
+				nb_rx += n;
+
+				enq_buffer_full = !rxq_empty && n == 0;
+				max_done = nb_rx > rx_adapter->max_nb_rx;
+
+				if (enq_buffer_full || max_done) {
+					dev_info->next_q_idx = i;
+					goto done;
+				}
+			}
+
+			rx_adapter->qd_valid = 0;
+
+			/* Reinitialize for next interrupt */
+			dev_info->next_q_idx = dev_info->multi_intr_cap ?
+						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
+						0;
+		} else {
+			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
+				rx_adapter->max_nb_rx,
+				&rxq_empty);
+			rx_adapter->qd_valid = !rxq_empty;
+			nb_rx += n;
+			if (nb_rx > rx_adapter->max_nb_rx)
+				break;
+		}
+	}
+
+done:
+	rx_adapter->stats.rx_intr_packets += nb_rx;
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -668,7 +1111,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
  * the hypervisor's switching layer where adjustments can be made to deal with
  * it.
  */
-static inline void
+static inline uint32_t
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
@@ -676,7 +1119,6 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
-	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
@@ -696,10 +1138,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			break;
+			return nb_rx;
 		}
 
-		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
+				NULL);
 		if (nb_rx > max_nb_rx) {
 			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
@@ -709,14 +1152,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
-
-	stats->rx_packets += nb_rx;
+	return nb_rx;
 }
 
 static int
 rxa_service_func(void *args)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter = args;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
 		return 0;
@@ -724,7 +1167,10 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		return 0;
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 	}
-	rxa_poll(rx_adapter);
+
+	stats = &rx_adapter->stats;
+	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
+	stats->rx_packets += rxa_poll(rx_adapter);
 	rte_spinlock_unlock(&rx_adapter->rx_lock);
 	return 0;
 }
@@ -809,6 +1255,443 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+
+	if (rx_adapter->epd != INIT_FD)
+		return 0;
+
+	rx_adapter->epd = epoll_create1(EPOLL_CLOEXEC);
+	if (rx_adapter->epd < 0) {
+		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", errno);
+		return -errno;
+	}
+
+	if (pipe(rx_adapter->intr_pipe.pipefd) < 0) {
+		err = -errno;
+		RTE_EDEV_LOG_ERR("pipe() error, err %d", errno);
+		goto error_0;
+	}
+
+	rx_adapter->exit_ev.epdata.event = EPOLLIN;
+	rx_adapter->exit_ev.epdata.cb_fun = NULL;
+	err = rte_epoll_ctl(rx_adapter->epd, EPOLL_CTL_ADD,
+			rx_adapter->intr_pipe.readfd, &rx_adapter->exit_ev);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Failed to add epoll instance, err %d", err);
+		goto error_1;
+	}
+
+	return 0;
+
+error_1:
+	close(rx_adapter->intr_pipe.writefd);
+	close(rx_adapter->intr_pipe.readfd);
+	rx_adapter->intr_pipe.writefd = INIT_FD;
+	rx_adapter->intr_pipe.readfd = INIT_FD;
+
+error_0:
+	close(rx_adapter->epd);
+	rx_adapter->epd = INIT_FD;
+
+	return err;
+}
+
+/* Affinitize interrupt thread to the same cores as the service function
+ * If the service function has not been mapped to cores then affinitize
+ * the interrupt thread to the master lcore
+ */
+static int
+rxa_affinitize_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	rte_cpuset_t rx_cpuset;
+	uint32_t i;
+	uint32_t n_sc;
+	uint32_t service_cores[RTE_MAX_LCORE];
+	int err;
+
+	CPU_ZERO(&rx_cpuset);
+	n_sc = rte_service_lcore_list(service_cores, RTE_MAX_LCORE);
+
+	for (i = 0; i < n_sc; i++) {
+		struct lcore_config *c = &lcore_config[service_cores[i]];
+		if (rte_service_map_lcore_get(rx_adapter->service_id,
+					service_cores[i]))
+			CPU_OR(&rx_cpuset, &rx_cpuset, &c->cpuset);
+	}
+
+	if (CPU_COUNT(&rx_cpuset) == 0) {
+		struct rte_config *cfg = rte_eal_get_configuration();
+		struct lcore_config *c = &lcore_config[cfg->master_lcore];
+		CPU_OR(&rx_cpuset, &rx_cpuset, &c->cpuset);
+	}
+
+	err = pthread_setaffinity_np(rx_adapter->rx_intr_thread,
+				sizeof(cpu_set_t),
+				&rx_cpuset);
+	if (err != 0)
+		RTE_EDEV_LOG_ERR("pthread_setaffinity_np() failed, err %d",
+				err);
+	return -err;
+}
+
+/* Create the interrupt ring, the epoll event array and the interrupt
+ * thread unless the ring already exists. Returns 0 on success or a negative
+ * errno value; on failure the ring and the event array are freed.
+ */
+static int
+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err, ret;
+	uint8_t val;
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	if (rx_adapter->intr_ring)
+		return 0;
+
+	rx_adapter->intr_ring = rte_ring_create("intr_ring",
+					RTE_EVENT_ETH_INTR_RING_SIZE,
+					rte_socket_id(), 0);
+	if (!rx_adapter->intr_ring)
+		return -ENOMEM;
+
+	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
+					(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *
+					sizeof(struct rte_epoll_event),
+					RTE_CACHE_LINE_SIZE,
+					rx_adapter->socket_id);
+	if (!rx_adapter->epoll_events) {
+		err = -ENOMEM;
+		goto error;
+	}
+
+	rte_spinlock_init(&rx_adapter->intr_ring_lock);
+
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rx-intr-thread");
+	err = pthread_create(&rx_adapter->rx_intr_thread, NULL,
+			rxa_intr_thread, rx_adapter);
+	if (err) {
+		err = -err;
+		goto error;
+	}
+
+	err = rxa_affinitize_intr_thread(rx_adapter);
+	if (!err) {
+		rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
+		return 0;
+	}
+
+	/* Affinity failed: stop the thread but keep reporting that error.
+	 * The write wakes up the thread blocked in rte_epoll_wait().
+	 */
+	val = ETH_BRIDGE_INTR_THREAD_EXIT;
+	if (write(rx_adapter->intr_pipe.writefd, &val, sizeof(val)) <= 0) {
+		RTE_EDEV_LOG_ERR("Failed to notify intr rx thread %s",
+				strerror(errno));
+	} else {
+		ret = pthread_join(rx_adapter->rx_intr_thread, NULL);
+		if (ret)
+			RTE_EDEV_LOG_ERR("pthread_join failed, err %d", ret);
+	}
+
+error:
+	rte_free(rx_adapter->epoll_events);
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return err;
+}
+
+/* Stop and join the interrupt thread, then free its ring and event array */
+static int
+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+	uint8_t val = ETH_BRIDGE_INTR_THREAD_EXIT;
+
+	/* Wake up the interrupt thread blocked in rte_epoll_wait() */
+	if (write(rx_adapter->intr_pipe.writefd, &val, sizeof(val)) <= 0) {
+		/* Save errno before logging, which may overwrite it */
+		err = -errno;
+		RTE_EDEV_LOG_ERR("Failed to notify intr rx thread %s",
+			strerror(errno));
+		return err;
+	}
+
+	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
+	if (err != 0) {
+		RTE_EDEV_LOG_ERR("Failed to join thread err: %d\n", err);
+		return -err;
+	}
+
+	rte_free(rx_adapter->epoll_events);
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return 0;
+}
+
+static int
+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int ret;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	ret = rxa_destroy_intr_thread(rx_adapter);
+	if (ret)
+		return ret;
+	ret = rte_epoll_ctl(rx_adapter->epd, EPOLL_CTL_DEL,
+			rx_adapter->intr_pipe.readfd, &rx_adapter->exit_ev);
+	if (ret)
+		RTE_EDEV_LOG_ERR("Failed to delete fd from epoll err: %d\n",
+				ret);
+
+	close(rx_adapter->intr_pipe.writefd);
+	close(rx_adapter->intr_pipe.readfd);
+	close(rx_adapter->epd);
+
+	rx_adapter->intr_pipe.writefd = INIT_FD;
+	rx_adapter->intr_pipe.readfd = INIT_FD;
+	rx_adapter->epd = INIT_FD;
+
+	return ret;
+}
+
+/* Disable a queue's Rx interrupt, remove it from the epoll instance and
+ * clear its enabled flag (device level flag if the vector is shared).
+ */
+static int
+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info, uint16_t rx_queue_id)
+{
+	int err;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
+			rx_queue_id);
+		return err;
+	}
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+				rx_adapter->epd, RTE_INTR_EVENT_DEL, 0);
+	if (err)
+		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
+
+	if (sintr)
+		dev_info->shared_intr_enabled = 0;
+	else
+		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
+	return err;
+}
+
+static int
+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id)
+{
+	int err;
+	int i;
+	int s;
+
+	if (dev_info->nb_rx_intr == 0)
+		return 0;
+
+	err = 0;
+	if (rx_queue_id == -1) {
+		s = dev_info->nb_shared_intr;
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			int sintr;
+			uint16_t q;
+
+			q = dev_info->intr_queue[i];
+			sintr = rxa_shared_intr(dev_info, q);
+			s -= sintr;
+
+			if (!sintr || s == 0) {
+
+				err = rxa_disable_intr(rx_adapter, dev_info,
+						q);
+				if (err)
+					return err;
+				rxa_intr_ring_del_entries(rx_adapter, dev_info,
+							q);
+			}
+		}
+	} else {
+		if (!rxa_intr_queue(dev_info, rx_queue_id))
+			return 0;
+		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
+				dev_info->nb_shared_intr == 1) {
+			err = rxa_disable_intr(rx_adapter, dev_info,
+					rx_queue_id);
+			if (err)
+				return err;
+			rxa_intr_ring_del_entries(rx_adapter, dev_info,
+						rx_queue_id);
+		}
+
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			if (dev_info->intr_queue[i] == rx_queue_id) {
+				for (; i < dev_info->nb_rx_intr - 1; i++)
+					dev_info->intr_queue[i] =
+						dev_info->intr_queue[i + 1];
+				break;
+			}
+		}
+	}
+
+	return err;
+}
+
+/* Configure an Rx queue for interrupt mode: register and enable its interrupt
+ * and make sure the interrupt thread exists. Returns 0 on success or an error
+ * code, in which case whatever this call set up is released again.
+ */
+static int
+rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info, uint16_t rx_queue_id)
+{
+	int err, err1;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	union queue_data qd;
+	int init_fd;
+	uint16_t *intr_queue;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	if (rxa_intr_queue(dev_info, rx_queue_id))
+		return 0;
+
+	/* Remember whether the array existed so only a new one is freed */
+	intr_queue = dev_info->intr_queue;
+	if (dev_info->intr_queue == NULL) {
+		size_t len =
+			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
+		dev_info->intr_queue =
+			rte_zmalloc_socket(rx_adapter->mem_name, len, 0,
+				rx_adapter->socket_id);
+		if (dev_info->intr_queue == NULL)
+			return -ENOMEM;
+	}
+
+	init_fd = rx_adapter->epd;
+	err = rxa_init_epd(rx_adapter);
+	if (err)
+		goto err_free_queue;
+
+	qd.port = eth_dev_id;
+	qd.queue = rx_queue_id;
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+				rx_adapter->epd, RTE_INTR_EVENT_ADD, qd.ptr);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
+			" Rx Queue %u err %d", rx_queue_id, err);
+		goto err_del_fd;
+	}
+
+	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err);
+		goto err_del_event;
+	}
+
+	err = rxa_create_intr_thread(rx_adapter);
+	if (!err)  {
+		if (sintr)
+			dev_info->shared_intr_enabled = 1;
+		else
+			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
+		return 0;
+	}
+
+	/* Use err1 for the cleanup calls so that err keeps the failure */
+	err1 = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err1)
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err1);
+err_del_event:
+	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+				rx_adapter->epd, RTE_INTR_EVENT_DEL, 0);
+	if (err1) {
+		RTE_EDEV_LOG_ERR("Could not delete event for"
+				" Rx Queue %u err %d", rx_queue_id, err1);
+	}
+err_del_fd:
+	if (init_fd == INIT_FD) {
+		close(rx_adapter->intr_pipe.writefd);
+		close(rx_adapter->intr_pipe.readfd);
+		close(rx_adapter->epd);
+		rx_adapter->intr_pipe.writefd = INIT_FD;
+		rx_adapter->intr_pipe.readfd = INIT_FD;
+		rx_adapter->epd = INIT_FD;
+	}
+err_free_queue:
+	if (intr_queue == NULL) {
+		rte_free(dev_info->intr_queue);
+		dev_info->intr_queue = NULL;
+	}
+	return err;
+}
+
+/* Set up Rx interrupts for one queue, or for every queue if rx_queue_id is
+ * -1, configuring a shared interrupt only once. On failure the interrupts
+ * enabled by this call are disabled and the original error is returned.
+ */
+static int
+rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	int i, j, err;
+	int si = -1;
+	int shared_done = (dev_info->nb_shared_intr > 0);
+
+	if (rx_queue_id != -1) {
+		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
+			return 0;
+		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
+	}
+
+	err = 0;
+	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
+		if (rxa_shared_intr(dev_info, i) && shared_done)
+			continue;
+
+		err = rxa_config_intr(rx_adapter, dev_info, i);
+
+		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
+		if (shared_done) {
+			si = i;
+			dev_info->shared_intr_enabled = 1;
+		}
+		if (err)
+			break;
+	}
+
+	if (err == 0)
+		return 0;
+
+	for (j = 0; j < i; j++) {
+		if (rxa_intr_queue(dev_info, j))
+			continue;
+		if (rxa_shared_intr(dev_info, j) && si != j)
+			continue;
+		/* Do not let a successful rollback hide the original error */
+		if (rxa_disable_intr(rx_adapter, dev_info, j))
+			break;
+	}
+
+	return err;
+}
+
+
+static int
 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
 {
 	int ret;
@@ -843,6 +1726,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
 	rx_adapter->service_inited = 1;
+	rx_adapter->epd = INIT_FD;
 	return 0;
 
 err_done:
@@ -886,6 +1770,9 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	int32_t rx_queue_id)
 {
 	int pollq;
+	int intrq;
+	int sintrq;
+
 
 	if (rx_adapter->nb_queues == 0)
 		return;
@@ -901,9 +1788,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
 	rx_adapter->num_rx_polled -= pollq;
 	dev_info->nb_rx_poll -= pollq;
+	rx_adapter->num_rx_intr -= intrq;
+	dev_info->nb_rx_intr -= intrq;
+	dev_info->nb_shared_intr -= intrq && sintrq;
 }
 
 static void
@@ -915,6 +1807,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
 	int pollq;
+	int intrq;
+	int sintrq;
 
 	if (rx_queue_id == -1) {
 		uint16_t nb_rx_queues;
@@ -927,6 +1821,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -944,6 +1840,24 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
 		dev_info->nb_rx_poll += !pollq;
+		rx_adapter->num_rx_intr -= intrq;
+		dev_info->nb_rx_intr -= intrq;
+		dev_info->nb_shared_intr -= intrq && sintrq;
+	}
+
+	if (rxa_intr_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled -= pollq;
+		dev_info->nb_rx_poll -= pollq;
+		rx_adapter->num_rx_intr += !intrq;
+		dev_info->nb_rx_intr += !intrq;
+		dev_info->nb_shared_intr += !intrq && sintrq;
+		if (dev_info->nb_shared_intr == 1) {
+			if (dev_info->multi_intr_cap)
+				dev_info->next_q_idx =
+					RTE_MAX_RXTX_INTR_VEC_ID - 1;
+			else
+				dev_info->next_q_idx = 0;
+		}
 	}
 }
 
@@ -960,24 +1874,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t *rx_wrr;
 	uint16_t nb_rx_queues;
 	uint32_t nb_rx_poll, nb_wrr;
+	uint32_t nb_rx_intr;
+	int num_intr_vec;
+	uint16_t wt;
 
 	if (queue_conf->servicing_weight == 0) {
-
 		struct rte_eth_dev_data *data = dev_info->dev->data;
-		if (data->dev_conf.intr_conf.rxq) {
-			RTE_EDEV_LOG_ERR("Interrupt driven queues"
-					" not supported");
-			return -ENOTSUP;
-		}
-		temp_conf = *queue_conf;
 
-		/* If Rx interrupts are disabled set wt = 1 */
-		temp_conf.servicing_weight = 1;
+		temp_conf = *queue_conf;
+		if (!data->dev_conf.intr_conf.rxq) {
+			/* If Rx interrupts are disabled set wt = 1 */
+			temp_conf.servicing_weight = 1;
+		}
 		queue_conf = &temp_conf;
 	}
 
 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
 	rx_queue = dev_info->rx_queue;
+	wt = queue_conf->servicing_weight;
 
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
@@ -993,13 +1907,64 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
 			queue_conf->servicing_weight,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
+	dev_info->multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
 
 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 				&rx_poll, &rx_wrr);
 	if (ret)
 		goto err_free_rxqueue;
 
+	if (wt == 0) {
+		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
+
+		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
+		if (ret)
+			goto err_free_rxqueue;
+
+		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
+		if (ret)
+			goto err_free_rxqueue;
+	} else {
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			/* interrupt based queues are being converted to
+			 * poll mode queues, delete the interrupt configuration
+			 * for those.
+			 */
+			ret = rxa_del_intr_queue(rx_adapter,
+						dev_info, rx_queue_id);
+			if (ret)
+				goto err_free_rxqueue;
+		}
+	}
+
+	if (nb_rx_intr == 0) {
+		ret = rxa_free_intr_resources(rx_adapter);
+		if (ret)
+			goto err_free_rxqueue;
+	}
+
+	if (wt == 0) {
+		uint16_t i;
+
+		if (rx_queue_id  == -1) {
+			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+				dev_info->intr_queue[i] = i;
+		} else {
+			if (!rxa_intr_queue(dev_info, rx_queue_id))
+				dev_info->intr_queue[nb_rx_intr - 1] =
+					rx_queue_id;
+		}
+	}
+
+
+
 	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
@@ -1009,6 +1974,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	rx_adapter->eth_rx_poll = rx_poll;
 	rx_adapter->wrr_sched = rx_wrr;
 	rx_adapter->wrr_len = nb_wrr;
+	rx_adapter->num_intr_vec += num_intr_vec;
 	return 0;
 
 err_free_rxqueue:
@@ -1301,8 +2267,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t cap;
 	uint32_t nb_rx_poll = 0;
 	uint32_t nb_wrr = 0;
+	uint32_t nb_rx_intr;
 	struct eth_rx_poll_entry *rx_poll = NULL;
 	uint32_t *rx_wrr = NULL;
+	int num_intr_vec;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1345,29 +2313,59 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		}
 	} else {
 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 			&rx_poll, &rx_wrr);
 		if (ret)
 			return ret;
 
 		rte_spinlock_lock(&rx_adapter->rx_lock);
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			ret = rxa_del_intr_queue(rx_adapter, dev_info,
+					rx_queue_id);
+			if (ret)
+				goto unlock_ret;
+		}
+
+		if (nb_rx_intr == 0) {
+			ret = rxa_free_intr_resources(rx_adapter);
+			if (ret)
+				goto unlock_ret;
+		}
+
 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
 		rte_free(rx_adapter->eth_rx_poll);
 		rte_free(rx_adapter->wrr_sched);
 
+		if (nb_rx_intr == 0) {
+			rte_free(dev_info->intr_queue);
+			dev_info->intr_queue = NULL;
+		}
+
 		rx_adapter->eth_rx_poll = rx_poll;
-		rx_adapter->num_rx_polled = nb_rx_poll;
 		rx_adapter->wrr_sched = rx_wrr;
 		rx_adapter->wrr_len = nb_wrr;
+		rx_adapter->num_intr_vec += num_intr_vec;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
+unlock_ret:
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+		if (ret) {
+			rte_free(rx_poll);
+			rte_free(rx_wrr);
+			return ret;
+		}
 
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
@@ -1376,7 +2374,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	return ret;
 }
 
-
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
diff --git a/test/test/test_event_eth_rx_adapter.c b/test/test/test_event_eth_rx_adapter.c
index dee632b..d247c0a 100644
--- a/test/test/test_event_eth_rx_adapter.c
+++ b/test/test/test_event_eth_rx_adapter.c
@@ -25,28 +25,17 @@ struct event_eth_rx_adapter_test_params {
 	struct rte_mempool *mp;
 	uint16_t rx_rings, tx_rings;
 	uint32_t caps;
+	int rx_intr_port_inited;
+	uint16_t rx_intr_port;
 };
 
 static struct event_eth_rx_adapter_test_params default_params;
 
 static inline int
-port_init(uint8_t port, struct rte_mempool *mp)
+port_init_common(uint8_t port, const struct rte_eth_conf *port_conf,
+		struct rte_mempool *mp)
 {
-	static const struct rte_eth_conf port_conf_default = {
-		.rxmode = {
-			.mq_mode = ETH_MQ_RX_RSS,
-			.max_rx_pkt_len = ETHER_MAX_LEN
-		},
-		.rx_adv_conf = {
-			.rss_conf = {
-				.rss_hf = ETH_RSS_IP |
-					  ETH_RSS_TCP |
-					  ETH_RSS_UDP,
-			}
-		}
-	};
 	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
-	struct rte_eth_conf port_conf = port_conf_default;
 	int retval;
 	uint16_t q;
 	struct rte_eth_dev_info dev_info;
@@ -54,7 +43,7 @@ struct event_eth_rx_adapter_test_params {
 	if (!rte_eth_dev_is_valid_port(port))
 		return -1;
 
-	retval = rte_eth_dev_configure(port, 0, 0, &port_conf);
+	retval = rte_eth_dev_configure(port, 0, 0, port_conf);
 
 	rte_eth_dev_info_get(port, &dev_info);
 
@@ -64,7 +53,7 @@ struct event_eth_rx_adapter_test_params {
 
 	/* Configure the Ethernet device. */
 	retval = rte_eth_dev_configure(port, default_params.rx_rings,
-				default_params.tx_rings, &port_conf);
+				default_params.tx_rings, port_conf);
 	if (retval != 0)
 		return retval;
 
@@ -104,6 +93,77 @@ struct event_eth_rx_adapter_test_params {
 	return 0;
 }
 
+static inline int
+port_init_rx_intr(uint8_t port, struct rte_mempool *mp)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.intr_conf = {
+			.rxq = 1,
+		},
+	};
+
+	return port_init_common(port, &port_conf_default, mp);
+}
+
+static inline int
+port_init(uint8_t port, struct rte_mempool *mp)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IP |
+					ETH_RSS_TCP |
+					ETH_RSS_UDP,
+			}
+		}
+	};
+
+	return port_init_common(port, &port_conf_default, mp);
+}
+
+static int
+init_port_rx_intr(int num_ports)
+{
+	int retval;
+	uint16_t portid;
+	int err;
+
+	default_params.mp = rte_pktmbuf_pool_create("packet_pool",
+						   NB_MBUFS,
+						   MBUF_CACHE_SIZE,
+						   MBUF_PRIV_SIZE,
+						   RTE_MBUF_DEFAULT_BUF_SIZE,
+						   rte_socket_id());
+	if (!default_params.mp)
+		return -ENOMEM;
+
+	RTE_ETH_FOREACH_DEV(portid) {
+		retval = port_init_rx_intr(portid, default_params.mp);
+		if (retval)
+			continue;
+		err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, portid,
+							&default_params.caps);
+		if (err)
+			continue;
+		if (!(default_params.caps &
+			RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
+			default_params.rx_intr_port_inited = 1;
+			default_params.rx_intr_port = portid;
+			return 0;
+		}
+		rte_eth_dev_stop(portid);
+	}
+	return 0;
+}
+
 static int
 init_ports(int num_ports)
 {
@@ -175,6 +235,57 @@ struct event_eth_rx_adapter_test_params {
 	return err;
 }
 
+static int
+testsuite_setup_rx_intr(void)
+{
+	int err;
+	uint8_t count;
+	struct rte_event_dev_info dev_info;
+
+	count = rte_event_dev_count();
+	if (!count) {
+		printf("Failed to find a valid event device,"
+			" testing with event_skeleton device\n");
+		rte_vdev_init("event_skeleton", NULL);
+	}
+
+	struct rte_event_dev_config config = {
+		.nb_event_queues = 1,
+		.nb_event_ports = 1,
+	};
+
+	err = rte_event_dev_info_get(TEST_DEV_ID, &dev_info);
+	config.nb_event_queue_flows = dev_info.max_event_queue_flows;
+	config.nb_event_port_dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	config.nb_event_port_enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;
+	config.nb_events_limit =
+			dev_info.max_num_events;
+
+	err = rte_event_dev_configure(TEST_DEV_ID, &config);
+	TEST_ASSERT(err == 0, "Event device initialization failed err %d\n",
+			err);
+
+	/*
+	 * eth devices like octeontx use event device to receive packets
+	 * so rte_eth_dev_start invokes rte_event_dev_start internally, so
+	 * call init_ports after rte_event_dev_configure
+	 */
+	err = init_port_rx_intr(rte_eth_dev_count_total());
+	TEST_ASSERT(err == 0, "Port initialization failed err %d\n", err);
+
+	if (!default_params.rx_intr_port_inited)
+		return 0;
+
+	err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID,
+						default_params.rx_intr_port,
+						&default_params.caps);
+	TEST_ASSERT(err == 0, "Failed to get adapter cap err %d\n", err);
+
+	return err;
+}
+
 static void
 testsuite_teardown(void)
 {
@@ -185,6 +296,16 @@ struct event_eth_rx_adapter_test_params {
 	rte_mempool_free(default_params.mp);
 }
 
+static void
+testsuite_teardown_rx_intr(void)
+{
+	if (default_params.rx_intr_port_inited)
+		rte_eth_dev_stop(default_params.rx_intr_port);
+
+	/* The pool is created even when no usable port was found */
+	rte_mempool_free(default_params.mp);
+}
+
 static int
 adapter_create(void)
 {
@@ -333,6 +454,89 @@ struct event_eth_rx_adapter_test_params {
 }
 
 static int
+adapter_intr_queue_add_del(void)
+{
+	int err;
+	struct rte_event ev;
+	uint32_t cap;
+	uint16_t eth_port;
+	struct rte_event_eth_rx_adapter_queue_conf queue_config;
+
+	if (!default_params.rx_intr_port_inited)
+		return 0;
+
+	eth_port = default_params.rx_intr_port;
+	err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, eth_port, &cap);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	ev.queue_id = 0;
+	ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev.priority = 0;
+
+	queue_config.rx_queue_flags = 0;
+	queue_config.ev = ev;
+
+	/* weight = 0 => interrupt mode */
+	queue_config.servicing_weight = 0;
+
+	/* add queue 0 */
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID, 0,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* add all queues */
+	queue_config.servicing_weight = 0;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del queue 0 */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						0);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del remaining queues */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* add all queues */
+	queue_config.servicing_weight = 0;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* intr -> poll mode queue */
+	queue_config.servicing_weight = 1;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						0,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						 &queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del queues */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	return TEST_SUCCESS;
+}
+
+static int
 adapter_start_stop(void)
 {
 	int err;
@@ -402,7 +606,7 @@ struct event_eth_rx_adapter_test_params {
 	return TEST_SUCCESS;
 }
 
-static struct unit_test_suite service_tests  = {
+static struct unit_test_suite event_eth_rx_tests = {
 	.suite_name = "rx event eth adapter test suite",
 	.setup = testsuite_setup,
 	.teardown = testsuite_teardown,
@@ -416,11 +620,30 @@ struct event_eth_rx_adapter_test_params {
 	}
 };
 
+static struct unit_test_suite event_eth_rx_intr_tests = {
+	.suite_name = "rx event eth adapter test suite",
+	.setup = testsuite_setup_rx_intr,
+	.teardown = testsuite_teardown_rx_intr,
+	.unit_test_cases = {
+		TEST_CASE_ST(adapter_create, adapter_free,
+			adapter_intr_queue_add_del),
+		TEST_CASES_END() /**< NULL terminate unit test array */
+	}
+};
+
 static int
 test_event_eth_rx_adapter_common(void)
 {
-	return unit_test_suite_runner(&service_tests);
+	return unit_test_suite_runner(&event_eth_rx_tests);
+}
+
+static int
+test_event_eth_rx_intr_adapter_common(void)
+{
+	return unit_test_suite_runner(&event_eth_rx_intr_tests);
 }
 
 REGISTER_TEST_COMMAND(event_eth_rx_adapter_autotest,
 		test_event_eth_rx_adapter_common);
+REGISTER_TEST_COMMAND(event_eth_rx_intr_adapter_autotest,
+		test_event_eth_rx_intr_adapter_common);
diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
index 319e4f0..2f055ec 100644
--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
@@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks
 if the callback is supported, and the counts maintained by the service function,
 if one exists. The service function also maintains a count of cycles for which
 it was not able to enqueue to the event device.
+
+Interrupt Based Rx Queues
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The service core function is typically set up to poll ethernet Rx queues for
+packets. Certain queues may have low packet rates and it would be more
+efficient to enable the Rx queue interrupt and read packets after receiving
+the interrupt.
+
+The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf
+is applicable when the adapter uses a service core function. The application
+has to enable Rx queue interrupts when configuring the ethernet device
+using the ``rte_eth_dev_configure()`` function and then use a servicing_weight
+of zero when adding the Rx queue to the adapter.
+
+The adapter creates a thread blocked on the interrupt; on an interrupt, this
+thread enqueues the port id and the queue id to a ring buffer. The adapter
+service function dequeues the port id and queue id from the ring buffer,
+invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and
+converts the received packets to events in the same manner as packets
+received on a polled Rx queue. The interrupt thread is affinitized to the same
+CPUs as the lcores of the Rx adapter service function; if the Rx adapter
+service function has not been mapped to any lcores, the interrupt thread
+is mapped to the master lcore.
diff --git a/config/common_base b/config/common_base
index 6b0d1cb..bc32956 100644
--- a/config/common_base
+++ b/config/common_base
@@ -597,6 +597,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n
 CONFIG_RTE_EVENT_MAX_DEVS=16
 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64
 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32
+CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024
 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32
 
 #
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index b3e2546..e269357 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -8,14 +8,16 @@ include $(RTE_SDK)/mk/rte.vars.mk
 LIB = librte_eventdev.a
 
 # library version
-LIBABIVER := 4
+LIBABIVER := 5
 
 # build flags
 CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -D_GNU_SOURCE
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS)
 LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer
 LDLIBS += -lrte_mbuf -lrte_cryptodev
+LDLIBS += -lpthread
 
 # library source files
 SRCS-y += rte_eventdev.c
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names
  2018-06-08 18:15 ` [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names Nikhil Rao
@ 2018-06-17 13:24   ` Jerin Jacob
  0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-06-17 13:24 UTC (permalink / raw)
  To: Nikhil Rao; +Cc: dev

-----Original Message-----
> Date: Fri, 8 Jun 2018 23:45:14 +0530
> From: Nikhil Rao <nikhil.rao@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
> Subject: [PATCH v1 1/4] eventdev: standardize Rx adapter internal function
>  names
> X-Mailer: git-send-email 1.8.3.1
> 
> Add a common prefix to function names and rename
> a few to better match functionality
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>

Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del
  2018-06-08 18:15 ` [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
@ 2018-06-17 13:31   ` Jerin Jacob
  2018-06-18 12:13     ` Rao, Nikhil
  0 siblings, 1 reply; 19+ messages in thread
From: Jerin Jacob @ 2018-06-17 13:31 UTC (permalink / raw)
  To: Nikhil Rao; +Cc: dev

-----Original Message-----
> Date: Fri, 8 Jun 2018 23:45:15 +0530
> From: Nikhil Rao <nikhil.rao@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
> Subject: [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue
>  add/del
> X-Mailer: git-send-email 1.8.3.1
> 
> The new WRR sequence applicable after queue add/del is set
> up after setting the new queue state, so a memory allocation
> failure will leave behind an incorrect state.
> 
> This change separates the memory sizing + allocation for the
> Rx poll and WRR array from calculation of the WRR sequence.
> If there is a memory allocation failure, existing Rx queue
> configuration remains unchanged.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> ---
> @@ -995,7 +1177,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>  	struct rte_event_eth_rx_adapter *rx_adapter;
>  	struct rte_eventdev *dev;
>  	struct eth_device_info *dev_info;
> -	int start_service;
>  
>  	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>  	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
> @@ -1038,7 +1219,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>  		return -EINVAL;
>  	}
>  
> -	start_service = 0;
>  	dev_info = &rx_adapter->eth_devices[eth_dev_id];
>  
>  	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
> @@ -1072,16 +1252,13 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>  			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
>  					queue_conf);
>  		rte_spinlock_unlock(&rx_adapter->rx_lock);
> -		if (ret == 0)
> -			start_service =
> -				!!rxa_sw_adapter_queue_count(rx_adapter);
>  	}
>  
>  	if (ret)
>  		return ret;
>  
> -	if (start_service)
> -		rte_service_component_runstate_set(rx_adapter->service_id, 1);
> +	rte_service_component_runstate_set(rx_adapter->service_id,
> +				rxa_sw_adapter_queue_count(rx_adapter));

Please move this logic under the above !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)
condition, as rte_service is not valid for internal ports.


> 

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function Nikhil Rao
@ 2018-06-17 13:35   ` Jerin Jacob
  0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-06-17 13:35 UTC (permalink / raw)
  To: Nikhil Rao; +Cc: dev

-----Original Message-----
> Date: Fri, 8 Jun 2018 23:45:16 +0530
> From: Nikhil Rao <nikhil.rao@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
> Subject: [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to
>  separate function
> X-Mailer: git-send-email 1.8.3.1
> 
> Create a separate function that handles eth receive and
> enqueue to event buffer. This function will also be called for
> interrupt driven receive queues.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> ---

There are two 3/4 patches in this series. Please remove the appropriate
one.

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 3/4] eventdev: move Rx adapter eth Rx to separate function
  2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth Rx " Nikhil Rao
@ 2018-06-17 13:36   ` Jerin Jacob
  0 siblings, 0 replies; 19+ messages in thread
From: Jerin Jacob @ 2018-06-17 13:36 UTC (permalink / raw)
  To: Nikhil Rao; +Cc: dev

-----Original Message-----
> Date: Fri, 8 Jun 2018 23:45:17 +0530
> From: Nikhil Rao <nikhil.rao@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
> Subject: [PATCH v1 3/4] eventdev: move Rx adapter eth Rx to separate
>  function
> X-Mailer: git-send-email 1.8.3.1
> 
> Create a separate function that handles eth receive and
> enqueue to event buffer. This function will also be called for
> interrupt driven receive queues.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>

Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter
  2018-06-08 18:15 ` [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter Nikhil Rao
@ 2018-06-17 13:49   ` Jerin Jacob
  2018-06-18 13:15     ` Rao, Nikhil
  0 siblings, 1 reply; 19+ messages in thread
From: Jerin Jacob @ 2018-06-17 13:49 UTC (permalink / raw)
  To: Nikhil Rao; +Cc: dev

-----Original Message-----
> Date: Fri, 8 Jun 2018 23:45:18 +0530
> From: Nikhil Rao <nikhil.rao@intel.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
> Subject: [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event
>  adapter
> X-Mailer: git-send-email 1.8.3.1
> 
> Add support for interrupt driven queues when eth device is
> configured for rxq interrupts and servicing weight for the
> queue is configured to be zero.
> 
> An interrupt driven packet received counter has been added to
> rte_event_eth_rx_adapter_stats.
> 
> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
> ---
>  lib/librte_eventdev/rte_event_eth_rx_adapter.h     |    5 +-
>  lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 1049 +++++++++++++++++++-
>  test/test/test_event_eth_rx_adapter.c              |  261 ++++-

Please move the testcase to separate patch.

>  .../prog_guide/event_ethernet_rx_adapter.rst       |   24 +
>  config/common_base                                 |    1 +

This patch creates a build issue with the meson build.
command to reproduce:
--------------------
export MESON_PARAMS='-Dwerror=true -Dexamples=all'
CC="ccache gcc" meson --default-library=shared $MESON_PARAMS gcc-shared-build
ninja -C gcc-shared-build

log:
---
../lib/librte_eventdev/rte_event_eth_rx_adapter.c: In function
‘rxa_intr_ring_check_avail’:
../lib/librte_eventdev/rte_event_eth_rx_adapter.c:916:5: error:
‘RTE_EVENT_ETH_INTR_RING_SIZE’ undeclared (first use in this function);
did you mean ‘RTE_EVENT_DEV_XSTATS_NAME_SIZE’?
     RTE_EVENT_ETH_INTR_RING_SIZE) {
     ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
     RTE_EVENT_DEV_XSTATS_NAME_SIZE
../lib/librte_eventdev/rte_event_eth_rx_adapter.c:916:5: note: each
undeclared identifier is reported only once for each function it appears
in
../lib/librte_eventdev/rte_event_eth_rx_adapter.c: In function
‘rxa_intr_thread’:
../lib/librte_eventdev/rte_event_eth_rx_adapter.c:971:8: error:
‘RTE_EVENT_ETH_INTR_RING_SIZE’ undeclared (first use in this function);
did you mean ‘RTE_EVENT_DEV_XSTATS_NAME_SIZE’?
        RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
        ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
>  6 files changed, 1296 insertions(+), 48 deletions(-)
> 
> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
> index 307b2b5..97f25e9 100644
> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
> @@ -64,8 +64,7 @@
>   * the service function ID of the adapter in this case.
>   *
>   * Note:
> - * 1) Interrupt driven receive queues are currently unimplemented.
> - * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
> + * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
>   *  should be added to a new instance of the rx adapter.

Can we remove this NOTE and add this check in the code if it is not the
case?

>   */
>  
> @@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
>  	 * block cycles can be used to compute the percentage of
>  	 * cycles the service is blocked by the event device.
>  	 */
> +	uint64_t rx_intr_packets;
> +	/**< Received packet count for interrupt mode Rx queues */
>  };
>  
>  /**
> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> index 40e9bc9..d038ee4 100644
> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> @@ -2,6 +2,8 @@
>   * Copyright(c) 2017 Intel Corporation.
>   * All rights reserved.
>   */
> +#include <unistd.h>
> +#include <sys/epoll.h>
>  #include <rte_cycles.h>
>  #include <rte_common.h>
>  #include <rte_dev.h>
> @@ -11,6 +13,7 @@
>  #include <rte_malloc.h>
>  #include <rte_service_component.h>
>  #include <rte_thash.h>
> +#include <rte_interrupts.h>
>  
>  #include "rte_eventdev.h"
>  #include "rte_eventdev_pmd.h"
> @@ -24,6 +27,36 @@
>  #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
> +static void *
> +rxa_intr_thread(void *arg)
> +{
> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
> +	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
> +	int n, i;
> +	uint8_t val;
> +	ssize_t bytes_read;
> +
> +	while (1) {
> +		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
> +				   RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);

Can you check with FreeBSD if everything is fine or not?

> +		if (unlikely(n < 0))
> +			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
> +					n);
> +		for (i = 0; i < n; i++) {
> +			if (epoll_events[i].fd == rx_adapter->intr_pipe.readfd)
> +				goto done;
> +			rxa_intr_ring_enqueue(rx_adapter,
> +					epoll_events[i].epdata.data);
> +		}
> +	}
> +
> +done:
> +
> +static int
> +rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
> +{
> +	int err;
> +	uint8_t val;
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	if (rx_adapter->intr_ring)
> +		return 0;
> +
> +	rx_adapter->intr_ring = rte_ring_create("intr_ring",
> +					RTE_EVENT_ETH_INTR_RING_SIZE,
> +					rte_socket_id(), 0);
> +	if (!rx_adapter->intr_ring)
> +		return -ENOMEM;
> +
> +	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
> +					(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *
> +					sizeof(struct rte_epoll_event),
> +					RTE_CACHE_LINE_SIZE,
> +					rx_adapter->socket_id);
> +	if (!rx_adapter->epoll_events) {
> +		err = -ENOMEM;
> +		goto error;
> +	}
> +
> +	rte_spinlock_init(&rx_adapter->intr_ring_lock);
> +
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> +			"rx-intr-thread");
> +	err = pthread_create(&rx_adapter->rx_intr_thread, NULL,
> +			rxa_intr_thread, rx_adapter);


Can you replace the pthread_* with new rte_ctrl_thread_create()
abstraction ?

>  #
> diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
> index b3e2546..e269357 100644
> --- a/lib/librte_eventdev/Makefile
> +++ b/lib/librte_eventdev/Makefile
> @@ -8,14 +8,16 @@ include $(RTE_SDK)/mk/rte.vars.mk
>  LIB = librte_eventdev.a
>  
>  # library version
> -LIBABIVER := 4
> +LIBABIVER := 5
>  
>  # build flags
>  CFLAGS += -DALLOW_EXPERIMENTAL_API
> +CFLAGS += -D_GNU_SOURCE
>  CFLAGS += -O3
>  CFLAGS += $(WERROR_FLAGS)
>  LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer
>  LDLIBS += -lrte_mbuf -lrte_cryptodev
> +LDLIBS += -lpthread

This may not be required if we add rte_ctrl_thread library.

>  
>  # library source files
>  SRCS-y += rte_eventdev.c
> -- 
> 1.8.3.1
> 

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del
  2018-06-17 13:31   ` Jerin Jacob
@ 2018-06-18 12:13     ` Rao, Nikhil
  0 siblings, 0 replies; 19+ messages in thread
From: Rao, Nikhil @ 2018-06-18 12:13 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev



On 6/17/2018 7:01 PM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Fri, 8 Jun 2018 23:45:15 +0530
>> From: Nikhil Rao <nikhil.rao@intel.com>
>> To: jerin.jacob@caviumnetworks.com
>> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
>> Subject: [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue
>>   add/del
>> X-Mailer: git-send-email 1.8.3.1
>>
>> The new WRR sequence applicable after queue add/del is set
>> up after setting the new queue state, so a memory allocation
>> failure will leave behind an incorrect state.
>>
>> This change separates the memory sizing + allocation for the
>> Rx poll and WRR array from calculation of the WRR sequence.
>> If there is a memory allocation failure, existing Rx queue
>> configuration remains unchanged.
>>
>> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
>> ---
>> @@ -995,7 +1177,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>>   	struct rte_event_eth_rx_adapter *rx_adapter;
>>   	struct rte_eventdev *dev;
>>   	struct eth_device_info *dev_info;
>> -	int start_service;
>>   
>>   	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
>>   	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
>> @@ -1038,7 +1219,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>>   		return -EINVAL;
>>   	}
>>   
>> -	start_service = 0;
>>   	dev_info = &rx_adapter->eth_devices[eth_dev_id];
>>   
>>   	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
>> @@ -1072,16 +1252,13 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>>   			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
>>   					queue_conf);
>>   		rte_spinlock_unlock(&rx_adapter->rx_lock);
>> -		if (ret == 0)
>> -			start_service =
>> -				!!rxa_sw_adapter_queue_count(rx_adapter);
>>   	}
>>   
>>   	if (ret)
>>   		return ret;
>>   
>> -	if (start_service)
>> -		rte_service_component_runstate_set(rx_adapter->service_id, 1);
>> +	rte_service_component_runstate_set(rx_adapter->service_id,
>> +				rxa_sw_adapter_queue_count(rx_adapter));
> Please move this logic under the above !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)
> condition, as rte_service is not valid for internal ports.
Ok.

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter
  2018-06-17 13:49   ` Jerin Jacob
@ 2018-06-18 13:15     ` Rao, Nikhil
  0 siblings, 0 replies; 19+ messages in thread
From: Rao, Nikhil @ 2018-06-18 13:15 UTC (permalink / raw)
  To: Jerin Jacob; +Cc: dev

On 6/17/2018 7:19 PM, Jerin Jacob wrote:

> -----Original Message-----
>> Date: Fri, 8 Jun 2018 23:45:18 +0530
>> From: Nikhil Rao <nikhil.rao@intel.com>
>> To: jerin.jacob@caviumnetworks.com
>> CC: dev@dpdk.org, Nikhil Rao <nikhil.rao@intel.com>
>> Subject: [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event
>>   adapter
>> X-Mailer: git-send-email 1.8.3.1
>>
>> Add support for interrupt driven queues when eth device is
>> configured for rxq interrupts and servicing weight for the
>> queue is configured to be zero.
>>
>> An interrupt driven packet received counter has been added to
>> rte_event_eth_rx_adapter_stats.
>>
>> Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
>> ---
>>   lib/librte_eventdev/rte_event_eth_rx_adapter.h     |    5 +-
>>   lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 1049 +++++++++++++++++++-
>>   test/test/test_event_eth_rx_adapter.c              |  261 ++++-
> Please move the testcase to separate patch.
>
>>   .../prog_guide/event_ethernet_rx_adapter.rst       |   24 +
>>   config/common_base                                 |    1 +
> This patch creates a build issue with the meson build.
> command to reproduce:
> --------------------
> export MESON_PARAMS='-Dwerror=true -Dexamples=all'
> CC="ccache gcc" meson --default-library=shared $MESON_PARAMS gcc-shared-build
> ninja -C gcc-shared-build
>
> log:
> ---
> ../lib/librte_eventdev/rte_event_eth_rx_adapter.c: In function
> ‘rxa_intr_ring_check_avail’:
> ../lib/librte_eventdev/rte_event_eth_rx_adapter.c:916:5: error:
> ‘RTE_EVENT_ETH_INTR_RING_SIZE’ undeclared (first use in this function);
> did you mean ‘RTE_EVENT_DEV_XSTATS_NAME_SIZE’?
>       RTE_EVENT_ETH_INTR_RING_SIZE) {
>       ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
>       RTE_EVENT_DEV_XSTATS_NAME_SIZE
> ../lib/librte_eventdev/rte_event_eth_rx_adapter.c:916:5: note: each
> undeclared identifier is reported only once for each function it appears
> in
> ../lib/librte_eventdev/rte_event_eth_rx_adapter.c: In function
> ‘rxa_intr_thread’:
> ../lib/librte_eventdev/rte_event_eth_rx_adapter.c:971:8: error:
> ‘RTE_EVENT_ETH_INTR_RING_SIZE’ undeclared (first use in this function);
> did you mean ‘RTE_EVENT_DEV_XSTATS_NAME_SIZE’?
>          RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
>          ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
>>   6 files changed, 1296 insertions(+), 48 deletions(-)
>>
>> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
>> index 307b2b5..97f25e9 100644
>> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
>> @@ -64,8 +64,7 @@
>>    * the service function ID of the adapter in this case.
>>    *
>>    * Note:
>> - * 1) Interrupt driven receive queues are currently unimplemented.
>> - * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
>> + * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
>>    *  should be added to a new instance of the rx adapter.
> Can we remove this NOTE and add this check in the code if it is not the
> case?
OK.
>>    */
>>   
>> @@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
>>   	 * block cycles can be used to compute the percentage of
>>   	 * cycles the service is blocked by the event device.
>>   	 */
>> +	uint64_t rx_intr_packets;
>> +	/**< Received packet count for interrupt mode Rx queues */
>>   };
>>   
>>   /**
>> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> index 40e9bc9..d038ee4 100644
>> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> @@ -2,6 +2,8 @@
>>    * Copyright(c) 2017 Intel Corporation.
>>    * All rights reserved.
>>    */
>> +#include <unistd.h>
>> +#include <sys/epoll.h>
>>   #include <rte_cycles.h>
>>   #include <rte_common.h>
>>   #include <rte_dev.h>
>> @@ -11,6 +13,7 @@
>>   #include <rte_malloc.h>
>>   #include <rte_service_component.h>
>>   #include <rte_thash.h>
>> +#include <rte_interrupts.h>
>>   
>>   #include "rte_eventdev.h"
>>   #include "rte_eventdev_pmd.h"
>> @@ -24,6 +27,36 @@
>>   #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
>> +static void *
>> +rxa_intr_thread(void *arg)
>> +{
>> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> +	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
>> +	int n, i;
>> +	uint8_t val;
>> +	ssize_t bytes_read;
>> +
>> +	while (1) {
>> +		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
>> +				   RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
> Can you check with FreeBSD if everything is fine or not?
Interrupt functionality works only on Linux (rte_epoll_wait() etc. are
implemented only on Linux), or am I not understanding your question
correctly?
>> +		if (unlikely(n < 0))
>> +			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
>> +					n);
>> +		for (i = 0; i < n; i++) {
>> +			if (epoll_events[i].fd == rx_adapter->intr_pipe.readfd)
>> +				goto done;
>> +			rxa_intr_ring_enqueue(rx_adapter,
>> +					epoll_events[i].epdata.data);
>> +		}
>> +	}
>> +
>> +done:
>> +
>> +static int
>> +rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
>> +{
>> +	int err;
>> +	uint8_t val;
>> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
>> +
>> +	if (rx_adapter->intr_ring)
>> +		return 0;
>> +
>> +	rx_adapter->intr_ring = rte_ring_create("intr_ring",
>> +					RTE_EVENT_ETH_INTR_RING_SIZE,
>> +					rte_socket_id(), 0);
>> +	if (!rx_adapter->intr_ring)
>> +		return -ENOMEM;
>> +
>> +	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
>> +					(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *
>> +					sizeof(struct rte_epoll_event),
>> +					RTE_CACHE_LINE_SIZE,
>> +					rx_adapter->socket_id);
>> +	if (!rx_adapter->epoll_events) {
>> +		err = -ENOMEM;
>> +		goto error;
>> +	}
>> +
>> +	rte_spinlock_init(&rx_adapter->intr_ring_lock);
>> +
>> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
>> +			"rx-intr-thread");
>> +	err = pthread_create(&rx_adapter->rx_intr_thread, NULL,
>> +			rxa_intr_thread, rx_adapter);
>
> Can you replace the pthread_* with new rte_ctrl_thread_create()
> abstraction ?
OK.
>>   #
>> diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
>> index b3e2546..e269357 100644
>> --- a/lib/librte_eventdev/Makefile
>> +++ b/lib/librte_eventdev/Makefile
>> @@ -8,14 +8,16 @@ include $(RTE_SDK)/mk/rte.vars.mk
>>   LIB = librte_eventdev.a
>>   
>>   # library version
>> -LIBABIVER := 4
>> +LIBABIVER := 5
>>   
>>   # build flags
>>   CFLAGS += -DALLOW_EXPERIMENTAL_API
>> +CFLAGS += -D_GNU_SOURCE
>>   CFLAGS += -O3
>>   CFLAGS += $(WERROR_FLAGS)
>>   LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer
>>   LDLIBS += -lrte_mbuf -lrte_cryptodev
>> +LDLIBS += -lpthread
> This may not be required if we add rte_ctrl_thread library.
>
>>   
>>   # library source files
>>   SRCS-y += rte_eventdev.c
>> -- 
>> 1.8.3.1
>>
Thanks for the review.
Nikhil

^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter
  2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
                   ` (4 preceding siblings ...)
  2018-06-08 18:15 ` [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter Nikhil Rao
@ 2018-06-27 10:55 ` Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 1/5] eventdev: standardize Rx adapter internal function names Nikhil Rao
                     ` (4 more replies)
  5 siblings, 5 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

This patch series adds support for interrupt driven queues to the
ethernet Rx adapter. The first 3 patches prepare the code to
handle both poll and interrupt driven Rx queues, the 4th patch
has code changes specific to interrupt driven queues and
the final patch has test code.

Changelog:

v1->v2:

* move rte_service_component_runstate_set such that it
  is called only when cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT
  is false. (Jerin Jacob)

* Fix meson build. (Jerin Jacob)

* Replace calls to pthread_* with rte_ctrl_thread_create().
  (Jerin Jacob)

* Move adapter test code to separate patch. (Jerin Jacob)

Note: I haven't removed the note about devices created after
rte_event_eth_rx_adapter_create; I will fix that in a separate patch.


Nikhil Rao (5):
  eventdev: standardize Rx adapter internal function names
  eventdev: improve err handling for Rx adapter queue add/del
  eventdev: move Rx adapter eth Rx to separate function
  eventdev: add interrupt driven queues to Rx adapter
  eventdev: add Rx adapter tests for interrupt driven queues

 config/rte_config.h                                |    1 +
 lib/librte_eventdev/rte_event_eth_rx_adapter.h     |    5 +-
 lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 1509 +++++++++++++++++---
 test/test/test_event_eth_rx_adapter.c              |  261 +++-
 .../prog_guide/event_ethernet_rx_adapter.rst       |   24 +
 config/common_base                                 |    1 +
 lib/librte_eventdev/Makefile                       |    2 +-
 7 files changed, 1565 insertions(+), 238 deletions(-)

-- 
1.8.3.1

^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH v2 1/5] eventdev: standardize Rx adapter internal function names
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
@ 2018-06-27 10:55   ` Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 2/5] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
                     ` (3 subsequent siblings)
  4 siblings, 0 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

Add a common prefix to function names and rename
a few to better match functionality

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 167 ++++++++++++-------------
 1 file changed, 80 insertions(+), 87 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index ce1f62d..9361d48 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -129,30 +129,30 @@ struct eth_rx_queue_info {
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
 
 static inline int
-valid_id(uint8_t id)
+rxa_validate_id(uint8_t id)
 {
 	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
 }
 
 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
-	if (!valid_id(id)) { \
+	if (!rxa_validate_id(id)) { \
 		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
 		return retval; \
 	} \
 } while (0)
 
 static inline int
-sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	return rx_adapter->num_rx_polled;
 }
 
 /* Greatest common divisor */
-static uint16_t gcd_u16(uint16_t a, uint16_t b)
+static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 {
 	uint16_t r = a % b;
 
-	return r ? gcd_u16(b, r) : b;
+	return r ? rxa_gcd_u16(b, r) : b;
 }
 
 /* Returns the next queue in the polling sequence
@@ -160,7 +160,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
  */
 static int
-wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
 	 unsigned int n, int *cw,
 	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
 	 uint16_t gcd, int prev)
@@ -190,7 +190,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Precalculate WRR polling sequence for all queues in rx_adapter */
 static int
-eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint16_t d;
 	uint16_t q;
@@ -239,7 +239,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 				rx_poll[poll_q].eth_rx_qid = q;
 				max_wrr_pos += wt;
 				max_wt = RTE_MAX(max_wt, wt);
-				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
+				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
 				poll_q++;
 			}
 		}
@@ -259,7 +259,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		int prev = -1;
 		int cw = -1;
 		for (i = 0; i < max_wrr_pos; i++) {
-			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
+			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
 					     rx_poll, max_wt, gcd, prev);
 			prev = rx_wrr[i];
 		}
@@ -276,7 +276,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
+rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
 	struct ipv6_hdr **ipv6_hdr)
 {
 	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
@@ -315,7 +315,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Calculate RSS hash for IPv4/6 */
 static inline uint32_t
-do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
+rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
 {
 	uint32_t input_len;
 	void *tuple;
@@ -324,7 +324,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	struct ipv4_hdr *ipv4_hdr;
 	struct ipv6_hdr *ipv6_hdr;
 
-	mtoip(m, &ipv4_hdr, &ipv6_hdr);
+	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
 
 	if (ipv4_hdr) {
 		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
@@ -343,13 +343,13 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline int
-rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	return !!rx_adapter->enq_block_count;
 }
 
 static inline void
-rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	if (rx_adapter->rx_enq_block_start_ts)
 		return;
@@ -362,13 +362,13 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
 		    struct rte_event_eth_rx_adapter_stats *stats)
 {
 	if (unlikely(!stats->rx_enq_start_ts))
 		stats->rx_enq_start_ts = rte_get_tsc_cycles();
 
-	if (likely(!rx_enq_blocked(rx_adapter)))
+	if (likely(!rxa_enq_blocked(rx_adapter)))
 		return;
 
 	rx_adapter->enq_block_count = 0;
@@ -384,8 +384,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * this function
  */
 static inline void
-buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
-		  struct rte_event *ev)
+rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct rte_event *ev)
 {
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
@@ -394,7 +394,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 /* Enqueue buffered events to event device */
 static inline uint16_t
-flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
@@ -411,8 +411,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		stats->rx_enq_retry++;
 	}
 
-	n ? rx_enq_block_end_ts(rx_adapter, stats) :
-		rx_enq_block_start_ts(rx_adapter);
+	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
+		rxa_enq_block_start_ts(rx_adapter);
 
 	buf->count -= n;
 	stats->rx_enq_count += n;
@@ -421,11 +421,11 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline void
-fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
-	uint16_t eth_dev_id,
-	uint16_t rx_queue_id,
-	struct rte_mbuf **mbufs,
-	uint16_t num)
+rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint16_t eth_dev_id,
+		uint16_t rx_queue_id,
+		struct rte_mbuf **mbufs,
+		uint16_t num)
 {
 	uint32_t i;
 	struct eth_device_info *eth_device_info =
@@ -463,7 +463,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		struct rte_event *ev = &events[i];
 
 		rss = do_rss ?
-			do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
+			rxa_do_softrss(m, rx_adapter->rss_key_be) :
+			m->hash.rss;
 		flow_id =
 		    eth_rx_queue_info->flow_id &
 				eth_rx_queue_info->flow_id_mask;
@@ -477,7 +478,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		ev->priority = priority;
 		ev->mbuf = m;
 
-		buf_event_enqueue(rx_adapter, ev);
+		rxa_buffer_event(rx_adapter, ev);
 	}
 }
 
@@ -495,7 +496,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
  * it.
  */
 static inline void
-eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
 	uint16_t n;
@@ -520,7 +521,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		 * enough space in the enqueue buffer.
 		 */
 		if (buf->count >= BATCH_SIZE)
-			flush_event_buffer(rx_adapter);
+			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return;
@@ -534,7 +535,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 			/* The check before rte_eth_rx_burst() ensures that
 			 * all n mbufs can be buffered
 			 */
-			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
+			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
 			nb_rx += n;
 			if (nb_rx > max_nb_rx) {
 				rx_adapter->wrr_pos =
@@ -548,11 +549,11 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	if (buf->count >= BATCH_SIZE)
-		flush_event_buffer(rx_adapter);
+		rxa_flush_event_buffer(rx_adapter);
 }
 
 static int
-event_eth_rx_adapter_service_func(void *args)
+rxa_service_func(void *args)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter = args;
 
@@ -562,7 +563,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 		return 0;
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 	}
-	eth_rx_poll(rx_adapter);
+	rxa_poll(rx_adapter);
 	rte_spinlock_unlock(&rx_adapter->rx_lock);
 	return 0;
 }
@@ -594,14 +595,14 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline struct rte_event_eth_rx_adapter *
-id_to_rx_adapter(uint8_t id)
+rxa_id_to_adapter(uint8_t id)
 {
 	return event_eth_rx_adapter ?
 		event_eth_rx_adapter[id] : NULL;
 }
 
 static int
-default_conf_cb(uint8_t id, uint8_t dev_id,
+rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
 		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 {
 	int ret;
@@ -610,7 +611,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	int started;
 	uint8_t port_id;
 	struct rte_event_port_conf *port_conf = arg;
-	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
+	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
 
 	dev = &rte_eventdevs[rx_adapter->eventdev_id];
 	dev_conf = dev->data->dev_conf;
@@ -647,7 +648,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
-init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
+rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
 {
 	int ret;
 	struct rte_service_spec service;
@@ -660,7 +661,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
 		"rte_event_eth_rx_adapter_%d", id);
 	service.socket_id = rx_adapter->socket_id;
-	service.callback = event_eth_rx_adapter_service_func;
+	service.callback = rxa_service_func;
 	service.callback_userdata = rx_adapter;
 	/* Service function handles locking for queue add/del updates */
 	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
@@ -688,9 +689,8 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 	return ret;
 }
 
-
 static void
-update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
+rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		struct eth_device_info *dev_info,
 		int32_t rx_queue_id,
 		uint8_t add)
@@ -704,7 +704,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	if (rx_queue_id == -1) {
 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			update_queue_info(rx_adapter, dev_info, i, add);
+			rxa_update_queue(rx_adapter, dev_info, i, add);
 	} else {
 		queue_info = &dev_info->rx_queue[rx_queue_id];
 		enabled = queue_info->queue_enabled;
@@ -720,9 +720,9 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
-event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
-			    struct eth_device_info *dev_info,
-			    uint16_t rx_queue_id)
+rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
 {
 	struct eth_rx_queue_info *queue_info;
 
@@ -731,15 +731,15 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
-	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
+	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
 	return 0;
 }
 
 static void
-event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
-		struct eth_device_info *dev_info,
-		uint16_t rx_queue_id,
-		const struct rte_event_eth_rx_adapter_queue_conf *conf)
+rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id,
+	const struct rte_event_eth_rx_adapter_queue_conf *conf)
 
 {
 	struct eth_rx_queue_info *queue_info;
@@ -759,10 +759,10 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b)
 
 	/* The same queue can be added more than once */
 	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
-	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
+	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
 }
 
-static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
 		int rx_queue_id,
 		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
@@ -799,19 +799,15 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	if (rx_queue_id == -1) {
 		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			event_eth_rx_adapter_queue_add(rx_adapter,
-						dev_info, i,
-						queue_conf);
+			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
 	} else {
-		event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
-					  (uint16_t)rx_queue_id,
-					  queue_conf);
+		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
+			queue_conf);
 	}
 
-	ret = eth_poll_wrr_calc(rx_adapter);
+	ret = rxa_calc_wrr_sequence(rx_adapter);
 	if (ret) {
-		event_eth_rx_adapter_queue_del(rx_adapter,
-					dev_info, rx_queue_id);
+		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
 		return ret;
 	}
 
@@ -819,7 +815,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 }
 
 static int
-rx_adapter_ctrl(uint8_t id, int start)
+rxa_ctrl(uint8_t id, int start)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
@@ -829,7 +825,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	int stop = !start;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -892,7 +888,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 			return ret;
 	}
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter != NULL) {
 		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
 		return -EEXIST;
@@ -934,7 +930,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
 
 	event_eth_rx_adapter[id] = rx_adapter;
-	if (conf_cb == default_conf_cb)
+	if (conf_cb == rxa_default_conf_cb)
 		rx_adapter->default_cb_arg = 1;
 	return 0;
 }
@@ -955,7 +951,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		return -ENOMEM;
 	*pc = *port_config;
 	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
-					default_conf_cb,
+					rxa_default_conf_cb,
 					pc);
 	if (ret)
 		rte_free(pc);
@@ -969,7 +965,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1004,7 +1000,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if ((rx_adapter == NULL) || (queue_conf == NULL))
 		return -EINVAL;
 
@@ -1063,7 +1059,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 				rx_queue_id, queue_conf);
 		if (ret == 0) {
 			dev_info->internal_event_port = 1;
-			update_queue_info(rx_adapter,
+			rxa_update_queue(rx_adapter,
 					&rx_adapter->eth_devices[eth_dev_id],
 					rx_queue_id,
 					1);
@@ -1071,13 +1067,14 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	} else {
 		rte_spinlock_lock(&rx_adapter->rx_lock);
 		dev_info->internal_event_port = 0;
-		ret = init_service(rx_adapter, id);
+		ret = rxa_init_service(rx_adapter, id);
 		if (ret == 0)
-			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
+			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
 					queue_conf);
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 		if (ret == 0)
-			start_service = !!sw_rx_adapter_queue_count(rx_adapter);
+			start_service =
+				!!rxa_sw_adapter_queue_count(rx_adapter);
 	}
 
 	if (ret)
@@ -1103,7 +1100,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1130,7 +1127,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 						&rte_eth_devices[eth_dev_id],
 						rx_queue_id);
 		if (ret == 0) {
-			update_queue_info(rx_adapter,
+			rxa_update_queue(rx_adapter,
 					&rx_adapter->eth_devices[eth_dev_id],
 					rx_queue_id,
 					0);
@@ -1144,16 +1141,12 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 		rte_spinlock_lock(&rx_adapter->rx_lock);
 		if (rx_queue_id == -1) {
 			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-				event_eth_rx_adapter_queue_del(rx_adapter,
-							dev_info,
-							i);
+				rxa_sw_del(rx_adapter, dev_info, i);
 		} else {
-			event_eth_rx_adapter_queue_del(rx_adapter,
-						dev_info,
-						(uint16_t)rx_queue_id);
+			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
 		}
 
-		rc = eth_poll_wrr_calc(rx_adapter);
+		rc = rxa_calc_wrr_sequence(rx_adapter);
 		if (rc)
 			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
 					rc);
@@ -1165,7 +1158,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 		rte_service_component_runstate_set(rx_adapter->service_id,
-				sw_rx_adapter_queue_count(rx_adapter));
+				rxa_sw_adapter_queue_count(rx_adapter));
 	}
 
 	return ret;
@@ -1175,13 +1168,13 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
-	return rx_adapter_ctrl(id, 1);
+	return rxa_ctrl(id, 1);
 }
 
 int
 rte_event_eth_rx_adapter_stop(uint8_t id)
 {
-	return rx_adapter_ctrl(id, 0);
+	return rxa_ctrl(id, 0);
 }
 
 int
@@ -1198,7 +1191,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter  == NULL || stats == NULL)
 		return -EINVAL;
 
@@ -1236,7 +1229,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL)
 		return -EINVAL;
 
@@ -1261,7 +1254,7 @@ static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 
-	rx_adapter = id_to_rx_adapter(id);
+	rx_adapter = rxa_id_to_adapter(id);
 	if (rx_adapter == NULL || service_id == NULL)
 		return -EINVAL;
 
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 2/5] eventdev: improve err handling for Rx adapter queue add/del
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 1/5] eventdev: standardize Rx adapter internal function names Nikhil Rao
@ 2018-06-27 10:55   ` Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 3/5] eventdev: move Rx adapter eth Rx to separate function Nikhil Rao
                     ` (2 subsequent siblings)
  4 siblings, 0 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

The new WRR sequence applicable after queue add/del is set
up after setting the new queue state, so a memory allocation
failure will leave behind an incorrect state.

This change separates the memory sizing + allocation for the
Rx poll and WRR arrays from the calculation of the WRR sequence.
If there is a memory allocation failure, the existing Rx queue
configuration remains unchanged.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 418 ++++++++++++++++++-------
 1 file changed, 302 insertions(+), 116 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 9361d48..926f83a 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -109,10 +109,16 @@ struct eth_device_info {
 	 * rx_adapter_stop callback needs to be invoked
 	 */
 	uint8_t dev_rx_started;
-	/* If nb_dev_queues > 0, the start callback will
+	/* Number of queues added for this device */
+	uint16_t nb_dev_queues;
+	/* If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
-	uint16_t nb_dev_queues;
+	uint16_t nb_rx_poll;
+	/* sum(wrr(q)) for all queues within the device
+	 * useful when deleting all device queues
+	 */
+	uint32_t wrr_len;
 };
 
 /* Per Rx queue */
@@ -188,13 +194,170 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static inline int
+rxa_polled_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	return !dev_info->internal_event_port &&
+		dev_info->rx_queue &&
+		queue_info->queue_enabled && queue_info->wt != 0;
+}
+
+/* Calculate size of the eth_rx_poll and wrr_sched arrays
+ * after deleting poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		poll_diff = dev_info->nb_rx_poll;
+		wrr_len_diff = dev_info->wrr_len;
+	} else {
+		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+					0;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint16_t wt,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_wrr)
+{
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		poll_diff = dev_info->dev->data->nb_rx_queues -
+						dev_info->nb_rx_poll;
+		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
+				- dev_info->wrr_len;
+	} else {
+		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
+				wt - dev_info->rx_queue[rx_queue_id].wt :
+				wt;
+	}
+
+	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding rx_queue_id */
+static void
+rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint16_t wt,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+				wt, nb_rx_poll, nb_wrr);
+}
+
+/* Calculate nb_rx_* after deleting rx_queue_id */
+static void
+rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id,
+		uint32_t *nb_rx_poll,
+		uint32_t *nb_wrr)
+{
+	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
+				nb_wrr);
+}
+
+/*
+ * Allocate the rx_poll array
+ */
+static struct eth_rx_poll_entry *
+rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint32_t num_rx_polled)
+{
+	size_t len;
+
+	len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
+							RTE_CACHE_LINE_SIZE);
+	return  rte_zmalloc_socket(rx_adapter->mem_name,
+				len,
+				RTE_CACHE_LINE_SIZE,
+				rx_adapter->socket_id);
+}
+
+/*
+ * Allocate the WRR array
+ */
+static uint32_t *
+rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
+{
+	size_t len;
+
+	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
+			RTE_CACHE_LINE_SIZE);
+	return  rte_zmalloc_socket(rx_adapter->mem_name,
+				len,
+				RTE_CACHE_LINE_SIZE,
+				rx_adapter->socket_id);
+}
+
 static int
-rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
+		uint32_t nb_poll,
+		uint32_t nb_wrr,
+		struct eth_rx_poll_entry **rx_poll,
+		uint32_t **wrr_sched)
+{
+
+	if (nb_poll == 0) {
+		*rx_poll = NULL;
+		*wrr_sched = NULL;
+		return 0;
+	}
+
+	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
+	if (*rx_poll == NULL) {
+		*wrr_sched = NULL;
+		return -ENOMEM;
+	}
+
+	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
+	if (*wrr_sched == NULL) {
+		rte_free(*rx_poll);
+		return -ENOMEM;
+	}
+	return 0;
+}
+
+/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static void
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_rx_poll_entry *rx_poll,
+		uint32_t *rx_wrr)
 {
 	uint16_t d;
 	uint16_t q;
 	unsigned int i;
+	int prev = -1;
+	int cw = -1;
 
 	/* Initialize variables for calculation of wrr schedule */
 	uint16_t max_wrr_pos = 0;
@@ -202,77 +365,48 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t max_wt = 0;
 	uint16_t gcd = 0;
 
-	struct eth_rx_poll_entry *rx_poll = NULL;
-	uint32_t *rx_wrr = NULL;
+	if (rx_poll == NULL)
+		return;
 
-	if (rx_adapter->num_rx_polled) {
-		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
-				sizeof(*rx_adapter->eth_rx_poll),
-				RTE_CACHE_LINE_SIZE);
-		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
-					     len,
-					     RTE_CACHE_LINE_SIZE,
-					     rx_adapter->socket_id);
-		if (rx_poll == NULL)
-			return -ENOMEM;
+	/* Generate array of all queues to poll, the size of this
+	 * array is poll_q
+	 */
+	RTE_ETH_FOREACH_DEV(d) {
+		uint16_t nb_rx_queues;
+		struct eth_device_info *dev_info =
+				&rx_adapter->eth_devices[d];
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		if (dev_info->rx_queue == NULL)
+			continue;
+		if (dev_info->internal_event_port)
+			continue;
+		dev_info->wrr_len = 0;
+		for (q = 0; q < nb_rx_queues; q++) {
+			struct eth_rx_queue_info *queue_info =
+				&dev_info->rx_queue[q];
+			uint16_t wt;
 
-		/* Generate array of all queues to poll, the size of this
-		 * array is poll_q
-		 */
-		RTE_ETH_FOREACH_DEV(d) {
-			uint16_t nb_rx_queues;
-			struct eth_device_info *dev_info =
-					&rx_adapter->eth_devices[d];
-			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
-			if (dev_info->rx_queue == NULL)
-				continue;
-			if (dev_info->internal_event_port)
+			if (!rxa_polled_queue(dev_info, q))
 				continue;
-			for (q = 0; q < nb_rx_queues; q++) {
-				struct eth_rx_queue_info *queue_info =
-					&dev_info->rx_queue[q];
-				if (queue_info->queue_enabled == 0)
-					continue;
-
-				uint16_t wt = queue_info->wt;
-				rx_poll[poll_q].eth_dev_id = d;
-				rx_poll[poll_q].eth_rx_qid = q;
-				max_wrr_pos += wt;
-				max_wt = RTE_MAX(max_wt, wt);
-				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
-				poll_q++;
-			}
-		}
-
-		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
-				RTE_CACHE_LINE_SIZE);
-		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
-					    len,
-					    RTE_CACHE_LINE_SIZE,
-					    rx_adapter->socket_id);
-		if (rx_wrr == NULL) {
-			rte_free(rx_poll);
-			return -ENOMEM;
-		}
-
-		/* Generate polling sequence based on weights */
-		int prev = -1;
-		int cw = -1;
-		for (i = 0; i < max_wrr_pos; i++) {
-			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
-					     rx_poll, max_wt, gcd, prev);
-			prev = rx_wrr[i];
+			wt = queue_info->wt;
+			rx_poll[poll_q].eth_dev_id = d;
+			rx_poll[poll_q].eth_rx_qid = q;
+			max_wrr_pos += wt;
+			dev_info->wrr_len += wt;
+			max_wt = RTE_MAX(max_wt, wt);
+			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
+			poll_q++;
 		}
 	}
 
-	rte_free(rx_adapter->eth_rx_poll);
-	rte_free(rx_adapter->wrr_sched);
-
-	rx_adapter->eth_rx_poll = rx_poll;
-	rx_adapter->wrr_sched = rx_wrr;
-	rx_adapter->wrr_len = max_wrr_pos;
-
-	return 0;
+	/* Generate polling sequence based on weights */
+	prev = -1;
+	cw = -1;
+	for (i = 0; i < max_wrr_pos; i++) {
+		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
+				     rx_poll, max_wt, gcd, prev);
+		prev = rx_wrr[i];
+	}
 }
 
 static inline void
@@ -719,31 +853,53 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
-static int
+static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id)
+	int32_t rx_queue_id)
 {
-	struct eth_rx_queue_info *queue_info;
+	int pollq;
 
 	if (rx_adapter->nb_queues == 0)
-		return 0;
+		return;
 
-	queue_info = &dev_info->rx_queue[rx_queue_id];
-	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_sw_del(rx_adapter, dev_info, i);
+		return;
+	}
+
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
-	return 0;
+	rx_adapter->num_rx_polled -= pollq;
+	dev_info->nb_rx_poll -= pollq;
 }
 
 static void
 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
-	uint16_t rx_queue_id,
+	int32_t rx_queue_id,
 	const struct rte_event_eth_rx_adapter_queue_conf *conf)
-
 {
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
+	int pollq;
+
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i <	nb_rx_queues; i++)
+			rxa_add_queue(rx_adapter, dev_info, i, conf);
+		return;
+	}
+
+	pollq = rxa_polled_queue(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -757,9 +913,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->flow_id_mask = ~0;
 	}
 
-	/* The same queue can be added more than once */
-	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
+	if (rxa_polled_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled += !pollq;
+		dev_info->nb_rx_poll += !pollq;
+	}
 }
 
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
@@ -769,8 +927,12 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 {
 	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
 	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
-	uint32_t i;
 	int ret;
+	struct eth_rx_poll_entry *rx_poll;
+	struct eth_rx_queue_info *rx_queue;
+	uint32_t *rx_wrr;
+	uint16_t nb_rx_queues;
+	uint32_t nb_rx_poll, nb_wrr;
 
 	if (queue_conf->servicing_weight == 0) {
 
@@ -787,31 +949,51 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		queue_conf = &temp_conf;
 	}
 
+	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+	rx_queue = dev_info->rx_queue;
+
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
 		    rte_zmalloc_socket(rx_adapter->mem_name,
-				       dev_info->dev->data->nb_rx_queues *
+				       nb_rx_queues *
 				       sizeof(struct eth_rx_queue_info), 0,
 				       rx_adapter->socket_id);
 		if (dev_info->rx_queue == NULL)
 			return -ENOMEM;
 	}
+	rx_wrr = NULL;
+	rx_poll = NULL;
 
-	if (rx_queue_id == -1) {
-		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
-	} else {
-		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
-			queue_conf);
-	}
+	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
+			queue_conf->servicing_weight,
+			&nb_rx_poll, &nb_wrr);
 
-	ret = rxa_calc_wrr_sequence(rx_adapter);
-	if (ret) {
-		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
-		return ret;
+	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+				&rx_poll, &rx_wrr);
+	if (ret)
+		goto err_free_rxqueue;
+
+	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
+	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+	rte_free(rx_adapter->eth_rx_poll);
+	rte_free(rx_adapter->wrr_sched);
+
+	rx_adapter->eth_rx_poll = rx_poll;
+	rx_adapter->wrr_sched = rx_wrr;
+	rx_adapter->wrr_len = nb_wrr;
+	return 0;
+
+err_free_rxqueue:
+	if (rx_queue == NULL) {
+		rte_free(dev_info->rx_queue);
+		dev_info->rx_queue = NULL;
 	}
 
-	return ret;
+	rte_free(rx_poll);
+	rte_free(rx_wrr);
+
+	return 0;
 }
 
 static int
@@ -995,7 +1177,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct rte_eventdev *dev;
 	struct eth_device_info *dev_info;
-	int start_service;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1038,7 +1219,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		return -EINVAL;
 	}
 
-	start_service = 0;
 	dev_info = &rx_adapter->eth_devices[eth_dev_id];
 
 	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
@@ -1068,21 +1248,19 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		rte_spinlock_lock(&rx_adapter->rx_lock);
 		dev_info->internal_event_port = 0;
 		ret = rxa_init_service(rx_adapter, id);
-		if (ret == 0)
+		if (ret == 0) {
+			uint32_t service_id = rx_adapter->service_id;
 			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
 					queue_conf);
+			rte_service_component_runstate_set(service_id,
+				rxa_sw_adapter_queue_count(rx_adapter));
+		}
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
-		if (ret == 0)
-			start_service =
-				!!rxa_sw_adapter_queue_count(rx_adapter);
 	}
 
 	if (ret)
 		return ret;
 
-	if (start_service)
-		rte_service_component_runstate_set(rx_adapter->service_id, 1);
-
 	return 0;
 }
 
@@ -1095,7 +1273,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct rte_event_eth_rx_adapter *rx_adapter;
 	struct eth_device_info *dev_info;
 	uint32_t cap;
-	uint16_t i;
+	uint32_t nb_rx_poll = 0;
+	uint32_t nb_wrr = 0;
+	struct eth_rx_poll_entry *rx_poll = NULL;
+	uint32_t *rx_wrr = NULL;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1137,26 +1318,31 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 			}
 		}
 	} else {
-		int rc;
+		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
+			&nb_rx_poll, &nb_wrr);
+		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+			&rx_poll, &rx_wrr);
+		if (ret)
+			return ret;
+
 		rte_spinlock_lock(&rx_adapter->rx_lock);
-		if (rx_queue_id == -1) {
-			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-				rxa_sw_del(rx_adapter, dev_info, i);
-		} else {
-			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
-		}
+		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
+		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+		rte_free(rx_adapter->eth_rx_poll);
+		rte_free(rx_adapter->wrr_sched);
 
-		rc = rxa_calc_wrr_sequence(rx_adapter);
-		if (rc)
-			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
-					rc);
+		rx_adapter->eth_rx_poll = rx_poll;
+		rx_adapter->num_rx_polled = nb_rx_poll;
+		rx_adapter->wrr_sched = rx_wrr;
+		rx_adapter->wrr_len = nb_wrr;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
-
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
 	}
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 3/5] eventdev: move Rx adapter eth Rx to separate function
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 1/5] eventdev: standardize Rx adapter internal function names Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 2/5] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
@ 2018-06-27 10:55   ` Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 4/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 5/5] eventdev: add Rx adapter tests for interrupt driven queues Nikhil Rao
  4 siblings, 0 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

Create a separate function that handles eth receive and
enqueue to event buffer. This function will also be called for
interrupt driven receive queues.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
Acked-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
---
 lib/librte_eventdev/rte_event_eth_rx_adapter.c | 67 ++++++++++++++++++--------
 1 file changed, 47 insertions(+), 20 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 926f83a..8fe037f 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -616,6 +616,45 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 }
 
+/* Enqueue packets from  <port, q>  to event buffer */
+static inline uint32_t
+rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
+	uint16_t port_id,
+	uint16_t queue_id,
+	uint32_t rx_count,
+	uint32_t max_rx)
+{
+	struct rte_mbuf *mbufs[BATCH_SIZE];
+	struct rte_eth_event_enqueue_buffer *buf =
+					&rx_adapter->event_enqueue_buffer;
+	struct rte_event_eth_rx_adapter_stats *stats =
+					&rx_adapter->stats;
+	uint16_t n;
+	uint32_t nb_rx = 0;
+
+	/* Don't do a batch dequeue from the rx queue if there isn't
+	 * enough space in the enqueue buffer.
+	 */
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		if (buf->count >= BATCH_SIZE)
+			rxa_flush_event_buffer(rx_adapter);
+
+		stats->rx_poll_count++;
+		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
+		if (unlikely(!n))
+			break;
+		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
+		nb_rx += n;
+		if (rx_count + nb_rx > max_rx)
+			break;
+	}
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -633,17 +672,16 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
-	uint16_t n;
 	uint32_t nb_rx = 0;
-	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
 	buf = &rx_adapter->event_enqueue_buffer;
-	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	stats = &rx_adapter->stats;
 
 	/* Iterate through a WRR sequence */
 	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
@@ -658,32 +696,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			return;
+			break;
 		}
 
-		stats->rx_poll_count++;
-		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
-
-		if (n) {
-			stats->rx_packets += n;
-			/* The check before rte_eth_rx_burst() ensures that
-			 * all n mbufs can be buffered
-			 */
-			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
-			nb_rx += n;
-			if (nb_rx > max_nb_rx) {
-				rx_adapter->wrr_pos =
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		if (nb_rx > max_nb_rx) {
+			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
-				break;
-			}
+			break;
 		}
 
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
 
-	if (buf->count >= BATCH_SIZE)
-		rxa_flush_event_buffer(rx_adapter);
+	stats->rx_packets += nb_rx;
 }
 
 static int
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 4/5] eventdev: add interrupt driven queues to Rx adapter
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
                     ` (2 preceding siblings ...)
  2018-06-27 10:55   ` [PATCH v2 3/5] eventdev: move Rx adapter eth Rx to separate function Nikhil Rao
@ 2018-06-27 10:55   ` Nikhil Rao
  2018-06-27 10:55   ` [PATCH v2 5/5] eventdev: add Rx adapter tests for interrupt driven queues Nikhil Rao
  4 siblings, 0 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

Add support for interrupt driven queues when eth device is
configured for rxq interrupts and servicing weight for the
queue is configured to be zero.

An interrupt driven received packet counter has been added to
rte_event_eth_rx_adapter_stats.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 config/rte_config.h                                |   1 +
 lib/librte_eventdev/rte_event_eth_rx_adapter.h     |   5 +-
 lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 923 ++++++++++++++++++++-
 .../prog_guide/event_ethernet_rx_adapter.rst       |  24 +
 config/common_base                                 |   1 +
 lib/librte_eventdev/Makefile                       |   2 +-
 6 files changed, 927 insertions(+), 29 deletions(-)

diff --git a/config/rte_config.h b/config/rte_config.h
index a1d0175..ec88f14 100644
--- a/config/rte_config.h
+++ b/config/rte_config.h
@@ -64,6 +64,7 @@
 #define RTE_EVENT_MAX_DEVS 16
 #define RTE_EVENT_MAX_QUEUES_PER_DEV 64
 #define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32
+#define RTE_EVENT_ETH_INTR_RING_SIZE 1024
 #define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32
 
 /* rawdev defines */
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
index 307b2b5..97f25e9 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h
@@ -64,8 +64,7 @@
  * the service function ID of the adapter in this case.
  *
  * Note:
- * 1) Interrupt driven receive queues are currently unimplemented.
- * 2) Devices created after an instance of rte_event_eth_rx_adapter_create
+ * 1) Devices created after an instance of rte_event_eth_rx_adapter_create
  *  should be added to a new instance of the rx adapter.
  */
 
@@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {
 	 * block cycles can be used to compute the percentage of
 	 * cycles the service is blocked by the event device.
 	 */
+	uint64_t rx_intr_packets;
+	/**< Received packet count for interrupt mode Rx queues */
 };
 
 /**
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 8fe037f..62886c4 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -2,6 +2,8 @@
  * Copyright(c) 2017 Intel Corporation.
  * All rights reserved.
  */
+#include <unistd.h>
+#include <sys/epoll.h>
 #include <rte_cycles.h>
 #include <rte_common.h>
 #include <rte_dev.h>
@@ -11,6 +13,7 @@
 #include <rte_malloc.h>
 #include <rte_service_component.h>
 #include <rte_thash.h>
+#include <rte_interrupts.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -24,6 +27,22 @@
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
 
 #define RSS_KEY_SIZE	40
+/* value written to intr thread pipe to signal thread exit */
+#define ETH_BRIDGE_INTR_THREAD_EXIT	1
+/* Sentinel value to detect uninitialized file handle */
+#define INIT_FD		-1
+
+/*
+ * Used to store port and queue ID of interrupting Rx queue
+ */
+union queue_data {
+	RTE_STD_C11
+	void *ptr;
+	struct {
+		uint16_t port;
+		uint16_t queue;
+	};
+};
 
 /*
  * There is an instance of this struct per polled Rx queue added to the
@@ -75,6 +94,32 @@ struct rte_event_eth_rx_adapter {
 	uint16_t enq_block_count;
 	/* Block start ts */
 	uint64_t rx_enq_block_start_ts;
+	/* epoll fd used to wait for Rx interrupts */
+	int epd;
+	/* Num of interrupt driven Rx queues */
+	uint32_t num_rx_intr;
+	/* Used to send <dev id, queue id> of interrupting Rx queues from
+	 * the interrupt thread to the Rx thread
+	 */
+	struct rte_ring *intr_ring;
+	/* Rx Queue data (dev id, queue id) for the last non-empty
+	 * queue polled
+	 */
+	union queue_data qd;
+	/* queue_data is valid */
+	int qd_valid;
+	/* Interrupt ring lock, synchronizes Rx thread
+	 * and interrupt thread
+	 */
+	rte_spinlock_t intr_ring_lock;
+	/* event array passed to rte_epoll_wait */
+	struct rte_epoll_event *epoll_events;
+	/* Count of interrupt vectors in use */
+	uint32_t num_intr_vec;
+	/* Thread blocked on Rx interrupts */
+	pthread_t rx_intr_thread;
+	/* Stop thread flag */
+	uint8_t stop_thread;
 	/* Configuration callback for rte_service configuration */
 	rte_event_eth_rx_adapter_conf_cb conf_cb;
 	/* Configuration callback argument */
@@ -93,6 +138,8 @@ struct rte_event_eth_rx_adapter {
 	uint32_t service_id;
 	/* Adapter started flag */
 	uint8_t rxa_started;
+	/* Adapter ID */
+	uint8_t id;
 } __rte_cache_aligned;
 
 /* Per eth device */
@@ -111,19 +158,40 @@ struct eth_device_info {
 	uint8_t dev_rx_started;
 	/* Number of queues added for this device */
 	uint16_t nb_dev_queues;
-	/* If nb_rx_poll > 0, the start callback will
+	/* Number of poll based queues
+	 * If nb_rx_poll > 0, the start callback will
 	 * be invoked if not already invoked
 	 */
 	uint16_t nb_rx_poll;
+	/* Number of interrupt based queues
+	 * If nb_rx_intr > 0, the start callback will
+	 * be invoked if not already invoked.
+	 */
+	uint16_t nb_rx_intr;
+	/* Number of queues that use the shared interrupt */
+	uint16_t nb_shared_intr;
 	/* sum(wrr(q)) for all queues within the device
 	 * useful when deleting all device queues
 	 */
 	uint32_t wrr_len;
+	/* Intr based queue index to start polling from, this is used
+	 * if the number of shared interrupts is non-zero
+	 */
+	uint16_t next_q_idx;
+	/* Intr based queue indices */
+	uint16_t *intr_queue;
+	/* device generates a per Rx queue interrupt
+	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
+	 */
+	int multi_intr_cap;
+	/* shared interrupt enabled */
+	int shared_intr_enabled;
 };
 
 /* Per Rx queue */
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
+	int intr_enabled;
 	uint16_t wt;		/* Polling weight */
 	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
 	uint8_t sched_type;	/* Sched type for events */
@@ -150,7 +218,7 @@ struct eth_rx_queue_info {
 static inline int
 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
 {
-	return rx_adapter->num_rx_polled;
+	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
 }
 
 /* Greatest common divisor */
@@ -195,6 +263,28 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static inline int
+rxa_shared_intr(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	int multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
+	return !multi_intr_cap ||
+		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
+}
+
+static inline int
+rxa_intr_queue(struct eth_device_info *dev_info,
+	int rx_queue_id)
+{
+	struct eth_rx_queue_info *queue_info;
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	return dev_info->rx_queue &&
+		!dev_info->internal_event_port &&
+		queue_info->queue_enabled && queue_info->wt == 0;
+}
+
+static inline int
 rxa_polled_queue(struct eth_device_info *dev_info,
 	int rx_queue_id)
 {
@@ -206,6 +296,95 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		queue_info->queue_enabled && queue_info->wt != 0;
 }
 
+/* Calculate change in number of vectors after Rx queue ID is added/deleted */
+static int
+rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
+{
+	uint16_t i;
+	int n, s;
+	uint16_t nbq;
+
+	nbq = dev_info->dev->data->nb_rx_queues;
+	n = 0; /* non shared count */
+	s = 0; /* shared count */
+
+	if (rx_queue_id == -1) {
+		for (i = 0; i < nbq; i++) {
+			if (!rxa_shared_intr(dev_info, i))
+				n += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+			else
+				s += add ? !rxa_intr_queue(dev_info, i) :
+					rxa_intr_queue(dev_info, i);
+		}
+
+		if (s > 0) {
+			if ((add && dev_info->nb_shared_intr == 0) ||
+				(!add && dev_info->nb_shared_intr))
+				n += 1;
+		}
+	} else {
+		if (!rxa_shared_intr(dev_info, rx_queue_id))
+			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
+				rxa_intr_queue(dev_info, rx_queue_id);
+		else
+			n = add ? !dev_info->nb_shared_intr :
+				dev_info->nb_shared_intr == 1;
+	}
+
+	return add ? n : -n;
+}
+
+/* Calculate nb_rx_intr after deleting interrupt mode rx queues
+ */
+static void
+rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_intr)
+{
+	uint32_t intr_diff;
+
+	if (rx_queue_id == -1)
+		intr_diff = dev_info->nb_rx_intr;
+	else
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
+
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
+}
+
+/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
+ * interrupt queues could currently be poll mode Rx queues
+ */
+static void
+rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			int rx_queue_id,
+			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
+			uint32_t *nb_wrr)
+{
+	uint32_t intr_diff;
+	uint32_t poll_diff;
+	uint32_t wrr_len_diff;
+
+	if (rx_queue_id == -1) {
+		intr_diff = dev_info->dev->data->nb_rx_queues -
+						dev_info->nb_rx_intr;
+		poll_diff = dev_info->nb_rx_poll;
+		wrr_len_diff = dev_info->wrr_len;
+	} else {
+		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
+		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+					0;
+	}
+
+	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
+	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
 /* Calculate size of the eth_rx_poll and wrr_sched arrays
  * after deleting poll mode rx queues
  */
@@ -240,17 +419,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			int rx_queue_id,
 			uint16_t wt,
 			uint32_t *nb_rx_poll,
+			uint32_t *nb_rx_intr,
 			uint32_t *nb_wrr)
 {
+	uint32_t intr_diff;
 	uint32_t poll_diff;
 	uint32_t wrr_len_diff;
 
 	if (rx_queue_id == -1) {
+		intr_diff = dev_info->nb_rx_intr;
 		poll_diff = dev_info->dev->data->nb_rx_queues -
 						dev_info->nb_rx_poll;
 		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
 				- dev_info->wrr_len;
 	} else {
+		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
 		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
 		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
 				wt - dev_info->rx_queue[rx_queue_id].wt :
@@ -258,6 +441,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
 	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
 }
 
@@ -268,10 +452,15 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		int rx_queue_id,
 		uint16_t wt,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
-	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
-				wt, nb_rx_poll, nb_wrr);
+	if (wt != 0)
+		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
+	else
+		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
+					nb_rx_poll, nb_rx_intr, nb_wrr);
 }
 
 /* Calculate nb_rx_* after deleting rx_queue_id */
@@ -280,10 +469,13 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		struct eth_device_info *dev_info,
 		int rx_queue_id,
 		uint32_t *nb_rx_poll,
+		uint32_t *nb_rx_intr,
 		uint32_t *nb_wrr)
 {
 	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
 				nb_wrr);
+	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
+				nb_rx_intr);
 }
 
 /*
@@ -622,7 +814,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t port_id,
 	uint16_t queue_id,
 	uint32_t rx_count,
-	uint32_t max_rx)
+	uint32_t max_rx,
+	int *rxq_empty)
 {
 	struct rte_mbuf *mbufs[BATCH_SIZE];
 	struct rte_eth_event_enqueue_buffer *buf =
@@ -632,6 +825,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	uint16_t n;
 	uint32_t nb_rx = 0;
 
+	if (rxq_empty)
+		*rxq_empty = 0;
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
@@ -641,8 +836,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 
 		stats->rx_poll_count++;
 		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
-		if (unlikely(!n))
+		if (unlikely(!n)) {
+			if (rxq_empty)
+				*rxq_empty = 1;
 			break;
+		}
 		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
 		nb_rx += n;
 		if (rx_count + nb_rx > max_rx)
@@ -655,6 +853,228 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	return nb_rx;
 }
 
+static inline void
+rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
+		void *data)
+{
+	uint16_t port_id;
+	uint16_t queue;
+	int err;
+	union queue_data qd;
+	struct eth_device_info *dev_info;
+	struct eth_rx_queue_info *queue_info;
+	int *intr_enabled;
+
+	qd.ptr = data;
+	port_id = qd.port;
+	queue = qd.queue;
+
+	dev_info = &rx_adapter->eth_devices[port_id];
+	queue_info = &dev_info->rx_queue[queue];
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+	if (rxa_shared_intr(dev_info, queue))
+		intr_enabled = &dev_info->shared_intr_enabled;
+	else
+		intr_enabled = &queue_info->intr_enabled;
+
+	if (*intr_enabled) {
+		*intr_enabled = 0;
+		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
+		/* Entry should always be available.
+		 * The ring size equals the maximum number of interrupt
+		 * vectors supported (an interrupt vector is shared in
+		 * case of shared interrupts)
+		 */
+		if (err)
+			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
+				" to ring: %s", strerror(err));
+		else
+			rte_eth_dev_rx_intr_disable(port_id, queue);
+	}
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+static int
+rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
+			uint32_t num_intr_vec)
+{
+	if (rx_adapter->num_intr_vec + num_intr_vec >
+				RTE_EVENT_ETH_INTR_RING_SIZE) {
+		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
+		" %d needed %d limit %d", rx_adapter->num_intr_vec,
+		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
+		return -ENOSPC;
+	}
+
+	return 0;
+}
+
+/* Delete entries for (dev, queue) from the interrupt ring */
+static void
+rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_device_info *dev_info,
+			uint16_t rx_queue_id)
+{
+	int i, n;
+	union queue_data qd;
+
+	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
+
+	n = rte_ring_count(rx_adapter->intr_ring);
+	for (i = 0; i < n; i++) {
+		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
+			if (qd.port == dev_info->dev->data->port_id &&
+				qd.queue == rx_queue_id)
+				continue;
+		} else {
+			if (qd.port == dev_info->dev->data->port_id)
+				continue;
+		}
+		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
+	}
+
+	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
+}
+
+/* pthread callback handling interrupt mode receive queues
+ * After receiving an Rx interrupt, it enqueues the port id and queue id of the
+ * interrupting queue to the adapter's ring buffer for interrupt events.
+ * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
+ * the adapter service function.
+ */
+static void *
+rxa_intr_thread(void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
+	int n, i;
+
+	while (1) {
+		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
+				RTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);
+		if (unlikely(n < 0))
+			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
+					n);
+		for (i = 0; i < n; i++) {
+			rxa_intr_ring_enqueue(rx_adapter,
+					epoll_events[i].epdata.data);
+		}
+	}
+
+	return NULL;
+}
+
+/* Dequeue <port, q> from interrupt ring and enqueue received
+ * mbufs to eventdev
+ */
+static inline uint32_t
+rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	uint32_t n;
+	uint32_t nb_rx = 0;
+	int rxq_empty;
+	struct rte_eth_event_enqueue_buffer *buf;
+	rte_spinlock_t *ring_lock;
+	uint8_t max_done = 0;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	if (rte_ring_count(rx_adapter->intr_ring) == 0
+		&& !rx_adapter->qd_valid)
+		return 0;
+
+	buf = &rx_adapter->event_enqueue_buffer;
+	ring_lock = &rx_adapter->intr_ring_lock;
+
+	if (buf->count >= BATCH_SIZE)
+		rxa_flush_event_buffer(rx_adapter);
+
+	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+		struct eth_device_info *dev_info;
+		uint16_t port;
+		uint16_t queue;
+		union queue_data qd  = rx_adapter->qd;
+		int err;
+
+		if (!rx_adapter->qd_valid) {
+			struct eth_rx_queue_info *queue_info;
+
+			rte_spinlock_lock(ring_lock);
+			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
+			if (err) {
+				rte_spinlock_unlock(ring_lock);
+				break;
+			}
+
+			port = qd.port;
+			queue = qd.queue;
+			rx_adapter->qd = qd;
+			rx_adapter->qd_valid = 1;
+			dev_info = &rx_adapter->eth_devices[port];
+			if (rxa_shared_intr(dev_info, queue))
+				dev_info->shared_intr_enabled = 1;
+			else {
+				queue_info = &dev_info->rx_queue[queue];
+				queue_info->intr_enabled = 1;
+			}
+			rte_eth_dev_rx_intr_enable(port, queue);
+			rte_spinlock_unlock(ring_lock);
+		} else {
+			port = qd.port;
+			queue = qd.queue;
+
+			dev_info = &rx_adapter->eth_devices[port];
+		}
+
+		if (rxa_shared_intr(dev_info, queue)) {
+			uint16_t i;
+			uint16_t nb_queues;
+
+			nb_queues = dev_info->dev->data->nb_rx_queues;
+			n = 0;
+			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
+				uint8_t enq_buffer_full;
+
+				if (!rxa_intr_queue(dev_info, i))
+					continue;
+				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
+					rx_adapter->max_nb_rx,
+					&rxq_empty);
+				nb_rx += n;
+
+				enq_buffer_full = !rxq_empty && n == 0;
+				max_done = nb_rx > rx_adapter->max_nb_rx;
+
+				if (enq_buffer_full || max_done) {
+					dev_info->next_q_idx = i;
+					goto done;
+				}
+			}
+
+			rx_adapter->qd_valid = 0;
+
+			/* Reinitialize for next interrupt */
+			dev_info->next_q_idx = dev_info->multi_intr_cap ?
+						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
+						0;
+		} else {
+			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
+				rx_adapter->max_nb_rx,
+				&rxq_empty);
+			rx_adapter->qd_valid = !rxq_empty;
+			nb_rx += n;
+			if (nb_rx > rx_adapter->max_nb_rx)
+				break;
+		}
+	}
+
+done:
+	rx_adapter->stats.rx_intr_packets += nb_rx;
+	return nb_rx;
+}
+
 /*
  * Polls receive queues added to the event adapter and enqueues received
  * packets to the event device.
@@ -668,7 +1088,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
  * the hypervisor's switching layer where adjustments can be made to deal with
  * it.
  */
-static inline void
+static inline uint32_t
 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 {
 	uint32_t num_queue;
@@ -676,7 +1096,6 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct rte_eth_event_enqueue_buffer *buf;
 	uint32_t wrr_pos;
 	uint32_t max_nb_rx;
-	struct rte_event_eth_rx_adapter_stats *stats;
 
 	wrr_pos = rx_adapter->wrr_pos;
 	max_nb_rx = rx_adapter->max_nb_rx;
@@ -696,10 +1115,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 			rxa_flush_event_buffer(rx_adapter);
 		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
 			rx_adapter->wrr_pos = wrr_pos;
-			break;
+			return nb_rx;
 		}
 
-		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
+		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
+				NULL);
 		if (nb_rx > max_nb_rx) {
 			rx_adapter->wrr_pos =
 				    (wrr_pos + 1) % rx_adapter->wrr_len;
@@ -709,14 +1129,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		if (++wrr_pos == rx_adapter->wrr_len)
 			wrr_pos = 0;
 	}
-
-	stats->rx_packets += nb_rx;
+	return nb_rx;
 }
 
 static int
 rxa_service_func(void *args)
 {
 	struct rte_event_eth_rx_adapter *rx_adapter = args;
+	struct rte_event_eth_rx_adapter_stats *stats;
 
 	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
 		return 0;
@@ -724,7 +1144,10 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 		return 0;
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
 	}
-	rxa_poll(rx_adapter);
+
+	stats = &rx_adapter->stats;
+	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
+	stats->rx_packets += rxa_poll(rx_adapter);
 	rte_spinlock_unlock(&rx_adapter->rx_lock);
 	return 0;
 }
@@ -809,6 +1232,339 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 }
 
 static int
+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	if (rx_adapter->epd != INIT_FD)
+		return 0;
+
+	rx_adapter->epd = epoll_create1(EPOLL_CLOEXEC);
+	if (rx_adapter->epd < 0) {
+		rx_adapter->epd = INIT_FD;
+		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", errno);
+		return -errno;
+	}
+
+	return 0;
+}
+
+static int
+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	if (rx_adapter->intr_ring)
+		return 0;
+
+	rx_adapter->intr_ring = rte_ring_create("intr_ring",
+					RTE_EVENT_ETH_INTR_RING_SIZE,
+					rte_socket_id(), 0);
+	if (!rx_adapter->intr_ring)
+		return -ENOMEM;
+
+	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
+					(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *
+					sizeof(struct rte_epoll_event),
+					RTE_CACHE_LINE_SIZE,
+					rx_adapter->socket_id);
+	if (!rx_adapter->epoll_events) {
+		err = -ENOMEM;
+		goto error;
+	}
+
+	rte_spinlock_init(&rx_adapter->intr_ring_lock);
+
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+			"rx-intr-thread-%d", rx_adapter->id);
+
+	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
+				NULL, rxa_intr_thread, rx_adapter);
+	if (!err) {
+		rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
+		return 0;
+	}
+
+	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
+error:
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return err;
+}
+
+static int
+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int err;
+
+	rx_adapter->stop_thread = 1;
+	err = pthread_cancel(rx_adapter->rx_intr_thread);
+	if (err)
+		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
+				err);
+
+	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
+	if (err)
+		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
+
+	rx_adapter->stop_thread = 0;
+	rte_free(rx_adapter->epoll_events);
+	rte_ring_free(rx_adapter->intr_ring);
+	rx_adapter->intr_ring = NULL;
+	rx_adapter->epoll_events = NULL;
+	return 0;
+}
+
+static int
+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+	int ret;
+
+	if (rx_adapter->num_rx_intr == 0)
+		return 0;
+
+	ret = rxa_destroy_intr_thread(rx_adapter);
+	if (ret)
+		return ret;
+
+	close(rx_adapter->epd);
+	rx_adapter->epd = INIT_FD;
+
+	return ret;
+}
+
+static int
+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
+{
+	int err;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
+			rx_queue_id);
+		return err;
+	}
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_DEL,
+					0);
+	if (err)
+		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
+
+	if (sintr)
+		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
+	else
+		dev_info->shared_intr_enabled = 0;
+	return err;
+}
+
+static int
+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+		struct eth_device_info *dev_info,
+		int rx_queue_id)
+{
+	int err;
+	int i;
+	int s;
+
+	if (dev_info->nb_rx_intr == 0)
+		return 0;
+
+	err = 0;
+	if (rx_queue_id == -1) {
+		s = dev_info->nb_shared_intr;
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			int sintr;
+			uint16_t q;
+
+			q = dev_info->intr_queue[i];
+			sintr = rxa_shared_intr(dev_info, q);
+			s -= sintr;
+
+			if (!sintr || s == 0) {
+
+				err = rxa_disable_intr(rx_adapter, dev_info,
+						q);
+				if (err)
+					return err;
+				rxa_intr_ring_del_entries(rx_adapter, dev_info,
+							q);
+			}
+		}
+	} else {
+		if (!rxa_intr_queue(dev_info, rx_queue_id))
+			return 0;
+		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
+				dev_info->nb_shared_intr == 1) {
+			err = rxa_disable_intr(rx_adapter, dev_info,
+					rx_queue_id);
+			if (err)
+				return err;
+			rxa_intr_ring_del_entries(rx_adapter, dev_info,
+						rx_queue_id);
+		}
+
+		for (i = 0; i < dev_info->nb_rx_intr; i++) {
+			if (dev_info->intr_queue[i] == rx_queue_id) {
+				for (; i < dev_info->nb_rx_intr - 1; i++)
+					dev_info->intr_queue[i] =
+						dev_info->intr_queue[i + 1];
+				break;
+			}
+		}
+	}
+
+	return err;
+}
+
+static int
+rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	uint16_t rx_queue_id)
+{
+	int err, err1;
+	uint16_t eth_dev_id = dev_info->dev->data->port_id;
+	union queue_data qd;
+	int init_fd;
+	uint16_t *intr_queue;
+	int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+	if (rxa_intr_queue(dev_info, rx_queue_id))
+		return 0;
+
+	intr_queue = dev_info->intr_queue;
+	if (dev_info->intr_queue == NULL) {
+		size_t len =
+			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
+		dev_info->intr_queue =
+			rte_zmalloc_socket(
+				rx_adapter->mem_name,
+				len,
+				0,
+				rx_adapter->socket_id);
+		if (dev_info->intr_queue == NULL)
+			return -ENOMEM;
+	}
+
+	init_fd = rx_adapter->epd;
+	err = rxa_init_epd(rx_adapter);
+	if (err)
+		goto err_free_queue;
+
+	qd.port = eth_dev_id;
+	qd.queue = rx_queue_id;
+
+	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_ADD,
+					qd.ptr);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
+			" Rx Queue %u err %d", rx_queue_id, err);
+		goto err_del_fd;
+	}
+
+	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
+	if (err) {
+		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err);
+
+		goto err_del_event;
+	}
+
+	err = rxa_create_intr_thread(rx_adapter);
+	if (!err)  {
+		if (sintr)
+			dev_info->shared_intr_enabled = 1;
+		else
+			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
+		return 0;
+	}
+
+
+	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+	if (err)
+		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
+				" Rx Queue %u err %d", rx_queue_id, err);
+err_del_event:
+	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+					rx_adapter->epd,
+					RTE_INTR_EVENT_DEL,
+					0);
+	if (err1) {
+		RTE_EDEV_LOG_ERR("Could not delete event for"
+				" Rx Queue %u err %d", rx_queue_id, err1);
+	}
+err_del_fd:
+	if (init_fd == INIT_FD) {
+		close(rx_adapter->epd);
+		rx_adapter->epd = -1;
+	}
+err_free_queue:
+	if (intr_queue == NULL)
+		rte_free(dev_info->intr_queue);
+
+	return err;
+}
+
+static int
+rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+	struct eth_device_info *dev_info,
+	int rx_queue_id)
+
+{
+	int i, j, err;
+	int si = -1;
+	int shared_done = (dev_info->nb_shared_intr > 0);
+
+	if (rx_queue_id != -1) {
+		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
+			return 0;
+		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
+	}
+
+	err = 0;
+	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
+
+		if (rxa_shared_intr(dev_info, i) && shared_done)
+			continue;
+
+		err = rxa_config_intr(rx_adapter, dev_info, i);
+
+		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
+		if (shared_done) {
+			si = i;
+			dev_info->shared_intr_enabled = 1;
+		}
+		if (err)
+			break;
+	}
+
+	if (err == 0)
+		return 0;
+
+	shared_done = (dev_info->nb_shared_intr > 0);
+	for (j = 0; j < i; j++) {
+		if (rxa_intr_queue(dev_info, j))
+			continue;
+		if (rxa_shared_intr(dev_info, j) && si != j)
+			continue;
+		err = rxa_disable_intr(rx_adapter, dev_info, j);
+		if (err)
+			break;
+
+	}
+
+	return err;
+}
+
+
+static int
 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
 {
 	int ret;
@@ -843,6 +1599,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
 	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
 	rx_adapter->service_inited = 1;
+	rx_adapter->epd = INIT_FD;
 	return 0;
 
 err_done:
@@ -886,6 +1643,9 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	int32_t rx_queue_id)
 {
 	int pollq;
+	int intrq;
+	int sintrq;
+
 
 	if (rx_adapter->nb_queues == 0)
 		return;
@@ -901,9 +1661,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
 	rx_adapter->num_rx_polled -= pollq;
 	dev_info->nb_rx_poll -= pollq;
+	rx_adapter->num_rx_intr -= intrq;
+	dev_info->nb_rx_intr -= intrq;
+	dev_info->nb_shared_intr -= intrq && sintrq;
 }
 
 static void
@@ -915,6 +1680,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	struct eth_rx_queue_info *queue_info;
 	const struct rte_event *ev = &conf->ev;
 	int pollq;
+	int intrq;
+	int sintrq;
 
 	if (rx_queue_id == -1) {
 		uint16_t nb_rx_queues;
@@ -927,6 +1694,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	}
 
 	pollq = rxa_polled_queue(dev_info, rx_queue_id);
+	intrq = rxa_intr_queue(dev_info, rx_queue_id);
+	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 
 	queue_info = &dev_info->rx_queue[rx_queue_id];
 	queue_info->event_queue_id = ev->queue_id;
@@ -944,6 +1713,24 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
 	if (rxa_polled_queue(dev_info, rx_queue_id)) {
 		rx_adapter->num_rx_polled += !pollq;
 		dev_info->nb_rx_poll += !pollq;
+		rx_adapter->num_rx_intr -= intrq;
+		dev_info->nb_rx_intr -= intrq;
+		dev_info->nb_shared_intr -= intrq && sintrq;
+	}
+
+	if (rxa_intr_queue(dev_info, rx_queue_id)) {
+		rx_adapter->num_rx_polled -= pollq;
+		dev_info->nb_rx_poll -= pollq;
+		rx_adapter->num_rx_intr += !intrq;
+		dev_info->nb_rx_intr += !intrq;
+		dev_info->nb_shared_intr += !intrq && sintrq;
+		if (dev_info->nb_shared_intr == 1) {
+			if (dev_info->multi_intr_cap)
+				dev_info->next_q_idx =
+					RTE_MAX_RXTX_INTR_VEC_ID - 1;
+			else
+				dev_info->next_q_idx = 0;
+		}
 	}
 }
 
@@ -960,24 +1747,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t *rx_wrr;
 	uint16_t nb_rx_queues;
 	uint32_t nb_rx_poll, nb_wrr;
+	uint32_t nb_rx_intr;
+	int num_intr_vec;
+	uint16_t wt;
 
 	if (queue_conf->servicing_weight == 0) {
-
 		struct rte_eth_dev_data *data = dev_info->dev->data;
-		if (data->dev_conf.intr_conf.rxq) {
-			RTE_EDEV_LOG_ERR("Interrupt driven queues"
-					" not supported");
-			return -ENOTSUP;
-		}
-		temp_conf = *queue_conf;
 
-		/* If Rx interrupts are disabled set wt = 1 */
-		temp_conf.servicing_weight = 1;
+		temp_conf = *queue_conf;
+		if (!data->dev_conf.intr_conf.rxq) {
+			/* If Rx interrupts are disabled set wt = 1 */
+			temp_conf.servicing_weight = 1;
+		}
 		queue_conf = &temp_conf;
 	}
 
 	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
 	rx_queue = dev_info->rx_queue;
+	wt = queue_conf->servicing_weight;
 
 	if (dev_info->rx_queue == NULL) {
 		dev_info->rx_queue =
@@ -993,13 +1780,64 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 
 	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
 			queue_conf->servicing_weight,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
+	dev_info->multi_intr_cap =
+			rte_intr_cap_multiple(dev_info->dev->intr_handle);
 
 	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 				&rx_poll, &rx_wrr);
 	if (ret)
 		goto err_free_rxqueue;
 
+	if (wt == 0) {
+		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
+
+		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
+		if (ret)
+			goto err_free_rxqueue;
+
+		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
+		if (ret)
+			goto err_free_rxqueue;
+	} else {
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			/* interrupt based queues are being converted to
+			 * poll mode queues, delete the interrupt configuration
+			 * for those.
+			 */
+			ret = rxa_del_intr_queue(rx_adapter,
+						dev_info, rx_queue_id);
+			if (ret)
+				goto err_free_rxqueue;
+		}
+	}
+
+	if (nb_rx_intr == 0) {
+		ret = rxa_free_intr_resources(rx_adapter);
+		if (ret)
+			goto err_free_rxqueue;
+	}
+
+	if (wt == 0) {
+		uint16_t i;
+
+		if (rx_queue_id  == -1) {
+			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
+				dev_info->intr_queue[i] = i;
+		} else {
+			if (!rxa_intr_queue(dev_info, rx_queue_id))
+				dev_info->intr_queue[nb_rx_intr - 1] =
+					rx_queue_id;
+		}
+	}
+
+
+
 	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
 	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
@@ -1009,6 +1847,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	rx_adapter->eth_rx_poll = rx_poll;
 	rx_adapter->wrr_sched = rx_wrr;
 	rx_adapter->wrr_len = nb_wrr;
+	rx_adapter->num_intr_vec += num_intr_vec;
 	return 0;
 
 err_free_rxqueue:
@@ -1119,6 +1958,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	rx_adapter->socket_id = socket_id;
 	rx_adapter->conf_cb = conf_cb;
 	rx_adapter->conf_arg = conf_arg;
+	rx_adapter->id = id;
 	strcpy(rx_adapter->mem_name, mem_name);
 	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
 					/* FIXME: incompatible with hotplug */
@@ -1302,8 +2142,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	uint32_t cap;
 	uint32_t nb_rx_poll = 0;
 	uint32_t nb_wrr = 0;
+	uint32_t nb_rx_intr;
 	struct eth_rx_poll_entry *rx_poll = NULL;
 	uint32_t *rx_wrr = NULL;
+	int num_intr_vec;
 
 	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1346,29 +2188,59 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		}
 	} else {
 		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
-			&nb_rx_poll, &nb_wrr);
+			&nb_rx_poll, &nb_rx_intr, &nb_wrr);
+
 		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
 			&rx_poll, &rx_wrr);
 		if (ret)
 			return ret;
 
 		rte_spinlock_lock(&rx_adapter->rx_lock);
+
+		num_intr_vec = 0;
+		if (rx_adapter->num_rx_intr > nb_rx_intr) {
+
+			num_intr_vec = rxa_nb_intr_vect(dev_info,
+						rx_queue_id, 0);
+			ret = rxa_del_intr_queue(rx_adapter, dev_info,
+					rx_queue_id);
+			if (ret)
+				goto unlock_ret;
+		}
+
+		if (nb_rx_intr == 0) {
+			ret = rxa_free_intr_resources(rx_adapter);
+			if (ret)
+				goto unlock_ret;
+		}
+
 		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
 		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
 
 		rte_free(rx_adapter->eth_rx_poll);
 		rte_free(rx_adapter->wrr_sched);
 
+		if (nb_rx_intr == 0) {
+			rte_free(dev_info->intr_queue);
+			dev_info->intr_queue = NULL;
+		}
+
 		rx_adapter->eth_rx_poll = rx_poll;
-		rx_adapter->num_rx_polled = nb_rx_poll;
 		rx_adapter->wrr_sched = rx_wrr;
 		rx_adapter->wrr_len = nb_wrr;
+		rx_adapter->num_intr_vec += num_intr_vec;
 
 		if (dev_info->nb_dev_queues == 0) {
 			rte_free(dev_info->rx_queue);
 			dev_info->rx_queue = NULL;
 		}
+unlock_ret:
 		rte_spinlock_unlock(&rx_adapter->rx_lock);
+		if (ret) {
+			rte_free(rx_poll);
+			rte_free(rx_wrr);
+			return ret;
+		}
 
 		rte_service_component_runstate_set(rx_adapter->service_id,
 				rxa_sw_adapter_queue_count(rx_adapter));
@@ -1377,7 +2249,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 	return ret;
 }
 
-
 int
 rte_event_eth_rx_adapter_start(uint8_t id)
 {
diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
index 319e4f0..2f055ec 100644
--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
@@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks
 if the callback is supported, and the counts maintained by the service function,
 if one exists. The service function also maintains a count of cycles for which
 it was not able to enqueue to the event device.
+
+Interrupt Based Rx Queues
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The service core function is typically set up to poll ethernet Rx queues for
+packets. Certain queues may have low packet rates and it would be more
+efficient to enable the Rx queue interrupt and read packets after receiving
+the interrupt.
+
+The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf
+is applicable when the adapter uses a service core function. The application
+has to enable Rx queue interrupts when configuring the ethernet device
+using the ``rte_eth_dev_configure()`` function and then use a servicing_weight
+of zero when adding the Rx queue to the adapter.
+
+The adapter creates a thread blocked on the interrupt. On an interrupt, this
+thread enqueues the port id and the queue id to a ring buffer. The adapter
+service function dequeues the port id and queue id from the ring buffer,
+invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and
+converts the received packets to events in the same manner as packets
+received on a polled Rx queue. The interrupt thread is affinitized to the same
+CPUs as the lcores of the Rx adapter service function. If the Rx adapter
+service function has not been mapped to any lcores, the interrupt thread
+is mapped to the master lcore.
diff --git a/config/common_base b/config/common_base
index fcf3a1f..3cb5edd 100644
--- a/config/common_base
+++ b/config/common_base
@@ -597,6 +597,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n
 CONFIG_RTE_EVENT_MAX_DEVS=16
 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64
 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32
+CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024
 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32
 
 #
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index b3e2546..24af956 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -8,7 +8,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
 LIB = librte_eventdev.a
 
 # library version
-LIBABIVER := 4
+LIBABIVER := 5
 
 # build flags
 CFLAGS += -DALLOW_EXPERIMENTAL_API
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 5/5] eventdev: add Rx adapter tests for interrupt driven queues
  2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
                     ` (3 preceding siblings ...)
  2018-06-27 10:55   ` [PATCH v2 4/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
@ 2018-06-27 10:55   ` Nikhil Rao
  4 siblings, 0 replies; 19+ messages in thread
From: Nikhil Rao @ 2018-06-27 10:55 UTC (permalink / raw)
  To: jerin.jacob; +Cc: nikhil.rao, dev

Add tests for queue add and delete; the add/delete calls
also switch queues between poll and interrupt mode.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 test/test/test_event_eth_rx_adapter.c | 261 +++++++++++++++++++++++++++++++---
 1 file changed, 242 insertions(+), 19 deletions(-)

diff --git a/test/test/test_event_eth_rx_adapter.c b/test/test/test_event_eth_rx_adapter.c
index d432731..2337e54 100644
--- a/test/test/test_event_eth_rx_adapter.c
+++ b/test/test/test_event_eth_rx_adapter.c
@@ -25,28 +25,17 @@ struct event_eth_rx_adapter_test_params {
 	struct rte_mempool *mp;
 	uint16_t rx_rings, tx_rings;
 	uint32_t caps;
+	int rx_intr_port_inited;
+	uint16_t rx_intr_port;
 };
 
 static struct event_eth_rx_adapter_test_params default_params;
 
 static inline int
-port_init(uint8_t port, struct rte_mempool *mp)
+port_init_common(uint8_t port, const struct rte_eth_conf *port_conf,
+		struct rte_mempool *mp)
 {
-	static const struct rte_eth_conf port_conf_default = {
-		.rxmode = {
-			.mq_mode = ETH_MQ_RX_RSS,
-			.max_rx_pkt_len = ETHER_MAX_LEN
-		},
-		.rx_adv_conf = {
-			.rss_conf = {
-				.rss_hf = ETH_RSS_IP |
-					  ETH_RSS_TCP |
-					  ETH_RSS_UDP,
-			}
-		}
-	};
 	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
-	struct rte_eth_conf port_conf = port_conf_default;
 	int retval;
 	uint16_t q;
 	struct rte_eth_dev_info dev_info;
@@ -54,7 +43,7 @@ struct event_eth_rx_adapter_test_params {
 	if (!rte_eth_dev_is_valid_port(port))
 		return -1;
 
-	retval = rte_eth_dev_configure(port, 0, 0, &port_conf);
+	retval = rte_eth_dev_configure(port, 0, 0, port_conf);
 
 	rte_eth_dev_info_get(port, &dev_info);
 
@@ -64,7 +53,7 @@ struct event_eth_rx_adapter_test_params {
 
 	/* Configure the Ethernet device. */
 	retval = rte_eth_dev_configure(port, default_params.rx_rings,
-				default_params.tx_rings, &port_conf);
+				default_params.tx_rings, port_conf);
 	if (retval != 0)
 		return retval;
 
@@ -104,6 +93,77 @@ struct event_eth_rx_adapter_test_params {
 	return 0;
 }
 
+static inline int
+port_init_rx_intr(uint8_t port, struct rte_mempool *mp)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.intr_conf = {
+			.rxq = 1,
+		},
+	};
+
+	return port_init_common(port, &port_conf_default, mp);
+}
+
+static inline int
+port_init(uint8_t port, struct rte_mempool *mp)
+{
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = ETH_MQ_RX_RSS,
+			.max_rx_pkt_len = ETHER_MAX_LEN
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = ETH_RSS_IP |
+					ETH_RSS_TCP |
+					ETH_RSS_UDP,
+			}
+		}
+	};
+
+	return port_init_common(port, &port_conf_default, mp);
+}
+
+static int
+init_port_rx_intr(int num_ports)
+{
+	int retval;
+	uint16_t portid;
+	int err;
+
+	default_params.mp = rte_pktmbuf_pool_create("packet_pool",
+						   NB_MBUFS,
+						   MBUF_CACHE_SIZE,
+						   MBUF_PRIV_SIZE,
+						   RTE_MBUF_DEFAULT_BUF_SIZE,
+						   rte_socket_id());
+	if (!default_params.mp)
+		return -ENOMEM;
+
+	RTE_ETH_FOREACH_DEV(portid) {
+		retval = port_init_rx_intr(portid, default_params.mp);
+		if (retval)
+			continue;
+		err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, portid,
+							&default_params.caps);
+		if (err)
+			continue;
+		if (!(default_params.caps &
+			RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
+			default_params.rx_intr_port_inited = 1;
+			default_params.rx_intr_port = portid;
+			return 0;
+		}
+		rte_eth_dev_stop(portid);
+	}
+	return 0;
+}
+
 static int
 init_ports(int num_ports)
 {
@@ -181,6 +241,57 @@ struct event_eth_rx_adapter_test_params {
 	return err;
 }
 
+static int
+testsuite_setup_rx_intr(void)
+{
+	int err;
+	uint8_t count;
+	struct rte_event_dev_info dev_info;
+
+	count = rte_event_dev_count();
+	if (!count) {
+		printf("Failed to find a valid event device,"
+			" testing with event_skeleton device\n");
+		rte_vdev_init("event_skeleton", NULL);
+	}
+
+	struct rte_event_dev_config config = {
+		.nb_event_queues = 1,
+		.nb_event_ports = 1,
+	};
+
+	err = rte_event_dev_info_get(TEST_DEV_ID, &dev_info);
+	config.nb_event_queue_flows = dev_info.max_event_queue_flows;
+	config.nb_event_port_dequeue_depth =
+			dev_info.max_event_port_dequeue_depth;
+	config.nb_event_port_enqueue_depth =
+			dev_info.max_event_port_enqueue_depth;
+	config.nb_events_limit =
+			dev_info.max_num_events;
+
+	err = rte_event_dev_configure(TEST_DEV_ID, &config);
+	TEST_ASSERT(err == 0, "Event device initialization failed err %d\n",
+			err);
+
+	/*
+	 * eth devices like octeontx use event device to receive packets
+	 * so rte_eth_dev_start invokes rte_event_dev_start internally, so
+	 * call init_ports after rte_event_dev_configure
+	 */
+	err = init_port_rx_intr(rte_eth_dev_count_total());
+	TEST_ASSERT(err == 0, "Port initialization failed err %d\n", err);
+
+	if (!default_params.rx_intr_port_inited)
+		return 0;
+
+	err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID,
+						default_params.rx_intr_port,
+						&default_params.caps);
+	TEST_ASSERT(err == 0, "Failed to get adapter cap err %d\n", err);
+
+	return err;
+}
+
 static void
 testsuite_teardown(void)
 {
@@ -191,6 +302,16 @@ struct event_eth_rx_adapter_test_params {
 	rte_mempool_free(default_params.mp);
 }
 
+static void
+testsuite_teardown_rx_intr(void)
+{
+	if (!default_params.rx_intr_port_inited)
+		return;
+
+	rte_eth_dev_stop(default_params.rx_intr_port);
+	rte_mempool_free(default_params.mp);
+}
+
 static int
 adapter_create(void)
 {
@@ -401,6 +522,89 @@ struct event_eth_rx_adapter_test_params {
 }
 
 static int
+adapter_intr_queue_add_del(void)
+{
+	int err;
+	struct rte_event ev;
+	uint32_t cap;
+	uint16_t eth_port;
+	struct rte_event_eth_rx_adapter_queue_conf queue_config;
+
+	if (!default_params.rx_intr_port_inited)
+		return 0;
+
+	eth_port = default_params.rx_intr_port;
+	err = rte_event_eth_rx_adapter_caps_get(TEST_DEV_ID, eth_port, &cap);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	ev.queue_id = 0;
+	ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
+	ev.priority = 0;
+
+	queue_config.rx_queue_flags = 0;
+	queue_config.ev = ev;
+
+	/* weight = 0 => interrupt mode */
+	queue_config.servicing_weight = 0;
+
+	/* add queue 0 */
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID, 0,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* add all queues */
+	queue_config.servicing_weight = 0;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del queue 0 */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						0);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del remaining queues */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* add all queues */
+	queue_config.servicing_weight = 0;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* intr -> poll mode queue */
+	queue_config.servicing_weight = 1;
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						0,
+						&queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	err = rte_event_eth_rx_adapter_queue_add(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1,
+						 &queue_config);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	/* del queues */
+	err = rte_event_eth_rx_adapter_queue_del(TEST_INST_ID,
+						TEST_ETHDEV_ID,
+						-1);
+	TEST_ASSERT(err == 0, "Expected 0 got %d", err);
+
+	return TEST_SUCCESS;
+}
+
+static int
 adapter_start_stop(void)
 {
 	int err;
@@ -470,7 +674,7 @@ struct event_eth_rx_adapter_test_params {
 	return TEST_SUCCESS;
 }
 
-static struct unit_test_suite service_tests  = {
+static struct unit_test_suite event_eth_rx_tests = {
 	.suite_name = "rx event eth adapter test suite",
 	.setup = testsuite_setup,
 	.teardown = testsuite_teardown,
@@ -485,11 +689,30 @@ struct event_eth_rx_adapter_test_params {
 	}
 };
 
+static struct unit_test_suite event_eth_rx_intr_tests = {
+	.suite_name = "rx event eth adapter test suite",
+	.setup = testsuite_setup_rx_intr,
+	.teardown = testsuite_teardown_rx_intr,
+	.unit_test_cases = {
+		TEST_CASE_ST(adapter_create, adapter_free,
+			adapter_intr_queue_add_del),
+		TEST_CASES_END() /**< NULL terminate unit test array */
+	}
+};
+
 static int
 test_event_eth_rx_adapter_common(void)
 {
-	return unit_test_suite_runner(&service_tests);
+	return unit_test_suite_runner(&event_eth_rx_tests);
+}
+
+static int
+test_event_eth_rx_intr_adapter_common(void)
+{
+	return unit_test_suite_runner(&event_eth_rx_intr_tests);
 }
 
 REGISTER_TEST_COMMAND(event_eth_rx_adapter_autotest,
 		test_event_eth_rx_adapter_common);
+REGISTER_TEST_COMMAND(event_eth_rx_intr_adapter_autotest,
+		test_event_eth_rx_intr_adapter_common);
-- 
1.8.3.1

^ permalink raw reply related	[flat|nested] 19+ messages in thread

end of thread, other threads:[~2018-06-27 10:55 UTC | newest]

Thread overview: 19+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-06-08 18:15 [PATCH v1 0/4] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
2018-06-08 18:15 ` [PATCH v1 1/4] eventdev: standardize Rx adapter internal function names Nikhil Rao
2018-06-17 13:24   ` Jerin Jacob
2018-06-08 18:15 ` [PATCH v1 2/4] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
2018-06-17 13:31   ` Jerin Jacob
2018-06-18 12:13     ` Rao, Nikhil
2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth receive code to separate function Nikhil Rao
2018-06-17 13:35   ` Jerin Jacob
2018-06-08 18:15 ` [PATCH v1 3/4] eventdev: move Rx adapter eth Rx " Nikhil Rao
2018-06-17 13:36   ` Jerin Jacob
2018-06-08 18:15 ` [PATCH v1 4/4] eventdev: add interrupt driven queues in Rx event adapter Nikhil Rao
2018-06-17 13:49   ` Jerin Jacob
2018-06-18 13:15     ` Rao, Nikhil
2018-06-27 10:55 ` [PATCH v2 0/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
2018-06-27 10:55   ` [PATCH v2 1/5] eventdev: standardize Rx adapter internal function names Nikhil Rao
2018-06-27 10:55   ` [PATCH v2 2/5] eventdev: improve err handling for Rx adapter queue add/del Nikhil Rao
2018-06-27 10:55   ` [PATCH v2 3/5] eventdev: move Rx adapter eth Rx to separate function Nikhil Rao
2018-06-27 10:55   ` [PATCH v2 4/5] eventdev: add interrupt driven queues to Rx adapter Nikhil Rao
2018-06-27 10:55   ` [PATCH v2 5/5] eventdev: add Rx adapter tests for interrupt driven queues Nikhil Rao

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.