linux-kernel.vger.kernel.org archive mirror
* [PATCH 00/11] RxRPC: Rewrite part 2
@ 2016-03-07 14:37 David Howells
  2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
                   ` (10 more replies)
  0 siblings, 11 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:37 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel


Here's the second set of patches from my RxRPC rewrite, aimed at net-next.

The RxRPC driver wants to end up with four different classes of object
residing in five/six different hashes:

 (1) Local endpoints (hashed by transport address).

 (2) Peers (hashed by remote transport address).

 (3) Connections (hashed by protocol data; hashed by user parameters).

 (4) Calls (temporarily hashed by protocol data; hashed by control message
     user ID).

And, possibly, in future:

 (5) Services (hashed by local endpoint + service ID; hashed by incoming
     protocol data).

The desirable end result is that the connection, rather than the call, is
the main switching point for incoming packets, since each connection has
four 'channels' corresponding to the (up to) four calls currently active
on it.  This is how the other RxRPC implementations work.  It also means
that the transport object can be killed off, simplifying the code.

Incoming calls currently work by building peer, transport, connection and
call objects and then dumping the incoming packet onto the call.  This
needs to change somewhat also - but will be addressed in a later part of
the rewrite (and may change yet again if service objects are introduced).

The code currently uses spinlocks and rb-trees or lists to find, stepwise,
the target call.  One of the aims of the rewrite is to change this to
RCU-governed hash tables and get rid of as much locking as possible.
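To illustrate the direction, here is a minimal userspace sketch of a
folded-hash bucket lookup of the sort intended above.  All names here are
hypothetical, the chains are plain singly-linked lists rather than RCU
hlists, and the multiplier merely approximates the kernel's hash_32():

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fold a full hash key down to a bucket index, hash_32()-style:
 * multiply by a golden-ratio-derived constant and keep the top
 * 'shift' bits.
 */
static unsigned fold_hash(uint32_t full_key, unsigned shift)
{
	return (full_key * 0x9e370001U) >> (32 - shift);
}

struct node {
	uint32_t	full_key;	/* cached full hash key */
	struct node	*next;		/* bucket chain */
};

#define NR_BUCKETS 16			/* must be a power of two */
static struct node *hash_table[NR_BUCKETS];

/* Insert at the head of the bucket selected by the folded key. */
static void table_add(struct node *n)
{
	unsigned b = fold_hash(n->full_key, 4);	/* 4 == ilog2(16) */

	n->next = hash_table[b];
	hash_table[b] = n;
}

/* Only the one bucket the key can live in needs walking. */
static struct node *table_lookup(uint32_t full_key)
{
	struct node *n = hash_table[fold_hash(full_key, 4)];

	for (; n; n = n->next)
		if (n->full_key == full_key)
			return n;
	return NULL;
}
```

The kernel version replaces the chain walk with hlist_for_each_entry()
under the RCU read lock, which is what removes the stepwise spinlocking.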


To this end, patches 01-02 add a hash table-based object manager that will
then be used to handle all four object classes.  This provides the
following facilities:

 (1) Two hashes per object class.  These have slightly different
     characteristics: All objects must be on the primary hash, but being on
     the secondary hash is optional.  Objects can only be removed from the
     primary hash by the garbage collector, but may be removed from the
     secondary hash a single time.

 (2) RCU-safe lookup.

 (3) Usage-count based RCU-safe garbage collection.

 (4) Object re-use.

 (5) One per-class expiry timer instead of per-object timers.

 (6) /proc listing (the primary hash lists all the objects, so a separate
     list isn't necessary).
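Point (5) amounts to tracking a single "next run" time for the whole class
and pulling it earlier whenever an object is put.  A simplified sketch of
that arithmetic (field and function names are hypothetical, not the
kernel code):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TIME_MAX INT64_MAX

/* Per-class GC timing state: one timer for the whole cache rather
 * than one timer per object.
 */
struct gc_state {
	int64_t next_run;	/* earliest pending expiry; TIME_MAX if none */
	int64_t gc_delay;	/* grace period after the last put, in seconds */
};

/* Called when an object's usage count drops to its cache-only ref:
 * the object will expire at now + gc_delay, so pull the shared timer
 * earlier if this expiry precedes the currently scheduled run.
 * Returns true if the single class timer needs rearming.
 */
static bool note_object_put(struct gc_state *gc, int64_t now)
{
	int64_t expiry = now + gc->gc_delay;

	if (expiry < gc->next_run) {
		gc->next_run = expiry;
		return true;	/* the caller would mod_timer() here */
	}
	return false;		/* an earlier run is already scheduled */
}
```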


Patches 03-06 implement the local endpoint object cache using the new
object manager.  The local endpoint will be responsible for setting up a
connection object for any new incoming call that doesn't yet have one and
for replying to version request packets.

Patches 07-09 implement the peer object cache using the new object manager.
Peer objects will become responsible for handling error reports and MTU
calculation when transport objects are removed.

Patches 10-11 "reclassify" the error report handling as peer event
handling.  This will be overhauled in a later patch to really be driven by
a peer event handler - but the transport object must be removed first.
For the moment, this is just a bit of renaming.

Note that some of these patches are basically renames with Makefile
adjustments or extractions of source into their own files.  In such cases,
the extracted/moved code isn't modified until a later patch, to simplify
git history management.

In the case of mass code extraction, should I copy the file in one commit
(without attaching it to the Makefile), then delete the relevant bits from
both files in the next commit to make the patch easier to review?

The object class implementations are going to end up with two files each:

	<class>-object.c	- Object creation, lookup, management
	<class>-event.c		- Object state machine & event processing


The patches can also be found here:

	http://git.kernel.org/cgit/linux/kernel/git/dhowells/linux-fs.git/log/?h=rxrpc-rewrite

Tagged thusly:

	git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git
	rxrpc-rewrite-20160307

This is based on net-next/master.

David
---
David Howells (11):
      rxrpc: Add a common object cache
      rxrpc: Do procfs lists through objcache
      rxrpc: Separate local endpoint object handling out into its own file
      rxrpc: Implement local endpoint cache
      rxrpc: procfs file to list local endpoints
      rxrpc: Rename ar-local.c to local-event.c
      rxrpc: Rename ar-peer.c to peer-object.c
      rxrpc: Implement peer endpoint cache
      rxrpc: Add /proc/net/rxrpc_peers to display the known remote endpoints
      rxrpc: Rename ar-error.c to peer-event.c
      rxrpc: Rename rxrpc_UDP_error_report() to rxrpc_error_report()


 net/rxrpc/Makefile       |   11 +
 net/rxrpc/af_rxrpc.c     |   17 +
 net/rxrpc/ar-accept.c    |    9 -
 net/rxrpc/ar-connevent.c |    2 
 net/rxrpc/ar-error.c     |  230 ------------------
 net/rxrpc/ar-input.c     |   31 +-
 net/rxrpc/ar-internal.h  |  150 +++++++++---
 net/rxrpc/ar-local.c     |  415 ---------------------------------
 net/rxrpc/ar-peer.c      |  303 ------------------------
 net/rxrpc/ar-transport.c |    2 
 net/rxrpc/local-event.c  |  119 +++++++++
 net/rxrpc/local-object.c |  340 +++++++++++++++++++++++++++
 net/rxrpc/objcache.c     |  581 ++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/objcache.h     |   97 ++++++++
 net/rxrpc/peer-event.c   |  281 ++++++++++++++++++++++
 net/rxrpc/peer-object.c  |  295 +++++++++++++++++++++++
 net/rxrpc/utils.c        |   41 +++
 17 files changed, 1906 insertions(+), 1018 deletions(-)
 delete mode 100644 net/rxrpc/ar-error.c
 delete mode 100644 net/rxrpc/ar-local.c
 delete mode 100644 net/rxrpc/ar-peer.c
 create mode 100644 net/rxrpc/local-event.c
 create mode 100644 net/rxrpc/local-object.c
 create mode 100644 net/rxrpc/objcache.c
 create mode 100644 net/rxrpc/objcache.h
 create mode 100644 net/rxrpc/peer-event.c
 create mode 100644 net/rxrpc/peer-object.c
 create mode 100644 net/rxrpc/utils.c


* [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 18:42   ` David Miller
  2016-03-07 22:45   ` David Howells
  2016-03-07 14:38 ` [PATCH 02/11] rxrpc: Do procfs lists through objcache David Howells
                   ` (9 subsequent siblings)
  10 siblings, 2 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Add a common object cache implementation for RxRPC.  This will be used to
cache objects of various types (calls, connections, local and remote
endpoint records).  Each object to be cached must contain an obj_node
struct for the cache to use; the object's usage count and link pointers
live there, along with other internal metadata.

Each object cache consists of a primary hash to which all objects of that
type must be added and a secondary hash to which objects may also be added
and removed a single time.  Objects are automatically removed from both
hashes when they expire.

Objects start off life with a usage count of 2 - one for the cache and one
for the caller.  When an object's usage count is reduced to 1, it sits in
the cache until its expiry time is reached, at which point the cache
attempts to reduce the count to 0 and, if successful, clean it up.  An
object with a usage count of 1 in the cache can be looked up and have its
usage count increased, thereby stopping the expiry process.
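The count transitions described above can be modelled in userspace with
C11 atomics.  In this sketch (names are illustrative, not the kernel
API), the lookup path mirrors atomic_inc_not_zero() and the collector's
claim mirrors the atomic_cmpxchg() of 1 to 0:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

struct obj {
	atomic_int usage;
};

/* A new object starts at 2: one ref for the cache, one for the caller. */
static void obj_new(struct obj *o)
{
	atomic_store(&o->usage, 2);
}

/* Lookup path: take a ref only if the object isn't already dead.
 * Succeeding on a count-1 cached object is what halts its expiry.
 */
static bool obj_get_maybe(struct obj *o)
{
	int u = atomic_load(&o->usage);

	while (u > 0)
		if (atomic_compare_exchange_weak(&o->usage, &u, u + 1))
			return true;
	return false;
}

/* Drop a ref; at 1 the object just sits in the cache awaiting expiry.
 * Returns the new count.
 */
static int obj_put(struct obj *o)
{
	return atomic_fetch_sub(&o->usage, 1) - 1;
}

/* Garbage collector: try to claim the last (cache) ref.  If the 1->0
 * transition fails, someone looked the object up in the meantime.
 */
static bool gc_try_reap(struct obj *o)
{
	int expect = 1;

	return atomic_compare_exchange_strong(&o->usage, &expect, 0);
}
```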

Objects are looked up, unlinked and destroyed under RCU-safe conditions.

A garbage collector cycles through all the hash buckets in the primary hash
and compares the expiry times of the usage-count-1 objects to the current
time, removing any that have expired.  This is kicked by a single timer for
the whole cache rather than having a timer per object.
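The collector's walk can be pictured as a resumable loop: it keeps a
monotonically increasing cursor bucket, scans a bounded batch, and asks
to be requeued if it hasn't reached its target yet.  A simplified model
(the real scan also inspects usage counts and expiry times; names here
are hypothetical):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct gc_cursor {
	unsigned bucket;	/* next bucket to scan */
	unsigned last_bucket;	/* scan target (bucket + nr_buckets) */
	unsigned nr_buckets;	/* power-of-two table size */
};

/* When GC is kicked, arrange to sweep one full lap of the table. */
static void gc_kick(struct gc_cursor *c)
{
	c->last_bucket = c->bucket + c->nr_buckets;
}

/* Scan up to 'batch' buckets; return true if work remains and the
 * work item should be requeued.  Masking with nr_buckets - 1 wraps
 * the ever-increasing cursor onto real bucket indices.
 */
static bool gc_scan_batch(struct gc_cursor *c, unsigned batch,
			  void (*scan_bucket)(unsigned index))
{
	unsigned n = 0;

	while (c->bucket != c->last_bucket && n < batch) {
		if (scan_bucket)
			scan_bucket(c->bucket & (c->nr_buckets - 1));
		c->bucket++;
		n++;
	}
	return c->bucket != c->last_bucket;
}
```

Bounding the batch is what keeps a single shared work item from
monopolising the workqueue on a large table.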

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile      |    3 
 net/rxrpc/ar-internal.h |   39 ++++
 net/rxrpc/objcache.c    |  477 +++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/objcache.h    |   89 +++++++++
 4 files changed, 607 insertions(+), 1 deletion(-)
 create mode 100644 net/rxrpc/objcache.c
 create mode 100644 net/rxrpc/objcache.h

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index ec126f91276b..b79fee14c763 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -18,7 +18,8 @@ af-rxrpc-y := \
 	ar-recvmsg.o \
 	ar-security.o \
 	ar-skbuff.o \
-	ar-transport.o
+	ar-transport.o \
+	objcache.o
 
 af-rxrpc-$(CONFIG_PROC_FS) += ar-proc.o
 af-rxrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 8b495aed517d..21d6ae6f4cc6 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -9,7 +9,11 @@
  * 2 of the License, or (at your option) any later version.
  */
 
+#include <linux/net.h>
+#include <net/sock.h>
+#include <net/af_rxrpc.h>
 #include <rxrpc/packet.h>
+#include "objcache.h"
 
 #if 0
 #define CHECK_SLAB_OKAY(X)				     \
@@ -467,6 +471,41 @@ extern atomic_t rxrpc_n_skbs;
 extern u32 rxrpc_epoch;
 extern atomic_t rxrpc_debug_id;
 extern struct workqueue_struct *rxrpc_workqueue;
+
+static inline void __rxrpc_queue_obj(struct work_struct *work,
+				     struct objcache *cache,
+				     struct obj_node *obj)
+{
+	/* Pass the caller's ref to the workqueue or drop it if already
+	 * queued.
+	 */
+	if (!queue_work(rxrpc_workqueue, work))
+		objcache_put(cache, obj);
+}
+
+static inline void rxrpc_queue_obj(struct work_struct *work,
+				   struct objcache *cache,
+				   struct obj_node *obj)
+{
+	/* We don't want to queue the work item if the object is dead - but
+	 * whilst we want to avoid calling objcache_put(), we really, really
+	 * want to avoid calling cancel_work_sync() or flush_workqueue().
+	 *
+	 * There is, however, a gap between calling queue_work() and doing
+	 * something conditionally on its result that would allow the work item
+	 * to happen if we get interrupted - so we can't just increment the
+	 * usage count if we queued the work and decrement it in the work func
+	 * as the work func might decrement it *before* we manage to increment
+	 * it here.
+	 *
+	 * So we have to attempt to increment the count before trying the queue
+	 * operation and then correct afterwards if the work was already
+	 * queued.
+	 */
+	if (objcache_get_maybe(obj) &&
+	    !queue_work(rxrpc_workqueue, work))
+		objcache_put(cache, obj);
+}
 
 /*
  * ar-accept.c
diff --git a/net/rxrpc/objcache.c b/net/rxrpc/objcache.c
new file mode 100644
index 000000000000..74eed8ce5894
--- /dev/null
+++ b/net/rxrpc/objcache.c
@@ -0,0 +1,477 @@
+/* Common object cache
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <linux/sched.h>
+#include <linux/hash.h>
+#include "ar-internal.h"
+#include "objcache.h"
+
+static void objcache_gc(struct work_struct *work);
+static void objcache_gc_timer_func(unsigned long _cache);
+
+/**
+ * objcache_init - Initialise working state of an object cache.
+ * @cache: The cache to initialise
+ *
+ * Certain fields must be supplied, notably the method pointers and the log2
+ * cache size in cache->shift.  Also, it is assumed that hash_table[] will be
+ * precleared.
+ *
+ * Note that the hash tables must be at least 2 buckets in size otherwise hash
+ * folding and hash scanning for gc won't work.
+ */
+void objcache_init(struct objcache *cache)
+{
+	_enter("%s", cache->name);
+
+	BUG_ON(!cache->hash_key || !cache->cmp_key ||
+	       !cache->hash_table || cache->nr_buckets <= 1);
+
+	INIT_WORK(&cache->gc_work, objcache_gc);
+	setup_timer(&cache->gc_timer, objcache_gc_timer_func,
+		    (unsigned long)cache);
+	cache->gc_next_run = TIME64_MAX;
+	spin_lock_init(&cache->lock);
+	atomic_set(&cache->count, 0);
+	cache->shift = ilog2(cache->nr_buckets);
+	cache->gc_needed = false;
+
+	if (cache->hash_table_2) {
+		BUG_ON(!cache->hash_key_2 || !cache->cmp_key_2 ||
+		       cache->nr_buckets_2 <= 1);
+		cache->shift_2 = ilog2(cache->nr_buckets_2);
+	}
+}
+
+/*
+ * Reduce the full hash value to the table size.
+ */
+static unsigned objcache_hash_fold(unsigned long full_hash_key, int shift)
+{
+	return hash_32(full_hash_key, shift);
+}
+
+/**
+ * objcache_try_add - Add an object to a hash table if no collision
+ * @cache: Cache to add to
+ * @candidate: Candidate object to add
+ * @key: The key to match
+ *
+ * Add an object to the hashtable if there's not already an equivalent object
+ * present.  Return whichever object ends up in the cache.  A ref is taken on
+ * the object returned.  This function will never fail.
+ */
+struct obj_node *objcache_try_add(struct objcache *cache,
+				  struct obj_node *candidate,
+				  const void *key)
+{
+	struct hlist_head *bucket;
+	struct obj_node *obj, *after;
+	unsigned long full_hash_key = cache->hash_key(key);
+	unsigned hash_key = objcache_hash_fold(full_hash_key, cache->shift);
+	int diff;
+
+	_enter("%s", cache->name);
+
+	/* Objects have a usage count of 1 when lurking in the cache with no
+	 * users, so we need two references - one for the cache and one for the
+	 * caller.
+	 */
+	atomic_set(&candidate->usage, 2);
+	INIT_HLIST_NODE(&candidate->link);
+	INIT_HLIST_NODE(&candidate->link_2);
+	candidate->full_hash_key = full_hash_key;
+	candidate->put_timestamp = TIME64_MAX;
+
+	spin_lock(&cache->lock);
+
+	bucket = cache->hash_table + hash_key;
+	_debug("%lu -> %u -> %p", full_hash_key, hash_key, bucket);
+	if (hlist_empty(bucket)) {
+		hlist_add_head_rcu(&candidate->link, bucket);
+	} else {
+		hlist_for_each_entry(obj, bucket, link) {
+			after = obj;
+			diff = ((obj->full_hash_key - candidate->full_hash_key) ?:
+				cache->cmp_key(obj, key));
+			if (diff < 0)
+				continue;
+			if (diff == 0 && atomic_inc_not_zero(&obj->usage))
+				goto already_present;
+
+			hlist_add_before_rcu(&candidate->link, &obj->link);
+			goto added;
+		}
+
+		hlist_add_behind_rcu(&candidate->link, &after->link);
+	}
+
+added:
+	obj = candidate;
+	atomic_inc(&cache->count);
+
+already_present:
+	spin_unlock(&cache->lock);
+	return obj;
+}
+
+/**
+ * objcache_lookup_rcu - Look up an object using RCU.
+ * @cache: The cache to look in
+ * @key: The key to match
+ *
+ * Look up an object in a cache using RCU.  The caller must hold the RCU read
+ * lock.  If a successful return is made, no adjustment to the object usage
+ * count is made.
+ */
+struct obj_node *objcache_lookup_rcu(struct objcache *cache, const void *key)
+{
+	struct hlist_head *bucket;
+	struct obj_node *obj;
+	unsigned long full_hash_key = cache->hash_key(key);
+	unsigned hash_key = objcache_hash_fold(full_hash_key, cache->shift);
+	int diff;
+
+	_enter("%s", cache->name);
+
+	bucket = cache->hash_table + hash_key;
+	hlist_for_each_entry(obj, bucket, link) {
+		diff = (obj->full_hash_key - full_hash_key) ?:
+			cache->cmp_key(obj, key);
+		if (diff < 0)
+			continue;
+		if (diff == 0 && atomic_read(&obj->usage) >= 1)
+			goto found;
+		break;
+	}
+
+	_leave(" = NULL");
+	return NULL;
+
+found:
+	_leave(" = %p {u=%d}", obj, atomic_read(&obj->usage));
+	return obj;
+}
+
+/**
+ * objcache_add_2 - Add an object to the secondary hash
+ * @cache: Cache to add to
+ * @candidate: Candidate object to add
+ * @key: The key to match
+ * @displace: Whether or not to displace a collision
+ *
+ * Add an object to the secondary hashtable.  The object must already be in the
+ * primary cache.  Doesn't alter the object's usage count.
+ *
+ * If there is no collision with an already cached object, the object will be
+ * added and true will be returned.  If there is a collision, then if @displace
+ * is true, the new object will be placed in front of the old one and true will
+ * be returned, otherwise if @displace is false, no change will be made and
+ * false will be returned.
+ */
+bool objcache_add_2(struct objcache *cache, struct obj_node *candidate,
+		    const void *key, bool displace)
+{
+	struct hlist_head *bucket;
+	struct obj_node *obj, *after;
+	unsigned long full_hash_key = cache->hash_key_2(key);
+	unsigned hash_key = objcache_hash_fold(full_hash_key, cache->shift_2);
+	bool ret;
+	int diff;
+
+	_enter("%s", cache->name);
+
+	BUG_ON(hlist_unhashed(&candidate->link));
+
+	/* We assume that the object is already in the primary cache.
+	 */
+	spin_lock(&cache->lock);
+
+	bucket = cache->hash_table_2 + hash_key;
+	if (hlist_empty(bucket)) {
+		hlist_add_head_rcu(&candidate->link_2, bucket);
+		ret = true;
+	} else {
+		hlist_for_each_entry(obj, bucket, link_2) {
+			after = obj;
+			diff = cache->cmp_key_2(obj, key);
+			if (diff < 0)
+				continue;
+
+			if (diff == 0 && !displace) {
+				ret = false;
+				goto out;
+			}
+
+			/* We add in front of one that has the same parameters,
+			 * effectively displacing that from future lookups.
+			 */
+			hlist_add_before_rcu(&candidate->link_2, &obj->link_2);
+			ret = true;
+			goto out;
+		}
+
+		hlist_add_behind_rcu(&candidate->link_2, &after->link_2);
+		ret = true;
+	}
+
+out:
+	spin_unlock(&cache->lock);
+	return ret;
+}
+
+/**
+ * objcache_del_2 - Remove an object from the secondary cache.
+ */
+void objcache_del_2(struct objcache *cache, struct obj_node *obj)
+{
+	BUG_ON(hlist_unhashed(&obj->link_2));
+
+	spin_lock(&cache->lock);
+	hlist_del_rcu(&obj->link_2);
+	spin_unlock(&cache->lock);
+}
+
+/**
+ * objcache_lookup_rcu_2 - Look up an object using RCU in the secondary cache.
+ * @cache: The cache to look in
+ * @key: The key to match
+ *
+ * Look up an object in a secondary cache using RCU.  The caller must hold the
+ * RCU read lock.  If a successful return is made, no adjustment to the object
+ * usage count is made.
+ */
+struct obj_node *objcache_lookup_rcu_2(struct objcache *cache, const void *key)
+{
+	struct hlist_head *bucket;
+	struct obj_node *obj;
+	unsigned long full_hash_key = cache->hash_key_2(key);
+	unsigned hash_key = objcache_hash_fold(full_hash_key, cache->shift_2);
+	int diff;
+
+	_enter("%s", cache->name);
+
+	bucket = cache->hash_table_2 + hash_key;
+	hlist_for_each_entry(obj, bucket, link_2) {
+		diff = cache->cmp_key_2(obj, key);
+		if (diff < 0)
+			continue;
+		if (diff == 0 && atomic_read(&obj->usage) >= 1)
+			goto found;
+		break;
+	}
+
+	_leave(" = NULL");
+	return NULL;
+
+found:
+	_leave(" = %p {u=%d}", obj, atomic_read(&obj->usage));
+	return obj;
+}
+
+/*
+ * Release a ref on an object that's in the cache.  The object is removed from
+ * the cache some time after it is last put.
+ */
+void objcache_put(struct objcache *cache, struct obj_node *obj)
+{
+	struct timespec64 now;
+	time64_t timestamp;
+	unsigned delay = cache->gc_delay;
+	int usage;
+
+	_enter("%s,%p{u=%d}", cache->name, obj, atomic_read(&obj->usage));
+
+	usage = atomic_read(&obj->usage);
+	if (usage < 2) {
+		pr_err("objcache_put: %s usage underrun (%d)\n",
+		       cache->name, usage);
+		BUG();
+	}
+	BUG_ON(cache->gc_clear_all);
+
+	obj->put_timestamp = TIME64_MAX;
+	usage = atomic_dec_return(&obj->usage);
+	if (usage > 1)
+		return;
+	smp_wmb();
+	now = current_kernel_time64();
+	obj->put_timestamp = timestamp = now.tv_sec;
+
+	if (timestamp + delay < cache->gc_next_run) {
+		cache->gc_next_run = timestamp + delay;
+		mod_timer(&cache->gc_timer, jiffies + delay * HZ);
+	}
+
+	_leave("");
+}
+
+/*
+ * Kick off a cache garbage collection cycle after the last put of an object
+ * plus a delay.
+ */
+static void objcache_gc_timer_func(unsigned long _cache)
+{
+	struct objcache *cache = (struct objcache *)_cache;
+
+	cache->gc_next_run = TIME64_MAX;
+	cache->gc_needed = true;
+	queue_work(system_long_wq, &cache->gc_work);
+}
+
+/**
+ * objcache_obj_rcu_done - Tell the cache that an object got RCU cleaned.
+ * @cache: The cache holding the object
+ *
+ * Tell the cache that an object got cleaned up.
+ */
+void objcache_obj_rcu_done(struct objcache *cache)
+{
+	if (atomic_dec_and_test(&cache->count))
+		wake_up_atomic_t(&cache->count);
+}
+
+/*
+ * Garbage collect a cache
+ */
+static void objcache_gc(struct work_struct *work)
+{
+	struct objcache *cache = container_of(work, struct objcache, gc_work);
+	struct hlist_head *bucket;
+	struct hlist_node *cursor;
+	LIST_HEAD(graveyard);
+	struct obj_node *obj;
+	time64_t now = get_seconds(), next_run = cache->gc_next_run, expiry;
+	unsigned gc_bucket = cache->gc_bucket;
+	int nr_scanned = 0, usage;
+
+	_enter("%s,%u-%u", cache->name, gc_bucket, cache->gc_last_bucket);
+
+	spin_lock(&cache->lock);
+
+	if (cache->gc_needed) {
+		_debug("GC NEEDED");
+		cache->gc_last_bucket = gc_bucket + cache->nr_buckets;
+		cache->gc_needed = false;
+	}
+
+	while (gc_bucket != cache->gc_last_bucket) {
+		unsigned n = gc_bucket & (cache->nr_buckets - 1);
+		bucket = &cache->hash_table[n];
+		hlist_for_each_entry_safe(obj, cursor, bucket, link) {
+			_debug("GC SEES %p %d", obj, atomic_read(&obj->usage));
+			nr_scanned++;
+			usage = atomic_read(&obj->usage);
+			if (usage > 1) {
+				if (cache->gc_clear_all) {
+					pr_err("objcache_gc: %s still in use (%d)\n",
+					       cache->name, usage);
+				}
+				continue;
+			}
+			expiry = obj->put_timestamp + cache->gc_delay;
+			_debug("GC MAYBE %p at %lld", obj, expiry - now);
+			if (expiry > now && !cache->gc_clear_all) {
+				if (expiry < next_run)
+					next_run = expiry;
+				_debug("GC defer");
+				continue;
+			}
+
+			if (atomic_cmpxchg(&obj->usage, 1, 0) != 1) {
+				_debug("GC can't dec");
+				continue;
+			}
+
+			_debug("GC %p", obj);
+			_debug("GC UNLINK %p %p", obj->link.next, obj->link.pprev);
+			hlist_del_rcu(&obj->link);
+			_debug("GC UNLINK %p %p", obj->link_2.next, obj->link_2.pprev);
+			if (!hlist_unhashed(&obj->link_2) &&
+			    obj->link_2.pprev != LIST_POISON2)
+				hlist_del_rcu(&obj->link_2);
+			list_add_tail(&obj->gc_link, &graveyard);
+		}
+
+		gc_bucket++;
+		if (nr_scanned > 20)
+			break;
+	}
+
+	cache->gc_bucket = gc_bucket;
+	if (next_run < cache->gc_next_run)
+		cache->gc_next_run = next_run;
+	spin_unlock(&cache->lock);
+
+	/* We need to wait for each dead object to quiesce before we can start
+	 * the destruction process.
+	 */
+	while (!list_empty(&graveyard)) {
+		obj = list_entry(graveyard.next, struct obj_node, gc_link);
+		list_del(&obj->gc_link);
+		if (cache->prepare_for_gc)
+			cache->prepare_for_gc(obj);
+		call_rcu(&obj->rcu, cache->gc_rcu);
+	}
+
+	if (!cache->gc_clear_all) {
+		now = get_seconds();
+		if (next_run <= now) {
+			_debug("GC NEXT now %lld", next_run - now);
+			cache->gc_next_run = TIME64_MAX;
+			cache->gc_last_bucket = gc_bucket + cache->nr_buckets;
+		} else if (next_run < TIME64_MAX) {
+			mod_timer(&cache->gc_timer,
+				  jiffies + (next_run - now) * HZ);
+			_debug("GC NEXT timer %lld", next_run - now);
+		} else {
+			_debug("GC cease");
+		}
+	}
+
+	if (gc_bucket != cache->gc_last_bucket)
+		queue_work(system_long_wq, &cache->gc_work);
+	_leave("");
+}
+
+/*
+ * wait_on_atomic_t() sleep function for uninterruptible waiting
+ */
+static int objcache_wait_atomic_t(atomic_t *p)
+{
+	schedule();
+	return 0;
+}
+
+/**
+ * objcache_clear - Clear a cache
+ * @cache: The cache to clear
+ *
+ * Preemptively destroy all the objects in a cache rather than waiting for them
+ * to time out.
+ */
+void objcache_clear(struct objcache *cache)
+{
+	_enter("%s", cache->name);
+
+	spin_lock(&cache->lock);
+	cache->gc_clear_all = true;
+	cache->gc_needed = true;
+	spin_unlock(&cache->lock);
+	del_timer_sync(&cache->gc_timer);
+	queue_work(system_long_wq, &cache->gc_work);
+	wait_on_atomic_t(&cache->count, objcache_wait_atomic_t,
+			 TASK_UNINTERRUPTIBLE);
+	flush_work(&cache->gc_work);
+	synchronize_rcu();
+
+	_leave("");
+}
diff --git a/net/rxrpc/objcache.h b/net/rxrpc/objcache.h
new file mode 100644
index 000000000000..770ec924a6d2
--- /dev/null
+++ b/net/rxrpc/objcache.h
@@ -0,0 +1,89 @@
+/* Common object cache definitions
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _OBJCACHE_H
+#define _OBJCACHE_H
+
+#include <linux/rculist.h>
+#include <linux/workqueue.h>
+#include <linux/timer.h>
+#include <linux/seq_file.h>
+
+struct seq_file;
+
+struct obj_node {
+	union {
+		struct rcu_head		rcu;
+		struct list_head	gc_link;
+	};
+	struct hlist_node	link;
+	struct hlist_node	link_2;
+	unsigned long		full_hash_key;
+	time64_t		put_timestamp;
+	atomic_t		usage;
+};
+
+struct objcache {
+	/* Parameters that must be set before initialisation */
+	const char		*name;
+	void (*prepare_for_gc)(struct obj_node *);
+	void (*gc_rcu)(struct rcu_head *);
+
+	unsigned long (*hash_key)(const void *);
+	int (*cmp_key)(const struct obj_node *, const void *);
+	struct hlist_head	*hash_table;
+	unsigned		gc_delay;
+	u16			nr_buckets;
+
+	/* Secondary hash parameters if we want one - also must be set before
+	 * initialisation.  Note that the secondary hash doesn't store its full
+	 * hash key in the obj_node struct.
+	 */
+	u16			nr_buckets_2;
+	struct hlist_head	*hash_table_2;
+	unsigned long (*hash_key_2)(const void *);
+	int (*cmp_key_2)(const struct obj_node *, const void *);
+
+	/* Internal data */
+	spinlock_t		lock;
+	atomic_t		count;
+	u8			shift;
+	u8			shift_2;
+	bool			gc_needed;
+	bool			gc_clear_all;
+	struct work_struct	gc_work;
+	struct timer_list	gc_timer;
+	time64_t		gc_next_run;
+	unsigned		gc_bucket;
+	unsigned		gc_last_bucket;
+};
+
+static inline bool objcache_get_maybe(struct obj_node *obj)
+{
+	return atomic_inc_not_zero(&obj->usage);
+}
+
+static inline void objcache_get(struct obj_node *obj)
+{
+	atomic_inc(&obj->usage);
+}
+
+extern void objcache_init(struct objcache *);
+extern struct obj_node *objcache_try_add(struct objcache *, struct obj_node *, const void *);
+extern struct obj_node *objcache_lookup_rcu(struct objcache *, const void *);
+extern bool objcache_add_2(struct objcache *, struct obj_node *, const void *, bool);
+extern void objcache_del_2(struct objcache *, struct obj_node *);
+extern struct obj_node *objcache_lookup_rcu_2(struct objcache *, const void *);
+extern void objcache_put(struct objcache *, struct obj_node *);
+extern void objcache_obj_rcu_done(struct objcache *);
+extern void objcache_clear(struct objcache *);
+
+#endif /* _OBJCACHE_H */


* [PATCH 02/11] rxrpc: Do procfs lists through objcache
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
  2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 03/11] rxrpc: Separate local endpoint object handling out into its own file David Howells
                   ` (8 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Use the object cache primary hash to provide lists of RxRPC objects
through /proc/net/ for any cache that wants one.  Each user of the cache
just needs to provide a show function in its objcache struct and register
the proc file with objcache_seq_fops as its file operations.
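The iteration underneath packs the bucket number and the intra-bucket
index into the single seq_file position as bucket << 16 | index.  A
sketch of that encoding (helper names are hypothetical):

```c
#include <assert.h>
#include <stdint.h>

/* The seq_file interface gives the iterator one loff_t of state, so
 * the bucket number lives in the high bits and the position within
 * the bucket's hlist in the low 16 bits.
 */
static uint64_t pos_encode(unsigned bucket, unsigned index)
{
	return ((uint64_t)bucket << 16) | (index & 0xffff);
}

static unsigned pos_bucket(uint64_t pos)
{
	return pos >> 16;
}

static unsigned pos_index(uint64_t pos)
{
	return pos & 0xffff;
}

/* Advancing to the next bucket resets the intra-bucket index to 0. */
static uint64_t pos_next_bucket(uint64_t pos)
{
	return pos_encode(pos_bucket(pos) + 1, 0);
}
```

This is why objcache_seq_start() and objcache_seq_next() shift and mask
*_pos rather than treating it as a flat record number.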

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/objcache.c |  104 ++++++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/objcache.h |    8 ++++
 2 files changed, 112 insertions(+)

diff --git a/net/rxrpc/objcache.c b/net/rxrpc/objcache.c
index 74eed8ce5894..e74f8c3c4119 100644
--- a/net/rxrpc/objcache.c
+++ b/net/rxrpc/objcache.c
@@ -11,6 +11,8 @@
 
 #include <linux/sched.h>
 #include <linux/hash.h>
+#include <linux/proc_fs.h>
+#include <linux/seq_file.h>
 #include "ar-internal.h"
 #include "objcache.h"
 
@@ -475,3 +477,105 @@ void objcache_clear(struct objcache *cache)
 
 	_leave("");
 }
+
+/*
+ * Generate a list of cached objects in /proc/net/x
+ */
+static void *objcache_seq_start(struct seq_file *seq, loff_t *_pos)
+	__acquires(rcu)
+{
+	struct objcache *cache = seq->private;
+	struct hlist_head *hash;
+	loff_t pos_l = *_pos;
+	unsigned pos = pos_l, bucket;
+	void *ret;
+
+	if (*_pos > UINT_MAX)
+		return NULL;
+	bucket = pos >> 16;
+	pos &= 0xffff;
+
+	rcu_read_lock();
+
+	do {
+		hash = &cache->hash_table[bucket];
+		if (bucket == 0)
+			ret = seq_hlist_start_head(hash, pos);
+		else
+			ret = seq_hlist_start(hash, pos);
+	} while (!ret && (bucket++,
+			  *_pos = bucket << 16,
+			  bucket < cache->nr_buckets));
+
+	return ret;
+}
+
+static void *objcache_seq_next(struct seq_file *seq, void *v, loff_t *_pos)
+{
+	struct objcache *cache = seq->private;
+	struct hlist_head *hash;
+	unsigned bucket;
+	void *ret;
+
+	if (*_pos > UINT_MAX)
+		return NULL;
+	bucket = *_pos >> 16;
+	hash = &cache->hash_table[bucket];
+	ret = seq_hlist_next(v, hash, _pos);
+	if (ret)
+		return ret;
+
+	while (bucket++,
+	       *_pos = bucket << 16,
+	       bucket < cache->nr_buckets
+	       ) {
+		hash = &cache->hash_table[bucket];
+		ret = seq_hlist_start(hash, 0);
+		if (ret)
+			break;
+	}
+	return ret;
+}
+
+static void objcache_seq_stop(struct seq_file *seq, void *v)
+	__releases(rcu)
+{
+	rcu_read_unlock();
+}
+
+static int objcache_seq_show(struct seq_file *seq, void *v)
+{
+	struct objcache *cache = seq->private;
+	struct obj_node *obj = v;
+
+	return cache->seq_show(seq, obj);
+}
+
+static const struct seq_operations objcache_seq_ops = {
+	.start  = objcache_seq_start,
+	.next   = objcache_seq_next,
+	.stop   = objcache_seq_stop,
+	.show   = objcache_seq_show,
+};
+
+static int objcache_seq_open(struct inode *inode, struct file *file)
+{
+	struct objcache *cache = PDE_DATA(inode);
+	struct seq_file *seq;
+	int ret;
+
+	ret = seq_open(file, &objcache_seq_ops);
+	if (ret == 0) {
+		seq = file->private_data;
+		seq->private = cache;
+	}
+	return ret;
+}
+
+const struct file_operations objcache_seq_fops = {
+	.owner		= THIS_MODULE,
+	.open		= objcache_seq_open,
+	.read		= seq_read,
+	.llseek		= seq_lseek,
+	.release	= seq_release,
+};
diff --git a/net/rxrpc/objcache.h b/net/rxrpc/objcache.h
index 770ec924a6d2..a3799eb4c857 100644
--- a/net/rxrpc/objcache.h
+++ b/net/rxrpc/objcache.h
@@ -52,6 +52,11 @@ struct objcache {
 	unsigned long (*hash_key_2)(const void *);
 	int (*cmp_key_2)(const struct obj_node *, const void *);
 
+	/* If the cache should be visible through /proc, the following
+	 * should be implemented.
+	 */
+	int (*seq_show)(struct seq_file *, void *);
+
 	/* Internal data */
 	spinlock_t		lock;
 	atomic_t		count;
@@ -64,6 +69,7 @@ struct objcache {
 	time64_t		gc_next_run;
 	unsigned		gc_bucket;
 	unsigned		gc_last_bucket;
+	struct seq_operations	seq_ops;
 };
 
 static inline bool objcache_get_maybe(struct obj_node *obj)
@@ -86,4 +92,6 @@ extern void objcache_put(struct objcache *, struct obj_node *);
 extern void objcache_obj_rcu_done(struct objcache *);
 extern void objcache_clear(struct objcache *);
 
+extern const struct file_operations objcache_seq_fops;
+
 #endif /* _OBJCACHE_H */


* [PATCH 03/11] rxrpc: Separate local endpoint object handling out into its own file
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
  2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
  2016-03-07 14:38 ` [PATCH 02/11] rxrpc: Do procfs lists through objcache David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 04/11] rxrpc: Implement local endpoint cache David Howells
                   ` (7 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Separate local endpoint object handling out into its own file preparatory
to overhauling it to use the new object cache.  The original file will then
be used exclusively for the local endpoint packet and event handling.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile       |    1 
 net/rxrpc/ar-internal.h  |   15 +-
 net/rxrpc/ar-local.c     |  299 --------------------------------------------
 net/rxrpc/local-object.c |  316 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 328 insertions(+), 303 deletions(-)
 create mode 100644 net/rxrpc/local-object.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index b79fee14c763..166e4cb3b13c 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -19,6 +19,7 @@ af-rxrpc-y := \
 	ar-security.o \
 	ar-skbuff.o \
 	ar-transport.o \
+	local-object.o \
 	objcache.o
 
 af-rxrpc-$(CONFIG_PROC_FS) += ar-proc.o
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 21d6ae6f4cc6..cec573dbb5e1 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -598,11 +598,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *, struct sk_buff *);
 /*
  * ar-local.c
  */
-extern rwlock_t rxrpc_local_lock;
-
-struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *);
-void rxrpc_put_local(struct rxrpc_local *);
-void __exit rxrpc_destroy_all_locals(void);
+extern void rxrpc_process_local_events(struct work_struct *);
 
 /*
  * ar-key.c
@@ -676,6 +672,15 @@ struct rxrpc_transport *rxrpc_find_transport(struct rxrpc_local *,
 					     struct rxrpc_peer *);
 
 /*
+ * local-object.c
+ */
+extern rwlock_t rxrpc_local_lock;
+
+struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *);
+void rxrpc_put_local(struct rxrpc_local *);
+void __exit rxrpc_destroy_all_locals(void);
+
+/*
  * sysctl.c
  */
 #ifdef CONFIG_SYSCTL
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index 4e1e6db0050b..7060995a4276 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -22,303 +22,6 @@
 
 static const char rxrpc_version_string[65] = "linux-" UTS_RELEASE " AF_RXRPC";
 
-static LIST_HEAD(rxrpc_locals);
-DEFINE_RWLOCK(rxrpc_local_lock);
-static DECLARE_RWSEM(rxrpc_local_sem);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq);
-
-static void rxrpc_destroy_local(struct work_struct *work);
-static void rxrpc_process_local_events(struct work_struct *work);
-
-/*
- * allocate a new local
- */
-static
-struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
-{
-	struct rxrpc_local *local;
-
-	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
-	if (local) {
-		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
-		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
-		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
-		INIT_WORK(&local->event_processor, &rxrpc_process_local_events);
-		INIT_LIST_HEAD(&local->services);
-		INIT_LIST_HEAD(&local->link);
-		init_rwsem(&local->defrag_sem);
-		skb_queue_head_init(&local->accept_queue);
-		skb_queue_head_init(&local->reject_queue);
-		skb_queue_head_init(&local->event_queue);
-		spin_lock_init(&local->lock);
-		rwlock_init(&local->services_lock);
-		atomic_set(&local->usage, 1);
-		local->debug_id = atomic_inc_return(&rxrpc_debug_id);
-		memcpy(&local->srx, srx, sizeof(*srx));
-	}
-
-	_leave(" = %p", local);
-	return local;
-}
-
-/*
- * create the local socket
- * - must be called with rxrpc_local_sem writelocked
- */
-static int rxrpc_create_local(struct rxrpc_local *local)
-{
-	struct sock *sock;
-	int ret, opt;
-
-	_enter("%p{%d}", local, local->srx.transport_type);
-
-	/* create a socket to represent the local endpoint */
-	ret = sock_create_kern(&init_net, PF_INET, local->srx.transport_type,
-			       IPPROTO_UDP, &local->socket);
-	if (ret < 0) {
-		_leave(" = %d [socket]", ret);
-		return ret;
-	}
-
-	/* if a local address was supplied then bind it */
-	if (local->srx.transport_len > sizeof(sa_family_t)) {
-		_debug("bind");
-		ret = kernel_bind(local->socket,
-				  (struct sockaddr *) &local->srx.transport,
-				  local->srx.transport_len);
-		if (ret < 0) {
-			_debug("bind failed");
-			goto error;
-		}
-	}
-
-	/* we want to receive ICMP errors */
-	opt = 1;
-	ret = kernel_setsockopt(local->socket, SOL_IP, IP_RECVERR,
-				(char *) &opt, sizeof(opt));
-	if (ret < 0) {
-		_debug("setsockopt failed");
-		goto error;
-	}
-
-	/* we want to set the don't fragment bit */
-	opt = IP_PMTUDISC_DO;
-	ret = kernel_setsockopt(local->socket, SOL_IP, IP_MTU_DISCOVER,
-				(char *) &opt, sizeof(opt));
-	if (ret < 0) {
-		_debug("setsockopt failed");
-		goto error;
-	}
-
-	write_lock_bh(&rxrpc_local_lock);
-	list_add(&local->link, &rxrpc_locals);
-	write_unlock_bh(&rxrpc_local_lock);
-
-	/* set the socket up */
-	sock = local->socket->sk;
-	sock->sk_user_data	= local;
-	sock->sk_data_ready	= rxrpc_data_ready;
-	sock->sk_error_report	= rxrpc_UDP_error_report;
-	_leave(" = 0");
-	return 0;
-
-error:
-	kernel_sock_shutdown(local->socket, SHUT_RDWR);
-	local->socket->sk->sk_user_data = NULL;
-	sock_release(local->socket);
-	local->socket = NULL;
-
-	_leave(" = %d", ret);
-	return ret;
-}
-
-/*
- * create a new local endpoint using the specified UDP address
- */
-struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
-{
-	struct rxrpc_local *local;
-	int ret;
-
-	_enter("{%d,%u,%pI4+%hu}",
-	       srx->transport_type,
-	       srx->transport.family,
-	       &srx->transport.sin.sin_addr,
-	       ntohs(srx->transport.sin.sin_port));
-
-	down_write(&rxrpc_local_sem);
-
-	/* see if we have a suitable local local endpoint already */
-	read_lock_bh(&rxrpc_local_lock);
-
-	list_for_each_entry(local, &rxrpc_locals, link) {
-		_debug("CMP {%d,%u,%pI4+%hu}",
-		       local->srx.transport_type,
-		       local->srx.transport.family,
-		       &local->srx.transport.sin.sin_addr,
-		       ntohs(local->srx.transport.sin.sin_port));
-
-		if (local->srx.transport_type != srx->transport_type ||
-		    local->srx.transport.family != srx->transport.family)
-			continue;
-
-		switch (srx->transport.family) {
-		case AF_INET:
-			if (local->srx.transport.sin.sin_port !=
-			    srx->transport.sin.sin_port)
-				continue;
-			if (memcmp(&local->srx.transport.sin.sin_addr,
-				   &srx->transport.sin.sin_addr,
-				   sizeof(struct in_addr)) != 0)
-				continue;
-			goto found_local;
-
-		default:
-			BUG();
-		}
-	}
-
-	read_unlock_bh(&rxrpc_local_lock);
-
-	/* we didn't find one, so we need to create one */
-	local = rxrpc_alloc_local(srx);
-	if (!local) {
-		up_write(&rxrpc_local_sem);
-		return ERR_PTR(-ENOMEM);
-	}
-
-	ret = rxrpc_create_local(local);
-	if (ret < 0) {
-		up_write(&rxrpc_local_sem);
-		kfree(local);
-		_leave(" = %d", ret);
-		return ERR_PTR(ret);
-	}
-
-	up_write(&rxrpc_local_sem);
-
-	_net("LOCAL new %d {%d,%u,%pI4+%hu}",
-	     local->debug_id,
-	     local->srx.transport_type,
-	     local->srx.transport.family,
-	     &local->srx.transport.sin.sin_addr,
-	     ntohs(local->srx.transport.sin.sin_port));
-
-	_leave(" = %p [new]", local);
-	return local;
-
-found_local:
-	rxrpc_get_local(local);
-	read_unlock_bh(&rxrpc_local_lock);
-	up_write(&rxrpc_local_sem);
-
-	_net("LOCAL old %d {%d,%u,%pI4+%hu}",
-	     local->debug_id,
-	     local->srx.transport_type,
-	     local->srx.transport.family,
-	     &local->srx.transport.sin.sin_addr,
-	     ntohs(local->srx.transport.sin.sin_port));
-
-	_leave(" = %p [reuse]", local);
-	return local;
-}
-
-/*
- * release a local endpoint
- */
-void rxrpc_put_local(struct rxrpc_local *local)
-{
-	_enter("%p{u=%d}", local, atomic_read(&local->usage));
-
-	ASSERTCMP(atomic_read(&local->usage), >, 0);
-
-	/* to prevent a race, the decrement and the dequeue must be effectively
-	 * atomic */
-	write_lock_bh(&rxrpc_local_lock);
-	if (unlikely(atomic_dec_and_test(&local->usage))) {
-		_debug("destroy local");
-		rxrpc_queue_work(&local->destroyer);
-	}
-	write_unlock_bh(&rxrpc_local_lock);
-	_leave("");
-}
-
-/*
- * destroy a local endpoint
- */
-static void rxrpc_destroy_local(struct work_struct *work)
-{
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, destroyer);
-
-	_enter("%p{%d}", local, atomic_read(&local->usage));
-
-	down_write(&rxrpc_local_sem);
-
-	write_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0) {
-		write_unlock_bh(&rxrpc_local_lock);
-		up_read(&rxrpc_local_sem);
-		_leave(" [resurrected]");
-		return;
-	}
-
-	list_del(&local->link);
-	local->socket->sk->sk_user_data = NULL;
-	write_unlock_bh(&rxrpc_local_lock);
-
-	downgrade_write(&rxrpc_local_sem);
-
-	ASSERT(list_empty(&local->services));
-	ASSERT(!work_pending(&local->acceptor));
-	ASSERT(!work_pending(&local->rejecter));
-	ASSERT(!work_pending(&local->event_processor));
-
-	/* finish cleaning up the local descriptor */
-	rxrpc_purge_queue(&local->accept_queue);
-	rxrpc_purge_queue(&local->reject_queue);
-	rxrpc_purge_queue(&local->event_queue);
-	kernel_sock_shutdown(local->socket, SHUT_RDWR);
-	sock_release(local->socket);
-
-	up_read(&rxrpc_local_sem);
-
-	_net("DESTROY LOCAL %d", local->debug_id);
-	kfree(local);
-
-	if (list_empty(&rxrpc_locals))
-		wake_up_all(&rxrpc_local_wq);
-
-	_leave("");
-}
-
-/*
- * preemptively destroy all local local endpoint rather than waiting for
- * them to be destroyed
- */
-void __exit rxrpc_destroy_all_locals(void)
-{
-	DECLARE_WAITQUEUE(myself,current);
-
-	_enter("");
-
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_locals)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_local_wq, &myself);
-
-		while (!list_empty(&rxrpc_locals)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&rxrpc_local_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
-	_leave("");
-}
-
 /*
  * Reply to a version request
  */
@@ -377,7 +80,7 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
 /*
  * Process event packets targetted at a local endpoint.
  */
-static void rxrpc_process_local_events(struct work_struct *work)
+void rxrpc_process_local_events(struct work_struct *work)
 {
 	struct rxrpc_local *local = container_of(work, struct rxrpc_local, event_processor);
 	struct sk_buff *skb;
diff --git a/net/rxrpc/local-object.c b/net/rxrpc/local-object.c
new file mode 100644
index 000000000000..1dc701dbc715
--- /dev/null
+++ b/net/rxrpc/local-object.c
@@ -0,0 +1,316 @@
+/* Local endpoint object management
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/skbuff.h>
+#include <linux/slab.h>
+#include <linux/udp.h>
+#include <linux/ip.h>
+#include <net/sock.h>
+#include <net/af_rxrpc.h>
+#include "ar-internal.h"
+
+static LIST_HEAD(rxrpc_locals);
+DEFINE_RWLOCK(rxrpc_local_lock);
+static DECLARE_RWSEM(rxrpc_local_sem);
+static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq);
+
+static void rxrpc_destroy_local(struct work_struct *work);
+
+/*
+ * allocate a new local
+ */
+static
+struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
+{
+	struct rxrpc_local *local;
+
+	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
+	if (local) {
+		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
+		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
+		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
+		INIT_WORK(&local->event_processor, &rxrpc_process_local_events);
+		INIT_LIST_HEAD(&local->services);
+		INIT_LIST_HEAD(&local->link);
+		init_rwsem(&local->defrag_sem);
+		skb_queue_head_init(&local->accept_queue);
+		skb_queue_head_init(&local->reject_queue);
+		skb_queue_head_init(&local->event_queue);
+		spin_lock_init(&local->lock);
+		rwlock_init(&local->services_lock);
+		atomic_set(&local->usage, 1);
+		local->debug_id = atomic_inc_return(&rxrpc_debug_id);
+		memcpy(&local->srx, srx, sizeof(*srx));
+	}
+
+	_leave(" = %p", local);
+	return local;
+}
+
+/*
+ * create the local socket
+ * - must be called with rxrpc_local_sem writelocked
+ */
+static int rxrpc_create_local(struct rxrpc_local *local)
+{
+	struct sock *sock;
+	int ret, opt;
+
+	_enter("%p{%d}", local, local->srx.transport_type);
+
+	/* create a socket to represent the local endpoint */
+	ret = sock_create_kern(&init_net, PF_INET, local->srx.transport_type,
+			       IPPROTO_UDP, &local->socket);
+	if (ret < 0) {
+		_leave(" = %d [socket]", ret);
+		return ret;
+	}
+
+	/* if a local address was supplied then bind it */
+	if (local->srx.transport_len > sizeof(sa_family_t)) {
+		_debug("bind");
+		ret = kernel_bind(local->socket,
+				  (struct sockaddr *) &local->srx.transport,
+				  local->srx.transport_len);
+		if (ret < 0) {
+			_debug("bind failed");
+			goto error;
+		}
+	}
+
+	/* we want to receive ICMP errors */
+	opt = 1;
+	ret = kernel_setsockopt(local->socket, SOL_IP, IP_RECVERR,
+				(char *) &opt, sizeof(opt));
+	if (ret < 0) {
+		_debug("setsockopt failed");
+		goto error;
+	}
+
+	/* we want to set the don't fragment bit */
+	opt = IP_PMTUDISC_DO;
+	ret = kernel_setsockopt(local->socket, SOL_IP, IP_MTU_DISCOVER,
+				(char *) &opt, sizeof(opt));
+	if (ret < 0) {
+		_debug("setsockopt failed");
+		goto error;
+	}
+
+	write_lock_bh(&rxrpc_local_lock);
+	list_add(&local->link, &rxrpc_locals);
+	write_unlock_bh(&rxrpc_local_lock);
+
+	/* set the socket up */
+	sock = local->socket->sk;
+	sock->sk_user_data	= local;
+	sock->sk_data_ready	= rxrpc_data_ready;
+	sock->sk_error_report	= rxrpc_UDP_error_report;
+	_leave(" = 0");
+	return 0;
+
+error:
+	kernel_sock_shutdown(local->socket, SHUT_RDWR);
+	local->socket->sk->sk_user_data = NULL;
+	sock_release(local->socket);
+	local->socket = NULL;
+
+	_leave(" = %d", ret);
+	return ret;
+}
+
+/*
+ * create a new local endpoint using the specified UDP address
+ */
+struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
+{
+	struct rxrpc_local *local;
+	int ret;
+
+	_enter("{%d,%u,%pI4+%hu}",
+	       srx->transport_type,
+	       srx->transport.family,
+	       &srx->transport.sin.sin_addr,
+	       ntohs(srx->transport.sin.sin_port));
+
+	down_write(&rxrpc_local_sem);
+
+	/* see if we have a suitable local endpoint already */
+	read_lock_bh(&rxrpc_local_lock);
+
+	list_for_each_entry(local, &rxrpc_locals, link) {
+		_debug("CMP {%d,%u,%pI4+%hu}",
+		       local->srx.transport_type,
+		       local->srx.transport.family,
+		       &local->srx.transport.sin.sin_addr,
+		       ntohs(local->srx.transport.sin.sin_port));
+
+		if (local->srx.transport_type != srx->transport_type ||
+		    local->srx.transport.family != srx->transport.family)
+			continue;
+
+		switch (srx->transport.family) {
+		case AF_INET:
+			if (local->srx.transport.sin.sin_port !=
+			    srx->transport.sin.sin_port)
+				continue;
+			if (memcmp(&local->srx.transport.sin.sin_addr,
+				   &srx->transport.sin.sin_addr,
+				   sizeof(struct in_addr)) != 0)
+				continue;
+			goto found_local;
+
+		default:
+			BUG();
+		}
+	}
+
+	read_unlock_bh(&rxrpc_local_lock);
+
+	/* we didn't find one, so we need to create one */
+	local = rxrpc_alloc_local(srx);
+	if (!local) {
+		up_write(&rxrpc_local_sem);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	ret = rxrpc_create_local(local);
+	if (ret < 0) {
+		up_write(&rxrpc_local_sem);
+		kfree(local);
+		_leave(" = %d", ret);
+		return ERR_PTR(ret);
+	}
+
+	up_write(&rxrpc_local_sem);
+
+	_net("LOCAL new %d {%d,%u,%pI4+%hu}",
+	     local->debug_id,
+	     local->srx.transport_type,
+	     local->srx.transport.family,
+	     &local->srx.transport.sin.sin_addr,
+	     ntohs(local->srx.transport.sin.sin_port));
+
+	_leave(" = %p [new]", local);
+	return local;
+
+found_local:
+	rxrpc_get_local(local);
+	read_unlock_bh(&rxrpc_local_lock);
+	up_write(&rxrpc_local_sem);
+
+	_net("LOCAL old %d {%d,%u,%pI4+%hu}",
+	     local->debug_id,
+	     local->srx.transport_type,
+	     local->srx.transport.family,
+	     &local->srx.transport.sin.sin_addr,
+	     ntohs(local->srx.transport.sin.sin_port));
+
+	_leave(" = %p [reuse]", local);
+	return local;
+}
+
+/*
+ * release a local endpoint
+ */
+void rxrpc_put_local(struct rxrpc_local *local)
+{
+	_enter("%p{u=%d}", local, atomic_read(&local->usage));
+
+	ASSERTCMP(atomic_read(&local->usage), >, 0);
+
+	/* to prevent a race, the decrement and the dequeue must be effectively
+	 * atomic */
+	write_lock_bh(&rxrpc_local_lock);
+	if (unlikely(atomic_dec_and_test(&local->usage))) {
+		_debug("destroy local");
+		rxrpc_queue_work(&local->destroyer);
+	}
+	write_unlock_bh(&rxrpc_local_lock);
+	_leave("");
+}
+
+/*
+ * destroy a local endpoint
+ */
+static void rxrpc_destroy_local(struct work_struct *work)
+{
+	struct rxrpc_local *local =
+		container_of(work, struct rxrpc_local, destroyer);
+
+	_enter("%p{%d}", local, atomic_read(&local->usage));
+
+	down_write(&rxrpc_local_sem);
+
+	write_lock_bh(&rxrpc_local_lock);
+	if (atomic_read(&local->usage) > 0) {
+		write_unlock_bh(&rxrpc_local_lock);
+		up_read(&rxrpc_local_sem);
+		_leave(" [resurrected]");
+		return;
+	}
+
+	list_del(&local->link);
+	local->socket->sk->sk_user_data = NULL;
+	write_unlock_bh(&rxrpc_local_lock);
+
+	downgrade_write(&rxrpc_local_sem);
+
+	ASSERT(list_empty(&local->services));
+	ASSERT(!work_pending(&local->acceptor));
+	ASSERT(!work_pending(&local->rejecter));
+	ASSERT(!work_pending(&local->event_processor));
+
+	/* finish cleaning up the local descriptor */
+	rxrpc_purge_queue(&local->accept_queue);
+	rxrpc_purge_queue(&local->reject_queue);
+	rxrpc_purge_queue(&local->event_queue);
+	kernel_sock_shutdown(local->socket, SHUT_RDWR);
+	sock_release(local->socket);
+
+	up_read(&rxrpc_local_sem);
+
+	_net("DESTROY LOCAL %d", local->debug_id);
+	kfree(local);
+
+	if (list_empty(&rxrpc_locals))
+		wake_up_all(&rxrpc_local_wq);
+
+	_leave("");
+}
+
+/*
+ * preemptively destroy all local endpoints rather than waiting for
+ * them to be destroyed
+ */
+void __exit rxrpc_destroy_all_locals(void)
+{
+	DECLARE_WAITQUEUE(myself,current);
+
+	_enter("");
+
+	/* we simply have to wait for them to go away */
+	if (!list_empty(&rxrpc_locals)) {
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		add_wait_queue(&rxrpc_local_wq, &myself);
+
+		while (!list_empty(&rxrpc_locals)) {
+			schedule();
+			set_current_state(TASK_UNINTERRUPTIBLE);
+		}
+
+		remove_wait_queue(&rxrpc_local_wq, &myself);
+		set_current_state(TASK_RUNNING);
+	}
+
+	_leave("");
+}


* [PATCH 04/11] rxrpc: Implement local endpoint cache
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (2 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 03/11] rxrpc: Separate local endpoint object handling out into its own file David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 05/11] rxrpc: procfs file to list local endpoints David Howells
                   ` (6 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Implement the local RxRPC endpoint cache.  Only the primary cache is used.
This is indexed on the following details:

  - Local network transport family - currently only AF_INET.
  - Local network transport type - currently only UDP.
  - Local network transport address.

The hash isn't very big since we don't expect to have many local endpoints
hanging around - RxRPC sockets opened with a 0 service ID (ie. client-only
sockets) share local endpoints if they have matching local network
addresses (typically all zeros).

We use a mutex to handle lookups and don't provide RCU-only lookups since
we only expect write access to this cache to be done from process context
when opening a socket.  The local endpoint object is pointed to by the
transport socket's sk_user_data for the life of the transport socket so
that it's fast to access by the transport socket sk_data_ready and
sk_error_report callbacks.

Further, the transport socket is shut down before we clear the sk_user_data
pointer, so that we can be sure that the transport socket's callbacks won't
be invoked once the RCU destruction is scheduled.


The local endpoint retains the transport socket that we use to send and
receive packets and to capture network error messages (ICMP).  The socket is
opened when an endpoint is looked up, if it doesn't already exist.

Note that to make this work, we have to get rid of rxrpc_local_lock, as that
lock permits a potential deadlock between a softirq looking in an object
cache whilst holding the lock and objcache_clear() taking the cache lock and
then being interrupted.

However, since the socket is locked by the caller of the rxrpc_data_ready()
function and given that we don't clear sk_user_data until after we've shut
down the socket, we are guaranteed that the local endpoint struct is pinned
until rxrpc_data_ready() returns - so we don't need to lock the local
endpoint struct there.

The other places we've taken the lock where we read the usage count and
then increment it if not zero can be replaced by atomic_inc_not_zero()
(hidden inside rxrpc_get_local_maybe()).

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/af_rxrpc.c     |    5 +
 net/rxrpc/ar-accept.c    |    7 -
 net/rxrpc/ar-connevent.c |    2 
 net/rxrpc/ar-input.c     |   18 +--
 net/rxrpc/ar-internal.h  |   41 +++---
 net/rxrpc/ar-local.c     |    5 -
 net/rxrpc/local-object.c |  311 +++++++++++++++++++++++-----------------------
 7 files changed, 194 insertions(+), 195 deletions(-)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index a76501757b59..a27d8e3ef854 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -791,6 +791,8 @@ static int __init af_rxrpc_init(void)
 
 	rxrpc_epoch = get_seconds();
 
+	objcache_init(&rxrpc_local_cache);
+
 	ret = -ENOMEM;
 	rxrpc_call_jar = kmem_cache_create(
 		"rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
@@ -856,6 +858,7 @@ error_proto:
 error_work_queue:
 	kmem_cache_destroy(rxrpc_call_jar);
 error_call_jar:
+	objcache_clear(&rxrpc_local_cache);
 	return ret;
 }
 
@@ -874,7 +877,7 @@ static void __exit af_rxrpc_exit(void)
 	rxrpc_destroy_all_connections();
 	rxrpc_destroy_all_transports();
 	rxrpc_destroy_all_peers();
-	rxrpc_destroy_all_locals();
+	objcache_clear(&rxrpc_local_cache);
 
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
 
diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index 277731a5e67a..d43799f8d3ef 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -213,12 +213,7 @@ void rxrpc_accept_incoming_calls(struct work_struct *work)
 
 	_enter("%d", local->debug_id);
 
-	read_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
+	local = rxrpc_get_local_maybe(local);
 	if (!local) {
 		_leave(" [local dead]");
 		return;
diff --git a/net/rxrpc/ar-connevent.c b/net/rxrpc/ar-connevent.c
index 1bdaaed8cdc4..74ad0d24faad 100644
--- a/net/rxrpc/ar-connevent.c
+++ b/net/rxrpc/ar-connevent.c
@@ -317,7 +317,7 @@ void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
 	CHECK_SLAB_OKAY(&local->usage);
 
-	if (!atomic_inc_not_zero(&local->usage)) {
+	if (!rxrpc_get_local_maybe(local)) {
 		printk("resurrected on reject\n");
 		BUG();
 	}
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 63ed75c40e29..514bfdaba322 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -598,9 +598,9 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
 {
 	_enter("%p,%p", local, skb);
 
-	atomic_inc(&local->usage);
+	rxrpc_get_local(local);
 	skb_queue_tail(&local->event_queue, skb);
-	rxrpc_queue_work(&local->event_processor);
+	rxrpc_queue_work(&local->processor);
 }
 
 /*
@@ -675,13 +675,13 @@ void rxrpc_data_ready(struct sock *sk)
 
 	ASSERT(!irqs_disabled());
 
-	read_lock_bh(&rxrpc_local_lock);
-	local = sk->sk_user_data;
-	if (local && atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
+	/* The socket is locked by the caller and this prevents the socket from
+	 * being shut down, thus preventing sk_user_data from being cleared
+	 * until this function returns.  The local endpoint may, however, be in
+	 * the process of being discarded from the cache, so we still need to
+	 * validate it.
+	 */
+	local = rxrpc_get_local_maybe(sk->sk_user_data);
 	if (!local) {
 		_leave(" [local dead]");
 		return;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index cec573dbb5e1..ceb1442f745b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -167,24 +167,24 @@ struct rxrpc_security {
 };
 
 /*
- * RxRPC local transport endpoint definition
- * - matched by local port, address and protocol type
+ * RxRPC local transport endpoint description
+ * - owned by a single AF_RXRPC socket
+ * - pointed to by transport socket struct sk_user_data
  */
 struct rxrpc_local {
+	struct obj_node		obj;
 	struct socket		*socket;	/* my UDP socket */
-	struct work_struct	destroyer;	/* endpoint destroyer */
 	struct work_struct	acceptor;	/* incoming call processor */
 	struct work_struct	rejecter;	/* packet reject writer */
-	struct work_struct	event_processor; /* endpoint event processor */
+	struct work_struct	processor;	/* endpoint packet processor */
 	struct list_head	services;	/* services listening on this endpoint */
-	struct list_head	link;		/* link in endpoint list */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 	struct sk_buff_head	accept_queue;	/* incoming calls awaiting acceptance */
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
+	struct mutex		conn_lock;	/* Client connection creation lock */
 	spinlock_t		lock;		/* access lock */
 	rwlock_t		services_lock;	/* lock for services list */
-	atomic_t		usage;
 	int			debug_id;	/* debug ID for printks */
 	volatile char		error_rcvd;	/* T if received ICMP error outstanding */
 	struct sockaddr_rxrpc	srx;		/* local address */
@@ -674,11 +674,25 @@ struct rxrpc_transport *rxrpc_find_transport(struct rxrpc_local *,
 /*
  * local-object.c
  */
-extern rwlock_t rxrpc_local_lock;
+extern struct objcache rxrpc_local_cache;
 
 struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *);
-void rxrpc_put_local(struct rxrpc_local *);
-void __exit rxrpc_destroy_all_locals(void);
+
+static inline void rxrpc_get_local(struct rxrpc_local *local)
+{
+	objcache_get(&local->obj);
+}
+
+static inline
+struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
+{
+	return objcache_get_maybe(&local->obj) ? local : NULL;
+}
+
+static inline void rxrpc_put_local(struct rxrpc_local *local)
+{
+	objcache_put(&rxrpc_local_cache, &local->obj);
+}
 
 /*
  * sysctl.c
@@ -866,15 +880,6 @@ static inline void rxrpc_purge_queue(struct sk_buff_head *list)
 		rxrpc_free_skb(skb);
 }
 
-static inline void __rxrpc_get_local(struct rxrpc_local *local, const char *f)
-{
-	CHECK_SLAB_OKAY(&local->usage);
-	if (atomic_inc_return(&local->usage) == 1)
-		printk("resurrected (%s)\n", f);
-}
-
-#define rxrpc_get_local(LOCAL) __rxrpc_get_local((LOCAL), __func__)
-
 #define rxrpc_get_call(CALL)				\
 do {							\
 	CHECK_SLAB_OKAY(&(CALL)->usage);		\
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index 7060995a4276..6ab0e9bfdbe8 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -82,13 +82,14 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
  */
 void rxrpc_process_local_events(struct work_struct *work)
 {
-	struct rxrpc_local *local = container_of(work, struct rxrpc_local, event_processor);
+	struct rxrpc_local *local =
+		container_of(work, struct rxrpc_local, processor);
 	struct sk_buff *skb;
 	char v;
 
 	_enter("");
 
-	atomic_inc(&local->usage);
+	rxrpc_get_local(local);
 	
 	while ((skb = skb_dequeue(&local->event_queue))) {
 		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
diff --git a/net/rxrpc/local-object.c b/net/rxrpc/local-object.c
index 1dc701dbc715..4f44e86e70fe 100644
--- a/net/rxrpc/local-object.c
+++ b/net/rxrpc/local-object.c
@@ -19,38 +19,115 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
-static LIST_HEAD(rxrpc_locals);
-DEFINE_RWLOCK(rxrpc_local_lock);
-static DECLARE_RWSEM(rxrpc_local_sem);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq);
+static void rxrpc_local_prepare_for_gc(struct obj_node *);
+static void rxrpc_local_gc_rcu(struct rcu_head *);
+static unsigned long rxrpc_local_hash_key(const void *);
+static int rxrpc_local_cmp_key(const struct obj_node *, const void *);
+
+static DEFINE_MUTEX(rxrpc_local_mutex);
+static struct hlist_head rxrpc_local_cache_hash[16];
+
+struct objcache rxrpc_local_cache = {
+	.name		= "locals",
+	.prepare_for_gc	= rxrpc_local_prepare_for_gc,
+	.gc_rcu		= rxrpc_local_gc_rcu,
+	.hash_key	= rxrpc_local_hash_key,
+	.cmp_key	= rxrpc_local_cmp_key,
+	.hash_table	= rxrpc_local_cache_hash,
+	.gc_delay	= 2,
+	.nr_buckets	= ARRAY_SIZE(rxrpc_local_cache_hash),
+};
 
-static void rxrpc_destroy_local(struct work_struct *work);
+/*
+ * Hash a local key.
+ */
+static unsigned long rxrpc_local_hash_key(const void *_srx)
+{
+	const struct sockaddr_rxrpc *srx = _srx;
+	const u16 *p;
+	unsigned int i, size;
+	unsigned long hash_key;
+
+	_enter("%u", srx->transport.family);
+
+	hash_key = srx->transport_type;
+	hash_key += srx->transport_len;
+	hash_key += srx->transport.family;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		hash_key += (u16 __force)srx->transport.sin.sin_port;
+		size = sizeof(srx->transport.sin.sin_addr);
+		p = (u16 *)&srx->transport.sin.sin_addr;
+		break;
+	default:
+		BUG();
+	}
+
+	/* Step through the local address in 16-bit portions for speed */
+	for (i = 0; i < size; i += sizeof(*p), p++)
+		hash_key += *p;
+
+	_leave(" = 0x%lx", hash_key);
+	return hash_key;
+}
 
 /*
- * allocate a new local
+ * Compare a local to a key.  Return -ve, 0 or +ve to indicate less than, same
+ * or greater than.
  */
-static
-struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
+static int rxrpc_local_cmp_key(const struct obj_node *obj, const void *_srx)
+{
+	const struct rxrpc_local *local =
+		container_of(obj, struct rxrpc_local, obj);
+	const struct sockaddr_rxrpc *srx = _srx;
+	int diff;
+
+	diff = ((local->srx.transport_type - srx->transport_type) ?:
+		(local->srx.transport_len - srx->transport_len) ?:
+		(local->srx.transport.family - srx->transport.family));
+	if (diff != 0)
+		return diff;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		/* If the choice of UDP port is left up to the transport, then
+		 * the endpoint record doesn't match.
+		 */
+		return ((u16 __force)local->srx.transport.sin.sin_port -
+			(u16 __force)srx->transport.sin.sin_port) ?:
+			memcmp(&local->srx.transport.sin.sin_addr,
+			       &srx->transport.sin.sin_addr,
+			       sizeof(struct in_addr));
+	default:
+		BUG();
+	}
+}
+
+/*
+ * Allocate a new local endpoint.  This is service ID independent but rather
+ * defines a specific transport endpoint.
+ */
+static struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_local *local;
 
 	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
 	if (local) {
-		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
 		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
 		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
-		INIT_WORK(&local->event_processor, &rxrpc_process_local_events);
+		INIT_WORK(&local->processor, &rxrpc_process_local_events);
 		INIT_LIST_HEAD(&local->services);
-		INIT_LIST_HEAD(&local->link);
 		init_rwsem(&local->defrag_sem);
 		skb_queue_head_init(&local->accept_queue);
 		skb_queue_head_init(&local->reject_queue);
 		skb_queue_head_init(&local->event_queue);
+		mutex_init(&local->conn_lock);
 		spin_lock_init(&local->lock);
 		rwlock_init(&local->services_lock);
-		atomic_set(&local->usage, 1);
 		local->debug_id = atomic_inc_return(&rxrpc_debug_id);
 		memcpy(&local->srx, srx, sizeof(*srx));
+		local->srx.srx_service = 0;
 	}
 
 	_leave(" = %p", local);
@@ -59,9 +136,9 @@ struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
 
 /*
  * create the local socket
- * - must be called with rxrpc_local_sem writelocked
+ * - must be called with rxrpc_local_mutex locked
  */
-static int rxrpc_create_local(struct rxrpc_local *local)
+static int rxrpc_open_socket(struct rxrpc_local *local)
 {
 	struct sock *sock;
 	int ret, opt;
@@ -80,10 +157,10 @@ static int rxrpc_create_local(struct rxrpc_local *local)
 	if (local->srx.transport_len > sizeof(sa_family_t)) {
 		_debug("bind");
 		ret = kernel_bind(local->socket,
-				  (struct sockaddr *) &local->srx.transport,
+				  (struct sockaddr *)&local->srx.transport,
 				  local->srx.transport_len);
 		if (ret < 0) {
-			_debug("bind failed");
+			_debug("bind failed %d", ret);
 			goto error;
 		}
 	}
@@ -106,10 +183,6 @@ static int rxrpc_create_local(struct rxrpc_local *local)
 		goto error;
 	}
 
-	write_lock_bh(&rxrpc_local_lock);
-	list_add(&local->link, &rxrpc_locals);
-	write_unlock_bh(&rxrpc_local_lock);
-
 	/* set the socket up */
 	sock = local->socket->sk;
 	sock->sk_user_data	= local;
@@ -129,71 +202,53 @@ error:
 }
 
 /*
- * create a new local endpoint using the specified UDP address
+ * Look up or create a new local endpoint using the specified address.
  */
 struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_local *local;
+	struct obj_node *obj;
+	const char *new;
 	int ret;
 
-	_enter("{%d,%u,%pI4+%hu}",
-	       srx->transport_type,
-	       srx->transport.family,
-	       &srx->transport.sin.sin_addr,
-	       ntohs(srx->transport.sin.sin_port));
-
-	down_write(&rxrpc_local_sem);
-
-	/* see if we have a suitable local local endpoint already */
-	read_lock_bh(&rxrpc_local_lock);
-
-	list_for_each_entry(local, &rxrpc_locals, link) {
-		_debug("CMP {%d,%u,%pI4+%hu}",
-		       local->srx.transport_type,
-		       local->srx.transport.family,
-		       &local->srx.transport.sin.sin_addr,
-		       ntohs(local->srx.transport.sin.sin_port));
-
-		if (local->srx.transport_type != srx->transport_type ||
-		    local->srx.transport.family != srx->transport.family)
-			continue;
-
-		switch (srx->transport.family) {
-		case AF_INET:
-			if (local->srx.transport.sin.sin_port !=
-			    srx->transport.sin.sin_port)
-				continue;
-			if (memcmp(&local->srx.transport.sin.sin_addr,
-				   &srx->transport.sin.sin_addr,
-				   sizeof(struct in_addr)) != 0)
-				continue;
-			goto found_local;
-
-		default:
-			BUG();
-		}
+	if (srx->transport.family == AF_INET) {
+		_enter("{%d,%u,%pI4+%hu}",
+		       srx->transport_type,
+		       srx->transport.family,
+		       &srx->transport.sin.sin_addr,
+		       ntohs(srx->transport.sin.sin_port));
+	} else {
+		_enter("{%d,%u}",
+		       srx->transport_type,
+		       srx->transport.family);
+		return ERR_PTR(-EAFNOSUPPORT);
 	}
 
-	read_unlock_bh(&rxrpc_local_lock);
-
-	/* we didn't find one, so we need to create one */
-	local = rxrpc_alloc_local(srx);
-	if (!local) {
-		up_write(&rxrpc_local_sem);
-		return ERR_PTR(-ENOMEM);
+	mutex_lock(&rxrpc_local_mutex);
+
+	obj = objcache_lookup_rcu(&rxrpc_local_cache, srx);
+	if (obj && objcache_get_maybe(obj)) {
+		local = container_of(obj, struct rxrpc_local, obj);
+		new = "old";
+	} else {
+		local = rxrpc_alloc_local(srx);
+		if (!local)
+			goto nomem;
+
+		ret = rxrpc_open_socket(local);
+		if (ret < 0)
+			goto sock_error;
+
+		obj = objcache_try_add(&rxrpc_local_cache, &local->obj,
+				       &local->srx);
+		BUG_ON(obj != &local->obj);
+		new = "new";
 	}
 
-	ret = rxrpc_create_local(local);
-	if (ret < 0) {
-		up_write(&rxrpc_local_sem);
-		kfree(local);
-		_leave(" = %d", ret);
-		return ERR_PTR(ret);
-	}
+	mutex_unlock(&rxrpc_local_mutex);
 
-	up_write(&rxrpc_local_sem);
-
-	_net("LOCAL new %d {%d,%u,%pI4+%hu}",
+	_net("LOCAL %s %d {%d,%u,%pI4+%hu}",
+	     new,
 	     local->debug_id,
 	     local->srx.transport_type,
 	     local->srx.transport.family,
@@ -203,114 +258,54 @@ struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
 	_leave(" = %p [new]", local);
 	return local;
 
-found_local:
-	rxrpc_get_local(local);
-	read_unlock_bh(&rxrpc_local_lock);
-	up_write(&rxrpc_local_sem);
-
-	_net("LOCAL old %d {%d,%u,%pI4+%hu}",
-	     local->debug_id,
-	     local->srx.transport_type,
-	     local->srx.transport.family,
-	     &local->srx.transport.sin.sin_addr,
-	     ntohs(local->srx.transport.sin.sin_port));
-
-	_leave(" = %p [reuse]", local);
-	return local;
+nomem:
+	ret = -ENOMEM;
+sock_error:
+	mutex_unlock(&rxrpc_local_mutex);
+	kfree(local);
+	_leave(" = %d", ret);
+	return ERR_PTR(ret);
 }
 
 /*
- * release a local endpoint
+ * Prepare to garbage collect local endpoints.  Closing the socket cannot be
+ * done from an RCU callback context because it might sleep.
  */
-void rxrpc_put_local(struct rxrpc_local *local)
+static void rxrpc_local_prepare_for_gc(struct obj_node *obj)
 {
-	_enter("%p{u=%d}", local, atomic_read(&local->usage));
-
-	ASSERTCMP(atomic_read(&local->usage), >, 0);
-
-	/* to prevent a race, the decrement and the dequeue must be effectively
-	 * atomic */
-	write_lock_bh(&rxrpc_local_lock);
-	if (unlikely(atomic_dec_and_test(&local->usage))) {
-		_debug("destroy local");
-		rxrpc_queue_work(&local->destroyer);
+	struct rxrpc_local *local = container_of(obj, struct rxrpc_local, obj);
+	struct socket *socket = local->socket;
+
+	if (socket) {
+		local->socket = NULL;
+		kernel_sock_shutdown(socket, SHUT_RDWR);
+		socket->sk->sk_user_data = NULL;
+		sock_release(socket);
 	}
-	write_unlock_bh(&rxrpc_local_lock);
-	_leave("");
 }
 
 /*
- * destroy a local endpoint
+ * Destroy a local endpoint after the RCU grace period expires.
  */
-static void rxrpc_destroy_local(struct work_struct *work)
+static void rxrpc_local_gc_rcu(struct rcu_head *rcu)
 {
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, destroyer);
-
-	_enter("%p{%d}", local, atomic_read(&local->usage));
-
-	down_write(&rxrpc_local_sem);
-
-	write_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0) {
-		write_unlock_bh(&rxrpc_local_lock);
-		up_read(&rxrpc_local_sem);
-		_leave(" [resurrected]");
-		return;
-	}
-
-	list_del(&local->link);
-	local->socket->sk->sk_user_data = NULL;
-	write_unlock_bh(&rxrpc_local_lock);
+	struct rxrpc_local *local = container_of(rcu, struct rxrpc_local, obj.rcu);
 
-	downgrade_write(&rxrpc_local_sem);
+	_enter("%p", local);
 
 	ASSERT(list_empty(&local->services));
 	ASSERT(!work_pending(&local->acceptor));
 	ASSERT(!work_pending(&local->rejecter));
-	ASSERT(!work_pending(&local->event_processor));
+	ASSERT(!work_pending(&local->processor));
 
 	/* finish cleaning up the local descriptor */
 	rxrpc_purge_queue(&local->accept_queue);
 	rxrpc_purge_queue(&local->reject_queue);
 	rxrpc_purge_queue(&local->event_queue);
-	kernel_sock_shutdown(local->socket, SHUT_RDWR);
-	sock_release(local->socket);
-
-	up_read(&rxrpc_local_sem);
 
 	_net("DESTROY LOCAL %d", local->debug_id);
 	kfree(local);
 
-	if (list_empty(&rxrpc_locals))
-		wake_up_all(&rxrpc_local_wq);
-
-	_leave("");
-}
-
-/*
- * preemptively destroy all local local endpoint rather than waiting for
- * them to be destroyed
- */
-void __exit rxrpc_destroy_all_locals(void)
-{
-	DECLARE_WAITQUEUE(myself,current);
-
-	_enter("");
-
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_locals)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_local_wq, &myself);
-
-		while (!list_empty(&rxrpc_locals)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&rxrpc_local_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
+	objcache_obj_rcu_done(&rxrpc_local_cache);
 	_leave("");
 }

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 05/11] rxrpc: procfs file to list local endpoints
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (3 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 04/11] rxrpc: Implement local endpoint cache David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 06/11] rxrpc: Rename ar-local.c to local-event.c David Howells
                   ` (5 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Add a proc file to list local rxrpc endpoints, using the object cache
facility to do much of the work.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/af_rxrpc.c     |    3 +++
 net/rxrpc/local-object.c |   29 +++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index a27d8e3ef854..23ebb127cac1 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -842,6 +842,8 @@ static int __init af_rxrpc_init(void)
 	proc_create("rxrpc_calls", 0, init_net.proc_net, &rxrpc_call_seq_fops);
 	proc_create("rxrpc_conns", 0, init_net.proc_net,
 		    &rxrpc_connection_seq_fops);
+	proc_create_data("rxrpc_locals", 0, init_net.proc_net,
+			 &objcache_seq_fops, &rxrpc_local_cache);
 #endif
 	return 0;
 
@@ -883,6 +885,7 @@ static void __exit af_rxrpc_exit(void)
 
 	_debug("flush scheduled work");
 	flush_workqueue(rxrpc_workqueue);
+	remove_proc_entry("rxrpc_locals", init_net.proc_net);
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
 	remove_proc_entry("rxrpc_calls", init_net.proc_net);
 	destroy_workqueue(rxrpc_workqueue);
diff --git a/net/rxrpc/local-object.c b/net/rxrpc/local-object.c
index 4f44e86e70fe..cc6354675026 100644
--- a/net/rxrpc/local-object.c
+++ b/net/rxrpc/local-object.c
@@ -19,6 +19,7 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
+static int rxrpc_local_seq_show(struct seq_file *, void *);
 static void rxrpc_local_prepare_for_gc(struct obj_node *);
 static void rxrpc_local_gc_rcu(struct rcu_head *);
 static unsigned long rxrpc_local_hash_key(const void *);
@@ -29,6 +30,7 @@ static struct hlist_head rxrpc_local_cache_hash[16];
 
 struct objcache rxrpc_local_cache = {
 	.name		= "locals",
+	.seq_show	= rxrpc_local_seq_show,
 	.prepare_for_gc	= rxrpc_local_prepare_for_gc,
 	.gc_rcu		= rxrpc_local_gc_rcu,
 	.hash_key	= rxrpc_local_hash_key,
@@ -309,3 +311,30 @@ static void rxrpc_local_gc_rcu(struct rcu_head *rcu)
 	objcache_obj_rcu_done(&rxrpc_local_cache);
 	_leave("");
 }
+
+/*
+ * Display a local endpoint in /proc/net/rxrpc_locals.
+ */
+static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
+{
+	struct rxrpc_local *local;
+
+	if (v == SEQ_START_TOKEN) {
+		seq_puts(seq, "Use Proto LPort Local\n");
+		return 0;
+	}
+
+	local = hlist_entry(v, struct rxrpc_local, obj.link);
+
+	switch (local->srx.transport.family) {
+	case AF_INET:
+		seq_printf(seq,
+			   "%3d UDP   %5hu %pI4\n",
+			   atomic_read(&local->obj.usage),
+			   ntohs(local->srx.transport.sin.sin_port),
+			   &local->srx.transport.sin.sin_addr);
+		break;
+	}
+
+	return 0;
+}

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 06/11] rxrpc: Rename ar-local.c to local-event.c
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (4 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 05/11] rxrpc: procfs file to list local endpoints David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 07/11] rxrpc: Rename ar-peer.c to peer-object.c David Howells
                   ` (4 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Rename ar-local.c to local-event.c.  What's left in the file is the local
endpoint event/packet handling code after the object management has been
separated out.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile      |    2 -
 net/rxrpc/ar-internal.h |   10 ++--
 net/rxrpc/ar-local.c    |  119 -----------------------------------------------
 net/rxrpc/local-event.c |  119 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 125 insertions(+), 125 deletions(-)
 delete mode 100644 net/rxrpc/ar-local.c
 create mode 100644 net/rxrpc/local-event.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 166e4cb3b13c..7f41d272bf3d 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -12,13 +12,13 @@ af-rxrpc-y := \
 	ar-error.o \
 	ar-input.o \
 	ar-key.o \
-	ar-local.o \
 	ar-output.o \
 	ar-peer.o \
 	ar-recvmsg.o \
 	ar-security.o \
 	ar-skbuff.o \
 	ar-transport.o \
+	local-event.o \
 	local-object.o \
 	objcache.o
 
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index ceb1442f745b..79c965789c2c 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -596,11 +596,6 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *, struct sk_buff *, bool, bool);
 void rxrpc_fast_process_packet(struct rxrpc_call *, struct sk_buff *);
 
 /*
- * ar-local.c
- */
-extern void rxrpc_process_local_events(struct work_struct *);
-
-/*
  * ar-key.c
  */
 extern struct key_type key_type_rxrpc;
@@ -672,6 +667,11 @@ struct rxrpc_transport *rxrpc_find_transport(struct rxrpc_local *,
 					     struct rxrpc_peer *);
 
 /*
+ * local-event.c
+ */
+extern void rxrpc_process_local_events(struct work_struct *);
+
+/*
  * local-object.c
  */
 extern struct objcache rxrpc_local_cache;
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
deleted file mode 100644
index 6ab0e9bfdbe8..000000000000
--- a/net/rxrpc/ar-local.c
+++ /dev/null
@@ -1,119 +0,0 @@
-/* AF_RXRPC local endpoint management
- *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhowells@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/net.h>
-#include <linux/skbuff.h>
-#include <linux/slab.h>
-#include <linux/udp.h>
-#include <linux/ip.h>
-#include <net/sock.h>
-#include <net/af_rxrpc.h>
-#include <generated/utsrelease.h>
-#include "ar-internal.h"
-
-static const char rxrpc_version_string[65] = "linux-" UTS_RELEASE " AF_RXRPC";
-
-/*
- * Reply to a version request
- */
-static void rxrpc_send_version_request(struct rxrpc_local *local,
-				       struct rxrpc_host_header *hdr,
-				       struct sk_buff *skb)
-{
-	struct rxrpc_wire_header whdr;
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-	struct sockaddr_in sin;
-	struct msghdr msg;
-	struct kvec iov[2];
-	size_t len;
-	int ret;
-
-	_enter("");
-
-	sin.sin_family = AF_INET;
-	sin.sin_port = udp_hdr(skb)->source;
-	sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
-
-	msg.msg_name	= &sin;
-	msg.msg_namelen	= sizeof(sin);
-	msg.msg_control	= NULL;
-	msg.msg_controllen = 0;
-	msg.msg_flags	= 0;
-
-	whdr.epoch	= htonl(sp->hdr.epoch);
-	whdr.cid	= htonl(sp->hdr.cid);
-	whdr.callNumber	= htonl(sp->hdr.callNumber);
-	whdr.seq	= 0;
-	whdr.serial	= 0;
-	whdr.type	= RXRPC_PACKET_TYPE_VERSION;
-	whdr.flags	= RXRPC_LAST_PACKET | (~hdr->flags & RXRPC_CLIENT_INITIATED);
-	whdr.userStatus	= 0;
-	whdr.securityIndex = 0;
-	whdr._rsvd	= 0;
-	whdr.serviceId	= htons(sp->hdr.serviceId);
-
-	iov[0].iov_base	= &whdr;
-	iov[0].iov_len	= sizeof(whdr);
-	iov[1].iov_base	= (char *)rxrpc_version_string;
-	iov[1].iov_len	= sizeof(rxrpc_version_string);
-
-	len = iov[0].iov_len + iov[1].iov_len;
-
-	_proto("Tx VERSION (reply)");
-
-	ret = kernel_sendmsg(local->socket, &msg, iov, 2, len);
-	if (ret < 0)
-		_debug("sendmsg failed: %d", ret);
-
-	_leave("");
-}
-
-/*
- * Process event packets targeted at a local endpoint.
- */
-void rxrpc_process_local_events(struct work_struct *work)
-{
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, processor);
-	struct sk_buff *skb;
-	char v;
-
-	_enter("");
-
-	rxrpc_get_local(local);
-	
-	while ((skb = skb_dequeue(&local->event_queue))) {
-		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-
-		_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
-
-		switch (sp->hdr.type) {
-		case RXRPC_PACKET_TYPE_VERSION:
-			if (skb_copy_bits(skb, 0, &v, 1) < 0)
-				return;
-			_proto("Rx VERSION { %02x }", v);
-			if (v == 0)
-				rxrpc_send_version_request(local, &sp->hdr, skb);
-			break;
-
-		default:
-			/* Just ignore anything we don't understand */
-			break;
-		}
-
-		rxrpc_put_local(local);
-		rxrpc_free_skb(skb);
-	}
-
-	rxrpc_put_local(local);
-	_leave("");
-}
diff --git a/net/rxrpc/local-event.c b/net/rxrpc/local-event.c
new file mode 100644
index 000000000000..6ab0e9bfdbe8
--- /dev/null
+++ b/net/rxrpc/local-event.c
@@ -0,0 +1,119 @@
+/* AF_RXRPC local endpoint management
+ *
+ * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ */
+
+#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/skbuff.h>
+#include <linux/slab.h>
+#include <linux/udp.h>
+#include <linux/ip.h>
+#include <net/sock.h>
+#include <net/af_rxrpc.h>
+#include <generated/utsrelease.h>
+#include "ar-internal.h"
+
+static const char rxrpc_version_string[65] = "linux-" UTS_RELEASE " AF_RXRPC";
+
+/*
+ * Reply to a version request
+ */
+static void rxrpc_send_version_request(struct rxrpc_local *local,
+				       struct rxrpc_host_header *hdr,
+				       struct sk_buff *skb)
+{
+	struct rxrpc_wire_header whdr;
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct sockaddr_in sin;
+	struct msghdr msg;
+	struct kvec iov[2];
+	size_t len;
+	int ret;
+
+	_enter("");
+
+	sin.sin_family = AF_INET;
+	sin.sin_port = udp_hdr(skb)->source;
+	sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
+
+	msg.msg_name	= &sin;
+	msg.msg_namelen	= sizeof(sin);
+	msg.msg_control	= NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags	= 0;
+
+	whdr.epoch	= htonl(sp->hdr.epoch);
+	whdr.cid	= htonl(sp->hdr.cid);
+	whdr.callNumber	= htonl(sp->hdr.callNumber);
+	whdr.seq	= 0;
+	whdr.serial	= 0;
+	whdr.type	= RXRPC_PACKET_TYPE_VERSION;
+	whdr.flags	= RXRPC_LAST_PACKET | (~hdr->flags & RXRPC_CLIENT_INITIATED);
+	whdr.userStatus	= 0;
+	whdr.securityIndex = 0;
+	whdr._rsvd	= 0;
+	whdr.serviceId	= htons(sp->hdr.serviceId);
+
+	iov[0].iov_base	= &whdr;
+	iov[0].iov_len	= sizeof(whdr);
+	iov[1].iov_base	= (char *)rxrpc_version_string;
+	iov[1].iov_len	= sizeof(rxrpc_version_string);
+
+	len = iov[0].iov_len + iov[1].iov_len;
+
+	_proto("Tx VERSION (reply)");
+
+	ret = kernel_sendmsg(local->socket, &msg, iov, 2, len);
+	if (ret < 0)
+		_debug("sendmsg failed: %d", ret);
+
+	_leave("");
+}
+
+/*
+ * Process event packets targeted at a local endpoint.
+ */
+void rxrpc_process_local_events(struct work_struct *work)
+{
+	struct rxrpc_local *local =
+		container_of(work, struct rxrpc_local, processor);
+	struct sk_buff *skb;
+	char v;
+
+	_enter("");
+
+	rxrpc_get_local(local);
+	
+	while ((skb = skb_dequeue(&local->event_queue))) {
+		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+		_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
+
+		switch (sp->hdr.type) {
+		case RXRPC_PACKET_TYPE_VERSION:
+			if (skb_copy_bits(skb, 0, &v, 1) < 0)
+				return;
+			_proto("Rx VERSION { %02x }", v);
+			if (v == 0)
+				rxrpc_send_version_request(local, &sp->hdr, skb);
+			break;
+
+		default:
+			/* Just ignore anything we don't understand */
+			break;
+		}
+
+		rxrpc_put_local(local);
+		rxrpc_free_skb(skb);
+	}
+
+	rxrpc_put_local(local);
+	_leave("");
+}

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 07/11] rxrpc: Rename ar-peer.c to peer-object.c
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (5 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 06/11] rxrpc: Rename ar-local.c to local-event.c David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:38 ` [PATCH 08/11] rxrpc: Implement peer endpoint cache David Howells
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Rename ar-peer.c to peer-object.c for consistency with the other new object
cache management files.  It will be modified to match in a later commit.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile      |    4 -
 net/rxrpc/ar-internal.h |   16 +-
 net/rxrpc/ar-peer.c     |  303 -----------------------------------------------
 net/rxrpc/peer-object.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 313 insertions(+), 313 deletions(-)
 delete mode 100644 net/rxrpc/ar-peer.c
 create mode 100644 net/rxrpc/peer-object.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 7f41d272bf3d..38b23f6fb9aa 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -13,14 +13,14 @@ af-rxrpc-y := \
 	ar-input.o \
 	ar-key.o \
 	ar-output.o \
-	ar-peer.o \
 	ar-recvmsg.o \
 	ar-security.o \
 	ar-skbuff.o \
 	ar-transport.o \
 	local-event.o \
 	local-object.o \
-	objcache.o
+	objcache.o \
+	peer-object.o
 
 af-rxrpc-$(CONFIG_PROC_FS) += ar-proc.o
 af-rxrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 79c965789c2c..ec18b06789e6 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -617,14 +617,6 @@ int rxrpc_client_sendmsg(struct rxrpc_sock *, struct rxrpc_transport *,
 int rxrpc_server_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
 
 /*
- * ar-peer.c
- */
-struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *, gfp_t);
-void rxrpc_put_peer(struct rxrpc_peer *);
-struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *, __be32, __be16);
-void __exit rxrpc_destroy_all_peers(void);
-
-/*
  * ar-proc.c
  */
 extern const char *const rxrpc_call_states[];
@@ -695,6 +687,14 @@ static inline void rxrpc_put_local(struct rxrpc_local *local)
 }
 
 /*
+ * peer-object.c
+ */
+struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *, gfp_t);
+void rxrpc_put_peer(struct rxrpc_peer *);
+struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *, __be32, __be16);
+void __exit rxrpc_destroy_all_peers(void);
+
+/*
  * sysctl.c
  */
 #ifdef CONFIG_SYSCTL
diff --git a/net/rxrpc/ar-peer.c b/net/rxrpc/ar-peer.c
deleted file mode 100644
index dc089b1976aa..000000000000
--- a/net/rxrpc/ar-peer.c
+++ /dev/null
@@ -1,303 +0,0 @@
-/* RxRPC remote transport endpoint management
- *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhowells@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/net.h>
-#include <linux/skbuff.h>
-#include <linux/udp.h>
-#include <linux/in.h>
-#include <linux/in6.h>
-#include <linux/icmp.h>
-#include <linux/slab.h>
-#include <net/sock.h>
-#include <net/af_rxrpc.h>
-#include <net/ip.h>
-#include <net/route.h>
-#include "ar-internal.h"
-
-static LIST_HEAD(rxrpc_peers);
-static DEFINE_RWLOCK(rxrpc_peer_lock);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_peer_wq);
-
-static void rxrpc_destroy_peer(struct work_struct *work);
-
-/*
- * assess the MTU size for the network interface through which this peer is
- * reached
- */
-static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
-{
-	struct rtable *rt;
-	struct flowi4 fl4;
-
-	peer->if_mtu = 1500;
-
-	rt = ip_route_output_ports(&init_net, &fl4, NULL,
-				   peer->srx.transport.sin.sin_addr.s_addr, 0,
-				   htons(7000), htons(7001),
-				   IPPROTO_UDP, 0, 0);
-	if (IS_ERR(rt)) {
-		_leave(" [route err %ld]", PTR_ERR(rt));
-		return;
-	}
-
-	peer->if_mtu = dst_mtu(&rt->dst);
-	dst_release(&rt->dst);
-
-	_leave(" [if_mtu %u]", peer->if_mtu);
-}
-
-/*
- * allocate a new peer
- */
-static struct rxrpc_peer *rxrpc_alloc_peer(struct sockaddr_rxrpc *srx,
-					   gfp_t gfp)
-{
-	struct rxrpc_peer *peer;
-
-	_enter("");
-
-	peer = kzalloc(sizeof(struct rxrpc_peer), gfp);
-	if (peer) {
-		INIT_WORK(&peer->destroyer, &rxrpc_destroy_peer);
-		INIT_LIST_HEAD(&peer->link);
-		INIT_LIST_HEAD(&peer->error_targets);
-		spin_lock_init(&peer->lock);
-		atomic_set(&peer->usage, 1);
-		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
-		memcpy(&peer->srx, srx, sizeof(*srx));
-
-		rxrpc_assess_MTU_size(peer);
-		peer->mtu = peer->if_mtu;
-
-		if (srx->transport.family == AF_INET) {
-			peer->hdrsize = sizeof(struct iphdr);
-			switch (srx->transport_type) {
-			case SOCK_DGRAM:
-				peer->hdrsize += sizeof(struct udphdr);
-				break;
-			default:
-				BUG();
-				break;
-			}
-		} else {
-			BUG();
-		}
-
-		peer->hdrsize += sizeof(struct rxrpc_wire_header);
-		peer->maxdata = peer->mtu - peer->hdrsize;
-	}
-
-	_leave(" = %p", peer);
-	return peer;
-}
-
-/*
- * obtain a remote transport endpoint for the specified address
- */
-struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
-{
-	struct rxrpc_peer *peer, *candidate;
-	const char *new = "old";
-	int usage;
-
-	_enter("{%d,%d,%pI4+%hu}",
-	       srx->transport_type,
-	       srx->transport_len,
-	       &srx->transport.sin.sin_addr,
-	       ntohs(srx->transport.sin.sin_port));
-
-	/* search the peer list first */
-	read_lock_bh(&rxrpc_peer_lock);
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		_debug("check PEER %d { u=%d t=%d l=%d }",
-		       peer->debug_id,
-		       atomic_read(&peer->usage),
-		       peer->srx.transport_type,
-		       peer->srx.transport_len);
-
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_peer;
-	}
-	read_unlock_bh(&rxrpc_peer_lock);
-
-	/* not yet present - create a candidate for a new record and then
-	 * redo the search */
-	candidate = rxrpc_alloc_peer(srx, gfp);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
-	}
-
-	write_lock_bh(&rxrpc_peer_lock);
-
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_second;
-	}
-
-	/* we can now add the new candidate to the list */
-	peer = candidate;
-	candidate = NULL;
-	usage = atomic_read(&peer->usage);
-
-	list_add_tail(&peer->link, &rxrpc_peers);
-	write_unlock_bh(&rxrpc_peer_lock);
-	new = "new";
-
-success:
-	_net("PEER %s %d {%d,%u,%pI4+%hu}",
-	     new,
-	     peer->debug_id,
-	     peer->srx.transport_type,
-	     peer->srx.transport.family,
-	     &peer->srx.transport.sin.sin_addr,
-	     ntohs(peer->srx.transport.sin.sin_port));
-
-	_leave(" = %p {u=%d}", peer, usage);
-	return peer;
-
-	/* we found the peer in the list immediately */
-found_extant_peer:
-	usage = atomic_inc_return(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	goto success;
-
-	/* we found the peer on the second time through the list */
-found_extant_second:
-	usage = atomic_inc_return(&peer->usage);
-	write_unlock_bh(&rxrpc_peer_lock);
-	kfree(candidate);
-	goto success;
-}
-
-/*
- * find the peer associated with a packet
- */
-struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *local,
-				   __be32 addr, __be16 port)
-{
-	struct rxrpc_peer *peer;
-
-	_enter("");
-
-	/* search the peer list */
-	read_lock_bh(&rxrpc_peer_lock);
-
-	if (local->srx.transport.family == AF_INET &&
-	    local->srx.transport_type == SOCK_DGRAM
-	    ) {
-		list_for_each_entry(peer, &rxrpc_peers, link) {
-			if (atomic_read(&peer->usage) > 0 &&
-			    peer->srx.transport_type == SOCK_DGRAM &&
-			    peer->srx.transport.family == AF_INET &&
-			    peer->srx.transport.sin.sin_port == port &&
-			    peer->srx.transport.sin.sin_addr.s_addr == addr)
-				goto found_UDP_peer;
-		}
-
-		goto new_UDP_peer;
-	}
-
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EAFNOSUPPORT");
-	return ERR_PTR(-EAFNOSUPPORT);
-
-found_UDP_peer:
-	_net("Rx UDP DGRAM from peer %d", peer->debug_id);
-	atomic_inc(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = %p", peer);
-	return peer;
-
-new_UDP_peer:
-	_net("Rx UDP DGRAM from NEW peer");
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EBUSY [new]");
-	return ERR_PTR(-EBUSY);
-}
-
-/*
- * release a remote transport endpoint
- */
-void rxrpc_put_peer(struct rxrpc_peer *peer)
-{
-	_enter("%p{u=%d}", peer, atomic_read(&peer->usage));
-
-	ASSERTCMP(atomic_read(&peer->usage), >, 0);
-
-	if (likely(!atomic_dec_and_test(&peer->usage))) {
-		_leave(" [in use]");
-		return;
-	}
-
-	rxrpc_queue_work(&peer->destroyer);
-	_leave("");
-}
-
-/*
- * destroy a remote transport endpoint
- */
-static void rxrpc_destroy_peer(struct work_struct *work)
-{
-	struct rxrpc_peer *peer =
-		container_of(work, struct rxrpc_peer, destroyer);
-
-	_enter("%p{%d}", peer, atomic_read(&peer->usage));
-
-	write_lock_bh(&rxrpc_peer_lock);
-	list_del(&peer->link);
-	write_unlock_bh(&rxrpc_peer_lock);
-
-	_net("DESTROY PEER %d", peer->debug_id);
-	kfree(peer);
-
-	if (list_empty(&rxrpc_peers))
-		wake_up_all(&rxrpc_peer_wq);
-	_leave("");
-}
-
-/*
- * preemptively destroy all the peer records from a transport endpoint rather
- * than waiting for them to time out
- */
-void __exit rxrpc_destroy_all_peers(void)
-{
-	DECLARE_WAITQUEUE(myself,current);
-
-	_enter("");
-
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_peers)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_peer_wq, &myself);
-
-		while (!list_empty(&rxrpc_peers)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&rxrpc_peer_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
-	_leave("");
-}
diff --git a/net/rxrpc/peer-object.c b/net/rxrpc/peer-object.c
new file mode 100644
index 000000000000..dc089b1976aa
--- /dev/null
+++ b/net/rxrpc/peer-object.c
@@ -0,0 +1,303 @@
+/* RxRPC remote transport endpoint management
+ *
+ * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ */
+
+#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/skbuff.h>
+#include <linux/udp.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/icmp.h>
+#include <linux/slab.h>
+#include <net/sock.h>
+#include <net/af_rxrpc.h>
+#include <net/ip.h>
+#include <net/route.h>
+#include "ar-internal.h"
+
+static LIST_HEAD(rxrpc_peers);
+static DEFINE_RWLOCK(rxrpc_peer_lock);
+static DECLARE_WAIT_QUEUE_HEAD(rxrpc_peer_wq);
+
+static void rxrpc_destroy_peer(struct work_struct *work);
+
+/*
+ * assess the MTU size for the network interface through which this peer is
+ * reached
+ */
+static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
+{
+	struct rtable *rt;
+	struct flowi4 fl4;
+
+	peer->if_mtu = 1500;
+
+	rt = ip_route_output_ports(&init_net, &fl4, NULL,
+				   peer->srx.transport.sin.sin_addr.s_addr, 0,
+				   htons(7000), htons(7001),
+				   IPPROTO_UDP, 0, 0);
+	if (IS_ERR(rt)) {
+		_leave(" [route err %ld]", PTR_ERR(rt));
+		return;
+	}
+
+	peer->if_mtu = dst_mtu(&rt->dst);
+	dst_release(&rt->dst);
+
+	_leave(" [if_mtu %u]", peer->if_mtu);
+}
+
+/*
+ * allocate a new peer
+ */
+static struct rxrpc_peer *rxrpc_alloc_peer(struct sockaddr_rxrpc *srx,
+					   gfp_t gfp)
+{
+	struct rxrpc_peer *peer;
+
+	_enter("");
+
+	peer = kzalloc(sizeof(struct rxrpc_peer), gfp);
+	if (peer) {
+		INIT_WORK(&peer->destroyer, &rxrpc_destroy_peer);
+		INIT_LIST_HEAD(&peer->link);
+		INIT_LIST_HEAD(&peer->error_targets);
+		spin_lock_init(&peer->lock);
+		atomic_set(&peer->usage, 1);
+		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
+		memcpy(&peer->srx, srx, sizeof(*srx));
+
+		rxrpc_assess_MTU_size(peer);
+		peer->mtu = peer->if_mtu;
+
+		if (srx->transport.family == AF_INET) {
+			peer->hdrsize = sizeof(struct iphdr);
+			switch (srx->transport_type) {
+			case SOCK_DGRAM:
+				peer->hdrsize += sizeof(struct udphdr);
+				break;
+			default:
+				BUG();
+				break;
+			}
+		} else {
+			BUG();
+		}
+
+		peer->hdrsize += sizeof(struct rxrpc_wire_header);
+		peer->maxdata = peer->mtu - peer->hdrsize;
+	}
+
+	_leave(" = %p", peer);
+	return peer;
+}
+
+/*
+ * obtain a remote transport endpoint for the specified address
+ */
+struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
+{
+	struct rxrpc_peer *peer, *candidate;
+	const char *new = "old";
+	int usage;
+
+	_enter("{%d,%d,%pI4+%hu}",
+	       srx->transport_type,
+	       srx->transport_len,
+	       &srx->transport.sin.sin_addr,
+	       ntohs(srx->transport.sin.sin_port));
+
+	/* search the peer list first */
+	read_lock_bh(&rxrpc_peer_lock);
+	list_for_each_entry(peer, &rxrpc_peers, link) {
+		_debug("check PEER %d { u=%d t=%d l=%d }",
+		       peer->debug_id,
+		       atomic_read(&peer->usage),
+		       peer->srx.transport_type,
+		       peer->srx.transport_len);
+
+		if (atomic_read(&peer->usage) > 0 &&
+		    peer->srx.transport_type == srx->transport_type &&
+		    peer->srx.transport_len == srx->transport_len &&
+		    memcmp(&peer->srx.transport,
+			   &srx->transport,
+			   srx->transport_len) == 0)
+			goto found_extant_peer;
+	}
+	read_unlock_bh(&rxrpc_peer_lock);
+
+	/* not yet present - create a candidate for a new record and then
+	 * redo the search */
+	candidate = rxrpc_alloc_peer(srx, gfp);
+	if (!candidate) {
+		_leave(" = -ENOMEM");
+		return ERR_PTR(-ENOMEM);
+	}
+
+	write_lock_bh(&rxrpc_peer_lock);
+
+	list_for_each_entry(peer, &rxrpc_peers, link) {
+		if (atomic_read(&peer->usage) > 0 &&
+		    peer->srx.transport_type == srx->transport_type &&
+		    peer->srx.transport_len == srx->transport_len &&
+		    memcmp(&peer->srx.transport,
+			   &srx->transport,
+			   srx->transport_len) == 0)
+			goto found_extant_second;
+	}
+
+	/* we can now add the new candidate to the list */
+	peer = candidate;
+	candidate = NULL;
+	usage = atomic_read(&peer->usage);
+
+	list_add_tail(&peer->link, &rxrpc_peers);
+	write_unlock_bh(&rxrpc_peer_lock);
+	new = "new";
+
+success:
+	_net("PEER %s %d {%d,%u,%pI4+%hu}",
+	     new,
+	     peer->debug_id,
+	     peer->srx.transport_type,
+	     peer->srx.transport.family,
+	     &peer->srx.transport.sin.sin_addr,
+	     ntohs(peer->srx.transport.sin.sin_port));
+
+	_leave(" = %p {u=%d}", peer, usage);
+	return peer;
+
+	/* we found the peer in the list immediately */
+found_extant_peer:
+	usage = atomic_inc_return(&peer->usage);
+	read_unlock_bh(&rxrpc_peer_lock);
+	goto success;
+
+	/* we found the peer on the second time through the list */
+found_extant_second:
+	usage = atomic_inc_return(&peer->usage);
+	write_unlock_bh(&rxrpc_peer_lock);
+	kfree(candidate);
+	goto success;
+}
+
+/*
+ * find the peer associated with a packet
+ */
+struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *local,
+				   __be32 addr, __be16 port)
+{
+	struct rxrpc_peer *peer;
+
+	_enter("");
+
+	/* search the peer list */
+	read_lock_bh(&rxrpc_peer_lock);
+
+	if (local->srx.transport.family == AF_INET &&
+	    local->srx.transport_type == SOCK_DGRAM
+	    ) {
+		list_for_each_entry(peer, &rxrpc_peers, link) {
+			if (atomic_read(&peer->usage) > 0 &&
+			    peer->srx.transport_type == SOCK_DGRAM &&
+			    peer->srx.transport.family == AF_INET &&
+			    peer->srx.transport.sin.sin_port == port &&
+			    peer->srx.transport.sin.sin_addr.s_addr == addr)
+				goto found_UDP_peer;
+		}
+
+		goto new_UDP_peer;
+	}
+
+	read_unlock_bh(&rxrpc_peer_lock);
+	_leave(" = -EAFNOSUPPORT");
+	return ERR_PTR(-EAFNOSUPPORT);
+
+found_UDP_peer:
+	_net("Rx UDP DGRAM from peer %d", peer->debug_id);
+	atomic_inc(&peer->usage);
+	read_unlock_bh(&rxrpc_peer_lock);
+	_leave(" = %p", peer);
+	return peer;
+
+new_UDP_peer:
+	_net("Rx UDP DGRAM from NEW peer");
+	read_unlock_bh(&rxrpc_peer_lock);
+	_leave(" = -EBUSY [new]");
+	return ERR_PTR(-EBUSY);
+}
+
+/*
+ * release a remote transport endpoint
+ */
+void rxrpc_put_peer(struct rxrpc_peer *peer)
+{
+	_enter("%p{u=%d}", peer, atomic_read(&peer->usage));
+
+	ASSERTCMP(atomic_read(&peer->usage), >, 0);
+
+	if (likely(!atomic_dec_and_test(&peer->usage))) {
+		_leave(" [in use]");
+		return;
+	}
+
+	rxrpc_queue_work(&peer->destroyer);
+	_leave("");
+}
+
+/*
+ * destroy a remote transport endpoint
+ */
+static void rxrpc_destroy_peer(struct work_struct *work)
+{
+	struct rxrpc_peer *peer =
+		container_of(work, struct rxrpc_peer, destroyer);
+
+	_enter("%p{%d}", peer, atomic_read(&peer->usage));
+
+	write_lock_bh(&rxrpc_peer_lock);
+	list_del(&peer->link);
+	write_unlock_bh(&rxrpc_peer_lock);
+
+	_net("DESTROY PEER %d", peer->debug_id);
+	kfree(peer);
+
+	if (list_empty(&rxrpc_peers))
+		wake_up_all(&rxrpc_peer_wq);
+	_leave("");
+}
+
+/*
+ * preemptively destroy all the peer records from a transport endpoint rather
+ * than waiting for them to time out
+ */
+void __exit rxrpc_destroy_all_peers(void)
+{
+	DECLARE_WAITQUEUE(myself,current);
+
+	_enter("");
+
+	/* we simply have to wait for them to go away */
+	if (!list_empty(&rxrpc_peers)) {
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		add_wait_queue(&rxrpc_peer_wq, &myself);
+
+		while (!list_empty(&rxrpc_peers)) {
+			schedule();
+			set_current_state(TASK_UNINTERRUPTIBLE);
+		}
+
+		remove_wait_queue(&rxrpc_peer_wq, &myself);
+		set_current_state(TASK_RUNNING);
+	}
+
+	_leave("");
+}


* [PATCH 08/11] rxrpc: Implement peer endpoint cache
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (6 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 07/11] rxrpc: Rename ar-peer.c to peer-object.c David Howells
@ 2016-03-07 14:38 ` David Howells
  2016-03-07 14:39 ` [PATCH 09/11] rxrpc: Add /proc/net/rxrpc_peers to display the known remote endpoints David Howells
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:38 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Implement the RxRPC peer endpoint cache.  Only the primary cache is used.
It is indexed on the following details:

  - Network transport family - currently only AF_INET.
  - Network transport type - currently only UDP.
  - Peer network transport address.

We use the RCU read lock to handle non-creating lookups so that the lookup
can be done from bottom-half context in the sk_error_report handler.
Creating lookups are done under a spinlock rather than a mutex as they
might need to be set up in response to an external stimulus if the local
endpoint is a server.

Captured network error messages (ICMP) are handled with respect to this
struct, and the MTU size and RTT are cached here.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile       |    3 
 net/rxrpc/af_rxrpc.c     |    6 +
 net/rxrpc/ar-accept.c    |    2 
 net/rxrpc/ar-error.c     |   57 ++++++++
 net/rxrpc/ar-input.c     |   13 +-
 net/rxrpc/ar-internal.h  |   33 ++++-
 net/rxrpc/ar-transport.c |    2 
 net/rxrpc/peer-object.c  |  330 ++++++++++++++++++++--------------------------
 net/rxrpc/utils.c        |   41 ++++++
 9 files changed, 283 insertions(+), 204 deletions(-)
 create mode 100644 net/rxrpc/utils.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 38b23f6fb9aa..33ad62ea2a34 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -20,7 +20,8 @@ af-rxrpc-y := \
 	local-event.o \
 	local-object.o \
 	objcache.o \
-	peer-object.o
+	peer-object.o \
+	utils.o
 
 af-rxrpc-$(CONFIG_PROC_FS) += ar-proc.o
 af-rxrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 23ebb127cac1..5f0ffb5f8306 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -240,7 +240,7 @@ static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
 		return ERR_PTR(-EAFNOSUPPORT);
 
 	/* find a remote transport endpoint from the local one */
-	peer = rxrpc_get_peer(srx, gfp);
+	peer = rxrpc_lookup_peer(srx, gfp);
 	if (IS_ERR(peer))
 		return ERR_CAST(peer);
 
@@ -792,6 +792,7 @@ static int __init af_rxrpc_init(void)
 	rxrpc_epoch = get_seconds();
 
 	objcache_init(&rxrpc_local_cache);
+	objcache_init(&rxrpc_peer_cache);
 
 	ret = -ENOMEM;
 	rxrpc_call_jar = kmem_cache_create(
@@ -860,6 +861,7 @@ error_proto:
 error_work_queue:
 	kmem_cache_destroy(rxrpc_call_jar);
 error_call_jar:
+	objcache_clear(&rxrpc_peer_cache);
 	objcache_clear(&rxrpc_local_cache);
 	return ret;
 }
@@ -878,7 +880,7 @@ static void __exit af_rxrpc_exit(void)
 	rxrpc_destroy_all_calls();
 	rxrpc_destroy_all_connections();
 	rxrpc_destroy_all_transports();
-	rxrpc_destroy_all_peers();
+	objcache_clear(&rxrpc_peer_cache);
 	objcache_clear(&rxrpc_local_cache);
 
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index d43799f8d3ef..fa6f34d1065a 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -93,7 +93,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 	rxrpc_new_skb(notification);
 	notification->mark = RXRPC_SKB_MARK_NEW_CALL;
 
-	peer = rxrpc_get_peer(srx, GFP_NOIO);
+	peer = rxrpc_lookup_peer(srx, GFP_NOIO);
 	if (IS_ERR(peer)) {
 		_debug("no peer");
 		ret = -EBUSY;
diff --git a/net/rxrpc/ar-error.c b/net/rxrpc/ar-error.c
index 3e82d6f0313c..05eb2366ff22 100644
--- a/net/rxrpc/ar-error.c
+++ b/net/rxrpc/ar-error.c
@@ -23,6 +23,55 @@
 #include "ar-internal.h"
 
 /*
+ * Find the peer associated with an ICMP packet.
+ */
+static struct rxrpc_peer *rxrpc_find_icmp_peer_rcu(struct rxrpc_local *local,
+						   const struct sk_buff *skb)
+{
+	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
+	struct sockaddr_rxrpc srx;
+
+	_enter("");
+
+	memset(&srx, 0, sizeof(srx));
+	srx.transport_type = local->srx.transport_type;
+	srx.transport.family = local->srx.transport.family;
+
+	/* Can we see an ICMP4 packet on an ICMP6 listening socket?  and vice
+	 * versa?
+	 */
+	switch (srx.transport.family) {
+	case AF_INET:
+		srx.transport.sin.sin_port = serr->port;
+		srx.transport_len = sizeof(struct sockaddr_in);
+		switch (serr->ee.ee_origin) {
+		case SO_EE_ORIGIN_ICMP:
+			_net("Rx ICMP");
+			memcpy(&srx.transport.sin.sin_addr,
+			       skb_network_header(skb) + serr->addr_offset,
+			       sizeof(struct in_addr));
+			break;
+		case SO_EE_ORIGIN_ICMP6:
+			_net("Rx ICMP6 on v4 sock");
+			memcpy(&srx.transport.sin.sin_addr,
+			       skb_network_header(skb) + serr->addr_offset + 12,
+			       sizeof(struct in_addr));
+			break;
+		default:
+			memcpy(&srx.transport.sin.sin_addr, &ip_hdr(skb)->saddr,
+			       sizeof(struct in_addr));
+			break;
+		}
+		break;
+
+	default:
+		BUG();
+	}
+
+	return rxrpc_lookup_peer_rcu(&srx);
+}
+
+/*
  * handle an error received on the local endpoint
  */
 void rxrpc_UDP_error_report(struct sock *sk)
@@ -57,8 +106,10 @@ void rxrpc_UDP_error_report(struct sock *sk)
 	_net("Rx UDP Error from %pI4:%hu", &addr, ntohs(port));
 	_debug("Msg l:%d d:%d", skb->len, skb->data_len);
 
-	peer = rxrpc_find_peer(local, addr, port);
+	rcu_read_lock();
+	peer = rxrpc_find_icmp_peer_rcu(local, skb);
 	if (IS_ERR(peer)) {
+		rcu_read_unlock();
 		rxrpc_free_skb(skb);
 		_leave(" [no peer]");
 		return;
@@ -66,7 +117,7 @@ void rxrpc_UDP_error_report(struct sock *sk)
 
 	trans = rxrpc_find_transport(local, peer);
 	if (!trans) {
-		rxrpc_put_peer(peer);
+		rcu_read_unlock();
 		rxrpc_free_skb(skb);
 		_leave(" [no trans]");
 		return;
@@ -110,7 +161,7 @@ void rxrpc_UDP_error_report(struct sock *sk)
 		}
 	}
 
-	rxrpc_put_peer(peer);
+	rcu_read_unlock();
 
 	/* pass the transport ref to error_handler to release */
 	skb_queue_tail(&trans->error_queue, skb);
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 514bfdaba322..76a070792718 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -639,14 +639,16 @@ static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
 	struct rxrpc_peer *peer;
 	struct rxrpc_transport *trans;
 	struct rxrpc_connection *conn;
+	struct sockaddr_rxrpc srx;
 
-	peer = rxrpc_find_peer(local, ip_hdr(skb)->saddr,
-				udp_hdr(skb)->source);
+	rxrpc_get_addr_from_skb(local, skb, &srx);
+	rcu_read_lock();
+	peer = rxrpc_lookup_peer_rcu(&srx);
 	if (IS_ERR(peer))
-		goto cant_find_conn;
+		goto cant_find_peer;
 
 	trans = rxrpc_find_transport(local, peer);
-	rxrpc_put_peer(peer);
+	rcu_read_unlock();
 	if (!trans)
 		goto cant_find_conn;
 
@@ -656,6 +658,9 @@ static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
 		goto cant_find_conn;
 
 	return conn;
+
+cant_find_peer:
+	rcu_read_unlock();
 cant_find_conn:
 	return NULL;
 }
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index ec18b06789e6..57fce54e9180 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -196,11 +196,9 @@ struct rxrpc_local {
  * - holds the connection ID counter for connections between the two endpoints
  */
 struct rxrpc_peer {
-	struct work_struct	destroyer;	/* peer destroyer */
-	struct list_head	link;		/* link in master peer list */
+	struct obj_node		obj;
 	struct list_head	error_targets;	/* targets for net error distribution */
 	spinlock_t		lock;		/* access lock */
-	atomic_t		usage;
 	unsigned int		if_mtu;		/* interface MTU for this peer */
 	unsigned int		mtu;		/* network MTU for this peer */
 	unsigned int		maxdata;	/* data size (MTU - hdrsize) */
@@ -689,10 +687,25 @@ static inline void rxrpc_put_local(struct rxrpc_local *local)
 /*
  * peer-object.c
  */
-struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *, gfp_t);
-void rxrpc_put_peer(struct rxrpc_peer *);
-struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *, __be32, __be16);
-void __exit rxrpc_destroy_all_peers(void);
+extern struct objcache rxrpc_peer_cache;
+
+struct rxrpc_peer *rxrpc_lookup_peer_rcu(const struct sockaddr_rxrpc *);
+struct rxrpc_peer *rxrpc_lookup_peer(struct sockaddr_rxrpc *, gfp_t);
+
+static inline void rxrpc_get_peer(struct rxrpc_peer *peer)
+{
+	objcache_get(&peer->obj);
+}
+
+static inline bool rxrpc_get_peer_maybe(struct rxrpc_peer *peer)
+{
+	return objcache_get_maybe(&peer->obj);
+}
+
+static inline void rxrpc_put_peer(struct rxrpc_peer *peer)
+{
+	objcache_put(&rxrpc_peer_cache, &peer->obj);
+}
 
 /*
  * sysctl.c
@@ -706,6 +719,12 @@ static inline void rxrpc_sysctl_exit(void) {}
 #endif
 
 /*
+ * utils.c
+ */
+void rxrpc_get_addr_from_skb(struct rxrpc_local *, const struct sk_buff *,
+			     struct sockaddr_rxrpc *);
+
+/*
  * debug tracing
  */
 extern unsigned int rxrpc_debug;
diff --git a/net/rxrpc/ar-transport.c b/net/rxrpc/ar-transport.c
index 5f9b9d462f53..e055fa30422b 100644
--- a/net/rxrpc/ar-transport.c
+++ b/net/rxrpc/ar-transport.c
@@ -119,7 +119,7 @@ struct rxrpc_transport *rxrpc_get_transport(struct rxrpc_local *local,
 	usage = atomic_read(&trans->usage);
 
 	rxrpc_get_local(trans->local);
-	atomic_inc(&trans->peer->usage);
+	rxrpc_get_peer(trans->peer);
 	list_add_tail(&trans->link, &rxrpc_transports);
 	write_unlock_bh(&rxrpc_transport_lock);
 	new = "new";
diff --git a/net/rxrpc/peer-object.c b/net/rxrpc/peer-object.c
index dc089b1976aa..35157c659bf7 100644
--- a/net/rxrpc/peer-object.c
+++ b/net/rxrpc/peer-object.c
@@ -1,6 +1,6 @@
 /* RxRPC remote transport endpoint management
  *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2007, 2015 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  *
  * This program is free software; you can redistribute it and/or
@@ -14,8 +14,6 @@
 #include <linux/skbuff.h>
 #include <linux/udp.h>
 #include <linux/in.h>
-#include <linux/in6.h>
-#include <linux/icmp.h>
 #include <linux/slab.h>
 #include <net/sock.h>
 #include <net/af_rxrpc.h>
@@ -23,11 +21,128 @@
 #include <net/route.h>
 #include "ar-internal.h"
 
-static LIST_HEAD(rxrpc_peers);
-static DEFINE_RWLOCK(rxrpc_peer_lock);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_peer_wq);
+static unsigned long rxrpc_peer_hash_key(const void *);
+static int rxrpc_peer_cmp_key(const struct obj_node *, const void *);
+static void rxrpc_peer_gc_rcu(struct rcu_head *);
 
-static void rxrpc_destroy_peer(struct work_struct *work);
+static struct hlist_head rxrpc_peer_cache_hash[256];
+
+struct objcache rxrpc_peer_cache = {
+	.name		= "peers",
+	.gc_rcu		= rxrpc_peer_gc_rcu,
+	.hash_key	= rxrpc_peer_hash_key,
+	.cmp_key	= rxrpc_peer_cmp_key,
+	.hash_table	= rxrpc_peer_cache_hash,
+	.gc_delay	= 2,
+	.nr_buckets	= ARRAY_SIZE(rxrpc_peer_cache_hash),
+};
+
+/*
+ * Destroy a peer after the RCU grace period expires.
+ */
+static void rxrpc_peer_gc_rcu(struct rcu_head *rcu)
+{
+	struct rxrpc_peer *peer = container_of(rcu, struct rxrpc_peer, obj.rcu);
+
+	_enter("%d", peer->debug_id);
+
+	_net("DESTROY PEER %d", peer->debug_id);
+
+	ASSERT(list_empty(&peer->error_targets));
+
+	kfree(peer);
+
+	objcache_obj_rcu_done(&rxrpc_peer_cache);
+}
+
+/*
+ * Hash a peer key.
+ */
+static unsigned long rxrpc_peer_hash_key(const void *_srx)
+{
+	const struct sockaddr_rxrpc *srx = _srx;
+	const u16 *p;
+	unsigned int i, size;
+	unsigned long hash_key;
+
+	_enter("");
+
+	hash_key = srx->transport_type;
+	hash_key += srx->transport_len;
+	hash_key += srx->transport.family;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		hash_key += (u16 __force)srx->transport.sin.sin_port;
+		size = sizeof(srx->transport.sin.sin_addr);
+		p = (u16 *)&srx->transport.sin.sin_addr;
+		break;
+	}
+
+	/* Step through the peer address in 16-bit portions for speed */
+	for (i = 0; i < size; i += sizeof(*p), p++)
+		hash_key += *p;
+
+	_leave(" 0x%lx", hash_key);
+	return hash_key;
+}
+
+/*
+ * Compare a peer to a key.  Return -ve, 0 or +ve to indicate less than, same
+ * or greater than.
+ */
+static int rxrpc_peer_cmp_key(const struct obj_node *obj, const void *_srx)
+{
+	const struct rxrpc_peer *peer =
+		container_of(obj, struct rxrpc_peer, obj);
+	const struct sockaddr_rxrpc *srx = _srx;
+	int diff;
+
+	diff = ((peer->srx.transport_type - srx->transport_type) ?:
+		(peer->srx.transport_len - srx->transport_len) ?:
+		(peer->srx.transport.family - srx->transport.family));
+	if (diff != 0)
+		return diff;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		return ((u16 __force)peer->srx.transport.sin.sin_port -
+			(u16 __force)srx->transport.sin.sin_port) ?:
+			memcmp(&peer->srx.transport.sin.sin_addr,
+			       &srx->transport.sin.sin_addr,
+			       sizeof(struct in_addr));
+	default:
+		BUG();
+	}
+}
+
+/*
+ * Look up a remote transport endpoint for the specified address using RCU.
+ */
+struct rxrpc_peer *rxrpc_lookup_peer_rcu(const struct sockaddr_rxrpc *srx)
+{
+	struct rxrpc_peer *peer;
+	struct obj_node *obj;
+
+	obj = objcache_lookup_rcu(&rxrpc_peer_cache, srx);
+	if (!obj)
+		return NULL;
+
+	peer = container_of(obj, struct rxrpc_peer, obj);
+	switch (srx->transport.family) {
+	case AF_INET:
+		_net("PEER %d {%d,%u,%pI4+%hu}",
+		     peer->debug_id,
+		     peer->srx.transport_type,
+		     peer->srx.transport.family,
+		     &peer->srx.transport.sin.sin_addr,
+		     ntohs(peer->srx.transport.sin.sin_port));
+		break;
+	}
+
+	_leave(" = %p {u=%d}", peer, atomic_read(&peer->obj.usage));
+	return peer;
+}
 
 /*
  * assess the MTU size for the network interface through which this peer is
@@ -67,11 +182,8 @@ static struct rxrpc_peer *rxrpc_alloc_peer(struct sockaddr_rxrpc *srx,
 
 	peer = kzalloc(sizeof(struct rxrpc_peer), gfp);
 	if (peer) {
-		INIT_WORK(&peer->destroyer, &rxrpc_destroy_peer);
-		INIT_LIST_HEAD(&peer->link);
 		INIT_LIST_HEAD(&peer->error_targets);
 		spin_lock_init(&peer->lock);
-		atomic_set(&peer->usage, 1);
 		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
 		memcpy(&peer->srx, srx, sizeof(*srx));
 
@@ -103,10 +215,10 @@ static struct rxrpc_peer *rxrpc_alloc_peer(struct sockaddr_rxrpc *srx,
 /*
  * obtain a remote transport endpoint for the specified address
  */
-struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
+struct rxrpc_peer *rxrpc_lookup_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
 {
 	struct rxrpc_peer *peer, *candidate;
-	const char *new = "old";
+	struct obj_node *obj;
 	int usage;
 
 	_enter("{%d,%d,%pI4+%hu}",
@@ -116,188 +228,36 @@ struct rxrpc_peer *rxrpc_get_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
 	       ntohs(srx->transport.sin.sin_port));
 
 	/* search the peer list first */
-	read_lock_bh(&rxrpc_peer_lock);
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		_debug("check PEER %d { u=%d t=%d l=%d }",
-		       peer->debug_id,
-		       atomic_read(&peer->usage),
-		       peer->srx.transport_type,
-		       peer->srx.transport_len);
-
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_peer;
-	}
-	read_unlock_bh(&rxrpc_peer_lock);
-
-	/* not yet present - create a candidate for a new record and then
-	 * redo the search */
-	candidate = rxrpc_alloc_peer(srx, gfp);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return ERR_PTR(-ENOMEM);
-	}
+	rcu_read_lock();
+	peer = rxrpc_lookup_peer_rcu(srx);
+	if (peer && !rxrpc_get_peer_maybe(peer))
+		peer = NULL;
+	rcu_read_unlock();
+
+	if (!peer) {
+		/* The peer is not yet present in cache - create a candidate
+		 * for a new record and then redo the search.
+		 */
+		candidate = rxrpc_alloc_peer(srx, gfp);
+		if (!candidate) {
+			_leave(" = NULL [nomem]");
+			return NULL;
+		}
 
-	write_lock_bh(&rxrpc_peer_lock);
+		obj = objcache_try_add(&rxrpc_peer_cache, &candidate->obj,
+				       &candidate->srx);
+		peer = container_of(obj, struct rxrpc_peer, obj);
 
-	list_for_each_entry(peer, &rxrpc_peers, link) {
-		if (atomic_read(&peer->usage) > 0 &&
-		    peer->srx.transport_type == srx->transport_type &&
-		    peer->srx.transport_len == srx->transport_len &&
-		    memcmp(&peer->srx.transport,
-			   &srx->transport,
-			   srx->transport_len) == 0)
-			goto found_extant_second;
+		if (peer != candidate)
+			kfree(candidate);
 	}
 
-	/* we can now add the new candidate to the list */
-	peer = candidate;
-	candidate = NULL;
-	usage = atomic_read(&peer->usage);
-
-	list_add_tail(&peer->link, &rxrpc_peers);
-	write_unlock_bh(&rxrpc_peer_lock);
-	new = "new";
-
-success:
-	_net("PEER %s %d {%d,%u,%pI4+%hu}",
-	     new,
+	_net("PEER %d {%d,%pI4+%hu}",
 	     peer->debug_id,
 	     peer->srx.transport_type,
-	     peer->srx.transport.family,
 	     &peer->srx.transport.sin.sin_addr,
 	     ntohs(peer->srx.transport.sin.sin_port));
 
 	_leave(" = %p {u=%d}", peer, usage);
 	return peer;
-
-	/* we found the peer in the list immediately */
-found_extant_peer:
-	usage = atomic_inc_return(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	goto success;
-
-	/* we found the peer on the second time through the list */
-found_extant_second:
-	usage = atomic_inc_return(&peer->usage);
-	write_unlock_bh(&rxrpc_peer_lock);
-	kfree(candidate);
-	goto success;
-}
-
-/*
- * find the peer associated with a packet
- */
-struct rxrpc_peer *rxrpc_find_peer(struct rxrpc_local *local,
-				   __be32 addr, __be16 port)
-{
-	struct rxrpc_peer *peer;
-
-	_enter("");
-
-	/* search the peer list */
-	read_lock_bh(&rxrpc_peer_lock);
-
-	if (local->srx.transport.family == AF_INET &&
-	    local->srx.transport_type == SOCK_DGRAM
-	    ) {
-		list_for_each_entry(peer, &rxrpc_peers, link) {
-			if (atomic_read(&peer->usage) > 0 &&
-			    peer->srx.transport_type == SOCK_DGRAM &&
-			    peer->srx.transport.family == AF_INET &&
-			    peer->srx.transport.sin.sin_port == port &&
-			    peer->srx.transport.sin.sin_addr.s_addr == addr)
-				goto found_UDP_peer;
-		}
-
-		goto new_UDP_peer;
-	}
-
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EAFNOSUPPORT");
-	return ERR_PTR(-EAFNOSUPPORT);
-
-found_UDP_peer:
-	_net("Rx UDP DGRAM from peer %d", peer->debug_id);
-	atomic_inc(&peer->usage);
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = %p", peer);
-	return peer;
-
-new_UDP_peer:
-	_net("Rx UDP DGRAM from NEW peer");
-	read_unlock_bh(&rxrpc_peer_lock);
-	_leave(" = -EBUSY [new]");
-	return ERR_PTR(-EBUSY);
-}
-
-/*
- * release a remote transport endpoint
- */
-void rxrpc_put_peer(struct rxrpc_peer *peer)
-{
-	_enter("%p{u=%d}", peer, atomic_read(&peer->usage));
-
-	ASSERTCMP(atomic_read(&peer->usage), >, 0);
-
-	if (likely(!atomic_dec_and_test(&peer->usage))) {
-		_leave(" [in use]");
-		return;
-	}
-
-	rxrpc_queue_work(&peer->destroyer);
-	_leave("");
-}
-
-/*
- * destroy a remote transport endpoint
- */
-static void rxrpc_destroy_peer(struct work_struct *work)
-{
-	struct rxrpc_peer *peer =
-		container_of(work, struct rxrpc_peer, destroyer);
-
-	_enter("%p{%d}", peer, atomic_read(&peer->usage));
-
-	write_lock_bh(&rxrpc_peer_lock);
-	list_del(&peer->link);
-	write_unlock_bh(&rxrpc_peer_lock);
-
-	_net("DESTROY PEER %d", peer->debug_id);
-	kfree(peer);
-
-	if (list_empty(&rxrpc_peers))
-		wake_up_all(&rxrpc_peer_wq);
-	_leave("");
-}
-
-/*
- * preemptively destroy all the peer records from a transport endpoint rather
- * than waiting for them to time out
- */
-void __exit rxrpc_destroy_all_peers(void)
-{
-	DECLARE_WAITQUEUE(myself,current);
-
-	_enter("");
-
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_peers)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_peer_wq, &myself);
-
-		while (!list_empty(&rxrpc_peers)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
-
-		remove_wait_queue(&rxrpc_peer_wq, &myself);
-		set_current_state(TASK_RUNNING);
-	}
-
-	_leave("");
 }
diff --git a/net/rxrpc/utils.c b/net/rxrpc/utils.c
new file mode 100644
index 000000000000..f28122a15a24
--- /dev/null
+++ b/net/rxrpc/utils.c
@@ -0,0 +1,41 @@
+/* Utility routines
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <linux/ip.h>
+#include <linux/udp.h>
+#include "ar-internal.h"
+
+/*
+ * Set up an RxRPC address from a socket buffer.
+ */
+void rxrpc_get_addr_from_skb(struct rxrpc_local *local,
+			     const struct sk_buff *skb,
+			     struct sockaddr_rxrpc *srx)
+{
+	memset(srx, 0, sizeof(*srx));
+	srx->transport_type = local->srx.transport_type;
+	srx->transport.family = local->srx.transport.family;
+
+	/* Can we see an ipv4 UDP packet on an ipv6 UDP socket?  and vice
+	 * versa?
+	 */
+	switch (srx->transport.family) {
+	case AF_INET:
+		srx->transport.sin.sin_port = udp_hdr(skb)->source;
+		srx->transport_len = sizeof(struct sockaddr_in);
+		memcpy(&srx->transport.sin.sin_addr, &ip_hdr(skb)->saddr,
+		       sizeof(struct in_addr));
+		break;
+
+	default:
+		BUG();
+	}
+}



* [PATCH 09/11] rxrpc: Add /proc/net/rxrpc_peers to display the known remote endpoints
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (7 preceding siblings ...)
  2016-03-07 14:38 ` [PATCH 08/11] rxrpc: Implement peer endpoint cache David Howells
@ 2016-03-07 14:39 ` David Howells
  2016-03-07 14:39 ` [PATCH 10/11] rxrpc: Rename ar-error.c to peer-event.c David Howells
  2016-03-07 14:39 ` [PATCH 11/11] rxrpc: Rename rxrpc_UDP_error_report() to rxrpc_error_report() David Howells
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:39 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Add /proc/net/rxrpc_peers to display the remote endpoint records that are
resident in the peer cache.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/af_rxrpc.c    |    3 +++
 net/rxrpc/peer-object.c |   32 ++++++++++++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 5f0ffb5f8306..b1849c4e3fd3 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -845,6 +845,8 @@ static int __init af_rxrpc_init(void)
 		    &rxrpc_connection_seq_fops);
 	proc_create_data("rxrpc_locals", 0, init_net.proc_net,
 			 &objcache_seq_fops, &rxrpc_local_cache);
+	proc_create_data("rxrpc_peers", 0, init_net.proc_net,
+			 &objcache_seq_fops, &rxrpc_peer_cache);
 #endif
 	return 0;
 
@@ -887,6 +889,7 @@ static void __exit af_rxrpc_exit(void)
 
 	_debug("flush scheduled work");
 	flush_workqueue(rxrpc_workqueue);
+	remove_proc_entry("rxrpc_peers", init_net.proc_net);
 	remove_proc_entry("rxrpc_locals", init_net.proc_net);
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
 	remove_proc_entry("rxrpc_calls", init_net.proc_net);
diff --git a/net/rxrpc/peer-object.c b/net/rxrpc/peer-object.c
index 35157c659bf7..2dd8c241302d 100644
--- a/net/rxrpc/peer-object.c
+++ b/net/rxrpc/peer-object.c
@@ -21,6 +21,7 @@
 #include <net/route.h>
 #include "ar-internal.h"
 
+static int rxrpc_peer_seq_show(struct seq_file *, void *);
 static unsigned long rxrpc_peer_hash_key(const void *);
 static int rxrpc_peer_cmp_key(const struct obj_node *, const void *);
 static void rxrpc_peer_gc_rcu(struct rcu_head *);
@@ -29,6 +30,7 @@ static struct hlist_head rxrpc_peer_cache_hash[256];
 
 struct objcache rxrpc_peer_cache = {
 	.name		= "peers",
+	.seq_show	= rxrpc_peer_seq_show,
 	.gc_rcu		= rxrpc_peer_gc_rcu,
 	.hash_key	= rxrpc_peer_hash_key,
 	.cmp_key	= rxrpc_peer_cmp_key,
@@ -261,3 +263,33 @@ struct rxrpc_peer *rxrpc_lookup_peer(struct sockaddr_rxrpc *srx, gfp_t gfp)
 	_leave(" = %p {u=%d}", peer, usage);
 	return peer;
 }
+
+/*
+ * Display a remote endpoint in /proc/net/rxrpc_peers.
+ */
+static int rxrpc_peer_seq_show(struct seq_file *seq, void *v)
+{
+	struct rxrpc_peer *peer;
+
+	if (v == SEQ_START_TOKEN) {
+		seq_puts(seq, "Use SvID Proto MTU   RTT   RPort Remote\n");
+		return 0;
+	}
+
+	peer = hlist_entry(v, struct rxrpc_peer, obj.link);
+
+	switch (peer->srx.transport.family) {
+	case AF_INET:
+		seq_printf(seq,
+			   "%3d %4x UDP   %5u %5lu %5hu %pI4\n",
+			   atomic_read(&peer->obj.usage),
+			   peer->srx.srx_service,
+			   peer->mtu,
+			   peer->rtt,
+			   ntohs(peer->srx.transport.sin.sin_port),
+			   &peer->srx.transport.sin.sin_addr);
+		break;
+	}
+
+	return 0;
+}
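As a rough illustration of the table this seq_show emits, here is a userspace sketch of the IPv4 row format (the helper name and sample values are invented; the kernel prints the address with %pI4, modelled here with a pre-formatted string):

```c
#include <stdio.h>

/* Format one /proc/net/rxrpc_peers row under the header
 * "Use SvID Proto MTU   RTT   RPort Remote".  Field widths match the
 * seq_printf() format in the patch above; this is a model, not kernel code.
 */
static int format_peer_row(char *buf, size_t len,
			   int usage, unsigned int svid, unsigned int mtu,
			   unsigned long rtt, unsigned short port,
			   const char *addr)
{
	return snprintf(buf, len, "%3d %4x UDP   %5u %5lu %5hu %s\n",
			usage, svid, mtu, rtt, port, addr);
}
```

The fixed widths keep the columns aligned under the header line that the seq_show prints for SEQ_START_TOKEN.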

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 10/11] rxrpc: Rename ar-error.c to peer-event.c
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (8 preceding siblings ...)
  2016-03-07 14:39 ` [PATCH 09/11] rxrpc: Add /proc/net/rxrpc_peers to display the known remote endpoints David Howells
@ 2016-03-07 14:39 ` David Howells
  2016-03-07 14:39 ` [PATCH 11/11] rxrpc: Rename rxrpc_UDP_error_report() to rxrpc_error_report() David Howells
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:39 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Rename ar-error.c to peer-event.c.  This will form the core of the peer
event handling code.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/Makefile      |    2 
 net/rxrpc/ar-error.c    |  281 -----------------------------------------------
 net/rxrpc/ar-internal.h |   12 +-
 net/rxrpc/peer-event.c  |  281 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 288 insertions(+), 288 deletions(-)
 delete mode 100644 net/rxrpc/ar-error.c
 create mode 100644 net/rxrpc/peer-event.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 33ad62ea2a34..3417a98e6914 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -9,7 +9,6 @@ af-rxrpc-y := \
 	ar-call.o \
 	ar-connection.o \
 	ar-connevent.o \
-	ar-error.o \
 	ar-input.o \
 	ar-key.o \
 	ar-output.o \
@@ -20,6 +19,7 @@ af-rxrpc-y := \
 	local-event.o \
 	local-object.o \
 	objcache.o \
+	peer-event.o \
 	peer-object.o \
 	utils.o
 
diff --git a/net/rxrpc/ar-error.c b/net/rxrpc/ar-error.c
deleted file mode 100644
index 05eb2366ff22..000000000000
--- a/net/rxrpc/ar-error.c
+++ /dev/null
@@ -1,281 +0,0 @@
-/* Error message handling (ICMP)
- *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhowells@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/net.h>
-#include <linux/skbuff.h>
-#include <linux/errqueue.h>
-#include <linux/udp.h>
-#include <linux/in.h>
-#include <linux/in6.h>
-#include <linux/icmp.h>
-#include <net/sock.h>
-#include <net/af_rxrpc.h>
-#include <net/ip.h>
-#include "ar-internal.h"
-
-/*
- * Find the peer associated with an ICMP packet.
- */
-static struct rxrpc_peer *rxrpc_find_icmp_peer_rcu(struct rxrpc_local *local,
-						   const struct sk_buff *skb)
-{
-	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
-	struct sockaddr_rxrpc srx;
-
-	_enter("");
-
-	memset(&srx, 0, sizeof(srx));
-	srx.transport_type = local->srx.transport_type;
-	srx.transport.family = local->srx.transport.family;
-
-	/* Can we see an ICMP4 packet on an ICMP6 listening socket?  and vice
-	 * versa?
-	 */
-	switch (srx.transport.family) {
-	case AF_INET:
-		srx.transport.sin.sin_port = serr->port;
-		srx.transport_len = sizeof(struct sockaddr_in);
-		switch (serr->ee.ee_origin) {
-		case SO_EE_ORIGIN_ICMP:
-			_net("Rx ICMP");
-			memcpy(&srx.transport.sin.sin_addr,
-			       skb_network_header(skb) + serr->addr_offset,
-			       sizeof(struct in_addr));
-			break;
-		case SO_EE_ORIGIN_ICMP6:
-			_net("Rx ICMP6 on v4 sock");
-			memcpy(&srx.transport.sin.sin_addr,
-			       skb_network_header(skb) + serr->addr_offset + 12,
-			       sizeof(struct in_addr));
-			break;
-		default:
-			memcpy(&srx.transport.sin.sin_addr, &ip_hdr(skb)->saddr,
-			       sizeof(struct in_addr));
-			break;
-		}
-		break;
-
-	default:
-		BUG();
-	}
-
-	return rxrpc_lookup_peer_rcu(&srx);
-}
-
-/*
- * handle an error received on the local endpoint
- */
-void rxrpc_UDP_error_report(struct sock *sk)
-{
-	struct sock_exterr_skb *serr;
-	struct rxrpc_transport *trans;
-	struct rxrpc_local *local = sk->sk_user_data;
-	struct rxrpc_peer *peer;
-	struct sk_buff *skb;
-	__be32 addr;
-	__be16 port;
-
-	_enter("%p{%d}", sk, local->debug_id);
-
-	skb = sock_dequeue_err_skb(sk);
-	if (!skb) {
-		_leave("UDP socket errqueue empty");
-		return;
-	}
-	serr = SKB_EXT_ERR(skb);
-	if (!skb->len && serr->ee.ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
-		_leave("UDP empty message");
-		kfree_skb(skb);
-		return;
-	}
-
-	rxrpc_new_skb(skb);
-
-	addr = *(__be32 *)(skb_network_header(skb) + serr->addr_offset);
-	port = serr->port;
-
-	_net("Rx UDP Error from %pI4:%hu", &addr, ntohs(port));
-	_debug("Msg l:%d d:%d", skb->len, skb->data_len);
-
-	rcu_read_lock();
-	peer = rxrpc_find_icmp_peer_rcu(local, skb);
-	if (IS_ERR(peer)) {
-		rcu_read_unlock();
-		rxrpc_free_skb(skb);
-		_leave(" [no peer]");
-		return;
-	}
-
-	trans = rxrpc_find_transport(local, peer);
-	if (!trans) {
-		rcu_read_unlock();
-		rxrpc_free_skb(skb);
-		_leave(" [no trans]");
-		return;
-	}
-
-	if (serr->ee.ee_origin == SO_EE_ORIGIN_ICMP &&
-	    serr->ee.ee_type == ICMP_DEST_UNREACH &&
-	    serr->ee.ee_code == ICMP_FRAG_NEEDED
-	    ) {
-		u32 mtu = serr->ee.ee_info;
-
-		_net("Rx Received ICMP Fragmentation Needed (%d)", mtu);
-
-		/* wind down the local interface MTU */
-		if (mtu > 0 && peer->if_mtu == 65535 && mtu < peer->if_mtu) {
-			peer->if_mtu = mtu;
-			_net("I/F MTU %u", mtu);
-		}
-
-		if (mtu == 0) {
-			/* they didn't give us a size, estimate one */
-			mtu = peer->if_mtu;
-			if (mtu > 1500) {
-				mtu >>= 1;
-				if (mtu < 1500)
-					mtu = 1500;
-			} else {
-				mtu -= 100;
-				if (mtu < peer->hdrsize)
-					mtu = peer->hdrsize + 4;
-			}
-		}
-
-		if (mtu < peer->mtu) {
-			spin_lock_bh(&peer->lock);
-			peer->mtu = mtu;
-			peer->maxdata = peer->mtu - peer->hdrsize;
-			spin_unlock_bh(&peer->lock);
-			_net("Net MTU %u (maxdata %u)",
-			     peer->mtu, peer->maxdata);
-		}
-	}
-
-	rcu_read_unlock();
-
-	/* pass the transport ref to error_handler to release */
-	skb_queue_tail(&trans->error_queue, skb);
-	rxrpc_queue_work(&trans->error_handler);
-	_leave("");
-}
-
-/*
- * deal with UDP error messages
- */
-void rxrpc_UDP_error_handler(struct work_struct *work)
-{
-	struct sock_extended_err *ee;
-	struct sock_exterr_skb *serr;
-	struct rxrpc_transport *trans =
-		container_of(work, struct rxrpc_transport, error_handler);
-	struct sk_buff *skb;
-	int err;
-
-	_enter("");
-
-	skb = skb_dequeue(&trans->error_queue);
-	if (!skb)
-		return;
-
-	serr = SKB_EXT_ERR(skb);
-	ee = &serr->ee;
-
-	_net("Rx Error o=%d t=%d c=%d e=%d",
-	     ee->ee_origin, ee->ee_type, ee->ee_code, ee->ee_errno);
-
-	err = ee->ee_errno;
-
-	switch (ee->ee_origin) {
-	case SO_EE_ORIGIN_ICMP:
-		switch (ee->ee_type) {
-		case ICMP_DEST_UNREACH:
-			switch (ee->ee_code) {
-			case ICMP_NET_UNREACH:
-				_net("Rx Received ICMP Network Unreachable");
-				break;
-			case ICMP_HOST_UNREACH:
-				_net("Rx Received ICMP Host Unreachable");
-				break;
-			case ICMP_PORT_UNREACH:
-				_net("Rx Received ICMP Port Unreachable");
-				break;
-			case ICMP_NET_UNKNOWN:
-				_net("Rx Received ICMP Unknown Network");
-				break;
-			case ICMP_HOST_UNKNOWN:
-				_net("Rx Received ICMP Unknown Host");
-				break;
-			default:
-				_net("Rx Received ICMP DestUnreach code=%u",
-				     ee->ee_code);
-				break;
-			}
-			break;
-
-		case ICMP_TIME_EXCEEDED:
-			_net("Rx Received ICMP TTL Exceeded");
-			break;
-
-		default:
-			_proto("Rx Received ICMP error { type=%u code=%u }",
-			       ee->ee_type, ee->ee_code);
-			break;
-		}
-		break;
-
-	case SO_EE_ORIGIN_LOCAL:
-		_proto("Rx Received local error { error=%d }",
-		       ee->ee_errno);
-		break;
-
-	case SO_EE_ORIGIN_NONE:
-	case SO_EE_ORIGIN_ICMP6:
-	default:
-		_proto("Rx Received error report { orig=%u }",
-		       ee->ee_origin);
-		break;
-	}
-
-	/* terminate all the affected calls if there's an unrecoverable
-	 * error */
-	if (err) {
-		struct rxrpc_call *call, *_n;
-
-		_debug("ISSUE ERROR %d", err);
-
-		spin_lock_bh(&trans->peer->lock);
-		trans->peer->net_error = err;
-
-		list_for_each_entry_safe(call, _n, &trans->peer->error_targets,
-					 error_link) {
-			write_lock(&call->state_lock);
-			if (call->state != RXRPC_CALL_COMPLETE &&
-			    call->state < RXRPC_CALL_NETWORK_ERROR) {
-				call->state = RXRPC_CALL_NETWORK_ERROR;
-				set_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
-				rxrpc_queue_call(call);
-			}
-			write_unlock(&call->state_lock);
-			list_del_init(&call->error_link);
-		}
-
-		spin_unlock_bh(&trans->peer->lock);
-	}
-
-	if (!skb_queue_empty(&trans->error_queue))
-		rxrpc_queue_work(&trans->error_handler);
-
-	rxrpc_free_skb(skb);
-	rxrpc_put_transport(trans);
-	_leave("");
-}
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 57fce54e9180..0cfd7c10d824 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -579,12 +579,6 @@ void rxrpc_reject_packet(struct rxrpc_local *, struct sk_buff *);
 void rxrpc_reject_packets(struct work_struct *);
 
 /*
- * ar-error.c
- */
-void rxrpc_UDP_error_report(struct sock *);
-void rxrpc_UDP_error_handler(struct work_struct *);
-
-/*
  * ar-input.c
  */
 extern const char *rxrpc_pkts[];
@@ -685,6 +679,12 @@ static inline void rxrpc_put_local(struct rxrpc_local *local)
 }
 
 /*
+ * peer-event.c
+ */
+void rxrpc_UDP_error_report(struct sock *);
+void rxrpc_UDP_error_handler(struct work_struct *);
+
+/*
  * peer-object.c
  */
 extern struct objcache rxrpc_peer_cache;
diff --git a/net/rxrpc/peer-event.c b/net/rxrpc/peer-event.c
new file mode 100644
index 000000000000..05eb2366ff22
--- /dev/null
+++ b/net/rxrpc/peer-event.c
@@ -0,0 +1,281 @@
+/* Error message handling (ICMP)
+ *
+ * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ */
+
+#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/skbuff.h>
+#include <linux/errqueue.h>
+#include <linux/udp.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/icmp.h>
+#include <net/sock.h>
+#include <net/af_rxrpc.h>
+#include <net/ip.h>
+#include "ar-internal.h"
+
+/*
+ * Find the peer associated with an ICMP packet.
+ */
+static struct rxrpc_peer *rxrpc_find_icmp_peer_rcu(struct rxrpc_local *local,
+						   const struct sk_buff *skb)
+{
+	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
+	struct sockaddr_rxrpc srx;
+
+	_enter("");
+
+	memset(&srx, 0, sizeof(srx));
+	srx.transport_type = local->srx.transport_type;
+	srx.transport.family = local->srx.transport.family;
+
+	/* Can we see an ICMP4 packet on an ICMP6 listening socket?  and vice
+	 * versa?
+	 */
+	switch (srx.transport.family) {
+	case AF_INET:
+		srx.transport.sin.sin_port = serr->port;
+		srx.transport_len = sizeof(struct sockaddr_in);
+		switch (serr->ee.ee_origin) {
+		case SO_EE_ORIGIN_ICMP:
+			_net("Rx ICMP");
+			memcpy(&srx.transport.sin.sin_addr,
+			       skb_network_header(skb) + serr->addr_offset,
+			       sizeof(struct in_addr));
+			break;
+		case SO_EE_ORIGIN_ICMP6:
+			_net("Rx ICMP6 on v4 sock");
+			memcpy(&srx.transport.sin.sin_addr,
+			       skb_network_header(skb) + serr->addr_offset + 12,
+			       sizeof(struct in_addr));
+			break;
+		default:
+			memcpy(&srx.transport.sin.sin_addr, &ip_hdr(skb)->saddr,
+			       sizeof(struct in_addr));
+			break;
+		}
+		break;
+
+	default:
+		BUG();
+	}
+
+	return rxrpc_lookup_peer_rcu(&srx);
+}
+
+/*
+ * handle an error received on the local endpoint
+ */
+void rxrpc_UDP_error_report(struct sock *sk)
+{
+	struct sock_exterr_skb *serr;
+	struct rxrpc_transport *trans;
+	struct rxrpc_local *local = sk->sk_user_data;
+	struct rxrpc_peer *peer;
+	struct sk_buff *skb;
+	__be32 addr;
+	__be16 port;
+
+	_enter("%p{%d}", sk, local->debug_id);
+
+	skb = sock_dequeue_err_skb(sk);
+	if (!skb) {
+		_leave("UDP socket errqueue empty");
+		return;
+	}
+	serr = SKB_EXT_ERR(skb);
+	if (!skb->len && serr->ee.ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
+		_leave("UDP empty message");
+		kfree_skb(skb);
+		return;
+	}
+
+	rxrpc_new_skb(skb);
+
+	addr = *(__be32 *)(skb_network_header(skb) + serr->addr_offset);
+	port = serr->port;
+
+	_net("Rx UDP Error from %pI4:%hu", &addr, ntohs(port));
+	_debug("Msg l:%d d:%d", skb->len, skb->data_len);
+
+	rcu_read_lock();
+	peer = rxrpc_find_icmp_peer_rcu(local, skb);
+	if (IS_ERR(peer)) {
+		rcu_read_unlock();
+		rxrpc_free_skb(skb);
+		_leave(" [no peer]");
+		return;
+	}
+
+	trans = rxrpc_find_transport(local, peer);
+	if (!trans) {
+		rcu_read_unlock();
+		rxrpc_free_skb(skb);
+		_leave(" [no trans]");
+		return;
+	}
+
+	if (serr->ee.ee_origin == SO_EE_ORIGIN_ICMP &&
+	    serr->ee.ee_type == ICMP_DEST_UNREACH &&
+	    serr->ee.ee_code == ICMP_FRAG_NEEDED
+	    ) {
+		u32 mtu = serr->ee.ee_info;
+
+		_net("Rx Received ICMP Fragmentation Needed (%d)", mtu);
+
+		/* wind down the local interface MTU */
+		if (mtu > 0 && peer->if_mtu == 65535 && mtu < peer->if_mtu) {
+			peer->if_mtu = mtu;
+			_net("I/F MTU %u", mtu);
+		}
+
+		if (mtu == 0) {
+			/* they didn't give us a size, estimate one */
+			mtu = peer->if_mtu;
+			if (mtu > 1500) {
+				mtu >>= 1;
+				if (mtu < 1500)
+					mtu = 1500;
+			} else {
+				mtu -= 100;
+				if (mtu < peer->hdrsize)
+					mtu = peer->hdrsize + 4;
+			}
+		}
+
+		if (mtu < peer->mtu) {
+			spin_lock_bh(&peer->lock);
+			peer->mtu = mtu;
+			peer->maxdata = peer->mtu - peer->hdrsize;
+			spin_unlock_bh(&peer->lock);
+			_net("Net MTU %u (maxdata %u)",
+			     peer->mtu, peer->maxdata);
+		}
+	}
+
+	rcu_read_unlock();
+
+	/* pass the transport ref to error_handler to release */
+	skb_queue_tail(&trans->error_queue, skb);
+	rxrpc_queue_work(&trans->error_handler);
+	_leave("");
+}
+
+/*
+ * deal with UDP error messages
+ */
+void rxrpc_UDP_error_handler(struct work_struct *work)
+{
+	struct sock_extended_err *ee;
+	struct sock_exterr_skb *serr;
+	struct rxrpc_transport *trans =
+		container_of(work, struct rxrpc_transport, error_handler);
+	struct sk_buff *skb;
+	int err;
+
+	_enter("");
+
+	skb = skb_dequeue(&trans->error_queue);
+	if (!skb)
+		return;
+
+	serr = SKB_EXT_ERR(skb);
+	ee = &serr->ee;
+
+	_net("Rx Error o=%d t=%d c=%d e=%d",
+	     ee->ee_origin, ee->ee_type, ee->ee_code, ee->ee_errno);
+
+	err = ee->ee_errno;
+
+	switch (ee->ee_origin) {
+	case SO_EE_ORIGIN_ICMP:
+		switch (ee->ee_type) {
+		case ICMP_DEST_UNREACH:
+			switch (ee->ee_code) {
+			case ICMP_NET_UNREACH:
+				_net("Rx Received ICMP Network Unreachable");
+				break;
+			case ICMP_HOST_UNREACH:
+				_net("Rx Received ICMP Host Unreachable");
+				break;
+			case ICMP_PORT_UNREACH:
+				_net("Rx Received ICMP Port Unreachable");
+				break;
+			case ICMP_NET_UNKNOWN:
+				_net("Rx Received ICMP Unknown Network");
+				break;
+			case ICMP_HOST_UNKNOWN:
+				_net("Rx Received ICMP Unknown Host");
+				break;
+			default:
+				_net("Rx Received ICMP DestUnreach code=%u",
+				     ee->ee_code);
+				break;
+			}
+			break;
+
+		case ICMP_TIME_EXCEEDED:
+			_net("Rx Received ICMP TTL Exceeded");
+			break;
+
+		default:
+			_proto("Rx Received ICMP error { type=%u code=%u }",
+			       ee->ee_type, ee->ee_code);
+			break;
+		}
+		break;
+
+	case SO_EE_ORIGIN_LOCAL:
+		_proto("Rx Received local error { error=%d }",
+		       ee->ee_errno);
+		break;
+
+	case SO_EE_ORIGIN_NONE:
+	case SO_EE_ORIGIN_ICMP6:
+	default:
+		_proto("Rx Received error report { orig=%u }",
+		       ee->ee_origin);
+		break;
+	}
+
+	/* terminate all the affected calls if there's an unrecoverable
+	 * error */
+	if (err) {
+		struct rxrpc_call *call, *_n;
+
+		_debug("ISSUE ERROR %d", err);
+
+		spin_lock_bh(&trans->peer->lock);
+		trans->peer->net_error = err;
+
+		list_for_each_entry_safe(call, _n, &trans->peer->error_targets,
+					 error_link) {
+			write_lock(&call->state_lock);
+			if (call->state != RXRPC_CALL_COMPLETE &&
+			    call->state < RXRPC_CALL_NETWORK_ERROR) {
+				call->state = RXRPC_CALL_NETWORK_ERROR;
+				set_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
+				rxrpc_queue_call(call);
+			}
+			write_unlock(&call->state_lock);
+			list_del_init(&call->error_link);
+		}
+
+		spin_unlock_bh(&trans->peer->lock);
+	}
+
+	if (!skb_queue_empty(&trans->error_queue))
+		rxrpc_queue_work(&trans->error_handler);
+
+	rxrpc_free_skb(skb);
+	rxrpc_put_transport(trans);
+	_leave("");
+}


* [PATCH 11/11] rxrpc: Rename rxrpc_UDP_error_report() to rxrpc_error_report()
  2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
                   ` (9 preceding siblings ...)
  2016-03-07 14:39 ` [PATCH 10/11] rxrpc: Rename ar-error.c to peer-event.c David Howells
@ 2016-03-07 14:39 ` David Howells
  10 siblings, 0 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 14:39 UTC (permalink / raw)
  To: linux-afs; +Cc: dhowells, netdev, linux-kernel

Rename rxrpc_UDP_error_report() to rxrpc_error_report() as it might get
called for something other than UDP.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h  |    2 +-
 net/rxrpc/local-object.c |    2 +-
 net/rxrpc/peer-event.c   |    2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 0cfd7c10d824..95c52ed2419f 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -681,7 +681,7 @@ static inline void rxrpc_put_local(struct rxrpc_local *local)
 /*
  * peer-event.c
  */
-void rxrpc_UDP_error_report(struct sock *);
+void rxrpc_error_report(struct sock *);
 void rxrpc_UDP_error_handler(struct work_struct *);
 
 /*
diff --git a/net/rxrpc/local-object.c b/net/rxrpc/local-object.c
index cc6354675026..742e964cb8ce 100644
--- a/net/rxrpc/local-object.c
+++ b/net/rxrpc/local-object.c
@@ -189,7 +189,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local)
 	sock = local->socket->sk;
 	sock->sk_user_data	= local;
 	sock->sk_data_ready	= rxrpc_data_ready;
-	sock->sk_error_report	= rxrpc_UDP_error_report;
+	sock->sk_error_report	= rxrpc_error_report;
 	_leave(" = 0");
 	return 0;
 
diff --git a/net/rxrpc/peer-event.c b/net/rxrpc/peer-event.c
index 05eb2366ff22..7da42d43e9c8 100644
--- a/net/rxrpc/peer-event.c
+++ b/net/rxrpc/peer-event.c
@@ -74,7 +74,7 @@ static struct rxrpc_peer *rxrpc_find_icmp_peer_rcu(struct rxrpc_local *local,
 /*
  * handle an error received on the local endpoint
  */
-void rxrpc_UDP_error_report(struct sock *sk)
+void rxrpc_error_report(struct sock *sk)
 {
 	struct sock_exterr_skb *serr;
 	struct rxrpc_transport *trans;


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
@ 2016-03-07 18:42   ` David Miller
  2016-03-07 22:45   ` David Howells
  1 sibling, 0 replies; 21+ messages in thread
From: David Miller @ 2016-03-07 18:42 UTC (permalink / raw)
  To: dhowells; +Cc: linux-afs, netdev, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Mon, 07 Mar 2016 14:38:06 +0000

> Add a common object cache implementation for RxRPC.  This will be used to
> cache objects of various types (calls, connections, local and remote
> endpoint records).  Each object that would be cached must contain an
> obj_node struct for the cache to use.  The object's usage count and link
> pointers are here, plus other internal metadata.
> 
> Each object cache consists of a primary hash to which all objects of that
> type must be added and a secondary hash to which objects may also be added
> and removed a single time.  Objects are automatically removed from both
> hashes when they expire.
> 
> Objects start off life with a usage count of 2 - one for the cache and one
> for the caller.  When an object's usage count is reduced to 1, it sits in
> the cache until its expiry time is reached, at which point the cache
> attempts to reduce the count to 0 and, if successful, clean it up.  An
> object with a usage count of 1 in the cache can be looked up and have its
> usage count increased, thereby stopping the expiry process.
> 
> Objects are looked up, unlinked and destroyed under RCU-safe conditions.
> 
> A garbage collector cycles through all the hash buckets in the primary hash
> and compares the expiry times of the usage-count-1 objects to the current
> time, removing any that have expired.  This is kicked by a single timer for
> the whole cache rather than having a timer per object.
> 
> Signed-off-by: David Howells <dhowells@redhat.com>

I know you put a lot of time and effort into this, but I want to strongly
recommend against a garbage collected hash table for anything whatsoever.

Especially if the given objects are in some way created/destroyed/etc. by
operations triggerable remotely.

This can be DoS'd quite trivially, and that's why we have removed the ipv4
routing cache which did the same.
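The lifecycle described in the quoted patch (born at usage 2, idling at 1, reclaimed at 0 unless a lookup intervenes) can be sketched as a userspace model. The names below are illustrative, not the kernel API, and the atomics and RCU deferral are glossed over:

```c
#include <stdbool.h>

struct obj_node {
	int	usage;		/* models the atomic usage count */
	long	expire_at;	/* jiffies-like expiry stamp */
	bool	dead;
};

#define OBJ_EXPIRY 10		/* illustrative expiry interval */

/* Creation: one ref for the cache, one for the caller. */
static void obj_init(struct obj_node *obj)
{
	obj->usage = 2;
	obj->dead = false;
}

/* Caller drops its ref; at usage 1 the object idles in the cache. */
static void obj_put(struct obj_node *obj, long now)
{
	if (--obj->usage == 1)
		obj->expire_at = now + OBJ_EXPIRY;
}

/* Lookup: only a live (usage >= 1) object can be reused; bumping the
 * count cancels any pending expiry. */
static bool obj_get(struct obj_node *obj)
{
	if (obj->usage < 1)
		return false;
	obj->usage++;
	return true;
}

/* GC pass: try to take an expired usage-1 object from 1 to 0.  In the
 * kernel this would hand the object to an RCU-deferred destructor. */
static bool obj_gc(struct obj_node *obj, long now)
{
	if (obj->usage == 1 && now >= obj->expire_at) {
		obj->usage = 0;
		obj->dead = true;
		return true;
	}
	return false;
}
```

The race the patch description mentions is visible here: between obj_put() and the GC pass, an obj_get() can rescue the object by taking it back to usage 2.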


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
  2016-03-07 18:42   ` David Miller
@ 2016-03-07 22:45   ` David Howells
  2016-03-08  4:07     ` David Miller
                       ` (2 more replies)
  1 sibling, 3 replies; 21+ messages in thread
From: David Howells @ 2016-03-07 22:45 UTC (permalink / raw)
  To: David Miller; +Cc: dhowells, linux-afs, netdev, linux-kernel

David Miller <davem@davemloft.net> wrote:

> I know you put a lot of time and effort into this, but I want to strongly
> recommend against a garbage collected hash table for anything whatsoever.
> 
> Especially if the given objects are in some way created/destroyed/etc. by
> operations triggerable remotely.
> 
> This can be DoS'd quite trivially, and that's why we have removed the ipv4
> routing cache which did the same.

Hmmm...  You have a point.  What would you suggest instead?  At least with the
common object cache code I have, I might be able to just change that.

Some thoughts/notes:

 (1) Connection objects must have a time delay before expiry after last use.

     A connection object represents a negotiated security context (involving
     sending CHALLENGE and RESPONSE packets) and stores a certain amount of
     crypto state set up that can be reused (potentially for up to 4 billion
     calls).

     The set up cost of a connection is therefore typically non-trivial (you
     can have a connection without any security, but this can only do
     anonymous operations since the negotiated security represents
     authentication as well as data encryption).

     Once I kill off an incoming connection object, I have to set up the
     connection object anew for the next call on the same connection.  Now,
     granted, it's always possible that there will be a new incoming call the
     moment I kill off a connection - but this is much more likely if the
     connection is killed off immediately.

     Similarly, outgoing connections are meant to be reusable, given the same
     parameters - but if, say, a client program is making a series of calls
     and I kill the connection off immediately a call is dead, then I have to
     set up a new connection for each call the client makes.

     The way AF_RXRPC currently works, userspace clients don't interact
     directly with connection and peer objects - only calls.  I'd rather not
     have to expose the management of those to userspace.

 (2) A connection also retains the final state of the call recently terminated
     on that connection in each call slot (channel) until that slot is reused.
     This allows re-sending of final ACK and ABORT packets.

     If I immediately kill off a connection, I can't do this.

 (3) A local endpoint object is a purely local affair, maximum count 1 per
     open AF_RXRPC socket.  These can be destroyed the moment all pinning
     sockets and connections are gone - but these aren't really a problem.

 (4) A peer object can be disposed of when all the connections using it are
     gone - at the cost of losing the determined MTU data.  That's probably
     fine, provided connections have delay before expiry.

 (5) Call objects can be disposed of immediately that they terminate and have
     communicated their last with userspace (I have to tell userspace that the
     identifier it gave us is released).  A call's last state is transferred
     to the parent connection object until a new call displaces it from the
     channel it was using.

 (6) Call objects have to persist for a while since a call involves the
     exchange of at least three packets (a minimum call is a request DATA
     packet with just an ID, a response DATA packet with no payload and then
     an ACK packet) and some communication with userspace.

     An attacker can just send us a whole bunch of request DATA packets, each
     with a different call/connection combination and attempt to run the
     server out of memory, no matter how the persistence is managed.

 (7) Why can't I have simple counters representing the maximum numbers of
     peer, connection and call objects in existence at any one time and return
     a BUSY packet to a remote client or EAGAIN to a local client if the
     counters are maxed out?

     I could probably also drive gc based on counter levels as well as expiry
     time.

 (8) Should I take it that I can't use RCU either as that also has a deferred
     garbage collection mechanism and so subject to being stuffed remotely?

     I really want to get spinlocks out of the incoming packet distribution
     path as that's driven from the data_ready handler of the transport
     socket.

David
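Point (7) above, per-class caps that return BUSY to a remote client or EAGAIN to a local one when exceeded, can be sketched as follows. This is a userspace model with invented names; in-kernel these would be atomic counters:

```c
#include <stdbool.h>

struct obj_limit {
	int	cur;		/* current live objects of this class */
	int	max;		/* cap; exceeding it means BUSY/-EAGAIN */
};

/* Try to account for a new object; false means refuse admission. */
static bool obj_limit_get(struct obj_limit *lim)
{
	if (lim->cur >= lim->max)
		return false;
	lim->cur++;
	return true;
}

/* An object of this class was destroyed; free up its slot. */
static void obj_limit_put(struct obj_limit *lim)
{
	lim->cur--;
}
```

Note that, as pointed out later in the thread, a hard cap like this trades the memory-exhaustion DoS for a slot-exhaustion DoS: an attacker who fills the table blocks legitimate users.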


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 22:45   ` David Howells
@ 2016-03-08  4:07     ` David Miller
  2016-03-08 11:39     ` David Howells
  2016-03-08 13:02     ` David Howells
  2 siblings, 0 replies; 21+ messages in thread
From: David Miller @ 2016-03-08  4:07 UTC (permalink / raw)
  To: dhowells; +Cc: linux-afs, netdev, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Mon, 07 Mar 2016 22:45:14 +0000

> David Miller <davem@davemloft.net> wrote:
> 
>> I know you put a lot of time and effort into this, but I want to strongly
>> recommend against a garbage collected hash table for anything whatsoever.
>> 
>> Especially if the given objects are in some way created/destroyed/etc. by
>> operations triggerable remotely.
>> 
>> This can be DoS'd quite trivially, and that's why we have removed the ipv4
>> routing cache which did the same.
> 
> Hmmm...  You have a point.  What would you suggest instead?  At least with the
> common object cache code I have, I might be able to just change that.

Objects that are used for correct operation have no easily recyclable
property; you must hold onto them.  There has to be a set of resources
held and consumed at both endpoints for it to work properly ("I can't
DoS you without DoS'ing myself").

Where reclaimable tables work is for stuff that is near zero cost to
reconstitute.  A good example is the TCP metrics table.  When a TCP
metrics entry is reclaimed, it's not like we have to renegotiate a
security context when we try to talk to that end-host again.

If the concept of these open-ended objects is a fundamental aspect of
the protocol.... that's a serious shortcoming of RXRPC.


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 22:45   ` David Howells
  2016-03-08  4:07     ` David Miller
@ 2016-03-08 11:39     ` David Howells
  2016-03-08 20:13       ` David Miller
  2016-03-08 21:11       ` David Howells
  2016-03-08 13:02     ` David Howells
  2 siblings, 2 replies; 21+ messages in thread
From: David Howells @ 2016-03-08 11:39 UTC (permalink / raw)
  To: David Miller; +Cc: dhowells, linux-afs, netdev, linux-kernel

David Miller <davem@davemloft.net> wrote:

> >> I know you put a lot of time and effort into this, but I want to strongly
> >> recommend against a garbage collected hash table for anything whatsoever.
> >> 
> >> Especially if the given objects are in some way created/destroyed/etc. by
> >> operations triggerable remotely.
> >> 
> >> This can be DoS'd quite trivially, and that's why we have removed the ipv4
> >> routing cache which did the same.
> > 
> > Hmmm...  You have a point.  What would you suggest instead?  At least with
> > the common object cache code I have, I might be able to just change that.
> 
> Objects that are used for correct operation have no easily recyclable
> property, you must hold onto them.  There has to be a set of resources
> held and consumed at both endpoints for it to work properly ("I can't
> DoS you without DoS'ing myself").

So what would you use instead for the connection object?  That is the only one
that really needs to live past its last use; local, peer and call objects can
reasonably be recycled the moment they're no longer in use.  Note that I still
need some fast way to look up the connection object (hence the RCU hash table
approach) as they're the main packet routing device.

I would also prefer to avoid having a separate timer for every connection
object when I can have one for the whole set.  Does it make sense to maintain a
FIFO list of connections (though this would mean potentially taking a spinlock
every time I get a packet)?  That's why I prefer the gc hash table scan
approach - it doesn't require a separate list and only requires one timer.

> Where reclaimable tables work is for stuff that is near zero cost to
> reconstitute.  A good example is the TCP metrics table.  When a TCP
> metrics entry is reclaimed, it's not like we have to renegotiate a
> security context when we try to talk to that end-host again.

Sounds like this would be a way to do the peer object management.

> If the concept of these open-ended objects is a fundamental aspect of
> the protocol.... that's a serious shortcoming of RXRPC.

There is sort of a way to close a connection - you can send a connection-level
abort packet (call ID is set to 0) - but I'm not sure anyone does that.  It's
probably worth suggesting to the other implementors, though.

However, to some extent that is irrelevant: I didn't design the protocol and
can't very easily change it - I'm just trying to implement it.

So yes, it can be considered open-ended, but it is also expected to be cached
and to expire after a period of non-use.  If I set up a connection and let it
lapse, the RxRPC server can set it up again, no problem - it is allowed to send
further challenge/response packets for the same connection ID.  As far as the
server is concerned it would be a new connection.

I would expect the reason it's like this is that it's intended as the transport
for AFS, but AFS, unlike CIFS, doesn't negotiate a security context at mount
time for the life of the mount.  An AFS mount is made, then I do kinit to
negotiate a connection and then access the AFS mount - which uses my own
negotiated security context.  Someone else, who has done their own kinit and
has negotiated a different security context can access the same AFS mount -
which will then use their context at the same time as it is using mine.

The problem that the AFS mount has is that it doesn't know when I'm going to
make my last access of it, short of the mount being unmounted.

So, in that respect, an open-ended connection concept might make a certain
amount of sense.

One could argue, I suppose, that things should've been arranged that the RxRPC
client would manage the lifetime of each connection it sets up, rather than
both ends letting it lapse by mutual loss of interest.  But you *still* have to
have a timeout, lest the client die and not close its connection.

David

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-07 22:45   ` David Howells
  2016-03-08  4:07     ` David Miller
  2016-03-08 11:39     ` David Howells
@ 2016-03-08 13:02     ` David Howells
  2016-03-08 20:15       ` David Miller
  2 siblings, 1 reply; 21+ messages in thread
From: David Howells @ 2016-03-08 13:02 UTC (permalink / raw)
  Cc: dhowells, David Miller, linux-afs, netdev, linux-kernel

David Howells <dhowells@redhat.com> wrote:

> Does it make sense to maintain a FIFO list of connections (though this would
> mean potentially taking a spinlock every time I get a packet)?

It occurs to me that only inactive connections would need to be on an LRU
list.  Any connection with packets or active calls to deal with wouldn't be on
the list.
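A toy userspace sketch of that idea - the hand-rolled list and all names here
are hypothetical, not the rxrpc code: only idle connections sit on the LRU,
activity unlinks them (so they cannot be reclaimed), and the reaper always
takes the head, which is the least recently used entry.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sketch: connections with work to do are off the LRU. */
struct conn {
	struct conn *lru_prev, *lru_next; /* NULL when off the list */
	int id;
};

static struct conn *lru_head, *lru_tail;

static int conn_on_lru(const struct conn *c)
{
	return c == lru_head || c->lru_prev != NULL;
}

static void conn_mark_active(struct conn *c)
{
	/* Unlink from the LRU: an active connection cannot be reclaimed. */
	if (!conn_on_lru(c))
		return;
	if (c->lru_prev)
		c->lru_prev->lru_next = c->lru_next;
	else
		lru_head = c->lru_next;
	if (c->lru_next)
		c->lru_next->lru_prev = c->lru_prev;
	else
		lru_tail = c->lru_prev;
	c->lru_prev = c->lru_next = NULL;
}

static void conn_mark_idle(struct conn *c)
{
	/* Append at the tail; the head is always least recently used. */
	if (conn_on_lru(c))
		conn_mark_active(c); /* unlink first, then relink at tail */
	c->lru_prev = lru_tail;
	c->lru_next = NULL;
	if (lru_tail)
		lru_tail->lru_next = c;
	else
		lru_head = c;
	lru_tail = c;
}

static struct conn *lru_reap_oldest(void)
{
	struct conn *c = lru_head;
	if (c)
		conn_mark_active(c); /* unlink before reclaiming */
	return c;
}
```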

David


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-08 11:39     ` David Howells
@ 2016-03-08 20:13       ` David Miller
  2016-03-08 21:11       ` David Howells
  1 sibling, 0 replies; 21+ messages in thread
From: David Miller @ 2016-03-08 20:13 UTC (permalink / raw)
  To: dhowells; +Cc: linux-afs, netdev, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Tue, 08 Mar 2016 11:39:57 +0000

> One could argue, I suppose, that things should've been arranged that the RxRPC
> client would manage the lifetime of each connection it sets up, rather than
> both ends letting it lapse by mutual loss of interest.  But you *still* have to
> have a timeout, lest the client die and not close its connection.

But the point is that if there is no limit on the number of these connections
that can be set up, you're asking for trouble.

And if you try to put in some kind of limit to handle this, it's then
easy for the bad guy to block out other legitimate users.


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-08 13:02     ` David Howells
@ 2016-03-08 20:15       ` David Miller
  0 siblings, 0 replies; 21+ messages in thread
From: David Miller @ 2016-03-08 20:15 UTC (permalink / raw)
  To: dhowells; +Cc: linux-afs, netdev, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Tue, 08 Mar 2016 13:02:28 +0000

> David Howells <dhowells@redhat.com> wrote:
> 
>> Does it make sense to maintain a FIFO list of connections (though this would
>> mean potentially taking a spinlock every time I get a packet)?
> 
> It occurs to me that only inactive connections would need to be on an LRU
> list.  Any connection with packets or active calls to deal with wouldn't be on
> the list.

In that kind of scheme you have to decide whether it's possible to elide a
response in order to intentionally keep objects off the "inactive" LRU
list.  I bet it is.


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-08 11:39     ` David Howells
  2016-03-08 20:13       ` David Miller
@ 2016-03-08 21:11       ` David Howells
  2016-03-09  3:00         ` David Miller
  1 sibling, 1 reply; 21+ messages in thread
From: David Howells @ 2016-03-08 21:11 UTC (permalink / raw)
  To: David Miller; +Cc: dhowells, linux-afs, netdev, linux-kernel

David Miller <davem@davemloft.net> wrote:

> > One could argue, I suppose, that things should've been arranged that the
> > RxRPC client would manage the lifetime of each connection it sets up,
> > rather than both ends letting it lapse by mutual loss of interest.  But
> > you *still* have to have a timeout, lest the client die and not close its
> > connection.
> 
> But the point is that if there is no limit on the number of these connections
> that can be set up, you're asking for trouble.
> 
> And if you try to put in some kind of limit to handle this, it's then
> easy for the bad guy to block out other legitimate users.

I can put in a limit per peer, where a 'peer' is either a particular remote
UDP port or a particular remote host.  TCP has this by virtue of having a
limited number of ports available per IP address.  But if I have 10 IP
addresses available, I can attempt to set up half a million TCP connections to
a server simultaneously.  If I have access to a box that has an NFS mount on
it, I can potentially open sufficient TCP ports that the NFS mount can't make
a connection if it's not allowed to use privileged ports.
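As a hedged illustration (all names here are made up, not actual rxrpc code),
such a per-peer cap might amount to little more than a counter checked when a
new connection is about to be cached:

```c
#include <assert.h>

/* Hypothetical sketch of a per-peer cap on cached connections.
 * MAX_CONNS_PER_PEER and the struct are illustrative names only. */
#define MAX_CONNS_PER_PEER 4

struct peer {
	unsigned int conn_count; /* connections currently cached */
};

/* Returns nonzero if a new connection may be cached for this peer. */
static int peer_may_add_conn(struct peer *p)
{
	if (p->conn_count >= MAX_CONNS_PER_PEER)
		return 0; /* over the cap: reclaim an old one or refuse */
	p->conn_count++;
	return 1;
}

static void peer_del_conn(struct peer *p)
{
	p->conn_count--;
}
```

The weakness David Miller points out applies directly: a single attacker
driving each peer slot to its cap blocks legitimate users behind the same
remote address.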

Am I right in thinking that you have decided that it can't be done and
shouldn't be done?

David


* Re: [PATCH 01/11] rxrpc: Add a common object cache
  2016-03-08 21:11       ` David Howells
@ 2016-03-09  3:00         ` David Miller
  0 siblings, 0 replies; 21+ messages in thread
From: David Miller @ 2016-03-09  3:00 UTC (permalink / raw)
  To: dhowells; +Cc: linux-afs, netdev, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Tue, 08 Mar 2016 21:11:09 +0000

> I can put in a limit per peer, where a 'peer' is either a particular remote
> UDP port or a particular remote host.  TCP has this by virtue of having a
> limited number of ports available per IP address.  But if I have 10 IP
> addresses available, I can attempt to set up half a million TCP connections to
> a server simultaneously.  If I have access to a box that has an NFS mount on
> it, I can potentially open sufficient TCP ports that the NFS mount can't make
> a connection if it's not allowed to use privileged ports.

You must hold onto, and commit local state for, each and every one of those
remote TCP connections you create and actually move to the established
state.

It's completely different: both sides have to make a non-trivial resource
commitment.

For this RXRPC stuff, you don't.

That's the important and critical difference.

My core argument still stands: RXRPC is fundamentally DoS'able, in a way
that is not matched by TCP or our routing code or similar subsystems.


end of thread, other threads:[~2016-03-09  3:00 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-03-07 14:37 [PATCH 00/11] RxRPC: Rewrite part 2 David Howells
2016-03-07 14:38 ` [PATCH 01/11] rxrpc: Add a common object cache David Howells
2016-03-07 18:42   ` David Miller
2016-03-07 22:45   ` David Howells
2016-03-08  4:07     ` David Miller
2016-03-08 11:39     ` David Howells
2016-03-08 20:13       ` David Miller
2016-03-08 21:11       ` David Howells
2016-03-09  3:00         ` David Miller
2016-03-08 13:02     ` David Howells
2016-03-08 20:15       ` David Miller
2016-03-07 14:38 ` [PATCH 02/11] rxrpc: Do procfs lists through objcache David Howells
2016-03-07 14:38 ` [PATCH 03/11] rxrpc: Separate local endpoint object handling out into its own file David Howells
2016-03-07 14:38 ` [PATCH 04/11] rxrpc: Implement local endpoint cache David Howells
2016-03-07 14:38 ` [PATCH 05/11] rxrpc: procfs file to list local endpoints David Howells
2016-03-07 14:38 ` [PATCH 06/11] rxrpc: Rename ar-local.c to local-event.c David Howells
2016-03-07 14:38 ` [PATCH 07/11] rxrpc: Rename ar-peer.c to peer-object.c David Howells
2016-03-07 14:38 ` [PATCH 08/11] rxrpc: Implement peer endpoint cache David Howells
2016-03-07 14:39 ` [PATCH 09/11] rxrpc: Add /proc/net/rxrpc_peers to display the known remote endpoints David Howells
2016-03-07 14:39 ` [PATCH 10/11] rxrpc: Rename ar-error.c to peer-event.c David Howells
2016-03-07 14:39 ` [PATCH 11/11] rxrpc: Rename rxrpc_UDP_error_report() to rxrpc_error_report() David Howells
