All of lore.kernel.org
 help / color / mirror / Atom feed
* [net-next] tipc: update a binding service via broadcast
@ 2020-06-07  4:24 Hoang Huu Le
  2020-06-07  5:38   ` kernel test robot
                   ` (5 more replies)
  0 siblings, 6 replies; 11+ messages in thread
From: Hoang Huu Le @ 2020-06-07  4:24 UTC (permalink / raw)
  To: jmaloy, maloy, ying.xue, tipc-discussion, netdev

Currently, updating binding table (add service binding to
name table/withdraw a service binding) is being sent over replicast.
However, if we are scaling up clusters to > 100 nodes/containers this
method is less affection because of looping through nodes in a cluster one
by one.

It is worth to use broadcast to update a binding service. This way, the
binding table can be updated on all peer nodes in one shot.

Broadcast is used when all peer nodes, as indicated by a new capability
flag TIPC_NAMED_BCAST, support reception of this message type.

Four problems need to be considered when introducing this feature.
1) When establishing a link to a new peer node we still update this by a
unicast 'bulk' update. This may lead to race conditions, where a later
broadcast publication/withdrawal bypass the 'bulk', resulting in
disordered publications, or even that a withdrawal may arrive before the
corresponding publication. We solve this by adding an 'is_last_bulk' bit
in the last bulk messages so that it can be distinguished from all other
messages. Only when this message has arrived do we open up for reception
of broadcast publications/withdrawals.
2) When a first legacy node is added to the cluster all distribution
will switch over to use the legacy 'replicast' method, while the
opposite happens when the last legacy node leaves the cluster. This
entails another risk of message disordering that has to be handled. We
solve this by adding a sequence number to the broadcast/replicast
messages, so that disordering can be discovered and corrected. Note
however that we don't need to consider potential message loss or
duplication at this protocol level.
3) Bulk messages don't contain any sequence numbers, and will always
arrive in order. Hence we must exempt those from the sequence number
control and deliver them unconditionally. We solve this by adding a new
'is_bulk' bit in those messages so that they can be recognized.
4) Legacy messages, which don't contain any new bits or sequence
numbers, but neither can arrive out of order, also need to be exempt
from the initial synchronization and sequence number check, and
delivered unconditionally. Therefore, we add another 'is_not_legacy' bit
to all new messages so that those can be distinguished from legacy
messages and the latter delivered directly.

Signed-off-by: Hoang Huu Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jmaloy@redhat.com>
---
 net/tipc/bcast.c      |   6 +--
 net/tipc/bcast.h      |   4 +-
 net/tipc/link.c       |   2 +-
 net/tipc/msg.h        |  40 ++++++++++++++++
 net/tipc/name_distr.c | 109 +++++++++++++++++++++++++++++++-----------
 net/tipc/name_distr.h |   9 ++--
 net/tipc/name_table.c |   9 +++-
 net/tipc/name_table.h |   2 +
 net/tipc/node.c       |  29 ++++++++---
 net/tipc/node.h       |   8 ++--
 10 files changed, 170 insertions(+), 48 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 383f87bc1061..940d176e0e87 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -250,8 +250,8 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
  * Consumes the buffer chain.
  * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
-			   u16 *cong_link_cnt)
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    u16 *cong_link_cnt)
 {
 	struct tipc_link *l = tipc_bc_sndlink(net);
 	struct sk_buff_head xmitq;
@@ -752,7 +752,7 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
 	nl->local = false;
 }
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net)
+u32 tipc_bcast_get_mode(struct net *net)
 {
 	struct tipc_bc_base *bb = tipc_bc_base(net);
 
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 4240c95188b1..2d9352dc7b0e 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -90,6 +90,8 @@ void tipc_bcast_toggle_rcast(struct net *net, bool supp);
 int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 		    struct tipc_mc_method *method, struct tipc_nlist *dests,
 		    u16 *cong_link_cnt);
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    u16 *cong_link_cnt);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr);
@@ -101,7 +103,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net);
+u32 tipc_bcast_get_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
 
 void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ee3b8d0576b8..eac89a3e22ce 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -2745,7 +2745,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
 	void *hdr;
 	struct nlattr *attrs;
 	struct nlattr *prop;
-	u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
+	u32 bc_mode = tipc_bcast_get_mode(net);
 	u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
 
 	if (!bcl)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 58660d56bc83..65119e81ff0c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -438,6 +438,36 @@ static inline void msg_set_errcode(struct tipc_msg *m, u32 err)
 	msg_set_bits(m, 1, 25, 0xf, err);
 }
 
+static inline void msg_set_bulk(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 28, 0x1, 1);
+}
+
+static inline u32 msg_is_bulk(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 28, 0x1);
+}
+
+static inline void msg_set_last_bulk(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 27, 0x1, 1);
+}
+
+static inline u32 msg_is_last_bulk(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 27, 0x1);
+}
+
+static inline void msg_set_non_legacy(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 26, 0x1, 1);
+}
+
+static inline u32 msg_is_legacy(struct tipc_msg *m)
+{
+	return !msg_bits(m, 1, 26, 0x1);
+}
+
 static inline u32 msg_reroute_cnt(struct tipc_msg *m)
 {
 	return msg_bits(m, 1, 21, 0xf);
@@ -567,6 +597,16 @@ static inline void msg_set_origport(struct tipc_msg *m, u32 p)
 	msg_set_word(m, 4, p);
 }
 
+static inline u16 msg_named_seqno(struct tipc_msg *m)
+{
+	return msg_bits(m, 4, 0, 0xffff);
+}
+
+static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 4, 0, 0xffff, n);
+}
+
 static inline u32 msg_destport(struct tipc_msg *m)
 {
 	return msg_word(m, 5);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 5feaf3b67380..481d480609f0 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
 		pr_warn("Publication distribution failure\n");
 		return NULL;
 	}
-
+	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+	msg_set_non_legacy(buf_msg(skb));
 	item = (struct distr_item *)msg_data(buf_msg(skb));
 	publ_to_item(item, publ);
 	return skb;
@@ -114,8 +115,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
 {
 	struct name_table *nt = tipc_name_table(net);
-	struct sk_buff *buf;
 	struct distr_item *item;
+	struct sk_buff *skb;
 
 	write_lock_bh(&nt->cluster_scope_lock);
 	list_del(&publ->binding_node);
@@ -123,15 +124,16 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
 
-	buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
-	if (!buf) {
+	skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
+	if (!skb) {
 		pr_warn("Withdrawal distribution failure\n");
 		return NULL;
 	}
-
-	item = (struct distr_item *)msg_data(buf_msg(buf));
+	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+	msg_set_non_legacy(buf_msg(skb));
+	item = (struct distr_item *)msg_data(buf_msg(skb));
 	publ_to_item(item, publ);
-	return buf;
+	return skb;
 }
 
 /**
@@ -141,7 +143,7 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
  * @pls: linked list of publication items to be packed into buffer chain
  */
 static void named_distribute(struct net *net, struct sk_buff_head *list,
-			     u32 dnode, struct list_head *pls)
+			     u32 dnode, struct list_head *pls, u16 seqno)
 {
 	struct publication *publ;
 	struct sk_buff *skb = NULL;
@@ -149,6 +151,7 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
 	u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
 			ITEM_SIZE) * ITEM_SIZE;
 	u32 msg_rem = msg_dsz;
