All of lore.kernel.org
 help / color / mirror / Atom feed
From: Jianfeng Tan <jianfeng.tan@intel.com>
To: dev@dpdk.org
Cc: anatoly.burakov@intel.com, bruce.richardson@intel.com,
	konstantin.ananyev@intel.com, thomas@monjalon.net,
	Jianfeng Tan <jianfeng.tan@intel.com>
Subject: [PATCH v2 3/4] eal: add synchronous multi-process communication
Date: Thu, 11 Jan 2018 04:07:33 +0000	[thread overview]
Message-ID: <1515643654-129489-4-git-send-email-jianfeng.tan@intel.com> (raw)
In-Reply-To: <1515643654-129489-1-git-send-email-jianfeng.tan@intel.com>

We need the synchronous way for multi-process communication,
i.e., blockingly waiting for reply message when we send a request
to the peer process.

We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
such use case. By invoking rte_eal_mp_request(), a request message
is sent out, and then it waits there for a reply message. The
timeout is hard-coded 5 Sec. And the replied message will be copied
in the parameters of this API so that the caller can decide how
to translate those information (including params and fds). Note
if a primary process owns multiple secondary processes, this API
will fail.

The API rte_eal_mp_reply() is always called by an mp action handler.
Here we add another parameter for rte_eal_mp_t so that the action
handler knows which peer address to reply.

We use mutex in rte_eal_mp_request() to guarantee that only one
request is on the fly for one pair of processes.

Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
 lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
 lib/librte_eal/rte_eal_version.map      |   2 +
 3 files changed, 206 insertions(+), 13 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 70519cc..f194a52 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -32,6 +32,7 @@
 static int mp_fd = -1;
 static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
 
 struct action_entry {
 	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
@@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
 
 struct mp_msghdr {
 	char action_name[MAX_ACTION_NAME_LEN];
+#define MP_MSG	0 /* Share message with peers, will not block */
+#define MP_REQ	1 /* Request for information, Will block for a reply */
+#define MP_REP	2 /* Reply to previously-received request */
+	int type;
 	int fds_num;
 	int len_params;
 	char params[0];
@@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
 }
 
 static int
-read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
+read_msg(int fd, char *buf, int buflen,
+	 int *fds, int fds_num, struct sockaddr_un *s)
 {
 	int ret;
 	struct iovec iov;
@@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 	iov.iov_base = buf;
 	iov.iov_len  = buflen;
 
+	msgh.msg_name = s;
+	msgh.msg_namelen = sizeof(*s);
 	msgh.msg_iov = &iov;
 	msgh.msg_iovlen = 1;
 	msgh.msg_control = control;
@@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 }
 
 static int
-process_msg(struct mp_msghdr *hdr, int len, int fds[])
+process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
 {
 	int ret;
 	int params_len;
@@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
 	}
 
 	params_len = len - sizeof(struct mp_msghdr);
-	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
+	ret = entry->action(hdr->params, params_len,
+			    fds, hdr->fds_num, s->sun_path);
 	pthread_mutex_unlock(&mp_mutex_action);
 	return ret;
-
 }
 
 static void *
@@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
 	int len;
 	int fds[SCM_MAX_FD];
 	char buf[MAX_MSG_LENGTH];
+	struct sockaddr_un sa;
 
 	while (1) {
-		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
+		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
 		if (len > 0)
-			process_msg((struct mp_msghdr *)buf, len, fds);
+			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
 	}
 
 	return NULL;
@@ -255,7 +264,8 @@ static int
 mp_primary_proc(const void *params,
 		int len __rte_unused,
 		int fds[] __rte_unused,
-		int fds_num __rte_unused)
+		int fds_num __rte_unused,
+		const void *peer __rte_unused)
 {
 	const struct proc_request *r = (const struct proc_request *)params;
 
@@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
 }
 
 static inline struct mp_msghdr *
-format_msg(const char *act_name, const void *p, int len_params, int fds_num)
+format_msg(const char *act_name, const void *p,
+	   int len_params, int fds_num, int type)
 {
 	int len_msg;
 	struct mp_msghdr *msg;
@@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
 	strcpy(msg->action_name, act_name);
 	msg->fds_num = fds_num;
 	msg->len_params = len_params;
+	msg->type = type;
 	memcpy(msg->params, p, len_params);
 	return msg;
 }
