* [net-next 00/18] tipc: Introduce Communication Group feature
@ 2017-10-12 14:02 Jon Maloy
  2017-10-12 14:02 ` [net-next 01/18] tipc: add ability to order and receive topology events in driver Jon Maloy
                   ` (17 more replies)
  0 siblings, 18 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

This commit series introduces a 'Group Communication' feature that
resolves the datagram and multicast flow control problem. The new
feature makes it possible to instantiate multiple virtual brokerless
message buses by simply creating member sockets.

The main features are as follows:
---------------------------------
- Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN.
  If it is the first socket of the group this implies creation of the
  group. This call takes four parameters: 'type' serves as group 
  identifier, 'instance' serves as member identifier, and 'scope' 
  indicates the visibility of the group (node/cluster/zone). Finally,
  'flags' indicates different options for the socket joining the group. 
  For the time being, there are only two such flags: 1) 'LOOPBACK'
  indicates whether the creator of the socket wants to receive a copy
  of broadcast or multicast messages it sends to the group; 2) 'EVENTS'
  indicates whether it wants to receive membership (JOINED/LEFT) events
  for the other members of the group.
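
  From user space, joining a group then reduces to a single
  setsockopt() call. A minimal sketch, assuming the constants and the
  struct tipc_group_req layout that this series adds to
  include/uapi/linux/tipc.h (mirrored locally so the sketch is
  self-contained; real code should include <linux/tipc.h>):

```c
#include <stdint.h>
#include <string.h>

/* Illustrative mirror of the uapi additions from this series;
 * real code should take these from <linux/tipc.h>: */
#define TIPC_GROUP_JOIN        135  /* setsockopt(): join/create a group */
#define TIPC_GROUP_LOOPBACK    0x1  /* receive copy of own bcast/mcast   */
#define TIPC_GROUP_MEMBER_EVTS 0x2  /* receive JOINED/LEFT events        */
#define TIPC_CLUSTER_SCOPE     2    /* group visible within the cluster  */

struct tipc_group_req {
	uint32_t type;      /* group identifier */
	uint32_t instance;  /* member identifier */
	uint32_t scope;     /* node/cluster/zone visibility */
	uint32_t flags;     /* LOOPBACK and/or MEMBER_EVTS */
};

/* Build the argument for the TIPC_GROUP_JOIN socket option */
static struct tipc_group_req make_join_req(uint32_t type, uint32_t instance,
					   uint32_t scope, uint32_t flags)
{
	struct tipc_group_req req;

	memset(&req, 0, sizeof(req));
	req.type = type;
	req.instance = instance;
	req.scope = scope;
	req.flags = flags;
	return req;
}

/* On an AF_TIPC, SOCK_RDM socket sd, the join itself would then be:
 *
 *	struct tipc_group_req req = make_join_req(4711, 37,
 *			TIPC_CLUSTER_SCOPE, TIPC_GROUP_MEMBER_EVTS);
 *	setsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &req, sizeof(req));
 */
```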

- Groups are closed, i.e., sockets which have not joined a group will
  not be able to send messages to or receive messages from members of
  the group, and vice versa. A socket can be a member of only one
  group at a time.

- There are four transmission modes.
  1: Unicast. The sender transmits a message using the port identity
     (node:port tuple) of the receiving socket.
  2: Anycast. The sender transmits a message using a port name (type:
     instance:scope) of one of the receiving sockets. If more than
     one member socket matches the given address, a destination is
     selected by a round-robin algorithm, which also considers the
     destination load (advertised window size) as an additional
     criterion.
  3: Multicast. The sender transmits a message using a port name 
     (type:instance:scope) of one or more of the receiving sockets.
     All sockets in the group matching the given address will receive
     a copy of the message. 
  4: Broadcast. The sender transmits a message using the primitive
     send(). All members of the group, irrespective of their member
     identity (instance) number, receive a copy of the message.
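
  At the socket API, modes 1-3 differ only in the destination address
  passed to sendto(), while mode 4 uses send() with no address at all.
  A hedged sketch, using a local mirror of the sockaddr_tipc layout so
  it is self-contained (real code should take the types and addrtype
  values from <linux/tipc.h>; the helper names are ours):

```c
#include <stdint.h>
#include <string.h>

/* Illustrative mirror of the uapi types; use <linux/tipc.h> in real code. */
#define AF_TIPC         30
#define TIPC_ADDR_MCAST  1   /* name sequence: all matching members  */
#define TIPC_ADDR_NAME   2   /* port name: one matching member       */
#define TIPC_ADDR_ID     3   /* port identity: one specific socket   */

struct tipc_portid   { uint32_t ref; uint32_t node; };
struct tipc_name     { uint32_t type; uint32_t instance; };
struct tipc_name_seq { uint32_t type; uint32_t lower; uint32_t upper; };

struct sockaddr_tipc {
	unsigned short family;
	unsigned char  addrtype;
	signed char    scope;
	union {
		struct tipc_portid   id;       /* mode 1: unicast   */
		struct tipc_name_seq nameseq;  /* mode 3: multicast */
		struct {
			struct tipc_name name;     /* mode 2: anycast */
			uint32_t domain;
		} name;
	} addr;
};

/* Mode 1 -- unicast to the member socket with identity <node:port> */
static struct sockaddr_tipc dst_unicast(uint32_t node, uint32_t port)
{
	struct sockaddr_tipc a;

	memset(&a, 0, sizeof(a));
	a.family = AF_TIPC;
	a.addrtype = TIPC_ADDR_ID;
	a.addr.id.node = node;
	a.addr.id.ref = port;
	return a;
}

/* Mode 2 -- anycast: one member matching <type:instance>, round-robin */
static struct sockaddr_tipc dst_anycast(uint32_t type, uint32_t instance)
{
	struct sockaddr_tipc a;

	memset(&a, 0, sizeof(a));
	a.family = AF_TIPC;
	a.addrtype = TIPC_ADDR_NAME;
	a.addr.name.name.type = type;
	a.addr.name.name.instance = instance;
	return a;
}

/* Mode 3 -- multicast: all members with instance in [lower, upper] */
static struct sockaddr_tipc dst_multicast(uint32_t type, uint32_t lower,
					  uint32_t upper)
{
	struct sockaddr_tipc a;

	memset(&a, 0, sizeof(a));
	a.family = AF_TIPC;
	a.addrtype = TIPC_ADDR_MCAST;
	a.addr.nameseq.type = type;
	a.addr.nameseq.lower = lower;
	a.addr.nameseq.upper = upper;
	return a;
}

/* Mode 4 -- broadcast: plain send(sd, buf, len, 0), no address needed */
```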

- TIPC broadcast is used for carrying messages in mode 3 or 4 when
  this is deemed more efficient, i.e., depending on the number of
  actual destinations.

- All transmission modes are flow controlled, so that messages are
  never dropped or rejected, just as we are used to from connection
  oriented communication. A special algorithm guarantees that this
  holds even for multipoint-to-point communication, i.e., when many
  source sockets decide to send simultaneously towards the same
  destination socket.

- Sequence order is always guaranteed, even between the different
  transmission modes.

- Member join/leave events are received in all other member sockets
  in guaranteed order. I.e., a 'JOINED' event (an empty message with
  the OOB bit set) will always be received before the first data
  message from a new member, and a 'LEFT' event (like 'JOINED', but
  with the EOR bit set) will always arrive after the last data
  message from a leaving member.
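
  On the receiving side, such an event can be told apart from ordinary
  data by its empty payload and the flags recvmsg() reports in
  msg_flags. A small sketch of that classification (the helper name is
  ours; the flag semantics follow the description above):

```c
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>

/* Classify a message received on a member socket, based on the payload
 * length and the msg_flags that recvmsg() reported. Per the description
 * above: 'JOINED' is an empty message with the OOB bit set, and 'LEFT'
 * is like 'JOINED' but with the EOR bit set. The member identity of the
 * peer that joined or left is found in msg_name of the struct msghdr. */
static const char *classify_group_msg(size_t payload_len, int msg_flags)
{
	if (payload_len == 0 && (msg_flags & MSG_OOB))
		return (msg_flags & MSG_EOR) ? "LEFT" : "JOINED";
	return "DATA";
}
```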

Jon Maloy (18):
  tipc: add ability to order and receive topology events in driver
  tipc: improve address sanity check in tipc_connect()
  tipc: add ability to obtain node availability status from other files
  tipc: refactor function filter_rcv()
  tipc: add new function for sending multiple small messages
  tipc: improve destination linked list
  tipc: introduce communication groups
  tipc: add second source address to recvmsg()/recvfrom()
  tipc: receive group membership events via member socket
  tipc: introduce flow control for group broadcast messages
  tipc: introduce group unicast messaging
  tipc: introduce group anycast messaging
  tipc: introduce group multicast messaging
  tipc: guarantee group unicast doesn't bypass group broadcast
  tipc: guarantee that group broadcast doesn't bypass group unicast
  tipc: guarantee delivery of UP event before first broadcast
  tipc: guarantee delivery of last broadcast before DOWN event
  tipc: add multipoint-to-point flow control

 include/uapi/linux/tipc.h |  15 +
 net/tipc/Makefile         |   2 +-
 net/tipc/bcast.c          |  18 +-
 net/tipc/core.h           |   5 +
 net/tipc/group.c          | 874 ++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/group.h          |  73 ++++
 net/tipc/link.c           |   9 +-
 net/tipc/msg.c            |   7 +
 net/tipc/msg.h            | 110 +++++-
 net/tipc/name_table.c     | 174 ++++++---
 net/tipc/name_table.h     |  28 +-
 net/tipc/node.c           |  42 ++-
 net/tipc/node.h           |   5 +-
 net/tipc/server.c         | 121 +++++--
 net/tipc/server.h         |   5 +-
 net/tipc/socket.c         | 770 +++++++++++++++++++++++++++++++---------
 16 files changed, 1988 insertions(+), 270 deletions(-)
 create mode 100644 net/tipc/group.c
 create mode 100644 net/tipc/group.h

-- 
2.1.4


* [net-next 01/18] tipc: add ability to order and receive topology events in driver
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 18:54   ` David Miller
  2017-10-12 14:02 ` [net-next 02/18] tipc: improve address sanity check in tipc_connect() Jon Maloy
                   ` (16 subsequent siblings)
  17 siblings, 1 reply; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

As preparation for introducing communication groups, we add the ability
to issue topology subscriptions and receive topology events from kernel
space. This will make it possible for group member sockets to keep track
of other group members.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/core.h   |   5 +++
 net/tipc/msg.h    |   1 +
 net/tipc/server.c | 121 ++++++++++++++++++++++++++++++++++++++++++------------
 net/tipc/server.h |   5 ++-
 net/tipc/socket.c |  32 +++++++++------
 5 files changed, 125 insertions(+), 39 deletions(-)

diff --git a/net/tipc/core.h b/net/tipc/core.h
index 5cc5398..9643426 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net)
 	return &tipc_net(net)->node_list;
 }
 
+static inline struct tipc_server *tipc_topsrv(struct net *net)
+{
+	return tipc_net(net)->topsrv;
+}
+
 static inline unsigned int tipc_hashfn(u32 addr)
 {
 	return addr & (NODE_HTABLE_SIZE - 1);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index c843fd2..d058b1c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -78,6 +78,7 @@ struct plist;
 #define  MSG_FRAGMENTER       12
 #define  LINK_CONFIG          13
 #define  SOCK_WAKEUP          14       /* pseudo user */
+#define  TOP_SRV              15       /* pseudo user */
 
 /*
  * Message header sizes
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 3cd6402..caec0d2 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -36,6 +36,8 @@
 #include "server.h"
 #include "core.h"
 #include "socket.h"
+#include "addr.h"
+#include "msg.h"
 #include <net/sock.h>
 #include <linux/module.h>
 
@@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref)
 		kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
 		sock_release(sock);
 		con->sock = NULL;
-
-		spin_lock_bh(&s->idr_lock);
-		idr_remove(&s->conn_idr, con->conid);
-		s->idr_in_use--;
-		spin_unlock_bh(&s->idr_lock);
 	}
-
+	spin_lock_bh(&s->idr_lock);
+	idr_remove(&s->conn_idr, con->conid);
+	s->idr_in_use--;
+	spin_unlock_bh(&s->idr_lock);
 	tipc_clean_outqueues(con);
 	kfree(con);
 }
@@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con)
 	struct tipc_server *s = con->server;
 
 	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
-		tipc_unregister_callbacks(con);
+		if (con->sock)
+			tipc_unregister_callbacks(con);
 
 		if (con->conid)
 			s->tipc_conn_release(con->conid, con->usr_data);
@@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con)
 		 * are harmless for us here as we have already deleted this
 		 * connection from server connection list.
 		 */
-		kernel_sock_shutdown(con->sock, SHUT_RDWR);
-
+		if (con->sock)
+			kernel_sock_shutdown(con->sock, SHUT_RDWR);
 		conn_put(con);
 	}
 }
@@ -288,6 +289,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
 }
 
 static int tipc_accept_from_sock(struct tipc_conn *con)
+
 {
 	struct tipc_server *s = con->server;
 	struct socket *sock = con->sock;
@@ -473,6 +475,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
 
 	if (!queue_work(s->send_wq, &con->swork))
 		conn_put(con);
+
 	return 0;
 }
 
@@ -487,38 +490,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
 	}
 }
 
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
+			     u32 lower, u32 upper, int *conid)
+{
+	struct tipc_server *s;
+	struct tipc_conn *con;
+	struct tipc_subscriber *scbr;
+	struct tipc_subscr sub;
+
+	sub.seq.type = type;
+	sub.seq.lower = lower;
+	sub.seq.upper = upper;
+	sub.timeout = TIPC_WAIT_FOREVER;
+	sub.filter = TIPC_SUB_PORTS;
+	*(u32 *)&sub.usr_handle = port;
+
+	con = tipc_alloc_conn(tipc_topsrv(net));
+	if (!con)
+		return false;
+
+	*conid = con->conid;
+	s = con->server;
+	scbr = s->tipc_conn_new(*conid);
+	if (!scbr) {
+		tipc_close_conn(con);
+		return false;
+	}
+
+	con->usr_data = scbr;
+	con->sock = NULL;
+	s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
+	return true;
+}
+
+void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
+{
+	struct tipc_conn *con;
+
+	con = tipc_conn_lookup(tipc_topsrv(net), conid);
+	if (!con)
+		return;
+	tipc_close_conn(con);
+	conn_put(con);
+}
+
+static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
+{
+	u32 port = *(u32 *)&evt->s.usr_handle;
+	u32 self = tipc_own_addr(net);
+	struct sk_buff_head evtq;
+	struct sk_buff *skb;
+
+	skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
+			      self, self, port, port, 0);
+	if (!skb)
+		return;
+	msg_set_dest_droppable(buf_msg(skb), true);
+	memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
+	skb_queue_head_init(&evtq);
+	__skb_queue_tail(&evtq, skb);
+	tipc_sk_rcv(net, &evtq);
+}
+
 static void tipc_send_to_sock(struct tipc_conn *con)
 {
 	int count = 0;
 	struct tipc_server *s = con->server;
+	struct tipc_event *evt;
 	struct outqueue_entry *e;
 	struct msghdr msg;
 	int ret;
 
 	spin_lock_bh(&con->outqueue_lock);
 	while (test_bit(CF_CONNECTED, &con->flags)) {
-		e = list_entry(con->outqueue.next, struct outqueue_entry,
-			       list);
+		e = list_entry(con->outqueue.next, struct outqueue_entry, list);
 		if ((struct list_head *) e == &con->outqueue)
 			break;
-		spin_unlock_bh(&con->outqueue_lock);
 
-		memset(&msg, 0, sizeof(msg));
-		msg.msg_flags = MSG_DONTWAIT;
+		spin_unlock_bh(&con->outqueue_lock);
 
-		if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
-			msg.msg_name = &e->dest;
-			msg.msg_namelen = sizeof(struct sockaddr_tipc);
-		}
-		ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
-				     e->iov.iov_len);
-		if (ret == -EWOULDBLOCK || ret == 0) {
-			cond_resched();
-			goto out;
-		} else if (ret < 0) {
-			goto send_err;
+		if (con->sock) {
+			memset(&msg, 0, sizeof(msg));
+			msg.msg_flags = MSG_DONTWAIT;
+			if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
+				msg.msg_name = &e->dest;
+				msg.msg_namelen = sizeof(struct sockaddr_tipc);
+			}
+			ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
+					     e->iov.iov_len);
+			if (ret == -EWOULDBLOCK || ret == 0) {
+				cond_resched();
+				goto out;
+			} else if (ret < 0) {
+				goto send_err;
+			}
+		} else {
+			evt = e->iov.iov_base;
+			tipc_send_kern_top_evt(s->net, evt);
 		}
-
 		/* Don't starve users filling buffers */
 		if (++count >= MAX_SEND_MSG_COUNT) {
 			cond_resched();
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 34f8055..2113c91 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -83,13 +83,16 @@ struct tipc_server {
 int tipc_conn_sendmsg(struct tipc_server *s, int conid,
 		      struct sockaddr_tipc *addr, void *data, size_t len);
 
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
+			     u32 lower, u32 upper, int *conid);
+void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
+
 /**
  * tipc_conn_terminate - terminate connection with server
  *
  * Note: Must call it in process context since it might sleep
  */
 void tipc_conn_terminate(struct tipc_server *s, int conid);
-
 int tipc_server_start(struct tipc_server *s);
 
 void tipc_server_stop(struct tipc_server *s);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d50edd6..9a7e7b5c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -896,6 +896,10 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 	kfree_skb(skb);
 }
 
+static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
+{
+}
+
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @sock: socket structure
@@ -1671,20 +1675,24 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
 	struct tipc_msg *hdr = buf_msg(skb);
 	unsigned int limit = rcvbuf_limit(sk, skb);
 	int err = TIPC_OK;
-	int usr = msg_user(hdr);
-	u32 onode;
-
-	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
-		tipc_sk_proto_rcv(tsk, skb, xmitq);
-		return false;
-	}
 
-	if (unlikely(usr == SOCK_WAKEUP)) {
-		onode = msg_orignode(hdr);
+	if (unlikely(!msg_isdata(hdr))) {
+		switch (msg_user(hdr)) {
+		case CONN_MANAGER:
+			tipc_sk_proto_rcv(tsk, skb, xmitq);
+			return false;
+		case SOCK_WAKEUP:
+			u32_del(&tsk->cong_links, msg_orignode(hdr));
+			tsk->cong_link_cnt--;
+			sk->sk_write_space(sk);
+			break;
+		case TOP_SRV:
+			tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+			break;
+		default:
+			break;
+		}
 		kfree_skb(skb);
-		u32_del(&tsk->cong_links, onode);
-		tsk->cong_link_cnt--;
-		sk->sk_write_space(sk);
 		return false;
 	}
 
-- 
2.1.4


* [net-next 02/18] tipc: improve address sanity check in tipc_connect()
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
  2017-10-12 14:02 ` [net-next 01/18] tipc: add ability to order and receive topology events in driver Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 03/18] tipc: add ability to obtain node availability status from other files Jon Maloy
                   ` (15 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

The address given to tipc_connect() is not completely sanity checked,
under the assumption that this will be done later in the function
__tipc_sendmsg() when the address is used there.

However, the latter function will in the next commits serve as caller
to several other send functions, so we want to move the corresponding
sanity check to the beginning of that function, before we possibly
need to grab the address stored by tipc_connect(). We must therefore
be able to trust that this address has already been thoroughly
checked, which is what this commit ensures.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/socket.c | 31 +++++++++++++++----------------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 9a7e7b5c..7659e79 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1911,28 +1911,27 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
 	int previous;
 	int res = 0;
 
+	if (destlen != sizeof(struct sockaddr_tipc))
+		return -EINVAL;
+
 	lock_sock(sk);
 
-	/* DGRAM/RDM connect(), just save the destaddr */
-	if (tipc_sk_type_connectionless(sk)) {
-		if (dst->family == AF_UNSPEC) {
-			memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
-		} else if (destlen != sizeof(struct sockaddr_tipc)) {
+	if (dst->family == AF_UNSPEC) {
+		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
+		if (!tipc_sk_type_connectionless(sk))
 			res = -EINVAL;
-		} else {
-			memcpy(&tsk->peer, dest, destlen);
-		}
 		goto exit;
+	} else if (dst->family != AF_TIPC) {
+		res = -EINVAL;
 	}
-
-	/*
-	 * Reject connection attempt using multicast address
-	 *
-	 * Note: send_msg() validates the rest of the address fields,
-	 *       so there's no need to do it here
-	 */
-	if (dst->addrtype == TIPC_ADDR_MCAST) {
+	if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME)
 		res = -EINVAL;
+	if (res)
+		goto exit;
+
+	/* DGRAM/RDM connect(), just save the destaddr */
+	if (tipc_sk_type_connectionless(sk)) {
+		memcpy(&tsk->peer, dest, destlen);
 		goto exit;
 	}
 
-- 
2.1.4


* [net-next 03/18] tipc: add ability to obtain node availability status from other files
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
  2017-10-12 14:02 ` [net-next 01/18] tipc: add ability to order and receive topology events in driver Jon Maloy
  2017-10-12 14:02 ` [net-next 02/18] tipc: improve address sanity check in tipc_connect() Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 04/18] tipc: refactor function filter_rcv() Jon Maloy
                   ` (14 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

In the coming commits, functions at the socket level will need the
ability to read the availability status of a given node. We therefore
introduce a new function for this purpose, while renaming the existing
static function that currently has the wanted name.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/node.c | 26 +++++++++++++++++++++-----
 net/tipc/node.h |  1 +
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 198dbc7..6cc1ae6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -157,7 +157,7 @@ static void tipc_node_timeout(unsigned long data);
 static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
 static struct tipc_node *tipc_node_find(struct net *net, u32 addr);
 static void tipc_node_put(struct tipc_node *node);
-static bool tipc_node_is_up(struct tipc_node *n);
+static bool node_is_up(struct tipc_node *n);
 
 struct tipc_sock_conn {
 	u32 port;
@@ -657,7 +657,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 		*slot1 = i;
 	}
 
-	if (!tipc_node_is_up(n)) {
+	if (!node_is_up(n)) {
 		if (tipc_link_peer_is_down(l))
 			tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
 		tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT);
@@ -717,11 +717,27 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 	tipc_sk_rcv(n->net, &le->inputq);
 }
 
-static bool tipc_node_is_up(struct tipc_node *n)
+static bool node_is_up(struct tipc_node *n)
 {
 	return n->active_links[0] != INVALID_BEARER_ID;
 }
 
+bool tipc_node_is_up(struct net *net, u32 addr)
+{
+	struct tipc_node *n;
+	bool retval = false;
+
+	if (in_own_node(net, addr))
+		return true;
+
+	n = tipc_node_find(net, addr);
+	if (!n)
+		return false;
+	retval = node_is_up(n);
+	tipc_node_put(n);
+	return retval;
+}
+
 void tipc_node_check_dest(struct net *net, u32 onode,
 			  struct tipc_bearer *b,
 			  u16 capabilities, u32 signature,
@@ -1149,7 +1165,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 
 	if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
 		goto attr_msg_full;
-	if (tipc_node_is_up(node))
+	if (node_is_up(node))
 		if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
 			goto attr_msg_full;
 
@@ -1249,7 +1265,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
 		dst = n->addr;
 		if (in_own_node(net, dst))
 			continue;
-		if (!tipc_node_is_up(n))
+		if (!node_is_up(n))
 			continue;
 		txskb = pskb_copy(skb, GFP_ATOMIC);
 		if (!txskb)
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 898c229..8db59fe 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -76,6 +76,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
+bool tipc_node_is_up(struct net *net, u32 addr);
 u16 tipc_node_get_capabilities(struct net *net, u32 addr);
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
 int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
-- 
2.1.4


* [net-next 04/18] tipc: refactor function filter_rcv()
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
                   ` (2 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 03/18] tipc: add ability to obtain node availability status from other files Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 18:56   ` David Miller
  2017-10-12 14:02 ` [net-next 05/18] tipc: add new function for sending multiple small messages Jon Maloy
                   ` (13 subsequent siblings)
  17 siblings, 1 reply; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

In the following commits we will need to handle multiple incoming and
rejected/returned buffers in the function socket.c::filter_rcv().
As a preparation for this, we generalize the function to handle
buffer queues instead of individual buffers. We also introduce a
helper function tipc_skb_reject(), and rename filter_rcv() to
tipc_sk_filter_rcv() in line with other functions in socket.c.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/msg.c    |   7 +++
 net/tipc/msg.h    |   2 +
 net/tipc/socket.c | 157 +++++++++++++++++++++++++++---------------------------
 3 files changed, 87 insertions(+), 79 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 17146c1..1649d45 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 	}
 	kfree_skb(skb);
 }
+
+void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
+		     struct sk_buff_head *xmitq)
+{
+	if (tipc_msg_reverse(tipc_own_addr(net), &skb, err))
+		__skb_queue_tail(xmitq, skb);
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d058b1c..be3e38a 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr)
 struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
+void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
+		     struct sk_buff_head *xmitq);
 void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
 		   u32 hsize, u32 destnode);
 struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 7659e79..81b7d46c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -111,7 +111,7 @@ struct tipc_sock {
 	struct rcu_head rcu;
 };
 
-static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
+static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
 static void tipc_sock_destruct(struct sock *sk);
@@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	msg_set_origport(msg, tsk->portid);
 	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
 	sk->sk_shutdown = 0;
-	sk->sk_backlog_rcv = tipc_backlog_rcv;
+	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
 	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
@@ -850,12 +850,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 }
 
 /**
- * tipc_sk_proto_rcv - receive a connection mng protocol message
+ * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
  * @skb: pointer to message buffer.
  */
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
-			      struct sk_buff_head *xmitq)
+static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+				   struct sk_buff_head *xmitq)
 {
 	struct sock *sk = &tsk->sk;
 	u32 onode = tsk_own_node(tsk);
@@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk)
 	__skb_queue_purge(&sk->sk_receive_queue);
 }
 
+static void tipc_sk_proto_rcv(struct sock *sk,
+			      struct sk_buff_head *inputq,
+			      struct sk_buff_head *xmitq)
+{
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct sk_buff *skb = __skb_dequeue(inputq);
+	struct tipc_msg *hdr = buf_msg(skb);
+
+	switch (msg_user(hdr)) {
+	case CONN_MANAGER:
+		tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
+		return;
+	case SOCK_WAKEUP:
+		u32_del(&tsk->cong_links, msg_orignode(hdr));
+		tsk->cong_link_cnt--;
+		sk->sk_write_space(sk);
+		break;
+	case TOP_SRV:
+		tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+		break;
+	default:
+		break;
+	}
+
+	kfree_skb(skb);
+}
+
 /**
- * filter_connect - Handle all incoming messages for a connection-based socket
+ * tipc_filter_connect - Handle incoming message for a connection-based socket
  * @tsk: TIPC socket
  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
  * Returns true if everything ok, false otherwise
  */
-static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
+static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
@@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 }
 
 /**
- * filter_rcv - validate incoming message
+ * tipc_sk_filter_rcv - validate incoming message
  * @sk: socket
  * @skb: pointer to message.
  *
@@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
  *
  * Called with socket lock already taken
  *
- * Returns true if message was added to socket receive queue, otherwise false
  */
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
-		       struct sk_buff_head *xmitq)
+static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
+			       struct sk_buff_head *xmitq)
 {
+	bool sk_conn = !tipc_sk_type_connectionless(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = buf_msg(skb);
-	unsigned int limit = rcvbuf_limit(sk, skb);
-	int err = TIPC_OK;
+	struct net *net = sock_net(sk);
+	struct sk_buff_head inputq;
+	int limit, err = TIPC_OK;
 
-	if (unlikely(!msg_isdata(hdr))) {
-		switch (msg_user(hdr)) {
-		case CONN_MANAGER:
-			tipc_sk_proto_rcv(tsk, skb, xmitq);
-			return false;
-		case SOCK_WAKEUP:
-			u32_del(&tsk->cong_links, msg_orignode(hdr));
-			tsk->cong_link_cnt--;
-			sk->sk_write_space(sk);
-			break;
-		case TOP_SRV:
-			tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
-			break;
-		default:
-			break;
-		}
-		kfree_skb(skb);
-		return false;
-	}
+	TIPC_SKB_CB(skb)->bytes_read = 0;
+	__skb_queue_head_init(&inputq);
+	__skb_queue_tail(&inputq, skb);
 
-	/* Drop if illegal message type */
-	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
-		kfree_skb(skb);
-		return false;
-	}
+	if (unlikely(!msg_isdata(hdr)))
+		tipc_sk_proto_rcv(sk, &inputq, xmitq);
+	else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG))
+		return kfree_skb(skb);
 
-	/* Reject if wrong message type for current socket state */
-	if (tipc_sk_type_connectionless(sk)) {
-		if (msg_connected(hdr)) {
+	/* Validate and add to receive buffer if there is space */
+	while ((skb = __skb_dequeue(&inputq))) {
+		hdr = buf_msg(skb);
+		limit = rcvbuf_limit(sk, skb);
+		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
+		    (!sk_conn && msg_connected(hdr)))
 			err = TIPC_ERR_NO_PORT;
-			goto reject;
-		}
-	} else if (unlikely(!filter_connect(tsk, skb))) {
-		err = TIPC_ERR_NO_PORT;
-		goto reject;
-	}
+		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
+			err = TIPC_ERR_OVERLOAD;
 
-	/* Reject message if there isn't room to queue it */
-	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
-		err = TIPC_ERR_OVERLOAD;
-		goto reject;
+		if (unlikely(err)) {
+			tipc_skb_reject(net, err, skb, xmitq);
+			err = TIPC_OK;
+			continue;
+		}
+		__skb_queue_tail(&sk->sk_receive_queue, skb);
+		skb_set_owner_r(skb, sk);
+		sk->sk_data_ready(sk);
 	}
-
-	/* Enqueue message */
-	TIPC_SKB_CB(skb)->bytes_read = 0;
-	__skb_queue_tail(&sk->sk_receive_queue, skb);
-	skb_set_owner_r(skb, sk);
-
-	sk->sk_data_ready(sk);
-	return true;
-
-reject:
-	if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
-		__skb_queue_tail(xmitq, skb);
-	return false;
 }
 
 /**
- * tipc_backlog_rcv - handle incoming message from backlog queue
+ * tipc_sk_backlog_rcv - handle incoming message from backlog queue
  * @sk: socket
  * @skb: message
  *
@@ -1742,27 +1743,25 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
  *
  * Returns 0
  */
-static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
+static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-	unsigned int truesize = skb->truesize;
+	unsigned int before = sk_rmem_alloc_get(sk);
 	struct sk_buff_head xmitq;
 	u32 dnode, selector;
+	unsigned int added;
 
 	__skb_queue_head_init(&xmitq);
 
-	if (likely(filter_rcv(sk, skb, &xmitq))) {
-		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
-		return 0;
-	}
+	tipc_sk_filter_rcv(sk, skb, &xmitq);
+	added = sk_rmem_alloc_get(sk) - before;
+	atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
 
-	if (skb_queue_empty(&xmitq))
-		return 0;
-
-	/* Send response/rejected message */
-	skb = __skb_dequeue(&xmitq);
-	dnode = msg_destnode(buf_msg(skb));
-	selector = msg_origport(buf_msg(skb));
-	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+	/* Send pending response/rejected messages, if any */
+	while ((skb = __skb_dequeue(&xmitq))) {
+		selector = msg_origport(buf_msg(skb));
+		dnode = msg_destnode(buf_msg(skb));
+		tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+	}
 	return 0;
 }
 
@@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
 
 		/* Add message directly to receive queue if possible */
 		if (!sock_owned_by_user(sk)) {
-			filter_rcv(sk, skb, xmitq);
+			tipc_sk_filter_rcv(sk, skb, xmitq);
 			continue;
 		}
 
-- 
2.1.4


* [net-next 05/18] tipc: add new function for sending multiple small messages
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
                   ` (3 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 04/18] tipc: refactor function filter_rcv() Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 06/18] tipc: improve destination linked list Jon Maloy
                   ` (12 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We see an increasing need to send multiple single-buffer messages
of TIPC_SYSTEM_IMPORTANCE to different individual destination nodes.
Instead of looping over the send queue and sending each buffer
individually, as we do now, we add a new helper function
tipc_node_distr_xmit() to do this.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/node.c   | 16 ++++++++++++++++
 net/tipc/node.h   |  1 +
 net/tipc/socket.c | 14 ++------------
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6cc1ae6..89f8ac73 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1254,6 +1254,22 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 	return 0;
 }
 
+/* tipc_node_distr_xmit(): send single buffer msgs to individual destinations
+ * Note: this is only for SYSTEM_IMPORTANCE messages, which cannot be rejected
+ */
+int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb;
+	u32 selector, dnode;
+
+	while ((skb = __skb_dequeue(xmitq))) {
+		selector = msg_origport(buf_msg(skb));
+		dnode = msg_destnode(buf_msg(skb));
+		tipc_node_xmit_skb(net, skb, dnode, selector);
+	}
+	return 0;
+}
+
 void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
 {
 	struct sk_buff *txskb;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 8db59fe..df2f219 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -68,6 +68,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 		   int selector);
+int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *list);
 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
 		       u32 selector);
 void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 81b7d46c..6b2b4ca 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1740,14 +1740,11 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
  * @skb: message
  *
  * Caller must hold socket lock
- *
- * Returns 0
  */
 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
 	unsigned int before = sk_rmem_alloc_get(sk);
 	struct sk_buff_head xmitq;
-	u32 dnode, selector;
 	unsigned int added;
 
 	__skb_queue_head_init(&xmitq);
@@ -1757,11 +1754,7 @@ static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 	atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
 
 	/* Send pending response/rejected messages, if any */
-	while ((skb = __skb_dequeue(&xmitq))) {
-		selector = msg_origport(buf_msg(skb));
-		dnode = msg_destnode(buf_msg(skb));
-		tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
-	}
+	tipc_node_distr_xmit(sock_net(sk), &xmitq);
 	return 0;
 }
 
@@ -1840,10 +1833,7 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 				spin_unlock_bh(&sk->sk_lock.slock);
 			}
 			/* Send pending response/rejected messages, if any */
-			while ((skb = __skb_dequeue(&xmitq))) {
-				dnode = msg_destnode(buf_msg(skb));
-				tipc_node_xmit_skb(net, skb, dnode, dport);
-			}
+			tipc_node_distr_xmit(sock_net(sk), &xmitq);
 			sock_put(sk);
 			continue;
 		}
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 06/18] tipc: improve destination linked list
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
                   ` (4 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 05/18] tipc: add new function for sending multiple small messages Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 18:57   ` David Miller
  2017-10-12 14:02 ` [net-next 07/18] tipc: introduce communication groups Jon Maloy
                   ` (11 subsequent siblings)
  17 siblings, 1 reply; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We often see a need for a linked list of destination identities,
sometimes containing a port number, sometimes a node identity, and
sometimes both. The currently defined struct u32_item is not generic
enough to cover all cases, so we extend it to contain two u32 integers
and rename it to struct tipc_dest.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c      | 18 +++++------
 net/tipc/name_table.c | 89 ++++++++++++++++++++++++++-------------------------
 net/tipc/name_table.h | 22 ++++++++-----
 net/tipc/socket.c     | 12 +++----
 4 files changed, 74 insertions(+), 67 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index a140dd4..421fa44 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -259,19 +259,19 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
 			   struct tipc_nlist *dests, u16 *cong_link_cnt)
 {
 	struct sk_buff_head _pkts;
-	struct u32_item *n, *tmp;
-	u32 dst, selector;
+	struct tipc_dest *dst, *tmp;
+	u32 dnode, selector;
 
 	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
 	skb_queue_head_init(&_pkts);
 
-	list_for_each_entry_safe(n, tmp, &dests->list, list) {
-		dst = n->value;
-		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
+	list_for_each_entry_safe(dst, tmp, &dests->list, list) {
+		dnode = dst->node;
+		if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts))
 			return -ENOMEM;
 
 		/* Any other return value than -ELINKCONG is ignored */
-		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
+		if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG)
 			(*cong_link_cnt)++;
 	}
 	return 0;
@@ -554,7 +554,7 @@ void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
 {
 	if (node == nl->self)
 		nl->local = true;
-	else if (u32_push(&nl->list, node))
+	else if (tipc_dest_push(&nl->list, node, 0))
 		nl->remote++;
 }
 
@@ -562,13 +562,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
 {
 	if (node == nl->self)
 		nl->local = false;
-	else if (u32_del(&nl->list, node))
+	else if (tipc_dest_del(&nl->list, node, 0))
 		nl->remote--;
 }
 
 void tipc_nlist_purge(struct tipc_nlist *nl)
 {
-	u32_list_purge(&nl->list);
+	tipc_dest_list_purge(&nl->list);
 	nl->remote = 0;
 	nl->local = 0;
 }
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index bd0aac8..20f75b1 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -634,7 +634,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 		info = sseq->info;
 		list_for_each_entry(publ, &info->node_list, node_list) {
 			if (publ->scope <= limit)
-				u32_push(dports, publ->ref);
+				tipc_dest_push(dports, 0, publ->ref);
 		}
 
 		if (info->cluster_list_size != info->node_list_size)
@@ -1057,78 +1057,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
 	return skb->len;
 }
 
-bool u32_find(struct list_head *l, u32 value)
+struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port)
 {
-	struct u32_item *item;
+	struct tipc_dest *dst;
+	u64 value = (u64)node << 32 | port;
 
-	list_for_each_entry(item, l, list) {
-		if (item->value == value)
-			return true;
+	list_for_each_entry(dst, l, list) {
+		if (dst->value != value)
+			continue;
+		return dst;
 	}
-	return false;
+	return NULL;
 }
 
-bool u32_push(struct list_head *l, u32 value)
+bool tipc_dest_push(struct list_head *l, u32 node, u32 port)
 {
-	struct u32_item *item;
+	struct tipc_dest *dst;
+	u64 value = (u64)node << 32 | port;
 
-	list_for_each_entry(item, l, list) {
-		if (item->value == value)
-			return false;
-	}
-	item = kmalloc(sizeof(*item), GFP_ATOMIC);
-	if (unlikely(!item))
+	if (tipc_dest_find(l, node, port))
 		return false;
 
-	item->value = value;
-	list_add(&item->list, l);
+	dst = kmalloc(sizeof(*dst), GFP_ATOMIC);
+	if (unlikely(!dst))
+		return false;
+	dst->value = value;
+	list_add(&dst->list, l);
 	return true;
 }
 
-u32 u32_pop(struct list_head *l)
+bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port)
 {
-	struct u32_item *item;
-	u32 value = 0;
+	struct tipc_dest *dst;
 
 	if (list_empty(l))
-		return 0;
-	item = list_first_entry(l, typeof(*item), list);
-	value = item->value;
-	list_del(&item->list);
-	kfree(item);
-	return value;
+		return false;
+	dst = list_first_entry(l, typeof(*dst), list);
+	if (port)
+		*port = dst->port;
+	if (node)
+		*node = dst->node;
+	list_del(&dst->list);
+	kfree(dst);
+	return true;
 }
 
-bool u32_del(struct list_head *l, u32 value)
+bool tipc_dest_del(struct list_head *l, u32 node, u32 port)
 {
-	struct u32_item *item, *tmp;
+	struct tipc_dest *dst;
 
-	list_for_each_entry_safe(item, tmp, l, list) {
-		if (item->value != value)
-			continue;
-		list_del(&item->list);
-		kfree(item);
-		return true;
-	}
-	return false;
+	dst = tipc_dest_find(l, node, port);
+	if (!dst)
+		return false;
+	list_del(&dst->list);
+	kfree(dst);
+	return true;
 }
 
-void u32_list_purge(struct list_head *l)
+void tipc_dest_list_purge(struct list_head *l)
 {
-	struct u32_item *item, *tmp;
+	struct tipc_dest *dst, *tmp;
 
-	list_for_each_entry_safe(item, tmp, l, list) {
-		list_del(&item->list);
-		kfree(item);
+	list_for_each_entry_safe(dst, tmp, l, list) {
+		list_del(&dst->list);
+		kfree(dst);
 	}
 }
 
-int u32_list_len(struct list_head *l)
+int tipc_dest_list_len(struct list_head *l)
 {
-	struct u32_item *item;
+	struct tipc_dest *dst;
 	int i = 0;
 
-	list_for_each_entry(item, l, list) {
+	list_for_each_entry(dst, l, list) {
 		i++;
 	}
 	return i;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 6ebdeb1..d121175 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -120,16 +120,22 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
 int tipc_nametbl_init(struct net *net);
 void tipc_nametbl_stop(struct net *net);
 
-struct u32_item {
+struct tipc_dest {
 	struct list_head list;
-	u32 value;
+	union {
+		struct {
+			u32 port;
+			u32 node;
+		};
+		u64 value;
+	};
 };
 
-bool u32_push(struct list_head *l, u32 value);
-u32 u32_pop(struct list_head *l);
-bool u32_find(struct list_head *l, u32 value);
-bool u32_del(struct list_head *l, u32 value);
-void u32_list_purge(struct list_head *l);
-int u32_list_len(struct list_head *l);
+struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port);
+bool tipc_dest_push(struct list_head *l, u32 node, u32 port);
+bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port);
+bool tipc_dest_del(struct list_head *l, u32 node, u32 port);
+void tipc_dest_list_purge(struct list_head *l);
+int tipc_dest_list_len(struct list_head *l);
 
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 6b2b4ca..850358f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -565,7 +565,7 @@ static int tipc_release(struct socket *sock)
 
 	/* Reject any messages that accumulated in backlog queue */
 	release_sock(sk);
-	u32_list_purge(&tsk->cong_links);
+	tipc_dest_list_purge(&tsk->cong_links);
 	tsk->cong_link_cnt = 0;
 	call_rcu(&tsk->rcu, tipc_sk_callback);
 	sock->sk = NULL;
@@ -826,8 +826,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 		tipc_nametbl_mc_translate(net,
 					  msg_nametype(msg), msg_namelower(msg),
 					  msg_nameupper(msg), scope, &dports);
-		portid = u32_pop(&dports);
-		for (; portid; portid = u32_pop(&dports)) {
+		while (tipc_dest_pop(&dports, NULL, &portid)) {
 			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
 			if (_skb) {
 				msg_set_destport(buf_msg(_skb), portid);
@@ -1000,7 +999,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	}
 
 	/* Block or return if destination link is congested */
-	rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
+	rc = tipc_wait_for_cond(sock, &timeout,
+				!tipc_dest_find(clinks, dnode, 0));
 	if (unlikely(rc))
 		return rc;
 
@@ -1012,7 +1012,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 
 	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 	if (unlikely(rc == -ELINKCONG)) {
-		u32_push(clinks, dnode);
+		tipc_dest_push(clinks, dnode, 0);
 		tsk->cong_link_cnt++;
 		rc = 0;
 	}
@@ -1549,7 +1549,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
 		return;
 	case SOCK_WAKEUP:
-		u32_del(&tsk->cong_links, msg_orignode(hdr));
+		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
 		tsk->cong_link_cnt--;
 		sk->sk_write_space(sk);
 		break;
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 07/18] tipc: introduce communication groups
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communication Group feature Jon Maloy
                   ` (5 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 06/18] tipc: improve destination linked list Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 08/18] tipc: add second source address to recvmsg()/recvfrom() Jon Maloy
                   ` (10 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

As a preparation for introducing flow control for multicast and datagram
messaging we need a more strictly defined framework than we have now. A
socket must be able to keep track of exactly how many and which other
sockets it is allowed to communicate with at any moment, and keep the
necessary state for those.

We therefore introduce a new concept we have named Communication Groups.
Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN.
The call takes four parameters: 'type' serves as group identifier,
'instance' serves as a logical member identifier, and 'scope' indicates
the visibility of the group (node/cluster/zone). Finally, 'flags' makes
it possible to set certain properties for the member. For now, there is
only one flag, indicating if the creator of the socket wants to receive
a copy of broadcast or multicast messages it is sending via the socket,
and if it wants to be eligible as a destination for its own anycasts.

A group is closed, i.e., sockets which have not joined a group will
not be able to send messages to or receive messages from members of
the group, and vice versa.

Any member of a group can send multicast ('group broadcast') messages
to all group members, optionally including itself, using the primitive
send(). The messages are received via the recvmsg() primitive. A socket
can only be a member of one group at a time.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 include/uapi/linux/tipc.h |  14 ++
 net/tipc/Makefile         |   2 +-
 net/tipc/group.c          | 404 ++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/group.h          |  64 ++++++++
 net/tipc/link.c           |   3 +-
 net/tipc/msg.h            |  42 ++++-
 net/tipc/name_table.c     |  44 +++--
 net/tipc/name_table.h     |   3 +
 net/tipc/node.h           |   3 +-
 net/tipc/socket.c         | 201 ++++++++++++++++++++---
 10 files changed, 740 insertions(+), 40 deletions(-)
 create mode 100644 net/tipc/group.c
 create mode 100644 net/tipc/group.h

diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h
index 5351b08..5f7b2c4 100644
--- a/include/uapi/linux/tipc.h
+++ b/include/uapi/linux/tipc.h
@@ -231,6 +231,20 @@ struct sockaddr_tipc {
 #define TIPC_SOCK_RECVQ_DEPTH	132	/* Default: none (read only) */
 #define TIPC_MCAST_BROADCAST    133     /* Default: TIPC selects. No arg */
 #define TIPC_MCAST_REPLICAST    134     /* Default: TIPC selects. No arg */
+#define TIPC_GROUP_JOIN         135     /* Takes struct tipc_group_req* */
+#define TIPC_GROUP_LEAVE        136     /* No argument */
+
+/*
+ * Flag values
+ */
+#define TIPC_GROUP_LOOPBACK     0x1  /* Receive copy of sent msg when match */
+
+struct tipc_group_req {
+	__u32 type;      /* group id */
+	__u32 instance;  /* member id */
+	__u32 scope;     /* zone/cluster/node */
+	__u32 flags;
+};
 
 /*
  * Maximum sizes of TIPC bearer-related names (including terminating NULL)
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 31b9f9c..a3af73e 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,7 +8,7 @@ tipc-y	+= addr.o bcast.o bearer.o \
 	   core.o link.o discover.o msg.o  \
 	   name_distr.o  subscr.o monitor.o name_table.o net.o  \
 	   netlink.o netlink_compat.o node.o socket.o eth_media.o \
-	   server.o socket.o
+	   server.o socket.o group.o
 
 tipc-$(CONFIG_TIPC_MEDIA_UDP)	+= udp_media.o
 tipc-$(CONFIG_TIPC_MEDIA_IB)	+= ib_media.o
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 0000000..20663e5
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,404 @@
+/*
+ * net/tipc/group.c: TIPC group messaging code
+ *
+ * Copyright (c) 2017, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "core.h"
+#include "addr.h"
+#include "group.h"
+#include "bcast.h"
+#include "server.h"
+#include "msg.h"
+#include "socket.h"
+#include "node.h"
+#include "name_table.h"
+#include "subscr.h"
+
+#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
+#define ADV_IDLE ADV_UNIT
+
+enum mbr_state {
+	MBR_QUARANTINED,
+	MBR_DISCOVERED,
+	MBR_JOINING,
+	MBR_PUBLISHED,
+	MBR_JOINED,
+	MBR_LEAVING
+};
+
+struct tipc_member {
+	struct rb_node tree_node;
+	struct list_head list;
+	u32 node;
+	u32 port;
+	enum mbr_state state;
+	u16 bc_rcv_nxt;
+};
+
+struct tipc_group {
+	struct rb_root members;
+	struct tipc_nlist dests;
+	struct net *net;
+	int subid;
+	u32 type;
+	u32 instance;
+	u32 domain;
+	u32 scope;
+	u32 portid;
+	u16 member_cnt;
+	u16 bc_snd_nxt;
+	bool loopback;
+};
+
+static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
+				  int mtyp, struct sk_buff_head *xmitq);
+
+u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
+{
+	return grp->bc_snd_nxt;
+}
+
+static bool tipc_group_is_receiver(struct tipc_member *m)
+{
+	return m && m->state >= MBR_JOINED;
+}
+
+int tipc_group_size(struct tipc_group *grp)
+{
+	return grp->member_cnt;
+}
+
+struct tipc_group *tipc_group_create(struct net *net, u32 portid,
+				     struct tipc_group_req *mreq)
+{
+	struct tipc_group *grp;
+	u32 type = mreq->type;
+
+	grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
+	if (!grp)
+		return NULL;
+	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+	grp->members = RB_ROOT;
+	grp->net = net;
+	grp->portid = portid;
+	grp->domain = addr_domain(net, mreq->scope);
+	grp->type = type;
+	grp->instance = mreq->instance;
+	grp->scope = mreq->scope;
+	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
+	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
+		return grp;
+	kfree(grp);
+	return NULL;
+}
+
+void tipc_group_delete(struct net *net, struct tipc_group *grp)
+{
+	struct rb_root *tree = &grp->members;
+	struct sk_buff_head xmitq;
+	struct tipc_member *m, *tmp;
+
+	__skb_queue_head_init(&xmitq);
+
+	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
+		tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
+		list_del(&m->list);
+		kfree(m);
+	}
+	tipc_node_distr_xmit(net, &xmitq);
+	tipc_nlist_purge(&grp->dests);
+	tipc_topsrv_kern_unsubscr(net, grp->subid);
+	kfree(grp);
+}
+
+struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
+					   u32 node, u32 port)
+{
+	struct tipc_member *m;
+	struct rb_node *n = grp->members.rb_node;
+	u64 nkey, key = (u64)node << 32 | port;
+
+	while (n) {
+		m = container_of(n, struct tipc_member, tree_node);
+		nkey = (u64)m->node << 32 | m->port;
+		if (key < nkey)
+			n = n->rb_left;
+		else if (key > nkey)
+			n = n->rb_right;
+		else
+			return m;
+	}
+	return NULL;
+}
+
+static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
+						u32 node)
+{
+	struct tipc_member *m;
+	struct rb_node *n;
+
+	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+		m = container_of(n, struct tipc_member, tree_node);
+		if (m->node == node)
+			return m;
+	}
+	return NULL;
+}
+
+static void tipc_group_add_to_tree(struct tipc_group *grp,
+				   struct tipc_member *m)
+{
+	struct rb_node **n, *parent = NULL;
+	u64 nkey, key = (u64)m->node << 32 | m->port;
+	struct tipc_member *tmp;
+
+	n = &grp->members.rb_node;
+	while (*n) {
+		tmp = container_of(*n, struct tipc_member, tree_node);
+		parent = *n;
+		tmp = container_of(parent, struct tipc_member, tree_node);
+		nkey = (u64)tmp->node << 32 | tmp->port;
+		if (key < nkey)
+			n = &(*n)->rb_left;
+		else if (key > nkey)
+			n = &(*n)->rb_right;
+		else
+			return;
+	}
+	rb_link_node(&m->tree_node, parent, n);
+	rb_insert_color(&m->tree_node, &grp->members);
+}
+
+static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
+						    u32 node, u32 port,
+						    int state)
+{
+	struct tipc_member *m;
+
+	m = kzalloc(sizeof(*m), GFP_ATOMIC);
+	if (!m)
+		return NULL;
+	INIT_LIST_HEAD(&m->list);
+	m->node = node;
+	m->port = port;
+	grp->member_cnt++;
+	tipc_group_add_to_tree(grp, m);
+	tipc_nlist_add(&grp->dests, m->node);
+	m->state = state;
+	return m;
+}
+
+void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
+{
+	tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
+}
+
+static void tipc_group_delete_member(struct tipc_group *grp,
+				     struct tipc_member *m)
+{
+	rb_erase(&m->tree_node, &grp->members);
+	grp->member_cnt--;
+	list_del_init(&m->list);
+
+	/* If last member on a node, remove node from dest list */
+	if (!tipc_group_find_node(grp, m->node))
+		tipc_nlist_del(&grp->dests, m->node);
+
+	kfree(m);
+}
+
+struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
+{
+	return &grp->dests;
+}
+
+void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
+		     int *scope)
+{
+	seq->type = grp->type;
+	seq->lower = grp->instance;
+	seq->upper = grp->instance;
+	*scope = grp->scope;
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp)
+{
+	grp->bc_snd_nxt++;
+}
+
+/* tipc_group_filter_msg() - determine if we should accept an arriving message
+ */
+void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
+			   struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb = __skb_dequeue(inputq);
+	struct tipc_member *m;
+	struct tipc_msg *hdr;
+	u32 node, port;
+	int mtyp;
+
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	mtyp = msg_type(hdr);
+	node = msg_orignode(hdr);
+	port = msg_origport(hdr);
+
+	if (!msg_in_group(hdr))
+		goto drop;
+
+	m = tipc_group_find_member(grp, node, port);
+	if (!tipc_group_is_receiver(m))
+		goto drop;
+
+	__skb_queue_tail(inputq, skb);
+
+	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+	return;
+drop:
+	kfree_skb(skb);
+}
+
+static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
+				  int mtyp, struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb;
+	struct tipc_msg *hdr;
+
+	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
+			      m->node, tipc_own_addr(grp->net),
+			      m->port, grp->portid, 0);
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	if (mtyp == GRP_JOIN_MSG)
+		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+	__skb_queue_tail(xmitq, skb);
+}
+
+void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
+			  struct sk_buff_head *xmitq)
+{
+	struct tipc_member *m;
+	u32 node = msg_orignode(hdr);
+	u32 port = msg_origport(hdr);
+
+	if (!grp)
+		return;
+
+	m = tipc_group_find_member(grp, node, port);
+
+	switch (msg_type(hdr)) {
+	case GRP_JOIN_MSG:
+		if (!m)
+			m = tipc_group_create_member(grp, node, port,
+						     MBR_QUARANTINED);
+		if (!m)
+			return;
+		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+
+		/* Wait until PUBLISH event is received */
+		if (m->state == MBR_DISCOVERED)
+			m->state = MBR_JOINING;
+		else if (m->state == MBR_PUBLISHED)
+			m->state = MBR_JOINED;
+		return;
+	case GRP_LEAVE_MSG:
+		if (!m)
+			return;
+
+		/* Wait until WITHDRAW event is received */
+		if (m->state != MBR_LEAVING) {
+			m->state = MBR_LEAVING;
+			return;
+		}
+		/* Otherwise deliver already received WITHDRAW event */
+		tipc_group_delete_member(grp, m);
+		return;
+	default:
+		pr_warn("Received unknown GROUP_PROTO message\n");
+	}
+}
+
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
+void tipc_group_member_evt(struct tipc_group *grp,
+			   struct sk_buff *skb,
+			   struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct tipc_event *evt = (void *)msg_data(hdr);
+	struct tipc_member *m;
+	struct net *net;
+	u32 port = evt->port.ref;
+	u32 node = evt->port.node;
+	u32 self;
+
+	if (!grp)
+		goto drop;
+
+	net = grp->net;
+	self = tipc_own_addr(net);
+	if (!grp->loopback && node == self && port == grp->portid)
+		goto drop;
+
+	m = tipc_group_find_member(grp, node, port);
+
+	if (evt->event == TIPC_PUBLISHED) {
+		if (!m)
+			m = tipc_group_create_member(grp, node, port,
+						     MBR_DISCOVERED);
+		if (!m)
+			goto drop;
+
+		/* Wait if JOIN message not yet received */
+		if (m->state == MBR_DISCOVERED)
+			m->state = MBR_PUBLISHED;
+		else
+			m->state = MBR_JOINED;
+		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+	} else if (evt->event == TIPC_WITHDRAWN) {
+		if (!m)
+			goto drop;
+
+		/* Keep back event if more messages might be expected */
+		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
+			m->state = MBR_LEAVING;
+		else
+			tipc_group_delete_member(grp, m);
+	}
+drop:
+	kfree_skb(skb);
+}
diff --git a/net/tipc/group.h b/net/tipc/group.h
new file mode 100644
index 0000000..9bdf447
--- /dev/null
+++ b/net/tipc/group.h
@@ -0,0 +1,64 @@
+/*
+ * net/tipc/group.h: Include file for TIPC group unicast/multicast functions
+ *
+ * Copyright (c) 2017, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TIPC_GROUP_H
+#define _TIPC_GROUP_H
+
+#include "core.h"
+
+struct tipc_group;
+struct tipc_member;
+struct tipc_msg;
+
+struct tipc_group *tipc_group_create(struct net *net, u32 portid,
+				     struct tipc_group_req *mreq);
+void tipc_group_delete(struct net *net, struct tipc_group *grp);
+void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
+struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
+void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
+		     int *scope);
+void tipc_group_filter_msg(struct tipc_group *grp,
+			   struct sk_buff_head *inputq,
+			   struct sk_buff_head *xmitq);
+void tipc_group_member_evt(struct tipc_group *grp,
+			   struct sk_buff *skb,
+			   struct sk_buff_head *xmitq);
+void tipc_group_proto_rcv(struct tipc_group *grp,
+			  struct tipc_msg *hdr,
+			  struct sk_buff_head *xmitq);
+void tipc_group_update_bc_members(struct tipc_group *grp);
+u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
+int tipc_group_size(struct tipc_group *grp);
+#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac0144f..bd25bff 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 	case TIPC_MEDIUM_IMPORTANCE:
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
-		if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+		if (unlikely(msg_mcast(hdr))) {
 			skb_queue_tail(l->bc_rcvlink->inputq, skb);
 			return true;
 		}
 	case CONN_MANAGER:
+	case GROUP_PROTOCOL:
 		skb_queue_tail(inputq, skb);
 		return true;
 	case NAME_DISTRIBUTOR:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index be3e38a..7ba1045 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.h: Include file for TIPC message header routines
  *
- * Copyright (c) 2000-2007, 2014-2015 Ericsson AB
+ * Copyright (c) 2000-2007, 2014-2016 Ericsson AB
  * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -65,6 +65,7 @@ struct plist;
 #define TIPC_MCAST_MSG		1
 #define TIPC_NAMED_MSG		2
 #define TIPC_DIRECT_MSG		3
+#define TIPC_GRP_BCAST_MSG	4
 
 /*
  * Internal message users
@@ -73,6 +74,7 @@ struct plist;
 #define  MSG_BUNDLER          6
 #define  LINK_PROTOCOL        7
 #define  CONN_MANAGER         8
+#define  GROUP_PROTOCOL       9
 #define  TUNNEL_PROTOCOL      10
 #define  NAME_DISTRIBUTOR     11
 #define  MSG_FRAGMENTER       12
@@ -87,6 +89,7 @@ struct plist;
 #define BASIC_H_SIZE              32	/* Basic payload message */
 #define NAMED_H_SIZE              40	/* Named payload message */
 #define MCAST_H_SIZE              44	/* Multicast payload message */
+#define GROUP_H_SIZE              44	/* Group payload message */
 #define INT_H_SIZE                40	/* Internal messages */
 #define MIN_H_SIZE                24	/* Smallest legal TIPC header size */
 #define MAX_H_SIZE                60	/* Largest possible TIPC header size */
@@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 1, 29, 0x7, n);
 }
 
+static inline int msg_in_group(struct tipc_msg *m)
+{
+	return (msg_type(m) == TIPC_GRP_BCAST_MSG);
+}
+
 static inline u32 msg_named(struct tipc_msg *m)
 {
 	return msg_type(m) == TIPC_NAMED_MSG;
@@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m)
 
 static inline u32 msg_mcast(struct tipc_msg *m)
 {
-	return msg_type(m) == TIPC_MCAST_MSG;
+	int mtyp = msg_type(m);
+
+	return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG));
 }
 
 static inline u32 msg_connected(struct tipc_msg *m)
@@ -515,6 +525,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 #define DSC_RESP_MSG		1
 
 /*
+ * Group protocol message types
+ */
+#define GRP_JOIN_MSG         0
+#define GRP_LEAVE_MSG        1
+
+/*
  * Word 1
  */
 static inline u32 msg_seq_gap(struct tipc_msg *m)
@@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
+static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m)
+{
+	return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
+/* Word 10
+ */
+static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
+{
+	return msg_bits(m, 10, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n)
+{
+	msg_set_bits(m, 10, 16, 0xffff, n);
+}
+
 static inline bool msg_peer_link_is_up(struct tipc_msg *m)
 {
 	if (likely(msg_user(m) != LINK_PROTOCOL))
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 20f75b1..941aed3 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -43,6 +43,7 @@
 #include "bcast.h"
 #include "addr.h"
 #include "node.h"
+#include "group.h"
 #include <net/genetlink.h>
 
 #define TIPC_NAMETBL_SIZE 1024		/* must be a power of 2 */
@@ -596,18 +597,6 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
 	return ref;
 }
 
-/**
- * tipc_nametbl_mc_translate - find multicast destinations
- *
- * Creates list of all local ports that overlap the given multicast address;
- * also determines if any off-node ports overlap.
- *
- * Note: Publications with a scope narrower than 'limit' are ignored.
- * (i.e. local node-scope publications mustn't receive messages arriving
- * from another node, even if the multcast link brought it here)
- *
- * Returns non-zero if any off-node ports overlap
- */
 int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 			      u32 limit, struct list_head *dports)
 {
@@ -679,6 +668,37 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 	rcu_read_unlock();
 }
 
+/* tipc_nametbl_build_group - build list of communication group members
+ */
+void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
+			      u32 type, u32 domain)
+{
+	struct sub_seq *sseq, *stop;
+	struct publication *p;
+	struct name_info *info;
+	struct name_seq *seq;
+
+	rcu_read_lock();
+	seq = nametbl_find_seq(net, type);
+	if (!seq)
+		goto exit;
+
+	spin_lock_bh(&seq->lock);
+	sseq = seq->sseqs;
+	stop = seq->sseqs + seq->first_free;
+	for (; sseq != stop; sseq++) {
+		info = sseq->info;
+		list_for_each_entry(p, &info->zone_list, zone_list) {
+			if (!tipc_in_scope(domain, p->node))
+				continue;
+			tipc_group_add_member(grp, p->node, p->ref);
+		}
+	}
+	spin_unlock_bh(&seq->lock);
+exit:
+	rcu_read_unlock();
+}
+
 /*
  * tipc_nametbl_publish - add name publication to network name tables
  */
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index d121175..97646b1 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -40,6 +40,7 @@
 struct tipc_subscription;
 struct tipc_plist;
 struct tipc_nlist;
+struct tipc_group;
 
 /*
  * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
 u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
 int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 			      u32 limit, struct list_head *dports);
+void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
+			      u32 type, u32 domain);
 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 				   u32 upper, u32 domain,
 				   struct tipc_nlist *nodes);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index df2f219..acd58d2 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -48,7 +48,8 @@ enum {
 	TIPC_BCAST_SYNCH      = (1 << 1),
 	TIPC_BCAST_STATE_NACK = (1 << 2),
 	TIPC_BLOCK_FLOWCTL    = (1 << 3),
-	TIPC_BCAST_RCAST      = (1 << 4)
+	TIPC_BCAST_RCAST      = (1 << 4),
+	TIPC_MCAST_GROUPS     = (1 << 5)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 850358f..1e5de367 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/socket.c: TIPC socket API
  *
- * Copyright (c) 2001-2007, 2012-2016, Ericsson AB
+ * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -45,6 +45,7 @@
 #include "socket.h"
 #include "bcast.h"
 #include "netlink.h"
+#include "group.h"
 
 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
 #define CONN_PROBING_INTERVAL	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
@@ -78,7 +79,7 @@ enum {
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
  * @cong_link_cnt: number of congested links
- * @sent_unacked: # messages sent by socket, and not yet acked by peer
+ * @snt_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
  * @node: hash table node
@@ -109,6 +110,7 @@ struct tipc_sock {
 	struct rhash_head node;
 	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
+	struct tipc_group *group;
 };
 
 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
 			   struct tipc_name_seq const *seq);
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 			    struct tipc_name_seq const *seq);
+static int tipc_sk_leave(struct tipc_sock *tsk);
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
 static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock)
 
 	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
 	sk->sk_shutdown = SHUTDOWN_MASK;
+	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
@@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
 		res = tipc_sk_withdraw(tsk, 0, NULL);
 		goto exit;
 	}
-
+	if (tsk->group) {
+		res = -EACCES;
+		goto exit;
+	}
 	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
 		res = -EINVAL;
 		goto exit;
@@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
 	u32 mask = 0;
 
 	sock_poll_wait(file, sk_sleep(sk), wait);
@@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 			mask |= (POLLIN | POLLRDNORM);
 		break;
 	case TIPC_OPEN:
-		if (!tsk->cong_link_cnt)
-			mask |= POLLOUT;
+		if (!grp || tipc_group_size(grp))
+			if (!tsk->cong_link_cnt)
+				mask |= POLLOUT;
 		if (tipc_sk_type_connectionless(sk) &&
 		    (!skb_queue_empty(&sk->sk_receive_queue)))
 			mask |= (POLLIN | POLLRDNORM);
@@ -757,6 +766,9 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	struct tipc_nlist dsts;
 	int rc;
 
+	if (tsk->group)
+		return -EACCES;
+
 	/* Block or return if any destination link is congested */
 	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 	if (unlikely(rc))
@@ -794,6 +806,64 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 }
 
 /**
+ * tipc_send_group_bcast - send message to all members in communication group
+ * @sock: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
+				 int dlen, long timeout)
+{
+	struct sock *sk = sock->sk;
+	struct net *net = sock_net(sk);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_msg *hdr = &tsk->phdr;
+	struct tipc_nlist *dsts = tipc_group_dests(grp);
+	struct tipc_mc_method *method = &tsk->mc_method;
+	struct sk_buff_head pkts;
+	int mtu = tipc_bcast_get_mtu(net);
+	int rc = -EHOSTUNREACH;
+
+	if (!dsts->local && !dsts->remote)
+		return -EHOSTUNREACH;
+
+	/* Block or return if any destination link is congested */
+	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt);
+	if (unlikely(rc))
+		return rc;
+
+	/* Complete message header */
+	msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
+	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+	msg_set_destport(hdr, 0);
+	msg_set_destnode(hdr, 0);
+	msg_set_nameinst(hdr, 0);
+	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
+
+	/* Build message as chain of buffers */
+	skb_queue_head_init(&pkts);
+	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+	if (unlikely(rc != dlen))
+		return rc;
+
+	/* Send message */
+	rc = tipc_mcast_xmit(net, &pkts, method, dsts,
+			     &tsk->cong_link_cnt);
+	if (unlikely(rc))
+		return rc;
+
+	/* Update broadcast sequence number */
+	tipc_group_update_bc_members(tsk->group);
+
+	return dlen;
+}
+
+/**
  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
  * @arrvq: queue with arriving messages, to be cloned after destination lookup
  * @inputq: queue with cloned messages, delivered to socket after dest lookup
@@ -805,11 +875,13 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 {
 	struct tipc_msg *msg;
 	struct list_head dports;
-	u32 portid;
+	u32 portid, oport, onode;
+	u32 self = tipc_own_addr(net);
 	u32 scope = TIPC_CLUSTER_SCOPE;
 	struct sk_buff_head tmpq;
 	uint hsz;
 	struct sk_buff *skb, *_skb;
+	u32 lower = 0, upper = ~0;
 
 	__skb_queue_head_init(&tmpq);
 	INIT_LIST_HEAD(&dports);
@@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
 		msg = buf_msg(skb);
 		hsz = skb_headroom(skb) + msg_hdr_sz(msg);
-
-		if (in_own_node(net, msg_orignode(msg)))
+		oport = msg_origport(msg);
+		onode = msg_orignode(msg);
+		if (onode == self)
 			scope = TIPC_NODE_SCOPE;
 
 		/* Create destination port list and message clones: */
-		tipc_nametbl_mc_translate(net,
-					  msg_nametype(msg), msg_namelower(msg),
-					  msg_nameupper(msg), scope, &dports);
+		if (!msg_in_group(msg)) {
+			lower = msg_namelower(msg);
+			upper = msg_nameupper(msg);
+		}
+		tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
+					  scope, &dports);
 		while (tipc_dest_pop(&dports, NULL, &portid)) {
 			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
 			if (_skb) {
@@ -895,10 +971,6 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 	kfree_skb(skb);
 }
 
-static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
-{
-}
-
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @sock: socket structure
@@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 	struct list_head *clinks = &tsk->cong_links;
 	bool syn = !tipc_sk_type_connectionless(sk);
+	struct tipc_group *grp = tsk->group;
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_name_seq *seq;
 	struct sk_buff_head pkts;
@@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
 
+	if (unlikely(grp))
+		return tipc_send_group_bcast(sock, m, dlen, timeout);
+
 	if (unlikely(!dest)) {
 		dest = &tsk->peer;
 		if (!syn || dest->family != AF_TIPC)
@@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct sk_buff *skb = __skb_dequeue(inputq);
 	struct tipc_msg *hdr = buf_msg(skb);
+	struct tipc_group *grp = tsk->group;
 
 	switch (msg_user(hdr)) {
 	case CONN_MANAGER:
@@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		tsk->cong_link_cnt--;
 		sk->sk_write_space(sk);
 		break;
+	case GROUP_PROTOCOL:
+		tipc_group_proto_rcv(grp, hdr, xmitq);
+		break;
 	case TOP_SRV:
-		tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+		tipc_group_member_evt(tsk->group, skb, xmitq);
+		skb = NULL;
 		break;
 	default:
 		break;
@@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 {
 	bool sk_conn = !tipc_sk_type_connectionless(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct net *net = sock_net(sk);
 	struct sk_buff_head inputq;
@@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 
 	if (unlikely(!msg_isdata(hdr)))
 		tipc_sk_proto_rcv(sk, &inputq, xmitq);
-	else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG))
+	else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
 		return kfree_skb(skb);
 
+	if (unlikely(grp))
+		tipc_group_filter_msg(grp, &inputq, xmitq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
 		limit = rcvbuf_limit(sk, skb);
 		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
-		    (!sk_conn && msg_connected(hdr)))
+		    (!sk_conn && msg_connected(hdr)) ||
+		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
 		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
 			err = TIPC_ERR_OVERLOAD;
@@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 			sock_put(sk);
 			continue;
 		}
-
 		/* No destination socket => dequeue skb if still there */
 		skb = tipc_skb_dequeue(inputq, dport);
 		if (!skb)
@@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
 
 	lock_sock(sk);
 
+	if (tsk->group) {
+		res = -EINVAL;
+		goto exit;
+	}
+
 	if (dst->family == AF_UNSPEC) {
 		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
 		if (!tipc_sk_type_connectionless(sk))
@@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net)
 	rhashtable_destroy(&tn->sk_rht);
 }
 
+static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
+{
+	struct net *net = sock_net(&tsk->sk);
+	struct tipc_group *grp = tsk->group;
+	u32 domain = addr_domain(net, mreq->scope);
+	struct tipc_msg *hdr = &tsk->phdr;
+	struct tipc_name_seq seq;
+	int rc;
+
+	if (mreq->type < TIPC_RESERVED_TYPES)
+		return -EACCES;
+	if (grp)
+		return -EACCES;
+	grp = tipc_group_create(net, tsk->portid, mreq);
+	if (!grp)
+		return -ENOMEM;
+	tsk->group = grp;
+	msg_set_lookup_scope(hdr, mreq->scope);
+	msg_set_nametype(hdr, mreq->type);
+	msg_set_dest_droppable(hdr, true);
+	seq.type = mreq->type;
+	seq.lower = mreq->instance;
+	seq.upper = seq.lower;
+	tipc_nametbl_build_group(net, grp, mreq->type, domain);
+	rc = tipc_sk_publish(tsk, mreq->scope, &seq);
+	if (rc)
+		tipc_group_delete(net, grp);
+	return rc;
+}
+
+static int tipc_sk_leave(struct tipc_sock *tsk)
+{
+	struct net *net = sock_net(&tsk->sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_name_seq seq;
+	int scope;
+
+	if (!grp)
+		return -EINVAL;
+	tipc_group_self(grp, &seq, &scope);
+	tipc_group_delete(net, grp);
+	tsk->group = NULL;
+	tipc_sk_withdraw(tsk, scope, &seq);
+	return 0;
+}
+
 /**
  * tipc_setsockopt - set socket option
  * @sock: socket structure
@@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group_req mreq;
 	u32 value = 0;
 	int res = 0;
 
@@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_CONN_TIMEOUT:
 		if (ol < sizeof(value))
 			return -EINVAL;
-		res = get_user(value, (u32 __user *)ov);
-		if (res)
-			return res;
+		if (get_user(value, (u32 __user *)ov))
+			return -EFAULT;
+		break;
+	case TIPC_GROUP_JOIN:
+		if (ol < sizeof(mreq))
+			return -EINVAL;
+		if (copy_from_user(&mreq, ov, sizeof(mreq)))
+			return -EFAULT;
 		break;
 	default:
 		if (ov || ol)
@@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 		tsk->mc_method.rcast = true;
 		tsk->mc_method.mandatory = true;
 		break;
+	case TIPC_GROUP_JOIN:
+		res = tipc_sk_join(tsk, &mreq);
+		break;
+	case TIPC_GROUP_LEAVE:
+		res = tipc_sk_leave(tsk);
+		break;
 	default:
 		res = -EINVAL;
 	}
@@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	int len;
+	struct tipc_name_seq seq;
+	int len, scope;
 	u32 value;
 	int res;
 
@@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_SOCK_RECVQ_DEPTH:
 		value = skb_queue_len(&sk->sk_receive_queue);
 		break;
+	case TIPC_GROUP_JOIN:
+		seq.type = 0;
+		if (tsk->group)
+			tipc_group_self(tsk->group, &seq, &scope);
+		value = seq.type;
+		break;
 	default:
 		res = -EINVAL;
 	}
-- 
2.1.4


* [net-next 08/18] tipc: add second source address to recvmsg()/recvfrom()
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (6 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 07/18] tipc: introduce communication groups Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 09/18] tipc: receive group membership events via member socket Jon Maloy
                   ` (9 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

With group communication, it becomes important for a message receiver to
identify not only from which socket (identified by a node:port tuple) the
message was sent, but also the logical identity (type:instance) of the
sending member.

We fix this by adding a second instance of struct sockaddr_tipc to the
source address area when a message is read. The extra address struct
is filled in with data found in the received message header (type) and
in the local member representation struct (instance).

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  |  3 +++
 net/tipc/msg.h    |  1 +
 net/tipc/socket.c | 52 ++++++++++++++++++++++++++++++++++------------------
 3 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 20663e5..b30e24a 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -61,6 +61,7 @@ struct tipc_member {
 	struct list_head list;
 	u32 node;
 	u32 port;
+	u32 instance;
 	enum mbr_state state;
 	u16 bc_rcv_nxt;
 };
@@ -282,6 +283,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	if (!tipc_group_is_receiver(m))
 		goto drop;
 
+	TIPC_SKB_CB(skb)->orig_member = m->instance;
 	__skb_queue_tail(inputq, skb);
 
 	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
@@ -388,6 +390,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 			m->state = MBR_PUBLISHED;
 		else
 			m->state = MBR_JOINED;
+		m->instance = evt->found_lower;
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
 	} else if (evt->event == TIPC_WITHDRAWN) {
 		if (!m)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7ba1045..6bf9840 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -100,6 +100,7 @@ struct plist;
 
 struct tipc_skb_cb {
 	u32 bytes_read;
+	u32 orig_member;
 	struct sk_buff *tail;
 	bool validated;
 	u16 chain_imp;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1e5de367..43a74b8 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1222,26 +1222,42 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
 }
 
 /**
- * set_orig_addr - capture sender's address for received message
+ * tipc_sk_set_orig_addr - capture sender's address for received message
  * @m: descriptor for message info
- * @msg: received message header
+ * @hdr: received message header
  *
  * Note: Address is not captured if not requested by receiver.
  */
-static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
-{
-	DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
-
-	if (addr) {
-		addr->family = AF_TIPC;
-		addr->addrtype = TIPC_ADDR_ID;
-		memset(&addr->addr, 0, sizeof(addr->addr));
-		addr->addr.id.ref = msg_origport(msg);
-		addr->addr.id.node = msg_orignode(msg);
-		addr->addr.name.domain = 0;	/* could leave uninitialized */
-		addr->scope = 0;		/* could leave uninitialized */
-		m->msg_namelen = sizeof(struct sockaddr_tipc);
-	}
+static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
+{
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct sockaddr_pair {
+		struct sockaddr_tipc sock;
+		struct sockaddr_tipc member;
+	};
+	DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
+
+	if (!srcaddr)
+		return;
+
+	srcaddr->sock.family = AF_TIPC;
+	srcaddr->sock.addrtype = TIPC_ADDR_ID;
+	srcaddr->sock.addr.id.ref = msg_origport(hdr);
+	srcaddr->sock.addr.id.node = msg_orignode(hdr);
+	srcaddr->sock.addr.name.domain = 0;
+	srcaddr->sock.scope = 0;
+	m->msg_namelen = sizeof(struct sockaddr_tipc);
+
+	if (!msg_in_group(hdr))
+		return;
+
+	/* Group message users may also want to know sending member's id */
+	srcaddr->member.family = AF_TIPC;
+	srcaddr->member.addrtype = TIPC_ADDR_NAME;
+	srcaddr->member.addr.name.name.type = msg_nametype(hdr);
+	srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
+	srcaddr->member.addr.name.domain = 0;
+	m->msg_namelen = sizeof(*srcaddr);
 }
 
 /**
@@ -1432,7 +1448,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	} while (1);
 
 	/* Collect msg meta data, including error code and rejected data */
-	set_orig_addr(m, hdr);
+	tipc_sk_set_orig_addr(m, skb);
 	rc = tipc_sk_anc_data_recv(m, hdr, tsk);
 	if (unlikely(rc))
 		goto exit;
@@ -1526,7 +1542,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 
 		/* Collect msg meta data, incl. error code and rejected data */
 		if (!copied) {
-			set_orig_addr(m, hdr);
+			tipc_sk_set_orig_addr(m, skb);
 			rc = tipc_sk_anc_data_recv(m, hdr, tsk);
 			if (rc)
 				break;
-- 
2.1.4


* [net-next 09/18] tipc: receive group membership events via member socket
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (7 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 08/18] tipc: add second source address to recvmsg()/recvfrom() Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 10/18] tipc: introduce flow control for group broadcast messages Jon Maloy
                   ` (8 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

Like with any other service, group members' availability can be
subscribed for by connecting to the topology server. However, because
the events arrive via a different socket than the member socket, there
is a real risk that membership events may arrive out of sync with the
actual JOIN/LEAVE action. I.e., it is possible to receive the first
messages from a new member before the corresponding JOIN event arrives,
just as it is possible to receive the last messages from a leaving
member after the LEAVE event has already been received.

Since each member socket is internally also subscribing for membership
events, we now fix this problem by passing those events on to the user
via the member socket. We leverage the already present member
synchronization protocol to guarantee correct message/event order. An event
is delivered to the user as an empty message where the two source
addresses identify the new/lost member. Furthermore, we set the MSG_OOB
bit in the message flags to mark it as an event. If the event is an
indication about a member loss we also set the MSG_EOR bit, so it can
be distinguished from a member addition event.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 include/uapi/linux/tipc.h |  1 +
 net/tipc/group.c          | 60 +++++++++++++++++++++++++++++++++++++----------
 net/tipc/group.h          |  2 ++
 net/tipc/msg.h            | 22 +++++++++++++++--
 net/tipc/socket.c         | 43 +++++++++++++++++++++------------
 5 files changed, 98 insertions(+), 30 deletions(-)

diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h
index 5f7b2c4..ef41c11 100644
--- a/include/uapi/linux/tipc.h
+++ b/include/uapi/linux/tipc.h
@@ -238,6 +238,7 @@ struct sockaddr_tipc {
  * Flag values
  */
 #define TIPC_GROUP_LOOPBACK     0x1  /* Receive copy of sent msg when match */
+#define TIPC_GROUP_MEMBER_EVTS  0x2  /* Receive membership events in socket */
 
 struct tipc_group_req {
 	__u32 type;      /* group id */
diff --git a/net/tipc/group.c b/net/tipc/group.c
index b30e24a..fe2a76f 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -59,6 +59,7 @@ enum mbr_state {
 struct tipc_member {
 	struct rb_node tree_node;
 	struct list_head list;
+	struct sk_buff *event_msg;
 	u32 node;
 	u32 port;
 	u32 instance;
@@ -79,6 +80,7 @@ struct tipc_group {
 	u16 member_cnt;
 	u16 bc_snd_nxt;
 	bool loopback;
+	bool events;
 };
 
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 	grp->instance = mreq->instance;
 	grp->scope = mreq->scope;
 	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
+	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
 	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
 		return grp;
 	kfree(grp);
@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	if (!msg_in_group(hdr))
 		goto drop;
 
+	if (mtyp == TIPC_GRP_MEMBER_EVT) {
+		if (!grp->events)
+			goto drop;
+		__skb_queue_tail(inputq, skb);
+		return;
+	}
+
 	m = tipc_group_find_member(grp, node, port);
 	if (!tipc_group_is_receiver(m))
 		goto drop;
@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 }
 
 void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
+			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq)
 {
 	struct tipc_member *m;
@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
 
 		/* Wait until PUBLISH event is received */
-		if (m->state == MBR_DISCOVERED)
+		if (m->state == MBR_DISCOVERED) {
 			m->state = MBR_JOINING;
-		else if (m->state == MBR_PUBLISHED)
+		} else if (m->state == MBR_PUBLISHED) {
 			m->state = MBR_JOINED;
+			__skb_queue_tail(inputq, m->event_msg);
+		}
 		return;
 	case GRP_LEAVE_MSG:
 		if (!m)
@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 			return;
 		}
 		/* Otherwise deliver already received WITHDRAW event */
+		__skb_queue_tail(inputq, m->event_msg);
 		tipc_group_delete_member(grp, m);
 		return;
 	default:
@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 	}
 }
 
-/* tipc_group_member_evt() - receive and handle a member up/down event
- */
 void tipc_group_member_evt(struct tipc_group *grp,
 			   struct sk_buff *skb,
+			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq)
 {
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct tipc_event *evt = (void *)msg_data(hdr);
 	struct tipc_member *m;
 	struct net *net;
+	int event = evt->event;
+	u32 instance = evt->found_lower;
 	u32 port = evt->port.ref;
 	u32 node = evt->port.node;
 	u32 self;
@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp,
 	if (!grp->loopback && node == self && port == grp->portid)
 		goto drop;
 
+	/* Convert message before delivery to user */
+	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
+	msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
+	msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
+	msg_set_origport(hdr, port);
+	msg_set_orignode(hdr, node);
+	msg_set_nametype(hdr, grp->type);
+	msg_set_grp_evt(hdr, event);
+
 	m = tipc_group_find_member(grp, node, port);
 
-	if (evt->event == TIPC_PUBLISHED) {
+	if (event == TIPC_PUBLISHED) {
 		if (!m)
 			m = tipc_group_create_member(grp, node, port,
 						     MBR_DISCOVERED);
 		if (!m)
 			goto drop;
 
-		/* Wait if JOIN message not yet received */
-		if (m->state == MBR_DISCOVERED)
+		/* Hold back event if JOIN message not yet received */
+		if (m->state == MBR_DISCOVERED) {
+			m->event_msg = skb;
 			m->state = MBR_PUBLISHED;
-		else
+		} else {
+			__skb_queue_tail(inputq, skb);
 			m->state = MBR_JOINED;
-		m->instance = evt->found_lower;
+		}
+		m->instance = instance;
+		TIPC_SKB_CB(skb)->orig_member = m->instance;
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
-	} else if (evt->event == TIPC_WITHDRAWN) {
+	} else if (event == TIPC_WITHDRAWN) {
 		if (!m)
 			goto drop;
 
-		/* Keep back event if more messages might be expected */
-		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
+		TIPC_SKB_CB(skb)->orig_member = m->instance;
+
+		/* Hold back event if more messages might be expected */
+		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+			m->event_msg = skb;
 			m->state = MBR_LEAVING;
-		else
+		} else {
+			__skb_queue_tail(inputq, skb);
 			tipc_group_delete_member(grp, m);
+		}
 	}
+	return;
 drop:
 	kfree_skb(skb);
 }
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 9bdf447..5d3f10d 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp,
 			   struct sk_buff_head *xmitq);
 void tipc_group_member_evt(struct tipc_group *grp,
 			   struct sk_buff *skb,
+			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
 void tipc_group_proto_rcv(struct tipc_group *grp,
 			  struct tipc_msg *hdr,
+			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq);
 void tipc_group_update_bc_members(struct tipc_group *grp);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 6bf9840..3384ee6 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -65,7 +65,8 @@ struct plist;
 #define TIPC_MCAST_MSG		1
 #define TIPC_NAMED_MSG		2
 #define TIPC_DIRECT_MSG		3
-#define TIPC_GRP_BCAST_MSG	4
+#define TIPC_GRP_MEMBER_EVT	4
+#define TIPC_GRP_BCAST_MSG	5
 
 /*
  * Internal message users
@@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
 
 static inline int msg_in_group(struct tipc_msg *m)
 {
-	return (msg_type(m) == TIPC_GRP_BCAST_MSG);
+	int mtyp = msg_type(m);
+
+	return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
+}
+
+static inline bool msg_is_grp_evt(struct tipc_msg *m)
+{
+	return msg_type(m) == TIPC_GRP_MEMBER_EVT;
 }
 
 static inline u32 msg_named(struct tipc_msg *m)
@@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
 
 /* Word 10
  */
+static inline u16 msg_grp_evt(struct tipc_msg *m)
+{
+	return msg_bits(m, 10, 0, 0x3);
+}
+
+static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
+{
+	msg_set_bits(m, 10, 0, 0x3, n);
+}
+
 static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
 {
 	return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 43a74b8..ed0907c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -706,39 +706,41 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_group *grp = tsk->group;
-	u32 mask = 0;
+	struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
+	u32 revents = 0;
 
 	sock_poll_wait(file, sk_sleep(sk), wait);
 
 	if (sk->sk_shutdown & RCV_SHUTDOWN)
-		mask |= POLLRDHUP | POLLIN | POLLRDNORM;
+		revents |= POLLRDHUP | POLLIN | POLLRDNORM;
 	if (sk->sk_shutdown == SHUTDOWN_MASK)
-		mask |= POLLHUP;
+		revents |= POLLHUP;
 
 	switch (sk->sk_state) {
 	case TIPC_ESTABLISHED:
 		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
-			mask |= POLLOUT;
+			revents |= POLLOUT;
 		/* fall thru' */
 	case TIPC_LISTEN:
 	case TIPC_CONNECTING:
-		if (!skb_queue_empty(&sk->sk_receive_queue))
-			mask |= (POLLIN | POLLRDNORM);
+		if (skb)
+			revents |= POLLIN | POLLRDNORM;
 		break;
 	case TIPC_OPEN:
 		if (!grp || tipc_group_size(grp))
 			if (!tsk->cong_link_cnt)
-				mask |= POLLOUT;
-		if (tipc_sk_type_connectionless(sk) &&
-		    (!skb_queue_empty(&sk->sk_receive_queue)))
-			mask |= (POLLIN | POLLRDNORM);
+				revents |= POLLOUT;
+		if (!tipc_sk_type_connectionless(sk))
+			break;
+		if (!skb)
+			break;
+		revents |= POLLIN | POLLRDNORM;
 		break;
 	case TIPC_DISCONNECTING:
-		mask = (POLLIN | POLLRDNORM | POLLHUP);
+		revents = POLLIN | POLLRDNORM | POLLHUP;
 		break;
 	}
-
-	return mask;
+	return revents;
 }
 
 /**
@@ -1418,6 +1420,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
 	bool connected = !tipc_sk_type_connectionless(sk);
+	bool grp_evt;
 	int rc, err, hlen, dlen, copy;
 	long timeout;
 
@@ -1442,6 +1445,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 		dlen = msg_data_sz(hdr);
 		hlen = msg_hdr_sz(hdr);
 		err = msg_errcode(hdr);
+		grp_evt = msg_is_grp_evt(hdr);
 		if (likely(dlen || err))
 			break;
 		tsk_advance_rx_queue(sk);
@@ -1468,11 +1472,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	if (unlikely(rc))
 		goto exit;
 
+	/* Mark message as group event if applicable */
+	if (unlikely(grp_evt)) {
+		if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
+			m->msg_flags |= MSG_EOR;
+		m->msg_flags |= MSG_OOB;
+		copy = 0;
+	}
+
 	/* Caption of data or error code/rejected data was successful */
 	if (unlikely(flags & MSG_PEEK))
 		goto exit;
 
 	tsk_advance_rx_queue(sk);
+
 	if (likely(!connected))
 		goto exit;
 
@@ -1647,10 +1660,10 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		sk->sk_write_space(sk);
 		break;
 	case GROUP_PROTOCOL:
-		tipc_group_proto_rcv(grp, hdr, xmitq);
+		tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
 		break;
 	case TOP_SRV:
-		tipc_group_member_evt(tsk->group, skb, xmitq);
+		tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
 		skb = NULL;
 		break;
 	default:
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 10/18] tipc: introduce flow control for group broadcast messages
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (8 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 09/18] tipc: receive group membership events via member socket Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 11/18] tipc: introduce group unicast messaging Jon Maloy
                   ` (7 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We introduce an end-to-end flow control mechanism for group broadcast
messages. This ensures that no message is ever lost because of
destination receive buffer overflow, with minimal impact on performance.
For now, the algorithm assumes that there is only one active
transmitter at any moment in time.
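The window arithmetic the patch adds (ADV_UNIT/ADV_IDLE/ADV_ACTIVE, the per-member `window` and `advertised` counters) can be illustrated with a small userspace sketch. All names, constants, and the two-struct split below are simplifications for illustration, not the kernel code:

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative constants; the patch derives ADV_UNIT from the TIPC
 * message and header sizes. */
#define ADV_UNIT   66
#define ADV_IDLE   ADV_UNIT
#define ADV_ACTIVE (ADV_UNIT * 12)

struct member {
	int window;      /* sender view: blocks we may still send */
	int advertised;  /* receiver view: blocks granted, not yet read */
};

/* Sender: is this member congested for a message of 'blks' blocks? */
static bool member_cong(const struct member *m, int blks)
{
	return m->window < blks;
}

/* Sender: consume send window after transmitting. */
static void member_sent(struct member *m, int blks)
{
	m->window -= blks;
}

/* Receiver: the user has read 'blks' blocks; return how much new
 * window to advertise back to the sender (0 if none is due yet). */
static int member_read(struct member *m, int blks)
{
	int adv = 0;

	m->advertised -= blks;
	if (m->advertised <= ADV_ACTIVE - ADV_UNIT) {
		adv = ADV_ACTIVE - m->advertised;
		m->advertised += adv;
	}
	return adv;
}
```

The key property is that the receiver tops the sender's credit back up to ADV_ACTIVE only after the user has actually read the data, which is what makes the scheme end-to-end rather than link-level.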

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 net/tipc/group.h  |  11 ++--
 net/tipc/msg.h    |   5 +-
 net/tipc/socket.c |  48 +++++++++++++-----
 4 files changed, 190 insertions(+), 22 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index fe2a76f..50dd7d1 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -46,6 +46,7 @@
 
 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
 #define ADV_IDLE ADV_UNIT
+#define ADV_ACTIVE (ADV_UNIT * 12)
 
 enum mbr_state {
 	MBR_QUARANTINED,
@@ -59,16 +60,22 @@ enum mbr_state {
 struct tipc_member {
 	struct rb_node tree_node;
 	struct list_head list;
+	struct list_head congested;
 	struct sk_buff *event_msg;
+	struct tipc_group *group;
 	u32 node;
 	u32 port;
 	u32 instance;
 	enum mbr_state state;
+	u16 advertised;
+	u16 window;
 	u16 bc_rcv_nxt;
+	bool usr_pending;
 };
 
 struct tipc_group {
 	struct rb_root members;
+	struct list_head congested;
 	struct tipc_nlist dests;
 	struct net *net;
 	int subid;
@@ -86,11 +93,24 @@ struct tipc_group {
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 				  int mtyp, struct sk_buff_head *xmitq);
 
+static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
+{
+	int mcnt = grp->member_cnt + 1;
+
+	/* Scale to bytes, considering worst-case truesize/msgsize ratio */
+	return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
+}
+
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
 {
 	return grp->bc_snd_nxt;
 }
 
+static bool tipc_group_is_enabled(struct tipc_member *m)
+{
+	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+}
+
 static bool tipc_group_is_receiver(struct tipc_member *m)
 {
 	return m && m->state >= MBR_JOINED;
@@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 	if (!grp)
 		return NULL;
 	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+	INIT_LIST_HEAD(&grp->congested);
 	grp->members = RB_ROOT;
 	grp->net = net;
 	grp->portid = portid;
@@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
 	if (!m)
 		return NULL;
 	INIT_LIST_HEAD(&m->list);
+	INIT_LIST_HEAD(&m->congested);
+	m->group = grp;
 	m->node = node;
 	m->port = port;
 	grp->member_cnt++;
@@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
 	rb_erase(&m->tree_node, &grp->members);
 	grp->member_cnt--;
 	list_del_init(&m->list);
+	list_del_init(&m->congested);
 
 	/* If last member on a node, remove node from dest list */
 	if (!tipc_group_find_node(grp, m->node))
@@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
 	*scope = grp->scope;
 }
 
-void tipc_group_update_bc_members(struct tipc_group *grp)
+void tipc_group_update_member(struct tipc_member *m, int len)
+{
+	struct tipc_group *grp = m->group;
+	struct tipc_member *_m, *tmp;
+
+	if (!tipc_group_is_enabled(m))
+		return;
+
+	m->window -= len;
+
+	if (m->window >= ADV_IDLE)
+		return;
+
+	if (!list_empty(&m->congested))
+		return;
+
+	/* Sort member into congested members' list */
+	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
+		if (m->window > _m->window)
+			continue;
+		list_add_tail(&m->congested, &_m->congested);
+		return;
+	}
+	list_add_tail(&m->congested, &grp->congested);
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp, int len)
 {
+	struct tipc_member *m;
+	struct rb_node *n;
+
+	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+		m = container_of(n, struct tipc_member, tree_node);
+		if (tipc_group_is_enabled(m))
+			tipc_group_update_member(m, len);
+	}
 	grp->bc_snd_nxt++;
 }
 
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+	struct tipc_member *m;
+
+	if (list_empty(&grp->congested))
+		return false;
+
+	m = list_first_entry(&grp->congested, struct tipc_member, congested);
+	if (m->window >= len)
+		return false;
+
+	return true;
+}
+
 /* tipc_group_filter_msg() - determine if we should accept arriving message
  */
 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
@@ -302,11 +374,36 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	kfree_skb(skb);
 }
 
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+			       u32 port, struct sk_buff_head *xmitq)
+{
+	struct tipc_member *m;
+
+	m = tipc_group_find_member(grp, node, port);
+	if (!m)
+		return;
+
+	m->advertised -= blks;
+
+	switch (m->state) {
+	case MBR_JOINED:
+		if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
+			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+		break;
+	case MBR_DISCOVERED:
+	case MBR_JOINING:
+	case MBR_LEAVING:
+	default:
+		break;
+	}
+}
+
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 				  int mtyp, struct sk_buff_head *xmitq)
 {
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
+	int adv = 0;
 
 	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
 			      m->node, tipc_own_addr(grp->net),
@@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 	if (!skb)
 		return;
 
+	if (m->state == MBR_JOINED)
+		adv = ADV_ACTIVE - m->advertised;
+
 	hdr = buf_msg(skb);
-	if (mtyp == GRP_JOIN_MSG)
+
+	if (mtyp == GRP_JOIN_MSG) {
 		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+		msg_set_adv_win(hdr, adv);
+		m->advertised += adv;
+	} else if (mtyp == GRP_ADV_MSG) {
+		msg_set_adv_win(hdr, adv);
+		m->advertised += adv;
+	}
 	__skb_queue_tail(xmitq, skb);
 }
 
-void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
-			  struct sk_buff_head *inputq,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
+			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq)
 {
 	struct tipc_member *m;
@@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 		if (!m)
 			return;
 		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+		m->window += msg_adv_win(hdr);
 
 		/* Wait until PUBLISH event is received */
 		if (m->state == MBR_DISCOVERED) {
 			m->state = MBR_JOINING;
 		} else if (m->state == MBR_PUBLISHED) {
 			m->state = MBR_JOINED;
+			*usr_wakeup = true;
+			m->usr_pending = false;
+			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
 			__skb_queue_tail(inputq, m->event_msg);
 		}
+		if (m->window < ADV_IDLE)
+			tipc_group_update_member(m, 0);
+		else
+			list_del_init(&m->congested);
 		return;
 	case GRP_LEAVE_MSG:
 		if (!m)
@@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 		}
 		/* Otherwise deliver already received WITHDRAW event */
 		__skb_queue_tail(inputq, m->event_msg);
+		*usr_wakeup = m->usr_pending;
 		tipc_group_delete_member(grp, m);
+		list_del_init(&m->congested);
+		return;
+	case GRP_ADV_MSG:
+		if (!m)
+			return;
+		m->window += msg_adv_win(hdr);
+		*usr_wakeup = m->usr_pending;
+		m->usr_pending = false;
+		list_del_init(&m->congested);
 		return;
 	default:
 		pr_warn("Received unknown GROUP_PROTO message\n");
 	}
 }
 
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
 void tipc_group_member_evt(struct tipc_group *grp,
+			   bool *usr_wakeup,
+			   int *sk_rcvbuf,
 			   struct sk_buff *skb,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq)
@@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp,
 		} else {
 			__skb_queue_tail(inputq, skb);
 			m->state = MBR_JOINED;
+			*usr_wakeup = true;
+			m->usr_pending = false;
 		}
 		m->instance = instance;
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+		if (m->window < ADV_IDLE)
+			tipc_group_update_member(m, 0);
+		else
+			list_del_init(&m->congested);
 	} else if (event == TIPC_WITHDRAWN) {
 		if (!m)
 			goto drop;
 
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 
+		*usr_wakeup = m->usr_pending;
+		m->usr_pending = false;
+
 		/* Hold back event if more messages might be expected */
 		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
 			m->event_msg = skb;
@@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp,
 			__skb_queue_tail(inputq, skb);
 			tipc_group_delete_member(grp, m);
 		}
+		list_del_init(&m->congested);
 	}
+	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
 	return;
 drop:
 	kfree_skb(skb);
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 5d3f10d..0e2740e 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -52,15 +52,18 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
 void tipc_group_filter_msg(struct tipc_group *grp,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
-void tipc_group_member_evt(struct tipc_group *grp,
-			   struct sk_buff *skb,
+void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
+			   int *sk_rcvbuf, struct sk_buff *skb,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
-void tipc_group_proto_rcv(struct tipc_group *grp,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
 			  struct tipc_msg *hdr,
 			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq);
-void tipc_group_update_bc_members(struct tipc_group *grp);
+void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+bool tipc_group_bc_cong(struct tipc_group *grp, int len);
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+			       u32 port, struct sk_buff_head *xmitq);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
 int tipc_group_size(struct tipc_group *grp);
 #endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 3384ee6..820720c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -538,6 +538,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
  */
 #define GRP_JOIN_MSG         0
 #define GRP_LEAVE_MSG        1
+#define GRP_ADV_MSG          2
 
 /*
  * Word 1
@@ -790,12 +791,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
-static inline u32 msg_adv_win(struct tipc_msg *m)
+static inline u16 msg_adv_win(struct tipc_msg *m)
 {
 	return msg_bits(m, 9, 0, 0xffff);
 }
 
-static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+static inline void msg_set_adv_win(struct tipc_msg *m, u16 n)
 {
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ed0907c..d2f83fc9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -196,6 +196,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
 	return tsk->snt_unacked > tsk->snd_win;
 }
 
+static u16 tsk_blocks(int len)
+{
+	return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
 /* tsk_blocks(): translate a buffer size in bytes to number of
  * advertisable blocks, taking into account the ratio truesize(len)/len
  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -828,20 +833,22 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	struct tipc_nlist *dsts = tipc_group_dests(grp);
 	struct tipc_mc_method *method = &tsk->mc_method;
 	struct sk_buff_head pkts;
+	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
 	int mtu = tipc_bcast_get_mtu(net);
 	int rc = -EHOSTUNREACH;
 
 	if (!dsts->local && !dsts->remote)
 		return -EHOSTUNREACH;
 
-	/* Block or return if any destination link is congested */
-	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt);
+	/* Block or return if any destination link or member is congested */
+	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt &&
+				!tipc_group_bc_cong(grp, blks));
 	if (unlikely(rc))
 		return rc;
 
 	/* Complete message header */
 	msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
-	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 	msg_set_destport(hdr, 0);
 	msg_set_destnode(hdr, 0);
 	msg_set_nameinst(hdr, 0);
@@ -859,9 +866,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	if (unlikely(rc))
 		return rc;
 
-	/* Update broadcast sequence number */
-	tipc_group_update_bc_members(tsk->group);
-
+	/* Update broadcast sequence number and send windows */
+	tipc_group_update_bc_members(tsk->group, blks);
 	return dlen;
 }
 
@@ -1019,7 +1025,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
 
-	if (unlikely(grp))
+	if (unlikely(grp && !dest))
 		return tipc_send_group_bcast(sock, m, dlen, timeout);
 
 	if (unlikely(!dest)) {
@@ -1419,6 +1425,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
+	struct sk_buff_head xmitq;
 	bool connected = !tipc_sk_type_connectionless(sk);
 	bool grp_evt;
 	int rc, err, hlen, dlen, copy;
@@ -1435,8 +1442,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	}
 	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
+	/* Step rcv queue to first msg with data or error; wait if necessary */
 	do {
-		/* Look at first msg in receive queue; wait if necessary */
 		rc = tipc_wait_for_rcvmsg(sock, &timeout);
 		if (unlikely(rc))
 			goto exit;
@@ -1484,12 +1491,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	if (unlikely(flags & MSG_PEEK))
 		goto exit;
 
+	/* Send group flow control advertisement when applicable */
+	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+		skb_queue_head_init(&xmitq);
+		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+					  msg_orignode(hdr), msg_origport(hdr),
+					  &xmitq);
+		tipc_node_distr_xmit(sock_net(sk), &xmitq);
+	}
+
 	tsk_advance_rx_queue(sk);
 
 	if (likely(!connected))
 		goto exit;
 
-	/* Send connection flow control ack when applicable */
+	/* Send connection flow control advertisement when applicable */
 	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
 	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 		tipc_sk_send_ack(tsk);
@@ -1649,6 +1665,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 	struct sk_buff *skb = __skb_dequeue(inputq);
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct tipc_group *grp = tsk->group;
+	bool wakeup = false;
 
 	switch (msg_user(hdr)) {
 	case CONN_MANAGER:
@@ -1657,19 +1674,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 	case SOCK_WAKEUP:
 		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
 		tsk->cong_link_cnt--;
-		sk->sk_write_space(sk);
+		wakeup = true;
 		break;
 	case GROUP_PROTOCOL:
-		tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
 		break;
 	case TOP_SRV:
-		tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+				      skb, inputq, xmitq);
 		skb = NULL;
 		break;
 	default:
 		break;
 	}
 
+	if (wakeup)
+		sk->sk_write_space(sk);
+
 	kfree_skb(skb);
 }
 
@@ -1784,6 +1805,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = buf_msg(skb);
 
+	if (unlikely(msg_in_group(hdr)))
+		return sk->sk_rcvbuf;
+
 	if (unlikely(!msg_connected(hdr)))
 		return sk->sk_rcvbuf << msg_importance(hdr);
 
-- 
2.1.4


* [net-next 11/18] tipc: introduce group unicast messaging
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (9 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 10/18] tipc: introduce flow control for group broadcast messages Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 12/18] tipc: introduce group anycast messaging Jon Maloy
                   ` (6 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We now make it possible to send connectionless unicast messages
within a communication group. To send a message, the sender can use
either a direct port address, aka port identity, or an indirect port
name that is looked up in the name table.

Messages of this type are subject to the same start synchronization
and flow control mechanism as group broadcast messages.
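The sender-side admission test this patch adds in tipc_group_cong(), and the wakeup bookkeeping for an incoming GRP_ADV_MSG, can be sketched as follows. The structures and helper names are simplified assumptions for illustration, not the kernel's:

```c
#include <assert.h>
#include <stdbool.h>

struct dst {
	int window;        /* advertised send window, in blocks */
	bool usr_pending;  /* a sender is blocked waiting on this member */
};

/* Sender side: true means block (or return EAGAIN) instead of sending. */
static bool group_cong(struct dst *d, int blks)
{
	if (d->usr_pending)
		return true;           /* already blocked on this member */
	if (d->window >= blks)
		return false;          /* enough credit: admit the send */
	d->usr_pending = true;         /* remember to wake the sender */
	return true;
}

/* Incoming advertisement: replenish the window; returns true when a
 * blocked sender should be woken (the kernel then calls
 * sk_write_space() on the socket). */
static bool adv_rcv(struct dst *d, int adv)
{
	bool wakeup = d->usr_pending;

	d->window += adv;
	d->usr_pending = false;
	return wakeup;
}
```

Note that once `usr_pending` is set, further sends block regardless of window size; this keeps the sender parked until the advertisement that clears the flag arrives, mirroring the wait/wakeup handshake in the patch.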

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  |  48 ++++++++++++++++++++++-
 net/tipc/group.h  |   3 ++
 net/tipc/msg.h    |   3 +-
 net/tipc/socket.c | 112 +++++++++++++++++++++++++++++++++++++++++++++++++-----
 4 files changed, 153 insertions(+), 13 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 50dd7d1..2d624c0 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
 	return NULL;
 }
 
+static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
+						u32 node, u32 port)
+{
+	struct tipc_member *m;
+
+	m = tipc_group_find_member(grp, node, port);
+	if (m && tipc_group_is_enabled(m))
+		return m;
+	return NULL;
+}
+
 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
 						u32 node)
 {
@@ -318,9 +329,42 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len)
 	grp->bc_snd_nxt++;
 }
 
-bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
+		     int len, struct tipc_member **mbr)
 {
+	struct sk_buff_head xmitq;
 	struct tipc_member *m;
+	int adv, state;
+
+	if (list_empty(&grp->congested))
+		return false;
+
+	m = tipc_group_find_dest(grp, dnode, dport);
+	*mbr = m;
+	if (!m)
+		return false;
+	if (m->usr_pending)
+		return true;
+	if (m->window >= len)
+		return false;
+	m->usr_pending = true;
+
+	/* If not fully advertised, do it now to prevent mutual blocking */
+	adv = m->advertised;
+	state = m->state;
+	if (state < MBR_JOINED)
+		return true;
+	if (state == MBR_JOINED && adv == ADV_IDLE)
+		return true;
+	skb_queue_head_init(&xmitq);
+	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
+	tipc_node_distr_xmit(grp->net, &xmitq);
+	return true;
+}
+
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+	struct tipc_member *m = NULL;
 
 	if (list_empty(&grp->congested))
 		return false;
@@ -329,7 +373,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
 	if (m->window >= len)
 		return false;
 
-	return true;
+	return tipc_group_cong(grp, m->node, m->port, len, &m);
 }
 
 /* tipc_group_filter_msg() - determine if we should accept arriving message
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 0e2740e..8f77290 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
 			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq);
 void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
+		     int len, struct tipc_member **m);
 bool tipc_group_bc_cong(struct tipc_group *grp, int len);
 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
 			       u32 port, struct sk_buff_head *xmitq);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
+void tipc_group_update_member(struct tipc_member *m, int len);
 int tipc_group_size(struct tipc_group *grp);
 #endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 820720c..7430fe7 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -67,6 +67,7 @@ struct plist;
 #define TIPC_DIRECT_MSG		3
 #define TIPC_GRP_MEMBER_EVT	4
 #define TIPC_GRP_BCAST_MSG	5
+#define TIPC_GRP_UCAST_MSG	6
 
 /*
  * Internal message users
@@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m)
 {
 	int mtyp = msg_type(m);
 
-	return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
+	return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG;
 }
 
 static inline bool msg_is_grp_evt(struct tipc_msg *m)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d2f83fc9..4f7fee2 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -813,6 +813,93 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 }
 
 /**
+ * tipc_send_group_msg - send a message to a member in the group
+ * @net: network namespace
+ * @m: message to send
+ * @mb: group member
+ * @dnode: destination node
+ * @dport: destination port
+ * @dlen: total length of message data
+ */
+static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
+			       struct msghdr *m, struct tipc_member *mb,
+			       u32 dnode, u32 dport, int dlen)
+{
+	struct tipc_msg *hdr = &tsk->phdr;
+	struct sk_buff_head pkts;
+	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
+	int mtu, rc;
+
+	/* Complete message header */
+	msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
+	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
+	msg_set_destport(hdr, dport);
+	msg_set_destnode(hdr, dnode);
+
+	/* Build message as chain of buffers */
+	skb_queue_head_init(&pkts);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+	if (unlikely(rc != dlen))
+		return rc;
+
+	/* Send message */
+	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+	if (unlikely(rc == -ELINKCONG)) {
+		tipc_dest_push(&tsk->cong_links, dnode, 0);
+		tsk->cong_link_cnt++;
+	}
+
+	/* Update send window and sequence number */
+	tipc_group_update_member(mb, blks);
+
+	return dlen;
+}
+
+/**
+ * tipc_send_group_unicast - send message to a member in the group
+ * @sock: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
+				   int dlen, long timeout)
+{
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+	struct sock *sk = sock->sk;
+	struct net *net = sock_net(sk);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_member *mb = NULL;
+	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
+	u32 node, port;
+	int rc;
+
+	node = dest->addr.id.node;
+	port = dest->addr.id.ref;
+	if (!port && !node)
+		return -EHOSTUNREACH;
+
+	/* Block or return if destination link or member is congested */
+	rc = tipc_wait_for_cond(sock, &timeout,
+				!tipc_dest_find(&tsk->cong_links, node, 0) &&
+				!tipc_group_cong(grp, node, port, blks, &mb));
+	if (unlikely(rc))
+		return rc;
+
+	if (unlikely(!mb))
+		return -EHOSTUNREACH;
+
+	rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
+
+	return rc ? rc : dlen;
+}
+
+/**
  * tipc_send_group_bcast - send message to all members in communication group
  * @sk: socket structure
  * @m: message to send
@@ -1025,8 +1112,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
 
-	if (unlikely(grp && !dest))
-		return tipc_send_group_bcast(sock, m, dlen, timeout);
+	if (likely(dest)) {
+		if (unlikely(m->msg_namelen < sizeof(*dest)))
+			return -EINVAL;
+		if (unlikely(dest->family != AF_TIPC))
+			return -EINVAL;
+	}
+
+	if (grp) {
+		if (!dest)
+			return tipc_send_group_bcast(sock, m, dlen, timeout);
+		if (dest->addrtype == TIPC_ADDR_ID)
+			return tipc_send_group_unicast(sock, m, dlen, timeout);
+		return -EINVAL;
+	}
 
 	if (unlikely(!dest)) {
 		dest = &tsk->peer;
@@ -1034,12 +1133,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 			return -EDESTADDRREQ;
 	}
 
-	if (unlikely(m->msg_namelen < sizeof(*dest)))
-		return -EINVAL;
-
-	if (unlikely(dest->family != AF_TIPC))
-		return -EINVAL;
-
 	if (unlikely(syn)) {
 		if (sk->sk_state == TIPC_LISTEN)
 			return -EPIPE;
@@ -1072,7 +1165,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 		msg_set_destport(hdr, dport);
 		if (unlikely(!dport && !dnode))
 			return -EHOSTUNREACH;
-
 	} else if (dest->addrtype == TIPC_ADDR_ID) {
 		dnode = dest->addr.id.node;
 		msg_set_type(hdr, TIPC_DIRECT_MSG);
@@ -1845,7 +1937,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 
 	if (unlikely(!msg_isdata(hdr)))
 		tipc_sk_proto_rcv(sk, &inputq, xmitq);
-	else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
+	else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG))
 		return kfree_skb(skb);
 
 	if (unlikely(grp))
-- 
2.1.4


* [net-next 12/18] tipc: introduce group anycast messaging
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (10 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 11/18] tipc: introduce group unicast messaging Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 13/18] tipc: introduce group multicast messaging Jon Maloy
                   ` (5 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

In this commit, we make it possible to send a connectionless unicast
message to any one member matching a given member identity, even when
there is more than one such member. The sender must use a
TIPC_ADDR_NAME address to achieve this effect.

We also perform load balancing among the destinations: we primarily
select one that has advertised a send window large enough not to cause
a block/EAGAIN delay, if such a member exists. This mechanism is
overlaid on the always-present round-robin selection.

Anycast messages are subject to the same start synchronization
and flow control mechanism as group broadcast messages.
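The destination choice described above can be sketched as below. The array-based representation and the `pick_anycast()` helper are assumptions made for illustration; the kernel instead walks a destination list built by tipc_nametbl_lookup() and applies the congestion test per member:

```c
#include <assert.h>

struct anydst {
	unsigned int port;
	int window;   /* advertised send window, in blocks */
};

/* Walk the matching members round-robin, preferring the first one
 * whose send window can absorb the message without blocking; if none
 * can, fall back to plain round-robin and let flow control block the
 * sender. 'rr' carries the round-robin cursor between calls. */
static int pick_anycast(const struct anydst *d, int n, int blks, int *rr)
{
	int i, idx;

	/* First pass: any member with room to take the message now? */
	for (i = 0; i < n; i++) {
		idx = (*rr + i) % n;
		if (d[idx].window >= blks) {
			*rr = (idx + 1) % n;
			return idx;
		}
	}
	/* All congested: round-robin anyway; the sender will block */
	idx = *rr % n;
	*rr = (idx + 1) % n;
	return idx;
}
```

This shows why the load balancing is "overlaid" on round-robin: the cursor still advances through all members, but a congested member is skipped whenever an uncongested alternative exists.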

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c      |  7 +++++
 net/tipc/group.h      |  3 ++
 net/tipc/name_table.c | 41 +++++++++++++++++++++++++
 net/tipc/name_table.h |  3 ++
 net/tipc/socket.c     | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 138 insertions(+)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 2d624c0..d3b8aba 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -116,6 +116,13 @@ static bool tipc_group_is_receiver(struct tipc_member *m)
 	return m && m->state >= MBR_JOINED;
 }
 
+u32 tipc_group_exclude(struct tipc_group *grp)
+{
+	if (!grp->loopback)
+		return grp->portid;
+	return 0;
+}
+
 int tipc_group_size(struct tipc_group *grp)
 {
 	return grp->member_cnt;
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 8f77290..e432066 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -49,6 +49,7 @@ void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
 		     int *scope);
+u32 tipc_group_exclude(struct tipc_group *grp);
 void tipc_group_filter_msg(struct tipc_group *grp,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
@@ -68,5 +69,7 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
 			       u32 port, struct sk_buff_head *xmitq);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
 void tipc_group_update_member(struct tipc_member *m, int len);
+struct tipc_member *tipc_group_find_sender(struct tipc_group *grp,
+					   u32 node, u32 port);
 int tipc_group_size(struct tipc_group *grp);
 #endif
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 941aed3..a87aff0 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -597,6 +597,47 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
 	return ref;
 }
 
+bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
+			 struct list_head *dsts, int *dstcnt, u32 exclude,
+			 bool all)
+{
+	struct publication *publ;
+	struct name_info *info;
+	struct name_seq *seq;
+	struct sub_seq *sseq;
+	u32 self = tipc_own_addr(net);
+
+	if (!tipc_in_scope(domain, self))
+		return false;
+
+	*dstcnt = 0;
+	rcu_read_lock();
+	seq = nametbl_find_seq(net, type);
+	if (unlikely(!seq))
+		goto exit;
+	spin_lock_bh(&seq->lock);
+	sseq = nameseq_find_subseq(seq, instance);
+	if (likely(sseq)) {
+		info = sseq->info;
+		list_for_each_entry(publ, &info->zone_list, zone_list) {
+			if (!tipc_in_scope(domain, publ->node))
+				continue;
+			if (publ->ref == exclude && publ->node == self)
+				continue;
+			tipc_dest_push(dsts, publ->node, publ->ref);
+			(*dstcnt)++;
+			if (all)
+				continue;
+			list_move_tail(&publ->zone_list, &info->zone_list);
+			break;
+		}
+	}
+	spin_unlock_bh(&seq->lock);
+exit:
+	rcu_read_unlock();
+	return !list_empty(dsts);
+}
+
 int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 			      u32 limit, struct list_head *dports)
 {
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 97646b1..71926e4 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -107,6 +107,9 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 				   u32 upper, u32 domain,
 				   struct tipc_nlist *nodes);
+bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
+			 struct list_head *dsts, int *dstcnt, u32 exclude,
+			 bool all);
 struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 					 u32 upper, u32 scope, u32 port_ref,
 					 u32 key);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 4f7fee2..5ea93ac 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -900,6 +900,88 @@ static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
 }
 
 /**
+ * tipc_send_group_anycast - send message to any member with given identity
+ * @sock: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
+				   int dlen, long timeout)
+{
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+	struct sock *sk = sock->sk;
+	struct net *net = sock_net(sk);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_member *first = NULL;
+	struct tipc_member *mbr = NULL;
+	struct list_head *cong_links = &tsk->cong_links;
+	struct list_head dsts;
+	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
+	u32 type, inst, domain;
+	u32 node, port, exclude;
+	int lookups = 0;
+	int dstcnt, rc;
+	bool cong;
+
+	INIT_LIST_HEAD(&dsts);
+
+	type = dest->addr.name.name.type;
+	inst = dest->addr.name.name.instance;
+	domain = addr_domain(net, dest->scope);
+	exclude = tipc_group_exclude(grp);
+
+	while (++lookups < 4) {
+		first = NULL;
+
+		/* Look for a non-congested destination member, if any */
+		while (1) {
+			if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
+						 &dstcnt, exclude, false))
+				return -EHOSTUNREACH;
+			tipc_dest_pop(&dsts, &node, &port);
+			cong = tipc_group_cong(grp, node, port, blks, &mbr);
+			if (!cong)
+				break;
+			if (mbr == first)
+				break;
+			if (!first)
+				first = mbr;
+		}
+
+		/* Start over if destination was not in member list */
+		if (unlikely(!mbr))
+			continue;
+
+		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
+			break;
+
+		/* Block or return if destination link or member is congested */
+		rc = tipc_wait_for_cond(sock, &timeout,
+					!tipc_dest_find(cong_links, node, 0) &&
+					!tipc_group_cong(grp, node, port,
+							 blks, &mbr));
+		if (unlikely(rc))
+			return rc;
+
+		/* Send, unless destination disappeared while waiting */
+		if (likely(mbr))
+			break;
+	}
+
+	if (unlikely(lookups >= 4))
+		return -EHOSTUNREACH;
+
+	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
+
+	return rc ? rc : dlen;
+}
+
+/**
  * tipc_send_group_bcast - send message to all members in communication group
  * @sk: socket structure
  * @m: message to send
@@ -1122,6 +1204,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (grp) {
 		if (!dest)
 			return tipc_send_group_bcast(sock, m, dlen, timeout);
+		if (dest->addrtype == TIPC_ADDR_NAME)
+			return tipc_send_group_anycast(sock, m, dlen, timeout);
 		if (dest->addrtype == TIPC_ADDR_ID)
 			return tipc_send_group_unicast(sock, m, dlen, timeout);
 		return -EINVAL;
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 13/18] tipc: introduce group multicast messaging
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (11 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 12/18] tipc: introduce group anycast messaging Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 14/18] tipc: guarantee group unicast doesn't bypass group broadcast Jon Maloy
                   ` (4 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

The previously introduced message transport to all group members is
based on the tipc multicast service, but is logically a broadcast
service within the group, and that is what we call it.

We now add functionality for sending messages to all group members
having a certain identity. Correspondingly, we call this feature 'group
multicast'. The service uses unicast when only one destination is
found; otherwise it uses the bearer broadcast service to transfer
the messages. In the latter case, the receiving members filter
arriving messages by looking at the intended destination instance.
If there is no match, the message is dropped, while still being
counted as received and read by the flow control mechanism.
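
The receive-side filtering described above can be modeled roughly like
this. It is a simplified user-space sketch with invented names, not the
kernel's data structures:

```c
#include <assert.h>
#include <stdbool.h>

/* A group multicast is delivered only when its destination instance
 * matches the receiving member's own instance; a mismatching message
 * is dropped, but still advances the broadcast sequence number and is
 * reported back to the sender as consumed, so flow control stays in
 * sync on both ends.
 */
struct member_state {
	unsigned short bc_rcv_nxt;	/* next expected bcast seqno */
	int acked_blocks;		/* blocks reported back as read */
};

/* Returns true if the message should be delivered to the socket. */
static bool mcast_filter(struct member_state *m, unsigned own_instance,
			 unsigned msg_instance, unsigned short seqno,
			 int blks)
{
	m->bc_rcv_nxt = (unsigned short)(seqno + 1);	/* consumed either way */
	if (msg_instance != own_instance) {
		m->acked_blocks += blks;	/* dropped, but counted as read */
		return false;			/* caller frees the buffer */
	}
	return true;
}
```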

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  | 14 +++++++++++++-
 net/tipc/msg.h    | 11 +++++++++--
 net/tipc/socket.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index d3b8aba..2fa49cc7 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -416,10 +416,22 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	if (!tipc_group_is_receiver(m))
 		goto drop;
 
+	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+
+	/* Drop multicast here if not for this member */
+	if (mtyp == TIPC_GRP_MCAST_MSG) {
+		if (msg_nameinst(hdr) != grp->instance) {
+			m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+			tipc_group_update_rcv_win(grp, msg_blocks(hdr),
+						  node, port, xmitq);
+			kfree_skb(skb);
+			return;
+		}
+	}
+
 	TIPC_SKB_CB(skb)->orig_member = m->instance;
 	__skb_queue_tail(inputq, skb);
 
-	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
 	return;
 drop:
 	kfree_skb(skb);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7430fe7..b40a050 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -67,7 +67,8 @@ struct plist;
 #define TIPC_DIRECT_MSG		3
 #define TIPC_GRP_MEMBER_EVT	4
 #define TIPC_GRP_BCAST_MSG	5
-#define TIPC_GRP_UCAST_MSG	6
+#define TIPC_GRP_MCAST_MSG	6
+#define TIPC_GRP_UCAST_MSG	7
 
 /*
  * Internal message users
@@ -195,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m)
 	return msg_bits(m, 0, 0, 0x1ffff);
 }
 
+static inline u32 msg_blocks(struct tipc_msg *m)
+{
+	return (msg_size(m) / 1024) + 1;
+}
+
 static inline u32 msg_data_sz(struct tipc_msg *m)
 {
 	return msg_size(m) - msg_hdr_sz(m);
@@ -279,7 +285,8 @@ static inline u32 msg_mcast(struct tipc_msg *m)
 {
 	int mtyp = msg_type(m);
 
-	return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG));
+	return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) ||
+		(mtyp == TIPC_GRP_MCAST_MSG));
 }
 
 static inline u32 msg_connected(struct tipc_msg *m)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5ea93ac..ad41f64 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -994,6 +994,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
 static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 				 int dlen, long timeout)
 {
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	struct sock *sk = sock->sk;
 	struct net *net = sock_net(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
@@ -1016,11 +1017,16 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 		return rc;
 
 	/* Complete message header */
-	msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
+	if (dest) {
+		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
+		msg_set_nameinst(hdr, dest->addr.name.name.instance);
+	} else {
+		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
+		msg_set_nameinst(hdr, 0);
+	}
 	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 	msg_set_destport(hdr, 0);
 	msg_set_destnode(hdr, 0);
-	msg_set_nameinst(hdr, 0);
 	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
 
 	/* Build message as chain of buffers */
@@ -1041,6 +1047,48 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 }
 
 /**
+ * tipc_send_group_mcast - send message to all members with given identity
+ * @sock: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
+				 int dlen, long timeout)
+{
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+	struct sock *sk = sock->sk;
+	struct net *net = sock_net(sk);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_name_seq *seq = &dest->addr.nameseq;
+	struct list_head dsts;
+	u32 domain, exclude, dstcnt;
+
+	INIT_LIST_HEAD(&dsts);
+
+	if (seq->lower != seq->upper)
+		return -ENOTSUPP;
+
+	domain = addr_domain(net, dest->scope);
+	exclude = tipc_group_exclude(grp);
+	if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
+				 &dsts, &dstcnt, exclude, true))
+		return -EHOSTUNREACH;
+
+	if (dstcnt == 1) {
+		tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
+		return tipc_send_group_unicast(sock, m, dlen, timeout);
+	}
+
+	tipc_dest_list_purge(&dsts);
+	return tipc_send_group_bcast(sock, m, dlen, timeout);
+}
+
+/**
  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
  * @arrvq: queue with arriving messages, to be cloned after destination lookup
  * @inputq: queue with cloned messages, delivered to socket after dest lookup
@@ -1208,6 +1256,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 			return tipc_send_group_anycast(sock, m, dlen, timeout);
 		if (dest->addrtype == TIPC_ADDR_ID)
 			return tipc_send_group_unicast(sock, m, dlen, timeout);
+		if (dest->addrtype == TIPC_ADDR_MCAST)
+			return tipc_send_group_mcast(sock, m, dlen, timeout);
 		return -EINVAL;
 	}
 
@@ -2021,8 +2071,6 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 
 	if (unlikely(!msg_isdata(hdr)))
 		tipc_sk_proto_rcv(sk, &inputq, xmitq);
-	else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG))
-		return kfree_skb(skb);
 
 	if (unlikely(grp))
 		tipc_group_filter_msg(grp, &inputq, xmitq);
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 14/18] tipc: guarantee group unicast doesn't bypass group broadcast
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (12 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 13/18] tipc: introduce group multicast messaging Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 15/18] tipc: guarantee that group broadcast doesn't bypass group unicast Jon Maloy
                   ` (3 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

Group unicast messages don't follow the same path as broadcast messages,
and there is a high risk that unicasts sent from a socket might bypass
previously sent broadcasts from the same socket.

We fix this by letting all unicast messages carry the sequence number of
the next sent broadcast from the same node, but without updating this
number at the receiver. This way, a receiver can check and if necessary
re-order such messages before they are added to the socket receive buffer.
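
The re-ordering hinges on wrap-safe comparison of 16-bit sequence
numbers. A minimal sketch of such comparison helpers, in the spirit of
the kernel's less()/more() helpers (the exact kernel definitions may
differ):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Casting the 16-bit difference to a signed 16-bit value makes the
 * comparison correct across the 0xffff -> 0 wrap, as long as the two
 * numbers are within half the sequence space of each other.
 */
static bool seq_less(uint16_t left, uint16_t right)
{
	return (int16_t)(left - right) < 0;
}

static bool seq_more(uint16_t left, uint16_t right)
{
	return (int16_t)(left - right) > 0;
}
```

tipc_group_sort_msg() in the patch below relies on this kind of
comparison to decide where in the deferred queue an overtaken broadcast
belongs.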

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  | 87 +++++++++++++++++++++++++++++++++++++++++++++----------
 net/tipc/socket.c |  2 ++
 2 files changed, 74 insertions(+), 15 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 2fa49cc7..5edceb6 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -62,6 +62,7 @@ struct tipc_member {
 	struct list_head list;
 	struct list_head congested;
 	struct sk_buff *event_msg;
+	struct sk_buff_head deferredq;
 	struct tipc_group *group;
 	u32 node;
 	u32 port;
@@ -253,6 +254,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
 		return NULL;
 	INIT_LIST_HEAD(&m->list);
 	INIT_LIST_HEAD(&m->congested);
+	__skb_queue_head_init(&m->deferredq);
 	m->group = grp;
 	m->node = node;
 	m->port = port;
@@ -383,29 +385,54 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
 	return tipc_group_cong(grp, m->node, m->port, len, &m);
 }
 
+/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
+ */
+static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
+{
+	struct sk_buff *_skb, *tmp;
+	struct tipc_msg *_hdr, *hdr = buf_msg(skb);
+	u16 bc_seqno = msg_grp_bc_seqno(hdr);
+	int mtyp = msg_type(hdr);
+
+	/* Bcast may be bypassed by unicast, - sort it in */
+	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
+		skb_queue_walk_safe(defq, _skb, tmp) {
+			_hdr = buf_msg(_skb);
+			if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
+				continue;
+			__skb_queue_before(defq, _skb, skb);
+			return;
+		}
+		/* Bcast was not bypassed, - add to tail */
+	}
+	/* Unicasts are never bypassed, - always add to tail */
+	__skb_queue_tail(defq, skb);
+}
+
 /* tipc_group_filter_msg() - determine if we should accept arriving message
  */
 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq)
 {
 	struct sk_buff *skb = __skb_dequeue(inputq);
+	struct sk_buff_head *defq;
 	struct tipc_member *m;
 	struct tipc_msg *hdr;
+	bool deliver, update;
 	u32 node, port;
-	int mtyp;
+	int mtyp, blks;
 
 	if (!skb)
 		return;
 
 	hdr = buf_msg(skb);
-	mtyp = msg_type(hdr);
 	node =  msg_orignode(hdr);
 	port = msg_origport(hdr);
 
 	if (!msg_in_group(hdr))
 		goto drop;
 
-	if (mtyp == TIPC_GRP_MEMBER_EVT) {
+	if (msg_is_grp_evt(hdr)) {
 		if (!grp->events)
 			goto drop;
 		__skb_queue_tail(inputq, skb);
@@ -416,22 +443,52 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	if (!tipc_group_is_receiver(m))
 		goto drop;
 
-	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+	if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
+		goto drop;
 
-	/* Drop multicast here if not for this member */
-	if (mtyp == TIPC_GRP_MCAST_MSG) {
-		if (msg_nameinst(hdr) != grp->instance) {
-			m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
-			tipc_group_update_rcv_win(grp, msg_blocks(hdr),
-						  node, port, xmitq);
-			kfree_skb(skb);
-			return;
+	TIPC_SKB_CB(skb)->orig_member = m->instance;
+	defq = &m->deferredq;
+	tipc_group_sort_msg(skb, defq);
+
+	while ((skb = skb_peek(defq))) {
+		hdr = buf_msg(skb);
+		mtyp = msg_type(hdr);
+		deliver = true;
+		update = false;
+
+		if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
+			break;
+
+		/* Decide what to do with message */
+		switch (mtyp) {
+		case TIPC_GRP_MCAST_MSG:
+			if (msg_nameinst(hdr) != grp->instance) {
+				update = true;
+				deliver = false;
+			}
+			/* Fall thru */
+		case TIPC_GRP_BCAST_MSG:
+			m->bc_rcv_nxt++;
+			break;
+		case TIPC_GRP_UCAST_MSG:
+			break;
+		default:
+			break;
 		}
-	}
 
-	TIPC_SKB_CB(skb)->orig_member = m->instance;
-	__skb_queue_tail(inputq, skb);
+		/* Execute decisions */
+		__skb_dequeue(defq);
+		if (deliver)
+			__skb_queue_tail(inputq, skb);
+		else
+			kfree_skb(skb);
+
+		if (!update)
+			continue;
 
+		blks = msg_blocks(hdr);
+		tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
+	}
 	return;
 drop:
 	kfree_skb(skb);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ad41f64..015cdd9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -827,6 +827,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 {
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct sk_buff_head pkts;
+	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
 	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 	int mtu, rc;
 
@@ -835,6 +836,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 	msg_set_destport(hdr, dport);
 	msg_set_destnode(hdr, dnode);
+	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
 
 	/* Build message as chain of buffers */
 	skb_queue_head_init(&pkts);
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 15/18] tipc: guarantee that group broadcast doesn't bypass group unicast
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (13 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 14/18] tipc: guarantee group unicast doesn't bypass group broadcast Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 16/18] tipc: guarantee delivery of UP event before first broadcast Jon Maloy
                   ` (2 subsequent siblings)
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We need a mechanism guaranteeing that group unicasts sent out from a
socket are not bypassed by later sent broadcasts from the same socket.
We do this as follows:

- Each time a unicast is sent, we set the broadcast method for the
  socket to "replicast" and "mandatory". This forces the first
  subsequent broadcast message to follow the same network and data path
  as the preceding unicast to a destination, hence preventing it from
  overtaking the latter.

- In order to make the 'same data path' statement above true, we let
  group unicasts pass through the multicast link input queue, instead
  of as previously through the unicast link input queue.

- In the first broadcast following a unicast, we set a new header flag,
  requiring all recipients to immediately acknowledge its reception.

- During the period before all the expected acknowledgments have been
  received, the socket refuses to accept any more broadcast attempts,
  i.e., by blocking or returning EAGAIN. This period should typically
  not be longer than a few microseconds.

- When all acknowledgments have been received, the sending socket will
  open up for subsequent broadcasts, this time giving the link layer
  freedom to itself select the best transmission method.

- The forced and/or abrupt transmission method changes described above
  may lead to broadcasts arriving out of order to the recipients. We
  remedy this by introducing code that checks and if necessary
  re-orders such messages at the receiving end.
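
The ack gating in the steps above can be sketched as a tiny state model.
This is a hypothetical user-space illustration; the field and function
names are invented:

```c
#include <assert.h>

/* After a broadcast that was forced onto the replicast path, the
 * sender expects one acknowledgment per member and refuses further
 * broadcasts until the counter drains back to zero.
 */
struct group_state {
	int member_cnt;
	int bc_ackers;	/* outstanding acks; 0 means the gate is open */
};

static void bcast_sent_with_ack_req(struct group_state *g)
{
	g->bc_ackers = g->member_cnt;	/* expect one ack per member */
}

static int bcast_blocked(const struct group_state *g)
{
	return g->bc_ackers != 0;	/* block or return EAGAIN */
}

static void ack_received(struct group_state *g)
{
	if (g->bc_ackers)
		g->bc_ackers--;
}
```

In the actual patch the counter must also be decremented when a member
leaves before acking, which is what the tipc_group_delete_member() hunk
below takes care of.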

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  | 47 +++++++++++++++++++++++++++++++++++++++++------
 net/tipc/group.h  |  4 +---
 net/tipc/link.c   |  5 ++---
 net/tipc/msg.h    | 21 +++++++++++++++++++++
 net/tipc/socket.c | 34 +++++++++++++++++++++++++++++-----
 5 files changed, 94 insertions(+), 17 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5edceb6..1203929 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
 	u16 advertised;
 	u16 window;
 	u16 bc_rcv_nxt;
+	u16 bc_acked;
 	bool usr_pending;
 };
 
@@ -87,6 +88,7 @@ struct tipc_group {
 	u32 portid;
 	u16 member_cnt;
 	u16 bc_snd_nxt;
+	u16 bc_ackers;
 	bool loopback;
 	bool events;
 };
@@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
 	m->group = grp;
 	m->node = node;
 	m->port = port;
+	m->bc_acked = grp->bc_snd_nxt - 1;
 	grp->member_cnt++;
 	tipc_group_add_to_tree(grp, m);
 	tipc_nlist_add(&grp->dests, m->node);
@@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp,
 {
 	rb_erase(&m->tree_node, &grp->members);
 	grp->member_cnt--;
+
+	/* Check if we were waiting for replicast ack from this member */
+	if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
+		grp->bc_ackers--;
+
 	list_del_init(&m->list);
 	list_del_init(&m->congested);
 
@@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len)
 	list_add_tail(&m->congested, &grp->congested);
 }
 
-void tipc_group_update_bc_members(struct tipc_group *grp, int len)
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
 {
 	struct tipc_member *m;
 	struct rb_node *n;
+	u16 prev = grp->bc_snd_nxt - 1;
 
 	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
 		m = container_of(n, struct tipc_member, tree_node);
-		if (tipc_group_is_enabled(m))
+		if (tipc_group_is_enabled(m)) {
 			tipc_group_update_member(m, len);
+			m->bc_acked = prev;
+		}
 	}
+
+	/* Mark number of acknowledges to expect, if any */
+	if (ack)
+		grp->bc_ackers = grp->member_cnt;
 	grp->bc_snd_nxt++;
 }
 
@@ -375,6 +390,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
 {
 	struct tipc_member *m = NULL;
 
+	/* If prev bcast was replicast, reject until all receivers have acked */
+	if (grp->bc_ackers)
+		return true;
+
 	if (list_empty(&grp->congested))
 		return false;
 
@@ -394,7 +413,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
 	u16 bc_seqno = msg_grp_bc_seqno(hdr);
 	int mtyp = msg_type(hdr);
 
-	/* Bcast may be bypassed by unicast, - sort it in */
+	/* Bcast may be bypassed by unicast or other bcast, - sort it in */
 	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
 		skb_queue_walk_safe(defq, _skb, tmp) {
 			_hdr = buf_msg(_skb);
@@ -418,7 +437,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	struct sk_buff_head *defq;
 	struct tipc_member *m;
 	struct tipc_msg *hdr;
-	bool deliver, update;
+	bool ack, deliver, update;
 	u32 node, port;
 	int mtyp, blks;
 
@@ -454,6 +473,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 		hdr = buf_msg(skb);
 		mtyp = msg_type(hdr);
 		deliver = true;
+		ack = false;
 		update = false;
 
 		if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -469,6 +489,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 			/* Fall thru */
 		case TIPC_GRP_BCAST_MSG:
 			m->bc_rcv_nxt++;
+			ack = msg_grp_bc_ack_req(hdr);
 			break;
 		case TIPC_GRP_UCAST_MSG:
 			break;
@@ -483,6 +504,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 		else
 			kfree_skb(skb);
 
+		if (ack)
+			tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+
 		if (!update)
 			continue;
 
@@ -543,6 +567,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 	} else if (mtyp == GRP_ADV_MSG) {
 		msg_set_adv_win(hdr, adv);
 		m->advertised += adv;
+	} else if (mtyp == GRP_ACK_MSG) {
+		msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
 	}
 	__skb_queue_tail(xmitq, skb);
 }
@@ -596,7 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 		}
 		/* Otherwise deliver already received WITHDRAW event */
 		__skb_queue_tail(inputq, m->event_msg);
-		*usr_wakeup = m->usr_pending;
+		*usr_wakeup = true;
 		tipc_group_delete_member(grp, m);
 		list_del_init(&m->congested);
 		return;
@@ -608,6 +634,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 		m->usr_pending = false;
 		list_del_init(&m->congested);
 		return;
+	case GRP_ACK_MSG:
+		if (!m)
+			return;
+		m->bc_acked = msg_grp_bc_acked(hdr);
+		if (--grp->bc_ackers)
+			break;
+		*usr_wakeup = true;
+		m->usr_pending = false;
+		return;
 	default:
 		pr_warn("Received unknown GROUP_PROTO message\n");
 	}
@@ -681,7 +716,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 
-		*usr_wakeup = m->usr_pending;
+		*usr_wakeup = true;
 		m->usr_pending = false;
 
 		/* Hold back event if more messages might be expected */
diff --git a/net/tipc/group.h b/net/tipc/group.h
index e432066..d525e1c 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -61,7 +61,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
 			  struct tipc_msg *hdr,
 			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq);
-void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
 		     int len, struct tipc_member **m);
 bool tipc_group_bc_cong(struct tipc_group *grp, int len);
@@ -69,7 +69,5 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
 			       u32 port, struct sk_buff_head *xmitq);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
 void tipc_group_update_member(struct tipc_member *m, int len);
-struct tipc_member *tipc_group_find_sender(struct tipc_group *grp,
-					   u32 node, u32 port);
 int tipc_group_size(struct tipc_group *grp);
 #endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index bd25bff..70a2149 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,13 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 	case TIPC_MEDIUM_IMPORTANCE:
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
-		if (unlikely(msg_mcast(hdr))) {
+		if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
 			skb_queue_tail(l->bc_rcvlink->inputq, skb);
 			return true;
 		}
-	case CONN_MANAGER:
 	case GROUP_PROTOCOL:
-		skb_queue_tail(inputq, skb);
+	case CONN_MANAGER:
 		return true;
 	case NAME_DISTRIBUTOR:
 		l->bc_rcvlink->state = LINK_ESTABLISHED;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index b40a050..1c088a1 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -547,6 +547,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 #define GRP_JOIN_MSG         0
 #define GRP_LEAVE_MSG        1
 #define GRP_ADV_MSG          2
+#define GRP_ACK_MSG          3
 
 /*
  * Word 1
@@ -839,6 +840,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
+static inline u16 msg_grp_bc_acked(struct tipc_msg *m)
+{
+	return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
 /* Word 10
  */
 static inline u16 msg_grp_evt(struct tipc_msg *m)
@@ -851,6 +862,16 @@ static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
 	msg_set_bits(m, 10, 0, 0x3, n);
 }
 
+static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m)
+{
+	return msg_bits(m, 10, 0, 0x1);
+}
+
+static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n)
+{
+	msg_set_bits(m, 10, 0, 0x1, n);
+}
+
 static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
 {
 	return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 015cdd9..379c5b9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -825,6 +825,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 			       struct msghdr *m, struct tipc_member *mb,
 			       u32 dnode, u32 dport, int dlen)
 {
+	struct tipc_mc_method *method = &tsk->mc_method;
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct sk_buff_head pkts;
 	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
@@ -852,9 +853,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 		tsk->cong_link_cnt++;
 	}
 
-	/* Update send window and sequence number */
+	/* Update send window */
 	tipc_group_update_member(mb, blks);
 
+	/* A broadcast sent within next EXPIRE period must follow same path */
+	method->rcast = true;
+	method->mandatory = true;
 	return dlen;
 }
 
@@ -1005,6 +1009,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	struct tipc_nlist *dsts = tipc_group_dests(grp);
 	struct tipc_mc_method *method = &tsk->mc_method;
 	struct sk_buff_head pkts;
+	bool ack = method->mandatory && method->rcast;
 	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
 	int mtu = tipc_bcast_get_mtu(net);
 	int rc = -EHOSTUNREACH;
@@ -1031,6 +1036,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	msg_set_destnode(hdr, 0);
 	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
 
+	/* Avoid getting stuck with repeated forced replicasts */
+	msg_set_grp_bc_ack_req(hdr, ack);
+
 	/* Build message as chain of buffers */
 	skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
@@ -1038,13 +1046,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 		return rc;
 
 	/* Send message */
-	rc = tipc_mcast_xmit(net, &pkts, method, dsts,
-			     &tsk->cong_link_cnt);
+	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
 	if (unlikely(rc))
 		return rc;
 
 	/* Update broadcast sequence number and send windows */
-	tipc_group_update_bc_members(tsk->group, blks);
+	tipc_group_update_bc_members(tsk->group, blks, ack);
+
+	/* Broadcast link is now free to choose method for next broadcast */
+	method->mandatory = false;
+	method->expires = jiffies;
+
 	return dlen;
 }
 
@@ -1106,9 +1118,9 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 	u32 self = tipc_own_addr(net);
 	u32 scope = TIPC_CLUSTER_SCOPE;
 	struct sk_buff_head tmpq;
-	uint hsz;
 	struct sk_buff *skb, *_skb;
 	u32 lower = 0, upper = ~0;
+	int user, mtyp, hsz;
 
 	__skb_queue_head_init(&tmpq);
 	INIT_LIST_HEAD(&dports);
@@ -1116,6 +1128,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 	skb = tipc_skb_peek(arrvq, &inputq->lock);
 	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
 		msg = buf_msg(skb);
+		user = msg_user(msg);
+		mtyp = msg_type(msg);
+		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
+			spin_lock_bh(&inputq->lock);
+			if (skb_peek(arrvq) == skb) {
+				__skb_dequeue(arrvq);
+				__skb_queue_tail(inputq, skb);
+			}
+			refcount_dec(&skb->users);
+			spin_unlock_bh(&inputq->lock);
+			continue;
+		}
 		hsz = skb_headroom(skb) + msg_hdr_sz(msg);
 		oport = msg_origport(msg);
 		onode = msg_orignode(msg);
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 16/18] tipc: guarantee delivery of UP event before first broadcast
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (14 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 15/18] tipc: guarantee that group broadcast doesn't bypass group unicast Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 17/18] tipc: guarantee delivery of last broadcast before DOWN event Jon Maloy
  2017-10-12 14:02 ` [net-next 18/18] tipc: add multipoint-to-point flow control Jon Maloy
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

The following scenario is possible:
- A user joins a group, and immediately sends out a broadcast message
  to its members.
- The broadcast message, following a different data path than the
  initial JOIN message sent out during the joining procedure, arrives
  at a receiver before the latter.
- The receiver drops the message, since it is not ready to accept any
  messages until the JOIN has arrived.

We avoid this by treating group protocol JOIN messages like unicast
messages.
- We let them pass through the recipient's multicast input queue, just
  like ordinary unicasts.
- We force the first subsequent broadcast to be sent as replicated
  unicast, and to be acknowledged by the recipient, before any further
  broadcast transmissions are accepted.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c   | 7 +++++--
 net/tipc/socket.c | 4 ++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 70a2149..723dd69 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1039,6 +1039,7 @@ int tipc_link_retrans(struct tipc_link *l, struct tipc_link *nacker,
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 			    struct sk_buff_head *inputq)
 {
+	struct sk_buff_head *mc_inputq = l->bc_rcvlink->inputq;
 	struct tipc_msg *hdr = buf_msg(skb);
 
 	switch (msg_user(hdr)) {
@@ -1047,12 +1048,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
 		if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
-			skb_queue_tail(l->bc_rcvlink->inputq, skb);
+			skb_queue_tail(mc_inputq, skb);
 			return true;
 		}
-	case GROUP_PROTOCOL:
 	case CONN_MANAGER:
 		return true;
+	case GROUP_PROTOCOL:
+		skb_queue_tail(mc_inputq, skb);
+		return true;
 	case NAME_DISTRIBUTOR:
 		l->bc_rcvlink->state = LINK_ESTABLISHED;
 		skb_queue_tail(l->namedq, skb);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 379c5b9..358a603 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2761,6 +2761,10 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
 	rc = tipc_sk_publish(tsk, mreq->scope, &seq);
 	if (rc)
 		tipc_group_delete(net, grp);
+
+	/* Eliminate any risk that a broadcast overtakes the sent JOIN */
+	tsk->mc_method.rcast = true;
+	tsk->mc_method.mandatory = true;
 	return rc;
 }
 
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 17/18] tipc: guarantee delivery of last broadcast before DOWN event
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (15 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 16/18] tipc: guarantee delivery of UP event before first broadcast Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  2017-10-12 14:02 ` [net-next 18/18] tipc: add multipoint-to-point flow control Jon Maloy
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

The following scenario is possible:
- A user sends a broadcast message, and thereafter immediately leaves
  the group.
- The LEAVE message, following a different path than the broadcast,
  arrives ahead of the broadcast, and the sending member is removed
  from the receiver's list.
- The broadcast message arrives, but is dropped because the sender
  is now unknown to the recipient.

We fix this by sequence numbering membership events, just like ordinary
unicast messages. Currently, when a JOIN is sent to a peer, it carries
a synchronization point: the sequence number of the next broadcast to
be sent, giving the receiver a starting point for synchronization.
We now let LEAVE messages carry such an "end synchronization" point as
well, so that the recipient can delay the removal of the sending member
until it knows that all of that member's messages have been received.

The received synchronization points are added as sequence numbers to the
generated membership events, making it possible to handle them almost
the same way as regular unicasts in the receiving filter function. In
particular, a DOWN event whose sequence number is too high will be kept
in the reordering queue until the missing broadcast(s) arrive and have
been delivered.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c | 45 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 32 insertions(+), 13 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 1203929..0a405f2 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
 	u16 advertised;
 	u16 window;
 	u16 bc_rcv_nxt;
+	u16 bc_syncpt;
 	u16 bc_acked;
 	bool usr_pending;
 };
@@ -413,7 +414,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
 	u16 bc_seqno = msg_grp_bc_seqno(hdr);
 	int mtyp = msg_type(hdr);
 
-	/* Bcast may be bypassed by unicast or other bcast, - sort it in */
+	/* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
 	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
 		skb_queue_walk_safe(defq, _skb, tmp) {
 			_hdr = buf_msg(_skb);
@@ -437,7 +438,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	struct sk_buff_head *defq;
 	struct tipc_member *m;
 	struct tipc_msg *hdr;
-	bool ack, deliver, update;
+	bool ack, deliver, update, leave = false;
 	u32 node, port;
 	int mtyp, blks;
 
@@ -451,13 +452,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	if (!msg_in_group(hdr))
 		goto drop;
 
-	if (msg_is_grp_evt(hdr)) {
-		if (!grp->events)
-			goto drop;
-		__skb_queue_tail(inputq, skb);
-		return;
-	}
-
 	m = tipc_group_find_member(grp, node, port);
 	if (!tipc_group_is_receiver(m))
 		goto drop;
@@ -493,6 +487,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 			break;
 		case TIPC_GRP_UCAST_MSG:
 			break;
+		case TIPC_GRP_MEMBER_EVT:
+			if (m->state == MBR_LEAVING)
+				leave = true;
+			if (!grp->events)
+				deliver = false;
+			break;
 		default:
 			break;
 		}
@@ -507,6 +507,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 		if (ack)
 			tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
 
+		if (leave) {
+			tipc_group_delete_member(grp, m);
+			__skb_queue_purge(defq);
+			break;
+		}
 		if (!update)
 			continue;
 
@@ -564,6 +569,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
 		msg_set_adv_win(hdr, adv);
 		m->advertised += adv;
+	} else if (mtyp == GRP_LEAVE_MSG) {
+		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
 	} else if (mtyp == GRP_ADV_MSG) {
 		msg_set_adv_win(hdr, adv);
 		m->advertised += adv;
@@ -578,6 +585,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 			  struct sk_buff_head *xmitq)
 {
 	struct tipc_member *m;
+	struct tipc_msg *ehdr;
 	u32 node = msg_orignode(hdr);
 	u32 port = msg_origport(hdr);
 
@@ -593,7 +601,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 						     MBR_QUARANTINED);
 		if (!m)
 			return;
-		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
+		m->bc_rcv_nxt = m->bc_syncpt;
 		m->window += msg_adv_win(hdr);
 
 		/* Wait until PUBLISH event is received */
@@ -604,6 +613,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 			*usr_wakeup = true;
 			m->usr_pending = false;
 			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+			ehdr = buf_msg(m->event_msg);
+			msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
 			__skb_queue_tail(inputq, m->event_msg);
 		}
 		if (m->window < ADV_IDLE)
@@ -614,6 +625,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 	case GRP_LEAVE_MSG:
 		if (!m)
 			return;
+		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
 
 		/* Wait until WITHDRAW event is received */
 		if (m->state != MBR_LEAVING) {
@@ -621,9 +633,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 			return;
 		}
 		/* Otherwise deliver already received WITHDRAW event */
+		ehdr = buf_msg(m->event_msg);
+		msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
 		__skb_queue_tail(inputq, m->event_msg);
 		*usr_wakeup = true;
-		tipc_group_delete_member(grp, m);
 		list_del_init(&m->congested);
 		return;
 	case GRP_ADV_MSG:
@@ -666,6 +679,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 	u32 port = evt->port.ref;
 	u32 node = evt->port.node;
 	u32 self;
+	bool node_up;
 
 	if (!grp)
 		goto drop;
@@ -698,6 +712,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 			m->event_msg = skb;
 			m->state = MBR_PUBLISHED;
 		} else {
+			msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
 			__skb_queue_tail(inputq, skb);
 			m->state = MBR_JOINED;
 			*usr_wakeup = true;
@@ -718,14 +733,18 @@ void tipc_group_member_evt(struct tipc_group *grp,
 
 		*usr_wakeup = true;
 		m->usr_pending = false;
+		node_up = tipc_node_is_up(net, node);
 
 		/* Hold back event if more messages might be expected */
-		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+		if (m->state != MBR_LEAVING && node_up) {
 			m->event_msg = skb;
 			m->state = MBR_LEAVING;
 		} else {
+			if (node_up)
+				msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
+			else
+				msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
 			__skb_queue_tail(inputq, skb);
-			tipc_group_delete_member(grp, m);
 		}
 		list_del_init(&m->congested);
 	}
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* [net-next 18/18] tipc: add multipoint-to-point flow control
  2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
                   ` (16 preceding siblings ...)
  2017-10-12 14:02 ` [net-next 17/18] tipc: guarantee delivery of last broadcast before DOWN event Jon Maloy
@ 2017-10-12 14:02 ` Jon Maloy
  17 siblings, 0 replies; 22+ messages in thread
From: Jon Maloy @ 2017-10-12 14:02 UTC (permalink / raw)
  To: davem, netdev; +Cc: parthasarathy.bhuvaragan, ying.xue, tipc-discussion

We already have point-to-multipoint flow control within a group. But
we also need the opposite: a scheme that can handle potentially
hundreds of sources trying to send messages to the same destination
simultaneously, without causing buffer overflow at the recipient. This
commit adds such a mechanism.

The algorithm works as follows:

- When a member detects a new, joining member, it initially sets its
  state to JOINED and advertises a minimum window to the new member.
  This window is chosen so that the new member can send exactly one
  maximum sized message, or several smaller ones, to the recipient
  before it must stop and wait for an additional advertisement. This
  minimum window ADV_IDLE is set to 65 1kB blocks.

- When a member receives the first data message from a JOINED member,
  it changes the state of the latter to ACTIVE, and advertises a larger
  window ADV_ACTIVE = 12 x ADV_IDLE blocks to the sender, so it can
  continue sending with minimal disturbances to the data flow.

- The active members are kept in a dedicated linked list. Each time a
  message is received from an active member, it will be moved to the
  tail of that list. This way, we keep a record of which members have
  been most (tail) and least (head) recently active.

- There is a maximum number (16) of permitted simultaneous active
  senders per receiver. When this limit is reached, the receiver will
  not advertise anything immediately to a new sender, but instead put
  it in a PENDING state, and add it to a corresponding queue. At the
  same time, it will pick the least recently active member, send it an
  advertisement RECLAIM message, and set this member to state
  RECLAIMING.

- The reclaimed member must respond with a REMIT message, meaning that
  it falls back to a send window of ADV_IDLE and returns any unused
  advertised blocks beyond that value to the reclaiming member.

- When the reclaiming member receives the REMIT message, it unlinks
  the reclaimee from its active list, resets its state to JOINED, and
  notes that it is now back at ADV_IDLE advertised blocks to that
  member. If there are still unread data messages sent out by the
  reclaimee before the REMIT, the member goes into the intermediate
  state REMITTED, where it stays until those messages have been
  consumed.

- The returned advertised blocks can now be re-advertised to the
  pending member, which is now set to state ACTIVE and added to
  the active member list.

- To be proactive, i.e., to minimize the risk that any member will
  end up in the pending queue, we start reclaiming resources as soon
  as the number of active members exceeds 3/4 of the permitted
  maximum.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/tipc/msg.h   |  12 ++++++
 2 files changed, 136 insertions(+), 5 deletions(-)

diff --git a/net/tipc/group.c b/net/tipc/group.c
index 0a405f2..4337732 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -54,6 +54,10 @@ enum mbr_state {
 	MBR_JOINING,
 	MBR_PUBLISHED,
 	MBR_JOINED,
+	MBR_PENDING,
+	MBR_ACTIVE,
+	MBR_RECLAIMING,
+	MBR_REMITTED,
 	MBR_LEAVING
 };
 
@@ -79,6 +83,9 @@ struct tipc_member {
 struct tipc_group {
 	struct rb_root members;
 	struct list_head congested;
+	struct list_head pending;
+	struct list_head active;
+	struct list_head reclaiming;
 	struct tipc_nlist dests;
 	struct net *net;
 	int subid;
@@ -88,6 +95,8 @@ struct tipc_group {
 	u32 scope;
 	u32 portid;
 	u16 member_cnt;
+	u16 active_cnt;
+	u16 max_active;
 	u16 bc_snd_nxt;
 	u16 bc_ackers;
 	bool loopback;
@@ -97,12 +106,29 @@ struct tipc_group {
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 				  int mtyp, struct sk_buff_head *xmitq);
 
+static void tipc_group_decr_active(struct tipc_group *grp,
+				   struct tipc_member *m)
+{
+	if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING)
+		grp->active_cnt--;
+}
+
 static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
 {
 	int mcnt = grp->member_cnt + 1;
+	int max_active, active_pool, idle_pool;
+
+	/* Limit simultaneous reception from other members */
+	max_active = min(mcnt / 8, 64);
+	max_active = max(max_active, 16);
+	grp->max_active = max_active;
+
+	/* Reserve blocks for active and idle members */
+	active_pool = max_active * ADV_ACTIVE;
+	idle_pool = (mcnt - max_active) * ADV_IDLE;
 
 	/* Scale to bytes, considering worst-case truesize/msgsize ratio */
-	return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
+	return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
 }
 
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
@@ -143,6 +169,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 		return NULL;
 	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
 	INIT_LIST_HEAD(&grp->congested);
+	INIT_LIST_HEAD(&grp->active);
+	INIT_LIST_HEAD(&grp->pending);
+	INIT_LIST_HEAD(&grp->reclaiming);
 	grp->members = RB_ROOT;
 	grp->net = net;
 	grp->portid = portid;
@@ -286,6 +315,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
 
 	list_del_init(&m->list);
 	list_del_init(&m->congested);
+	tipc_group_decr_active(grp, m);
 
 	/* If last member on a node, remove node from dest list */
 	if (!tipc_group_find_node(grp, m->node))
@@ -381,6 +411,10 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
 		return true;
 	if (state == MBR_JOINED && adv == ADV_IDLE)
 		return true;
+	if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
+		return true;
+	if (state == MBR_PENDING && adv == ADV_IDLE)
+		return true;
 	skb_queue_head_init(&xmitq);
 	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
 	tipc_node_distr_xmit(grp->net, &xmitq);
@@ -526,7 +560,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
 			       u32 port, struct sk_buff_head *xmitq)
 {
-	struct tipc_member *m;
+	struct tipc_member *m, *rm;
+	struct list_head *active = &grp->active;
+	int max_active = grp->max_active;
+	int reclaim_limit = max_active * 3 / 4;
+	int active_cnt = grp->active_cnt;
 
 	m = tipc_group_find_member(grp, node, port);
 	if (!m)
@@ -536,9 +574,41 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
 
 	switch (m->state) {
 	case MBR_JOINED:
-		if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
+		/* Reclaim advertised space from least active member */
+		if (!list_empty(active) && active_cnt >= reclaim_limit) {
+			rm = list_first_entry(active, struct tipc_member, list);
+			rm->state = MBR_RECLAIMING;
+			list_move_tail(&rm->list, &grp->reclaiming);
+			tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
+		}
+		/* If max active, become pending and wait for reclaimed space */
+		if (active_cnt >= max_active) {
+			m->state = MBR_PENDING;
+			list_add_tail(&m->list, &grp->pending);
+			break;
+		}
+		/* Otherwise become active */
+		m->state = MBR_ACTIVE;
+		list_add_tail(&m->list, &grp->active);
+		grp->active_cnt++;
+		/* Fall through */
+	case MBR_ACTIVE:
+		if (!list_is_last(&m->list, &grp->active))
+			list_move_tail(&m->list, &grp->active);
+		if (m->advertised > (ADV_ACTIVE * 3 / 4))
+			break;
+		tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+		break;
+	case MBR_REMITTED:
+		if (m->advertised > ADV_IDLE)
+			break;
+		m->state = MBR_JOINED;
+		if (m->advertised < ADV_IDLE) {
+			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
 			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+		}
 		break;
+	case MBR_RECLAIMING:
 	case MBR_DISCOVERED:
 	case MBR_JOINING:
 	case MBR_LEAVING:
@@ -560,8 +630,10 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 	if (!skb)
 		return;
 
-	if (m->state == MBR_JOINED)
+	if (m->state == MBR_ACTIVE)
 		adv = ADV_ACTIVE - m->advertised;
+	else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
+		adv = ADV_IDLE - m->advertised;
 
 	hdr = buf_msg(skb);
 
@@ -576,6 +648,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 		m->advertised += adv;
 	} else if (mtyp == GRP_ACK_MSG) {
 		msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
+	} else if (mtyp == GRP_REMIT_MSG) {
+		msg_set_grp_remitted(hdr, m->window);
 	}
 	__skb_queue_tail(xmitq, skb);
 }
@@ -584,10 +658,11 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq)
 {
-	struct tipc_member *m;
+	struct tipc_member *m, *pm;
 	struct tipc_msg *ehdr;
 	u32 node = msg_orignode(hdr);
 	u32 port = msg_origport(hdr);
+	u16 remitted, in_flight;
 
 	if (!grp)
 		return;
@@ -629,6 +704,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 
 		/* Wait until WITHDRAW event is received */
 		if (m->state != MBR_LEAVING) {
+			tipc_group_decr_active(grp, m);
 			m->state = MBR_LEAVING;
 			return;
 		}
@@ -656,6 +732,48 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 		*usr_wakeup = true;
 		m->usr_pending = false;
 		return;
+	case GRP_RECLAIM_MSG:
+		if (!m)
+			return;
+		*usr_wakeup = m->usr_pending;
+		m->usr_pending = false;
+		tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
+		m->window = ADV_IDLE;
+		return;
+	case GRP_REMIT_MSG:
+		if (!m || m->state != MBR_RECLAIMING)
+			return;
+
+		list_del_init(&m->list);
+		grp->active_cnt--;
+		remitted = msg_grp_remitted(hdr);
+
+		/* Messages preceding the REMIT still in receive queue */
+		if (m->advertised > remitted) {
+			m->state = MBR_REMITTED;
+			in_flight = m->advertised - remitted;
+		}
+		/* All messages preceding the REMIT have been read */
+		if (m->advertised <= remitted) {
+			m->state = MBR_JOINED;
+			in_flight = 0;
+		}
+		/* ..and the REMIT overtaken by more messages => re-advertise */
+		if (m->advertised < remitted)
+			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+
+		m->advertised = ADV_IDLE + in_flight;
+
+		/* Set oldest pending member to active and advertise */
+		if (list_empty(&grp->pending))
+			return;
+		pm = list_first_entry(&grp->pending, struct tipc_member, list);
+		pm->state = MBR_ACTIVE;
+		list_move_tail(&pm->list, &grp->active);
+		grp->active_cnt++;
+		if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
+			tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
+		return;
 	default:
 		pr_warn("Received unknown GROUP_PROTO message\n");
 	}
@@ -738,6 +856,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 		/* Hold back event if more messages might be expected */
 		if (m->state != MBR_LEAVING && node_up) {
 			m->event_msg = skb;
+			tipc_group_decr_active(grp, m);
 			m->state = MBR_LEAVING;
 		} else {
 			if (node_up)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 1c088a1..65258c3 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -548,6 +548,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 #define GRP_LEAVE_MSG        1
 #define GRP_ADV_MSG          2
 #define GRP_ACK_MSG          3
+#define GRP_RECLAIM_MSG      4
+#define GRP_REMIT_MSG        5
 
 /*
  * Word 1
@@ -850,6 +852,16 @@ static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
+static inline u16 msg_grp_remitted(struct tipc_msg *m)
+{
+	return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
 /* Word 10
  */
 static inline u16 msg_grp_evt(struct tipc_msg *m)
-- 
2.1.4

^ permalink raw reply related	[flat|nested] 22+ messages in thread

* Re: [net-next 01/18] tipc: add ability to order and receive topology events in driver
  2017-10-12 14:02 ` [net-next 01/18] tipc: add ability to order and receive topology events in driver Jon Maloy
@ 2017-10-12 18:54   ` David Miller
  0 siblings, 0 replies; 22+ messages in thread
From: David Miller @ 2017-10-12 18:54 UTC (permalink / raw)
  To: jon.maloy; +Cc: netdev, parthasarathy.bhuvaragan, ying.xue, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Thu, 12 Oct 2017 16:02:22 +0200

> @@ -288,6 +289,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
>  }
>  
>  static int tipc_accept_from_sock(struct tipc_conn *con)
> +
>  {

This whitespace change seems unintentional.

> +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
> +			     u32 lower, u32 upper, int *conid)
> +{
> +	struct tipc_server *s;
> +	struct tipc_conn *con;
> +	struct tipc_subscriber *scbr;
> +	struct tipc_subscr sub;

Please order local variables from longest to shortest line.

>  static void tipc_send_to_sock(struct tipc_conn *con)
>  {
>  	int count = 0;
>  	struct tipc_server *s = con->server;
> +	struct tipc_event *evt;
>  	struct outqueue_entry *e;
>  	struct msghdr msg;
>  	int ret;

Likewise.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [net-next 04/18] tipc: refactor function filter_rcv()
  2017-10-12 14:02 ` [net-next 04/18] tipc: refactor function filter_rcv() Jon Maloy
@ 2017-10-12 18:56   ` David Miller
  0 siblings, 0 replies; 22+ messages in thread
From: David Miller @ 2017-10-12 18:56 UTC (permalink / raw)
  To: jon.maloy; +Cc: netdev, parthasarathy.bhuvaragan, ying.xue, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Thu, 12 Oct 2017 16:02:25 +0200

> +static void tipc_sk_proto_rcv(struct sock *sk,
> +			      struct sk_buff_head *inputq,
> +			      struct sk_buff_head *xmitq)
> +{
> +	struct tipc_sock *tsk = tipc_sk(sk);
> +	struct sk_buff *skb = __skb_dequeue(inputq);
> +	struct tipc_msg *hdr = buf_msg(skb);

Please order local variables from longest to shortest line.

^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [net-next 06/18] tipc: improve destination linked list
  2017-10-12 14:02 ` [net-next 06/18] tipc: improve destination linked list Jon Maloy
@ 2017-10-12 18:57   ` David Miller
  0 siblings, 0 replies; 22+ messages in thread
From: David Miller @ 2017-10-12 18:57 UTC (permalink / raw)
  To: jon.maloy; +Cc: netdev, parthasarathy.bhuvaragan, ying.xue, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Thu, 12 Oct 2017 16:02:27 +0200

> @@ -259,19 +259,19 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
>  			   struct tipc_nlist *dests, u16 *cong_link_cnt)
>  {
>  	struct sk_buff_head _pkts;
> -	struct u32_item *n, *tmp;
> -	u32 dst, selector;
> +	struct tipc_dest *dst, *tmp;
> +	u32 dnode, selector;

Order local variables from longest to shortest line please.

> -bool u32_find(struct list_head *l, u32 value)
> +struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port)
>  {
> -	struct u32_item *item;
> +	struct tipc_dest *dst;
> +	u64 value = (u64)node << 32 | port;

Likewise.

> -bool u32_push(struct list_head *l, u32 value)
> +bool tipc_dest_push(struct list_head *l, u32 node, u32 port)
>  {
> -	struct u32_item *item;
> +	struct tipc_dest *dst;
> +	u64 value = (u64)node << 32 | port;

Likewise.

You're probably tired of hearing me say this, so I'll just ask that you
audit the rest of your patch series for this problem as well.

Thank you :-)

^ permalink raw reply	[flat|nested] 22+ messages in thread

end of thread, other threads:[~2017-10-12 18:57 UTC | newest]

Thread overview: 22+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-10-12 14:02 [net-next 00/18] tipc: Introduce Communcation Group feature Jon Maloy
2017-10-12 14:02 ` [net-next 01/18] tipc: add ability to order and receive topology events in driver Jon Maloy
2017-10-12 18:54   ` David Miller
2017-10-12 14:02 ` [net-next 02/18] tipc: improve address sanity check in tipc_connect() Jon Maloy
2017-10-12 14:02 ` [net-next 03/18] tipc: add ability to obtain node availability status from other files Jon Maloy
2017-10-12 14:02 ` [net-next 04/18] tipc: refactor function filter_rcv() Jon Maloy
2017-10-12 18:56   ` David Miller
2017-10-12 14:02 ` [net-next 05/18] tipc: add new function for sending multiple small messages Jon Maloy
2017-10-12 14:02 ` [net-next 06/18] tipc: improve destination linked list Jon Maloy
2017-10-12 18:57   ` David Miller
2017-10-12 14:02 ` [net-next 07/18] tipc: introduce communication groups Jon Maloy
2017-10-12 14:02 ` [net-next 08/18] tipc: add second source address to recvmsg()/recvfrom() Jon Maloy
2017-10-12 14:02 ` [net-next 09/18] tipc: receive group membership events via member socket Jon Maloy
2017-10-12 14:02 ` [net-next 10/18] tipc: introduce flow control for group broadcast messages Jon Maloy
2017-10-12 14:02 ` [net-next 11/18] tipc: introduce group unicast messaging Jon Maloy
2017-10-12 14:02 ` [net-next 12/18] tipc: introduce group anycast messaging Jon Maloy
2017-10-12 14:02 ` [net-next 13/18] tipc: introduce group multicast messaging Jon Maloy
2017-10-12 14:02 ` [net-next 14/18] tipc: guarantee group unicast doesn't bypass group broadcast Jon Maloy
2017-10-12 14:02 ` [net-next 15/18] tipc: guarantee that group broadcast doesn't bypass group unicast Jon Maloy
2017-10-12 14:02 ` [net-next 16/18] tipc: guarantee delivery of UP event before first broadcast Jon Maloy
2017-10-12 14:02 ` [net-next 17/18] tipc: guarantee delivery of last broadcast before DOWN event Jon Maloy
2017-10-12 14:02 ` [net-next 18/18] tipc: add multipoint-to-point flow control Jon Maloy

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.