+	struct tipc_msg *hdr;
 
 	list_for_each_entry(publ, pls, binding_node) {
 		/* Prepare next buffer: */
@@ -159,8 +162,11 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
 				pr_warn("Bulk publication failure\n");
 				return;
 			}
-			msg_set_bc_ack_invalid(buf_msg(skb), true);
-			item = (struct distr_item *)msg_data(buf_msg(skb));
+			hdr = buf_msg(skb);
+			msg_set_bc_ack_invalid(hdr, true);
+			msg_set_bulk(hdr);
+			msg_set_non_legacy(hdr);
+			item = (struct distr_item *)msg_data(hdr);
 		}
 
 		/* Pack publication into message: */
@@ -176,24 +182,35 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
 		}
 	}
 	if (skb) {
-		msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
+		hdr = buf_msg(skb);
+		msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
 		skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
 		__skb_queue_tail(list, skb);
 	}
+	hdr = buf_msg(skb_peek_tail(list));
+	msg_set_last_bulk(hdr);
+	msg_set_named_seqno(hdr, seqno);
 }
 
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(struct net *net, u32 dnode)
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
 {
 	struct name_table *nt = tipc_name_table(net);
+	struct tipc_net *tn = tipc_net(net);
 	struct sk_buff_head head;
+	u16 seqno;
 
 	__skb_queue_head_init(&head);
+	spin_lock_bh(&tn->nametbl_lock);
+	if (!(capabilities & TIPC_NAMED_BCAST))
+		nt->rc_dests++;
+	seqno = nt->snd_nxt;
+	spin_unlock_bh(&tn->nametbl_lock);
 
 	read_lock_bh(&nt->cluster_scope_lock);
-	named_distribute(net, &head, dnode, &nt->cluster_scope);
+	named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
 	tipc_node_xmit(net, &head, dnode, 0);
 	read_unlock_bh(&nt->cluster_scope_lock);
 }
@@ -245,13 +262,21 @@ static void tipc_dist_queue_purge(struct net *net, u32 addr)
 	spin_unlock_bh(&tn->nametbl_lock);
 }
 
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+		      u32 addr, u16 capabilities)
 {
+	struct name_table *nt = tipc_name_table(net);
+	struct tipc_net *tn = tipc_net(net);
+
 	struct publication *publ, *tmp;
 
 	list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
 		tipc_publ_purge(net, publ, addr);
 	tipc_dist_queue_purge(net, addr);
+	spin_lock_bh(&tn->nametbl_lock);
+	if (!(capabilities & TIPC_NAMED_BCAST))
+		nt->rc_dests--;
+	spin_unlock_bh(&tn->nametbl_lock);
 }
 
 /**
@@ -295,29 +320,55 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 	return false;
 }
 
+struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
+				   u16 *rcv_nxt, bool *open)
+{
+	struct sk_buff *skb, *tmp;
+	struct tipc_msg *hdr;
+	u16 seqno;
+
+	skb_queue_walk_safe(namedq, skb, tmp) {
+		skb_linearize(skb);
+		hdr = buf_msg(skb);
+		seqno = msg_named_seqno(hdr);
+		if (msg_is_last_bulk(hdr)) {
+			*rcv_nxt = seqno;
+			*open = true;
+		}
+		if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
+			__skb_unlink(skb, namedq);
+			return skb;
+		}
+
+		if (*open && (*rcv_nxt == seqno)) {
+			(*rcv_nxt)++;
+			__skb_unlink(skb, namedq);
+			return skb;
+		}
+	}
+	return NULL;
+}
+
 /**
  * tipc_named_rcv - process name table update messages sent by another node
  */
-void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+		    u16 *rcv_nxt, bool *open)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_msg *msg;
+	struct tipc_net *tn = tipc_net(net);
 	struct distr_item *item;
-	uint count;
-	u32 node;
+	struct tipc_msg *hdr;
 	struct sk_buff *skb;
-	int mtype;
+	u32 count, node = 0;
 
 	spin_lock_bh(&tn->nametbl_lock);
-	for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
-		skb_linearize(skb);
-		msg = buf_msg(skb);
-		mtype = msg_type(msg);
-		item = (struct distr_item *)msg_data(msg);
-		count = msg_data_sz(msg) / ITEM_SIZE;
-		node = msg_orignode(msg);
+	while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
+		hdr = buf_msg(skb);
+		node = msg_orignode(hdr);
+		item = (struct distr_item *)msg_data(hdr);
+		count = msg_data_sz(hdr) / ITEM_SIZE;
 		while (count--) {
-			tipc_update_nametbl(net, item, node, mtype);
+			tipc_update_nametbl(net, item, node, msg_type(hdr));
 			item++;
 		}
 		kfree_skb(skb);
@@ -345,6 +396,6 @@ void tipc_named_reinit(struct net *net)
 		publ->node = self;
 	list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
 		publ->node = self;
-
+	nt->rc_dests = 0;
 	spin_unlock_bh(&tn->nametbl_lock);
 }
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 63fc73e0fa6c..092323158f06 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -67,11 +67,14 @@ struct distr_item {
 	__be32 key;
 };
 
+void tipc_named_bcast(struct net *net, struct sk_buff *skb);
 struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
-void tipc_named_node_up(struct net *net, u32 dnode);
-void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+		    u16 *rcv_nxt, bool *open);
 void tipc_named_reinit(struct net *net);
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+		      u32 addr, u16 capabilities);
 
 #endif
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 359b2bc888cf..2ac33d32edc2 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 	struct tipc_net *tn = tipc_net(net);
 	struct publication *p = NULL;
 	struct sk_buff *skb = NULL;
+	u32 rc_dests;
 
 	spin_lock_bh(&tn->nametbl_lock);
 
@@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 		nt->local_publ_count++;
 		skb = tipc_named_publish(net, p);
 	}
+	rc_dests = nt->rc_dests;
 exit:
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (skb)
-		tipc_node_broadcast(net, skb);
+		tipc_node_broadcast(net, skb, rc_dests);
 	return p;
+
 }
 
 /**
@@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
 	u32 self = tipc_own_addr(net);
 	struct sk_buff *skb = NULL;
 	struct publication *p;
+	u32 rc_dests;
 
 	spin_lock_bh(&tn->nametbl_lock);
 
@@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
 		pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
 		       type, lower, upper, key);
 	}
+	rc_dests = nt->rc_dests;
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (skb) {
-		tipc_node_broadcast(net, skb);
+		tipc_node_broadcast(net, skb, rc_dests);
 		return 1;
 	}
 	return 0;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 728bc7016c38..8064e1986e2c 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -106,6 +106,8 @@ struct name_table {
 	struct list_head cluster_scope;
 	rwlock_t cluster_scope_lock;
 	u32 local_publ_count;
+	u32 rc_dests;
+	u32 snd_nxt;
 };
 
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index a4c2816c3746..030a51c4d1fa 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -75,6 +75,8 @@ struct tipc_bclink_entry {
 	struct sk_buff_head arrvq;
 	struct sk_buff_head inputq2;
 	struct sk_buff_head namedq;
+	u16 named_rcv_nxt;
+	bool named_open;
 };
 
 /**
@@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
 	write_unlock_bh(&n->lock);
 
 	if (flags & TIPC_NOTIFY_NODE_DOWN)
-		tipc_publ_notify(net, publ_list, addr);
+		tipc_publ_notify(net, publ_list, addr, n->capabilities);
 
 	if (flags & TIPC_NOTIFY_NODE_UP)
-		tipc_named_node_up(net, addr);
+		tipc_named_node_up(net, addr, n->capabilities);
 
 	if (flags & TIPC_NOTIFY_LINK_UP) {
 		tipc_mon_peer_up(net, addr, bearer_id);
@@ -1483,6 +1485,7 @@ static void node_lost_contact(struct tipc_node *n,
 
 	/* Clean up broadcast state */
 	tipc_bcast_remove_peer(n->net, n->bc_entry.link);
