All of lore.kernel.org
 help / color / mirror / Atom feed
From: David Howells <dhowells@redhat.com>
To: netdev@vger.kernel.org
Cc: Marc Dionne <marc.dionne@auristor.com>,
	linux-afs@lists.infradead.org, dhowells@redhat.com,
	linux-afs@lists.infradead.org, linux-kernel@vger.kernel.org
Subject: [PATCH net-next 29/36] rxrpc: Reduce the use of RCU in packet input
Date: Fri, 02 Dec 2022 00:19:10 +0000	[thread overview]
Message-ID: <166994035029.1732290.5164470094736354349.stgit@warthog.procyon.org.uk> (raw)
In-Reply-To: <166994010342.1732290.13771061038178613124.stgit@warthog.procyon.org.uk>

Shrink the region of rxrpc_input_packet() that is covered by the RCU read
lock so that it only covers the connection and call lookup.  This means
that the code now outside of that region can call functions that may sleep,
such as kmalloc() and sendmsg().

Also take a ref on the conn or call we're going to use before we drop the
RCU read lock so that it can't be deallocated whilst we're still using it.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
---

 net/rxrpc/ar-internal.h |    3 +-
 net/rxrpc/call_accept.c |   13 ++-------
 net/rxrpc/input.c       |    7 ++---
 net/rxrpc/io_thread.c   |   68 ++++++++++++++++++++++++++++++++++++-----------
 4 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 6af7298af39b..cfd16f1e5c83 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -961,8 +961,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
  * input.c
  */
 void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
-void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
-				   struct rxrpc_call *);
+void rxrpc_input_implicit_end_call(struct rxrpc_connection *, struct rxrpc_call *);
 
 /*
  * io_thread.c
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 5f978b0b2404..beb8efa2e7a9 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -336,13 +336,13 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  * If this is for a kernel service, when we allocate the call, it will have
  * three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the
  * retainer ref obtained from the backlog buffer.  Prealloc calls for userspace
- * services only have the ref from the backlog buffer.  We want to pass this
- * ref to non-BH context to dispose of.
+ * services only have the ref from the backlog buffer.  We pass this ref to the
+ * caller.
  *
  * If we want to report an error, we mark the skb with the packet type and
  * abort code and return NULL.
  *
- * The call is returned with the user access mutex held.
+ * The call is returned with the user access mutex held and a ref on it.
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 					   struct rxrpc_sock *rx,
@@ -426,13 +426,6 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 
 	rxrpc_send_ping(call, skb);
 
-	/* We have to discard the prealloc queue's ref here and rely on a
-	 * combination of the RCU read lock and refs held either by the socket
-	 * (recvmsg queue, to-be-accepted queue or user ID tree) or the kernel
-	 * service to prevent the call from being deallocated too early.
-	 */
-	rxrpc_put_call(call, rxrpc_call_put_discard_prealloc);
-
 	if (hlist_unhashed(&call->error_link)) {
 		spin_lock(&call->peer->lock);
 		hlist_add_head(&call->error_link, &call->peer->error_targets);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 42addbcf59f9..01d32f817a7a 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1072,8 +1072,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
-				   struct rxrpc_connection *conn,
+void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
 				   struct rxrpc_call *call)
 {
 	switch (READ_ONCE(call->state)) {
@@ -1091,7 +1090,7 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
 		break;
 	}
 
-	spin_lock(&rx->incoming_lock);
+	spin_lock(&conn->bundle->channel_lock);
 	__rxrpc_disconnect_call(conn, call);
-	spin_unlock(&rx->incoming_lock);
+	spin_unlock(&conn->bundle->channel_lock);
 }
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 91b8ba5b90db..3b6927610677 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -257,6 +257,8 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 	if (sp->hdr.serviceId == 0)
 		goto bad_message;
 
+	rcu_read_lock();
+
 	if (rxrpc_to_server(sp)) {
 		/* Weed out packets to services we're not offering.  Packets
 		 * that would begin a call are explicitly rejected and the rest
@@ -264,7 +266,9 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 		 */
 		rx = rcu_dereference(local->service);
 		if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
-			    sp->hdr.serviceId != rx->second_service)) {
+			    sp->hdr.serviceId != rx->second_service)
+		    ) {
+			rcu_read_unlock();
 			if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
 			    sp->hdr.seq == 1)
 				goto unsupported_service;
@@ -293,7 +297,12 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 		if (sp->hdr.callNumber == 0) {
 			/* Connection-level packet */
 			_debug("CONN %p {%d}", conn, conn->debug_id);
-			rxrpc_post_packet_to_conn(conn, skb);
+			conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_conn_input);
+			rcu_read_unlock();
+			if (conn) {
+				rxrpc_post_packet_to_conn(conn, skb);
+				rxrpc_put_connection(conn, rxrpc_conn_put_conn_input);
+			}
 			return 0;
 		}
 
