netdev.vger.kernel.org archive mirror
* [PATCH net-next 0/7] tipc: multicast and internal users to new send functions
@ 2014-07-15  3:54 Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 1/7] tipc: make name table distributor use new send function Jon Maloy
                   ` (6 more replies)
  0 siblings, 7 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We move the remaining data transmission users, i.e., multicast, the
name table distributor, and the link-internal protocols, over to the
new data transmission framework introduced in a previous commit
series ("tipc: new unicast transmission code").

Finally, we remove the code obsoleted by the new functions.
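
For orientation, a minimal sketch of the send contract this series
converges on. The names are those used in the patches below (unicast
from the earlier series, broadcast from patch 3/7), and both calls
consume the buffer chain except when returning -ELINKCONG:

/* Sketch only, not part of any patch in this series. */
static int example_send(struct sk_buff *chain, u32 dnode, u32 selector,
			bool mcast)
{
	if (mcast)
		return tipc_bclink_xmit2(chain);	/* all cluster nodes + local sockets */
	return tipc_link_xmit2(chain, dnode, selector);	/* one destination node */
}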

Jon Maloy (7):
  tipc: make name table distributor use new send function
  tipc: let internal link users call the new link send function
  tipc: add new functions for multicast and broadcast distribution
  tipc: start using the new multicast functions
  tipc: remove unreferenced functions
  tipc: rename temporarily named functions
  tipc: ensure sequential message delivery across dual bearers

 net/tipc/bcast.c      |   82 +++++++------
 net/tipc/bcast.h      |    5 +-
 net/tipc/link.c       |  307 ++-----------------------------------------------
 net/tipc/link.h       |    9 +-
 net/tipc/msg.c        |   79 +++++++------
 net/tipc/msg.h        |    9 +-
 net/tipc/name_distr.c |   76 ++++++------
 net/tipc/name_distr.h |    2 +-
 net/tipc/node.c       |   13 +--
 net/tipc/port.c       |  122 +-------------------
 net/tipc/port.h       |   10 --
 net/tipc/socket.c     |  148 ++++++++++++++++--------
 net/tipc/socket.h     |    2 +
 13 files changed, 263 insertions(+), 601 deletions(-)

-- 
1.7.9.5

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [PATCH net-next 1/7] tipc: make name table distributor use new send function
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 2/7] tipc: let internal link users call the new link " Jon Maloy
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

In a previous commit series ("tipc: new unicast transmission code")
we introduced a new message sending function, tipc_link_xmit2(),
and moved the unicast data users over to use that function. We now
let the internal name table distributor do the same.

The interaction between the name distributor and the node/link
layer also becomes significantly simpler, so we can eliminate
the function tipc_link_names_xmit().
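
To make the buffer sizing in the reworked named_distribute() concrete,
a small standalone sketch of the packing arithmetic follows. It is not
part of the patch, and the item size and MTU values are illustrative
assumptions only:

#include <stdio.h>

int main(void)
{
	unsigned int item_size = 20;	/* assumed wire size of one distr_item */
	unsigned int mtu = 1466;	/* assumed link MTU (illustrative) */
	unsigned int npubs = 1000;	/* publications to distribute */

	/* Per-message payload cap: the MTU rounded down to a whole
	 * number of items, as done in named_distribute():
	 */
	unsigned int msg_dsz = (mtu / item_size) * item_size;
	unsigned int rem = npubs * item_size;	/* total payload left */
	unsigned int nbufs = 0;

	while (rem) {
		unsigned int msg_rem = rem < msg_dsz ? rem : msg_dsz;

		rem -= msg_rem;
		nbufs++;		/* one PUBLICATION message per chunk */
	}
	printf("%u items -> %u messages of at most %u bytes payload\n",
	       npubs, nbufs, msg_dsz);
	return 0;
}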

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c       |   41 --------------------------
 net/tipc/link.h       |    1 -
 net/tipc/name_distr.c |   76 ++++++++++++++++++++++++++++---------------------
 net/tipc/name_distr.h |    2 +-
 net/tipc/node.c       |   13 ++-------
 5 files changed, 48 insertions(+), 85 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index a235b24..367b0f5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1033,47 +1033,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct sk_buff *temp_buf;
-
-	if (list_empty(message_list))
-		return;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[0];
-		if (l_ptr) {
-			/* convert circular list to linear list */
-			((struct sk_buff *)message_list->prev)->next = NULL;
-			link_add_chain_to_outqueue(l_ptr,
-				(struct sk_buff *)message_list->next, 0);
-			tipc_link_push_queue(l_ptr);
-			INIT_LIST_HEAD(message_list);
-		}
-		tipc_node_unlock(n_ptr);
-	}
-
-	/* discard the messages if they couldn't be sent */
-	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-		list_del((struct list_head *)buf);
-		kfree_skb(buf);
-	}
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 227ff81..04a59c5 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
 int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
 int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce7309..d16f947 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
 
 void named_cluster_distribute(struct sk_buff *buf)
 {
-	struct sk_buff *buf_copy;
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
+	struct sk_buff *obuf;
+	struct tipc_node *node;
+	u32 dnode;
 
 	rcu_read_lock();
-	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-		if (l_ptr) {
-			buf_copy = skb_copy(buf, GFP_ATOMIC);
-			if (!buf_copy) {
-				tipc_node_unlock(n_ptr);
-				break;
-			}
-			msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-			__tipc_link_xmit(l_ptr, buf_copy);
-		}
-		tipc_node_unlock(n_ptr);
+	list_for_each_entry_rcu(node, &tipc_node_list, list) {
+		dnode = node->addr;
+		if (in_own_node(dnode))
+			continue;
+		if (!tipc_node_active_links(node))
+			continue;
+		obuf = skb_copy(buf, GFP_ATOMIC);
+		if (!obuf)
+			break;
+		msg_set_destnode(buf_msg(obuf), dnode);
+		tipc_link_xmit2(obuf, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
 	return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-			     struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+			     struct publ_list *pls)
 {
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 	struct distr_item *item = NULL;
-	u32 left = 0;
-	u32 rest = pls->size * ITEM_SIZE;
+	uint dsz = pls->size * ITEM_SIZE;
+	uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+	uint rem = dsz;
+	uint msg_rem = 0;
 
 	list_for_each_entry(publ, &pls->list, local_list) {
+		/* Prepare next buffer: */
 		if (!buf) {
-			left = (rest <= max_item_buf) ? rest : max_item_buf;
-			rest -= left;
-			buf = named_prepare_buf(PUBLICATION, left, node);
+			msg_rem = min_t(uint, rem, msg_dsz);
+			rem -= msg_rem;
+			buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
 			if (!buf) {
 				pr_warn("Bulk publication failure\n");
 				return;
 			}
 			item = (struct distr_item *)msg_data(buf_msg(buf));
 		}
+
+		/* Pack publication into message: */
 		publ_to_item(item, publ);
 		item++;
-		left -= ITEM_SIZE;
-		if (!left) {
-			list_add_tail((struct list_head *)buf, message_list);
+		msg_rem -= ITEM_SIZE;
+
+		/* Append full buffer to list: */
+		if (!msg_rem) {
+			list_add_tail((struct list_head *)buf, msg_list);
 			buf = NULL;
 		}
 	}
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
 {
-	LIST_HEAD(message_list);
+	LIST_HEAD(msg_list);
+	struct sk_buff *buf_chain;
 
 	read_lock_bh(&tipc_nametbl_lock);
-	named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-	named_distribute(&message_list, node, &publ_zone, max_item_buf);
+	named_distribute(&msg_list, dnode, &publ_cluster);
+	named_distribute(&msg_list, dnode, &publ_zone);
 	read_unlock_bh(&tipc_nametbl_lock);
 
-	tipc_link_names_xmit(&message_list, node);
+	/* Convert circular list to linear list and send: */
+	buf_chain = (struct sk_buff *)msg_list.next;
+	((struct sk_buff *)msg_list.prev)->next = NULL;
+	tipc_link_xmit2(buf_chain, dnode, dnode);
 }
 
 /**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b2eed4e..8afe32b 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -70,7 +70,7 @@ struct distr_item {
 struct sk_buff *tipc_named_publish(struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct publication *publ);
 void named_cluster_distribute(struct sk_buff *buf);
-void tipc_named_node_up(u32 max_item_buf, u32 node);
+void tipc_named_node_up(u32 dnode);
 void tipc_named_rcv(struct sk_buff *buf);
 void tipc_named_reinit(void);
 
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d959343..f706929 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
 void tipc_node_unlock(struct tipc_node *node)
 {
 	LIST_HEAD(nsub_list);
-	struct tipc_link *link;
-	int pkt_sz = 0;
 	u32 addr = 0;
 
 	if (likely(!node->action_flags)) {
@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
 		node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
 	}
 	if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
-		link = node->active_links[0];
 		node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
-		if (link) {
-			pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
-				  ITEM_SIZE;
-			addr = node->addr;
-		}
+		addr = node->addr;
 	}
 	spin_unlock_bh(&node->lock);
 
 	if (!list_empty(&nsub_list))
 		tipc_nodesub_notify(&nsub_list);
-	if (pkt_sz)
-		tipc_named_node_up(pkt_sz, addr);
+	if (addr)
+		tipc_named_node_up(addr);
 }
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 2/7] tipc: let internal link users call the new link send function
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 1/7] tipc: make name table distributor use new send function Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We convert the link-internal users (changeover protocol, broadcast
synchronization) to use the new packet send function,
__tipc_link_xmit2().
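
For illustration only, a condensed sketch of the call pattern these
users are moved to. It assumes the in-tree helpers visible in the
diff below, and a caller that already holds the owning node's lock:

static void example_proto_xmit(struct tipc_link *link)
{
	struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
	struct tipc_msg *msg;

	if (!buf)
		return;
	msg = buf_msg(buf);
	/* Header-only internal protocol message on this link: */
	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
	/* Link is already known and locked, so use the low-level call: */
	__tipc_link_xmit2(link, buf);
}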

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c |   15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 367b0f5..d1255ba 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -999,7 +999,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
@@ -1009,10 +1009,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
 		return;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-	msg_set_last_bcast(msg, l->owner->bclink.acked);
-	link_add_chain_to_outqueue(l, buf, 0);
-	tipc_link_push_queue(l);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+	msg_set_last_bcast(msg, link->owner->bclink.acked);
+	__tipc_link_xmit2(link, buf);
 }
 
 /*
@@ -1859,7 +1858,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 	}
 	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
-	__tipc_link_xmit(tunnel, buf);
+	__tipc_link_xmit2(tunnel, buf);
 }
 
 
@@ -1892,7 +1891,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 		if (buf) {
 			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
-			__tipc_link_xmit(tunnel, buf);
+			__tipc_link_xmit2(tunnel, buf);
 		} else {
 			pr_warn("%sunable to send changeover msg\n",
 				link_co_err);
@@ -1965,7 +1964,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
 		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
 					       length);
-		__tipc_link_xmit(tunnel, outbuf);
+		__tipc_link_xmit2(tunnel, outbuf);
 		if (!tipc_link_is_up(l_ptr))
 			return;
 		iter = iter->next;
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 1/7] tipc: make name table distributor use new send function Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 2/7] tipc: let internal link users call the new link " Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 4/7] tipc: start using the new multicast functions Jon Maloy
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We add a new broadcast link transmit function in bcast.c and a new
receive function in socket.c. The purpose is to move the branching
between external and internal destinations down to the link layer,
just as we have done with unicast in earlier commits. We also make
use of the new link-independent fragmentation support that was
introduced in an earlier commit series.

This gives a shorter and simpler code path, and makes it possible
to obtain copy-free buffer delivery to all node-local destination
sockets.

The new transmission code is added in parallel with the existing one,
and will be used by the socket multicast send function in the next
commit in this series.
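
As a hedged illustration of how a sender is expected to use this new
path (the socket layer starts doing so in the next commit), assuming
the helpers introduced earlier in this series:

static int example_mcast_send(struct tipc_msg *mhdr, struct iovec *iov,
			      size_t dsz)
{
	struct sk_buff *chain;
	int rc;

	/* Fragment link-independently to the fixed broadcast MTU: */
	rc = tipc_msg_build2(mhdr, iov, 0, dsz, tipc_bclink_get_mtu(), &chain);
	if (rc < 0)
		return rc;

	/* One call clones the message for node local sockets and
	 * broadcasts the chain to all other nodes. The chain is
	 * consumed except when -ELINKCONG is returned.
	 */
	return tipc_bclink_xmit2(chain);
}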

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c  |   50 +++++++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |    4 +++-
 net/tipc/msg.c    |   38 ++++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h    |    2 ++
 net/tipc/socket.c |   40 ++++++++++++++++++++++++++++++++++++++++
 net/tipc/socket.h |    2 ++
 6 files changed, 134 insertions(+), 2 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 2663167..80f5a51 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
 		tipc_link_reset_all(node);
 }
 
+uint  tipc_bclink_get_mtu(void)
+{
+	return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
 	bclink->flags |= flags;
@@ -408,6 +415,47 @@ exit:
 	return res;
 }
 
+/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
+ *                     and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_bclink_xmit2(struct sk_buff *buf)
+{
+	int rc = 0;
+	struct sk_buff *clbuf;
+
+	/* Prepare clone of message for local node */
+	clbuf = tipc_msg_reassemble(buf);
+	if (unlikely(!clbuf)) {
+		kfree_skb_list(buf);
+		return -EHOSTUNREACH;
+	}
+
+	/* Broadcast to all other nodes */
+	if (likely(bclink)) {
+		tipc_bclink_lock();
+		if (likely(bclink->bcast_nodes.count)) {
+			rc = __tipc_link_xmit(bcl, buf);
+			if (likely(!rc)) {
+				bclink_set_last_sent();
+				bcl->stats.queue_sz_counts++;
+				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+			}
+		}
+		tipc_bclink_unlock();
+	}
+
+	/* Deliver message clone */
+	if (likely(!rc))
+		tipc_sk_mcast_rcv(clbuf);
+	else
+		kfree_skb(clbuf);
+
+	return rc;
+}
+
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c4..d90645f 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -98,5 +98,7 @@ int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint  tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit2(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 6ec9584..4662b16 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -412,3 +412,41 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
 	msg_set_destport(msg, dport);
 	return TIPC_OK;
 }
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		head = __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+		if (likely(head))
+			return head;
+		return NULL;
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
+error:
+	pr_warn("Failed do clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7d57434..a15d596 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
 		    int offset, int dsz, int mtu , struct sk_buff **chain);
 
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 8c5600c..d16550f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 	return mask;
 }
 
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_port_list dports = {0, NULL, };
+	struct tipc_port_list *item;
+	struct sk_buff *b;
+	uint i, last, dst = 0;
+	u32 scope = TIPC_CLUSTER_SCOPE;
+
+	if (in_own_node(msg_orignode(msg)))
+		scope = TIPC_NODE_SCOPE;
+
+	/* Create destination port list: */
+	tipc_nametbl_mc_translate(msg_nametype(msg),
+				  msg_namelower(msg),
+				  msg_nameupper(msg),
+				  scope,
+				  &dports);
+	last = dports.count;
+	if (!last) {
+		kfree_skb(buf);
+		return;
+	}
+
+	for (item = &dports; item; item = item->next) {
+		for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+			b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+			if (!b) {
+				pr_warn("Failed do clone mcast rcv buffer\n");
+				continue;
+			}
+			msg_set_destport(msg, item->ports[i]);
+			tipc_sk_rcv(b);
+		}
+	}
+	tipc_port_list_free(&dports);
+}
+
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 2cdede9..43b75b3 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
 
 int tipc_sk_rcv(struct sk_buff *buf);
 
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
+
 #endif
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 4/7] tipc: start using the new multicast functions
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
                   ` (2 preceding siblings ...)
  2014-07-15  3:54 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 5/7] tipc: remove unreferenced functions Jon Maloy
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, erik.hugne, netdev, Paul Gortmaker, tipc-discussion

In this commit, we convert the socket multicast send function to
directly call the new multicast/broadcast function (tipc_bclink_xmit2())
introduced in the previous commit. We do this instead of letting the
call go via the now obsolete tipc_port_mcast_xmit(), hence saving
a call level and some code complexity.

We also remove the initial destination lookup at the message sending
side, and replace that with an unconditional lookup at the receiving
side, including on the sending node itself. This makes the destination
lookup and message transfer more uniform than before.
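
For illustration, a sketch of the congestion and MTU-change handling
used by the reworked tipc_sendmcast() in the diff below. It is not
part of the patch; the names match the diff, and tipc_wait_for_sndmsg()
is the socket-layer wait helper already present in socket.c:

static int example_mcast_retry(struct socket *sock, struct tipc_msg *mhdr,
			       struct iovec *iov, size_t dsz, long timeo)
{
	struct sk_buff *buf;
	int rc;

new_mtu:
	rc = tipc_msg_build2(mhdr, iov, 0, dsz, tipc_bclink_get_mtu(), &buf);
	if (rc < 0)
		return rc;
	do {
		rc = tipc_bclink_xmit(buf);	/* broadcast entry point, as in the diff */
		if (rc >= 0)
			return dsz;		/* chain consumed; report user data size */
		if (rc == -EMSGSIZE)
			goto new_mtu;		/* rebuild the chain and retry */
		if (rc != -ELINKCONG)
			break;			/* hard error; chain already consumed */
		rc = tipc_wait_for_sndmsg(sock, &timeo);
		if (rc)
			kfree_skb_list(buf);	/* give up; drop the kept chain */
	} while (!rc);
	return rc;
}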

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c  |    7 ++---
 net/tipc/socket.c |   90 +++++++++++++++++++++++++++++++----------------------
 2 files changed, 56 insertions(+), 41 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 80f5a51..d1d39fa 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -491,7 +491,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	int deferred;
+	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
 
@@ -542,7 +542,7 @@ receive:
 			tipc_bclink_unlock();
 			tipc_node_unlock(node);
 			if (likely(msg_mcast(msg)))
-				tipc_port_mcast_rcv(buf, NULL);
+				tipc_sk_mcast_rcv(buf);
 			else
 				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
@@ -620,8 +620,7 @@ receive:
 		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-	} else
-		deferred = 0;
+	}
 
 	tipc_bclink_lock();
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d16550f..ae3d716 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -53,6 +53,7 @@ static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
+static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -534,6 +535,58 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 	return mask;
 }
 
+/**
+ * tipc_sendmcast - send multicast message
+ * @sock: socket structure
+ * @seq: destination address
+ * @iov: message data to send
+ * @dsz: total length of message data
+ * @timeo: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
+			  struct iovec *iov, size_t dsz, long timeo)
+{
+	struct sock *sk = sock->sk;
+	struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
+	struct sk_buff *buf;
+	uint mtu;
+	int rc;
+
+	msg_set_type(mhdr, TIPC_MCAST_MSG);
+	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
+	msg_set_destport(mhdr, 0);
+	msg_set_destnode(mhdr, 0);
+	msg_set_nametype(mhdr, seq->type);
+	msg_set_namelower(mhdr, seq->lower);
+	msg_set_nameupper(mhdr, seq->upper);
+	msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+
+new_mtu:
+	mtu = tipc_bclink_get_mtu();
+	rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+	if (unlikely(rc < 0))
+		return rc;
+
+	do {
+		rc = tipc_bclink_xmit(buf);
+		if (likely(rc >= 0)) {
+			rc = dsz;
+			break;
+		}
+		if (rc == -EMSGSIZE)
+			goto new_mtu;
+		if (rc != -ELINKCONG)
+			break;
+		rc = tipc_wait_for_sndmsg(sock, &timeo);
+		if (rc)
+			kfree_skb_list(buf);
+	} while (!rc);
+	return rc;
+}
+
 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
  */
 void tipc_sk_mcast_rcv(struct sk_buff *buf)
@@ -670,43 +723,6 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
 }
 
 /**
- * tipc_sendmcast - send multicast message
- * @sock: socket structure
- * @seq: destination address
- * @iov: message data to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
- *
- * Called from function tipc_sendmsg(), which has done all sanity checks
- * Returns the number of bytes sent on success, or errno
- */
-static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-			  struct iovec *iov, size_t dsz, long timeo)
-{
-	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
-	int rc;
-
-	do {
-		if (sock->state != SS_READY) {
-			rc = -EOPNOTSUPP;
-			break;
-		}
-		rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz);
-		if (likely(rc >= 0)) {
-			if (sock->state != SS_READY)
-				sock->state = SS_CONNECTING;
-			break;
-		}
-		if (rc != -ELINKCONG)
-			break;
-		rc = tipc_wait_for_sndmsg(sock, &timeo);
-	} while (!rc);
-
-	return rc;
-}
-
-/**
  * tipc_sendmsg - send message in connectionless manner
  * @iocb: if NULL, indicates that socket lock is already held
  * @sock: socket structure
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 5/7] tipc: remove unreferenced functions
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
                   ` (3 preceding siblings ...)
  2014-07-15  3:54 ` [PATCH net-next 4/7] tipc: start using the new multicast functions Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 6/7] tipc: rename temporarily named functions Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 7/7] tipc: ensure sequential message delivery across dual bearers Jon Maloy
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We can now remove a number of functions which have become obsolete
and unreferenced through this commit series. There are no functional
changes in this commit.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c |   26 ------
 net/tipc/bcast.h |    1 -
 net/tipc/link.c  |  247 ------------------------------------------------------
 net/tipc/link.h  |    6 --
 net/tipc/msg.c   |   35 --------
 net/tipc/msg.h   |    3 -
 net/tipc/port.c  |  112 -------------------------
 net/tipc/port.h  |   10 ---
 8 files changed, 440 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d1d39fa..4081251 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -389,32 +389,6 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 	tipc_node_unlock(n_ptr);
 }
 
-/*
- * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
- */
-int tipc_bclink_xmit(struct sk_buff *buf)
-{
-	int res;
-
-	tipc_bclink_lock();
-
-	if (!bclink->bcast_nodes.count) {
-		res = msg_data_sz(buf_msg(buf));
-		kfree_skb(buf);
-		goto exit;
-	}
-
-	res = __tipc_link_xmit(bcl, buf);
-	if (likely(res >= 0)) {
-		bclink_set_last_sent();
-		bcl->stats.queue_sz_counts++;
-		bcl->stats.accu_queue_sz += bcl->out_queue_size;
-	}
-exit:
-	tipc_bclink_unlock();
-	return res;
-}
-
 /* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
  *                     and to identified node local sockets
  * @buf: chain of buffers containing message
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index d90645f..af6b657 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr);
 void tipc_bclink_remove_node(u32 addr);
 struct tipc_node *tipc_bclink_retransmit_to(void);
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
-int  tipc_bclink_xmit(struct sk_buff *buf);
 void tipc_bclink_rcv(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index d1255ba..28730dd 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -85,7 +85,6 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
@@ -679,180 +678,6 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
- */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
-{
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
-
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
-	}
-}
-
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
- */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 queue_size = l_ptr->out_queue_size;
-	u32 imp = tipc_msg_tot_importance(msg);
-	u32 queue_limit = l_ptr->queue_limit[imp];
-	u32 max_packet = l_ptr->max_pkt;
-
-	/* Match msg importance against queue limits: */
-	if (unlikely(queue_size >= queue_limit)) {
-		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-			link_schedule_port(l_ptr, msg_origport(msg), size);
-			kfree_skb(buf);
-			return -ELINKCONG;
-		}
-		kfree_skb(buf);
-		if (imp > CONN_MANAGER) {
-			pr_warn("%s<%s>, send queue full", link_rst_msg,
-				l_ptr->name);
-			tipc_link_reset(l_ptr);
-		}
-		return dsz;
-	}
-
-	/* Fragmentation needed ? */
-	if (size > max_packet)
-		return tipc_link_frag_xmit(l_ptr, buf);
-
-	/* Packet can be queued or sent. */
-	if (likely(!link_congested(l_ptr))) {
-		link_add_to_outqueue(l_ptr, buf, msg);
-
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		return dsz;
-	}
-	/* Congestion: can message be bundled ? */
-	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-	    (msg_user(msg) != MSG_FRAGMENTER)) {
-
-		/* Try adding message to an existing bundle */
-		if (l_ptr->next_out &&
-		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-			return dsz;
-
-		/* Try creating a new bundle */
-		if (size <= max_packet * 2 / 3) {
-			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-			struct tipc_msg bundler_hdr;
-
-			if (bundler) {
-				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-					 INT_H_SIZE, l_ptr->addr);
-				skb_copy_to_linear_data(bundler, &bundler_hdr,
-							INT_H_SIZE);
-				skb_trim(bundler, INT_H_SIZE);
-				link_bundle_buf(l_ptr, bundler, buf);
-				buf = bundler;
-				msg = buf_msg(buf);
-				l_ptr->stats.sent_bundles++;
-			}
-		}
-	}
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf;
-	link_add_to_outqueue(l_ptr, buf, msg);
-	return dsz;
-}
-
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
- */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	int res = -ELINKCONG;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[selector & 1];
-		if (l_ptr)
-			res = __tipc_link_xmit(l_ptr, buf);
-		else
-			kfree_skb(buf);
-		tipc_node_unlock(n_ptr);
-	} else {
-		kfree_skb(buf);
-	}
-	return res;
-}
-
 /* tipc_link_cong: determine return value and how to treat the
  * sent buffer during link congestion.
  * - For plain, errorless user data messages we keep the buffer and
@@ -2123,78 +1948,6 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
-/*
- *  Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct sk_buff *buf_chain = NULL;
-	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-	struct tipc_msg *inmsg = buf_msg(buf);
-	struct tipc_msg fragm_hdr;
-	u32 insize = msg_size(inmsg);
-	u32 dsz = msg_data_sz(inmsg);
-	unchar *crs = buf->data;
-	u32 rest = insize;
-	u32 pack_sz = l_ptr->max_pkt;
-	u32 fragm_sz = pack_sz - INT_H_SIZE;
-	u32 fragm_no = 0;
-	u32 destaddr;
-
-	if (msg_short(inmsg))
-		destaddr = l_ptr->addr;
-	else
-		destaddr = msg_destnode(inmsg);
-
-	/* Prepare reusable fragment header: */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		 INT_H_SIZE, destaddr);
-
-	/* Chop up message: */
-	while (rest > 0) {
-		struct sk_buff *fragm;
-
-		if (rest <= fragm_sz) {
-			fragm_sz = rest;
-			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-		}
-		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-		if (fragm == NULL) {
-			kfree_skb(buf);
-			kfree_skb_list(buf_chain);
-			return -ENOMEM;
-		}
-		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-		fragm_no++;
-		msg_set_fragm_no(&fragm_hdr, fragm_no);
-		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-					       fragm_sz);
-		buf_chain_tail->next = fragm;
-		buf_chain_tail = fragm;
-
-		rest -= fragm_sz;
-		crs += fragm_sz;
-		msg_set_type(&fragm_hdr, FRAGMENT);
-	}
-	kfree_skb(buf);
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-
-	return dsz;
-}
-
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
 {
 	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 04a59c5..f0ebeb6 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -226,15 +226,9 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
 int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
-int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destnode);
 void tipc_link_bundle_rcv(struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 4662b16..04e5cad 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -60,41 +60,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 	msg_set_destnode(m, destnode);
 }
 
-/**
- * tipc_msg_build - create message using specified header and data
- *
- * Note: Caller must not hold any locks in case copy_from_user() is interrupted!
- *
- * Returns message data size or errno
- */
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf)
-{
-	int dsz, sz, hsz;
-	unsigned char *to;
-
-	dsz = len;
-	hsz = msg_hdr_sz(hdr);
-	sz = hsz + dsz;
-	msg_set_size(hdr, sz);
-	if (unlikely(sz > max_size)) {
-		*buf = NULL;
-		return dsz;
-	}
-
-	*buf = tipc_buf_acquire(sz);
-	if (!(*buf))
-		return -ENOMEM;
-	skb_copy_to_linear_data(*buf, hdr, hsz);
-	to = (*buf)->data + hsz;
-	if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
-		kfree_skb(*buf);
-		*buf = NULL;
-		return -EFAULT;
-	}
-	return dsz;
-}
-
 /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
  * Let first buffer become head buffer
  * Returns 1 and sets *buf to headbuf if chain is complete, otherwise 0
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index a15d596..868fe22 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -732,9 +732,6 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
 
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf);
-
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
 
 bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 0d09dcb..835dca1 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -74,118 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
 		(!peernode && (orignode == tipc_own_addr));
 }
 
-/**
- * tipc_port_mcast_xmit - send a multicast message to local and remote
- * destinations
- */
-int tipc_port_mcast_xmit(struct tipc_port *oport,
-			 struct tipc_name_seq const *seq,
-			 struct iovec const *msg_sect,
-			 unsigned int len)
-{
-	struct tipc_msg *hdr;
-	struct sk_buff *buf;
-	struct sk_buff *ibuf = NULL;
-	struct tipc_port_list dports = {0, NULL, };
-	int ext_targets;
-	int res;
-
-	/* Create multicast message */
-	hdr = &oport->phdr;
-	msg_set_type(hdr, TIPC_MCAST_MSG);
-	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
-	msg_set_destport(hdr, 0);
-	msg_set_destnode(hdr, 0);
-	msg_set_nametype(hdr, seq->type);
-	msg_set_namelower(hdr, seq->lower);
-	msg_set_nameupper(hdr, seq->upper);
-	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
-	res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (unlikely(!buf))
-		return res;
-
-	/* Figure out where to send multicast message */
-	ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
-						TIPC_NODE_SCOPE, &dports);
-
-	/* Send message to destinations (duplicate it only if necessary) */
-	if (ext_targets) {
-		if (dports.count != 0) {
-			ibuf = skb_copy(buf, GFP_ATOMIC);
-			if (ibuf == NULL) {
-				tipc_port_list_free(&dports);
-				kfree_skb(buf);
-				return -ENOMEM;
-			}
-		}
-		res = tipc_bclink_xmit(buf);
-		if ((res < 0) && (dports.count != 0))
-			kfree_skb(ibuf);
-	} else {
-		ibuf = buf;
-	}
-
-	if (res >= 0) {
-		if (ibuf)
-			tipc_port_mcast_rcv(ibuf, &dports);
-	} else {
-		tipc_port_list_free(&dports);
-	}
-	return res;
-}
-
-/**
- * tipc_port_mcast_rcv - deliver multicast message to all destination ports
- *
- * If there is no port list, perform a lookup to create one
- */
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
-{
-	struct tipc_msg *msg;
-	struct tipc_port_list dports = {0, NULL, };
-	struct tipc_port_list *item = dp;
-	int cnt = 0;
-
-	msg = buf_msg(buf);
-
-	/* Create destination port list, if one wasn't supplied */
-	if (dp == NULL) {
-		tipc_nametbl_mc_translate(msg_nametype(msg),
-				     msg_namelower(msg),
-				     msg_nameupper(msg),
-				     TIPC_CLUSTER_SCOPE,
-				     &dports);
-		item = dp = &dports;
-	}
-
-	/* Deliver a copy of message to each destination port */
-	if (dp->count != 0) {
-		msg_set_destnode(msg, tipc_own_addr);
-		if (dp->count == 1) {
-			msg_set_destport(msg, dp->ports[0]);
-			tipc_sk_rcv(buf);
-			tipc_port_list_free(dp);
-			return;
-		}
-		for (; cnt < dp->count; cnt++) {
-			int index = cnt % PLSIZE;
-			struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
-
-			if (b == NULL) {
-				pr_warn("Unable to deliver multicast message(s)\n");
-				goto exit;
-			}
-			if ((index == 0) && (cnt != 0))
-				item = item->next;
-			msg_set_destport(buf_msg(b), item->ports[index]);
-			tipc_sk_rcv(b);
-		}
-	}
-exit:
-	kfree_skb(buf);
-	tipc_port_list_free(dp);
-}
-
 /* tipc_port_init - intiate TIPC port and lock it
  *
  * Returns obtained reference if initialization is successful, zero otherwise
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 0e47052..3f93454 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -120,17 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
 		   struct tipc_portid const *peer);
 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
 
-/*
- * TIPC messaging routines
- */
-
-int tipc_port_mcast_xmit(struct tipc_port *port,
-			 struct tipc_name_seq const *seq,
-			 struct iovec const *msg,
-			 unsigned int len);
-
 struct sk_buff *tipc_port_get_ports(void);
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
 void tipc_port_reinit(void);
 
 /**
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 6/7] tipc: rename temporarily named functions
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
                   ` (4 preceding siblings ...)
  2014-07-15  3:54 ` [PATCH net-next 5/7] tipc: remove unreferenced functions Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  2014-07-15  3:54 ` [PATCH net-next 7/7] tipc: ensure sequential message delivery across dual bearers Jon Maloy
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

After the previous commit, we can now give the temporarily named
functions, such as tipc_link_xmit2() and tipc_msg_build2(), their
proper names.

There are no functional changes in this commit.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c      |    6 +++---
 net/tipc/bcast.h      |    2 +-
 net/tipc/link.c       |   18 +++++++++---------
 net/tipc/link.h       |    4 ++--
 net/tipc/msg.c        |    6 +++---
 net/tipc/msg.h        |    4 ++--
 net/tipc/name_distr.c |    4 ++--
 net/tipc/port.c       |   10 +++++-----
 net/tipc/socket.c     |   20 ++++++++++----------
 9 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 4081251..446295d 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -389,13 +389,13 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 	tipc_node_unlock(n_ptr);
 }
 
-/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
- *                     and to identified node local sockets
+/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+ *                    and to identified node local sockets
  * @buf: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit2(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff *buf)
 {
 	int rc = 0;
 	struct sk_buff *clbuf;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index af6b657..4875d95 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -98,6 +98,6 @@ int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
 uint  tipc_bclink_get_mtu(void);
-int tipc_bclink_xmit2(struct sk_buff *buf);
+int tipc_bclink_xmit(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 28730dd..fb1485d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -706,7 +706,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 }
 
 /**
- * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @buf: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
@@ -715,7 +715,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
  * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
 	uint psz = msg_size(msg);
@@ -783,7 +783,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
 }
 
 /**
- * tipc_link_xmit2() is the general link level function for message sending
+ * tipc_link_xmit() is the general link level function for message sending
  * @buf: chain of buffers containing message
  * @dsz: amount of user data to be sent
  * @dnode: address of destination node
@@ -791,7 +791,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
 	struct tipc_link *link = NULL;
 	struct tipc_node *node;
@@ -802,7 +802,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
 		tipc_node_lock(node);
 		link = node->active_links[selector & 1];
 		if (link)
-			rc = __tipc_link_xmit2(link, buf);
+			rc = __tipc_link_xmit(link, buf);
 		tipc_node_unlock(node);
 	}
 
@@ -836,7 +836,7 @@ static void tipc_link_sync_xmit(struct tipc_link *link)
 	msg = buf_msg(buf);
 	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
 	msg_set_last_bcast(msg, link->owner->bclink.acked);
-	__tipc_link_xmit2(link, buf);
+	__tipc_link_xmit(link, buf);
 }
 
 /*
@@ -1683,7 +1683,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 	}
 	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
-	__tipc_link_xmit2(tunnel, buf);
+	__tipc_link_xmit(tunnel, buf);
 }
 
 
@@ -1716,7 +1716,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 		if (buf) {
 			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
-			__tipc_link_xmit2(tunnel, buf);
+			__tipc_link_xmit(tunnel, buf);
 		} else {
 			pr_warn("%sunable to send changeover msg\n",
 				link_co_err);
@@ -1789,7 +1789,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
 		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
 					       length);
-		__tipc_link_xmit2(tunnel, outbuf);
+		__tipc_link_xmit(tunnel, outbuf);
 		if (!tipc_link_is_up(l_ptr))
 			return;
 		iter = iter->next;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index f0ebeb6..782983c 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -226,8 +226,8 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
-int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
-int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
+int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
 void tipc_link_bundle_rcv(struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 04e5cad..21b803a 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -115,7 +115,7 @@ out_free:
 
 
 /**
- * tipc_msg_build2 - create buffer chain containing specified header and data
+ * tipc_msg_build - create buffer chain containing specified header and data
  * @mhdr: Message header, to be prepended to data
  * @iov: User data
  * @offset: Posision in iov to start copying from
@@ -124,8 +124,8 @@ out_free:
  * @chain: Buffer or chain of buffers to be returned to caller
  * Returns message data size or errno: -ENOMEM, -EFAULT
  */
-int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
-		    int offset, int dsz, int pktmax , struct sk_buff **chain)
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int pktmax , struct sk_buff **chain)
 {
 	int mhsz = msg_hdr_sz(mhdr);
 	int msz = mhsz + dsz;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 868fe22..462fa19 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -738,8 +738,8 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
 
 bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 
-int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
-		    int offset, int dsz, int mtu , struct sk_buff **chain);
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int mtu , struct sk_buff **chain);
 
 struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
 
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index d16f947..dcc15bc 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -116,7 +116,7 @@ void named_cluster_distribute(struct sk_buff *buf)
 		if (!obuf)
 			break;
 		msg_set_destnode(buf_msg(obuf), dnode);
-		tipc_link_xmit2(obuf, dnode, dnode);
+		tipc_link_xmit(obuf, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -232,7 +232,7 @@ void tipc_named_node_up(u32 dnode)
 	/* Convert circular list to linear list and send: */
 	buf_chain = (struct sk_buff *)msg_list.next;
 	((struct sk_buff *)msg_list.prev)->next = NULL;
-	tipc_link_xmit2(buf_chain, dnode, dnode);
+	tipc_link_xmit(buf_chain, dnode, dnode);
 }
 
 /**
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 835dca1..7e096a5 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -130,7 +130,7 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
 		tipc_nodesub_unsubscribe(&p_ptr->subscription);
 		msg = buf_msg(buf);
 		peer = msg_destnode(msg);
-		tipc_link_xmit2(buf, peer, msg_link_selector(msg));
+		tipc_link_xmit(buf, peer, msg_link_selector(msg));
 	}
 	spin_lock_bh(&tipc_port_list_lock);
 	list_del(&p_ptr->port_list);
@@ -187,7 +187,7 @@ static void port_timeout(unsigned long ref)
 	}
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -202,7 +202,7 @@ static void port_handle_node_down(unsigned long ref)
 	buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -347,7 +347,7 @@ void tipc_acknowledge(u32 ref, u32 ack)
 	if (!buf)
 		return;
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
@@ -509,6 +509,6 @@ int tipc_port_shutdown(u32 ref)
 	buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 	return tipc_port_disconnect(ref);
 }
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ae3d716..495bcf4 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -131,7 +131,7 @@ static void reject_rx_queue(struct sock *sk)
 
 	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 		if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-			tipc_link_xmit2(buf, dnode, 0);
+			tipc_link_xmit(buf, dnode, 0);
 	}
 }
 
@@ -341,7 +341,7 @@ static int tipc_release(struct socket *sock)
 				tipc_port_disconnect(port->ref);
 			}
 			if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-				tipc_link_xmit2(buf, dnode, 0);
+				tipc_link_xmit(buf, dnode, 0);
 		}
 	}
 
@@ -566,7 +566,7 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 
 new_mtu:
 	mtu = tipc_bclink_get_mtu();
-	rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+	rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
 	if (unlikely(rc < 0))
 		return rc;
 
@@ -821,12 +821,12 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
 
 new_mtu:
 	mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
-	rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+	rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
 	if (rc < 0)
 		goto exit;
 
 	do {
-		rc = tipc_link_xmit2(buf, dnode, tsk->port.ref);
+		rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
 		if (likely(rc >= 0)) {
 			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
@@ -934,12 +934,12 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
 next:
 	mtu = port->max_pkt;
 	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+	rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
 	if (unlikely(rc < 0))
 		goto exit;
 	do {
 		if (likely(!tipc_sk_conn_cong(tsk))) {
-			rc = tipc_link_xmit2(buf, dnode, ref);
+			rc = tipc_link_xmit(buf, dnode, ref);
 			if (likely(!rc)) {
 				tsk->sent_unacked++;
 				sent += send;
@@ -1571,7 +1571,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
 	if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
 		return 0;
 
-	tipc_link_xmit2(buf, onode, 0);
+	tipc_link_xmit(buf, onode, 0);
 
 	return 0;
 }
@@ -1623,7 +1623,7 @@ exit:
 	if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
 		return -EHOSTUNREACH;
 
-	tipc_link_xmit2(buf, dnode, 0);
+	tipc_link_xmit(buf, dnode, 0);
 	return (rc < 0) ? -EHOSTUNREACH : 0;
 }
 
@@ -1910,7 +1910,7 @@ restart:
 			}
 			tipc_port_disconnect(port->ref);
 			if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
-				tipc_link_xmit2(buf, peer, 0);
+				tipc_link_xmit(buf, peer, 0);
 		} else {
 			tipc_port_shutdown(port->ref);
 		}
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 7/7] tipc: ensure sequential message delivery across dual bearers
  2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
                   ` (5 preceding siblings ...)
  2014-07-15  3:54 ` [PATCH net-next 6/7] tipc: rename temporarily named functions Jon Maloy
@ 2014-07-15  3:54 ` Jon Maloy
  6 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15  3:54 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, erik.hugne, netdev, Paul Gortmaker, tipc-discussion

When we run broadcast packets over dual bearers/interfaces, the
current transmission code alternates between the two bearers for
each sent packet, in order to leverage the doubled bandwidth
available. The receiving bclink resequences the packets when needed,
so all messages are delivered upwards from the broadcast link in the
correct order, even if they arrive in concurrent interrupts.

However, at the moment of delivery upwards to the socket, we release
all spinlocks (bclink_lock, node_lock), so it is still possible for
arriving messages to bypass each other before they reach the socket
queue.

We fix this by applying the same technique we already use for
unicast traffic: a link selector (i.e., the last bit of the sending
port number) decides which bearer to use, so that messages from the
same sender socket are always sent over the same bearer. This
guarantees sequential delivery between socket pairs, which is
sufficient to satisfy the protocol spec as well as all known user
requirements.
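
The change to tipc_bcbearer_send() below thus amounts to a
deterministic bearer choice per message. A minimal sketch of that
selection, assuming the usual primary/secondary bearer pair;
pick_bcast_bearer() is an illustrative helper name, not part of the
patch:

/* Pick the bearer from the message's link selector (last bit of the
 * sending port number), so that all packets from one sender socket
 * travel over the same bearer; fall back to the primary bearer when
 * the selected slot is empty.
 */
static struct tipc_bearer *pick_bcast_bearer(struct tipc_bearer *p,
					     struct tipc_bearer *s,
					     struct tipc_msg *msg)
{
	struct tipc_bearer *bp[2] = {p, s};
	struct tipc_bearer *b = bp[msg_link_selector(msg)];

	return b ? b : p;
}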

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c |   17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 446295d..af1f6f7 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -631,6 +631,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 			      struct tipc_media_addr *unused2)
 {
 	int bp_index;
+	struct tipc_msg *msg = buf_msg(buf);
 
 	/* Prepare broadcast link message for reliable transmission,
 	 * if first time trying to send it;
@@ -638,10 +639,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 	 * since they are sent in an unreliable manner and don't need it
 	 */
 	if (likely(!msg_non_seq(buf_msg(buf)))) {
-		struct tipc_msg *msg;
-
 		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
-		msg = buf_msg(buf);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
 		bcl->stats.sent_info++;
@@ -658,12 +656,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
 		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
 		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
-		struct tipc_bearer *b = p;
+		struct tipc_bearer *bp[2] = {p, s};
+		struct tipc_bearer *b = bp[msg_link_selector(msg)];
 		struct sk_buff *tbuf;
 
 		if (!p)
 			break; /* No more bearers to try */
-
+		if (!b)
+			b = p;
 		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
 			       &bcbearer->remains_new);
 		if (bcbearer->remains_new.count == bcbearer->remains.count)
@@ -680,13 +680,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
 			kfree_skb(tbuf); /* Bearer keeps a clone */
 		}
-
-		/* Swap bearers for next packet */
-		if (s) {
-			bcbearer->bpairs[bp_index].primary = s;
-			bcbearer->bpairs[bp_index].secondary = p;
-		}
-
 		if (bcbearer->remains_new.count == 0)
 			break; /* All targets reached */
 
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* Re: [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution
  2014-07-16 22:10   ` David Miller
@ 2014-07-16 22:21     ` Jon Maloy
  0 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-16 22:21 UTC (permalink / raw)
  To: David Miller; +Cc: netdev, paul.gortmaker, tipc-discussion



> -----Original Message-----
> From: David Miller [mailto:davem@davemloft.net]
> Sent: July-16-14 6:11 PM
> To: Jon Maloy
> Cc: netdev@vger.kernel.org; paul.gortmaker@windriver.com; Erik Hugne;
> ying.xue@windriver.com; maloy@donjonn.com; tipc-
> discussion@lists.sourceforge.net
> Subject: Re: [PATCH net-next 3/7] tipc: add new functions for multicast and
> broadcast distribution
> 
> From: Jon Maloy <jon.maloy@ericsson.com>
> Date: Tue, 15 Jul 2014 13:46:06 -0400
> 
> > +	/* Copy header if single buffer */
> > +	if (!buf->next) {
> > +		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
> > +		head = __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
> > +		if (likely(head))
> > +			return head;
> > +		return NULL;
> 
> These last four lines are simply:
> 
> 		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
> 
> Don't make this code more complicated than necessary :)

Uhh...  Of course.  I think I changed this code back and forth too many times.
If that is all,  I'll post the series again asap.

///jon
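
With that simplification applied, the single-buffer branch of
tipc_msg_reassemble() reduces to the sketch below (the expected shape
of the respin, not code from this posting):

	/* Copy header if single buffer */
	if (!buf->next) {
		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
	}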

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution
  2014-07-15 17:46 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
@ 2014-07-16 22:10   ` David Miller
  2014-07-16 22:21     ` Jon Maloy
  0 siblings, 1 reply; 12+ messages in thread
From: David Miller @ 2014-07-16 22:10 UTC (permalink / raw)
  To: jon.maloy
  Cc: netdev, paul.gortmaker, erik.hugne, ying.xue, maloy, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Tue, 15 Jul 2014 13:46:06 -0400

> +	/* Copy header if single buffer */
> +	if (!buf->next) {
> +		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
> +		head = __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
> +		if (likely(head))
> +			return head;
> +		return NULL;

These last four lines are simply:

		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);

Don't make this code more complicated than necessary :)

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution
  2014-07-15 17:46 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
@ 2014-07-15 17:46 ` Jon Maloy
  2014-07-16 22:10   ` David Miller
  0 siblings, 1 reply; 12+ messages in thread
From: Jon Maloy @ 2014-07-15 17:46 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

We add a new broadcast link transmit function in bcast.c and a new
receive function in socket.c. The purpose is to move the branching
between external and internal destinations down to the link layer,
just as we have done with unicast in earlier commits. We also make
use of the new link-independent fragmentation support that was
introduced in an earlier commit series.

This gives a shorter and simpler code path, and makes it possible
to obtain copy-free buffer delivery to all node-local destination
sockets.

The new transmission code is added in parallel with the existing one,
and will be used by the socket multicast send function in the next
commit in this series.
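
For orientation, the intended call pattern from the socket multicast
send path (added by the next commit) is roughly the sketch below;
mcast_send_sketch() is an illustrative name, and the retry on MTU
change and most error handling are left out:

/* Fragment the user data against the broadcast MTU, then let
 * tipc_bclink_xmit2() broadcast the chain to the cluster and deliver
 * a reassembled clone to node-local destination sockets.
 */
static int mcast_send_sketch(struct tipc_msg *mhdr,
			     struct iovec const *iov, int dsz)
{
	struct sk_buff *buf;
	uint mtu = tipc_bclink_get_mtu();
	int rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);

	if (rc < 0)
		return rc;
	/* Consumes the chain, except when returning -ELINKCONG */
	return tipc_bclink_xmit2(buf);
}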

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c  |   55 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |    4 +++-
 net/tipc/msg.c    |   38 ++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h    |    2 ++
 net/tipc/socket.c |   40 ++++++++++++++++++++++++++++++++++++++
 net/tipc/socket.h |    2 ++
 6 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 2663167..4742d60 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
 		tipc_link_reset_all(node);
 }
 
+uint  tipc_bclink_get_mtu(void)
+{
+	return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
 	bclink->flags |= flags;
@@ -408,6 +415,52 @@ exit:
 	return res;
 }
 
+/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
+ *                     and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_bclink_xmit2(struct sk_buff *buf)
+{
+	int rc = 0;
+	int bc = 0;
+	struct sk_buff *clbuf;
+
+	/* Prepare clone of message for local node */
+	clbuf = tipc_msg_reassemble(buf);
+	if (unlikely(!clbuf)) {
+		kfree_skb_list(buf);
+		return -EHOSTUNREACH;
+	}
+
+	/* Broadcast to all other nodes */
+	if (likely(bclink)) {
+		tipc_bclink_lock();
+		if (likely(bclink->bcast_nodes.count)) {
+			rc = __tipc_link_xmit(bcl, buf);
+			if (likely(!rc)) {
+				bclink_set_last_sent();
+				bcl->stats.queue_sz_counts++;
+				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+			}
+			bc = 1;
+		}
+		tipc_bclink_unlock();
+	}
+
+	if (unlikely(!bc))
+		kfree_skb_list(buf);
+
+	/* Deliver message clone */
+	if (likely(!rc))
+		tipc_sk_mcast_rcv(clbuf);
+	else
+		kfree_skb(clbuf);
+
+	return rc;
+}
+
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c4..d90645f 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -98,5 +98,7 @@ int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint  tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit2(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 6ec9584..4662b16 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -412,3 +412,41 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
 	msg_set_destport(msg, dport);
 	return TIPC_OK;
 }
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		head = __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+		if (likely(head))
+			return head;
+		return NULL;
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
+error:
+	pr_warn("Failed to clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7d57434..a15d596 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
 		    int offset, int dsz, int mtu , struct sk_buff **chain);
 
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 8c5600c..d16550f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 	return mask;
 }
 
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_port_list dports = {0, NULL, };
+	struct tipc_port_list *item;
+	struct sk_buff *b;
+	uint i, last, dst = 0;
+	u32 scope = TIPC_CLUSTER_SCOPE;
+
+	if (in_own_node(msg_orignode(msg)))
+		scope = TIPC_NODE_SCOPE;
+
+	/* Create destination port list: */
+	tipc_nametbl_mc_translate(msg_nametype(msg),
+				  msg_namelower(msg),
+				  msg_nameupper(msg),
+				  scope,
+				  &dports);
+	last = dports.count;
+	if (!last) {
+		kfree_skb(buf);
+		return;
+	}
+
+	for (item = &dports; item; item = item->next) {
+		for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+			b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+			if (!b) {
+				pr_warn("Failed to clone mcast rcv buffer\n");
+				continue;
+			}
+			msg_set_destport(msg, item->ports[i]);
+			tipc_sk_rcv(b);
+		}
+	}
+	tipc_port_list_free(&dports);
+}
+
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 2cdede9..43b75b3 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
 
 int tipc_sk_rcv(struct sk_buff *buf);
 
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
+
 #endif
-- 
1.7.9.5


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution
  2014-07-15 14:49 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
@ 2014-07-15 14:49 ` Jon Maloy
  0 siblings, 0 replies; 12+ messages in thread
From: Jon Maloy @ 2014-07-15 14:49 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We add a new broadcast link transmit function in bcast.c and a new
receive function in socket.c. The purpose is to move the branching
between external and internal destinations down to the link layer,
just as we have done with unicast in earlier commits. We also make
use of the new link-independent fragmentation support that was
introduced in an earlier commit series.

This gives a shorter and simpler code path, and makes it possible
to obtain copy-free buffer delivery to all node-local destination
sockets.

The new transmission code is added in parallel with the existing one,
and will be used by the socket multicast send function in the next
commit in this series.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/bcast.c  |   55 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |    4 +++-
 net/tipc/msg.c    |   38 ++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h    |    2 ++
 net/tipc/socket.c |   40 ++++++++++++++++++++++++++++++++++++++
 net/tipc/socket.h |    2 ++
 6 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 2663167..4742d60 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
 		tipc_link_reset_all(node);
 }
 
+uint  tipc_bclink_get_mtu(void)
+{
+	return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
 	bclink->flags |= flags;
@@ -408,6 +415,52 @@ exit:
 	return res;
 }
 
+/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
+ *                     and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_bclink_xmit2(struct sk_buff *buf)
+{
+	int rc = 0;
+	int bc = 0;
+	struct sk_buff *clbuf;
+
+	/* Prepare clone of message for local node */
+	clbuf = tipc_msg_reassemble(buf);
+	if (unlikely(!clbuf)) {
+		kfree_skb_list(buf);
+		return -EHOSTUNREACH;
+	}
+
+	/* Broadcast to all other nodes */
+	if (likely(bclink)) {
+		tipc_bclink_lock();
+		if (likely(bclink->bcast_nodes.count)) {
+			rc = __tipc_link_xmit(bcl, buf);
+			if (likely(!rc)) {
+				bclink_set_last_sent();
+				bcl->stats.queue_sz_counts++;
+				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+			}
+			bc = 1;
+		}
+		tipc_bclink_unlock();
+	}
+
+	if (unlikely(!bc))
+		kfree_skb_list(buf);
+
+	/* Deliver message clone */
+	if (likely(!rc))
+		tipc_sk_mcast_rcv(clbuf);
+	else
+		kfree_skb(clbuf);
+
+	return rc;
+}
+
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c4..d90645f 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -98,5 +98,7 @@ int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint  tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit2(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 6ec9584..4662b16 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -412,3 +412,41 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
 	msg_set_destport(msg, dport);
 	return TIPC_OK;
 }
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		head = __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+		if (likely(head))
+			return head;
+		return NULL;
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
+error:
+	pr_warn("Failed to clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7d57434..a15d596 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
 		    int offset, int dsz, int mtu , struct sk_buff **chain);
 
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 8c5600c..d16550f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 	return mask;
 }
 
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_port_list dports = {0, NULL, };
+	struct tipc_port_list *item;
+	struct sk_buff *b;
+	uint i, last, dst = 0;
+	u32 scope = TIPC_CLUSTER_SCOPE;
+
+	if (in_own_node(msg_orignode(msg)))
+		scope = TIPC_NODE_SCOPE;
+
+	/* Create destination port list: */
+	tipc_nametbl_mc_translate(msg_nametype(msg),
+				  msg_namelower(msg),
+				  msg_nameupper(msg),
+				  scope,
+				  &dports);
+	last = dports.count;
+	if (!last) {
+		kfree_skb(buf);
+		return;
+	}
+
+	for (item = &dports; item; item = item->next) {
+		for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+			b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+			if (!b) {
+				pr_warn("Failed to clone mcast rcv buffer\n");
+				continue;
+			}
+			msg_set_destport(msg, item->ports[i]);
+			tipc_sk_rcv(b);
+		}
+	}
+	tipc_port_list_free(&dports);
+}
+
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 2cdede9..43b75b3 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
 
 int tipc_sk_rcv(struct sk_buff *buf);
 
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
+
 #endif
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2014-07-16 22:21 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2014-07-15  3:54 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 1/7] tipc: make name table distributor use new send function Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 2/7] tipc: let internal link users call the new link " Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 4/7] tipc: start using the new multicast functions Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 5/7] tipc: remove unreferenced functions Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 6/7] tipc: rename temporarily named functions Jon Maloy
2014-07-15  3:54 ` [PATCH net-next 7/7] tipc: ensure sequential message delivery across dual bearers Jon Maloy
2014-07-15 14:49 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
2014-07-15 14:49 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
2014-07-15 17:46 [PATCH net-next 0/7] tipc: multicast and internal users to new send functions Jon Maloy
2014-07-15 17:46 ` [PATCH net-next 3/7] tipc: add new functions for multicast and broadcast distribution Jon Maloy
2014-07-16 22:10   ` David Miller
2014-07-16 22:21     ` Jon Maloy

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).