* [PATCH 1/5] SUNRPC: Small optimisation of client receive
@ 2016-06-14 19:04 Trond Myklebust
  2016-06-14 19:04 ` [PATCH 2/5] SUNRPC: Consolidate xs_tcp_data_ready and xs_data_ready Trond Myklebust
From: Trond Myklebust @ 2016-06-14 19:04 UTC
  To: linux-nfs

Do not queue the client receive work if we're still processing it.
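
This is the usual lost-wakeup-safe handshake between a socket callback
and a work item. As a minimal sketch (illustrative names, not the exact
xprtsock symbols):

	/* producer: the ->sk_data_ready callback */
	if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &sock_state))
		queue_work(wq, &recv_worker);

	/* consumer: the receive worker */
	for (;;) {
		while ((skb = skb_recv_datagram(sk, 0, 1, &err)) != NULL) {
			process_skb(skb);		/* illustrative */
			skb_free_datagram(sk, skb);
		}
		/* data that arrived while we were draining re-set the
		 * bit; loop once more rather than wait for a new wakeup */
		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &sock_state))
			break;
	}

Because the consumer clears the bit only once the queue has run dry,
the callback can safely skip the redundant queue_work() whenever the
bit is already set.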

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
---
 include/linux/sunrpc/xprtsock.h |  1 +
 net/sunrpc/xprtsock.c           | 44 ++++++++++++++++++++++++++++++-----------
 2 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 0ece4ba06f06..bef3fb0abb8f 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -80,6 +80,7 @@ struct sock_xprt {
 #define TCP_RPC_REPLY		(1UL << 6)
 
 #define XPRT_SOCK_CONNECTING	1U
+#define XPRT_SOCK_DATA_READY	(2)
 
 #endif /* __KERNEL__ */
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 2d3e0c42361e..2f2178027761 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -755,11 +755,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s
 	sk->sk_error_report = transport->old_error_report;
 }
 
+static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+}
+
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
 {
 	smp_mb__before_atomic();
 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	clear_bit(XPRT_CLOSING, &xprt->state);
+	xs_sock_reset_state_flags(xprt);
 	smp_mb__after_atomic();
 }
 
@@ -962,10 +970,13 @@ static void xs_local_data_receive(struct sock_xprt *transport)
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_local_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_local_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1043,10 +1054,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_udp_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_udp_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1074,7 +1088,8 @@ static void xs_data_ready(struct sock *sk)
 	if (xprt != NULL) {
 		struct sock_xprt *transport = container_of(xprt,
 				struct sock_xprt, xprt);
-		queue_work(rpciod_workqueue, &transport->recv_worker);
+		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+			queue_work(rpciod_workqueue, &transport->recv_worker);
 	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -1474,10 +1489,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 	for (;;) {
 		lock_sock(sk);
 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		release_sock(sk);
-		if (read <= 0)
-			break;
-		total += read;
+		if (read <= 0) {
+			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+			release_sock(sk);
+			if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+				break;
+		} else {
+			release_sock(sk);
+			total += read;
+		}
 		rd_desc.count = 65536;
 	}
 out:
@@ -1508,6 +1528,8 @@ static void xs_tcp_data_ready(struct sock *sk)
 	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
 	transport = container_of(xprt, struct sock_xprt, xprt);
+	if (test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+		goto out;
 
 	/* Any data means we had a useful conversation, so
 	 * the we don't need to delay the next reconnect
-- 
2.5.5



* [PATCH 2/5] SUNRPC: Consolidate xs_tcp_data_ready and xs_data_ready
  2016-06-14 19:04 [PATCH 1/5] SUNRPC: Small optimisation of client receive Trond Myklebust
@ 2016-06-14 19:04 ` Trond Myklebust
  2016-06-14 19:04   ` [PATCH 3/5] SUNRPC: RPC transport queue must be low latency Trond Myklebust
From: Trond Myklebust @ 2016-06-14 19:04 UTC
  To: linux-nfs

The only difference between the two at this point is the reset of
the connection timeout. Since everyone except TCP ignores that value,
we can just fold it into the generic function.
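
Note that the consolidated xs_data_ready() also chains to the saved
old_data_ready(). That is the usual save/chain pattern for socket
callbacks, roughly (a sketch, not the exact helpers):

	/* at setup time, remember the socket's original callback */
	transport->old_data_ready = sk->sk_data_ready;
	sk->sk_data_ready = xs_data_ready;

	/* in xs_data_ready(), run the original handler first so the
	 * default wakeups (e.g. of sk_sleep waiters) still happen */
	transport->old_data_ready(sk);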

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
---
 net/sunrpc/xprtsock.c | 38 +++++++-------------------------------
 1 file changed, 7 insertions(+), 31 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 2f2178027761..62b4f5a2a331 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1088,6 +1088,12 @@ static void xs_data_ready(struct sock *sk)
 	if (xprt != NULL) {
 		struct sock_xprt *transport = container_of(xprt,
 				struct sock_xprt, xprt);
+		transport->old_data_ready(sk);
+		/* Any data means we had a useful conversation, so
+		 * then we don't need to delay the next reconnect
+		 */
+		if (xprt->reestablish_timeout)
+			xprt->reestablish_timeout = 0;
 		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			queue_work(rpciod_workqueue, &transport->recv_worker);
 	}
@@ -1513,36 +1519,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work)
 }
 
 /**
- * xs_tcp_data_ready - "data ready" callback for TCP sockets
- * @sk: socket with data to read
- *
- */
-static void xs_tcp_data_ready(struct sock *sk)
-{
-	struct sock_xprt *transport;
-	struct rpc_xprt *xprt;
-
-	dprintk("RPC:       xs_tcp_data_ready...\n");
-
-	read_lock_bh(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk)))
-		goto out;
-	transport = container_of(xprt, struct sock_xprt, xprt);
-	if (test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-		goto out;
-
-	/* Any data means we had a useful conversation, so
-	 * the we don't need to delay the next reconnect
-	 */
-	if (xprt->reestablish_timeout)
-		xprt->reestablish_timeout = 0;
-	queue_work(rpciod_workqueue, &transport->recv_worker);
-
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
-}
-
-/**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
  *
@@ -2263,7 +2239,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		xs_save_old_callbacks(transport, sk);
 
 		sk->sk_user_data = xprt;
-		sk->sk_data_ready = xs_tcp_data_ready;
+		sk->sk_data_ready = xs_data_ready;
 		sk->sk_state_change = xs_tcp_state_change;
 		sk->sk_write_space = xs_tcp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
-- 
2.5.5



* [PATCH 3/5] SUNRPC: RPC transport queue must be low latency
  2016-06-14 19:04 ` [PATCH 2/5] SUNRPC: Consolidate xs_tcp_data_ready and xs_data_ready Trond Myklebust
@ 2016-06-14 19:04   ` Trond Myklebust
  2016-06-14 19:04     ` [PATCH 4/5] SUNRPC: Reduce latency when send queue is congested Trond Myklebust
From: Trond Myklebust @ 2016-06-14 19:04 UTC
  To: linux-nfs

rpciod can easily get congested due to the long list of queued rpc_tasks.
Having the receive work wait its turn behind those tasks can therefore
become a bottleneck.

Address the problem by splitting the work across two workqueues:
- rpciod: manages rpc_tasks
- xprtiod: manages transport-related work (sketched below).
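
In workqueue terms the split amounts to the following (a sketch of the
standard API usage, not the full rpciod_start(); task_work and
recv_work are illustrative):

	struct workqueue_struct *rpciod_wq, *xprtiod_wq;

	rpciod_wq  = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
	/* highpri: network receive is latency sensitive */
	xprtiod_wq = alloc_workqueue("xprtiod",
				     WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);

	queue_work(rpciod_wq, &task_work);	/* rpc_task execution */
	queue_work(xprtiod_wq, &recv_work);	/* transport work */

WQ_HIGHPRI work is served from a separate, elevated-priority worker
pool, so transport work no longer queues up behind a backlog of
rpc_tasks.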

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
---
 include/linux/sunrpc/sched.h |  1 +
 net/sunrpc/sched.c           | 24 ++++++++++++++++++++----
 net/sunrpc/xprt.c            |  8 ++++----
 net/sunrpc/xprtsock.c        |  6 +++---
 4 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 05a1809c44d9..ef780b3b5e31 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -247,6 +247,7 @@ void		rpc_show_tasks(struct net *);
 int		rpc_init_mempool(void);
 void		rpc_destroy_mempool(void);
 extern struct workqueue_struct *rpciod_workqueue;
+extern struct workqueue_struct *xprtiod_workqueue;
 void		rpc_prepare_task(struct rpc_task *task);
 
 static inline int rpc_wait_for_completion_task(struct rpc_task *task)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index fcfd48d263f6..a9f786247ffb 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
 /*
  * rpciod-related stuff
  */
-struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue __read_mostly;
+struct workqueue_struct *xprtiod_workqueue __read_mostly;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -1071,10 +1072,22 @@ static int rpciod_start(void)
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	/* Note: highpri because network receive is latency sensitive */
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+	if (!wq)
+		goto out_failed;
 	rpciod_workqueue = wq;
-	return rpciod_workqueue != NULL;
+	/* Note: highpri because network receive is latency sensitive */
+	wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	if (!wq)
+		goto free_rpciod;
+	xprtiod_workqueue = wq;
+	return 1;
+free_rpciod:
+	wq = rpciod_workqueue;
+	rpciod_workqueue = NULL;
+	destroy_workqueue(wq);
+out_failed:
+	return 0;
 }
 
 static void rpciod_stop(void)
@@ -1088,6 +1101,9 @@ static void rpciod_stop(void)
 	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;
 	destroy_workqueue(wq);
+	wq = xprtiod_workqueue;
+	xprtiod_workqueue = NULL;
+	destroy_workqueue(wq);
 }
 
 void
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 216a1385718a..71df082b84a9 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
 		clear_bit(XPRT_LOCKED, &xprt->state);
 		smp_mb__after_atomic();
 	} else
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }
 
 /*
@@ -645,7 +645,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -672,7 +672,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -689,7 +689,7 @@ xprt_init_autodisconnect(unsigned long data)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		goto out_abort;
 	spin_unlock(&xprt->transport_lock);
-	queue_work(rpciod_workqueue, &xprt->task_cleanup);
+	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	return;
 out_abort:
 	spin_unlock(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 62b4f5a2a331..646170d0cb86 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1095,7 +1095,7 @@ static void xs_data_ready(struct sock *sk)
 		if (xprt->reestablish_timeout)
 			xprt->reestablish_timeout = 0;
 		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-			queue_work(rpciod_workqueue, &transport->recv_worker);
+			queue_work(xprtiod_workqueue, &transport->recv_worker);
 	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -2378,7 +2378,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 		/* Start by resetting any existing state */
 		xs_reset_transport(transport);
 
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker,
 				   xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
@@ -2388,7 +2388,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 	} else {
 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker, 0);
 	}
 }
-- 
2.5.5



* [PATCH 4/5] SUNRPC: Reduce latency when send queue is congested
  2016-06-14 19:04   ` [PATCH 3/5] SUNRPC: RPC transport queue must be low latency Trond Myklebust
@ 2016-06-14 19:04     ` Trond Myklebust
  2016-06-14 19:04       ` [PATCH 5/5] SUNRPC: Fix suspicious enobufs issues Trond Myklebust
From: Trond Myklebust @ 2016-06-14 19:04 UTC
  To: linux-nfs

Use the low-latency transport workqueue to process the task that is
next in line on the xprt->sending queue.
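
For an async task the workqueue argument propagates all the way down
to rpc_make_runnable(), so the next sender's work item lands on the
low-latency queue. The call chain introduced by this patch:

	rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, ...)
	  rpc_wake_up_task_on_wq_queue_locked(wq, queue, task)
	    __rpc_do_wake_up_task_on_wq(wq, queue, task)
	      rpc_make_runnable(wq, task)
	        queue_work(wq, &task->u.tk_work)	/* async tasks only */

Synchronous tasks are unaffected; they are still woken directly via
wake_up_bit().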

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
---
 include/linux/sunrpc/sched.h |  4 ++++
 net/sunrpc/sched.c           | 43 +++++++++++++++++++++++++++++++++----------
 net/sunrpc/xprt.c            |  6 ++++--
 3 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index ef780b3b5e31..817af0b4385e 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -230,6 +230,10 @@ void		rpc_wake_up_queued_task(struct rpc_wait_queue *,
 					struct rpc_task *);
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
+struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
+					struct rpc_wait_queue *,
+					bool (*)(struct rpc_task *, void *),
+					void *);
 struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
 					bool (*)(struct rpc_task *, void *),
 					void *);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index a9f786247ffb..9ae588511aaf 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -330,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
  * lockless RPC_IS_QUEUED() test) before we've had a chance to test
  * the RPC_TASK_RUNNING flag.
  */
-static void rpc_make_runnable(struct rpc_task *task)
+static void rpc_make_runnable(struct workqueue_struct *wq,
+		struct rpc_task *task)
 {
 	bool need_wakeup = !rpc_test_and_set_running(task);
 
@@ -339,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 		return;
 	if (RPC_IS_ASYNC(task)) {
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		queue_work(rpciod_workqueue, &task->u.tk_work);
+		queue_work(wq, &task->u.tk_work);
 	} else
 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -408,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
 /**
- * __rpc_do_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
+ * @wq: workqueue on which to run task
  * @queue: wait queue
  * @task: task to be woken up
  *
  * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
+		struct rpc_task *task)
 {
 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
 			task->tk_pid, jiffies);
@@ -429,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
 
 	__rpc_remove_wait_queue(queue, task);
 
-	rpc_make_runnable(task);
+	rpc_make_runnable(wq, task);
 
 	dprintk("RPC:       __rpc_wake_up_task done\n");
 }
@@ -437,16 +441,25 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (RPC_IS_QUEUED(task)) {
 		smp_rmb();
 		if (task->tk_waitqueue == queue)
-			__rpc_do_wake_up_task(queue, task);
+			__rpc_do_wake_up_task_on_wq(wq, queue, task);
 	}
 }
 
 /*
+ * Wake up a queued task while the queue lock is being held
+ */
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+{
+	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
+}
+
+/*
  * Wake up a task on a specific queue
  */
 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
@@ -519,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
 /*
  * Wake up the first task on the wait queue.
  */
-struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
 		bool (*func)(struct rpc_task *, void *), void *data)
 {
 	struct rpc_task	*task = NULL;
@@ -530,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
 	task = __rpc_find_next_queued(queue);
 	if (task != NULL) {
 		if (func(task, data))
-			rpc_wake_up_task_queue_locked(queue, task);
+			rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
 		else
 			task = NULL;
 	}
@@ -538,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
 
 	return task;
 }
+
+/*
+ * Wake up the first task on the wait queue.
+ */
+struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+		bool (*func)(struct rpc_task *, void *), void *data)
+{
+	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
+}
 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
 
 static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
@@ -815,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
 	bool is_async = RPC_IS_ASYNC(task);
 
 	rpc_set_active(task);
-	rpc_make_runnable(task);
+	rpc_make_runnable(rpciod_workqueue, task);
 	if (!is_async)
 		__rpc_execute(task);
 }
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 71df082b84a9..8313960cac52 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_func, xprt))
 		return;
 	xprt_clear_locked(xprt);
 }
@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_cong_func, xprt))
 		return;
 out_unlock:
 	xprt_clear_locked(xprt);
-- 
2.5.5



* [PATCH 5/5] SUNRPC: Fix suspicious enobufs issues.
  2016-06-14 19:04     ` [PATCH 4/5] SUNRPC: Reduce latency when send queue is congested Trond Myklebust
@ 2016-06-14 19:04       ` Trond Myklebust
From: Trond Myklebust @ 2016-06-14 19:04 UTC
  To: linux-nfs

The current test is racy when dealing with fast NICs: we only check
sk_stream_is_writeable() after the send loop has already returned
-EAGAIN, and by then a fast NIC may have drained the socket send
buffer, so a genuine out-of-buffer-space condition gets misreported as
a memory allocation failure (-ENOBUFS). Instead, sample SOCK_NOSPACE
at the time of the failure, and treat a send that made no progress as
a likely allocation failure, retrying once before giving up.
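
Schematically (an illustration of the race, not the exact code):

	sendmsg()			-> -EAGAIN	/* buffer genuinely full */
		... a fast NIC drains the send buffer here ...
	sk_stream_is_writeable()	-> true
	status = -ENOBUFS;		/* misread as an allocation failure */

Sampling SOCK_NOSPACE at the moment sendmsg() fails closes that window.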

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
---
 net/sunrpc/xprtsock.c | 31 +++++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 646170d0cb86..6b3efeb3edc5 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
 	bool zerocopy = true;
+	bool vm_wait = false;
 	int status;
 	int sent;
 
@@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
 			return 0;
 		}
 
+		WARN_ON_ONCE(sent == 0 && status == 0);
+
+		if (status == -EAGAIN ) {
+			/*
+			 * Return EAGAIN if we're sure we're hitting the
+			 * socket send buffer limits.
+			 */
+			if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
+				break;
+			/*
+			 * Did we hit a memory allocation failure?
+			 */
+			if (sent == 0) {
+				status = -ENOBUFS;
+				if (vm_wait)
+					break;
+				/* Retry, knowing now that we're below the
+				 * socket send buffer limit
+				 */
+				vm_wait = true;
+			}
+			continue;
+		}
 		if (status < 0)
 			break;
-		if (sent == 0) {
-			status = -EAGAIN;
-			break;
-		}
+		vm_wait = false;
 	}
-	if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
-		status = -ENOBUFS;
 
 	switch (status) {
 	case -ENOTSOCK:
-- 
2.5.5