@@ -305,20 +314,26 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 		chan = &conn->channels[channel];
 
 		/* Ignore really old calls */
-		if (sp->hdr.callNumber < chan->last_call)
+		if (sp->hdr.callNumber < chan->last_call) {
+			rcu_read_unlock();
 			return 0;
+		}
 
 		if (sp->hdr.callNumber == chan->last_call) {
 			if (chan->call ||
-			    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
+			    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) {
+				rcu_read_unlock();
 				return 0;
+			}
 
 			/* For the previous service call, if completed
 			 * successfully, we discard all further packets.
 			 */
 			if (rxrpc_conn_is_service(conn) &&
-			    chan->last_type == RXRPC_PACKET_TYPE_ACK)
+			    chan->last_type == RXRPC_PACKET_TYPE_ACK) {
+				rcu_read_unlock();
 				return 0;
+			}
 
 			/* But otherwise we need to retransmit the final packet
 			 * from data cached in the connection record.
@@ -328,20 +343,32 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 						    sp->hdr.seq,
 						    sp->hdr.serial,
 						    sp->hdr.flags);
-			rxrpc_post_packet_to_conn(conn, skb);
+			conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
+			rcu_read_unlock();
+			if (conn) {
+				rxrpc_post_packet_to_conn(conn, skb);
+				rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
+			}
 			return 0;
 		}
 
 		call = rcu_dereference(chan->call);
 
 		if (sp->hdr.callNumber > chan->call_id) {
-			if (rxrpc_to_client(sp))
+			if (rxrpc_to_client(sp)) {
+				rcu_read_unlock();
 				goto reject_packet;
-			if (call)
-				rxrpc_input_implicit_end_call(rx, conn, call);
-			call = NULL;
+			}
+			if (call) {
+				rxrpc_input_implicit_end_call(conn, call);
+				chan->call = NULL;
+				call = NULL;
+			}
 		}
 
+		if (call && !rxrpc_try_get_call(call, rxrpc_call_get_input))
+			call = NULL;
+
 		if (call) {
 			if (sp->hdr.serviceId != call->dest_srx.srx_service)
 				call->dest_srx.srx_service = sp->hdr.serviceId;
@@ -352,23 +379,33 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 		}
 	}
 
-	if (!call || refcount_read(&call->ref) == 0) {
+	if (!call) {
 		if (rxrpc_to_client(sp) ||
-		    sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
+		    sp->hdr.type != RXRPC_PACKET_TYPE_DATA) {
+			rcu_read_unlock();
 			goto bad_message;
-		if (sp->hdr.seq != 1)
+		}
+		if (sp->hdr.seq != 1) {
+			rcu_read_unlock();
 			return 0;
+		}
 		call = rxrpc_new_incoming_call(local, rx, skb);
-		if (!call)
+		if (!call) {
+			rcu_read_unlock();
 			goto reject_packet;
+		}
 	}
 
+	rcu_read_unlock();
+
 	/* Process a call packet. */
 	rxrpc_input_call_event(call, skb);
+	rxrpc_put_call(call, rxrpc_call_put_input);
 	trace_rxrpc_rx_done(0, 0);
 	return 0;
 
 wrong_security:
+	rcu_read_unlock();
 	trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RXKADINCONSISTENCY, EBADMSG);
 	skb->priority = RXKADINCONSISTENCY;
@@ -381,6 +418,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
 	goto post_abort;
 
 reupgrade:
+	rcu_read_unlock();
 	trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RX_PROTOCOL_ERROR, EBADMSG);
 	goto protocol_error;
