All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH net-next 0/7] tipc: some optimizations and impovements
@ 2015-03-13 14:27 Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 1/7] tipc: add framework for node capabilities exchange Jon Maloy
                   ` (7 more replies)
  0 siblings, 8 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The commits in this series contain some relatively simple changes
that lead to better throughput across TIPC connections. We also make
changes to the implementation of link transmission queueing and priority
handling, in order to make the code more comprehensible and maintainable.


Jon Maloy (7):
  tipc: add framework for node capabilities exchange
  tipc: move message validation function to msg.c
  tipc: eliminate unnecessary linearization of incoming buffers
  tipc: extract bundled buffers by cloning instead of copying
  tipc: eliminate unnecessary call to broadcast ack function
  tipc: split link outqueue
  tipc: clean up handling of message priorities

 net/tipc/bcast.c    |  53 ++++----
 net/tipc/discover.c |   3 +
 net/tipc/link.c     | 352 +++++++++++++++++++---------------------------------
 net/tipc/link.h     |  17 ++-
 net/tipc/msg.c      | 123 ++++++++++++------
 net/tipc/msg.h      |  87 +++++++------
 net/tipc/node.c     |   4 +-
 net/tipc/node.h     |   6 +-
 8 files changed, 303 insertions(+), 342 deletions(-)

-- 
1.9.1

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH net-next 1/7] tipc: add framework for node capabilities exchange
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 2/7] tipc: move message validation function to msg.c Jon Maloy
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The TIPC protocol spec has defined a 13 bit capability bitmap in
the neighbor discovery header, as a means to maintain compatibility
between different code and protocol generations. Until now this field
has been unused.

We now introduce the basic framework for exchanging capabilities
between nodes at first contact. After exchange, a peer node's
capabilities are stored as a 16 bit bitmap in struct tipc_node.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/discover.c |  3 +++
 net/tipc/msg.h      | 11 ++++++++++-
 net/tipc/node.h     |  4 +++-
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 5967506..169f3dd 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -89,6 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
 		      MAX_H_SIZE, dest_domain);
 	msg_set_non_seq(msg, 1);
 	msg_set_node_sig(msg, tn->random);
+	msg_set_node_capabilities(msg, 0);
 	msg_set_dest_domain(msg, dest_domain);
 	msg_set_bc_netid(msg, tn->net_id);
 	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
@@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 	u32 net_id = msg_bc_netid(msg);
 	u32 mtyp = msg_type(msg);
 	u32 signature = msg_node_sig(msg);
+	u16 caps = msg_node_capabilities(msg);
 	bool addr_match = false;
 	bool sign_match = false;
 	bool link_up = false;
@@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 	if (!node)
 		return;
 	tipc_node_lock(node);
+	node->capabilities = caps;
 	link = node->links[bearer->identity];
 
 	/* Prepare to validate requesting node's signature and media address */
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index fa16784..7cece64 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -510,7 +510,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define DSC_REQ_MSG		0
 #define DSC_RESP_MSG		1
 
-
 /*
  * Word 1
  */
@@ -534,6 +533,16 @@ static inline void msg_set_node_sig(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 1, 0, 0xffff, n);
 }
 
+static inline u32 msg_node_capabilities(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 15, 0x1fff);
+}
+
+static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
+{
+	msg_set_bits(m, 1, 15, 0x1fff, n);
+}
+
 
 /*
  * Word 2
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 3d18c66..f78be64 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -106,6 +106,7 @@ struct tipc_node_bclink {
  * @list: links to adjacent nodes in sorted list of cluster's nodes
  * @working_links: number of working links to node (both active and standby)
  * @link_cnt: number of links to node
+ * @capabilities: bitmap, indicating peer node's functional capabilities
  * @signature: node instance identifier
  * @link_id: local and remote bearer ids of changing link, if any
  * @publ_list: list of publications
@@ -125,7 +126,8 @@ struct tipc_node {
 	struct tipc_node_bclink bclink;
 	struct list_head list;
 	int link_cnt;
-	int working_links;
+	u16 working_links;
+	u16 capabilities;
 	u32 signature;
 	u32 link_id;
 	struct list_head publ_list;
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 2/7] tipc: move message validation function to msg.c
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 1/7] tipc: add framework for node capabilities exchange Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 3/7] tipc: eliminate unnecessary linearization of incoming buffers Jon Maloy
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

The function link_buf_validate() is in reality re-entrant and context
independent, and will in later commits be called from several locations.
Therefore, we move it to msg.c, make it outline and rename the it to
tipc_msg_validate().

We also make some cosmetic changes to the function.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c | 58 +--------------------------------------------------------
 net/tipc/msg.c  | 42 ++++++++++++++++++++++++++++++++++++++++-
 net/tipc/msg.h  |  5 +++--
 3 files changed, 45 insertions(+), 60 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 98609fd..944c8c6 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1048,61 +1048,6 @@ static void link_retrieve_defq(struct tipc_link *link,
 }
 
 /**
- * link_recv_buf_validate - validate basic format of received message
- *
- * This routine ensures a TIPC message has an acceptable header, and at least
- * as much data as the header indicates it should.  The routine also ensures
- * that the entire message header is stored in the main fragment of the message
- * buffer, to simplify future access to message header fields.
- *
- * Note: Having extra info present in the message header or data areas is OK.
- * TIPC will ignore the excess, under the assumption that it is optional info
- * introduced by a later release of the protocol.
- */
-static int link_recv_buf_validate(struct sk_buff *buf)
-{
-	static u32 min_data_hdr_size[8] = {
-		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
-		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
-		};
-
-	struct tipc_msg *msg;
-	u32 tipc_hdr[2];
-	u32 size;
-	u32 hdr_size;
-	u32 min_hdr_size;
-
-	/* If this packet comes from the defer queue, the skb has already
-	 * been validated
-	 */
-	if (unlikely(TIPC_SKB_CB(buf)->deferred))
-		return 1;
-
-	if (unlikely(buf->len < MIN_H_SIZE))
-		return 0;
-
-	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
-	if (msg == NULL)
-		return 0;
-
-	if (unlikely(msg_version(msg) != TIPC_VERSION))
-		return 0;
-
-	size = msg_size(msg);
-	hdr_size = msg_hdr_sz(msg);
-	min_hdr_size = msg_isdata(msg) ?
-		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
-
-	if (unlikely((hdr_size < min_hdr_size) ||
-		     (size < hdr_size) ||
-		     (buf->len < size) ||
-		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
-		return 0;
-
-	return pskb_may_pull(buf, hdr_size);
-}
-
-/**
  * tipc_rcv - process TIPC packets/messages arriving from off-node
  * @net: the applicable net namespace
  * @skb: TIPC packet
@@ -1127,7 +1072,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 
 	while ((skb = __skb_dequeue(&head))) {
 		/* Ensure message is well-formed */
-		if (unlikely(!link_recv_buf_validate(skb)))
+		if (unlikely(!tipc_msg_validate(skb)))
 			goto discard;
 
 		/* Ensure message data is a single contiguous unit */
@@ -1398,7 +1343,6 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 
 	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
 		l_ptr->stats.deferred_recv++;
-		TIPC_SKB_CB(buf)->deferred = true;
 		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b6eb90c..1364ceb 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.c: TIPC message header routines
  *
- * Copyright (c) 2000-2006, 2014, Ericsson AB
+ * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -181,6 +181,46 @@ err:
 	return 0;
 }
 
+/* tipc_msg_validate - validate basic format of received message
+ *
+ * This routine ensures a TIPC message has an acceptable header, and at least
+ * as much data as the header indicates it should.  The routine also ensures
+ * that the entire message header is stored in the main fragment of the message
+ * buffer, to simplify future access to message header fields.
+ *
+ * Note: Having extra info present in the message header or data areas is OK.
+ * TIPC will ignore the excess, under the assumption that it is optional info
+ * introduced by a later release of the protocol.
+ */
+bool tipc_msg_validate(struct sk_buff *skb)
+{
+	struct tipc_msg *msg;
+	int hdr[2];
+	int msz, hsz, min_hsz, mtype;
+	u8 min_hdr_sz[4] = {
+		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE
+	};
+
+	msg = skb_header_pointer(skb, 0, sizeof(hdr), hdr);
+	if (unlikely(!msg))
+		return false;
+	if (unlikely(TIPC_SKB_CB(skb)->validated))
+		return true;
+	if (unlikely(msg_version(msg) != TIPC_VERSION))
+		return false;
+
+	msz = msg_size(msg);
+	hsz = msg_hdr_sz(msg);
+	mtype = msg_type(msg);
+	min_hsz = msg_isdata(msg) ? min_hdr_sz[mtype] : INT_H_SIZE;
+
+	if (unlikely((hsz < min_hsz) || (msz < hsz) || (skb->len < msz)))
+		return false;
+	if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE))
+		return false;
+	TIPC_SKB_CB(skb)->validated = true;
+	return pskb_may_pull(skb, hsz);
+}
 
 /**
  * tipc_msg_build - create buffer chain containing specified header and data
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7cece64..62306b8 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.h: Include file for TIPC message header routines
  *
- * Copyright (c) 2000-2007, 2014, Ericsson AB
+ * Copyright (c) 2000-2007, 2014-2015 Ericsson AB
  * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -92,7 +92,7 @@ struct plist;
 struct tipc_skb_cb {
 	void *handle;
 	struct sk_buff *tail;
-	bool deferred;
+	bool validated;
 	bool wakeup_pending;
 	bool bundling;
 	u16 chain_sz;
@@ -758,6 +758,7 @@ static inline u32 msg_tot_origport(struct tipc_msg *m)
 }
 
 struct sk_buff *tipc_buf_acquire(u32 size);
+bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
 		      int err);
 void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
-- 
1.9.1


------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 3/7] tipc: eliminate unnecessary linearization of incoming buffers
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 1/7] tipc: add framework for node capabilities exchange Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 2/7] tipc: move message validation function to msg.c Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 4/7] tipc: extract bundled buffers by cloning instead of copying Jon Maloy
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Currently, TIPC linearizes all incoming buffers directly at reception
before passing them upwards in the stack. This is clearly a waste of
CPU resources, and must be avoided.

In this commit, we eliminate this unnecessary linearization. We still
ensure that at least the message header is linear, and that the buffer
is linearized where this is still needed, i.e. when unbundling and when
reversing messages.

In addition, we ensure that fragmented messages are validated after
reassembly before delivering them upwards in the stack.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c |  5 -----
 net/tipc/msg.c  | 14 ++++++++++----
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 944c8c6..8c6639d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1075,13 +1075,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		if (unlikely(!tipc_msg_validate(skb)))
 			goto discard;
 
-		/* Ensure message data is a single contiguous unit */
-		if (unlikely(skb_linearize(skb)))
-			goto discard;
-
 		/* Handle arrival of a non-unicast link message */
 		msg = buf_msg(skb);
-
 		if (unlikely(msg_non_seq(msg))) {
 			if (msg_user(msg) ==  LINK_CONFIG)
 				tipc_disc_rcv(net, skb, b_ptr);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 1364ceb..663ed29 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -165,6 +165,9 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	}
 
 	if (fragid == LAST_FRAGMENT) {
+		TIPC_SKB_CB(head)->validated = false;
+		if (unlikely(!tipc_msg_validate(head)))
+			goto err;
 		*buf = head;
 		TIPC_SKB_CB(head)->tail = NULL;
 		*headbuf = NULL;
@@ -172,7 +175,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	}
 	*buf = NULL;
 	return 0;
-
 err:
 	pr_warn_ratelimited("Unable to build fragment list\n");
 	kfree_skb(*buf);
@@ -376,10 +378,14 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
  */
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
 {
-	struct tipc_msg *msg = buf_msg(skb);
+	struct tipc_msg *msg;
 	int imsz;
-	struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
+	struct tipc_msg *imsg;
 
+	if (unlikely(skb_linearize(skb)))
+		return false;
+	msg = buf_msg(skb);
+	imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
 	/* Is there space left for shortest possible message? */
 	if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
 		goto none;
@@ -461,11 +467,11 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
 
 	if (skb_linearize(buf))
 		goto exit;
+	msg = buf_msg(buf);
 	if (msg_dest_droppable(msg))
 		goto exit;
 	if (msg_errcode(msg))
 		goto exit;
-
 	memcpy(&ohdr, msg, msg_hdr_sz(msg));
 	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
 	if (msg_isdata(msg))
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 4/7] tipc: extract bundled buffers by cloning instead of copying
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
                   ` (2 preceding siblings ...)
  2015-03-13 14:27 ` [PATCH net-next 3/7] tipc: eliminate unnecessary linearization of incoming buffers Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 5/7] tipc: eliminate unnecessary call to broadcast ack function Jon Maloy
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

When we currently extract a bundled buffer from a message bundle in
the function tipc_msg_extract(), we allocate a new buffer and explicitly
copy the linear data area.

This is unnecessary, since we can just clone the buffer and do
skb_pull() on the clone to move the data pointer to the correct
position.

This is what we do in this commit.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c | 45 ++++++++++++---------------------------------
 net/tipc/msg.c  | 32 +++++++++++++++++++-------------
 2 files changed, 31 insertions(+), 46 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 8c6639d..56c39b1 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
+ * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -1117,7 +1117,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		ackd = msg_ack(msg);
 
 		/* Release acked messages */
-		if (n_ptr->bclink.recv_permitted)
+		if (likely(n_ptr->bclink.recv_permitted))
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
@@ -1712,45 +1712,24 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 	}
 }
 
-/**
- * buf_extract - extracts embedded TIPC message from another message
- * @skb: encapsulating message buffer
- * @from_pos: offset to extract from
- *
- * Returns a new message buffer containing an embedded message.  The
- * encapsulating buffer is left unchanged.
- */
-static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
-{
-	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
-	u32 size = msg_size(msg);
-	struct sk_buff *eb;
-
-	eb = tipc_buf_acquire(size);
-	if (eb)
-		skb_copy_to_linear_data(eb, msg, size);
-	return eb;
-}
-
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
  * Owner node is locked.
  */
-static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
-			      struct sk_buff *t_buf)
+static void tipc_link_dup_rcv(struct tipc_link *link,
+			      struct sk_buff *skb)
 {
-	struct sk_buff *buf;
+	struct sk_buff *iskb;
+	int pos = 0;
 
-	if (!tipc_link_is_up(l_ptr))
+	if (!tipc_link_is_up(link))
 		return;
 
-	buf = buf_extract(t_buf, INT_H_SIZE);
-	if (buf == NULL) {
+	if (!tipc_msg_extract(skb, &iskb, &pos)) {
 		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
 		return;
 	}
-
-	/* Add buffer to deferred queue, if applicable: */
-	link_handle_out_of_seq_msg(l_ptr, buf);
+	/* Append buffer to deferred queue, if applicable: */
+	link_handle_out_of_seq_msg(link, iskb);
 }
 
 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
@@ -1762,6 +1741,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
 	struct tipc_msg *t_msg = buf_msg(t_buf);
 	struct sk_buff *buf = NULL;
 	struct tipc_msg *msg;
+	int pos = 0;
 
 	if (tipc_link_is_up(l_ptr))
 		tipc_link_reset(l_ptr);
@@ -1773,8 +1753,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
 	/* Should there be an inner packet? */
 	if (l_ptr->exp_msg_count) {
 		l_ptr->exp_msg_count--;
-		buf = buf_extract(t_buf, INT_H_SIZE);
-		if (buf == NULL) {
+		if (!tipc_msg_extract(t_buf, &buf, &pos)) {
 			pr_warn("%sno inner failover pkt\n", link_co_err);
 			goto exit;
 		}
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 663ed29..0462959 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -370,38 +370,44 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
 
 /**
  *  tipc_msg_extract(): extract bundled inner packet from buffer
- *  @skb: linear outer buffer, to be extracted from.
+ *  @skb: buffer to be extracted from.
  *  @iskb: extracted inner buffer, to be returned
- *  @pos: position of msg to be extracted. Returns with pointer of next msg
+ *  @pos: position in outer message of msg to be extracted.
+ *        Returns position of next msg
  *  Consumes outer buffer when last packet extracted
  *  Returns true when when there is an extracted buffer, otherwise false
  */
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
 {
 	struct tipc_msg *msg;
-	int imsz;
+	int imsz, offset;
 	struct tipc_msg *imsg;
 
 	if (unlikely(skb_linearize(skb)))
 		return false;
+
+	*iskb = NULL;
 	msg = buf_msg(skb);
-	imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
+	offset = msg_hdr_sz(msg) + *pos;
+
 	/* Is there space left for shortest possible message? */
-	if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
+	if (offset > (msg_size(msg) - SHORT_H_SIZE))
 		goto none;
-	imsz = msg_size(imsg);
 
-	/* Is there space left for current message ? */
-	if ((*pos + imsz) > msg_data_sz(msg))
-		goto none;
-	*iskb = tipc_buf_acquire(imsz);
+	*iskb = skb_clone(skb, GFP_ATOMIC);
 	if (!*iskb)
 		goto none;
-	skb_copy_to_linear_data(*iskb, imsg, imsz);
-	*pos += align(imsz);
-	return true;
+	skb_pull(*iskb, offset);
+	imsg = buf_msg(*iskb);
+	imsz = msg_size(imsg);
+	skb_trim(*iskb, imsz);
+	if (tipc_msg_validate(*iskb)) {
+		*pos += align(imsz);
+		return true;
+	}
 none:
 	kfree_skb(skb);
+	kfree_skb(*iskb);
 	*iskb = NULL;
 	return false;
 }
-- 
1.9.1


------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 5/7] tipc: eliminate unnecessary call to broadcast ack function
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
                   ` (3 preceding siblings ...)
  2015-03-13 14:27 ` [PATCH net-next 4/7] tipc: extract bundled buffers by cloning instead of copying Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 6/7] tipc: split link outqueue Jon Maloy
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The unicast packet header contains a broadcast acknowledge sequence
number, that may need to be conveyed to the broadcast link for proper
treatment. Currently, the function tipc_rcv(), which is on the most
critical data path, calls the function tipc_bclink_acknowledge() to
have this done. This call is made for each received packet, and results
in the unconditional grabbing of the broadcast link spinlock.

This is unnecessary, since we can see directly from tipc_rcv() if
the acknowledged number differs from what has been previously acked
from the node in question. In the vast majority of cases the numbers
won't differ, and there is nothing to update.

We now make the call to tipc_bclink_acknowledge() conditional
to that the two ack values differ.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/bcast.c | 4 ++++
 net/tipc/link.c  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3e41704..5ee5076 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -215,7 +215,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 
+	if (unlikely(!n_ptr->bclink.recv_permitted))
+		return;
+
 	tipc_bclink_lock(net);
+
 	/* Bail out if tx queue is empty (no clean up is required) */
 	skb = skb_peek(&tn->bcl->outqueue);
 	if (!skb)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 56c39b1..2652c32 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1117,7 +1117,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		ackd = msg_ack(msg);
 
 		/* Release acked messages */
-		if (likely(n_ptr->bclink.recv_permitted))
+		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 6/7] tipc: split link outqueue
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
                   ` (4 preceding siblings ...)
  2015-03-13 14:27 ` [PATCH net-next 5/7] tipc: eliminate unnecessary call to broadcast ack function Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 14:27 ` [PATCH net-next 7/7] tipc: clean up handling of message priorities Jon Maloy
  2015-03-13 16:52 ` [PATCH net-next 0/7] tipc: some optimizations and impovements David Miller
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

struct tipc_link contains one single queue for outgoing packets,
where both transmitted and waiting packets are queued.

This infrastructure is hard to maintain, because we need
to keep a number of fields to keep track of which packets are
sent or unsent, and the number of packets in each category.

A lot of code becomes simpler if we split this queue into a transmission
queue, where sent/unacknowledged packets are kept, and a backlog queue,
where we keep the not yet sent packets.

In this commit we do this separation.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/bcast.c |  48 ++++++-------
 net/tipc/link.c  | 208 ++++++++++++++++++++++++++-----------------------------
 net/tipc/link.h  |  17 ++---
 net/tipc/msg.c   |  32 +++++----
 net/tipc/msg.h   |   6 +-
 net/tipc/node.c  |   4 +-
 net/tipc/node.h  |   2 +-
 7 files changed, 150 insertions(+), 167 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 5ee5076..17cb0ff 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_link *bcl = tn->bcl;
+	struct sk_buff *skb = skb_peek(&bcl->backlogq);
 
-	if (bcl->next_out)
-		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
+	if (skb)
+		bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);
 	else
 		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
 }
@@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
 	struct sk_buff *skb;
 	struct tipc_link *bcl = tn->bcl;
 
-	skb_queue_walk(&bcl->outqueue, skb) {
+	skb_queue_walk(&bcl->transmq, skb) {
 		if (more(buf_seqno(skb), after)) {
 			tipc_link_retransmit(bcl, skb, mod(to - after));
 			break;
@@ -210,7 +211,6 @@ void tipc_bclink_wakeup_users(struct net *net)
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
 	struct sk_buff *skb, *tmp;
-	struct sk_buff *next;
 	unsigned int released = 0;
 	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -221,7 +221,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	tipc_bclink_lock(net);
 
 	/* Bail out if tx queue is empty (no clean up is required) */
-	skb = skb_peek(&tn->bcl->outqueue);
+	skb = skb_peek(&tn->bcl->transmq);
 	if (!skb)
 		goto exit;
 
@@ -248,27 +248,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	skb_queue_walk(&tn->bcl->outqueue, skb) {
+	skb_queue_walk(&tn->bcl->transmq, skb) {
 		if (more(buf_seqno(skb), n_ptr->bclink.acked))
 			break;
 	}
 
 	/* Update packets that node is now acknowledging */
-	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
+	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
 		if (more(buf_seqno(skb), acked))
 			break;
-
-		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
-		if (skb != tn->bcl->next_out) {
-			bcbuf_decr_acks(skb);
-		} else {
-			bcbuf_set_acks(skb, 0);
-			tn->bcl->next_out = next;
-			bclink_set_last_sent(net);
-		}
-
+		bcbuf_decr_acks(skb);
+		bclink_set_last_sent(net);
 		if (bcbuf_acks(skb) == 0) {
-			__skb_unlink(skb, &tn->bcl->outqueue);
+			__skb_unlink(skb, &tn->bcl->transmq);
 			kfree_skb(skb);
 			released = 1;
 		}
@@ -276,7 +268,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-	if (unlikely(tn->bcl->next_out)) {
+	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
 		tipc_link_push_packets(tn->bcl);
 		bclink_set_last_sent(net);
 	}
@@ -323,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
-		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
 		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
@@ -398,7 +390,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 		if (likely(bclink->bcast_nodes.count)) {
 			rc = __tipc_link_xmit(net, bcl, list);
 			if (likely(!rc)) {
-				u32 len = skb_queue_len(&bcl->outqueue);
+				u32 len = skb_queue_len(&bcl->transmq);
 
 				bclink_set_last_sent(net);
 				bcl->stats.queue_sz_counts++;
@@ -563,25 +555,25 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (skb_queue_empty(&node->bclink.deferred_queue)) {
+		if (skb_queue_empty(&node->bclink.deferdq)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
+		msg = buf_msg(skb_peek(&node->bclink.deferdq));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-		buf = __skb_dequeue(&node->bclink.deferred_queue);
+		buf = __skb_dequeue(&node->bclink.deferdq);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
 					       buf);
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
@@ -638,7 +630,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tn->net_id);
 		tn->bcl->stats.sent_info++;
-
 		if (WARN_ON(!bclink->bcast_nodes.count)) {
 			dump_stack();
 			return 0;
@@ -917,8 +908,9 @@ int tipc_bclink_init(struct net *net)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->outqueue);
-	__skb_queue_head_init(&bcl->deferred_queue);
+	__skb_queue_head_init(&bcl->transmq);
+	__skb_queue_head_init(&bcl->backlogq);
+	__skb_queue_head_init(&bcl->deferdq);
 	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2652c32..7e0036f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -194,10 +194,10 @@ static void link_timeout(unsigned long data)
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 	l_ptr->stats.queue_sz_counts++;
 
-	skb = skb_peek(&l_ptr->outqueue);
+	skb = skb_peek(&l_ptr->transmq);
 	if (skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
@@ -229,7 +229,7 @@ static void link_timeout(unsigned long data)
 	/* do all other link processing performed on a periodic basis */
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
-	if (l_ptr->next_out)
+	if (skb_queue_len(&l_ptr->backlogq))
 		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
@@ -313,8 +313,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->outqueue);
-	__skb_queue_head_init(&l_ptr->deferred_queue);
+	__skb_queue_head_init(&l_ptr->transmq);
+	__skb_queue_head_init(&l_ptr->backlogq);
+	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
 	skb_queue_head_init(&l_ptr->inputq);
 	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +401,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = skb_queue_len(&link->outqueue);
+	uint pend_qsz = skb_queue_len(&link->backlogq);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +431,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	__skb_queue_purge(&l_ptr->deferred_queue);
-	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferdq);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -464,15 +466,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues, except inputq: */
-	__skb_queue_purge(&l_ptr->outqueue);
-	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
+	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
-	l_ptr->next_out = NULL;
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
 	l_ptr->fsm_msg_cnt = 0;
@@ -742,54 +744,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	uint psz = msg_size(msg);
-	uint sndlim = link->queue_limit[0];
+	unsigned int maxwin = link->window;
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff_head *transmq = &link->transmq;
+	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
-	if (unlikely(psz > mtu)) {
+	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
-	/* Prepare each packet for sending, and add to outqueue: */
+	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
 		msg = buf_msg(skb);
-		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_seqno(msg, seqno);
+		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (skb_queue_len(outqueue) < sndlim) {
-			__skb_queue_tail(outqueue, skb);
-			tipc_bearer_send(net, link->bearer_id,
-					 skb, addr);
-			link->next_out = NULL;
-			link->unacked_window = 0;
-		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			__skb_queue_tail(transmq, skb);
+			tipc_bearer_send(net, link->bearer_id, skb, addr);
+			link->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-						link->addr)) {
+		}
+		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			if (!link->next_out)
-				link->next_out = skb_peek_tail(outqueue);
-		} else {
-			__skb_queue_tail(outqueue, skb);
-			if (!link->next_out)
-				link->next_out = skb;
 		}
+		__skb_queue_tail(backlogq, skb);
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -895,14 +894,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-				    const struct sk_buff *skb)
-{
-	if (skb_queue_is_last(list, skb))
-		return NULL;
-	return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -911,30 +902,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-	struct sk_buff_head *outqueue = &l_ptr->outqueue;
-	struct sk_buff *skb = l_ptr->next_out;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
-	u32 next, first;
+	unsigned int ack = mod(link->next_in_no - 1);
 
-	skb_queue_walk_from(outqueue, skb) {
-		msg = buf_msg(skb);
-		next = msg_seqno(msg);
-		first = buf_seqno(skb_peek(outqueue));
-
-		if (mod(next - first) < l_ptr->queue_limit[0]) {
-			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (msg_user(msg) == MSG_BUNDLER)
-				TIPC_SKB_CB(skb)->bundling = false;
-			tipc_bearer_send(l_ptr->owner->net,
-					 l_ptr->bearer_id, skb,
-					 &l_ptr->media_addr);
-			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-		} else {
+	while (skb_queue_len(&link->transmq) < link->window) {
+		skb = __skb_dequeue(&link->backlogq);
+		if (!skb)
 			break;
-		}
+		msg = buf_msg(skb);
+		msg_set_ack(msg, ack);
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		link->rcv_unacked = 0;
+		__skb_queue_tail(&link->transmq, skb);
+		tipc_bearer_send(link->owner->net, link->bearer_id,
+				 skb, &link->media_addr);
 	}
 }
 
@@ -1021,8 +1005,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 		l_ptr->stale_count = 1;
 	}
 
-	skb_queue_walk_from(&l_ptr->outqueue, skb) {
-		if (!retransmits || skb == l_ptr->next_out)
+	skb_queue_walk_from(&l_ptr->transmq, skb) {
+		if (!retransmits)
 			break;
 		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,12 +1023,12 @@ static void link_retrieve_defq(struct tipc_link *link,
 {
 	u32 seq_no;
 
-	if (skb_queue_empty(&link->deferred_queue))
+	if (skb_queue_empty(&link->deferdq))
 		return;
 
-	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	seq_no = buf_seqno(skb_peek(&link->deferdq));
 	if (seq_no == mod(link->next_in_no))
-		skb_queue_splice_tail_init(&link->deferred_queue, list);
+		skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1121,17 +1105,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-			if (skb1 == l_ptr->next_out ||
-			    more(buf_seqno(skb1), ackd))
+		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+			if (more(buf_seqno(skb1), ackd))
 				break;
-			 __skb_unlink(skb1, &l_ptr->outqueue);
+			 __skb_unlink(skb1, &l_ptr->transmq);
 			 kfree_skb(skb1);
 			 released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
-		if (unlikely(l_ptr->next_out))
+		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
 			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1166,10 +1149,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			goto unlock;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
-
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
@@ -1336,9 +1318,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
 		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
 		l_ptr->stats.duplicates++;
@@ -1375,11 +1357,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 		if (!tipc_link_is_up(l_ptr))
 			return;
-		if (l_ptr->next_out)
-			next_sent = buf_seqno(l_ptr->next_out);
+		if (skb_queue_len(&l_ptr->backlogq))
+			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
 		msg_set_next_sent(msg, next_sent);
-		if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-			u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+		if (!skb_queue_empty(&l_ptr->deferdq)) {
+			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
 			gap = mod(rec - mod(l_ptr->next_in_no));
 		}
 		msg_set_seq_gap(msg, gap);
@@ -1431,10 +1413,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
-
 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
 			 &l_ptr->media_addr);
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	kfree_skb(buf);
 }
 
@@ -1569,7 +1550,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1616,7 +1597,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+	int msgcount;
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
@@ -1627,10 +1608,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (skb_queue_empty(&l_ptr->outqueue)) {
+	if (skb_queue_empty(&l_ptr->transmq)) {
 		skb = tipc_buf_acquire(INT_H_SIZE);
 		if (skb) {
 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1646,7 +1629,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1677,39 +1660,46 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-			      struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+			      struct tipc_link *tnl)
 {
 	struct sk_buff *skb;
-	struct tipc_msg tunnel_hdr;
-
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	struct tipc_msg tnl_hdr;
+	struct sk_buff_head *queue = &link->transmq;
+	int mcnt;
+
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+	msg_set_msgcnt(&tnl_hdr, mcnt);
+	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
+
+tunnel_queue:
+	skb_queue_walk(queue, skb) {
 		struct sk_buff *outskb;
 		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
+		u32 len = msg_size(msg);
 
-		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
-		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
-		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		msg_set_ack(msg, mod(link->next_in_no - 1));
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+		outskb = tipc_buf_acquire(len + INT_H_SIZE);
 		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-					       length);
-		__tipc_link_xmit_skb(tunnel, outskb);
-		if (!tipc_link_is_up(l_ptr))
+		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
+					       skb->data, len);
+		__tipc_link_xmit_skb(tnl, outskb);
+		if (!tipc_link_is_up(link))
 			return;
 	}
+	if (queue == &link->backlogq)
+		return;
+	queue = &link->backlogq;
+	goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
@@ -1823,6 +1813,8 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
 {
+	l_ptr->window = window;
+
 	/* Data messages from this node, inclusive FIRST_FRAGM */
 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 7aeb520..eec3ecf 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -124,7 +124,8 @@ struct tipc_stats {
  * @max_pkt: current maximum packet size for this link
  * @max_pkt_target: desired maximum packet size for this link
  * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
- * @outqueue: outbound message queue
+ * @transmitq: queue for sent, non-acked messages
+ * @backlogq: queue for messages waiting to be sent
  * @next_out_no: next sequence number to use for outbound messages
  * @last_retransmitted: sequence number of most recently retransmitted message
  * @stale_count: # of identical retransmit requests made by peer
@@ -177,20 +178,21 @@ struct tipc_link {
 	u32 max_pkt_probes;
 
 	/* Sending */
-	struct sk_buff_head outqueue;
+	struct sk_buff_head transmq;
+	struct sk_buff_head backlogq;
 	u32 next_out_no;
+	u32 window;
 	u32 last_retransmitted;
 	u32 stale_count;
 
 	/* Reception */
 	u32 next_in_no;
-	struct sk_buff_head deferred_queue;
-	u32 unacked_window;
+	u32 rcv_unacked;
+	struct sk_buff_head deferdq;
 	struct sk_buff_head inputq;
 	struct sk_buff_head namedq;
 
 	/* Congestion handling */
-	struct sk_buff *next_out;
 	struct sk_buff_head wakeupq;
 
 	/* Fragmentation/reassembly */
@@ -302,9 +304,4 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
 	return l_ptr->state == RESET_RESET;
 }
 
-static inline int link_congested(struct tipc_link *l_ptr)
-{
-	return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
-}
-
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 0462959..a3b4715 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -328,33 +328,36 @@ error:
 
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
- * @list: the buffer chain of the existing buffer ("bundle")
+ * @bskb: the buffer to append to ("bundle")
  * @skb:  buffer to be appended
  * @mtu:  max allowable size for the bundle buffer
  * Consumes buffer if successful
  * Returns true if bundling could be performed, otherwise false
  */
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
 {
-	struct sk_buff *bskb = skb_peek_tail(list);
-	struct tipc_msg *bmsg = buf_msg(bskb);
+	struct tipc_msg *bmsg;
 	struct tipc_msg *msg = buf_msg(skb);
-	unsigned int bsz = msg_size(bmsg);
+	unsigned int bsz;
 	unsigned int msz = msg_size(msg);
-	u32 start = align(bsz);
+	u32 start, pad;
 	u32 max = mtu - INT_H_SIZE;
-	u32 pad = start - bsz;
 
 	if (likely(msg_user(msg) == MSG_FRAGMENTER))
 		return false;
+	if (!bskb)
+		return false;
+	bmsg = buf_msg(bskb);
+	bsz = msg_size(bmsg);
+	start = align(bsz);
+	pad = start - bsz;
+
 	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
 		return false;
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
 		return false;
 	if (likely(msg_user(bmsg) != MSG_BUNDLER))
 		return false;
-	if (likely(!TIPC_SKB_CB(bskb)->bundling))
-		return false;
 	if (unlikely(skb_tailroom(bskb) < (pad + msz)))
 		return false;
 	if (unlikely(max < (start + msz)))
@@ -421,12 +424,11 @@ none:
  * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode)
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
 {
 	struct sk_buff *bskb;
 	struct tipc_msg *bmsg;
-	struct tipc_msg *msg = buf_msg(skb);
+	struct tipc_msg *msg = buf_msg(*skb);
 	u32 msz = msg_size(msg);
 	u32 max = mtu - INT_H_SIZE;
 
@@ -450,9 +452,9 @@ bool tipc_msg_make_bundle(struct sk_buff_head *list,
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
-	TIPC_SKB_CB(bskb)->bundling = true;
-	__skb_queue_tail(list, bskb);
-	return tipc_msg_bundle(list, skb, mtu);
+	tipc_msg_bundle(bskb, *skb, mtu);
+	*skb = bskb;
+	return true;
 }
 
 /**
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 62306b8..e5fc5fd 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -767,9 +767,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
 				uint data_sz, u32 dnode, u32 onode,
 				u32 dport, u32 oport, int errcode);
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode);
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu);
+
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode);
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 86152de..26d1de1 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,7 +111,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
 	INIT_LIST_HEAD(&n_ptr->conn_sks);
-	__skb_queue_head_init(&n_ptr->bclink.deferred_queue);
+	__skb_queue_head_init(&n_ptr->bclink.deferdq);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
@@ -354,7 +354,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 
 	/* Flush broadcast link info associated with lost node */
 	if (n_ptr->bclink.recv_permitted) {
-		__skb_queue_purge(&n_ptr->bclink.deferred_queue);
+		__skb_queue_purge(&n_ptr->bclink.deferdq);
 
 		if (n_ptr->bclink.reasm_buf) {
 			kfree_skb(n_ptr->bclink.reasm_buf);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f78be64..e89ac04 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -84,7 +84,7 @@ struct tipc_node_bclink {
 	u32 last_sent;
 	u32 oos_state;
 	u32 deferred_size;
-	struct sk_buff_head deferred_queue;
+	struct sk_buff_head deferdq;
 	struct sk_buff *reasm_buf;
 	int inputq_map;
 	bool recv_permitted;
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH net-next 7/7] tipc: clean up handling of message priorities
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
                   ` (5 preceding siblings ...)
  2015-03-13 14:27 ` [PATCH net-next 6/7] tipc: split link outqueue Jon Maloy
@ 2015-03-13 14:27 ` Jon Maloy
  2015-03-13 16:52 ` [PATCH net-next 0/7] tipc: some optimizations and impovements David Miller
  7 siblings, 0 replies; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 14:27 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

Messages transferred by TIPC are assigned an "importance priority", -an
integer value indicating how to treat the message when there is link or
destination socket congestion.

There is no separate header field for this value. Instead, the message
user values have been chosen in ascending order according to perceived
importance, so that the message user field can be used for this.

This is not a good solution. First, we have many more users than the
needed priority levels, so we end up with treating more priority
levels than necessary. Second, the user field cannot always
accurately reflect the priority of the message. E.g., a message
fragment packet should really have the priority of the enveloped
user data message, and not the priority of the MSG_FRAGMENTER user.
Until now, we have been working around this problem in different ways,
but it it now time to implement a consistent way of handling such
priorities, although still within the constraint that we cannot
allocate any more bits in the regular data message header for this.

In this commit, we define a new priority level, TIPC_SYSTEM_IMPORTANCE,
that will be the only one used apart from the four (lower) user data
levels. All non-data messages map down to this priority. Furthermore,
we take some free bits from the MSG_FRAGMENTER header and allocate
them to store the priority of the enveloped message. We then adjust
the functions msg_importance()/msg_set_importance() so that they
read/set the correct header fields depending on user type.

This small protocol change is fully compatible, because the code on
on the receiving end of a link currently reads the importance level
only from user data messages, where there is no change.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/bcast.c |  1 -
 net/tipc/link.c  | 40 +++++++++++++---------------------
 net/tipc/msg.c   |  5 +----
 net/tipc/msg.h   | 65 +++++++++++++++++++++++++++++---------------------------
 4 files changed, 50 insertions(+), 61 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 17cb0ff..5aff084 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -383,7 +383,6 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
-
 	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 7e0036f..bc49120 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -35,6 +35,7 @@
  */
 
 #include "core.h"
+#include "subscr.h"
 #include "link.h"
 #include "bcast.h"
 #include "socket.h"
@@ -305,12 +306,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	msg_set_session(msg, (tn->random & 0xffff));
 	msg_set_bearer_id(msg, b_ptr->identity);
 	strcpy((char *)msg_data(msg), if_name);
-
-	l_ptr->priority = b_ptr->priority;
-	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
 	l_ptr->net_plane = b_ptr->net_plane;
 	link_init_max_pkt(l_ptr);
+	l_ptr->priority = b_ptr->priority;
+	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 
 	l_ptr->next_out_no = 1;
 	__skb_queue_head_init(&l_ptr->transmq);
@@ -708,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
 	struct sk_buff *skb = skb_peek(list);
 	struct tipc_msg *msg = buf_msg(skb);
-	uint imp = tipc_msg_tot_importance(msg);
+	int imp = msg_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
@@ -745,7 +744,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
 	unsigned int maxwin = link->window;
-	uint imp = tipc_msg_tot_importance(msg);
+	unsigned int imp = msg_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
@@ -755,7 +754,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
-	/* Match queue limits against msg importance: */
+	/* Match queue limit against msg importance: */
 	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
@@ -1811,25 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
 }
 
-void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
+void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 {
-	l_ptr->window = window;
-
-	/* Data messages from this node, inclusive FIRST_FRAGM */
-	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
-	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
-	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
-	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
-	/* Transiting data messages,inclusive FIRST_FRAGM */
-	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
-	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
-	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
-	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
-	l_ptr->queue_limit[CONN_MANAGER] = 1200;
-	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
-	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
-	/* FRAGMENT and LAST_FRAGMENT packets */
-	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
+	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
+
+	l->window = win;
+	l->queue_limit[TIPC_LOW_IMPORTANCE]      = win / 2;
+	l->queue_limit[TIPC_MEDIUM_IMPORTANCE]   = win;
+	l->queue_limit[TIPC_HIGH_IMPORTANCE]     = win / 2 * 3;
+	l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
+	l->queue_limit[TIPC_SYSTEM_IMPORTANCE]   = max_bulk;
 }
 
 /* tipc_link_find_owner - locate owner node of link by link's name
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index a3b4715..59a2213 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -270,6 +270,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
 	msg_set_size(&pkthdr, pktmax);
 	msg_set_fragm_no(&pkthdr, pktno);
+	msg_set_importance(&pkthdr, msg_importance(mhdr));
 
 	/* Prepare first fragment */
 	skb = tipc_buf_acquire(pktmax);
@@ -469,7 +470,6 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
 		      int err)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	uint imp = msg_importance(msg);
 	struct tipc_msg ohdr;
 	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
 
@@ -481,9 +481,6 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
 	if (msg_errcode(msg))
 		goto exit;
 	memcpy(&ohdr, msg, msg_hdr_sz(msg));
-	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
-	if (msg_isdata(msg))
-		msg_set_importance(msg, imp);
 	msg_set_errcode(msg, err);
 	msg_set_origport(msg, msg_destport(&ohdr));
 	msg_set_destport(msg, msg_origport(&ohdr));
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index e5fc5fd..bd3969a 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -54,6 +54,8 @@ struct plist;
  * - TIPC_HIGH_IMPORTANCE
  * - TIPC_CRITICAL_IMPORTANCE
  */
+#define TIPC_SYSTEM_IMPORTANCE	4
+
 
 /*
  * Payload message types
@@ -64,6 +66,19 @@ struct plist;
 #define TIPC_DIRECT_MSG		3
 
 /*
+ * Internal message users
+ */
+#define  BCAST_PROTOCOL       5
+#define  MSG_BUNDLER          6
+#define  LINK_PROTOCOL        7
+#define  CONN_MANAGER         8
+#define  CHANGEOVER_PROTOCOL  10
+#define  NAME_DISTRIBUTOR     11
+#define  MSG_FRAGMENTER       12
+#define  LINK_CONFIG          13
+#define  SOCK_WAKEUP          14       /* pseudo user */
+
+/*
  * Message header sizes
  */
 #define SHORT_H_SIZE              24	/* In-cluster basic payload message */
@@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 0, 25, 0xf, n);
 }
 
-static inline u32 msg_importance(struct tipc_msg *m)
-{
-	return msg_bits(m, 0, 25, 0xf);
-}
-
-static inline void msg_set_importance(struct tipc_msg *m, u32 i)
-{
-	msg_set_user(m, i);
-}
-
 static inline u32 msg_hdr_sz(struct tipc_msg *m)
 {
 	return msg_bits(m, 0, 21, 0xf) << 2;
@@ -336,6 +341,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)
 /*
  * Words 3-10
  */
+static inline u32 msg_importance(struct tipc_msg *m)
+{
+	if (unlikely(msg_user(m) == MSG_FRAGMENTER))
+		return msg_bits(m, 5, 13, 0x7);
+	if (likely(msg_isdata(m) && !msg_errcode(m)))
+		return msg_user(m);
+	return TIPC_SYSTEM_IMPORTANCE;
+}
+
+static inline void msg_set_importance(struct tipc_msg *m, u32 i)
+{
+	if (unlikely(msg_user(m) == MSG_FRAGMENTER))
+		msg_set_bits(m, 5, 13, 0x7, i);
+	else if (likely(i < TIPC_SYSTEM_IMPORTANCE))
+		msg_set_user(m, i);
+	else
+		pr_warn("Trying to set illegal importance in message\n");
+}
+
 static inline u32 msg_prevnode(struct tipc_msg *m)
 {
 	return msg_word(m, 3);
@@ -458,20 +482,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
  */
 
 /*
- * Internal message users
- */
-#define  BCAST_PROTOCOL       5
-#define  MSG_BUNDLER          6
-#define  LINK_PROTOCOL        7
-#define  CONN_MANAGER         8
-#define  ROUTE_DISTRIBUTOR    9		/* obsoleted */
-#define  CHANGEOVER_PROTOCOL  10
-#define  NAME_DISTRIBUTOR     11
-#define  MSG_FRAGMENTER       12
-#define  LINK_CONFIG          13
-#define  SOCK_WAKEUP          14       /* pseudo user */
-
-/*
  *  Connection management protocol message types
  */
 #define CONN_PROBE        0
@@ -743,13 +753,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
-{
-	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-		return msg_importance(msg_get_wrapped(m));
-	return msg_importance(m);
-}
-
 static inline u32 msg_tot_origport(struct tipc_msg *m)
 {
 	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-- 
1.9.1


------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/

^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [PATCH net-next 0/7] tipc: some optimizations and impovements
  2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
                   ` (6 preceding siblings ...)
  2015-03-13 14:27 ` [PATCH net-next 7/7] tipc: clean up handling of message priorities Jon Maloy
@ 2015-03-13 16:52 ` David Miller
  2015-03-13 17:09   ` Jon Maloy
  7 siblings, 1 reply; 11+ messages in thread
From: David Miller @ 2015-03-13 16:52 UTC (permalink / raw)
  To: jon.maloy; +Cc: netdev, paul.gortmaker, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Fri, 13 Mar 2015 10:27:33 -0400

> The commits in this series contain some relatively simple changes
> that lead to better throughput across TIPC connections. We also make
> changes to the implementation of link transmission queueing and priority
> handling, in order to make the code more comprehensible and maintainable.

I think you should do the SKB linearization changes properly.

Simply do pskb_may_pull() before you inspect any header, and you can
do this everwhere, even your bundling extraction code path.

I also don't think you even need to clone the SKB there either.

Thanks.

------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH net-next 0/7] tipc: some optimizations and impovements
  2015-03-13 16:52 ` [PATCH net-next 0/7] tipc: some optimizations and impovements David Miller
