* [PATCH v2 00/14] RDS: connection scalability and performance improvements
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

[v2]:
Dropped "[PATCH 05/15] RDS: increase size of hash-table to 8K" from
earlier version [1]. I plan to address the hash table scalability using
re-sizable hash tables as suggested by David Laight and David Miller [2]

This series addresses RDS connection bottlenecks on massive workloads and
improve the RDMA performance almost by 3X. RDS TCP also gets a small gain
of about 12%.

RDS is used in massive, highly scalable systems where several hundred
thousand endpoints and tens of thousands of local processes operate on
tens of thousands of sockets. Being RC (reliable connection), socket
bind and release happen very often, and any inefficiency in the bind
hash lookups hurts overall system performance. The RDS bind hash-table
uses a global spinlock, which is the biggest bottleneck. To make matters
worse, it uses RCU inside the global lock for the hash buckets.
This is addressed by simply using a per-bucket rw lock, which makes the
locking simple and very efficient. The hash table size is still an
issue, and I plan to address it by using re-sizable hash tables as
suggested on the list.
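
Concretely, the per-bucket scheme amounts to embedding the lock in the
bucket itself, so a lookup only takes the lock of the bucket it hashes
to; this is the shape patch 04 of the series ends up with:

	struct bind_bucket {
		rwlock_t		lock;	/* covers only this chain */
		struct hlist_head	head;
	};
	static struct bind_bucket bind_hash_table[BIND_HASH_SIZE];

	/* readers contend only with writers on the same bucket */
	read_lock_irqsave(&bucket->lock, flags);
	rs = rds_bind_lookup(bucket, addr, port, NULL);
	read_unlock_irqrestore(&bucket->lock, flags);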

For the RDS RDMA improvement, the completion handling is revamped so
that we can do batched completions. Both send and receive completion
handlers are split logically to achieve this. With RDS 8K messages being
one of the key use cases, the MR pool is adapted to hold 8K MRs along
with the default 1M MRs. While doing this, a few fixes and a couple of
bottlenecks seen with rds_sendmsg() are addressed.

The series applies against 4.3-rc1 as well as net-next. It has been
tested on Oracle hardware with an IB fabric for both bcopy and RDMA
mode. RDS TCP is tested with an iXGB NIC. Like last time, the iWARP
transport is untested with these changes. The patchset is also
available at the git repo below:

git://git.kernel.org/pub/scm/linux/kernel/git/ssantosh/linux.git net/rds/4.3-v2

As a side note, the IB HCA driver I used for testing is missing at
least 3 important patches upstream that are needed to see the full IB
performance, and I am hoping to get those into mainline with the driver
maintainers' help.

Santosh Shilimkar (14):
  RDS: use kfree_rcu in rds_ib_remove_ipaddr
  RDS: make socket bind/release locking scheme simple and more efficient
  RDS: fix rds_sock reference bug while doing bind
  RDS: Use per-bucket rw lock for bind hash-table
  RDS: defer the over_batch work to send worker
  RDS: use rds_send_xmit() state instead of RDS_LL_SEND_FULL
  RDS: IB: ack more receive completions to improve performance
  RDS: IB: split send completion handling and do batch ack
  RDS: IB: handle rds_ibdev release case instead of crashing the kernel
  RDS: IB: fix the rds_ib_fmr_wq kick call
  RDS: IB: use already available pool handle from ibmr
  RDS: IB: mark rds_ib_fmr_wq static
  RDS: IB: use max_mr from HCA caps than max_fmr
  RDS: IB: split mr pool to improve 8K messages performance

 net/rds/af_rds.c   |   8 +---
 net/rds/bind.c     |  76 ++++++++++++++++++------------
 net/rds/ib.c       |  47 ++++++++++++------
 net/rds/ib.h       |  78 +++++++++++++++++++++++-------
 net/rds/ib_cm.c    | 114 ++++++++++++++++++++++++++++++++++++++++++--
 net/rds/ib_rdma.c  | 116 ++++++++++++++++++++++++++++++---------------
 net/rds/ib_recv.c  | 136 +++++++++++++++--------------------------------------
 net/rds/ib_send.c  | 110 ++++++++++++++++++++-----------------------
 net/rds/ib_stats.c |  22 +++++----
 net/rds/rds.h      |   1 +
 net/rds/send.c     |  15 ++++--
 net/rds/threads.c  |   2 +
 12 files changed, 445 insertions(+), 280 deletions(-)

-- 
1.9.1

Regards,
Santosh

[1] https://lkml.org/lkml/2015/9/19/384
[2] https://lkml.org/lkml/2015/9/21/828





* [PATCH v2 01/14] RDS: use kfree_rcu in rds_ib_remove_ipaddr
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

synchronize_rcu() unnecessarily slows down the socket shutdown path.
It is used just to kfree() the IP addresses in rds_ib_remove_ipaddr(),
which is a perfect use case for kfree_rcu().

So let's use that to gain some speedup.
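
For reference, kfree_rcu() only needs a struct rcu_head embedded in the
object being freed, and it queues the free to run after a grace period
instead of blocking the caller. A generic sketch (the struct here is
illustrative, not RDS code):

	struct foo {
		int		data;
		struct rcu_head	rcu;	/* used by kfree_rcu() */
	};

	/* non-blocking: obj is kfree()d once a grace period has elapsed */
	kfree_rcu(obj, rcu);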

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib.h      | 1 +
 net/rds/ib_rdma.c | 6 ++----
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/net/rds/ib.h b/net/rds/ib.h
index aae60fd..f1fd5ff 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -164,6 +164,7 @@ struct rds_ib_connection {
 struct rds_ib_ipaddr {
 	struct list_head	list;
 	__be32			ipaddr;
+	struct rcu_head		rcu;
 };
 
 struct rds_ib_device {
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 251d1ce..872f523 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -159,10 +159,8 @@ static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 	}
 	spin_unlock_irq(&rds_ibdev->spinlock);
 
-	if (to_free) {
-		synchronize_rcu();
-		kfree(to_free);
-	}
+	if (to_free)
+		kfree_rcu(to_free, rcu);
 }
 
 int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
-- 
1.9.1



* [PATCH v2 02/14] RDS: make socket bind/release locking scheme simple and more efficient
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

The RDS bind and release locking scheme is very inefficient. It
uses RCU for maintaining the bind hash-table, which is great, but
it also needs to hold a spinlock for [add/remove]_bound(). So for
the overall use case, the hash-table's concurrency speedup doesn't
pay off. In fact, the blocking nature of synchronize_rcu() makes
RDS socket shutdown too slow, which hurts RDS performance, since
connection shutdown and re-connect happen quite often to maintain
the RC part of the protocol.

So we make the locking scheme simpler and more efficient by
replacing the spinlock with a reader/writer lock and getting rid
of RCU for the bind hash-table.

In a subsequent patch, we also convert the global lock to a
per-bucket lock to reduce the global lock contention.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/af_rds.c |  6 ------
 net/rds/bind.c   | 35 +++++++++++++++--------------------
 2 files changed, 15 insertions(+), 26 deletions(-)

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index a2f28a6..dc08766 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -72,13 +72,7 @@ static int rds_release(struct socket *sock)
 	rds_clear_recv_queue(rs);
 	rds_cong_remove_socket(rs);
 
-	/*
-	 * the binding lookup hash uses rcu, we need to
-	 * make sure we synchronize_rcu before we free our
-	 * entry
-	 */
 	rds_remove_bound(rs);
-	synchronize_rcu();
 
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);
diff --git a/net/rds/bind.c b/net/rds/bind.c
index dd666fb..01989e2 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -40,7 +40,7 @@
 
 #define BIND_HASH_SIZE 1024
 static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
-static DEFINE_SPINLOCK(rds_bind_lock);
+static DEFINE_RWLOCK(rds_bind_lock);
 
 static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
 {
@@ -48,6 +48,7 @@ static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
 				  (BIND_HASH_SIZE - 1));
 }
 
+/* must hold either read or write lock (write lock for insert != NULL) */
 static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
 					struct rds_sock *insert)
 {
@@ -56,30 +57,24 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
 	u64 cmp;
 	u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
 
-	rcu_read_lock();
-	hlist_for_each_entry_rcu(rs, head, rs_bound_node) {
+	hlist_for_each_entry(rs, head, rs_bound_node) {
 		cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
 		      be16_to_cpu(rs->rs_bound_port);
 
-		if (cmp == needle) {
-			rcu_read_unlock();
+		if (cmp == needle)
 			return rs;
-		}
 	}
-	rcu_read_unlock();
 
 	if (insert) {
 		/*
 		 * make sure our addr and port are set before
-		 * we are added to the list, other people
-		 * in rcu will find us as soon as the
-		 * hlist_add_head_rcu is done
+		 * we are added to the list.
 		 */
 		insert->rs_bound_addr = addr;
 		insert->rs_bound_port = port;
 		rds_sock_addref(insert);
 
-		hlist_add_head_rcu(&insert->rs_bound_node, head);
+		hlist_add_head(&insert->rs_bound_node, head);
 	}
 	return NULL;
 }
@@ -93,8 +88,11 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
 struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
 {
 	struct rds_sock *rs;
+	unsigned long flags;
 
+	read_lock_irqsave(&rds_bind_lock, flags);
 	rs = rds_bind_lookup(addr, port, NULL);
+	read_unlock_irqrestore(&rds_bind_lock, flags);
 
 	if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
 		rds_sock_addref(rs);
@@ -103,6 +101,7 @@ struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
 
 	rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
 		ntohs(port));
+
 	return rs;
 }
 
@@ -121,7 +120,7 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		last = rover - 1;
 	}
 
-	spin_lock_irqsave(&rds_bind_lock, flags);
+	write_lock_irqsave(&rds_bind_lock, flags);
 
 	do {
 		if (rover == 0)
@@ -135,7 +134,7 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		}
 	} while (rover++ != last);
 
-	spin_unlock_irqrestore(&rds_bind_lock, flags);
+	write_unlock_irqrestore(&rds_bind_lock, flags);
 
 	return ret;
 }
@@ -144,19 +143,19 @@ void rds_remove_bound(struct rds_sock *rs)
 {
 	unsigned long flags;
 
-	spin_lock_irqsave(&rds_bind_lock, flags);
+	write_lock_irqsave(&rds_bind_lock, flags);
 
 	if (rs->rs_bound_addr) {
 		rdsdebug("rs %p unbinding from %pI4:%d\n",
 		  rs, &rs->rs_bound_addr,
 		  ntohs(rs->rs_bound_port));
 
-		hlist_del_init_rcu(&rs->rs_bound_node);
+		hlist_del_init(&rs->rs_bound_node);
 		rds_sock_put(rs);
 		rs->rs_bound_addr = 0;
 	}
 
-	spin_unlock_irqrestore(&rds_bind_lock, flags);
+	write_unlock_irqrestore(&rds_bind_lock, flags);
 }
 
 int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
@@ -200,9 +199,5 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
 
 out:
 	release_sock(sk);
-
-	/* we might have called rds_remove_bound on error */
-	if (ret)
-		synchronize_rcu();
 	return ret;
 }
-- 
1.9.1



* [PATCH v2 03/14] RDS: fix rds_sock reference bug while doing bind
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

One needs to take an rds_sock reference while using it and release it
once done with it. The rds_add_bound() code path does not do that, so
let's fix it.
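
The rule being enforced is the usual refcount discipline, using the
rds_sock helpers that already exist:

	rds_sock_addref(rs);	/* before the lookup hands out rs */
	...
	rds_sock_put(rs);	/* once the caller is done with it */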

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/bind.c | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/net/rds/bind.c b/net/rds/bind.c
index 01989e2..166c605 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -61,8 +61,10 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
 		cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
 		      be16_to_cpu(rs->rs_bound_port);
 
-		if (cmp == needle)
+		if (cmp == needle) {
+			rds_sock_addref(rs);
 			return rs;
+		}
 	}
 
 	if (insert) {
@@ -94,10 +96,10 @@ struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
 	rs = rds_bind_lookup(addr, port, NULL);
 	read_unlock_irqrestore(&rds_bind_lock, flags);
 
-	if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
-		rds_sock_addref(rs);
-	else
+	if (rs && sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) {
+		rds_sock_put(rs);
 		rs = NULL;
+	}
 
 	rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
 		ntohs(port));
@@ -123,14 +125,18 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 	write_lock_irqsave(&rds_bind_lock, flags);
 
 	do {
+		struct rds_sock *rrs;
 		if (rover == 0)
 			rover++;
-		if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) {
+		rrs = rds_bind_lookup(addr, cpu_to_be16(rover), rs);
+		if (!rrs) {
 			*port = rs->rs_bound_port;
 			ret = 0;
 			rdsdebug("rs %p binding to %pI4:%d\n",
 			  rs, &addr, (int)ntohs(*port));
 			break;
+		} else {
+			rds_sock_put(rrs);
 		}
 	} while (rover++ != last);
 
-- 
1.9.1



* [PATCH v2 04/14] RDS: Use per-bucket rw lock for bind hash-table
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

One global lock protecting a hash-table with 1024 buckets isn't
efficient, and it shows up in massive systems with truckloads of
RDS sockets serving multiple databases. The perf data clearly
highlights the contention on the rw lock in these massive
workloads.

When the contention gets worse, the code gets into a state where
it decides to back off on the lock. So while it has interrupts
disabled, it sits and backs off on this lock acquisition. This
causes the system to become sluggish, and eventually all sorts of
bad things happen.

The simple fix is to move the lock into the hash bucket and use a
per-bucket lock to improve scalability.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/af_rds.c |  2 ++
 net/rds/bind.c   | 47 ++++++++++++++++++++++++++++++++---------------
 net/rds/rds.h    |  1 +
 3 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index dc08766..384ea1e 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -582,6 +582,8 @@ static int rds_init(void)
 {
 	int ret;
 
+	rds_bind_lock_init();
+
 	ret = rds_conn_init();
 	if (ret)
 		goto out;
diff --git a/net/rds/bind.c b/net/rds/bind.c
index 166c605..bc6b93e 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -38,22 +38,27 @@
 #include <linux/ratelimit.h>
 #include "rds.h"
 
+struct bind_bucket {
+	rwlock_t                lock;
+	struct hlist_head	head;
+};
+
 #define BIND_HASH_SIZE 1024
-static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
-static DEFINE_RWLOCK(rds_bind_lock);
+static struct bind_bucket bind_hash_table[BIND_HASH_SIZE];
 
-static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
+static struct bind_bucket *hash_to_bucket(__be32 addr, __be16 port)
 {
 	return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) &
 				  (BIND_HASH_SIZE - 1));
 }
 
 /* must hold either read or write lock (write lock for insert != NULL) */
-static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
+static struct rds_sock *rds_bind_lookup(struct bind_bucket *bucket,
+					__be32 addr, __be16 port,
 					struct rds_sock *insert)
 {
 	struct rds_sock *rs;
-	struct hlist_head *head = hash_to_bucket(addr, port);
+	struct hlist_head *head = &bucket->head;
 	u64 cmp;
 	u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
 
@@ -91,10 +96,11 @@ struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
 {
 	struct rds_sock *rs;
 	unsigned long flags;
+	struct bind_bucket *bucket = hash_to_bucket(addr, port);
 
-	read_lock_irqsave(&rds_bind_lock, flags);
-	rs = rds_bind_lookup(addr, port, NULL);
-	read_unlock_irqrestore(&rds_bind_lock, flags);
+	read_lock_irqsave(&bucket->lock, flags);
+	rs = rds_bind_lookup(bucket, addr, port, NULL);
+	read_unlock_irqrestore(&bucket->lock, flags);
 
 	if (rs && sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) {
 		rds_sock_put(rs);
@@ -113,6 +119,7 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 	unsigned long flags;
 	int ret = -EADDRINUSE;
 	u16 rover, last;
+	struct bind_bucket *bucket;
 
 	if (*port != 0) {
 		rover = be16_to_cpu(*port);
@@ -122,13 +129,15 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		last = rover - 1;
 	}
 
-	write_lock_irqsave(&rds_bind_lock, flags);
-
 	do {
 		struct rds_sock *rrs;
 		if (rover == 0)
 			rover++;
-		rrs = rds_bind_lookup(addr, cpu_to_be16(rover), rs);
+
+		bucket = hash_to_bucket(addr, cpu_to_be16(rover));
+		write_lock_irqsave(&bucket->lock, flags);
+		rrs = rds_bind_lookup(bucket, addr, cpu_to_be16(rover), rs);
+		write_unlock_irqrestore(&bucket->lock, flags);
 		if (!rrs) {
 			*port = rs->rs_bound_port;
 			ret = 0;
@@ -140,16 +149,16 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		}
 	} while (rover++ != last);
 
-	write_unlock_irqrestore(&rds_bind_lock, flags);
-
 	return ret;
 }
 
 void rds_remove_bound(struct rds_sock *rs)
 {
 	unsigned long flags;
+	struct bind_bucket *bucket =
+		hash_to_bucket(rs->rs_bound_addr, rs->rs_bound_port);
 
-	write_lock_irqsave(&rds_bind_lock, flags);
+	write_lock_irqsave(&bucket->lock, flags);
 
 	if (rs->rs_bound_addr) {
 		rdsdebug("rs %p unbinding from %pI4:%d\n",
@@ -161,7 +170,7 @@ void rds_remove_bound(struct rds_sock *rs)
 		rs->rs_bound_addr = 0;
 	}
 
-	write_unlock_irqrestore(&rds_bind_lock, flags);
+	write_unlock_irqrestore(&bucket->lock, flags);
 }
 
 int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
@@ -207,3 +216,11 @@ out:
 	release_sock(sk);
 	return ret;
 }
+
+void rds_bind_lock_init(void)
+{
+	int i;
+
+	for (i = 0; i < BIND_HASH_SIZE; i++)
+		rwlock_init(&bind_hash_table[i].lock);
+}
diff --git a/net/rds/rds.h b/net/rds/rds.h
index afb4048..121fb81 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -603,6 +603,7 @@ extern wait_queue_head_t rds_poll_waitq;
 int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len);
 void rds_remove_bound(struct rds_sock *rs);
 struct rds_sock *rds_find_bound(__be32 addr, __be16 port);
+void rds_bind_lock_init(void);
 
 /* cong.c */
 int rds_cong_get_maps(struct rds_connection *conn);
-- 
1.9.1



* [PATCH v2 05/14] RDS: defer the over_batch work to send worker
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

The current process gives up if its send work is over the batch limit.
The work queue will get kicked to finish off any remaining requests.
This fixes the remainder condition from commit 443be0e5affe ("RDS: make
sure not to loop forever inside rds_send_xmit").

The restart condition is only for the case where we reached the
over_batch code for some other reason, so we just retry again before
giving up.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/send.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/rds/send.c b/net/rds/send.c
index 4df61a5..f1e709c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -423,7 +423,9 @@ over_batch:
 		     !list_empty(&conn->c_send_queue)) &&
 		    send_gen == conn->c_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			goto restart;
+			if (batch_count < 1024)
+				goto restart;
+			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 		}
 	}
 out:
-- 
1.9.1



* [PATCH v2 06/14] RDS: use rds_send_xmit() state instead of RDS_LL_SEND_FULL
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

In the transport-independent rds_sendmsg(), we shouldn't make decisions
based on RDS_LL_SEND_FULL, which is used to manage the ring for
RDMA-based transports. We can safely issue rds_send_xmit() and then use
its return value to decide on deferred work. This will also fix the
scenario where at times we are seeing connections stuck with the
LL_SEND_FULL bit set and never cleared.

We kick krdsd any time we see -ENOMEM or -EAGAIN from the ring
allocation code.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/send.c    | 10 ++++++----
 net/rds/threads.c |  2 ++
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/net/rds/send.c b/net/rds/send.c
index f1e709c..9d8b52d 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -1122,8 +1122,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	 */
 	rds_stats_inc(s_send_queued);
 
-	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		rds_send_xmit(conn);
+	ret = rds_send_xmit(conn);
+	if (ret == -ENOMEM || ret == -EAGAIN)
+		queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 
 	rds_message_put(rm);
 	return payload_len;
@@ -1179,8 +1180,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 	rds_stats_inc(s_send_queued);
 	rds_stats_inc(s_send_pong);
 
-	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+	ret = rds_send_xmit(conn);
+	if (ret == -ENOMEM || ret == -EAGAIN)
+		queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 
 	rds_message_put(rm);
 	return 0;
diff --git a/net/rds/threads.c b/net/rds/threads.c
index dc2402e..454aa6d 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -162,7 +162,9 @@ void rds_send_worker(struct work_struct *work)
 	int ret;
 
 	if (rds_conn_state(conn) == RDS_CONN_UP) {
+		clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 		ret = rds_send_xmit(conn);
+		cond_resched();
 		rdsdebug("conn %p ret %d\n", conn, ret);
 		switch (ret) {
 		case -EAGAIN:
-- 
1.9.1



* [PATCH v2 07/14] RDS: IB: ack more receive completions to improve performance
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

For better performance, we split the receive completion IRQ handler.
That lets us acknowledge several WC events in one call. We also limit
the WCs to a maximum of 32 to avoid latency. Acknowledging several
completions in one call instead of one call each provides better
performance, since fewer mutual exclusion locks are taken.

In the next patch, send completion is also split; it re-uses poll_cq(),
and hence the code is moved to ib_cm.c.
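
The heart of the change is polling an array of WCs per ib_poll_cq()
call instead of one at a time. Stripped to its essentials (handle_cqe()
stands in for the real per-WC handler):

	struct ib_wc wcs[RDS_IB_WC_MAX];	/* up to 32 completions */
	int nr, i;

	/* drain the CQ in batches of RDS_IB_WC_MAX entries */
	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0)
		for (i = 0; i < nr; i++)
			handle_cqe(&wcs[i]);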

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib.h       |  28 +++++++++--
 net/rds/ib_cm.c    |  70 ++++++++++++++++++++++++++-
 net/rds/ib_recv.c  | 136 +++++++++++++++--------------------------------------
 net/rds/ib_stats.c |   3 +-
 4 files changed, 132 insertions(+), 105 deletions(-)

diff --git a/net/rds/ib.h b/net/rds/ib.h
index f1fd5ff..727759b 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -24,6 +24,8 @@
 
 #define RDS_IB_RECYCLE_BATCH_COUNT	32
 
+#define RDS_IB_WC_MAX			32
+
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
 
@@ -89,6 +91,20 @@ struct rds_ib_work_ring {
 	atomic_t	w_free_ctr;
 };
 
+/* Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.
+ * All fragments start with a header, so we can make sure we're not receiving
+ * garbage, and we can tell a small 8 byte fragment from an ACK frame.
+ */
+struct rds_ib_ack_state {
+	u64		ack_next;
+	u64		ack_recv;
+	unsigned int	ack_required:1;
+	unsigned int	ack_next_valid:1;
+	unsigned int	ack_recv_valid:1;
+};
+
+
 struct rds_ib_device;
 
 struct rds_ib_connection {
@@ -102,6 +118,10 @@ struct rds_ib_connection {
 	struct ib_pd		*i_pd;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	struct ib_wc		i_recv_wc[RDS_IB_WC_MAX];
+
+	/* interrupt handling */
+	struct tasklet_struct	i_recv_tasklet;
 
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
@@ -112,7 +132,6 @@ struct rds_ib_connection {
 	atomic_t		i_signaled_sends;
 
 	/* rx */
-	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
@@ -199,13 +218,14 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_connect_raced;
 	uint64_t	s_ib_listen_closed_stale;
 	uint64_t	s_ib_tx_cq_call;
+	uint64_t	s_ib_evt_handler_call;
+	uint64_t	s_ib_tasklet_call;
 	uint64_t	s_ib_tx_cq_event;
 	uint64_t	s_ib_tx_ring_full;
 	uint64_t	s_ib_tx_throttle;
 	uint64_t	s_ib_tx_sg_mapping_failure;
 	uint64_t	s_ib_tx_stalled;
 	uint64_t	s_ib_tx_credit_updates;
-	uint64_t	s_ib_rx_cq_call;
 	uint64_t	s_ib_rx_cq_event;
 	uint64_t	s_ib_rx_ring_empty;
 	uint64_t	s_ib_rx_refill_from_cq;
@@ -324,7 +344,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
+			     struct rds_ib_ack_state *state);
 void rds_ib_recv_tasklet_fn(unsigned long data);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
@@ -332,6 +353,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);
 
 /* ib_ring.c */
 void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9043f5c..28e0979 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -216,6 +216,72 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 		 event->event, ib_event_msg(event->event), data);
 }
 
+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring.  Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+		    struct ib_wc *wcs,
+		    struct rds_ib_ack_state *ack_state)
+{
+	int nr;
+	int i;
+	struct ib_wc *wc;
+
+	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+		for (i = 0; i < nr; i++) {
+			wc = wcs + i;
+			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+				 (unsigned long long)wc->wr_id, wc->status,
+				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+			rds_ib_recv_cqe_handler(ic, wc, ack_state);
+		}
+	}
+}
+
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+	struct rds_ib_ack_state state;
+
+	BUG_ON(!rds_ibdev);
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+	if (state.ack_next_valid)
+		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+		rds_send_drop_acked(conn, state.ack_recv, NULL);
+		ic->i_ack_recv = state.ack_recv;
+	}
+
+	if (rds_conn_up(conn))
+		rds_ib_attempt_ack(ic);
+}
+
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
 	struct rds_connection *conn = data;
@@ -282,7 +348,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	}
 
 	cq_attr.cqe = ic->i_recv_ring.w_nr;
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+	ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_recv_cq)) {
@@ -743,7 +809,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	}
 
 	INIT_LIST_HEAD(&ic->ib_node);
-	tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
+	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
 		     (unsigned long) ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index f43831e..96744b7 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * wr_id and avoids working with the ring in that case.
  */
 #ifndef KERNEL_HAS_ATOMIC64
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	unsigned long flags;
 
@@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
 	return seq;
 }
 #else
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	atomic64_set(&ic->i_ack_next, seq);
 	if (ack_required) {
@@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
 	rds_cong_map_updated(map, uncongested);
 }
 
-/*
- * Rings are posted with all the allocations they'll need to queue the
- * incoming message to the receiving socket so this can't fail.
- * All fragments start with a header, so we can make sure we're not receiving
- * garbage, and we can tell a small 8 byte fragment from an ACK frame.
- */
-struct rds_ib_ack_state {
-	u64		ack_next;
-	u64		ack_recv;
-	unsigned int	ack_required:1;
-	unsigned int	ack_next_valid:1;
-	unsigned int	ack_recv_valid:1;
-};
-
 static void rds_ib_process_recv(struct rds_connection *conn,
 				struct rds_ib_recv_work *recv, u32 data_len,
 				struct rds_ib_ack_state *state)
@@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 	}
 }
 
-/*
- * Plucking the oldest entry from the ring can be done concurrently with
- * the thread refilling the ring.  Each ring operation is protected by
- * spinlocks and the transient state of refilling doesn't change the
- * recording of which entry is oldest.
- *
- * This relies on IB only calling one cq comp_handler for each cq so that
- * there will only be one caller of rds_recv_incoming() per RDS connection.
- */
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
-{
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
-
-	rdsdebug("conn %p cq %p\n", conn, cq);
-
-	rds_ib_stats_inc(s_ib_rx_cq_call);
-
-	tasklet_schedule(&ic->i_recv_tasklet);
-}
-
-static inline void rds_poll_cq(struct rds_ib_connection *ic,
-			       struct rds_ib_ack_state *state)
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
+			     struct ib_wc *wc,
+			     struct rds_ib_ack_state *state)
 {
 	struct rds_connection *conn = ic->conn;
-	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;
 
-	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_rx_cq_event);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));
 
-		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
-
-		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-
-		/*
-		 * Also process recvs in connecting state because it is possible
-		 * to get a recv completion _before_ the rdmacm ESTABLISHED
-		 * event is processed.
-		 */
-		if (wc.status == IB_WC_SUCCESS) {
-			rds_ib_process_recv(conn, recv, wc.byte_len, state);
-		} else {
-			/* We expect errors as the qp is drained during shutdown */
-			if (rds_conn_up(conn) || rds_conn_connecting(conn))
-				rds_ib_conn_error(conn, "recv completion on %pI4 had "
-						  "status %u (%s), disconnecting and "
-						  "reconnecting\n", &conn->c_faddr,
-						  wc.status,
-						  ib_wc_status_msg(wc.status));
-		}
+	rds_ib_stats_inc(s_ib_rx_cq_event);
+	recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+	ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
+			DMA_FROM_DEVICE);
 
-		/*
-		 * rds_ib_process_recv() doesn't always consume the frag, and
-		 * we might not have called it at all if the wc didn't indicate
-		 * success. We already unmapped the frag's pages, though, and
-		 * the following rds_ib_ring_free() call tells the refill path
-		 * that it will not find an allocated frag here. Make sure we
-		 * keep that promise by freeing a frag that's still on the ring.
-		 */
-		if (recv->r_frag) {
-			rds_ib_frag_free(ic, recv->r_frag);
-			recv->r_frag = NULL;
-		}
-		rds_ib_ring_free(&ic->i_recv_ring, 1);
+	/* Also process recvs in connecting state because it is possible
+	 * to get a recv completion _before_ the rdmacm ESTABLISHED
+	 * event is processed.
+	 */
+	if (wc->status == IB_WC_SUCCESS) {
+		rds_ib_process_recv(conn, recv, wc->byte_len, state);
+	} else {
+		/* We expect errors as the qp is drained during shutdown */
+		if (rds_conn_up(conn) || rds_conn_connecting(conn))
+			rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+					  &conn->c_faddr,
+					  wc->status,
+					  ib_wc_status_msg(wc->status));
 	}
-}
 
-void rds_ib_recv_tasklet_fn(unsigned long data)
-{
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-	struct rds_connection *conn = ic->conn;
-	struct rds_ib_ack_state state = { 0, };
-
-	rds_poll_cq(ic, &state);
-	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	rds_poll_cq(ic, &state);
-
-	if (state.ack_next_valid)
-		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
-	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
-		rds_send_drop_acked(conn, state.ack_recv, NULL);
-		ic->i_ack_recv = state.ack_recv;
+	/* rds_ib_process_recv() doesn't always consume the frag, and
+	 * we might not have called it at all if the wc didn't indicate
+	 * success. We already unmapped the frag's pages, though, and
+	 * the following rds_ib_ring_free() call tells the refill path
+	 * that it will not find an allocated frag here. Make sure we
+	 * keep that promise by freeing a frag that's still on the ring.
+	 */
+	if (recv->r_frag) {
+		rds_ib_frag_free(ic, recv->r_frag);
+		recv->r_frag = NULL;
 	}
-	if (rds_conn_up(conn))
-		rds_ib_attempt_ack(ic);
+	rds_ib_ring_free(&ic->i_recv_ring, 1);
 
 	/* If we ever end up with a really empty receive ring, we're
 	 * in deep trouble, as the sender will definitely see RNR
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 2d5965d..bdf6115 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -42,14 +42,15 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats);
 static const char *const rds_ib_stat_names[] = {
 	"ib_connect_raced",
 	"ib_listen_closed_stale",
+	"s_ib_evt_handler_call",
 	"ib_tx_cq_call",
+	"ib_tasklet_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
 	"ib_tx_throttle",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
 	"ib_tx_credit_updates",
-	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
 	"ib_rx_refill_from_cq",
-- 
1.9.1



* [PATCH v2 08/14] RDS: IB: split send completion handling and do batch ack
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

Similar to what we did with the receive CQ completion handling, we
split the transmit completion handler so that we can implement batched
work completion handling.

We re-use the poll_cq() routine and make use of RDS_IB_SEND_OP to
distinguish the send and receive completion event handler invocations.
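
The trick is tagging send wr_ids with the top bit of the 64-bit wr_id
so the shared poll loop can route each completion; condensed from the
patch below:

	#define RDS_IB_SEND_OP	BIT_ULL(63)

	send->s_wr.wr_id = i | RDS_IB_SEND_OP;	/* tag at post time */

	if (wc->wr_id & RDS_IB_SEND_OP)		/* route at poll time */
		rds_ib_send_cqe_handler(ic, wc);
	else
		rds_ib_recv_cqe_handler(ic, wc, ack_state);

	/* mask the tag off before using the wr_id as a ring index */
	completed = rds_ib_ring_completed(&ic->i_send_ring,
					  wc->wr_id & ~RDS_IB_SEND_OP,
					  oldest);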

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib.h       |   6 ++-
 net/rds/ib_cm.c    |  45 ++++++++++++++++++++--
 net/rds/ib_send.c  | 110 +++++++++++++++++++++++++----------------------------
 net/rds/ib_stats.c |   1 -
 net/rds/send.c     |   1 +
 5 files changed, 98 insertions(+), 65 deletions(-)

diff --git a/net/rds/ib.h b/net/rds/ib.h
index 727759b..3a8cd31 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -25,6 +25,7 @@
 #define RDS_IB_RECYCLE_BATCH_COUNT	32
 
 #define RDS_IB_WC_MAX			32
+#define RDS_IB_SEND_OP			BIT_ULL(63)
 
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
@@ -118,9 +119,11 @@ struct rds_ib_connection {
 	struct ib_pd		*i_pd;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	struct ib_wc		i_send_wc[RDS_IB_WC_MAX];
 	struct ib_wc		i_recv_wc[RDS_IB_WC_MAX];
 
 	/* interrupt handling */
+	struct tasklet_struct	i_send_tasklet;
 	struct tasklet_struct	i_recv_tasklet;
 
 	/* tx */
@@ -217,7 +220,6 @@ struct rds_ib_device {
 struct rds_ib_statistics {
 	uint64_t	s_ib_connect_raced;
 	uint64_t	s_ib_listen_closed_stale;
-	uint64_t	s_ib_tx_cq_call;
 	uint64_t	s_ib_evt_handler_call;
 	uint64_t	s_ib_tasklet_call;
 	uint64_t	s_ib_tx_cq_event;
@@ -371,7 +373,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
 void rds_ib_xmit_complete(struct rds_connection *conn);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		unsigned int hdr_off, unsigned int sg, unsigned int off);
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 28e0979..8f51d0d 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -250,11 +250,34 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 				 (unsigned long long)wc->wr_id, wc->status,
 				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
-			rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+			if (wc->wr_id & RDS_IB_SEND_OP)
+				rds_ib_send_cqe_handler(ic, wc);
+			else
+				rds_ib_recv_cqe_handler(ic, wc, ack_state);
 		}
 	}
 }
 
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_ack_state state;
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+	ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+	poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+	if (rds_conn_up(conn) &&
+	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+	    test_bit(0, &conn->c_map_queued)))
+		rds_send_xmit(ic->conn);
+}
+
 static void rds_ib_tasklet_fn_recv(unsigned long data)
 {
 	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
@@ -304,6 +327,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 	}
 }
 
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -337,7 +372,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	ic->i_pd = rds_ibdev->pd;
 
 	cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-	ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+	ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_send_cq)) {
@@ -703,6 +739,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		wait_event(rds_ib_ring_empty_wait,
 			   rds_ib_ring_empty(&ic->i_recv_ring) &&
 			   (atomic_read(&ic->i_signaled_sends) == 0));
+		tasklet_kill(&ic->i_send_tasklet);
 		tasklet_kill(&ic->i_recv_tasklet);
 
 		/* first destroy the ib state that generates callbacks */
@@ -809,8 +846,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	}
 
 	INIT_LIST_HEAD(&ic->ib_node);
+	tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+		     (unsigned long)ic);
 	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
-		     (unsigned long) ic);
+		     (unsigned long)ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4e88047..670882c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 
 		send->s_op = NULL;
 
-		send->s_wr.wr_id = i;
+		send->s_wr.wr_id = i | RDS_IB_SEND_OP;
 		send->s_wr.sg_list = send->s_sge;
 		send->s_wr.ex.imm_data = 0;
 
@@ -237,81 +237,73 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
  * unallocs the next free entry in the ring it doesn't alter which is
  * the next to be freed, which is what this is concerned with.
  */
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 {
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_message *rm = NULL;
-	struct ib_wc wc;
+	struct rds_connection *conn = ic->conn;
 	struct rds_ib_send_work *send;
 	u32 completed;
 	u32 oldest;
 	u32 i = 0;
-	int ret;
 	int nr_sig = 0;
 
-	rdsdebug("cq %p conn %p\n", cq, conn);
-	rds_ib_stats_inc(s_ib_tx_cq_call);
-	ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-	if (ret)
-		rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-
-	while (ib_poll_cq(cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_tx_cq_event);
-
-		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
-			if (time_after(jiffies, ic->i_ack_queued + HZ/2))
-				rds_ib_stats_inc(s_ib_tx_stalled);
-			rds_ib_ack_send_complete(ic);
-			continue;
-		}
 
-		oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));
+	rds_ib_stats_inc(s_ib_tx_cq_event);
 
-		completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+	if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+		if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+			rds_ib_stats_inc(s_ib_tx_stalled);
+		rds_ib_ack_send_complete(ic);
+		return;
+	}
 
-		for (i = 0; i < completed; i++) {
-			send = &ic->i_sends[oldest];
-			if (send->s_wr.send_flags & IB_SEND_SIGNALED)
-				nr_sig++;
+	oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 
-			rm = rds_ib_send_unmap_op(ic, send, wc.status);
+	completed = rds_ib_ring_completed(&ic->i_send_ring,
+					  (wc->wr_id & ~RDS_IB_SEND_OP),
+					  oldest);
 
-			if (time_after(jiffies, send->s_queued + HZ/2))
-				rds_ib_stats_inc(s_ib_tx_stalled);
+	for (i = 0; i < completed; i++) {
+		send = &ic->i_sends[oldest];
+		if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+			nr_sig++;
 
-			if (send->s_op) {
-				if (send->s_op == rm->m_final_op) {
-					/* If anyone waited for this message to get flushed out, wake
-					 * them up now */
-					rds_message_unmapped(rm);
-				}
-				rds_message_put(rm);
-				send->s_op = NULL;
-			}
+		rm = rds_ib_send_unmap_op(ic, send, wc->status);
 
-			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
-		}
+		if (time_after(jiffies, send->s_queued + HZ / 2))
+			rds_ib_stats_inc(s_ib_tx_stalled);
 
-		rds_ib_ring_free(&ic->i_send_ring, completed);
-		rds_ib_sub_signaled(ic, nr_sig);
-		nr_sig = 0;
-
-		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
-		    test_bit(0, &conn->c_map_queued))
-			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-
-		/* We expect errors as the qp is drained during shutdown */
-		if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
-			rds_ib_conn_error(conn, "send completion on %pI4 had status "
-					  "%u (%s), disconnecting and reconnecting\n",
-					  &conn->c_faddr, wc.status,
-					  ib_wc_status_msg(wc.status));
+		if (send->s_op) {
+			if (send->s_op == rm->m_final_op) {
+				/* If anyone waited for this message to get
+				 * flushed out, wake them up now
+				 */
+				rds_message_unmapped(rm);
+			}
+			rds_message_put(rm);
+			send->s_op = NULL;
 		}
+
+		oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+	}
+
+	rds_ib_ring_free(&ic->i_send_ring, completed);
+	rds_ib_sub_signaled(ic, nr_sig);
+	nr_sig = 0;
+
+	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+	    test_bit(0, &conn->c_map_queued))
+		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+	/* We expect errors as the qp is drained during shutdown */
+	if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+		rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+				  &conn->c_faddr, wc->status,
+				  ib_wc_status_msg(wc->status));
 	}
 }
 
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index bdf6115..8c8b84f 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -43,7 +43,6 @@ static const char *const rds_ib_stat_names[] = {
 	"ib_connect_raced",
 	"ib_listen_closed_stale",
 	"s_ib_evt_handler_call",
-	"ib_tx_cq_call",
 	"ib_tasklet_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
diff --git a/net/rds/send.c b/net/rds/send.c
index 9d8b52d..de930b9 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -431,6 +431,7 @@ over_batch:
 out:
 	return ret;
 }
+EXPORT_SYMBOL_GPL(rds_send_xmit);
 
 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 {
-- 
1.9.1



* [PATCH v2 09/14] RDS: IB: handle rds_ibdev release case instead of crashing the kernel
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh

From: Santosh Shilimkar <ssantosh@kernel.org>

Just in case we are still handling the QP receive completion while the
rds_ibdev is released, drop the connection instead of crashing the kernel.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
---
 net/rds/ib_cm.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 8f51d0d..2b2370e 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -285,7 +285,8 @@ static void rds_ib_tasklet_fn_recv(unsigned long data)
 	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
 	struct rds_ib_ack_state state;
 
-	BUG_ON(!rds_ibdev);
+	if (!rds_ibdev)
+		rds_conn_drop(conn);
 
 	rds_ib_stats_inc(s_ib_tasklet_call);
 
-- 
1.9.1



* [PATCH v2 10/14] RDS: IB: fix the rds_ib_fmr_wq kick call
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

The RDS IB MR pool has its own workqueue, 'rds_ib_fmr_wq', so we need
to use queue_delayed_work() to kick the work. This was hurting
performance, since pool maintenance was triggered less often from
this path.
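
For context, schedule_delayed_work() always targets the system
workqueue, so the flush worker was never landing on the pool's
dedicated queue:

	/* schedule_delayed_work(dwork, delay) is shorthand for: */
	queue_delayed_work(system_wq, dwork, delay);

	/* whereas the pool's worker belongs on its own workqueue: */
	queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);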

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib_rdma.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 872f523..b6644fa 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -319,7 +319,7 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 	int err = 0, iter = 0;
 
 	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
-		schedule_delayed_work(&pool->flush_worker, 10);
+		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
 
 	while (1) {
 		ibmr = rds_ib_reuse_fmr(pool);
-- 
1.9.1



* [PATCH v2 11/14] RDS: IB: use already available pool handle from ibmr
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

rds_ib_mr already keeps the handle of the pool it is associated
with. Let's use that instead of the roundabout way of fetching it
from rds_ib_device.

No functional change.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib_rdma.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index b6644fa..52d889a 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -522,8 +522,7 @@ static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 
 	__rds_ib_teardown_mr(ibmr);
 	if (pinned) {
-		struct rds_ib_device *rds_ibdev = ibmr->device;
-		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+		struct rds_ib_mr_pool *pool = ibmr->pool;
 
 		atomic_sub(pinned, &pool->free_pinned);
 	}
@@ -717,8 +716,8 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 void rds_ib_free_mr(void *trans_private, int invalidate)
 {
 	struct rds_ib_mr *ibmr = trans_private;
+	struct rds_ib_mr_pool *pool = ibmr->pool;
 	struct rds_ib_device *rds_ibdev = ibmr->device;
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
 
 	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
 
-- 
1.9.1



* [PATCH v2 12/14] RDS: IB: mark rds_ib_fmr_wq static
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

Fix the below sparse warning by marking rds_ib_fmr_wq static:

net/rds/ib_rdma.c:87:25: warning: symbol 'rds_ib_fmr_wq' was not declared. Should it be static?

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib_rdma.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 52d889a..bb62024 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -83,7 +83,7 @@ struct rds_ib_mr_pool {
 	struct ib_fmr_attr	fmr_attr;
 };
 
-struct workqueue_struct *rds_ib_fmr_wq;
+static struct workqueue_struct *rds_ib_fmr_wq;
 
 int rds_ib_fmr_init(void)
 {
-- 
1.9.1


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 13/14] RDS: IB: use max_mr from HCA caps than max_fmr
  2015-09-30 17:24 [PATCH v2 00/14] RDS: connection scalability and performance improvements Santosh Shilimkar
                   ` (11 preceding siblings ...)
  2015-09-30 17:24 ` [PATCH v2 12/14] RDS: IB: mark rds_ib_fmr_wq static Santosh Shilimkar
@ 2015-09-30 17:24 ` Santosh Shilimkar
  2015-09-30 17:24 ` [PATCH v2 14/14] RDS: IB: split mr pool to improve 8K messages performance Santosh Shilimkar
  2015-10-01 16:19 ` [PATCH v2 00/14] RDS: connection scalability and performance improvements David Laight
  14 siblings, 0 replies; 19+ messages in thread
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC (permalink / raw)
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

All HCA drivers seem to populate the max_mr capability, and only a
few of them populate both max_mr and max_fmr.

Hence update the RDS code to make use of max_mr.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 2d3f2ab..883813a 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -148,8 +148,8 @@ static void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
 
 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_fmr ?
-			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
+	rds_ibdev->max_fmrs = dev_attr->max_mr ?
+			min_t(unsigned int, dev_attr->max_mr, fmr_pool_size) :
 			fmr_pool_size;
 
 	rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
-- 
1.9.1


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH v2 14/14] RDS: IB: split mr pool to improve 8K messages performance
  2015-09-30 17:24 [PATCH v2 00/14] RDS: connection scalability and performance improvements Santosh Shilimkar
                   ` (12 preceding siblings ...)
  2015-09-30 17:24 ` [PATCH v2 13/14] RDS: IB: use max_mr from HCA caps than max_fmr Santosh Shilimkar
@ 2015-09-30 17:24 ` Santosh Shilimkar
  2015-10-01 16:19 ` [PATCH v2 00/14] RDS: connection scalability and performance improvements David Laight
  14 siblings, 0 replies; 19+ messages in thread
From: Santosh Shilimkar @ 2015-09-30 17:24 UTC (permalink / raw)
  To: netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh, Santosh Shilimkar

8K message sizes are a pretty important usecase for current RDS
workloads, so we make provision to have 8K mrs available from the pool.
Based on the number of SGs in the RDS message, we pick a pool to use.

Also, to make sure that we don't underutilise mrs when, say, 8K
messages are dominating, which could lead to the 8K pool being
exhausted, we fall back to the 1M pool until the 8K pool recovers
for use.

This helps to push at least ~55 kB/s of bidirectional data, which
is a nice improvement.
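
Condensed from the rds_ib_alloc_fmr() hunk in the diff below, the
pool selection and fallback boil down to:

	if (npages <= RDS_FMR_8K_MSG_SIZE)
		pool = rds_ibdev->mr_8k_pool;	/* small messages */
	else
		pool = rds_ibdev->mr_1m_pool;	/* everything else */

	/* if the chosen pool is ~90% dirty, borrow from the other
	 * pool until it recovers
	 */
	if (atomic_read(&pool->dirty_count) >= pool->max_items * 9 / 10)
		pool = (pool->pool_type == RDS_IB_MR_8K_POOL) ?
			rds_ibdev->mr_1m_pool : rds_ibdev->mr_8k_pool;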

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/ib.c       |  47 +++++++++++++++++--------
 net/rds/ib.h       |  43 ++++++++++++++++-------
 net/rds/ib_rdma.c  | 101 +++++++++++++++++++++++++++++++++++++----------------
 net/rds/ib_stats.c |  18 ++++++----
 4 files changed, 147 insertions(+), 62 deletions(-)

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 883813a..a833ab7 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -43,14 +43,14 @@
 #include "rds.h"
 #include "ib.h"
 
-static unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
-unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
+unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
 unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
 
-module_param(fmr_pool_size, int, 0444);
-MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
-module_param(fmr_message_size, int, 0444);
-MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(rds_ib_fmr_1m_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA");
+module_param(rds_ib_fmr_8k_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA");
 module_param(rds_ib_retry_count, int, 0444);
 MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
 
@@ -97,8 +97,10 @@ static void rds_ib_dev_free(struct work_struct *work)
 	struct rds_ib_device *rds_ibdev = container_of(work,
 					struct rds_ib_device, free_work);
 
-	if (rds_ibdev->mr_pool)
-		rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
+	if (rds_ibdev->mr_8k_pool)
+		rds_ib_destroy_mr_pool(rds_ibdev->mr_8k_pool);
+	if (rds_ibdev->mr_1m_pool)
+		rds_ib_destroy_mr_pool(rds_ibdev->mr_1m_pool);
 	if (rds_ibdev->pd)
 		ib_dealloc_pd(rds_ibdev->pd);
 
@@ -148,9 +150,13 @@ static void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
 
 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_mr ?
-			min_t(unsigned int, dev_attr->max_mr, fmr_pool_size) :
-			fmr_pool_size;
+	rds_ibdev->max_1m_fmrs = dev_attr->max_mr ?
+		min_t(unsigned int, (dev_attr->max_mr / 2),
+		      rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size;
+
+	rds_ibdev->max_8k_fmrs = dev_attr->max_mr ?
+		min_t(unsigned int, ((dev_attr->max_mr / 2) * RDS_MR_8K_SCALE),
+		      rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size;
 
 	rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
 	rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
@@ -162,12 +168,25 @@ static void rds_ib_add_one(struct ib_device *device)
 		goto put_dev;
 	}
 
-	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
-	if (IS_ERR(rds_ibdev->mr_pool)) {
-		rds_ibdev->mr_pool = NULL;
+	rds_ibdev->mr_1m_pool =
+		rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL);
+	if (IS_ERR(rds_ibdev->mr_1m_pool)) {
+		rds_ibdev->mr_1m_pool = NULL;
 		goto put_dev;
 	}
 
+	rds_ibdev->mr_8k_pool =
+		rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_8K_POOL);
+	if (IS_ERR(rds_ibdev->mr_8k_pool)) {
+		rds_ibdev->mr_8k_pool = NULL;
+		goto put_dev;
+	}
+
+	rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n",
+		 dev_attr->max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge,
+		 rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs,
+		 rds_ibdev->max_8k_fmrs);
+
 	INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
 	INIT_LIST_HEAD(&rds_ibdev->conn_list);
 
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 3a8cd31..f17d095 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -9,8 +9,11 @@
 #include "rds.h"
 #include "rdma_transport.h"
 
-#define RDS_FMR_SIZE			256
-#define RDS_FMR_POOL_SIZE		8192
+#define RDS_FMR_1M_POOL_SIZE		(8192 / 2)
+#define RDS_FMR_1M_MSG_SIZE		256
+#define RDS_FMR_8K_MSG_SIZE		2
+#define RDS_MR_8K_SCALE			(256 / (RDS_FMR_8K_MSG_SIZE + 1))
+#define RDS_FMR_8K_POOL_SIZE		(RDS_MR_8K_SCALE * (8192 / 2))
 
 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE 		2
@@ -189,15 +192,23 @@ struct rds_ib_ipaddr {
 	struct rcu_head		rcu;
 };
 
+enum {
+	RDS_IB_MR_8K_POOL,
+	RDS_IB_MR_1M_POOL,
+};
+
 struct rds_ib_device {
 	struct list_head	list;
 	struct list_head	ipaddr_list;
 	struct list_head	conn_list;
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
-	struct rds_ib_mr_pool	*mr_pool;
-	unsigned int		fmr_max_remaps;
 	unsigned int		max_fmrs;
+	struct rds_ib_mr_pool	*mr_1m_pool;
+	struct rds_ib_mr_pool   *mr_8k_pool;
+	unsigned int		fmr_max_remaps;
+	unsigned int		max_8k_fmrs;
+	unsigned int		max_1m_fmrs;
 	int			max_sge;
 	unsigned int		max_wrs;
 	unsigned int		max_initiator_depth;
@@ -239,12 +250,18 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_ack_send_delayed;
 	uint64_t	s_ib_ack_send_piggybacked;
 	uint64_t	s_ib_ack_received;
-	uint64_t	s_ib_rdma_mr_alloc;
-	uint64_t	s_ib_rdma_mr_free;
-	uint64_t	s_ib_rdma_mr_used;
-	uint64_t	s_ib_rdma_mr_pool_flush;
-	uint64_t	s_ib_rdma_mr_pool_wait;
-	uint64_t	s_ib_rdma_mr_pool_depleted;
+	uint64_t	s_ib_rdma_mr_8k_alloc;
+	uint64_t	s_ib_rdma_mr_8k_free;
+	uint64_t	s_ib_rdma_mr_8k_used;
+	uint64_t	s_ib_rdma_mr_8k_pool_flush;
+	uint64_t	s_ib_rdma_mr_8k_pool_wait;
+	uint64_t	s_ib_rdma_mr_8k_pool_depleted;
+	uint64_t	s_ib_rdma_mr_1m_alloc;
+	uint64_t	s_ib_rdma_mr_1m_free;
+	uint64_t	s_ib_rdma_mr_1m_used;
+	uint64_t	s_ib_rdma_mr_1m_pool_flush;
+	uint64_t	s_ib_rdma_mr_1m_pool_wait;
+	uint64_t	s_ib_rdma_mr_1m_pool_depleted;
 	uint64_t	s_ib_atomic_cswp;
 	uint64_t	s_ib_atomic_fadd;
 };
@@ -296,7 +313,8 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
 void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
 extern struct ib_client rds_ib_client;
 
-extern unsigned int fmr_message_size;
+extern unsigned int rds_ib_fmr_1m_pool_size;
+extern unsigned int rds_ib_fmr_8k_pool_size;
 extern unsigned int rds_ib_retry_count;
 
 extern spinlock_t ib_nodev_conns_lock;
@@ -326,7 +344,8 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_destroy_nodev_conns(void);
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
+					     int npages);
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index bb62024..a234074 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -65,6 +65,7 @@ struct rds_ib_mr {
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	unsigned int            pool_type;
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct delayed_work	flush_worker;		/* flush worker */
 
@@ -234,7 +235,8 @@ void rds_ib_destroy_nodev_conns(void)
 		rds_conn_destroy(ic->conn);
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+					     int pool_type)
 {
 	struct rds_ib_mr_pool *pool;
 
@@ -242,6 +244,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
+	pool->pool_type = pool_type;
 	init_llist_head(&pool->free_list);
 	init_llist_head(&pool->drop_list);
 	init_llist_head(&pool->clean_list);
@@ -249,28 +252,30 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	init_waitqueue_head(&pool->flush_wait);
 	INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
-	pool->fmr_attr.max_pages = fmr_message_size;
+	if (pool_type == RDS_IB_MR_1M_POOL) {
+		/* +1 allows for unaligned MRs */
+		pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_1M_POOL_SIZE;
+	} else {
+		/* pool_type == RDS_IB_MR_8K_POOL */
+		pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_8K_POOL_SIZE;
+	}
+
+	pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = PAGE_SHIFT;
-	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
-
-	/* We never allow more than max_items MRs to be allocated.
-	 * When we exceed more than max_items_soft, we start freeing
-	 * items more aggressively.
-	 * Make sure that max_items > max_items_soft > max_items / 2
-	 */
 	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
 
 	return pool;
 }
 
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
 
-	iinfo->rdma_mr_max = pool->max_items;
-	iinfo->rdma_mr_size = pool->fmr_attr.max_pages;
+	iinfo->rdma_mr_max = pool_1m->max_items;
+	iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 }
 
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
@@ -312,15 +317,29 @@ static inline void wait_clean_list_grace(void)
 	}
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
+					  int npages)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
 	int err = 0, iter = 0;
 
+	if (npages <= RDS_FMR_8K_MSG_SIZE)
+		pool = rds_ibdev->mr_8k_pool;
+	else
+		pool = rds_ibdev->mr_1m_pool;
+
 	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
 		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
 
+	/* Switch pools if one of the pool is reaching upper limit */
+	if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			pool = rds_ibdev->mr_1m_pool;
+		else
+			pool = rds_ibdev->mr_8k_pool;
+	}
+
 	while (1) {
 		ibmr = rds_ib_reuse_fmr(pool);
 		if (ibmr)
@@ -341,12 +360,18 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		atomic_dec(&pool->item_count);
 
 		if (++iter > 2) {
-			rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted);
+			if (pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
 			return ERR_PTR(-EAGAIN);
 		}
 
 		/* We do have some empty MRs. Flush them out. */
-		rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
+		else
+			rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
 		rds_ib_flush_mr_pool(pool, 0, &ibmr);
 		if (ibmr)
 			return ibmr;
@@ -371,7 +396,12 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 		goto out_no_cigar;
 	}
 
-	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
+	ibmr->pool = pool;
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
+
 	return ibmr;
 
 out_no_cigar:
@@ -427,7 +457,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
 	}
 
 	page_cnt += len >> PAGE_SHIFT;
-	if (page_cnt > fmr_message_size)
+	if (page_cnt > ibmr->pool->fmr_attr.max_pages)
 		return -EINVAL;
 
 	dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
@@ -459,7 +489,10 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
 	ibmr->sg_dma_len = sg_dma_len;
 	ibmr->remap_count++;
 
-	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
 	ret = 0;
 
 out:
@@ -591,7 +624,7 @@ static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
  * to free as many MRs as needed to get back to this limit.
  */
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
-			        int free_all, struct rds_ib_mr **ibmr_ret)
+				int free_all, struct rds_ib_mr **ibmr_ret)
 {
 	struct rds_ib_mr *ibmr, *next;
 	struct llist_node *clean_nodes;
@@ -602,11 +635,14 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 	unsigned int nfreed = 0, dirty_to_clean = 0, free_goal;
 	int ret = 0;
 
-	rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush);
 
 	if (ibmr_ret) {
 		DEFINE_WAIT(wait);
-		while(!mutex_trylock(&pool->flush_lock)) {
+		while (!mutex_trylock(&pool->flush_lock)) {
 			ibmr = rds_ib_reuse_fmr(pool);
 			if (ibmr) {
 				*ibmr_ret = ibmr;
@@ -663,8 +699,12 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 	list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
 		unpinned += ibmr->sg_len;
 		__rds_ib_teardown_mr(ibmr);
-		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
-			rds_ib_stats_inc(s_ib_rdma_mr_free);
+		if (nfreed < free_goal ||
+		    ibmr->remap_count >= pool->fmr_attr.max_maps) {
+			if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_free);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_free);
 			list_del(&ibmr->unmap_list);
 			ib_dealloc_fmr(ibmr->fmr);
 			kfree(ibmr);
@@ -756,10 +796,11 @@ void rds_ib_flush_mrs(void)
 
 	down_read(&rds_ib_devices_lock);
 	list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+		if (rds_ibdev->mr_8k_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL);
 
-		if (pool)
-			rds_ib_flush_mr_pool(pool, 0, NULL);
+		if (rds_ibdev->mr_1m_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL);
 	}
 	up_read(&rds_ib_devices_lock);
 }
@@ -777,12 +818,12 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		goto out;
 	}
 
-	if (!rds_ibdev->mr_pool) {
+	if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
 		ret = -ENODEV;
 		goto out;
 	}
 
-	ibmr = rds_ib_alloc_fmr(rds_ibdev);
+	ibmr = rds_ib_alloc_fmr(rds_ibdev, nents);
 	if (IS_ERR(ibmr)) {
 		rds_ib_dev_put(rds_ibdev);
 		return ibmr;
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 8c8b84f..d77e044 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -61,12 +61,18 @@ static const char *const rds_ib_stat_names[] = {
 	"ib_ack_send_delayed",
 	"ib_ack_send_piggybacked",
 	"ib_ack_received",
-	"ib_rdma_mr_alloc",
-	"ib_rdma_mr_free",
-	"ib_rdma_mr_used",
-	"ib_rdma_mr_pool_flush",
-	"ib_rdma_mr_pool_wait",
-	"ib_rdma_mr_pool_depleted",
+	"ib_rdma_mr_8k_alloc",
+	"ib_rdma_mr_8k_free",
+	"ib_rdma_mr_8k_used",
+	"ib_rdma_mr_8k_pool_flush",
+	"ib_rdma_mr_8k_pool_wait",
+	"ib_rdma_mr_8k_pool_depleted",
+	"ib_rdma_mr_1m_alloc",
+	"ib_rdma_mr_1m_free",
+	"ib_rdma_mr_1m_used",
+	"ib_rdma_mr_1m_pool_flush",
+	"ib_rdma_mr_1m_pool_wait",
+	"ib_rdma_mr_1m_pool_depleted",
 	"ib_atomic_cswp",
 	"ib_atomic_fadd",
 };
-- 
1.9.1


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* RE: [PATCH v2 00/14] RDS: connection scalability and performance improvements
  2015-09-30 17:24 [PATCH v2 00/14] RDS: connection scalability and performance improvements Santosh Shilimkar
                   ` (13 preceding siblings ...)
  2015-09-30 17:24 ` [PATCH v2 14/14] RDS: IB: split mr pool to improve 8K messages performance Santosh Shilimkar
@ 2015-10-01 16:19 ` David Laight
  2015-10-01 19:00   ` santosh.shilimkar
  14 siblings, 1 reply; 19+ messages in thread
From: David Laight @ 2015-10-01 16:19 UTC (permalink / raw)
  To: 'Santosh Shilimkar', netdev
  Cc: linux-rdma, davem, linux-kernel, ssantosh

From: Santosh Shilimkar
> Sent: 30 September 2015 18:24
...
> This is being addressed by simply using per bucket rw lock which makes the
> locking simple and very efficient. The hash table size is still an issue and
> I plan to address it by using re-sizable hash tables as suggested on the list.

If the hash chains are short, do you need the expense of a rw lock
for each chain?
A simple spinlock may be faster.

If you use the hash chain lock for the reference count on the hashed
objects, you should be able to release the lock before locking the
object itself.
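
Something like the following, roughly; the bucket layout and lookup
helper here are illustrative only, not existing RDS code:

	struct bind_bucket {
		spinlock_t	  lock;	 /* guards chain and entry refs */
		struct hlist_head chain;
	};

	/* Take the reference under the chain lock, then drop the lock
	 * before touching the socket itself.
	 */
	static struct rds_sock *bind_bucket_lookup(struct bind_bucket *b,
						   __be32 addr, __be16 port)
	{
		struct rds_sock *rs;

		spin_lock(&b->lock);
		hlist_for_each_entry(rs, &b->chain, rs_bound_node) {
			if (rs->rs_bound_addr == addr &&
			    rs->rs_bound_port == port) {
				rds_sock_addref(rs); /* ref under lock */
				spin_unlock(&b->lock);
				return rs;
			}
		}
		spin_unlock(&b->lock);
		return NULL;
	}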

	David


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v2 00/14] RDS: connection scalability and performance improvements
  2015-10-01 16:19 ` [PATCH v2 00/14] RDS: connection scalability and performance improvements David Laight
@ 2015-10-01 19:00   ` santosh.shilimkar
  0 siblings, 0 replies; 19+ messages in thread
From: santosh.shilimkar @ 2015-10-01 19:00 UTC (permalink / raw)
  To: David Laight, netdev; +Cc: linux-rdma, davem, linux-kernel, ssantosh

On 10/1/15 9:19 AM, David Laight wrote:
> From: Santosh Shilimkar
>> Sent: 30 September 2015 18:24
> ...
>> This is being addressed by simply using per bucket rw lock which makes the
>> locking simple and very efficient. The hash table size is still an issue and
>> I plan to address it by using re-sizable hash tables as suggested on the list.
>
> If the hash chains are short, do you need the expense of a rw lock
> for each chain?
Chains can be really long on larger systems with many databases.

> A simple spinlock may be faster.
>
> If you use the hash chain lock for the reference count on the hashed
> objects, you should be able to release the lock before locking the
> object itself.
>
Because of the shared-socket nature of RDS, the chain needs to be
protected against parallel add/removal/lookup accesses. Hashing is
really only used to get to the bucket which holds the hlist.

Just to give a bit of history, the RDS bind code has evolved over a
few years. It started with an rb tree and a global rw lock, which
wasn't very efficient. Then it was converted to an rcu hlist with a
spinlock to make the lookups faster. But that scheme too exploded on
larger systems with truckloads of sockets, with read/write locks
suffering excessive contention, as high as almost ~25% of system
load. The per-bucket lock actually solved most of those issues.
Increasing the bucket/chain table (1K to 8K) was a relatively smaller
gain, though it still helped to reduce the remaining contention to a
nominal 1 or 2%.

I'm still getting my head around the rhashtable plumbing for this
usecase. I will CC you when I post the RFC patch for the rhashtable
conversion. Thanks for your comments so far.

Regards,
Santosh


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v2 05/14] RDS: defer the over_batch work to send worker
  2015-09-30 17:24 ` [PATCH v2 05/14] RDS: defer the over_batch work to send worker Santosh Shilimkar
@ 2015-10-05 10:30   ` David Miller
  2015-10-05 15:31     ` santosh shilimkar
  0 siblings, 1 reply; 19+ messages in thread
From: David Miller @ 2015-10-05 10:30 UTC (permalink / raw)
  To: santosh.shilimkar; +Cc: netdev, linux-rdma, linux-kernel, ssantosh

From: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Date: Wed, 30 Sep 2015 13:24:24 -0400

> @@ -423,7 +423,9 @@ over_batch:
>  		     !list_empty(&conn->c_send_queue)) &&
>  		    send_gen == conn->c_send_gen) {
>  			rds_stats_inc(s_send_lock_queue_raced);
> -			goto restart;
> +			if (batch_count < 1024)
> +				goto restart;
> +			queue_delayed_work(rds_wq, &conn->c_send_w, 1);

Sorry, you can't just use a magic number like this.

You have to describe, in detail, exactly how this value was
chosen, derived, and tested to be effective, and in exactly
what environment those tests were done.

You must also use a mnemonic for this value rather than a
raw magic constant.

Thanks.

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH v2 05/14] RDS: defer the over_batch work to send worker
  2015-10-05 10:30   ` David Miller
@ 2015-10-05 15:31     ` santosh shilimkar
  0 siblings, 0 replies; 19+ messages in thread
From: santosh shilimkar @ 2015-10-05 15:31 UTC (permalink / raw)
  To: David Miller; +Cc: netdev, linux-rdma, linux-kernel, ssantosh

On 10/5/2015 3:30 AM, David Miller wrote:
> From: Santosh Shilimkar <santosh.shilimkar@oracle.com>
> Date: Wed, 30 Sep 2015 13:24:24 -0400
>
>> @@ -423,7 +423,9 @@ over_batch:
>>   		     !list_empty(&conn->c_send_queue)) &&
>>   		    send_gen == conn->c_send_gen) {
>>   			rds_stats_inc(s_send_lock_queue_raced);
>> -			goto restart;
>> +			if (batch_count < 1024)
>> +				goto restart;
>> +			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
>
> Sorry, you can't just use a magic number like this.
>
> You have to describe, in detail, exactly how this value was
> chosen, derived, and tested to be effective, and in exactly
> what environment those tests were done.
>
> You must also use a mnemonic for this value rather than a
> raw magic constant.
>
Right. In fact, if I look at it now again, there is already a batch
count parameter which is a module parameter. I will use that in this
hunk, as well as in the other couple of places, and send an update.
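
Something like the below, assuming the existing module parameter is
send_batch_count in net/rds/send.c:

	/* bound the restart loop with the existing batch module
	 * parameter instead of a raw magic number
	 */
	if (batch_count < send_batch_count)
		goto restart;
	queue_delayed_work(rds_wq, &conn->c_send_w, 1);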

Thanks for the review!

Regards,
Santosh

^ permalink raw reply	[flat|nested] 19+ messages in thread

end of thread, other threads:[~2015-10-05 15:31 UTC | newest]

Thread overview: 19+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-09-30 17:24 [PATCH v2 00/14] RDS: connection scalability and performance improvements Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 01/14] RDS: use kfree_rcu in rds_ib_remove_ipaddr Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 02/14] RDS: make socket bind/release locking scheme simple and more efficient Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 03/14] RDS: fix rds_sock reference bug while doing bind Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 04/14] RDS: Use per-bucket rw lock for bind hash-table Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 05/14] RDS: defer the over_batch work to send worker Santosh Shilimkar
2015-10-05 10:30   ` David Miller
2015-10-05 15:31     ` santosh shilimkar
2015-09-30 17:24 ` [PATCH v2 06/14] RDS: use rds_send_xmit() state instead of RDS_LL_SEND_FULL Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 07/14] RDS: IB: ack more receive completions to improve performance Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 08/14] RDS: IB: split send completion handling and do batch ack Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 09/14] RDS: IB: handle rds_ibdev release case instead of crashing the kernel Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 10/14] RDS: IB: fix the rds_ib_fmr_wq kick call Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 11/14] RDS: IB: use already available pool handle from ibmr Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 12/14] RDS: IB: mark rds_ib_fmr_wq static Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 13/14] RDS: IB: use max_mr from HCA caps than max_fmr Santosh Shilimkar
2015-09-30 17:24 ` [PATCH v2 14/14] RDS: IB: split mr pool to improve 8K messages performance Santosh Shilimkar
2015-10-01 16:19 ` [PATCH v2 00/14] RDS: connection scalability and performance improvements David Laight
2015-10-01 19:00   ` santosh.shilimkar