@@ -433,9 +471,7 @@ int rxrpc_io_thread(void *data)
 			switch (skb->mark) {
 			case RXRPC_SKB_MARK_PACKET:
 				skb->priority = 0;
-				rcu_read_lock();
 				rxrpc_input_packet(local, &skb);
-				rcu_read_unlock();
 				trace_rxrpc_rx_done(skb->mark, skb->priority);
 				rxrpc_free_skb(skb, rxrpc_skb_put_input);
 				break;



  parent reply	other threads:[~2022-12-02  0:24 UTC|newest]

Thread overview: 38+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-12-02  0:15 [PATCH net-next 00/36] rxrpc: Increasing SACK size and moving away from softirq, parts 2 & 3 David Howells
2022-12-02  0:15 ` [PATCH net-next 01/36] rxrpc: Fix checker warning David Howells
2022-12-02  0:15 ` [PATCH net-next 02/36] rxrpc: Implement an in-kernel rxperf server for testing purposes David Howells
2022-12-02  0:15 ` [PATCH net-next 03/36] rxrpc: Fix call leak David Howells
2022-12-02  0:15 ` [PATCH net-next 04/36] rxrpc: Remove decl for rxrpc_kernel_call_is_complete() David Howells
2022-12-02  0:15 ` [PATCH net-next 05/36] rxrpc: Remove handling of duplicate packets in recvmsg_queue David Howells
2022-12-02  0:15 ` [PATCH net-next 06/36] rxrpc: Remove the [k_]proto() debugging macros David Howells
2022-12-02  0:16 ` [PATCH net-next 07/36] rxrpc: Remove the [_k]net() " David Howells
2022-12-02  0:16 ` [PATCH net-next 08/36] rxrpc: Drop rxrpc_conn_parameters from rxrpc_connection and rxrpc_bundle David Howells
2022-12-02  0:16 ` [PATCH net-next 09/36] rxrpc: Extract the code from a received ABORT packet much earlier David Howells
2022-12-02  0:16 ` [PATCH net-next 10/36] rxrpc: trace: Don't use __builtin_return_address for rxrpc_local tracing David Howells
2022-12-02  0:16 ` [PATCH net-next 11/36] rxrpc: trace: Don't use __builtin_return_address for rxrpc_peer tracing David Howells
2022-12-02  0:16 ` [PATCH net-next 12/36] rxrpc: trace: Don't use __builtin_return_address for rxrpc_conn tracing David Howells
2022-12-02  0:16 ` [PATCH net-next 13/36] rxrpc: trace: Don't use __builtin_return_address for rxrpc_call tracing David Howells
2022-12-02  0:17 ` [PATCH net-next 14/36] rxrpc: Trace rxrpc_bundle refcount David Howells
2022-12-02  0:17 ` [PATCH net-next 15/36] rxrpc: trace: Don't use __builtin_return_address for sk_buff tracing David Howells
2022-12-02  0:17 ` [PATCH net-next 16/36] rxrpc: Don't hold a ref for call timer or workqueue David Howells
2022-12-02  0:17 ` [PATCH net-next 17/36] rxrpc: Don't hold a ref for connection workqueue David Howells
2022-12-02  0:17 ` [PATCH net-next 18/36] rxrpc: Split the receive code David Howells
2022-12-02  0:17 ` [PATCH net-next 19/36] rxrpc: Create a per-local endpoint receive queue and I/O thread David Howells
2022-12-02  0:17 ` [PATCH net-next 20/36] rxrpc: Move packet reception processing into " David Howells
2022-12-02  0:18 ` [PATCH net-next 21/36] rxrpc: Move error processing into the local endpoint " David Howells
2022-12-02  0:18 ` [PATCH net-next 22/36] rxrpc: Remove call->input_lock David Howells
2022-12-02  0:18 ` [PATCH net-next 23/36] rxrpc: Don't use sk->sk_receive_queue.lock to guard socket state changes David Howells
2022-12-02  0:18 ` [PATCH net-next 24/36] rxrpc: Implement a mechanism to send an event notification to a call David Howells
2022-12-02  0:18 ` [PATCH net-next 25/36] rxrpc: Copy client call parameters into rxrpc_call earlier David Howells
2022-12-02  0:18 ` [PATCH net-next 26/36] rxrpc: Move DATA transmission into call processor work item David Howells
2022-12-02  0:18 ` [PATCH net-next 27/36] rxrpc: Remove RCU from peer->error_targets list David Howells
2022-12-02  0:19 ` [PATCH net-next 28/36] rxrpc: Simplify skbuff accounting in receive path David Howells
2022-12-02  0:19 ` David Howells [this message]
2022-12-02  0:19 ` [PATCH net-next 30/36] rxrpc: Extract the peer address from an incoming packet earlier David Howells
2022-12-02  0:19 ` [PATCH net-next 31/36] rxrpc: Make the I/O thread take over the call and local processor work David Howells
2022-12-02  0:19 ` [PATCH net-next 32/36] rxrpc: Remove the _bh annotation from all the spinlocks David Howells
2022-12-02  0:19 ` [PATCH net-next 33/36] rxrpc: Trace/count transmission underflows and cwnd resets David Howells
2022-12-02  0:19 ` [PATCH net-next 34/36] rxrpc: Move the cwnd degradation after transmitting packets David Howells
2022-12-02  0:20 ` [PATCH net-next 35/36] rxrpc: Fold __rxrpc_unuse_local() into rxrpc_unuse_local() David Howells
2022-12-02  0:20 ` [PATCH net-next 36/36] rxrpc: Transmit ACKs at the point of generation David Howells
2022-12-05 11:10 ` [PATCH net-next 00/36] rxrpc: Increasing SACK size and moving away from softirq, parts 2 & 3 patchwork-bot+netdevbpf

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=166994035029.1732290.5164470094736354349.stgit@warthog.procyon.org.uk \
    --to=dhowells@redhat.com \
    --cc=linux-afs@lists.infradead.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=marc.dionne@auristor.com \
    --cc=netdev@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.