All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Tan, Jianfeng" <jianfeng.tan@intel.com>
To: Anatoly Burakov <anatoly.burakov@intel.com>, dev@dpdk.org
Cc: failed@ecsmtp.ir.intel.com, to@ecsmtp.ir.intel.com,
	remove@ecsmtp.ir.intel.com, Directory@ecsmtp.ir.intel.com,
	not@ecsmtp.ir.intel.com, empty@ecsmtp.ir.intel.com,
	konstantin.ananyev@intel.com
Subject: Re: [PATCH v5 2/2] eal: add asynchronous request API to DPDK IPC
Date: Mon, 26 Mar 2018 22:15:24 +0800	[thread overview]
Message-ID: <cab9d8f5-3f5c-59f0-65dd-543585668c14@intel.com> (raw)
In-Reply-To: <db5fe3cd9dee23baef0700a44d7429ab48d4df7e.1521895541.git.anatoly.burakov@intel.com>



On 3/24/2018 8:46 PM, Anatoly Burakov wrote:
> This API is similar to the blocking API that is already present,
> but reply will be received in a separate callback by the caller
> (callback specified at the time of request, rather than registering
> for it in advance).
>
> Under the hood, we create a separate thread to deal with replies to
> asynchronous requests, that will just wait to be notified by the
> main thread, or woken up on a timer.
>
> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>

Generally, it looks great to me except for some trivial nits, so

Acked-by: Jianfeng Tan <jianfeng.tan@intel.com>


