Netdev Archive on lore.kernel.org
 help / color / Atom feed
* [net-next v2 1/1] tipc: clean up skb list lock handling on send path
@ 2019-08-15 14:42 Jon Maloy
  2019-08-16  5:29 ` Xin Long
  2019-08-18 21:01 ` David Miller
  0 siblings, 2 replies; 3+ messages in thread
From: Jon Maloy @ 2019-08-15 14:42 UTC (permalink / raw)
  To: davem, netdev
  Cc: tung.q.nguyen, hoang.h.le, jon.maloy, lxin, shuali, ying.xue,
	edumazet, tipc-discussion

The policy for handling the skb list locks on the send and receive paths
is simple.

- On the send path we never need to grab the lock on the 'xmitq' list
  when the destination is an exernal node.

- On the receive path we always need to grab the lock on the 'inputq'
  list, irrespective of source node.

However, when transmitting node local messages those will eventually
end up on the receive path of a local socket, meaning that the argument
'xmitq' in tipc_node_xmit() will become the 'ínputq' argument in  the
function tipc_sk_rcv(). This has been handled by always initializing
the spinlock of the 'xmitq' list at message creation, just in case it
may end up on the receive path later, and despite knowing that the lock
in most cases never will be used.

This approach is inaccurate and confusing, and has also concealed the
fact that the stated 'no lock grabbing' policy for the send path is
violated in some cases.

We now clean up this by never initializing the lock at message creation,
instead doing this at the moment we find that the message actually will
enter the receive path. At the same time we fix the four locations
where we incorrectly access the spinlock on the send/error path.

This patch also reverts commit d12cffe9329f ("tipc: ensure head->lock
is initialised") which has now become redundant.

CC: Eric Dumazet <edumazet@google.com>
Reported-by: Chris Packham <chris.packham@alliedtelesis.co.nz>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>

---
v2: removed more unnecessary lock initializations after feedback
    from Xin Long.
---
 net/tipc/bcast.c      | 10 +++++-----
 net/tipc/group.c      |  4 ++--
 net/tipc/link.c       | 14 +++++++-------
 net/tipc/name_distr.c |  2 +-
 net/tipc/node.c       |  7 ++++---
 net/tipc/socket.c     | 14 +++++++-------
 6 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 34f3e56..6ef1abd 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -185,7 +185,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
 	}
 
 	/* We have to transmit across all bearers */
-	skb_queue_head_init(&_xmitq);
+	__skb_queue_head_init(&_xmitq);
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
 		if (!bb->dests[bearer_id])
 			continue;
@@ -256,7 +256,7 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	struct sk_buff_head xmitq;
 	int rc = 0;
 
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	tipc_bcast_lock(net);
 	if (tipc_link_bc_peers(l))
 		rc = tipc_link_xmit(l, pkts, &xmitq);
@@ -286,7 +286,7 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	u32 dnode, selector;
 
 	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
-	skb_queue_head_init(&_pkts);
+	__skb_queue_head_init(&_pkts);
 
 	list_for_each_entry_safe(dst, tmp, &dests->list, list) {
 		dnode = dst->node;
@@ -344,7 +344,7 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
 	msg_set_size(_hdr, MCAST_H_SIZE);
 	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
 
-	skb_queue_head_init(&tmpq);
+	__skb_queue_head_init(&tmpq);
 	__skb_queue_tail(&tmpq, _skb);
 	if (method->rcast)
 		tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
@@ -378,7 +378,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
-	skb_queue_head_init(&localq);
+	__skb_queue_head_init(&localq);
 
 	/* Clone packets before they are consumed by next call */
 	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f98d38..89257e2 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -199,7 +199,7 @@ void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
 	struct tipc_member *m, *tmp;
 	struct sk_buff_head xmitq;
 
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
 		tipc_group_update_member(m, 0);
@@ -435,7 +435,7 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
 		return true;
 	if (state == MBR_PENDING && adv == ADV_IDLE)
 		return true;
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
 	tipc_node_distr_xmit(grp->net, &xmitq);
 	return true;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index dd3155b..289e848 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -959,7 +959,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
 			skb_queue_len(list), msg_user(hdr),
 			msg_type(hdr), msg_size(hdr), mtu);
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
@@ -988,7 +988,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		if (likely(skb_queue_len(transmq) < maxwin)) {
 			_skb = skb_clone(skb, GFP_ATOMIC);
 			if (!_skb) {
-				skb_queue_purge(list);
+				__skb_queue_purge(list);
 				return -ENOBUFS;
 			}
 			__skb_dequeue(list);
@@ -1668,7 +1668,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
 	struct sk_buff *skb;
 	u32 dnode = l->addr;
 
-	skb_queue_head_init(&tnlq);
+	__skb_queue_head_init(&tnlq);
 	skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
 			      INT_H_SIZE, BASIC_H_SIZE,
 			      dnode, onode, 0, 0, 0);
