* [PATCH net-next 0/3] tipc: some improvements and fixes
@ 2015-03-25 16:07 Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 1/3] tipc: introduce starvation free send algorithm Jon Maloy
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Jon Maloy @ 2015-03-25 16:07 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

We introduce a better algorithm for selecting when and which
users should be subject to link congestion control, plus clean
up some code for that mechanism.
Commit #3 fixes another rare race condition during packet reception.

Jon Maloy (3):
  tipc: introduce starvation free send algorithm
  tipc: clean up handling of link congestion
  tipc: eliminate race condition at dual link establishment

 net/tipc/bcast.c |   2 +-
 net/tipc/link.c  | 207 ++++++++++++++++++++++++++++++++++---------------------
 net/tipc/link.h  |   9 ++-
 net/tipc/msg.h   |  36 +++++-----
 4 files changed, 157 insertions(+), 97 deletions(-)

-- 
1.9.1


------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the 
conversation now. http://goparallel.sourceforge.net/


* [PATCH net-next 1/3] tipc: introduce starvation free send algorithm
  2015-03-25 16:07 [PATCH net-next 0/3] tipc: some improvements and fixes Jon Maloy
@ 2015-03-25 16:07 ` Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 2/3] tipc: clean up handling of link congestion Jon Maloy
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-03-25 16:07 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Currently, we use only a single counter, the length of the backlog
queue, to determine whether a message should be accepted to the queue
or not. Each time a message is sent, the queue length is compared
to a threshold value for the message's importance priority. If the queue
length has reached this threshold, the message is rejected. This algorithm
implies a risk of starvation of low importance senders during very high
load, because it may take a long time before the backlog queue has
decreased enough to accept a lower level message.

We now eliminate this risk by introducing a counter for each importance
priority. When a message is sent, we check only the queue level for that
particular message's priority. If that is ok, the message can be added
to the backlog, irrespective of the queue level for other priorities.
This way, each level is guaranteed a certain portion of the total
bandwidth, and any risk of starvation is eliminated.
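
The per-level accounting described above can be sketched in user-space C
roughly as follows. This is only an illustration, not the kernel code: the
names backlog_sketch, sketch_set_limits(), sketch_admit() and
sketch_release() are hypothetical, the enum values are assumed, and one
call stands for one packet, whereas the patch tests the limit once per
buffer chain.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Importance levels; stand-ins for TIPC_*_IMPORTANCE (values assumed). */
enum { IMP_LOW, IMP_MEDIUM, IMP_HIGH, IMP_CRITICAL, IMP_SYSTEM, IMP_COUNT };

/* Hypothetical stand-in for the backlog[] array added to struct tipc_link. */
struct backlog_sketch {
	struct {
		uint16_t len;   /* packets of this level now in the backlog */
		uint16_t limit; /* admission threshold for this level */
	} level[IMP_COUNT];
};

/* Same ratios as tipc_link_set_queue_limits(); max_bulk is passed in. */
static inline void sketch_set_limits(struct backlog_sketch *b,
				     uint32_t win, uint32_t max_bulk)
{
	b->level[IMP_LOW].limit      = (uint16_t)(win / 2);
	b->level[IMP_MEDIUM].limit   = (uint16_t)win;
	b->level[IMP_HIGH].limit     = (uint16_t)(win / 2 * 3);
	b->level[IMP_CRITICAL].limit = (uint16_t)(win * 2);
	b->level[IMP_SYSTEM].limit   = (uint16_t)max_bulk;
}

/* Admission test: only the sender's own level is consulted. */
static inline bool sketch_admit(struct backlog_sketch *b, int imp)
{
	assert(imp >= 0 && imp < IMP_COUNT);
	if (b->level[imp].len >= b->level[imp].limit)
		return false;	/* congested at this level only */
	b->level[imp].len++;
	return true;
}

/* Called when a packet moves from the backlog to the transmit queue. */
static inline void sketch_release(struct backlog_sketch *b, int imp)
{
	assert(imp >= 0 && imp < IMP_COUNT);
	assert(b->level[imp].len > 0);
	b->level[imp].len--;
}
```

With a window of 50, filling the critical level to its limit of 100 leaves
sketch_admit(&b, IMP_LOW) unaffected, which is the property the new
algorithm is after.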

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/bcast.c |  2 +-
 net/tipc/link.c  | 58 +++++++++++++++++++++++++++++++++++---------------------
 net/tipc/link.h  |  7 +++++--
 3 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 5230739..7935553 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -831,7 +831,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
 	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
 	if (!prop)
 		goto attr_msg_full;
-	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
 		goto prop_msg_full;
 	nla_nest_end(msg->skb, prop);
 
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 8c98c4d..b9325a1 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -310,7 +310,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 	l_ptr->priority = b_ptr->priority;
 	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
 	l_ptr->next_out_no = 1;
 	__skb_queue_head_init(&l_ptr->transmq);
 	__skb_queue_head_init(&l_ptr->backlogq);
@@ -398,19 +397,22 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  * Move a number of waiting users, as permitted by available space in
  * the send queue, from link wait queue to node wait queue for wakeup
  */
-void link_prepare_wakeup(struct tipc_link *link)
+void link_prepare_wakeup(struct tipc_link *l)
 {
-	uint pend_qsz = skb_queue_len(&link->backlogq);
+	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
+	int imp, lim;
 	struct sk_buff *skb, *tmp;
 
-	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
-		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
+	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+		imp = TIPC_SKB_CB(skb)->chain_imp;
+		lim = l->window + l->backlog[imp].limit;
+		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
+		if ((pnd[imp] + l->backlog[imp].len) >= lim)
 			break;
-		pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
-		skb_unlink(skb, &link->wakeupq);
-		skb_queue_tail(&link->inputq, skb);
-		link->owner->inputq = &link->inputq;
-		link->owner->action_flags |= TIPC_MSG_EVT;
+		skb_unlink(skb, &l->wakeupq);
+		skb_queue_tail(&l->inputq, skb);
+		l->owner->inputq = &l->inputq;
+		l->owner->action_flags |= TIPC_MSG_EVT;
 	}
 }
 
@@ -424,6 +426,16 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 	l_ptr->reasm_buf = NULL;
 }
 
+static void tipc_link_purge_backlog(struct tipc_link *l)
+{
+	__skb_queue_purge(&l->backlogq);
+	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
+	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
+	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
+	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
+	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+}
+
 /**
  * tipc_link_purge_queues - purge all pkt queues associated with link
  * @l_ptr: pointer to link
@@ -432,7 +444,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
 	__skb_queue_purge(&l_ptr->deferdq);
 	__skb_queue_purge(&l_ptr->transmq);
-	__skb_queue_purge(&l_ptr->backlogq);
+	tipc_link_purge_backlog(l_ptr);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -466,13 +478,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 
 	/* Clean up all queues, except inputq: */
 	__skb_queue_purge(&l_ptr->transmq);
-	__skb_queue_purge(&l_ptr->backlogq);
 	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
+	tipc_link_purge_backlog(l_ptr);
 	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
@@ -754,16 +766,14 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
-	/* Match queue limit against msg importance: */
-	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
+	/* Match backlog limit against msg importance: */
+	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
 		return tipc_link_cong(link, list);
 
-	/* Has valid packet limit been used ? */
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
-
 	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
@@ -786,8 +796,10 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
+			imp = msg_importance(buf_msg(skb));
 		}
 		__skb_queue_tail(backlogq, skb);
+		link->backlog[imp].len++;
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -914,6 +926,7 @@ void tipc_link_push_packets(struct tipc_link *link)
 		if (!skb)
 			break;
 		msg = buf_msg(skb);
+		link->backlog[msg_importance(msg)].len--;
 		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
 		link->rcv_unacked = 0;
@@ -1610,6 +1623,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
 	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	tipc_link_purge_backlog(l_ptr);
 	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
@@ -1817,11 +1831,11 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
 
 	l->window = win;
-	l->queue_limit[TIPC_LOW_IMPORTANCE]      = win / 2;
-	l->queue_limit[TIPC_MEDIUM_IMPORTANCE]   = win;
-	l->queue_limit[TIPC_HIGH_IMPORTANCE]     = win / 2 * 3;
-	l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
-	l->queue_limit[TIPC_SYSTEM_IMPORTANCE]   = max_bulk;
+	l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
+	l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
+	l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
+	l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
+	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
 }
 
 /* tipc_link_find_owner - locate owner node of link by link's name
@@ -2120,7 +2134,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
 		goto prop_msg_full;
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
-			link->queue_limit[TIPC_LOW_IMPORTANCE]))
+			link->window))
 		goto prop_msg_full;
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
 		goto prop_msg_full;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index eec3ecf..99543a4 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -118,7 +118,7 @@ struct tipc_stats {
  * @pmsg: convenience pointer to "proto_msg" field
  * @priority: current link priority
  * @net_plane: current link network plane ('A' through 'H')
- * @queue_limit: outbound message queue congestion thresholds (indexed by user)
+ * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
  * @exp_msg_count: # of tunnelled messages expected during link changeover
  * @reset_checkpoint: seq # of last acknowledged message at time of link reset
  * @max_pkt: current maximum packet size for this link
@@ -166,7 +166,6 @@ struct tipc_link {
 	struct tipc_msg *pmsg;
 	u32 priority;
 	char net_plane;
-	u32 queue_limit[15];	/* queue_limit[0]==window limit */
 
 	/* Changeover */
 	u32 exp_msg_count;
@@ -180,6 +179,10 @@ struct tipc_link {
 	/* Sending */
 	struct sk_buff_head transmq;
 	struct sk_buff_head backlogq;
+	struct {
+		u16 len;
+		u16 limit;
+	} backlog[5];
 	u32 next_out_no;
 	u32 window;
 	u32 last_retransmitted;
-- 
1.9.1


* [PATCH net-next 2/3] tipc: clean up handling of link congestion
  2015-03-25 16:07 [PATCH net-next 0/3] tipc: some improvements and fixes Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 1/3] tipc: introduce starvation free send algorithm Jon Maloy
@ 2015-03-25 16:07 ` Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 3/3] tipc: eliminate race condition at dual link establishment Jon Maloy
  2015-03-25 18:07 ` [PATCH net-next 0/3] tipc: some improvements and fixes David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-03-25 16:07 UTC (permalink / raw)
  To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion

After the recent changes in message importance handling, it becomes
possible to simplify the handling of messages and sockets when we
encounter link congestion.

We merge the function tipc_link_cong() into link_schedule_user(),
and simplify the code of the latter. The code should now be
easier to follow, especially regarding return codes and handling
of the message that caused the situation.

If the scheduling function is unable to pre-allocate a wakeup
message buffer, it now returns -ENOBUFS, which is a more accurate
error code than the previously used -EHOSTUNREACH.
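
The resulting return-code contract can be sketched in user-space C as
below. This is a simplified model, not the kernel function: chain_sketch,
sketch_schedule_user() and SKETCH_ELINKCONG are hypothetical names, the
mapping of ELINKCONG onto EAGAIN is an assumption made for the sketch, and
the link reset and the wakeup message itself are reduced to a flag and a
counter.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

/* Stand-ins for TIPC_*_IMPORTANCE (values assumed). */
enum { IMP_LOW, IMP_MEDIUM, IMP_HIGH, IMP_CRITICAL, IMP_SYSTEM };

/* Assumption for this sketch only: congestion is reported as EAGAIN. */
#define SKETCH_ELINKCONG EAGAIN

/* Hypothetical model of the buffer chain handed to the scheduler. */
struct chain_sketch {
	int imp;             /* importance of the first message in the chain */
	bool wakeup_pending; /* sender already has a wakeup scheduled */
	bool purged;         /* set once the chain has been consumed */
};

/*
 * Returns -SKETCH_ELINKCONG and leaves the chain with the caller, or
 * purges the chain and returns -ENOBUFS on error.
 */
static inline int sketch_schedule_user(struct chain_sketch *c,
				       bool can_alloc_wakeup,
				       int *wakeups_queued)
{
	/* System-level sender hit its limit: link reset omitted, drop chain */
	if (c->imp > IMP_CRITICAL)
		goto err;

	/* Sender is already waiting: nothing more to schedule */
	if (c->wakeup_pending)
		return -SKETCH_ELINKCONG;

	/* Pre-allocation of the wakeup pseudo message failed */
	if (!can_alloc_wakeup)
		goto err;

	(*wakeups_queued)++;
	return -SKETCH_ELINKCONG;
err:
	c->purged = true;
	return -ENOBUFS;
}
```

A caller that sees -SKETCH_ELINKCONG still owns the chain and may retry
later; on any other error the chain is already gone.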

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c | 104 ++++++++++++++++++++++++++------------------------------
 net/tipc/msg.h  |  28 ++++++---------
 2 files changed, 60 insertions(+), 72 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index b9325a1..58e2460 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -367,28 +367,43 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
 }
 
 /**
- * link_schedule_user - schedule user for wakeup after congestion
+ * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
- * @oport: sending port
- * @chain_sz: size of buffer chain that was attempted sent
- * @imp: importance of message attempted sent
+ * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
+ * Only consumes message if there is an error
  */
-static bool link_schedule_user(struct tipc_link *link, u32 oport,
-			       uint chain_sz, uint imp)
+static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct sk_buff *buf;
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
+	int imp = msg_importance(msg);
+	u32 oport = msg_origport(msg);
+	u32 addr = link_own_addr(link);
+	struct sk_buff *skb;
 
-	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      link_own_addr(link), link_own_addr(link),
-			      oport, 0, 0);
-	if (!buf)
-		return false;
-	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
-	TIPC_SKB_CB(buf)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, buf);
+	/* This really cannot happen...  */
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+		goto err;
+	}
+	/* Non-blocking sender: */
+	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
+		return -ELINKCONG;
+
+	/* Create and schedule wakeup pseudo message */
+	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
+			      addr, addr, oport, 0, 0);
+	if (!skb)
+		goto err;
+	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
+	TIPC_SKB_CB(skb)->chain_imp = imp;
+	skb_queue_tail(&link->wakeupq, skb);
 	link->stats.link_congs++;
-	return true;
+	return -ELINKCONG;
+err:
+	__skb_queue_purge(list);
+	return -ENOBUFS;
 }
 
 /**
@@ -708,48 +723,15 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/* tipc_link_cong: determine return value and how to treat the
- * sent buffer during link congestion.
- * - For plain, errorless user data messages we keep the buffer and
- *   return -ELINKONG.
- * - For all other messages we discard the buffer and return -EHOSTUNREACH
- * - For TIPC internal messages we also reset the link
- */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
-{
-	struct sk_buff *skb = skb_peek(list);
-	struct tipc_msg *msg = buf_msg(skb);
-	int imp = msg_importance(msg);
-	u32 oport = msg_tot_origport(msg);
-
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
-		goto drop;
-	}
-	if (unlikely(msg_errcode(msg)))
-		goto drop;
-	if (unlikely(msg_reroute_cnt(msg)))
-		goto drop;
-	if (TIPC_SKB_CB(skb)->wakeup_pending)
-		return -ELINKCONG;
-	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
-		return -ELINKCONG;
-drop:
-	__skb_queue_purge(list);
-	return -EHOSTUNREACH;
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
- * user data messages) or -EHOSTUNREACH (all other messages/senders)
- * Only the socket functions tipc_send_stream() and tipc_send_packet() need
- * to act on the return value, since they may need to do more send attempts.
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
@@ -768,7 +750,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 
 	/* Match backlog limit against msg importance: */
 	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
-		return tipc_link_cong(link, list);
+		return link_schedule_user(link, list);
 
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
@@ -820,13 +802,25 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
+/* tipc_link_xmit_skb(): send single buffer to destination
+ * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
+	int rc;
 
 	skb2list(skb, &head);
-	return tipc_link_xmit(net, &head, dnode, selector);
+	rc = tipc_link_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
 }
 
 /**
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index bd3969a..6445db0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -240,6 +240,15 @@ static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
 }
 
+static inline unchar *msg_data(struct tipc_msg *m)
+{
+	return ((unchar *)m) + msg_hdr_sz(m);
+}
+
+static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
+{
+	return (struct tipc_msg *)msg_data(m);
+}
 
 /*
  * Word 1
@@ -372,6 +381,8 @@ static inline void msg_set_prevnode(struct tipc_msg *m, u32 a)
 
 static inline u32 msg_origport(struct tipc_msg *m)
 {
+	if (msg_user(m) == MSG_FRAGMENTER)
+		m = msg_get_wrapped(m);
 	return msg_word(m, 4);
 }
 
@@ -467,16 +478,6 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 	msg_set_word(m, 10, n);
 }
 
-static inline unchar *msg_data(struct tipc_msg *m)
-{
-	return ((unchar *)m) + msg_hdr_sz(m);
-}
-
-static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
-{
-	return (struct tipc_msg *)msg_data(m);
-}
-
 /*
  * Constants and routines used to read and write TIPC internal message headers
  */
@@ -753,13 +754,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline u32 msg_tot_origport(struct tipc_msg *m)
-{
-	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-		return msg_origport(msg_get_wrapped(m));
-	return msg_origport(m);
-}
-
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
-- 
1.9.1




* [PATCH net-next 3/3] tipc: eliminate race condition at dual link establishment
  2015-03-25 16:07 [PATCH net-next 0/3] tipc: some improvements and fixes Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 1/3] tipc: introduce starvation free send algorithm Jon Maloy
  2015-03-25 16:07 ` [PATCH net-next 2/3] tipc: clean up handling of link congestion Jon Maloy
@ 2015-03-25 16:07 ` Jon Maloy
  2015-03-25 18:07 ` [PATCH net-next 0/3] tipc: some improvements and fixes David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-03-25 16:07 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Despite recent improvements, the establishment of dual parallel
links still has a small glitch where messages can bypass each
other. When the second link in a dual-link configuration is
established, part of the first link's traffic will be steered over
to the new link. Although we do have a mechanism to ensure that
packets sent before and after the establishment of the new link
arrive in sequence at the destination node, this is not enough.
The arriving messages will still be delivered upwards in different
threads, which entails a risk of message disordering during
the transition phase.

To fix this, we introduce a synchronization mechanism between the
two parallel links, so that traffic arriving on the new link cannot
be added to its input queue until we are guaranteed that all
pre-establishment messages have been delivered on the old, parallel
link.
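
The synchronization test can be sketched in user-space C as follows. The
helpers sketch_mod() and sketch_less_eq() are assumed to behave like the
16-bit sequence number helpers mod() and less_eq() used by the link code,
and sketch_synched() is a hypothetical name; flag handling and the case
where there is no parallel link are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Sequence numbers wrap at 16 bits (assumed to mirror mod()). */
static inline uint32_t sketch_mod(uint32_t x)
{
	return x & 0xffffu;
}

/* True if 'left' is at or before 'right' in wrapping sequence space. */
static inline bool sketch_less_eq(uint32_t left, uint32_t right)
{
	return sketch_mod(right - left) < 32768u;
}

/*
 * old_next_in:    next sequence number expected on the old link
 * synch_point:    sequence number recorded from a tunnelled duplicate
 * old_inputq_len: packets still waiting in the old link's input queue
 */
static inline bool sketch_synched(uint32_t old_next_in, uint32_t synch_point,
				  uint32_t old_inputq_len)
{
	uint32_t post_synch;

	/* Last pre-synch packet not yet added to the old input queue */
	if (sketch_less_eq(old_next_in, synch_point))
		return false;

	/* Packets received on the old link after the synch point */
	post_synch = sketch_mod(old_next_in - synch_point) - 1;

	/* More queued than that: a pre-synch packet is still undelivered */
	if (old_inputq_len > post_synch)
		return false;

	return true;
}
```

While this returns false, packets arriving on the new link stay in its
deferred queue; once it returns true they can be released to the input
queue in order.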

This problem seems to always have been around, but its occurrence is
so rare that it has not been noticed until recent intensive testing.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c | 45 +++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/link.h |  2 ++
 net/tipc/msg.h  |  8 ++++++++
 3 files changed, 55 insertions(+)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 58e2460..1287161 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -139,6 +139,13 @@ static void tipc_link_put(struct tipc_link *l_ptr)
 	kref_put(&l_ptr->ref, tipc_link_release);
 }
 
+static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
+{
+	if (l->owner->active_links[0] != l)
+		return l->owner->active_links[0];
+	return l->owner->active_links[1];
+}
+
 static void link_init_max_pkt(struct tipc_link *l_ptr)
 {
 	struct tipc_node *node = l_ptr->owner;
@@ -1026,6 +1033,32 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 	}
 }
 
+/* link_synch(): check if all packets arrived before the synch
+ *               point have been consumed
+ * Returns true if the parallel links are synched, otherwise false
+ */
+static bool link_synch(struct tipc_link *l)
+{
+	unsigned int post_synch;
+	struct tipc_link *pl;
+
+	pl  = tipc_parallel_link(l);
+	if (pl == l)
+		goto synched;
+
+	/* Was last pre-synch packet added to input queue ? */
+	if (less_eq(pl->next_in_no, l->synch_point))
+		return false;
+
+	/* Is it still in the input queue ? */
+	post_synch = mod(pl->next_in_no - l->synch_point) - 1;
+	if (skb_queue_len(&pl->inputq) > post_synch)
+		return false;
+synched:
+	l->flags &= ~LINK_SYNCHING;
+	return true;
+}
+
 static void link_retrieve_defq(struct tipc_link *link,
 			       struct sk_buff_head *list)
 {
@@ -1156,6 +1189,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			skb = NULL;
 			goto unlock;
 		}
+		/* Synchronize with parallel link if applicable */
+		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
+			link_handle_out_of_seq_msg(l_ptr, skb);
+			if (link_synch(l_ptr))
+				link_retrieve_defq(l_ptr, &head);
+			skb = NULL;
+			goto unlock;
+		}
 		l_ptr->next_in_no++;
 		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
@@ -1231,6 +1272,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 
 	switch (msg_user(msg)) {
 	case CHANGEOVER_PROTOCOL:
+		if (msg_dup(msg)) {
+			link->flags |= LINK_SYNCHING;
+			link->synch_point = msg_seqno(msg_get_wrapped(msg));
+		}
 		if (!tipc_link_tunnel_rcv(node, &skb))
 			break;
 		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 99543a4..d2b5663 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -60,6 +60,7 @@
  */
 #define LINK_STARTED    0x0001
 #define LINK_STOPPED    0x0002
+#define LINK_SYNCHING   0x0004
 
 /* Starting value for maximum packet size negotiation on unicast links
  * (unless bearer MTU is less)
@@ -170,6 +171,7 @@ struct tipc_link {
 	/* Changeover */
 	u32 exp_msg_count;
 	u32 reset_checkpoint;
+	u32 synch_point;
 
 	/* Max packet negotiation */
 	u32 max_pkt;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 6445db0..d273207 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -554,6 +554,14 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 1, 15, 0x1fff, n);
 }
 
+static inline bool msg_dup(struct tipc_msg *m)
+{
+	if (likely(msg_user(m) != CHANGEOVER_PROTOCOL))
+		return false;
+	if (msg_type(m) != DUPLICATE_MSG)
+		return false;
+	return true;
+}
 
 /*
  * Word 2
-- 
1.9.1


* Re: [PATCH net-next 0/3] tipc: some improvements and fixes
  2015-03-25 16:07 [PATCH net-next 0/3] tipc: some improvements and fixes Jon Maloy
                   ` (2 preceding siblings ...)
  2015-03-25 16:07 ` [PATCH net-next 3/3] tipc: eliminate race condition at dual link establishment Jon Maloy
@ 2015-03-25 18:07 ` David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: David Miller @ 2015-03-25 18:07 UTC (permalink / raw)
  To: jon.maloy
  Cc: netdev, paul.gortmaker, erik.hugne, ying.xue, maloy, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Wed, 25 Mar 2015 12:07:23 -0400

> We introduce a better algorithm for selecting when and which
> users should be subject to link congestion control, plus clean
> up some code for that mechanism.
> Commit #3 fixes another rare race condition during packet reception.

Series applied, thanks Jon.

