* [PATCH net-next 00/13] tipc: new unicast transmission code
@ 2014-06-26  1:41 Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory Jon Maloy
                   ` (13 more replies)
  0 siblings, 14 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

As a step towards making the data transmission code more maintainable
and performant, we introduce a number of new functions for building,
sending and rejecting messages. The new functions will eventually be
used for all data transmission: user data unicast, service internal
messaging, and multicast/broadcast.

We start with this series, where we introduce the functions, and
let user data unicast and the internal connection protocol use them.
The remaining users will come in a later series.

There are only minor changes to data structures, and no protocol
changes, so the older functions can still be used in parallel for
some time. Until the old functions are removed, we use temporary
names for the new functions, such as tipc_build_msg2, tipc_link_xmit2.

It should be noted that the first two commits are unrelated to the
rest of the series.

Jon Maloy (13):
  tipc: eliminate case of writing to freed memory
  tipc: use negative error return values in functions
  tipc: introduce send functions for chained buffers in link
  tipc: make link mtu easily accessible from socket
  tipc: introduce direct iovec to buffer chain fragmentation function
  tipc: separate building and sending of rejected messages
  tipc: introduce message evaluation function
  tipc: RDM/DGRAM transport uses new fragmenting and sending functions
  tipc: connection oriented transport uses new send functions
  tipc: let port protocol senders use new link send function
  tipc: same receive code path for connection protocol and data
    messages
  tipc: clean up connection protocol reception function
  tipc: simplify connection congestion handling

 net/tipc/link.c        |  417 ++++++++++++++++-------------------------
 net/tipc/link.h        |    2 +
 net/tipc/msg.c         |  282 ++++++++++++++++++++++++++--
 net/tipc/msg.h         |   32 +++-
 net/tipc/net.c         |   63 +------
 net/tipc/net.h         |    2 -
 net/tipc/node.c        |   25 ++-
 net/tipc/node.h        |   17 ++
 net/tipc/node_subscr.c |    6 +-
 net/tipc/port.c        |  328 +++-----------------------------
 net/tipc/port.h        |   40 ----
 net/tipc/socket.c      |  487 ++++++++++++++++++++++++++++--------------------
 net/tipc/socket.h      |   14 ++
 13 files changed, 827 insertions(+), 888 deletions(-)

-- 
1.7.9.5

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26 10:56   ` Neil Horman
  2014-06-26  1:41 ` [PATCH net-next 02/13] tipc: use negative error return values in functions Jon Maloy
                   ` (12 subsequent siblings)
  13 siblings, 1 reply; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

In the function tipc_nodesub_notify() we call a function pointer
aggregated into the object to be notified, whereafter we set
the function pointer to NULL. However, in some cases the function
pointed to will free the struct containing the function pointer,
resulting in a write to already freed memory.

This bug seems to always have been there, without causing any
notable harm.

In this commit we fix the problem by inverting the order of the
zeroing and the function call.
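
The ordering issue can be illustrated outside the kernel. The following
is only a minimal userspace sketch, not the TIPC code: struct subscr,
free_owner(), notify() and demo() are hypothetical names, and the
handler is assumed to free the object that holds the pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef void (*ev_handler)(void *usr_handle);

/* Hypothetical subscription object holding its own callback pointer */
struct subscr {
	ev_handler handle_down;
	void *usr_handle;
};

static int calls;

/* Hypothetical handler that frees the object containing the pointer */
static void free_owner(void *usr_handle)
{
	calls++;
	free(usr_handle);
}

/* Save the pointer, clear it in the object, then call the saved copy */
static void notify(struct subscr *s)
{
	ev_handler fn = s->handle_down;
	void *arg = s->usr_handle;

	if (fn) {
		s->handle_down = NULL;	/* last access to *s */
		fn(arg);		/* may free *s; s is not used afterwards */
	}
}

/* Returns the number of times the handler ran, or -1 on allocation failure */
static int demo(void)
{
	struct subscr *s = malloc(sizeof(*s));

	if (!s)
		return -1;
	s->handle_down = free_owner;
	s->usr_handle = s;	/* the handler will free the subscr itself */
	calls = 0;
	notify(s);
	return calls;
}
```

With the original ordering (call first, then clear), the assignment to
s->handle_down would land in memory that the handler has already freed.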

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/node_subscr.c |    6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 7c59ab1..2d13eea 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
 void tipc_nodesub_notify(struct list_head *nsub_list)
 {
 	struct tipc_node_subscr *ns, *safe;
+	net_ev_handler handle_node_down;
 
 	list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
-		if (ns->handle_node_down) {
-			ns->handle_node_down(ns->usr_handle);
+		handle_node_down = ns->handle_node_down;
+		if (handle_node_down) {
 			ns->handle_node_down = NULL;
+			handle_node_down(ns->usr_handle);
 		}
 	}
 }
-- 
1.7.9.5

* [PATCH net-next 02/13] tipc: use negative error return values in functions
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 03/13] tipc: introduce send functions for chained buffers in link Jon Maloy
                   ` (11 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

In some places, TIPC functions return positive integers as error
codes. This goes against standard Linux coding practice, and may
even cause problems in some cases.

We now change the return values of the functions filter_rcv()
and filter_connect() to become signed integers, and return
negative error codes when needed. The codes we use in these
particular cases are still TIPC specific, since they are both
part of the TIPC API and have no counterpart in errno.h.
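
As an illustration of the convention, here is a small userspace sketch.
It is not the TIPC code: the DEMO_* constants, their values, and the
demo_* functions are hypothetical stand-ins. The filter returns 0 or a
negated protocol code, and the caller negates it again before handing
it to the reject path, which still expects the positive protocol value.

```c
#include <assert.h>

/* Hypothetical stand-ins for TIPC-specific codes; values are illustrative */
#define DEMO_OK			0
#define DEMO_ERR_NO_PORT	2
#define DEMO_ERR_OVERLOAD	3

/* Records the (positive) code most recently passed to the reject path */
static int last_rejected;

static void demo_reject(int proto_err)
{
	last_rejected = proto_err;
}

/* Returns 0 if the message is accepted, negative protocol code otherwise */
static int demo_filter(int queued, int limit, int port_ok)
{
	if (!port_ok)
		return -DEMO_ERR_NO_PORT;
	if (queued >= limit)
		return -DEMO_ERR_OVERLOAD;
	return DEMO_OK;
}

/* Caller: propagate the negative code, but reject with the positive one */
static int demo_rcv(int queued, int limit, int port_ok)
{
	int rc = demo_filter(queued, limit, port_ok);

	if (rc)
		demo_reject(-rc);
	return rc;
}
```

Keeping the sign flip at the boundary to the reject function means the
on-the-wire error codes stay unchanged while internal callers can test
for rc < 0 in the usual way.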

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/socket.c |   46 +++++++++++++++++++++++-----------------------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ef04755..9a95dab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1269,17 +1269,16 @@ static void tipc_data_ready(struct sock *sk)
  * @tsk: TIPC socket
  * @msg: message
  *
- * Returns TIPC error status code and socket error status code
- * once it encounters some errors
+ * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
  */
-static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
+static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
 {
 	struct sock *sk = &tsk->sk;
 	struct tipc_port *port = &tsk->port;
 	struct socket *sock = sk->sk_socket;
 	struct tipc_msg *msg = buf_msg(*buf);
 
-	u32 retval = TIPC_ERR_NO_PORT;
+	int retval = -TIPC_ERR_NO_PORT;
 	int res;
 
 	if (msg_mcast(msg))
@@ -1382,32 +1381,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
  *
  * Called with socket lock already taken; port lock may also be taken.
  *
- * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
+ * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
+ * to be rejected.
  */
-static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
+static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 {
 	struct socket *sock = sk->sk_socket;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *msg = buf_msg(buf);
 	unsigned int limit = rcvbuf_limit(sk, buf);
-	u32 res = TIPC_OK;
+	int rc = TIPC_OK;
 
 	/* Reject message if it is wrong sort of message for socket */
 	if (msg_type(msg) > TIPC_DIRECT_MSG)
-		return TIPC_ERR_NO_PORT;
+		return -TIPC_ERR_NO_PORT;
 
 	if (sock->state == SS_READY) {
 		if (msg_connected(msg))
-			return TIPC_ERR_NO_PORT;
+			return -TIPC_ERR_NO_PORT;
 	} else {
-		res = filter_connect(tsk, &buf);
-		if (res != TIPC_OK || buf == NULL)
-			return res;
+		rc = filter_connect(tsk, &buf);
+		if (rc != TIPC_OK || buf == NULL)
+			return rc;
 	}
 
 	/* Reject message if there isn't room to queue it */
 	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
-		return TIPC_ERR_OVERLOAD;
+		return -TIPC_ERR_OVERLOAD;
 
 	/* Enqueue message */
 	TIPC_SKB_CB(buf)->handle = NULL;
@@ -1429,13 +1429,13 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
 {
-	u32 res;
+	int rc;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	uint truesize = buf->truesize;
 
-	res = filter_rcv(sk, buf);
-	if (unlikely(res))
-		tipc_reject_msg(buf, res);
+	rc = filter_rcv(sk, buf);
+	if (unlikely(rc))
+		tipc_reject_msg(buf, -rc);
 
 	if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
 		atomic_add(truesize, &tsk->dupl_rcvcnt);
@@ -1455,7 +1455,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	struct tipc_port *port;
 	struct sock *sk;
 	u32 dport = msg_destport(buf_msg(buf));
-	int err = TIPC_OK;
+	int rc = TIPC_OK;
 	uint limit;
 
 	/* Forward unresolved named message */
@@ -1467,7 +1467,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	/* Validate destination */
 	port = tipc_port_lock(dport);
 	if (unlikely(!port)) {
-		err = TIPC_ERR_NO_PORT;
+		rc = -TIPC_ERR_NO_PORT;
 		goto exit;
 	}
 
@@ -1478,22 +1478,22 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	bh_lock_sock(sk);
 
 	if (!sock_owned_by_user(sk)) {
-		err = filter_rcv(sk, buf);
+		rc = filter_rcv(sk, buf);
 	} else {
 		if (sk->sk_backlog.len == 0)
 			atomic_set(&tsk->dupl_rcvcnt, 0);
 		limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
 		if (sk_add_backlog(sk, buf, limit))
-			err = TIPC_ERR_OVERLOAD;
+			rc = -TIPC_ERR_OVERLOAD;
 	}
 
 	bh_unlock_sock(sk);
 	tipc_port_unlock(port);
 
-	if (likely(!err))
+	if (likely(!rc))
 		return 0;
 exit:
-	tipc_reject_msg(buf, err);
+	tipc_reject_msg(buf, -rc);
 	return -EHOSTUNREACH;
 }
 
-- 
1.7.9.5

* [PATCH net-next 03/13] tipc: introduce send functions for chained buffers in link
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 02/13] tipc: use negative error return values in functions Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 04/13] tipc: make link mtu easily accessible from socket Jon Maloy
                   ` (10 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The current link implementation provides several different transmit
functions, depending on the characteristics of the message to be
sent: whether it is an iovec or an sk_buff, whether it needs
fragmentation or not, and whether the caller holds the node_lock or
not. The permutations of these options give us an unwanted amount of
unnecessarily complex code.

As a first step towards simplifying the send path for all messages,
we introduce two new send functions at link level, tipc_link_xmit2()
and __tipc_link_xmit2(). The former looks up a link to the message
destination, and if one is found, it grabs the node lock and calls
the second function, which works exclusively inside the node lock
protection. If no link is found, and the destination is on the same
node, it delivers the message directly to the local destination
socket.

The new functions take a buffer chain where all packet headers are
already prepared, and the correct MTU has been used. These two
functions will later replace all other link-level transmit functions.
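
The lookup-and-fallback flow described above can be sketched in
userspace as follows. This is a simplified illustration under stated
assumptions, not the kernel implementation: the demo_* types and
functions and DEMO_OWN_ADDR are hypothetical, locking is reduced to a
comment, and buffer ownership is not modelled.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

struct demo_link { int sent; };

struct demo_node {
	unsigned int addr;
	struct demo_link *active[2];	/* up to two active links per node */
};

enum { DEMO_OWN_ADDR = 1 };		/* hypothetical own node address */

static int local_deliveries;

/* Stand-in for the locked, link-level send */
static int demo_link_send(struct demo_link *l)
{
	l->sent++;
	return 0;
}

/* Stand-in for direct delivery to a socket on the own node */
static int demo_local_rcv(void)
{
	local_deliveries++;
	return 0;
}

/* Pick a link by selector; fall back to local delivery if none exists */
static int demo_xmit(struct demo_node *node, unsigned int dnode,
		     unsigned int selector)
{
	struct demo_link *link = NULL;
	int rc = -EHOSTUNREACH;

	if (node) {
		/* the real code holds the node lock around this section */
		link = node->active[selector & 1];
		if (link)
			rc = demo_link_send(link);
	}
	if (link)
		return rc;
	if (dnode == DEMO_OWN_ADDR)
		return demo_local_rcv();
	return rc;	/* no link and not local: unreachable */
}
```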

The functions are not backwards compatible, so we have added them
as new functions with temporary names. They are tested, but have no
users yet. Those will be added later in this series.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c |  140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/link.h |    2 +
 net/tipc/msg.c  |   96 +++++++++++++++++++++++++++++++++-----
 net/tipc/msg.h  |   25 +++++++++-
 4 files changed, 250 insertions(+), 13 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f..68d2afb 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -850,6 +850,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
 	return res;
 }
 
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
+ */
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint psz = msg_size(msg);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
+
+	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+			link_schedule_port(link, oport, psz);
+			return -ELINKCONG;
+		}
+	} else {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+	}
+	kfree_skb_list(buf);
+	return -EHOSTUNREACH;
+}
+
+/**
+ * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
+ */
+int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint psz = msg_size(msg);
+	uint qsz = link->out_queue_size;
+	uint sndlim = link->queue_limit[0];
+	uint imp = tipc_msg_tot_importance(msg);
+	uint mtu = link->max_pkt;
+	uint ack = mod(link->next_in_no - 1);
+	uint seqno = link->next_out_no;
+	uint bc_last_in = link->owner->bclink.last_in;
+	struct tipc_media_addr *addr = &link->media_addr;
+	struct sk_buff *next = buf->next;
+
+	/* Match queue limits against msg importance: */
+	if (unlikely(qsz >= link->queue_limit[imp]))
+		return tipc_link_cong(link, buf);
+
+	/* Has valid packet limit been used ? */
+	if (unlikely(psz > mtu)) {
+		kfree_skb_list(buf);
+		return -EMSGSIZE;
+	}
+
+	/* Prepare each packet for sending, and add to outqueue: */
+	while (buf) {
+		next = buf->next;
+		msg = buf_msg(buf);
+		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_bcast_ack(msg, bc_last_in);
+
+		if (!link->first_out) {
+			link->first_out = buf;
+		} else if (qsz < sndlim) {
+			link->last_out->next = buf;
+		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+			link->stats.sent_bundled++;
+			buf = next;
+			next = buf->next;
+			continue;
+		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+			link->stats.sent_bundled++;
+			link->stats.sent_bundles++;
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		} else {
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		}
+
+		/* Send packet if possible: */
+		if (likely(++qsz <= sndlim)) {
+			tipc_bearer_send(link->bearer_id, buf, addr);
+			link->next_out = next;
+			link->unacked_window = 0;
+		}
+		seqno++;
+		link->last_out = buf;
+		buf = next;
+	}
+	link->next_out_no = seqno;
+	link->out_queue_size = qsz;
+	return 0;
+}
+
+/**
+ * tipc_link_xmit2() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
+{
+	struct tipc_link *link = NULL;
+	struct tipc_node *node;
+	int rc = -EHOSTUNREACH;
+
+	node = tipc_node_find(dnode);
+	if (node) {
+		tipc_node_lock(node);
+		link = node->active_links[selector & 1];
+		if (link)
+			rc = __tipc_link_xmit2(link, buf);
+		tipc_node_unlock(node);
+	}
+
+	if (link)
+		return rc;
+
+	if (likely(in_own_node(dnode)))
+		return tipc_sk_rcv(buf);
+
+	kfree_skb_list(buf);
+	return rc;
+}
+
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
@@ -1238,7 +1376,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 			tipc_bearer_send(l_ptr->bearer_id, buf,
 					 &l_ptr->media_addr);
 			if (msg_user(msg) == MSG_BUNDLER)
-				msg_set_type(msg, CLOSED_MSG);
+				msg_set_type(msg, BUNDLE_CLOSED);
 			l_ptr->next_out = buf->next;
 			return 0;
 		}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 200d518..227ff81 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -227,8 +227,10 @@ void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
+int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
 void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
 int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
+int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
 int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 8be6e94..e02afc9 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -37,20 +37,11 @@
 #include "core.h"
 #include "msg.h"
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m)
+static unsigned int align(unsigned int i)
 {
-	if (likely(msg_isdata(m))) {
-		if (likely(msg_orignode(m) == tipc_own_addr))
-			return msg_importance(m);
-		return msg_importance(m) + 4;
-	}
-	if ((msg_user(m) == MSG_FRAGMENTER)  &&
-	    (msg_type(m) == FIRST_FRAGMENT))
-		return msg_importance(msg_get_wrapped(m));
-	return msg_importance(m);
+	return (i + 3) & ~3u;
 }
 
-
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode)
 {
@@ -152,3 +143,86 @@ out_free:
 	kfree_skb(*buf);
 	return 0;
 }
+
+/**
+ * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
+ * @bbuf: the existing buffer ("bundle")
+ * @buf:  buffer to be appended
+ * @mtu:  max allowable size for the bundle buffer
+ * Consumes buffer if successful
+ * Returns true if bundling could be performed, otherwise false
+ */
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
+{
+	struct tipc_msg *bmsg = buf_msg(bbuf);
+	struct tipc_msg *msg = buf_msg(buf);
+	unsigned int bsz = msg_size(bmsg);
+	unsigned int msz = msg_size(msg);
+	u32 start = align(bsz);
+	u32 max = mtu - INT_H_SIZE;
+	u32 pad = start - bsz;
+
+	if (likely(msg_user(msg) == MSG_FRAGMENTER))
+		return false;
+	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
+		return false;
+	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
+		return false;
+	if (likely(msg_user(bmsg) != MSG_BUNDLER))
+		return false;
+	if (likely(msg_type(bmsg) != BUNDLE_OPEN))
+		return false;
+	if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
+		return false;
+	if (unlikely(max < (start + msz)))
+		return false;
+
+	skb_put(bbuf, pad + msz);
+	skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
+	msg_set_size(bmsg, start + msz);
+	msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
+	bbuf->next = buf->next;
+	kfree_skb(buf);
+	return true;
+}
+
+/**
+ * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
+ * @buf:  buffer to be appended and replaced
+ * @mtu:  max allowable size for the bundle buffer, including header
+ * @dnode: destination node for message. (Not always present in header)
+ * Replaces buffer if successful
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
+{
+	struct sk_buff *bbuf;
+	struct tipc_msg *bmsg;
+	struct tipc_msg *msg = buf_msg(*buf);
+	u32 msz = msg_size(msg);
+	u32 max = mtu - INT_H_SIZE;
+
+	if (msg_user(msg) == MSG_FRAGMENTER)
+		return false;
+	if (msg_user(msg) == CHANGEOVER_PROTOCOL)
+		return false;
+	if (msg_user(msg) == BCAST_PROTOCOL)
+		return false;
+	if (msz > (max / 2))
+		return false;
+
+	bbuf = tipc_buf_acquire(max);
+	if (!bbuf)
+		return false;
+
+	skb_trim(bbuf, INT_H_SIZE);
+	bmsg = buf_msg(bbuf);
+	tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
+	msg_set_seqno(bmsg, msg_seqno(msg));
+	msg_set_ack(bmsg, msg_ack(msg));
+	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
+	bbuf->next = (*buf)->next;
+	tipc_msg_bundle(bbuf, *buf, mtu);
+	*buf = bbuf;
+	return true;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 5035119..41a05fa 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define FRAGMENT		1
 #define LAST_FRAGMENT		2
 
+/* Bundling protocol message types
+ */
+#define BUNDLE_OPEN             0
+#define BUNDLE_CLOSED           1
+
 /*
  * Link management protocol message types
  */
@@ -706,12 +711,30 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m);
+static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_importance(msg_get_wrapped(m));
+	return msg_importance(m);
+}
+
+static inline u32 msg_tot_origport(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_origport(msg_get_wrapped(m));
+	return msg_origport(m);
+}
+
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
+
 int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
 		   unsigned int len, int max_size, struct sk_buff **buf);
 
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
 
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
+
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
+
 #endif
-- 
1.7.9.5

* [PATCH net-next 04/13] tipc: make link mtu easily accessible from socket
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (2 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 03/13] tipc: introduce send functions for chained buffers in link Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 05/13] tipc: introduce direct iovec to buffer chain fragmentation function Jon Maloy
                   ` (9 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Message fragmentation is currently performed at link level, inside
the protection of node_lock. This potentially ties up the sending
link structure for a long time, instead of letting it do other tasks,
such as handling reception of new packets.

In this commit, we make the MTU of each active link easily
accessible from the socket level, i.e., without taking any spinlock
or dereferencing the target link pointer. This way, we make it possible
to perform fragmentation in the sending socket, before sending the
whole fragment chain to the link for transport.
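
The caching idea can be sketched in userspace like this. It is a
simplified illustration, not the kernel code: the demo_* names are
hypothetical, the two constants are assumed values, and the refresh
function falls back to the maximum size whenever a slot has no link,
which is a simplification of what the patch does.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_INT_H_SIZE		40	/* assumed internal header size */
#define DEMO_MAX_MSG_SIZE	66060	/* assumed upper bound on message size */

struct demo_link { unsigned int max_pkt; };

struct demo_node {
	struct demo_link *active[2];
	unsigned int act_mtus[2];	/* cached copy, readable without the link */
};

/* Called whenever the set of active links changes */
static void demo_refresh_mtus(struct demo_node *n)
{
	int i;

	for (i = 0; i < 2; i++) {
		if (n->active[i])
			/* leave room for a changeover header */
			n->act_mtus[i] = n->active[i]->max_pkt - DEMO_INT_H_SIZE;
		else
			n->act_mtus[i] = DEMO_MAX_MSG_SIZE;
	}
}

/* Reader side: no lock taken and no link pointer dereferenced */
static unsigned int demo_get_mtu(const struct demo_node *n,
				 unsigned int selector)
{
	return n ? n->act_mtus[selector & 1] : DEMO_MAX_MSG_SIZE;
}
```

The reader may see a value that is slightly out of date if a link
changes state concurrently; the link-level send path is then expected
to catch an oversized packet.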

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/node.c |   25 +++++++++++++++++++++----
 net/tipc/node.h |   17 +++++++++++++++++
 2 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 5b44c30..d959343 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/node.c: TIPC node management routines
  *
- * Copyright (c) 2000-2006, 2012 Ericsson AB
+ * Copyright (c) 2000-2006, 2012-2014, Ericsson AB
  * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
  * All rights reserved.
  *
@@ -155,21 +155,25 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 	if (!active[0]) {
 		active[0] = active[1] = l_ptr;
 		node_established_contact(n_ptr);
-		return;
+		goto exit;
 	}
 	if (l_ptr->priority < active[0]->priority) {
 		pr_info("New link <%s> becomes standby\n", l_ptr->name);
-		return;
+		goto exit;
 	}
 	tipc_link_dup_queue_xmit(active[0], l_ptr);
 	if (l_ptr->priority == active[0]->priority) {
 		active[0] = l_ptr;
-		return;
+		goto exit;
 	}
 	pr_info("Old link <%s> becomes standby\n", active[0]->name);
 	if (active[1] != active[0])
 		pr_info("Old link <%s> becomes standby\n", active[1]->name);
 	active[0] = active[1] = l_ptr;
+exit:
+	/* Leave room for changeover header when returning 'mtu' to users: */
+	n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+	n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
 }
 
 /**
@@ -229,6 +233,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 		tipc_link_failover_send_queue(l_ptr);
 	else
 		node_lost_contact(n_ptr);
+
+	/* Leave room for changeover header when returning 'mtu' to users: */
+	if (active[0]) {
+		n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+		n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
+		return;
+	}
+
+	/* Loopback link went down? No fragmentation needed from now on. */
+	if (n_ptr->addr == tipc_own_addr) {
+		n_ptr->act_mtus[0] = MAX_MSG_SIZE;
+		n_ptr->act_mtus[1] = MAX_MSG_SIZE;
+	}
 }
 
 int tipc_node_active_links(struct tipc_node *n_ptr)
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 9087063..b61716a 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -41,6 +41,7 @@
 #include "addr.h"
 #include "net.h"
 #include "bearer.h"
+#include "msg.h"
 
 /*
  * Out-of-range value for node signature
@@ -105,6 +106,7 @@ struct tipc_node {
 	spinlock_t lock;
 	struct hlist_node hash;
 	struct tipc_link *active_links[2];
+	u32 act_mtus[2];
 	struct tipc_link *links[MAX_BEARERS];
 	unsigned int action_flags;
 	struct tipc_node_bclink bclink;
@@ -143,4 +145,19 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
 		TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
 }
 
+static inline uint tipc_node_get_mtu(u32 addr, u32 selector)
+{
+	struct tipc_node *node;
+	u32 mtu;
+
+	node = tipc_node_find(addr);
+
+	if (likely(node))
+		mtu = node->act_mtus[selector & 1];
+	else
+		mtu = MAX_MSG_SIZE;
+
+	return mtu;
+}
+
 #endif
-- 
1.7.9.5

* [PATCH net-next 05/13] tipc: introduce direct iovec to buffer chain fragmentation function
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (3 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 04/13] tipc: make link mtu easily accessible from socket Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 06/13] tipc: separate building and sending of rejected messages Jon Maloy
                   ` (8 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Fragmentation at message sending is currently performed in two
places in link.c, depending on whether data to be transmitted
is delivered in the form of an iovec or as a big sk_buff. Those
functions are also tightly entangled with the send functions
that are using them.

We now introduce a re-entrant, standalone function, tipc_msg_build2(),
that builds a packet chain directly from an iovec. Each fragment is
sized according to the MTU value given by the caller, and is prepended
with a correctly built fragment header, when needed. The function is
independent from who is calling and where the chain will be delivered,
as long as the caller is able to indicate a correct MTU.
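
The fragment sizing can be checked with a small calculation. The
sketch below only counts packets and allocates nothing; it is an
illustration, not the function added by this patch. demo_frag_count()
is a hypothetical name, DEMO_INT_H_SIZE is an assumed fragment header
size, and pktmax is assumed to be larger than the fragment header plus
the message header.

```c
#include <assert.h>

#define DEMO_INT_H_SIZE 40	/* assumed size of the fragment header */

/*
 * Number of packets needed for a message with header size mhsz and
 * dsz bytes of user data, when no packet may exceed pktmax bytes.
 * The first fragment carries fragment header + message header + data;
 * every later fragment carries fragment header + data only.
 */
static int demo_frag_count(int mhsz, int dsz, int pktmax)
{
	int drem = dsz;
	int pktrem;
	int count = 1;

	/* Whole message fits in one packet: no fragmentation */
	if (mhsz + dsz <= pktmax)
		return 1;

	/* Data room left in the first fragment */
	pktrem = pktmax - DEMO_INT_H_SIZE - mhsz;

	for (;;) {
		if (drem < pktrem)
			pktrem = drem;
		drem -= pktrem;
		if (!drem)
			break;
		/* One more fragment, with room for data after its header */
		count++;
		pktrem = pktmax - DEMO_INT_H_SIZE;
	}
	return count;
}
```

For example, with a 24 byte message header and a 1460 byte packet
limit, 3000 bytes of user data are split as 1396 + 1420 + 184 bytes
across three fragments.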

The function is tested, but not called by anybody yet. Since it is
incompatible with the existing tipc_msg_build(), and we cannot yet
remove that function, we have given it a temporary name.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/msg.c |  102 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h |    3 ++
 2 files changed, 105 insertions(+)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index e02afc9..4093b4c 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -144,6 +144,108 @@ out_free:
 	return 0;
 }
 
+
+/**
+ * tipc_msg_build2 - create buffer chain containing specified header and data
+ * @mhdr: Message header, to be prepended to data
+ * @iov: User data
+ * @offset: Position in iov to start copying from
+ * @dsz: Total length of user data
+ * @pktmax: Max packet size that can be used
+ * @chain: Buffer or chain of buffers to be returned to caller
+ * Returns message data size or errno: -ENOMEM, -EFAULT
+ */
+int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
+		    int offset, int dsz, int pktmax , struct sk_buff **chain)
+{
+	int mhsz = msg_hdr_sz(mhdr);
+	int msz = mhsz + dsz;
+	int pktno = 1;
+	int pktsz;
+	int pktrem = pktmax;
+	int drem = dsz;
+	struct tipc_msg pkthdr;
+	struct sk_buff *buf, *prev;
+	char *pktpos;
+	int rc;
+
+	msg_set_size(mhdr, msz);
+
+	/* No fragmentation needed? */
+	if (likely(msz <= pktmax)) {
+		buf = tipc_buf_acquire(msz);
+		*chain = buf;
+		if (unlikely(!buf))
+			return -ENOMEM;
+		skb_copy_to_linear_data(buf, mhdr, mhsz);
+		pktpos = buf->data + mhsz;
+		if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
+			return dsz;
+		rc = -EFAULT;
+		goto error;
+	}
+
+	/* Prepare reusable fragment header */
+	tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
+		      INT_H_SIZE, msg_destnode(mhdr));
+	msg_set_size(&pkthdr, pktmax);
+	msg_set_fragm_no(&pkthdr, pktno);
+
+	/* Prepare first fragment */
+	*chain = buf = tipc_buf_acquire(pktmax);
+	if (!buf)
+		return -ENOMEM;
+	pktpos = buf->data;
+	skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+	pktpos += INT_H_SIZE;
+	pktrem -= INT_H_SIZE;
+	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz);
+	pktpos += mhsz;
+	pktrem -= mhsz;
+
+	do {
+		if (drem < pktrem)
+			pktrem = drem;
+
+		if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) {
+			rc = -EFAULT;
+			goto error;
+		}
+		drem -= pktrem;
+		offset += pktrem;
+
+		if (!drem)
+			break;
+
+		/* Prepare new fragment: */
+		if (drem < (pktmax - INT_H_SIZE))
+			pktsz = drem + INT_H_SIZE;
+		else
+			pktsz = pktmax;
+		prev = buf;
+		buf = tipc_buf_acquire(pktsz);
+		if (!buf) {
+			rc = -ENOMEM;
+			goto error;
+		}
+		prev->next = buf;
+		msg_set_type(&pkthdr, FRAGMENT);
+		msg_set_size(&pkthdr, pktsz);
+		msg_set_fragm_no(&pkthdr, ++pktno);
+		skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+		pktpos = buf->data + INT_H_SIZE;
+		pktrem = pktsz - INT_H_SIZE;
+
+	} while (1);
+
+	msg_set_type(buf_msg(buf), LAST_FRAGMENT);
+	return dsz;
+error:
+	kfree_skb_list(*chain);
+	*chain = NULL;
+	return rc;
+}
+
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
  * @bbuf: the existing buffer ("bundle")
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 41a05fa..5e1339d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -737,4 +737,7 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
 
 bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 
+int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
+		    int offset, int dsz, int mtu, struct sk_buff **chain);
+
 #endif
-- 
1.7.9.5
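
Note on the size arithmetic in tipc_msg_build2() above: it can be
modelled in isolation, without sk_buffs or iovecs. The following is a
userspace sketch only, not kernel code. The name frag_sizes() and its
parameters are hypothetical, and INT_H_SIZE is assumed to be 40 bytes.
It fills an array with the on-wire size of each packet the function
would allocate.

```c
#include <assert.h>

#define INT_H_SIZE 40	/* assumed size of the fragment (internal) header */

/*
 * Model of the packet sizing only. mhsz is the message header size,
 * dsz the amount of user data, pktmax the largest allowed packet.
 * Writes one size per packet into sizes[] and returns the packet
 * count, or -1 if sizes[] (max_pkts entries) is too small.
 */
int frag_sizes(int mhsz, int dsz, int pktmax, int *sizes, int max_pkts)
{
	int msz = mhsz + dsz;
	int drem = dsz;
	int pktrem;
	int n = 0;

	/* The first fragment must have room for both headers */
	assert(pktmax > INT_H_SIZE + mhsz);
	if (max_pkts < 1)
		return -1;

	/* No fragmentation needed: one packet, header plus data */
	if (msz <= pktmax) {
		sizes[0] = msz;
		return 1;
	}

	/* First fragment: fragment header + original header + data */
	sizes[n++] = pktmax;
	pktrem = pktmax - INT_H_SIZE - mhsz;

	for (;;) {
		if (drem < pktrem)
			pktrem = drem;
		drem -= pktrem;
		if (!drem)
			break;
		if (n == max_pkts)
			return -1;
		/* Later fragments carry only the fragment header */
		if (drem < pktmax - INT_H_SIZE)
			sizes[n] = drem + INT_H_SIZE;
		else
			sizes[n] = pktmax;
		pktrem = sizes[n] - INT_H_SIZE;
		n++;
	}
	return n;
}
```

With a 24 byte header, 3000 bytes of data and pktmax 1500, the model
yields three packets: two full ones and a short last fragment that
carries the remaining data plus one fragment header.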


* [PATCH net-next 06/13] tipc: separate building and sending of rejected messages
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (4 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 05/13] tipc: introduce direct iovec to buffer chain fragmentation function Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 07/13] tipc: introduce message evaluation function Jon Maloy
                   ` (7 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The way we build and send rejected messages is currently perceived as
hard to follow, partly because we let the transmission go via deep
call chains through functions such as tipc_reject_msg() and
net_route_msg().

We want to remove those functions, and make the call sequences shallower
and simpler. For this purpose, we separate building and sending of
rejected messages. We build the reject message using the new function
tipc_msg_reverse(), and let the transmission go via the newly introduced
tipc_link_xmit2() function, as all transmission eventually will do. We
also ensure that all calls to tipc_link_xmit2() are made outside
port_lock/bh_lock_sock.

Finally, we replace all calls to tipc_reject_msg() with the two new
calls at all locations in the code that we want to keep. The remaining
calls are made from code that we are planning to remove, along with
tipc_reject_msg() itself.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
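
Note: the header manipulation that tipc_msg_reverse() performs can be
illustrated with a simplified userspace model. This is a sketch only.
struct model_msg and model_reverse() are hypothetical stand-ins for the
real header accessors, the highest importance level is assumed to be 3,
and buffer handling (linearize, trim, orphan, free) is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_FORWARD_SIZE 1024
#define CRITICAL_IMPORTANCE 3	/* assumed highest importance level */

/* Hypothetical, simplified stand-in for the TIPC message header */
struct model_msg {
	uint32_t orignode, origport;
	uint32_t destnode, destport;
	unsigned int importance;
	int errcode;
	bool dest_droppable;
	unsigned int data_sz;
};

/*
 * Turn msg into a rejection addressed to its original sender.
 * Returns false if the message should be dropped instead.
 */
bool model_reverse(struct model_msg *msg, uint32_t *dnode, int err)
{
	struct model_msg ohdr;

	assert(msg && dnode);

	/* Droppable messages and existing rejections are never bounced */
	if (msg->dest_droppable || msg->errcode)
		return false;

	ohdr = *msg;

	/* Raise importance by one step, capped at the highest level */
	if (msg->importance < CRITICAL_IMPORTANCE)
		msg->importance++;
	msg->errcode = err;

	/* Swap source and destination */
	msg->origport = ohdr.destport;
	msg->destport = ohdr.origport;
	msg->orignode = ohdr.destnode;
	msg->destnode = ohdr.orignode;

	/* Return at most MAX_FORWARD_SIZE bytes of the original data */
	if (msg->data_sz > MAX_FORWARD_SIZE)
		msg->data_sz = MAX_FORWARD_SIZE;

	*dnode = ohdr.orignode;
	return true;
}
```

A caller then follows the two-step pattern used throughout this patch:
if the reverse step returns true, hand the message to the send function
with the returned node; otherwise there is nothing left to send.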
 net/tipc/msg.c    |   42 ++++++++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h    |    2 ++
 net/tipc/socket.c |   28 ++++++++++++++++++++--------
 3 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 4093b4c..6070dd0 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -37,6 +37,8 @@
 #include "core.h"
 #include "msg.h"
 
+#define MAX_FORWARD_SIZE 1024
+
 static unsigned int align(unsigned int i)
 {
 	return (i + 3) & ~3u;
@@ -328,3 +330,43 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
 	*buf = bbuf;
 	return true;
 }
+
+/**
+ * tipc_msg_reverse(): swap source and destination addresses and add error code
+ * @buf:  buffer containing message to be reversed
+ * @dnode: return value: node where to send message after reversal
+ * @err:  error code to be set in message
+ * Consumes buffer if failure
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint imp = msg_importance(msg);
+	struct tipc_msg ohdr;
+	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+
+	if (skb_linearize(buf) || !msg_isdata(msg))
+		goto exit;
+	if (msg_dest_droppable(msg) || msg_errcode(msg))
+		goto exit;
+
+	memcpy(&ohdr, msg, msg_hdr_sz(msg));
+	msg_set_importance(msg, min_t(uint, ++imp, TIPC_CRITICAL_IMPORTANCE));
+	msg_set_errcode(msg, err);
+	msg_set_origport(msg, msg_destport(&ohdr));
+	msg_set_destport(msg, msg_origport(&ohdr));
+	msg_set_prevnode(msg, tipc_own_addr);
+	if (!msg_short(msg)) {
+		msg_set_orignode(msg, msg_destnode(&ohdr));
+		msg_set_destnode(msg, msg_orignode(&ohdr));
+	}
+	msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
+	skb_trim(buf, msg_size(msg));
+	skb_orphan(buf);
+	*dnode = msg_orignode(&ohdr);
+	return true;
+exit:
+	kfree_skb(buf);
+	return false;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 5e1339d..3805094 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -725,6 +725,8 @@ static inline u32 msg_tot_origport(struct tipc_msg *m)
 	return msg_origport(m);
 }
 
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
+
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 9a95dab..5961a63 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -39,6 +39,7 @@
 #include "node.h"
 
 #include <linux/export.h>
+#include "link.h"
 
 #define SS_LISTENING	-1	/* socket is listening */
 #define SS_READY	-2	/* socket is connectionless */
@@ -123,9 +124,12 @@ static void advance_rx_queue(struct sock *sk)
 static void reject_rx_queue(struct sock *sk)
 {
 	struct sk_buff *buf;
+	u32 dnode;
 
-	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
-		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
+		if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+			tipc_link_xmit2(buf, dnode, 0);
+	}
 }
 
 /**
@@ -303,6 +307,7 @@ static int tipc_release(struct socket *sock)
 	struct tipc_sock *tsk;
 	struct tipc_port *port;
 	struct sk_buff *buf;
+	u32 dnode;
 
 	/*
 	 * Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -331,7 +336,8 @@ static int tipc_release(struct socket *sock)
 				sock->state = SS_DISCONNECTING;
 				tipc_port_disconnect(port->ref);
 			}
-			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+			if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+				tipc_link_xmit2(buf, dnode, 0);
 		}
 	}
 
@@ -1430,14 +1436,15 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
 {
 	int rc;
+	u32 onode;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	uint truesize = buf->truesize;
 
 	rc = filter_rcv(sk, buf);
-	if (unlikely(rc))
-		tipc_reject_msg(buf, -rc);
 
-	if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+	if (unlikely(rc && tipc_msg_reverse(buf, &onode, -rc)))
+		tipc_link_xmit2(buf, onode, 0);
+	else if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
 		atomic_add(truesize, &tsk->dupl_rcvcnt);
 
 	return 0;
@@ -1457,6 +1464,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	u32 dport = msg_destport(buf_msg(buf));
 	int rc = TIPC_OK;
 	uint limit;
+	u32 dnode;
 
 	/* Forward unresolved named message */
 	if (unlikely(!dport)) {
@@ -1493,7 +1501,9 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	if (likely(!rc))
 		return 0;
 exit:
-	tipc_reject_msg(buf, -rc);
+	if (!tipc_msg_reverse(buf, &dnode, -rc))
+		return -EHOSTUNREACH;
+	tipc_link_xmit2(buf, dnode, 0);
 	return -EHOSTUNREACH;
 }
 
@@ -1758,6 +1768,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_port *port = &tsk->port;
 	struct sk_buff *buf;
+	u32 peer;
 	int res;
 
 	if (how != SHUT_RDWR)
@@ -1778,7 +1789,8 @@ restart:
 				goto restart;
 			}
 			tipc_port_disconnect(port->ref);
-			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
+			if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
+				tipc_link_xmit2(buf, peer, 0);
 		} else {
 			tipc_port_shutdown(port->ref);
 		}
-- 
1.7.9.5


* [PATCH net-next 07/13] tipc: introduce message evaluation function
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (5 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 06/13] tipc: separate building and sending of rejected messages Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 08/13] tipc: RDM/DGRAM transport uses new fragmenting and sending functions Jon Maloy
                   ` (6 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

When a message arrives in a node and finds no destination
socket, we may need to drop it, reject it, or forward it after
a secondary destination lookup. The latter two cases currently
result in a code path that is perceived as complex, because it
follows a deep call chain via obscure functions such as
net_route_named_msg() and net_route_msg().

We now introduce a function, tipc_msg_eval(), that takes the
decision about whether such a message should be rejected or
forwarded, but leaves it to the caller to actually perform
the indicated action.

If the decision is 'reject', it is still the task of the recently
introduced function tipc_msg_reverse() to take the final decision
about whether the message is rejectable or not. In the latter case
it drops the message.

As a result of this change, we can finally eliminate the function
net_route_named_msg(), and hence become independent of net_route_msg().

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
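
Note: the decision order in tipc_msg_eval() can be shown with a small
userspace model. This is a sketch under assumptions, not the kernel
code. struct eval_msg, model_eval(), sample_lookup() and the M_*
constants are hypothetical, the name table lookup is replaced by a
callback, and the skb_linearize() step is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_FORWARD_SIZE 1024

/* Model error codes; the numeric values are assumptions */
enum { M_OK = 0, M_ERR_NO_NAME = 1, M_ERR_NO_PORT = 2 };

/* Hypothetical, simplified view of the fields the decision needs */
struct eval_msg {
	bool named;			/* message is name addressed */
	uint32_t nametype;
	unsigned int data_sz;
	unsigned int reroute_cnt;
	uint32_t destnode, destport;
};

/* Lookup callback: returns the port, or 0 if the name is unknown */
typedef uint32_t (*lookup_fn)(const struct eval_msg *msg, uint32_t *node);

/* Sample lookup: only name type 1000 resolves, to port 42 on node 7 */
uint32_t sample_lookup(const struct eval_msg *msg, uint32_t *node)
{
	if (msg->nametype != 1000)
		return 0;
	*node = 7;
	return 42;
}

/*
 * Returns M_OK if the message was re-addressed and can be forwarded
 * to *dnode, or a negative model error code if it should be rejected.
 */
int model_eval(struct eval_msg *msg, uint32_t *dnode, lookup_fn lookup)
{
	uint32_t dport;

	assert(msg && dnode && lookup);

	if (!msg->named)
		return -M_ERR_NO_PORT;
	if (msg->data_sz > MAX_FORWARD_SIZE)
		return -M_ERR_NO_NAME;
	/* A message gets at most one secondary lookup */
	if (msg->reroute_cnt > 0)
		return -M_ERR_NO_NAME;

	dport = lookup(msg, dnode);
	if (!dport)
		return -M_ERR_NO_NAME;

	msg->reroute_cnt++;
	msg->destnode = *dnode;
	msg->destport = dport;
	return M_OK;
}
```

The reroute counter is what bounds forwarding: a message that has
already been re-addressed once is rejected on the next miss rather
than looked up again.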
 net/tipc/msg.c    |   38 ++++++++++++++++++++++++++++++++++++++
 net/tipc/msg.h    |    2 ++
 net/tipc/net.c    |   33 ++++++---------------------------
 net/tipc/socket.c |   16 +++++-----------
 4 files changed, 51 insertions(+), 38 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 6070dd0..7bfc442 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -36,6 +36,8 @@
 
 #include "core.h"
 #include "msg.h"
+#include "addr.h"
+#include "name_table.h"
 
 #define MAX_FORWARD_SIZE 1024
 
@@ -370,3 +372,39 @@ exit:
 	kfree_skb(buf);
 	return false;
 }
+
+/**
+ * tipc_msg_eval: determine fate of message that found no destination
+ * @buf: the buffer containing the message.
+ * @dnode: return value: next-hop node, if message to be forwarded
+ * @err: error code to use, if message to be rejected
+ *
+ * Does not consume buffer
+ * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
+ * code if message to be rejected
+ */
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	u32 dport;
+
+	if (msg_type(msg) != TIPC_NAMED_MSG)
+		return -TIPC_ERR_NO_PORT;
+	if (skb_linearize(buf))
+		return -TIPC_ERR_NO_NAME;
+	if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
+		return -TIPC_ERR_NO_NAME;
+	if (msg_reroute_cnt(msg) > 0)
+		return -TIPC_ERR_NO_NAME;
+
+	*dnode = addr_domain(msg_lookup_scope(msg));
+	dport = tipc_nametbl_translate(msg_nametype(msg),
+				       msg_nameinst(msg),
+				       dnode);
+	if (!dport)
+		return -TIPC_ERR_NO_NAME;
+	msg_incr_reroute_cnt(msg);
+	msg_set_destnode(msg, *dnode);
+	msg_set_destport(msg, dport);
+	return TIPC_OK;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 3805094..7d57434 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -727,6 +727,8 @@ static inline u32 msg_tot_origport(struct tipc_msg *m)
 
 bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
 
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
+
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
 
diff --git a/net/tipc/net.c b/net/tipc/net.c
index f64375e..5f7d6ff 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/net.c: TIPC network routing code
  *
- * Copyright (c) 1995-2006, Ericsson AB
+ * Copyright (c) 1995-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -103,29 +103,6 @@
  *       This is always used within the scope of a tipc_nametbl_lock(read).
  *     - A local spin_lock protecting the queue of subscriber events.
 */
-
-static void net_route_named_msg(struct sk_buff *buf)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 dnode;
-	u32 dport;
-
-	if (!msg_named(msg)) {
-		kfree_skb(buf);
-		return;
-	}
-
-	dnode = addr_domain(msg_lookup_scope(msg));
-	dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode);
-	if (dport) {
-		msg_set_destnode(msg, dnode);
-		msg_set_destport(msg, dport);
-		tipc_net_route_msg(buf);
-		return;
-	}
-	tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
-}
-
 void tipc_net_route_msg(struct sk_buff *buf)
 {
 	struct tipc_msg *msg;
@@ -141,10 +118,12 @@ void tipc_net_route_msg(struct sk_buff *buf)
 		if (msg_isdata(msg)) {
 			if (msg_mcast(msg))
 				tipc_port_mcast_rcv(buf, NULL);
-			else if (msg_destport(msg))
+			else if (msg_destport(msg)) {
 				tipc_sk_rcv(buf);
-			else
-				net_route_named_msg(buf);
+			} else {
+				pr_warn("Cannot route msg; no destination\n");
+				kfree_skb(buf);
+			}
 			return;
 		}
 		switch (msg_user(msg)) {
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5961a63..e642ed5 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1466,16 +1466,10 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	uint limit;
 	u32 dnode;
 
-	/* Forward unresolved named message */
-	if (unlikely(!dport)) {
-		tipc_net_route_msg(buf);
-		return 0;
-	}
-
-	/* Validate destination */
+	/* Validate destination and message */
 	port = tipc_port_lock(dport);
 	if (unlikely(!port)) {
-		rc = -TIPC_ERR_NO_PORT;
+		rc = tipc_msg_eval(buf, &dnode);
 		goto exit;
 	}
 
@@ -1494,17 +1488,17 @@ int tipc_sk_rcv(struct sk_buff *buf)
 		if (sk_add_backlog(sk, buf, limit))
 			rc = -TIPC_ERR_OVERLOAD;
 	}
-
 	bh_unlock_sock(sk);
 	tipc_port_unlock(port);
 
 	if (likely(!rc))
 		return 0;
 exit:
-	if (!tipc_msg_reverse(buf, &dnode, -rc))
+	if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
 		return -EHOSTUNREACH;
+
 	tipc_link_xmit2(buf, dnode, 0);
-	return -EHOSTUNREACH;
+	return (rc < 0) ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
-- 
1.7.9.5


* [PATCH net-next 08/13] tipc: RDM/DGRAM transport uses new fragmenting and sending functions
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (6 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 07/13] tipc: introduce message evaluation function Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 09/13] tipc: connection oriented transport uses new send functions Jon Maloy
                   ` (5 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We merge the code for sending port name and port identity addressed
messages into the corresponding send functions in socket.c, and start
using the new fragmenting and transmit functions we have just introduced.

This saves a call level and quite a few code lines, as well as making
this part of the code easier to follow. As a consequence, the functions
tipc_send2name() and tipc_send2port() in port.c can be removed.

For practical reasons, we break out the code for sending multicast messages
from tipc_sendmsg() and move it into a separate function, tipc_sendmcast(),
but we do not yet convert it into using the new build/send functions.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
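
Note: the build/send/retry structure of the reworked tipc_sendmsg() in
the diff below can be modelled in userspace. This is only a sketch of
the control flow. struct fake_link, the fake_*() helpers and
model_send() are hypothetical, the congestion code is assumed to be
representable as EAGAIN, and buffer ownership is not modelled.

```c
#include <assert.h>
#include <errno.h>

#define M_ELINKCONG EAGAIN	/* assumption: model of the congestion code */

/* Scripted stand-in for the link: transmit results are replayed in order */
struct fake_link {
	const int *xmit_rc;	/* results of successive transmit attempts */
	int attempts;		/* transmit attempts made so far */
	int builds;		/* times the buffer chain was (re)built */
	int waits;		/* times the sender slept on congestion */
};

static int fake_build(struct fake_link *l, int dsz)
{
	l->builds++;
	return dsz;
}

static int fake_xmit(struct fake_link *l)
{
	return l->xmit_rc[l->attempts++];
}

static int fake_wait(struct fake_link *l)
{
	l->waits++;
	return 0;	/* woken up without timeout */
}

/* Returns dsz on success, or the negative error that ended the attempt */
int model_send(struct fake_link *l, int dsz)
{
	int rc;

	assert(l && l->xmit_rc);
new_mtu:
	/* (Re)build the chain for the currently known MTU */
	rc = fake_build(l, dsz);
	if (rc < 0)
		return rc;

	do {
		rc = fake_xmit(l);
		if (rc >= 0)
			return dsz;
		/* MTU changed since the build: start over */
		if (rc == -EMSGSIZE)
			goto new_mtu;
		/* Anything but congestion is a hard failure */
		if (rc != -M_ELINKCONG)
			break;
		rc = fake_wait(l);
	} while (!rc);

	return rc;
}
```

In this model a transmit sequence of "wrong size, congested, ok" costs
two builds and one wait before the data length is returned.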
 net/tipc/port.c   |   90 ------------------------------
 net/tipc/port.h   |   11 ----
 net/tipc/socket.c |  158 +++++++++++++++++++++++++++++++++++++----------------
 3 files changed, 110 insertions(+), 149 deletions(-)

diff --git a/net/tipc/port.c b/net/tipc/port.c
index 5fd7acc..8350ec9 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -806,93 +806,3 @@ int tipc_send(struct tipc_port *p_ptr,
 	}
 	return -ELINKCONG;
 }
-
-/**
- * tipc_send2name - send message sections to port name
- */
-int tipc_send2name(struct tipc_port *p_ptr,
-		   struct tipc_name const *name,
-		   unsigned int domain,
-		   struct iovec const *msg_sect,
-		   unsigned int len)
-{
-	struct tipc_msg *msg;
-	u32 destnode = domain;
-	u32 destport;
-	int res;
-
-	if (p_ptr->connected)
-		return -EINVAL;
-
-	msg = &p_ptr->phdr;
-	msg_set_type(msg, TIPC_NAMED_MSG);
-	msg_set_hdr_sz(msg, NAMED_H_SIZE);
-	msg_set_nametype(msg, name->type);
-	msg_set_nameinst(msg, name->instance);
-	msg_set_lookup_scope(msg, tipc_addr_scope(domain));
-	destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
-	msg_set_destnode(msg, destnode);
-	msg_set_destport(msg, destport);
-
-	if (likely(destport || destnode)) {
-		if (likely(in_own_node(destnode)))
-			res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
-		else if (tipc_own_addr)
-			res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
-							destnode);
-		else
-			res = tipc_port_iovec_reject(p_ptr, msg, msg_sect,
-						     len, TIPC_ERR_NO_NODE);
-		if (likely(res != -ELINKCONG)) {
-			if (res > 0)
-				p_ptr->sent++;
-			return res;
-		}
-		if (tipc_port_unreliable(p_ptr))
-			return len;
-
-		return -ELINKCONG;
-	}
-	return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
-				      TIPC_ERR_NO_NAME);
-}
-
-/**
- * tipc_send2port - send message sections to port identity
- */
-int tipc_send2port(struct tipc_port *p_ptr,
-		   struct tipc_portid const *dest,
-		   struct iovec const *msg_sect,
-		   unsigned int len)
-{
-	struct tipc_msg *msg;
-	int res;
-
-	if (p_ptr->connected)
-		return -EINVAL;
-
-	msg = &p_ptr->phdr;
-	msg_set_type(msg, TIPC_DIRECT_MSG);
-	msg_set_lookup_scope(msg, 0);
-	msg_set_destnode(msg, dest->node);
-	msg_set_destport(msg, dest->ref);
-	msg_set_hdr_sz(msg, BASIC_H_SIZE);
-
-	if (in_own_node(dest->node))
-		res =  tipc_port_iovec_rcv(p_ptr, msg_sect, len);
-	else if (tipc_own_addr)
-		res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
-						dest->node);
-	else
-		res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
-						TIPC_ERR_NO_NODE);
-	if (likely(res != -ELINKCONG)) {
-		if (res > 0)
-			p_ptr->sent++;
-		return res;
-	}
-	if (tipc_port_unreliable(p_ptr))
-		return len;
-
-	return -ELINKCONG;
-}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index cf4ca5b..e566d55 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -140,17 +140,6 @@ int tipc_send(struct tipc_port *port,
 	      struct iovec const *msg_sect,
 	      unsigned int len);
 
-int tipc_send2name(struct tipc_port *port,
-		   struct tipc_name const *name,
-		   u32 domain,
-		   struct iovec const *msg_sect,
-		   unsigned int len);
-
-int tipc_send2port(struct tipc_port *port,
-		   struct tipc_portid const *dest,
-		   struct iovec const *msg_sect,
-		   unsigned int len);
-
 int tipc_port_mcast_xmit(struct tipc_port *port,
 			 struct tipc_name_seq const *seq,
 			 struct iovec const *msg,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e642ed5..5690751 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -36,8 +36,9 @@
 
 #include "core.h"
 #include "port.h"
+#include "name_table.h"
 #include "node.h"
-
+#include "link.h"
 #include <linux/export.h>
 #include "link.h"
 
@@ -545,6 +546,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
 {
 	struct tipc_cfg_msg_hdr hdr;
 
+	if (unlikely(dest->addrtype == TIPC_ADDR_ID))
+		return 0;
 	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
 		return 0;
 	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
@@ -587,13 +590,49 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
 	return 0;
 }
 
+/**
+ * tipc_sendmcast - send multicast message
+ * @sock: socket structure
+ * @seq: destination address
+ * @iov: message data to send
+ * @dsz: total length of message data
+ * @timeo: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
+			  struct iovec *iov, size_t dsz, long timeo)
+{
+	struct sock *sk = sock->sk;
+	struct tipc_sock *tsk = tipc_sk(sk);
+	int rc;
+
+	do {
+		if (sock->state != SS_READY) {
+			rc = -EOPNOTSUPP;
+			break;
+		}
+		rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz);
+		if (likely(rc >= 0)) {
+			if (sock->state != SS_READY)
+				sock->state = SS_CONNECTING;
+			break;
+		}
+		if (rc != -ELINKCONG)
+			break;
+		rc = tipc_wait_for_sndmsg(sock, &timeo);
+	} while (!rc);
+
+	return rc;
+}
 
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @iocb: if NULL, indicates that socket lock is already held
  * @sock: socket structure
  * @m: message to send
- * @total_len: length of message
+ * @dsz: amount of user data to be sent
  *
  * Message must have an destination specified explicitly.
  * Used for SOCK_RDM and SOCK_DGRAM messages,
@@ -603,93 +642,116 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
  * Returns the number of bytes sent on success, or errno otherwise
  */
 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
-			struct msghdr *m, size_t total_len)
+			struct msghdr *m, size_t dsz)
 {
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_port *port = &tsk->port;
-	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	int needs_conn;
+	struct tipc_msg *mhdr = &port->phdr;
+	struct iovec *iov = m->msg_iov;
+	u32 dnode, dport;
+	struct sk_buff *buf;
+	struct tipc_name_seq *seq = &dest->addr.nameseq;
+	u32 mtu;
 	long timeo;
-	int res = -EINVAL;
+	int rc = -EINVAL;
 
 	if (unlikely(!dest))
 		return -EDESTADDRREQ;
+
 	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
 		     (dest->family != AF_TIPC)))
 		return -EINVAL;
-	if (total_len > TIPC_MAX_USER_MSG_SIZE)
+
+	if (dsz > TIPC_MAX_USER_MSG_SIZE)
 		return -EMSGSIZE;
 
 	if (iocb)
 		lock_sock(sk);
 
-	needs_conn = (sock->state != SS_READY);
-	if (unlikely(needs_conn)) {
+	if (unlikely(sock->state != SS_READY)) {
 		if (sock->state == SS_LISTENING) {
-			res = -EPIPE;
+			rc = -EPIPE;
 			goto exit;
 		}
 		if (sock->state != SS_UNCONNECTED) {
-			res = -EISCONN;
+			rc = -EISCONN;
 			goto exit;
 		}
 		if (tsk->port.published) {
-			res = -EOPNOTSUPP;
+			rc = -EOPNOTSUPP;
 			goto exit;
 		}
 		if (dest->addrtype == TIPC_ADDR_NAME) {
 			tsk->port.conn_type = dest->addr.name.name.type;
 			tsk->port.conn_instance = dest->addr.name.name.instance;
 		}
-
-		/* Abort any pending connection attempts (very unlikely) */
-		reject_rx_queue(sk);
 	}
+	rc = dest_name_check(dest, m);
+	if (rc)
+		goto exit;
 
 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
-	do {
-		if (dest->addrtype == TIPC_ADDR_NAME) {
-			res = dest_name_check(dest, m);
-			if (res)
-				break;
-			res = tipc_send2name(port,
-					     &dest->addr.name.name,
-					     dest->addr.name.domain,
-					     m->msg_iov,
-					     total_len);
-		} else if (dest->addrtype == TIPC_ADDR_ID) {
-			res = tipc_send2port(port,
-					     &dest->addr.id,
-					     m->msg_iov,
-					     total_len);
-		} else if (dest->addrtype == TIPC_ADDR_MCAST) {
-			if (needs_conn) {
-				res = -EOPNOTSUPP;
-				break;
-			}
-			res = dest_name_check(dest, m);
-			if (res)
-				break;
-			res = tipc_port_mcast_xmit(port,
-						   &dest->addr.nameseq,
-						   m->msg_iov,
-						   total_len);
+
+	if (dest->addrtype == TIPC_ADDR_MCAST) {
+		rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
+		goto exit;
+	} else if (dest->addrtype == TIPC_ADDR_NAME) {
+		u32 type = dest->addr.name.name.type;
+		u32 inst = dest->addr.name.name.instance;
+		u32 domain = dest->addr.name.domain;
+
+		dnode = domain;
+		msg_set_type(mhdr, TIPC_NAMED_MSG);
+		msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
+		msg_set_nametype(mhdr, type);
+		msg_set_nameinst(mhdr, inst);
+		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+		dport = tipc_nametbl_translate(type, inst, &dnode);
+		msg_set_destnode(mhdr, dnode);
+		msg_set_destport(mhdr, dport);
+		if (unlikely(!dport && !dnode)) {
+			rc = -EHOSTUNREACH;
+			goto exit;
 		}
-		if (likely(res != -ELINKCONG)) {
-			if (needs_conn && (res >= 0))
+	} else if (dest->addrtype == TIPC_ADDR_ID) {
+		dnode = dest->addr.id.node;
+		msg_set_type(mhdr, TIPC_DIRECT_MSG);
+		msg_set_lookup_scope(mhdr, 0);
+		msg_set_destnode(mhdr, dnode);
+		msg_set_destport(mhdr, dest->addr.id.ref);
+		msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+	}
+
+new_mtu:
+	mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
+	rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+	if (rc < 0)
+		goto exit;
+
+	do {
+		rc = tipc_link_xmit2(buf, dnode, tsk->port.ref);
+		if (likely(rc >= 0)) {
+			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
+			rc = dsz;
 			break;
 		}
-		res = tipc_wait_for_sndmsg(sock, &timeo);
-		if (res)
+		if (rc == -EMSGSIZE)
+			goto new_mtu;
+
+		if (rc != -ELINKCONG)
 			break;
-	} while (1);
+
+		rc = tipc_wait_for_sndmsg(sock, &timeo);
+	} while (!rc);
 
 exit:
 	if (iocb)
 		release_sock(sk);
-	return res;
+
+	return rc;
 }
 
 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
-- 
1.7.9.5


* [PATCH net-next 09/13] tipc: connection oriented transport uses new send functions
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (7 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 08/13] tipc: RDM/DGRAM transport uses new fragmenting and sending functions Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 10/13] tipc: let port protocol senders use new link send function Jon Maloy
                   ` (4 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We move the message sending across established connections
to use the message preparation and send functions introduced
earlier in this series. We now do the message preparation
and call to the link send function directly from the socket,
instead of going via the port layer.

As a consequence of this change, the functions tipc_send(),
tipc_port_iovec_rcv(), tipc_port_iovec_reject() and tipc_reject_msg()
become unreferenced and can be eliminated from port.c. For the same
reason, the functions tipc_link_xmit_fast(), tipc_link_iovec_long_xmit()
and tipc_link_iovec_xmit_fast() can be eliminated from link.c.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c   |  249 -----------------------------------------------------
 net/tipc/port.c   |  142 +-----------------------------
 net/tipc/port.h   |   12 ---
 net/tipc/socket.c |  180 +++++++++++++++-----------------------
 4 files changed, 70 insertions(+), 513 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 68d2afb..93a8033 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -82,9 +82,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				 struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int  tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				      struct iovec const *msg_sect,
-				      unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
@@ -1071,252 +1068,6 @@ void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
 }
 
 /*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-			       u32 *used_max_pkt)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = msg_data_sz(msg);
-
-	if (likely(!link_congested(l_ptr))) {
-		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
-			l_ptr->unacked_window = 0;
-			return res;
-		}
-		else
-			*used_max_pkt = l_ptr->max_pkt;
-	}
-	return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destaddr)
-{
-	struct tipc_msg *hdr = &sender->phdr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct tipc_node *node;
-	int res;
-	u32 selector = msg_origport(hdr) & 1;
-
-again:
-	/*
-	 * Try building message using port's max_pkt hint.
-	 * (Must not hold any locks while building message.)
-	 */
-	res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-	/* Exit if build request was invalid */
-	if (unlikely(res < 0))
-		return res;
-
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[selector];
-		if (likely(l_ptr)) {
-			if (likely(buf)) {
-				res = tipc_link_xmit_fast(l_ptr, buf,
-							  &sender->max_pkt);
-exit:
-				tipc_node_unlock(node);
-				return res;
-			}
-
-			/* Exit if link (or bearer) is congested */
-			if (link_congested(l_ptr)) {
-				res = link_schedule_port(l_ptr,
-							 sender->ref, res);
-				goto exit;
-			}
-
-			/*
-			 * Message size exceeds max_pkt hint; update hint,
-			 * then re-try fast path or fragment the message
-			 */
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-
-
-			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-				goto again;
-
-			return tipc_link_iovec_long_xmit(sender, msg_sect,
-							 len, destaddr);
-		}
-		tipc_node_unlock(node);
-	}
-
-	/* Couldn't find a link to the destination node */
-	kfree_skb(buf);
-	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-	return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destaddr)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *node;
-	struct tipc_msg *hdr = &sender->phdr;
-	u32 dsz = len;
-	u32 max_pkt, fragm_sz, rest;
-	struct tipc_msg fragm_hdr;
-	struct sk_buff *buf, *buf_chain, *prev;
-	u32 fragm_crs, fragm_rest, hsz, sect_rest;
-	const unchar __user *sect_crs;
-	int curr_sect;
-	u32 fragm_no;
-	int res = 0;
-
-again:
-	fragm_no = 1;
-	max_pkt = sender->max_pkt - INT_H_SIZE;
-		/* leave room for tunnel header in case of link changeover */
-	fragm_sz = max_pkt - INT_H_SIZE;
-		/* leave room for fragmentation header in each fragment */
-	rest = dsz;
-	fragm_crs = 0;
-	fragm_rest = 0;
-	sect_rest = 0;
-	sect_crs = NULL;
-	curr_sect = -1;
-
-	/* Prepare reusable fragment header */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		 INT_H_SIZE, msg_destnode(hdr));
-	msg_set_size(&fragm_hdr, max_pkt);
-	msg_set_fragm_no(&fragm_hdr, 1);
-
-	/* Prepare header of first fragment */
-	buf_chain = buf = tipc_buf_acquire(max_pkt);
-	if (!buf)
-		return -ENOMEM;
-	buf->next = NULL;
-	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-	hsz = msg_hdr_sz(hdr);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-	/* Chop up message */
-	fragm_crs = INT_H_SIZE + hsz;
-	fragm_rest = fragm_sz - hsz;
-
-	do {		/* For all sections */
-		u32 sz;
-
-		if (!sect_rest) {
-			sect_rest = msg_sect[++curr_sect].iov_len;
-			sect_crs = msg_sect[curr_sect].iov_base;
-		}
-
-		if (sect_rest < fragm_rest)
-			sz = sect_rest;
-		else
-			sz = fragm_rest;
-
-		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-			res = -EFAULT;
-error:
-			kfree_skb_list(buf_chain);
-			return res;
-		}
-		sect_crs += sz;
-		sect_rest -= sz;
-		fragm_crs += sz;
-		fragm_rest -= sz;
-		rest -= sz;
-
-		if (!fragm_rest && rest) {
-
-			/* Initiate new fragment: */
-			if (rest <= fragm_sz) {
-				fragm_sz = rest;
-				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-			} else {
-				msg_set_type(&fragm_hdr, FRAGMENT);
-			}
-			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-			prev = buf;
-			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-			if (!buf) {
-				res = -ENOMEM;
-				goto error;
-			}
-
-			buf->next = NULL;
-			prev->next = buf;
-			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-			fragm_crs = INT_H_SIZE;
-			fragm_rest = fragm_sz;
-		}
-	} while (rest > 0);
-
-	/*
-	 * Now we have a buffer chain. Select a link and check
-	 * that packet size is still OK
-	 */
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[sender->ref & 1];
-		if (!l_ptr) {
-			tipc_node_unlock(node);
-			goto reject;
-		}
-		if (l_ptr->max_pkt < max_pkt) {
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-			kfree_skb_list(buf_chain);
-			goto again;
-		}
-	} else {
-reject:
-		kfree_skb_list(buf_chain);
-		tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-				       TIPC_ERR_NO_NODE);
-		return -ENETUNREACH;
-	}
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-	tipc_node_unlock(node);
-	return dsz;
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 8350ec9..606ff1a 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -211,6 +211,7 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
 	}
 
 	p_ptr->max_pkt = MAX_PKT_DEFAULT;
+	p_ptr->sent = 1;
 	p_ptr->ref = ref;
 	INIT_LIST_HEAD(&p_ptr->wait_list);
 	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
@@ -279,92 +280,6 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 	return buf;
 }
 
-int tipc_reject_msg(struct sk_buff *buf, u32 err)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	struct sk_buff *rbuf;
-	struct tipc_msg *rmsg;
-	int hdr_sz;
-	u32 imp;
-	u32 data_sz = msg_data_sz(msg);
-	u32 src_node;
-	u32 rmsg_sz;
-
-	/* discard rejected message if it shouldn't be returned to sender */
-	if (WARN(!msg_isdata(msg),
-		 "attempt to reject message with user=%u", msg_user(msg))) {
-		dump_stack();
-		goto exit;
-	}
-	if (msg_errcode(msg) || msg_dest_droppable(msg))
-		goto exit;
-
-	/*
-	 * construct returned message by copying rejected message header and
-	 * data (or subset), then updating header fields that need adjusting
-	 */
-	hdr_sz = msg_hdr_sz(msg);
-	rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
-
-	rbuf = tipc_buf_acquire(rmsg_sz);
-	if (rbuf == NULL)
-		goto exit;
-
-	rmsg = buf_msg(rbuf);
-	skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
-
-	if (msg_connected(rmsg)) {
-		imp = msg_importance(rmsg);
-		if (imp < TIPC_CRITICAL_IMPORTANCE)
-			msg_set_importance(rmsg, ++imp);
-	}
-	msg_set_non_seq(rmsg, 0);
-	msg_set_size(rmsg, rmsg_sz);
-	msg_set_errcode(rmsg, err);
-	msg_set_prevnode(rmsg, tipc_own_addr);
-	msg_swap_words(rmsg, 4, 5);
-	if (!msg_short(rmsg))
-		msg_swap_words(rmsg, 6, 7);
-
-	/* send self-abort message when rejecting on a connected port */
-	if (msg_connected(msg)) {
-		struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
-
-		if (p_ptr) {
-			struct sk_buff *abuf = NULL;
-
-			if (p_ptr->connected)
-				abuf = port_build_self_abort_msg(p_ptr, err);
-			tipc_port_unlock(p_ptr);
-			tipc_net_route_msg(abuf);
-		}
-	}
-
-	/* send returned message & dispose of rejected message */
-	src_node = msg_prevnode(msg);
-	if (in_own_node(src_node))
-		tipc_sk_rcv(rbuf);
-	else
-		tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
-exit:
-	kfree_skb(buf);
-	return data_sz;
-}
-
-int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
-			   struct iovec const *msg_sect, unsigned int len,
-			   int err)
-{
-	struct sk_buff *buf;
-	int res;
-
-	res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (!buf)
-		return res;
-
-	return tipc_reject_msg(buf, err);
-}
-
 static void port_timeout(unsigned long ref)
 {
 	struct tipc_port *p_ptr = tipc_port_lock(ref);
@@ -698,7 +613,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
 			  (net_ev_handler)port_handle_node_down);
 	res = 0;
 exit:
-	p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
+	p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
 	return res;
 }
 
@@ -753,56 +668,3 @@ int tipc_port_shutdown(u32 ref)
 	tipc_net_route_msg(buf);
 	return tipc_port_disconnect(ref);
 }
-
-/*
- *  tipc_port_iovec_rcv: Concatenate and deliver sectioned
- *                       message for this node.
- */
-static int tipc_port_iovec_rcv(struct tipc_port *sender,
-			       struct iovec const *msg_sect,
-			       unsigned int len)
-{
-	struct sk_buff *buf;
-	int res;
-
-	res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (likely(buf))
-		tipc_sk_rcv(buf);
-	return res;
-}
-
-/**
- * tipc_send - send message sections on connection
- */
-int tipc_send(struct tipc_port *p_ptr,
-	      struct iovec const *msg_sect,
-	      unsigned int len)
-{
-	u32 destnode;
-	int res;
-
-	if (!p_ptr->connected)
-		return -EINVAL;
-
-	p_ptr->congested = 1;
-	if (!tipc_port_congested(p_ptr)) {
-		destnode = tipc_port_peernode(p_ptr);
-		if (likely(!in_own_node(destnode)))
-			res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
-							destnode);
-		else
-			res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
-
-		if (likely(res != -ELINKCONG)) {
-			p_ptr->congested = 0;
-			if (res > 0)
-				p_ptr->sent++;
-			return res;
-		}
-	}
-	if (tipc_port_unreliable(p_ptr)) {
-		p_ptr->congested = 0;
-		return len;
-	}
-	return -ELINKCONG;
-}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index e566d55..231d948 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -104,8 +104,6 @@ struct tipc_port_list;
 u32 tipc_port_init(struct tipc_port *p_ptr,
 		   const unsigned int importance);
 
-int tipc_reject_msg(struct sk_buff *buf, u32 err);
-
 void tipc_acknowledge(u32 port_ref, u32 ack);
 
 void tipc_port_destroy(struct tipc_port *p_ptr);
@@ -136,21 +134,11 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
  * TIPC messaging routines
  */
 
-int tipc_send(struct tipc_port *port,
-	      struct iovec const *msg_sect,
-	      unsigned int len);
-
 int tipc_port_mcast_xmit(struct tipc_port *port,
 			 struct tipc_name_seq const *seq,
 			 struct iovec const *msg,
 			 unsigned int len);
 
-int tipc_port_iovec_reject(struct tipc_port *p_ptr,
-			   struct tipc_msg *hdr,
-			   struct iovec const *msg_sect,
-			   unsigned int len,
-			   int err);
-
 struct sk_buff *tipc_port_get_ports(void);
 void tipc_port_proto_rcv(struct sk_buff *buf);
 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5690751..bfe79bb 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -206,6 +206,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
+	tsk->port.sent = 0;
 	atomic_set(&tsk->dupl_rcvcnt, 0);
 	tipc_port_unlock(port);
 
@@ -784,30 +785,40 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 }
 
 /**
- * tipc_send_packet - send a connection-oriented message
- * @iocb: if NULL, indicates that socket lock is already held
+ * tipc_send_stream - send stream-oriented data
+ * @iocb: (unused)
  * @sock: socket structure
- * @m: message to send
- * @total_len: length of message
+ * @m: data to send
+ * @dsz: total length of data to be transmitted
  *
- * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
+ * Used for SOCK_STREAM data.
  *
- * Returns the number of bytes sent on success, or errno otherwise
+ * Returns the number of bytes sent on success (or partial success),
+ * or errno if no data sent
  */
-static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
+static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
+			    struct msghdr *m, size_t dsz)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_port *port = &tsk->port;
+	struct tipc_msg *mhdr = &port->phdr;
+	struct sk_buff *buf;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	int res = -EINVAL;
+	u32 ref = port->ref;
+	int rc = -EINVAL;
 	long timeo;
+	u32 dnode;
+	uint mtu, send, sent = 0;
 
 	/* Handle implied connection establishment */
-	if (unlikely(dest))
-		return tipc_sendmsg(iocb, sock, m, total_len);
-
-	if (total_len > TIPC_MAX_USER_MSG_SIZE)
+	if (unlikely(dest)) {
+		rc = tipc_sendmsg(iocb, sock, m, dsz);
+		if (dsz && (dsz == rc))
+			tsk->port.sent = 1;
+		return rc;
+	}
+	if (dsz > (uint)INT_MAX)
 		return -EMSGSIZE;
 
 	if (iocb)
@@ -815,123 +826,68 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
 
 	if (unlikely(sock->state != SS_CONNECTED)) {
 		if (sock->state == SS_DISCONNECTING)
-			res = -EPIPE;
+			rc = -EPIPE;
 		else
-			res = -ENOTCONN;
+			rc = -ENOTCONN;
 		goto exit;
 	}
 
 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	dnode = tipc_port_peernode(port);
+	port->congested = 1;
+
+next:
+	mtu = port->max_pkt;
+	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
+	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+	if (unlikely(rc < 0))
+		goto exit;
 	do {
-		res = tipc_send(&tsk->port, m->msg_iov, total_len);
-		if (likely(res != -ELINKCONG))
-			break;
-		res = tipc_wait_for_sndpkt(sock, &timeo);
-		if (res)
-			break;
-	} while (1);
+		port->congested = 1;
+		if (likely(!tipc_port_congested(port))) {
+			rc = tipc_link_xmit2(buf, dnode, ref);
+			if (likely(!rc)) {
+				port->sent++;
+				sent += send;
+				if (sent == dsz)
+					break;
+				goto next;
+			}
+			if (rc == -EMSGSIZE) {
+				port->max_pkt = tipc_node_get_mtu(dnode, ref);
+				goto next;
+			}
+			if (rc != -ELINKCONG)
+				break;
+		}
+		rc = tipc_wait_for_sndpkt(sock, &timeo);
+	} while (!rc);
+
+	port->congested = 0;
 exit:
 	if (iocb)
 		release_sock(sk);
-	return res;
+	return sent ? sent : rc;
 }
 
 /**
- * tipc_send_stream - send stream-oriented data
- * @iocb: (unused)
+ * tipc_send_packet - send a connection-oriented message
+ * @iocb: if NULL, indicates that socket lock is already held
  * @sock: socket structure
- * @m: data to send
- * @total_len: total length of data to be sent
+ * @m: message to send
+ * @dsz: length of data to be transmitted
  *
- * Used for SOCK_STREAM data.
+ * Used for SOCK_SEQPACKET messages.
  *
- * Returns the number of bytes sent on success (or partial success),
- * or errno if no data sent
+ * Returns the number of bytes sent on success, or errno otherwise
  */
-static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
+static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
+			    struct msghdr *m, size_t dsz)
 {
-	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
-	struct msghdr my_msg;
-	struct iovec my_iov;
-	struct iovec *curr_iov;
-	int curr_iovlen;
-	char __user *curr_start;
-	u32 hdr_size;
-	int curr_left;
-	int bytes_to_send;
-	int bytes_sent;
-	int res;
-
-	lock_sock(sk);
-
-	/* Handle special cases where there is no connection */
-	if (unlikely(sock->state != SS_CONNECTED)) {
-		if (sock->state == SS_UNCONNECTED)
-			res = tipc_send_packet(NULL, sock, m, total_len);
-		else
-			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
-		goto exit;
-	}
-
-	if (unlikely(m->msg_name)) {
-		res = -EISCONN;
-		goto exit;
-	}
-
-	if (total_len > (unsigned int)INT_MAX) {
-		res = -EMSGSIZE;
-		goto exit;
-	}
-
-	/*
-	 * Send each iovec entry using one or more messages
-	 *
-	 * Note: This algorithm is good for the most likely case
-	 * (i.e. one large iovec entry), but could be improved to pass sets
-	 * of small iovec entries into send_packet().
-	 */
-	curr_iov = m->msg_iov;
-	curr_iovlen = m->msg_iovlen;
-	my_msg.msg_iov = &my_iov;
-	my_msg.msg_iovlen = 1;
-	my_msg.msg_flags = m->msg_flags;
-	my_msg.msg_name = NULL;
-	bytes_sent = 0;
-
-	hdr_size = msg_hdr_sz(&tsk->port.phdr);
-
-	while (curr_iovlen--) {
-		curr_start = curr_iov->iov_base;
-		curr_left = curr_iov->iov_len;
-
-		while (curr_left) {
-			bytes_to_send = tsk->port.max_pkt - hdr_size;
-			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
-				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
-			if (curr_left < bytes_to_send)
-				bytes_to_send = curr_left;
-			my_iov.iov_base = curr_start;
-			my_iov.iov_len = bytes_to_send;
-			res = tipc_send_packet(NULL, sock, &my_msg,
-					       bytes_to_send);
-			if (res < 0) {
-				if (bytes_sent)
-					res = bytes_sent;
-				goto exit;
-			}
-			curr_left -= bytes_to_send;
-			curr_start += bytes_to_send;
-			bytes_sent += bytes_to_send;
-		}
+	if (dsz > TIPC_MAX_USER_MSG_SIZE)
+		return -EMSGSIZE;
 
-		curr_iov++;
-	}
-	res = bytes_sent;
-exit:
-	release_sock(sk);
-	return res;
+	return tipc_send_stream(iocb, sock, m, dsz);
 }
 
 /**
-- 
1.7.9.5

* [PATCH net-next 10/13] tipc: let port protocol senders use new link send function
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (8 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 09/13] tipc: connection oriented transport uses new send functions Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 11/13] tipc: same receive code path for connection protocol and data messages Jon Maloy
                   ` (3 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Several functions in port.c, related to the port protocol and
connection shutdown, need to send messages. We now convert them
to call the new link send function, tipc_link_xmit2(), instead of
tipc_net_route_msg().
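
The calling pattern used by the converted senders can be sketched in
user space as follows. This is only an illustrative model, not the
kernel code: struct model_buf, model_link_xmit() and
model_send_proto_msg() are hypothetical stand-ins for the sk_buff, the
new link send function and its callers, and the NULL check mirrors the
one this patch adds in tipc_acknowledge().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a buffer holding a TIPC message header. */
struct model_buf {
	uint32_t destnode;	/* what msg_destnode() would return */
	uint32_t selector;	/* what msg_link_selector() would return */
	struct model_buf *next;	/* chain pointer; NULL for a single message */
};

/* Records calls so the example can be checked without a real link. */
struct model_xmit_log {
	int calls;
	uint32_t dnode;
	uint32_t selector;
};

static struct model_xmit_log model_log;

/* Stand-in for the link send function: it only logs its arguments. */
static int model_link_xmit(struct model_buf *buf, uint32_t dnode,
			   uint32_t selector)
{
	assert(buf != NULL);	/* the model requires a real buffer */
	model_log.calls++;
	model_log.dnode = dnode;
	model_log.selector = selector;
	return 0;
}

/*
 * Send one protocol message. Returns -1 without sending when the
 * builder failed and handed us NULL, otherwise the send result.
 */
static int model_send_proto_msg(struct model_buf *buf)
{
	if (!buf)
		return -1;
	buf->next = NULL;	/* a single message is a chain of length one */
	return model_link_xmit(buf, buf->destnode, buf->selector);
}
```

The point of the sketch is that the destination node and link selector
are read from the message header by the caller, so the send function
itself needs no routing logic.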

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/port.c |   30 +++++++++++++++++++++++-------
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/net/tipc/port.c b/net/tipc/port.c
index 606ff1a..60aede0 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -236,6 +236,8 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
 void tipc_port_destroy(struct tipc_port *p_ptr)
 {
 	struct sk_buff *buf = NULL;
+	struct tipc_msg *msg = NULL;
+	u32 peer;
 
 	tipc_withdraw(p_ptr, 0, NULL);
 
@@ -247,14 +249,15 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
 	if (p_ptr->connected) {
 		buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 		tipc_nodesub_unsubscribe(&p_ptr->subscription);
+		msg = buf_msg(buf);
+		peer = msg_destnode(msg);
+		tipc_link_xmit2(buf, peer, msg_link_selector(msg));
 	}
-
 	spin_lock_bh(&tipc_port_list_lock);
 	list_del(&p_ptr->port_list);
 	list_del(&p_ptr->wait_list);
 	spin_unlock_bh(&tipc_port_list_lock);
 	k_term_timer(&p_ptr->timer);
-	tipc_net_route_msg(buf);
 }
 
 /*
@@ -276,6 +279,7 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 		msg_set_destport(msg, tipc_port_peerport(p_ptr));
 		msg_set_origport(msg, p_ptr->ref);
 		msg_set_msgcnt(msg, ack);
+		buf->next = NULL;
 	}
 	return buf;
 }
@@ -284,6 +288,7 @@ static void port_timeout(unsigned long ref)
 {
 	struct tipc_port *p_ptr = tipc_port_lock(ref);
 	struct sk_buff *buf = NULL;
+	struct tipc_msg *msg = NULL;
 
 	if (!p_ptr)
 		return;
@@ -302,7 +307,8 @@ static void port_timeout(unsigned long ref)
 		k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 	}
 	tipc_port_unlock(p_ptr);
-	tipc_net_route_msg(buf);
+	msg = buf_msg(buf);
+	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -310,12 +316,14 @@ static void port_handle_node_down(unsigned long ref)
 {
 	struct tipc_port *p_ptr = tipc_port_lock(ref);
 	struct sk_buff *buf = NULL;
+	struct tipc_msg *msg = NULL;
 
 	if (!p_ptr)
 		return;
 	buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 	tipc_port_unlock(p_ptr);
-	tipc_net_route_msg(buf);
+	msg = buf_msg(buf);
+	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -327,6 +335,7 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er
 		struct tipc_msg *msg = buf_msg(buf);
 		msg_swap_words(msg, 4, 5);
 		msg_swap_words(msg, 6, 7);
+		buf->next = NULL;
 	}
 	return buf;
 }
@@ -351,6 +360,7 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
 		if (imp < TIPC_CRITICAL_IMPORTANCE)
 			msg_set_importance(msg, ++imp);
 		msg_set_errcode(msg, err);
+		buf->next = NULL;
 	}
 	return buf;
 }
@@ -401,7 +411,7 @@ void tipc_port_proto_rcv(struct sk_buff *buf)
 	p_ptr->probing_state = CONFIRMED;
 	tipc_port_unlock(p_ptr);
 exit:
-	tipc_net_route_msg(r_buf);
+	tipc_link_xmit2(r_buf, msg_destnode(msg), msg_link_selector(msg));
 	kfree_skb(buf);
 }
 
@@ -496,6 +506,7 @@ void tipc_acknowledge(u32 ref, u32 ack)
 {
 	struct tipc_port *p_ptr;
 	struct sk_buff *buf = NULL;
+	struct tipc_msg *msg;
 
 	p_ptr = tipc_port_lock(ref);
 	if (!p_ptr)
@@ -505,7 +516,10 @@ void tipc_acknowledge(u32 ref, u32 ack)
 		buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
 	}
 	tipc_port_unlock(p_ptr);
-	tipc_net_route_msg(buf);
+	if (!buf)
+		return;
+	msg = buf_msg(buf);
+	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
@@ -656,6 +670,7 @@ int tipc_port_disconnect(u32 ref)
  */
 int tipc_port_shutdown(u32 ref)
 {
+	struct tipc_msg *msg;
 	struct tipc_port *p_ptr;
 	struct sk_buff *buf = NULL;
 
@@ -665,6 +680,7 @@ int tipc_port_shutdown(u32 ref)
 
 	buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
 	tipc_port_unlock(p_ptr);
-	tipc_net_route_msg(buf);
+	msg = buf_msg(buf);
+	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
 	return tipc_port_disconnect(ref);
 }
-- 
1.7.9.5

* [PATCH net-next 11/13] tipc: same receive code path for connection protocol and data messages
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (9 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 10/13] tipc: let port protocol senders use new link send function Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 12/13] tipc: clean up connection protocol reception function Jon Maloy
                   ` (2 subsequent siblings)
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

In preparation for eliminating port_lock, we need to bring reception
of connection protocol messages under the protection of bh_lock_sock
or the socket owner.

We fix this by letting those messages follow the same code path as
incoming data messages.

As a side effect of this change, the last reference to the function
tipc_net_route_msg() disappears, and we can eliminate that function.
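
The dispatch rule that replaces the routing function for unbundled
messages can be modelled roughly like this. It is a user-space sketch
under assumptions: the MODEL_* values and the model_* helpers are
hypothetical names chosen for illustration, and the numeric user
values are not taken from this patch.

```c
#include <assert.h>

/* Hypothetical message user values; only their roles matter here. */
enum model_user {
	MODEL_LOW_IMPORTANCE = 0,
	MODEL_CRITICAL_IMPORTANCE = 3,	/* data users are 0..3 */
	MODEL_CONN_MANAGER = 8,
	MODEL_NAME_DISTRIBUTOR = 11,
	MODEL_LINK_CONFIG = 13
};

/* Where an unbundled message ends up. */
enum model_target {
	MODEL_TO_SOCKET,	/* models tipc_sk_rcv() */
	MODEL_TO_NAME_TABLE,	/* models tipc_named_rcv() */
	MODEL_DROP		/* models the warning plus kfree_skb() */
};

/* Models msg_isdata(): data messages use the importance levels as user. */
static int model_isdata(unsigned int user)
{
	return user <= MODEL_CRITICAL_IMPORTANCE;
}

/* Data and connection protocol messages share the socket receive path. */
static enum model_target model_route_unbundled(unsigned int user)
{
	if (model_isdata(user) || user == MODEL_CONN_MANAGER)
		return MODEL_TO_SOCKET;
	if (user == MODEL_NAME_DISTRIBUTOR)
		return MODEL_TO_NAME_TABLE;
	return MODEL_DROP;
}

/* Small usage check of the three outcomes. */
static void model_route_selfcheck(void)
{
	assert(model_route_unbundled(MODEL_LOW_IMPORTANCE) == MODEL_TO_SOCKET);
	assert(model_route_unbundled(MODEL_NAME_DISTRIBUTOR) ==
	       MODEL_TO_NAME_TABLE);
	assert(model_route_unbundled(MODEL_LINK_CONFIG) == MODEL_DROP);
}
```

Because connection protocol messages take the socket path, they are
handled under the same locking as data messages, which is what the
removal of port_lock depends on.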

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c   |   18 ++++++++++++------
 net/tipc/net.c    |   40 ----------------------------------------
 net/tipc/net.h    |    2 --
 net/tipc/port.c   |    7 +------
 net/tipc/port.h   |    2 +-
 net/tipc/socket.c |    5 +++++
 6 files changed, 19 insertions(+), 55 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 93a8033..96a8072 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1479,6 +1479,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		case TIPC_MEDIUM_IMPORTANCE:
 		case TIPC_HIGH_IMPORTANCE:
 		case TIPC_CRITICAL_IMPORTANCE:
+		case CONN_MANAGER:
 			tipc_node_unlock(n_ptr);
 			tipc_sk_rcv(buf);
 			continue;
@@ -1493,10 +1494,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 			tipc_node_unlock(n_ptr);
 			tipc_named_rcv(buf);
 			continue;
-		case CONN_MANAGER:
-			tipc_node_unlock(n_ptr);
-			tipc_port_proto_rcv(buf);
-			continue;
 		case BCAST_PROTOCOL:
 			tipc_link_sync_rcv(n_ptr, buf);
 			break;
@@ -2106,6 +2103,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	u32 msgcount = msg_msgcnt(buf_msg(buf));
 	u32 pos = INT_H_SIZE;
 	struct sk_buff *obuf;
+	struct tipc_msg *omsg;
 
 	while (msgcount--) {
 		obuf = buf_extract(buf, pos);
@@ -2113,8 +2111,16 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 			pr_warn("Link unable to unbundle message(s)\n");
 			break;
 		}
-		pos += align(msg_size(buf_msg(obuf)));
-		tipc_net_route_msg(obuf);
+		omsg = buf_msg(obuf);
+		pos += align(msg_size(omsg));
+		if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+			tipc_sk_rcv(obuf);
+		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+			tipc_named_rcv(obuf);
+		} else {
+			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+			kfree_skb(obuf);
+		}
 	}
 	kfree_skb(buf);
 }
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 5f7d6ff..7fcc949 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -103,46 +103,6 @@
  *       This is always used within the scope of a tipc_nametbl_lock(read).
  *     - A local spin_lock protecting the queue of subscriber events.
 */
-void tipc_net_route_msg(struct sk_buff *buf)
-{
-	struct tipc_msg *msg;
-	u32 dnode;
-
-	if (!buf)
-		return;
-	msg = buf_msg(buf);
-
-	/* Handle message for this node */
-	dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
-	if (tipc_in_scope(dnode, tipc_own_addr)) {
-		if (msg_isdata(msg)) {
-			if (msg_mcast(msg))
-				tipc_port_mcast_rcv(buf, NULL);
-			else if (msg_destport(msg)) {
-				tipc_sk_rcv(buf);
-			} else {
-				pr_warn("Cannot route msg; no destination\n");
-				kfree_skb(buf);
-			}
-			return;
-		}
-		switch (msg_user(msg)) {
-		case NAME_DISTRIBUTOR:
-			tipc_named_rcv(buf);
-			break;
-		case CONN_MANAGER:
-			tipc_port_proto_rcv(buf);
-			break;
-		default:
-			kfree_skb(buf);
-		}
-		return;
-	}
-
-	/* Handle message for another node */
-	skb_trim(buf, msg_size(msg));
-	tipc_link_xmit(buf, dnode, msg_link_selector(msg));
-}
 
 int tipc_net_start(u32 addr)
 {
diff --git a/net/tipc/net.h b/net/tipc/net.h
index c6c2b46..59ef338 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -37,8 +37,6 @@
 #ifndef _TIPC_NET_H
 #define _TIPC_NET_H
 
-void tipc_net_route_msg(struct sk_buff *buf);
-
 int tipc_net_start(u32 addr);
 void tipc_net_stop(void);
 
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 60aede0..dcc6309 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -365,16 +365,14 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
 	return buf;
 }
 
-void tipc_port_proto_rcv(struct sk_buff *buf)
+void tipc_port_proto_rcv(struct tipc_port *p_ptr, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	struct tipc_port *p_ptr;
 	struct sk_buff *r_buf = NULL;
 	u32 destport = msg_destport(msg);
 	int wakeable;
 
 	/* Validate connection */
-	p_ptr = tipc_port_lock(destport);
 	if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
 		r_buf = tipc_buf_acquire(BASIC_H_SIZE);
 		if (r_buf) {
@@ -385,8 +383,6 @@ void tipc_port_proto_rcv(struct sk_buff *buf)
 			msg_set_origport(msg, destport);
 			msg_set_destport(msg, msg_origport(msg));
 		}
-		if (p_ptr)
-			tipc_port_unlock(p_ptr);
 		goto exit;
 	}
 
@@ -409,7 +405,6 @@ void tipc_port_proto_rcv(struct sk_buff *buf)
 		break;
 	}
 	p_ptr->probing_state = CONFIRMED;
-	tipc_port_unlock(p_ptr);
 exit:
 	tipc_link_xmit2(r_buf, msg_destnode(msg), msg_link_selector(msg));
 	kfree_skb(buf);
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 231d948..3f28fd4 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -140,7 +140,7 @@ int tipc_port_mcast_xmit(struct tipc_port *port,
 			 unsigned int len);
 
 struct sk_buff *tipc_port_get_ports(void);
-void tipc_port_proto_rcv(struct sk_buff *buf);
+void tipc_port_proto_rcv(struct tipc_port *port, struct sk_buff *buf);
 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
 void tipc_port_reinit(void);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index bfe79bb..d838a4b 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1416,6 +1416,11 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 	unsigned int limit = rcvbuf_limit(sk, buf);
 	int rc = TIPC_OK;
 
+	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
+		tipc_port_proto_rcv(&tsk->port, buf);
+		return TIPC_OK;
+	}
+
 	/* Reject message if it is wrong sort of message for socket */
 	if (msg_type(msg) > TIPC_DIRECT_MSG)
 		return -TIPC_ERR_NO_PORT;
-- 
1.7.9.5

* [PATCH net-next 12/13] tipc: clean up connection protocol reception function
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (10 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 11/13] tipc: same receive code path for connection protocol and data messages Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-26  1:41 ` [PATCH net-next 13/13] tipc: simplify connection congestion handling Jon Maloy
  2014-06-27 19:56 ` [PATCH net-next 00/13] tipc: new unicast transmission code David Miller
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

We simplify the code for receiving connection probes, leveraging the
recently introduced tipc_msg_reverse() function. We also stick to
the principle of sending a possible response message directly from
the calling (tipc_sk_rcv or backlog_rcv) functions, hence making
the call chain shallower and easier to follow.

We make one small protocol change here, which is allowed according to
the spec. If a protocol message arrives from a remote socket that
is not the one we are connected to, we currently generate a
connection abort message and send it to the source. This behavior
is unnecessary, and might even be a security risk, so we now
simply ignore the message. The consequence for the sender
is that it will take longer to discover the mistake (until the
next timeout), but this is an extreme corner case that may happen
anyway under other circumstances, so we deem this change acceptable.
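
The resulting receive logic can be sketched as a small user-space
model. Everything named model_* or MODEL_* is hypothetical, including
the window size and the message type values; in particular, turning
the reversed probe into a CONN_PROBE_REPLY is an assumption based on
the function's comment, not a copy of the kernel code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { MODEL_CONN_PROBE, MODEL_CONN_PROBE_REPLY, MODEL_CONN_ACK };
enum { MODEL_CONN_OK, MODEL_CONN_PROBING };
enum { MODEL_OK = 0, MODEL_FWD_MSG = 1 };

#define MODEL_WINDOW 4u	/* hypothetical congestion threshold */

struct model_port {
	bool connected;
	uint32_t peer_node, peer_port;
	int probing_state;
	uint32_t sent, acked;
	bool congested;
	int wakeups;		/* counts modelled wakeup calls */
};

struct model_pmsg {
	int type;
	uint32_t orig_node, orig_port, dest_node, dest_port;
	uint32_t msgcnt;	/* number of messages acknowledged */
};

static bool model_port_congested(const struct model_port *p)
{
	return (p->sent - p->acked) >= MODEL_WINDOW;
}

/* Returns MODEL_FWD_MSG when *m now holds a reply to be sent back. */
static int model_proto_rcv(struct model_port *p, struct model_pmsg *m)
{
	uint32_t node, port;
	bool wakeable;

	/* New behavior: silently ignore messages not from our peer. */
	if (!p->connected || m->orig_node != p->peer_node ||
	    m->orig_port != p->peer_port)
		return MODEL_OK;

	p->probing_state = MODEL_CONN_OK;

	if (m->type == MODEL_CONN_ACK) {
		wakeable = model_port_congested(p) && p->congested;
		p->acked += m->msgcnt;
		if (!model_port_congested(p)) {
			p->congested = false;
			if (wakeable)
				p->wakeups++;
		}
	} else if (m->type == MODEL_CONN_PROBE) {
		/* Reverse the message in place and hand it back. */
		node = m->orig_node;
		port = m->orig_port;
		m->orig_node = m->dest_node;
		m->orig_port = m->dest_port;
		m->dest_node = node;
		m->dest_port = port;
		m->type = MODEL_CONN_PROBE_REPLY;
		return MODEL_FWD_MSG;
	}
	return MODEL_OK;
}
```

In this model a stray probe leaves both the port and the message
untouched, which corresponds to the protocol change described above:
no abort message is generated for the unknown sender.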

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/msg.c    |   10 ++++++---
 net/tipc/port.c   |   53 +++-----------------------------------------
 net/tipc/port.h   |    1 -
 net/tipc/socket.c |   64 +++++++++++++++++++++++++++++++++++++++++++++--------
 net/tipc/socket.h |    3 +++
 5 files changed, 68 insertions(+), 63 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 7bfc442..6ec9584 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -348,13 +348,17 @@ bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
 	struct tipc_msg ohdr;
 	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
 
-	if (skb_linearize(buf) || !msg_isdata(msg))
+	if (skb_linearize(buf))
+		goto exit;
+	if (msg_dest_droppable(msg))
 		goto exit;
-	if (msg_dest_droppable(msg) || msg_errcode(msg))
+	if (msg_errcode(msg))
 		goto exit;
 
 	memcpy(&ohdr, msg, msg_hdr_sz(msg));
-	msg_set_importance(msg, min_t(uint, ++imp, TIPC_CRITICAL_IMPORTANCE));
+	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
+	if (msg_isdata(msg))
+		msg_set_importance(msg, imp);
 	msg_set_errcode(msg, err);
 	msg_set_origport(msg, msg_destport(&ohdr));
 	msg_set_destport(msg, msg_origport(&ohdr));
diff --git a/net/tipc/port.c b/net/tipc/port.c
index dcc6309..9f53d5a 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -42,8 +42,6 @@
 
 /* Connection management: */
 #define PROBING_INTERVAL 3600000	/* [ms] => 1 h */
-#define CONFIRMED 0
-#define PROBING 1
 
 #define MAX_REJECT_SIZE 1024
 
@@ -299,11 +297,11 @@ static void port_timeout(unsigned long ref)
 	}
 
 	/* Last probe answered ? */
-	if (p_ptr->probing_state == PROBING) {
+	if (p_ptr->probing_state == TIPC_CONN_PROBING) {
 		buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 	} else {
 		buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
-		p_ptr->probing_state = PROBING;
+		p_ptr->probing_state = TIPC_CONN_PROBING;
 		k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 	}
 	tipc_port_unlock(p_ptr);
@@ -365,51 +363,6 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
 	return buf;
 }
 
-void tipc_port_proto_rcv(struct tipc_port *p_ptr, struct sk_buff *buf)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	struct sk_buff *r_buf = NULL;
-	u32 destport = msg_destport(msg);
-	int wakeable;
-
-	/* Validate connection */
-	if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
-		r_buf = tipc_buf_acquire(BASIC_H_SIZE);
-		if (r_buf) {
-			msg = buf_msg(r_buf);
-			tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
-				      BASIC_H_SIZE, msg_orignode(msg));
-			msg_set_errcode(msg, TIPC_ERR_NO_PORT);
-			msg_set_origport(msg, destport);
-			msg_set_destport(msg, msg_origport(msg));
-		}
-		goto exit;
-	}
-
-	/* Process protocol message sent by peer */
-	switch (msg_type(msg)) {
-	case CONN_ACK:
-		wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;
-		p_ptr->acked += msg_msgcnt(msg);
-		if (!tipc_port_congested(p_ptr)) {
-			p_ptr->congested = 0;
-			if (wakeable)
-				tipc_port_wakeup(p_ptr);
-		}
-		break;
-	case CONN_PROBE:
-		r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
-		break;
-	default:
-		/* CONN_PROBE_REPLY or unrecognized - no action required */
-		break;
-	}
-	p_ptr->probing_state = CONFIRMED;
-exit:
-	tipc_link_xmit2(r_buf, msg_destnode(msg), msg_link_selector(msg));
-	kfree_skb(buf);
-}
-
 static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
 {
 	struct publication *publ;
@@ -613,7 +566,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
 	msg_set_hdr_sz(msg, SHORT_H_SIZE);
 
 	p_ptr->probing_interval = PROBING_INTERVAL;
-	p_ptr->probing_state = CONFIRMED;
+	p_ptr->probing_state = TIPC_CONN_OK;
 	p_ptr->connected = 1;
 	k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 3f28fd4..3a4f808 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -140,7 +140,6 @@ int tipc_port_mcast_xmit(struct tipc_port *port,
 			 unsigned int len);
 
 struct sk_buff *tipc_port_get_ports(void);
-void tipc_port_proto_rcv(struct tipc_port *port, struct sk_buff *buf);
 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
 void tipc_port_reinit(void);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d838a4b..1762323 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -46,6 +46,7 @@
 #define SS_READY	-2	/* socket is connectionless */
 
 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
+#define TIPC_FWD_MSG	        1
 
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static void tipc_data_ready(struct sock *sk);
@@ -534,6 +535,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 }
 
 /**
+ * tipc_sk_proto_rcv - receive a connection mng protocol message
+ * @tsk: receiving socket
+ * @dnode: node to send response message to, if any
+ * @buf: buffer containing protocol message
+ * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
+ * (CONN_PROBE_REPLY) message should be forwarded.
+ */
+int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_port *port = &tsk->port;
+	int wakeable;
+
+	/* Ignore if connection cannot be validated: */
+	if (!port->connected || !tipc_port_peer_msg(port, msg))
+		goto exit;
+
+	port->probing_state = TIPC_CONN_OK;
+
+	if (msg_type(msg) == CONN_ACK) {
+		wakeable = tipc_port_congested(port) && port->congested;
+		port->acked += msg_msgcnt(msg);
+		if (!tipc_port_congested(port)) {
+			port->congested = 0;
+			if (wakeable)
+				tipc_port_wakeup(port);
+		}
+	} else if (msg_type(msg) == CONN_PROBE) {
+		if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
+			return TIPC_OK;
+		msg_set_type(msg, CONN_PROBE_REPLY);
+		return TIPC_FWD_MSG;
+	}
+	/* Do nothing if msg_type() == CONN_PROBE_REPLY */
+exit:
+	kfree_skb(buf);
+	return TIPC_OK;
+}
+
+/**
  * dest_name_check - verify user is permitted to send to specified port name
  * @dest: destination address
  * @m: descriptor for message to be sent
@@ -1406,7 +1447,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
  * Called with socket lock already taken; port lock may also be taken.
  *
  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
- * to be rejected.
+ * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
  */
 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 {
@@ -1414,12 +1455,11 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *msg = buf_msg(buf);
 	unsigned int limit = rcvbuf_limit(sk, buf);
+	u32 onode;
 	int rc = TIPC_OK;
 
-	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
-		tipc_port_proto_rcv(&tsk->port, buf);
-		return TIPC_OK;
-	}
+	if (unlikely(msg_user(msg) == CONN_MANAGER))
+		return tipc_sk_proto_rcv(tsk, &onode, buf);
 
 	/* Reject message if it is wrong sort of message for socket */
 	if (msg_type(msg) > TIPC_DIRECT_MSG)
@@ -1465,10 +1505,16 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
 
 	rc = filter_rcv(sk, buf);
 
-	if (unlikely(rc && tipc_msg_reverse(buf, &onode, -rc)))
-		tipc_link_xmit2(buf, onode, 0);
-	else if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-		atomic_add(truesize, &tsk->dupl_rcvcnt);
+	if (likely(!rc)) {
+		if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+			atomic_add(truesize, &tsk->dupl_rcvcnt);
+		return 0;
+	}
+
+	if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
+		return 0;
+
+	tipc_link_xmit2(buf, onode, 0);
 
 	return 0;
 }
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 3afcd2a..69fd06b 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -38,6 +38,9 @@
 #include "port.h"
 #include <net/sock.h>
 
+#define TIPC_CONN_OK      0
+#define TIPC_CONN_PROBING 1
+
 /**
  * struct tipc_sock - TIPC socket structure
  * @sk: socket - interacts with 'port' and with user via the socket API
-- 
1.7.9.5

* [PATCH net-next 13/13] tipc: simplify connection congestion handling
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (11 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 12/13] tipc: clean up connection protocol reception function Jon Maloy
@ 2014-06-26  1:41 ` Jon Maloy
  2014-06-27 19:56 ` [PATCH net-next 00/13] tipc: new unicast transmission code David Miller
  13 siblings, 0 replies; 18+ messages in thread
From: Jon Maloy @ 2014-06-26  1:41 UTC (permalink / raw)
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

As a consequence of the recently introduced serialized access
to the socket in commit 8d94168a761819d10252bab1f8de6d7b202c3baa
("tipc: same receive code path for connection protocol and data
messages") we can make a number of simplifications in the
detection and handling of connection congestion situations.

- We don't need to keep two counters, one for sent messages and one
  for acked messages. There is no longer any risk of races between
  acknowledge messages arriving in BH and data message sending
  running in user context. So we merge these into one counter,
  'sent_unacked', which is incremented on sending and decremented
  on acknowledge reception.

- We don't need to set the 'congested' field in tipc_port to
  true before we send the message, and clear it when sending
  is successful. (As a matter of fact, it was never necessary;
  the field was set in link_schedule_port() before any wakeup
  could arrive anyway.)

- We keep the conditions for link congestion and connection
  congestion separated. There would otherwise be a risk that an arriving
  acknowledge message may wake up a user sleeping because of link
  congestion.

- We can simplify reception of acknowledge messages.
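
As a rough illustration of the points above, the single-counter scheme
might be sketched in user-space C as follows. This is only a sketch under
stated assumptions: struct conn_state, FLOWCTRL_WIN, conn_cong(),
try_send() and rcv_ack() are hypothetical stand-ins, not the kernel code.
In the patch the fields live in struct tipc_sock, the window is
TIPC_FLOWCTRL_WIN, and the wakeup is done by tipc_sock_wakeup().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical window size for this sketch only. */
#define FLOWCTRL_WIN 4

/* Stand-in for the congestion-related socket fields. */
struct conn_state {
	bool link_cong;            /* sender must wait because of link congestion */
	unsigned int sent_unacked; /* messages sent but not yet acked by the peer */
	unsigned int wakeups;      /* counts wakeups, for illustration only */
};

/* Connection congestion: the unacked count has reached the window. */
bool conn_cong(const struct conn_state *cs)
{
	return cs->sent_unacked >= FLOWCTRL_WIN;
}

/* Try to send one message; returns false if the caller would have to wait. */
bool try_send(struct conn_state *cs)
{
	if (cs->link_cong || conn_cong(cs))
		return false;
	cs->sent_unacked++;
	return true;
}

/* Handle an acknowledgement covering 'acked' messages. */
void rcv_ack(struct conn_state *cs, unsigned int acked)
{
	bool was_cong = conn_cong(cs);

	/* Guard added for the sketch; it is not part of the patch. */
	assert(acked <= cs->sent_unacked);
	cs->sent_unacked -= acked;

	/* Wake the sender only if it was blocked by connection congestion. */
	if (was_cong)
		cs->wakeups++;
}
```

Because link congestion is tracked in its own flag, an acknowledgement
arriving while only link_cong is set does not count as a wakeup here,
which is the separation described in the third point.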

We also make some cosmetic/structural changes:

- We rename the 'congested' field to the more correct 'link_cong'.

- We rename 'conn_unacked' to 'rcv_unacked'.

- We move the above mentioned fields from struct tipc_port to
  struct tipc_sock.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/link.c   |   10 +++++++---
 net/tipc/port.c   |   12 ++----------
 net/tipc/port.h   |   16 ----------------
 net/tipc/socket.c |   48 +++++++++++++++++++++++-------------------------
 net/tipc/socket.h |   11 +++++++++++
 5 files changed, 43 insertions(+), 54 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 96a8072..a081e7d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 
 	spin_lock_bh(&tipc_port_list_lock);
 	p_ptr = tipc_port_lock(origport);
 	if (p_ptr) {
 		if (!list_empty(&p_ptr->wait_list))
 			goto exit;
-		p_ptr->congested = 1;
+		tsk = tipc_port_to_sock(p_ptr);
+		tsk->link_cong = 1;
 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 		l_ptr->stats.link_congs++;
@@ -352,6 +354,7 @@ exit:
 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 	struct tipc_port *temp_p_ptr;
 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 
@@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 				 wait_list) {
 		if (win <= 0)
 			break;
+		tsk = tipc_port_to_sock(p_ptr);
 		list_del_init(&p_ptr->wait_list);
 		spin_lock_bh(p_ptr->lock);
-		p_ptr->congested = 0;
-		tipc_port_wakeup(p_ptr);
+		tsk->link_cong = 0;
+		tipc_sock_wakeup(tsk);
 		win -= p_ptr->waiting_pkts;
 		spin_unlock_bh(p_ptr->lock);
 	}
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 9f53d5a..0d09dcb 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -186,12 +186,6 @@ exit:
 	tipc_port_list_free(dp);
 }
 
-
-void tipc_port_wakeup(struct tipc_port *port)
-{
-	tipc_sock_wakeup(tipc_port_to_sock(port));
-}
-
 /* tipc_port_init - intiate TIPC port and lock it
  *
  * Returns obtained reference if initialization is successful, zero otherwise
@@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
 	}
 
 	p_ptr->max_pkt = MAX_PKT_DEFAULT;
-	p_ptr->sent = 1;
 	p_ptr->ref = ref;
 	INIT_LIST_HEAD(&p_ptr->wait_list);
 	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
@@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack)
 	p_ptr = tipc_port_lock(ref);
 	if (!p_ptr)
 		return;
-	if (p_ptr->connected) {
-		p_ptr->conn_unacked -= ack;
+	if (p_ptr->connected)
 		buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
-	}
+
 	tipc_port_unlock(p_ptr);
 	if (!buf)
 		return;
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 3a4f808..0e47052 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -53,17 +53,13 @@
  * @connected: non-zero if port is currently connected to a peer port
  * @conn_type: TIPC type used when connection was established
  * @conn_instance: TIPC instance used when connection was established
- * @conn_unacked: number of unacknowledged messages received from peer port
  * @published: non-zero if port has one or more associated names
- * @congested: non-zero if cannot send because of link or port congestion
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
  * @ref: unique reference to port in TIPC object registry
  * @phdr: preformatted message header used when sending messages
  * @port_list: adjacent ports in TIPC's global list of ports
  * @wait_list: adjacent ports in list of ports waiting on link congestion
  * @waiting_pkts:
- * @sent: # of non-empty messages sent by port
- * @acked: # of non-empty message acknowledgements from connected port's peer
  * @publications: list of publications for port
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
@@ -76,17 +72,13 @@ struct tipc_port {
 	int connected;
 	u32 conn_type;
 	u32 conn_instance;
-	u32 conn_unacked;
 	int published;
-	u32 congested;
 	u32 max_pkt;
 	u32 ref;
 	struct tipc_msg phdr;
 	struct list_head port_list;
 	struct list_head wait_list;
 	u32 waiting_pkts;
-	u32 sent;
-	u32 acked;
 	struct list_head publications;
 	u32 pub_count;
 	u32 probing_state;
@@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
 
 int tipc_port_shutdown(u32 ref);
 
-void tipc_port_wakeup(struct tipc_port *port);
-
 /*
  * The following routines require that the port be locked on entry
  */
@@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
 	spin_unlock_bh(p_ptr->lock);
 }
 
-static inline int tipc_port_congested(struct tipc_port *p_ptr)
-{
-	return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
-}
-
-
 static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
 {
 	return msg_destnode(&p_ptr->phdr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1762323..ede78b1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
-	tsk->port.sent = 0;
+	tsk->sent_unacked = 0;
 	atomic_set(&tsk->dupl_rcvcnt, 0);
 	tipc_port_unlock(port);
 
@@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 
 	switch ((int)sock->state) {
 	case SS_UNCONNECTED:
-		if (!tsk->port.congested)
+		if (!tsk->link_cong)
 			mask |= POLLOUT;
 		break;
 	case SS_READY:
 	case SS_CONNECTED:
-		if (!tsk->port.congested)
+		if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
 			mask |= POLLOUT;
 		/* fall thru' */
 	case SS_CONNECTING:
@@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
 	struct tipc_port *port = &tsk->port;
-	int wakeable;
+	int conn_cong;
 
 	/* Ignore if connection cannot be validated: */
 	if (!port->connected || !tipc_port_peer_msg(port, msg))
@@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
 	port->probing_state = TIPC_CONN_OK;
 
 	if (msg_type(msg) == CONN_ACK) {
-		wakeable = tipc_port_congested(port) && port->congested;
-		port->acked += msg_msgcnt(msg);
-		if (!tipc_port_congested(port)) {
-			port->congested = 0;
-			if (wakeable)
-				tipc_port_wakeup(port);
-		}
+		conn_cong = tipc_sk_conn_cong(tsk);
+		tsk->sent_unacked -= msg_msgcnt(msg);
+		if (conn_cong)
+			tipc_sock_wakeup(tsk);
 	} else if (msg_type(msg) == CONN_PROBE) {
 		if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
 			return TIPC_OK;
@@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
 			return sock_intr_errno(*timeo_p);
 
 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
-		done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
+		done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
 		finish_wait(sk_sleep(sk), &wait);
 	} while (!done);
 	return 0;
@@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	struct tipc_port *port = &tsk->port;
 	DEFINE_WAIT(wait);
 	int done;
 
@@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 
 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 		done = sk_wait_event(sk, timeo_p,
-				     (!port->congested || !port->connected));
+				     (!tsk->link_cong &&
+				      !tipc_sk_conn_cong(tsk)) ||
+				     !tsk->port.connected);
 		finish_wait(sk_sleep(sk), &wait);
 	} while (!done);
 	return 0;
@@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
 	if (unlikely(dest)) {
 		rc = tipc_sendmsg(iocb, sock, m, dsz);
 		if (dsz && (dsz == rc))
-			tsk->port.sent = 1;
+			tsk->sent_unacked = 1;
 		return rc;
 	}
 	if (dsz > (uint)INT_MAX)
@@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
 
 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 	dnode = tipc_port_peernode(port);
-	port->congested = 1;
 
 next:
 	mtu = port->max_pkt;
@@ -884,11 +881,10 @@ next:
 	if (unlikely(rc < 0))
 		goto exit;
 	do {
-		port->congested = 1;
-		if (likely(!tipc_port_congested(port))) {
+		if (likely(!tipc_sk_conn_cong(tsk))) {
 			rc = tipc_link_xmit2(buf, dnode, ref);
 			if (likely(!rc)) {
-				port->sent++;
+				tsk->sent_unacked++;
 				sent += send;
 				if (sent == dsz)
 					break;
@@ -903,8 +899,6 @@ next:
 		}
 		rc = tipc_wait_for_sndpkt(sock, &timeo);
 	} while (!rc);
-
-	port->congested = 0;
 exit:
 	if (iocb)
 		release_sock(sk);
@@ -1169,8 +1163,10 @@ restart:
 	/* Consume received message (optional) */
 	if (likely(!(flags & MSG_PEEK))) {
 		if ((sock->state != SS_READY) &&
-		    (++port->conn_unacked >= TIPC_CONNACK_INTV))
-			tipc_acknowledge(port->ref, port->conn_unacked);
+		    (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+			tipc_acknowledge(port->ref, tsk->rcv_unacked);
+			tsk->rcv_unacked = 0;
+		}
 		advance_rx_queue(sk);
 	}
 exit:
@@ -1278,8 +1274,10 @@ restart:
 
 	/* Consume received message (optional) */
 	if (likely(!(flags & MSG_PEEK))) {
-		if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
-			tipc_acknowledge(port->ref, port->conn_unacked);
+		if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+			tipc_acknowledge(port->ref, tsk->rcv_unacked);
+			tsk->rcv_unacked = 0;
+		}
 		advance_rx_queue(sk);
 	}
 
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 69fd06b..2cdede9 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -48,6 +48,9 @@
  * @peer_name: the peer of the connection, if any
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
+ * @link_cong: non-zero if owner must sleep because of link congestion
+ * @sent_unacked: # messages sent by socket, and not yet acked by peer
+ * @rcv_unacked: # messages read by user, but not yet acked back to peer
  */
 
 struct tipc_sock {
@@ -55,6 +58,9 @@ struct tipc_sock {
 	struct tipc_port port;
 	unsigned int conn_timeout;
 	atomic_t dupl_rcvcnt;
+	int link_cong;
+	uint sent_unacked;
+	uint rcv_unacked;
 };
 
 static inline struct tipc_sock *tipc_sk(const struct sock *sk)
@@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
 	tsk->sk.sk_write_space(&tsk->sk);
 }
 
+static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
+{
+	return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+}
+
 int tipc_sk_rcv(struct sk_buff *buf);
 
 #endif
-- 
1.7.9.5

* Re: [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory
  2014-06-26  1:41 ` [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory Jon Maloy
@ 2014-06-26 10:56   ` Neil Horman
  2014-06-27  3:33     ` Jon Maloy
  0 siblings, 1 reply; 18+ messages in thread
From: Neil Horman @ 2014-06-26 10:56 UTC (permalink / raw)
  To: Jon Maloy
  Cc: davem, netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion

On Wed, Jun 25, 2014 at 08:41:30PM -0500, Jon Maloy wrote:
> In the function tipc_nodesub_notify() we call a function pointer
> aggregated into the object to be notified, whereafter we set
> the function pointer to NULL. However, in some cases the function
> pointed to will free the struct containing the function pointer,
> resulting in a write to already freed memory.
> 
> This bug seems to always have been there, without causing any
> notable harm.
> 
> In this commit we fix the problem by inverting the order of the
> zeroing and the function call.
> 
> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
> ---
>  net/tipc/node_subscr.c |    6 ++++--
>  1 file changed, 4 insertions(+), 2 deletions(-)
> 
> diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
> index 7c59ab1..2d13eea 100644
> --- a/net/tipc/node_subscr.c
> +++ b/net/tipc/node_subscr.c
> @@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
>  void tipc_nodesub_notify(struct list_head *nsub_list)
>  {
>  	struct tipc_node_subscr *ns, *safe;
> +	net_ev_handler handle_node_down;
>  
>  	list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
> -		if (ns->handle_node_down) {
> -			ns->handle_node_down(ns->usr_handle);
> +		handle_node_down = ns->handle_node_down;
> +		if (handle_node_down) {
>  			ns->handle_node_down = NULL;
> +			handle_node_down(ns->usr_handle);
>  		}
>  	}
>  }

If it's true that some functions pointed to by handle_node_down free the struct
containing the function pointer, then this change isn't sufficient.  The only
caller of tipc_nodesub_notify is node_lost_contact, which dereferences the same
structure right after the call to tipc_nodesub_notify.

Neil

> -- 
> 1.7.9.5
> 
> --
> To unsubscribe from this list: send the line "unsubscribe netdev" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 

* Re: [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory
  2014-06-26 10:56   ` Neil Horman
@ 2014-06-27  3:33     ` Jon Maloy
  2014-06-27 11:41       ` Neil Horman
  0 siblings, 1 reply; 18+ messages in thread
From: Jon Maloy @ 2014-06-27  3:33 UTC (permalink / raw)
  To: Neil Horman, Jon Maloy; +Cc: netdev, Paul Gortmaker, tipc-discussion, davem

On 06/26/2014 05:56 AM, Neil Horman wrote:
> On Wed, Jun 25, 2014 at 08:41:30PM -0500, Jon Maloy wrote:
>> In the function tipc_nodesub_notify() we call a function pointer
>> aggregated into the object to be notified, whereafter we set
>> the function pointer to NULL. However, in some cases the function
>> pointed to will free the struct containing the function pointer,
>> resulting in a write to already freed memory.
>>
>> This bug seems to always have been there, without causing any
>> notable harm.
>>
>> In this commit we fix the problem by inverting the order of the
>> zeroing and the function call.
>>
>> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
>> ---
>>  net/tipc/node_subscr.c |    6 ++++--
>>  1 file changed, 4 insertions(+), 2 deletions(-)
>>
>> diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
>> index 7c59ab1..2d13eea 100644
>> --- a/net/tipc/node_subscr.c
>> +++ b/net/tipc/node_subscr.c
>> @@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
>>  void tipc_nodesub_notify(struct list_head *nsub_list)
>>  {
>>  	struct tipc_node_subscr *ns, *safe;
>> +	net_ev_handler handle_node_down;
>>  
>>  	list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
>> -		if (ns->handle_node_down) {
>> -			ns->handle_node_down(ns->usr_handle);
>> +		handle_node_down = ns->handle_node_down;
>> +		if (handle_node_down) {
>>  			ns->handle_node_down = NULL;
>> +			handle_node_down(ns->usr_handle);
>>  		}
>>  	}
>>  }
> If it's true that some functions pointed to by handle_node_down free the struct
> containing the function pointer, then this change isn't sufficient.  The only
> caller of tipc_nodesub_notify is node_lost_contact, which dereferences the same
> structure right after the call to tipc_nodesub_notify.

I think you misunderstand. The pointer (n_ptr*) passed as parameter in
node_sub_notify() points to a struct of type tipc_node. This one is *not*
freed during the call. But it aggregates a linked list of structs of type
nsub. Those are the ones containing the mentioned function pointer, and
the ones potentially being released during the traversal of the list.

I cannot see that any of these structs are referenced after the call to
node_sub_notify().
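
To make the ordering concrete, here is a minimal user-space sketch of the
pattern. It is an illustration under stated assumptions, not the kernel
code: struct sub, struct owner, owner_node_down(), notify_all() and
make_list() are hypothetical names, and a singly linked list stands in for
the kernel's list_head traversal.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified subscription record. */
struct sub {
	struct sub *next;
	void (*handle_node_down)(void *usr_handle);
	void *usr_handle;
};

/* Owner embeds its subscription, so freeing the owner frees the record. */
struct owner {
	struct sub sub;
	int *down_count;
};

/* Callback that releases the object containing the subscription. */
void owner_node_down(void *usr_handle)
{
	struct owner *o = usr_handle;

	(*o->down_count)++;
	free(o);
}

/* Notify all subscribers; 'ns' is never touched after its handler ran. */
void notify_all(struct sub *head)
{
	struct sub *ns, *safe;
	void (*handler)(void *);

	for (ns = head; ns; ns = safe) {
		safe = ns->next;	/* fetch successor before ns may be freed */
		handler = ns->handle_node_down;
		if (handler) {
			ns->handle_node_down = NULL;	/* clear first ... */
			handler(ns->usr_handle);	/* ... then call */
		}
	}
}

/* Build a list of n owners sharing one counter; returns the list head. */
struct sub *make_list(int n, int *down_count)
{
	struct sub *head = NULL;

	while (n-- > 0) {
		struct owner *o = malloc(sizeof(*o));

		assert(o);
		o->down_count = down_count;
		o->sub.handle_node_down = owner_node_down;
		o->sub.usr_handle = o;
		o->sub.next = head;
		head = &o->sub;
	}
	return head;
}
```

With the old ordering (call, then clear), the store to
ns->handle_node_down would land in memory that owner_node_down() has
already freed.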

///jon
 


>
> Neil
>
>> -- 
>> 1.7.9.5
>>


* Re: [PATCH net-next 01/13] tipc: eliminate case of writing to freed memory
  2014-06-27  3:33     ` Jon Maloy
@ 2014-06-27 11:41       ` Neil Horman
  0 siblings, 0 replies; 18+ messages in thread
From: Neil Horman @ 2014-06-27 11:41 UTC (permalink / raw)
  To: Jon Maloy
  Cc: Jon Maloy, davem, netdev, Paul Gortmaker, erik.hugne, ying.xue,
	tipc-discussion

On Thu, Jun 26, 2014 at 10:33:04PM -0500, Jon Maloy wrote:
> On 06/26/2014 05:56 AM, Neil Horman wrote:
> > On Wed, Jun 25, 2014 at 08:41:30PM -0500, Jon Maloy wrote:
> >> In the function tipc_nodesub_notify() we call a function pointer
> >> aggregated into the object to be notified, whereafter we set
> >> the function pointer to NULL. However, in some cases the function
> >> pointed to will free the struct containing the function pointer,
> >> resulting in a write to already freed memory.
> >>
> >> This bug seems to always have been there, without causing any
> >> notable harm.
> >>
> >> In this commit we fix the problem by inverting the order of the
> >> zeroing and the function call.
> >>
> >> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
> >> ---
> >>  net/tipc/node_subscr.c |    6 ++++--
> >>  1 file changed, 4 insertions(+), 2 deletions(-)
> >>
> >> diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
> >> index 7c59ab1..2d13eea 100644
> >> --- a/net/tipc/node_subscr.c
> >> +++ b/net/tipc/node_subscr.c
> >> @@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
> >>  void tipc_nodesub_notify(struct list_head *nsub_list)
> >>  {
> >>  	struct tipc_node_subscr *ns, *safe;
> >> +	net_ev_handler handle_node_down;
> >>  
> >>  	list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
> >> -		if (ns->handle_node_down) {
> >> -			ns->handle_node_down(ns->usr_handle);
> >> +		handle_node_down = ns->handle_node_down;
> >> +		if (handle_node_down) {
> >>  			ns->handle_node_down = NULL;
> >> +			handle_node_down(ns->usr_handle);
> >>  		}
> >>  	}
> >>  }
> > If it's true that some functions pointed to by handle_node_down free the struct
> > containing the function pointer, then this change isn't sufficient.  The only
> > caller of tipc_nodesub_notify is node_lost_contact, which dereferences the same
> > structure right after the call to tipc_nodesub_notify.
> 
> I think you misunderstand. The pointer (n_ptr*) passed as parameter in
> node_sub_notify() points to a struct of type tipc_node. This one is *not*
> freed during the call. But it aggregates a linked list of structs of type
> nsub. Those are the ones containing the mentioned function pointer, and
> the ones potentially being released during the traversal of the list.
> 
> I cannot see that any of these structs are referenced after the call to
> node_sub_notify().
> 
> ///jon
>  
You're right, I was looking at Linus' tree, not the net-next tree.  In the
former, the struct as a whole, not the list_head, is passed in.

Neil

> 
> 
> >
> > Neil
> >
> >> -- 
> >> 1.7.9.5
> >>
> 
> 

* Re: [PATCH net-next 00/13] tipc: new unicast transmission code
  2014-06-26  1:41 [PATCH net-next 00/13] tipc: new unicast transmission code Jon Maloy
                   ` (12 preceding siblings ...)
  2014-06-26  1:41 ` [PATCH net-next 13/13] tipc: simplify connection congestion handling Jon Maloy
@ 2014-06-27 19:56 ` David Miller
  13 siblings, 0 replies; 18+ messages in thread
From: David Miller @ 2014-06-27 19:56 UTC (permalink / raw)
  To: jon.maloy
  Cc: netdev, paul.gortmaker, erik.hugne, ying.xue, maloy, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Wed, 25 Jun 2014 20:41:29 -0500

> As a step towards making the data transmission code more maintainable
> and performant, we introduce a number of new functions for
> building, sending and rejecting messages. The new functions will
> eventually be used for all data transmission: user data unicast,
> service internal messaging, and multicast/broadcast.
> 
> We start with this series, where we introduce the functions, and
> let user data unicast and the internal connection protocol use them.
> The remaining users will come in a later series.
> 
> There are only minor changes to data structures, and no protocol
> changes, so the older functions can still be used in parallel for
> some time. Until the old functions are removed, we use temporary
> names for the new functions, such as tipc_build_msg2, tipc_link_xmit2.
> 
> It should be noted that the first two commits are unrelated to the
> rest of the series.

Series applied, thanks Jon.
