* [PATCH RFC 0/9] Address bugzilla 198053 and more ...
@ 2020-02-14 15:49 Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 1/9] nfsd: Fix NFSv4 READ on RDMA when using readv Chuck Lever
                   ` (8 more replies)
  0 siblings, 9 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:49 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

Hi Bruce-

As promised, I'm resending the fix for 198053, now that the v5.6
merge window has closed. This fix gets splice-incapable file systems
working with NFS/RDMA. That's the first patch in this series.

We can discuss splitting the fix up again, if you so desire, but my
sense is that doing so will make the fix more challenging to
backport into stable kernels.

The next logical step is to add support for multiple READ payloads
to the server's RPC-over-RDMA transport implementation. Subsequent
patches in this series start down that path. There is more work to
do to finish that task. Today I'm sending only what is code-complete
and working.

The primary issue is that today svcrdma assumes that rq_res's page
vector is exactly what needs to be pushed in a single Write chunk.
In other words, only one read payload is supported, and it has to
fit exactly into that page vector. And critically, the XDR pad for
that payload must not be included in the page vector.
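
To illustrate with a simplified sketch (not the exact svcrdma code),
the send path today effectively hard-codes the payload's location:

	struct xdr_buf *xdr = &rqstp->rq_res;

	/* Implicit assumption: the single READ payload starts right
	 * after the head and fills the page vector exactly, with no
	 * XDR pad included.
	 */
	unsigned int offset = xdr->head[0].iov_len;
	unsigned int length = xdr->page_len;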

I've already implemented changes to handle Writing more than one
chunk back to a client. See patches 4 and 7.

Patch 9 introduces a data structure to keep track of multiple Write
chunks and multiple read payloads. Next, the svc_rdma_sendto path
needs to be changed to use the information in this data structure to
exclude arbitrary segments of rq_res (ie, read payloads already sent
via explicit RDMA) when constructing each RPC/RDMA Reply.
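
For reference, here is the tracking structure that patch 9 adds:

	struct svc_rdma_payload {
		__be32			*ra_chunk;
		unsigned int		ra_offset;
		unsigned int		ra_length;
	};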

Comments and input are welcome as always.


---

Chuck Lever (9):
      nfsd: Fix NFSv4 READ on RDMA when using readv
      NFSD: Clean up nfsd4_encode_readv
      svcrdma: Avoid DMA mapping small RPC Replies
      NFSD: Invoke svc_encode_read_payload in "read" NFSD encoders
      svcrdma: Add trace point to examine client-provided write segment
      svcrdma: De-duplicate code that locates Write and Reply chunks
      svcrdma: Post RDMA Writes while XDR encoding replies
      svcrdma: Refactor svc_rdma_sendto()
      svcrdma: Add data structure to track READ payloads


 fs/nfsd/nfs3xdr.c                          |    4 
 fs/nfsd/nfs4xdr.c                          |   32 ++--
 fs/nfsd/nfsxdr.c                           |    4 
 include/linux/sunrpc/svc.h                 |    3 
 include/linux/sunrpc/svc_rdma.h            |   21 ++
 include/linux/sunrpc/svc_xprt.h            |    2 
 include/trace/events/rpcrdma.h             |   47 +++++
 net/sunrpc/svc.c                           |   16 ++
 net/sunrpc/svcsock.c                       |    8 +
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    2 
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   58 +++++--
 net/sunrpc/xprtrdma/svc_rdma_rw.c          |   42 +++--
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  248 +++++++++++++---------------
 net/sunrpc/xprtrdma/svc_rdma_transport.c   |    1 
 14 files changed, 308 insertions(+), 180 deletions(-)

--
Chuck Lever


* [PATCH RFC 1/9] nfsd: Fix NFSv4 READ on RDMA when using readv
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
@ 2020-02-14 15:49 ` Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 2/9] NFSD: Clean up nfsd4_encode_readv Chuck Lever
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:49 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

svcrdma expects that the payload falls precisely into the xdr_buf
page vector. This does not seem to be the case for
nfsd4_encode_readv().

This code is called only when fops->splice_read is missing or when
RQ_SPLICE_OK is clear, so it's not a noticeable problem in many
common cases.

Add a new transport method, ->xpo_read_payload, so that when a READ
payload does not fit exactly in rq_res's page vector, the XDR
encoder can inform the RPC transport exactly where that payload is,
excluding the payload's XDR pad.

That way, when a Write chunk is present, the transport knows what
byte range in the Reply message is supposed to be matched with the
chunk.
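
For example, the nfsd4_encode_readv() hunk below reports the READ
payload's location once the read has succeeded:

	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
		return nfserr_io;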

Note that the Linux NFS server implementation of NFS/RDMA can
currently handle only one Write chunk per RPC-over-RDMA message.
This simplifies the implementation of this fix.

Fixes: b04209806384 ("nfsd4: allow exotic read compounds")
Buglink: https://bugzilla.kernel.org/show_bug.cgi?id=198053
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 fs/nfsd/nfs4xdr.c                        |   20 ++++++++-------
 include/linux/sunrpc/svc.h               |    3 ++
 include/linux/sunrpc/svc_rdma.h          |    8 +++++-
 include/linux/sunrpc/svc_xprt.h          |    2 ++
 net/sunrpc/svc.c                         |   16 ++++++++++++
 net/sunrpc/svcsock.c                     |    8 ++++++
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c  |    1 +
 net/sunrpc/xprtrdma/svc_rdma_rw.c        |   30 ++++++++++++++---------
 net/sunrpc/xprtrdma/svc_rdma_sendto.c    |   40 +++++++++++++++++++++++++++++-
 net/sunrpc/xprtrdma/svc_rdma_transport.c |    1 +
 10 files changed, 106 insertions(+), 23 deletions(-)

diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 9761512674a0..60be969d8be1 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -3594,17 +3594,17 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	u32 zzz = 0;
 	int pad;
 
+	/*
+	 * svcrdma requires every READ payload to start somewhere
+	 * in xdr->pages.
+	 */
+	if (xdr->iov == xdr->buf->head) {
+		xdr->iov = NULL;
+		xdr->end = xdr->p;
+	}
+
 	len = maxcount;
 	v = 0;
-
-	thislen = min_t(long, len, ((void *)xdr->end - (void *)xdr->p));
-	p = xdr_reserve_space(xdr, (thislen+3)&~3);
-	WARN_ON_ONCE(!p);
-	resp->rqstp->rq_vec[v].iov_base = p;
-	resp->rqstp->rq_vec[v].iov_len = thislen;
-	v++;
-	len -= thislen;
-
 	while (len) {
 		thislen = min_t(long, len, PAGE_SIZE);
 		p = xdr_reserve_space(xdr, (thislen+3)&~3);
@@ -3623,6 +3623,8 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	read->rd_length = maxcount;
 	if (nfserr)
 		return nfserr;
+	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
+		return nfserr_io;
 	xdr_truncate_encode(xdr, starting_len + 8 + ((maxcount+3)&~3));
 
 	tmp = htonl(eof);
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 1afe38eb33f7..82665ff360fd 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -517,6 +517,9 @@ int		   svc_register(const struct svc_serv *, struct net *, const int,
 void		   svc_reserve(struct svc_rqst *rqstp, int space);
 struct svc_pool *  svc_pool_for_cpu(struct svc_serv *serv, int cpu);
 char *		   svc_print_addr(struct svc_rqst *, char *, size_t);
+int		   svc_encode_read_payload(struct svc_rqst *rqstp,
+					   unsigned int offset,
+					   unsigned int length);
 unsigned int	   svc_fill_write_vector(struct svc_rqst *rqstp,
 					 struct page **pages,
 					 struct kvec *first, size_t total);
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 40f65888dd38..04e4a34d1c6a 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -137,6 +137,8 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
+	unsigned int		rc_read_payload_offset;
+	unsigned int		rc_read_payload_length;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
@@ -170,7 +172,9 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
 				    struct svc_rqst *rqstp,
 				    struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
-				     __be32 *wr_ch, struct xdr_buf *xdr);
+				     __be32 *wr_ch, struct xdr_buf *xdr,
+				     unsigned int offset,
+				     unsigned long length);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     __be32 *rp_ch, bool writelist,
 				     struct xdr_buf *xdr);
@@ -189,6 +193,8 @@ extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
 				  struct xdr_buf *xdr, __be32 *wr_lst);
 extern int svc_rdma_sendto(struct svc_rqst *);
+extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+				 unsigned int length);
 
 /* svc_rdma_transport.c */
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index ea6f46be9cb7..9e1e046de176 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -21,6 +21,8 @@ struct svc_xprt_ops {
 	int		(*xpo_has_wspace)(struct svc_xprt *);
 	int		(*xpo_recvfrom)(struct svc_rqst *);
 	int		(*xpo_sendto)(struct svc_rqst *);
+	int		(*xpo_read_payload)(struct svc_rqst *, unsigned int,
+					    unsigned int);
 	void		(*xpo_release_rqst)(struct svc_rqst *);
 	void		(*xpo_detach)(struct svc_xprt *);
 	void		(*xpo_free)(struct svc_xprt *);
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 187dd4e73d64..18676d36f490 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1637,6 +1637,22 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
 EXPORT_SYMBOL_GPL(svc_max_payload);
 
 /**
+ * svc_encode_read_payload - mark a range of bytes as a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in rqstp->rq_res
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success, or a negative errno if a permanent
+ * error occurred.
+ */
+int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+			    unsigned int length)
+{
+	return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
+}
+EXPORT_SYMBOL_GPL(svc_encode_read_payload);
+
+/**
  * svc_fill_write_vector - Construct data argument for VFS write call
  * @rqstp: svc_rqst to operate on
  * @pages: list of pages containing data payload
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 2934dd711715..758ab10690de 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -279,6 +279,12 @@ static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
 	return len;
 }
 
+static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+				 unsigned int length)
+{
+	return 0;
+}
+
 /*
  * Report socket names for nfsdfs
  */
@@ -653,6 +659,7 @@ static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
 	.xpo_create = svc_udp_create,
 	.xpo_recvfrom = svc_udp_recvfrom,
 	.xpo_sendto = svc_udp_sendto,
+	.xpo_read_payload = svc_sock_read_payload,
 	.xpo_release_rqst = svc_release_udp_skb,
 	.xpo_detach = svc_sock_detach,
 	.xpo_free = svc_sock_free,
@@ -1171,6 +1178,7 @@ static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
 	.xpo_create = svc_tcp_create,
 	.xpo_recvfrom = svc_tcp_recvfrom,
 	.xpo_sendto = svc_tcp_sendto,
+	.xpo_read_payload = svc_sock_read_payload,
 	.xpo_release_rqst = svc_release_skb,
 	.xpo_detach = svc_tcp_sock_detach,
 	.xpo_free = svc_sock_free,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 96bccd398469..71127d898562 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,6 +193,7 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->rc_page_count = 0;
+	ctxt->rc_read_payload_length = 0;
 	return ctxt;
 
 out_empty:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 48fe3b16b0d9..b0ac535c8728 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -482,18 +482,19 @@ static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
 				     vec->iov_len);
 }
 
-/* Send an xdr_buf's page list by itself. A Write chunk is
- * just the page list. a Reply chunk is the head, page list,
- * and tail. This function is shared between the two types
- * of chunk.
+/* Send an xdr_buf's page list by itself. A Write chunk is just
+ * the page list. A Reply chunk is @xdr's head, page list, and
+ * tail. This function is shared between the two types of chunk.
  */
 static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
-				      struct xdr_buf *xdr)
+				      struct xdr_buf *xdr,
+				      unsigned int offset,
+				      unsigned long length)
 {
 	info->wi_xdr = xdr;
-	info->wi_next_off = 0;
+	info->wi_next_off = offset - xdr->head[0].iov_len;
 	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
-				     xdr->page_len);
+				     length);
 }
 
 /**
@@ -501,6 +502,8 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
  * @rdma: controlling RDMA transport
  * @wr_ch: Write chunk provided by client
  * @xdr: xdr_buf containing the data payload
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
  *
  * Returns a non-negative number of bytes the chunk consumed, or
  *	%-E2BIG if the payload was larger than the Write chunk,
@@ -510,19 +513,20 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
  *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
 int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
-			      struct xdr_buf *xdr)
+			      struct xdr_buf *xdr,
+			      unsigned int offset, unsigned long length)
 {
 	struct svc_rdma_write_info *info;
 	int ret;
 
-	if (!xdr->page_len)
+	if (!length)
 		return 0;
 
 	info = svc_rdma_write_info_alloc(rdma, wr_ch);
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_send_xdr_pagelist(info, xdr);
+	ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
 	if (ret < 0)
 		goto out_err;
 
@@ -531,7 +535,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 		goto out_err;
 
 	trace_svcrdma_encode_write(xdr->page_len);
-	return xdr->page_len;
+	return length;
 
 out_err:
 	svc_rdma_write_info_free(info);
@@ -571,7 +575,9 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
 	 * client did not provide Write chunks.
 	 */
 	if (!writelist && xdr->page_len) {
-		ret = svc_rdma_send_xdr_pagelist(info, xdr);
+		ret = svc_rdma_send_xdr_pagelist(info, xdr,
+						 xdr->head[0].iov_len,
+						 xdr->page_len);
 		if (ret < 0)
 			goto out_err;
 		consumed += xdr->page_len;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index f3f108090aa4..a11983c2056f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -858,7 +858,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
+		unsigned long offset;
+		unsigned int length;
+
+		if (rctxt->rc_read_payload_length) {
+			offset = rctxt->rc_read_payload_offset;
+			length = rctxt->rc_read_payload_length;
+		} else {
+			offset = xdr->head[0].iov_len;
+			length = xdr->page_len;
+		}
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
+						length);
 		if (ret < 0)
 			goto err2;
 		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
@@ -900,3 +911,30 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	ret = -ENOTCONN;
 	goto out;
 }
+
+/**
+ * svc_rdma_read_payload - special processing for a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success.
+ *
+ * For the moment, just record the xdr_buf location of the READ
+ * payload. svc_rdma_sendto will use that location later when
+ * we actually send the payload.
+ */
+int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+			  unsigned int length)
+{
+	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+
+	/* XXX: Just one READ payload slot for now, since our
+	 * transport implementation currently supports only one
+	 * Write chunk.
+	 */
+	rctxt->rc_read_payload_offset = offset;
+	rctxt->rc_read_payload_length = length;
+
+	return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 145a3615c319..f6aad2798063 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -82,6 +82,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 	.xpo_create = svc_rdma_create,
 	.xpo_recvfrom = svc_rdma_recvfrom,
 	.xpo_sendto = svc_rdma_sendto,
+	.xpo_read_payload = svc_rdma_read_payload,
 	.xpo_release_rqst = svc_rdma_release_rqst,
 	.xpo_detach = svc_rdma_detach,
 	.xpo_free = svc_rdma_free,



* [PATCH RFC 2/9] NFSD: Clean up nfsd4_encode_readv
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 1/9] nfsd: Fix NFSv4 READ on RDMA when using readv Chuck Lever
@ 2020-02-14 15:49 ` Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 3/9] svcrdma: Avoid DMA mapping small RPC Replies Chuck Lever
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:49 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

Address some minor nits I noticed while working on this function.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 fs/nfsd/nfs4xdr.c |    9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 60be969d8be1..262f9fc76e4e 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -3591,7 +3591,6 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	__be32 nfserr;
 	__be32 tmp;
 	__be32 *p;
-	u32 zzz = 0;
 	int pad;
 
 	/*
@@ -3607,7 +3606,7 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	v = 0;
 	while (len) {
 		thislen = min_t(long, len, PAGE_SIZE);
-		p = xdr_reserve_space(xdr, (thislen+3)&~3);
+		p = xdr_reserve_space(xdr, thislen);
 		WARN_ON_ONCE(!p);
 		resp->rqstp->rq_vec[v].iov_base = p;
 		resp->rqstp->rq_vec[v].iov_len = thislen;
@@ -3616,7 +3615,6 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	}
 	read->rd_vlen = v;
 
-	len = maxcount;
 	nfserr = nfsd_readv(resp->rqstp, read->rd_fhp, file, read->rd_offset,
 			    resp->rqstp->rq_vec, read->rd_vlen, &maxcount,
 			    &eof);
@@ -3625,16 +3623,17 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 		return nfserr;
 	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
 		return nfserr_io;
-	xdr_truncate_encode(xdr, starting_len + 8 + ((maxcount+3)&~3));
+	xdr_truncate_encode(xdr, starting_len + 8 + xdr_align_size(maxcount));
 
 	tmp = htonl(eof);
 	write_bytes_to_xdr_buf(xdr->buf, starting_len    , &tmp, 4);
 	tmp = htonl(maxcount);
 	write_bytes_to_xdr_buf(xdr->buf, starting_len + 4, &tmp, 4);
 
+	tmp = xdr_zero;
 	pad = (maxcount&3) ? 4 - (maxcount&3) : 0;
 	write_bytes_to_xdr_buf(xdr->buf, starting_len + 8 + maxcount,
-								&zzz, pad);
+								&tmp, pad);
 	return 0;
 
 }



* [PATCH RFC 3/9] svcrdma: Avoid DMA mapping small RPC Replies
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 1/9] nfsd: Fix NFSv4 READ on RDMA when using readv Chuck Lever
  2020-02-14 15:49 ` [PATCH RFC 2/9] NFSD: Clean up nfsd4_encode_readv Chuck Lever
@ 2020-02-14 15:49 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 4/9] NFSD: Invoke svc_encode_read_payload in "read" NFSD encoders Chuck Lever
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:49 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

On some platforms, DMA mapping part of a page is more costly than
copying bytes. Not involving the I/O MMU can also help the RPC/RDMA
transport scale better for tiny I/Os across more RDMA devices,
because the per-I/O interaction with the I/O MMU is eliminated.
Without the explicit unmapping, the NIC no longer needs to do a
costly internal TLB shootdown for buffers that are just a handful
of bytes.

The heuristic for now is to pull up when the size of the RPC message
body is smaller than half the minimum Send buffer size.
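
In code, that heuristic is the following check (from the hunk below);
with RPCRDMA_V1_DEF_INLINE_SIZE at its usual 4096 bytes, Replies
shorter than 2048 bytes are copied instead of DMA-mapped:

	/* Avoid the overhead of DMA mapping for small messages.
	 */
	if (xdr->len < RPCRDMA_V1_DEF_INLINE_SIZE >> 1)
		return true;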

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/trace/events/rpcrdma.h        |   40 +++++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   25 +++++++++++++++++----
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index c0e4c93324f5..6f0d3e8ce95c 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -336,6 +336,44 @@
 				),					\
 				TP_ARGS(rqst))
 
+DECLARE_EVENT_CLASS(xdr_buf_class,
+	TP_PROTO(
+		const struct xdr_buf *xdr
+	),
+
+	TP_ARGS(xdr),
+
+	TP_STRUCT__entry(
+		__field(const void *, head_base)
+		__field(size_t, head_len)
+		__field(const void *, tail_base)
+		__field(size_t, tail_len)
+		__field(unsigned int, page_len)
+		__field(unsigned int, msg_len)
+	),
+
+	TP_fast_assign(
+		__entry->head_base = xdr->head[0].iov_base;
+		__entry->head_len = xdr->head[0].iov_len;
+		__entry->tail_base = xdr->tail[0].iov_base;
+		__entry->tail_len = xdr->tail[0].iov_len;
+		__entry->page_len = xdr->page_len;
+		__entry->msg_len = xdr->len;
+	),
+
+	TP_printk("head=[%p,%zu] page=%u tail=[%p,%zu] len=%u",
+		__entry->head_base, __entry->head_len, __entry->page_len,
+		__entry->tail_base, __entry->tail_len, __entry->msg_len
+	)
+);
+
+#define DEFINE_XDRBUF_EVENT(name)					\
+		DEFINE_EVENT(xdr_buf_class, name,			\
+				TP_PROTO(				\
+					const struct xdr_buf *xdr	\
+				),					\
+				TP_ARGS(xdr))
+
 /**
  ** Connection events
  **/
@@ -1634,6 +1672,8 @@
 	)
 );
 
+DEFINE_XDRBUF_EVENT(svcrdma_send_pullup);
+
 TRACE_EVENT(svcrdma_send_failed,
 	TP_PROTO(
 		const struct svc_rqst *rqst,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index a11983c2056f..8ea21ca351e2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -537,16 +537,32 @@ void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 				      DMA_TO_DEVICE);
 }
 
-/* If the xdr_buf has more elements than the device can
- * transmit in a single RDMA Send, then the reply will
- * have to be copied into a bounce buffer.
+/**
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
+ * @rdma: controlling transport
+ * @ctxt: I/O resources for an RDMA Send
+ * @xdr: xdr_buf containing RPC message to transmit
+ * @wr_lst: pointer to start of Write chunk list
+ *
+ * Returns:
+ *	%true if pull-up should be used
+ *	%false otherwise
  */
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
+				    struct svc_rdma_send_ctxt *ctxt,
 				    struct xdr_buf *xdr,
 				    __be32 *wr_lst)
 {
 	int elements;
 
+	/* Avoid the overhead of DMA mapping for small messages.
+	 */
+	if (xdr->len < RPCRDMA_V1_DEF_INLINE_SIZE >> 1)
+		return true;
+
+	/* Check whether the xdr_buf has more elements than can
+	 * fit in a single RDMA Send.
+	 */
 	/* xdr->head */
 	elements = 1;
 
@@ -627,6 +643,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 				      ctxt->sc_sges[0].length,
 				      DMA_TO_DEVICE);
 
+	trace_svcrdma_send_pullup(xdr);
 	return 0;
 }
 
@@ -652,7 +669,7 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
+	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
 		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
 
 	++ctxt->sc_cur_sge_no;



* [PATCH RFC 4/9] NFSD: Invoke svc_encode_read_payload in "read" NFSD encoders
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (2 preceding siblings ...)
  2020-02-14 15:49 ` [PATCH RFC 3/9] svcrdma: Avoid DMA mapping small RPC Replies Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 5/9] svcrdma: Add trace point to examine client-provided write segment Chuck Lever
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

Have the NFSD encoders annotate the boundaries of every
direct-data-placement eligible READ data payload. Then change
svcrdma to use that annotation instead of xdr->page_len
when handling Write chunks.

For NFSv4 on RDMA, that enables the ability to recognize multiple
READ payloads per compound. Next step is to support multiple Write
chunks.
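
Each encoder reports the payload's location in rq_res itself; the
NFSv3 READ encoder below, for instance, does:

	svc_encode_read_payload(rqstp, rqstp->rq_res.head[0].iov_len,
				resp->count);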

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 fs/nfsd/nfs3xdr.c                     |    4 ++++
 fs/nfsd/nfs4xdr.c                     |    3 +++
 fs/nfsd/nfsxdr.c                      |    4 ++++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   15 +++------------
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/fs/nfsd/nfs3xdr.c b/fs/nfsd/nfs3xdr.c
index aae514d40b64..8c272efbc94e 100644
--- a/fs/nfsd/nfs3xdr.c
+++ b/fs/nfsd/nfs3xdr.c
@@ -712,6 +712,8 @@ void fill_post_wcc(struct svc_fh *fhp)
 			*p = 0;
 			rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
 		}
+		svc_encode_read_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+					resp->len);
 		return 1;
 	} else
 		return xdr_ressize_check(rqstp, p);
@@ -737,6 +739,8 @@ void fill_post_wcc(struct svc_fh *fhp)
 			*p = 0;
 			rqstp->rq_res.tail[0].iov_len = 4 - (resp->count & 3);
 		}
+		svc_encode_read_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+					resp->count);
 		return 1;
 	} else
 		return xdr_ressize_check(rqstp, p);
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 262f9fc76e4e..a8d3f8f035a0 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -3547,6 +3547,8 @@ static __be32 nfsd4_encode_splice_read(
 		buf->page_len = 0;
 		return nfserr;
 	}
+	svc_encode_read_payload(read->rd_rqstp, buf->head[0].iov_len,
+				maxcount);
 
 	*(p++) = htonl(eof);
 	*(p++) = htonl(maxcount);
@@ -3713,6 +3715,7 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 		xdr_truncate_encode(xdr, length_offset);
 		return nfserr;
 	}
+	svc_encode_read_payload(readlink->rl_rqstp, length_offset, maxcount);
 
 	wire_count = htonl(maxcount);
 	write_bytes_to_xdr_buf(xdr->buf, length_offset, &wire_count, 4);
diff --git a/fs/nfsd/nfsxdr.c b/fs/nfsd/nfsxdr.c
index b51fe515f06f..98ea417042a6 100644
--- a/fs/nfsd/nfsxdr.c
+++ b/fs/nfsd/nfsxdr.c
@@ -462,6 +462,8 @@ __be32 *nfs2svc_encode_fattr(struct svc_rqst *rqstp, __be32 *p, struct svc_fh *f
 		*p = 0;
 		rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
 	}
+	svc_encode_read_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+				resp->len);
 	return 1;
 }
 
@@ -482,6 +484,8 @@ __be32 *nfs2svc_encode_fattr(struct svc_rqst *rqstp, __be32 *p, struct svc_fh *f
 		*p = 0;
 		rqstp->rq_res.tail[0].iov_len = 4 - (resp->count&3);
 	}
+	svc_encode_read_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+				resp->count);
 	return 1;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 8ea21ca351e2..40b4843be869 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -875,18 +875,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
-		unsigned long offset;
-		unsigned int length;
-
-		if (rctxt->rc_read_payload_length) {
-			offset = rctxt->rc_read_payload_offset;
-			length = rctxt->rc_read_payload_length;
-		} else {
-			offset = xdr->head[0].iov_len;
-			length = xdr->page_len;
-		}
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
-						length);
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
+						rctxt->rc_read_payload_offset,
+						rctxt->rc_read_payload_length);
 		if (ret < 0)
 			goto err2;
 		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);



* [PATCH RFC 5/9] svcrdma: Add trace point to examine client-provided write segment
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (3 preceding siblings ...)
  2020-02-14 15:50 ` [PATCH RFC 4/9] NFSD: Invoke svc_encode_read_payload in "read" NFSD encoders Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 6/9] svcrdma: De-duplicate code that locates Write and Reply chunks Chuck Lever
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

Add a trace point so we can verify that clients send large enough
Write chunks.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/trace/events/rpcrdma.h          |    7 ++++---
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   12 +++++++++---
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 6f0d3e8ce95c..773f6d9fd800 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1507,7 +1507,7 @@
 );
 
 #define DEFINE_SEGMENT_EVENT(name)					\
-		DEFINE_EVENT(svcrdma_segment_event, svcrdma_encode_##name,\
+		DEFINE_EVENT(svcrdma_segment_event, svcrdma_##name,\
 				TP_PROTO(				\
 					u32 handle,			\
 					u32 length,			\
@@ -1515,8 +1515,9 @@
 				),					\
 				TP_ARGS(handle, length, offset))
 
-DEFINE_SEGMENT_EVENT(rseg);
-DEFINE_SEGMENT_EVENT(wseg);
+DEFINE_SEGMENT_EVENT(decode_wseg);
+DEFINE_SEGMENT_EVENT(encode_rseg);
+DEFINE_SEGMENT_EVENT(encode_wseg);
 
 DECLARE_EVENT_CLASS(svcrdma_chunk_event,
 	TP_PROTO(
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 71127d898562..2f16c0625226 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -420,13 +420,19 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
 
 	segcount = be32_to_cpup(p++);
 	for (i = 0; i < segcount; i++) {
-		p++;	/* handle */
-		if (be32_to_cpup(p++) > maxlen)
+		u32 handle, length;
+		u64 offset;
+
+		handle = be32_to_cpup(p++);
+		length = be32_to_cpup(p++);
+		if (length > maxlen)
 			return NULL;
-		p += 2;	/* offset */
+		p = xdr_decode_hyper(p, &offset);
 
 		if (p > end)
 			return NULL;
+
+		trace_svcrdma_decode_wseg(handle, length, offset);
 	}
 
 	return p;



* [PATCH RFC 6/9] svcrdma: De-duplicate code that locates Write and Reply chunks
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (4 preceding siblings ...)
  2020-02-14 15:50 ` [PATCH RFC 5/9] svcrdma: Add trace point to examine client-provided write segment Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 7/9] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

Cache the locations of the first Write chunk and the Reply chunk so
that the Send path doesn't need to parse the Call header again.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h         |    2 ++
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   24 +++++++++++++-------
 net/sunrpc/xprtrdma/svc_rdma_sendto.c   |   38 +++----------------------------
 3 files changed, 22 insertions(+), 42 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 04e4a34d1c6a..07baeb5f93c1 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -137,6 +137,8 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
+	__be32			*rc_write_list;
+	__be32			*rc_reply_chunk;
 	unsigned int		rc_read_payload_offset;
 	unsigned int		rc_read_payload_length;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 2f16c0625226..91abe08f7d75 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -444,15 +444,17 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
  * - This implementation supports only one Write chunk.
  *
  * Sanity checks:
- * - Write list does not overflow buffer.
+ * - Write list does not overflow Receive buffer.
  * - Segment size limited by largest NFS data payload.
  *
  * Returns pointer to the following Reply chunk.
  */
-static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
+static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
+				    struct svc_rdma_recv_ctxt *ctxt)
 {
 	u32 chcount;
 
+	ctxt->rc_write_list = p;
 	chcount = 0;
 	while (*p++ != xdr_zero) {
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -461,6 +463,8 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
 		if (chcount++ > 1)
 			return NULL;
 	}
+	if (!chcount)
+		ctxt->rc_write_list = NULL;
 	return p;
 }
 
@@ -472,13 +476,16 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
  *
  * Returns pointer to the following RPC header.
  */
-static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
+static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
+				     struct svc_rdma_recv_ctxt *ctxt)
 {
+	ctxt->rc_reply_chunk = p;
 	if (*p++ != xdr_zero) {
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
 		if (!p)
 			return NULL;
-	}
+	} else
+		ctxt->rc_reply_chunk = NULL;
 	return p;
 }
 
@@ -554,7 +561,8 @@ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
  * Assumptions:
  * - The transport header is entirely contained in the head iovec.
  */
-static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
+static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
+				   struct svc_rdma_recv_ctxt *ctxt)
 {
 	__be32 *p, *end, *rdma_argp;
 	unsigned int hdr_len;
@@ -587,10 +595,10 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
 	p = xdr_check_read_list(rdma_argp + 4, end);
 	if (!p)
 		goto out_inval;
-	p = xdr_check_write_list(p, end);
+	p = xdr_check_write_list(p, end, ctxt);
 	if (!p)
 		goto out_inval;
-	p = xdr_check_reply_chunk(p, end);
+	p = xdr_check_reply_chunk(p, end, ctxt);
 	if (!p)
 		goto out_inval;
 	if (p > end)
@@ -792,7 +800,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	rqstp->rq_next_page = rqstp->rq_respages;
 
 	p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
-	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
+	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
 	if (ret < 0)
 		goto out_err;
 	if (ret == 0)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 40b4843be869..3c0e41d378bc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -454,36 +454,6 @@ static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
 	xdr_encode_write_chunk(p, rp_ch, consumed);
 }
 
-/* Parse the RPC Call's transport header.
- */
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
-				      __be32 **write, __be32 **reply)
-{
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-
-	/* Read list */
-	while (*p++ != xdr_zero)
-		p += 5;
-
-	/* Write list */
-	if (*p != xdr_zero) {
-		*write = p;
-		while (*p++ != xdr_zero)
-			p += 1 + be32_to_cpu(*p) * 4;
-	} else {
-		*write = NULL;
-		p++;
-	}
-
-	/* Reply chunk */
-	if (*p != xdr_zero)
-		*reply = p;
-	else
-		*reply = NULL;
-}
-
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_send_ctxt *ctxt,
 				 struct page *page,
@@ -842,14 +812,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
-	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
+	__be32 *wr_lst = rctxt->rc_write_list;
+	__be32 *rp_ch = rctxt->rc_reply_chunk;
 	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
+	__be32 *p, *rdma_resp;
 	int ret;
 
-	rdma_argp = rctxt->rc_recv_buf;
-	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
-
 	/* Create the RDMA response header. xprt->xpt_mutex,
 	 * acquired in svc_send(), serializes RPC replies. The
 	 * code path below that inserts the credit grant value



* [PATCH RFC 7/9] svcrdma: Post RDMA Writes while XDR encoding replies
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (5 preceding siblings ...)
  2020-02-14 15:50 ` [PATCH RFC 6/9] svcrdma: De-duplicate code that locates Write and Reply chunks Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 8/9] svcrdma: Refactor svc_rdma_sendto() Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 9/9] svcrdma: Add data structure to track READ payloads Chuck Lever
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

The only RPC/RDMA ordering requirement between RDMA Writes and RDMA
Sends is that Writes have to be posted before the Send that sends
the RPC Reply for that Write payload.

The Linux NFS server implementation now has a transport method that
can post READ Payload Writes earlier than svc_rdma_sendto:

   ->xpo_read_payload.

Goals:
- Get RDMA Writes going earlier so they are more likely to be
  complete at the remote end before the Send completes.
- Allow more parallelism when dispatching RDMA operations by
  posting RDMA Writes before taking xpt_mutex.
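
Concretely, svc_rdma_read_payload() now posts the Write chunk as
soon as the XDR encoder reports the payload, rather than deferring
the post to svc_rdma_sendto():

	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
					 &rqstp->rq_res, offset, length);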

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   26 +++++++++++---------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 3c0e41d378bc..273453a336b0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -843,15 +843,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
 
-	if (wr_lst) {
-		/* XXX: Presume the client sent only one Write chunk */
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
-						rctxt->rc_read_payload_offset,
-						rctxt->rc_read_payload_length);
-		if (ret < 0)
-			goto err2;
-		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
-	}
+	if (wr_lst)
+		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst,
+					       rctxt->rc_read_payload_length);
 	if (rp_ch) {
 		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
 		if (ret < 0)
@@ -896,16 +890,16 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
  * @offset: payload's byte offset in @xdr
  * @length: size of payload, in bytes
  *
- * Returns zero on success.
- *
- * For the moment, just record the xdr_buf location of the READ
- * payload. svc_rdma_sendto will use that location later when
- * we actually send the payload.
+ * Returns zero on success, or a negative errno.
  */
 int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 			  unsigned int length)
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+	struct svcxprt_rdma *rdma;
+
+	if (!rctxt->rc_write_list)
+		return 0;
 
 	/* XXX: Just one READ payload slot for now, since our
 	 * transport implementation currently supports only one
@@ -914,5 +908,7 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 	rctxt->rc_read_payload_offset = offset;
 	rctxt->rc_read_payload_length = length;
 
-	return 0;
+	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+					 &rqstp->rq_res, offset, length);
 }



* [PATCH RFC 8/9] svcrdma: Refactor svc_rdma_sendto()
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (6 preceding siblings ...)
  2020-02-14 15:50 ` [PATCH RFC 7/9] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  2020-02-14 15:50 ` [PATCH RFC 9/9] svcrdma: Add data structure to track READ payloads Chuck Lever
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

No behavior change expected, just preparing for subsequent patches.

Pass the RPC request's svc_rdma_recv_ctxt deeper into the sendto()
path. This will enable us to subsequently pass more information about
the Reply into those lower-level functions.

Since we're touching the synopses of these functions, let's also
change the header encoding to work like other areas: instead of
walking over the beginning of the header when encoding each
chunk list, use the "p = xdr_encode_blob(p);" style that is
consistent with most other XDR-related code.
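
After this patch, the chunk lists in the Reply's transport header
are encoded in sequence:

	p = xdr_encode_read_list(p);
	p = xdr_encode_write_list(p, rctxt);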

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h       |    2 -
 net/sunrpc/xprtrdma/svc_rdma_rw.c     |   12 ++--
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   98 ++++++++++++++-------------------
 3 files changed, 50 insertions(+), 62 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 07baeb5f93c1..c1c4563066d9 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -178,7 +178,7 @@ extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 				     unsigned int offset,
 				     unsigned long length);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
-				     __be32 *rp_ch, bool writelist,
+				     const struct svc_rdma_recv_ctxt *rctxt,
 				     struct xdr_buf *xdr);
 
 /* svc_rdma_sendto.c */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index b0ac535c8728..ca9d414bef9d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -545,8 +545,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 /**
  * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
  * @rdma: controlling RDMA transport
- * @rp_ch: Reply chunk provided by client
- * @writelist: true if client provided a Write list
+ * @rctxt: chunk list information
  * @xdr: xdr_buf containing an RPC Reply
  *
  * Returns a non-negative number of bytes the chunk consumed, or
@@ -556,13 +555,14 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
  *	%-ENOTCONN if posting failed (connection is lost),
  *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
-int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
-			      bool writelist, struct xdr_buf *xdr)
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+			      const struct svc_rdma_recv_ctxt *rctxt,
+			      struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
 	int consumed, ret;
 
-	info = svc_rdma_write_info_alloc(rdma, rp_ch);
+	info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
 	if (!info)
 		return -ENOMEM;
 
@@ -574,7 +574,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!writelist && xdr->page_len) {
+	if (!rctxt->rc_write_list && xdr->page_len) {
 		ret = svc_rdma_send_xdr_pagelist(info, xdr,
 						 xdr->head[0].iov_len,
 						 xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 273453a336b0..7349a3f9aa5d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -354,6 +354,14 @@ static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
 	return (unsigned long)p - (unsigned long)rdma_resp;
 }
 
+/* RPC-over-RDMA V1 replies never have a Read list.
+ */
+static __be32 *xdr_encode_read_list(__be32 *p)
+{
+	*p++ = xdr_zero;
+	return p;
+}
+
 /* One Write chunk is copied from Call transport header to Reply
  * transport header. Each segment's length field is updated to
  * reflect number of bytes consumed in the segment.
@@ -406,16 +414,17 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
  * Assumptions:
  *  - Client has provided only one Write chunk
  */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
-					   unsigned int consumed)
+static __be32 *xdr_encode_write_list(__be32 *p,
+				     const struct svc_rdma_recv_ctxt *rctxt)
 {
-	unsigned int nsegs;
-	__be32 *p, *q;
+	unsigned int consumed, nsegs;
+	__be32 *q;
 
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+	q = rctxt->rc_write_list;
+	if (!q)
+		goto out;
 
-	q = wr_ch;
+	consumed = rctxt->rc_read_payload_length;
 	while (*q != xdr_zero) {
 		nsegs = xdr_encode_write_chunk(p, q, consumed);
 		q += 2 + nsegs * rpcrdma_segment_maxsz;
@@ -424,10 +433,9 @@ static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
 	}
 
 	/* Terminate Write list */
+out:
 	*p++ = xdr_zero;
-
-	/* Reply chunk discriminator; may be replaced later */
-	*p = xdr_zero;
+	return p;
 }
 
 /* The client provided a Reply chunk in the Call message. Fill in
@@ -435,23 +443,13 @@ static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
  * number of bytes consumed in each segment.
  *
  * Assumptions:
- * - Reply can always fit in the provided Reply chunk
+ * - Reply can always fit in the client-provided Reply chunk
  */
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
-					    unsigned int consumed)
+static void xdr_encode_reply_chunk(__be32 *p,
+				   const struct svc_rdma_recv_ctxt *rctxt,
+				   unsigned int length)
 {
-	__be32 *p;
-
-	/* Find the Reply chunk in the Reply's xprt header.
-	 * RPC-over-RDMA V1 replies never have a Read list.
-	 */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
-	/* Skip past Write list */
-	while (*p++ != xdr_zero)
-		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
-	xdr_encode_write_chunk(p, rp_ch, consumed);
+	xdr_encode_write_chunk(p, rctxt->rc_reply_chunk, length);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -735,15 +733,15 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_send_ctxt *sctxt,
-				   struct svc_rdma_recv_ctxt *rctxt,
-				   struct svc_rqst *rqstp,
-				   __be32 *wr_lst, __be32 *rp_ch)
+				   const struct svc_rdma_recv_ctxt *rctxt,
+				   struct svc_rqst *rqstp)
 {
 	int ret;
 
-	if (!rp_ch) {
+	if (!rctxt->rc_reply_chunk) {
 		ret = svc_rdma_map_reply_msg(rdma, sctxt,
-					     &rqstp->rq_res, wr_lst);
+					     &rqstp->rq_res,
+					     rctxt->rc_write_list);
 		if (ret < 0)
 			return ret;
 	}
@@ -808,16 +806,12 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
  */
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 {
-	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svcxprt_rdma *rdma =
-		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	__be32 *rdma_argp = rctxt->rc_recv_buf;
-	__be32 *wr_lst = rctxt->rc_write_list;
-	__be32 *rp_ch = rctxt->rc_reply_chunk;
-	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
-	__be32 *p, *rdma_resp;
+	__be32 *p;
 	int ret;
 
 	/* Create the RDMA response header. xprt->xpt_mutex,
@@ -830,32 +824,26 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	sctxt = svc_rdma_send_ctxt_get(rdma);
 	if (!sctxt)
 		goto err0;
-	rdma_resp = sctxt->sc_xprt_buf;
 
-	p = rdma_resp;
+	p = sctxt->sc_xprt_buf;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
+	*p++ = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
 
-	/* Start with empty chunks */
-	*p++ = xdr_zero;
-	*p++ = xdr_zero;
-	*p   = xdr_zero;
-
-	if (wr_lst)
-		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst,
-					       rctxt->rc_read_payload_length);
-	if (rp_ch) {
-		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+	p = xdr_encode_read_list(p);
+	p = xdr_encode_write_list(p, rctxt);
+	if (rctxt->rc_reply_chunk) {
+		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
-	}
+		xdr_encode_reply_chunk(p, rctxt, ret);
+	} else
+		*p = xdr_zero;
 
-	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
-	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
-				      wr_lst, rp_ch);
+	svc_rdma_sync_reply_hdr(rdma, sctxt,
+				svc_rdma_reply_hdr_len(sctxt->sc_xprt_buf));
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 	if (ret < 0)
 		goto err1;
 	ret = 0;
@@ -879,7 +867,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	svc_rdma_send_ctxt_put(rdma, sctxt);
  err0:
 	trace_svcrdma_send_failed(rqstp, ret);
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+	set_bit(XPT_CLOSE, &rqstp->rq_xprt->xpt_flags);
 	ret = -ENOTCONN;
 	goto out;
 }



* [PATCH RFC 9/9] svcrdma: Add data structure to track READ payloads
  2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
                   ` (7 preceding siblings ...)
  2020-02-14 15:50 ` [PATCH RFC 8/9] svcrdma: Refactor svc_rdma_sendto() Chuck Lever
@ 2020-02-14 15:50 ` Chuck Lever
  8 siblings, 0 replies; 10+ messages in thread
From: Chuck Lever @ 2020-02-14 15:50 UTC (permalink / raw)
  To: bfields; +Cc: linux-rdma, linux-nfs

The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
are so rare that there has never been a strong need to support more.
However, we are aware of at least one existing NFS client
implementation that can generate such requests, so let's dig in.

Allocate a data structure at Receive time to keep track of the set
of READ payloads and the Write chunks.
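
In outline (see xdr_check_write_list() below), one svc_rdma_payload
is allocated per Write chunk while the Write list is sanity-checked
at Receive time:

	ctxt->rc_read_payloads = kcalloc(chcount, sizeof(struct svc_rdma_payload),
					 GFP_KERNEL);
	if (!ctxt->rc_read_payloads)
		return NULL;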

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h            |   15 +++-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    2 -
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   31 +++++++--
 net/sunrpc/xprtrdma/svc_rdma_rw.c          |    2 -
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |   94 +++++++++++++---------------
 5 files changed, 80 insertions(+), 64 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c1c4563066d9..85e6b281a39b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -124,6 +124,12 @@ enum {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+struct svc_rdma_payload {
+	__be32			*ra_chunk;
+	unsigned int		ra_offset;
+	unsigned int		ra_length;
+};
+
 struct svc_rdma_recv_ctxt {
 	struct llist_node	rc_node;
 	struct list_head	rc_list;
@@ -137,10 +143,10 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
-	__be32			*rc_write_list;
+	struct svc_rdma_payload	*rc_read_payloads;
 	__be32			*rc_reply_chunk;
-	unsigned int		rc_read_payload_offset;
-	unsigned int		rc_read_payload_length;
+	unsigned int		rc_num_write_chunks;
+	unsigned int		rc_cur_payload;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
@@ -193,7 +199,8 @@ extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 				    unsigned int len);
 extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
-				  struct xdr_buf *xdr, __be32 *wr_lst);
+				  struct xdr_buf *xdr,
+				  unsigned int num_read_payloads);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 				 unsigned int length);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 908e78bb87c6..3b1baf15a1b7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 {
 	int ret;
 
-	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, 0);
 	if (ret < 0)
 		return -EIO;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 91abe08f7d75..85b8dd8ae772 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,7 +193,9 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->rc_page_count = 0;
-	ctxt->rc_read_payload_length = 0;
+	ctxt->rc_num_write_chunks = 0;
+	ctxt->rc_cur_payload = 0;
+	ctxt->rc_read_payloads = NULL;
 	return ctxt;
 
 out_empty:
@@ -216,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
-
+	kfree(ctxt->rc_read_payloads);
+	ctxt->rc_read_payloads = NULL;
 	if (!ctxt->rc_temp)
 		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 	else
@@ -452,9 +455,10 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
 static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 				    struct svc_rdma_recv_ctxt *ctxt)
 {
-	u32 chcount;
+	u32 chcount, segcount;
+	__be32 *saved = p;
+	int i;
 
-	ctxt->rc_write_list = p;
 	chcount = 0;
 	while (*p++ != xdr_zero) {
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -463,8 +467,22 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 		if (chcount++ > 1)
 			return NULL;
 	}
+	ctxt->rc_num_write_chunks = chcount;
 	if (!chcount)
-		ctxt->rc_write_list = NULL;
+		return p;
+
+	ctxt->rc_read_payloads = kcalloc(chcount, sizeof(struct svc_rdma_payload),
+					 GFP_KERNEL);
+	if (!ctxt->rc_read_payloads)
+		return NULL;
+
+	i = 0;
+	p = saved;
+	while (*p++ != xdr_zero) {
+		ctxt->rc_read_payloads[i++].ra_chunk = p - 1;
+		segcount = be32_to_cpup(p++);
+		p += segcount * rpcrdma_segment_maxsz;
+	}
 	return p;
 }
 
@@ -484,8 +502,9 @@ static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
 		if (!p)
 			return NULL;
-	} else
+	} else {
 		ctxt->rc_reply_chunk = NULL;
+	}
 	return p;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index ca9d414bef9d..740ea4ee251d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -574,7 +574,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!rctxt->rc_write_list && xdr->page_len) {
+	if (!rctxt->rc_num_write_chunks && xdr->page_len) {
 		ret = svc_rdma_send_xdr_pagelist(info, xdr,
 						 xdr->head[0].iov_len,
 						 xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7349a3f9aa5d..378a24b666bb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -366,10 +366,10 @@ static __be32 *xdr_encode_read_list(__be32 *p)
  * transport header. Each segment's length field is updated to
  * reflect number of bytes consumed in the segment.
  *
- * Returns number of segments in this chunk.
+ * Returns a pointer to the position to encode the next chunk.
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
-					   unsigned int remaining)
+static __be32 *xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+				      unsigned int length)
 {
 	unsigned int i, nsegs;
 	u32 seg_len;
@@ -386,15 +386,15 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
 		*dst++ = *src++;
 
 		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
+		seg_len = be32_to_cpup(src);
+		if (length >= seg_len) {
 			/* entire segment was consumed */
 			*dst = *src;
-			remaining -= seg_len;
+			length -= seg_len;
 		} else {
 			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
+			*dst = cpu_to_be32(length);
+			length = 0;
 		}
 		dst++; src++;
 
@@ -403,38 +403,25 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
 		*dst++ = *src++;
 	}
 
-	return nsegs;
+	return dst;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
+/* The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
  */
 static __be32 *xdr_encode_write_list(__be32 *p,
 				     const struct svc_rdma_recv_ctxt *rctxt)
 {
-	unsigned int consumed, nsegs;
-	__be32 *q;
-
-	q = rctxt->rc_write_list;
-	if (!q)
-		goto out;
-
-	consumed = rctxt->rc_read_payload_length;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	unsigned int i;
 
-	/* Terminate Write list */
-out:
-	*p++ = xdr_zero;
+	for (i = 0; i < rctxt->rc_num_write_chunks; i++)
+		p = xdr_encode_write_chunk(p,
+					rctxt->rc_read_payloads[i].ra_chunk,
+					rctxt->rc_read_payloads[i].ra_length);
+	*p++ = xdr_zero;	/* Terminate Write list */
 	return p;
 }
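
Note how unclaimed chunks come out "unused" on the wire:
rc_read_payloads[] is allocated with kcalloc(), so any entry that
svc_rdma_read_payload() never fills still has ra_length == 0 when
this loop runs, and xdr_encode_write_chunk() rewrites every segment
length in that chunk to zero.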
 
@@ -519,7 +506,7 @@ void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 				    struct svc_rdma_send_ctxt *ctxt,
 				    struct xdr_buf *xdr,
-				    __be32 *wr_lst)
+				    unsigned int num_write_chunks)
 {
 	int elements;
 
@@ -535,7 +522,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 	elements = 1;
 
 	/* xdr->pages */
-	if (!wr_lst) {
+	if (!num_write_chunks) {
 		unsigned int remaining;
 		unsigned long pageoff;
 
@@ -563,7 +550,8 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  */
 static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *ctxt,
-				      struct xdr_buf *xdr, __be32 *wr_lst)
+				      struct xdr_buf *xdr,
+				      unsigned int num_write_chunks)
 {
 	unsigned char *dst, *tailbase;
 	unsigned int taillen;
@@ -576,7 +564,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (wr_lst) {
+	if (num_write_chunks) {
 		u32 xdrpad;
 
 		xdrpad = xdr_padsize(xdr->page_len);
@@ -619,7 +607,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  * @rdma: controlling transport
  * @ctxt: send_ctxt for the Send WR
  * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+ * @num_read_payloads: count of separate READ payloads to send
  *
  * Load the xdr_buf into the ctxt's sge array, and DMA map each
  * element as it is added.
@@ -628,7 +616,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 			   struct svc_rdma_send_ctxt *ctxt,
-			   struct xdr_buf *xdr, __be32 *wr_lst)
+			   struct xdr_buf *xdr, unsigned int num_read_payloads)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -637,8 +625,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
-		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, num_read_payloads))
+		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, num_read_payloads);
 
 	++ctxt->sc_cur_sge_no;
 	ret = svc_rdma_dma_map_buf(rdma, ctxt,
@@ -647,12 +635,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	if (ret < 0)
 		return ret;
 
-	/* If a Write chunk is present, the xdr_buf's page list
+	/* If Write chunks are present, the xdr_buf's page list
 	 * is not included inline. However the Upper Layer may
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (wr_lst) {
+	if (num_read_payloads) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
 		xdr_pad = xdr_padsize(xdr->page_len);
@@ -741,7 +729,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 	if (!rctxt->rc_reply_chunk) {
 		ret = svc_rdma_map_reply_msg(rdma, sctxt,
 					     &rqstp->rq_res,
-					     rctxt->rc_write_list);
+					     rctxt->rc_cur_payload);
 		if (ret < 0)
 			return ret;
 	}
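
(Passing rctxt->rc_cur_payload as the payload count relies on reply
encoding having finished before this point, so every READ payload
has already been registered; see patch 7/9.)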
@@ -885,18 +873,20 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	struct svcxprt_rdma *rdma;
+	unsigned int i;
 
-	if (!rctxt->rc_write_list)
+	if (!rctxt->rc_num_write_chunks)
 		return 0;
 
-	/* XXX: Just one READ payload slot for now, since our
-	 * transport implementation currently supports only one
-	 * Write chunk.
-	 */
-	rctxt->rc_read_payload_offset = offset;
-	rctxt->rc_read_payload_length = length;
+	if (rctxt->rc_cur_payload >= rctxt->rc_num_write_chunks)
+		return -ENOENT;
+	i = rctxt->rc_cur_payload++;
+
+	rctxt->rc_read_payloads[i].ra_offset = offset;
+	rctxt->rc_read_payloads[i].ra_length = length;
 
 	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+	return svc_rdma_send_write_chunk(rdma,
+					 rctxt->rc_read_payloads[i].ra_chunk,
 					 &rqstp->rq_res, offset, length);
 }
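
For context, svc_rdma_read_payload() above is reached from the NFSD
read encoders through the generic entry point this series adds to
net/sunrpc/svc.c (patches 1/9 and 4/9). A minimal sketch of that
dispatch, assuming the xpo_read_payload method name used elsewhere
in the series:

	/* Sketch: hand a READ payload to the transport, which may
	 * push it to the client by a means more efficient than
	 * including it inline in the Reply (for RDMA, an explicit
	 * RDMA Write).
	 */
	int svc_encode_read_payload(struct svc_rqst *rqstp,
				    unsigned int offset,
				    unsigned int length)
	{
		struct svc_xprt *xprt = rqstp->rq_xprt;

		return xprt->xpt_ops->xpo_read_payload(rqstp, offset,
						       length);
	}

A socket transport's method can simply return 0, since its payloads
are always sent inline.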