@@ -1708,9 +1708,9 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 	if (!tnl)
 		return;
 
-	skb_queue_head_init(&tnlq);
-	skb_queue_head_init(&tmpxq);
-	skb_queue_head_init(&frags);
+	__skb_queue_head_init(&tnlq);
+	__skb_queue_head_init(&tmpxq);
+	__skb_queue_head_init(&frags);
 
 	/* At least one packet required for safe algorithm => add dummy */
 	skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1720,7 +1720,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 		pr_warn("%sunable to create tunnel packet\n", link_co_err);
 		return;
 	}
-	skb_queue_tail(&tnlq, skb);
+	__skb_queue_tail(&tnlq, skb);
 	tipc_link_xmit(l, &tnlq, &tmpxq);
 	__skb_queue_purge(&tmpxq);
 
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 44abc8e..61219f0 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -190,7 +190,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 	struct name_table *nt = tipc_name_table(net);
 	struct sk_buff_head head;
 
-	skb_queue_head_init(&head);
+	__skb_queue_head_init(&head);
 
 	read_lock_bh(&nt->cluster_scope_lock);
 	named_distribute(net, &head, dnode, &nt->cluster_scope);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1bdcf0f..c8f6177 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1444,13 +1444,14 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 
 	if (in_own_node(net, dnode)) {
 		tipc_loopback_trace(net, list);
+		spin_lock_init(&list->lock);
 		tipc_sk_rcv(net, list);
 		return 0;
 	}
 
 	n = tipc_node_find(net, dnode);
 	if (unlikely(!n)) {
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -1459,7 +1460,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 	if (unlikely(bearer_id == INVALID_BEARER_ID)) {
 		tipc_node_read_unlock(n);
 		tipc_node_put(n);
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -1491,7 +1492,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 {
 	struct sk_buff_head head;
 
-	skb_queue_head_init(&head);
+	__skb_queue_head_init(&head);
 	__skb_queue_tail(&head, skb);
 	tipc_node_xmit(net, &head, dnode, selector);
 	return 0;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 83ae41d..3b9f8cc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -809,7 +809,7 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	msg_set_nameupper(hdr, seq->upper);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
 	/* Send message if build was successful */
@@ -853,7 +853,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
@@ -1058,7 +1058,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	msg_set_grp_bc_ack_req(hdr, ack);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
@@ -1387,7 +1387,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(rc))
 		return rc;
 
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
@@ -1445,7 +1445,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 	int send, sent = 0;
 	int rc = 0;
 
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 
 	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
@@ -1805,7 +1805,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 
 	/* Send group flow control advertisement when applicable */
 	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
-		skb_queue_head_init(&xmitq);
+		__skb_queue_head_init(&xmitq);
 		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
 					  msg_orignode(hdr), msg_origport(hdr),
 					  &xmitq);
@@ -2674,7 +2674,7 @@ static void tipc_sk_timeout(struct timer_list *t)
 	struct sk_buff_head list;
 	int rc = 0;
 
-	skb_queue_head_init(&list);
+	__skb_queue_head_init(&list);
 	bh_lock_sock(sk);
 
 	/* Try again later if socket is busy */
-- 
2.1.4


^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [net-next v2 1/1] tipc: clean up skb list lock handling on send path
  2019-08-15 14:42 [net-next v2 1/1] tipc: clean up skb list lock handling on send path Jon Maloy
@ 2019-08-16  5:29 ` Xin Long
  2019-08-18 21:01 ` David Miller
  1 sibling, 0 replies; 3+ messages in thread
From: Xin Long @ 2019-08-16  5:29 UTC (permalink / raw)
  To: Jon Maloy
  Cc: davem, network dev, tung.q.nguyen, hoang.h.le, Long Xin, shuali,
	Ying Xue, Eric Dumazet, tipc-discussion

On Thu, Aug 15, 2019 at 11:36 PM Jon Maloy <jon.maloy@ericsson.com> wrote:
>
> The policy for handling the skb list locks on the send and receive paths
> is simple.
>
> - On the send path we never need to grab the lock on the 'xmitq' list
>   when the destination is an exernal node.
>
> - On the receive path we always need to grab the lock on the 'inputq'
>   list, irrespective of source node.
>
> However, when transmitting node local messages those will eventually
> end up on the receive path of a local socket, meaning that the argument
> 'xmitq' in tipc_node_xmit() will become the 'ínputq' argument in  the
> function tipc_sk_rcv(). This has been handled by always initializing
> the spinlock of the 'xmitq' list at message creation, just in case it
> may end up on the receive path later, and despite knowing that the lock
> in most cases never will be used.
>
> This approach is inaccurate and confusing, and has also concealed the
> fact that the stated 'no lock grabbing' policy for the send path is
> violated in some cases.
>
> We now clean up this by never initializing the lock at message creation,
> instead doing this at the moment we find that the message actually will
> enter the receive path. At the same time we fix the four locations
> where we incorrectly access the spinlock on the send/error path.
>
> This patch also reverts commit d12cffe9329f ("tipc: ensure head->lock
> is initialised") which has now become redundant.
>
> CC: Eric Dumazet <edumazet@google.com>
> Reported-by: Chris Packham <chris.packham@alliedtelesis.co.nz>
> Acked-by: Ying Xue <ying.xue@windriver.com>
> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
>
> ---
> v2: removed more unnecessary lock initializations after feedback
>     from Xin Long.
> ---
>  net/tipc/bcast.c      | 10 +++++-----
>  net/tipc/group.c      |  4 ++--
>  net/tipc/link.c       | 14 +++++++-------
>  net/tipc/name_distr.c |  2 +-
>  net/tipc/node.c       |  7 ++++---
>  net/tipc/socket.c     | 14 +++++++-------
>  6 files changed, 26 insertions(+), 25 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index 34f3e56..6ef1abd 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -185,7 +185,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
>         }
>
>         /* We have to transmit across all bearers */
> -       skb_queue_head_init(&_xmitq);
> +       __skb_queue_head_init(&_xmitq);
>         for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
>                 if (!bb->dests[bearer_id])
>                         continue;
> @@ -256,7 +256,7 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
>         struct sk_buff_head xmitq;
>         int rc = 0;
>
> -       skb_queue_head_init(&xmitq);
> +       __skb_queue_head_init(&xmitq);
>         tipc_bcast_lock(net);
>         if (tipc_link_bc_peers(l))
>                 rc = tipc_link_xmit(l, pkts, &xmitq);
> @@ -286,7 +286,7 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
>         u32 dnode, selector;
>
>         selector = msg_link_selector(buf_msg(skb_peek(pkts)));
> -       skb_queue_head_init(&_pkts);
> +       __skb_queue_head_init(&_pkts);
>
>         list_for_each_entry_safe(dst, tmp, &dests->list, list) {
>                 dnode = dst->node;
> @@ -344,7 +344,7 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
>         msg_set_size(_hdr, MCAST_H_SIZE);
>         msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
>
> -       skb_queue_head_init(&tmpq);
> +       __skb_queue_head_init(&tmpq);
>         __skb_queue_tail(&tmpq, _skb);
>         if (method->rcast)
>                 tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
> @@ -378,7 +378,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
>         int rc = 0;
>
>         skb_queue_head_init(&inputq);
> -       skb_queue_head_init(&localq);
> +       __skb_queue_head_init(&localq);
>
>         /* Clone packets before they are consumed by next call */
>         if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> diff --git a/net/tipc/group.c b/net/tipc/group.c
> index 5f98d38..89257e2 100644
> --- a/net/tipc/group.c
> +++ b/net/tipc/group.c
> @@ -199,7 +199,7 @@ void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
>         struct tipc_member *m, *tmp;
>         struct sk_buff_head xmitq;
>
> -       skb_queue_head_init(&xmitq);
> +       __skb_queue_head_init(&xmitq);
>         rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
>                 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
>                 tipc_group_update_member(m, 0);
> @@ -435,7 +435,7 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
>                 return true;
>         if (state == MBR_PENDING && adv == ADV_IDLE)
>                 return true;
> -       skb_queue_head_init(&xmitq);
> +       __skb_queue_head_init(&xmitq);
>         tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
>         tipc_node_distr_xmit(grp->net, &xmitq);
>         return true;
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index dd3155b..289e848 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -959,7 +959,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
>                 pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
>                         skb_queue_len(list), msg_user(hdr),
>                         msg_type(hdr), msg_size(hdr), mtu);
> -               skb_queue_purge(list);
> +               __skb_queue_purge(list);
>                 return -EMSGSIZE;
>         }
>
> @@ -988,7 +988,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
>                 if (likely(skb_queue_len(transmq) < maxwin)) {
>                         _skb = skb_clone(skb, GFP_ATOMIC);
>                         if (!_skb) {
> -                               skb_queue_purge(list);
> +                               __skb_queue_purge(list);
>                                 return -ENOBUFS;
>                         }
>                         __skb_dequeue(list);
> @@ -1668,7 +1668,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
>         struct sk_buff *skb;
>         u32 dnode = l->addr;
>
> -       skb_queue_head_init(&tnlq);
> +       __skb_queue_head_init(&tnlq);
>         skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
>                               INT_H_SIZE, BASIC_H_SIZE,
>                               dnode, onode, 0, 0, 0);
> @@ -1708,9 +1708,9 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
>         if (!tnl)
>                 return;
>
> -       skb_queue_head_init(&tnlq);
> -       skb_queue_head_init(&tmpxq);
> -       skb_queue_head_init(&frags);
> +       __skb_queue_head_init(&tnlq);
> +       __skb_queue_head_init(&tmpxq);
> +       __skb_queue_head_init(&frags);
>
>         /* At least one packet required for safe algorithm => add dummy */
>         skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
> @@ -1720,7 +1720,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
>                 pr_warn("%sunable to create tunnel packet\n", link_co_err);
>                 return;
>         }
> -       skb_queue_tail(&tnlq, skb);
> +       __skb_queue_tail(&tnlq, skb);
>         tipc_link_xmit(l, &tnlq, &tmpxq);
>         __skb_queue_purge(&tmpxq);
>
> diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
> index 44abc8e..61219f0 100644
> --- a/net/tipc/name_distr.c
> +++ b/net/tipc/name_distr.c
> @@ -190,7 +190,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
>         struct name_table *nt = tipc_name_table(net);
>         struct sk_buff_head head;
>
> -       skb_queue_head_init(&head);
> +       __skb_queue_head_init(&head);
>
>         read_lock_bh(&nt->cluster_scope_lock);
>         named_distribute(net, &head, dnode, &nt->cluster_scope);
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 1bdcf0f..c8f6177 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -1444,13 +1444,14 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
>
>         if (in_own_node(net, dnode)) {
>                 tipc_loopback_trace(net, list);
> +               spin_lock_init(&list->lock);
>                 tipc_sk_rcv(net, list);
>                 return 0;
>         }
>
>         n = tipc_node_find(net, dnode);
>         if (unlikely(!n)) {
> -               skb_queue_purge(list);
> +               __skb_queue_purge(list);
>                 return -EHOSTUNREACH;
>         }
>
> @@ -1459,7 +1460,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
>         if (unlikely(bearer_id == INVALID_BEARER_ID)) {
>                 tipc_node_read_unlock(n);
>                 tipc_node_put(n);
> -               skb_queue_purge(list);
> +               __skb_queue_purge(list);
>                 return -EHOSTUNREACH;
>         }
>
> @@ -1491,7 +1492,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
>  {
>         struct sk_buff_head head;
>
> -       skb_queue_head_init(&head);
> +       __skb_queue_head_init(&head);
>         __skb_queue_tail(&head, skb);
>         tipc_node_xmit(net, &head, dnode, selector);
>         return 0;
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index 83ae41d..3b9f8cc 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -809,7 +809,7 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
>         msg_set_nameupper(hdr, seq->upper);
>
>         /* Build message as chain of buffers */
> -       skb_queue_head_init(&pkts);
> +       __skb_queue_head_init(&pkts);
>         rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
>
>         /* Send message if build was successful */
> @@ -853,7 +853,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
>         msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
>
>         /* Build message as chain of buffers */
> -       skb_queue_head_init(&pkts);
> +       __skb_queue_head_init(&pkts);
>         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
>         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
>         if (unlikely(rc != dlen))
> @@ -1058,7 +1058,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
>         msg_set_grp_bc_ack_req(hdr, ack);
>
>         /* Build message as chain of buffers */
> -       skb_queue_head_init(&pkts);
> +       __skb_queue_head_init(&pkts);
>         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
>         if (unlikely(rc != dlen))
>                 return rc;
> @@ -1387,7 +1387,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
>         if (unlikely(rc))
>                 return rc;
>
> -       skb_queue_head_init(&pkts);
> +       __skb_queue_head_init(&pkts);
>         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
>         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
>         if (unlikely(rc != dlen))
> @@ -1445,7 +1445,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
>         int send, sent = 0;
>         int rc = 0;
>
> -       skb_queue_head_init(&pkts);
> +       __skb_queue_head_init(&pkts);
>
>         if (unlikely(dlen > INT_MAX))
>                 return -EMSGSIZE;
> @@ -1805,7 +1805,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
>
>         /* Send group flow control advertisement when applicable */
>         if (tsk->group && msg_in_group(hdr) && !grp_evt) {
> -               skb_queue_head_init(&xmitq);
> +               __skb_queue_head_init(&xmitq);
>                 tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
>                                           msg_orignode(hdr), msg_origport(hdr),
>                                           &xmitq);
> @@ -2674,7 +2674,7 @@ static void tipc_sk_timeout(struct timer_list *t)
>         struct sk_buff_head list;
>         int rc = 0;
>
> -       skb_queue_head_init(&list);
> +       __skb_queue_head_init(&list);
>         bh_lock_sock(sk);
>
>         /* Try again later if socket is busy */
> --
> 2.1.4
>
Reviewed-by: Xin Long <lucien.xin@gmail.com>

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [net-next v2 1/1] tipc: clean up skb list lock handling on send path
  2019-08-15 14:42 [net-next v2 1/1] tipc: clean up skb list lock handling on send path Jon Maloy
  2019-08-16  5:29 ` Xin Long
@ 2019-08-18 21:01 ` David Miller
  1 sibling, 0 replies; 3+ messages in thread
From: David Miller @ 2019-08-18 21:01 UTC (permalink / raw)
  To: jon.maloy
  Cc: netdev, tung.q.nguyen, hoang.h.le, lxin, shuali, ying.xue,
	edumazet, tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Thu, 15 Aug 2019 16:42:50 +0200

> The policy for handling the skb list locks on the send and receive paths
> is simple.
> 
> - On the send path we never need to grab the lock on the 'xmitq' list
>   when the destination is an exernal node.
> 
> - On the receive path we always need to grab the lock on the 'inputq'
>   list, irrespective of source node.
> 
> However, when transmitting node local messages those will eventually
> end up on the receive path of a local socket, meaning that the argument
> 'xmitq' in tipc_node_xmit() will become the 'ínputq' argument in  the
> function tipc_sk_rcv(). This has been handled by always initializing
> the spinlock of the 'xmitq' list at message creation, just in case it
> may end up on the receive path later, and despite knowing that the lock
> in most cases never will be used.
> 
> This approach is inaccurate and confusing, and has also concealed the
> fact that the stated 'no lock grabbing' policy for the send path is
> violated in some cases.
> 
> We now clean up this by never initializing the lock at message creation,
> instead doing this at the moment we find that the message actually will
> enter the receive path. At the same time we fix the four locations
> where we incorrectly access the spinlock on the send/error path.
> 
> This patch also reverts commit d12cffe9329f ("tipc: ensure head->lock
> is initialised") which has now become redundant.
> 
> CC: Eric Dumazet <edumazet@google.com>
> Reported-by: Chris Packham <chris.packham@alliedtelesis.co.nz>
> Acked-by: Ying Xue <ying.xue@windriver.com>
> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>

Applied.

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, back to index

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-08-15 14:42 [net-next v2 1/1] tipc: clean up skb list lock handling on send path Jon Maloy
2019-08-16  5:29 ` Xin Long
2019-08-18 21:01 ` David Miller

Netdev Archive on lore.kernel.org

Archives are clonable:
	git clone --mirror https://lore.kernel.org/netdev/0 netdev/git/0.git
	git clone --mirror https://lore.kernel.org/netdev/1 netdev/git/1.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 netdev netdev/ https://lore.kernel.org/netdev \
		netdev@vger.kernel.org netdev@archiver.kernel.org
	public-inbox-index netdev


Newsgroup available over NNTP:
	nntp://nntp.lore.kernel.org/org.kernel.vger.netdev


AGPL code for this site: git clone https://public-inbox.org/ public-inbox