> ---
>
> Notes:
>      v5:
>        - addressed review comments from Jianfeng
>        - split into two patches to avoid rename noise
>        - do not mark ignored message as processed
>      v4:
>        - rebase on top of latest IPC Improvements patchset [2]
>      
>      v3:
>        - added support for MP_IGN messages introduced in
>          IPC improvements v5 patchset
>      v2:
>        - fixed deadlocks and race conditions by not calling
>          callbacks while iterating over sync request list
>        - fixed use-after-free by making a copy of request
>        - changed API to also give user a copy of original
>          request, so that they know to which message the
>          callback is a reply to
>        - fixed missing .map file entries
>      
>      This patch is dependent upon previously published patchsets
>      for IPC fixes [1] and improvements [2].
>      
>      rte_mp_action_unregister and rte_mp_async_reply_unregister
>      do the same thing - should we perhaps make it one function?
>      
>      [1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
>      [2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/
>
>   lib/librte_eal/common/eal_common_proc.c | 455 +++++++++++++++++++++++++++++++-
>   lib/librte_eal/common/include/rte_eal.h |  36 +++
>   lib/librte_eal/rte_eal_version.map      |   1 +
>   3 files changed, 479 insertions(+), 13 deletions(-)
>
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 52b6ab2..c86252c 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -26,6 +26,7 @@
>   #include <rte_errno.h>
>   #include <rte_lcore.h>
>   #include <rte_log.h>
> +#include <rte_tailq.h>
>   
>   #include "eal_private.h"
>   #include "eal_filesystem.h"
> @@ -60,13 +61,32 @@ struct mp_msg_internal {
>   	struct rte_mp_msg msg;
>   };
>   
> +struct async_request_param {
> +	rte_mp_async_reply_t clb;
> +	struct rte_mp_reply user_reply;
> +	struct timespec end;
> +	int n_responses_processed;
> +};
> +
>   struct pending_request {
>   	TAILQ_ENTRY(pending_request) next;
> -	int reply_received;
> +	enum {
> +		REQUEST_TYPE_SYNC,
> +		REQUEST_TYPE_ASYNC
> +	} type;
>   	char dst[PATH_MAX];
>   	struct rte_mp_msg *request;
>   	struct rte_mp_msg *reply;
> -	pthread_cond_t cond;
> +	int reply_received;
> +	RTE_STD_C11
> +	union {
> +		struct {
> +			struct async_request_param *param;
> +		} async;
> +		struct {
> +			pthread_cond_t cond;
> +		} sync;
> +	};
>   };
>   
>   TAILQ_HEAD(pending_request_list, pending_request);
> @@ -74,9 +94,12 @@ TAILQ_HEAD(pending_request_list, pending_request);
>   static struct {
>   	struct pending_request_list requests;
>   	pthread_mutex_t lock;
> +	pthread_cond_t async_cond;
>   } pending_requests = {
>   	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
> -	.lock = PTHREAD_MUTEX_INITIALIZER
> +	.lock = PTHREAD_MUTEX_INITIALIZER,
> +	.async_cond = PTHREAD_COND_INITIALIZER
> +	/**< used in async requests only */
>   };
>   
>   /* forward declarations */
> @@ -273,7 +296,12 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
>   			memcpy(sync_req->reply, msg, sizeof(*msg));
>   			/* -1 indicates that we've been asked to ignore */
>   			sync_req->reply_received = m->type == MP_REP ? 1 : -1;
> -			pthread_cond_signal(&sync_req->cond);
> +
> +			if (sync_req->type == REQUEST_TYPE_SYNC)
> +				pthread_cond_signal(&sync_req->sync.cond);
> +			else if (sync_req->type == REQUEST_TYPE_ASYNC)
> +				pthread_cond_signal(
> +					&pending_requests.async_cond);
>   		} else
>   			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
>   		pthread_mutex_unlock(&pending_requests.lock);
> @@ -320,6 +348,189 @@ mp_handle(void *arg __rte_unused)
>   }
>   
>   static int
> +timespec_cmp(const struct timespec *a, const struct timespec *b)
> +{
> +	if (a->tv_sec < b->tv_sec)
> +		return -1;
> +	if (a->tv_sec > b->tv_sec)
> +		return 1;
> +	if (a->tv_nsec < b->tv_nsec)
> +		return -1;
> +	if (a->tv_nsec > b->tv_nsec)
> +		return 1;
> +	return 0;
> +}
> +
> +enum async_action {
> +	ACTION_NONE, /**< don't do anything */
> +	ACTION_FREE, /**< free the action entry, but don't trigger callback */
> +	ACTION_TRIGGER /**< trigger callback, then free action entry */
> +};
> +
> +static enum async_action
> +process_async_request(struct pending_request *sr, const struct timespec *now)
> +{
> +	struct async_request_param *param;
> +	struct rte_mp_reply *reply;
> +	bool timeout, received, last_msg;
> +
> +	param = sr->async.param;
> +	reply = &param->user_reply;
> +
> +	/* did we timeout? */
> +	timeout = timespec_cmp(&param->end, now) <= 0;
> +
> +	/* did we receive a response? */
> +	received = sr->reply_received != 0;
> +
> +	/* if we didn't time out, and we didn't receive a response, ignore */
> +	if (!timeout && !received)
> +		return ACTION_NONE;
> +
> +	/* if we received a response, adjust relevant data and copy mesasge. */
> +	if (sr->reply_received == 1 && sr->reply) {
> +		struct rte_mp_msg *msg, *user_msgs, *tmp;
> +
> +		msg = sr->reply;
> +		user_msgs = reply->msgs;
> +
> +		tmp = realloc(user_msgs, sizeof(*msg) *
> +				(reply->nb_received + 1));
> +		if (!tmp) {
> +			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
> +				sr->dst, sr->request->name);
> +			/* this entry is going to be removed and its message
> +			 * dropped, but we don't want to leak memory, so
> +			 * continue.
> +			 */
> +		} else {
> +			user_msgs = tmp;
> +			reply->msgs = user_msgs;
> +			memcpy(&user_msgs[reply->nb_received],
> +					msg, sizeof(*msg));
> +			reply->nb_received++;
> +		}
> +
> +		/* mark this request as processed */
> +		param->n_responses_processed++;
> +	} else if (sr->reply_received == -1) {
> +		/* we were asked to ignore this process */
> +		reply->nb_sent--;
> +	}
> +	free(sr->reply);
> +
> +	last_msg = param->n_responses_processed == reply->nb_sent;
> +
> +	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
> +}
> +
> +static void
> +trigger_async_action(struct pending_request *sr)
> +{
> +	struct async_request_param *param;
> +	struct rte_mp_reply *reply;
> +
> +	param = sr->async.param;
> +	reply = &param->user_reply;
> +
> +	param->clb(sr->request, reply);
> +
> +	/* clean up */
> +	free(sr->async.param->user_reply.msgs);

How about simply "free(reply->msgs);"?

> +	free(sr->async.param);
> +	free(sr->request);
> +}
> +
> +static void *
> +async_reply_handle(void *arg __rte_unused)
> +{
> +	struct pending_request *sr;
> +	struct timeval now;
> +	struct timespec timeout, ts_now;
> +	while (1) {
> +		struct pending_request *trigger = NULL;
> +		int ret;
> +		bool nowait = false;
> +		bool timedwait = false;
> +
> +		pthread_mutex_lock(&pending_requests.lock);
> +
> +		/* scan through the list and see if there are any timeouts that
> +		 * are earlier than our current timeout.
> +		 */
> +		TAILQ_FOREACH(sr, &pending_requests.requests, next) {
> +			if (sr->type != REQUEST_TYPE_ASYNC)
> +				continue;
> +			if (!timedwait || timespec_cmp(&sr->async.param->end,
> +					&timeout) < 0) {
> +				memcpy(&timeout, &sr->async.param->end,
> +					sizeof(timeout));
> +				timedwait = true;
> +			}
> +
> +			/* sometimes, we don't even wait */
> +			if (sr->reply_received) {
> +				nowait = true;
> +				break;
> +			}
> +		}
> +
> +		if (nowait)
> +			ret = 0;
> +		else if (timedwait)
> +			ret = pthread_cond_timedwait(
> +					&pending_requests.async_cond,
> +					&pending_requests.lock, &timeout);
> +		else
> +			ret = pthread_cond_wait(&pending_requests.async_cond,
> +					&pending_requests.lock);
> +
> +		if (gettimeofday(&now, NULL) < 0) {
> +			RTE_LOG(ERR, EAL, "Cannot get current time\n");
> +			break;
> +		}
> +		ts_now.tv_nsec = now.tv_usec * 1000;
> +		ts_now.tv_sec = now.tv_sec;
> +
> +		if (ret == 0 || ret == ETIMEDOUT) {
> +			struct pending_request *next;
> +			/* we've either been woken up, or we timed out */
> +
> +			/* we have still the lock, check if anything needs
> +			 * processing.
> +			 */
> +			TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next,
> +					next) {
> +				enum async_action action;
> +				if (sr->type != REQUEST_TYPE_ASYNC)
> +					continue;
> +
> +				action = process_async_request(sr, &ts_now);
> +				if (action == ACTION_FREE) {
> +					TAILQ_REMOVE(&pending_requests.requests,
> +							sr, next);
> +					free(sr);
> +				} else if (action == ACTION_TRIGGER &&
> +						trigger == NULL) {
> +					TAILQ_REMOVE(&pending_requests.requests,
> +							sr, next);
> +					trigger = sr;
> +				}
> +			}
> +		}
> +		pthread_mutex_unlock(&pending_requests.lock);
> +		if (trigger) {
> +			trigger_async_action(trigger);
> +			free(trigger);
> +		}
> +	};
> +
> +	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
> +
> +	return NULL;
> +}
> +
> +static int
>   open_socket_fd(void)
>   {
>   	char peer_name[PATH_MAX] = {0};
> @@ -382,7 +593,7 @@ rte_mp_channel_init(void)
>   	char thread_name[RTE_MAX_THREAD_NAME_LEN];
>   	char path[PATH_MAX];
>   	int dir_fd;
> -	pthread_t tid;
> +	pthread_t mp_handle_tid, async_reply_handle_tid;
>   
>   	/* create filter path */
>   	create_socket_path("*", path, sizeof(path));
> @@ -419,7 +630,16 @@ rte_mp_channel_init(void)
>   		return -1;
>   	}
>   
> -	if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
> +	if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
> +			strerror(errno));
> +		close(mp_fd);
> +		mp_fd = -1;
> +		return -1;
> +	}
> +
> +	if (pthread_create(&async_reply_handle_tid, NULL,
> +			async_reply_handle, NULL) < 0) {
>   		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
>   			strerror(errno));
>   		close(mp_fd);
> @@ -430,7 +650,11 @@ rte_mp_channel_init(void)
>   
>   	/* try best to set thread name */
>   	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
> -	rte_thread_setname(tid, thread_name);
> +	rte_thread_setname(mp_handle_tid, thread_name);
> +
> +	/* try best to set thread name */
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
> +	rte_thread_setname(async_reply_handle_tid, thread_name);
>   
>   	/* unlock the directory */
>   	flock(dir_fd, LOCK_UN);
> @@ -602,18 +826,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
>   }
>   
>   static int
> -mp_request_one(const char *dst, struct rte_mp_msg *req,
> +mp_request_async(const char *dst, struct rte_mp_msg *req,
> +		struct async_request_param *param)
> +{
> +	struct rte_mp_msg *reply_msg;
> +	struct pending_request *sync_req, *exist;
> +	int ret;
> +
> +	sync_req = malloc(sizeof(*sync_req));
> +	reply_msg = malloc(sizeof(*reply_msg));
> +	if (sync_req == NULL || reply_msg == NULL) {
> +		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
> +		rte_errno = ENOMEM;
> +		ret = -1;
> +		goto fail;
> +	}
> +
> +	memset(sync_req, 0, sizeof(*sync_req));
> +	memset(reply_msg, 0, sizeof(*reply_msg));
> +
> +	sync_req->type = REQUEST_TYPE_ASYNC;
> +	strcpy(sync_req->dst, dst);
> +	sync_req->request = req;
> +	sync_req->reply = reply_msg;
> +	sync_req->async.param = param;
> +
> +	/* queue already locked by caller */
> +
> +	exist = find_sync_request(dst, req->name);
> +	if (!exist)
> +		TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next);
> +	if (exist) {

Use "else" here instead of a second "if (exist)" check?

> +		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
> +		rte_errno = EEXIST;
> +		ret = -1;
> +		goto fail;
> +	}
> +
> +	ret = send_msg(dst, req, MP_REQ);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
> +			dst, req->name);
> +		ret = -1;
> +		goto fail;
> +	} else if (ret == 0) {
> +		ret = 0;
> +		goto fail;
> +	}
> +
> +	param->user_reply.nb_sent++;
> +
> +	return 0;
> +fail:
> +	free(sync_req);
> +	free(reply_msg);
> +	return ret;
> +}
> +
> +static int
> +mp_request_sync(const char *dst, struct rte_mp_msg *req,
>   	       struct rte_mp_reply *reply, const struct timespec *ts)
>   {
>   	int ret;
>   	struct rte_mp_msg msg, *tmp;
>   	struct pending_request sync_req, *exist;
>   
> +	sync_req.type = REQUEST_TYPE_SYNC;
>   	sync_req.reply_received = 0;
>   	strcpy(sync_req.dst, dst);
>   	sync_req.request = req;
>   	sync_req.reply = &msg;
> -	pthread_cond_init(&sync_req.cond, NULL);
> +	pthread_cond_init(&sync_req.sync.cond, NULL);
>   
>   	pthread_mutex_lock(&pending_requests.lock);
>   	exist = find_sync_request(dst, req->name);
> @@ -637,7 +920,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
>   	reply->nb_sent++;
>   
>   	do {
> -		ret = pthread_cond_timedwait(&sync_req.cond,
> +		ret = pthread_cond_timedwait(&sync_req.sync.cond,
>   				&pending_requests.lock, ts);
>   	} while (ret != 0 && ret != ETIMEDOUT);
>   
> @@ -703,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>   
>   	/* for secondary process, send request to the primary process only */
>   	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
> -		return mp_request_one(eal_mp_socket_path(), req, reply, &end);
> +		return mp_request_sync(eal_mp_socket_path(), req, reply, &end);
>   
>   	/* for primary process, broadcast request, and collect reply 1 by 1 */
>   	mp_dir = opendir(mp_dir_path);
> @@ -732,7 +1015,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>   		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>   			 ent->d_name);
>   
> -		if (mp_request_one(path, req, reply, &end))
> +		if (mp_request_sync(path, req, reply, &end))
>   			ret = -1;
>   	}
>   	/* unlock the directory */
> @@ -744,9 +1027,155 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>   }
>   
>   int __rte_experimental
> -rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
> +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
> +		rte_mp_async_reply_t clb)
>   {
> +	struct rte_mp_msg *copy;
> +	struct pending_request *dummy;
> +	struct async_request_param *param = NULL;

No need to assign it to NULL.

> +	struct rte_mp_reply *reply;
> +	int dir_fd, ret = 0;
> +	DIR *mp_dir;
> +	struct dirent *ent;
> +	struct timeval now;
> +	struct timespec *end;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
> +
> +	if (check_input(req) == false)
> +		return -1;
> +	if (gettimeofday(&now, NULL) < 0) {
> +		RTE_LOG(ERR, EAL, "Faile to get current time\n");
> +		rte_errno = errno;
> +		return -1;
> +	}
> +	copy = malloc(sizeof(*copy));
> +	dummy = malloc(sizeof(*dummy));
> +	param = malloc(sizeof(*param));
> +	if (copy == NULL || dummy == NULL || param == NULL) {
> +		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
> +		rte_errno = ENOMEM;
> +		goto fail;
> +	}
> +
> +	memset(copy, 0, sizeof(*copy));
> +	memset(dummy, 0, sizeof(*dummy));
> +	memset(param, 0, sizeof(*param));
> +
> +	/* copy message */
> +	memcpy(copy, req, sizeof(*copy));
> +
> +	param->n_responses_processed = 0;
> +	param->clb = clb;
> +	end = &param->end;
> +	reply = &param->user_reply;
> +
> +	end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
> +	end->tv_sec = now.tv_sec + ts->tv_sec +
> +			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
> +	reply->nb_sent = 0;
> +	reply->nb_received = 0;
> +	reply->msgs = NULL;
>   
> +	/* we have to lock the request queue here, as we will be adding a bunch
> +	 * of requests to the queue at once, and some of the replies may arrive
> +	 * before we add all of the requests to the queue.
> +	 */
> +	pthread_mutex_lock(&pending_requests.lock);
> +
> +	/* we have to ensure that callback gets triggered even if we don't send
> +	 * anything, therefore earlier we have allocated a dummy request. put it
> +	 * on the queue and fill it. we will remove it once we know we sent
> +	 * something.
> +	 */

Or we could add this dummy at the end, only if it's necessary, instead of
adding it first and removing it when it's not needed? No strong opinion here.

> +	dummy->type = REQUEST_TYPE_ASYNC;
> +	dummy->request = copy;
> +	dummy->reply = NULL;
> +	dummy->async.param = param;
> +	dummy->reply_received = 1; /* short-circuit the timeout */
> +
> +	TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, next);
> +
> +	/* for secondary process, send request to the primary process only */
> +	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
> +		ret = mp_request_async(eal_mp_socket_path(), copy, param);
> +
> +		/* if we sent something, remove dummy request from the queue */
> +		if (reply->nb_sent != 0) {
> +			TAILQ_REMOVE(&pending_requests.requests, dummy, next);
> +			free(dummy);
> +			dummy = NULL;
> +		}
> +
> +		pthread_mutex_unlock(&pending_requests.lock);
> +
> +		/* if we couldn't send anything, clean up */
> +		if (ret != 0)
> +			goto fail;
> +		return 0;
> +	}
> +
> +	/* for primary process, broadcast request */
> +	mp_dir = opendir(mp_dir_path);
> +	if (!mp_dir) {
> +		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
> +		rte_errno = errno;
> +		goto unlock_fail;
> +	}
> +	dir_fd = dirfd(mp_dir);
> +
> +	/* lock the directory to prevent processes spinning up while we send */
> +	if (flock(dir_fd, LOCK_EX)) {
> +		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
> +			mp_dir_path);
> +		rte_errno = errno;
> +		goto closedir_fail;
> +	}
> +
> +	while ((ent = readdir(mp_dir))) {
> +		char path[PATH_MAX];
> +
> +		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
> +			continue;
> +
> +		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
> +			 ent->d_name);
> +
> +		if (mp_request_async(path, copy, param))
> +			ret = -1;
> +	}
> +	/* if we sent something, remove dummy request from the queue */
> +	if (reply->nb_sent != 0) {
> +		TAILQ_REMOVE(&pending_requests.requests, dummy, next);
> +		free(dummy);
> +		dummy = NULL;
> +	}
> +	/* trigger async request thread wake up */
> +	pthread_cond_signal(&pending_requests.async_cond);
> +
> +	/* finally, unlock the queue */
> +	pthread_mutex_unlock(&pending_requests.lock);
> +
> +	/* unlock the directory */
> +	flock(dir_fd, LOCK_UN);
> +
> +	/* dir_fd automatically closed on closedir */
> +	closedir(mp_dir);
> +	return ret;
> +closedir_fail:
> +	closedir(mp_dir);
> +unlock_fail:
> +	pthread_mutex_unlock(&pending_requests.lock);
> +fail:
> +	free(dummy);
> +	free(param);
> +	free(copy);
> +	return -1;
> +}
> +
> +int __rte_experimental
> +rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
> +{
>   	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
>   
>   	if (check_input(msg) == false)
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 044474e..87ebfd0 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -230,6 +230,16 @@ struct rte_mp_reply {
>   typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);
>   
>   /**
> + * Asynchronous reply function typedef used by other components.
> + *
> + * As we create socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming responses to asynchronous
> + * requests.
> + */
> +typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
> +		const struct rte_mp_reply *reply);
> +
> +/**
>    * @warning
>    * @b EXPERIMENTAL: this API may change without prior notice
>    *
> @@ -321,6 +331,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>    * @warning
>    * @b EXPERIMENTAL: this API may change without prior notice
>    *
> + * Send a request to the peer process and expect a reply in a separate callback.
> + *
> + * This function sends a request message to the peer process, and will not
> + * block. Instead, reply will be received in a separate callback.
> + *
> + * @param req
> + *   The req argument contains the customized request message.
> + *
> + * @param ts
> + *   The ts argument specifies how long we can wait for the peer(s) to reply.
> + *
> + * @param clb
> + *   The callback to trigger when all responses for this request have arrived.
> + *
> + * @return
> + *  - On success, return 0.
> + *  - On failure, return -1, and the reason will be stored in rte_errno.
> + */
> +int __rte_experimental
> +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
> +		rte_mp_async_reply_t clb);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
>    * Send a reply to the peer process.
>    *
>    * This function will send a reply message in response to a request message
> diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
> index d123602..328a0be 100644
> --- a/lib/librte_eal/rte_eal_version.map
> +++ b/lib/librte_eal/rte_eal_version.map
> @@ -225,6 +225,7 @@ EXPERIMENTAL {
>   	rte_mp_action_unregister;
>   	rte_mp_sendmsg;
>   	rte_mp_request;
> +	rte_mp_request_async;
>   	rte_mp_reply;
>   	rte_service_attr_get;
>   	rte_service_attr_reset_all;

  reply	other threads:[~2018-03-26 14:15 UTC|newest]

Thread overview: 40+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-02-27 14:59 [PATCH] eal: add asynchronous request API to DPDK IPC Anatoly Burakov
2018-02-28 10:22 ` Burakov, Anatoly
2018-03-02 18:06 ` [PATCH v2] " Anatoly Burakov
2018-03-07 16:57   ` [PATCH v3] " Anatoly Burakov
2018-03-13 17:42     ` [PATCH v4] " Anatoly Burakov
2018-03-23 15:38       ` Tan, Jianfeng
2018-03-23 18:21         ` Burakov, Anatoly
2018-03-24 13:22           ` Burakov, Anatoly
2018-03-24 12:46       ` [PATCH v5 1/2] eal: rename IPC sync request to pending request Anatoly Burakov
2018-03-26  7:31         ` Tan, Jianfeng
2018-03-27 13:59         ` [PATCH v6 " Anatoly Burakov
2018-03-27 16:27           ` Thomas Monjalon
2018-03-28  9:15             ` Burakov, Anatoly
2018-03-28 10:08               ` Thomas Monjalon
2018-03-28 10:57                 ` Burakov, Anatoly
2018-03-31 17:06           ` [PATCH v7 1/3] " Anatoly Burakov
2018-03-31 17:06           ` [PATCH v7 2/3] eal: rename mp_request to mp_request_sync Anatoly Burakov
2018-04-02  5:09             ` Tan, Jianfeng
2018-03-31 17:06           ` [PATCH v7 3/3] eal: add asynchronous request API to DPDK IPC Anatoly Burakov
2018-04-04 22:15             ` Thomas Monjalon
2018-03-27 13:59         ` [PATCH v6 2/2] " Anatoly Burakov
2018-03-27 16:33           ` Thomas Monjalon
2018-03-28  2:08             ` Tan, Jianfeng
2018-03-28  7:29               ` Thomas Monjalon
2018-03-28  8:22                 ` Van Haaren, Harry
2018-03-28  8:55                   ` Tan, Jianfeng
2018-03-28  9:10                     ` Van Haaren, Harry
2018-03-28  9:21                     ` Burakov, Anatoly
2018-03-28  9:53                       ` Thomas Monjalon
2018-03-28 10:42                         ` Burakov, Anatoly
2018-03-28 11:26                           ` Thomas Monjalon
2018-03-28 12:21                             ` Burakov, Anatoly
2018-03-28  9:11                 ` Bruce Richardson
2018-03-24 12:46       ` [PATCH v5 " Anatoly Burakov
2018-03-26 14:15         ` Tan, Jianfeng [this message]
2018-03-26 14:28           ` Burakov, Anatoly
2018-03-02 18:48 ` [PATCH] " Stephen Hemminger
2018-03-03 12:29   ` Burakov, Anatoly
2018-03-02 18:51 ` Stephen Hemminger
2018-03-03 13:44   ` Burakov, Anatoly

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=cab9d8f5-3f5c-59f0-65dd-543585668c14@intel.com \
    --to=jianfeng.tan@intel.com \
    --cc=Directory@ecsmtp.ir.intel.com \
    --cc=anatoly.burakov@intel.com \
    --cc=dev@dpdk.org \
    --cc=empty@ecsmtp.ir.intel.com \
    --cc=failed@ecsmtp.ir.intel.com \
    --cc=konstantin.ananyev@intel.com \
    --cc=not@ecsmtp.ir.intel.com \
    --cc=remove@ecsmtp.ir.intel.com \
    --cc=to@ecsmtp.ir.intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.