From mboxrd@z Thu Jan 1 00:00:00 1970 From: =?UTF-8?Q?Mattias_R=c3=b6nnblom?= Subject: Re: [PATCH v2 1/1] eventdev: add new software event timer adapter Date: Sun, 9 Dec 2018 20:17:22 +0100 Message-ID: References: <1543534514-183766-1-git-send-email-erik.g.carrillo@intel.com> <1544214885-6811-1-git-send-email-erik.g.carrillo@intel.com> <1544214885-6811-2-git-send-email-erik.g.carrillo@intel.com> Mime-Version: 1.0 Content-Type: text/plain; charset=utf-8; format=flowed Content-Transfer-Encoding: 7bit Cc: pbhagavatula@caviumnetworks.com, rsanford@akamai.com, stephen@networkplumber.org, dev@dpdk.org To: Erik Gabriel Carrillo , jerin.jacob@caviumnetworks.com Return-path: Received: from mail.lysator.liu.se (mail.lysator.liu.se [130.236.254.3]) by dpdk.org (Postfix) with ESMTP id C27682C02 for ; Sun, 9 Dec 2018 20:17:31 +0100 (CET) Received: from mail.lysator.liu.se (localhost [127.0.0.1]) by mail.lysator.liu.se (Postfix) with ESMTP id CC5C540062 for ; Sun, 9 Dec 2018 20:17:30 +0100 (CET) In-Reply-To: <1544214885-6811-2-git-send-email-erik.g.carrillo@intel.com> Content-Language: en-US List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" On 2018-12-07 21:34, Erik Gabriel Carrillo wrote: > This patch introduces a new version of the event timer adapter software > PMD. In the original design, timer event producer lcores in the primary > and secondary processes enqueued event timers into a ring, and a > service core in the primary process dequeued them and processed them > further. To improve performance, this version does away with the ring > and lets lcores in both primary and secondary processes insert timers > directly into timer skiplist data structures; the service core directly > accesses the lists as well, when looking for timers that have expired. > > Signed-off-by: Erik Gabriel Carrillo > --- > lib/librte_eventdev/rte_event_timer_adapter.c | 687 +++++++++++--------------- > 1 file changed, 275 insertions(+), 412 deletions(-) > > diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c > index 79070d4..9c528cb 100644 > --- a/lib/librte_eventdev/rte_event_timer_adapter.c > +++ b/lib/librte_eventdev/rte_event_timer_adapter.c > @@ -7,6 +7,7 @@ > #include > #include > #include > +#include > You have no assert() calls, from what I can see. Include for RTE_ASSERT(). > #include > #include > @@ -19,6 +20,7 @@ > #include > #include > #include > +#include > > #include "rte_eventdev.h" > #include "rte_eventdev_pmd.h" > @@ -34,7 +36,7 @@ static int evtim_buffer_logtype; > > static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX]; > > -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops; > +static const struct rte_event_timer_adapter_ops swtim_ops; > > #define EVTIM_LOG(level, logtype, ...) \ > rte_log(RTE_LOG_ ## level, logtype, \ > @@ -211,7 +213,7 @@ rte_event_timer_adapter_create_ext( > * implementation. > */ > if (adapter->ops == NULL) > - adapter->ops = &sw_event_adapter_timer_ops; > + adapter->ops = &swtim_ops; > > /* Allow driver to do some setup */ > FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP); > @@ -334,7 +336,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id) > * implementation. > */ > if (adapter->ops == NULL) > - adapter->ops = &sw_event_adapter_timer_ops; > + adapter->ops = &swtim_ops; > > /* Set fast-path function pointers */ > adapter->arm_burst = adapter->ops->arm_burst; > @@ -491,6 +493,7 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id, > } > > *nb_events_inv = 0; > + > *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id, > &events[tail_idx], n); > if (*nb_events_flushed != n && rte_errno == -EINVAL) { > @@ -498,137 +501,123 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id, > (*nb_events_inv)++; > } > > + if (*nb_events_flushed > 0) > + EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event " > + "device", *nb_events_flushed); > + > bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv; > } > > /* > * Software event timer adapter implementation > */ > - > -struct rte_event_timer_adapter_sw_data { > - /* List of messages for outstanding timers */ > - TAILQ_HEAD(, msg) msgs_tailq_head; > - /* Lock to guard tailq and armed count */ > - rte_spinlock_t msgs_tailq_sl; > +struct swtim { > /* Identifier of service executing timer management logic. */ > uint32_t service_id; > /* The cycle count at which the adapter should next tick */ > uint64_t next_tick_cycles; > - /* Incremented as the service moves through phases of an iteration */ > - volatile int service_phase; > /* The tick resolution used by adapter instance. May have been > * adjusted from what user requested > */ > uint64_t timer_tick_ns; > /* Maximum timeout in nanoseconds allowed by adapter instance. */ > uint64_t max_tmo_ns; > - /* Ring containing messages to arm or cancel event timers */ > - struct rte_ring *msg_ring; > - /* Mempool containing msg objects */ > - struct rte_mempool *msg_pool; > /* Buffered timer expiry events to be enqueued to an event device. */ > struct event_buffer buffer; > /* Statistics */ > struct rte_event_timer_adapter_stats stats; > - /* The number of threads currently adding to the message ring */ > - rte_atomic16_t message_producer_count; > + /* Mempool of timer objects */ > + struct rte_mempool *tim_pool; > + /* Back pointer for convenience */ > + struct rte_event_timer_adapter *adapter; > + /* Identifier of timer data instance */ > + uint32_t timer_data_id; > + /* Track which cores have actually armed a timer */ > + rte_atomic16_t in_use[RTE_MAX_LCORE]; > + /* Track which cores' timer lists should be polled */ > + unsigned int poll_lcores[RTE_MAX_LCORE]; > + /* The number of lists that should be polled */ > + int n_poll_lcores; > + /* Lock to atomically access the above two variables */ > + rte_spinlock_t poll_lcores_sl; > }; > > -enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL}; > - > -struct msg { > - enum msg_type type; > - struct rte_event_timer *evtim; > - struct rte_timer tim; > - TAILQ_ENTRY(msg) msgs; > -}; > +static inline struct swtim * > +swtim_pmd_priv(const struct rte_event_timer_adapter *adapter) > +{ > + return adapter->data->adapter_priv; > +} > > static void > -sw_event_timer_cb(struct rte_timer *tim, void *arg) > +swtim_callback(void *arg) > { > - int ret; > + struct rte_timer *tim = arg; > + struct rte_event_timer *evtim = tim->arg; > + struct rte_event_timer_adapter *adapter; > + struct swtim *sw; > uint16_t nb_evs_flushed = 0; > uint16_t nb_evs_invalid = 0; > uint64_t opaque; > - struct rte_event_timer *evtim; > - struct rte_event_timer_adapter *adapter; > - struct rte_event_timer_adapter_sw_data *sw_data; > + int ret; > > - evtim = arg; > opaque = evtim->impl_opaque[1]; > adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque; > - sw_data = adapter->data->adapter_priv; > + sw = swtim_pmd_priv(adapter); > > - ret = event_buffer_add(&sw_data->buffer, &evtim->ev); > + ret = event_buffer_add(&sw->buffer, &evtim->ev); > if (ret < 0) { > /* If event buffer is full, put timer back in list with > * immediate expiry value, so that we process it again on the > * next iteration. > */ > - rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(), > - sw_event_timer_cb, evtim); > + rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE, > + rte_lcore_id(), NULL, evtim); > + > + sw->stats.evtim_retry_count++; > > - sw_data->stats.evtim_retry_count++; > EVTIM_LOG_DBG("event buffer full, resetting rte_timer with " > "immediate expiry value"); > } else { > - struct msg *m = container_of(tim, struct msg, tim); > - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs); > EVTIM_BUF_LOG_DBG("buffered an event timer expiry event"); > - evtim->state = RTE_EVENT_TIMER_NOT_ARMED; > + rte_mempool_put(sw->tim_pool, tim); > + sw->stats.evtim_exp_count++; > > - /* Free the msg object containing the rte_timer now that > - * we've buffered its event successfully. > - */ > - rte_mempool_put(sw_data->msg_pool, m); > - > - /* Bump the count when we successfully add an expiry event to > - * the buffer. > - */ > - sw_data->stats.evtim_exp_count++; > + evtim->state = RTE_EVENT_TIMER_NOT_ARMED; > } > > - if (event_buffer_batch_ready(&sw_data->buffer)) { > - event_buffer_flush(&sw_data->buffer, > + if (event_buffer_batch_ready(&sw->buffer)) { > + event_buffer_flush(&sw->buffer, > adapter->data->event_dev_id, > adapter->data->event_port_id, > &nb_evs_flushed, > &nb_evs_invalid); > > - sw_data->stats.ev_enq_count += nb_evs_flushed; > - sw_data->stats.ev_inv_count += nb_evs_invalid; > + sw->stats.ev_enq_count += nb_evs_flushed; > + sw->stats.ev_inv_count += nb_evs_invalid; > } > } > > static __rte_always_inline uint64_t > get_timeout_cycles(struct rte_event_timer *evtim, > - struct rte_event_timer_adapter *adapter) > + const struct rte_event_timer_adapter *adapter) > { > - uint64_t timeout_ns; > - struct rte_event_timer_adapter_sw_data *sw_data; > - > - sw_data = adapter->data->adapter_priv; > - timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns; > + struct swtim *sw = swtim_pmd_priv(adapter); > + uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns; > return timeout_ns * rte_get_timer_hz() / NSECPERSEC; > - > } > > /* This function returns true if one or more (adapter) ticks have occurred since > * the last time it was called. > */ > static inline bool > -adapter_did_tick(struct rte_event_timer_adapter *adapter) > +swtim_did_tick(struct swtim *sw) > { > uint64_t cycles_per_adapter_tick, start_cycles; > uint64_t *next_tick_cyclesp; > - struct rte_event_timer_adapter_sw_data *sw_data; > - > - sw_data = adapter->data->adapter_priv; > - next_tick_cyclesp = &sw_data->next_tick_cycles; > > - cycles_per_adapter_tick = sw_data->timer_tick_ns * > + next_tick_cyclesp = &sw->next_tick_cycles; > + cycles_per_adapter_tick = sw->timer_tick_ns * > (rte_get_timer_hz() / NSECPERSEC); > - > start_cycles = rte_get_timer_cycles(); > > /* Note: initially, *next_tick_cyclesp == 0, so the clause below will > @@ -640,7 +629,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter) > * boundary. > */ > start_cycles -= start_cycles % cycles_per_adapter_tick; > - > *next_tick_cyclesp = start_cycles + cycles_per_adapter_tick; > > return true; > @@ -655,15 +643,12 @@ check_timeout(struct rte_event_timer *evtim, > const struct rte_event_timer_adapter *adapter) > { > uint64_t tmo_nsec; > - struct rte_event_timer_adapter_sw_data *sw_data; > - > - sw_data = adapter->data->adapter_priv; > - tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns; > + struct swtim *sw = swtim_pmd_priv(adapter); > > - if (tmo_nsec > sw_data->max_tmo_ns) > + tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns; > + if (tmo_nsec > sw->max_tmo_ns) > return -1; > - > - if (tmo_nsec < sw_data->timer_tick_ns) > + if (tmo_nsec < sw->timer_tick_ns) > return -2; > > return 0; > @@ -691,110 +676,34 @@ check_destination_event_queue(struct rte_event_timer *evtim, > return 0; > } > > -#define NB_OBJS 32 > static int > -sw_event_timer_adapter_service_func(void *arg) > +swtim_service_func(void *arg) > { > - int i, num_msgs; > - uint64_t cycles, opaque; > + struct rte_event_timer_adapter *adapter = arg; > + struct swtim *sw = swtim_pmd_priv(adapter); > uint16_t nb_evs_flushed = 0; > uint16_t nb_evs_invalid = 0; > - struct rte_event_timer_adapter *adapter; > - struct rte_event_timer_adapter_sw_data *sw_data; > - struct rte_event_timer *evtim = NULL; > - struct rte_timer *tim = NULL; > - struct msg *msg, *msgs[NB_OBJS]; > - > - adapter = arg; > - sw_data = adapter->data->adapter_priv; > - > - sw_data->service_phase = 1; > - rte_smp_wmb(); > - > - while (rte_atomic16_read(&sw_data->message_producer_count) > 0 || > - !rte_ring_empty(sw_data->msg_ring)) { > - > - num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring, > - (void **)msgs, NB_OBJS, NULL); > - > - for (i = 0; i < num_msgs; i++) { > - int ret = 0; > - > - RTE_SET_USED(ret); > - > - msg = msgs[i]; > - evtim = msg->evtim; > - > - switch (msg->type) { > - case MSG_TYPE_ARM: > - EVTIM_SVC_LOG_DBG("dequeued ARM message from " > - "ring"); > - tim = &msg->tim; > - rte_timer_init(tim); > - cycles = get_timeout_cycles(evtim, > - adapter); > - ret = rte_timer_reset(tim, cycles, SINGLE, > - rte_lcore_id(), > - sw_event_timer_cb, > - evtim); > - RTE_ASSERT(ret == 0); > - > - evtim->impl_opaque[0] = (uintptr_t)tim; > - evtim->impl_opaque[1] = (uintptr_t)adapter; > - > - TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head, > - msg, > - msgs); > - break; > - case MSG_TYPE_CANCEL: > - EVTIM_SVC_LOG_DBG("dequeued CANCEL message " > - "from ring"); > - opaque = evtim->impl_opaque[0]; > - tim = (struct rte_timer *)(uintptr_t)opaque; > - RTE_ASSERT(tim != NULL); > - > - ret = rte_timer_stop(tim); > - RTE_ASSERT(ret == 0); > - > - /* Free the msg object for the original arm > - * request. > - */ > - struct msg *m; > - m = container_of(tim, struct msg, tim); > - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, > - msgs); > - rte_mempool_put(sw_data->msg_pool, m); > - > - /* Free the msg object for the current msg */ > - rte_mempool_put(sw_data->msg_pool, msg); > - > - evtim->impl_opaque[0] = 0; > - evtim->impl_opaque[1] = 0; > - > - break; > - } > - } > - } > - > - sw_data->service_phase = 2; > - rte_smp_wmb(); > > - if (adapter_did_tick(adapter)) { > - rte_timer_manage(); > + if (swtim_did_tick(sw)) { > + /* This lock is seldom acquired on the arm side */ > + rte_spinlock_lock(&sw->poll_lcores_sl); > + rte_timer_alt_manage(sw->timer_data_id, > + sw->poll_lcores, > + sw->n_poll_lcores, > + swtim_callback); > + rte_spinlock_unlock(&sw->poll_lcores_sl); > > - event_buffer_flush(&sw_data->buffer, > + event_buffer_flush(&sw->buffer, > adapter->data->event_dev_id, > adapter->data->event_port_id, > - &nb_evs_flushed, &nb_evs_invalid); > + &nb_evs_flushed, > + &nb_evs_invalid); > > - sw_data->stats.ev_enq_count += nb_evs_flushed; > - sw_data->stats.ev_inv_count += nb_evs_invalid; > - sw_data->stats.adapter_tick_count++; > + sw->stats.ev_enq_count += nb_evs_flushed; > + sw->stats.ev_inv_count += nb_evs_invalid; > + sw->stats.adapter_tick_count++; > } > > - sw_data->service_phase = 0; > - rte_smp_wmb(); > - > return 0; > } > > @@ -828,168 +737,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual) > return cache_size; > } > > -#define SW_MIN_INTERVAL 1E5 > - > static int > -sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter) > +swtim_init(struct rte_event_timer_adapter *adapter) > { > - int ret; > - struct rte_event_timer_adapter_sw_data *sw_data; > - uint64_t nb_timers; > + int i, ret; > + struct swtim *sw; > unsigned int flags; > struct rte_service_spec service; > - static bool timer_subsystem_inited; // static initialized to false > > - /* Allocate storage for SW implementation data */ > - char priv_data_name[RTE_RING_NAMESIZE]; > - snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8, > - adapter->data->id); > - adapter->data->adapter_priv = rte_zmalloc_socket( > - priv_data_name, > - sizeof(struct rte_event_timer_adapter_sw_data), > - RTE_CACHE_LINE_SIZE, > - adapter->data->socket_id); > - if (adapter->data->adapter_priv == NULL) { > + /* Allocate storage for private data area */ > +#define SWTIM_NAMESIZE 32 > + char swtim_name[SWTIM_NAMESIZE]; > + snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8, > + adapter->data->id); > + sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE, > + adapter->data->socket_id); > + if (sw == NULL) { > EVTIM_LOG_ERR("failed to allocate space for private data"); > rte_errno = ENOMEM; > return -1; > } > > - if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) { > - EVTIM_LOG_ERR("failed to create adapter with requested tick " > - "interval"); > - rte_errno = EINVAL; > - return -1; > - } > - > - sw_data = adapter->data->adapter_priv; > - > - sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns; > - sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns; > + /* Connect storage to adapter instance */ > + adapter->data->adapter_priv = sw; > + sw->adapter = adapter; > > - TAILQ_INIT(&sw_data->msgs_tailq_head); > - rte_spinlock_init(&sw_data->msgs_tailq_sl); > - rte_atomic16_init(&sw_data->message_producer_count); > - > - /* Rings require power of 2, so round up to next such value */ > - nb_timers = rte_align64pow2(adapter->data->conf.nb_timers); > - > - char msg_ring_name[RTE_RING_NAMESIZE]; > - snprintf(msg_ring_name, RTE_RING_NAMESIZE, > - "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id); > - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ? > - RING_F_SP_ENQ | RING_F_SC_DEQ : > - RING_F_SC_DEQ; > - sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers, > - adapter->data->socket_id, flags); > - if (sw_data->msg_ring == NULL) { > - EVTIM_LOG_ERR("failed to create message ring"); > - rte_errno = ENOMEM; > - goto free_priv_data; > - } > + sw->timer_tick_ns = adapter->data->conf.timer_tick_ns; > + sw->max_tmo_ns = adapter->data->conf.max_tmo_ns; > > - char pool_name[RTE_RING_NAMESIZE]; > - snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8, > + /* Create a timer pool */ > + char pool_name[SWTIM_NAMESIZE]; > + snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8, > adapter->data->id); > - > - /* Both the arming/canceling thread and the service thread will do puts > - * to the mempool, but if the SP_PUT flag is enabled, we can specify > - * single-consumer get for the mempool. > - */ > - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ? > - MEMPOOL_F_SC_GET : 0; > - > - /* The usable size of a ring is count - 1, so subtract one here to > - * make the counts agree. > - */ > + /* Optimal mempool size is a power of 2 minus one */ > + uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers); > int pool_size = nb_timers - 1; > int cache_size = compute_msg_mempool_cache_size( > adapter->data->conf.nb_timers, nb_timers); > - sw_data->msg_pool = rte_mempool_create(pool_name, pool_size, > - sizeof(struct msg), cache_size, > - 0, NULL, NULL, NULL, NULL, > - adapter->data->socket_id, flags); > - if (sw_data->msg_pool == NULL) { > - EVTIM_LOG_ERR("failed to create message object mempool"); > + flags = 0; /* pool is multi-producer, multi-consumer */ > + sw->tim_pool = rte_mempool_create(pool_name, pool_size, > + sizeof(struct rte_timer), cache_size, 0, NULL, NULL, > + NULL, NULL, adapter->data->socket_id, flags); > + if (sw->tim_pool == NULL) { > + EVTIM_LOG_ERR("failed to create timer object mempool"); > rte_errno = ENOMEM; > - goto free_msg_ring; > + goto free_alloc; > + } > + > + /* Initialize the variables that track in-use timer lists */ > + rte_spinlock_init(&sw->poll_lcores_sl); > + for (i = 0; i < RTE_MAX_LCORE; i++) > + rte_atomic16_init(&sw->in_use[i]); > + > + /* Initialize the timer subsystem and allocate timer data instance */ > + ret = rte_timer_subsystem_init(); > + if (ret < 0) { > + if (ret != -EALREADY) { > + EVTIM_LOG_ERR("failed to initialize timer subsystem"); > + rte_errno = ret; > + goto free_mempool; > + } > + } > + > + ret = rte_timer_data_alloc(&sw->timer_data_id); > + if (ret < 0) { > + EVTIM_LOG_ERR("failed to allocate timer data instance"); > + rte_errno = ret; > + goto free_mempool; > } > > - event_buffer_init(&sw_data->buffer); > + /* Initialize timer event buffer */ > + event_buffer_init(&sw->buffer); > + > + sw->adapter = adapter; > > /* Register a service component to run adapter logic */ > memset(&service, 0, sizeof(service)); > snprintf(service.name, RTE_SERVICE_NAME_MAX, > - "sw_evimer_adap_svc_%"PRIu8, adapter->data->id); > + "swtim_svc_%"PRIu8, adapter->data->id); > service.socket_id = adapter->data->socket_id; > - service.callback = sw_event_timer_adapter_service_func; > + service.callback = swtim_service_func; > service.callback_userdata = adapter; > service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE); > - ret = rte_service_component_register(&service, &sw_data->service_id); > + ret = rte_service_component_register(&service, &sw->service_id); > if (ret < 0) { > EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32 > - ": err = %d", service.name, sw_data->service_id, > + ": err = %d", service.name, sw->service_id, > ret); > > rte_errno = ENOSPC; > - goto free_msg_pool; > + goto free_mempool; > } > > EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name, > - sw_data->service_id); > + sw->service_id); > > - adapter->data->service_id = sw_data->service_id; > + adapter->data->service_id = sw->service_id; > adapter->data->service_inited = 1; > > - if (!timer_subsystem_inited) { > - rte_timer_subsystem_init(); > - timer_subsystem_inited = true; > - } > - > return 0; > - > -free_msg_pool: > - rte_mempool_free(sw_data->msg_pool); > -free_msg_ring: > - rte_ring_free(sw_data->msg_ring); > -free_priv_data: > - rte_free(sw_data); > +free_mempool: > + rte_mempool_free(sw->tim_pool); > +free_alloc: > + rte_free(sw); > return -1; > } > > -static int > -sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter) > +static void > +swtim_free_tim(struct rte_timer *tim, void *arg) > { > - int ret; > - struct msg *m1, *m2; > - struct rte_event_timer_adapter_sw_data *sw_data = > - adapter->data->adapter_priv; > + struct swtim *sw = arg; > > - rte_spinlock_lock(&sw_data->msgs_tailq_sl); > - > - /* Cancel outstanding rte_timers and free msg objects */ > - m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head); > - while (m1 != NULL) { > - EVTIM_LOG_DBG("freeing outstanding timer"); > - m2 = TAILQ_NEXT(m1, msgs); > - > - rte_timer_stop_sync(&m1->tim); > - rte_mempool_put(sw_data->msg_pool, m1); > + rte_mempool_put(sw->tim_pool, (void *)tim); > +} No cast required. > > - m1 = m2; > - } > +/* Traverse the list of outstanding timers and put them back in the mempool > + * before freeing the adapter to avoid leaking the memory. > + */ > +static int > +swtim_uninit(struct rte_event_timer_adapter *adapter) > +{ > + int ret; > + struct swtim *sw = swtim_pmd_priv(adapter); > > - rte_spinlock_unlock(&sw_data->msgs_tailq_sl); > + /* Free outstanding timers */ > + rte_timer_stop_all(sw->timer_data_id, > + sw->poll_lcores, > + sw->n_poll_lcores, > + swtim_free_tim, > + sw); > > - ret = rte_service_component_unregister(sw_data->service_id); > + ret = rte_service_component_unregister(sw->service_id); > if (ret < 0) { > EVTIM_LOG_ERR("failed to unregister service component"); > return ret; > } > > - rte_ring_free(sw_data->msg_ring); > - rte_mempool_free(sw_data->msg_pool); > - rte_free(adapter->data->adapter_priv); > + rte_mempool_free(sw->tim_pool); > + rte_free(sw); > + adapter->data->adapter_priv = NULL; > > return 0; > } > @@ -1010,88 +896,79 @@ get_mapped_count_for_service(uint32_t service_id) > } > > static int > -sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter) > +swtim_start(const struct rte_event_timer_adapter *adapter) > { > int mapped_count; > - struct rte_event_timer_adapter_sw_data *sw_data; > - > - sw_data = adapter->data->adapter_priv; > + struct swtim *sw = swtim_pmd_priv(adapter); > > /* Mapping the service to more than one service core can introduce > * delays while one thread is waiting to acquire a lock, so only allow > * one core to be mapped to the service. > + * > + * Note: the service could be modified such that it spreads cores to > + * poll over multiple service instances. > */ > - mapped_count = get_mapped_count_for_service(sw_data->service_id); > + mapped_count = get_mapped_count_for_service(sw->service_id); > > - if (mapped_count == 1) > - return rte_service_component_runstate_set(sw_data->service_id, > - 1); > + if (mapped_count != 1) > + return mapped_count < 1 ? -ENOENT : -ENOTSUP; > > - return mapped_count < 1 ? -ENOENT : -ENOTSUP; > + return rte_service_component_runstate_set(sw->service_id, 1); > } > > static int > -sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter) > +swtim_stop(const struct rte_event_timer_adapter *adapter) > { > int ret; > - struct rte_event_timer_adapter_sw_data *sw_data = > - adapter->data->adapter_priv; > + struct swtim *sw = swtim_pmd_priv(adapter); > > - ret = rte_service_component_runstate_set(sw_data->service_id, 0); > + ret = rte_service_component_runstate_set(sw->service_id, 0); > if (ret < 0) > return ret; > > - /* Wait for the service to complete its final iteration before > - * stopping. > - */ > - while (sw_data->service_phase != 0) > + /* Wait for the service to complete its final iteration */ > + while (rte_service_may_be_active(sw->service_id)) > rte_pause(); > > - rte_smp_rmb(); > - > return 0; > } > > static void > -sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter, > +swtim_get_info(const struct rte_event_timer_adapter *adapter, > struct rte_event_timer_adapter_info *adapter_info) > { > - struct rte_event_timer_adapter_sw_data *sw_data; > - sw_data = adapter->data->adapter_priv; > - > - adapter_info->min_resolution_ns = sw_data->timer_tick_ns; > - adapter_info->max_tmo_ns = sw_data->max_tmo_ns; > + struct swtim *sw = swtim_pmd_priv(adapter); > + adapter_info->min_resolution_ns = sw->timer_tick_ns; > + adapter_info->max_tmo_ns = sw->max_tmo_ns; > } > > static int > -sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter, > - struct rte_event_timer_adapter_stats *stats) > +swtim_stats_get(const struct rte_event_timer_adapter *adapter, > + struct rte_event_timer_adapter_stats *stats) > { > - struct rte_event_timer_adapter_sw_data *sw_data; > - sw_data = adapter->data->adapter_priv; > - *stats = sw_data->stats; > + struct swtim *sw = swtim_pmd_priv(adapter); > + *stats = sw->stats; /* structure copy */ > return 0; > } > > static int > -sw_event_timer_adapter_stats_reset( > - const struct rte_event_timer_adapter *adapter) > +swtim_stats_reset(const struct rte_event_timer_adapter *adapter) > { > - struct rte_event_timer_adapter_sw_data *sw_data; > - sw_data = adapter->data->adapter_priv; > - memset(&sw_data->stats, 0, sizeof(sw_data->stats)); > + struct swtim *sw = swtim_pmd_priv(adapter); > + memset(&sw->stats, 0, sizeof(sw->stats)); > return 0; > } > > -static __rte_always_inline uint16_t > -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, > - struct rte_event_timer **evtims, > - uint16_t nb_evtims) > +static uint16_t > +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter, > + struct rte_event_timer **evtims, > + uint16_t nb_evtims) > { > - uint16_t i; > - int ret; > - struct rte_event_timer_adapter_sw_data *sw_data; > - struct msg *msgs[nb_evtims]; > + int i, ret; > + struct swtim *sw = swtim_pmd_priv(adapter); > + uint32_t lcore_id = rte_lcore_id(); > + struct rte_timer *tim, *tims[nb_evtims]; > + uint64_t cycles; > > #ifdef RTE_LIBRTE_EVENTDEV_DEBUG > /* Check that the service is running. */ > @@ -1101,101 +978,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, > } > #endif > > - sw_data = adapter->data->adapter_priv; > + /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of > + * the highest lcore to insert such timers into > + */ > + if (lcore_id == LCORE_ID_ANY) > + lcore_id = RTE_MAX_LCORE - 1; > + > + /* If this is the first time we're arming an event timer on this lcore, > + * mark this lcore as "in use"; this will cause the service > + * function to process the timer list that corresponds to this lcore. > + */ > + if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id]))) { I suspect we have a performance critical false sharing issue above. Many/all flags are going to be arranged on the same cache line. > + rte_spinlock_lock(&sw->poll_lcores_sl); > + EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll", > + lcore_id); > + sw->poll_lcores[sw->n_poll_lcores++] = lcore_id; > + rte_spinlock_unlock(&sw->poll_lcores_sl); > + } > > - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims); > + ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims, > + nb_evtims); > if (ret < 0) { > rte_errno = ENOSPC; > return 0; > } > > - /* Let the service know we're producing messages for it to process */ > - rte_atomic16_inc(&sw_data->message_producer_count); > - > - /* If the service is managing timers, wait for it to finish */ > - while (sw_data->service_phase == 2) > - rte_pause(); > - > - rte_smp_rmb(); > - > for (i = 0; i < nb_evtims; i++) { > /* Don't modify the event timer state in these cases */ > if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) { > rte_errno = EALREADY; > break; > } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED || > - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { > + evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { > rte_errno = EINVAL; > break; > } > > ret = check_timeout(evtims[i], adapter); > - if (ret == -1) { > + if (unlikely(ret == -1)) { > evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE; > rte_errno = EINVAL; > break; > - } > - if (ret == -2) { > + } else if (unlikely(ret == -2)) { > evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY; > rte_errno = EINVAL; > break; > } > > - if (check_destination_event_queue(evtims[i], adapter) < 0) { > + if (unlikely(check_destination_event_queue(evtims[i], > + adapter) < 0)) { > evtims[i]->state = RTE_EVENT_TIMER_ERROR; > rte_errno = EINVAL; > break; > } > > - /* Checks passed, set up a message to enqueue */ > - msgs[i]->type = MSG_TYPE_ARM; > - msgs[i]->evtim = evtims[i]; > + tim = tims[i]; > + rte_timer_init(tim); > > - /* Set the payload pointer if not set. */ > - if (evtims[i]->ev.event_ptr == NULL) > - evtims[i]->ev.event_ptr = evtims[i]; > + evtims[i]->impl_opaque[0] = (uintptr_t)tim; > + evtims[i]->impl_opaque[1] = (uintptr_t)adapter; > > - /* msg objects that get enqueued successfully will be freed > - * either by a future cancel operation or by the timer > - * expiration callback. > - */ > - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) { > - rte_errno = ENOSPC; > + cycles = get_timeout_cycles(evtims[i], adapter); > + ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles, > + SINGLE, lcore_id, NULL, evtims[i]); > + if (ret < 0) { > + /* tim was in RUNNING or CONFIG state */ > + evtims[i]->state = RTE_EVENT_TIMER_ERROR; > break; > } > > - EVTIM_LOG_DBG("enqueued ARM message to ring"); > - > + rte_smp_wmb(); > + EVTIM_LOG_DBG("armed an event timer"); > evtims[i]->state = RTE_EVENT_TIMER_ARMED; > } > > - /* Let the service know we're done producing messages */ > - rte_atomic16_dec(&sw_data->message_producer_count); > - > if (i < nb_evtims) > - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i], > - nb_evtims - i); > + rte_mempool_put_bulk(sw->tim_pool, > + (void **)&tims[i], nb_evtims - i); > > return i; > } > > static uint16_t > -sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, > - struct rte_event_timer **evtims, > - uint16_t nb_evtims) > +swtim_arm_burst(const struct rte_event_timer_adapter *adapter, > + struct rte_event_timer **evtims, > + uint16_t nb_evtims) > { > - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims); > + return __swtim_arm_burst(adapter, evtims, nb_evtims); > } > > static uint16_t > -sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, > - struct rte_event_timer **evtims, > - uint16_t nb_evtims) > +swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, > + struct rte_event_timer **evtims, > + uint16_t nb_evtims) > { > - uint16_t i; > - int ret; > - struct rte_event_timer_adapter_sw_data *sw_data; > - struct msg *msgs[nb_evtims]; > + int i, ret; > + struct rte_timer *timp; > + uint64_t opaque; > + struct swtim *sw = swtim_pmd_priv(adapter); > > #ifdef RTE_LIBRTE_EVENTDEV_DEBUG > /* Check that the service is running. */ > @@ -1205,23 +1085,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, > } > #endif > > - sw_data = adapter->data->adapter_priv; > - > - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims); > - if (ret < 0) { > - rte_errno = ENOSPC; > - return 0; > - } > - > - /* Let the service know we're producing messages for it to process */ > - rte_atomic16_inc(&sw_data->message_producer_count); > - > - /* If the service could be modifying event timer states, wait */ > - while (sw_data->service_phase == 2) > - rte_pause(); > - > - rte_smp_rmb(); > - > for (i = 0; i < nb_evtims; i++) { > /* Don't modify the event timer state in these cases */ > if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) { > @@ -1232,54 +1095,54 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, > break; > } > > - msgs[i]->type = MSG_TYPE_CANCEL; > - msgs[i]->evtim = evtims[i]; > + opaque = evtims[i]->impl_opaque[0]; > + timp = (struct rte_timer *)(uintptr_t)opaque; > + RTE_ASSERT(timp != NULL); > > - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) { > - rte_errno = ENOSPC; > + ret = rte_timer_alt_stop(sw->timer_data_id, timp); > + if (ret < 0) { > + /* Timer is running or being configured */ > + rte_errno = EAGAIN; > break; > } > > - EVTIM_LOG_DBG("enqueued CANCEL message to ring"); > + rte_mempool_put(sw->tim_pool, (void **)timp); > > evtims[i]->state = RTE_EVENT_TIMER_CANCELED; > - } > + evtims[i]->impl_opaque[0] = 0; > + evtims[i]->impl_opaque[1] = 0; > > - /* Let the service know we're done producing messages */ > - rte_atomic16_dec(&sw_data->message_producer_count); > - > - if (i < nb_evtims) > - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i], > - nb_evtims - i); > + rte_smp_wmb(); > + } > > return i; > } > > static uint16_t > -sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter, > - struct rte_event_timer **evtims, > - uint64_t timeout_ticks, > - uint16_t nb_evtims) > +swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter, > + struct rte_event_timer **evtims, > + uint64_t timeout_ticks, > + uint16_t nb_evtims) > { > int i; > > for (i = 0; i < nb_evtims; i++) > evtims[i]->timeout_ticks = timeout_ticks; > > - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims); > + return __swtim_arm_burst(adapter, evtims, nb_evtims); > } > > -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = { > - .init = sw_event_timer_adapter_init, > - .uninit = sw_event_timer_adapter_uninit, > - .start = sw_event_timer_adapter_start, > - .stop = sw_event_timer_adapter_stop, > - .get_info = sw_event_timer_adapter_get_info, > - .stats_get = sw_event_timer_adapter_stats_get, > - .stats_reset = sw_event_timer_adapter_stats_reset, > - .arm_burst = sw_event_timer_arm_burst, > - .arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst, > - .cancel_burst = sw_event_timer_cancel_burst, > +static const struct rte_event_timer_adapter_ops swtim_ops = { > + .init = swtim_init, > + .uninit = swtim_uninit, > + .start = swtim_start, > + .stop = swtim_stop, > + .get_info = swtim_get_info, > + .stats_get = swtim_stats_get, > + .stats_reset = swtim_stats_reset, > + .arm_burst = swtim_arm_burst, > + .arm_tmo_tick_burst = swtim_arm_tmo_tick_burst, > + .cancel_burst = swtim_cancel_burst, > }; > > RTE_INIT(event_timer_adapter_init_log) >