@ 2015-03-13 17:09   ` Jon Maloy
  2015-03-13 19:44     ` David Miller
  0 siblings, 1 reply; 11+ messages in thread
From: Jon Maloy @ 2015-03-13 17:09 UTC (permalink / raw)
  To: David Miller, jon.maloy; +Cc: netdev, tipc-discussion, paul.gortmaker


On 15-03-13 12:52 PM, David Miller wrote:
> From: Jon Maloy <jon.maloy@ericsson.com>
> Date: Fri, 13 Mar 2015 10:27:33 -0400
>
>> The commits in this series contain some relatively simple changes
>> that lead to better throughput across TIPC connections. We also make
>> changes to the implementation of link transmission queueing and priority
>> handling, in order to make the code more comprehensible and maintainable.
> I think you should do the SKB linearization changes properly.
>
> Simply do pskb_may_pull() before you inspect any header, and you can
> do this everwhere, even your bundling extraction code path.
Ok, I'll try that.

>
> I also don't think you even need to clone the SKB there either.

I don't don't see how we can not. We may extract many
buffers from different positions in the data area, which will
be sent in different directions, and be freed at different
moments of time.

///jon
>
> Thanks.


------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH net-next 0/7] tipc: some optimizations and impovements
  2015-03-13 17:09   ` Jon Maloy
@ 2015-03-13 19:44     ` David Miller
  0 siblings, 0 replies; 11+ messages in thread
