* [net-next 0/5] tipc: add some improvements
@ 2020-05-26  9:38 Tuong Lien
  2020-05-26  9:38 ` [net-next 1/5] tipc: introduce Gap ACK blocks for broadcast link Tuong Lien
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

This series adds some improvements to TIPC.

The first patch improves TIPC broadcast performance by applying the 'Gap
ACK blocks' mechanism, which is already used for unicast. The next three
patches add tracing and statistics support for broadcast links, plus an
option to carry broadcast retransmissions via unicast, which might be
useful in some cases.

Finally, with the last patch, the Nagle algorithm can automatically
'adjust' itself depending on the specific network conditions a stream
connection runs under.

Tuong Lien (5):
  tipc: introduce Gap ACK blocks for broadcast link
  tipc: add back link trace events
  tipc: enable broadcast retrans via unicast
  tipc: add support for broadcast rcv stats dumping
  tipc: add test for Nagle algorithm effectiveness

 net/tipc/bcast.c   |  22 ++-
 net/tipc/bcast.h   |   9 +-
 net/tipc/link.c    | 487 +++++++++++++++++++++++++++++++----------------------
 net/tipc/link.h    |  11 +-
 net/tipc/msg.c     |  12 +-
 net/tipc/msg.h     |  43 ++++-
 net/tipc/netlink.c |   2 +-
 net/tipc/node.c    |  73 ++++++--
 net/tipc/socket.c  |  64 +++++--
 net/tipc/sysctl.c  |   9 +-
 net/tipc/trace.h   |  17 +-
 11 files changed, 486 insertions(+), 263 deletions(-)

-- 
2.13.7



* [net-next 1/5] tipc: introduce Gap ACK blocks for broadcast link
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
@ 2020-05-26  9:38 ` Tuong Lien
  2020-05-26  9:38 ` [net-next 2/5] tipc: add back link trace events Tuong Lien
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

Following commit 9195948fbf34 ("tipc: improve TIPC throughput by Gap ACK
blocks"), which introduced the mechanism for unicast, we now apply it to
the broadcast link as well. The 'Gap ACK blocks' data field in a
'PROTOCOL/STATE_MSG' will consist of two parts, one built for the
broadcast type and one for the unicast type:

 31                       16 15                        0
+-------------+-------------+-------------+-------------+
|  bgack_cnt  |  ugack_cnt  |            len            |
+-------------+-------------+-------------+-------------+  -
|            gap            |            ack            |   |
+-------------+-------------+-------------+-------------+    > bc gacks
:                           :                           :   |
+-------------+-------------+-------------+-------------+  -
|            gap            |            ack            |   |
+-------------+-------------+-------------+-------------+    > uc gacks
:                           :                           :   |
+-------------+-------------+-------------+-------------+  -

which is "automatically" backward-compatible.

We also increase the max number of Gap ACK blocks to 128, allowing up to
64 blocks per type (total buffer size = 516 bytes).
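
To make the layout and the 516-byte figure concrete, here is a minimal
userspace sketch. It is not the kernel code: the names gap_ack,
gap_ack_blks, gap_ack_blks_sz() and gap_ack_len_ok() are illustrative
stand-ins, and fixed-width C types replace the kernel's __be16/u8. The
length check follows the logic of tipc_get_gap_ack_blks() in this patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* One Gap ACK block; both fields are big-endian on the wire. */
struct gap_ack {
	uint16_t ack;
	uint16_t gap;
};

/* Record header followed by the broadcast blocks, then the unicast blocks. */
struct gap_ack_blks {
	uint16_t len;		/* total record length, big-endian on the wire */
	uint8_t ugack_cnt;	/* number of unicast blocks */
	uint8_t bgack_cnt;	/* number of broadcast blocks */
	struct gap_ack gacks[];
};

#define MAX_GAP_ACK_BLKS 128

static_assert(sizeof(struct gap_ack_blks) == 4, "header is one 32-bit word");
static_assert(sizeof(struct gap_ack) == 4, "each block is one 32-bit word");

/* Same arithmetic as the tipc_gap_ack_blks_sz() macro. */
static size_t gap_ack_blks_sz(unsigned int n)
{
	return sizeof(struct gap_ack_blks) + sizeof(struct gap_ack) * n;
}

/* Accept the record when 'len' covers both parts and the wanted part is
 * non-empty, or, for unicast only, when 'len' covers just the unicast
 * blocks (a peer without the broadcast part). In that second case the
 * kernel code additionally clears bgack_cnt before using the record.
 */
static bool gap_ack_len_ok(size_t len, uint8_t ugack_cnt, uint8_t bgack_cnt,
			   bool uc)
{
	if (len == gap_ack_blks_sz((unsigned int)ugack_cnt + bgack_cnt))
		return uc ? ugack_cnt != 0 : bgack_cnt != 0;
	return uc && ugack_cnt != 0 && len == gap_ack_blks_sz(ugack_cnt);
}
```

With these definitions, gap_ack_blks_sz(MAX_GAP_ACK_BLKS) is
4 + 128 * 4 = 516 bytes, which matches the buffer size quoted above.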

In addition, the 'tipc_link_advance_transmq()' function is refactored so
that it now handles both the unicast and broadcast cases, so some old
functions can be removed and the code is optimized.
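
As a rough illustration of what the transmq walk does with the Gap ACK
blocks, the sketch below classifies each queued sequence number as
released, retransmitted or kept. It is a simplified userspace model of
the unicast path only (no 'ackers' accounting, no retransmit timers, host
byte order), and the names classify(), struct gack and enum action are
hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum action { KEEP, RELEASE, RETRANSMIT };

struct gack {
	uint16_t ack;	/* last seqno received before the next gap */
	uint16_t gap;	/* number of missing packets after 'ack' */
};

/* 16-bit sequence comparison that tolerates wraparound. */
static int less_eq(uint16_t left, uint16_t right)
{
	return (uint16_t)(right - left) < 32768u;
}

/* Walk the queued seqnos in order. Packets up to 'acked' are released,
 * packets inside the following gap are retransmitted, and once a packet
 * lies beyond the current (acked, gap) pair the next Gap ACK block is
 * loaded and the same packet is checked again.
 */
static void classify(const uint16_t *seqno, enum action *out, size_t cnt,
		     uint16_t acked, uint16_t gap,
		     const struct gack *gacks, size_t gack_cnt)
{
	size_t i, n = 0;

	assert(gack_cnt == 0 || gacks != NULL);

	for (i = 0; i < cnt; i++)
		out[i] = KEEP;

	for (i = 0; i < cnt; i++) {
		for (;;) {
			if (less_eq(seqno[i], acked)) {
				out[i] = RELEASE;
				break;
			}
			if (less_eq(seqno[i], (uint16_t)(acked + gap))) {
				out[i] = RETRANSMIT;
				break;
			}
			if (n >= gack_cnt)
				return;	/* nothing more is known */
			acked = gacks[n].ack;
			gap = gacks[n].gap;
			n++;
		}
	}
}
```

For example, with seqnos 10..17 queued, acked = 11, gap = 2 and the
blocks {ack 15, gap 1}, {ack 17, gap 0}, packets 12, 13 and 16 are
retransmitted and all the others are released.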

With the patch, TIPC broadcast is more robust against packet loss,
reordering, latency, etc. in the underlying network, and its performance
is boosted significantly.
For example, an experiment with a 5% packet loss rate gives:

$ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
real    0m 42.46s
user    0m 1.16s
sys     0m 17.67s

Without the patch:

$ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
real    8m 27.94s
user    0m 0.55s
sys     0m 2.38s
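
To put the two 'real' times side by side, the small helper below converts
them to seconds and to a message rate. It is only a back-of-the-envelope
sketch: the function names are made up, and it assumes '--data_num' is
the number of messages sent in each run.

```c
#include <assert.h>

/* Convert the "Xm Y.YYs" format printed by time into seconds. */
static double to_seconds(unsigned int minutes, double seconds)
{
	return minutes * 60.0 + seconds;
}

/* Messages per second for a run that sent 'data_num' messages. */
static double msg_rate(unsigned long data_num, double elapsed_s)
{
	assert(elapsed_s > 0.0);
	return data_num / elapsed_s;
}
```

Under that assumption, 0m 42.46s versus 8m 27.94s (507.94 s) works out to
roughly 35,300 versus 2,950 messages per second, i.e. about a 12x
reduction in elapsed time with the patch.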

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/bcast.c |   9 +-
 net/tipc/link.c  | 425 ++++++++++++++++++++++++++++++++-----------------------
 net/tipc/link.h  |   7 +-
 net/tipc/msg.h   |  27 +++-
 net/tipc/node.c  |  10 +-
 5 files changed, 293 insertions(+), 185 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 4c20be08b9c4..3ce690a96ee9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -474,7 +474,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 	__skb_queue_head_init(&xmitq);
 
 	tipc_bcast_lock(net);
-	tipc_link_bc_ack_rcv(l, acked, &xmitq);
+	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq);
 	tipc_bcast_unlock(net);
 
 	tipc_bcbase_xmit(net, &xmitq);
@@ -492,6 +492,7 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr)
 {
 	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct tipc_gap_ack_blks *ga;
 	struct sk_buff_head xmitq;
 	int rc = 0;
 
@@ -501,8 +502,10 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 	if (msg_type(hdr) != STATE_MSG) {
 		tipc_link_bc_init_rcv(l, hdr);
 	} else if (!msg_bc_ack_invalid(hdr)) {
-		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
-		rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+		tipc_get_gap_ack_blks(&ga, l, hdr, false);
+		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
+					  msg_bc_gap(hdr), ga, &xmitq);
+		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
 	}
 	tipc_bcast_unlock(net);
 
diff --git a/net/tipc/link.c b/net/tipc/link.c
index d4675e922a8f..d29b9c531171 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -188,6 +188,8 @@ struct tipc_link {
 	/* Broadcast */
 	u16 ackers;
 	u16 acked;
+	u16 last_gap;
+	struct tipc_gap_ack_blks *last_ga;
 	struct tipc_link *bc_rcvlink;
 	struct tipc_link *bc_sndlink;
 	u8 nack_state;
@@ -249,11 +251,14 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
 				    struct sk_buff_head *xmitq);
 static void tipc_link_build_bc_init_msg(struct tipc_link *l,
 					struct sk_buff_head *xmitq);
-static int tipc_link_release_pkts(struct tipc_link *l, u16 to);
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap);
-static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
+				    struct tipc_link *l, u8 start_index);
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct tipc_msg *hdr);
+static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
+				     u16 acked, u16 gap,
 				     struct tipc_gap_ack_blks *ga,
-				     struct sk_buff_head *xmitq);
+				     struct sk_buff_head *xmitq,
+				     bool *retransmitted, int *rc);
 static void tipc_link_update_cwin(struct tipc_link *l, int released,
 				  bool retransmitted);
 /*
@@ -370,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
 	snd_l->ackers--;
 	rcv_l->bc_peer_is_up = true;
 	rcv_l->state = LINK_ESTABLISHED;
-	tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
+	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq);
 	trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
 	tipc_link_reset(rcv_l);
 	rcv_l->state = LINK_RESET;
@@ -784,8 +789,6 @@ bool tipc_link_too_silent(struct tipc_link *l)
 	return (l->silent_intv_cnt + 2 > l->abort_limit);
 }
 
-static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
-				u16 from, u16 to, struct sk_buff_head *xmitq);
 /* tipc_link_timeout - perform periodic task as instructed from node timeout
  */
 int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -948,6 +951,9 @@ void tipc_link_reset(struct tipc_link *l)
 	l->snd_nxt_state = 1;
 	l->rcv_nxt_state = 1;
 	l->acked = 0;
+	l->last_gap = 0;
+	kfree(l->last_ga);
+	l->last_ga = NULL;
 	l->silent_intv_cnt = 0;
 	l->rst_cnt = 0;
 	l->bc_peer_is_up = false;
@@ -1183,68 +1189,14 @@ static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
 
 	if (link_is_bc_sndlink(l)) {
 		r->state = LINK_RESET;
-		*rc = TIPC_LINK_DOWN_EVT;
+		*rc |= TIPC_LINK_DOWN_EVT;
 	} else {
-		*rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+		*rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
 	}
 
 	return true;
 }
 
-/* tipc_link_bc_retrans() - retransmit zero or more packets
- * @l: the link to transmit on
- * @r: the receiving link ordering the retransmit. Same as l if unicast
- * @from: retransmit from (inclusive) this sequence number
- * @to: retransmit to (inclusive) this sequence number
- * xmitq: queue for accumulating the retransmitted packets
- */
-static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
-				u16 from, u16 to, struct sk_buff_head *xmitq)
-{
-	struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
-	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-	u16 ack = l->rcv_nxt - 1;
-	int retransmitted = 0;
-	struct tipc_msg *hdr;
-	int rc = 0;
-
-	if (!skb)
-		return 0;
-	if (less(to, from))
-		return 0;
-
-	trace_tipc_link_retrans(r, from, to, &l->transmq);
-
-	if (link_retransmit_failure(l, r, &rc))
-		return rc;
-
-	skb_queue_walk(&l->transmq, skb) {
-		hdr = buf_msg(skb);
-		if (less(msg_seqno(hdr), from))
-			continue;
-		if (more(msg_seqno(hdr), to))
-			break;
-		if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
-			continue;
-		TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
-		_skb = pskb_copy(skb, GFP_ATOMIC);
-		if (!_skb)
-			return 0;
-		hdr = buf_msg(_skb);
-		msg_set_ack(hdr, ack);
-		msg_set_bcast_ack(hdr, bc_ack);
-		_skb->priority = TC_PRIO_CONTROL;
-		__skb_queue_tail(xmitq, _skb);
-		l->stats.retransmitted++;
-		retransmitted++;
-		/* Increase actual retrans counter & mark first time */
-		if (!TIPC_SKB_CB(skb)->retr_cnt++)
-			TIPC_SKB_CB(skb)->retr_stamp = jiffies;
-	}
-	tipc_link_update_cwin(l, 0, retransmitted);
-	return 0;
-}
-
 /* tipc_data_input - deliver data and name distr msgs to upper layer
  *
  * Consumes buffer if message is of right type
@@ -1402,46 +1354,71 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
 	return rc;
 }
 
-static int tipc_link_release_pkts(struct tipc_link *l, u16 acked)
-{
-	int released = 0;
-	struct sk_buff *skb, *tmp;
-
-	skb_queue_walk_safe(&l->transmq, skb, tmp) {
-		if (more(buf_seqno(skb), acked))
-			break;
-		__skb_unlink(skb, &l->transmq);
-		kfree_skb(skb);
-		released++;
+/**
+ * tipc_get_gap_ack_blks - get Gap ACK blocks from PROTOCOL/STATE_MSG
+ * @ga: returned pointer to the Gap ACK blocks if any
+ * @l: the tipc link
+ * @hdr: the PROTOCOL/STATE_MSG header
+ * @uc: desired Gap ACK blocks type, i.e. unicast (= 1) or broadcast (= 0)
+ *
+ * Return: the total Gap ACK blocks size
+ */
+u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
+			  struct tipc_msg *hdr, bool uc)
+{
+	struct tipc_gap_ack_blks *p;
+	u16 sz = 0;
+
+	/* Does peer support the Gap ACK blocks feature? */
+	if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
+		p = (struct tipc_gap_ack_blks *)msg_data(hdr);
+		sz = ntohs(p->len);
+		/* Sanity check */
+		if (sz == tipc_gap_ack_blks_sz(p->ugack_cnt + p->bgack_cnt)) {
+			/* Good, check if the desired type exists */
+			if ((uc && p->ugack_cnt) || (!uc && p->bgack_cnt))
+				goto ok;
+		/* Backward compatible: peer might not support bc, but uc? */
+		} else if (uc && sz == tipc_gap_ack_blks_sz(p->ugack_cnt)) {
+			if (p->ugack_cnt) {
+				p->bgack_cnt = 0;
+				goto ok;
+			}
+		}
 	}
-	return released;
+	/* Other cases: ignore! */
+	p = NULL;
+
+ok:
+	*ga = p;
+	return sz;
 }
 
-/* tipc_build_gap_ack_blks - build Gap ACK blocks
- * @l: tipc link that data have come with gaps in sequence if any
- * @data: data buffer to store the Gap ACK blocks after built
- *
- * returns the actual allocated memory size
- */
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
+static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
+				    struct tipc_link *l, u8 start_index)
 {
+	struct tipc_gap_ack *gacks = &ga->gacks[start_index];
 	struct sk_buff *skb = skb_peek(&l->deferdq);
-	struct tipc_gap_ack_blks *ga = data;
-	u16 len, expect, seqno = 0;
+	u16 expect, seqno = 0;
 	u8 n = 0;
 
-	if (!skb || !gap)
-		goto exit;
+	if (!skb)
+		return 0;
 
 	expect = buf_seqno(skb);
 	skb_queue_walk(&l->deferdq, skb) {
 		seqno = buf_seqno(skb);
 		if (unlikely(more(seqno, expect))) {
-			ga->gacks[n].ack = htons(expect - 1);
-			ga->gacks[n].gap = htons(seqno - expect);
-			if (++n >= MAX_GAP_ACK_BLKS) {
-				pr_info_ratelimited("Too few Gap ACK blocks!\n");
-				goto exit;
+			gacks[n].ack = htons(expect - 1);
+			gacks[n].gap = htons(seqno - expect);
+			if (++n >= MAX_GAP_ACK_BLKS / 2) {
+				char buf[TIPC_MAX_LINK_NAME];
+
+				pr_info_ratelimited("Gacks on %s: %d, ql: %d!\n",
+						    tipc_link_name_ext(l, buf),
+						    n,
+						    skb_queue_len(&l->deferdq));
+				return n;
 			}
 		} else if (unlikely(less(seqno, expect))) {
 			pr_warn("Unexpected skb in deferdq!\n");
@@ -1451,14 +1428,44 @@ static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
 	}
 
 	/* last block */
-	ga->gacks[n].ack = htons(seqno);
-	ga->gacks[n].gap = 0;
+	gacks[n].ack = htons(seqno);
+	gacks[n].gap = 0;
 	n++;
+	return n;
+}
 
-exit:
-	len = tipc_gap_ack_blks_sz(n);
+/* tipc_build_gap_ack_blks - build Gap ACK blocks
+ * @l: tipc unicast link
+ * @hdr: the tipc message buffer to store the Gap ACK blocks after built
+ *
+ * The function builds Gap ACK blocks for both the unicast & broadcast receiver
+ * links of a certain peer, the buffer after built has the network data format
+ * as found at the struct tipc_gap_ack_blks definition.
+ *
+ * returns the actual allocated memory size
+ */
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct tipc_msg *hdr)
+{
+	struct tipc_link *bcl = l->bc_rcvlink;
+	struct tipc_gap_ack_blks *ga;
+	u16 len;
+
+	ga = (struct tipc_gap_ack_blks *)msg_data(hdr);
+
+	/* Start with broadcast link first */
+	tipc_bcast_lock(bcl->net);
+	msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
+	msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
+	ga->bgack_cnt = __tipc_build_gap_ack_blks(ga, bcl, 0);
+	tipc_bcast_unlock(bcl->net);
+
+	/* Now for unicast link, but an explicit NACK only (???) */
+	ga->ugack_cnt = (msg_seq_gap(hdr)) ?
+			__tipc_build_gap_ack_blks(ga, l, ga->bgack_cnt) : 0;
+
+	/* Total len */
+	len = tipc_gap_ack_blks_sz(ga->bgack_cnt + ga->ugack_cnt);
 	ga->len = htons(len);
-	ga->gack_cnt = n;
 	return len;
 }
 
@@ -1466,47 +1473,109 @@ static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
  *			       acked packets, also doing retransmissions if
  *			       gaps found
  * @l: tipc link with transmq queue to be advanced
+ * @r: tipc link "receiver" i.e. in case of broadcast (= "l" if unicast)
  * @acked: seqno of last packet acked by peer without any gaps before
  * @gap: # of gap packets
  * @ga: buffer pointer to Gap ACK blocks from peer
  * @xmitq: queue for accumulating the retransmitted packets if any
+ * @retransmitted: returned boolean value if a retransmission is really issued
+ * @rc: returned code e.g. TIPC_LINK_DOWN_EVT if a repeated retransmit failures
+ *      happens (- unlikely case)
  *
- * In case of a repeated retransmit failures, the call will return shortly
- * with a returned code (e.g. TIPC_LINK_DOWN_EVT)
+ * Return: the number of packets released from the link transmq
  */
-static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
+				     u16 acked, u16 gap,
 				     struct tipc_gap_ack_blks *ga,
-				     struct sk_buff_head *xmitq)
+				     struct sk_buff_head *xmitq,
+				     bool *retransmitted, int *rc)
 {
+	struct tipc_gap_ack_blks *last_ga = r->last_ga, *this_ga = NULL;
+	struct tipc_gap_ack *gacks = NULL;
 	struct sk_buff *skb, *_skb, *tmp;
 	struct tipc_msg *hdr;
+	u32 qlen = skb_queue_len(&l->transmq);
+	u16 nacked = acked, ngap = gap, gack_cnt = 0;
 	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-	bool retransmitted = false;
 	u16 ack = l->rcv_nxt - 1;
-	bool passed = false;
-	u16 released = 0;
 	u16 seqno, n = 0;
-	int rc = 0;
+	u16 end = r->acked, start = end, offset = r->last_gap;
+	u16 si = (last_ga) ? last_ga->start_index : 0;
+	bool is_uc = !link_is_bc_sndlink(l);
+	bool bc_has_acked = false;
+
+	/* Determine Gap ACK blocks if any for the particular link */
+	if (ga && is_uc) {
+		/* Get the Gap ACKs, uc part */
+		gack_cnt = ga->ugack_cnt;
+		gacks = &ga->gacks[ga->bgack_cnt];
+	} else if (ga) {
+		/* Copy the Gap ACKs, bc part, for later renewal if needed */
+		this_ga = kmemdup(ga, tipc_gap_ack_blks_sz(ga->bgack_cnt),
+				  GFP_ATOMIC);
+		if (likely(this_ga)) {
+			this_ga->start_index = 0;
+			/* Start with the bc Gap ACKs */
+			gack_cnt = this_ga->bgack_cnt;
+			gacks = &this_ga->gacks[0];
+		} else {
+			/* Hmm, we can get in trouble..., simply ignore it */
+			pr_warn_ratelimited("Ignoring bc Gap ACKs, no memory\n");
+		}
+	}
 
+	/* Advance the link transmq */
 	skb_queue_walk_safe(&l->transmq, skb, tmp) {
 		seqno = buf_seqno(skb);
 
 next_gap_ack:
-		if (less_eq(seqno, acked)) {
+		if (less_eq(seqno, nacked)) {
+			if (is_uc)
+				goto release;
+			/* Skip packets peer has already acked */
+			if (!more(seqno, r->acked))
+				continue;
+			/* Get the next of last Gap ACK blocks */
+			while (more(seqno, end)) {
+				if (!last_ga || si >= last_ga->bgack_cnt)
+					break;
+				start = end + offset + 1;
+				end = ntohs(last_ga->gacks[si].ack);
+				offset = ntohs(last_ga->gacks[si].gap);
+				si++;
+				WARN_ONCE(more(start, end) ||
+					  (!offset &&
+					   si < last_ga->bgack_cnt) ||
+					  si > MAX_GAP_ACK_BLKS,
+					  "Corrupted Gap ACK: %d %d %d %d %d\n",
+					  start, end, offset, si,
+					  last_ga->bgack_cnt);
+			}
+			/* Check against the last Gap ACK block */
+			if (in_range(seqno, start, end))
+				continue;
+			/* Update/release the packet peer is acking */
+			bc_has_acked = true;
+			if (--TIPC_SKB_CB(skb)->ackers)
+				continue;
+release:
 			/* release skb */
 			__skb_unlink(skb, &l->transmq);
 			kfree_skb(skb);
-			released++;
-		} else if (less_eq(seqno, acked + gap)) {
-			/* First, check if repeated retrans failures occurs? */
-			if (!passed && link_retransmit_failure(l, l, &rc))
-				return rc;
-			passed = true;
-
+		} else if (less_eq(seqno, nacked + ngap)) {
+			/* First gap: check if repeated retrans failures? */
+			if (unlikely(seqno == acked + 1 &&
+				     link_retransmit_failure(l, r, rc))) {
+				/* Ignore this bc Gap ACKs if any */
+				kfree(this_ga);
+				this_ga = NULL;
+				break;
+			}
 			/* retransmit skb if unrestricted*/
 			if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
 				continue;
-			TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
+			TIPC_SKB_CB(skb)->nxt_retr = (is_uc) ?
+					TIPC_UC_RETR_TIME : TIPC_BC_RETR_LIM;
 			_skb = pskb_copy(skb, GFP_ATOMIC);
 			if (!_skb)
 				continue;
@@ -1516,25 +1585,51 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
 			_skb->priority = TC_PRIO_CONTROL;
 			__skb_queue_tail(xmitq, _skb);
 			l->stats.retransmitted++;
-			retransmitted = true;
+			*retransmitted = true;
 			/* Increase actual retrans counter & mark first time */
 			if (!TIPC_SKB_CB(skb)->retr_cnt++)
 				TIPC_SKB_CB(skb)->retr_stamp = jiffies;
 		} else {
 			/* retry with Gap ACK blocks if any */
-			if (!ga || n >= ga->gack_cnt)
+			if (n >= gack_cnt)
 				break;
-			acked = ntohs(ga->gacks[n].ack);
-			gap = ntohs(ga->gacks[n].gap);
+			nacked = ntohs(gacks[n].ack);
+			ngap = ntohs(gacks[n].gap);
 			n++;
 			goto next_gap_ack;
 		}
 	}
-	if (released || retransmitted)
-		tipc_link_update_cwin(l, released, retransmitted);
-	if (released)
-		tipc_link_advance_backlog(l, xmitq);
-	return 0;
+
+	/* Renew last Gap ACK blocks for bc if needed */
+	if (bc_has_acked) {
+		if (this_ga) {
+			kfree(last_ga);
+			r->last_ga = this_ga;
+			r->last_gap = gap;
+		} else if (last_ga) {
+			if (less(acked, start)) {
+				si--;
+				offset = start - acked - 1;
+			} else if (less(acked, end)) {
+				acked = end;
+			}
+			if (si < last_ga->bgack_cnt) {
+				last_ga->start_index = si;
+				r->last_gap = offset;
+			} else {
+				kfree(last_ga);
+				r->last_ga = NULL;
+				r->last_gap = 0;
+			}
+		} else {
+			r->last_gap = 0;
+		}
+		r->acked = acked;
+	} else {
+		kfree(this_ga);
+	}
+
+	return qlen - skb_queue_len(&l->transmq);
 }
 
 /* tipc_link_build_state_msg: prepare link state message for transmission
@@ -1651,7 +1746,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 			kfree_skb(skb);
 			break;
 		}
-		released += tipc_link_release_pkts(l, msg_ack(hdr));
+		released += tipc_link_advance_transmq(l, l, msg_ack(hdr), 0,
+						      NULL, NULL, NULL, NULL);
 
 		/* Defer delivery if sequence gap */
 		if (unlikely(seqno != rcv_nxt)) {
@@ -1739,7 +1835,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		msg_set_probe(hdr, probe);
 		msg_set_is_keepalive(hdr, probe || probe_reply);
 		if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
-			glen = tipc_build_gap_ack_blks(l, data, rcvgap);
+			glen = tipc_build_gap_ack_blks(l, hdr);
 		tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
 		msg_set_size(hdr, INT_H_SIZE + glen + dlen);
 		skb_trim(skb, INT_H_SIZE + glen + dlen);
@@ -2027,20 +2123,19 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 {
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct tipc_gap_ack_blks *ga = NULL;
-	u16 rcvgap = 0;
-	u16 ack = msg_ack(hdr);
-	u16 gap = msg_seq_gap(hdr);
+	bool reply = msg_probe(hdr), retransmitted = false;
+	u16 dlen = msg_data_sz(hdr), glen = 0;
 	u16 peers_snd_nxt =  msg_next_sent(hdr);
 	u16 peers_tol = msg_link_tolerance(hdr);
 	u16 peers_prio = msg_linkprio(hdr);
+	u16 gap = msg_seq_gap(hdr);
+	u16 ack = msg_ack(hdr);
 	u16 rcv_nxt = l->rcv_nxt;
-	u16 dlen = msg_data_sz(hdr);
+	u16 rcvgap = 0;
 	int mtyp = msg_type(hdr);
-	bool reply = msg_probe(hdr);
-	u16 glen = 0;
-	void *data;
+	int rc = 0, released;
 	char *if_name;
-	int rc = 0;
+	void *data;
 
 	trace_tipc_proto_rcv(skb, false, l->name);
 	if (tipc_link_is_blocked(l) || !xmitq)
@@ -2137,13 +2232,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 		}
 
 		/* Receive Gap ACK blocks from peer if any */
-		if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
-			ga = (struct tipc_gap_ack_blks *)data;
-			glen = ntohs(ga->len);
-			/* sanity check: if failed, ignore Gap ACK blocks */
-			if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
-				ga = NULL;
-		}
+		glen = tipc_get_gap_ack_blks(&ga, l, hdr, true);
 
 		tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
 			     &l->mon_state, l->bearer_id);
@@ -2158,9 +2247,14 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 			tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
 						  rcvgap, 0, 0, xmitq);
 
-		rc |= tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
+		released = tipc_link_advance_transmq(l, l, ack, gap, ga, xmitq,
+						     &retransmitted, &rc);
 		if (gap)
 			l->stats.recv_nacks++;
+		if (released || retransmitted)
+			tipc_link_update_cwin(l, released, retransmitted);
+		if (released)
+			tipc_link_advance_backlog(l, xmitq);
 		if (unlikely(!skb_queue_empty(&l->wakeupq)))
 			link_prepare_wakeup(l);
 	}
@@ -2246,10 +2340,7 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
 int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 			  struct sk_buff_head *xmitq)
 {
-	struct tipc_link *snd_l = l->bc_sndlink;
 	u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
-	u16 from = msg_bcast_ack(hdr) + 1;
-	u16 to = from + msg_bc_gap(hdr) - 1;
 	int rc = 0;
 
 	if (!link_is_up(l))
@@ -2271,8 +2362,6 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
 		return rc;
 
-	rc = tipc_link_bc_retrans(snd_l, l, from, to, xmitq);
-
 	l->snd_nxt = peers_snd_nxt;
 	if (link_bc_rcv_gap(l))
 		rc |= TIPC_LINK_SND_STATE;
@@ -2307,38 +2396,27 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	return 0;
 }
 
-void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
-			  struct sk_buff_head *xmitq)
+int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
+			 struct tipc_gap_ack_blks *ga,
+			 struct sk_buff_head *xmitq)
 {
-	struct sk_buff *skb, *tmp;
-	struct tipc_link *snd_l = l->bc_sndlink;
+	struct tipc_link *l = r->bc_sndlink;
+	bool unused = false;
+	int rc = 0;
 
-	if (!link_is_up(l) || !l->bc_peer_is_up)
-		return;
+	if (!link_is_up(r) || !r->bc_peer_is_up)
+		return 0;
 
-	if (!more(acked, l->acked))
-		return;
+	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
+		return 0;
 
-	trace_tipc_link_bc_ack(l, l->acked, acked, &snd_l->transmq);
-	/* Skip over packets peer has already acked */
-	skb_queue_walk(&snd_l->transmq, skb) {
-		if (more(buf_seqno(skb), l->acked))
-			break;
-	}
+	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);
 
-	/* Update/release the packets peer is acking now */
-	skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
-		if (more(buf_seqno(skb), acked))
-			break;
-		if (!--TIPC_SKB_CB(skb)->ackers) {
-			__skb_unlink(skb, &snd_l->transmq);
-			kfree_skb(skb);
-		}
-	}
-	l->acked = acked;
-	tipc_link_advance_backlog(snd_l, xmitq);
-	if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
-		link_prepare_wakeup(snd_l);
+	tipc_link_advance_backlog(l, xmitq);
+	if (unlikely(!skb_queue_empty(&l->wakeupq)))
+		link_prepare_wakeup(l);
+
+	return rc;
 }
 
 /* tipc_link_bc_nack_rcv(): receive broadcast nack message
@@ -2366,8 +2444,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 		return 0;
 
 	if (dnode == tipc_own_addr(l->net)) {
-		tipc_link_bc_ack_rcv(l, acked, xmitq);
-		rc = tipc_link_bc_retrans(l->bc_sndlink, l, from, to, xmitq);
+		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq);
 		l->stats.recv_nacks++;
 		return rc;
 	}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index d3c1c3fc1659..0a0fa7350722 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -143,8 +143,11 @@ int tipc_link_bc_peers(struct tipc_link *l);
 void tipc_link_set_mtu(struct tipc_link *l, int mtu);
 int tipc_link_mtu(struct tipc_link *l);
 int tipc_link_mss(struct tipc_link *l);
-void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
-			  struct sk_buff_head *xmitq);
+u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
+			  struct tipc_msg *hdr, bool uc);
+int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
+			 struct tipc_gap_ack_blks *ga,
+			 struct sk_buff_head *xmitq);
 void tipc_link_build_bc_sync_msg(struct tipc_link *l,
 				 struct sk_buff_head *xmitq);
 void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 871feadbbc19..ca5f8689a33b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -160,20 +160,39 @@ struct tipc_gap_ack {
 
 /* struct tipc_gap_ack_blks
  * @len: actual length of the record
- * @gack_cnt: number of Gap ACK blocks in the record
+ * @ugack_cnt: number of Gap ACK blocks for unicast (following the broadcast
+ *             ones)
+ * @start_index: starting index for "valid" broadcast Gap ACK blocks
+ * @bgack_cnt: number of Gap ACK blocks for broadcast in the record
  * @gacks: array of Gap ACK blocks
+ *
+ *  31                       16 15                        0
+ * +-------------+-------------+-------------+-------------+
+ * |  bgack_cnt  |  ugack_cnt  |            len            |
+ * +-------------+-------------+-------------+-------------+  -
+ * |            gap            |            ack            |   |
+ * +-------------+-------------+-------------+-------------+    > bc gacks
+ * :                           :                           :   |
+ * +-------------+-------------+-------------+-------------+  -
+ * |            gap            |            ack            |   |
+ * +-------------+-------------+-------------+-------------+    > uc gacks
+ * :                           :                           :   |
+ * +-------------+-------------+-------------+-------------+  -
  */
 struct tipc_gap_ack_blks {
 	__be16 len;
-	u8 gack_cnt;
-	u8 reserved;
+	union {
+		u8 ugack_cnt;
+		u8 start_index;
+	};
+	u8 bgack_cnt;
 	struct tipc_gap_ack gacks[];
 };
 
 #define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
 				 sizeof(struct tipc_gap_ack) * (n))
 
-#define MAX_GAP_ACK_BLKS	32
+#define MAX_GAP_ACK_BLKS	128
 #define MAX_GAP_ACK_BLKS_SZ	tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
 
 static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 803a3a6d0f50..6a49b3eeaae9 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -2071,10 +2071,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	le = &n->links[bearer_id];
 
 	/* Ensure broadcast reception is in synch with peer's send state */
-	if (unlikely(usr == LINK_PROTOCOL))
+	if (unlikely(usr == LINK_PROTOCOL)) {
+		if (unlikely(skb_linearize(skb))) {
+			tipc_node_put(n);
+			goto discard;
+		}
+		hdr = buf_msg(skb);
 		tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
-	else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
+	} else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack)) {
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, hdr);
+	}
 
 	/* Receive packet directly if conditions permit */
 	tipc_node_read_lock(n);
-- 
2.13.7



* [net-next 2/5] tipc: add back link trace events
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
  2020-05-26  9:38 ` [net-next 1/5] tipc: introduce Gap ACK blocks for broadcast link Tuong Lien
@ 2020-05-26  9:38 ` Tuong Lien
  2020-05-26  9:38 ` [net-next 3/5] tipc: enable broadcast retrans via unicast Tuong Lien
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

In the previous commit ("tipc: introduce Gap ACK blocks for broadcast
link"), we removed the following link trace events due to the code
changes:

- tipc_link_bc_ack
- tipc_link_retrans

This commit adds them back along with some minor changes to adapt to
the new code.
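
One of those changes is that the 'tipc_link_retrans' event is now declared
with a condition, so it only fires when the requested range
[acked + 1, acked + gap] is non-empty. The userspace sketch below shows
why a zero gap suppresses the event under 16-bit wraparound arithmetic;
retrans_range_nonempty() is a hypothetical name used only for this
illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* 16-bit sequence comparison that tolerates wraparound. */
static bool less_eq(uint16_t left, uint16_t right)
{
	return (uint16_t)(right - left) < 32768u;
}

/* Mirrors the 'from <= to' condition on the retransmit range. With
 * gap == 0, 'to' is one behind 'from', so the comparison is false.
 */
static bool retrans_range_nonempty(uint16_t acked, uint16_t gap)
{
	uint16_t from = (uint16_t)(acked + 1);
	uint16_t to = (uint16_t)(acked + gap);

	return less_eq(from, to);
}
```

So a plain ACK (gap of 0) produces no retransmit trace, while any small
non-zero gap does, including across the 65535 -> 0 wrap.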

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/link.c  |  3 +++
 net/tipc/trace.h | 13 ++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index d29b9c531171..288c5670cfa5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1504,6 +1504,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
 	bool is_uc = !link_is_bc_sndlink(l);
 	bool bc_has_acked = false;
 
+	trace_tipc_link_retrans(r, acked + 1, acked + gap, &l->transmq);
+
 	/* Determine Gap ACK blocks if any for the particular link */
 	if (ga && is_uc) {
 		/* Get the Gap ACKs, uc part */
@@ -2410,6 +2412,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
 		return 0;
 
+	trace_tipc_link_bc_ack(r, acked, gap, &l->transmq);
 	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);
 
 	tipc_link_advance_backlog(l, xmitq);
diff --git a/net/tipc/trace.h b/net/tipc/trace.h
index 4d8e00483afc..e7535ab75255 100644
--- a/net/tipc/trace.h
+++ b/net/tipc/trace.h
@@ -299,8 +299,10 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
 		__entry->from = f;
 		__entry->to = t;
 		__entry->len = skb_queue_len(tq);
-		__entry->fseqno = msg_seqno(buf_msg(skb_peek(tq)));
-		__entry->lseqno = msg_seqno(buf_msg(skb_peek_tail(tq)));
+		__entry->fseqno = __entry->len ?
+				  msg_seqno(buf_msg(skb_peek(tq))) : 0;
+		__entry->lseqno = __entry->len ?
+				  msg_seqno(buf_msg(skb_peek_tail(tq))) : 0;
 	),
 
 	TP_printk("<%s> retrans req: [%u-%u] transmq: %u [%u-%u]\n",
@@ -308,15 +310,16 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
 		  __entry->len, __entry->fseqno, __entry->lseqno)
 );
 
-DEFINE_EVENT(tipc_link_transmq_class, tipc_link_retrans,
+DEFINE_EVENT_CONDITION(tipc_link_transmq_class, tipc_link_retrans,
 	TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
-	TP_ARGS(r, f, t, tq)
+	TP_ARGS(r, f, t, tq),
+	TP_CONDITION(less_eq(f, t))
 );
 
 DEFINE_EVENT_PRINT(tipc_link_transmq_class, tipc_link_bc_ack,
 	TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
 	TP_ARGS(r, f, t, tq),
-	TP_printk("<%s> acked: [%u-%u] transmq: %u [%u-%u]\n",
+	TP_printk("<%s> acked: %u gap: %u transmq: %u [%u-%u]\n",
 		  __entry->name, __entry->from, __entry->to,
 		  __entry->len, __entry->fseqno, __entry->lseqno)
 );
-- 
2.13.7



* [net-next 3/5] tipc: enable broadcast retrans via unicast
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
  2020-05-26  9:38 ` [net-next 1/5] tipc: introduce Gap ACK blocks for broadcast link Tuong Lien
  2020-05-26  9:38 ` [net-next 2/5] tipc: add back link trace events Tuong Lien
@ 2020-05-26  9:38 ` Tuong Lien
  2020-05-26  9:38 ` [net-next 4/5] tipc: add support for broadcast rcv stats dumping Tuong Lien
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

In some environments, broadcast traffic is rate-limited (e.g. by a
bandwidth limit setting). With such a limit applied, TIPC broadcast can
still run successfully. Under high load, however, some packets will be
dropped and TIPC tries to retransmit them. Because the retransmissions
are themselves broadcast, they are subject to the same limit, which
makes things worse rather than better.

This commit enables broadcast retransmission via unicast, which
retransmits packets only to the specific peer that has actually reported
a gap instead of broadcasting to all nodes in the cluster. This prevents
the retransmissions from being suppressed, reduces the overhead that
duplicates cause on the other peers, and so improves the overall TIPC
broadcast performance.

Note: the functionality can be turned on/off via the sysctl file:

echo 1 > /proc/sys/net/tipc/bc_retruni
echo 0 > /proc/sys/net/tipc/bc_retruni

Default is '0', i.e. the broadcast retransmission still works as usual.
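
As a rough illustration of what the switch does, the sketch below models
the queue selection made in tipc_bcast_sync_rcv() in the diff that
follows. The struct and the helper select_retrans_queue() are
hypothetical stand-ins, not kernel code; only the decision itself is
taken from the patch.

```c
#include <stddef.h>

/* Hypothetical stand-in for a transmit queue (struct sk_buff_head in
 * the kernel); only its identity matters for this sketch.
 */
struct xmit_queue {
	const char *name;
};

/* Pick the queue that retransmitted packets are appended to.
 * bc_retruni == 0: retransmissions go out on the broadcast queue,
 *                  as before the patch.
 * bc_retruni != 0: retransmissions go to the queue supplied by the
 *                  caller, i.e. the unicast path towards the peer
 *                  that reported the gap.
 */
static struct xmit_queue *
select_retrans_queue(unsigned long bc_retruni,
		     struct xmit_queue *bcast_xmitq,
		     struct xmit_queue *ucast_xmitq)
{
	return bc_retruni ? ucast_xmitq : bcast_xmitq;
}
```

Ordinary (non-retransmitted) broadcast traffic is not affected by this
choice; only the queue used for retransmissions changes.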

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/bcast.c  | 11 ++++++++---
 net/tipc/bcast.h  |  4 +++-
 net/tipc/link.c   | 10 ++++++----
 net/tipc/link.h   |  3 ++-
 net/tipc/node.c   |  2 +-
 net/tipc/sysctl.c |  9 ++++++++-
 6 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3ce690a96ee9..50a16f8bebd9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -46,6 +46,7 @@
 #define BCLINK_WIN_MIN      32	/* bcast minimum link window size */
 
 const char tipc_bclink_name[] = "broadcast-link";
+unsigned long sysctl_tipc_bc_retruni __read_mostly;
 
 /**
  * struct tipc_bc_base - base structure for keeping broadcast send state
@@ -474,7 +475,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 	__skb_queue_head_init(&xmitq);
 
 	tipc_bcast_lock(net);
-	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq);
+	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
 	tipc_bcast_unlock(net);
 
 	tipc_bcbase_xmit(net, &xmitq);
@@ -489,7 +490,8 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
  * RCU is locked, no other locks set
  */
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-			struct tipc_msg *hdr)
+			struct tipc_msg *hdr,
+			struct sk_buff_head *retrq)
 {
 	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
 	struct tipc_gap_ack_blks *ga;
@@ -503,8 +505,11 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 		tipc_link_bc_init_rcv(l, hdr);
 	} else if (!msg_bc_ack_invalid(hdr)) {
 		tipc_get_gap_ack_blks(&ga, l, hdr, false);
+		if (!sysctl_tipc_bc_retruni)
+			retrq = &xmitq;
 		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
-					  msg_bc_gap(hdr), ga, &xmitq);
+					  msg_bc_gap(hdr), ga, &xmitq,
+					  retrq);
 		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
 	}
 	tipc_bcast_unlock(net);
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 9e847d9617d3..97d3cf9d3e4d 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -45,6 +45,7 @@ struct tipc_nl_msg;
 struct tipc_nlist;
 struct tipc_nitem;
 extern const char tipc_bclink_name[];
+extern unsigned long sysctl_tipc_bc_retruni;
 
 #define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
 
@@ -93,7 +94,8 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr);
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-			struct tipc_msg *hdr);
+			struct tipc_msg *hdr,
+			struct sk_buff_head *retrq);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 288c5670cfa5..af352391e2ab 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -375,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
 	snd_l->ackers--;
 	rcv_l->bc_peer_is_up = true;
 	rcv_l->state = LINK_ESTABLISHED;
-	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq);
+	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq, NULL);
 	trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
 	tipc_link_reset(rcv_l);
 	rcv_l->state = LINK_RESET;
@@ -2400,7 +2400,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 
 int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 			 struct tipc_gap_ack_blks *ga,
-			 struct sk_buff_head *xmitq)
+			 struct sk_buff_head *xmitq,
+			 struct sk_buff_head *retrq)
 {
 	struct tipc_link *l = r->bc_sndlink;
 	bool unused = false;
@@ -2413,7 +2414,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 		return 0;
 
 	trace_tipc_link_bc_ack(r, acked, gap, &l->transmq);
-	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);
+	tipc_link_advance_transmq(l, r, acked, gap, ga, retrq, &unused, &rc);
 
 	tipc_link_advance_backlog(l, xmitq);
 	if (unlikely(!skb_queue_empty(&l->wakeupq)))
@@ -2447,7 +2448,8 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 		return 0;
 
 	if (dnode == tipc_own_addr(l->net)) {
-		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq);
+		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq,
+					  xmitq);
 		l->stats.recv_nacks++;
 		return rc;
 	}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0a0fa7350722..4d0768cf91d5 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -147,7 +147,8 @@ u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
 			  struct tipc_msg *hdr, bool uc);
 int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
 			 struct tipc_gap_ack_blks *ga,
-			 struct sk_buff_head *xmitq);
+			 struct sk_buff_head *xmitq,
+			 struct sk_buff_head *retrq);
 void tipc_link_build_bc_sync_msg(struct tipc_link *l,
 				 struct sk_buff_head *xmitq);
 void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6a49b3eeaae9..548207fdec15 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1772,7 +1772,7 @@ static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
 	struct tipc_link *ucl;
 	int rc;
 
-	rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);
+	rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr, xmitq);
 
 	if (rc & TIPC_LINK_DOWN_EVT) {
 		tipc_node_reset_links(n);
diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c
index 58ab3d6dcdce..97a6264a2993 100644
--- a/net/tipc/sysctl.c
+++ b/net/tipc/sysctl.c
@@ -36,7 +36,7 @@
 #include "core.h"
 #include "trace.h"
 #include "crypto.h"
-
+#include "bcast.h"
 #include <linux/sysctl.h>
 
 static struct ctl_table_header *tipc_ctl_hdr;
@@ -75,6 +75,13 @@ static struct ctl_table tipc_table[] = {
 		.extra1         = SYSCTL_ONE,
 	},
 #endif
+	{
+		.procname	= "bc_retruni",
+		.data		= &sysctl_tipc_bc_retruni,
+		.maxlen		= sizeof(sysctl_tipc_bc_retruni),
+		.mode		= 0644,
+		.proc_handler	= proc_doulongvec_minmax,
+	},
 	{}
 };
 
-- 
2.13.7



* [net-next 4/5] tipc: add support for broadcast rcv stats dumping
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
                   ` (2 preceding siblings ...)
  2020-05-26  9:38 ` [net-next 3/5] tipc: enable broadcast retrans via unicast Tuong Lien
@ 2020-05-26  9:38 ` Tuong Lien
  2020-05-26  9:38 ` [net-next 5/5] tipc: add test for Nagle algorithm effectiveness Tuong Lien
  2020-05-26 22:17 ` [net-next 0/5] tipc: add some improvements David Miller
  5 siblings, 0 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

This commit enables dumping the statistics of a broadcast-receiver link,
in the same way as for the traditional 'broadcast-link' (which is the
broadcast-sender link). The dump can be requested via netlink (e.g. with
the iproute2/tipc tool) by setting the link flag
'TIPC_NLA_LINK_BROADCAST' as the indicator.

The name of a broadcast-receiver link of a specific peer will be in the
format: 'broadcast-link:<peer-id>'.

For example:

Link <broadcast-link:1001002>
  Window:50 packets
  RX packets:7841 fragments:2408/440 bundles:0/0
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:0 defs:124 dups:0
  TX naks:21 acks:0 retrans:0
  Congestion link:0  Send queue max:0 avg:0
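
The naming scheme above can be sketched as follows. This is a
stand-alone illustration, not the kernel code: bc_rcv_link_name() and
is_bc_rcv_link_name() are hypothetical helpers, and the peer id is
assumed to be available already as a string.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define BCLINK_NAME "broadcast-link"

/* Build the link name: "broadcast-link:<peer-id>" for a receiver link,
 * or plain "broadcast-link" for the sender link (no peer id given).
 */
static void bc_rcv_link_name(char *out, size_t len, const char *peer_id)
{
	if (peer_id)
		snprintf(out, len, "%s:%s", BCLINK_NAME, peer_id);
	else
		snprintf(out, len, "%s", BCLINK_NAME);
}

/* True if the name has the receiver-link form, i.e. it starts with
 * "broadcast-link:" and carries a non-empty peer id after the colon.
 */
static bool is_bc_rcv_link_name(const char *name)
{
	size_t n = strlen(BCLINK_NAME);

	return !strncmp(name, BCLINK_NAME, n) &&
	       name[n] == ':' && name[n + 1] != '\0';
}
```

A name built this way is what would be passed in the netlink request
when resetting the statistics of one particular receiver link.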

In addition, the broadcast-receiver link statistics can be reset in the
usual way via netlink by specifying that link name in the command.

Note: 'tipc_link_name_ext()' is removed because the link name can now
be retrieved simply via 'l->name'.

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/bcast.c   |  6 ++---
 net/tipc/bcast.h   |  5 +++--
 net/tipc/link.c    | 65 +++++++++++++++++++++++++++---------------------------
 net/tipc/link.h    |  3 +--
 net/tipc/msg.c     |  9 ++++----
 net/tipc/msg.h     |  2 +-
 net/tipc/netlink.c |  2 +-
 net/tipc/node.c    | 61 +++++++++++++++++++++++++++++++++++++++++++-------
 net/tipc/trace.h   |  4 ++--
 9 files changed, 101 insertions(+), 56 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 50a16f8bebd9..383f87bc1061 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -563,10 +563,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
 		tipc_sk_rcv(net, inputq);
 }
 
-int tipc_bclink_reset_stats(struct net *net)
+int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
 {
-	struct tipc_link *l = tipc_bc_sndlink(net);
-
 	if (!l)
 		return -ENOPROTOOPT;
 
@@ -694,7 +692,7 @@ int tipc_bcast_init(struct net *net)
 	tn->bcbase = bb;
 	spin_lock_init(&tipc_net(net)->bclock);
 
-	if (!tipc_link_bc_create(net, 0, 0,
+	if (!tipc_link_bc_create(net, 0, 0, NULL,
 				 FB_MTU,
 				 BCLINK_WIN_DEFAULT,
 				 BCLINK_WIN_DEFAULT,
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 97d3cf9d3e4d..4240c95188b1 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,9 +96,10 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr,
 			struct sk_buff_head *retrq);
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
+			struct tipc_link *bcl);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
-int tipc_bclink_reset_stats(struct net *net);
+int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
 
 u32 tipc_bcast_get_broadcast_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index af352391e2ab..ee3b8d0576b8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -539,7 +539,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
  *
  * Returns true if link was created, otherwise false
  */
-bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
+bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, u8 *peer_id,
 			 int mtu, u32 min_win, u32 max_win, u16 peer_caps,
 			 struct sk_buff_head *inputq,
 			 struct sk_buff_head *namedq,
@@ -554,7 +554,18 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
 		return false;
 
 	l = *link;
-	strcpy(l->name, tipc_bclink_name);
+	if (peer_id) {
+		char peer_str[NODE_ID_STR_LEN] = {0,};
+
+		tipc_nodeid2string(peer_str, peer_id);
+		if (strlen(peer_str) > 16)
+			sprintf(peer_str, "%x", peer);
+		/* Broadcast receiver link name: "broadcast-link:<peer>" */
+		snprintf(l->name, sizeof(l->name), "%s:%s", tipc_bclink_name,
+			 peer_str);
+	} else {
+		strcpy(l->name, tipc_bclink_name);
+	}
 	trace_tipc_link_reset(l, TIPC_DUMP_ALL, "bclink created!");
 	tipc_link_reset(l);
 	l->state = LINK_RESET;
@@ -1412,11 +1423,8 @@ static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
 			gacks[n].ack = htons(expect - 1);
 			gacks[n].gap = htons(seqno - expect);
 			if (++n >= MAX_GAP_ACK_BLKS / 2) {
-				char buf[TIPC_MAX_LINK_NAME];
-
 				pr_info_ratelimited("Gacks on %s: %d, ql: %d!\n",
-						    tipc_link_name_ext(l, buf),
-						    n,
+						    l->name, n,
 						    skb_queue_len(&l->deferdq));
 				return n;
 			}
@@ -1587,6 +1595,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
 			_skb->priority = TC_PRIO_CONTROL;
 			__skb_queue_tail(xmitq, _skb);
 			l->stats.retransmitted++;
+			if (!is_uc)
+				r->stats.retransmitted++;
 			*retransmitted = true;
 			/* Increase actual retrans counter & mark first time */
 			if (!TIPC_SKB_CB(skb)->retr_cnt++)
@@ -1753,7 +1763,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 
 		/* Defer delivery if sequence gap */
 		if (unlikely(seqno != rcv_nxt)) {
-			__tipc_skb_queue_sorted(defq, seqno, skb);
+			if (!__tipc_skb_queue_sorted(defq, seqno, skb))
+				l->stats.duplicates++;
 			rc |= tipc_link_build_nack_msg(l, xmitq);
 			break;
 		}
@@ -1787,15 +1798,15 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 				      int tolerance, int priority,
 				      struct sk_buff_head *xmitq)
 {
+	struct tipc_mon_state *mstate = &l->mon_state;
+	struct sk_buff_head *dfq = &l->deferdq;
 	struct tipc_link *bcl = l->bc_rcvlink;
-	struct sk_buff *skb;
 	struct tipc_msg *hdr;
-	struct sk_buff_head *dfq = &l->deferdq;
+	struct sk_buff *skb;
 	bool node_up = link_is_up(bcl);
-	struct tipc_mon_state *mstate = &l->mon_state;
+	u16 glen = 0, bc_rcvgap = 0;
 	int dlen = 0;
 	void *data;
-	u16 glen = 0;
 
 	/* Don't send protocol message during reset or link failover */
 	if (tipc_link_is_blocked(l))
@@ -1833,7 +1844,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		if (l->peer_caps & TIPC_LINK_PROTO_SEQNO)
 			msg_set_seqno(hdr, l->snd_nxt_state++);
 		msg_set_seq_gap(hdr, rcvgap);
-		msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
+		bc_rcvgap = link_bc_rcv_gap(bcl);
+		msg_set_bc_gap(hdr, bc_rcvgap);
 		msg_set_probe(hdr, probe);
 		msg_set_is_keepalive(hdr, probe || probe_reply);
 		if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
@@ -1858,6 +1870,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		l->stats.sent_probes++;
 	if (rcvgap)
 		l->stats.sent_nacks++;
+	if (bc_rcvgap)
+		bcl->stats.sent_nacks++;
 	skb->priority = TC_PRIO_CONTROL;
 	__skb_queue_tail(xmitq, skb);
 	trace_tipc_proto_build(skb, false, l->name);
@@ -2358,8 +2372,6 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	if (!l->bc_peer_is_up)
 		return rc;
 
-	l->stats.recv_nacks++;
-
 	/* Ignore if peers_snd_nxt goes beyond receive window */
 	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
 		return rc;
@@ -2410,6 +2422,11 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 	if (!link_is_up(r) || !r->bc_peer_is_up)
 		return 0;
 
+	if (gap) {
+		l->stats.recv_nacks++;
+		r->stats.recv_nacks++;
+	}
+
 	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
 		return 0;
 
@@ -2721,16 +2738,15 @@ static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
 	return -EMSGSIZE;
 }
 
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
+			struct tipc_link *bcl)
 {
 	int err;
 	void *hdr;
 	struct nlattr *attrs;
 	struct nlattr *prop;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
 	u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
-	struct tipc_link *bcl = tn->bcl;
 
 	if (!bcl)
 		return 0;
@@ -2817,21 +2833,6 @@ void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit)
 	l->abort_limit = limit;
 }
 
-char *tipc_link_name_ext(struct tipc_link *l, char *buf)
-{
-	if (!l)
-		scnprintf(buf, TIPC_MAX_LINK_NAME, "null");
-	else if (link_is_bc_sndlink(l))
-		scnprintf(buf, TIPC_MAX_LINK_NAME, "broadcast-sender");
-	else if (link_is_bc_rcvlink(l))
-		scnprintf(buf, TIPC_MAX_LINK_NAME,
-			  "broadcast-receiver, peer %x", l->addr);
-	else
-		memcpy(buf, l->name, TIPC_MAX_LINK_NAME);
-
-	return buf;
-}
-
 /**
  * tipc_link_dump - dump TIPC link data
  * @l: tipc link to be dumped
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 4d0768cf91d5..fc07232c9a12 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -80,7 +80,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 		      struct sk_buff_head *inputq,
 		      struct sk_buff_head *namedq,
 		      struct tipc_link **link);
-bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
+bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, u8 *peer_id,
 			 int mtu, u32 min_win, u32 max_win, u16 peer_caps,
 			 struct sk_buff_head *inputq,
 			 struct sk_buff_head *namedq,
@@ -111,7 +111,6 @@ u16 tipc_link_rcv_nxt(struct tipc_link *l);
 u16 tipc_link_acked(struct tipc_link *l);
 u32 tipc_link_id(struct tipc_link *l);
 char *tipc_link_name(struct tipc_link *l);
-char *tipc_link_name_ext(struct tipc_link *l, char *buf);
 u32 tipc_link_state(struct tipc_link *l);
 char tipc_link_plane(struct tipc_link *l);
 int tipc_link_prio(struct tipc_link *l);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 4d0e0bdd997b..c69fb99163fc 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -825,19 +825,19 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
  * @seqno: sequence number of buffer to add
  * @skb: buffer to add
  */
-void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 			     struct sk_buff *skb)
 {
 	struct sk_buff *_skb, *tmp;
 
 	if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
 		__skb_queue_head(list, skb);
-		return;
+		return true;
 	}
 
 	if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
 		__skb_queue_tail(list, skb);
-		return;
+		return true;
 	}
 
 	skb_queue_walk_safe(list, _skb, tmp) {
@@ -846,9 +846,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 		if (seqno == buf_seqno(_skb))
 			break;
 		__skb_queue_before(list, _skb, skb);
-		return;
+		return true;
 	}
 	kfree_skb(skb);
+	return false;
 }
 
 void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ca5f8689a33b..cd4281779468 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1145,7 +1145,7 @@ bool tipc_msg_assemble(struct sk_buff_head *list);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
 bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
 			struct sk_buff_head *cpy);
-void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 			     struct sk_buff *skb);
 bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy);
 
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index bb9862410e68..c4aee6247d55 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -188,7 +188,7 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
 	},
 	{
 		.cmd	= TIPC_NL_LINK_GET,
-		.validate = GENL_DONT_VALIDATE_STRICT | GENL_DONT_VALIDATE_DUMP,
+		.validate = GENL_DONT_VALIDATE_STRICT,
 		.doit   = tipc_nl_node_get_link,
 		.dumpit	= tipc_nl_node_dump_link,
 	},
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 548207fdec15..0312fb181d94 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1138,7 +1138,7 @@ void tipc_node_check_dest(struct net *net, u32 addr,
 	if (unlikely(!n->bc_entry.link)) {
 		snd_l = tipc_bc_sndlink(net);
 		if (!tipc_link_bc_create(net, tipc_own_addr(net),
-					 addr, U16_MAX,
+					 addr, peer_id, U16_MAX,
 					 tipc_link_min_win(snd_l),
 					 tipc_link_max_win(snd_l),
 					 n->capabilities,
@@ -2435,7 +2435,7 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
 		return -ENOMEM;
 
 	if (strcmp(name, tipc_bclink_name) == 0) {
-		err = tipc_nl_add_bc_link(net, &msg);
+		err = tipc_nl_add_bc_link(net, &msg, tipc_net(net)->bcl);
 		if (err)
 			goto err_free;
 	} else {
@@ -2479,6 +2479,7 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 	struct tipc_node *node;
 	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
 	struct net *net = sock_net(skb->sk);
+	struct tipc_net *tn = tipc_net(net);
 	struct tipc_link_entry *le;
 
 	if (!info->attrs[TIPC_NLA_LINK])
@@ -2495,11 +2496,26 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 
 	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
 
-	if (strcmp(link_name, tipc_bclink_name) == 0) {
-		err = tipc_bclink_reset_stats(net);
+	err = -EINVAL;
+	if (!strcmp(link_name, tipc_bclink_name)) {
+		err = tipc_bclink_reset_stats(net, tipc_bc_sndlink(net));
 		if (err)
 			return err;
 		return 0;
+	} else if (strstr(link_name, tipc_bclink_name)) {
+		rcu_read_lock();
+		list_for_each_entry_rcu(node, &tn->node_list, list) {
+			tipc_node_read_lock(node);
+			link = node->bc_entry.link;
+			if (link && !strcmp(link_name, tipc_link_name(link))) {
+				err = tipc_bclink_reset_stats(net, link);
+				tipc_node_read_unlock(node);
+				break;
+			}
+			tipc_node_read_unlock(node);
+		}
+		rcu_read_unlock();
+		return err;
 	}
 
 	node = tipc_node_find_by_name(net, link_name, &bearer_id);
@@ -2523,7 +2539,8 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 
 /* Caller should hold node lock  */
 static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
-				    struct tipc_node *node, u32 *prev_link)
+				    struct tipc_node *node, u32 *prev_link,
+				    bool bc_link)
 {
 	u32 i;
 	int err;
@@ -2539,6 +2556,14 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
 		if (err)
 			return err;
 	}
+
+	if (bc_link) {
+		*prev_link = i;
+		err = tipc_nl_add_bc_link(net, msg, node->bc_entry.link);
+		if (err)
+			return err;
+	}
+
 	*prev_link = 0;
 
 	return 0;
@@ -2547,17 +2572,36 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
 int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	struct net *net = sock_net(skb->sk);
+	struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
+	struct nlattr *link[TIPC_NLA_LINK_MAX + 1];
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *node;
 	struct tipc_nl_msg msg;
 	u32 prev_node = cb->args[0];
 	u32 prev_link = cb->args[1];
 	int done = cb->args[2];
+	bool bc_link = cb->args[3];
 	int err;
 
 	if (done)
 		return 0;
 
+	if (!prev_node) {
+		/* Check if broadcast-receiver links dumping is needed */
+		if (attrs && attrs[TIPC_NLA_LINK]) {
+			err = nla_parse_nested_deprecated(link,
+							  TIPC_NLA_LINK_MAX,
+							  attrs[TIPC_NLA_LINK],
+							  tipc_nl_link_policy,
+							  NULL);
+			if (unlikely(err))
+				return err;
+			if (unlikely(!link[TIPC_NLA_LINK_BROADCAST]))
+				return -EINVAL;
+			bc_link = true;
+		}
+	}
+
 	msg.skb = skb;
 	msg.portid = NETLINK_CB(cb->skb).portid;
 	msg.seq = cb->nlh->nlmsg_seq;
@@ -2581,7 +2625,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 						 list) {
 			tipc_node_read_lock(node);
 			err = __tipc_nl_add_node_links(net, &msg, node,
-						       &prev_link);
+						       &prev_link, bc_link);
 			tipc_node_read_unlock(node);
 			if (err)
 				goto out;
@@ -2589,14 +2633,14 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 			prev_node = node->addr;
 		}
 	} else {
-		err = tipc_nl_add_bc_link(net, &msg);
+		err = tipc_nl_add_bc_link(net, &msg, tn->bcl);
 		if (err)
 			goto out;
 
 		list_for_each_entry_rcu(node, &tn->node_list, list) {
 			tipc_node_read_lock(node);
 			err = __tipc_nl_add_node_links(net, &msg, node,
-						       &prev_link);
+						       &prev_link, bc_link);
 			tipc_node_read_unlock(node);
 			if (err)
 				goto out;
@@ -2611,6 +2655,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 	cb->args[0] = prev_node;
 	cb->args[1] = prev_link;
 	cb->args[2] = done;
+	cb->args[3] = bc_link;
 
 	return skb->len;
 }
diff --git a/net/tipc/trace.h b/net/tipc/trace.h
index e7535ab75255..04af83f0500c 100644
--- a/net/tipc/trace.h
+++ b/net/tipc/trace.h
@@ -255,7 +255,7 @@ DECLARE_EVENT_CLASS(tipc_link_class,
 
 	TP_fast_assign(
 		__assign_str(header, header);
-		tipc_link_name_ext(l, __entry->name);
+		memcpy(__entry->name, tipc_link_name(l), TIPC_MAX_LINK_NAME);
 		tipc_link_dump(l, dqueues, __get_str(buf));
 	),
 
@@ -295,7 +295,7 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
 	),
 
 	TP_fast_assign(
-		tipc_link_name_ext(r, __entry->name);
+		memcpy(__entry->name, tipc_link_name(r), TIPC_MAX_LINK_NAME);
 		__entry->from = f;
 		__entry->to = t;
 		__entry->len = skb_queue_len(tq);
-- 
2.13.7



* [net-next 5/5] tipc: add test for Nagle algorithm effectiveness
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
                   ` (3 preceding siblings ...)
  2020-05-26  9:38 ` [net-next 4/5] tipc: add support for broadcast rcv stats dumping Tuong Lien
@ 2020-05-26  9:38 ` Tuong Lien
  2020-05-26 22:17 ` [net-next 0/5] tipc: add some improvements David Miller
  5 siblings, 0 replies; 7+ messages in thread
From: Tuong Lien @ 2020-05-26  9:38 UTC (permalink / raw)
  To: davem, jmaloy, maloy, ying.xue, netdev; +Cc: tipc-discussion

When streaming in Nagle mode, we try to bundle as many small user
messages as possible while there is one outstanding buffer, i.e. one not
yet ACKed by the receiving side, which helps boost the overall
throughput. The algorithm's effectiveness therefore depends on when the
Nagle ACK arrives, i.e. on the network latency (RTT) compared to the
user's message sending rate.

In a bad case, where the user's sending rate is low or the network
latency is small, there will not be many bundles, so requesting a Nagle
ACK or waiting for it is pointless.
For example: if a user sends a message every 100ms and the RTT is 50ms,
then each message requires one Nagle ACK, yet only one user message is
sent each time, without any bundling.

In a better case (e.g. RTT = 300ms), a few messages may accumulate for
bundling, but if the user sends medium-sized messages there is no gain
at all: with MTU = 1500, 3 x 1000-byte data messages will still result
in 3 bundles.
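
The arithmetic behind this example can be checked with a small sketch.
The helper bundles_needed() is hypothetical and ignores header overhead
and fragmentation, which is an assumption made only to keep the numbers
simple.

```c
/* Number of packets needed to carry n messages of msg_size bytes each
 * when whole messages are packed into packets of at most mtu bytes.
 * Simplifications (assumptions): no header overhead, and a message
 * larger than the MTU simply counts as one packet per message.
 */
static unsigned int bundles_needed(unsigned int n, unsigned int msg_size,
				   unsigned int mtu)
{
	unsigned int per_pkt;

	if (!n)
		return 0;
	if (!msg_size || msg_size > mtu)
		return n;

	per_pkt = mtu / msg_size;		/* whole messages per packet */
	return (n + per_pkt - 1) / per_pkt;	/* round up */
}
```

For 3 x 1000-byte messages and MTU = 1500 only one message fits per
packet, so 3 packets are still needed; for 30 x 100-byte messages the
same MTU gives 15 messages per packet and only 2 packets.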

When Nagle is ineffective, the delay it adds to user message sending is
simply wasted; the messages might as well have been sent directly.

Besides, adding Nagle ACKs will consume some processor load on both the
sending and receiving sides.

This commit adds a test on the effectiveness of the Nagle algorithm for
an individual connection in the network on which it actually runs.
Specifically, upon receipt of a Nagle ACK we compare the number of
bundles in the backlog queue to the number of user messages which would
have been sent directly without Nagle. If the ratio is good (e.g. >= 2
messages per bundle), Nagle mode is kept for further message sending.
Otherwise, we leave Nagle mode and put a 'penalty' on the connection, so
it will have to send more 'one-way' messages before being able to
re-enter Nagle mode.
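
A simplified model of this test is sketched below. The constants and the
field names msg_acc, pkt_cnt and nagle_start follow the diff further
down; struct nagle_state and nagle_check() are hypothetical names, and
the socket and queue handling of the real code is left out.

```c
#include <stdbool.h>

#define NAGLE_START_INIT	4
#define NAGLE_START_MAX		1024

/* Hypothetical container for the per-connection counters */
struct nagle_state {
	unsigned int msg_acc;		/* user messages accumulated */
	unsigned int pkt_cnt;		/* packets (bundles) they ended up in */
	unsigned int nagle_start;	/* one-way messages needed to re-enter */
};

/* Evaluate Nagle effectiveness on receipt of a Nagle ACK.
 * Returns true if Nagle mode is kept. Otherwise the re-entry threshold
 * is doubled (the 'penalty'); the doubling stops once the threshold
 * has reached NAGLE_START_MAX.
 */
static bool nagle_check(struct nagle_state *s)
{
	if (!s->pkt_cnt || s->msg_acc / s->pkt_cnt < 2) {
		if (s->nagle_start < NAGLE_START_MAX)
			s->nagle_start *= 2;
		return false;
	}

	s->nagle_start = NAGLE_START_INIT;
	return true;
}
```

With 10 messages carried in 2 bundles the ratio is 5, so Nagle is kept
and the threshold returns to its initial value; with 3 messages in 3
bundles the ratio is 1, so Nagle is left and the threshold doubles.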

In addition, the 'ack-required' bit is now set only when really needed,
so that the number of Nagle ACKs is reduced during Nagle mode.

Testing with a benchmark tool showed that, with the patch, there was not
much difference in throughput for small messages: the tool sends
messages continuously without a break, so Nagle still takes effect.

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/msg.c    |  3 ---
 net/tipc/msg.h    | 14 ++++++++++--
 net/tipc/socket.c | 64 ++++++++++++++++++++++++++++++++++++++++++++-----------
 3 files changed, 64 insertions(+), 17 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index c69fb99163fc..23809039dda1 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -235,9 +235,6 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
 			msg_set_size(hdr, MIN_H_SIZE);
 			__skb_queue_tail(txq, skb);
 			total += 1;
-			if (prev)
-				msg_set_ack_required(buf_msg(prev), 0);
-			msg_set_ack_required(hdr, 1);
 		}
 		hdr = buf_msg(skb);
 		curr = msg_blocks(hdr);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index cd4281779468..58660d56bc83 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -340,9 +340,19 @@ static inline int msg_ack_required(struct tipc_msg *m)
 	return msg_bits(m, 0, 18, 1);
 }
 
-static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
+static inline void msg_set_ack_required(struct tipc_msg *m)
 {
-	msg_set_bits(m, 0, 18, 1, d);
+	msg_set_bits(m, 0, 18, 1, 1);
+}
+
+static inline int msg_nagle_ack(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 1);
+}
+
+static inline void msg_set_nagle_ack(struct tipc_msg *m)
+{
+	msg_set_bits(m, 0, 18, 1, 1);
 }
 
 static inline bool msg_is_rcast(struct tipc_msg *m)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e370ad0edd76..d6b67d07d22e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -48,6 +48,8 @@
 #include "group.h"
 #include "trace.h"
 
+#define NAGLE_START_INIT	4
+#define NAGLE_START_MAX		1024
 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
 #define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
 #define TIPC_FWD_MSG		1
@@ -119,7 +121,10 @@ struct tipc_sock {
 	struct rcu_head rcu;
 	struct tipc_group *group;
 	u32 oneway;
+	u32 nagle_start;
 	u16 snd_backlog;
+	u16 msg_acc;
+	u16 pkt_cnt;
 	bool expect_ack;
 	bool nodelay;
 	bool group_is_open;
@@ -143,7 +148,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
-static void tipc_sk_push_backlog(struct tipc_sock *tsk);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -474,6 +479,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
 	tsk->maxnagle = 0;
+	tsk->nagle_start = NAGLE_START_INIT;
 	INIT_LIST_HEAD(&tsk->publications);
 	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
@@ -541,7 +547,7 @@ static void __tipc_shutdown(struct socket *sock, int error)
 					    !tsk_conn_cong(tsk)));
 
 	/* Push out delayed messages if in Nagle mode */
-	tipc_sk_push_backlog(tsk);
+	tipc_sk_push_backlog(tsk, false);
 	/* Remove pending SYN */
 	__skb_queue_purge(&sk->sk_write_queue);
 
@@ -1252,14 +1258,37 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 /* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
  *                         when socket is in Nagle mode
  */
-static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
 {
 	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+	struct sk_buff *skb = skb_peek_tail(txq);
 	struct net *net = sock_net(&tsk->sk);
 	u32 dnode = tsk_peer_node(tsk);
-	struct sk_buff *skb = skb_peek(txq);
 	int rc;
 
+	if (nagle_ack) {
+		tsk->pkt_cnt += skb_queue_len(txq);
+		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
+			tsk->oneway = 0;
+			if (tsk->nagle_start < NAGLE_START_MAX)
+				tsk->nagle_start *= 2;
+			tsk->expect_ack = false;
+			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
+				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
+				 tsk->nagle_start);
+		} else {
+			tsk->nagle_start = NAGLE_START_INIT;
+			if (skb) {
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+			} else {
+				tsk->expect_ack = false;
+			}
+		}
+		tsk->msg_acc = 0;
+		tsk->pkt_cnt = 0;
+	}
+
 	if (!skb || tsk->cong_link_cnt)
 		return;
 
@@ -1267,9 +1296,10 @@ static void tipc_sk_push_backlog(struct tipc_sock *tsk)
 	if (msg_is_syn(buf_msg(skb)))
 		return;
 
+	if (tsk->msg_acc)
+		tsk->pkt_cnt += skb_queue_len(txq);
 	tsk->snt_unacked += tsk->snd_backlog;
 	tsk->snd_backlog = 0;
-	tsk->expect_ack = true;
 	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 	if (rc == -ELINKCONG)
 		tsk->cong_link_cnt = 1;
@@ -1322,8 +1352,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 		return;
 	} else if (mtyp == CONN_ACK) {
 		was_cong = tsk_conn_cong(tsk);
-		tsk->expect_ack = false;
-		tipc_sk_push_backlog(tsk);
+		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
 		tsk->snt_unacked -= msg_conn_ack(hdr);
 		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 			tsk->snd_win = msg_adv_win(hdr);
@@ -1516,6 +1545,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
+	struct sk_buff *skb;
 	u32 dnode = tsk_peer_node(tsk);
 	int maxnagle = tsk->maxnagle;
 	int maxpkt = tsk->max_pkt;
@@ -1544,17 +1574,25 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 			break;
 		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
 		blocks = tsk->snd_backlog;
-		if (tsk->oneway++ >= 4 && send <= maxnagle) {
+		if (tsk->oneway++ >= tsk->nagle_start && send <= maxnagle) {
 			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
 			if (unlikely(rc < 0))
 				break;
 			blocks += rc;
+			tsk->msg_acc++;
 			if (blocks <= 64 && tsk->expect_ack) {
 				tsk->snd_backlog = blocks;
 				sent += send;
 				break;
+			} else if (blocks > 64) {
+				tsk->pkt_cnt += skb_queue_len(txq);
+			} else {
+				skb = skb_peek_tail(txq);
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+				tsk->msg_acc = 0;
+				tsk->pkt_cnt = 0;
 			}
-			tsk->expect_ack = true;
 		} else {
 			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
 			if (unlikely(rc != send))
@@ -2091,7 +2129,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		smp_wmb();
 		tsk->cong_link_cnt--;
 		wakeup = true;
-		tipc_sk_push_backlog(tsk);
+		tipc_sk_push_backlog(tsk, false);
 		break;
 	case GROUP_PROTOCOL:
 		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2180,7 +2218,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
 		return false;
 	case TIPC_ESTABLISHED:
 		if (!skb_queue_empty(&sk->sk_write_queue))
-			tipc_sk_push_backlog(tsk);
+			tipc_sk_push_backlog(tsk, false);
 		/* Accept only connection-based messages sent by peer */
 		if (likely(con_msg && !err && pport == oport &&
 			   pnode == onode)) {
@@ -2188,8 +2226,10 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
 				struct sk_buff *skb;
 
 				skb = tipc_sk_build_ack(tsk);
-				if (skb)
+				if (skb) {
+					msg_set_nagle_ack(buf_msg(skb));
 					__skb_queue_tail(xmitq, skb);
+				}
 			}
 			return true;
 		}
-- 
2.13.7
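
For readers following the socket.c hunks above, the effectiveness test applied when a Nagle ACK arrives can be sketched in isolation. This is a minimal userspace illustration, not the kernel code: `struct nagle_state`, `nagle_on_ack()`, and the `queued_pkts` and `have_tail` parameters are hypothetical names introduced here, while the constants and field names mirror the ones the patch adds to `struct tipc_sock`. The sketch assumes the write queue length and the presence of a tail buffer are passed in by the caller, and it leaves out the header bit that `msg_set_ack_required()` sets on the tail buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NAGLE_START_INIT 4
#define NAGLE_START_MAX  1024

/* Hypothetical stand-in for the Nagle-related fields of struct tipc_sock. */
struct nagle_state {
	uint32_t oneway;      /* consecutive one-way sends seen so far */
	uint32_t nagle_start; /* one-way sends required before Nagle kicks in */
	uint16_t msg_acc;     /* user messages accumulated since the last ACK */
	uint16_t pkt_cnt;     /* packets those messages were bundled into */
	bool expect_ack;      /* true while an ACK-required packet is outstanding */
};

/*
 * Evaluate one Nagle round when a Nagle ACK arrives.
 * queued_pkts: packets still sitting in the write queue (assumed input).
 * have_tail:   whether the write queue has a tail buffer (assumed input).
 * Returns true if bundling averaged at least 2 messages per packet.
 */
static bool nagle_on_ack(struct nagle_state *s, uint16_t queued_pkts,
			 bool have_tail)
{
	bool effective;

	assert(s->nagle_start >= NAGLE_START_INIT);

	s->pkt_cnt += queued_pkts;
	effective = s->pkt_cnt && s->msg_acc / s->pkt_cnt >= 2;

	if (!effective) {
		/* Bad round: leave Nagle mode and back off exponentially. */
		s->oneway = 0;
		if (s->nagle_start < NAGLE_START_MAX)
			s->nagle_start *= 2;
		s->expect_ack = false;
	} else {
		/* Good round: reset the threshold and keep bundling. */
		s->nagle_start = NAGLE_START_INIT;
		s->expect_ack = have_tail;
	}

	/* Counters restart for the next round either way. */
	s->msg_acc = 0;
	s->pkt_cnt = 0;
	return effective;
}
```

Under these assumptions, a round that bundled 3 messages into 2 packets counts as ineffective and doubles `nagle_start` from 4 to 8, whereas 10 messages in 2 packets resets it to `NAGLE_START_INIT`.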



* Re: [net-next 0/5] tipc: add some improvements
  2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
                   ` (4 preceding siblings ...)
  2020-05-26  9:38 ` [net-next 5/5] tipc: add test for Nagle algorithm effectiveness Tuong Lien
@ 2020-05-26 22:17 ` David Miller
  5 siblings, 0 replies; 7+ messages in thread
From: David Miller @ 2020-05-26 22:17 UTC (permalink / raw)
  To: tuong.t.lien; +Cc: jmaloy, maloy, ying.xue, netdev, tipc-discussion

From: Tuong Lien <tuong.t.lien@dektech.com.au>
Date: Tue, 26 May 2020 16:38:33 +0700

> This series adds some improvements to TIPC.
> 
> The first patch improves the TIPC broadcast's performance with the 'Gap
> ACK blocks' mechanism similar to unicast before, while the others give
> support on tracing & statistics for broadcast links, and an alternative
> to carry broadcast retransmissions via unicast which might be useful in
> some cases.
> 
> Besides, the Nagle algorithm can now automatically 'adjust' itself
> depending on the specific network condition a stream connection runs by
> the last patch.

Series applied, thanks.


Thread overview: 7+ messages
2020-05-26  9:38 [net-next 0/5] tipc: add some improvements Tuong Lien
2020-05-26  9:38 ` [net-next 1/5] tipc: introduce Gap ACK blocks for broadcast link Tuong Lien
2020-05-26  9:38 ` [net-next 2/5] tipc: add back link trace events Tuong Lien
2020-05-26  9:38 ` [net-next 3/5] tipc: enable broadcast retrans via unicast Tuong Lien
2020-05-26  9:38 ` [net-next 4/5] tipc: add support for broadcast rcv stats dumping Tuong Lien
2020-05-26  9:38 ` [net-next 5/5] tipc: add test for Nagle algorithm effectiveness Tuong Lien
2020-05-26 22:17 ` [net-next 0/5] tipc: add some improvements David Miller
