* [PATCH 00/27] Convert RPC client transmission to a queued model
@ 2018-09-03 15:29 Trond Myklebust
  2018-09-03 15:29 ` [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst Trond Myklebust
  2018-09-03 17:41 ` [PATCH 00/27] Convert RPC client transmission to a queued model Chuck Lever
  0 siblings, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

For historical reasons, the RPC client is heavily serialised by the
XPRT_LOCK during the process of transmitting a request. A request is
required to take that lock before it can start XDR encoding, and it is
required to hold it until it is done transmitting. In essence, the lock
protects the following functions:

- Stream based transport connect/reconnect
- RPCSEC_GSS encoding of the RPC message
- Transmission of a single RPC message

The following patch set assumes that we do not need to do much to
improve performance of the connect/reconnect case, as that is supposed
to be a rare occurrence.

The set deals with the RPCSEC_GSS issues by removing the serialisation
while encoding, and instead checking, after grabbing the XPRT_LOCK,
whether the message we are about to transmit carries a sequence number
that has fallen outside the window allowed by RFC2203. If it has, we
abort the transmission of that message and schedule it for re-encoding.
Since window sizes are typically expected to lie above 100 messages or
so, we expect these cases where we miss the window to be rare.

Finally, we look at avoiding the requirement that every request must be
woken up to grab the XPRT_LOCK in order to transmit itself. Instead, the
request that currently holds the XPRT_LOCK is allowed to pull other
requests off an ordered queue and transmit them as well. The bulk of the
changes in this patchset are dedicated to providing this functionality.
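
A very loose stand-alone sketch of that idea is below. All names and
types in it are invented for illustration and do not match the SUNRPC
code; it only shows the shape of "whoever holds the lock drains the
ordered queue":

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy model of a queued request, ordered by time of enqueueing. */
struct toy_req {
	int id;
	struct toy_req *next;
};

static struct toy_req *xmit_queue;		/* ordered transmit queue */
static struct toy_req **xmit_tail = &xmit_queue;
static bool xprt_locked;			/* stand-in for XPRT_LOCK */

static void enqueue(int id)
{
	struct toy_req *req = malloc(sizeof(*req));

	req->id = id;
	req->next = NULL;
	*xmit_tail = req;
	xmit_tail = &req->next;
}

/*
 * The request that manages to take the lock transmits every request
 * currently on the queue, so the other requests never have to be woken
 * up merely to take the lock themselves.
 */
static void transmit_queued(void)
{
	if (xprt_locked)
		return;			/* someone else is already draining */
	xprt_locked = true;
	while (xmit_queue) {
		struct toy_req *req = xmit_queue;

		xmit_queue = req->next;
		if (!xmit_queue)
			xmit_tail = &xmit_queue;
		printf("transmitting request %d\n", req->id);
		free(req);
	}
	xprt_locked = false;
}

int main(void)
{
	for (int id = 1; id <= 3; id++)
		enqueue(id);
	transmit_queued();		/* one lock holder sends all three */
	return 0;
}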

Trond Myklebust (27):
  SUNRPC: Clean up initialisation of the struct rpc_rqst
  SUNRPC: If there is no reply expected, bail early from call_decode
  SUNRPC: The transmitted message must lie in the RPCSEC window of
    validity
  SUNRPC: Simplify identification of when the message send/receive is
    complete
  SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
  SUNRPC: Rename TCP receive-specific state variables
  SUNRPC: Move reset of TCP state variables into the reconnect code
  SUNRPC: Add socket transmit queue offset tracking
  SUNRPC: Simplify dealing with aborted partially transmitted messages
  SUNRPC: Refactor the transport request pinning
  SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
  SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
  SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
  SUNRPC: Refactor xprt_transmit() to remove the reply queue code
  SUNRPC: Refactor xprt_transmit() to remove wait for reply code
  SUNRPC: Minor cleanup for call_transmit()
  SUNRPC: Distinguish between the slot allocation list and receive queue
  NFS: Add a transmission queue for RPC requests
  SUNRPC: Refactor RPC call encoding
  SUNRPC: Treat the task and request as separate in the
    xprt_ops->send_request()
  SUNRPC: Don't reset the request 'bytes_sent' counter when releasing
    XPRT_LOCK
  SUNRPC: Simplify xprt_prepare_transmit()
  SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
  SUNRPC: Fix up the back channel transmit
  SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit
    queue
  SUNRPC: Queue the request for transmission immediately after encoding
  SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue

 include/linux/sunrpc/auth.h                |   2 +
 include/linux/sunrpc/auth_gss.h            |   1 +
 include/linux/sunrpc/sched.h               |   7 +-
 include/linux/sunrpc/xprt.h                |  23 +-
 include/linux/sunrpc/xprtsock.h            |  23 +-
 include/trace/events/sunrpc.h              |  10 +-
 net/sunrpc/auth.c                          |  10 +
 net/sunrpc/auth_gss/auth_gss.c             |  41 ++
 net/sunrpc/backchannel_rqst.c              |   3 +-
 net/sunrpc/clnt.c                          | 139 +++---
 net/sunrpc/sched.c                         |  63 ++-
 net/sunrpc/svcsock.c                       |   6 +-
 net/sunrpc/xprt.c                          | 503 +++++++++++++--------
 net/sunrpc/xprtrdma/backchannel.c          |   3 +-
 net/sunrpc/xprtrdma/rpc_rdma.c             |  10 +-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   7 +-
 net/sunrpc/xprtrdma/transport.c            |   5 +-
 net/sunrpc/xprtsock.c                      | 327 +++++++-------
 18 files changed, 728 insertions(+), 455 deletions(-)

-- 
2.17.1


* [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst
  2018-09-03 15:29 [PATCH 00/27] Convert RPC client transmission to a queued model Trond Myklebust
@ 2018-09-03 15:29 ` Trond Myklebust
  2018-09-03 15:29   ` [PATCH 02/27] SUNRPC: If there is no reply expected, bail early from call_decode Trond Myklebust
  2018-09-03 17:41 ` [PATCH 00/27] Convert RPC client transmission to a queued model Chuck Lever
  1 sibling, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Move the initialisation back into xprt.c.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h |  1 -
 net/sunrpc/clnt.c           |  1 -
 net/sunrpc/xprt.c           | 91 +++++++++++++++++++++----------------
 3 files changed, 51 insertions(+), 42 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 336fd1a19cca..3d80524e92d6 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -325,7 +325,6 @@ struct xprt_class {
 struct rpc_xprt		*xprt_create_transport(struct xprt_create *args);
 void			xprt_connect(struct rpc_task *task);
 void			xprt_reserve(struct rpc_task *task);
-void			xprt_request_init(struct rpc_task *task);
 void			xprt_retry_reserve(struct rpc_task *task);
 int			xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
 int			xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8ea2f5fadd96..bc9d020bf71f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1558,7 +1558,6 @@ call_reserveresult(struct rpc_task *task)
 	task->tk_status = 0;
 	if (status >= 0) {
 		if (task->tk_rqstp) {
-			xprt_request_init(task);
 			task->tk_action = call_refresh;
 			return;
 		}
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index a8db2e3f8904..6aa09edc9567 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1250,6 +1250,55 @@ void xprt_free(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_free);
 
+static __be32
+xprt_alloc_xid(struct rpc_xprt *xprt)
+{
+	__be32 xid;
+
+	spin_lock(&xprt->reserve_lock);
+	xid = (__force __be32)xprt->xid++;
+	spin_unlock(&xprt->reserve_lock);
+	return xid;
+}
+
+static void
+xprt_init_xid(struct rpc_xprt *xprt)
+{
+	xprt->xid = prandom_u32();
+}
+
+static void
+xprt_request_init(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+	struct rpc_rqst	*req = task->tk_rqstp;
+
+	INIT_LIST_HEAD(&req->rq_list);
+	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
+	req->rq_task	= task;
+	req->rq_xprt    = xprt;
+	req->rq_buffer  = NULL;
+	req->rq_xid	= xprt_alloc_xid(xprt);
+	req->rq_connect_cookie = xprt->connect_cookie - 1;
+	req->rq_bytes_sent = 0;
+	req->rq_snd_buf.len = 0;
+	req->rq_snd_buf.buflen = 0;
+	req->rq_rcv_buf.len = 0;
+	req->rq_rcv_buf.buflen = 0;
+	req->rq_release_snd_buf = NULL;
+	xprt_reset_majortimeo(req);
+	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
+			req, ntohl(req->rq_xid));
+}
+
+static void
+xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	xprt->ops->alloc_slot(xprt, task);
+	if (task->tk_rqstp != NULL)
+		xprt_request_init(task);
+}
+
 /**
  * xprt_reserve - allocate an RPC request slot
  * @task: RPC task requesting a slot allocation
@@ -1269,7 +1318,7 @@ void xprt_reserve(struct rpc_task *task)
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (!xprt_throttle_congested(xprt, task))
-		xprt->ops->alloc_slot(xprt, task);
+		xprt_do_reserve(xprt, task);
 }
 
 /**
@@ -1291,45 +1340,7 @@ void xprt_retry_reserve(struct rpc_task *task)
 
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	xprt->ops->alloc_slot(xprt, task);
-}
-
-static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
-{
-	__be32 xid;
-
-	spin_lock(&xprt->reserve_lock);
-	xid = (__force __be32)xprt->xid++;
-	spin_unlock(&xprt->reserve_lock);
-	return xid;
-}
-
-static inline void xprt_init_xid(struct rpc_xprt *xprt)
-{
-	xprt->xid = prandom_u32();
-}
-
-void xprt_request_init(struct rpc_task *task)
-{
-	struct rpc_xprt *xprt = task->tk_xprt;
-	struct rpc_rqst	*req = task->tk_rqstp;
-
-	INIT_LIST_HEAD(&req->rq_list);
-	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
-	req->rq_task	= task;
-	req->rq_xprt    = xprt;
-	req->rq_buffer  = NULL;
-	req->rq_xid	= xprt_alloc_xid(xprt);
-	req->rq_connect_cookie = xprt->connect_cookie - 1;
-	req->rq_bytes_sent = 0;
-	req->rq_snd_buf.len = 0;
-	req->rq_snd_buf.buflen = 0;
-	req->rq_rcv_buf.len = 0;
-	req->rq_rcv_buf.buflen = 0;
-	req->rq_release_snd_buf = NULL;
-	xprt_reset_majortimeo(req);
-	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
-			req, ntohl(req->rq_xid));
+	xprt_do_reserve(xprt, task);
 }
 
 /**
-- 
2.17.1


* [PATCH 02/27] SUNRPC: If there is no reply expected, bail early from call_decode
  2018-09-03 15:29 ` [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst Trond Myklebust
@ 2018-09-03 15:29   ` Trond Myklebust
  2018-09-03 15:29     ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index bc9d020bf71f..4f1ec8013332 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2260,6 +2260,11 @@ call_decode(struct rpc_task *task)
 
 	dprint_status(task);
 
+	if (!decode) {
+		task->tk_action = rpc_exit_task;
+		return;
+	}
+
 	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
 		if (clnt->cl_chatty) {
 			printk(KERN_NOTICE "%s: server %s OK\n",
@@ -2297,13 +2302,11 @@ call_decode(struct rpc_task *task)
 			goto out_retry;
 		return;
 	}
-
 	task->tk_action = rpc_exit_task;
 
-	if (decode) {
-		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
-						      task->tk_msg.rpc_resp);
-	}
+	task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
+					      task->tk_msg.rpc_resp);
+
 	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
 			task->tk_status);
 	return;
-- 
2.17.1


* [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity
  2018-09-03 15:29   ` [PATCH 02/27] SUNRPC: If there is no reply expected, bail early from call_decode Trond Myklebust
@ 2018-09-03 15:29     ` Trond Myklebust
  2018-09-03 15:29       ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Trond Myklebust
  2018-09-04 15:46       ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity J. Bruce Fields
  0 siblings, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

If a message has been encoded using RPCSEC_GSS, the server is
maintaining a window of sequence numbers that it considers valid.
The client should normally be tracking that window, and needs to
verify that the sequence number used by the message being transmitted
still lies inside the window of validity.

So far, we've been able to assume this condition would be realised
automatically, since the server has been encoding the message only
after taking the socket lock. Once we change that condition, we
will need the explicit check.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/auth.h     |  2 ++
 include/linux/sunrpc/auth_gss.h |  1 +
 net/sunrpc/auth.c               | 10 ++++++++
 net/sunrpc/auth_gss/auth_gss.c  | 41 +++++++++++++++++++++++++++++++++
 net/sunrpc/clnt.c               |  3 +++
 net/sunrpc/xprt.c               |  7 ++++++
 6 files changed, 64 insertions(+)

diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 58a6765c1c5e..2c97a3933ef9 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -157,6 +157,7 @@ struct rpc_credops {
 	int			(*crkey_timeout)(struct rpc_cred *);
 	bool			(*crkey_to_expire)(struct rpc_cred *);
 	char *			(*crstringify_acceptor)(struct rpc_cred *);
+	bool			(*crneed_reencode)(struct rpc_task *);
 };
 
 extern const struct rpc_authops	authunix_ops;
@@ -192,6 +193,7 @@ __be32 *		rpcauth_marshcred(struct rpc_task *, __be32 *);
 __be32 *		rpcauth_checkverf(struct rpc_task *, __be32 *);
 int			rpcauth_wrap_req(struct rpc_task *task, kxdreproc_t encode, void *rqstp, __be32 *data, void *obj);
 int			rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, __be32 *data, void *obj);
+bool			rpcauth_xmit_need_reencode(struct rpc_task *task);
 int			rpcauth_refreshcred(struct rpc_task *);
 void			rpcauth_invalcred(struct rpc_task *);
 int			rpcauth_uptodatecred(struct rpc_task *);
diff --git a/include/linux/sunrpc/auth_gss.h b/include/linux/sunrpc/auth_gss.h
index 0c9eac351aab..30427b729070 100644
--- a/include/linux/sunrpc/auth_gss.h
+++ b/include/linux/sunrpc/auth_gss.h
@@ -70,6 +70,7 @@ struct gss_cl_ctx {
 	refcount_t		count;
 	enum rpc_gss_proc	gc_proc;
 	u32			gc_seq;
+	u32			gc_seq_xmit;
 	spinlock_t		gc_seq_lock;
 	struct gss_ctx		*gc_gss_ctx;
 	struct xdr_netobj	gc_wire_ctx;
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 305ecea92170..59df5cdba0ac 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -817,6 +817,16 @@ rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp,
 	return rpcauth_unwrap_req_decode(decode, rqstp, data, obj);
 }
 
+bool
+rpcauth_xmit_need_reencode(struct rpc_task *task)
+{
+	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
+
+	if (!cred || !cred->cr_ops->crneed_reencode)
+		return false;
+	return cred->cr_ops->crneed_reencode(task);
+}
+
 int
 rpcauth_refreshcred(struct rpc_task *task)
 {
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 21c0aa0a0d1d..c898a7c75e84 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1984,6 +1984,46 @@ gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
 	return decode(rqstp, &xdr, obj);
 }
 
+static bool
+gss_seq_is_newer(u32 new, u32 old)
+{
+	return (s32)(new - old) > 0;
+}
+
+static bool
+gss_xmit_need_reencode(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_cred *cred = req->rq_cred;
+	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
+	u32 win, seq_xmit;
+	bool ret = true;
+
+	if (!ctx)
+		return true;
+
+	if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq)))
+		goto out;
+
+	seq_xmit = READ_ONCE(ctx->gc_seq_xmit);
+	while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) {
+		u32 tmp = seq_xmit;
+
+		seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno);
+		if (seq_xmit == tmp) {
+			ret = false;
+			goto out;
+		}
+	}
+
+	win = ctx->gc_win;
+	if (win > 0)
+		ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win);
+out:
+	gss_put_ctx(ctx);
+	return ret;
+}
+
 static int
 gss_unwrap_resp(struct rpc_task *task,
 		kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj)
@@ -2052,6 +2092,7 @@ static const struct rpc_credops gss_credops = {
 	.crunwrap_resp		= gss_unwrap_resp,
 	.crkey_timeout		= gss_key_timeout,
 	.crstringify_acceptor	= gss_stringify_acceptor,
+	.crneed_reencode	= gss_xmit_need_reencode,
 };
 
 static const struct rpc_credops gss_nullops = {
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 4f1ec8013332..d41b5ac1d4e8 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2184,6 +2184,9 @@ call_status(struct rpc_task *task)
 		/* shutdown or soft timeout */
 		rpc_exit(task, status);
 		break;
+	case -EBADMSG:
+		task->tk_action = call_transmit;
+		break;
 	default:
 		if (clnt->cl_chatty)
 			printk("%s: RPC call returned error %d\n",
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6aa09edc9567..3973e10ea2bd 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1014,6 +1014,13 @@ void xprt_transmit(struct rpc_task *task)
 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
 	if (!req->rq_reply_bytes_recvd) {
+
+		/* Verify that our message lies in the RPCSEC_GSS window */
+		if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
+			task->tk_status = -EBADMSG;
+			return;
+		}
+
 		if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
 			/*
 			 * Add to the list only if we're expecting a reply
-- 
2.17.1


* [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete
  2018-09-03 15:29     ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity Trond Myklebust
@ 2018-09-03 15:29       ` Trond Myklebust
  2018-09-03 15:29         ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Trond Myklebust
  2018-09-03 17:15         ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Chuck Lever
  2018-09-04 15:46       ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity J. Bruce Fields
  1 sibling, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Add states to indicate that the message send and receive are not yet
complete.
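
A rough stand-alone model of how the two new flags are meant to track
progress follows. The helper names below are invented for illustration;
the real code manipulates the RPC_TASK_NEED_XMIT and RPC_TASK_NEED_RECV
bits in task->tk_runstate as shown in the diff:

#include <stdbool.h>
#include <stdio.h>

#define NEED_XMIT	(1UL << 0)	/* message still needs to be sent */
#define NEED_RECV	(1UL << 1)	/* reply has not yet arrived */

static unsigned long runstate;

static void encode(void)
{
	runstate |= NEED_XMIT;		/* encoding succeeded; ready to send */
}

static void transmit(bool reply_expected)
{
	if (reply_expected)
		runstate |= NEED_RECV;	/* queued on the receive list */
	/* ... message goes out on the wire ... */
	runstate &= ~NEED_XMIT;		/* send side is now complete */
}

static void reply_received(void)
{
	runstate &= ~NEED_RECV;		/* receive side is now complete */
}

int main(void)
{
	encode();
	transmit(true);
	printf("after transmit: xmit=%d recv=%d\n",
	       !!(runstate & NEED_XMIT), !!(runstate & NEED_RECV));
	reply_received();
	printf("after reply:    xmit=%d recv=%d\n",
	       !!(runstate & NEED_XMIT), !!(runstate & NEED_RECV));
	return 0;
}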

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/sched.h |  6 ++++--
 net/sunrpc/clnt.c            | 19 +++++++------------
 net/sunrpc/xprt.c            | 17 ++++++++++++++---
 3 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 592653becd91..9e655df70131 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -140,8 +140,10 @@ struct rpc_task_setup {
 #define RPC_TASK_RUNNING	0
 #define RPC_TASK_QUEUED		1
 #define RPC_TASK_ACTIVE		2
-#define RPC_TASK_MSG_RECV	3
-#define RPC_TASK_MSG_RECV_WAIT	4
+#define RPC_TASK_NEED_XMIT	3
+#define RPC_TASK_NEED_RECV	4
+#define RPC_TASK_MSG_RECV	5
+#define RPC_TASK_MSG_RECV_WAIT	6
 
 #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d41b5ac1d4e8..e5ac35e803ad 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 	 */
 	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
 			xbufp->tail[0].iov_len;
+	set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 
 	task->tk_action = call_bc_transmit;
 	atomic_inc(&task->tk_count);
@@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task)
 	rpc_exit(task, -ERESTARTSYS);
 }
 
-static inline int
+static int
 rpc_task_need_encode(struct rpc_task *task)
 {
-	return task->tk_rqstp->rq_snd_buf.len == 0;
-}
-
-static inline void
-rpc_task_force_reencode(struct rpc_task *task)
-{
-	task->tk_rqstp->rq_snd_buf.len = 0;
-	task->tk_rqstp->rq_bytes_sent = 0;
+	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
 }
 
 /*
@@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task)
 
 	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
 			task->tk_msg.rpc_argp);
+	if (task->tk_status == 0)
+		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 }
 
 /*
@@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task)
 	 */
 	if (task->tk_status == 0) {
 		xprt_end_transmit(task);
-		rpc_task_force_reencode(task);
 		return;
 	}
 
@@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task)
 	default:
 		dprint_status(task);
 		xprt_end_transmit(task);
-		rpc_task_force_reencode(task);
 		break;
 		/*
 		 * Special cases: if we've been waiting on the
@@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task)
 	case -EADDRINUSE:
 	case -ENOTCONN:
 	case -EPIPE:
-		rpc_task_force_reencode(task);
+		break;
 	}
 }
 
@@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task)
 		rpc_exit(task, status);
 		break;
 	case -EBADMSG:
+		clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 		task->tk_action = call_transmit;
 		break;
 	default:
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3973e10ea2bd..45d580cd93ac 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
 	/* req->rq_reply_bytes_recvd */
 	smp_wmb();
 	req->rq_reply_bytes_recvd = copied;
+	clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
 	rpc_wake_up_queued_task(&xprt->pending, task);
 }
 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+		task->tk_rqstp->rq_reply_bytes_recvd != 0;
+}
+
 static void xprt_timer(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
@@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task)
 			/* Add request to the receive list */
 			spin_lock(&xprt->recv_lock);
 			list_add_tail(&req->rq_list, &xprt->recv);
+			set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
 			spin_unlock(&xprt->recv_lock);
 			xprt_reset_majortimeo(req);
 			/* Turn off autodisconnect */
 			del_singleshot_timer_sync(&xprt->timer);
 		}
-	} else if (!req->rq_bytes_sent)
+	} else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
 		return;
 
 	connect_cookie = xprt->connect_cookie;
@@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task)
 		task->tk_status = status;
 		return;
 	}
+
 	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
+	clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 	task->tk_flags |= RPC_TASK_SENT;
 	spin_lock_bh(&xprt->transport_lock);
 
@@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 
 	req->rq_connect_cookie = connect_cookie;
-	if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
+	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
 		/*
 		 * Sleep on the pending queue if we're expecting a reply.
 		 * The spinlock ensures atomicity between the test of
 		 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
 		 */
 		spin_lock(&xprt->recv_lock);
-		if (!req->rq_reply_bytes_recvd) {
+		if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
 			/*
 			 * Send an extra queue wakeup call if the
-- 
2.17.1


* [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
  2018-09-03 15:29       ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Trond Myklebust
@ 2018-09-03 15:29         ` Trond Myklebust
  2018-09-03 15:29           ` [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables Trond Myklebust
  2018-09-03 17:11           ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Chuck Lever
  2018-09-03 17:15         ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Chuck Lever
  1 sibling, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Currently, we grab the socket bit lock before we allow the message
to be XDR encoded. That significantly slows down the transmission
rate, since we serialise on a potentially blocking operation.
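
The change in ordering can be illustrated generically: do the slow step
before taking the lock, and hold the lock only around the part that
actually needs serialising. The sketch below uses a plain pthread mutex
as a stand-in for the socket bit lock and stub functions for encoding
and sending; it is not SUNRPC code:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t xprt_lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-ins: XDR encoding is comparatively slow, the send itself is short. */
static void xdr_encode(int id)  { usleep(1000); printf("encoded %d\n", id); }
static void socket_send(int id) { printf("sent %d\n", id); }

/* Old ordering: the lock is held across encoding, so concurrent senders
 * serialise on the slow step as well. */
void *send_old(void *arg)
{
	int id = *(int *)arg;

	pthread_mutex_lock(&xprt_lock);
	xdr_encode(id);
	socket_send(id);
	pthread_mutex_unlock(&xprt_lock);
	return NULL;
}

/* New ordering: encode first, then take the lock only around the send. */
void *send_new(void *arg)
{
	int id = *(int *)arg;

	xdr_encode(id);
	pthread_mutex_lock(&xprt_lock);
	socket_send(id);
	pthread_mutex_unlock(&xprt_lock);
	return NULL;
}

int main(void)
{
	pthread_t t[2];
	int ids[2] = { 1, 2 };
	int i;

	for (i = 0; i < 2; i++)
		pthread_create(&t[i], NULL, send_new, &ids[i]);
	for (i = 0; i < 2; i++)
		pthread_join(t[i], NULL);
	(void)send_old;		/* shown above for contrast only */
	return 0;
}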

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index e5ac35e803ad..66ec61347716 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
 	task->tk_action = call_status;
 	if (task->tk_status < 0)
 		return;
-	if (!xprt_prepare_transmit(task))
-		return;
-	task->tk_action = call_transmit_status;
 	/* Encode here so that rpcsec_gss can use correct sequence number. */
 	if (rpc_task_need_encode(task)) {
 		rpc_xdr_encode(task);
@@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
 			return;
 		}
 	}
+	if (!xprt_prepare_transmit(task))
+		return;
+	task->tk_action = call_transmit_status;
 	xprt_transmit(task);
 	if (task->tk_status < 0)
 		return;
 	if (is_retrans)
 		task->tk_client->cl_stats->rpcretrans++;
-- 
2.17.1


* [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables
  2018-09-03 15:29         ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Trond Myklebust
@ 2018-09-03 15:29           ` Trond Myklebust
  2018-09-03 15:29             ` [PATCH 07/27] SUNRPC: Move reset of TCP state variables into the reconnect code Trond Myklebust
  2018-09-03 17:11           ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Chuck Lever
  1 sibling, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Since we will want to introduce similar TCP state variables for the
transmission of requests, let's rename the existing ones to make it
clear that they belong to the receive side.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprtsock.h |  16 +--
 include/trace/events/sunrpc.h   |  10 +-
 net/sunrpc/xprtsock.c           | 178 ++++++++++++++++----------------
 3 files changed, 103 insertions(+), 101 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index ae0f99b9b965..90d5ca8e65f4 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -30,15 +30,17 @@ struct sock_xprt {
 	/*
 	 * State of TCP reply receive
 	 */
-	__be32			tcp_fraghdr,
-				tcp_xid,
-				tcp_calldir;
+	struct {
+		__be32		fraghdr,
+				xid,
+				calldir;
 
-	u32			tcp_offset,
-				tcp_reclen;
+		u32		offset,
+				len;
 
-	unsigned long		tcp_copied,
-				tcp_flags;
+		unsigned long	copied,
+				flags;
+	} recv;
 
 	/*
 	 * Connection of transports
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index bbb08a3ef5cc..0aa347194e0f 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -525,11 +525,11 @@ TRACE_EVENT(xs_tcp_data_recv,
 	TP_fast_assign(
 		__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
 		__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
-		__entry->xid = be32_to_cpu(xs->tcp_xid);
-		__entry->flags = xs->tcp_flags;
-		__entry->copied = xs->tcp_copied;
-		__entry->reclen = xs->tcp_reclen;
-		__entry->offset = xs->tcp_offset;
+		__entry->xid = be32_to_cpu(xs->recv.xid);
+		__entry->flags = xs->recv.flags;
+		__entry->copied = xs->recv.copied;
+		__entry->reclen = xs->recv.len;
+		__entry->offset = xs->recv.offset;
 	),
 
 	TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 6b7539c0466e..cd7d093721ae 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1169,42 +1169,42 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea
 	size_t len, used;
 	char *p;
 
-	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
-	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
+	p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
+	len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
 	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
+	transport->recv.offset += used;
 	if (used != len)
 		return;
 
-	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
-	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
-		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
+	transport->recv.len = ntohl(transport->recv.fraghdr);
+	if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
+		transport->recv.flags |= TCP_RCV_LAST_FRAG;
 	else
-		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
-	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
+		transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
+	transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
 
-	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
-	transport->tcp_offset = 0;
+	transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
+	transport->recv.offset = 0;
 
 	/* Sanity check of the record length */
-	if (unlikely(transport->tcp_reclen < 8)) {
+	if (unlikely(transport->recv.len < 8)) {
 		dprintk("RPC:       invalid TCP record fragment length\n");
 		xs_tcp_force_close(xprt);
 		return;
 	}
 	dprintk("RPC:       reading TCP record fragment of length %d\n",
-			transport->tcp_reclen);
+			transport->recv.len);
 }
 
 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
 {
-	if (transport->tcp_offset == transport->tcp_reclen) {
-		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
-		transport->tcp_offset = 0;
-		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
-			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-			transport->tcp_flags |= TCP_RCV_COPY_XID;
-			transport->tcp_copied = 0;
+	if (transport->recv.offset == transport->recv.len) {
+		transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
+		transport->recv.offset = 0;
+		if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
+			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
+			transport->recv.flags |= TCP_RCV_COPY_XID;
+			transport->recv.copied = 0;
 		}
 	}
 }
@@ -1214,20 +1214,20 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r
 	size_t len, used;
 	char *p;
 
-	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
+	len = sizeof(transport->recv.xid) - transport->recv.offset;
 	dprintk("RPC:       reading XID (%zu bytes)\n", len);
-	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
+	p = ((char *) &transport->recv.xid) + transport->recv.offset;
 	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
+	transport->recv.offset += used;
 	if (used != len)
 		return;
-	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
-	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
-	transport->tcp_copied = 4;
+	transport->recv.flags &= ~TCP_RCV_COPY_XID;
+	transport->recv.flags |= TCP_RCV_READ_CALLDIR;
+	transport->recv.copied = 4;
 	dprintk("RPC:       reading %s XID %08x\n",
-			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
+			(transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
 							      : "request with",
-			ntohl(transport->tcp_xid));
+			ntohl(transport->recv.xid));
 	xs_tcp_check_fraghdr(transport);
 }
 
@@ -1239,34 +1239,34 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
 	char *p;
 
 	/*
-	 * We want transport->tcp_offset to be 8 at the end of this routine
+	 * We want transport->recv.offset to be 8 at the end of this routine
 	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
 	 * When this function is called for the first time,
-	 * transport->tcp_offset is 4 (after having already read the xid).
+	 * transport->recv.offset is 4 (after having already read the xid).
 	 */
-	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
-	len = sizeof(transport->tcp_calldir) - offset;
+	offset = transport->recv.offset - sizeof(transport->recv.xid);
+	len = sizeof(transport->recv.calldir) - offset;
 	dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-	p = ((char *) &transport->tcp_calldir) + offset;
+	p = ((char *) &transport->recv.calldir) + offset;
 	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
+	transport->recv.offset += used;
 	if (used != len)
 		return;
-	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
+	transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
 	/*
 	 * We don't yet have the XDR buffer, so we will write the calldir
 	 * out after we get the buffer from the 'struct rpc_rqst'
 	 */
-	switch (ntohl(transport->tcp_calldir)) {
+	switch (ntohl(transport->recv.calldir)) {
 	case RPC_REPLY:
-		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-		transport->tcp_flags |= TCP_RCV_COPY_DATA;
-		transport->tcp_flags |= TCP_RPC_REPLY;
+		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
+		transport->recv.flags |= TCP_RCV_COPY_DATA;
+		transport->recv.flags |= TCP_RPC_REPLY;
 		break;
 	case RPC_CALL:
-		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-		transport->tcp_flags |= TCP_RCV_COPY_DATA;
-		transport->tcp_flags &= ~TCP_RPC_REPLY;
+		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
+		transport->recv.flags |= TCP_RCV_COPY_DATA;
+		transport->recv.flags &= ~TCP_RPC_REPLY;
 		break;
 	default:
 		dprintk("RPC:       invalid request message type\n");
@@ -1287,21 +1287,21 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
 
 	rcvbuf = &req->rq_private_buf;
 
-	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
+	if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
 		/*
 		 * Save the RPC direction in the XDR buffer
 		 */
-		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
-			&transport->tcp_calldir,
-			sizeof(transport->tcp_calldir));
-		transport->tcp_copied += sizeof(transport->tcp_calldir);
-		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
+		memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
+			&transport->recv.calldir,
+			sizeof(transport->recv.calldir));
+		transport->recv.copied += sizeof(transport->recv.calldir);
+		transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
 	}
 
 	len = desc->count;
-	if (len > transport->tcp_reclen - transport->tcp_offset)
-		desc->count = transport->tcp_reclen - transport->tcp_offset;
-	r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
+	if (len > transport->recv.len - transport->recv.offset)
+		desc->count = transport->recv.len - transport->recv.offset;
+	r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
 					  desc, xdr_skb_read_bits);
 
 	if (desc->count) {
@@ -1314,31 +1314,31 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
 		 * Any remaining data from this record will
 		 * be discarded.
 		 */
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 		dprintk("RPC:       XID %08x truncated request\n",
-				ntohl(transport->tcp_xid));
-		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
-				"tcp_offset = %u, tcp_reclen = %u\n",
-				xprt, transport->tcp_copied,
-				transport->tcp_offset, transport->tcp_reclen);
+				ntohl(transport->recv.xid));
+		dprintk("RPC:       xprt = %p, recv.copied = %lu, "
+				"recv.offset = %u, recv.len = %u\n",
+				xprt, transport->recv.copied,
+				transport->recv.offset, transport->recv.len);
 		return;
 	}
 
-	transport->tcp_copied += r;
-	transport->tcp_offset += r;
+	transport->recv.copied += r;
+	transport->recv.offset += r;
 	desc->count = len - r;
 
 	dprintk("RPC:       XID %08x read %zd bytes\n",
-			ntohl(transport->tcp_xid), r);
-	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
-			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
-			transport->tcp_offset, transport->tcp_reclen);
-
-	if (transport->tcp_copied == req->rq_private_buf.buflen)
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-	else if (transport->tcp_offset == transport->tcp_reclen) {
-		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
-			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+			ntohl(transport->recv.xid), r);
+	dprintk("RPC:       xprt = %p, recv.copied = %lu, recv.offset = %u, "
+			"recv.len = %u\n", xprt, transport->recv.copied,
+			transport->recv.offset, transport->recv.len);
+
+	if (transport->recv.copied == req->rq_private_buf.buflen)
+		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
+	else if (transport->recv.offset == transport->recv.len) {
+		if (transport->recv.flags & TCP_RCV_LAST_FRAG)
+			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 	}
 }
 
@@ -1353,14 +1353,14 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 				container_of(xprt, struct sock_xprt, xprt);
 	struct rpc_rqst *req;
 
-	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
+	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
 
 	/* Find and lock the request corresponding to this xid */
 	spin_lock(&xprt->recv_lock);
-	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
+	req = xprt_lookup_rqst(xprt, transport->recv.xid);
 	if (!req) {
 		dprintk("RPC:       XID %08x request not found!\n",
-				ntohl(transport->tcp_xid));
+				ntohl(transport->recv.xid));
 		spin_unlock(&xprt->recv_lock);
 		return -1;
 	}
@@ -1370,8 +1370,8 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	xs_tcp_read_common(xprt, desc, req);
 
 	spin_lock(&xprt->recv_lock);
-	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
+	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
+		xprt_complete_rqst(req->rq_task, transport->recv.copied);
 	xprt_unpin_rqst(req);
 	spin_unlock(&xprt->recv_lock);
 	return 0;
@@ -1393,7 +1393,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 	struct rpc_rqst *req;
 
 	/* Look up the request corresponding to the given XID */
-	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
+	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
 	if (req == NULL) {
 		printk(KERN_WARNING "Callback slot table overflowed\n");
 		xprt_force_disconnect(xprt);
@@ -1403,8 +1403,8 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
 	xs_tcp_read_common(xprt, desc, req);
 
-	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-		xprt_complete_bc_request(req, transport->tcp_copied);
+	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
+		xprt_complete_bc_request(req, transport->recv.copied);
 
 	return 0;
 }
@@ -1415,7 +1415,7 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 	struct sock_xprt *transport =
 				container_of(xprt, struct sock_xprt, xprt);
 
-	return (transport->tcp_flags & TCP_RPC_REPLY) ?
+	return (transport->recv.flags & TCP_RPC_REPLY) ?
 		xs_tcp_read_reply(xprt, desc) :
 		xs_tcp_read_callback(xprt, desc);
 }
@@ -1458,9 +1458,9 @@ static void xs_tcp_read_data(struct rpc_xprt *xprt,
 	else {
 		/*
 		 * The transport_lock protects the request handling.
-		 * There's no need to hold it to update the tcp_flags.
+		 * There's no need to hold it to update the recv.flags.
 		 */
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 	}
 }
 
@@ -1468,12 +1468,12 @@ static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_s
 {
 	size_t len;
 
-	len = transport->tcp_reclen - transport->tcp_offset;
+	len = transport->recv.len - transport->recv.offset;
 	if (len > desc->count)
 		len = desc->count;
 	desc->count -= len;
 	desc->offset += len;
-	transport->tcp_offset += len;
+	transport->recv.offset += len;
 	dprintk("RPC:       discarded %zu bytes\n", len);
 	xs_tcp_check_fraghdr(transport);
 }
@@ -1494,22 +1494,22 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
 		trace_xs_tcp_data_recv(transport);
 		/* Read in a new fragment marker if necessary */
 		/* Can we ever really expect to get completely empty fragments? */
-		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
+		if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
 			xs_tcp_read_fraghdr(xprt, &desc);
 			continue;
 		}
 		/* Read in the xid if necessary */
-		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
+		if (transport->recv.flags & TCP_RCV_COPY_XID) {
 			xs_tcp_read_xid(transport, &desc);
 			continue;
 		}
 		/* Read in the call/reply flag */
-		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
+		if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
 			xs_tcp_read_calldir(transport, &desc);
 			continue;
 		}
 		/* Read in the request data */
-		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
+		if (transport->recv.flags & TCP_RCV_COPY_DATA) {
 			xs_tcp_read_data(xprt, &desc);
 			continue;
 		}
@@ -1602,10 +1602,10 @@ static void xs_tcp_state_change(struct sock *sk)
 		if (!xprt_test_and_set_connected(xprt)) {
 
 			/* Reset TCP record info */
-			transport->tcp_offset = 0;
-			transport->tcp_reclen = 0;
-			transport->tcp_copied = 0;
-			transport->tcp_flags =
+			transport->recv.offset = 0;
+			transport->recv.len = 0;
+			transport->recv.copied = 0;
+			transport->recv.flags =
 				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 			xprt->connect_cookie++;
 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
-- 
2.17.1


* [PATCH 07/27] SUNRPC: Move reset of TCP state variables into the reconnect code
  2018-09-03 15:29           ` [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables Trond Myklebust
@ 2018-09-03 15:29             ` Trond Myklebust
  2018-09-03 15:29               ` [PATCH 08/27] SUNRPC: Add socket transmit queue offset tracking Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Rather than resetting state variables in socket state_change() callback,
do it in the sunrpc TCP connect function itself.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprtsock.c | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index cd7d093721ae..ec1e3f93e707 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1600,13 +1600,6 @@ static void xs_tcp_state_change(struct sock *sk)
 	case TCP_ESTABLISHED:
 		spin_lock(&xprt->transport_lock);
 		if (!xprt_test_and_set_connected(xprt)) {
-
-			/* Reset TCP record info */
-			transport->recv.offset = 0;
-			transport->recv.len = 0;
-			transport->recv.copied = 0;
-			transport->recv.flags =
-				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 			xprt->connect_cookie++;
 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
 			xprt_clear_connecting(xprt);
@@ -2386,6 +2379,12 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
 	xs_set_memalloc(xprt);
 
+	/* Reset TCP record info */
+	transport->recv.offset = 0;
+	transport->recv.len = 0;
+	transport->recv.copied = 0;
+	transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
+
 	/* Tell the socket layer to start connecting... */
 	xprt->stat.connect_count++;
 	xprt->stat.connect_start = jiffies;
-- 
2.17.1


* [PATCH 08/27] SUNRPC: Add socket transmit queue offset tracking
  2018-09-03 15:29             ` [PATCH 07/27] SUNRPC: Move reset of TCP state variables into the reconnect code Trond Myklebust
@ 2018-09-03 15:29               ` Trond Myklebust
  2018-09-03 15:29                 ` [PATCH 09/27] SUNRPC: Simplify dealing with aborted partially transmitted messages Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprtsock.h |  7 ++++++
 net/sunrpc/xprtsock.c           | 40 ++++++++++++++++++---------------
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 90d5ca8e65f4..005cfb6e7238 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -42,6 +42,13 @@ struct sock_xprt {
 				flags;
 	} recv;
 
+	/*
+	 * State of TCP transmit queue
+	 */
+	struct {
+		u32		offset;
+	} xmit;
+
 	/*
 	 * Connection of transports
 	 */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index ec1e3f93e707..629cc45e1e6c 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task)
 	int ret = -EAGAIN;
 
 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
-			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
+			task->tk_pid, req->rq_slen - transport->xmit.offset,
 			req->rq_slen);
 
 	/* Protect against races with write_space */
@@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task)
 			req->rq_svec->iov_base, req->rq_svec->iov_len);
 
 	req->rq_xtime = ktime_get();
-	status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
+	status = xs_sendpages(transport->sock, NULL, 0, xdr,
+			      transport->xmit.offset,
 			      true, &sent);
 	dprintk("RPC:       %s(%u) = %d\n",
-			__func__, xdr->len - req->rq_bytes_sent, status);
+			__func__, xdr->len - transport->xmit.offset, status);
 
 	if (status == -EAGAIN && sock_writeable(transport->inet))
 		status = -ENOBUFS;
 
 	if (likely(sent > 0) || status == 0) {
-		req->rq_bytes_sent += sent;
-		req->rq_xmit_bytes_sent += sent;
+		transport->xmit.offset += sent;
+		req->rq_bytes_sent = transport->xmit.offset;
 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+			req->rq_xmit_bytes_sent += transport->xmit.offset;
 			req->rq_bytes_sent = 0;
+			transport->xmit.offset = 0;
 			return 0;
 		}
 		status = -EAGAIN;
@@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task)
 		return -ENOTCONN;
 	req->rq_xtime = ktime_get();
 	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
-			      xdr, req->rq_bytes_sent, true, &sent);
+			      xdr, 0, true, &sent);
 
 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
-			xdr->len - req->rq_bytes_sent, status);
+			xdr->len, status);
 
 	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
 	if (status == -EPERM)
@@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	while (1) {
 		sent = 0;
 		status = xs_sendpages(transport->sock, NULL, 0, xdr,
-				      req->rq_bytes_sent, zerocopy, &sent);
+				      transport->xmit.offset,
+				      zerocopy, &sent);
 
 		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
-				xdr->len - req->rq_bytes_sent, status);
+				xdr->len - transport->xmit.offset, status);
 
 		/* If we've sent the entire packet, immediately
 		 * reset the count of bytes sent. */
-		req->rq_bytes_sent += sent;
-		req->rq_xmit_bytes_sent += sent;
+		transport->xmit.offset += sent;
+		req->rq_bytes_sent = transport->xmit.offset;
 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+			req->rq_xmit_bytes_sent += transport->xmit.offset;
 			req->rq_bytes_sent = 0;
+			transport->xmit.offset = 0;
 			return 0;
 		}
 
@@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
  */
 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	struct rpc_rqst *req;
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
 	if (task != xprt->snd_task)
 		return;
 	if (task == NULL)
 		goto out_release;
-	req = task->tk_rqstp;
-	if (req == NULL)
-		goto out_release;
-	if (req->rq_bytes_sent == 0)
-		goto out_release;
-	if (req->rq_bytes_sent == req->rq_snd_buf.len)
+	if (transport->xmit.offset == 0 || !xprt_connected(xprt))
 		goto out_release;
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 out_release:
@@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 		write_unlock_bh(&sk->sk_callback_lock);
 	}
 
+	transport->xmit.offset = 0;
+
 	/* Tell the socket layer to start connecting... */
 	xprt->stat.connect_count++;
 	xprt->stat.connect_start = jiffies;
@@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 	transport->recv.len = 0;
 	transport->recv.copied = 0;
 	transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
+	transport->xmit.offset = 0;
 
 	/* Tell the socket layer to start connecting... */
 	xprt->stat.connect_count++;
-- 
2.17.1


* [PATCH 09/27] SUNRPC: Simplify dealing with aborted partially transmitted messages
  2018-09-03 15:29               ` [PATCH 08/27] SUNRPC: Add socket transmit queue offset tracking Trond Myklebust
@ 2018-09-03 15:29                 ` Trond Myklebust
  2018-09-03 15:29                   ` [PATCH 10/27] SUNRPC: Refactor the transport request pinning Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

If the previous message was only partially transmitted, we need to close
the socket in order to avoid corruption of the message stream. To do so,
we currently hijack the unlocking of the socket in order to schedule
the close.
Now that we track the message offset in the socket state, we can move
that kind of checking out of the socket lock code, which is needed to
allow messages to remain queued after dropping the socket lock.
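
The detection itself reduces to comparing the socket's transmit offset
with the number of bytes the current request has sent, as in the
xs_send_request_was_aborted() helper in the diff below. A tiny
stand-alone illustration of the same test:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * If the socket already carries part of a record (xmit_offset != 0) but the
 * request now being sent has contributed none of it (rq_bytes_sent == 0),
 * then a previous request was abandoned mid-record and the stream must be
 * shut down before anything else is transmitted.
 */
static bool send_request_was_aborted(uint32_t xmit_offset, uint32_t rq_bytes_sent)
{
	return xmit_offset != 0 && rq_bytes_sent == 0;
}

int main(void)
{
	printf("%d\n", send_request_was_aborted(0, 0));	/* 0: clean stream */
	printf("%d\n", send_request_was_aborted(512, 512));	/* 0: our own partial send */
	printf("%d\n", send_request_was_aborted(512, 0));	/* 1: leftover partial record */
	return 0;
}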

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprtsock.c | 51 +++++++++++++++++++++----------------------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 629cc45e1e6c..3fbccebd0b10 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task)
 	return ret;
 }
 
+/*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+	return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
 /*
  * Construct a stream transport record marker in @buf.
  */
@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task)
 	int status;
 	int sent = 0;
 
+	/* Close the stream if the previous transmission was incomplete */
+	if (xs_send_request_was_aborted(transport, req)) {
+		xs_close(xprt);
+		return -ENOTCONN;
+	}
+
 	xs_encode_stream_record_marker(&req->rq_snd_buf);
 
 	xs_pktdump("packet data:",
@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	int status;
 	int sent;
 
+	/* Close the stream if the previous transmission was incomplete */
+	if (xs_send_request_was_aborted(transport, req)) {
+		if (transport->sock != NULL)
+			kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+		return -ENOTCONN;
+	}
+
 	xs_encode_stream_record_marker(&req->rq_snd_buf);
 
 	xs_pktdump("packet data:",
@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	return status;
 }
 
-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-
-	if (task != xprt->snd_task)
-		return;
-	if (task == NULL)
-		goto out_release;
-	if (transport->xmit.offset == 0 || !xprt_connected(xprt))
-		goto out_release;
-	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
-	xprt_release_xprt(xprt, task);
-}
-
 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 {
 	transport->old_data_ready = sk->sk_data_ready;
@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
 
 static const struct rpc_xprt_ops xs_local_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
-	.release_xprt		= xs_tcp_release_xprt,
+	.release_xprt		= xprt_release_xprt,
 	.alloc_slot		= xprt_alloc_slot,
 	.free_slot		= xprt_free_slot,
 	.rpcbind		= xs_local_rpcbind,
@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {
 
 static const struct rpc_xprt_ops xs_tcp_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
-	.release_xprt		= xs_tcp_release_xprt,
+	.release_xprt		= xprt_release_xprt,
 	.alloc_slot		= xprt_lock_and_alloc_slot,
 	.free_slot		= xprt_free_slot,
 	.rpcbind		= rpcb_getport_async,
-- 
2.17.1


* [PATCH 10/27] SUNRPC: Refactor the transport request pinning
  2018-09-03 15:29                 ` [PATCH 09/27] SUNRPC: Simplify dealing with aborted partially transmitted messages Trond Myklebust
@ 2018-09-03 15:29                   ` Trond Myklebust
  2018-09-03 15:29                     ` [PATCH 11/27] SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

We are going to need to pin for both send and receive.
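
The new pinning model is essentially a reference count plus a wake-up
when it drops to zero. A rough user-space analogue follows, using a
pthread condition variable where the kernel code uses
wait_var_event()/wake_up_var(); the names are illustrative only:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  unpinned = PTHREAD_COND_INITIALIZER;
static int rq_pin;			/* toy version of req->rq_pin */

static void pin_rqst(void)
{
	pthread_mutex_lock(&lock);
	rq_pin++;
	pthread_mutex_unlock(&lock);
}

static void unpin_rqst(void)
{
	pthread_mutex_lock(&lock);
	if (--rq_pin == 0)
		pthread_cond_broadcast(&unpinned);	/* like wake_up_var() */
	pthread_mutex_unlock(&lock);
}

static void wait_on_pinned_rqst(void)
{
	pthread_mutex_lock(&lock);
	while (rq_pin != 0)				/* like wait_var_event() */
		pthread_cond_wait(&unpinned, &lock);
	pthread_mutex_unlock(&lock);
}

static void *copier(void *arg)
{
	(void)arg;
	usleep(10000);		/* reply data being copied into the request */
	unpin_rqst();
	return NULL;
}

int main(void)
{
	pthread_t t;

	pin_rqst();		/* receive path pinned the request under its lock */
	pthread_create(&t, NULL, copier, NULL);
	wait_on_pinned_rqst();	/* release path must not free a pinned request */
	printf("request released safely\n");
	pthread_join(t, NULL);
	return 0;
}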

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/sched.h |  2 --
 include/linux/sunrpc/xprt.h  |  1 +
 net/sunrpc/xprt.c            | 37 +++++++++++++++++-------------------
 3 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 9e655df70131..a4a42b3a1f03 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -142,8 +142,6 @@ struct rpc_task_setup {
 #define RPC_TASK_ACTIVE		2
 #define RPC_TASK_NEED_XMIT	3
 #define RPC_TASK_NEED_RECV	4
-#define RPC_TASK_MSG_RECV	5
-#define RPC_TASK_MSG_RECV_WAIT	6
 
 #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 3d80524e92d6..bd743c51a865 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -103,6 +103,7 @@ struct rpc_rqst {
 						/* A cookie used to track the
 						   state of the transport
 						   connection */
+	atomic_t		rq_pin;
 	
 	/*
 	 * Partial send handling
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 45d580cd93ac..bf8fc1a5dbd1 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -847,16 +847,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 }
 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
+static bool
+xprt_is_pinned_rqst(struct rpc_rqst *req)
+{
+	return atomic_read(&req->rq_pin) != 0;
+}
+
 /**
  * xprt_pin_rqst - Pin a request on the transport receive list
  * @req: Request to pin
  *
  * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
- * so should be holding the xprt transport lock.
+ * so should be holding the xprt receive lock.
  */
 void xprt_pin_rqst(struct rpc_rqst *req)
 {
-	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+	atomic_inc(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 
@@ -864,31 +870,18 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst);
  * xprt_unpin_rqst - Unpin a request on the transport receive list
  * @req: Request to pin
  *
- * Caller should be holding the xprt transport lock.
+ * Caller should be holding the xprt receive lock.
  */
 void xprt_unpin_rqst(struct rpc_rqst *req)
 {
-	struct rpc_task *task = req->rq_task;
-
-	clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
-	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
-		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+	if (atomic_dec_and_test(&req->rq_pin))
+		wake_up_var(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->recv_lock)
 {
-	struct rpc_task *task = req->rq_task;
-
-	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
-		spin_unlock(&req->rq_xprt->recv_lock);
-		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-		wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
-				TASK_UNINTERRUPTIBLE);
-		clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-		spin_lock(&req->rq_xprt->recv_lock);
-	}
+	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
 }
 
 /**
@@ -1388,7 +1381,11 @@ void xprt_release(struct rpc_task *task)
 	spin_lock(&xprt->recv_lock);
 	if (!list_empty(&req->rq_list)) {
 		list_del_init(&req->rq_list);
-		xprt_wait_on_pinned_rqst(req);
+		if (atomic_read(&req->rq_pin)) {
+			spin_unlock(&xprt->recv_lock);
+			xprt_wait_on_pinned_rqst(req);
+			spin_lock(&xprt->recv_lock);
+		}
 	}
 	spin_unlock(&xprt->recv_lock);
 	spin_lock_bh(&xprt->transport_lock);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 11/27] SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
  2018-09-03 15:29                   ` [PATCH 10/27] SUNRPC: Refactor the transport request pinning Trond Myklebust
@ 2018-09-03 15:29                     ` Trond Myklebust
  2018-09-03 15:29                       ` [PATCH 12/27] SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Add a helper that will wake up a task that is sleeping on a specific
queue, and will set the value of task->tk_status. This is mainly
intended for use by the transport layer to notify the task of an
error condition.
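
A minimal usage sketch, matching how the next patch in the series uses the
helper; it assumes the usual xprt/task pair, with @task known to be sleeping
on the transport's 'pending' wait queue:

/* The connection dropped while @task was sleeping on xprt->pending:
 * wake only that task, delivering the error through tk_status. */
if (!xprt_connected(xprt))
	rpc_wake_up_queued_task_set_status(&xprt->pending, task, -ENOTCONN);

Internally this is the existing queued-task wakeup extended with an 'action'
callback that writes the status before the task is made runnable.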

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/sched.h |  3 ++
 net/sunrpc/sched.c           | 63 ++++++++++++++++++++++++++++++------
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index a4a42b3a1f03..c5bc779feb00 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -234,6 +234,9 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
 		struct rpc_task *task);
 void		rpc_wake_up_queued_task(struct rpc_wait_queue *,
 					struct rpc_task *);
+void		rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *,
+						   struct rpc_task *,
+						   int);
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 3fe5d60ab0e2..104c056daf83 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -440,14 +440,28 @@ static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
-		struct rpc_wait_queue *queue, struct rpc_task *task)
+static struct rpc_task *
+rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task,
+		bool (*action)(struct rpc_task *, void *), void *data)
 {
 	if (RPC_IS_QUEUED(task)) {
 		smp_rmb();
-		if (task->tk_waitqueue == queue)
-			__rpc_do_wake_up_task_on_wq(wq, queue, task);
+		if (task->tk_waitqueue == queue) {
+			if (action == NULL || action(task, data)) {
+				__rpc_do_wake_up_task_on_wq(wq, queue, task);
+				return task;
+			}
+		}
 	}
+	return NULL;
+}
+
+static void
+rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task)
+{
+	rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
 }
 
 /*
@@ -481,6 +495,38 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
 
+static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
+{
+	task->tk_status = *(int *)status;
+	return true;
+}
+
+static void
+rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
+		struct rpc_task *task, int status)
+{
+	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
+			task, rpc_task_action_set_status, &status);
+}
+
+/**
+ * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
+ * @queue: pointer to rpc_wait_queue
+ * @task: pointer to rpc_task
+ * @status: integer error value
+ *
+ * If @task is queued on @queue, then it is woken up, and @task->tk_status is
+ * set to the value of @status.
+ */
+void
+rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
+		struct rpc_task *task, int status)
+{
+	spin_lock_bh(&queue->lock);
+	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
+	spin_unlock_bh(&queue->lock);
+}
+
 /*
  * Wake up the next task on a priority queue.
  */
@@ -553,12 +599,9 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
 			queue, rpc_qname(queue));
 	spin_lock_bh(&queue->lock);
 	task = __rpc_find_next_queued(queue);
-	if (task != NULL) {
-		if (func(task, data))
-			rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
-		else
-			task = NULL;
-	}
+	if (task != NULL)
+		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
+				task, func, data);
 	spin_unlock_bh(&queue->lock);
 
 	return task;
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 12/27] SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
  2018-09-03 15:29                     ` [PATCH 11/27] SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status Trond Myklebust
@ 2018-09-03 15:29                       ` Trond Myklebust
  2018-09-03 15:29                         ` [PATCH 13/27] SUNRPC: Rename xprt->recv_lock to xprt->queue_lock Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Rather than waking up the entire queue of RPC messages a second time,
just wake up the task that was put to sleep.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprt.c | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index bf8fc1a5dbd1..0441e6c8153d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1075,13 +1075,10 @@ void xprt_transmit(struct rpc_task *task)
 		spin_lock(&xprt->recv_lock);
 		if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
-			/*
-			 * Send an extra queue wakeup call if the
-			 * connection was dropped in case the call to
-			 * rpc_sleep_on() raced.
-			 */
+			/* Wake up immediately if the connection was dropped */
 			if (!xprt_connected(xprt))
-				xprt_wake_pending_tasks(xprt, -ENOTCONN);
+				rpc_wake_up_queued_task_set_status(&xprt->pending,
+						task, -ENOTCONN);
 		}
 		spin_unlock(&xprt->recv_lock);
 	}
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 13/27] SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
  2018-09-03 15:29                       ` [PATCH 12/27] SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit Trond Myklebust
@ 2018-09-03 15:29                         ` Trond Myklebust
  2018-09-03 15:29                           ` [PATCH 14/27] SUNRPC: Refactor xprt_transmit() to remove the reply queue code Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

We will use the same lock to protect both the transmit and receive queues.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h                |  2 +-
 net/sunrpc/svcsock.c                       |  6 ++---
 net/sunrpc/xprt.c                          | 24 ++++++++---------
 net/sunrpc/xprtrdma/rpc_rdma.c             | 10 ++++----
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  4 +--
 net/sunrpc/xprtsock.c                      | 30 +++++++++++-----------
 6 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index bd743c51a865..c25d0a5fda69 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -235,7 +235,7 @@ struct rpc_xprt {
 	 */
 	spinlock_t		transport_lock;	/* lock transport info */
 	spinlock_t		reserve_lock;	/* lock slot table */
-	spinlock_t		recv_lock;	/* lock receive list */
+	spinlock_t		queue_lock;	/* send/receive queue lock */
 	u32			xid;		/* Next XID value to use */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
 	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 5445145e639c..db8bb6b3a2b0 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1004,7 +1004,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 
 	if (!bc_xprt)
 		return -EAGAIN;
-	spin_lock(&bc_xprt->recv_lock);
+	spin_lock(&bc_xprt->queue_lock);
 	req = xprt_lookup_rqst(bc_xprt, xid);
 	if (!req)
 		goto unlock_notfound;
@@ -1022,7 +1022,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 	memcpy(dst->iov_base, src->iov_base, src->iov_len);
 	xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
 	rqstp->rq_arg.len = 0;
-	spin_unlock(&bc_xprt->recv_lock);
+	spin_unlock(&bc_xprt->queue_lock);
 	return 0;
 unlock_notfound:
 	printk(KERN_NOTICE
@@ -1031,7 +1031,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 		__func__, ntohl(calldir),
 		bc_xprt, ntohl(xid));
 unlock_eagain:
-	spin_unlock(&bc_xprt->recv_lock);
+	spin_unlock(&bc_xprt->queue_lock);
 	return -EAGAIN;
 }
 
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 0441e6c8153d..eda305de9f77 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -826,7 +826,7 @@ static void xprt_connect_status(struct rpc_task *task)
  * @xprt: transport on which the original request was transmitted
  * @xid: RPC XID of incoming reply
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
@@ -888,7 +888,7 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
  * xprt_update_rtt - Update RPC RTT statistics
  * @task: RPC request that recently completed
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 void xprt_update_rtt(struct rpc_task *task)
 {
@@ -910,7 +910,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt);
  * @task: RPC request that recently completed
  * @copied: actual number of bytes received from the transport
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 void xprt_complete_rqst(struct rpc_task *task, int copied)
 {
@@ -1030,10 +1030,10 @@ void xprt_transmit(struct rpc_task *task)
 			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
 					sizeof(req->rq_private_buf));
 			/* Add request to the receive list */
-			spin_lock(&xprt->recv_lock);
+			spin_lock(&xprt->queue_lock);
 			list_add_tail(&req->rq_list, &xprt->recv);
 			set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
-			spin_unlock(&xprt->recv_lock);
+			spin_unlock(&xprt->queue_lock);
 			xprt_reset_majortimeo(req);
 			/* Turn off autodisconnect */
 			del_singleshot_timer_sync(&xprt->timer);
@@ -1072,7 +1072,7 @@ void xprt_transmit(struct rpc_task *task)
 		 * The spinlock ensures atomicity between the test of
 		 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
 		 */
-		spin_lock(&xprt->recv_lock);
+		spin_lock(&xprt->queue_lock);
 		if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
 			/* Wake up immediately if the connection was dropped */
@@ -1080,7 +1080,7 @@ void xprt_transmit(struct rpc_task *task)
 				rpc_wake_up_queued_task_set_status(&xprt->pending,
 						task, -ENOTCONN);
 		}
-		spin_unlock(&xprt->recv_lock);
+		spin_unlock(&xprt->queue_lock);
 	}
 }
 
@@ -1375,16 +1375,16 @@ void xprt_release(struct rpc_task *task)
 		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
 	else if (task->tk_client)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	if (!list_empty(&req->rq_list)) {
 		list_del_init(&req->rq_list);
 		if (atomic_read(&req->rq_pin)) {
-			spin_unlock(&xprt->recv_lock);
+			spin_unlock(&xprt->queue_lock);
 			xprt_wait_on_pinned_rqst(req);
-			spin_lock(&xprt->recv_lock);
+			spin_lock(&xprt->queue_lock);
 		}
 	}
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	spin_lock_bh(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)
@@ -1414,7 +1414,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
-	spin_lock_init(&xprt->recv_lock);
+	spin_lock_init(&xprt->queue_lock);
 
 	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983c6cc0..0020dc401215 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1238,7 +1238,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
 		goto out_badheader;
 
 out:
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1246,7 +1246,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
 
 	xprt_complete_rqst(rqst->rq_task, status);
 	xprt_unpin_rqst(rqst);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	return;
 
 /* If the incoming reply terminated a pending RPC, the next
@@ -1345,7 +1345,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
 	if (!rqst)
 		goto out_norqst;
@@ -1357,7 +1357,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 		credits = buf->rb_max_requests;
 	buf->rb_credits = credits;
 
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 
 	req = rpcr_to_rdmar(rqst);
 	req->rl_reply = rep;
@@ -1378,7 +1378,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
  * is corrupt.
  */
 out_norqst:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
 	goto repost;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index a68180090554..09b12b7568fe 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -56,7 +56,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 	if (src->iov_len < 24)
 		goto out_shortreply;
 
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	req = xprt_lookup_rqst(xprt, xid);
 	if (!req)
 		goto out_notfound;
@@ -86,7 +86,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 	rcvbuf->len = 0;
 
 out_unlock:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 out:
 	return ret;
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3fbccebd0b10..8d6404259ff9 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -966,12 +966,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
 	xprt_pin_rqst(rovr);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	task = rovr->rq_task;
 
 	copied = rovr->rq_private_buf.buflen;
@@ -980,16 +980,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 
 	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		dprintk("RPC:       sk_buff copy failed\n");
-		spin_lock(&xprt->recv_lock);
+		spin_lock(&xprt->queue_lock);
 		goto out_unpin;
 	}
 
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(task, copied);
 out_unpin:
 	xprt_unpin_rqst(rovr);
  out_unlock:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 }
 
 static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1058,13 +1058,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
 	xprt_pin_rqst(rovr);
 	xprt_update_rtt(rovr->rq_task);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	task = rovr->rq_task;
 
 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1072,7 +1072,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-		spin_lock(&xprt->recv_lock);
+		spin_lock(&xprt->queue_lock);
 		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
 		goto out_unpin;
 	}
@@ -1081,13 +1081,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 	spin_lock_bh(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
 	spin_unlock_bh(&xprt->transport_lock);
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(task, copied);
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 out_unpin:
 	xprt_unpin_rqst(rovr);
  out_unlock:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1356,24 +1356,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
 
 	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	req = xprt_lookup_rqst(xprt, transport->recv.xid);
 	if (!req) {
 		dprintk("RPC:       XID %08x request not found!\n",
 				ntohl(transport->recv.xid));
-		spin_unlock(&xprt->recv_lock);
+		spin_unlock(&xprt->queue_lock);
 		return -1;
 	}
 	xprt_pin_rqst(req);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 
 	xs_tcp_read_common(xprt, desc, req);
 
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
 		xprt_complete_rqst(req->rq_task, transport->recv.copied);
 	xprt_unpin_rqst(req);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	return 0;
 }
 
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 14/27] SUNRPC: Refactor xprt_transmit() to remove the reply queue code
  2018-09-03 15:29                         ` [PATCH 13/27] SUNRPC: Rename xprt->recv_lock to xprt->queue_lock Trond Myklebust
@ 2018-09-03 15:29                           ` Trond Myklebust
  2018-09-03 15:29                             ` [PATCH 15/27] SUNRPC: Refactor xprt_transmit() to remove wait for reply code Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Separate out the action of adding a request to the reply queue so that the
backchannel code can simply skip calling it altogether.
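
A caller-side sketch of the split, as call_transmit() uses it in the hunk
below; the backchannel transmit path simply never calls the enqueue step:

/* Forward channel only: queue for receive before transmitting so a
 * reply that races with the send still finds the request. */
if (rpc_reply_expected(task))
	xprt_request_enqueue_receive(task);

if (!xprt_prepare_transmit(task))
	return;
task->tk_action = call_transmit_status;
xprt_transmit(task);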

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h |   1 +
 net/sunrpc/clnt.c           |   5 ++
 net/sunrpc/xprt.c           | 100 ++++++++++++++++++++++--------------
 3 files changed, 68 insertions(+), 38 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index c25d0a5fda69..0250294c904a 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -334,6 +334,7 @@ void			xprt_free_slot(struct rpc_xprt *xprt,
 				       struct rpc_rqst *req);
 void			xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 bool			xprt_prepare_transmit(struct rpc_task *task);
+void			xprt_request_enqueue_receive(struct rpc_task *task);
 void			xprt_transmit(struct rpc_task *task);
 void			xprt_end_transmit(struct rpc_task *task);
 int			xprt_adjust_timeout(struct rpc_rqst *req);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 66ec61347716..3d6d1b5f9e81 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1962,6 +1962,11 @@ call_transmit(struct rpc_task *task)
 			return;
 		}
 	}
+
+	/* Add task to reply queue before transmission to avoid races */
+	if (rpc_reply_expected(task))
+		xprt_request_enqueue_receive(task);
+
 	if (!xprt_prepare_transmit(task))
 		return;
 	task->tk_action = call_transmit_status;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index eda305de9f77..cb3c0f7d5b3d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -884,6 +884,57 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
 }
 
+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+		task->tk_rqstp->rq_reply_bytes_recvd != 0;
+}
+
+/**
+ * xprt_request_enqueue_receive - Add a request to the receive queue
+ * @task: RPC task
+ *
+ */
+void
+xprt_request_enqueue_receive(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	spin_lock(&xprt->queue_lock);
+	if (xprt_request_data_received(task) || !list_empty(&req->rq_list)) {
+		spin_unlock(&xprt->queue_lock);
+		return;
+	}
+
+	/* Update the softirq receive buffer */
+	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
+			sizeof(req->rq_private_buf));
+
+	/* Add request to the receive list */
+	list_add_tail(&req->rq_list, &xprt->recv);
+	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+	spin_unlock(&xprt->queue_lock);
+
+	xprt_reset_majortimeo(req);
+	/* Turn off autodisconnect */
+	del_singleshot_timer_sync(&xprt->timer);
+}
+
+/**
+ * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
+ * @task: RPC task
+ *
+ * Caller must hold xprt->queue_lock.
+ */
+static void
+xprt_request_dequeue_receive_locked(struct rpc_task *task)
+{
+	clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+	list_del_init(&task->tk_rqstp->rq_list);
+}
+
 /**
  * xprt_update_rtt - Update RPC RTT statistics
  * @task: RPC request that recently completed
@@ -923,24 +974,16 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
 
 	xprt->stat.recvs++;
 
-	list_del_init(&req->rq_list);
 	req->rq_private_buf.len = copied;
 	/* Ensure all writes are done before we update */
 	/* req->rq_reply_bytes_recvd */
 	smp_wmb();
 	req->rq_reply_bytes_recvd = copied;
-	clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+	xprt_request_dequeue_receive_locked(task);
 	rpc_wake_up_queued_task(&xprt->pending, task);
 }
 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
-static bool
-xprt_request_data_received(struct rpc_task *task)
-{
-	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
-		task->tk_rqstp->rq_reply_bytes_recvd != 0;
-}
-
 static void xprt_timer(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
@@ -1014,32 +1057,15 @@ void xprt_transmit(struct rpc_task *task)
 
 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
-	if (!req->rq_reply_bytes_recvd) {
-
+	if (!req->rq_bytes_sent) {
+		if (xprt_request_data_received(task))
+			return;
 		/* Verify that our message lies in the RPCSEC_GSS window */
-		if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
+		if (rpcauth_xmit_need_reencode(task)) {
 			task->tk_status = -EBADMSG;
 			return;
 		}
-
-		if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
-			/*
-			 * Add to the list only if we're expecting a reply
-			 */
-			/* Update the softirq receive buffer */
-			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
-					sizeof(req->rq_private_buf));
-			/* Add request to the receive list */
-			spin_lock(&xprt->queue_lock);
-			list_add_tail(&req->rq_list, &xprt->recv);
-			set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
-			spin_unlock(&xprt->queue_lock);
-			xprt_reset_majortimeo(req);
-			/* Turn off autodisconnect */
-			del_singleshot_timer_sync(&xprt->timer);
-		}
-	} else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
-		return;
+	}
 
 	connect_cookie = xprt->connect_cookie;
 	status = xprt->ops->send_request(task);
@@ -1376,13 +1402,11 @@ void xprt_release(struct rpc_task *task)
 	else if (task->tk_client)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	spin_lock(&xprt->queue_lock);
-	if (!list_empty(&req->rq_list)) {
-		list_del_init(&req->rq_list);
-		if (atomic_read(&req->rq_pin)) {
-			spin_unlock(&xprt->queue_lock);
-			xprt_wait_on_pinned_rqst(req);
-			spin_lock(&xprt->queue_lock);
-		}
+	xprt_request_dequeue_receive_locked(task);
+	while (xprt_is_pinned_rqst(req)) {
+		spin_unlock(&xprt->queue_lock);
+		xprt_wait_on_pinned_rqst(req);
+		spin_lock(&xprt->queue_lock);
 	}
 	spin_unlock(&xprt->queue_lock);
 	spin_lock_bh(&xprt->transport_lock);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 15/27] SUNRPC: Refactor xprt_transmit() to remove wait for reply code
  2018-09-03 15:29                           ` [PATCH 14/27] SUNRPC: Refactor xprt_transmit() to remove the reply queue code Trond Myklebust
@ 2018-09-03 15:29                             ` Trond Myklebust
  2018-09-03 15:29                               ` [PATCH 16/27] SUNRPC: Minor cleanup for call_transmit() Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Allow the caller in clnt.c to call into the code to wait for a reply
after calling xprt_transmit(). Again, the reason is that the backchannel
code does not need this functionality.
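
A sketch of the resulting success path in call_transmit_status(); the ordering
matters because xprt_end_transmit() must release the transport write lock
before the task sleeps, so that other requests can use the socket:

if (task->tk_status == 0) {
	xprt_end_transmit(task);		/* release the socket first ... */
	xprt_request_wait_receive(task);	/* ... then sleep on xprt->pending */
	return;
}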

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h |  1 +
 net/sunrpc/clnt.c           | 10 +-----
 net/sunrpc/xprt.c           | 72 ++++++++++++++++++++++++++-----------
 3 files changed, 53 insertions(+), 30 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0250294c904a..4fa2af087cff 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -335,6 +335,7 @@ void			xprt_free_slot(struct rpc_xprt *xprt,
 void			xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 bool			xprt_prepare_transmit(struct rpc_task *task);
 void			xprt_request_enqueue_receive(struct rpc_task *task);
+void			xprt_request_wait_receive(struct rpc_task *task);
 void			xprt_transmit(struct rpc_task *task);
 void			xprt_end_transmit(struct rpc_task *task);
 int			xprt_adjust_timeout(struct rpc_rqst *req);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 3d6d1b5f9e81..cf09aab11014 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1975,15 +1975,6 @@ call_transmit(struct rpc_task *task)
 		return;
 	if (is_retrans)
 		task->tk_client->cl_stats->rpcretrans++;
-	/*
-	 * On success, ensure that we call xprt_end_transmit() before sleeping
-	 * in order to allow access to the socket to other RPC requests.
-	 */
-	call_transmit_status(task);
-	if (rpc_reply_expected(task))
-		return;
-	task->tk_action = rpc_exit_task;
-	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
 }
 
 /*
@@ -2000,6 +1991,7 @@ call_transmit_status(struct rpc_task *task)
 	 */
 	if (task->tk_status == 0) {
 		xprt_end_transmit(task);
+		xprt_request_wait_receive(task);
 		return;
 	}
 
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index cb3c0f7d5b3d..aefa23c90a2b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -654,6 +654,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
+static unsigned int
+xprt_connect_cookie(struct rpc_xprt *xprt)
+{
+	return READ_ONCE(xprt->connect_cookie);
+}
+
+static bool
+xprt_request_retransmit_after_disconnect(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
+		!xprt_connected(xprt);
+}
+
 /**
  * xprt_conditional_disconnect - force a transport to disconnect
  * @xprt: transport to disconnect
@@ -1000,6 +1016,39 @@ static void xprt_timer(struct rpc_task *task)
 		task->tk_status = 0;
 }
 
+/**
+ * xprt_request_wait_receive - wait for the reply to an RPC request
+ * @task: RPC task about to send a request
+ *
+ */
+void xprt_request_wait_receive(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
+		return;
+	/*
+	 * Sleep on the pending queue if we're expecting a reply.
+	 * The spinlock ensures atomicity between the test of
+	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
+	 */
+	spin_lock(&xprt->queue_lock);
+	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
+		xprt->ops->set_retrans_timeout(task);
+		rpc_sleep_on(&xprt->pending, task, xprt_timer);
+		/*
+		 * Send an extra queue wakeup call if the
+		 * connection was dropped in case the call to
+		 * rpc_sleep_on() raced.
+		 */
+		if (xprt_request_retransmit_after_disconnect(task))
+			rpc_wake_up_queued_task_set_status(&xprt->pending,
+					task, -ENOTCONN);
+	}
+	spin_unlock(&xprt->queue_lock);
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -1019,9 +1068,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 			task->tk_status = req->rq_reply_bytes_recvd;
 			goto out_unlock;
 		}
-		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
-		    && xprt_connected(xprt)
-		    && req->rq_connect_cookie == xprt->connect_cookie) {
+		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
+		    !xprt_request_retransmit_after_disconnect(task)) {
 			xprt->ops->set_retrans_timeout(task);
 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
 			goto out_unlock;
@@ -1082,8 +1130,6 @@ void xprt_transmit(struct rpc_task *task)
 	task->tk_flags |= RPC_TASK_SENT;
 	spin_lock_bh(&xprt->transport_lock);
 
-	xprt->ops->set_retrans_timeout(task);
-
 	xprt->stat.sends++;
 	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
 	xprt->stat.bklog_u += xprt->backlog.qlen;
@@ -1092,22 +1138,6 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 
 	req->rq_connect_cookie = connect_cookie;
-	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
-		/*
-		 * Sleep on the pending queue if we're expecting a reply.
-		 * The spinlock ensures atomicity between the test of
-		 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
-		 */
-		spin_lock(&xprt->queue_lock);
-		if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
-			rpc_sleep_on(&xprt->pending, task, xprt_timer);
-			/* Wake up immediately if the connection was dropped */
-			if (!xprt_connected(xprt))
-				rpc_wake_up_queued_task_set_status(&xprt->pending,
-						task, -ENOTCONN);
-		}
-		spin_unlock(&xprt->queue_lock);
-	}
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 16/27] SUNRPC: Minor cleanup for call_transmit()
  2018-09-03 15:29                             ` [PATCH 15/27] SUNRPC: Refactor xprt_transmit() to remove wait for reply code Trond Myklebust
@ 2018-09-03 15:29                               ` Trond Myklebust
  2018-09-03 15:29                                 ` [PATCH 17/27] SUNRPC: Distinguish between the slot allocation list and receive queue Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 33 +++++++++++++++------------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index cf09aab11014..5fbd9875544e 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1946,9 +1946,7 @@ call_transmit(struct rpc_task *task)
 
 	dprint_status(task);
 
-	task->tk_action = call_status;
-	if (task->tk_status < 0)
-		return;
+	task->tk_action = call_transmit_status;
 	/* Encode here so that rpcsec_gss can use correct sequence number. */
 	if (rpc_task_need_encode(task)) {
 		rpc_xdr_encode(task);
@@ -1969,9 +1967,8 @@ call_transmit(struct rpc_task *task)
 
 	if (!xprt_prepare_transmit(task))
 		return;
-	task->tk_action = call_transmit_status;
 	xprt_transmit(task);
-	if (task->tk_status < 0) {
+	if (task->tk_status < 0)
 		return;
 	if (is_retrans)
 		task->tk_client->cl_stats->rpcretrans++;
@@ -1996,19 +1993,28 @@ call_transmit_status(struct rpc_task *task)
 	}
 
 	switch (task->tk_status) {
-	case -EAGAIN:
-	case -ENOBUFS:
-		break;
 	default:
 		dprint_status(task);
 		xprt_end_transmit(task);
 		break;
+	case -EBADMSG:
+		clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+		task->tk_action = call_transmit;
+		xprt_end_transmit(task);
+		break;
 		/*
 		 * Special cases: if we've been waiting on the
 		 * socket's write_space() callback, or if the
 		 * socket just returned a connection error,
 		 * then hold onto the transport lock.
 		 */
+	case -ENOBUFS:
+		rpc_delay(task, HZ>>2);
+		/* fall through */
+	case -EAGAIN:
+		task->tk_action = call_transmit;
+		task->tk_status = 0;
+		break;
 	case -ECONNREFUSED:
 	case -EHOSTDOWN:
 	case -ENETDOWN:
@@ -2163,22 +2169,13 @@ call_status(struct rpc_task *task)
 		/* fall through */
 	case -EPIPE:
 	case -ENOTCONN:
-		task->tk_action = call_bind;
-		break;
-	case -ENOBUFS:
-		rpc_delay(task, HZ>>2);
-		/* fall through */
 	case -EAGAIN:
-		task->tk_action = call_transmit;
+		task->tk_action = call_bind;
 		break;
 	case -EIO:
 		/* shutdown or soft timeout */
 		rpc_exit(task, status);
 		break;
-	case -EBADMSG:
-		clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
-		task->tk_action = call_transmit;
-		break;
 	default:
 		if (clnt->cl_chatty)
 			printk("%s: RPC call returned error %d\n",
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 17/27] SUNRPC: Distinguish between the slot allocation list and receive queue
  2018-09-03 15:29                               ` [PATCH 16/27] SUNRPC: Minor cleanup for call_transmit() Trond Myklebust
@ 2018-09-03 15:29                                 ` Trond Myklebust
  2018-09-03 15:29                                   ` [PATCH 18/27] NFS: Add a transmission queue for RPC requests Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

When storing a struct rpc_rqst on the slot allocation list, we currently
use the same field 'rq_list' as we use to store the request on the
receive queue. Since the structure is never on both lists at the same
time, this is OK.
However, for clarity, let's make it a union with a distinct name for each
list so that the two states are easier to distinguish.
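
The same idea in miniature, using a purely hypothetical struct for
illustration: an anonymous union lets one embedded list_head be referenced by
two names, which is safe only because the object is never on both lists at
the same time:

struct foo {
	union {
		struct list_head free_entry;	/* while on the allocator's free list */
		struct list_head recv_entry;	/* while waiting on the receive queue */
	};
	/* ... */
};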

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h       |  9 +++++++--
 net/sunrpc/backchannel_rqst.c     |  2 +-
 net/sunrpc/xprt.c                 | 16 ++++++++--------
 net/sunrpc/xprtrdma/backchannel.c |  2 +-
 4 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 4fa2af087cff..9cec2d0811f2 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -82,7 +82,11 @@ struct rpc_rqst {
 	struct page		**rq_enc_pages;	/* scratch pages for use by
 						   gss privacy code */
 	void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
-	struct list_head	rq_list;
+
+	union {
+		struct list_head	rq_list;	/* Slot allocation list */
+		struct list_head	rq_recv;	/* Receive queue */
+	};
 
 	void			*rq_buffer;	/* Call XDR encode buffer */
 	size_t			rq_callsize;
@@ -249,7 +253,8 @@ struct rpc_xprt {
 	struct list_head	bc_pa_list;	/* List of preallocated
 						 * backchannel rpc_rqst's */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
-	struct list_head	recv;
+
+	struct list_head	recv_queue;	/* Receive queue */
 
 	struct {
 		unsigned long		bind_count,	/* total number of binds */
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 3c15a99b9700..92e9ad30ec2f 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -91,7 +91,7 @@ struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
 		return NULL;
 
 	req->rq_xprt = xprt;
-	INIT_LIST_HEAD(&req->rq_list);
+	INIT_LIST_HEAD(&req->rq_recv);
 	INIT_LIST_HEAD(&req->rq_bc_list);
 
 	/* Preallocate one XDR receive buffer */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index aefa23c90a2b..0184a6aed7e3 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -708,7 +708,7 @@ static void
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
 	__must_hold(&xprt->transport_lock)
 {
-	if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
+	if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
 
@@ -718,7 +718,7 @@ xprt_init_autodisconnect(struct timer_list *t)
 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
 	spin_lock(&xprt->transport_lock);
-	if (!list_empty(&xprt->recv))
+	if (!list_empty(&xprt->recv_queue))
 		goto out_abort;
 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 	xprt->last_used = jiffies;
@@ -848,7 +848,7 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
 	struct rpc_rqst *entry;
 
-	list_for_each_entry(entry, &xprt->recv, rq_list)
+	list_for_each_entry(entry, &xprt->recv_queue, rq_recv)
 		if (entry->rq_xid == xid) {
 			trace_xprt_lookup_rqst(xprt, xid, 0);
 			entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
@@ -919,7 +919,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 	struct rpc_xprt *xprt = req->rq_xprt;
 
 	spin_lock(&xprt->queue_lock);
-	if (xprt_request_data_received(task) || !list_empty(&req->rq_list)) {
+	if (xprt_request_data_received(task) || !list_empty(&req->rq_recv)) {
 		spin_unlock(&xprt->queue_lock);
 		return;
 	}
@@ -929,7 +929,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 			sizeof(req->rq_private_buf));
 
 	/* Add request to the receive list */
-	list_add_tail(&req->rq_list, &xprt->recv);
+	list_add_tail(&req->rq_recv, &xprt->recv_queue);
 	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
 	spin_unlock(&xprt->queue_lock);
 
@@ -948,7 +948,7 @@ static void
 xprt_request_dequeue_receive_locked(struct rpc_task *task)
 {
 	clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
-	list_del_init(&task->tk_rqstp->rq_list);
+	list_del_init(&task->tk_rqstp->rq_recv);
 }
 
 /**
@@ -1337,7 +1337,7 @@ xprt_request_init(struct rpc_task *task)
 	struct rpc_xprt *xprt = task->tk_xprt;
 	struct rpc_rqst	*req = task->tk_rqstp;
 
-	INIT_LIST_HEAD(&req->rq_list);
+	INIT_LIST_HEAD(&req->rq_recv);
 	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
@@ -1471,7 +1471,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 	spin_lock_init(&xprt->queue_lock);
 
 	INIT_LIST_HEAD(&xprt->free);
-	INIT_LIST_HEAD(&xprt->recv);
+	INIT_LIST_HEAD(&xprt->recv_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	spin_lock_init(&xprt->bc_pa_lock);
 	INIT_LIST_HEAD(&xprt->bc_pa_list);
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 90adeff4c06b..40c7c7306a99 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -51,7 +51,7 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
 		rqst = &req->rl_slot;
 
 		rqst->rq_xprt = xprt;
-		INIT_LIST_HEAD(&rqst->rq_list);
+		INIT_LIST_HEAD(&rqst->rq_recv);
 		INIT_LIST_HEAD(&rqst->rq_bc_list);
 		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
 		spin_lock_bh(&xprt->bc_pa_lock);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 18/27] NFS: Add a transmission queue for RPC requests
  2018-09-03 15:29                                 ` [PATCH 17/27] SUNRPC: Distinguish between the slot allocation list and receive queue Trond Myklebust
@ 2018-09-03 15:29                                   ` Trond Myklebust
  2018-09-03 15:29                                     ` [PATCH 19/27] SUNRPC: Refactor RPC call encoding Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h       |  6 +++
 net/sunrpc/backchannel_rqst.c     |  1 +
 net/sunrpc/clnt.c                 |  6 +--
 net/sunrpc/xprt.c                 | 74 +++++++++++++++++++++++++++----
 net/sunrpc/xprtrdma/backchannel.c |  1 +
 5 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 9cec2d0811f2..81a6c2c8dfc7 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -88,6 +88,8 @@ struct rpc_rqst {
 		struct list_head	rq_recv;	/* Receive queue */
 	};
 
+	struct list_head	rq_xmit;	/* Send queue */
+
 	void			*rq_buffer;	/* Call XDR encode buffer */
 	size_t			rq_callsize;
 	void			*rq_rbuffer;	/* Reply XDR decode buffer */
@@ -242,6 +244,9 @@ struct rpc_xprt {
 	spinlock_t		queue_lock;	/* send/receive queue lock */
 	u32			xid;		/* Next XID value to use */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
+
+	struct list_head	xmit_queue;	/* Send queue */
+
 	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
@@ -339,6 +344,7 @@ void			xprt_free_slot(struct rpc_xprt *xprt,
 				       struct rpc_rqst *req);
 void			xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 bool			xprt_prepare_transmit(struct rpc_task *task);
+void			xprt_request_enqueue_transmit(struct rpc_task *task);
 void			xprt_request_enqueue_receive(struct rpc_task *task);
 void			xprt_request_wait_receive(struct rpc_task *task);
 void			xprt_transmit(struct rpc_task *task);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 92e9ad30ec2f..39b394b7dae3 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -92,6 +92,7 @@ struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
 
 	req->rq_xprt = xprt;
 	INIT_LIST_HEAD(&req->rq_recv);
+	INIT_LIST_HEAD(&req->rq_xmit);
 	INIT_LIST_HEAD(&req->rq_bc_list);
 
 	/* Preallocate one XDR receive buffer */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 5fbd9875544e..a817f70d6192 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1759,8 +1759,6 @@ rpc_xdr_encode(struct rpc_task *task)
 
 	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
 			task->tk_msg.rpc_argp);
-	if (task->tk_status == 0)
-		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 }
 
 /*
@@ -1959,11 +1957,13 @@ call_transmit(struct rpc_task *task)
 				rpc_exit(task, task->tk_status);
 			return;
 		}
+		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 	}
 
 	/* Add task to reply queue before transmission to avoid races */
 	if (rpc_reply_expected(task))
 		xprt_request_enqueue_receive(task);
+	xprt_request_enqueue_transmit(task);
 
 	if (!xprt_prepare_transmit(task))
 		return;
@@ -1998,7 +1998,6 @@ call_transmit_status(struct rpc_task *task)
 		xprt_end_transmit(task);
 		break;
 	case -EBADMSG:
-		clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 		task->tk_action = call_transmit;
 		xprt_end_transmit(task);
 		break;
@@ -2049,6 +2048,7 @@ call_bc_transmit(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 
+	xprt_request_enqueue_transmit(task);
 	if (!xprt_prepare_transmit(task))
 		goto out_retry;
 
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 0184a6aed7e3..defc6167570a 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1049,6 +1049,64 @@ void xprt_request_wait_receive(struct rpc_task *task)
 	spin_unlock(&xprt->queue_lock);
 }
 
+static bool
+xprt_request_need_transmit(struct rpc_task *task)
+{
+	return !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
+		xprt_request_retransmit_after_disconnect(task);
+}
+
+/**
+ * xprt_request_enqueue_transmit - queue a task for transmission
+ * @task: pointer to rpc_task
+ *
+ * Add a task to the transmission queue.
+ */
+void
+xprt_request_enqueue_transmit(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	spin_lock(&xprt->queue_lock);
+	if (list_empty(&req->rq_xmit) && xprt_request_need_transmit(task) &&
+	    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
+	spin_unlock(&xprt->queue_lock);
+}
+
+/**
+ * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ * Caller must hold xprt->queue_lock
+ */
+static void
+xprt_request_dequeue_transmit_locked(struct rpc_task *task)
+{
+	xprt_task_clear_bytes_sent(task);
+	clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+	list_del_init(&task->tk_rqstp->rq_xmit);
+}
+
+/**
+ * xprt_request_dequeue_transmit - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ */
+static void
+xprt_request_dequeue_transmit(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	spin_lock(&xprt->queue_lock);
+	xprt_request_dequeue_transmit_locked(task);
+	spin_unlock(&xprt->queue_lock);
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -1068,12 +1126,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 			task->tk_status = req->rq_reply_bytes_recvd;
 			goto out_unlock;
 		}
-		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
-		    !xprt_request_retransmit_after_disconnect(task)) {
-			xprt->ops->set_retrans_timeout(task);
-			rpc_sleep_on(&xprt->pending, task, xprt_timer);
+		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
 			goto out_unlock;
-		}
 	}
 	if (!xprt->ops->reserve_xprt(xprt, task)) {
 		task->tk_status = -EAGAIN;
@@ -1107,11 +1161,11 @@ void xprt_transmit(struct rpc_task *task)
 
 	if (!req->rq_bytes_sent) {
 		if (xprt_request_data_received(task))
-			return;
+			goto out_dequeue;
 		/* Verify that our message lies in the RPCSEC_GSS window */
 		if (rpcauth_xmit_need_reencode(task)) {
 			task->tk_status = -EBADMSG;
-			return;
+			goto out_dequeue;
 		}
 	}
 
@@ -1126,7 +1180,6 @@ void xprt_transmit(struct rpc_task *task)
 	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
-	clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 	task->tk_flags |= RPC_TASK_SENT;
 	spin_lock_bh(&xprt->transport_lock);
 
@@ -1138,6 +1191,8 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 
 	req->rq_connect_cookie = connect_cookie;
+out_dequeue:
+	xprt_request_dequeue_transmit(task);
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
@@ -1338,6 +1393,7 @@ xprt_request_init(struct rpc_task *task)
 	struct rpc_rqst	*req = task->tk_rqstp;
 
 	INIT_LIST_HEAD(&req->rq_recv);
+	INIT_LIST_HEAD(&req->rq_xmit);
 	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
@@ -1433,6 +1489,7 @@ void xprt_release(struct rpc_task *task)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	spin_lock(&xprt->queue_lock);
 	xprt_request_dequeue_receive_locked(task);
+	xprt_request_dequeue_transmit_locked(task);
 	while (xprt_is_pinned_rqst(req)) {
 		spin_unlock(&xprt->queue_lock);
 		xprt_wait_on_pinned_rqst(req);
@@ -1472,6 +1529,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
 	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv_queue);
+	INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	spin_lock_init(&xprt->bc_pa_lock);
 	INIT_LIST_HEAD(&xprt->bc_pa_list);
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 40c7c7306a99..fc01fdabbbce 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -52,6 +52,7 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
 
 		rqst->rq_xprt = xprt;
 		INIT_LIST_HEAD(&rqst->rq_recv);
+		INIT_LIST_HEAD(&rqst->rq_xmit);
 		INIT_LIST_HEAD(&rqst->rq_bc_list);
 		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
 		spin_lock_bh(&xprt->bc_pa_lock);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 19/27] SUNRPC: Refactor RPC call encoding
  2018-09-03 15:29                                   ` [PATCH 18/27] NFS: Add a transmission queue for RPC requests Trond Myklebust
@ 2018-09-03 15:29                                     ` Trond Myklebust
  2018-09-03 15:29                                       ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Move the call encoding so that it occurs before the transport bind and
connect steps.
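
The resulting tk_action ordering on the happy path, sketched as a comment
(the intermediate *_status steps for bind and connect are omitted):

/*
 *   call_allocate -> call_encode -> call_bind -> call_connect
 *     -> call_transmit -> ... -> call_decode
 *
 * i.e. the request is XDR-encoded (and, if a reply is expected, queued
 * for receive) before the transport bind/connect steps, rather than
 * immediately before transmission.
 */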

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 67 +++++++++++++++++++++++++++--------------------
 1 file changed, 39 insertions(+), 28 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index a817f70d6192..497a30762a6d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -61,6 +61,7 @@ static void	call_start(struct rpc_task *task);
 static void	call_reserve(struct rpc_task *task);
 static void	call_reserveresult(struct rpc_task *task);
 static void	call_allocate(struct rpc_task *task);
+static void	call_encode(struct rpc_task *task);
 static void	call_decode(struct rpc_task *task);
 static void	call_bind(struct rpc_task *task);
 static void	call_bind_status(struct rpc_task *task);
@@ -1680,7 +1681,7 @@ call_allocate(struct rpc_task *task)
 	dprint_status(task);
 
 	task->tk_status = 0;
-	task->tk_action = call_bind;
+	task->tk_action = call_encode;
 
 	if (req->rq_buffer)
 		return;
@@ -1727,9 +1728,6 @@ rpc_task_need_encode(struct rpc_task *task)
 	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
 }
 
-/*
- * 3.	Encode arguments of an RPC call
- */
 static void
 rpc_xdr_encode(struct rpc_task *task)
 {
@@ -1745,6 +1743,7 @@ rpc_xdr_encode(struct rpc_task *task)
 	xdr_buf_init(&req->rq_rcv_buf,
 		     req->rq_rbuffer,
 		     req->rq_rcvsize);
+	req->rq_bytes_sent = 0;
 
 	p = rpc_encode_header(task);
 	if (p == NULL) {
@@ -1761,6 +1760,34 @@ rpc_xdr_encode(struct rpc_task *task)
 			task->tk_msg.rpc_argp);
 }
 
+/*
+ * 3.	Encode arguments of an RPC call
+ */
+static void
+call_encode(struct rpc_task *task)
+{
+	if (!rpc_task_need_encode(task))
+		goto out;
+	/* Encode here so that rpcsec_gss can use correct sequence number. */
+	rpc_xdr_encode(task);
+	/* Did the encode result in an error condition? */
+	if (task->tk_status != 0) {
+		/* Was the error nonfatal? */
+		if (task->tk_status == -EAGAIN)
+			rpc_delay(task, HZ >> 4);
+		else
+			rpc_exit(task, task->tk_status);
+		return;
+	}
+
+	/* Add task to reply queue before transmission to avoid races */
+	if (rpc_reply_expected(task))
+		xprt_request_enqueue_receive(task);
+	set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+out:
+	task->tk_action = call_bind;
+}
+
 /*
  * 4.	Get the server port number if not yet set
  */
@@ -1945,25 +1972,9 @@ call_transmit(struct rpc_task *task)
 	dprint_status(task);
 
 	task->tk_action = call_transmit_status;
-	/* Encode here so that rpcsec_gss can use correct sequence number. */
-	if (rpc_task_need_encode(task)) {
-		rpc_xdr_encode(task);
-		/* Did the encode result in an error condition? */
-		if (task->tk_status != 0) {
-			/* Was the error nonfatal? */
-			if (task->tk_status == -EAGAIN)
-				rpc_delay(task, HZ >> 4);
-			else
-				rpc_exit(task, task->tk_status);
-			return;
-		}
-		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
-	}
-
-	/* Add task to reply queue before transmission to avoid races */
-	if (rpc_reply_expected(task))
-		xprt_request_enqueue_receive(task);
 	xprt_request_enqueue_transmit(task);
+	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+		return;
 
 	if (!xprt_prepare_transmit(task))
 		return;
@@ -1998,8 +2009,8 @@ call_transmit_status(struct rpc_task *task)
 		xprt_end_transmit(task);
 		break;
 	case -EBADMSG:
-		task->tk_action = call_transmit;
 		xprt_end_transmit(task);
+		task->tk_action = call_encode;
 		break;
 		/*
 		 * Special cases: if we've been waiting on the
@@ -2170,7 +2181,7 @@ call_status(struct rpc_task *task)
 	case -EPIPE:
 	case -ENOTCONN:
 	case -EAGAIN:
-		task->tk_action = call_bind;
+		task->tk_action = call_encode;
 		break;
 	case -EIO:
 		/* shutdown or soft timeout */
@@ -2235,7 +2246,7 @@ call_timeout(struct rpc_task *task)
 	rpcauth_invalcred(task);
 
 retry:
-	task->tk_action = call_bind;
+	task->tk_action = call_encode;
 	task->tk_status = 0;
 }
 
@@ -2279,7 +2290,7 @@ call_decode(struct rpc_task *task)
 
 	if (req->rq_rcv_buf.len < 12) {
 		if (!RPC_IS_SOFT(task)) {
-			task->tk_action = call_bind;
+			task->tk_action = call_encode;
 			goto out_retry;
 		}
 		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
@@ -2410,7 +2421,7 @@ rpc_verify_header(struct rpc_task *task)
 			task->tk_garb_retry--;
 			dprintk("RPC: %5u %s: retry garbled creds\n",
 					task->tk_pid, __func__);
-			task->tk_action = call_bind;
+			task->tk_action = call_encode;
 			goto out_retry;
 		case RPC_AUTH_TOOWEAK:
 			printk(KERN_NOTICE "RPC: server %s requires stronger "
@@ -2479,7 +2490,7 @@ rpc_verify_header(struct rpc_task *task)
 		task->tk_garb_retry--;
 		dprintk("RPC: %5u %s: retrying\n",
 				task->tk_pid, __func__);
-		task->tk_action = call_bind;
+		task->tk_action = call_encode;
 out_retry:
 		return ERR_PTR(-EAGAIN);
 	}
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()
  2018-09-03 15:29                                     ` [PATCH 19/27] SUNRPC: Refactor RPC call encoding Trond Myklebust
@ 2018-09-03 15:29                                       ` Trond Myklebust
  2018-09-03 15:29                                         ` [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK Trond Myklebust
  2018-09-03 17:28                                         ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Chuck Lever
  0 siblings, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

When we shift to using the transmit queue, the task that holds the write
lock will not necessarily be the one whose request is being transmitted.
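
A sketch of a transport ->send_request() callback under the new signature;
everything except the rpc_xprt_ops member name and the rpc_rqst fields is
hypothetical:

static int example_send_request(struct rpc_rqst *req, struct rpc_task *task)
{
	/* Per-request state (req->rq_snd_buf, req->rq_bytes_sent, ...) now
	 * comes from @req; @task is only the task currently holding the
	 * write lock, which may differ from req->rq_task once requests are
	 * drained from the transmit queue. */

	/* ... write req->rq_snd_buf to the transport here ... */
	return 0;
}

static const struct rpc_xprt_ops example_ops = {
	.send_request	= example_send_request,
	/* ... remaining callbacks ... */
};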

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 include/linux/sunrpc/xprt.h                |  2 +-
 net/sunrpc/xprt.c                          |  2 +-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  3 +--
 net/sunrpc/xprtrdma/transport.c            |  5 ++--
 net/sunrpc/xprtsock.c                      | 27 +++++++++++-----------
 5 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 81a6c2c8dfc7..6d91acfe0644 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -140,7 +140,7 @@ struct rpc_xprt_ops {
 	void		(*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
 	int		(*buf_alloc)(struct rpc_task *task);
 	void		(*buf_free)(struct rpc_task *task);
-	int		(*send_request)(struct rpc_task *task);
+	int		(*send_request)(struct rpc_rqst *req, struct rpc_task *task);
 	void		(*set_retrans_timeout)(struct rpc_task *task);
 	void		(*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
 	void		(*release_request)(struct rpc_task *task);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index defc6167570a..3efcb69af526 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
 	}
 
 	connect_cookie = xprt->connect_cookie;
-	status = xprt->ops->send_request(task);
+	status = xprt->ops->send_request(req, task);
 	trace_xprt_transmit(xprt, req->rq_xid, status);
 	if (status != 0) {
 		task->tk_status = status;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 09b12b7568fe..d1618c70edb4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
  * connection.
  */
 static int
-xprt_rdma_bc_send_request(struct rpc_task *task)
+xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
 {
-	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
 	struct svcxprt_rdma *rdma;
 	int ret;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 143ce2579ba9..fa684bf4d090 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
  *		sent. Do not try to send this message again.
  */
 static int
-xprt_rdma_send_request(struct rpc_task *task)
+xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
 {
-	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
 	/* An RPC with no reply will throw off credit accounting,
 	 * so drop the connection to reset the credit grant.
 	 */
-	if (!rpc_reply_expected(task))
+	if (!rpc_reply_expected(rqst->rq_task))
 		goto drop_connection;
 	return 0;
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 8d6404259ff9..b8143eded4af 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task)
 
 /**
  * xs_nospace - place task on wait queue if transmit was incomplete
+ * @req: pointer to RPC request
  * @task: task to put to sleep
  *
  */
-static int xs_nospace(struct rpc_task *task)
+static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct sock *sk = transport->inet;
@@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 
 /**
  * xs_local_send_request - write an RPC request to an AF_LOCAL socket
+ * @req: pointer to RPC request
  * @task: RPC task that manages the state of an RPC request
  *
  * Return values:
@@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
  * ENOTCONN:	Caller needs to invoke connect logic then call again
  *    other:	Some other error occured, the request was not sent
  */
-static int xs_local_send_request(struct rpc_task *task)
+static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport =
 				container_of(xprt, struct sock_xprt, xprt);
@@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task)
 	case -ENOBUFS:
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_nospace(req, task);
 		break;
 	default:
 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
@@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task)
 
 /**
  * xs_udp_send_request - write an RPC request to a UDP socket
+ * @req: pointer to RPC request
  * @task: address of RPC task that manages the state of an RPC request
  *
  * Return values:
@@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task)
  * ENOTCONN:	Caller needs to invoke connect logic then call again
  *    other:	Some other error occurred, the request was not sent
  */
-static int xs_udp_send_request(struct rpc_task *task)
+static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task *task)
 		/* Should we call xs_close() here? */
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_nospace(req, task);
 		break;
 	case -ENETUNREACH:
 	case -ENOBUFS:
@@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task *task)
 
 /**
  * xs_tcp_send_request - write an RPC request to a TCP socket
+ * @req: pointer to RPC request
  * @task: address of RPC task that manages the state of an RPC request
  *
  * Return values:
@@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task *task)
  * XXX: In the case of soft timeouts, should we eventually give up
  *	if sendmsg is not able to make progress?
  */
-static int xs_tcp_send_request(struct rpc_task *task)
+static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	 * completes while the socket holds a reference to the pages,
 	 * then we may end up resending corrupted data.
 	 */
-	if (task->tk_flags & RPC_TASK_SENT)
+	if (req->rq_task->tk_flags & RPC_TASK_SENT)
 		zerocopy = false;
 
 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
@@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
 		/* Should we call xs_close() here? */
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_nospace(req, task);
 		break;
 	case -ECONNRESET:
 	case -ECONNREFUSED:
@@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
 /*
  * The send routine. Borrows from svc_send
  */
-static int bc_send_request(struct rpc_task *task)
+static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct svc_xprt	*xprt;
 	int len;
 
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK
  2018-09-03 15:29                                       ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Trond Myklebust
@ 2018-09-03 15:29                                         ` Trond Myklebust
  2018-09-03 15:29                                           ` [PATCH 22/27] SUNRPC: Simplify xprt_prepare_transmit() Trond Myklebust
  2018-09-03 17:28                                         ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Chuck Lever
  1 sibling, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

If the request is still on the transmit queue, then resetting its 'bytes_sent'
counter when the XPRT_LOCK is released would be incorrect behaviour.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c |  3 ---
 net/sunrpc/xprt.c | 14 --------------
 2 files changed, 17 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 497a30762a6d..585a82dfaf4d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2138,9 +2138,6 @@ call_status(struct rpc_task *task)
 	if (!task->tk_msg.rpc_proc->p_proc)
 		trace_xprt_ping(task->tk_xprt, task->tk_status);
 
-	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
-		task->tk_status = req->rq_reply_bytes_recvd;
-
 	dprint_status(task);
 
 	status = task->tk_status;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3efcb69af526..5efa9eddf769 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -332,15 +332,6 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 	xprt_clear_locked(xprt);
 }
 
-static void xprt_task_clear_bytes_sent(struct rpc_task *task)
-{
-	if (task != NULL) {
-		struct rpc_rqst *req = task->tk_rqstp;
-		if (req != NULL)
-			req->rq_bytes_sent = 0;
-	}
-}
-
 /**
  * xprt_release_xprt - allow other requests to use a transport
  * @xprt: transport with other tasks potentially waiting
@@ -351,7 +342,6 @@ static void xprt_task_clear_bytes_sent(struct rpc_task *task)
 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	if (xprt->snd_task == task) {
-		xprt_task_clear_bytes_sent(task);
 		xprt_clear_locked(xprt);
 		__xprt_lock_write_next(xprt);
 	}
@@ -369,7 +359,6 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);
 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	if (xprt->snd_task == task) {
-		xprt_task_clear_bytes_sent(task);
 		xprt_clear_locked(xprt);
 		__xprt_lock_write_next_cong(xprt);
 	}
@@ -742,7 +731,6 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
 		goto out;
 	if (xprt->snd_task != task)
 		goto out;
-	xprt_task_clear_bytes_sent(task);
 	xprt->snd_task = cookie;
 	ret = true;
 out:
@@ -788,7 +776,6 @@ void xprt_connect(struct rpc_task *task)
 		xprt->ops->close(xprt);
 
 	if (!xprt_connected(xprt)) {
-		task->tk_rqstp->rq_bytes_sent = 0;
 		task->tk_timeout = task->tk_rqstp->rq_timeout;
 		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
 		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -1085,7 +1072,6 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
 static void
 xprt_request_dequeue_transmit_locked(struct rpc_task *task)
 {
-	xprt_task_clear_bytes_sent(task);
 	clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 	list_del_init(&task->tk_rqstp->rq_xmit);
 }
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 22/27] SUNRPC: Simplify xprt_prepare_transmit()
  2018-09-03 15:29                                         ` [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK Trond Myklebust
@ 2018-09-03 15:29                                           ` Trond Myklebust
  2018-09-03 15:29                                             ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Remove the checks for whether or not we need to transmit, and whether
or not a reply has been received. Those are already handled in
call_transmit() itself.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprt.c | 21 +++------------------
 1 file changed, 3 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 5efa9eddf769..95d15d4017f7 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1102,27 +1102,12 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
-	bool ret = false;
 
 	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
 
-	spin_lock_bh(&xprt->transport_lock);
-	if (!req->rq_bytes_sent) {
-		if (req->rq_reply_bytes_recvd) {
-			task->tk_status = req->rq_reply_bytes_recvd;
-			goto out_unlock;
-		}
-		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
-			goto out_unlock;
-	}
-	if (!xprt->ops->reserve_xprt(xprt, task)) {
-		task->tk_status = -EAGAIN;
-		goto out_unlock;
-	}
-	ret = true;
-out_unlock:
-	spin_unlock_bh(&xprt->transport_lock);
-	return ret;
+	if (!xprt_lock_write(xprt, task))
+		return false;
+	return true;
 }
 
 void xprt_end_transmit(struct rpc_task *task)
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
  2018-09-03 15:29                                           ` [PATCH 22/27] SUNRPC: Simplify xprt_prepare_transmit() Trond Myklebust
@ 2018-09-03 15:29                                             ` Trond Myklebust
  2018-09-03 15:29                                               ` [PATCH 24/27] SUNRPC: Fix up the back channel transmit Trond Myklebust
  2018-09-03 17:31                                               ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Chuck Lever
  0 siblings, 2 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 6 ------
 net/sunrpc/xprt.c | 4 ++++
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 585a82dfaf4d..fb40d1e9f636 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1967,8 +1967,6 @@ call_connect_status(struct rpc_task *task)
 static void
 call_transmit(struct rpc_task *task)
 {
-	int is_retrans = RPC_WAS_SENT(task);
-
 	dprint_status(task);
 
 	task->tk_action = call_transmit_status;
@@ -1979,10 +1977,6 @@ call_transmit(struct rpc_task *task)
 	if (!xprt_prepare_transmit(task))
 		return;
 	xprt_transmit(task);
-	if (task->tk_status < 0)
-		return;
-	if (is_retrans)
-		task->tk_client->cl_stats->rpcretrans++;
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 95d15d4017f7..b85e2c4fa115 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1126,6 +1126,7 @@ void xprt_transmit(struct rpc_task *task)
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
 	unsigned int connect_cookie;
+	int is_retrans = RPC_WAS_SENT(task);
 	int status;
 
 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
@@ -1148,6 +1149,9 @@ void xprt_transmit(struct rpc_task *task)
 		return;
 	}
 
+	if (is_retrans)
+		task->tk_client->cl_stats->rpcretrans++;
+
 	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 24/27] SUNRPC: Fix up the back channel transmit
  2018-09-03 15:29                                             ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Trond Myklebust
@ 2018-09-03 15:29                                               ` Trond Myklebust
  2018-09-03 15:29                                                 ` [PATCH 25/27] SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue Trond Myklebust
  2018-09-03 17:31                                               ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Chuck Lever
  1 sibling, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Fix up the back channel code to recognise that the request has already
been transmitted, so that it is not sent again.
Also ensure that we set req->rq_task.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index fb40d1e9f636..586976c4c02a 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1150,6 +1150,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 	 */
 	task = rpc_new_task(&task_setup_data);
 	task->tk_rqstp = req;
+	req->rq_task = task;
 
 	/*
 	 * Set up the xdr_buf length.
@@ -2054,6 +2055,9 @@ call_bc_transmit(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 
 	xprt_request_enqueue_transmit(task);
+	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+		goto out_wakeup;
+
 	if (!xprt_prepare_transmit(task))
 		goto out_retry;
 
@@ -2108,6 +2112,7 @@ call_bc_transmit(struct rpc_task *task)
 			"error: %d\n", task->tk_status);
 		break;
 	}
+out_wakeup:
 	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
 out_done:
 	task->tk_action = rpc_exit_task;
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 25/27] SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue
  2018-09-03 15:29                                               ` [PATCH 24/27] SUNRPC: Fix up the back channel transmit Trond Myklebust
@ 2018-09-03 15:29                                                 ` Trond Myklebust
  2018-09-03 15:29                                                   ` [PATCH 26/27] SUNRPC: Queue the request for transmission immediately after encoding Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Rather than forcing each and every RPC task to grab the socket write
lock in order to send itself, we allow whichever task is holding the
write lock to attempt to drain the entire transmit queue.
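
The idea in miniature — a toy, userspace C sketch with invented names
(toy_rqst, toy_send_one, toy_drain), not code from this patch: whichever
request holds the lock walks the queue and sends on behalf of everyone
queued behind it.

#include <stdio.h>
#include <stdlib.h>

/* Invented stand-in for a request sitting on the transmit queue. */
struct toy_rqst {
	int xid;
	struct toy_rqst *next;
};

/* Pretend transport send; returns 0 on success. */
static int toy_send_one(struct toy_rqst *req)
{
	printf("sending xid %d\n", req->xid);
	return 0;
}

/* One lock holder drains every queued request, stopping on error. */
static int toy_drain(struct toy_rqst **queue)
{
	int status = 0;

	while (*queue != NULL && status == 0) {
		struct toy_rqst *req = *queue;

		*queue = req->next;
		status = toy_send_one(req);
		free(req);
	}
	return status;
}

int main(void)
{
	struct toy_rqst *queue = NULL, **tail = &queue;
	int i;

	for (i = 1; i <= 3; i++) {
		struct toy_rqst *req = calloc(1, sizeof(*req));

		if (!req)
			return 1;
		req->xid = i;
		*tail = req;
		tail = &req->next;
	}
	return toy_drain(&queue);
}

The real xprt_transmit() in the diff below additionally pins each request
and splices xprt->xmit_queue onto a private list under xprt->queue_lock to
avoid livelock.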

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprt.c | 82 +++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 69 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b85e2c4fa115..1ce32e555c9b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1116,15 +1116,20 @@ void xprt_end_transmit(struct rpc_task *task)
 }
 
 /**
- * xprt_transmit - send an RPC request on a transport
- * @task: controlling RPC task
+ * xprt_request_transmit - send an RPC request on a transport
+ * @req: pointer to request to transmit
+ * @snd_task: RPC task that owns the transport lock
  *
- * We have to copy the iovec because sendmsg fiddles with its contents.
+ * This performs the transmission of a single request.
+ * Note that if the request is not the same as snd_task, then it
+ * does need to be pinned.
+ * Returns '0' on success.
  */
-void xprt_transmit(struct rpc_task *task)
+static int
+xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 {
-	struct rpc_rqst	*req = task->tk_rqstp;
-	struct rpc_xprt	*xprt = req->rq_xprt;
+	struct rpc_xprt *xprt = req->rq_xprt;
+	struct rpc_task *task = req->rq_task;
 	unsigned int connect_cookie;
 	int is_retrans = RPC_WAS_SENT(task);
 	int status;
@@ -1132,22 +1137,25 @@ void xprt_transmit(struct rpc_task *task)
 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
 	if (!req->rq_bytes_sent) {
-		if (xprt_request_data_received(task))
+		if (xprt_request_data_received(task)) {
+			status = 0;
 			goto out_dequeue;
+		}
 		/* Verify that our message lies in the RPCSEC_GSS window */
 		if (rpcauth_xmit_need_reencode(task)) {
-			task->tk_status = -EBADMSG;
+			status = -EBADMSG;
 			goto out_dequeue;
 		}
 	}
 
 	connect_cookie = xprt->connect_cookie;
-	status = xprt->ops->send_request(req, task);
+	status = xprt->ops->send_request(req, snd_task);
 	trace_xprt_transmit(xprt, req->rq_xid, status);
-	if (status != 0) {
-		task->tk_status = status;
-		return;
-	}
+	if (status != 0)
+		return status;
+
+	if (is_retrans)
+		task->tk_client->cl_stats->rpcretrans++;
 
 	if (is_retrans)
 		task->tk_client->cl_stats->rpcretrans++;
@@ -1168,6 +1176,54 @@ void xprt_transmit(struct rpc_task *task)
 	req->rq_connect_cookie = connect_cookie;
 out_dequeue:
 	xprt_request_dequeue_transmit(task);
+	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
+	return status;
+}
+
+/**
+ * xprt_transmit - send an RPC request on a transport
+ * @task: controlling RPC task
+ *
+ * Attempts to drain the transmit queue. On exit, either the transport
+ * signalled an error that needs to be handled before transmission can
+ * resume, or @task finished transmitting, and detected that it already
+ * received a reply.
+ */
+void
+xprt_transmit(struct rpc_task *task)
+{
+	struct rpc_rqst *next, *req = task->tk_rqstp;
+	struct rpc_xprt	*xprt = req->rq_xprt;
+	LIST_HEAD(head);
+	int status;
+
+	task->tk_status = -EAGAIN;
+	spin_lock(&xprt->queue_lock);
+	/* Avoid livelock by moving the xmit_queue contents to a private list */
+	list_splice_init(&xprt->xmit_queue, &head);
+	while (!list_empty(&head)) {
+		next = list_first_entry(&head, struct rpc_rqst, rq_xmit);
+		xprt_pin_rqst(next);
+		spin_unlock(&xprt->queue_lock);
+		status = xprt_request_transmit(next, task);
+		if (status == -EBADMSG && next != req)
+			status = 0;
+		cond_resched();
+		spin_lock(&xprt->queue_lock);
+		xprt_unpin_rqst(next);
+		if (status == 0) {
+			if (!xprt_request_data_received(task) ||
+			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+				continue;
+		} else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+			rpc_wake_up_queued_task(&xprt->pending, task);
+		else
+			task->tk_status = status;
+		/* On early exit, splice back the list contents */
+		list_splice(&head, &xprt->xmit_queue);
+		break;
+	}
+	spin_unlock(&xprt->queue_lock);
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 26/27] SUNRPC: Queue the request for transmission immediately after encoding
  2018-09-03 15:29                                                 ` [PATCH 25/27] SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue Trond Myklebust
@ 2018-09-03 15:29                                                   ` Trond Myklebust
  2018-09-03 15:29                                                     ` [PATCH 27/27] SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue Trond Myklebust
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

Move up the call to xprt_request_enqueue_transmit() to call_encode()
so that the queue order reflects the order in which slots were
allocated.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/clnt.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 586976c4c02a..8eea3c4d2532 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1785,6 +1785,7 @@ call_encode(struct rpc_task *task)
 	if (rpc_reply_expected(task))
 		xprt_request_enqueue_receive(task);
 	set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+	xprt_request_enqueue_transmit(task);
 out:
 	task->tk_action = call_bind;
 }
@@ -1971,7 +1972,6 @@ call_transmit(struct rpc_task *task)
 	dprint_status(task);
 
 	task->tk_action = call_transmit_status;
-	xprt_request_enqueue_transmit(task);
 	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
 		return;
 
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* [PATCH 27/27] SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue
  2018-09-03 15:29                                                   ` [PATCH 26/27] SUNRPC: Queue the request for transmission immediately after encoding Trond Myklebust
@ 2018-09-03 15:29                                                     ` Trond Myklebust
  0 siblings, 0 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 15:29 UTC (permalink / raw)
  To: linux-nfs

We no longer need priority semantics on the xprt->sending queue, because
the order in which tasks are sent is now dictated by their position in
the send queue.
Note that the backlog queue remains a priority queue, meaning that
slot resources are still managed in order of task priority.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 net/sunrpc/xprt.c | 29 ++++++-----------------------
 1 file changed, 6 insertions(+), 23 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 1ce32e555c9b..730e8bbd63a9 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -183,7 +183,6 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
-	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
@@ -201,13 +200,7 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 			task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req == NULL)
-		priority = RPC_PRIORITY_LOW;
-	else if (!req->rq_ntrans)
-		priority = RPC_PRIORITY_NORMAL;
-	else
-		priority = RPC_PRIORITY_HIGH;
-	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+	rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -234,7 +227,6 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
-	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
@@ -257,13 +249,7 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req == NULL)
-		priority = RPC_PRIORITY_LOW;
-	else if (!req->rq_ntrans)
-		priority = RPC_PRIORITY_NORMAL;
-	else
-		priority = RPC_PRIORITY_HIGH;
-	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+	rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -295,8 +281,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
-				__xprt_lock_write_func, xprt))
+	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
 		return;
 	xprt_clear_locked(xprt);
 }
@@ -325,8 +310,7 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
-				__xprt_lock_write_cong_func, xprt))
+	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
 		return;
 out_unlock:
 	xprt_clear_locked(xprt);
@@ -506,8 +490,7 @@ void xprt_write_space(struct rpc_xprt *xprt)
 	if (xprt->snd_task) {
 		dprintk("RPC:       write space: waking waiting task on "
 				"xprt %p\n", xprt);
-		rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
-				&xprt->pending, xprt->snd_task);
+		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -1573,7 +1556,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
 	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
 	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
-	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
 	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
 
 	xprt_init_xid(xprt);
-- 
2.17.1

^ permalink raw reply related	[flat|nested] 39+ messages in thread

* Re: [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
  2018-09-03 15:29         ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Trond Myklebust
  2018-09-03 15:29           ` [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables Trond Myklebust
@ 2018-09-03 17:11           ` Chuck Lever
  2018-09-03 17:40             ` Trond Myklebust
  1 sibling, 1 reply; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:11 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> Currently, we grab the socket bit lock before we allow the message
> to be XDR encoded. That significantly slows down the transmission
> rate, since we serialise on a potentially blocking operation.

Which operation blocks, and how often?


> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> net/sunrpc/clnt.c | 8 ++++----
> 1 file changed, 4 insertions(+), 4 deletions(-)
> 
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index e5ac35e803ad..66ec61347716 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
> 	task->tk_action = call_status;
> 	if (task->tk_status < 0)
> 		return;
> -	if (!xprt_prepare_transmit(task))
> -		return;
> -	task->tk_action = call_transmit_status;
> 	/* Encode here so that rpcsec_gss can use correct sequence number. */
> 	if (rpc_task_need_encode(task)) {
> 		rpc_xdr_encode(task);
> @@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
> 			return;
> 		}
> 	}
> +	if (!xprt_prepare_transmit(task))
> +		return;
> +	task->tk_action = call_transmit_status;
> 	xprt_transmit(task);
> -	if (task->tk_status < 0)
> +	if (task->tk_status < 0) {

The added curly bracket seems incorrect.


> 		return;
> 	if (is_retrans)
> 		task->tk_client->cl_stats->rpcretrans++;
> -- 
> 2.17.1
> 

--
Chuck Lever
chucklever@gmail.com

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete
  2018-09-03 15:29       ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Trond Myklebust
  2018-09-03 15:29         ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Trond Myklebust
@ 2018-09-03 17:15         ` Chuck Lever
  1 sibling, 0 replies; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:15 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> Add states to indicate that the message send and receive are not yet
> complete.

In general, the context provided by the cover letter is lost
since it does not appear in the git commit log. Some of the
patch descriptions in this series, like this one, fail to
explain "why", so there's no guidance at all to folks looking
through these patches after they're merged.

So I'm not sure if this is a clean up patch or a pre-requisite
and why it might be necessary.


> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> include/linux/sunrpc/sched.h |  6 ++++--
> net/sunrpc/clnt.c            | 19 +++++++------------
> net/sunrpc/xprt.c            | 17 ++++++++++++++---
> 3 files changed, 25 insertions(+), 17 deletions(-)
> 
> diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
> index 592653becd91..9e655df70131 100644
> --- a/include/linux/sunrpc/sched.h
> +++ b/include/linux/sunrpc/sched.h
> @@ -140,8 +140,10 @@ struct rpc_task_setup {
> #define RPC_TASK_RUNNING	0
> #define RPC_TASK_QUEUED		1
> #define RPC_TASK_ACTIVE		2
> -#define RPC_TASK_MSG_RECV	3
> -#define RPC_TASK_MSG_RECV_WAIT	4
> +#define RPC_TASK_NEED_XMIT	3
> +#define RPC_TASK_NEED_RECV	4
> +#define RPC_TASK_MSG_RECV	5
> +#define RPC_TASK_MSG_RECV_WAIT	6
> 
> #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
> #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index d41b5ac1d4e8..e5ac35e803ad 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
> 	 */
> 	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
> 			xbufp->tail[0].iov_len;
> +	set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> 
> 	task->tk_action = call_bc_transmit;
> 	atomic_inc(&task->tk_count);
> @@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task)
> 	rpc_exit(task, -ERESTARTSYS);
> }
> 
> -static inline int
> +static int

Nit: perhaps this function should return a bool type.


> rpc_task_need_encode(struct rpc_task *task)
> {
> -	return task->tk_rqstp->rq_snd_buf.len == 0;
> -}
> -
> -static inline void
> -rpc_task_force_reencode(struct rpc_task *task)
> -{
> -	task->tk_rqstp->rq_snd_buf.len = 0;
> -	task->tk_rqstp->rq_bytes_sent = 0;
> +	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
> }
> 
> /*
> @@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task)
> 
> 	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
> 			task->tk_msg.rpc_argp);
> +	if (task->tk_status == 0)
> +		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> }
> 
> /*
> @@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task)
> 	 */
> 	if (task->tk_status == 0) {
> 		xprt_end_transmit(task);
> -		rpc_task_force_reencode(task);
> 		return;
> 	}
> 
> @@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task)
> 	default:
> 		dprint_status(task);
> 		xprt_end_transmit(task);
> -		rpc_task_force_reencode(task);
> 		break;
> 		/*
> 		 * Special cases: if we've been waiting on the
> @@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task)
> 	case -EADDRINUSE:
> 	case -ENOTCONN:
> 	case -EPIPE:
> -		rpc_task_force_reencode(task);
> +		break;
> 	}
> }
> 
> @@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task)
> 		rpc_exit(task, status);
> 		break;
> 	case -EBADMSG:
> +		clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> 		task->tk_action = call_transmit;
> 		break;
> 	default:
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 3973e10ea2bd..45d580cd93ac 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
> 	/* req->rq_reply_bytes_recvd */
> 	smp_wmb();
> 	req->rq_reply_bytes_recvd = copied;
> +	clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
> 	rpc_wake_up_queued_task(&xprt->pending, task);
> }
> EXPORT_SYMBOL_GPL(xprt_complete_rqst);
> 
> +static bool
> +xprt_request_data_received(struct rpc_task *task)
> +{
> +	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
> +		task->tk_rqstp->rq_reply_bytes_recvd != 0;
> +}
> +
> static void xprt_timer(struct rpc_task *task)
> {
> 	struct rpc_rqst *req = task->tk_rqstp;
> @@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task)
> 			/* Add request to the receive list */
> 			spin_lock(&xprt->recv_lock);
> 			list_add_tail(&req->rq_list, &xprt->recv);
> +			set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
> 			spin_unlock(&xprt->recv_lock);
> 			xprt_reset_majortimeo(req);
> 			/* Turn off autodisconnect */
> 			del_singleshot_timer_sync(&xprt->timer);
> 		}
> -	} else if (!req->rq_bytes_sent)
> +	} else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
> 		return;
> 
> 	connect_cookie = xprt->connect_cookie;
> @@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task)
> 		task->tk_status = status;
> 		return;
> 	}
> +
> 	xprt_inject_disconnect(xprt);
> 
> 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
> +	clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
> 	task->tk_flags |= RPC_TASK_SENT;
> 	spin_lock_bh(&xprt->transport_lock);
> 
> @@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task)
> 	spin_unlock_bh(&xprt->transport_lock);
> 
> 	req->rq_connect_cookie = connect_cookie;
> -	if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
> +	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
> 		/*
> 		 * Sleep on the pending queue if we're expecting a reply.
> 		 * The spinlock ensures atomicity between the test of
> 		 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
> 		 */
> 		spin_lock(&xprt->recv_lock);
> -		if (!req->rq_reply_bytes_recvd) {
> +		if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
> 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
> 			/*
> 			 * Send an extra queue wakeup call if the
> -- 
> 2.17.1
> 

--
Chuck Lever
chucklever@gmail.com

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()
  2018-09-03 15:29                                       ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Trond Myklebust
  2018-09-03 15:29                                         ` [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK Trond Myklebust
@ 2018-09-03 17:28                                         ` Chuck Lever
  2018-09-03 17:47                                           ` Trond Myklebust
  1 sibling, 1 reply; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:28 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> When we shift to using the transmit queue, then the task that holds the
> write lock will not necessarily be the same as the one being transmitted.

Can you pass in just the rpc_rqst? Then @task would be available as
xprt->snd_task.


> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> include/linux/sunrpc/xprt.h                |  2 +-
> net/sunrpc/xprt.c                          |  2 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  3 +--
> net/sunrpc/xprtrdma/transport.c            |  5 ++--
> net/sunrpc/xprtsock.c                      | 27 +++++++++++-----------
> 5 files changed, 18 insertions(+), 21 deletions(-)
> 
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 81a6c2c8dfc7..6d91acfe0644 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -140,7 +140,7 @@ struct rpc_xprt_ops {
> 	void		(*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
> 	int		(*buf_alloc)(struct rpc_task *task);
> 	void		(*buf_free)(struct rpc_task *task);
> -	int		(*send_request)(struct rpc_task *task);
> +	int		(*send_request)(struct rpc_rqst *req, struct rpc_task *task);
> 	void		(*set_retrans_timeout)(struct rpc_task *task);
> 	void		(*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
> 	void		(*release_request)(struct rpc_task *task);
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index defc6167570a..3efcb69af526 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
> 	}
> 
> 	connect_cookie = xprt->connect_cookie;
> -	status = xprt->ops->send_request(task);
> +	status = xprt->ops->send_request(req, task);
> 	trace_xprt_transmit(xprt, req->rq_xid, status);
> 	if (status != 0) {
> 		task->tk_status = status;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index 09b12b7568fe..d1618c70edb4 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
>  * connection.
>  */
> static int
> -xprt_rdma_bc_send_request(struct rpc_task *task)
> +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
> {
> -	struct rpc_rqst *rqst = task->tk_rqstp;
> 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
> 	struct svcxprt_rdma *rdma;
> 	int ret;
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 143ce2579ba9..fa684bf4d090 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
>  *		sent. Do not try to send this message again.
>  */
> static int
> -xprt_rdma_send_request(struct rpc_task *task)
> +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)

Nit: The function's documenting comment will need to be updated.


> {
> -	struct rpc_rqst *rqst = task->tk_rqstp;
> 	struct rpc_xprt *xprt = rqst->rq_xprt;
> 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
> 	/* An RPC with no reply will throw off credit accounting,
> 	 * so drop the connection to reset the credit grant.
> 	 */
> -	if (!rpc_reply_expected(task))
> +	if (!rpc_reply_expected(rqst->rq_task))
> 		goto drop_connection;
> 	return 0;
> 
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 8d6404259ff9..b8143eded4af 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct rpc_task *task)
> 
> /**
>  * xs_nospace - place task on wait queue if transmit was incomplete
> + * @req: pointer to RPC request
>  * @task: task to put to sleep
>  *
>  */
> -static int xs_nospace(struct rpc_task *task)
> +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
> {
> -	struct rpc_rqst *req = task->tk_rqstp;
> 	struct rpc_xprt *xprt = req->rq_xprt;
> 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> 	struct sock *sk = transport->inet;
> @@ -513,6 +513,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
> 
> /**
>  * xs_local_send_request - write an RPC request to an AF_LOCAL socket
> + * @req: pointer to RPC request
>  * @task: RPC task that manages the state of an RPC request
>  *
>  * Return values:
> @@ -522,9 +523,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
>  * ENOTCONN:	Caller needs to invoke connect logic then call again
>  *    other:	Some other error occured, the request was not sent
>  */
> -static int xs_local_send_request(struct rpc_task *task)
> +static int xs_local_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> -	struct rpc_rqst *req = task->tk_rqstp;
> 	struct rpc_xprt *xprt = req->rq_xprt;
> 	struct sock_xprt *transport =
> 				container_of(xprt, struct sock_xprt, xprt);
> @@ -569,7 +569,7 @@ static int xs_local_send_request(struct rpc_task *task)
> 	case -ENOBUFS:
> 		break;
> 	case -EAGAIN:
> -		status = xs_nospace(task);
> +		status = xs_nospace(req, task);
> 		break;
> 	default:
> 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
> @@ -585,6 +585,7 @@ static int xs_local_send_request(struct rpc_task *task)
> 
> /**
>  * xs_udp_send_request - write an RPC request to a UDP socket
> + * @req: pointer to RPC request
>  * @task: address of RPC task that manages the state of an RPC request
>  *
>  * Return values:
> @@ -594,9 +595,8 @@ static int xs_local_send_request(struct rpc_task *task)
>  * ENOTCONN:	Caller needs to invoke connect logic then call again
>  *    other:	Some other error occurred, the request was not sent
>  */
> -static int xs_udp_send_request(struct rpc_task *task)
> +static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> -	struct rpc_rqst *req = task->tk_rqstp;
> 	struct rpc_xprt *xprt = req->rq_xprt;
> 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> 	struct xdr_buf *xdr = &req->rq_snd_buf;
> @@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task *task)
> 		/* Should we call xs_close() here? */
> 		break;
> 	case -EAGAIN:
> -		status = xs_nospace(task);
> +		status = xs_nospace(req, task);
> 		break;
> 	case -ENETUNREACH:
> 	case -ENOBUFS:
> @@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task *task)
> 
> /**
>  * xs_tcp_send_request - write an RPC request to a TCP socket
> + * @req: pointer to RPC request
>  * @task: address of RPC task that manages the state of an RPC request
>  *
>  * Return values:
> @@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task *task)
>  * XXX: In the case of soft timeouts, should we eventually give up
>  *	if sendmsg is not able to make progress?
>  */
> -static int xs_tcp_send_request(struct rpc_task *task)
> +static int xs_tcp_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> -	struct rpc_rqst *req = task->tk_rqstp;
> 	struct rpc_xprt *xprt = req->rq_xprt;
> 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> 	struct xdr_buf *xdr = &req->rq_snd_buf;
> @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
> 	 * completes while the socket holds a reference to the pages,
> 	 * then we may end up resending corrupted data.
> 	 */
> -	if (task->tk_flags & RPC_TASK_SENT)
> +	if (req->rq_task->tk_flags & RPC_TASK_SENT)
> 		zerocopy = false;
> 
> 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
> @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
> 		/* Should we call xs_close() here? */
> 		break;
> 	case -EAGAIN:
> -		status = xs_nospace(task);
> +		status = xs_nospace(req, task);
> 		break;
> 	case -ECONNRESET:
> 	case -ECONNREFUSED:
> @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
> /*
>  * The send routine. Borrows from svc_send
>  */
> -static int bc_send_request(struct rpc_task *task)
> +static int bc_send_request(struct rpc_rqst *req, struct rpc_task *task)
> {
> -	struct rpc_rqst *req = task->tk_rqstp;
> 	struct svc_xprt	*xprt;
> 	int len;
> 
> -- 
> 2.17.1
> 

--
Chuck Lever
chucklever@gmail.com

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
  2018-09-03 15:29                                             ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Trond Myklebust
  2018-09-03 15:29                                               ` [PATCH 24/27] SUNRPC: Fix up the back channel transmit Trond Myklebust
@ 2018-09-03 17:31                                               ` Chuck Lever
  2018-09-03 17:49                                                 ` Trond Myklebust
  1 sibling, 1 reply; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:31 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List

Do you also need to move req->rq_ntrans++ ?


> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> net/sunrpc/clnt.c | 6 ------
> net/sunrpc/xprt.c | 4 ++++
> 2 files changed, 4 insertions(+), 6 deletions(-)
> 
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 585a82dfaf4d..fb40d1e9f636 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1967,8 +1967,6 @@ call_connect_status(struct rpc_task *task)
> static void
> call_transmit(struct rpc_task *task)
> {
> -	int is_retrans = RPC_WAS_SENT(task);
> -
> 	dprint_status(task);
> 
> 	task->tk_action = call_transmit_status;
> @@ -1979,10 +1977,6 @@ call_transmit(struct rpc_task *task)
> 	if (!xprt_prepare_transmit(task))
> 		return;
> 	xprt_transmit(task);
> -	if (task->tk_status < 0)
> -		return;
> -	if (is_retrans)
> -		task->tk_client->cl_stats->rpcretrans++;
> }
> 
> /*
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 95d15d4017f7..b85e2c4fa115 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1126,6 +1126,7 @@ void xprt_transmit(struct rpc_task *task)
> 	struct rpc_rqst	*req = task->tk_rqstp;
> 	struct rpc_xprt	*xprt = req->rq_xprt;
> 	unsigned int connect_cookie;
> +	int is_retrans = RPC_WAS_SENT(task);
> 	int status;
> 
> 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
> @@ -1148,6 +1149,9 @@ void xprt_transmit(struct rpc_task *task)
> 		return;
> 	}
> 
> +	if (is_retrans)
> +		task->tk_client->cl_stats->rpcretrans++;
> +
> 	xprt_inject_disconnect(xprt);
> 
> 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
> -- 
> 2.17.1
> 

--
Chuck Lever
chucklever@gmail.com

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
  2018-09-03 17:11           ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Chuck Lever
@ 2018-09-03 17:40             ` Trond Myklebust
  0 siblings, 0 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 17:40 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Linux NFS Mailing List

On Mon, 2018-09-03 at 13:11 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > Currently, we grab the socket bit lock before we allow the message
> > to be XDR encoded. That significantly slows down the transmission
> > rate, since we serialise on a potentially blocking operation.
> 
> Which operation blocks, and how often?

RPCSEC_GSS allocates memory when doing privacy encoding, for instance. 

> > Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> > ---
> > net/sunrpc/clnt.c | 8 ++++----
> > 1 file changed, 4 insertions(+), 4 deletions(-)
> > 
> > diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> > index e5ac35e803ad..66ec61347716 100644
> > --- a/net/sunrpc/clnt.c
> > +++ b/net/sunrpc/clnt.c
> > @@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task)
> > 	task->tk_action = call_status;
> > 	if (task->tk_status < 0)
> > 		return;
> > -	if (!xprt_prepare_transmit(task))
> > -		return;
> > -	task->tk_action = call_transmit_status;
> > 	/* Encode here so that rpcsec_gss can use correct sequence
> > number. */
> > 	if (rpc_task_need_encode(task)) {
> > 		rpc_xdr_encode(task);
> > @@ -1965,8 +1962,11 @@ call_transmit(struct rpc_task *task)
> > 			return;
> > 		}
> > 	}
> > +	if (!xprt_prepare_transmit(task))
> > +		return;
> > +	task->tk_action = call_transmit_status;
> > 	xprt_transmit(task);
> > -	if (task->tk_status < 0)
> > +	if (task->tk_status < 0) {
> 
> The added curly bracket seems incorrect.

Oops. Yes, that's a rebase brain-fart... Thanks for spotting it!
> 
> 
> > 		return;
> > 	if (is_retrans)
> > 		task->tk_client->cl_stats->rpcretrans++;
> > -- 
> > 2.17.1
> > 
> 
> --
> Chuck Lever
> chucklever@gmail.com
> 
> 
> 

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 00/27] Convert RPC client transmission to a queued model
  2018-09-03 15:29 [PATCH 00/27] Convert RPC client transmission to a queued model Trond Myklebust
  2018-09-03 15:29 ` [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst Trond Myklebust
@ 2018-09-03 17:41 ` Chuck Lever
  2018-09-03 17:55   ` Trond Myklebust
  1 sibling, 1 reply; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:41 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List



> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> For historical reasons, the RPC client is heavily serialised during the
> process of transmitting a request by the XPRT_LOCK. A request is
> required to take that lock before it can start XDR encoding, and it is
> required to hold it until it is done transmitting. In essence the lock
> protects the following functions:
> 
> - Stream based transport connect/reconnect
> - RPCSEC_GSS encoding of the RPC message
> - Transmission of a single RPC message

It also protects TCP rqst slot allocation:

void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
        /* Note: grabbing the xprt_lock_write() ensures that we throttle
         * new slot allocation if the transport is congested (i.e. when
         * reconnecting a stream transport or when out of socket write
         * buffer space).
         */
        if (xprt_lock_write(xprt, task)) {
                xprt_alloc_slot(xprt, task);
                xprt_release_write(xprt, task);
        }
}


> The following patch set assumes that we do not need to do much to
> improve performance of the connect/reconnect case, as that is supposed
> to be a rare occurrence.
> 
> The set looks at dealing with RPCSEC_GSS issues by removing serialisation
> while encoding, and simply assuming that if we detect after grabbing the
> XPRT_LOCK that we're about to transmit a message with a sequence number
> that has fallen outside the window allowed by RFC2203, then we can
> abort the transmission of that message, and schedule it for re-encoding.
> Since window sizes are typically expected to lie above 100 messages or
> so, we expect these cases where we miss the window to be rare, in
> general.
> 
> Finally, we look at trying to avoid the requirement that every request
> must go through the process of being woken up to grab the XPRT_LOCK in
> order to transmit itself by allowing a request that currently holds the
> XPRT_LOCK to grab other requests from an ordered queue, and to transmit
> them too. The bulk of the changes in this patchset are dedicated to
> providing this functionality.

When considering whether this kind of change could work for
xprtrdma: the transport send lock mechanism is used to manage
the credit grant. The transport send lock prevents the client
from sending too many RPC Calls at once.

Congestion- or flow-controlled transports might not be able to
adopt this approach, because there needs to be a check before
each RPC Call is sent to see if the congestion/credit window
has room.
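
A toy, userspace C sketch of that per-call check, with invented names
(toy_rqst, toy_drain_credited, credits) rather than anything from
xprtrdma: the drain loop re-checks the window before each send and stops,
leaving the rest queued, once the window is exhausted.

#include <errno.h>
#include <stdio.h>

/* Invented request type; only the ordering matters here. */
struct toy_rqst {
	int xid;
	struct toy_rqst *next;
};

static int toy_send_one(struct toy_rqst *req)
{
	printf("sending xid %d\n", req->xid);
	return 0;
}

/* Drain the queue, but respect a credit window on every iteration. */
static int toy_drain_credited(struct toy_rqst **queue, unsigned int *credits)
{
	int status = 0;

	while (*queue != NULL && status == 0) {
		struct toy_rqst *req = *queue;

		if (*credits == 0)
			return -EAGAIN;	/* wait for the server to grant more */
		(*credits)--;
		*queue = req->next;
		status = toy_send_one(req);
	}
	return status;
}

int main(void)
{
	struct toy_rqst c = { 3, NULL }, b = { 2, &c }, a = { 1, &b };
	struct toy_rqst *queue = &a;
	unsigned int credits = 2;

	/* Two credits, three requests: the third stays on the queue. */
	return toy_drain_credited(&queue, &credits) == -EAGAIN ? 0 : 1;
}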


> Trond Myklebust (27):
>  SUNRPC: Clean up initialisation of the struct rpc_rqst
>  SUNRPC: If there is no reply expected, bail early from call_decode
>  SUNRPC: The transmitted message must lie in the RPCSEC window of
>    validity
>  SUNRPC: Simplify identification of when the message send/receive is
>    complete
>  SUNRPC: Avoid holding locks across the XDR encoding of the RPC message
>  SUNRPC: Rename TCP receive-specific state variables
>  SUNRPC: Move reset of TCP state variables into the reconnect code
>  SUNRPC: Add socket transmit queue offset tracking
>  SUNRPC: Simplify dealing with aborted partially transmitted messages
>  SUNRPC: Refactor the transport request pinning
>  SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status
>  SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit
>  SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
>  SUNRPC: Refactor xprt_transmit() to remove the reply queue code
>  SUNRPC: Refactor xprt_transmit() to remove wait for reply code
>  SUNRPC: Minor cleanup for call_transmit()
>  SUNRPC: Distinguish between the slot allocation list and receive queue
>  NFS: Add a transmission queue for RPC requests
>  SUNRPC: Refactor RPC call encoding
>  SUNRPC: Treat the task and request as separate in the
>    xprt_ops->send_request()
>  SUNRPC: Don't reset the request 'bytes_sent' counter when releasing
>    XPRT_LOCK
>  SUNRPC: Simplify xprt_prepare_transmit()
>  SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
>  SUNRPC: Fix up the back channel transmit
>  SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit
>    queue
>  SUNRPC: Queue the request for transmission immediately after encoding
>  SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue
> 
> include/linux/sunrpc/auth.h                |   2 +
> include/linux/sunrpc/auth_gss.h            |   1 +
> include/linux/sunrpc/sched.h               |   7 +-
> include/linux/sunrpc/xprt.h                |  23 +-
> include/linux/sunrpc/xprtsock.h            |  23 +-
> include/trace/events/sunrpc.h              |  10 +-
> net/sunrpc/auth.c                          |  10 +
> net/sunrpc/auth_gss/auth_gss.c             |  41 ++
> net/sunrpc/backchannel_rqst.c              |   3 +-
> net/sunrpc/clnt.c                          | 139 +++---
> net/sunrpc/sched.c                         |  63 ++-
> net/sunrpc/svcsock.c                       |   6 +-
> net/sunrpc/xprt.c                          | 503 +++++++++++++--------
> net/sunrpc/xprtrdma/backchannel.c          |   3 +-
> net/sunrpc/xprtrdma/rpc_rdma.c             |  10 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   7 +-
> net/sunrpc/xprtrdma/transport.c            |   5 +-
> net/sunrpc/xprtsock.c                      | 327 +++++++-------
> 18 files changed, 728 insertions(+), 455 deletions(-)
> 
> -- 
> 2.17.1
> 

--
Chuck Lever

^ permalink raw reply	[flat|nested] 39+ messages in thread

* Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()
  2018-09-03 17:28                                         ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Chuck Lever
@ 2018-09-03 17:47                                           ` Trond Myklebust
  2018-09-03 17:49                                             ` Chuck Lever
  0 siblings, 1 reply; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 17:47 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Linux NFS Mailing List

On Mon, 2018-09-03 at 13:28 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > When we shift to using the transmit queue, then the task that holds
> > the
> > write lock will not necessarily be the same as the one being
> > transmitted.
> 
> Can you pass in just the rpc_rqst? Then @task would be available as
> xprt->snd_task.

No. The point is that when we add queueing, @req->rq_task will usually
be asleep on xprt->sending. Only the task that holds the XPRT_LOCK is
awake, and so it is the one that needs to be put asleep when we hit a
socket nospace/EAGAIN error.

We _could_ potentially avoid passing in the task at all, and just have
xs_nospace() put xprt->snd_task to sleep, but that would skirt the edge
of a transport layering violation.

> 
> > Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> > ---
> > include/linux/sunrpc/xprt.h                |  2 +-
> > net/sunrpc/xprt.c                          |  2 +-
> > net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  3 +--
> > net/sunrpc/xprtrdma/transport.c            |  5 ++--
> > net/sunrpc/xprtsock.c                      | 27 +++++++++++------
> > -----
> > 5 files changed, 18 insertions(+), 21 deletions(-)
> > 
> > diff --git a/include/linux/sunrpc/xprt.h
> > b/include/linux/sunrpc/xprt.h
> > index 81a6c2c8dfc7..6d91acfe0644 100644
> > --- a/include/linux/sunrpc/xprt.h
> > +++ b/include/linux/sunrpc/xprt.h
> > @@ -140,7 +140,7 @@ struct rpc_xprt_ops {
> > 	void		(*connect)(struct rpc_xprt *xprt, struct
> > rpc_task *task);
> > 	int		(*buf_alloc)(struct rpc_task *task);
> > 	void		(*buf_free)(struct rpc_task *task);
> > -	int		(*send_request)(struct rpc_task *task);
> > +	int		(*send_request)(struct rpc_rqst *req, struct
> > rpc_task *task);
> > 	void		(*set_retrans_timeout)(struct rpc_task
> > *task);
> > 	void		(*timer)(struct rpc_xprt *xprt, struct
> > rpc_task *task);
> > 	void		(*release_request)(struct rpc_task *task);
> > diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> > index defc6167570a..3efcb69af526 100644
> > --- a/net/sunrpc/xprt.c
> > +++ b/net/sunrpc/xprt.c
> > @@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
> > 	}
> > 
> > 	connect_cookie = xprt->connect_cookie;
> > -	status = xprt->ops->send_request(task);
> > +	status = xprt->ops->send_request(req, task);
> > 	trace_xprt_transmit(xprt, req->rq_xid, status);
> > 	if (status != 0) {
> > 		task->tk_status = status;
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> > b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> > index 09b12b7568fe..d1618c70edb4 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> > @@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma
> > *rdma, struct rpc_rqst *rqst)
> >  * connection.
> >  */
> > static int
> > -xprt_rdma_bc_send_request(struct rpc_task *task)
> > +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task
> > *task)
> > {
> > -	struct rpc_rqst *rqst = task->tk_rqstp;
> > 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
> > 	struct svcxprt_rdma *rdma;
> > 	int ret;
> > diff --git a/net/sunrpc/xprtrdma/transport.c
> > b/net/sunrpc/xprtrdma/transport.c
> > index 143ce2579ba9..fa684bf4d090 100644
> > --- a/net/sunrpc/xprtrdma/transport.c
> > +++ b/net/sunrpc/xprtrdma/transport.c
> > @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
> >  *		sent. Do not try to send this message again.
> >  */
> > static int
> > -xprt_rdma_send_request(struct rpc_task *task)
> > +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task
> > *task)
> 
> Nit: The function's documenting comment will need to be updated.
> 
> 
> > {
> > -	struct rpc_rqst *rqst = task->tk_rqstp;
> > 	struct rpc_xprt *xprt = rqst->rq_xprt;
> > 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> > 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> > @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
> > 	/* An RPC with no reply will throw off credit accounting,
> > 	 * so drop the connection to reset the credit grant.
> > 	 */
> > -	if (!rpc_reply_expected(task))
> > +	if (!rpc_reply_expected(rqst->rq_task))
> > 		goto drop_connection;
> > 	return 0;
> > 
> > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> > index 8d6404259ff9..b8143eded4af 100644
> > --- a/net/sunrpc/xprtsock.c
> > +++ b/net/sunrpc/xprtsock.c
> > @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct
> > rpc_task *task)
> > 
> > /**
> >  * xs_nospace - place task on wait queue if transmit was incomplete
> > + * @req: pointer to RPC request
> >  * @task: task to put to sleep
> >  *
> >  */
> > -static int xs_nospace(struct rpc_task *task)
> > +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
> > {
> > -	struct rpc_rqst *req = task->tk_rqstp;
> > 	struct rpc_xprt *xprt = req->rq_xprt;
> > 	struct sock_xprt *transport = container_of(xprt, struct
> > sock_xprt, xprt);
> > 	struct sock *sk = transport->inet;
> > @@ -513,6 +513,7 @@ static inline void
> > xs_encode_stream_record_marker(struct xdr_buf *buf)
> > 
> > /**
> >  * xs_local_send_request - write an RPC request to an AF_LOCAL
> > socket
> > + * @req: pointer to RPC request
> >  * @task: RPC task that manages the state of an RPC request
> >  *
> >  * Return values:
> > @@ -522,9 +523,8 @@ static inline void
> > xs_encode_stream_record_marker(struct xdr_buf *buf)
> >  * ENOTCONN:	Caller needs to invoke connect logic then call
> > again
> >  *    other:	Some other error occured, the request was not
> > sent
> >  */
> > -static int xs_local_send_request(struct rpc_task *task)
> > +static int xs_local_send_request(struct rpc_rqst *req, struct
> > rpc_task *task)
> > {
> > -	struct rpc_rqst *req = task->tk_rqstp;
> > 	struct rpc_xprt *xprt = req->rq_xprt;
> > 	struct sock_xprt *transport =
> > 				container_of(xprt, struct sock_xprt,
> > xprt);
> > @@ -569,7 +569,7 @@ static int xs_local_send_request(struct
> > rpc_task *task)
> > 	case -ENOBUFS:
> > 		break;
> > 	case -EAGAIN:
> > -		status = xs_nospace(task);
> > +		status = xs_nospace(req, task);
> > 		break;
> > 	default:
> > 		dprintk("RPC:       sendmsg returned unrecognized error
> > %d\n",
> > @@ -585,6 +585,7 @@ static int xs_local_send_request(struct
> > rpc_task *task)
> > 
> > /**
> >  * xs_udp_send_request - write an RPC request to a UDP socket
> > + * @req: pointer to RPC request
> >  * @task: address of RPC task that manages the state of an RPC
> > request
> >  *
> >  * Return values:
> > @@ -594,9 +595,8 @@ static int xs_local_send_request(struct
> > rpc_task *task)
> >  * ENOTCONN:	Caller needs to invoke connect logic then call
> > again
> >  *    other:	Some other error occurred, the request was not
> > sent
> >  */
> > -static int xs_udp_send_request(struct rpc_task *task)
> > +static int xs_udp_send_request(struct rpc_rqst *req, struct
> > rpc_task *task)
> > {
> > -	struct rpc_rqst *req = task->tk_rqstp;
> > 	struct rpc_xprt *xprt = req->rq_xprt;
> > 	struct sock_xprt *transport = container_of(xprt, struct
> > sock_xprt, xprt);
> > 	struct xdr_buf *xdr = &req->rq_snd_buf;
> > @@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task
> > *task)
> > 		/* Should we call xs_close() here? */
> > 		break;
> > 	case -EAGAIN:
> > -		status = xs_nospace(task);
> > +		status = xs_nospace(req, task);
> > 		break;
> > 	case -ENETUNREACH:
> > 	case -ENOBUFS:
> > @@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task
> > *task)
> > 
> > /**
> >  * xs_tcp_send_request - write an RPC request to a TCP socket
> > + * @req: pointer to RPC request
> >  * @task: address of RPC task that manages the state of an RPC
> > request
> >  *
> >  * Return values:
> > @@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task
> > *task)
> >  * XXX: In the case of soft timeouts, should we eventually give up
> >  *	if sendmsg is not able to make progress?
> >  */
> > -static int xs_tcp_send_request(struct rpc_task *task)
> > +static int xs_tcp_send_request(struct rpc_rqst *req, struct
> > rpc_task *task)
> > {
> > -	struct rpc_rqst *req = task->tk_rqstp;
> > 	struct rpc_xprt *xprt = req->rq_xprt;
> > 	struct sock_xprt *transport = container_of(xprt, struct
> > sock_xprt, xprt);
> > 	struct xdr_buf *xdr = &req->rq_snd_buf;
> > @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task
> > *task)
> > 	 * completes while the socket holds a reference to the pages,
> > 	 * then we may end up resending corrupted data.
> > 	 */
> > -	if (task->tk_flags & RPC_TASK_SENT)
> > +	if (req->rq_task->tk_flags & RPC_TASK_SENT)
> > 		zerocopy = false;
> > 
> > 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
> > @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task
> > *task)
> > 		/* Should we call xs_close() here? */
> > 		break;
> > 	case -EAGAIN:
> > -		status = xs_nospace(task);
> > +		status = xs_nospace(req, task);
> > 		break;
> > 	case -ECONNRESET:
> > 	case -ECONNREFUSED:
> > @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
> > /*
> >  * The send routine. Borrows from svc_send
> >  */
> > -static int bc_send_request(struct rpc_task *task)
> > +static int bc_send_request(struct rpc_rqst *req, struct rpc_task
> > *task)
> > {
> > -	struct rpc_rqst *req = task->tk_rqstp;
> > 	struct svc_xprt	*xprt;
> > 	int len;
> > 
> > -- 
> > 2.17.1
> > 
> 
> --
> Chuck Lever
> chucklever@gmail.com
> 
> 
> 

* Re: [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
  2018-09-03 17:31                                               ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Chuck Lever
@ 2018-09-03 17:49                                                 ` Trond Myklebust
  0 siblings, 0 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 17:49 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Linux NFS Mailing List

On Mon, 2018-09-03 at 13:31 -0400, Chuck Lever wrote:
> Do you also need to move req->rq_ntrans++ ?

Good point. Right now, it is in completely the wrong place.
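
Something along these lines on top of this patch, I think (sketch only,
untested; the existing increment gets dropped from wherever it currently
lives):

	/* in xprt_transmit(), next to the retransmission accounting
	 * introduced below, i.e. once the send has succeeded:
	 */
	if (is_retrans)
		task->tk_client->cl_stats->rpcretrans++;

	req->rq_ntrans++;	/* count this transmission attempt here */

	xprt_inject_disconnect(xprt);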

> 
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> > ---
> > net/sunrpc/clnt.c | 6 ------
> > net/sunrpc/xprt.c | 4 ++++
> > 2 files changed, 4 insertions(+), 6 deletions(-)
> > 
> > diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> > index 585a82dfaf4d..fb40d1e9f636 100644
> > --- a/net/sunrpc/clnt.c
> > +++ b/net/sunrpc/clnt.c
> > @@ -1967,8 +1967,6 @@ call_connect_status(struct rpc_task *task)
> > static void
> > call_transmit(struct rpc_task *task)
> > {
> > -	int is_retrans = RPC_WAS_SENT(task);
> > -
> > 	dprint_status(task);
> > 
> > 	task->tk_action = call_transmit_status;
> > @@ -1979,10 +1977,6 @@ call_transmit(struct rpc_task *task)
> > 	if (!xprt_prepare_transmit(task))
> > 		return;
> > 	xprt_transmit(task);
> > -	if (task->tk_status < 0)
> > -		return;
> > -	if (is_retrans)
> > -		task->tk_client->cl_stats->rpcretrans++;
> > }
> > 
> > /*
> > diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> > index 95d15d4017f7..b85e2c4fa115 100644
> > --- a/net/sunrpc/xprt.c
> > +++ b/net/sunrpc/xprt.c
> > @@ -1126,6 +1126,7 @@ void xprt_transmit(struct rpc_task *task)
> > 	struct rpc_rqst	*req = task->tk_rqstp;
> > 	struct rpc_xprt	*xprt = req->rq_xprt;
> > 	unsigned int connect_cookie;
> > +	int is_retrans = RPC_WAS_SENT(task);
> > 	int status;
> > 
> > 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req-
> > >rq_slen);
> > @@ -1148,6 +1149,9 @@ void xprt_transmit(struct rpc_task *task)
> > 		return;
> > 	}
> > 
> > +	if (is_retrans)
> > +		task->tk_client->cl_stats->rpcretrans++;
> > +
> > 	xprt_inject_disconnect(xprt);
> > 
> > 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
> > -- 
> > 2.17.1
> > 
> 
> --
> Chuck Lever
> chucklever@gmail.com
> 
> 
> 

* Re: [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request()
  2018-09-03 17:47                                           ` Trond Myklebust
@ 2018-09-03 17:49                                             ` Chuck Lever
  0 siblings, 0 replies; 39+ messages in thread
From: Chuck Lever @ 2018-09-03 17:49 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: Linux NFS Mailing List



> On Sep 3, 2018, at 1:47 PM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> On Mon, 2018-09-03 at 13:28 -0400, Chuck Lever wrote:
>>> On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com>
>>> wrote:
>>> 
>>> When we shift to using the transmit queue, then the task that holds
>>> the
>>> write lock will not necessarily be the same as the one being
>>> transmitted.
>> 
>> Can you pass in just the rpc_rqst? Then @task would be available as
>> xprt->snd_task.
> 
> No. The point is that when we add queueing, @req->rq_task will usually
> be asleep on xprt->sending. Only the task that holds the XPRT_LOCK is
> awake, and so it is the one that needs to be put to sleep when we hit a
> socket nospace/EAGAIN error.

Right, that task is rqst->rq_xprt->snd_task, isn't it?
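
i.e., roughly (just a sketch of what I had in mind):

	/* no @task argument needed; the sender is already recorded */
	struct rpc_task *task = req->rq_xprt->snd_task;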


> We _could_ potentially avoid passing in the task at all, and just have
> xs_nospace() put xprt->snd_task to sleep, but that would skirt the edge
> of a transport layering violation.
> 
>> 
>>> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
>>> ---
>>> include/linux/sunrpc/xprt.h                |  2 +-
>>> net/sunrpc/xprt.c                          |  2 +-
>>> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  3 +--
>>> net/sunrpc/xprtrdma/transport.c            |  5 ++--
>>> net/sunrpc/xprtsock.c                      | 27 +++++++++++------
>>> -----
>>> 5 files changed, 18 insertions(+), 21 deletions(-)
>>> 
>>> diff --git a/include/linux/sunrpc/xprt.h
>>> b/include/linux/sunrpc/xprt.h
>>> index 81a6c2c8dfc7..6d91acfe0644 100644
>>> --- a/include/linux/sunrpc/xprt.h
>>> +++ b/include/linux/sunrpc/xprt.h
>>> @@ -140,7 +140,7 @@ struct rpc_xprt_ops {
>>> 	void		(*connect)(struct rpc_xprt *xprt, struct
>>> rpc_task *task);
>>> 	int		(*buf_alloc)(struct rpc_task *task);
>>> 	void		(*buf_free)(struct rpc_task *task);
>>> -	int		(*send_request)(struct rpc_task *task);
>>> +	int		(*send_request)(struct rpc_rqst *req, struct
>>> rpc_task *task);
>>> 	void		(*set_retrans_timeout)(struct rpc_task
>>> *task);
>>> 	void		(*timer)(struct rpc_xprt *xprt, struct
>>> rpc_task *task);
>>> 	void		(*release_request)(struct rpc_task *task);
>>> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
>>> index defc6167570a..3efcb69af526 100644
>>> --- a/net/sunrpc/xprt.c
>>> +++ b/net/sunrpc/xprt.c
>>> @@ -1170,7 +1170,7 @@ void xprt_transmit(struct rpc_task *task)
>>> 	}
>>> 
>>> 	connect_cookie = xprt->connect_cookie;
>>> -	status = xprt->ops->send_request(task);
>>> +	status = xprt->ops->send_request(req, task);
>>> 	trace_xprt_transmit(xprt, req->rq_xid, status);
>>> 	if (status != 0) {
>>> 		task->tk_status = status;
>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> index 09b12b7568fe..d1618c70edb4 100644
>>> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>>> @@ -215,9 +215,8 @@ rpcrdma_bc_send_request(struct svcxprt_rdma
>>> *rdma, struct rpc_rqst *rqst)
>>> * connection.
>>> */
>>> static int
>>> -xprt_rdma_bc_send_request(struct rpc_task *task)
>>> +xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task
>>> *task)
>>> {
>>> -	struct rpc_rqst *rqst = task->tk_rqstp;
>>> 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
>>> 	struct svcxprt_rdma *rdma;
>>> 	int ret;
>>> diff --git a/net/sunrpc/xprtrdma/transport.c
>>> b/net/sunrpc/xprtrdma/transport.c
>>> index 143ce2579ba9..fa684bf4d090 100644
>>> --- a/net/sunrpc/xprtrdma/transport.c
>>> +++ b/net/sunrpc/xprtrdma/transport.c
>>> @@ -706,9 +706,8 @@ xprt_rdma_free(struct rpc_task *task)
>>> *		sent. Do not try to send this message again.
>>> */
>>> static int
>>> -xprt_rdma_send_request(struct rpc_task *task)
>>> +xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task
>>> *task)
>> 
>> Nit: The function's documenting comment will need to be updated.
>> 
>> 
>>> {
>>> -	struct rpc_rqst *rqst = task->tk_rqstp;
>>> 	struct rpc_xprt *xprt = rqst->rq_xprt;
>>> 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
>>> 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
>>> @@ -741,7 +740,7 @@ xprt_rdma_send_request(struct rpc_task *task)
>>> 	/* An RPC with no reply will throw off credit accounting,
>>> 	 * so drop the connection to reset the credit grant.
>>> 	 */
>>> -	if (!rpc_reply_expected(task))
>>> +	if (!rpc_reply_expected(rqst->rq_task))
>>> 		goto drop_connection;
>>> 	return 0;
>>> 
>>> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
>>> index 8d6404259ff9..b8143eded4af 100644
>>> --- a/net/sunrpc/xprtsock.c
>>> +++ b/net/sunrpc/xprtsock.c
>>> @@ -449,12 +449,12 @@ static void xs_nospace_callback(struct
>>> rpc_task *task)
>>> 
>>> /**
>>> * xs_nospace - place task on wait queue if transmit was incomplete
>>> + * @req: pointer to RPC request
>>> * @task: task to put to sleep
>>> *
>>> */
>>> -static int xs_nospace(struct rpc_task *task)
>>> +static int xs_nospace(struct rpc_rqst *req, struct rpc_task *task)
>>> {
>>> -	struct rpc_rqst *req = task->tk_rqstp;
>>> 	struct rpc_xprt *xprt = req->rq_xprt;
>>> 	struct sock_xprt *transport = container_of(xprt, struct
>>> sock_xprt, xprt);
>>> 	struct sock *sk = transport->inet;
>>> @@ -513,6 +513,7 @@ static inline void
>>> xs_encode_stream_record_marker(struct xdr_buf *buf)
>>> 
>>> /**
>>> * xs_local_send_request - write an RPC request to an AF_LOCAL
>>> socket
>>> + * @req: pointer to RPC request
>>> * @task: RPC task that manages the state of an RPC request
>>> *
>>> * Return values:
>>> @@ -522,9 +523,8 @@ static inline void
>>> xs_encode_stream_record_marker(struct xdr_buf *buf)
>>> * ENOTCONN:	Caller needs to invoke connect logic then call
>>> again
>>> *    other:	Some other error occured, the request was not
>>> sent
>>> */
>>> -static int xs_local_send_request(struct rpc_task *task)
>>> +static int xs_local_send_request(struct rpc_rqst *req, struct
>>> rpc_task *task)
>>> {
>>> -	struct rpc_rqst *req = task->tk_rqstp;
>>> 	struct rpc_xprt *xprt = req->rq_xprt;
>>> 	struct sock_xprt *transport =
>>> 				container_of(xprt, struct sock_xprt,
>>> xprt);
>>> @@ -569,7 +569,7 @@ static int xs_local_send_request(struct
>>> rpc_task *task)
>>> 	case -ENOBUFS:
>>> 		break;
>>> 	case -EAGAIN:
>>> -		status = xs_nospace(task);
>>> +		status = xs_nospace(req, task);
>>> 		break;
>>> 	default:
>>> 		dprintk("RPC:       sendmsg returned unrecognized error
>>> %d\n",
>>> @@ -585,6 +585,7 @@ static int xs_local_send_request(struct
>>> rpc_task *task)
>>> 
>>> /**
>>> * xs_udp_send_request - write an RPC request to a UDP socket
>>> + * @req: pointer to RPC request
>>> * @task: address of RPC task that manages the state of an RPC
>>> request
>>> *
>>> * Return values:
>>> @@ -594,9 +595,8 @@ static int xs_local_send_request(struct
>>> rpc_task *task)
>>> * ENOTCONN:	Caller needs to invoke connect logic then call
>>> again
>>> *    other:	Some other error occurred, the request was not
>>> sent
>>> */
>>> -static int xs_udp_send_request(struct rpc_task *task)
>>> +static int xs_udp_send_request(struct rpc_rqst *req, struct
>>> rpc_task *task)
>>> {
>>> -	struct rpc_rqst *req = task->tk_rqstp;
>>> 	struct rpc_xprt *xprt = req->rq_xprt;
>>> 	struct sock_xprt *transport = container_of(xprt, struct
>>> sock_xprt, xprt);
>>> 	struct xdr_buf *xdr = &req->rq_snd_buf;
>>> @@ -638,7 +638,7 @@ static int xs_udp_send_request(struct rpc_task
>>> *task)
>>> 		/* Should we call xs_close() here? */
>>> 		break;
>>> 	case -EAGAIN:
>>> -		status = xs_nospace(task);
>>> +		status = xs_nospace(req, task);
>>> 		break;
>>> 	case -ENETUNREACH:
>>> 	case -ENOBUFS:
>>> @@ -658,6 +658,7 @@ static int xs_udp_send_request(struct rpc_task
>>> *task)
>>> 
>>> /**
>>> * xs_tcp_send_request - write an RPC request to a TCP socket
>>> + * @req: pointer to RPC request
>>> * @task: address of RPC task that manages the state of an RPC
>>> request
>>> *
>>> * Return values:
>>> @@ -670,9 +671,8 @@ static int xs_udp_send_request(struct rpc_task
>>> *task)
>>> * XXX: In the case of soft timeouts, should we eventually give up
>>> *	if sendmsg is not able to make progress?
>>> */
>>> -static int xs_tcp_send_request(struct rpc_task *task)
>>> +static int xs_tcp_send_request(struct rpc_rqst *req, struct
>>> rpc_task *task)
>>> {
>>> -	struct rpc_rqst *req = task->tk_rqstp;
>>> 	struct rpc_xprt *xprt = req->rq_xprt;
>>> 	struct sock_xprt *transport = container_of(xprt, struct
>>> sock_xprt, xprt);
>>> 	struct xdr_buf *xdr = &req->rq_snd_buf;
>>> @@ -697,7 +697,7 @@ static int xs_tcp_send_request(struct rpc_task
>>> *task)
>>> 	 * completes while the socket holds a reference to the pages,
>>> 	 * then we may end up resending corrupted data.
>>> 	 */
>>> -	if (task->tk_flags & RPC_TASK_SENT)
>>> +	if (req->rq_task->tk_flags & RPC_TASK_SENT)
>>> 		zerocopy = false;
>>> 
>>> 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
>>> @@ -761,7 +761,7 @@ static int xs_tcp_send_request(struct rpc_task
>>> *task)
>>> 		/* Should we call xs_close() here? */
>>> 		break;
>>> 	case -EAGAIN:
>>> -		status = xs_nospace(task);
>>> +		status = xs_nospace(req, task);
>>> 		break;
>>> 	case -ECONNRESET:
>>> 	case -ECONNREFUSED:
>>> @@ -2706,9 +2706,8 @@ static int bc_sendto(struct rpc_rqst *req)
>>> /*
>>> * The send routine. Borrows from svc_send
>>> */
>>> -static int bc_send_request(struct rpc_task *task)
>>> +static int bc_send_request(struct rpc_rqst *req, struct rpc_task
>>> *task)
>>> {
>>> -	struct rpc_rqst *req = task->tk_rqstp;
>>> 	struct svc_xprt	*xprt;
>>> 	int len;
>>> 
>>> -- 
>>> 2.17.1
>>> 
>> 
>> --
>> Chuck Lever
>> chucklever@gmail.com
>> 
>> 
>> 
> 

--
Chuck Lever
chucklever@gmail.com

* Re: [PATCH 00/27] Convert RPC client transmission to a queued model
  2018-09-03 17:41 ` [PATCH 00/27] Convert RPC client transmission to a queued model Chuck Lever
@ 2018-09-03 17:55   ` Trond Myklebust
  0 siblings, 0 replies; 39+ messages in thread
From: Trond Myklebust @ 2018-09-03 17:55 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Linux NFS Mailing List

On Mon, 2018-09-03 at 13:41 -0400, Chuck Lever wrote:
> > On Sep 3, 2018, at 11:29 AM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > For historical reasons, the RPC client is heavily serialised during
> > the
> > process of transmitting a request by the XPRT_LOCK. A request is
> > required to take that lock before it can start XDR encoding, and it
> > is
> > required to hold it until it is done transmitting. In essence the
> > lock
> > protects the following functions:
> > 
> > - Stream based transport connect/reconnect
> > - RPCSEC_GSS encoding of the RPC message
> > - Transmission of a single RPC message
> 
> It also protects TCP rqst slot allocation:
> 
> void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task
> *task)
> {
>         /* Note: grabbing the xprt_lock_write() ensures that we
> throttle
>          * new slot allocation if the transport is congested (i.e.
> when
>          * reconnecting a stream transport or when out of socket
> write
>          * buffer space).
>          */
>         if (xprt_lock_write(xprt, task)) {
>                 xprt_alloc_slot(xprt, task);
>                 xprt_release_write(xprt, task);
>         }
> }

Ack. That needs some thought.

> 
> > The following patch set assumes that we do not need to do much to
> > improve performance of the connect/reconnect case, as that is
> > supposed
> > to be a rare occurrence.
> > 
> > The set looks at dealing with RPCSEC_GSS issues by removing
> > serialisation
> > while encoding, and simply assuming that if we detect after
> > grabbing the
> > XPRT_LOCK that we're about to transmit a message with a sequence
> > number
> > that has fallen outside the window allowed by RFC2203, then we can
> > abort the transmission of that message, and schedule it for re-
> > encoding.
> > Since window sizes are typically expected to lie above 100 messages
> > or
> > so, we expect these cases where we miss the window to be rare, in
> > general.
> > 
> > Finally, we look at trying to avoid the requirement that every
> > request
> > must go through the process of being woken up to grab the XPRT_LOCK
> > in
> > order to transmit itself by allowing a request that currently holds
> > the
> > XPRT_LOCK to grab other requests from an ordered queue, and to
> > transmit
> > them too. The bulk of the changes in this patchset are dedicated to
> > providing this functionality.
> 
> When considering whether this kind of change could work for
> xprtrdma: the transport send lock mechanism is used to manage
> the credit grant. The transport send lock prevents the client
> from sending too many RPC Calls at once.
> 
> Congestion- or flow-controlled transports might not be able to
> adopt this approach, because there needs to be a check before
> each RPC Call is sent to see if the congestion/credit window
> has room.

That's a good point. We might want to add a check for congestion
overflow to the send-request path so that we push back when we hit the
credit window limit. I'll think about that.
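
Something like the following, perhaps (sketch only: the helper and its
name are made up here, the fields are the existing xprt->cong/xprt->cwnd
accounting):

/* Only hand another request to a congestion-controlled transport if
 * the credit/congestion window still has room.
 */
static bool xprt_request_fits_cwnd(struct rpc_xprt *xprt,
				   struct rpc_rqst *req)
{
	if (req->rq_cong)	/* already holds a congestion slot */
		return true;
	return xprt->cong + RPC_CWNDSCALE <= xprt->cwnd;
}

The transmit queue draining loop would then stop, and leave the rest of
the queue for later, as soon as this returns false.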

> 
> > Trond Myklebust (27):
> >  SUNRPC: Clean up initialisation of the struct rpc_rqst
> >  SUNRPC: If there is no reply expected, bail early from call_decode
> >  SUNRPC: The transmitted message must lie in the RPCSEC window of
> >    validity
> >  SUNRPC: Simplify identification of when the message send/receive
> > is
> >    complete
> >  SUNRPC: Avoid holding locks across the XDR encoding of the RPC
> > message
> >  SUNRPC: Rename TCP receive-specific state variables
> >  SUNRPC: Move reset of TCP state variables into the reconnect code
> >  SUNRPC: Add socket transmit queue offset tracking
> >  SUNRPC: Simplify dealing with aborted partially transmitted
> > messages
> >  SUNRPC: Refactor the transport request pinning
> >  SUNRPC: Add a helper to wake up a sleeping rpc_task and set its
> > status
> >  SUNRPC: Don't wake queued RPC calls multiple times in
> > xprt_transmit
> >  SUNRPC: Rename xprt->recv_lock to xprt->queue_lock
> >  SUNRPC: Refactor xprt_transmit() to remove the reply queue code
> >  SUNRPC: Refactor xprt_transmit() to remove wait for reply code
> >  SUNRPC: Minor cleanup for call_transmit()
> >  SUNRPC: Distinguish between the slot allocation list and receive
> > queue
> >  NFS: Add a transmission queue for RPC requests
> >  SUNRPC: Refactor RPC call encoding
> >  SUNRPC: Treat the task and request as separate in the
> >    xprt_ops->send_request()
> >  SUNRPC: Don't reset the request 'bytes_sent' counter when
> > releasing
> >    XPRT_LOCK
> >  SUNRPC: Simplify xprt_prepare_transmit()
> >  SUNRPC: Move RPC retransmission stat counter to xprt_transmit()
> >  SUNRPC: Fix up the back channel transmit
> >  SUNRPC: Allow calls to xprt_transmit() to drain the entire
> > transmit
> >    queue
> >  SUNRPC: Queue the request for transmission immediately after
> > encoding
> >  SUNRPC: Convert the xprt->sending queue back to an ordinary wait
> > queue
> > 
> > include/linux/sunrpc/auth.h                |   2 +
> > include/linux/sunrpc/auth_gss.h            |   1 +
> > include/linux/sunrpc/sched.h               |   7 +-
> > include/linux/sunrpc/xprt.h                |  23 +-
> > include/linux/sunrpc/xprtsock.h            |  23 +-
> > include/trace/events/sunrpc.h              |  10 +-
> > net/sunrpc/auth.c                          |  10 +
> > net/sunrpc/auth_gss/auth_gss.c             |  41 ++
> > net/sunrpc/backchannel_rqst.c              |   3 +-
> > net/sunrpc/clnt.c                          | 139 +++---
> > net/sunrpc/sched.c                         |  63 ++-
> > net/sunrpc/svcsock.c                       |   6 +-
> > net/sunrpc/xprt.c                          | 503 +++++++++++++-----
> > ---
> > net/sunrpc/xprtrdma/backchannel.c          |   3 +-
> > net/sunrpc/xprtrdma/rpc_rdma.c             |  10 +-
> > net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   7 +-
> > net/sunrpc/xprtrdma/transport.c            |   5 +-
> > net/sunrpc/xprtsock.c                      | 327 +++++++-------
> > 18 files changed, 728 insertions(+), 455 deletions(-)
> > 
> > -- 
> > 2.17.1
> > 
> 
> --
> Chuck Lever
> 
> 
> 

* Re: [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity
  2018-09-03 15:29     ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity Trond Myklebust
  2018-09-03 15:29       ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Trond Myklebust
@ 2018-09-04 15:46       ` J. Bruce Fields
  1 sibling, 0 replies; 39+ messages in thread
From: J. Bruce Fields @ 2018-09-04 15:46 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: linux-nfs

On Mon, Sep 03, 2018 at 11:29:12AM -0400, Trond Myklebust wrote:
> If a message has been encoded using RPCSEC_GSS, the server is
> maintaining a window of sequence numbers that it considers valid.
> The client should normally be tracking that window, and needs to
> verify that the sequence number used by the message being transmitted
> still lies inside the window of validity.
> 
> So far, we've been able to assume this condition would be realised
> automatically, since the server has been encoding the message only

Do you mean "the client has been encoding..."?

--b.

> after taking the socket lock. Once we change that condition, we
> will need the explicit check.
> 
> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
>  include/linux/sunrpc/auth.h     |  2 ++
>  include/linux/sunrpc/auth_gss.h |  1 +
>  net/sunrpc/auth.c               | 10 ++++++++
>  net/sunrpc/auth_gss/auth_gss.c  | 41 +++++++++++++++++++++++++++++++++
>  net/sunrpc/clnt.c               |  3 +++
>  net/sunrpc/xprt.c               |  7 ++++++
>  6 files changed, 64 insertions(+)
> 
> diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
> index 58a6765c1c5e..2c97a3933ef9 100644
> --- a/include/linux/sunrpc/auth.h
> +++ b/include/linux/sunrpc/auth.h
> @@ -157,6 +157,7 @@ struct rpc_credops {
>  	int			(*crkey_timeout)(struct rpc_cred *);
>  	bool			(*crkey_to_expire)(struct rpc_cred *);
>  	char *			(*crstringify_acceptor)(struct rpc_cred *);
> +	bool			(*crneed_reencode)(struct rpc_task *);
>  };
>  
>  extern const struct rpc_authops	authunix_ops;
> @@ -192,6 +193,7 @@ __be32 *		rpcauth_marshcred(struct rpc_task *, __be32 *);
>  __be32 *		rpcauth_checkverf(struct rpc_task *, __be32 *);
>  int			rpcauth_wrap_req(struct rpc_task *task, kxdreproc_t encode, void *rqstp, __be32 *data, void *obj);
>  int			rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, __be32 *data, void *obj);
> +bool			rpcauth_xmit_need_reencode(struct rpc_task *task);
>  int			rpcauth_refreshcred(struct rpc_task *);
>  void			rpcauth_invalcred(struct rpc_task *);
>  int			rpcauth_uptodatecred(struct rpc_task *);
> diff --git a/include/linux/sunrpc/auth_gss.h b/include/linux/sunrpc/auth_gss.h
> index 0c9eac351aab..30427b729070 100644
> --- a/include/linux/sunrpc/auth_gss.h
> +++ b/include/linux/sunrpc/auth_gss.h
> @@ -70,6 +70,7 @@ struct gss_cl_ctx {
>  	refcount_t		count;
>  	enum rpc_gss_proc	gc_proc;
>  	u32			gc_seq;
> +	u32			gc_seq_xmit;
>  	spinlock_t		gc_seq_lock;
>  	struct gss_ctx		*gc_gss_ctx;
>  	struct xdr_netobj	gc_wire_ctx;
> diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
> index 305ecea92170..59df5cdba0ac 100644
> --- a/net/sunrpc/auth.c
> +++ b/net/sunrpc/auth.c
> @@ -817,6 +817,16 @@ rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp,
>  	return rpcauth_unwrap_req_decode(decode, rqstp, data, obj);
>  }
>  
> +bool
> +rpcauth_xmit_need_reencode(struct rpc_task *task)
> +{
> +	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
> +
> +	if (!cred || !cred->cr_ops->crneed_reencode)
> +		return false;
> +	return cred->cr_ops->crneed_reencode(task);
> +}
> +
>  int
>  rpcauth_refreshcred(struct rpc_task *task)
>  {
> diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
> index 21c0aa0a0d1d..c898a7c75e84 100644
> --- a/net/sunrpc/auth_gss/auth_gss.c
> +++ b/net/sunrpc/auth_gss/auth_gss.c
> @@ -1984,6 +1984,46 @@ gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
>  	return decode(rqstp, &xdr, obj);
>  }
>  
> +static bool
> +gss_seq_is_newer(u32 new, u32 old)
> +{
> +	return (s32)(new - old) > 0;
> +}
> +
> +static bool
> +gss_xmit_need_reencode(struct rpc_task *task)
> +{
> +	struct rpc_rqst *req = task->tk_rqstp;
> +	struct rpc_cred *cred = req->rq_cred;
> +	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
> +	u32 win, seq_xmit;
> +	bool ret = true;
> +
> +	if (!ctx)
> +		return true;
> +
> +	if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq)))
> +		goto out;
> +
> +	seq_xmit = READ_ONCE(ctx->gc_seq_xmit);
> +	while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) {
> +		u32 tmp = seq_xmit;
> +
> +		seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno);
> +		if (seq_xmit == tmp) {
> +			ret = false;
> +			goto out;
> +		}
> +	}
> +
> +	win = ctx->gc_win;
> +	if (win > 0)
> +		ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win);
> +out:
> +	gss_put_ctx(ctx);
> +	return ret;
> +}
> +
>  static int
>  gss_unwrap_resp(struct rpc_task *task,
>  		kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj)
> @@ -2052,6 +2092,7 @@ static const struct rpc_credops gss_credops = {
>  	.crunwrap_resp		= gss_unwrap_resp,
>  	.crkey_timeout		= gss_key_timeout,
>  	.crstringify_acceptor	= gss_stringify_acceptor,
> +	.crneed_reencode	= gss_xmit_need_reencode,
>  };
>  
>  static const struct rpc_credops gss_nullops = {
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 4f1ec8013332..d41b5ac1d4e8 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -2184,6 +2184,9 @@ call_status(struct rpc_task *task)
>  		/* shutdown or soft timeout */
>  		rpc_exit(task, status);
>  		break;
> +	case -EBADMSG:
> +		task->tk_action = call_transmit;
> +		break;
>  	default:
>  		if (clnt->cl_chatty)
>  			printk("%s: RPC call returned error %d\n",
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index 6aa09edc9567..3973e10ea2bd 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -1014,6 +1014,13 @@ void xprt_transmit(struct rpc_task *task)
>  	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
>  
>  	if (!req->rq_reply_bytes_recvd) {
> +
> +		/* Verify that our message lies in the RPCSEC_GSS window */
> +		if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) {
> +			task->tk_status = -EBADMSG;
> +			return;
> +		}
> +
>  		if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
>  			/*
>  			 * Add to the list only if we're expecting a reply
> -- 
> 2.17.1

end of thread, newest message: 2018-09-04 20:12 UTC

Thread overview: 39+ messages
2018-09-03 15:29 [PATCH 00/27] Convert RPC client transmission to a queued model Trond Myklebust
2018-09-03 15:29 ` [PATCH 01/27] SUNRPC: Clean up initialisation of the struct rpc_rqst Trond Myklebust
2018-09-03 15:29   ` [PATCH 02/27] SUNRPC: If there is no reply expected, bail early from call_decode Trond Myklebust
2018-09-03 15:29     ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity Trond Myklebust
2018-09-03 15:29       ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Trond Myklebust
2018-09-03 15:29         ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Trond Myklebust
2018-09-03 15:29           ` [PATCH 06/27] SUNRPC: Rename TCP receive-specific state variables Trond Myklebust
2018-09-03 15:29             ` [PATCH 07/27] SUNRPC: Move reset of TCP state variables into the reconnect code Trond Myklebust
2018-09-03 15:29               ` [PATCH 08/27] SUNRPC: Add socket transmit queue offset tracking Trond Myklebust
2018-09-03 15:29                 ` [PATCH 09/27] SUNRPC: Simplify dealing with aborted partially transmitted messages Trond Myklebust
2018-09-03 15:29                   ` [PATCH 10/27] SUNRPC: Refactor the transport request pinning Trond Myklebust
2018-09-03 15:29                     ` [PATCH 11/27] SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status Trond Myklebust
2018-09-03 15:29                       ` [PATCH 12/27] SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit Trond Myklebust
2018-09-03 15:29                         ` [PATCH 13/27] SUNRPC: Rename xprt->recv_lock to xprt->queue_lock Trond Myklebust
2018-09-03 15:29                           ` [PATCH 14/27] SUNRPC: Refactor xprt_transmit() to remove the reply queue code Trond Myklebust
2018-09-03 15:29                             ` [PATCH 15/27] SUNRPC: Refactor xprt_transmit() to remove wait for reply code Trond Myklebust
2018-09-03 15:29                               ` [PATCH 16/27] SUNRPC: Minor cleanup for call_transmit() Trond Myklebust
2018-09-03 15:29                                 ` [PATCH 17/27] SUNRPC: Distinguish between the slot allocation list and receive queue Trond Myklebust
2018-09-03 15:29                                   ` [PATCH 18/27] NFS: Add a transmission queue for RPC requests Trond Myklebust
2018-09-03 15:29                                     ` [PATCH 19/27] SUNRPC: Refactor RPC call encoding Trond Myklebust
2018-09-03 15:29                                       ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Trond Myklebust
2018-09-03 15:29                                         ` [PATCH 21/27] SUNRPC: Don't reset the request 'bytes_sent' counter when releasing XPRT_LOCK Trond Myklebust
2018-09-03 15:29                                           ` [PATCH 22/27] SUNRPC: Simplify xprt_prepare_transmit() Trond Myklebust
2018-09-03 15:29                                             ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Trond Myklebust
2018-09-03 15:29                                               ` [PATCH 24/27] SUNRPC: Fix up the back channel transmit Trond Myklebust
2018-09-03 15:29                                                 ` [PATCH 25/27] SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue Trond Myklebust
2018-09-03 15:29                                                   ` [PATCH 26/27] SUNRPC: Queue the request for transmission immediately after encoding Trond Myklebust
2018-09-03 15:29                                                     ` [PATCH 27/27] SUNRPC: Convert the xprt->sending queue back to an ordinary wait queue Trond Myklebust
2018-09-03 17:31                                               ` [PATCH 23/27] SUNRPC: Move RPC retransmission stat counter to xprt_transmit() Chuck Lever
2018-09-03 17:49                                                 ` Trond Myklebust
2018-09-03 17:28                                         ` [PATCH 20/27] SUNRPC: Treat the task and request as separate in the xprt_ops->send_request() Chuck Lever
2018-09-03 17:47                                           ` Trond Myklebust
2018-09-03 17:49                                             ` Chuck Lever
2018-09-03 17:11           ` [PATCH 05/27] SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Chuck Lever
2018-09-03 17:40             ` Trond Myklebust
2018-09-03 17:15         ` [PATCH 04/27] SUNRPC: Simplify identification of when the message send/receive is complete Chuck Lever
2018-09-04 15:46       ` [PATCH 03/27] SUNRPC: The transmitted message must lie in the RPCSEC window of validity J. Bruce Fields
2018-09-03 17:41 ` [PATCH 00/27] Convert RPC client transmission to a queued model Chuck Lever
2018-09-03 17:55   ` Trond Myklebust