+	__skb_queue_purge(&n->bc_entry.namedq);
 
 	/* Abort any ongoing link failover */
 	for (i = 0; i < MAX_BEARERS; i++) {
@@ -1729,12 +1732,23 @@ int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq)
 	return 0;
 }
 
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
 {
+	struct sk_buff_head xmitq;
 	struct sk_buff *txskb;
 	struct tipc_node *n;
+	u16 dummy;
 	u32 dst;
 
+	/* Use broadcast if all nodes support it */
+	if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
+		__skb_queue_head_init(&xmitq);
+		__skb_queue_tail(&xmitq, skb);
+		tipc_bcast_xmit(net, &xmitq, &dummy);
+		return;
+	}
+
+	/* Otherwise use legacy replicast method */
 	rcu_read_lock();
 	list_for_each_entry_rcu(n, tipc_nodes(net), list) {
 		dst = n->addr;
@@ -1749,7 +1763,6 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
 		tipc_node_xmit_skb(net, txskb, dst, 0);
 	}
 	rcu_read_unlock();
-
 	kfree_skb(skb);
 }
 
@@ -1844,7 +1857,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
 	/* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
 	if (!skb_queue_empty(&n->bc_entry.namedq))
-		tipc_named_rcv(net, &n->bc_entry.namedq);
+		tipc_named_rcv(net, &n->bc_entry.namedq,
+			       &n->bc_entry.named_rcv_nxt,
+			       &n->bc_entry.named_open);
 
 	/* If reassembly or retransmission failure => reset all links to peer */
 	if (rc & TIPC_LINK_DOWN_EVT)
@@ -2114,7 +2129,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 		tipc_node_link_down(n, bearer_id, false);
 
 	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
-		tipc_named_rcv(net, &n->bc_entry.namedq);
+		tipc_named_rcv(net, &n->bc_entry.namedq,
+			       &n->bc_entry.named_rcv_nxt,
+			       &n->bc_entry.named_open);
 
 	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
 		tipc_node_mcast_rcv(n);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index a6803b449a2c..9f6f13f1604f 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,7 +55,8 @@ enum {
 	TIPC_MCAST_RBCTL      = (1 << 7),
 	TIPC_GAP_ACK_BLOCK    = (1 << 8),
 	TIPC_TUNNEL_ENHANCED  = (1 << 9),
-	TIPC_NAGLE            = (1 << 10)
+	TIPC_NAGLE            = (1 << 10),
+	TIPC_NAMED_BCAST      = (1 << 11)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
@@ -68,7 +69,8 @@ enum {
 				TIPC_MCAST_RBCTL       |   \
 				TIPC_GAP_ACK_BLOCK     |   \
 				TIPC_TUNNEL_ENHANCED   |   \
-				TIPC_NAGLE)
+				TIPC_NAGLE             |   \
+				TIPC_NAMED_BCAST)
 
 #define INVALID_BEARER_ID -1
 
@@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
 		       u32 selector);
 void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
 void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr);
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);
-- 
2.25.1


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
@ 2020-06-07  5:38   ` kernel test robot
  2020-06-07  6:22   ` kernel test robot
                     ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  5:38 UTC (permalink / raw)
  To: Hoang Huu Le, jmaloy, maloy, ying.xue, tipc-discussion, netdev; +Cc: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 4360 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: um-allmodconfig (attached as .config)
compiler: gcc-9 (Debian 9.3.0-13) 9.3.0
reproduce (this is a W=1 build):
        # save the attached .config to linux build tree
        make W=1 ARCH=um 

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>, old ones prefixed by <<):

cc1: warning: arch/um/include/uapi: No such file or directory [-Wmissing-include-dirs]
In file included from include/linux/string.h:6,
from include/uapi/linux/tipc_config.h:42,
from net/tipc/core.h:41,
from net/tipc/name_distr.c:37:
include/asm-generic/fixmap.h: In function 'fix_to_virt':
include/asm-generic/fixmap.h:32:19: warning: comparison of unsigned expression >= 0 is always true [-Wtype-limits]
32 |  BUILD_BUG_ON(idx >= __end_of_fixed_addresses);
|                   ^~
include/linux/compiler.h:383:9: note: in definition of macro '__compiletime_assert'
383 |   if (!(condition))              |         ^~~~~~~~~
include/linux/compiler.h:403:2: note: in expansion of macro '_compiletime_assert'
403 |  _compiletime_assert(condition, msg, __compiletime_assert_, __COUNTER__)
|  ^~~~~~~~~~~~~~~~~~~
include/linux/build_bug.h:39:37: note: in expansion of macro 'compiletime_assert'
39 | #define BUILD_BUG_ON_MSG(cond, msg) compiletime_assert(!(cond), msg)
|                                     ^~~~~~~~~~~~~~~~~~
include/linux/build_bug.h:50:2: note: in expansion of macro 'BUILD_BUG_ON_MSG'
50 |  BUILD_BUG_ON_MSG(condition, "BUILD_BUG_ON failed: " #condition)
|  ^~~~~~~~~~~~~~~~
include/asm-generic/fixmap.h:32:2: note: in expansion of macro 'BUILD_BUG_ON'
32 |  BUILD_BUG_ON(idx >= __end_of_fixed_addresses);
|  ^~~~~~~~~~~~
In file included from include/linux/uaccess.h:11,
from include/linux/sched/task.h:11,
from include/linux/sched/signal.h:9,
from include/linux/rcuwait.h:6,
from include/linux/percpu-rwsem.h:7,
from include/linux/fs.h:34,
from include/linux/huge_mm.h:8,
from include/linux/mm.h:675,
from net/tipc/core.h:46,
from net/tipc/name_distr.c:37:
arch/um/include/asm/uaccess.h: In function '__access_ok':
arch/um/include/asm/uaccess.h:17:29: warning: comparison of unsigned expression >= 0 is always true [-Wtype-limits]
17 |    (((unsigned long) (addr) >= FIXADDR_USER_START) &&          |                             ^~
arch/um/include/asm/uaccess.h:45:3: note: in expansion of macro '__access_ok_vsyscall'
45 |   __access_ok_vsyscall(addr, size) ||
|   ^~~~~~~~~~~~~~~~~~~~
net/tipc/name_distr.c: At top level:
>> net/tipc/name_distr.c:323:17: warning: no previous prototype for 'tipc_named_dequeue' [-Wmissing-prototypes]
323 | struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
|                 ^~~~~~~~~~~~~~~~~~

vim +/tipc_named_dequeue +323 net/tipc/name_distr.c

   322	
 > 323	struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
   324					   u16 *rcv_nxt, bool *open)
   325	{
   326		struct sk_buff *skb, *tmp;
   327		struct tipc_msg *hdr;
   328		u16 seqno;
   329	
   330		skb_queue_walk_safe(namedq, skb, tmp) {
   331			skb_linearize(skb);
   332			hdr = buf_msg(skb);
   333			seqno = msg_named_seqno(hdr);
   334			if (msg_is_last_bulk(hdr)) {
   335				*rcv_nxt = seqno;
   336				*open = true;
   337			}
   338			if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
   339				__skb_unlink(skb, namedq);
   340				return skb;
   341			}
   342	
   343			if (*open && (*rcv_nxt == seqno)) {
   344				(*rcv_nxt)++;
   345				__skb_unlink(skb, namedq);
   346				return skb;
   347			}
   348		}
   349		return NULL;
   350	}
   351	

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all@lists.01.org

