* [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation
@ 2016-06-30 14:10 David Howells
  2016-06-30 14:10 ` [PATCH net-next 01/19] rxrpc: Check the source of a packet to a client conn David Howells
                   ` (19 more replies)
  0 siblings, 20 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:10 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel



Here's the next part of the AF_RXRPC rewrite.  The two main purposes of
this set are to fix the call number handling and to make use of RCU when
looking up the connection or call to which a received packet should be
passed.

Important changes in this set include:

 (1) Avoidance of placing stack data into SG lists in rxkad so that kernel
     stacks can become vmalloc'd (Herbert Xu).

 (2) Calls cease pinning the connection they used as soon as possible,
     which allows the connection to be discarded sooner and allows the call
     channel on that connection to be reused earlier.

 (3) Make each call channel on a connection have a separate and independent
     call number space rather than having a shared number space for the
     connection.  Call numbers should increment monotonically per channel
     on the client, and the server should ignore a call with a lower call
     number for that channel than the latest it has seen.  The RESPONSE
     packet sets the minimum values of each call ID counter on a
     connection.

 (4) Look up calls by indexing the channel array on a connection rather
     than by keeping calls in an rbtree on that connection.

 (5) Call terminal statuses are cached in the channel array for the last
     call.  It is assumed that if we, the server, have seen call N, then
     the client no longer cares about call N-1 on the same channel (see
     the channel-array sketch after this list).

     This will allow retransmission of the terminal status in future
     without the need to keep the rxrpc_call struct around.

 (6) Peer lookups are moved out of common connection handling code and into
     service connection handling code as client connections (a) must point
     to a peer before they can be used and (b) are looked up by a
     machine-unique connection ID directly, so we only need to look up the
     peer first if we're going to deal with a service call.

 (7) The reference count on a connection is held elevated by 1 whilst it
     is alive (ie. idle unused connections have a refcount of 1).  The
     reaper will attempt to change the refcount from 1->0 and will skip
     the connection if this cannot be done, whilst lookups only increment
     the refcount if it's non-zero (a sketch of this pattern follows the
     list).

     This makes the implementation of RCU lookups easier as we don't have
     to get a ref on the connection or a lock on the connection list to
     prevent a connection being reaped whilst we're contemplating queueing
     a packet that initiates a new service call upon it.

     If we need to get a connection, but there's a dead connection in the
     tree, we use rb_replace_node() to replace the dead one with a new one.

 (8) Use a seqlock to validate the walk over the service connection rbtree
     attached to a peer when it's being walked in RCU mode (see the
     seqlock sketch after this list).

 (9) Make the incoming call/connection packet handling code use RCU mode
     and locks and make it only take a reference if the call/connection
     gets queued on a workqueue.
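
As a rough illustration of (3), (4) and (5), the per-channel state on a
connection might look something like the sketch below.  The field names
here are hypothetical, not necessarily what the patches use:

        /* Hypothetical per-channel record; the names are illustrative only. */
        struct rxrpc_channel {
                struct rxrpc_call __rcu *call;  /* Active call, if any */
                u32     call_id;        /* Per-channel call number counter */
                u32     last_call;      /* ID of the last call on this channel */
                u32     last_result;    /* Cached terminal status of that call */
        };

        /* One record per channel on the connection. */
        struct rxrpc_channel    channels[RXRPC_MAXCALLS];

        /* An incoming packet can then be routed by indexing the array
         * directly, the channel number being carried in the bottom bits
         * of the CID.
         */
        chan = &conn->channels[sp->hdr.cid & (RXRPC_MAXCALLS - 1)];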
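
The lookup/reaper interlock in (7) boils down to the standard refcount
pattern sketched below.  rxrpc_get_connection_maybe() is introduced in
patch 07; the reaper side shown here is illustrative rather than the
literal patch code:

        /* Lookup side: only take a ref if the conn is still alive. */
        if (!rxrpc_get_connection_maybe(conn))  /* atomic_inc_not_zero() */
                goto not_found;

        /* Reaper side: try to claim the final ref; if anyone else holds
         * a ref (usage > 1), skip this connection.
         */
        if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
                continue;
        /* usage is now 0, so lookups can no longer resurrect the conn. */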
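
And for (8), combining an RCU-protected rbtree walk with seqlock
validation typically looks like the sketch below; the lock and helper
names are assumptions made for illustration:

        unsigned int seq;

        rcu_read_lock();
        do {
                seq = read_seqbegin(&peer->service_conn_lock);
                conn = rxrpc_find_service_conn_rcu(peer, sp); /* rbtree walk */
        } while (read_seqretry(&peer->service_conn_lock, seq));
        rcu_read_unlock();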

The intention is that the next set will introduce the connection lifetime
management and capacity limits to prevent clients from overloading the
server.


There are some fixes too:

 (1) Verifying that a packet coming in to a client connection came from the
     expected source.

 (2) Fix handling of connection failure in client call creation: we
     didn't reinitialise the list linkage block, so a second attempt to
     unlink the failed connection would oops; we also didn't set the
     state correctly, which caused an assertion failure.

 (3) New service calls were being added to the socket's accept queue under
     the wrong lock.


The patches can be found here also:

	http://git.kernel.org/cgit/linux/kernel/git/dhowells/linux-fs.git/log/?h=rxrpc-rewrite

Tagged thusly:

	git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git
	rxrpc-rewrite-20160630

David
---
David Howells (18):
      rxrpc: Check the source of a packet to a client conn
      rxrpc: Provide more refcount helper functions
      rxrpc: Dup the main conn list for the proc interface
      rxrpc: Turn connection #defines into enums and put outside struct def
      rxrpc: Check that the client conns cache is empty before module removal
      rxrpc: Move usage count getting into rxrpc_queue_conn()
      rxrpc: Fix handling of connection failure in client call creation
      rxrpc: Release a call's connection ref on call disconnection
      rxrpc: Add RCU destruction for connections and calls
      rxrpc: Access socket accept queue under right lock
      rxrpc: Call channels should have separate call number spaces
      rxrpc: Split client connection code out into its own file
      rxrpc: Split service connection code out into its own file
      rxrpc: Move peer lookup from call-accept to new-incoming-conn
      rxrpc: Maintain an extra ref on a conn for the cache list
      rxrpc: Prune the contents of the rxrpc_conn_proto struct
      rxrpc: Move data_ready peer lookup into rxrpc_find_connection()
      rxrpc: Use RCU to access a peer's service connection tree

Herbert Xu (1):
      rxrpc: Avoid using stack memory in SG lists in rxkad


 net/rxrpc/Makefile       |    1 
 net/rxrpc/af_rxrpc.c     |   20 -
 net/rxrpc/ar-internal.h  |  145 +++++++---
 net/rxrpc/call_accept.c  |   38 +--
 net/rxrpc/call_event.c   |   14 +
 net/rxrpc/call_object.c  |  100 +++----
 net/rxrpc/conn_client.c  |  283 ++++++++++++++++++++
 net/rxrpc/conn_event.c   |   39 +--
 net/rxrpc/conn_object.c  |  639 +++++++++++-----------------------------------
 net/rxrpc/conn_service.c |  231 +++++++++++++++++
 net/rxrpc/input.c        |   69 ++---
 net/rxrpc/insecure.c     |    7 -
 net/rxrpc/local_object.c |   19 +
 net/rxrpc/peer_object.c  |    2 
 net/rxrpc/proc.c         |   31 +-
 net/rxrpc/rxkad.c        |  191 +++++---------
 net/rxrpc/utils.c        |   37 ++-
 17 files changed, 989 insertions(+), 877 deletions(-)
 create mode 100644 net/rxrpc/conn_service.c


* [PATCH net-next 01/19] rxrpc: Check the source of a packet to a client conn
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
@ 2016-06-30 14:10 ` David Howells
  2016-06-30 14:10 ` [PATCH net-next 02/19] rxrpc: Avoid using stack memory in SG lists in rxkad David Howells
                   ` (18 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:10 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

When looking up a client connection to which to route a packet, we need to
check that the packet came from the correct source so that a peer can't try
to muck around with another peer's connection.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/conn_object.c |    4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 4bfad7cf96cb..9381938187ac 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -509,7 +509,9 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
 		}
 	} else {
 		conn = idr_find(&rxrpc_client_conn_ids, cid >> RXRPC_CIDSHIFT);
-		if (conn && conn->proto.epoch == epoch)
+		if (conn &&
+		    conn->proto.epoch == epoch &&
+		    conn->params.peer == peer)
 			goto found;
 	}
 


* [PATCH net-next 02/19] rxrpc: Avoid using stack memory in SG lists in rxkad
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
  2016-06-30 14:10 ` [PATCH net-next 01/19] rxrpc: Check the source of a packet to a client conn David Howells
@ 2016-06-30 14:10 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 03/19] rxrpc: Provide more refcount helper functions David Howells
                   ` (17 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:10 UTC (permalink / raw)
  To: davem
  Cc: Herbert Xu, netdev, linux-kernel, dhowells, Andy Lutomirski, linux-afs

From: Herbert Xu <herbert@gondor.apana.org.au>

rxkad uses stack memory in SG lists which would not work if stacks were
allocated from vmalloc memory.  In fact, in most cases this isn't even
necessary as the stack memory ends up getting copied over to kmalloc
memory.

This patch eliminates all the unnecessary stack memory uses by supplying
the final destination directly to the crypto API.  In the two instances
where a temporary buffer is actually needed, we switch to using a scratch
area in the rxrpc_call struct (only one DATA packet will be secured or
verified at a time).

Finally, there is no longer any need to split a potentially page-spanning
buffer into two SG entries, so the code dealing with that has been
removed.

Signed-off-by: Herbert Xu <herbert@gondor.apana.org.au>
Signed-off-by: Andy Lutomirski <luto@kernel.org>
Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    8 +--
 net/rxrpc/conn_event.c  |    5 +-
 net/rxrpc/conn_object.c |    6 ++
 net/rxrpc/insecure.c    |    7 +-
 net/rxrpc/rxkad.c       |  150 +++++++++++++++--------------------------------
 5 files changed, 65 insertions(+), 111 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 702db72196fb..796368d1fb25 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -141,17 +141,16 @@ struct rxrpc_security {
 	int (*init_connection_security)(struct rxrpc_connection *);
 
 	/* prime a connection's packet security */
-	void (*prime_packet_security)(struct rxrpc_connection *);
+	int (*prime_packet_security)(struct rxrpc_connection *);
 
 	/* impose security on a packet */
-	int (*secure_packet)(const struct rxrpc_call *,
+	int (*secure_packet)(struct rxrpc_call *,
 			     struct sk_buff *,
 			     size_t,
 			     void *);
 
 	/* verify the security on a received packet */
-	int (*verify_packet)(const struct rxrpc_call *, struct sk_buff *,
-			     u32 *);
+	int (*verify_packet)(struct rxrpc_call *, struct sk_buff *, u32 *);
 
 	/* issue a challenge */
 	int (*issue_challenge)(struct rxrpc_connection *);
@@ -399,6 +398,7 @@ struct rxrpc_call {
 	struct sk_buff_head	rx_oos_queue;	/* packets received out of sequence */
 	struct sk_buff		*tx_pending;	/* Tx socket buffer being filled */
 	wait_queue_head_t	tx_waitq;	/* wait for Tx window space to become available */
+	__be32			crypto_buf[2];	/* Temporary packet crypto buffer */
 	unsigned long		user_call_ID;	/* user-defined call ID */
 	unsigned long		creation_jif;	/* time of call creation */
 	unsigned long		flags;
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index bf6971555eac..6a3c96707831 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -188,7 +188,10 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
 		if (ret < 0)
 			return ret;
 
-		conn->security->prime_packet_security(conn);
+		ret = conn->security->prime_packet_security(conn);
+		if (ret < 0)
+			return ret;
+
 		read_lock_bh(&conn->lock);
 		spin_lock(&conn->state_lock);
 
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 9381938187ac..0ab87fa0bbac 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -138,7 +138,9 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 	if (ret < 0)
 		goto error_1;
 
-	conn->security->prime_packet_security(conn);
+	ret = conn->security->prime_packet_security(conn);
+	if (ret < 0)
+		goto error_2;
 
 	write_lock(&rxrpc_connection_lock);
 	list_add_tail(&conn->link, &rxrpc_connections);
@@ -152,6 +154,8 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 	_leave(" = %p", conn);
 	return conn;
 
+error_2:
+	conn->security->clear(conn);
 error_1:
 	rxrpc_put_client_connection_id(conn);
 error_0:
diff --git a/net/rxrpc/insecure.c b/net/rxrpc/insecure.c
index e571403613c1..c21ad213b337 100644
--- a/net/rxrpc/insecure.c
+++ b/net/rxrpc/insecure.c
@@ -17,11 +17,12 @@ static int none_init_connection_security(struct rxrpc_connection *conn)
 	return 0;
 }
 
-static void none_prime_packet_security(struct rxrpc_connection *conn)
+static int none_prime_packet_security(struct rxrpc_connection *conn)
 {
+	return 0;
 }
 
-static int none_secure_packet(const struct rxrpc_call *call,
+static int none_secure_packet(struct rxrpc_call *call,
 			       struct sk_buff *skb,
 			       size_t data_size,
 			       void *sechdr)
@@ -29,7 +30,7 @@ static int none_secure_packet(const struct rxrpc_call *call,
 	return 0;
 }
 
-static int none_verify_packet(const struct rxrpc_call *call,
+static int none_verify_packet(struct rxrpc_call *call,
 			       struct sk_buff *skb,
 			       u32 *_abort_code)
 {
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index 23c05ec6fa28..3acc7c1241d4 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -103,43 +103,43 @@ error:
  * prime the encryption state with the invariant parts of a connection's
  * description
  */
-static void rxkad_prime_packet_security(struct rxrpc_connection *conn)
+static int rxkad_prime_packet_security(struct rxrpc_connection *conn)
 {
 	struct rxrpc_key_token *token;
 	SKCIPHER_REQUEST_ON_STACK(req, conn->cipher);
-	struct scatterlist sg[2];
+	struct scatterlist sg;
 	struct rxrpc_crypt iv;
-	struct {
-		__be32 x[4];
-	} tmpbuf __attribute__((aligned(16))); /* must all be in same page */
+	__be32 *tmpbuf;
+	size_t tmpsize = 4 * sizeof(__be32);
 
 	_enter("");
 
 	if (!conn->params.key)
-		return;
+		return 0;
+
+	tmpbuf = kmalloc(tmpsize, GFP_KERNEL);
+	if (!tmpbuf)
+		return -ENOMEM;
 
 	token = conn->params.key->payload.data[0];
 	memcpy(&iv, token->kad->session_key, sizeof(iv));
 
-	tmpbuf.x[0] = htonl(conn->proto.epoch);
-	tmpbuf.x[1] = htonl(conn->proto.cid);
-	tmpbuf.x[2] = 0;
-	tmpbuf.x[3] = htonl(conn->security_ix);
-
-	sg_init_one(&sg[0], &tmpbuf, sizeof(tmpbuf));
-	sg_init_one(&sg[1], &tmpbuf, sizeof(tmpbuf));
+	tmpbuf[0] = htonl(conn->proto.epoch);
+	tmpbuf[1] = htonl(conn->proto.cid);
+	tmpbuf[2] = 0;
+	tmpbuf[3] = htonl(conn->security_ix);
 
+	sg_init_one(&sg, tmpbuf, tmpsize);
 	skcipher_request_set_tfm(req, conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
-	skcipher_request_set_crypt(req, &sg[1], &sg[0], sizeof(tmpbuf), iv.x);
-
+	skcipher_request_set_crypt(req, &sg, &sg, tmpsize, iv.x);
 	crypto_skcipher_encrypt(req);
 	skcipher_request_zero(req);
 
-	memcpy(&conn->csum_iv, &tmpbuf.x[2], sizeof(conn->csum_iv));
-	ASSERTCMP((u32 __force)conn->csum_iv.n[0], ==, (u32 __force)tmpbuf.x[2]);
-
-	_leave("");
+	memcpy(&conn->csum_iv, tmpbuf + 2, sizeof(conn->csum_iv));
+	kfree(tmpbuf);
+	_leave(" = 0");
+	return 0;
 }
 
 /*
@@ -152,12 +152,9 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
 {
 	struct rxrpc_skb_priv *sp;
 	SKCIPHER_REQUEST_ON_STACK(req, call->conn->cipher);
+	struct rxkad_level1_hdr hdr;
 	struct rxrpc_crypt iv;
-	struct scatterlist sg[2];
-	struct {
-		struct rxkad_level1_hdr hdr;
-		__be32	first;	/* first four bytes of data and padding */
-	} tmpbuf __attribute__((aligned(8))); /* must all be in same page */
+	struct scatterlist sg;
 	u16 check;
 
 	sp = rxrpc_skb(skb);
@@ -167,24 +164,19 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
 	check = sp->hdr.seq ^ sp->hdr.callNumber;
 	data_size |= (u32)check << 16;
 
-	tmpbuf.hdr.data_size = htonl(data_size);
-	memcpy(&tmpbuf.first, sechdr + 4, sizeof(tmpbuf.first));
+	hdr.data_size = htonl(data_size);
+	memcpy(sechdr, &hdr, sizeof(hdr));
 
 	/* start the encryption afresh */
 	memset(&iv, 0, sizeof(iv));
 
-	sg_init_one(&sg[0], &tmpbuf, sizeof(tmpbuf));
-	sg_init_one(&sg[1], &tmpbuf, sizeof(tmpbuf));
-
+	sg_init_one(&sg, sechdr, 8);
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
-	skcipher_request_set_crypt(req, &sg[1], &sg[0], sizeof(tmpbuf), iv.x);
-
+	skcipher_request_set_crypt(req, &sg, &sg, 8, iv.x);
 	crypto_skcipher_encrypt(req);
 	skcipher_request_zero(req);
 
-	memcpy(sechdr, &tmpbuf, sizeof(tmpbuf));
-
 	_leave(" = 0");
 	return 0;
 }
@@ -198,8 +190,7 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
 				       void *sechdr)
 {
 	const struct rxrpc_key_token *token;
-	struct rxkad_level2_hdr rxkhdr
-		__attribute__((aligned(8))); /* must be all on one page */
+	struct rxkad_level2_hdr rxkhdr;
 	struct rxrpc_skb_priv *sp;
 	SKCIPHER_REQUEST_ON_STACK(req, call->conn->cipher);
 	struct rxrpc_crypt iv;
@@ -218,18 +209,16 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
 
 	rxkhdr.data_size = htonl(data_size | (u32)check << 16);
 	rxkhdr.checksum = 0;
+	memcpy(sechdr, &rxkhdr, sizeof(rxkhdr));
 
 	/* encrypt from the session key */
 	token = call->conn->params.key->payload.data[0];
 	memcpy(&iv, token->kad->session_key, sizeof(iv));
 
 	sg_init_one(&sg[0], sechdr, sizeof(rxkhdr));
-	sg_init_one(&sg[1], &rxkhdr, sizeof(rxkhdr));
-
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
-	skcipher_request_set_crypt(req, &sg[1], &sg[0], sizeof(rxkhdr), iv.x);
-
+	skcipher_request_set_crypt(req, &sg[0], &sg[0], sizeof(rxkhdr), iv.x);
 	crypto_skcipher_encrypt(req);
 
 	/* we want to encrypt the skbuff in-place */
@@ -243,9 +232,7 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
 
 	sg_init_table(sg, nsg);
 	skb_to_sgvec(skb, sg, 0, len);
-
 	skcipher_request_set_crypt(req, sg, sg, len, iv.x);
-
 	crypto_skcipher_encrypt(req);
 
 	_leave(" = 0");
@@ -259,7 +246,7 @@ out:
 /*
  * checksum an RxRPC packet header
  */
-static int rxkad_secure_packet(const struct rxrpc_call *call,
+static int rxkad_secure_packet(struct rxrpc_call *call,
 			       struct sk_buff *skb,
 			       size_t data_size,
 			       void *sechdr)
@@ -267,10 +254,7 @@ static int rxkad_secure_packet(const struct rxrpc_call *call,
 	struct rxrpc_skb_priv *sp;
 	SKCIPHER_REQUEST_ON_STACK(req, call->conn->cipher);
 	struct rxrpc_crypt iv;
-	struct scatterlist sg[2];
-	struct {
-		__be32 x[2];
-	} tmpbuf __attribute__((aligned(8))); /* must all be in same page */
+	struct scatterlist sg;
 	u32 x, y;
 	int ret;
 
@@ -293,20 +277,17 @@ static int rxkad_secure_packet(const struct rxrpc_call *call,
 	/* calculate the security checksum */
 	x = call->channel << (32 - RXRPC_CIDSHIFT);
 	x |= sp->hdr.seq & 0x3fffffff;
-	tmpbuf.x[0] = htonl(sp->hdr.callNumber);
-	tmpbuf.x[1] = htonl(x);
-
-	sg_init_one(&sg[0], &tmpbuf, sizeof(tmpbuf));
-	sg_init_one(&sg[1], &tmpbuf, sizeof(tmpbuf));
+	call->crypto_buf[0] = htonl(sp->hdr.callNumber);
+	call->crypto_buf[1] = htonl(x);
 
+	sg_init_one(&sg, call->crypto_buf, 8);
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
-	skcipher_request_set_crypt(req, &sg[1], &sg[0], sizeof(tmpbuf), iv.x);
-
+	skcipher_request_set_crypt(req, &sg, &sg, 8, iv.x);
 	crypto_skcipher_encrypt(req);
 	skcipher_request_zero(req);
 
-	y = ntohl(tmpbuf.x[1]);
+	y = ntohl(call->crypto_buf[1]);
 	y = (y >> 16) & 0xffff;
 	if (y == 0)
 		y = 1; /* zero checksums are not permitted */
@@ -367,7 +348,6 @@ static int rxkad_verify_packet_auth(const struct rxrpc_call *call,
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
 	skcipher_request_set_crypt(req, sg, sg, 8, iv.x);
-
 	crypto_skcipher_decrypt(req);
 	skcipher_request_zero(req);
 
@@ -452,7 +432,6 @@ static int rxkad_verify_packet_encrypt(const struct rxrpc_call *call,
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
 	skcipher_request_set_crypt(req, sg, sg, skb->len, iv.x);
-
 	crypto_skcipher_decrypt(req);
 	skcipher_request_zero(req);
 	if (sg != _sg)
@@ -498,17 +477,14 @@ nomem:
 /*
  * verify the security on a received packet
  */
-static int rxkad_verify_packet(const struct rxrpc_call *call,
+static int rxkad_verify_packet(struct rxrpc_call *call,
 			       struct sk_buff *skb,
 			       u32 *_abort_code)
 {
 	SKCIPHER_REQUEST_ON_STACK(req, call->conn->cipher);
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_crypt iv;
-	struct scatterlist sg[2];
-	struct {
-		__be32 x[2];
-	} tmpbuf __attribute__((aligned(8))); /* must all be in same page */
+	struct scatterlist sg;
 	u16 cksum;
 	u32 x, y;
 	int ret;
@@ -533,20 +509,17 @@ static int rxkad_verify_packet(const struct rxrpc_call *call,
 	/* validate the security checksum */
 	x = call->channel << (32 - RXRPC_CIDSHIFT);
 	x |= sp->hdr.seq & 0x3fffffff;
-	tmpbuf.x[0] = htonl(call->call_id);
-	tmpbuf.x[1] = htonl(x);
-
-	sg_init_one(&sg[0], &tmpbuf, sizeof(tmpbuf));
-	sg_init_one(&sg[1], &tmpbuf, sizeof(tmpbuf));
+	call->crypto_buf[0] = htonl(call->call_id);
+	call->crypto_buf[1] = htonl(x);
 
+	sg_init_one(&sg, call->crypto_buf, 8);
 	skcipher_request_set_tfm(req, call->conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
-	skcipher_request_set_crypt(req, &sg[1], &sg[0], sizeof(tmpbuf), iv.x);
-
+	skcipher_request_set_crypt(req, &sg, &sg, 8, iv.x);
 	crypto_skcipher_encrypt(req);
 	skcipher_request_zero(req);
 
-	y = ntohl(tmpbuf.x[1]);
+	y = ntohl(call->crypto_buf[1]);
 	cksum = (y >> 16) & 0xffff;
 	if (cksum == 0)
 		cksum = 1; /* zero checksums are not permitted */
@@ -710,29 +683,6 @@ static void rxkad_calc_response_checksum(struct rxkad_response *response)
 }
 
 /*
- * load a scatterlist with a potentially split-page buffer
- */
-static void rxkad_sg_set_buf2(struct scatterlist sg[2],
-			      void *buf, size_t buflen)
-{
-	int nsg = 1;
-
-	sg_init_table(sg, 2);
-
-	sg_set_buf(&sg[0], buf, buflen);
-	if (sg[0].offset + buflen > PAGE_SIZE) {
-		/* the buffer was split over two pages */
-		sg[0].length = PAGE_SIZE - sg[0].offset;
-		sg_set_buf(&sg[1], buf + sg[0].length, buflen - sg[0].length);
-		nsg++;
-	}
-
-	sg_mark_end(&sg[nsg - 1]);
-
-	ASSERTCMP(sg[0].length + sg[1].length, ==, buflen);
-}
-
-/*
  * encrypt the response packet
  */
 static void rxkad_encrypt_response(struct rxrpc_connection *conn,
@@ -741,17 +691,16 @@ static void rxkad_encrypt_response(struct rxrpc_connection *conn,
 {
 	SKCIPHER_REQUEST_ON_STACK(req, conn->cipher);
 	struct rxrpc_crypt iv;
-	struct scatterlist sg[2];
+	struct scatterlist sg[1];
 
 	/* continue encrypting from where we left off */
 	memcpy(&iv, s2->session_key, sizeof(iv));
 
-	rxkad_sg_set_buf2(sg, &resp->encrypted, sizeof(resp->encrypted));
-
+	sg_init_table(sg, 1);
+	sg_set_buf(sg, &resp->encrypted, sizeof(resp->encrypted));
 	skcipher_request_set_tfm(req, conn->cipher);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
 	skcipher_request_set_crypt(req, sg, sg, sizeof(resp->encrypted), iv.x);
-
 	crypto_skcipher_encrypt(req);
 	skcipher_request_zero(req);
 }
@@ -887,10 +836,8 @@ static int rxkad_decrypt_ticket(struct rxrpc_connection *conn,
 	}
 
 	sg_init_one(&sg[0], ticket, ticket_len);
-
 	skcipher_request_set_callback(req, 0, NULL, NULL);
 	skcipher_request_set_crypt(req, sg, sg, ticket_len, iv.x);
-
 	crypto_skcipher_decrypt(req);
 	skcipher_request_free(req);
 
@@ -1001,7 +948,7 @@ static void rxkad_decrypt_response(struct rxrpc_connection *conn,
 				   const struct rxrpc_crypt *session_key)
 {
 	SKCIPHER_REQUEST_ON_STACK(req, rxkad_ci);
-	struct scatterlist sg[2];
+	struct scatterlist sg[1];
 	struct rxrpc_crypt iv;
 
 	_enter(",,%08x%08x",
@@ -1016,12 +963,11 @@ static void rxkad_decrypt_response(struct rxrpc_connection *conn,
 
 	memcpy(&iv, session_key, sizeof(iv));
 
-	rxkad_sg_set_buf2(sg, &resp->encrypted, sizeof(resp->encrypted));
-
+	sg_init_table(sg, 1);
+	sg_set_buf(sg, &resp->encrypted, sizeof(resp->encrypted));
 	skcipher_request_set_tfm(req, rxkad_ci);
 	skcipher_request_set_callback(req, 0, NULL, NULL);
 	skcipher_request_set_crypt(req, sg, sg, sizeof(resp->encrypted), iv.x);
-
 	crypto_skcipher_decrypt(req);
 	skcipher_request_zero(req);
 


* [PATCH net-next 03/19] rxrpc: Provide more refcount helper functions
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
  2016-06-30 14:10 ` [PATCH net-next 01/19] rxrpc: Check the source of a packet to a client conn David Howells
  2016-06-30 14:10 ` [PATCH net-next 02/19] rxrpc: Avoid using stack memory in SG lists in rxkad David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 04/19] rxrpc: Dup the main conn list for the proc interface David Howells
                   ` (16 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Provide refcount helper functions for connections so that the code doesn't
touch conn->usage directly.

Also provide queueing helper functions so that the queueing of local and
connection objects can be fixed later.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |   12 +++++++++++-
 net/rxrpc/conn_event.c  |    2 +-
 net/rxrpc/input.c       |    2 +-
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 796368d1fb25..45aef3ef7609 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -35,7 +35,6 @@ struct rxrpc_crypt {
 	queue_delayed_work(rxrpc_workqueue, (WS), (D))
 
 #define rxrpc_queue_call(CALL)	rxrpc_queue_work(&(CALL)->processor)
-#define rxrpc_queue_conn(CONN)	rxrpc_queue_work(&(CONN)->processor)
 
 struct rxrpc_connection;
 
@@ -566,6 +565,12 @@ static inline void rxrpc_get_connection(struct rxrpc_connection *conn)
 	atomic_inc(&conn->usage);
 }
 
+
+static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
+{
+	rxrpc_queue_work(&conn->processor);
+}
+
 /*
  * input.c
  */
@@ -618,6 +623,11 @@ static inline void rxrpc_put_local(struct rxrpc_local *local)
 		__rxrpc_put_local(local);
 }
 
+static inline void rxrpc_queue_local(struct rxrpc_local *local)
+{
+	rxrpc_queue_work(&local->processor);
+}
+
 /*
  * misc.c
  */
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 6a3c96707831..d7e183c6b5df 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -318,7 +318,7 @@ void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
 	CHECK_SLAB_OKAY(&local->usage);
 
 	skb_queue_tail(&local->reject_queue, skb);
-	rxrpc_queue_work(&local->processor);
+	rxrpc_queue_local(local);
 }
 
 /*
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index f4bd57b77b93..8a68cc387a8c 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -595,7 +595,7 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
 	_enter("%p,%p", local, skb);
 
 	skb_queue_tail(&local->event_queue, skb);
-	rxrpc_queue_work(&local->processor);
+	rxrpc_queue_local(local);
 }
 
 /*


* [PATCH net-next 04/19] rxrpc: Dup the main conn list for the proc interface
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (2 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 03/19] rxrpc: Provide more refcount helper functions David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 05/19] rxrpc: Turn connection #defines into enums and put outside struct def David Howells
                   ` (15 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

The main connection list is used for two independent purposes: primarily it
is used to find connections to reap and secondarily it is used to list
connections in procfs.

Split the procfs list out from the reap list.  This allows the reap list to
be phased out in stages as the client conns and service conns acquire their
own separate connection management strategies.

Whilst we're at it, use the address information stored in conn->proto when
displaying through procfs rather than accessing the peer record.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    3 ++-
 net/rxrpc/conn_object.c |   12 +++++++++++-
 net/rxrpc/proc.c        |   14 ++++++--------
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 45aef3ef7609..f2b1c6dd5e9a 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -271,6 +271,7 @@ struct rxrpc_connection {
 		struct rb_node	client_node;	/* Node in local->client_conns */
 		struct rb_node	service_node;	/* Node in peer->service_conns */
 	};
+	struct list_head	proc_link;	/* link in procfs list */
 	struct list_head	link;		/* link in master connection list */
 	struct rb_root		calls;		/* calls on this connection */
 	struct sk_buff_head	rx_queue;	/* received conn-level packets */
@@ -535,7 +536,7 @@ void rxrpc_reject_packets(struct rxrpc_local *);
  * conn_object.c
  */
 extern unsigned int rxrpc_connection_expiry;
-extern struct list_head rxrpc_connections;
+extern struct list_head rxrpc_connection_proc_list;
 extern rwlock_t rxrpc_connection_lock;
 
 int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 0ab87fa0bbac..ff0c502ed68d 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -27,7 +27,8 @@ unsigned int rxrpc_connection_expiry = 10 * 60;
 
 static void rxrpc_connection_reaper(struct work_struct *work);
 
-LIST_HEAD(rxrpc_connections);
+static LIST_HEAD(rxrpc_connections);
+LIST_HEAD(rxrpc_connection_proc_list);
 DEFINE_RWLOCK(rxrpc_connection_lock);
 static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper);
 
@@ -45,6 +46,7 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 		spin_lock_init(&conn->channel_lock);
 		init_waitqueue_head(&conn->channel_wq);
 		INIT_WORK(&conn->processor, &rxrpc_process_connection);
+		INIT_LIST_HEAD(&conn->proc_link);
 		INIT_LIST_HEAD(&conn->link);
 		conn->calls = RB_ROOT;
 		skb_queue_head_init(&conn->rx_queue);
@@ -144,6 +146,7 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 
 	write_lock(&rxrpc_connection_lock);
 	list_add_tail(&conn->link, &rxrpc_connections);
+	list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
 	write_unlock(&rxrpc_connection_lock);
 
 	/* We steal the caller's peer ref. */
@@ -602,6 +605,7 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 	struct rxrpc_connection *conn, *_p;
 	struct rxrpc_peer *peer;
 	unsigned long now, earliest, reap_time;
+	bool remove;
 
 	LIST_HEAD(graveyard);
 
@@ -619,6 +623,7 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 		if (likely(atomic_read(&conn->usage) > 0))
 			continue;
 
+		remove = false;
 		if (rxrpc_conn_is_client(conn)) {
 			struct rxrpc_local *local = conn->params.local;
 			spin_lock(&local->client_conns_lock);
@@ -631,6 +636,7 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 				rxrpc_put_client_connection_id(conn);
 				rb_erase(&conn->client_node,
 					 &local->client_conns);
+				remove = true;
 			} else if (reap_time < earliest) {
 				earliest = reap_time;
 			}
@@ -647,12 +653,16 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 				list_move_tail(&conn->link, &graveyard);
 				rb_erase(&conn->service_node,
 					 &peer->service_conns);
+				remove = true;
 			} else if (reap_time < earliest) {
 				earliest = reap_time;
 			}
 
 			write_unlock_bh(&peer->conn_lock);
 		}
+
+		if (remove)
+			list_del_init(&conn->proc_link);
 	}
 	write_unlock(&rxrpc_connection_lock);
 
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 500cdcdc843c..afb5e1bc1b6e 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -67,8 +67,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 	conn = call->conn;
 	if (conn)
 		sprintf(rbuff, "%pI4:%u",
-			&conn->params.peer->srx.transport.sin.sin_addr,
-			ntohs(conn->params.peer->srx.transport.sin.sin_port));
+			&conn->proto.ipv4_addr, ntohs(conn->proto.port));
 	else
 		strcpy(rbuff, "no_connection");
 
@@ -115,13 +114,13 @@ const struct file_operations rxrpc_call_seq_fops = {
 static void *rxrpc_connection_seq_start(struct seq_file *seq, loff_t *_pos)
 {
 	read_lock(&rxrpc_connection_lock);
-	return seq_list_start_head(&rxrpc_connections, *_pos);
+	return seq_list_start_head(&rxrpc_connection_proc_list, *_pos);
 }
 
 static void *rxrpc_connection_seq_next(struct seq_file *seq, void *v,
 				       loff_t *pos)
 {
-	return seq_list_next(v, &rxrpc_connections, pos);
+	return seq_list_next(v, &rxrpc_connection_proc_list, pos);
 }
 
 static void rxrpc_connection_seq_stop(struct seq_file *seq, void *v)
@@ -134,7 +133,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 	struct rxrpc_connection *conn;
 	char lbuff[4 + 4 + 4 + 4 + 5 + 1], rbuff[4 + 4 + 4 + 4 + 5 + 1];
 
-	if (v == &rxrpc_connections) {
+	if (v == &rxrpc_connection_proc_list) {
 		seq_puts(seq,
 			 "Proto Local                  Remote                "
 			 " SvID ConnID   Calls    End Use State    Key     "
@@ -143,15 +142,14 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 		return 0;
 	}
 
-	conn = list_entry(v, struct rxrpc_connection, link);
+	conn = list_entry(v, struct rxrpc_connection, proc_link);
 
 	sprintf(lbuff, "%pI4:%u",
 		&conn->params.local->srx.transport.sin.sin_addr,
 		ntohs(conn->params.local->srx.transport.sin.sin_port));
 
 	sprintf(rbuff, "%pI4:%u",
-		&conn->params.peer->srx.transport.sin.sin_addr,
-		ntohs(conn->params.peer->srx.transport.sin.sin_port));
+		&conn->proto.ipv4_addr, ntohs(conn->proto.port));
 
 	seq_printf(seq,
 		   "UDP   %-22.22s %-22.22s %4x %08x %08x %s %3u"


* [PATCH net-next 05/19] rxrpc: Turn connection #defines into enums and put outside struct def
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (3 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 04/19] rxrpc: Dup the main conn list for the proc interface David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 06/19] rxrpc: Check that the client conns cache is empty before module removal David Howells
                   ` (14 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Turn the connection event and state #define lists into enums and move
outside of the struct definition.

Whilst we're at it, change _SERVER to _SERVICE in those identifiers and add
EV_ into the event name to distinguish them from flags and states.

Also add a symbol indicating the number of states and use that in the state
text array.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |   42 ++++++++++++++++++++++++++++++------------
 net/rxrpc/call_accept.c |    6 +++---
 net/rxrpc/conn_event.c  |    6 +++---
 net/rxrpc/conn_object.c |    4 ++--
 net/rxrpc/proc.c        |   18 +++++++++---------
 5 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index f2b1c6dd5e9a..6f62b0eef0e0 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -254,6 +254,35 @@ struct rxrpc_conn_parameters {
 };
 
 /*
+ * Bits in the connection flags.
+ */
+enum rxrpc_conn_flag {
+	RXRPC_CONN_HAS_IDR,		/* Has a client conn ID assigned */
+};
+
+/*
+ * Events that can be raised upon a connection.
+ */
+enum rxrpc_conn_event {
+	RXRPC_CONN_EV_CHALLENGE,	/* Send challenge packet */
+};
+
+/*
+ * The connection protocol state.
+ */
+enum rxrpc_conn_proto_state {
+	RXRPC_CONN_UNUSED,		/* Connection not yet attempted */
+	RXRPC_CONN_CLIENT,		/* Client connection */
+	RXRPC_CONN_SERVICE_UNSECURED,	/* Service unsecured connection */
+	RXRPC_CONN_SERVICE_CHALLENGING,	/* Service challenging for security */
+	RXRPC_CONN_SERVICE,		/* Service secured connection */
+	RXRPC_CONN_REMOTELY_ABORTED,	/* Conn aborted by peer */
+	RXRPC_CONN_LOCALLY_ABORTED,	/* Conn aborted locally */
+	RXRPC_CONN_NETWORK_ERROR,	/* Conn terminated by network error */
+	RXRPC_CONN__NR_STATES
+};
+
+/*
  * RxRPC connection definition
  * - matched by { local, peer, epoch, conn_id, direction }
  * - each connection can only handle four simultaneous calls
@@ -280,23 +309,12 @@ struct rxrpc_connection {
 	struct crypto_skcipher	*cipher;	/* encryption handle */
 	struct rxrpc_crypt	csum_iv;	/* packet checksum base */
 	unsigned long		flags;
-#define RXRPC_CONN_HAS_IDR	0		/* - Has a client conn ID assigned */
 	unsigned long		events;
-#define RXRPC_CONN_CHALLENGE	0		/* send challenge packet */
 	unsigned long		put_time;	/* Time at which last put */
 	rwlock_t		lock;		/* access lock */
 	spinlock_t		state_lock;	/* state-change lock */
 	atomic_t		usage;
-	enum {					/* current state of connection */
-		RXRPC_CONN_UNUSED,		/* - connection not yet attempted */
-		RXRPC_CONN_CLIENT,		/* - client connection */
-		RXRPC_CONN_SERVER_UNSECURED,	/* - server unsecured connection */
-		RXRPC_CONN_SERVER_CHALLENGING,	/* - server challenging for security */
-		RXRPC_CONN_SERVER,		/* - server secured connection */
-		RXRPC_CONN_REMOTELY_ABORTED,	/* - conn aborted by peer */
-		RXRPC_CONN_LOCALLY_ABORTED,	/* - conn aborted locally */
-		RXRPC_CONN_NETWORK_ERROR,	/* - conn terminated by network error */
-	} state;
+	enum rxrpc_conn_proto_state state : 8;	/* current state of connection */
 	u32			local_abort;	/* local abort code */
 	u32			remote_abort;	/* remote abort code */
 	int			error;		/* local error incurred */
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 202e053a3c6d..1c0860df150e 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -128,12 +128,12 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 
 		spin_lock(&call->conn->state_lock);
 		if (sp->hdr.securityIndex > 0 &&
-		    call->conn->state == RXRPC_CONN_SERVER_UNSECURED) {
+		    call->conn->state == RXRPC_CONN_SERVICE_UNSECURED) {
 			_debug("await conn sec");
 			list_add_tail(&call->accept_link, &rx->secureq);
-			call->conn->state = RXRPC_CONN_SERVER_CHALLENGING;
+			call->conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
 			rxrpc_get_connection(call->conn);
-			set_bit(RXRPC_CONN_CHALLENGE, &call->conn->events);
+			set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
 			rxrpc_queue_conn(call->conn);
 		} else {
 			_debug("conn ready");
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index d7e183c6b5df..b9c39b83eddb 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -195,8 +195,8 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
 		read_lock_bh(&conn->lock);
 		spin_lock(&conn->state_lock);
 
-		if (conn->state == RXRPC_CONN_SERVER_CHALLENGING) {
-			conn->state = RXRPC_CONN_SERVER;
+		if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
+			conn->state = RXRPC_CONN_SERVICE;
 			for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
 				rxrpc_call_is_secure(conn->channels[loop]);
 		}
@@ -268,7 +268,7 @@ void rxrpc_process_connection(struct work_struct *work)
 
 	rxrpc_get_connection(conn);
 
-	if (test_and_clear_bit(RXRPC_CONN_CHALLENGE, &conn->events)) {
+	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) {
 		rxrpc_secure_connection(conn);
 		rxrpc_put_connection(conn);
 	}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index ff0c502ed68d..6d50d0d165b7 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -403,9 +403,9 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 	candidate->params.service_id	= sp->hdr.serviceId;
 	candidate->security_ix		= sp->hdr.securityIndex;
 	candidate->out_clientflag	= 0;
-	candidate->state		= RXRPC_CONN_SERVER;
+	candidate->state		= RXRPC_CONN_SERVICE;
 	if (candidate->params.service_id)
-		candidate->state	= RXRPC_CONN_SERVER_UNSECURED;
+		candidate->state	= RXRPC_CONN_SERVICE_UNSECURED;
 
 	write_lock_bh(&peer->conn_lock);
 
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index afb5e1bc1b6e..46d470335263 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -14,15 +14,15 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
-static const char *const rxrpc_conn_states[] = {
-	[RXRPC_CONN_UNUSED]		= "Unused  ",
-	[RXRPC_CONN_CLIENT]		= "Client  ",
-	[RXRPC_CONN_SERVER_UNSECURED]	= "SvUnsec ",
-	[RXRPC_CONN_SERVER_CHALLENGING]	= "SvChall ",
-	[RXRPC_CONN_SERVER]		= "SvSecure",
-	[RXRPC_CONN_REMOTELY_ABORTED]	= "RmtAbort",
-	[RXRPC_CONN_LOCALLY_ABORTED]	= "LocAbort",
-	[RXRPC_CONN_NETWORK_ERROR]	= "NetError",
+static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
+	[RXRPC_CONN_UNUSED]			= "Unused  ",
+	[RXRPC_CONN_CLIENT]			= "Client  ",
+	[RXRPC_CONN_SERVICE_UNSECURED]		= "SvUnsec ",
+	[RXRPC_CONN_SERVICE_CHALLENGING]	= "SvChall ",
+	[RXRPC_CONN_SERVICE]			= "SvSecure",
+	[RXRPC_CONN_REMOTELY_ABORTED]		= "RmtAbort",
+	[RXRPC_CONN_LOCALLY_ABORTED]		= "LocAbort",
+	[RXRPC_CONN_NETWORK_ERROR]		= "NetError",
 };
 
 /*


* [PATCH net-next 06/19] rxrpc: Check that the client conns cache is empty before module removal
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (4 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 05/19] rxrpc: Turn connection #defines into enums and put outside struct def David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 07/19] rxrpc: Move usage count getting into rxrpc_queue_conn() David Howells
                   ` (13 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Check that the client conns cache is empty before module removal and bug if
not, listing any offending connections that are still present.  Unfortunately,
if there are connections still around, then the transport socket is still
unexpectedly open and active, so we can't just free the connections.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/af_rxrpc.c    |    3 +--
 net/rxrpc/ar-internal.h |    1 +
 net/rxrpc/conn_client.c |   19 +++++++++++++++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 5d3e795a7c48..d5073eb02498 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -807,8 +807,7 @@ static void __exit af_rxrpc_exit(void)
 	_debug("synchronise RCU");
 	rcu_barrier();
 	_debug("destroy locals");
-	ASSERT(idr_is_empty(&rxrpc_client_conn_ids));
-	idr_destroy(&rxrpc_client_conn_ids);
+	rxrpc_destroy_client_conn_ids();
 	rxrpc_destroy_all_locals();
 
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 6f62b0eef0e0..2a8710dac3c5 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -542,6 +542,7 @@ extern struct idr rxrpc_client_conn_ids;
 
 int rxrpc_get_client_connection_id(struct rxrpc_connection *, gfp_t);
 void rxrpc_put_client_connection_id(struct rxrpc_connection *);
+void rxrpc_destroy_client_conn_ids(void);
 
 /*
  * conn_event.c
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 82488d6adb83..be437d5e90ce 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -92,3 +92,22 @@ void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
 		spin_unlock(&rxrpc_conn_id_lock);
 	}
 }
+
+/*
+ * Destroy the client connection ID tree.
+ */
+void rxrpc_destroy_client_conn_ids(void)
+{
+	struct rxrpc_connection *conn;
+	int id;
+
+	if (!idr_is_empty(&rxrpc_client_conn_ids)) {
+		idr_for_each_entry(&rxrpc_client_conn_ids, conn, id) {
+			pr_err("AF_RXRPC: Leaked client conn %p {%d}\n",
+			       conn, atomic_read(&conn->usage));
+		}
+		BUG();
+	}
+
+	idr_destroy(&rxrpc_client_conn_ids);
+}


* [PATCH net-next 07/19] rxrpc: Move usage count getting into rxrpc_queue_conn()
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (5 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 06/19] rxrpc: Check that the client conns cache is empty before module removal David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 08/19] rxrpc: Fix handling of connection failure in client call creation David Howells
                   ` (12 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Rather than calling rxrpc_get_connection() manually before calling
rxrpc_queue_conn(), do it inside the queue wrapper.

This allows us to do some important fixes:

 (1) If the usage count is 0, do nothing.  This prevents connections from
     being reanimated once they're dead.

 (2) If rxrpc_queue_work() fails because the work item is already queued,
     retract the usage count increment which would otherwise be lost.

 (3) Don't take a ref on the connection in the work function.  By passing
     the ref through the work item, this is unnecessary.  Doing it in the
     work function is too late anyway.  Previously, connection-directed
     packets held a ref on the connection, but that's not really the best
     idea.

And another useful change:

 (*) We don't need to take a refcount on the connection in the data_ready
     handler unless we invoke the connection's work item.  We're using RCU
     there, so taking a ref is otherwise redundant.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    9 ++++++++-
 net/rxrpc/call_accept.c |    1 -
 net/rxrpc/conn_event.c  |    8 +-------
 net/rxrpc/input.c       |    1 -
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 2a8710dac3c5..d43cb7831693 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -585,10 +585,17 @@ static inline void rxrpc_get_connection(struct rxrpc_connection *conn)
 	atomic_inc(&conn->usage);
 }
 
+static inline
+struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *conn)
+{
+	return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
+}
 
 static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
 {
-	rxrpc_queue_work(&conn->processor);
+	if (rxrpc_get_connection_maybe(conn) &&
+	    !rxrpc_queue_work(&conn->processor))
+		rxrpc_put_connection(conn);
 }
 
 /*
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 1c0860df150e..5367dbe9b96f 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -132,7 +132,6 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 			_debug("await conn sec");
 			list_add_tail(&call->accept_link, &rx->secureq);
 			call->conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
-			rxrpc_get_connection(call->conn);
 			set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
 			rxrpc_queue_conn(call->conn);
 		} else {
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index b9c39b83eddb..9ceddd3fd5db 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -266,12 +266,8 @@ void rxrpc_process_connection(struct work_struct *work)
 
 	_enter("{%d}", conn->debug_id);
 
-	rxrpc_get_connection(conn);
-
-	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) {
+	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
 		rxrpc_secure_connection(conn);
-		rxrpc_put_connection(conn);
-	}
 
 	/* go through the conn-level event packets, releasing the ref on this
 	 * connection that each one has when we've finished with it */
@@ -286,7 +282,6 @@ void rxrpc_process_connection(struct work_struct *work)
 			goto requeue_and_leave;
 		case -ECONNABORTED:
 		default:
-			rxrpc_put_connection(conn);
 			rxrpc_free_skb(skb);
 			break;
 		}
@@ -304,7 +299,6 @@ requeue_and_leave:
 protocol_error:
 	if (rxrpc_abort_connection(conn, -ret, abort_code) < 0)
 		goto requeue_and_leave;
-	rxrpc_put_connection(conn);
 	rxrpc_free_skb(skb);
 	_leave(" [EPROTO]");
 	goto out;
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 8a68cc387a8c..710e55a5f569 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -580,7 +580,6 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 {
 	_enter("%p,%p", conn, skb);
 
-	rxrpc_get_connection(conn);
 	skb_queue_tail(&conn->rx_queue, skb);
 	rxrpc_queue_conn(conn);
 }


* [PATCH net-next 08/19] rxrpc: Fix handling of connection failure in client call creation
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (6 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 07/19] rxrpc: Move usage count getting into rxrpc_queue_conn() David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 09/19] rxrpc: Release a call's connection ref on call disconnection David Howells
                   ` (11 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

If rxrpc_connect_call() fails during the creation of a client connection,
there are two bugs that we can hit that need fixing:

 (1) The call state should be moved to RXRPC_CALL_DEAD before the call
     cleanup phase is invoked.  If not, this can cause an assertion failure
     later.

 (2) call->link should be reinitialised after being deleted in
     rxrpc_new_client_call() - which otherwise leads to a failure later
     when the call cleanup attempts to delete the link again.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/call_object.c |    4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index ad933daae13b..6223a7ed831f 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -425,9 +425,10 @@ error:
 	rxrpc_put_call(call);
 
 	write_lock_bh(&rxrpc_call_lock);
-	list_del(&call->link);
+	list_del_init(&call->link);
 	write_unlock_bh(&rxrpc_call_lock);
 
+	call->state = RXRPC_CALL_DEAD;
 	rxrpc_put_call(call);
 	_leave(" = %d", ret);
 	return ERR_PTR(ret);
@@ -439,6 +440,7 @@ error:
 	 */
 found_user_ID_now_present:
 	write_unlock(&rx->call_lock);
+	call->state = RXRPC_CALL_DEAD;
 	rxrpc_put_call(call);
 	_leave(" = -EEXIST [%p]", call);
 	return ERR_PTR(-EEXIST);


* [PATCH net-next 09/19] rxrpc: Release a call's connection ref on call disconnection
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (7 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 08/19] rxrpc: Fix handling of connection failure in client call creation David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:11 ` [PATCH net-next 10/19] rxrpc: Add RCU destruction for connections and calls David Howells
                   ` (10 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

When a call is disconnected, clear the call's pointer to the connection and
release the associated ref on that connection.  This means that the call no
longer pins the connection and the connection can be discarded even before
the call is.

As the code currently stands, the call struct is effectively pinned by
userspace until userspace has performed a recvmsg() to retrieve the final
call state, as sk_buffs on the receive queue pin the call to which they're
related, because:

 (1) The rxrpc_call struct contains the userspace ID that recvmsg() has to
     include in the control message buffer to indicate which call is being
     referred to.  This ID must remain valid until the terminal packet is
     completely read and must be invalidated immediately at that point as
     userspace is entitled to immediately reuse it.

 (2) The final ACK to the reply to a client call isn't sent until the last
     data packet is entirely read (it's probably worth altering this in
     future to send the ACK as soon as all the data has been received).


This change requires a bit of rearrangement to make sure that the call
isn't going to try and access the connection again after protocol
completion:

 (1) Delete the error link earlier when we're releasing the call.  Possibly
     network errors should be distributed via connections at the cost of
     adding in an access to the rxrpc_connection struct.

 (2) Remove the call from the connection's call tree before disconnecting
     the call.  The call tree is going to be removed anyway, with incoming
     packets delivered by channel pointer instead.

 (3) The release call event should be considered last after all other
     events have been processed so that we don't need access to the
     connection again.

 (4) Move the channel_lock taking from rxrpc_release_call() to
     rxrpc_disconnect_call() where it will be required in future.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/call_event.c  |   10 +++++-----
 net/rxrpc/call_object.c |   26 +++++++++-----------------
 net/rxrpc/conn_object.c |    8 ++++++++
 3 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 0ba84295f913..638d66df284a 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -858,11 +858,6 @@ void rxrpc_process_call(struct work_struct *work)
 	iov[0].iov_len	= sizeof(whdr);
 
 	/* deal with events of a final nature */
-	if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-		rxrpc_release_call(call);
-		clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
-	}
-
 	if (test_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events)) {
 		enum rxrpc_skb_mark mark;
 		int error;
@@ -1144,6 +1139,11 @@ void rxrpc_process_call(struct work_struct *work)
 		goto maybe_reschedule;
 	}
 
+	if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
+		rxrpc_release_call(call);
+		clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
+	}
+
 	/* other events may have been raised since we started checking */
 	goto maybe_reschedule;
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 6223a7ed831f..b43d89c89744 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -628,6 +628,10 @@ void rxrpc_release_call(struct rxrpc_call *call)
 	 */
 	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 
+	spin_lock(&conn->params.peer->lock);
+	hlist_del_init(&call->error_link);
+	spin_unlock(&conn->params.peer->lock);
+
 	write_lock_bh(&rx->call_lock);
 	if (!list_empty(&call->accept_link)) {
 		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
@@ -643,25 +647,22 @@ void rxrpc_release_call(struct rxrpc_call *call)
 	write_unlock_bh(&rx->call_lock);
 
 	/* free up the channel for reuse */
-	spin_lock(&conn->channel_lock);
 	write_lock_bh(&conn->lock);
 	write_lock(&call->state_lock);
 
-	rxrpc_disconnect_call(call);
-
-	spin_unlock(&conn->channel_lock);
-
 	if (call->state < RXRPC_CALL_COMPLETE &&
 	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
 		_debug("+++ ABORTING STATE %d +++\n", call->state);
 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 		call->local_abort = RX_CALL_DEAD;
-		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
-		rxrpc_queue_call(call);
 	}
 	write_unlock(&call->state_lock);
+
+	rb_erase(&call->conn_node, &conn->calls);
 	write_unlock_bh(&conn->lock);
 
+	rxrpc_disconnect_call(call);
+
 	/* clean up the Rx queue */
 	if (!skb_queue_empty(&call->rx_queue) ||
 	    !skb_queue_empty(&call->rx_oos_queue)) {
@@ -817,16 +818,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
 		return;
 	}
 
-	if (call->conn) {
-		spin_lock(&call->conn->params.peer->lock);
-		hlist_del_init(&call->error_link);
-		spin_unlock(&call->conn->params.peer->lock);
-
-		write_lock_bh(&call->conn->lock);
-		rb_erase(&call->conn_node, &call->conn->calls);
-		write_unlock_bh(&call->conn->lock);
-		rxrpc_put_connection(call->conn);
-	}
+	ASSERTCMP(call->conn, ==, NULL);
 
 	/* Remove the call from the hash */
 	rxrpc_call_hash_del(call);
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 6d50d0d165b7..a2c1ddade974 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -544,11 +544,19 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
 
 	_enter("%d,%d", conn->debug_id, call->channel);
 
+	spin_lock(&conn->channel_lock);
+
 	if (conn->channels[chan] == call) {
 		rcu_assign_pointer(conn->channels[chan], NULL);
 		atomic_inc(&conn->avail_chans);
 		wake_up(&conn->channel_wq);
 	}
+
+	spin_unlock(&conn->channel_lock);
+
+	call->conn = NULL;
+	rxrpc_put_connection(conn);
+	_leave("");
 }
 
 /*


* [PATCH net-next 10/19] rxrpc: Add RCU destruction for connections and calls
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (8 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 09/19] rxrpc: Release a call's connection ref on call disconnection David Howells
@ 2016-06-30 14:11 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 11/19] rxrpc: Access socket accept queue under right lock David Howells
                   ` (9 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:11 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Add RCU destruction for connections and calls as the RCU lookup from the
transport socket data_ready handler is going to come along shortly.

Whilst we're at it, move the cleanup workqueue flushing and RCU barriers
into the destruction code for the objects that need them (locals and
connections) and add the extra RCU barrier required for connection cleanup.
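
For reference, the shape of the RCU-deferred free being adopted here is
roughly the following; this is a minimal, generic sketch of the pattern,
not the patch code itself:

    #include <linux/rcupdate.h>
    #include <linux/slab.h>

    struct obj {
            struct rcu_head rcu;
            /* ... payload ... */
    };

    /* Runs only after a grace period has elapsed, when no RCU reader
     * can still be holding a pointer to the object.
     */
    static void obj_rcu_free(struct rcu_head *rcu)
    {
            struct obj *o = container_of(rcu, struct obj, rcu);

            kfree(o);
    }

    static void obj_put_final(struct obj *o)
    {
            /* Defer the actual free until current readers are done. */
            call_rcu(&o->rcu, obj_rcu_free);
    }

On the teardown side, flushing the work queue and then calling
rcu_barrier() guarantees that any callbacks queued this way have actually
run before the module is allowed to go away.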

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/af_rxrpc.c     |   19 -------------------
 net/rxrpc/ar-internal.h  |    4 +++-
 net/rxrpc/call_object.c  |   15 +++++++++++++--
 net/rxrpc/conn_object.c  |   29 ++++++++++++++++++++++++++---
 net/rxrpc/local_object.c |   19 +++++++++++--------
 5 files changed, 53 insertions(+), 33 deletions(-)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index d5073eb02498..d6e4e3b69dc3 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -788,26 +788,7 @@ static void __exit af_rxrpc_exit(void)
 	proto_unregister(&rxrpc_proto);
 	rxrpc_destroy_all_calls();
 	rxrpc_destroy_all_connections();
-
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
-
-	/* We need to flush the scheduled work twice because the local endpoint
-	 * records involve a work item in their destruction as they can only be
-	 * destroyed from process context.  However, a connection may have a
-	 * work item outstanding - and this will pin the local endpoint record
-	 * until the connection goes away.
-	 *
-	 * Peers don't pin locals and calls pin sockets - which prevents the
-	 * module from being unloaded - so we should only need two flushes.
-	 */
-	_debug("flush scheduled work");
-	flush_workqueue(rxrpc_workqueue);
-	_debug("flush scheduled work 2");
-	flush_workqueue(rxrpc_workqueue);
-	_debug("synchronise RCU");
-	rcu_barrier();
-	_debug("destroy locals");
-	rxrpc_destroy_client_conn_ids();
 	rxrpc_destroy_all_locals();
 
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index d43cb7831693..ea45f1fc4ded 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -292,9 +292,10 @@ struct rxrpc_connection {
 	struct rxrpc_conn_parameters params;
 
 	spinlock_t		channel_lock;
-	struct rxrpc_call	*channels[RXRPC_MAXCALLS]; /* active calls */
+	struct rxrpc_call __rcu	*channels[RXRPC_MAXCALLS]; /* active calls */
 	wait_queue_head_t	channel_wq;	/* queue to wait for channel to become available */
 
+	struct rcu_head		rcu;
 	struct work_struct	processor;	/* connection event processor */
 	union {
 		struct rb_node	client_node;	/* Node in local->client_conns */
@@ -399,6 +400,7 @@ enum rxrpc_call_state {
  * - matched by { connection, call_id }
  */
 struct rxrpc_call {
+	struct rcu_head		rcu;
 	struct rxrpc_connection	*conn;		/* connection carrying call */
 	struct rxrpc_sock	*socket;	/* socket responsible */
 	struct timer_list	lifetimer;	/* lifetime remaining on call */
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index b43d89c89744..3c4f8517ab55 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -544,7 +544,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	candidate = NULL;
 	rb_link_node(&call->conn_node, parent, p);
 	rb_insert_color(&call->conn_node, &conn->calls);
-	conn->channels[call->channel] = call;
+	rcu_assign_pointer(conn->channels[call->channel], call);
 	sock_hold(&rx->sk);
 	rxrpc_get_connection(conn);
 	write_unlock_bh(&conn->lock);
@@ -795,6 +795,17 @@ void __rxrpc_put_call(struct rxrpc_call *call)
 }
 
 /*
+ * Final call destruction under RCU.
+ */
+static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
+{
+	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+
+	rxrpc_purge_queue(&call->rx_queue);
+	kmem_cache_free(rxrpc_call_jar, call);
+}
+
+/*
  * clean up a call
  */
 static void rxrpc_cleanup_call(struct rxrpc_call *call)
@@ -849,7 +860,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
 	rxrpc_purge_queue(&call->rx_queue);
 	ASSERT(skb_queue_empty(&call->rx_oos_queue));
 	sock_put(&call->socket->sk);
-	kmem_cache_free(rxrpc_call_jar, call);
+	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
 
 /*
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index a2c1ddade974..b409b2cf7651 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -584,9 +584,12 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
 /*
  * destroy a virtual connection
  */
-static void rxrpc_destroy_connection(struct rxrpc_connection *conn)
+static void rxrpc_destroy_connection(struct rcu_head *rcu)
 {
-	_enter("%p{%d}", conn, atomic_read(&conn->usage));
+	struct rxrpc_connection *conn =
+		container_of(rcu, struct rxrpc_connection, rcu);
+
+	_enter("{%d,u=%d}", conn->debug_id, atomic_read(&conn->usage));
 
 	ASSERTCMP(atomic_read(&conn->usage), ==, 0);
 
@@ -688,7 +691,8 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 		list_del_init(&conn->link);
 
 		ASSERTCMP(atomic_read(&conn->usage), ==, 0);
-		rxrpc_destroy_connection(conn);
+		skb_queue_purge(&conn->rx_queue);
+		call_rcu(&conn->rcu, rxrpc_destroy_connection);
 	}
 
 	_leave("");
@@ -700,11 +704,30 @@ static void rxrpc_connection_reaper(struct work_struct *work)
  */
 void __exit rxrpc_destroy_all_connections(void)
 {
+	struct rxrpc_connection *conn, *_p;
+	bool leak = false;
+
 	_enter("");
 
 	rxrpc_connection_expiry = 0;
 	cancel_delayed_work(&rxrpc_connection_reap);
 	rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
+	flush_workqueue(rxrpc_workqueue);
+
+	write_lock(&rxrpc_connection_lock);
+	list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
+		pr_err("AF_RXRPC: Leaked conn %p {%d}\n",
+		       conn, atomic_read(&conn->usage));
+		leak = true;
+	}
+	write_unlock(&rxrpc_connection_lock);
+	BUG_ON(leak);
+
+	/* Make sure the local and peer records pinned by any dying connections
+	 * are released.
+	 */
+	rcu_barrier();
+	rxrpc_destroy_client_conn_ids();
 
 	_leave("");
 }
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 3ab7764f7cd8..a753796fbe8f 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -374,14 +374,17 @@ void __exit rxrpc_destroy_all_locals(void)
 
 	_enter("");
 
-	if (list_empty(&rxrpc_local_endpoints))
-		return;
+	flush_workqueue(rxrpc_workqueue);
 
-	mutex_lock(&rxrpc_local_mutex);
-	list_for_each_entry(local, &rxrpc_local_endpoints, link) {
-		pr_err("AF_RXRPC: Leaked local %p {%d}\n",
-		       local, atomic_read(&local->usage));
+	if (!list_empty(&rxrpc_local_endpoints)) {
+		mutex_lock(&rxrpc_local_mutex);
+		list_for_each_entry(local, &rxrpc_local_endpoints, link) {
+			pr_err("AF_RXRPC: Leaked local %p {%d}\n",
+			       local, atomic_read(&local->usage));
+		}
+		mutex_unlock(&rxrpc_local_mutex);
+		BUG();
 	}
-	mutex_unlock(&rxrpc_local_mutex);
-	BUG();
+
+	rcu_barrier();
 }


* [PATCH net-next 11/19] rxrpc: Access socket accept queue under right lock
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (9 preceding siblings ...)
  2016-06-30 14:11 ` [PATCH net-next 10/19] rxrpc: Add RCU destruction for connections and calls David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 12/19] rxrpc: Call channels should have separate call number spaces David Howells
                   ` (8 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

The socket's accept queue (socket->acceptq) should be accessed under
socket->call_lock, not under the connection lock.
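
In sketch form, the invariant being restored is that rx->acceptq is only
ever manipulated under rx->call_lock (illustrative only; the names are
taken from the patch below):

    static void move_to_acceptq(struct rxrpc_sock *rx,
                                struct rxrpc_call *call)
    {
            write_lock(&rx->call_lock);
            list_move_tail(&call->accept_link, &rx->acceptq);
            write_unlock(&rx->call_lock);
    }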

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/call_event.c |    4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 638d66df284a..fc32aa5764a2 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -1089,7 +1089,7 @@ void rxrpc_process_call(struct work_struct *work)
 
 		if (call->state == RXRPC_CALL_SERVER_SECURING) {
 			_debug("securing");
-			write_lock(&call->conn->lock);
+			write_lock(&call->socket->call_lock);
 			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 			    !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
 				_debug("not released");
@@ -1097,7 +1097,7 @@ void rxrpc_process_call(struct work_struct *work)
 				list_move_tail(&call->accept_link,
 					       &call->socket->acceptq);
 			}
-			write_unlock(&call->conn->lock);
+			write_unlock(&call->socket->call_lock);
 			read_lock(&call->state_lock);
 			if (call->state < RXRPC_CALL_COMPLETE)
 				set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);


* [PATCH net-next 12/19] rxrpc: Call channels should have separate call number spaces
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (10 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 11/19] rxrpc: Access socket accept queue under right lock David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 13/19] rxrpc: Split client connection code out into its own file David Howells
                   ` (7 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Each channel on a connection has a separate, independent number space from
which to allocate callNumber values.  It is entirely possible, for example,
to have a connection with four active calls, each with call number 1.

Note that the callNumber values for any particular channel don't have to
start at 1, but they are supposed to increment monotonically for that
channel from a client's perspective and may not be reused once the call
number is transmitted (until the epoch cycles all the way back round).

Currently, however, call numbers are allocated on a per-connection basis
and, further, are held in an rb-tree.  The rb-tree is redundant as the four
channel pointers in the rxrpc_connection struct are entirely capable of
pointing to all the calls currently in progress on a connection.

To this end, make the following changes:

 (1) Handle call number allocation independently per channel.

 (2) Get rid of the conn->calls rb-tree.  This is overkill as a connection
     may have a maximum of four calls in progress at any one time.  Use the
     pointers in the channels[] array instead, indexed by the channel
     number from the packet.

 (3) For each channel, save the result of the last call that was in
     progress on that channel in conn->channels[] so that the final ACK or
     ABORT packet can be replayed if necessary.  Any call earlier than that
     is just ignored.  If we've seen the next call number in a packet, the
     last one is most definitely defunct.

 (4) When generating a RESPONSE packet for a connection, the call number
     counter for each channel must be included in it.

 (5) When parsing a RESPONSE packet for a connection, the call number
     counters contained therein should be used to set the minimum expected
     call numbers on each channel.
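
As a rough illustration of (1) and (2), client-side call number allocation
then reduces to something like the sketch below, using the rxrpc_channel
fields introduced in this patch (the caller holds channel_lock; everything
else is elided):

    /* Sketch: allocate the next call number on a channel.  Each channel
     * keeps its own counter, so four concurrent calls on one connection
     * may all legitimately be call number 1.
     */
    static u32 alloc_call_number(struct rxrpc_connection *conn, int chan)
    {
            struct rxrpc_channel *ch = &conn->channels[chan];

            ch->call_id = ++ch->call_counter; /* monotonic per channel */
            return ch->call_id;
    }

For (4) and (5), the RESPONSE packet then simply carries the four
per-channel counters rather than per-call IDs.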

To do in future commits:

 (1) Replay terminal packets based on the last call stored in
     conn->channels[].

 (2) Connections should be retired before the callNumber space on any
     channel runs out.

 (3) A server is expected to disregard or reject any new incoming call that
     has a call number less than the current call number counter.  The call
     number counter for that channel must be advanced to the new call
     number.

     Note that the server cannot simply require that the next call it sees
     on a channel be exactly the call number counter + 1, because the
     following scenario would then cause a problem: the client transmits a
     packet to initiate a call; the network goes down; the server sends an
     ACK (which gets lost); the client sends an ABORT (which also gets
     lost); the network then comes back; the client reuses the call number
     for its next call (it doesn't know that the server already saw that
     number), but the server thinks it already has the first packet of
     this call (it doesn't know that the client never learned that the
     number had been seen).
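
The rule in (3) then comes down to something like the hypothetical helper
below (the less-than-or-equal check already appears in
rxrpc_incoming_call() in this patch; advancing the counter is part of the
future work):

    /* Sketch: server-side check on the first packet of a call, with
     * channel_lock held.  Anything at or below the counter is old or a
     * duplicate; anything above it advances the counter, even across a
     * gap left by calls the server never saw.
     */
    static bool server_accept_call_id(struct rxrpc_channel *ch, u32 call_id)
    {
            if (call_id <= ch->call_counter)
                    return false;           /* old call: disregard */
            ch->call_counter = call_id;     /* advance past any gap */
            return true;
    }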

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |   14 +++++----
 net/rxrpc/call_object.c |   59 ++++++++++++++----------------------
 net/rxrpc/conn_event.c  |   20 ++++++------
 net/rxrpc/conn_object.c |   77 ++++++++++++++++++-----------------------------
 net/rxrpc/proc.c        |    5 +--
 net/rxrpc/rxkad.c       |   41 ++++++++++++++++---------
 6 files changed, 101 insertions(+), 115 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index ea45f1fc4ded..3685d99f2671 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -292,7 +292,14 @@ struct rxrpc_connection {
 	struct rxrpc_conn_parameters params;
 
 	spinlock_t		channel_lock;
-	struct rxrpc_call __rcu	*channels[RXRPC_MAXCALLS]; /* active calls */
+
+	struct rxrpc_channel {
+		struct rxrpc_call __rcu	*call;		/* Active call */
+		u32			call_id;	/* ID of current call */
+		u32			call_counter;	/* Call ID counter */
+		u32			last_call;	/* ID of last call */
+		u32			last_result;	/* Result of last call (0/abort) */
+	} channels[RXRPC_MAXCALLS];
 	wait_queue_head_t	channel_wq;	/* queue to wait for channel to become available */
 
 	struct rcu_head		rcu;
@@ -303,7 +310,6 @@ struct rxrpc_connection {
 	};
 	struct list_head	proc_link;	/* link in procfs list */
 	struct list_head	link;		/* link in master connection list */
-	struct rb_root		calls;		/* calls on this connection */
 	struct sk_buff_head	rx_queue;	/* received conn-level packets */
 	const struct rxrpc_security *security;	/* applied security module */
 	struct key		*server_key;	/* security for this service */
@@ -312,7 +318,6 @@ struct rxrpc_connection {
 	unsigned long		flags;
 	unsigned long		events;
 	unsigned long		put_time;	/* Time at which last put */
-	rwlock_t		lock;		/* access lock */
 	spinlock_t		state_lock;	/* state-change lock */
 	atomic_t		usage;
 	enum rxrpc_conn_proto_state state : 8;	/* current state of connection */
@@ -320,7 +325,6 @@ struct rxrpc_connection {
 	u32			remote_abort;	/* remote abort code */
 	int			error;		/* local error incurred */
 	int			debug_id;	/* debug ID for printks */
-	unsigned int		call_counter;	/* call ID counter */
 	atomic_t		serial;		/* packet serial number counter */
 	atomic_t		hi_serial;	/* highest serial number received */
 	atomic_t		avail_chans;	/* number of channels available */
@@ -413,7 +417,6 @@ struct rxrpc_call {
 	struct hlist_node	error_link;	/* link in error distribution list */
 	struct list_head	accept_link;	/* calls awaiting acceptance */
 	struct rb_node		sock_node;	/* node in socket call tree */
-	struct rb_node		conn_node;	/* node in connection call tree */
 	struct sk_buff_head	rx_queue;	/* received packets */
 	struct sk_buff_head	rx_oos_queue;	/* packets received out of sequence */
 	struct sk_buff		*tx_pending;	/* Tx socket buffer being filled */
@@ -565,6 +568,7 @@ int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *,
 					       struct rxrpc_peer *,
 					       struct sk_buff *);
+void __rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_put_connection(struct rxrpc_connection *);
 void __exit rxrpc_destroy_all_connections(void);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 3c4f8517ab55..118b0c00a7b4 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -456,8 +456,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct rxrpc_call *call, *candidate;
-	struct rb_node **p, *parent;
-	u32 call_id;
+	u32 call_id, chan;
 
 	_enter(",%d", conn->debug_id);
 
@@ -467,20 +466,23 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	if (!candidate)
 		return ERR_PTR(-EBUSY);
 
+	chan = sp->hdr.cid & RXRPC_CHANNELMASK;
 	candidate->socket	= rx;
 	candidate->conn		= conn;
 	candidate->cid		= sp->hdr.cid;
 	candidate->call_id	= sp->hdr.callNumber;
-	candidate->channel	= sp->hdr.cid & RXRPC_CHANNELMASK;
+	candidate->channel	= chan;
 	candidate->rx_data_post	= 0;
 	candidate->state	= RXRPC_CALL_SERVER_ACCEPTING;
 	if (conn->security_ix > 0)
 		candidate->state = RXRPC_CALL_SERVER_SECURING;
 
-	write_lock_bh(&conn->lock);
+	spin_lock(&conn->channel_lock);
 
 	/* set the channel for this call */
-	call = conn->channels[candidate->channel];
+	call = rcu_dereference_protected(conn->channels[chan].call,
+					 lockdep_is_held(&conn->channel_lock));
+
 	_debug("channel[%u] is %p", candidate->channel, call);
 	if (call && call->call_id == sp->hdr.callNumber) {
 		/* already set; must've been a duplicate packet */
@@ -509,9 +511,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 		       call->debug_id, rxrpc_call_states[call->state]);
 
 		if (call->state >= RXRPC_CALL_COMPLETE) {
-			conn->channels[call->channel] = NULL;
+			__rxrpc_disconnect_call(conn->channels[chan].call);
 		} else {
-			write_unlock_bh(&conn->lock);
+			spin_unlock(&conn->channel_lock);
 			kmem_cache_free(rxrpc_call_jar, candidate);
 			_leave(" = -EBUSY");
 			return ERR_PTR(-EBUSY);
@@ -521,33 +523,22 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	/* check the call number isn't duplicate */
 	_debug("check dup");
 	call_id = sp->hdr.callNumber;
-	p = &conn->calls.rb_node;
-	parent = NULL;
-	while (*p) {
-		parent = *p;
-		call = rb_entry(parent, struct rxrpc_call, conn_node);
-
-		/* The tree is sorted in order of the __be32 value without
-		 * turning it into host order.
-		 */
-		if (call_id < call->call_id)
-			p = &(*p)->rb_left;
-		else if (call_id > call->call_id)
-			p = &(*p)->rb_right;
-		else
-			goto old_call;
-	}
+
+	/* We just ignore calls prior to the current call ID.  Terminated calls
+	 * are handled via the connection.
+	 */
+	if (call_id <= conn->channels[chan].call_counter)
+		goto old_call; /* TODO: Just drop packet */
 
 	/* make the call available */
 	_debug("new call");
 	call = candidate;
 	candidate = NULL;
-	rb_link_node(&call->conn_node, parent, p);
-	rb_insert_color(&call->conn_node, &conn->calls);
-	rcu_assign_pointer(conn->channels[call->channel], call);
+	conn->channels[chan].call_counter = call_id;
+	rcu_assign_pointer(conn->channels[chan].call, call);
 	sock_hold(&rx->sk);
 	rxrpc_get_connection(conn);
-	write_unlock_bh(&conn->lock);
+	spin_unlock(&conn->channel_lock);
 
 	spin_lock(&conn->params.peer->lock);
 	hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
@@ -587,19 +578,19 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	return call;
 
 extant_call:
-	write_unlock_bh(&conn->lock);
+	spin_unlock(&conn->channel_lock);
 	kmem_cache_free(rxrpc_call_jar, candidate);
 	_leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
 	return call;
 
 aborted_call:
-	write_unlock_bh(&conn->lock);
+	spin_unlock(&conn->channel_lock);
 	kmem_cache_free(rxrpc_call_jar, candidate);
 	_leave(" = -ECONNABORTED");
 	return ERR_PTR(-ECONNABORTED);
 
 old_call:
-	write_unlock_bh(&conn->lock);
+	spin_unlock(&conn->channel_lock);
 	kmem_cache_free(rxrpc_call_jar, candidate);
 	_leave(" = -ECONNRESET [old]");
 	return ERR_PTR(-ECONNRESET);
@@ -647,8 +638,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
 	write_unlock_bh(&rx->call_lock);
 
 	/* free up the channel for reuse */
-	write_lock_bh(&conn->lock);
-	write_lock(&call->state_lock);
+	write_lock_bh(&call->state_lock);
 
 	if (call->state < RXRPC_CALL_COMPLETE &&
 	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
@@ -656,10 +646,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 		call->local_abort = RX_CALL_DEAD;
 	}
-	write_unlock(&call->state_lock);
-
-	rb_erase(&call->conn_node, &conn->calls);
-	write_unlock_bh(&conn->lock);
+	write_unlock_bh(&call->state_lock);
 
 	rxrpc_disconnect_call(call);
 
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 9ceddd3fd5db..59702feb675d 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -31,15 +31,15 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
 			      u32 abort_code)
 {
 	struct rxrpc_call *call;
-	struct rb_node *p;
+	int i;
 
 	_enter("{%d},%x", conn->debug_id, abort_code);
 
-	read_lock_bh(&conn->lock);
+	spin_lock(&conn->channel_lock);
 
-	for (p = rb_first(&conn->calls); p; p = rb_next(p)) {
-		call = rb_entry(p, struct rxrpc_call, conn_node);
-		write_lock(&call->state_lock);
+	for (i = 0; i < RXRPC_MAXCALLS; i++) {
+		call = conn->channels[i].call;
+		write_lock_bh(&call->state_lock);
 		if (call->state <= RXRPC_CALL_COMPLETE) {
 			call->state = state;
 			if (state == RXRPC_CALL_LOCALLY_ABORTED) {
@@ -51,10 +51,10 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
 			}
 			rxrpc_queue_call(call);
 		}
-		write_unlock(&call->state_lock);
+		write_unlock_bh(&call->state_lock);
 	}
 
-	read_unlock_bh(&conn->lock);
+	spin_unlock(&conn->channel_lock);
 	_leave("");
 }
 
@@ -192,17 +192,17 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
 		if (ret < 0)
 			return ret;
 
-		read_lock_bh(&conn->lock);
+		spin_lock(&conn->channel_lock);
 		spin_lock(&conn->state_lock);
 
 		if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
 			conn->state = RXRPC_CONN_SERVICE;
 			for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
-				rxrpc_call_is_secure(conn->channels[loop]);
+				rxrpc_call_is_secure(conn->channels[loop].call);
 		}
 
 		spin_unlock(&conn->state_lock);
-		read_unlock_bh(&conn->lock);
+		spin_unlock(&conn->channel_lock);
 		return 0;
 
 	default:
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index b409b2cf7651..ef6090ce6343 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -48,10 +48,8 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 		INIT_WORK(&conn->processor, &rxrpc_process_connection);
 		INIT_LIST_HEAD(&conn->proc_link);
 		INIT_LIST_HEAD(&conn->link);
-		conn->calls = RB_ROOT;
 		skb_queue_head_init(&conn->rx_queue);
 		conn->security = &rxrpc_no_security;
-		rwlock_init(&conn->lock);
 		spin_lock_init(&conn->state_lock);
 		atomic_set(&conn->usage, 1);
 		conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
@@ -65,39 +63,6 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 }
 
 /*
- * add a call to a connection's call-by-ID tree
- */
-static void rxrpc_add_call_ID_to_conn(struct rxrpc_connection *conn,
-				      struct rxrpc_call *call)
-{
-	struct rxrpc_call *xcall;
-	struct rb_node *parent, **p;
-	__be32 call_id;
-
-	write_lock_bh(&conn->lock);
-
-	call_id = call->call_id;
-	p = &conn->calls.rb_node;
-	parent = NULL;
-	while (*p) {
-		parent = *p;
-		xcall = rb_entry(parent, struct rxrpc_call, conn_node);
-
-		if (call_id < xcall->call_id)
-			p = &(*p)->rb_left;
-		else if (call_id > xcall->call_id)
-			p = &(*p)->rb_right;
-		else
-			BUG();
-	}
-
-	rb_link_node(&call->conn_node, parent, p);
-	rb_insert_color(&call->conn_node, &conn->calls);
-
-	write_unlock_bh(&conn->lock);
-}
-
-/*
  * Allocate a client connection.  The caller must take care to clear any
  * padding bytes in *cp.
  */
@@ -280,12 +245,12 @@ found_channel:
 	call->channel	= chan;
 	call->epoch	= conn->proto.epoch;
 	call->cid	= conn->proto.cid | chan;
-	call->call_id	= ++conn->call_counter;
-	rcu_assign_pointer(conn->channels[chan], call);
+	call->call_id	= ++conn->channels[chan].call_counter;
+	conn->channels[chan].call_id = call->call_id;
+	rcu_assign_pointer(conn->channels[chan].call, call);
 
 	_net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id);
 
-	rxrpc_add_call_ID_to_conn(conn, call);
 	spin_unlock(&conn->channel_lock);
 	rxrpc_put_peer(cp->peer);
 	cp->peer = NULL;
@@ -329,7 +294,7 @@ found_extant_conn:
 	spin_lock(&conn->channel_lock);
 
 	for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
-		if (!conn->channels[chan])
+		if (!conn->channels[chan].call)
 			goto found_channel;
 	BUG();
 
@@ -535,28 +500,47 @@ found:
 
 /*
  * Disconnect a call and clear any channel it occupies when that call
- * terminates.
+ * terminates.  The caller must hold the channel_lock and must release the
+ * call's ref on the connection.
  */
-void rxrpc_disconnect_call(struct rxrpc_call *call)
+void __rxrpc_disconnect_call(struct rxrpc_call *call)
 {
 	struct rxrpc_connection *conn = call->conn;
-	unsigned chan = call->channel;
+	struct rxrpc_channel *chan = &conn->channels[call->channel];
 
 	_enter("%d,%d", conn->debug_id, call->channel);
 
-	spin_lock(&conn->channel_lock);
+	if (chan->call == call) {
+		/* Save the result of the call so that we can repeat it if necessary
+		 * through the channel, whilst disposing of the actual call record.
+		 */
+		chan->last_result = call->local_abort;
+		smp_wmb();
+		chan->last_call = chan->call_id;
+		chan->call_id = chan->call_counter;
 
-	if (conn->channels[chan] == call) {
-		rcu_assign_pointer(conn->channels[chan], NULL);
+		rcu_assign_pointer(chan->call, NULL);
 		atomic_inc(&conn->avail_chans);
 		wake_up(&conn->channel_wq);
 	}
 
+	_leave("");
+}
+
+/*
+ * Disconnect a call and clear any channel it occupies when that call
+ * terminates.
+ */
+void rxrpc_disconnect_call(struct rxrpc_call *call)
+{
+	struct rxrpc_connection *conn = call->conn;
+
+	spin_lock(&conn->channel_lock);
+	__rxrpc_disconnect_call(call);
 	spin_unlock(&conn->channel_lock);
 
 	call->conn = NULL;
 	rxrpc_put_connection(conn);
-	_leave("");
 }
 
 /*
@@ -595,7 +579,6 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
 
 	_net("DESTROY CONN %d", conn->debug_id);
 
-	ASSERT(RB_EMPTY_ROOT(&conn->calls));
 	rxrpc_purge_queue(&conn->rx_queue);
 
 	conn->security->clear(conn);
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 46d470335263..ed5a94d8be7b 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -136,7 +136,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 	if (v == &rxrpc_connection_proc_list) {
 		seq_puts(seq,
 			 "Proto Local                  Remote                "
-			 " SvID ConnID   Calls    End Use State    Key     "
+			 " SvID ConnID   End Use State    Key     "
 			 " Serial   ISerial\n"
 			 );
 		return 0;
@@ -152,13 +152,12 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 		&conn->proto.ipv4_addr, ntohs(conn->proto.port));
 
 	seq_printf(seq,
-		   "UDP   %-22.22s %-22.22s %4x %08x %08x %s %3u"
+		   "UDP   %-22.22s %-22.22s %4x %08x %s %3u"
 		   " %s %08x %08x %08x\n",
 		   lbuff,
 		   rbuff,
 		   conn->params.service_id,
 		   conn->proto.cid,
-		   conn->call_counter,
 		   rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
 		   atomic_read(&conn->usage),
 		   rxrpc_conn_states[conn->state],
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index 3acc7c1241d4..63afa9e9cc08 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -767,14 +767,10 @@ static int rxkad_respond_to_challenge(struct rxrpc_connection *conn,
 	resp.kvno			= htonl(token->kad->kvno);
 	resp.ticket_len			= htonl(token->kad->ticket_len);
 
-	resp.encrypted.call_id[0] =
-		htonl(conn->channels[0] ? conn->channels[0]->call_id : 0);
-	resp.encrypted.call_id[1] =
-		htonl(conn->channels[1] ? conn->channels[1]->call_id : 0);
-	resp.encrypted.call_id[2] =
-		htonl(conn->channels[2] ? conn->channels[2]->call_id : 0);
-	resp.encrypted.call_id[3] =
-		htonl(conn->channels[3] ? conn->channels[3]->call_id : 0);
+	resp.encrypted.call_id[0] = htonl(conn->channels[0].call_counter);
+	resp.encrypted.call_id[1] = htonl(conn->channels[1].call_counter);
+	resp.encrypted.call_id[2] = htonl(conn->channels[2].call_counter);
+	resp.encrypted.call_id[3] = htonl(conn->channels[3].call_counter);
 
 	/* calculate the response checksum and then do the encryption */
 	rxkad_calc_response_checksum(&resp);
@@ -991,7 +987,7 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
 	void *ticket;
 	u32 abort_code, version, kvno, ticket_len, level;
 	__be32 csum;
-	int ret;
+	int ret, i;
 
 	_enter("{%d,%x}", conn->debug_id, key_serial(conn->server_key));
 
@@ -1054,11 +1050,26 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
 	if (response.encrypted.checksum != csum)
 		goto protocol_error_free;
 
-	if (ntohl(response.encrypted.call_id[0]) > INT_MAX ||
-	    ntohl(response.encrypted.call_id[1]) > INT_MAX ||
-	    ntohl(response.encrypted.call_id[2]) > INT_MAX ||
-	    ntohl(response.encrypted.call_id[3]) > INT_MAX)
-		goto protocol_error_free;
+	spin_lock(&conn->channel_lock);
+	for (i = 0; i < RXRPC_MAXCALLS; i++) {
+		struct rxrpc_call *call;
+		u32 call_id = ntohl(response.encrypted.call_id[i]);
+
+		if (call_id > INT_MAX)
+			goto protocol_error_unlock;
+
+		if (call_id < conn->channels[i].call_counter)
+			goto protocol_error_unlock;
+		if (call_id > conn->channels[i].call_counter) {
+			call = rcu_dereference_protected(
+				conn->channels[i].call,
+				lockdep_is_held(&conn->channel_lock));
+			if (call && call->state < RXRPC_CALL_COMPLETE)
+				goto protocol_error_unlock;
+			conn->channels[i].call_counter = call_id;
+		}
+	}
+	spin_unlock(&conn->channel_lock);
 
 	abort_code = RXKADOUTOFSEQUENCE;
 	if (ntohl(response.encrypted.inc_nonce) != conn->security_nonce + 1)
@@ -1083,6 +1094,8 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
 	_leave(" = 0");
 	return 0;
 
+protocol_error_unlock:
+	spin_unlock(&conn->channel_lock);
 protocol_error_free:
 	kfree(ticket);
 protocol_error:


* [PATCH net-next 13/19] rxrpc: Split client connection code out into its own file
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (11 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 12/19] rxrpc: Call channels should have separate call number spaces David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 14/19] rxrpc: Split service " David Howells
                   ` (6 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Split the client-specific connection code out into its own file.  It will
behave somewhat differently from the service-specific connection code, so
it makes sense to separate them.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    7 +
 net/rxrpc/conn_client.c |  249 +++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/conn_object.c |  250 -----------------------------------------------
 3 files changed, 254 insertions(+), 252 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 3685d99f2671..b95380dd0f37 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -545,9 +545,10 @@ void __exit rxrpc_destroy_all_calls(void);
  */
 extern struct idr rxrpc_client_conn_ids;
 
-int rxrpc_get_client_connection_id(struct rxrpc_connection *, gfp_t);
 void rxrpc_put_client_connection_id(struct rxrpc_connection *);
 void rxrpc_destroy_client_conn_ids(void);
+int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
+		       struct sockaddr_rxrpc *, gfp_t);
 
 /*
  * conn_event.c
@@ -560,11 +561,11 @@ void rxrpc_reject_packets(struct rxrpc_local *);
  * conn_object.c
  */
 extern unsigned int rxrpc_connection_expiry;
+extern struct list_head rxrpc_connections;
 extern struct list_head rxrpc_connection_proc_list;
 extern rwlock_t rxrpc_connection_lock;
 
-int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
-		       struct sockaddr_rxrpc *, gfp_t);
+struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *,
 					       struct rxrpc_peer *,
 					       struct sk_buff *);
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index be437d5e90ce..ce2fd4508bb5 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -33,7 +33,8 @@ static DEFINE_SPINLOCK(rxrpc_conn_id_lock);
  * client conns away from the current allocation point to try and keep the IDs
  * concentrated.  We will also need to retire connections from an old epoch.
  */
-int rxrpc_get_client_connection_id(struct rxrpc_connection *conn, gfp_t gfp)
+static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
+					  gfp_t gfp)
 {
 	u32 epoch;
 	int id;
@@ -111,3 +112,249 @@ void rxrpc_destroy_client_conn_ids(void)
 
 	idr_destroy(&rxrpc_client_conn_ids);
 }
+
+/*
+ * Allocate a client connection.  The caller must take care to clear any
+ * padding bytes in *cp.
+ */
+static struct rxrpc_connection *
+rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
+{
+	struct rxrpc_connection *conn;
+	int ret;
+
+	_enter("");
+
+	conn = rxrpc_alloc_connection(gfp);
+	if (!conn) {
+		_leave(" = -ENOMEM");
+		return ERR_PTR(-ENOMEM);
+	}
+
+	conn->params		= *cp;
+	conn->proto.local	= cp->local;
+	conn->proto.epoch	= rxrpc_epoch;
+	conn->proto.cid		= 0;
+	conn->proto.in_clientflag = 0;
+	conn->proto.family	= cp->peer->srx.transport.family;
+	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
+	conn->state		= RXRPC_CONN_CLIENT;
+
+	switch (conn->proto.family) {
+	case AF_INET:
+		conn->proto.addr_size = sizeof(conn->proto.ipv4_addr);
+		conn->proto.ipv4_addr = cp->peer->srx.transport.sin.sin_addr;
+		conn->proto.port = cp->peer->srx.transport.sin.sin_port;
+		break;
+	}
+
+	ret = rxrpc_get_client_connection_id(conn, gfp);
+	if (ret < 0)
+		goto error_0;
+
+	ret = rxrpc_init_client_conn_security(conn);
+	if (ret < 0)
+		goto error_1;
+
+	ret = conn->security->prime_packet_security(conn);
+	if (ret < 0)
+		goto error_2;
+
+	write_lock(&rxrpc_connection_lock);
+	list_add_tail(&conn->link, &rxrpc_connections);
+	list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
+	write_unlock(&rxrpc_connection_lock);
+
+	/* We steal the caller's peer ref. */
+	cp->peer = NULL;
+	rxrpc_get_local(conn->params.local);
+	key_get(conn->params.key);
+
+	_leave(" = %p", conn);
+	return conn;
+
+error_2:
+	conn->security->clear(conn);
+error_1:
+	rxrpc_put_client_connection_id(conn);
+error_0:
+	kfree(conn);
+	_leave(" = %d", ret);
+	return ERR_PTR(ret);
+}
+
+/*
+ * find a connection for a call
+ * - called in process context with IRQs enabled
+ */
+int rxrpc_connect_call(struct rxrpc_call *call,
+		       struct rxrpc_conn_parameters *cp,
+		       struct sockaddr_rxrpc *srx,
+		       gfp_t gfp)
+{
+	struct rxrpc_connection *conn, *candidate = NULL;
+	struct rxrpc_local *local = cp->local;
+	struct rb_node *p, **pp, *parent;
+	long diff;
+	int chan;
+
+	DECLARE_WAITQUEUE(myself, current);
+
+	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
+
+	cp->peer = rxrpc_lookup_peer(cp->local, srx, gfp);
+	if (!cp->peer)
+		return -ENOMEM;
+
+	if (!cp->exclusive) {
+		/* Search for an existing client connection unless this is going
+		 * to be a connection that's used exclusively for a single call.
+		 */
+		_debug("search 1");
+		spin_lock(&local->client_conns_lock);
+		p = local->client_conns.rb_node;
+		while (p) {
+			conn = rb_entry(p, struct rxrpc_connection, client_node);
+
+#define cmp(X) ((long)conn->params.X - (long)cp->X)
+			diff = (cmp(peer) ?:
+				cmp(key) ?:
+				cmp(security_level));
+			if (diff < 0)
+				p = p->rb_left;
+			else if (diff > 0)
+				p = p->rb_right;
+			else
+				goto found_extant_conn;
+		}
+		spin_unlock(&local->client_conns_lock);
+	}
+
+	/* We didn't find a connection or we want an exclusive one. */
+	_debug("get new conn");
+	candidate = rxrpc_alloc_client_connection(cp, gfp);
+	if (!candidate) {
+		_leave(" = -ENOMEM");
+		return -ENOMEM;
+	}
+
+	if (cp->exclusive) {
+		/* Assign the call on an exclusive connection to channel 0 and
+		 * don't add the connection to the endpoint's shareable conn
+		 * lookup tree.
+		 */
+		_debug("exclusive chan 0");
+		conn = candidate;
+		atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
+		spin_lock(&conn->channel_lock);
+		chan = 0;
+		goto found_channel;
+	}
+
+	/* We need to redo the search before attempting to add a new connection
+	 * lest we race with someone else adding a conflicting instance.
+	 */
+	_debug("search 2");
+	spin_lock(&local->client_conns_lock);
+
+	pp = &local->client_conns.rb_node;
+	parent = NULL;
+	while (*pp) {
+		parent = *pp;
+		conn = rb_entry(parent, struct rxrpc_connection, client_node);
+
+		diff = (cmp(peer) ?:
+			cmp(key) ?:
+			cmp(security_level));
+		if (diff < 0)
+			pp = &(*pp)->rb_left;
+		else if (diff > 0)
+			pp = &(*pp)->rb_right;
+		else
+			goto found_extant_conn;
+	}
+
+	/* The second search also failed; simply add the new connection with
+	 * the new call in channel 0.  Note that we need to take the channel
+	 * lock before dropping the client conn lock.
+	 */
+	_debug("new conn");
+	conn = candidate;
+	candidate = NULL;
+
+	rb_link_node(&conn->client_node, parent, pp);
+	rb_insert_color(&conn->client_node, &local->client_conns);
+
+	atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
+	spin_lock(&conn->channel_lock);
+	spin_unlock(&local->client_conns_lock);
+	chan = 0;
+
+found_channel:
+	_debug("found chan");
+	call->conn	= conn;
+	call->channel	= chan;
+	call->epoch	= conn->proto.epoch;
+	call->cid	= conn->proto.cid | chan;
+	call->call_id	= ++conn->channels[chan].call_counter;
+	conn->channels[chan].call_id = call->call_id;
+	rcu_assign_pointer(conn->channels[chan].call, call);
+
+	_net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id);
+
+	spin_unlock(&conn->channel_lock);
+	rxrpc_put_peer(cp->peer);
+	cp->peer = NULL;
+	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
+	return 0;
+
+	/* We found a suitable connection already in existence.  Discard any
+	 * candidate we may have allocated, and try to get a channel on this
+	 * one.
+	 */
+found_extant_conn:
+	_debug("found conn");
+	rxrpc_get_connection(conn);
+	spin_unlock(&local->client_conns_lock);
+
+	rxrpc_put_connection(candidate);
+
+	if (!atomic_add_unless(&conn->avail_chans, -1, 0)) {
+		if (!gfpflags_allow_blocking(gfp)) {
+			rxrpc_put_connection(conn);
+			_leave(" = -EAGAIN");
+			return -EAGAIN;
+		}
+
+		add_wait_queue(&conn->channel_wq, &myself);
+		for (;;) {
+			set_current_state(TASK_INTERRUPTIBLE);
+			if (atomic_add_unless(&conn->avail_chans, -1, 0))
+				break;
+			if (signal_pending(current))
+				goto interrupted;
+			schedule();
+		}
+		remove_wait_queue(&conn->channel_wq, &myself);
+		__set_current_state(TASK_RUNNING);
+	}
+
+	/* The connection allegedly now has a free channel and we can now
+	 * attach the call to it.
+	 */
+	spin_lock(&conn->channel_lock);
+
+	for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
+		if (!conn->channels[chan].call)
+			goto found_channel;
+	BUG();
+
+interrupted:
+	remove_wait_queue(&conn->channel_wq, &myself);
+	__set_current_state(TASK_RUNNING);
+	rxrpc_put_connection(conn);
+	rxrpc_put_peer(cp->peer);
+	cp->peer = NULL;
+	_leave(" = -ERESTARTSYS");
+	return -ERESTARTSYS;
+}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index ef6090ce6343..4a2876b9904b 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -27,7 +27,7 @@ unsigned int rxrpc_connection_expiry = 10 * 60;
 
 static void rxrpc_connection_reaper(struct work_struct *work);
 
-static LIST_HEAD(rxrpc_connections);
+LIST_HEAD(rxrpc_connections);
 LIST_HEAD(rxrpc_connection_proc_list);
 DEFINE_RWLOCK(rxrpc_connection_lock);
 static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper);
@@ -35,7 +35,7 @@ static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper);
 /*
  * allocate a new connection
  */
-static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
+struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 {
 	struct rxrpc_connection *conn;
 
@@ -63,252 +63,6 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 }
 
 /*
- * Allocate a client connection.  The caller must take care to clear any
- * padding bytes in *cp.
- */
-static struct rxrpc_connection *
-rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
-{
-	struct rxrpc_connection *conn;
-	int ret;
-
-	_enter("");
-
-	conn = rxrpc_alloc_connection(gfp);
-	if (!conn) {
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
-	}
-
-	conn->params		= *cp;
-	conn->proto.local	= cp->local;
-	conn->proto.epoch	= rxrpc_epoch;
-	conn->proto.cid		= 0;
-	conn->proto.in_clientflag = 0;
-	conn->proto.family	= cp->peer->srx.transport.family;
-	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
-	conn->state		= RXRPC_CONN_CLIENT;
-
-	switch (conn->proto.family) {
-	case AF_INET:
-		conn->proto.addr_size = sizeof(conn->proto.ipv4_addr);
-		conn->proto.ipv4_addr = cp->peer->srx.transport.sin.sin_addr;
-		conn->proto.port = cp->peer->srx.transport.sin.sin_port;
-		break;
-	}
-
-	ret = rxrpc_get_client_connection_id(conn, gfp);
-	if (ret < 0)
-		goto error_0;
-
-	ret = rxrpc_init_client_conn_security(conn);
-	if (ret < 0)
-		goto error_1;
-
-	ret = conn->security->prime_packet_security(conn);
-	if (ret < 0)
-		goto error_2;
-
-	write_lock(&rxrpc_connection_lock);
-	list_add_tail(&conn->link, &rxrpc_connections);
-	list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
-	write_unlock(&rxrpc_connection_lock);
-
-	/* We steal the caller's peer ref. */
-	cp->peer = NULL;
-	rxrpc_get_local(conn->params.local);
-	key_get(conn->params.key);
-
-	_leave(" = %p", conn);
-	return conn;
-
-error_2:
-	conn->security->clear(conn);
-error_1:
-	rxrpc_put_client_connection_id(conn);
-error_0:
-	kfree(conn);
-	_leave(" = %d", ret);
-	return ERR_PTR(ret);
-}
-
-/*
- * find a connection for a call
- * - called in process context with IRQs enabled
- */
-int rxrpc_connect_call(struct rxrpc_call *call,
-		       struct rxrpc_conn_parameters *cp,
-		       struct sockaddr_rxrpc *srx,
-		       gfp_t gfp)
-{
-	struct rxrpc_connection *conn, *candidate = NULL;
-	struct rxrpc_local *local = cp->local;
-	struct rb_node *p, **pp, *parent;
-	long diff;
-	int chan;
-
-	DECLARE_WAITQUEUE(myself, current);
-
-	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
-
-	cp->peer = rxrpc_lookup_peer(cp->local, srx, gfp);
-	if (!cp->peer)
-		return -ENOMEM;
-
-	if (!cp->exclusive) {
-		/* Search for a existing client connection unless this is going
-		 * to be a connection that's used exclusively for a single call.
-		 */
-		_debug("search 1");
-		spin_lock(&local->client_conns_lock);
-		p = local->client_conns.rb_node;
-		while (p) {
-			conn = rb_entry(p, struct rxrpc_connection, client_node);
-
-#define cmp(X) ((long)conn->params.X - (long)cp->X)
-			diff = (cmp(peer) ?:
-				cmp(key) ?:
-				cmp(security_level));
-			if (diff < 0)
-				p = p->rb_left;
-			else if (diff > 0)
-				p = p->rb_right;
-			else
-				goto found_extant_conn;
-		}
-		spin_unlock(&local->client_conns_lock);
-	}
-
-	/* We didn't find a connection or we want an exclusive one. */
-	_debug("get new conn");
-	candidate = rxrpc_alloc_client_connection(cp, gfp);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return -ENOMEM;
-	}
-
-	if (cp->exclusive) {
-		/* Assign the call on an exclusive connection to channel 0 and
-		 * don't add the connection to the endpoint's shareable conn
-		 * lookup tree.
-		 */
-		_debug("exclusive chan 0");
-		conn = candidate;
-		atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
-		spin_lock(&conn->channel_lock);
-		chan = 0;
-		goto found_channel;
-	}
-
-	/* We need to redo the search before attempting to add a new connection
-	 * lest we race with someone else adding a conflicting instance.
-	 */
-	_debug("search 2");
-	spin_lock(&local->client_conns_lock);
-
-	pp = &local->client_conns.rb_node;
-	parent = NULL;
-	while (*pp) {
-		parent = *pp;
-		conn = rb_entry(parent, struct rxrpc_connection, client_node);
-
-		diff = (cmp(peer) ?:
-			cmp(key) ?:
-			cmp(security_level));
-		if (diff < 0)
-			pp = &(*pp)->rb_left;
-		else if (diff > 0)
-			pp = &(*pp)->rb_right;
-		else
-			goto found_extant_conn;
-	}
-
-	/* The second search also failed; simply add the new connection with
-	 * the new call in channel 0.  Note that we need to take the channel
-	 * lock before dropping the client conn lock.
-	 */
-	_debug("new conn");
-	conn = candidate;
-	candidate = NULL;
-
-	rb_link_node(&conn->client_node, parent, pp);
-	rb_insert_color(&conn->client_node, &local->client_conns);
-
-	atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
-	spin_lock(&conn->channel_lock);
-	spin_unlock(&local->client_conns_lock);
-	chan = 0;
-
-found_channel:
-	_debug("found chan");
-	call->conn	= conn;
-	call->channel	= chan;
-	call->epoch	= conn->proto.epoch;
-	call->cid	= conn->proto.cid | chan;
-	call->call_id	= ++conn->channels[chan].call_counter;
-	conn->channels[chan].call_id = call->call_id;
-	rcu_assign_pointer(conn->channels[chan].call, call);
-
-	_net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id);
-
-	spin_unlock(&conn->channel_lock);
-	rxrpc_put_peer(cp->peer);
-	cp->peer = NULL;
-	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
-	return 0;
-
-	/* We found a suitable connection already in existence.  Discard any
-	 * candidate we may have allocated, and try to get a channel on this
-	 * one.
-	 */
-found_extant_conn:
-	_debug("found conn");
-	rxrpc_get_connection(conn);
-	spin_unlock(&local->client_conns_lock);
-
-	rxrpc_put_connection(candidate);
-
-	if (!atomic_add_unless(&conn->avail_chans, -1, 0)) {
-		if (!gfpflags_allow_blocking(gfp)) {
-			rxrpc_put_connection(conn);
-			_leave(" = -EAGAIN");
-			return -EAGAIN;
-		}
-
-		add_wait_queue(&conn->channel_wq, &myself);
-		for (;;) {
-			set_current_state(TASK_INTERRUPTIBLE);
-			if (atomic_add_unless(&conn->avail_chans, -1, 0))
-				break;
-			if (signal_pending(current))
-				goto interrupted;
-			schedule();
-		}
-		remove_wait_queue(&conn->channel_wq, &myself);
-		__set_current_state(TASK_RUNNING);
-	}
-
-	/* The connection allegedly now has a free channel and we can now
-	 * attach the call to it.
-	 */
-	spin_lock(&conn->channel_lock);
-
-	for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
-		if (!conn->channels[chan].call)
-			goto found_channel;
-	BUG();
-
-interrupted:
-	remove_wait_queue(&conn->channel_wq, &myself);
-	__set_current_state(TASK_RUNNING);
-	rxrpc_put_connection(conn);
-	rxrpc_put_peer(cp->peer);
-	cp->peer = NULL;
-	_leave(" = -ERESTARTSYS");
-	return -ERESTARTSYS;
-}
-
-/*
  * get a record of an incoming connection
  */
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,


* [PATCH net-next 14/19] rxrpc: Split service connection code out into its own file
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (12 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 13/19] rxrpc: Split client connection code out into its own file David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 15/19] rxrpc: Move peer lookup from call-accept to new-incoming-conn David Howells
                   ` (5 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Split the service-specific connection code out into its own file.  The
client-specific code has already been split out.  This will leave just the
common code in the original file.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile       |    1 
 net/rxrpc/ar-internal.h  |   13 +++-
 net/rxrpc/conn_object.c  |  133 ------------------------------------------
 net/rxrpc/conn_service.c |  146 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 157 insertions(+), 136 deletions(-)
 create mode 100644 net/rxrpc/conn_service.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 6522e50fb750..10f3f48a16a8 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -10,6 +10,7 @@ af-rxrpc-y := \
 	conn_client.o \
 	conn_event.o \
 	conn_object.o \
+	conn_service.o \
 	input.o \
 	insecure.o \
 	key.o \
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index b95380dd0f37..2495c4facf51 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -565,6 +565,9 @@ extern struct list_head rxrpc_connections;
 extern struct list_head rxrpc_connection_proc_list;
 extern rwlock_t rxrpc_connection_lock;
 
+void rxrpc_conn_hash_proto_key(struct rxrpc_conn_proto *);
+void rxrpc_extract_conn_params(struct rxrpc_conn_proto *,
+			       struct rxrpc_local *, struct sk_buff *);
 struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *,
 					       struct rxrpc_peer *,
@@ -573,9 +576,6 @@ void __rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_put_connection(struct rxrpc_connection *);
 void __exit rxrpc_destroy_all_connections(void);
-struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
-						   struct rxrpc_peer *,
-						   struct sk_buff *);
 
 static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn)
 {
@@ -606,6 +606,13 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
 }
 
 /*
+ * conn_service.c
+ */
+struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
+						   struct rxrpc_peer *,
+						   struct sk_buff *);
+
+/*
  * input.c
  */
 void rxrpc_data_ready(struct sock *);
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 4a2876b9904b..12d7c7850aed 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -63,139 +63,6 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 }
 
 /*
- * get a record of an incoming connection
- */
-struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
-						   struct rxrpc_peer *peer,
-						   struct sk_buff *skb)
-{
-	struct rxrpc_connection *conn, *candidate = NULL;
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-	struct rb_node *p, **pp;
-	const char *new = "old";
-	__be32 epoch;
-	u32 cid;
-
-	_enter("");
-
-	ASSERT(sp->hdr.flags & RXRPC_CLIENT_INITIATED);
-
-	epoch = sp->hdr.epoch;
-	cid = sp->hdr.cid & RXRPC_CIDMASK;
-
-	/* search the connection list first */
-	read_lock_bh(&peer->conn_lock);
-
-	p = peer->service_conns.rb_node;
-	while (p) {
-		conn = rb_entry(p, struct rxrpc_connection, service_node);
-
-		_debug("maybe %x", conn->proto.cid);
-
-		if (epoch < conn->proto.epoch)
-			p = p->rb_left;
-		else if (epoch > conn->proto.epoch)
-			p = p->rb_right;
-		else if (cid < conn->proto.cid)
-			p = p->rb_left;
-		else if (cid > conn->proto.cid)
-			p = p->rb_right;
-		else
-			goto found_extant_connection;
-	}
-	read_unlock_bh(&peer->conn_lock);
-
-	/* not yet present - create a candidate for a new record and then
-	 * redo the search */
-	candidate = rxrpc_alloc_connection(GFP_NOIO);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
-	}
-
-	candidate->proto.local		= local;
-	candidate->proto.epoch		= sp->hdr.epoch;
-	candidate->proto.cid		= sp->hdr.cid & RXRPC_CIDMASK;
-	candidate->proto.in_clientflag	= RXRPC_CLIENT_INITIATED;
-	candidate->params.local		= local;
-	candidate->params.peer		= peer;
-	candidate->params.service_id	= sp->hdr.serviceId;
-	candidate->security_ix		= sp->hdr.securityIndex;
-	candidate->out_clientflag	= 0;
-	candidate->state		= RXRPC_CONN_SERVICE;
-	if (candidate->params.service_id)
-		candidate->state	= RXRPC_CONN_SERVICE_UNSECURED;
-
-	write_lock_bh(&peer->conn_lock);
-
-	pp = &peer->service_conns.rb_node;
-	p = NULL;
-	while (*pp) {
-		p = *pp;
-		conn = rb_entry(p, struct rxrpc_connection, service_node);
-
-		if (epoch < conn->proto.epoch)
-			pp = &(*pp)->rb_left;
-		else if (epoch > conn->proto.epoch)
-			pp = &(*pp)->rb_right;
-		else if (cid < conn->proto.cid)
-			pp = &(*pp)->rb_left;
-		else if (cid > conn->proto.cid)
-			pp = &(*pp)->rb_right;
-		else
-			goto found_extant_second;
-	}
-
-	/* we can now add the new candidate to the list */
-	conn = candidate;
-	candidate = NULL;
-	rb_link_node(&conn->service_node, p, pp);
-	rb_insert_color(&conn->service_node, &peer->service_conns);
-	rxrpc_get_peer(peer);
-	rxrpc_get_local(local);
-
-	write_unlock_bh(&peer->conn_lock);
-
-	write_lock(&rxrpc_connection_lock);
-	list_add_tail(&conn->link, &rxrpc_connections);
-	write_unlock(&rxrpc_connection_lock);
-
-	new = "new";
-
-success:
-	_net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->proto.cid);
-
-	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
-	return conn;
-
-	/* we found the connection in the list immediately */
-found_extant_connection:
-	if (sp->hdr.securityIndex != conn->security_ix) {
-		read_unlock_bh(&peer->conn_lock);
-		goto security_mismatch;
-	}
-	rxrpc_get_connection(conn);
-	read_unlock_bh(&peer->conn_lock);
-	goto success;
-
-	/* we found the connection on the second time through the list */
-found_extant_second:
-	if (sp->hdr.securityIndex != conn->security_ix) {
-		write_unlock_bh(&peer->conn_lock);
-		goto security_mismatch;
-	}
-	rxrpc_get_connection(conn);
-	write_unlock_bh(&peer->conn_lock);
-	kfree(candidate);
-	goto success;
-
-security_mismatch:
-	kfree(candidate);
-	_leave(" = -EKEYREJECTED");
-	return ERR_PTR(-EKEYREJECTED);
-}
-
-/*
  * find a connection based on transport and RxRPC connection ID for an incoming
  * packet
  */
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
new file mode 100644
index 000000000000..2536d5276f67
--- /dev/null
+++ b/net/rxrpc/conn_service.c
@@ -0,0 +1,146 @@
+/* Service connection management
+ *
+ * Copyright (C) 2016 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <linux/slab.h>
+#include "ar-internal.h"
+
+/*
+ * get a record of an incoming connection
+ */
+struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
+						   struct rxrpc_peer *peer,
+						   struct sk_buff *skb)
+{
+	struct rxrpc_connection *conn, *candidate = NULL;
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rb_node *p, **pp;
+	const char *new = "old";
+	__be32 epoch;
+	u32 cid;
+
+	_enter("");
+
+	ASSERT(sp->hdr.flags & RXRPC_CLIENT_INITIATED);
+
+	epoch = sp->hdr.epoch;
+	cid = sp->hdr.cid & RXRPC_CIDMASK;
+
+	/* search the connection list first */
+	read_lock_bh(&peer->conn_lock);
+
+	p = peer->service_conns.rb_node;
+	while (p) {
+		conn = rb_entry(p, struct rxrpc_connection, service_node);
+
+		_debug("maybe %x", conn->proto.cid);
+
+		if (epoch < conn->proto.epoch)
+			p = p->rb_left;
+		else if (epoch > conn->proto.epoch)
+			p = p->rb_right;
+		else if (cid < conn->proto.cid)
+			p = p->rb_left;
+		else if (cid > conn->proto.cid)
+			p = p->rb_right;
+		else
+			goto found_extant_connection;
+	}
+	read_unlock_bh(&peer->conn_lock);
+
+	/* not yet present - create a candidate for a new record and then
+	 * redo the search */
+	candidate = rxrpc_alloc_connection(GFP_NOIO);
+	if (!candidate) {
+		_leave(" = -ENOMEM");
+		return ERR_PTR(-ENOMEM);
+	}
+
+	candidate->proto.local		= local;
+	candidate->proto.epoch		= sp->hdr.epoch;
+	candidate->proto.cid		= sp->hdr.cid & RXRPC_CIDMASK;
+	candidate->proto.in_clientflag	= RXRPC_CLIENT_INITIATED;
+	candidate->params.local		= local;
+	candidate->params.peer		= peer;
+	candidate->params.service_id	= sp->hdr.serviceId;
+	candidate->security_ix		= sp->hdr.securityIndex;
+	candidate->out_clientflag	= 0;
+	candidate->state		= RXRPC_CONN_SERVICE;
+	if (candidate->params.service_id)
+		candidate->state	= RXRPC_CONN_SERVICE_UNSECURED;
+
+	write_lock_bh(&peer->conn_lock);
+
+	pp = &peer->service_conns.rb_node;
+	p = NULL;
+	while (*pp) {
+		p = *pp;
+		conn = rb_entry(p, struct rxrpc_connection, service_node);
+
+		if (epoch < conn->proto.epoch)
+			pp = &(*pp)->rb_left;
+		else if (epoch > conn->proto.epoch)
+			pp = &(*pp)->rb_right;
+		else if (cid < conn->proto.cid)
+			pp = &(*pp)->rb_left;
+		else if (cid > conn->proto.cid)
+			pp = &(*pp)->rb_right;
+		else
+			goto found_extant_second;
+	}
+
+	/* we can now add the new candidate to the list */
+	conn = candidate;
+	candidate = NULL;
+	rb_link_node(&conn->service_node, p, pp);
+	rb_insert_color(&conn->service_node, &peer->service_conns);
+	rxrpc_get_peer(peer);
+	rxrpc_get_local(local);
+
+	write_unlock_bh(&peer->conn_lock);
+
+	write_lock(&rxrpc_connection_lock);
+	list_add_tail(&conn->link, &rxrpc_connections);
+	write_unlock(&rxrpc_connection_lock);
+
+	new = "new";
+
+success:
+	_net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->proto.cid);
+
+	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
+	return conn;
+
+	/* we found the connection in the list immediately */
+found_extant_connection:
+	if (sp->hdr.securityIndex != conn->security_ix) {
+		read_unlock_bh(&peer->conn_lock);
+		goto security_mismatch;
+	}
+	rxrpc_get_connection(conn);
+	read_unlock_bh(&peer->conn_lock);
+	goto success;
+
+	/* we found the connection on the second time through the list */
+found_extant_second:
+	if (sp->hdr.securityIndex != conn->security_ix) {
+		write_unlock_bh(&peer->conn_lock);
+		goto security_mismatch;
+	}
+	rxrpc_get_connection(conn);
+	write_unlock_bh(&peer->conn_lock);
+	kfree(candidate);
+	goto success;
+
+security_mismatch:
+	kfree(candidate);
+	_leave(" = -EKEYREJECTED");
+	return ERR_PTR(-EKEYREJECTED);
+}

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [PATCH net-next 15/19] rxrpc: Move peer lookup from call-accept to new-incoming-conn
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (13 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 14/19] rxrpc: Split service " David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 16/19] rxrpc: Maintain an extra ref on a conn for the cache list David Howells
                   ` (4 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Move the lookup of a peer from a call that's being accepted into the
function that creates a new incoming connection.  This will allow us to
avoid incrementing the peer's usage count in some cases in future.

Note that I haven't bothered to integrate rxrpc_get_addr_from_skb() with
rxrpc_extract_addr_from_skb() as I'm going to delete the former in the very
near future.
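
For orientation, the accept path after this change boils down to roughly
the following (an illustrative condensation of the call_accept.c hunks
below, merging two functions for brevity):

	struct sockaddr_rxrpc srx;
	struct rxrpc_connection *conn;

	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
		goto drop;	/* unsupported address family */

	conn = rxrpc_incoming_connection(local, &srx, skb);
	if (IS_ERR(conn))
		goto error;	/* peer or conn could not be set up */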

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h  |    3 ++-
 net/rxrpc/call_accept.c  |   31 +++++++------------------------
 net/rxrpc/conn_service.c |   11 ++++++++++-
 net/rxrpc/utils.c        |   32 ++++++++++++++++++++++++++++++++
 4 files changed, 51 insertions(+), 26 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 2495c4facf51..7d87cace0177 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -609,7 +609,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
  * conn_service.c
  */
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
-						   struct rxrpc_peer *,
+						   struct sockaddr_rxrpc *,
 						   struct sk_buff *);
 
 /*
@@ -775,6 +775,7 @@ static inline void rxrpc_sysctl_exit(void) {}
  */
 void rxrpc_get_addr_from_skb(struct rxrpc_local *, const struct sk_buff *,
 			     struct sockaddr_rxrpc *);
+int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
 
 /*
  * debug tracing
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 5367dbe9b96f..0b2832141bd0 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -75,7 +75,6 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_skb_priv *sp, *nsp;
-	struct rxrpc_peer *peer;
 	struct rxrpc_call *call;
 	struct sk_buff *notification;
 	int ret;
@@ -94,15 +93,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 	rxrpc_new_skb(notification);
 	notification->mark = RXRPC_SKB_MARK_NEW_CALL;
 
-	peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
-	if (!peer) {
-		_debug("no peer");
-		ret = -EBUSY;
-		goto error;
-	}
-
-	conn = rxrpc_incoming_connection(local, peer, skb);
-	rxrpc_put_peer(peer);
+	conn = rxrpc_incoming_connection(local, srx, skb);
 	if (IS_ERR(conn)) {
 		_debug("no conn");
 		ret = PTR_ERR(conn);
@@ -226,20 +217,8 @@ void rxrpc_accept_incoming_calls(struct rxrpc_local *local)
 	whdr._rsvd	= 0;
 	whdr.serviceId	= htons(sp->hdr.serviceId);
 
-	/* determine the remote address */
-	memset(&srx, 0, sizeof(srx));
-	srx.srx_family = AF_RXRPC;
-	srx.transport.family = local->srx.transport.family;
-	srx.transport_type = local->srx.transport_type;
-	switch (srx.transport.family) {
-	case AF_INET:
-		srx.transport_len = sizeof(struct sockaddr_in);
-		srx.transport.sin.sin_port = udp_hdr(skb)->source;
-		srx.transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
-		break;
-	default:
-		goto busy;
-	}
+	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
+		goto drop;
 
 	/* get the socket providing the service */
 	read_lock_bh(&local->services_lock);
@@ -285,6 +264,10 @@ busy:
 	rxrpc_free_skb(skb);
 	return;
 
+drop:
+	rxrpc_free_skb(skb);
+	return;
+
 invalid_service:
 	skb->priority = RX_INVALID_OPERATION;
 	rxrpc_reject_packet(local, skb);
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 2536d5276f67..3f0242423a66 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -16,11 +16,12 @@
  * get a record of an incoming connection
  */
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
-						   struct rxrpc_peer *peer,
+						   struct sockaddr_rxrpc *srx,
 						   struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn, *candidate = NULL;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_peer *peer;
 	struct rb_node *p, **pp;
 	const char *new = "old";
 	__be32 epoch;
@@ -28,6 +29,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 
 	_enter("");
 
+	peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
+	if (!peer) {
+		_debug("no peer");
+		return ERR_PTR(-EBUSY);
+	}
+
 	ASSERT(sp->hdr.flags & RXRPC_CLIENT_INITIATED);
 
 	epoch = sp->hdr.epoch;
@@ -59,6 +66,7 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 	 * redo the search */
 	candidate = rxrpc_alloc_connection(GFP_NOIO);
 	if (!candidate) {
+		rxrpc_put_peer(peer);
 		_leave(" = -ENOMEM");
 		return ERR_PTR(-ENOMEM);
 	}
@@ -115,6 +123,7 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 success:
 	_net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->proto.cid);
 
+	rxrpc_put_peer(peer);
 	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
 	return conn;
 
diff --git a/net/rxrpc/utils.c b/net/rxrpc/utils.c
index f28122a15a24..d3db02ecc37f 100644
--- a/net/rxrpc/utils.c
+++ b/net/rxrpc/utils.c
@@ -10,6 +10,7 @@
  */
 
 #include <linux/ip.h>
+#include <linux/ipv6.h>
 #include <linux/udp.h>
 #include "ar-internal.h"
 
@@ -39,3 +40,34 @@ void rxrpc_get_addr_from_skb(struct rxrpc_local *local,
 		BUG();
 	}
 }
+
+/*
+ * Fill out a peer address from a socket buffer containing a packet.
+ */
+int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *srx, struct sk_buff *skb)
+{
+	memset(srx, 0, sizeof(*srx));
+
+	switch (ntohs(skb->protocol)) {
+	case ETH_P_IP:
+		srx->transport_type = SOCK_DGRAM;
+		srx->transport_len = sizeof(srx->transport.sin);
+		srx->transport.sin.sin_family = AF_INET;
+		srx->transport.sin.sin_port = udp_hdr(skb)->source;
+		srx->transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
+		return 0;
+
+	case ETH_P_IPV6:
+		srx->transport_type = SOCK_DGRAM;
+		srx->transport_len = sizeof(srx->transport.sin6);
+		srx->transport.sin6.sin6_family = AF_INET6;
+		srx->transport.sin6.sin6_port = udp_hdr(skb)->source;
+		srx->transport.sin6.sin6_addr = ipv6_hdr(skb)->saddr;
+		return 0;
+
+	default:
+		pr_warn_ratelimited("AF_RXRPC: Unknown eth protocol %u\n",
+				    ntohs(skb->protocol));
+		return -EAFNOSUPPORT;
+	}
+}

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [PATCH net-next 16/19] rxrpc: Maintain an extra ref on a conn for the cache list
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (14 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 15/19] rxrpc: Move peer lookup from call-accept to new-incoming-conn David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 17/19] rxrpc: Prune the contents of the rxrpc_conn_proto struct David Howells
                   ` (3 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Overhaul the usage count accounting for the rxrpc_connection struct to make
it easier to implement RCU access from the data_ready handler.

The problem is that currently we're using a lock to prevent the garbage
collector from trying to clean up a connection that we're contemplating
unidling.  We could just stick incoming packets on the connection we find,
but then we risk a race when dispatching a work item to process them, as
we need to give the work item a ref to prevent the rxrpc_connection
struct from disappearing in the meantime.

Further, incoming packets may get discarded if attached to an
rxrpc_connection struct that is going away.  Whilst this is not a total
disaster - the client will presumably resend - it would delay processing of
the call.  This would affect the AFS client filesystem's service manager
operation.

To this end:

 (1) We now maintain an extra ref on the connection usage count whilst it
     is on the connection list.  This means it is not in use when its
     refcount is 1.

 (2) When trying to reuse an old connection, we only increment the refcount
     if it is greater than 0.  If it is 0, we replace it in the tree with a
     new candidate connection.

 (3) Two connection flags are added to indicate whether or not a connection
     is in the local's client connection tree (used by sendmsg) or the
     peer's service connection tree (used by data_ready).  This makes sure
     that we don't try and remove a connection if it got replaced.

     The flags are tested under lock with the removal operation to prevent
     the reaper from killing the rxrpc_connection struct whilst someone
     else is trying to effect a replacement.

     This could probably be alleviated by using memory barriers between the
     flag set/test and the rb_tree ops.  The rb_tree op would still need to
     be under the lock, however.

 (4) When trying to reap an old connection, we try to flip the usage count
     from 1 to 0.  If it's not 1 at that point, then it must've come back
     to life temporarily and we ignore it.  Both sides of this scheme are
     sketched below.
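
Taken together, (2) and (4) come down to the following lookup-vs-reap
discipline (a condensed sketch: rxrpc_get_connection_maybe() is the real
helper from ar-internal.h and the reaper fragment is from the
conn_object.c diff below):

	/* Lookup side: only take a ref if the conn is still alive. */
	static inline struct rxrpc_connection *
	rxrpc_get_connection_maybe(struct rxrpc_connection *conn)
	{
		return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
	}

	/* Reaper side: claim the conn by flipping the list's ref from
	 * 1 to 0; failure means a lookup revived it, so skip it on
	 * this pass.
	 */
	if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
		continue;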

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h  |    5 ++-
 net/rxrpc/conn_client.c  |   42 ++++++++++++++++++++----
 net/rxrpc/conn_object.c  |   80 +++++++++++++++++-----------------------------
 net/rxrpc/conn_service.c |   34 +++++++++++++++++---
 4 files changed, 97 insertions(+), 64 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 7d87cace0177..71618e983b4a 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -258,6 +258,8 @@ struct rxrpc_conn_parameters {
  */
 enum rxrpc_conn_flag {
 	RXRPC_CONN_HAS_IDR,		/* Has a client conn ID assigned */
+	RXRPC_CONN_IN_SERVICE_CONNS,	/* Conn is in peer->service_conns */
+	RXRPC_CONN_IN_CLIENT_CONNS,	/* Conn is in local->client_conns */
 };
 
 /*
@@ -545,10 +547,10 @@ void __exit rxrpc_destroy_all_calls(void);
  */
 extern struct idr rxrpc_client_conn_ids;
 
-void rxrpc_put_client_connection_id(struct rxrpc_connection *);
 void rxrpc_destroy_client_conn_ids(void);
 int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
 		       struct sockaddr_rxrpc *, gfp_t);
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
 
 /*
  * conn_event.c
@@ -611,6 +613,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
 						   struct sockaddr_rxrpc *,
 						   struct sk_buff *);
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 
 /*
  * input.c
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index ce2fd4508bb5..52809e21f56d 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -84,7 +84,7 @@ error:
 /*
  * Release a connection ID for a client connection from the global pool.
  */
-void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
+static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
 {
 	if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
 		spin_lock(&rxrpc_conn_id_lock);
@@ -279,12 +279,13 @@ int rxrpc_connect_call(struct rxrpc_call *call,
 	 * lock before dropping the client conn lock.
 	 */
 	_debug("new conn");
+	set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+	rb_link_node(&candidate->client_node, parent, pp);
+	rb_insert_color(&candidate->client_node, &local->client_conns);
+attached:
 	conn = candidate;
 	candidate = NULL;
 
-	rb_link_node(&conn->client_node, parent, pp);
-	rb_insert_color(&conn->client_node, &local->client_conns);
-
 	atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
 	spin_lock(&conn->channel_lock);
 	spin_unlock(&local->client_conns_lock);
@@ -308,13 +309,22 @@ found_channel:
 	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
 	return 0;
 
-	/* We found a suitable connection already in existence.  Discard any
-	 * candidate we may have allocated, and try to get a channel on this
-	 * one.
+	/* We found a potentially suitable connection already in existence.  If
+	 * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
+	 * reaper), discard any candidate we may have allocated, and try to get
+	 * a channel on this one, otherwise we have to replace it.
 	 */
 found_extant_conn:
 	_debug("found conn");
-	rxrpc_get_connection(conn);
+	if (!rxrpc_get_connection_maybe(conn)) {
+		set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+		rb_replace_node(&conn->client_node,
+				&candidate->client_node,
+				&local->client_conns);
+		clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
+		goto attached;
+	}
+
 	spin_unlock(&local->client_conns_lock);
 
 	rxrpc_put_connection(candidate);
@@ -358,3 +368,19 @@ interrupted:
 	_leave(" = -ERESTARTSYS");
 	return -ERESTARTSYS;
 }
+
+/*
+ * Remove a client connection from the local endpoint's tree, thereby removing
+ * it as a target for reuse for new client calls.
+ */
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
+{
+	struct rxrpc_local *local = conn->params.local;
+
+	spin_lock(&local->client_conns_lock);
+	if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
+		rb_erase(&conn->client_node, &local->client_conns);
+	spin_unlock(&local->client_conns_lock);
+
+	rxrpc_put_client_connection_id(conn);
+}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 12d7c7850aed..83cd3548ec10 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -51,7 +51,10 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 		skb_queue_head_init(&conn->rx_queue);
 		conn->security = &rxrpc_no_security;
 		spin_lock_init(&conn->state_lock);
-		atomic_set(&conn->usage, 1);
+		/* We maintain an extra ref on the connection whilst it is
+		 * on the rxrpc_connections list.
+		 */
+		atomic_set(&conn->usage, 2);
 		conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
 		atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
 		conn->size_align = 4;
@@ -113,7 +116,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
 	return NULL;
 
 found:
-	rxrpc_get_connection(conn);
+	conn = rxrpc_get_connection_maybe(conn);
 	read_unlock_bh(&peer->conn_lock);
 	_leave(" = %p", conn);
 	return conn;
@@ -175,10 +178,10 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
 	_enter("%p{u=%d,d=%d}",
 	       conn, atomic_read(&conn->usage), conn->debug_id);
 
-	ASSERTCMP(atomic_read(&conn->usage), >, 0);
+	ASSERTCMP(atomic_read(&conn->usage), >, 1);
 
 	conn->put_time = ktime_get_seconds();
-	if (atomic_dec_and_test(&conn->usage)) {
+	if (atomic_dec_return(&conn->usage) == 1) {
 		_debug("zombie");
 		rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
 	}
@@ -218,66 +221,42 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
 static void rxrpc_connection_reaper(struct work_struct *work)
 {
 	struct rxrpc_connection *conn, *_p;
-	struct rxrpc_peer *peer;
-	unsigned long now, earliest, reap_time;
-	bool remove;
+	unsigned long reap_older_than, earliest, put_time, now;
 
 	LIST_HEAD(graveyard);
 
 	_enter("");
 
 	now = ktime_get_seconds();
+	reap_older_than =  now - rxrpc_connection_expiry;
 	earliest = ULONG_MAX;
 
 	write_lock(&rxrpc_connection_lock);
 	list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
-		_debug("reap CONN %d { u=%d,t=%ld }",
-		       conn->debug_id, atomic_read(&conn->usage),
-		       (long) now - (long) conn->put_time);
-
-		if (likely(atomic_read(&conn->usage) > 0))
+		ASSERTCMP(atomic_read(&conn->usage), >, 0);
+		if (likely(atomic_read(&conn->usage) > 1))
 			continue;
 
-		remove = false;
-		if (rxrpc_conn_is_client(conn)) {
-			struct rxrpc_local *local = conn->params.local;
-			spin_lock(&local->client_conns_lock);
-			reap_time = conn->put_time + rxrpc_connection_expiry;
-
-			if (atomic_read(&conn->usage) > 0) {
-				;
-			} else if (reap_time <= now) {
-				list_move_tail(&conn->link, &graveyard);
-				rxrpc_put_client_connection_id(conn);
-				rb_erase(&conn->client_node,
-					 &local->client_conns);
-				remove = true;
-			} else if (reap_time < earliest) {
-				earliest = reap_time;
-			}
-
-			spin_unlock(&local->client_conns_lock);
-		} else {
-			peer = conn->params.peer;
-			write_lock_bh(&peer->conn_lock);
-			reap_time = conn->put_time + rxrpc_connection_expiry;
-
-			if (atomic_read(&conn->usage) > 0) {
-				;
-			} else if (reap_time <= now) {
-				list_move_tail(&conn->link, &graveyard);
-				rb_erase(&conn->service_node,
-					 &peer->service_conns);
-				remove = true;
-			} else if (reap_time < earliest) {
-				earliest = reap_time;
-			}
-
-			write_unlock_bh(&peer->conn_lock);
+		put_time = READ_ONCE(conn->put_time);
+		if (time_after(put_time, reap_older_than)) {
+			if (time_before(put_time, earliest))
+				earliest = put_time;
+			continue;
 		}
 
-		if (remove)
-			list_del_init(&conn->proc_link);
+		/* The usage count sits at 1 whilst the object is unused on the
+		 * list; we reduce that to 0 to make the object unavailable.
+		 */
+		if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
+			continue;
+
+		if (rxrpc_conn_is_client(conn))
+			rxrpc_unpublish_client_conn(conn);
+		else
+			rxrpc_unpublish_service_conn(conn);
+
+		list_move_tail(&conn->link, &graveyard);
+		list_del_init(&conn->proc_link);
 	}
 	write_unlock(&rxrpc_connection_lock);
 
@@ -288,7 +267,6 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 					 (earliest - now) * HZ);
 	}
 
-	/* then destroy all those pulled out */
 	while (!list_empty(&graveyard)) {
 		conn = list_entry(graveyard.next, struct rxrpc_connection,
 				  link);
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 3f0242423a66..4ae9a9e7f31f 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -105,10 +105,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 	}
 
 	/* we can now add the new candidate to the list */
+	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+	rb_link_node(&candidate->service_node, p, pp);
+	rb_insert_color(&candidate->service_node, &peer->service_conns);
+attached:
 	conn = candidate;
 	candidate = NULL;
-	rb_link_node(&conn->service_node, p, pp);
-	rb_insert_color(&conn->service_node, &peer->service_conns);
 	rxrpc_get_peer(peer);
 	rxrpc_get_local(local);
 
@@ -129,11 +131,19 @@ success:
 
 	/* we found the connection in the list immediately */
 found_extant_connection:
+	if (!rxrpc_get_connection_maybe(conn)) {
+		set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+		rb_replace_node(&conn->service_node,
+				&candidate->service_node,
+				&peer->service_conns);
+		clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
+		goto attached;
+	}
+
 	if (sp->hdr.securityIndex != conn->security_ix) {
 		read_unlock_bh(&peer->conn_lock);
-		goto security_mismatch;
+		goto security_mismatch_put;
 	}
-	rxrpc_get_connection(conn);
 	read_unlock_bh(&peer->conn_lock);
 	goto success;
 
@@ -148,8 +158,24 @@ found_extant_second:
 	kfree(candidate);
 	goto success;
 
+security_mismatch_put:
+	rxrpc_put_connection(conn);
 security_mismatch:
 	kfree(candidate);
 	_leave(" = -EKEYREJECTED");
 	return ERR_PTR(-EKEYREJECTED);
 }
+
+/*
+ * Remove the service connection from the peer's tree, thereby removing it as a
+ * target for incoming packets.
+ */
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
+{
+	struct rxrpc_peer *peer = conn->params.peer;
+
+	write_lock_bh(&peer->conn_lock);
+	if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
+		rb_erase(&conn->service_node, &peer->service_conns);
+	write_unlock_bh(&peer->conn_lock);
+}

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [PATCH net-next 17/19] rxrpc: Prune the contents of the rxrpc_conn_proto struct
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (15 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 16/19] rxrpc: Maintain an extra ref on a conn for the cache list David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 18/19] rxrpc: Move data_ready peer lookup into rxrpc_find_connection() David Howells
                   ` (2 subsequent siblings)
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Prune the contents of the rxrpc_conn_proto struct.  Most of the fields aren't
used anymore.
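
What survives is just the protocol match key.  As the ar-internal.h hunk
below shows, epoch and cid are overlaid with a single u64, so a lookup
can compare one index_key rather than two separate fields.  Roughly:

	struct rxrpc_conn_proto {
		union {
			struct {
				u32	epoch;	/* epoch of this connection */
				u32	cid;	/* connection ID */
			};
			u64		index_key;
		};
	};

	/* A single 64-bit comparison now stands in for the paired
	 * epoch/cid checks, which is how patch 19 walks the service
	 * connection tree:
	 *
	 *	if (conn->proto.index_key < k.index_key) ...
	 */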

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h  |   20 +++++++-------------
 net/rxrpc/call_object.c  |    2 +-
 net/rxrpc/conn_client.c  |   11 -----------
 net/rxrpc/conn_service.c |    2 --
 net/rxrpc/proc.c         |    6 ++++--
 5 files changed, 12 insertions(+), 29 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 71618e983b4a..db866552877e 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -229,18 +229,12 @@ struct rxrpc_peer {
  * Keys for matching a connection.
  */
 struct rxrpc_conn_proto {
-	unsigned long		hash_key;
-	struct rxrpc_local	*local;		/* Representation of local endpoint */
-	u32			epoch;		/* epoch of this connection */
-	u32			cid;		/* connection ID */
-	u8			in_clientflag;	/* RXRPC_CLIENT_INITIATED if we are server */
-	u8			addr_size;	/* Size of the address */
-	sa_family_t		family;		/* Transport protocol */
-	__be16			port;		/* Peer UDP/UDP6 port */
-	union {					/* Peer address */
-		struct in_addr	ipv4_addr;
-		struct in6_addr	ipv6_addr;
-		u32		raw_addr[0];
+	union {
+		struct {
+			u32	epoch;		/* epoch of this connection */
+			u32	cid;		/* connection ID */
+		};
+		u64		index_key;
 	};
 };
 
@@ -586,7 +580,7 @@ static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn)
 
 static inline bool rxrpc_conn_is_service(const struct rxrpc_connection *conn)
 {
-	return conn->proto.in_clientflag;
+	return !rxrpc_conn_is_client(conn);
 }
 
 static inline void rxrpc_get_connection(struct rxrpc_connection *conn)
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 118b0c00a7b4..a88a94c16402 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -566,7 +566,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	}
 	call->epoch = conn->proto.epoch;
 	call->service_id = conn->params.service_id;
-	call->in_clientflag = conn->proto.in_clientflag;
+	call->in_clientflag = RXRPC_CLIENT_INITIATED;
 	/* Add the new call to the hashtable */
 	rxrpc_call_hash_add(call);
 
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 52809e21f56d..74b97f67acf5 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -132,22 +132,11 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 	}
 
 	conn->params		= *cp;
-	conn->proto.local	= cp->local;
 	conn->proto.epoch	= rxrpc_epoch;
 	conn->proto.cid		= 0;
-	conn->proto.in_clientflag = 0;
-	conn->proto.family	= cp->peer->srx.transport.family;
 	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
 	conn->state		= RXRPC_CONN_CLIENT;
 
-	switch (conn->proto.family) {
-	case AF_INET:
-		conn->proto.addr_size = sizeof(conn->proto.ipv4_addr);
-		conn->proto.ipv4_addr = cp->peer->srx.transport.sin.sin_addr;
-		conn->proto.port = cp->peer->srx.transport.sin.sin_port;
-		break;
-	}
-
 	ret = rxrpc_get_client_connection_id(conn, gfp);
 	if (ret < 0)
 		goto error_0;
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 4ae9a9e7f31f..ac56ecf1c079 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -71,10 +71,8 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 		return ERR_PTR(-ENOMEM);
 	}
 
-	candidate->proto.local		= local;
 	candidate->proto.epoch		= sp->hdr.epoch;
 	candidate->proto.cid		= sp->hdr.cid & RXRPC_CIDMASK;
-	candidate->proto.in_clientflag	= RXRPC_CLIENT_INITIATED;
 	candidate->params.local		= local;
 	candidate->params.peer		= peer;
 	candidate->params.service_id	= sp->hdr.serviceId;
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index ed5a94d8be7b..f8a3d0f285e5 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -67,7 +67,8 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 	conn = call->conn;
 	if (conn)
 		sprintf(rbuff, "%pI4:%u",
-			&conn->proto.ipv4_addr, ntohs(conn->proto.port));
+			&conn->params.peer->srx.transport.sin.sin_addr,
+			ntohs(conn->params.peer->srx.transport.sin.sin_port));
 	else
 		strcpy(rbuff, "no_connection");
 
@@ -149,7 +150,8 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 		ntohs(conn->params.local->srx.transport.sin.sin_port));
 
 	sprintf(rbuff, "%pI4:%u",
-		&conn->proto.ipv4_addr, ntohs(conn->proto.port));
+		&conn->params.peer->srx.transport.sin.sin_addr,
+		ntohs(conn->params.peer->srx.transport.sin.sin_port));
 
 	seq_printf(seq,
 		   "UDP   %-22.22s %-22.22s %4x %08x %s %3u"

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [PATCH net-next 18/19] rxrpc: Move data_ready peer lookup into rxrpc_find_connection()
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (16 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 17/19] rxrpc: Prune the contents of the rxrpc_conn_proto struct David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
  2016-06-30 14:22 ` [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Move the peer lookup done in input.c by data_ready into
rxrpc_find_connection().
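
The client-side branch of the lookup leans on how rxrpc lays out a
connection ID; for reference (constants as defined in the rxrpc packet
header):

	/* The bottom two bits of the cid select the call channel; the
	 * remainder is the connection ID proper, which indexes the
	 * client connection IDR.
	 */
	conn_id = sp->hdr.cid >> RXRPC_CIDSHIFT;	/* IDR index */
	channel = sp->hdr.cid & RXRPC_CHANNELMASK;	/* 0..RXRPC_MAXCALLS-1 */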

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    3 --
 net/rxrpc/conn_object.c |   73 ++++++++++++++++++++++++++++++++++++-----------
 net/rxrpc/input.c       |   30 ++-----------------
 net/rxrpc/utils.c       |   27 -----------------
 4 files changed, 59 insertions(+), 74 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index db866552877e..f7f85151b399 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -566,7 +566,6 @@ void rxrpc_extract_conn_params(struct rxrpc_conn_proto *,
 			       struct rxrpc_local *, struct sk_buff *);
 struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *,
-					       struct rxrpc_peer *,
 					       struct sk_buff *);
 void __rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
@@ -770,8 +769,6 @@ static inline void rxrpc_sysctl_exit(void) {}
 /*
  * utils.c
  */
-void rxrpc_get_addr_from_skb(struct rxrpc_local *, const struct sk_buff *,
-			     struct sockaddr_rxrpc *);
 int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
 
 /*
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 83cd3548ec10..ff19ee123a9b 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -70,52 +70,91 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
  * packet
  */
 struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
-					       struct rxrpc_peer *peer,
 					       struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn;
+	struct rxrpc_conn_proto k;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct sockaddr_rxrpc srx;
+	struct rxrpc_peer *peer;
 	struct rb_node *p;
-	u32 epoch, cid;
 
 	_enter(",{%x,%x}", sp->hdr.cid, sp->hdr.flags);
 
-	read_lock_bh(&peer->conn_lock);
+	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
+		goto not_found;
 
-	cid	= sp->hdr.cid & RXRPC_CIDMASK;
-	epoch	= sp->hdr.epoch;
+	/* We may have to handle mixing IPv4 and IPv6 */
+	if (srx.transport.family != local->srx.transport.family) {
+		pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
+				    srx.transport.family,
+				    local->srx.transport.family);
+		goto not_found;
+	}
+
+	k.epoch	= sp->hdr.epoch;
+	k.cid	= sp->hdr.cid & RXRPC_CIDMASK;
 
 	if (sp->hdr.flags & RXRPC_CLIENT_INITIATED) {
+		/* We need to look up service connections by the full protocol
+		 * parameter set.  We look up the peer first as an intermediate
+		 * step and then the connection from the peer's tree.
+		 */
+		peer = rxrpc_lookup_peer_rcu(local, &srx);
+		if (!peer)
+			goto not_found;
+
+		read_lock_bh(&peer->conn_lock);
+
 		p = peer->service_conns.rb_node;
 		while (p) {
 			conn = rb_entry(p, struct rxrpc_connection, service_node);
 
 			_debug("maybe %x", conn->proto.cid);
 
-			if (epoch < conn->proto.epoch)
+			if (k.epoch < conn->proto.epoch)
 				p = p->rb_left;
-			else if (epoch > conn->proto.epoch)
+			else if (k.epoch > conn->proto.epoch)
 				p = p->rb_right;
-			else if (cid < conn->proto.cid)
+			else if (k.cid < conn->proto.cid)
 				p = p->rb_left;
-			else if (cid > conn->proto.cid)
+			else if (k.cid > conn->proto.cid)
 				p = p->rb_right;
 			else
-				goto found;
+				goto found_service_conn;
 		}
+		read_unlock_bh(&peer->conn_lock);
 	} else {
-		conn = idr_find(&rxrpc_client_conn_ids, cid >> RXRPC_CIDSHIFT);
-		if (conn &&
-		    conn->proto.epoch == epoch &&
-		    conn->params.peer == peer)
-			goto found;
+		conn = idr_find(&rxrpc_client_conn_ids,
+				k.cid >> RXRPC_CIDSHIFT);
+		if (!conn ||
+		    conn->proto.epoch != k.epoch ||
+		    conn->params.local != local)
+			goto not_found;
+
+		peer = conn->params.peer;
+		switch (srx.transport.family) {
+		case AF_INET:
+			if (peer->srx.transport.sin.sin_port !=
+			    srx.transport.sin.sin_port ||
+			    peer->srx.transport.sin.sin_addr.s_addr !=
+			    srx.transport.sin.sin_addr.s_addr)
+				goto not_found;
+			break;
+		default:
+			BUG();
+		}
+
+		conn = rxrpc_get_connection_maybe(conn);
+		_leave(" = %p", conn);
+		return conn;
 	}
 
-	read_unlock_bh(&peer->conn_lock);
+not_found:
 	_leave(" = NULL");
 	return NULL;
 
-found:
+found_service_conn:
 	conn = rxrpc_get_connection_maybe(conn);
 	read_unlock_bh(&peer->conn_lock);
 	_leave(" = %p", conn);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 710e55a5f569..51a7611dd2f6 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -626,32 +626,6 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
 	return 0;
 }
 
-static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
-						      struct sk_buff *skb)
-{
-	struct rxrpc_peer *peer;
-	struct rxrpc_connection *conn;
-	struct sockaddr_rxrpc srx;
-
-	rxrpc_get_addr_from_skb(local, skb, &srx);
-	rcu_read_lock();
-	peer = rxrpc_lookup_peer_rcu(local, &srx);
-	if (!peer)
-		goto cant_find_peer;
-
-	conn = rxrpc_find_connection(local, peer, skb);
-	rcu_read_unlock();
-	if (!conn)
-		goto cant_find_conn;
-
-	return conn;
-
-cant_find_peer:
-	rcu_read_unlock();
-cant_find_conn:
-	return NULL;
-}
-
 /*
  * handle data received on the local endpoint
  * - may be called in interrupt context
@@ -731,7 +705,9 @@ void rxrpc_data_ready(struct sock *sk)
 		 * old-fashioned way doesn't really hurt */
 		struct rxrpc_connection *conn;
 
-		conn = rxrpc_conn_from_local(local, skb);
+		rcu_read_lock();
+		conn = rxrpc_find_connection(local, skb);
+		rcu_read_unlock();
 		if (!conn)
 			goto cant_route_call;
 
diff --git a/net/rxrpc/utils.c b/net/rxrpc/utils.c
index d3db02ecc37f..b88914d53ca5 100644
--- a/net/rxrpc/utils.c
+++ b/net/rxrpc/utils.c
@@ -15,33 +15,6 @@
 #include "ar-internal.h"
 
 /*
- * Set up an RxRPC address from a socket buffer.
- */
-void rxrpc_get_addr_from_skb(struct rxrpc_local *local,
-			     const struct sk_buff *skb,
-			     struct sockaddr_rxrpc *srx)
-{
-	memset(srx, 0, sizeof(*srx));
-	srx->transport_type = local->srx.transport_type;
-	srx->transport.family = local->srx.transport.family;
-
-	/* Can we see an ipv4 UDP packet on an ipv6 UDP socket?  and vice
-	 * versa?
-	 */
-	switch (srx->transport.family) {
-	case AF_INET:
-		srx->transport.sin.sin_port = udp_hdr(skb)->source;
-		srx->transport_len = sizeof(struct sockaddr_in);
-		memcpy(&srx->transport.sin.sin_addr, &ip_hdr(skb)->saddr,
-		       sizeof(struct in_addr));
-		break;
-
-	default:
-		BUG();
-	}
-}
-
-/*
  * Fill out a peer address from a socket buffer containing a packet.
  */
 int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *srx, struct sk_buff *skb)

^ permalink raw reply related	[flat|nested] 25+ messages in thread

* [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (17 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 18/19] rxrpc: Move data_ready peer lookup into rxrpc_find_connection() David Howells
@ 2016-06-30 14:12 ` David Howells
  2016-06-30 15:30   ` Peter Zijlstra
                     ` (2 more replies)
  2016-06-30 14:22 ` [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
  19 siblings, 3 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:12 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Move to using RCU access to a peer's service connection tree when routing
an incoming packet.  This is done using a seqlock to trigger a retry of
the tree walk if the tree changed whilst it was being walked.

Further, we no longer get a ref on the connection looked up in the
data_ready handler unless we queue the connection's work item - and then
only if the refcount > 0.


Note that I'm avoiding the use of a hash table for service connections
because each service connection is addressed by a 62-bit number
(constructed from epoch and connection ID >> 2) that would allow the client
to engage in bucket stuffing, given knowledge of the hash algorithm.
Peers, however, are hashed as the network address is less controllable by
the client.  The total number of peers will also be limited in a future
commit.
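
The retry pattern, with the sequence counter explicitly initialised,
looks roughly like this (a sketch: do_walk() stands in for the rbtree
descent, and seq must start out even so that the first pass runs
locklessly - the missing initialisation is what the follow-up at the end
of this thread reports):

	unsigned int seq = 0;
	struct rxrpc_connection *conn = NULL;

	do {
		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
		conn = do_walk(&peer->service_conns, &k);
	} while (need_seqretry(&peer->service_conn_lock, seq));
	done_seqretry(&peer->service_conn_lock, seq);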

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h  |   21 ++--
 net/rxrpc/conn_client.c  |    2 
 net/rxrpc/conn_object.c  |   65 +++++------
 net/rxrpc/conn_service.c |  272 +++++++++++++++++++++++++++-------------------
 net/rxrpc/input.c        |   44 ++++---
 net/rxrpc/peer_object.c  |    2 
 6 files changed, 225 insertions(+), 181 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index f7f85151b399..02945cc77399 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -10,6 +10,7 @@
  */
 
 #include <linux/atomic.h>
+#include <linux/seqlock.h>
 #include <net/sock.h>
 #include <net/af_rxrpc.h>
 #include <rxrpc/packet.h>
@@ -206,7 +207,7 @@ struct rxrpc_peer {
 	struct hlist_head	error_targets;	/* targets for net error distribution */
 	struct work_struct	error_distributor;
 	struct rb_root		service_conns;	/* Service connections */
-	rwlock_t		conn_lock;
+	seqlock_t		service_conn_lock;
 	spinlock_t		lock;		/* access lock */
 	unsigned int		if_mtu;		/* interface MTU for this peer */
 	unsigned int		mtu;		/* network MTU for this peer */
@@ -561,12 +562,10 @@ extern struct list_head rxrpc_connections;
 extern struct list_head rxrpc_connection_proc_list;
 extern rwlock_t rxrpc_connection_lock;
 
-void rxrpc_conn_hash_proto_key(struct rxrpc_conn_proto *);
-void rxrpc_extract_conn_params(struct rxrpc_conn_proto *,
-			       struct rxrpc_local *, struct sk_buff *);
+int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
 struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
-struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *,
-					       struct sk_buff *);
+struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
+						   struct sk_buff *);
 void __rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_put_connection(struct rxrpc_connection *);
@@ -593,16 +592,20 @@ struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *con
 	return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
 }
 
-static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
+static inline bool rxrpc_queue_conn(struct rxrpc_connection *conn)
 {
-	if (rxrpc_get_connection_maybe(conn) &&
-	    !rxrpc_queue_work(&conn->processor))
+	if (!rxrpc_get_connection_maybe(conn))
+		return false;
+	if (!rxrpc_queue_work(&conn->processor))
 		rxrpc_put_connection(conn);
+	return true;
 }
 
 /*
  * conn_service.c
  */
+struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *,
+						     struct sk_buff *);
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
 						   struct sockaddr_rxrpc *,
 						   struct sk_buff *);
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 74b97f67acf5..ebdc267ecfb4 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -132,8 +132,6 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 	}
 
 	conn->params		= *cp;
-	conn->proto.epoch	= rxrpc_epoch;
-	conn->proto.cid		= 0;
 	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
 	conn->state		= RXRPC_CONN_CLIENT;
 
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index ff19ee123a9b..231aa28c0d88 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -15,7 +15,6 @@
 #include <linux/slab.h>
 #include <linux/net.h>
 #include <linux/skbuff.h>
-#include <linux/crypto.h>
 #include <net/sock.h>
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
@@ -66,24 +65,30 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 }
 
 /*
- * find a connection based on transport and RxRPC connection ID for an incoming
- * packet
+ * Look up a connection in the cache by protocol parameters.
+ *
+ * If successful, a pointer to the connection is returned, but no ref is taken.
+ * NULL is returned if there is no match.
+ *
+ * The caller must be holding the RCU read lock.
  */
-struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
-					       struct sk_buff *skb)
+struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *local,
+						   struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_conn_proto k;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct sockaddr_rxrpc srx;
 	struct rxrpc_peer *peer;
-	struct rb_node *p;
 
-	_enter(",{%x,%x}", sp->hdr.cid, sp->hdr.flags);
+	_enter(",%x", sp->hdr.cid & RXRPC_CIDMASK);
 
 	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
 		goto not_found;
 
+	k.epoch	= sp->hdr.epoch;
+	k.cid	= sp->hdr.cid & RXRPC_CIDMASK;
+
 	/* We may have to handle mixing IPv4 and IPv6 */
 	if (srx.transport.family != local->srx.transport.family) {
 		pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
@@ -103,32 +108,23 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
 		peer = rxrpc_lookup_peer_rcu(local, &srx);
 		if (!peer)
 			goto not_found;
-
-		read_lock_bh(&peer->conn_lock);
-
-		p = peer->service_conns.rb_node;
-		while (p) {
-			conn = rb_entry(p, struct rxrpc_connection, service_node);
-
-			_debug("maybe %x", conn->proto.cid);
-
-			if (k.epoch < conn->proto.epoch)
-				p = p->rb_left;
-			else if (k.epoch > conn->proto.epoch)
-				p = p->rb_right;
-			else if (k.cid < conn->proto.cid)
-				p = p->rb_left;
-			else if (k.cid > conn->proto.cid)
-				p = p->rb_right;
-			else
-				goto found_service_conn;
-		}
-		read_unlock_bh(&peer->conn_lock);
+		conn = rxrpc_find_service_conn_rcu(peer, skb);
+		if (!conn || atomic_read(&conn->usage) == 0)
+			goto not_found;
+		_leave(" = %p", conn);
+		return conn;
 	} else {
+		/* Look up client connections by connection ID alone as their
+		 * IDs are unique for this machine.
+		 */
 		conn = idr_find(&rxrpc_client_conn_ids,
-				k.cid >> RXRPC_CIDSHIFT);
-		if (!conn ||
-		    conn->proto.epoch != k.epoch ||
+				sp->hdr.cid >> RXRPC_CIDSHIFT);
+		if (!conn || atomic_read(&conn->usage) == 0) {
+			_debug("no conn");
+			goto not_found;
+		}
+
+		if (conn->proto.epoch != k.epoch ||
 		    conn->params.local != local)
 			goto not_found;
 
@@ -145,7 +141,6 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
 			BUG();
 		}
 
-		conn = rxrpc_get_connection_maybe(conn);
 		_leave(" = %p", conn);
 		return conn;
 	}
@@ -153,12 +148,6 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
 not_found:
 	_leave(" = NULL");
 	return NULL;
-
-found_service_conn:
-	conn = rxrpc_get_connection_maybe(conn);
-	read_unlock_bh(&peer->conn_lock);
-	_leave(" = %p", conn);
-	return conn;
 }
 
 /*
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index ac56ecf1c079..acd73975353e 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -13,19 +13,122 @@
 #include "ar-internal.h"
 
 /*
+ * Find a service connection under RCU conditions.
+ *
+ * We could use a hash table, but that is subject to bucket stuffing by an
+ * attacker as the client gets to pick the epoch and cid values and would know
+ * the hash function.  So, instead, we use a hash table for the peer and from
+ * that an rbtree to find the service connection.  Under ordinary circumstances
+ * it might be slower than a large hash table, but it is at least limited in
+ * depth.
+ */
+struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *peer,
+						     struct sk_buff *skb)
+{
+	struct rxrpc_connection *conn = NULL;
+	struct rxrpc_conn_proto k;
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rb_node *p;
+	unsigned int seq;
+
+	k.epoch	= sp->hdr.epoch;
+	k.cid	= sp->hdr.cid & RXRPC_CIDMASK;
+
+	do {
+		/* Unfortunately, rbtree walking doesn't give reliable results
+		 * under just the RCU read lock, so we have to check for
+		 * changes.
+		 */
+		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
+
+		p = peer->service_conns.rb_node;
+		while (p) {
+			conn = rb_entry(p, struct rxrpc_connection, service_node);
+
+			if (conn->proto.index_key < k.index_key)
+				p = p->rb_left;
+			else if (conn->proto.index_key > k.index_key)
+				p = p->rb_right;
+			else
+				goto done;
+			conn = NULL;
+		}
+	} while (need_seqretry(&peer->service_conn_lock, seq));
+
+done:
+	done_seqretry(&peer->service_conn_lock, seq);
+	_leave(" = %d", conn ? conn->debug_id : -1);
+	return conn;
+}
+
+/*
+ * Insert a service connection into a peer's tree, thereby making it a target
+ * for incoming packets.
+ */
+static struct rxrpc_connection *
+rxrpc_publish_service_conn(struct rxrpc_peer *peer,
+			   struct rxrpc_connection *conn)
+{
+	struct rxrpc_connection *cursor = NULL;
+	struct rxrpc_conn_proto k = conn->proto;
+	struct rb_node **pp, *parent;
+
+	write_seqlock_bh(&peer->service_conn_lock);
+
+	pp = &peer->service_conns.rb_node;
+	parent = NULL;
+	while (*pp) {
+		parent = *pp;
+		cursor = rb_entry(parent,
+				  struct rxrpc_connection, service_node);
+
+		if (cursor->proto.index_key < k.index_key)
+			pp = &(*pp)->rb_left;
+		else if (cursor->proto.index_key > k.index_key)
+			pp = &(*pp)->rb_right;
+		else
+			goto found_extant_conn;
+	}
+
+	rb_link_node(&conn->service_node, parent, pp);
+	rb_insert_color(&conn->service_node, &peer->service_conns);
+conn_published:
+	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
+	write_sequnlock_bh(&peer->service_conn_lock);
+	_leave(" = %d [new]", conn->debug_id);
+	return conn;
+
+found_extant_conn:
+	if (atomic_read(&cursor->usage) == 0)
+		goto replace_old_connection;
+	write_sequnlock_bh(&peer->service_conn_lock);
+	/* We should not be able to get here.  rxrpc_incoming_connection() is
+	 * called in a non-reentrant context, so there can't be a race to
+	 * insert a new connection.
+	 */
+	BUG();
+
+replace_old_connection:
+	/* The old connection is from an outdated epoch. */
+	_debug("replace conn");
+	rb_replace_node(&cursor->service_node,
+			&conn->service_node,
+			&peer->service_conns);
+	clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &cursor->flags);
+	goto conn_published;
+}
+
+/*
  * get a record of an incoming connection
  */
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 						   struct sockaddr_rxrpc *srx,
 						   struct sk_buff *skb)
 {
-	struct rxrpc_connection *conn, *candidate = NULL;
+	struct rxrpc_connection *conn;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct rxrpc_peer *peer;
-	struct rb_node *p, **pp;
 	const char *new = "old";
-	__be32 epoch;
-	u32 cid;
 
 	_enter("");
 
@@ -37,131 +140,80 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
 
 	ASSERT(sp->hdr.flags & RXRPC_CLIENT_INITIATED);
 
-	epoch = sp->hdr.epoch;
-	cid = sp->hdr.cid & RXRPC_CIDMASK;
-
-	/* search the connection list first */
-	read_lock_bh(&peer->conn_lock);
-
-	p = peer->service_conns.rb_node;
-	while (p) {
-		conn = rb_entry(p, struct rxrpc_connection, service_node);
-
-		_debug("maybe %x", conn->proto.cid);
-
-		if (epoch < conn->proto.epoch)
-			p = p->rb_left;
-		else if (epoch > conn->proto.epoch)
-			p = p->rb_right;
-		else if (cid < conn->proto.cid)
-			p = p->rb_left;
-		else if (cid > conn->proto.cid)
-			p = p->rb_right;
-		else
-			goto found_extant_connection;
+	rcu_read_lock();
+	peer = rxrpc_lookup_peer_rcu(local, srx);
+	if (peer) {
+		conn = rxrpc_find_service_conn_rcu(peer, skb);
+		if (conn) {
+			if (sp->hdr.securityIndex != conn->security_ix)
+				goto security_mismatch_rcu;
+			if (rxrpc_get_connection_maybe(conn))
+				goto found_extant_connection_rcu;
+
+			/* The conn has expired but we can't remove it without
+			 * the appropriate lock, so we attempt to replace it
+			 * when we have a new candidate.
+			 */
+		}
+
+		if (!rxrpc_get_peer_maybe(peer))
+			peer = NULL;
 	}
-	read_unlock_bh(&peer->conn_lock);
-
-	/* not yet present - create a candidate for a new record and then
-	 * redo the search */
-	candidate = rxrpc_alloc_connection(GFP_NOIO);
-	if (!candidate) {
-		rxrpc_put_peer(peer);
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
-	}
-
-	candidate->proto.epoch		= sp->hdr.epoch;
-	candidate->proto.cid		= sp->hdr.cid & RXRPC_CIDMASK;
-	candidate->params.local		= local;
-	candidate->params.peer		= peer;
-	candidate->params.service_id	= sp->hdr.serviceId;
-	candidate->security_ix		= sp->hdr.securityIndex;
-	candidate->out_clientflag	= 0;
-	candidate->state		= RXRPC_CONN_SERVICE;
-	if (candidate->params.service_id)
-		candidate->state	= RXRPC_CONN_SERVICE_UNSECURED;
-
-	write_lock_bh(&peer->conn_lock);
-
-	pp = &peer->service_conns.rb_node;
-	p = NULL;
-	while (*pp) {
-		p = *pp;
-		conn = rb_entry(p, struct rxrpc_connection, service_node);
+	rcu_read_unlock();
 
-		if (epoch < conn->proto.epoch)
-			pp = &(*pp)->rb_left;
-		else if (epoch > conn->proto.epoch)
-			pp = &(*pp)->rb_right;
-		else if (cid < conn->proto.cid)
-			pp = &(*pp)->rb_left;
-		else if (cid > conn->proto.cid)
-			pp = &(*pp)->rb_right;
-		else
-			goto found_extant_second;
+	if (!peer) {
+		peer = rxrpc_lookup_peer(local, srx, GFP_NOIO);
+		if (IS_ERR(peer))
+			goto enomem;
 	}
 
-	/* we can now add the new candidate to the list */
-	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
-	rb_link_node(&candidate->service_node, p, pp);
-	rb_insert_color(&candidate->service_node, &peer->service_conns);
-attached:
-	conn = candidate;
-	candidate = NULL;
-	rxrpc_get_peer(peer);
-	rxrpc_get_local(local);
+	/* We don't have a matching record yet. */
+	conn = rxrpc_alloc_connection(GFP_NOIO);
+	if (!conn)
+		goto enomem_peer;
+
+	conn->proto.epoch	= sp->hdr.epoch;
+	conn->proto.cid		= sp->hdr.cid & RXRPC_CIDMASK;
+	conn->params.local	= local;
+	conn->params.peer	= peer;
+	conn->params.service_id	= sp->hdr.serviceId;
+	conn->security_ix	= sp->hdr.securityIndex;
+	conn->out_clientflag	= 0;
+	conn->state		= RXRPC_CONN_SERVICE;
+	if (conn->params.service_id)
+		conn->state	= RXRPC_CONN_SERVICE_UNSECURED;
 
-	write_unlock_bh(&peer->conn_lock);
+	rxrpc_get_local(local);
 
 	write_lock(&rxrpc_connection_lock);
 	list_add_tail(&conn->link, &rxrpc_connections);
+	list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
 	write_unlock(&rxrpc_connection_lock);
 
+	/* Make the connection a target for incoming packets. */
+	rxrpc_publish_service_conn(peer, conn);
+
 	new = "new";
 
 success:
 	_net("CONNECTION %s %d {%x}", new, conn->debug_id, conn->proto.cid);
-
-	rxrpc_put_peer(peer);
 	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
 	return conn;
 
-	/* we found the connection in the list immediately */
-found_extant_connection:
-	if (!rxrpc_get_connection_maybe(conn)) {
-		set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
-		rb_replace_node(&conn->service_node,
-				&candidate->service_node,
-				&peer->service_conns);
-		clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
-		goto attached;
-	}
-
-	if (sp->hdr.securityIndex != conn->security_ix) {
-		read_unlock_bh(&peer->conn_lock);
-		goto security_mismatch_put;
-	}
-	read_unlock_bh(&peer->conn_lock);
-	goto success;
-
-	/* we found the connection on the second time through the list */
-found_extant_second:
-	if (sp->hdr.securityIndex != conn->security_ix) {
-		write_unlock_bh(&peer->conn_lock);
-		goto security_mismatch;
-	}
-	rxrpc_get_connection(conn);
-	write_unlock_bh(&peer->conn_lock);
-	kfree(candidate);
+found_extant_connection_rcu:
+	rcu_read_unlock();
 	goto success;
 
-security_mismatch_put:
-	rxrpc_put_connection(conn);
-security_mismatch:
-	kfree(candidate);
+security_mismatch_rcu:
+	rcu_read_unlock();
 	_leave(" = -EKEYREJECTED");
 	return ERR_PTR(-EKEYREJECTED);
+
+enomem_peer:
+	rxrpc_put_peer(peer);
+enomem:
+	_leave(" = -ENOMEM");
+	return ERR_PTR(-ENOMEM);
 }
 
 /*
@@ -172,8 +224,8 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
 {
 	struct rxrpc_peer *peer = conn->params.peer;
 
-	write_lock_bh(&peer->conn_lock);
+	write_seqlock_bh(&peer->service_conn_lock);
 	if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
 		rb_erase(&conn->service_node, &peer->service_conns);
-	write_unlock_bh(&peer->conn_lock);
+	write_sequnlock_bh(&peer->service_conn_lock);
 }
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 51a7611dd2f6..52c02f1fa686 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -575,13 +575,13 @@ done:
  * post connection-level events to the connection
  * - this includes challenges, responses and some aborts
  */
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+static bool rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 				      struct sk_buff *skb)
 {
 	_enter("%p,%p", conn, skb);
 
 	skb_queue_tail(&conn->rx_queue, skb);
-	rxrpc_queue_conn(conn);
+	return rxrpc_queue_conn(conn);
 }
 
 /*
@@ -636,6 +636,7 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
  */
 void rxrpc_data_ready(struct sock *sk)
 {
+	struct rxrpc_connection *conn;
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_local *local = sk->sk_user_data;
 	struct sk_buff *skb;
@@ -699,36 +700,37 @@ void rxrpc_data_ready(struct sock *sk)
 	    (sp->hdr.callNumber == 0 || sp->hdr.seq == 0))
 		goto bad_message;
 
-	if (sp->hdr.callNumber == 0) {
-		/* This is a connection-level packet. These should be
-		 * fairly rare, so the extra overhead of looking them up the
-		 * old-fashioned way doesn't really hurt */
-		struct rxrpc_connection *conn;
-
-		rcu_read_lock();
-		conn = rxrpc_find_connection(local, skb);
-		rcu_read_unlock();
-		if (!conn)
-			goto cant_route_call;
+	rcu_read_lock();
+
+retry_find_conn:
+	conn = rxrpc_find_connection_rcu(local, skb);
+	if (!conn)
+		goto cant_route_call;
 
+	if (sp->hdr.callNumber == 0) {
+		/* Connection-level packet */
 		_debug("CONN %p {%d}", conn, conn->debug_id);
-		rxrpc_post_packet_to_conn(conn, skb);
-		rxrpc_put_connection(conn);
+		if (!rxrpc_post_packet_to_conn(conn, skb))
+			goto retry_find_conn;
 	} else {
-		struct rxrpc_call *call;
+		/* Call-bound packets are routed by connection channel. */
+		unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+		struct rxrpc_channel *chan = &conn->channels[channel];
+		struct rxrpc_call *call = rcu_dereference(chan->call);
 
-		call = rxrpc_find_call_hash(&sp->hdr, local,
-					    AF_INET, &ip_hdr(skb)->saddr);
-		if (call)
-			rxrpc_post_packet_to_call(call, skb);
-		else
+		if (!call || atomic_read(&call->usage) == 0)
 			goto cant_route_call;
+
+		rxrpc_post_packet_to_call(call, skb);
 	}
 
+	rcu_read_unlock();
 out:
 	return;
 
 cant_route_call:
+	rcu_read_unlock();
+
 	_debug("can't route call");
 	if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
 	    sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 01d4930a11f7..538e9831c699 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -189,7 +189,7 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 		INIT_WORK(&peer->error_distributor,
 			  &rxrpc_peer_error_distributor);
 		peer->service_conns = RB_ROOT;
-		rwlock_init(&peer->conn_lock);
+		seqlock_init(&peer->service_conn_lock);
 		spin_lock_init(&peer->lock);
 		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
 	}


* Re: [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation
  2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
                   ` (18 preceding siblings ...)
  2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
@ 2016-06-30 14:22 ` David Howells
  19 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 14:22 UTC (permalink / raw)
  To: davem; +Cc: dhowells, netdev, linux-afs, linux-kernel

Sigh.

It seems I misunderstood the comment on read_seqbegin_or_lock() and failed to
initialise seq in rxrpc_find_service_conn_rcu().  For some reason my compiler
didn't notice, though Fengguang's magic compile farm did.

So, please ignore this version of the patchset and I'll post a new set.

David
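
For reference, a minimal sketch of the pattern read_seqbegin_or_lock()
expects, using the lock from this patchset (illustrative, not the final
code): the seq cookie must be initialised to an even value, typically zero,
so that the first pass through the loop is the lockless one.  Leaving it
uninitialised is the bug described above.

	int seq = 0;	/* must start even; this is the missing init */

	do {
		/* Lockless pass while seq is even; read_seqbegin_or_lock()
		 * takes the lock instead if seq is odd.
		 */
		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);

		/* ... walk the service connection tree ... */

	} while (need_seqretry(&peer->service_conn_lock, seq));
	done_seqretry(&peer->service_conn_lock, seq);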


* Re: [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree
  2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
@ 2016-06-30 15:30   ` Peter Zijlstra
  2016-06-30 16:22   ` David Howells
  2016-06-30 16:36   ` David Howells
  2 siblings, 0 replies; 25+ messages in thread
From: Peter Zijlstra @ 2016-06-30 15:30 UTC (permalink / raw)
  To: David Howells; +Cc: davem, netdev, linux-afs, linux-kernel

On Thu, Jun 30, 2016 at 03:12:58PM +0100, David Howells wrote:
> +struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *peer,
> +						     struct sk_buff *skb)
> +{
> +	struct rxrpc_connection *conn = NULL;
> +	struct rxrpc_conn_proto k;
> +	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
> +	struct rb_node *p;
> +	unsigned int seq;
> +
> +	k.epoch	= sp->hdr.epoch;
> +	k.cid	= sp->hdr.cid & RXRPC_CIDMASK;
> +
> +	do {
> +		/* Unfortunately, rbtree walking doesn't give reliable results
> +		 * under just the RCU read lock, so we have to check for
> +		 * changes.
> +		 */
> +		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
> +
> +		p = peer->service_conns.rb_node;
> +		while (p) {
> +			conn = rb_entry(p, struct rxrpc_connection, service_node);
> +
> +			if (conn->proto.index_key < k.index_key)
> +				p = p->rb_left;
> +			else if (conn->proto.index_key > k.index_key)
> +				p = p->rb_right;

You still very much need rcu_dereference() for both left and right
pointers. As well as the first p load.

> +			else
> +				goto done;
> +			conn = NULL;
> +		}
> +	} while (need_seqretry(&peer->service_conn_lock, seq));
> +
> +done:
> +	done_seqretry(&peer->service_conn_lock, seq);
> +	_leave(" = %d", conn ? conn->debug_id : -1);
> +	return conn;
> +}


> +static struct rxrpc_connection *
> +rxrpc_publish_service_conn(struct rxrpc_peer *peer,
> +			   struct rxrpc_connection *conn)
> +{
> +	struct rxrpc_connection *cursor = NULL;
> +	struct rxrpc_conn_proto k = conn->proto;
> +	struct rb_node **pp, *parent;
> +
> +	write_seqlock_bh(&peer->service_conn_lock);
> +
> +	pp = &peer->service_conns.rb_node;
> +	parent = NULL;
> +	while (*pp) {
> +		parent = *pp;
> +		cursor = rb_entry(parent,
> +				  struct rxrpc_connection, service_node);
> +
> +		if (cursor->proto.index_key < k.index_key)
> +			pp = &(*pp)->rb_left;
> +		else if (cursor->proto.index_key > k.index_key)
> +			pp = &(*pp)->rb_right;
> +		else
> +			goto found_extant_conn;
> +	}
> +
> +	rb_link_node(&conn->service_node, parent, pp);

You want rb_link_node_rcu() here.

> +	rb_insert_color(&conn->service_node, &peer->service_conns);
> +conn_published:
> +	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
> +	write_sequnlock_bh(&peer->service_conn_lock);
> +	_leave(" = %d [new]", conn->debug_id);
> +	return conn;


* Re: [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree
  2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
  2016-06-30 15:30   ` Peter Zijlstra
@ 2016-06-30 16:22   ` David Howells
  2016-06-30 16:36   ` David Howells
  2 siblings, 0 replies; 25+ messages in thread
From: David Howells @ 2016-06-30 16:22 UTC (permalink / raw)
  To: Peter Zijlstra; +Cc: dhowells, davem, netdev, linux-afs, linux-kernel

Peter Zijlstra <peterz@infradead.org> wrote:

> > +			if (conn->proto.index_key < k.index_key)
> > +				p = p->rb_left;
> > +			else if (conn->proto.index_key > k.index_key)
> > +				p = p->rb_right;
> 
> You still very much need rcu_dereference() for both left and right
> pointers. As well as the first p load.

Bah...  Yes.  Good point.

> > +	rb_link_node(&conn->service_node, parent, pp);
> 
> You want rb_link_node_rcu() here.

Should there be an rb_replace_node_rcu() also?

David


* Re: [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree
  2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
  2016-06-30 15:30   ` Peter Zijlstra
  2016-06-30 16:22   ` David Howells
@ 2016-06-30 16:36   ` David Howells
  2016-06-30 20:19     ` Peter Zijlstra
  2 siblings, 1 reply; 25+ messages in thread
From: David Howells @ 2016-06-30 16:36 UTC (permalink / raw)
  To: Peter Zijlstra; +Cc: dhowells, davem, netdev, linux-afs, linux-kernel

David Howells <dhowells@redhat.com> wrote:

> > You want rb_link_node_rcu() here.
> 
> Should there be an rb_replace_node_rcu() also?

Or I could make rb_replace_node() RCU friendly.  What do you think of the
attached changes (to be split into appropriate patches)?  It's a case of
changing the order in which the pointers are set in the rbtree code and
inserting a barrier.

I also wonder if rb_insert_color() needs some attention - though possibly
that's okay, as the node's pointers are already set by the time it runs
(since you call rb_link_node_rcu() first).

David
---
diff --git a/include/linux/rbtree_augmented.h b/include/linux/rbtree_augmented.h
index 14d7b831b63a..d076183e49be 100644
--- a/include/linux/rbtree_augmented.h
+++ b/include/linux/rbtree_augmented.h
@@ -130,6 +130,19 @@ __rb_change_child(struct rb_node *old, struct rb_node *new,
 		WRITE_ONCE(root->rb_node, new);
 }
 
+static inline void
+__rb_change_child_rcu(struct rb_node *old, struct rb_node *new,
+		      struct rb_node *parent, struct rb_root *root)
+{
+	if (parent) {
+		if (parent->rb_left == old)
+			rcu_assign_pointer(parent->rb_left, new);
+		else
+			rcu_assign_pointer(parent->rb_right, new);
+	} else
+		rcu_assign_pointer(root->rb_node, new);
+}
+
 extern void __rb_erase_color(struct rb_node *parent, struct rb_root *root,
 	void (*augment_rotate)(struct rb_node *old, struct rb_node *new));
 
diff --git a/lib/rbtree.c b/lib/rbtree.c
index 1356454e36de..2b1a190c737c 100644
--- a/lib/rbtree.c
+++ b/lib/rbtree.c
@@ -539,15 +539,17 @@ void rb_replace_node(struct rb_node *victim, struct rb_node *new,
 {
 	struct rb_node *parent = rb_parent(victim);
 
+	/* Copy the pointers/colour from the victim to the replacement */
+	*new = *victim;
+
 	/* Set the surrounding nodes to point to the replacement */
-	__rb_change_child(victim, new, parent, root);
 	if (victim->rb_left)
 		rb_set_parent(victim->rb_left, new);
 	if (victim->rb_right)
 		rb_set_parent(victim->rb_right, new);
 
-	/* Copy the pointers/colour from the victim to the replacement */
-	*new = *victim;
+	/* Set the onward pointer last with an RCU barrier */
+	__rb_change_child_rcu(victim, new, parent, root);
 }
 EXPORT_SYMBOL(rb_replace_node);
 
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index dc64211c5ee8..298ec300cfcc 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -41,14 +41,14 @@ struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *peer,
 		 */
 		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
 
-		p = peer->service_conns.rb_node;
+		p = rcu_dereference(peer->service_conns.rb_node);
 		while (p) {
 			conn = rb_entry(p, struct rxrpc_connection, service_node);
 
 			if (conn->proto.index_key < k.index_key)
-				p = p->rb_left;
+				p = rcu_dereference(p->rb_left);
 			else if (conn->proto.index_key > k.index_key)
-				p = p->rb_right;
+				p = rcu_dereference(p->rb_right);
 			else
 				goto done;
 			conn = NULL;
@@ -90,7 +90,7 @@ rxrpc_publish_service_conn(struct rxrpc_peer *peer,
 			goto found_extant_conn;
 	}
 
-	rb_link_node(&conn->service_node, parent, pp);
+	rb_link_node_rcu(&conn->service_node, parent, pp);
 	rb_insert_color(&conn->service_node, &peer->service_conns);
 conn_published:
 	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);


* Re: [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree
  2016-06-30 16:36   ` David Howells
@ 2016-06-30 20:19     ` Peter Zijlstra
  0 siblings, 0 replies; 25+ messages in thread
From: Peter Zijlstra @ 2016-06-30 20:19 UTC (permalink / raw)
  To: David Howells; +Cc: davem, netdev, linux-afs, linux-kernel

On Thu, Jun 30, 2016 at 05:36:51PM +0100, David Howells wrote:
> David Howells <dhowells@redhat.com> wrote:
> 
> > > You want rb_link_node_rcu() here.
> > 
> > Should there be an rb_replace_node_rcu() also?
> 
> Or I could make rb_replace_node() RCU friendly.  What do you think of the
> attached changes (split into appropriate patches)?  It's a case of changing
> the order in which pointers are set in the rbtree code and inserting a
> barrier.

> diff --git a/lib/rbtree.c b/lib/rbtree.c
> index 1356454e36de..2b1a190c737c 100644
> --- a/lib/rbtree.c
> +++ b/lib/rbtree.c
> @@ -539,15 +539,17 @@ void rb_replace_node(struct rb_node *victim, struct rb_node *new,
>  {
>  	struct rb_node *parent = rb_parent(victim);
>  
> +	/* Copy the pointers/colour from the victim to the replacement */
> +	*new = *victim;
> +
>  	/* Set the surrounding nodes to point to the replacement */
> -	__rb_change_child(victim, new, parent, root);
>  	if (victim->rb_left)
>  		rb_set_parent(victim->rb_left, new);
>  	if (victim->rb_right)
>  		rb_set_parent(victim->rb_right, new);
>  
> -	/* Copy the pointers/colour from the victim to the replacement */
> -	*new = *victim;
> +	/* Set the onward pointer last with an RCU barrier */
> +	__rb_change_child_rcu(victim, new, parent, root);
>  }
>  EXPORT_SYMBOL(rb_replace_node);

So back when I did this work there was resistance to making the regular
RB-tree primitives more expensive for the rare RCU user. And I suspect
that this is still so.

Now, rb_replace_node() isn't a widely used primitive, so the extra expense
might go unnoticed, but since we already have rb_link_node_rcu(), adding
rb_replace_node_rcu() is the consistent thing to do.
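
A sketch of what such a primitive might look like, reusing the
__rb_change_child_rcu() helper from the diff above (illustrative only, not
a final patch):

void rb_replace_node_rcu(struct rb_node *victim, struct rb_node *new,
			 struct rb_root *root)
{
	struct rb_node *parent = rb_parent(victim);

	/* Copy the pointers/colour from the victim to the replacement */
	*new = *victim;

	/* Set the surrounding nodes to point to the replacement */
	if (victim->rb_left)
		rb_set_parent(victim->rb_left, new);
	if (victim->rb_right)
		rb_set_parent(victim->rb_right, new);

	/* Publish the parent's pointer to the replacement last, with an
	 * RCU barrier, so that the replacement's own pointers are visible
	 * before a concurrent RCU walker can reach the node.
	 */
	__rb_change_child_rcu(victim, new, parent, root);
}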


> diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
> index dc64211c5ee8..298ec300cfcc 100644
> --- a/net/rxrpc/conn_service.c
> +++ b/net/rxrpc/conn_service.c
> @@ -41,14 +41,14 @@ struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *peer,
>  		 */
>  		read_seqbegin_or_lock(&peer->service_conn_lock, &seq);
>  
> -		p = peer->service_conns.rb_node;
> +		p = rcu_dereference(peer->service_conns.rb_node);
>  		while (p) {
>  			conn = rb_entry(p, struct rxrpc_connection, service_node);
>  
>  			if (conn->proto.index_key < k.index_key)
> -				p = p->rb_left;
> +				p = rcu_dereference(p->rb_left);
>  			else if (conn->proto.index_key > k.index_key)
> -				p = p->rb_right;
> +				p = rcu_dereference(p->rb_right);
>  			else
>  				goto done;
>  			conn = NULL;
> @@ -90,7 +90,7 @@ rxrpc_publish_service_conn(struct rxrpc_peer *peer,
>  			goto found_extant_conn;
>  	}
>  
> -	rb_link_node(&conn->service_node, parent, pp);
> +	rb_link_node_rcu(&conn->service_node, parent, pp);
>  	rb_insert_color(&conn->service_node, &peer->service_conns);
>  conn_published:
>  	set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);

Yep, that's about right.


Thread overview: 25+ messages
2016-06-30 14:10 [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
2016-06-30 14:10 ` [PATCH net-next 01/19] rxrpc: Check the source of a packet to a client conn David Howells
2016-06-30 14:10 ` [PATCH net-next 02/19] rxrpc: Avoid using stack memory in SG lists in rxkad David Howells
2016-06-30 14:11 ` [PATCH net-next 03/19] rxrpc: Provide more refcount helper functions David Howells
2016-06-30 14:11 ` [PATCH net-next 04/19] rxrpc: Dup the main conn list for the proc interface David Howells
2016-06-30 14:11 ` [PATCH net-next 05/19] rxrpc: Turn connection #defines into enums and put outside struct def David Howells
2016-06-30 14:11 ` [PATCH net-next 06/19] rxrpc: Check that the client conns cache is empty before module removal David Howells
2016-06-30 14:11 ` [PATCH net-next 07/19] rxrpc: Move usage count getting into rxrpc_queue_conn() David Howells
2016-06-30 14:11 ` [PATCH net-next 08/19] rxrpc: Fix handling of connection failure in client call creation David Howells
2016-06-30 14:11 ` [PATCH net-next 09/19] rxrpc: Release a call's connection ref on call disconnection David Howells
2016-06-30 14:11 ` [PATCH net-next 10/19] rxrpc: Add RCU destruction for connections and calls David Howells
2016-06-30 14:12 ` [PATCH net-next 11/19] rxrpc: Access socket accept queue under right lock David Howells
2016-06-30 14:12 ` [PATCH net-next 12/19] rxrpc: Call channels should have separate call number spaces David Howells
2016-06-30 14:12 ` [PATCH net-next 13/19] rxrpc: Split client connection code out into its own file David Howells
2016-06-30 14:12 ` [PATCH net-next 14/19] rxrpc: Split service " David Howells
2016-06-30 14:12 ` [PATCH net-next 15/19] rxrpc: Move peer lookup from call-accept to new-incoming-conn David Howells
2016-06-30 14:12 ` [PATCH net-next 16/19] rxrpc: Maintain an extra ref on a conn for the cache list David Howells
2016-06-30 14:12 ` [PATCH net-next 17/19] rxrpc: Prune the contents of the rxrpc_conn_proto struct David Howells
2016-06-30 14:12 ` [PATCH net-next 18/19] rxrpc: Move data_ready peer lookup into rxrpc_find_connection() David Howells
2016-06-30 14:12 ` [PATCH net-next 19/19] rxrpc: Use RCU to access a peer's service connection tree David Howells
2016-06-30 15:30   ` Peter Zijlstra
2016-06-30 16:22   ` David Howells
2016-06-30 16:36   ` David Howells
2016-06-30 20:19     ` Peter Zijlstra
2016-06-30 14:22 ` [PATCH net-next 00/19] rxrpc: Improve conn/call lookup and fix call number generation David Howells