From: David Miller @ 2015-03-13 19:44 UTC (permalink / raw)
  To: maloy
  Cc: jon.maloy, netdev, paul.gortmaker, erik.hugne, ying.xue, tipc-discussion

From: Jon Maloy <maloy@donjonn.com>
Date: Fri, 13 Mar 2015 13:09:54 -0400

> On 15-03-13 12:52 PM, David Miller wrote:
>> From: Jon Maloy <jon.maloy@ericsson.com>
>> Date: Fri, 13 Mar 2015 10:27:33 -0400
>>
>> I also don't think you even need to clone the SKB there either.
> 
> I don't don't see how we can not. We may extract many
> buffers from different positions in the data area, which will
> be sent in different directions, and be freed at different
> moments of time.

Ok, sounds like you need to clone after all.

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2015-03-13 19:45 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-03-13 14:27 [PATCH net-next 0/7] tipc: some optimizations and impovements Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 1/7] tipc: add framework for node capabilities exchange Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 2/7] tipc: move message validation function to msg.c Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 3/7] tipc: eliminate unnecessary linearization of incoming buffers Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 4/7] tipc: extract bundled buffers by cloning instead of copying Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 5/7] tipc: eliminate unnecessary call to broadcast ack function Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 6/7] tipc: split link outqueue Jon Maloy
2015-03-13 14:27 ` [PATCH net-next 7/7] tipc: clean up handling of message priorities Jon Maloy
2015-03-13 16:52 ` [PATCH net-next 0/7] tipc: some optimizations and impovements David Miller
2015-03-13 17:09   ` Jon Maloy
2015-03-13 19:44     ` David Miller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.