[-- Attachment #2: .config.gz --]
[-- Type: application/gzip, Size: 22681 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
@ 2020-06-07  5:38   ` kernel test robot
  0 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  5:38 UTC (permalink / raw)
  To: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 4466 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: um-allmodconfig (attached as .config)
compiler: gcc-9 (Debian 9.3.0-13) 9.3.0
reproduce (this is a W=1 build):
        # save the attached .config to linux build tree
        make W=1 ARCH=um 

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>, old ones prefixed by <<):

cc1: warning: arch/um/include/uapi: No such file or directory [-Wmissing-include-dirs]
In file included from include/linux/string.h:6,
from include/uapi/linux/tipc_config.h:42,
from net/tipc/core.h:41,
from net/tipc/name_distr.c:37:
include/asm-generic/fixmap.h: In function 'fix_to_virt':
include/asm-generic/fixmap.h:32:19: warning: comparison of unsigned expression >= 0 is always true [-Wtype-limits]
32 |  BUILD_BUG_ON(idx >= __end_of_fixed_addresses);
|                   ^~
include/linux/compiler.h:383:9: note: in definition of macro '__compiletime_assert'
383 |   if (!(condition))              |         ^~~~~~~~~
include/linux/compiler.h:403:2: note: in expansion of macro '_compiletime_assert'
403 |  _compiletime_assert(condition, msg, __compiletime_assert_, __COUNTER__)
|  ^~~~~~~~~~~~~~~~~~~
include/linux/build_bug.h:39:37: note: in expansion of macro 'compiletime_assert'
39 | #define BUILD_BUG_ON_MSG(cond, msg) compiletime_assert(!(cond), msg)
|                                     ^~~~~~~~~~~~~~~~~~
include/linux/build_bug.h:50:2: note: in expansion of macro 'BUILD_BUG_ON_MSG'
50 |  BUILD_BUG_ON_MSG(condition, "BUILD_BUG_ON failed: " #condition)
|  ^~~~~~~~~~~~~~~~
include/asm-generic/fixmap.h:32:2: note: in expansion of macro 'BUILD_BUG_ON'
32 |  BUILD_BUG_ON(idx >= __end_of_fixed_addresses);
|  ^~~~~~~~~~~~
In file included from include/linux/uaccess.h:11,
from include/linux/sched/task.h:11,
from include/linux/sched/signal.h:9,
from include/linux/rcuwait.h:6,
from include/linux/percpu-rwsem.h:7,
from include/linux/fs.h:34,
from include/linux/huge_mm.h:8,
from include/linux/mm.h:675,
from net/tipc/core.h:46,
from net/tipc/name_distr.c:37:
arch/um/include/asm/uaccess.h: In function '__access_ok':
arch/um/include/asm/uaccess.h:17:29: warning: comparison of unsigned expression >= 0 is always true [-Wtype-limits]
17 |    (((unsigned long) (addr) >= FIXADDR_USER_START) &&          |                             ^~
arch/um/include/asm/uaccess.h:45:3: note: in expansion of macro '__access_ok_vsyscall'
45 |   __access_ok_vsyscall(addr, size) ||
|   ^~~~~~~~~~~~~~~~~~~~
net/tipc/name_distr.c: At top level:
>> net/tipc/name_distr.c:323:17: warning: no previous prototype for 'tipc_named_dequeue' [-Wmissing-prototypes]
323 | struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
|                 ^~~~~~~~~~~~~~~~~~

vim +/tipc_named_dequeue +323 net/tipc/name_distr.c

   322	
 > 323	struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
   324					   u16 *rcv_nxt, bool *open)
   325	{
   326		struct sk_buff *skb, *tmp;
   327		struct tipc_msg *hdr;
   328		u16 seqno;
   329	
   330		skb_queue_walk_safe(namedq, skb, tmp) {
   331			skb_linearize(skb);
   332			hdr = buf_msg(skb);
   333			seqno = msg_named_seqno(hdr);
   334			if (msg_is_last_bulk(hdr)) {
   335				*rcv_nxt = seqno;
   336				*open = true;
   337			}
   338			if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
   339				__skb_unlink(skb, namedq);
   340				return skb;
   341			}
   342	
   343			if (*open && (*rcv_nxt == seqno)) {
   344				(*rcv_nxt)++;
   345				__skb_unlink(skb, namedq);
   346				return skb;
   347			}
   348		}
   349		return NULL;
   350	}
   351	

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all(a)lists.01.org

[-- Attachment #2: config.gz --]
[-- Type: application/gzip, Size: 22681 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
@ 2020-06-07  6:22   ` kernel test robot
  2020-06-07  6:22   ` kernel test robot
                     ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  6:22 UTC (permalink / raw)
  To: Hoang Huu Le, jmaloy, maloy, ying.xue, tipc-discussion, netdev
  Cc: kbuild-all, clang-built-linux

[-- Attachment #1: Type: text/plain, Size: 2804 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: x86_64-randconfig-r022-20200607 (attached as .config)
compiler: clang version 11.0.0 (https://github.com/llvm/llvm-project e429cffd4f228f70c1d9df0e5d77c08590dd9766)
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # install x86_64 cross compiling tool for clang build
        # apt-get install binutils-x86-64-linux-gnu
        # save the attached .config to linux build tree
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=clang make.cross ARCH=x86_64 

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>, old ones prefixed by <<):

>> net/tipc/name_distr.c:323:17: warning: no previous prototype for function 'tipc_named_dequeue' [-Wmissing-prototypes]
struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
^
net/tipc/name_distr.c:323:1: note: declare 'static' if the function is not intended to be used outside of this translation unit
struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
^
static
1 warning generated.

vim +/tipc_named_dequeue +323 net/tipc/name_distr.c

   322	
 > 323	struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
   324					   u16 *rcv_nxt, bool *open)
   325	{
   326		struct sk_buff *skb, *tmp;
   327		struct tipc_msg *hdr;
   328		u16 seqno;
   329	
   330		skb_queue_walk_safe(namedq, skb, tmp) {
   331			skb_linearize(skb);
   332			hdr = buf_msg(skb);
   333			seqno = msg_named_seqno(hdr);
   334			if (msg_is_last_bulk(hdr)) {
   335				*rcv_nxt = seqno;
   336				*open = true;
   337			}
   338			if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
   339				__skb_unlink(skb, namedq);
   340				return skb;
   341			}
   342	
   343			if (*open && (*rcv_nxt == seqno)) {
   344				(*rcv_nxt)++;
   345				__skb_unlink(skb, namedq);
   346				return skb;
   347			}
   348		}
   349		return NULL;
   350	}
   351	

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all@lists.01.org

[-- Attachment #2: .config.gz --]
[-- Type: application/gzip, Size: 30659 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
@ 2020-06-07  6:22   ` kernel test robot
  0 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  6:22 UTC (permalink / raw)
  To: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 2879 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: x86_64-randconfig-r022-20200607 (attached as .config)
compiler: clang version 11.0.0 (https://github.com/llvm/llvm-project e429cffd4f228f70c1d9df0e5d77c08590dd9766)
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # install x86_64 cross compiling tool for clang build
        # apt-get install binutils-x86-64-linux-gnu
        # save the attached .config to linux build tree
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=clang make.cross ARCH=x86_64 

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>, old ones prefixed by <<):

>> net/tipc/name_distr.c:323:17: warning: no previous prototype for function 'tipc_named_dequeue' [-Wmissing-prototypes]
struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
^
net/tipc/name_distr.c:323:1: note: declare 'static' if the function is not intended to be used outside of this translation unit
struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
^
static
1 warning generated.

vim +/tipc_named_dequeue +323 net/tipc/name_distr.c

   322	
 > 323	struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
   324					   u16 *rcv_nxt, bool *open)
   325	{
   326		struct sk_buff *skb, *tmp;
   327		struct tipc_msg *hdr;
   328		u16 seqno;
   329	
   330		skb_queue_walk_safe(namedq, skb, tmp) {
   331			skb_linearize(skb);
   332			hdr = buf_msg(skb);
   333			seqno = msg_named_seqno(hdr);
   334			if (msg_is_last_bulk(hdr)) {
   335				*rcv_nxt = seqno;
   336				*open = true;
   337			}
   338			if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
   339				__skb_unlink(skb, namedq);
   340				return skb;
   341			}
   342	
   343			if (*open && (*rcv_nxt == seqno)) {
   344				(*rcv_nxt)++;
   345				__skb_unlink(skb, namedq);
   346				return skb;
   347			}
   348		}
   349		return NULL;
   350	}
   351	

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all(a)lists.01.org