@@ -455,7 +467,9 @@ mp_send(const char *action_name,
 	const void *params,
 	int len_params,
 	int fds[],
-	int fds_num)
+	int fds_num,
+	int type,
+	const void *peer)
 {
 	int i;
 	int n = 0;
@@ -468,7 +482,7 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
-	msg = format_msg(action_name, params, len_params, fds_num);
+	msg = format_msg(action_name, params, len_params, fds_num, type);
 	if (msg == NULL)
 		return 0;
 
@@ -477,6 +491,11 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
+	if (peer) {
+		n += send_msg(sockfd, peer, msg, fds);
+		goto ret;
+	}
+
 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
 		/* broadcast to all secondaries */
 		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
@@ -488,6 +507,7 @@ mp_send(const char *action_name,
 	} else
 		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
 
+ret:
 	free(msg);
 	close(sockfd);
 	return n;
@@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
 		   int fds_num)
 {
 	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
-	return mp_send(action_name, params, len_params, fds, fds_num);
+	return mp_send(action_name, params, len_params,
+			fds, fds_num, MP_MSG, NULL);
+}
+
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}
+
+int
+rte_eal_mp_reply(const char *action_name,
+		 const void *params,
+		 int len_p,
+		 int fds[],
+		 int fds_in,
+		 const void *peer)
+{
+	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
+	if (peer == NULL) {
+		RTE_LOG(ERR, EAL, "peer is not specified\n");
+		return 0;
+	}
+	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
 }
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 9884c0b..2690a77 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
  * this function typedef to register action for coming messages.
  */
 typedef int (*rte_eal_mp_t)(const void *params, int len,
-			    int fds[], int fds_num);
+			    int fds[], int fds_num, const void *peer);
 
 /**
  * Register an action function for primary/secondary communication.
@@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
  *   The fds argument is an array of fds sent with sendmsg.
  *
  * @param fds_num
- *   The fds_num argument is number of fds to be sent with sendmsg.
+ *   The number of fds to be sent with sendmsg.
  *
  * @return
  *  - Returns the number of messages being sent successfully.
@@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
 		   int len_params, int fds[], int fds_num);
 
 /**
+ * Send a request to the peer process and expect a reply.
+ *
+ * This function sends a request message to the peer process, and will
+ * block until receiving reply message from the peer process. Note:
+ * this does not work for the primary process sending requests to its
+ * multiple (>1) secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message; as the reply is
+ *   received, the replied params will be copied to this pointer. 
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg; as the reply
+ *   is received, the replied fds will be copied into this array.
+ *
+ * @param fds_in
+ *   The number of fds to be sent.
+ *
+ * @param fds_out
+ *   The number of fds to be received.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on sending request successfully but no valid reply received.
+ *  - (<0) on failing to sending request.
+ */
+int
+rte_eal_mp_request(const char *action_name, void *params,
+		   int len_p, int fds[], int fds_in, int fds_out);
+
+/**
+ * Send a reply to the peer process.
+ *
+ * This function will send a reply message in response to a request message
+ * received previously.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_in
+ *   The number of fds to be sent with sendmsg.
+ *
+ * @param peer
+ *   The fds_num argument is number of fds to be sent with sendmsg.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on failure.
+ */
+int
+rte_eal_mp_reply(const char *action_name, const void *params,
+		 int len_p, int fds[], int fds_in, const void *peer);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_application_usage_hook()
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index 5dacde5..068ac0b 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -243,5 +243,7 @@ DPDK_18.02 {
 	rte_eal_mp_action_register;
 	rte_eal_mp_action_unregister;
 	rte_eal_mp_sendmsg;
+	rte_eal_mp_request;
+	rte_eal_mp_reply;
 
 } DPDK_17.11;
-- 
2.7.4

  parent reply	other threads:[~2018-01-11  4:05 UTC|newest]

Thread overview: 88+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-11-30 18:44 [PATCH 0/3] generic channel for multi-process communication Jianfeng Tan
2017-11-30 18:44 ` [PATCH 1/3] eal: add " Jianfeng Tan
2017-12-11 11:04   ` Burakov, Anatoly
2017-12-11 16:43   ` Ananyev, Konstantin
2017-11-30 18:44 ` [PATCH 2/3] eal: add synchronous " Jianfeng Tan
2017-12-11 11:39   ` Burakov, Anatoly
2017-12-11 16:49     ` Ananyev, Konstantin
2017-11-30 18:44 ` [PATCH 3/3] vfio: use the generic multi-process channel Jianfeng Tan
2017-12-11 12:01   ` Burakov, Anatoly
2017-12-11  9:59 ` [PATCH 0/3] generic channel for multi-process communication Burakov, Anatoly
2017-12-12  7:34   ` Tan, Jianfeng
2017-12-12 16:18     ` Burakov, Anatoly
2018-01-11  4:07 ` [PATCH v2 0/4] " Jianfeng Tan
2018-01-11  4:07   ` [PATCH v2 1/4] eal: add " Jianfeng Tan
2018-01-13 12:57     ` Burakov, Anatoly
2018-01-15 19:52     ` Ananyev, Konstantin
2018-01-11  4:07   ` [PATCH v2 2/4] eal: add and del secondary processes in the primary Jianfeng Tan
2018-01-13 13:11     ` Burakov, Anatoly
2018-01-15 21:45     ` Ananyev, Konstantin
2018-01-11  4:07   ` Jianfeng Tan [this message]
2018-01-13 13:41     ` [PATCH v2 3/4] eal: add synchronous multi-process communication Burakov, Anatoly
2018-01-16  0:00     ` Ananyev, Konstantin
2018-01-16  8:10       ` Tan, Jianfeng
2018-01-16 11:12         ` Ananyev, Konstantin
2018-01-16 16:47           ` Tan, Jianfeng
2018-01-17 10:50             ` Ananyev, Konstantin
2018-01-17 13:09               ` Tan, Jianfeng
2018-01-17 13:15                 ` Tan, Jianfeng
2018-01-17 17:20                 ` Ananyev, Konstantin
2018-01-11  4:07   ` [PATCH v2 4/4] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-13 14:03     ` Burakov, Anatoly
2018-03-04 14:57     ` [PATCH v5] vfio: change to use " Jianfeng Tan
2018-03-14 13:27       ` Burakov, Anatoly
2018-03-19  6:53         ` Tan, Jianfeng
2018-03-20 10:33           ` Burakov, Anatoly
2018-03-20 10:56             ` Burakov, Anatoly
2018-03-20  8:50     ` [PATCH v6] " Jianfeng Tan
2018-04-05 14:26       ` Tan, Jianfeng
2018-04-05 14:39         ` Burakov, Anatoly
2018-04-12 23:27         ` Thomas Monjalon
2018-04-12 15:26       ` Burakov, Anatoly
2018-04-15 15:06     ` [PATCH v7] " Jianfeng Tan
2018-04-15 15:10       ` Tan, Jianfeng
2018-04-17 23:04       ` Thomas Monjalon
2018-01-25  4:16 ` [PATCH v3 0/3] generic channel for multi-process communication Jianfeng Tan
2018-01-25  4:16   ` [PATCH v3 1/3] eal: add " Jianfeng Tan
2018-01-25 10:41     ` Thomas Monjalon
2018-01-25 11:27     ` Burakov, Anatoly
2018-01-25 11:34       ` Thomas Monjalon
2018-01-25 12:21     ` Ananyev, Konstantin
2018-01-25  4:16   ` [PATCH v3 2/3] eal: add synchronous " Jianfeng Tan
2018-01-25 12:00     ` Burakov, Anatoly
2018-01-25 12:19       ` Burakov, Anatoly
2018-01-25 12:19       ` Ananyev, Konstantin
2018-01-25 12:25         ` Burakov, Anatoly
2018-01-25 13:00           ` Ananyev, Konstantin
2018-01-25 13:05             ` Burakov, Anatoly
2018-01-25 13:10               ` Burakov, Anatoly
2018-01-25 15:03                 ` Ananyev, Konstantin
2018-01-25 16:22                   ` Burakov, Anatoly
2018-01-25 17:10                     ` Tan, Jianfeng
2018-01-25 18:02                       ` Burakov, Anatoly
2018-01-25 12:22     ` Ananyev, Konstantin
2018-01-25  4:16   ` [PATCH v3 3/3] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-25 10:47     ` Thomas Monjalon
2018-01-25 10:52       ` Burakov, Anatoly
2018-01-25 10:57         ` Thomas Monjalon
2018-01-25 12:15           ` Burakov, Anatoly
2018-01-25 19:14 ` [PATCH v4 0/2] generic channel for multi-process communication Jianfeng Tan
2018-01-25 19:14   ` [PATCH v4 1/2] eal: add synchronous " Jianfeng Tan
2018-01-25 19:14   ` [PATCH v4 2/2] vfio: use the generic multi-process channel Jianfeng Tan
2018-01-25 19:15   ` [PATCH v4 0/2] generic channel for multi-process communication Tan, Jianfeng
2018-01-25 19:21 ` [PATCH v5 " Jianfeng Tan
2018-01-25 19:21   ` [PATCH v5 1/2] eal: add " Jianfeng Tan
2018-01-25 19:21   ` [PATCH v5 2/2] eal: add synchronous " Jianfeng Tan
2018-01-25 21:23   ` [PATCH v5 0/2] generic channel for " Thomas Monjalon
2018-01-26  3:41 ` [PATCH v6 " Jianfeng Tan
2018-01-26  3:41   ` [PATCH v6 1/2] eal: add " Jianfeng Tan
2018-01-26 10:25     ` Burakov, Anatoly
2018-01-29  6:37       ` Tan, Jianfeng
2018-01-29  9:37         ` Burakov, Anatoly
2018-01-26  3:41   ` [PATCH v6 2/2] eal: add synchronous " Jianfeng Tan
2018-01-26 10:31     ` Burakov, Anatoly
2018-01-29 23:52   ` [PATCH v6 0/2] generic channel for " Thomas Monjalon
2018-01-30  6:58 ` [PATCH v7 " Jianfeng Tan
2018-01-30  6:58   ` [PATCH v7 1/2] eal: add " Jianfeng Tan
2018-01-30  6:58   ` [PATCH v7 2/2] eal: add synchronous " Jianfeng Tan
2018-01-30 14:46   ` [PATCH v7 0/2] generic channel for " Thomas Monjalon

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1515643654-129489-4-git-send-email-jianfeng.tan@intel.com \
    --to=jianfeng.tan@intel.com \
    --cc=anatoly.burakov@intel.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=konstantin.ananyev@intel.com \
    --cc=thomas@monjalon.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.