[-- Attachment #2: config.gz --]
[-- Type: application/gzip, Size: 30659 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
@ 2020-06-07  8:18   ` kernel test robot
  2020-06-07  6:22   ` kernel test robot
                     ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  8:18 UTC (permalink / raw)
  To: Hoang Huu Le, jmaloy, maloy, ying.xue, tipc-discussion, netdev; +Cc: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 1435 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: i386-randconfig-s002-20200607 (attached as .config)
compiler: gcc-9 (Debian 9.3.0-13) 9.3.0
reproduce:
        # apt-get install sparse
        # sparse version: v0.6.1-247-gcadbd124-dirty
        # save the attached .config to linux build tree
        make W=1 C=1 ARCH=i386 CF='-fdiagnostic-prefix -D__CHECK_ENDIAN__'

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>


sparse warnings: (new ones prefixed by >>)

>> net/tipc/name_distr.c:323:16: sparse: sparse: symbol 'tipc_named_dequeue' was not declared. Should it be static?

Please review and possibly fold the followup patch.

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all@lists.01.org

[-- Attachment #2: .config.gz --]
[-- Type: application/gzip, Size: 37946 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
@ 2020-06-07  8:18   ` kernel test robot
  0 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  8:18 UTC (permalink / raw)
  To: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 1471 bytes --]

Hi Hoang,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on net-next/master]
[also build test WARNING on linus/master next-20200605]
[cannot apply to v5.7]
[if your patch is applied to the wrong git tree, please drop us a note to help
improve the system. BTW, we also suggest to use '--base' option to specify the
base tree in git format-patch, please see https://stackoverflow.com/a/37406982]

url:    https://github.com/0day-ci/linux/commits/Hoang-Huu-Le/tipc-update-a-binding-service-via-broadcast/20200607-122712
base:   https://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next.git cb8e59cc87201af93dfbb6c3dccc8fcad72a09c2
config: i386-randconfig-s002-20200607 (attached as .config)
compiler: gcc-9 (Debian 9.3.0-13) 9.3.0
reproduce:
        # apt-get install sparse
        # sparse version: v0.6.1-247-gcadbd124-dirty
        # save the attached .config to linux build tree
        make W=1 C=1 ARCH=i386 CF='-fdiagnostic-prefix -D__CHECK_ENDIAN__'

If you fix the issue, kindly add following tag as appropriate
Reported-by: kernel test robot <lkp@intel.com>


sparse warnings: (new ones prefixed by >>)

>> net/tipc/name_distr.c:323:16: sparse: sparse: symbol 'tipc_named_dequeue' was not declared. Should it be static?

Please review and possibly fold the followup patch.

---
0-DAY CI Kernel Test Service, Intel Corporation
https://lists.01.org/hyperkitty/list/kbuild-all(a)lists.01.org

[-- Attachment #2: config.gz --]
[-- Type: application/gzip, Size: 37946 bytes --]

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [RFC PATCH] tipc: tipc_named_dequeue() can be static
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
@ 2020-06-07  8:18   ` kernel test robot
  2020-06-07  6:22   ` kernel test robot
                     ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  8:18 UTC (permalink / raw)
  To: Hoang Huu Le, jmaloy, maloy, ying.xue, tipc-discussion, netdev; +Cc: kbuild-all


Signed-off-by: kernel test robot <lkp@intel.com>
---
 name_distr.c |    4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 481d480609f0a..b4f2351259333 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -320,8 +320,8 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 	return false;
 }
 
-struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
-				   u16 *rcv_nxt, bool *open)
+static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
+					  u16 *rcv_nxt, bool *open)
 {
 	struct sk_buff *skb, *tmp;
 	struct tipc_msg *hdr;

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [RFC PATCH] tipc: tipc_named_dequeue() can be static
@ 2020-06-07  8:18   ` kernel test robot
  0 siblings, 0 replies; 11+ messages in thread
From: kernel test robot @ 2020-06-07  8:18 UTC (permalink / raw)
  To: kbuild-all

[-- Attachment #1: Type: text/plain, Size: 679 bytes --]


Signed-off-by: kernel test robot <lkp@intel.com>
---
 name_distr.c |    4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 481d480609f0a..b4f2351259333 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -320,8 +320,8 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 	return false;
 }
 
-struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
-				   u16 *rcv_nxt, bool *open)
+static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
+					  u16 *rcv_nxt, bool *open)
 {
 	struct sk_buff *skb, *tmp;
 	struct tipc_msg *hdr;

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
                   ` (3 preceding siblings ...)
  2020-06-07  8:18   ` kernel test robot
@ 2020-06-07 19:03 ` Jon Maloy
  2020-06-07 23:56 ` David Miller
  5 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2020-06-07 19:03 UTC (permalink / raw)
  To: Hoang Huu Le, maloy, ying.xue, tipc-discussion, netdev



On 6/7/20 12:24 AM, Hoang Huu Le wrote:
> Currently, updating binding table (add service binding to
> name table/withdraw a service binding) is being sent over replicast.
> However, if we are scaling up clusters to > 100 nodes/containers this
> method is less affection because of looping through nodes in a cluster one
> by one.
>
> It is worth to use broadcast to update a binding service. This way, the
> binding table can be updated on all peer nodes in one shot.
>
> Broadcast is used when all peer nodes, as indicated by a new capability
> flag TIPC_NAMED_BCAST, support reception of this message type.
>
> Four problems need to be considered when introducing this feature.
> 1) When establishing a link to a new peer node we still update this by a
> unicast 'bulk' update. This may lead to race conditions, where a later
> broadcast publication/withdrawal bypass the 'bulk', resulting in
> disordered publications, or even that a withdrawal may arrive before the
> corresponding publication. We solve this by adding an 'is_last_bulk' bit
> in the last bulk messages so that it can be distinguished from all other
> messages. Only when this message has arrived do we open up for reception
> of broadcast publications/withdrawals.
Add a line feed between these paragraphs before you send the patch.
Otherwise, still acked by me.

///join

> 2) When a first legacy node is added to the cluster all distribution
> will switch over to use the legacy 'replicast' method, while the
> opposite happens when the last legacy node leaves the cluster. This
> entails another risk of message disordering that has to be handled. We
> solve this by adding a sequence number to the broadcast/replicast
> messages, so that disordering can be discovered and corrected. Note
> however that we don't need to consider potential message loss or
> duplication at this protocol level.
> 3) Bulk messages don't contain any sequence numbers, and will always
> arrive in order. Hence we must exempt those from the sequence number
> control and deliver them unconditionally. We solve this by adding a new
> 'is_bulk' bit in those messages so that they can be recognized.
> 4) Legacy messages, which don't contain any new bits or sequence
> numbers, but neither can arrive out of order, also need to be exempt
> from the initial synchronization and sequence number check, and
> delivered unconditionally. Therefore, we add another 'is_not_legacy' bit
> to all new messages so that those can be distinguished from legacy
> messages and the latter delivered directly.
>
> Signed-off-by: Hoang Huu Le <hoang.h.le@dektech.com.au>
> Acked-by: Jon Maloy <jmaloy@redhat.com>
> ---
>   net/tipc/bcast.c      |   6 +--
>   net/tipc/bcast.h      |   4 +-
>   net/tipc/link.c       |   2 +-
>   net/tipc/msg.h        |  40 ++++++++++++++++
>   net/tipc/name_distr.c | 109 +++++++++++++++++++++++++++++++-----------
>   net/tipc/name_distr.h |   9 ++--
>   net/tipc/name_table.c |   9 +++-
>   net/tipc/name_table.h |   2 +
>   net/tipc/node.c       |  29 ++++++++---
>   net/tipc/node.h       |   8 ++--
>   10 files changed, 170 insertions(+), 48 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index 383f87bc1061..940d176e0e87 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -250,8 +250,8 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
>    * Consumes the buffer chain.
>    * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
>    */
> -static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> -			   u16 *cong_link_cnt)
> +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +		    u16 *cong_link_cnt)
>   {
>   	struct tipc_link *l = tipc_bc_sndlink(net);
>   	struct sk_buff_head xmitq;
> @@ -752,7 +752,7 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
>   	nl->local = false;
>   }
>   
> -u32 tipc_bcast_get_broadcast_mode(struct net *net)
> +u32 tipc_bcast_get_mode(struct net *net)
>   {
>   	struct tipc_bc_base *bb = tipc_bc_base(net);
>   
> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> index 4240c95188b1..2d9352dc7b0e 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -90,6 +90,8 @@ void tipc_bcast_toggle_rcast(struct net *net, bool supp);
>   int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
>   		    struct tipc_mc_method *method, struct tipc_nlist *dests,
>   		    u16 *cong_link_cnt);
> +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +		    u16 *cong_link_cnt);
>   int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
>   void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
>   			struct tipc_msg *hdr);
> @@ -101,7 +103,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
>   int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
>   int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
>   
> -u32 tipc_bcast_get_broadcast_mode(struct net *net);
> +u32 tipc_bcast_get_mode(struct net *net);
>   u32 tipc_bcast_get_broadcast_ratio(struct net *net);
>   
>   void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index ee3b8d0576b8..eac89a3e22ce 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -2745,7 +2745,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
>   	void *hdr;
>   	struct nlattr *attrs;
>   	struct nlattr *prop;
> -	u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
> +	u32 bc_mode = tipc_bcast_get_mode(net);
>   	u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
>   
>   	if (!bcl)
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index 58660d56bc83..65119e81ff0c 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -438,6 +438,36 @@ static inline void msg_set_errcode(struct tipc_msg *m, u32 err)
>   	msg_set_bits(m, 1, 25, 0xf, err);
>   }
>   
> +static inline void msg_set_bulk(struct tipc_msg *m)
> +{
> +	msg_set_bits(m, 1, 28, 0x1, 1);
> +}
> +
> +static inline u32 msg_is_bulk(struct tipc_msg *m)
> +{
> +	return msg_bits(m, 1, 28, 0x1);
> +}
> +
> +static inline void msg_set_last_bulk(struct tipc_msg *m)
> +{
> +	msg_set_bits(m, 1, 27, 0x1, 1);
> +}
> +
> +static inline u32 msg_is_last_bulk(struct tipc_msg *m)
> +{
> +	return msg_bits(m, 1, 27, 0x1);
> +}
> +
> +static inline void msg_set_non_legacy(struct tipc_msg *m)
> +{
> +	msg_set_bits(m, 1, 26, 0x1, 1);
> +}
> +
> +static inline u32 msg_is_legacy(struct tipc_msg *m)
> +{
> +	return !msg_bits(m, 1, 26, 0x1);
> +}
> +
>   static inline u32 msg_reroute_cnt(struct tipc_msg *m)
>   {
>   	return msg_bits(m, 1, 21, 0xf);
> @@ -567,6 +597,16 @@ static inline void msg_set_origport(struct tipc_msg *m, u32 p)
>   	msg_set_word(m, 4, p);
>   }
>   
> +static inline u16 msg_named_seqno(struct tipc_msg *m)
> +{
> +	return msg_bits(m, 4, 0, 0xffff);
> +}
> +
> +static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n)
> +{
> +	msg_set_bits(m, 4, 0, 0xffff, n);
> +}
> +
>   static inline u32 msg_destport(struct tipc_msg *m)
>   {
>   	return msg_word(m, 5);
> diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
> index 5feaf3b67380..481d480609f0 100644
> --- a/net/tipc/name_distr.c
> +++ b/net/tipc/name_distr.c
> @@ -102,7 +102,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
>   		pr_warn("Publication distribution failure\n");
>   		return NULL;
>   	}
> -
> +	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
> +	msg_set_non_legacy(buf_msg(skb));
>   	item = (struct distr_item *)msg_data(buf_msg(skb));
>   	publ_to_item(item, publ);
>   	return skb;
> @@ -114,8 +115,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
>   struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
>   {
>   	struct name_table *nt = tipc_name_table(net);
> -	struct sk_buff *buf;
>   	struct distr_item *item;
> +	struct sk_buff *skb;
>   
>   	write_lock_bh(&nt->cluster_scope_lock);
>   	list_del(&publ->binding_node);
> @@ -123,15 +124,16 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
>   	if (publ->scope == TIPC_NODE_SCOPE)
>   		return NULL;
>   
> -	buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
> -	if (!buf) {
> +	skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
> +	if (!skb) {
>   		pr_warn("Withdrawal distribution failure\n");
>   		return NULL;
>   	}
> -
> -	item = (struct distr_item *)msg_data(buf_msg(buf));
> +	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
> +	msg_set_non_legacy(buf_msg(skb));
> +	item = (struct distr_item *)msg_data(buf_msg(skb));
>   	publ_to_item(item, publ);
> -	return buf;
> +	return skb;
>   }
>   
>   /**
> @@ -141,7 +143,7 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
>    * @pls: linked list of publication items to be packed into buffer chain
>    */
>   static void named_distribute(struct net *net, struct sk_buff_head *list,
> -			     u32 dnode, struct list_head *pls)
> +			     u32 dnode, struct list_head *pls, u16 seqno)
>   {
>   	struct publication *publ;
>   	struct sk_buff *skb = NULL;
> @@ -149,6 +151,7 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
>   	u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
>   			ITEM_SIZE) * ITEM_SIZE;
>   	u32 msg_rem = msg_dsz;
> +	struct tipc_msg *hdr;
>   
>   	list_for_each_entry(publ, pls, binding_node) {
>   		/* Prepare next buffer: */
> @@ -159,8 +162,11 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
>   				pr_warn("Bulk publication failure\n");
>   				return;
>   			}
> -			msg_set_bc_ack_invalid(buf_msg(skb), true);
> -			item = (struct distr_item *)msg_data(buf_msg(skb));
> +			hdr = buf_msg(skb);
> +			msg_set_bc_ack_invalid(hdr, true);
> +			msg_set_bulk(hdr);
> +			msg_set_non_legacy(hdr);
> +			item = (struct distr_item *)msg_data(hdr);
>   		}
>   
>   		/* Pack publication into message: */
> @@ -176,24 +182,35 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
>   		}
>   	}
>   	if (skb) {
> -		msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
> +		hdr = buf_msg(skb);
> +		msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
>   		skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
>   		__skb_queue_tail(list, skb);
>   	}
> +	hdr = buf_msg(skb_peek_tail(list));
> +	msg_set_last_bulk(hdr);
> +	msg_set_named_seqno(hdr, seqno);
>   }
>   
>   /**
>    * tipc_named_node_up - tell specified node about all publications by this node
>    */
> -void tipc_named_node_up(struct net *net, u32 dnode)
> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
>   {
>   	struct name_table *nt = tipc_name_table(net);
> +	struct tipc_net *tn = tipc_net(net);
>   	struct sk_buff_head head;
> +	u16 seqno;
>   
>   	__skb_queue_head_init(&head);
> +	spin_lock_bh(&tn->nametbl_lock);
> +	if (!(capabilities & TIPC_NAMED_BCAST))
> +		nt->rc_dests++;
> +	seqno = nt->snd_nxt;
> +	spin_unlock_bh(&tn->nametbl_lock);
>   
>   	read_lock_bh(&nt->cluster_scope_lock);
> -	named_distribute(net, &head, dnode, &nt->cluster_scope);
> +	named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
>   	tipc_node_xmit(net, &head, dnode, 0);
>   	read_unlock_bh(&nt->cluster_scope_lock);
>   }
> @@ -245,13 +262,21 @@ static void tipc_dist_queue_purge(struct net *net, u32 addr)
>   	spin_unlock_bh(&tn->nametbl_lock);
>   }
>   
> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
> +		      u32 addr, u16 capabilities)
>   {
> +	struct name_table *nt = tipc_name_table(net);
> +	struct tipc_net *tn = tipc_net(net);
> +
>   	struct publication *publ, *tmp;
>   
>   	list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
>   		tipc_publ_purge(net, publ, addr);
>   	tipc_dist_queue_purge(net, addr);
> +	spin_lock_bh(&tn->nametbl_lock);
> +	if (!(capabilities & TIPC_NAMED_BCAST))
> +		nt->rc_dests--;
> +	spin_unlock_bh(&tn->nametbl_lock);
>   }
>   
>   /**
> @@ -295,29 +320,55 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
>   	return false;
>   }
>   
> +struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
> +				   u16 *rcv_nxt, bool *open)
> +{
> +	struct sk_buff *skb, *tmp;
> +	struct tipc_msg *hdr;
> +	u16 seqno;
> +
> +	skb_queue_walk_safe(namedq, skb, tmp) {
> +		skb_linearize(skb);
> +		hdr = buf_msg(skb);
> +		seqno = msg_named_seqno(hdr);
> +		if (msg_is_last_bulk(hdr)) {
> +			*rcv_nxt = seqno;
> +			*open = true;
> +		}
> +		if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
> +			__skb_unlink(skb, namedq);
> +			return skb;
> +		}
> +
> +		if (*open && (*rcv_nxt == seqno)) {
> +			(*rcv_nxt)++;
> +			__skb_unlink(skb, namedq);
> +			return skb;
> +		}
> +	}
> +	return NULL;
> +}
> +
>   /**
>    * tipc_named_rcv - process name table update messages sent by another node
>    */
> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
> +		    u16 *rcv_nxt, bool *open)
>   {
> -	struct tipc_net *tn = net_generic(net, tipc_net_id);
> -	struct tipc_msg *msg;
> +	struct tipc_net *tn = tipc_net(net);
>   	struct distr_item *item;
> -	uint count;
> -	u32 node;
> +	struct tipc_msg *hdr;
>   	struct sk_buff *skb;
> -	int mtype;
> +	u32 count, node = 0;
>   
>   	spin_lock_bh(&tn->nametbl_lock);
> -	for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
> -		skb_linearize(skb);
> -		msg = buf_msg(skb);
> -		mtype = msg_type(msg);
> -		item = (struct distr_item *)msg_data(msg);
> -		count = msg_data_sz(msg) / ITEM_SIZE;
> -		node = msg_orignode(msg);
> +	while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
> +		hdr = buf_msg(skb);
> +		node = msg_orignode(hdr);
> +		item = (struct distr_item *)msg_data(hdr);
> +		count = msg_data_sz(hdr) / ITEM_SIZE;
>   		while (count--) {
> -			tipc_update_nametbl(net, item, node, mtype);
> +			tipc_update_nametbl(net, item, node, msg_type(hdr));
>   			item++;
>   		}
>   		kfree_skb(skb);
> @@ -345,6 +396,6 @@ void tipc_named_reinit(struct net *net)
>   		publ->node = self;
>   	list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
>   		publ->node = self;
> -
> +	nt->rc_dests = 0;
>   	spin_unlock_bh(&tn->nametbl_lock);
>   }
> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
> index 63fc73e0fa6c..092323158f06 100644
> --- a/net/tipc/name_distr.h
> +++ b/net/tipc/name_distr.h
> @@ -67,11 +67,14 @@ struct distr_item {
>   	__be32 key;
>   };
>   
> +void tipc_named_bcast(struct net *net, struct sk_buff *skb);
>   struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
>   struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
> -void tipc_named_node_up(struct net *net, u32 dnode);
> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
> +		    u16 *rcv_nxt, bool *open);
>   void tipc_named_reinit(struct net *net);
> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
> +		      u32 addr, u16 capabilities);
>   
>   #endif
> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
> index 359b2bc888cf..2ac33d32edc2 100644
> --- a/net/tipc/name_table.c
> +++ b/net/tipc/name_table.c
> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>   	struct tipc_net *tn = tipc_net(net);
>   	struct publication *p = NULL;
>   	struct sk_buff *skb = NULL;
> +	u32 rc_dests;
>   
>   	spin_lock_bh(&tn->nametbl_lock);
>   
> @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>   		nt->local_publ_count++;
>   		skb = tipc_named_publish(net, p);
>   	}
> +	rc_dests = nt->rc_dests;
>   exit:
>   	spin_unlock_bh(&tn->nametbl_lock);
>   
>   	if (skb)
> -		tipc_node_broadcast(net, skb);
> +		tipc_node_broadcast(net, skb, rc_dests);
>   	return p;
> +
>   }
>   
>   /**
> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>   	u32 self = tipc_own_addr(net);
>   	struct sk_buff *skb = NULL;
>   	struct publication *p;
> +	u32 rc_dests;
>   
>   	spin_lock_bh(&tn->nametbl_lock);
>   
> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>   		pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
>   		       type, lower, upper, key);
>   	}
> +	rc_dests = nt->rc_dests;
>   	spin_unlock_bh(&tn->nametbl_lock);
>   
>   	if (skb) {
> -		tipc_node_broadcast(net, skb);
> +		tipc_node_broadcast(net, skb, rc_dests);
>   		return 1;
>   	}
>   	return 0;
> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
> index 728bc7016c38..8064e1986e2c 100644
> --- a/net/tipc/name_table.h
> +++ b/net/tipc/name_table.h
> @@ -106,6 +106,8 @@ struct name_table {
>   	struct list_head cluster_scope;
>   	rwlock_t cluster_scope_lock;
>   	u32 local_publ_count;
> +	u32 rc_dests;
> +	u32 snd_nxt;
>   };
>   
>   int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index a4c2816c3746..030a51c4d1fa 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -75,6 +75,8 @@ struct tipc_bclink_entry {
>   	struct sk_buff_head arrvq;
>   	struct sk_buff_head inputq2;
>   	struct sk_buff_head namedq;
> +	u16 named_rcv_nxt;
> +	bool named_open;
>   };
>   
>   /**
> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>   	write_unlock_bh(&n->lock);
>   
>   	if (flags & TIPC_NOTIFY_NODE_DOWN)
> -		tipc_publ_notify(net, publ_list, addr);
> +		tipc_publ_notify(net, publ_list, addr, n->capabilities);
>   
>   	if (flags & TIPC_NOTIFY_NODE_UP)
> -		tipc_named_node_up(net, addr);
> +		tipc_named_node_up(net, addr, n->capabilities);
>   
>   	if (flags & TIPC_NOTIFY_LINK_UP) {
>   		tipc_mon_peer_up(net, addr, bearer_id);
> @@ -1483,6 +1485,7 @@ static void node_lost_contact(struct tipc_node *n,
>   
>   	/* Clean up broadcast state */
>   	tipc_bcast_remove_peer(n->net, n->bc_entry.link);
> +	__skb_queue_purge(&n->bc_entry.namedq);
>   
>   	/* Abort any ongoing link failover */
>   	for (i = 0; i < MAX_BEARERS; i++) {
> @@ -1729,12 +1732,23 @@ int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq)
>   	return 0;
>   }
>   
> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
>   {
> +	struct sk_buff_head xmitq;
>   	struct sk_buff *txskb;
>   	struct tipc_node *n;
> +	u16 dummy;
>   	u32 dst;
>   
> +	/* Use broadcast if all nodes support it */
> +	if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
> +		__skb_queue_head_init(&xmitq);
> +		__skb_queue_tail(&xmitq, skb);
> +		tipc_bcast_xmit(net, &xmitq, &dummy);
> +		return;
> +	}
> +
> +	/* Otherwise use legacy replicast method */
>   	rcu_read_lock();
>   	list_for_each_entry_rcu(n, tipc_nodes(net), list) {
>   		dst = n->addr;
> @@ -1749,7 +1763,6 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>   		tipc_node_xmit_skb(net, txskb, dst, 0);
>   	}
>   	rcu_read_unlock();
> -
>   	kfree_skb(skb);
>   }
>   
> @@ -1844,7 +1857,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>   
>   	/* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
>   	if (!skb_queue_empty(&n->bc_entry.namedq))
> -		tipc_named_rcv(net, &n->bc_entry.namedq);
> +		tipc_named_rcv(net, &n->bc_entry.namedq,
> +			       &n->bc_entry.named_rcv_nxt,
> +			       &n->bc_entry.named_open);
>   
>   	/* If reassembly or retransmission failure => reset all links to peer */
>   	if (rc & TIPC_LINK_DOWN_EVT)
> @@ -2114,7 +2129,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>   		tipc_node_link_down(n, bearer_id, false);
>   
>   	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
> -		tipc_named_rcv(net, &n->bc_entry.namedq);
> +		tipc_named_rcv(net, &n->bc_entry.namedq,
> +			       &n->bc_entry.named_rcv_nxt,
> +			       &n->bc_entry.named_open);
>   
>   	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
>   		tipc_node_mcast_rcv(n);
> diff --git a/net/tipc/node.h b/net/tipc/node.h
> index a6803b449a2c..9f6f13f1604f 100644
> --- a/net/tipc/node.h
> +++ b/net/tipc/node.h
> @@ -55,7 +55,8 @@ enum {
>   	TIPC_MCAST_RBCTL      = (1 << 7),
>   	TIPC_GAP_ACK_BLOCK    = (1 << 8),
>   	TIPC_TUNNEL_ENHANCED  = (1 << 9),
> -	TIPC_NAGLE            = (1 << 10)
> +	TIPC_NAGLE            = (1 << 10),
> +	TIPC_NAMED_BCAST      = (1 << 11)
>   };
>   
>   #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
> @@ -68,7 +69,8 @@ enum {
>   				TIPC_MCAST_RBCTL       |   \
>   				TIPC_GAP_ACK_BLOCK     |   \
>   				TIPC_TUNNEL_ENHANCED   |   \
> -				TIPC_NAGLE)
> +				TIPC_NAGLE             |   \
> +				TIPC_NAMED_BCAST)
>   
>   #define INVALID_BEARER_ID -1
>   
> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
>   		       u32 selector);
>   void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
>   void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr);
> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests);
>   int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
>   void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
>   int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [net-next] tipc: update a binding service via broadcast
  2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
                   ` (4 preceding siblings ...)
  2020-06-07 19:03 ` [net-next] tipc: update a binding service via broadcast Jon Maloy
@ 2020-06-07 23:56 ` David Miller
  5 siblings, 0 replies; 11+ messages in thread
From: David Miller @ 2020-06-07 23:56 UTC (permalink / raw)
  To: hoang.h.le; +Cc: jmaloy, maloy, ying.xue, tipc-discussion, netdev


net-next is CLOSED, thank you

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2020-06-07 23:56 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-06-07  4:24 [net-next] tipc: update a binding service via broadcast Hoang Huu Le
2020-06-07  5:38 ` kernel test robot
2020-06-07  5:38   ` kernel test robot
2020-06-07  6:22 ` kernel test robot
2020-06-07  6:22   ` kernel test robot
2020-06-07  8:18 ` kernel test robot
2020-06-07  8:18   ` kernel test robot
2020-06-07  8:18 ` [RFC PATCH] tipc: tipc_named_dequeue() can be static kernel test robot
2020-06-07  8:18   ` kernel test robot
2020-06-07 19:03 ` [net-next] tipc: update a binding service via broadcast Jon Maloy
2020-06-07 23:56 ` David Miller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.