* [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
@ 2020-10-26 18:53 Chuck Lever
  2020-10-26 18:53 ` [PATCH 01/20] SUNRPC: Adjust synopsis of xdr_buf_subsegment() Chuck Lever
                   ` (20 more replies)
  0 siblings, 21 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:53 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

This series implements support for multiple RPC/RDMA chunks per RPC
transaction. This is one of the few remaining general features of the
RPC/RDMA protocol that the Linux NFS/RDMA server implementation lacks.

There is currently one known NFS/RDMA client implementation that can
send multiple chunks per RPC, and that is Solaris. Multiple chunks
are rare enough that the Linux NFS/RDMA implementation has been
successful without this support for many years.

Along with multiple chunk support, this series adds the following
benefits:

- More robust input sanitization of RPC/RDMA headers
- An internal representation of chunks that is agnostic to their
  wire format

The cost is a little additional complexity and some extra memory
allocations when handling non-empty chunk lists. Most of these
allocations can be optimized away if we find they are a problem.

---

Chuck Lever (20):
      SUNRPC: Adjust synopsis of xdr_buf_subsegment()
      svcrdma: Const-ify the xdr_buf arguments
      svcrdma: Refactor the RDMA Write path
      SUNRPC: Rename svc_encode_read_payload()
      NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders
      svcrdma: Post RDMA Writes while XDR encoding replies
      svcrdma: Clean up svc_rdma_encode_reply_chunk()
      svcrdma: Add a "parsed chunk list" data structure
      svcrdma: Use parsed chunk lists to derive the inv_rkey
      svcrdma: Use parsed chunk lists to detect reverse direction replies
      svcrdma: Use parsed chunk lists to construct RDMA Writes
      svcrdma: Use parsed chunk lists to encode Reply transport headers
      svcrdma: Support multiple write chunks when pulling up
      svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg()
      svcrdma: Support multiple Write chunks in svc_rdma_send_reply_chunk
      svcrdma: Remove chunk list pointers
      svcrdma: Clean up chunk tracepoints
      svcrdma: Rename info::ri_chunklen
      svcrdma: Use the new parsed chunk list when pulling Read chunks
      svcrdma: support multiple Read chunks per RPC


 fs/nfsd/nfs3xdr.c                          |   4 +
 fs/nfsd/nfs4xdr.c                          |   5 +-
 fs/nfsd/nfsxdr.c                           |   4 +
 include/linux/sunrpc/svc.h                 |   6 +-
 include/linux/sunrpc/svc_rdma.h            |  36 +-
 include/linux/sunrpc/svc_rdma_pcl.h        | 128 +++++
 include/linux/sunrpc/svc_xprt.h            |   4 +-
 include/trace/events/rpcrdma.h             | 143 +++--
 net/sunrpc/svc.c                           |  11 +-
 net/sunrpc/svcsock.c                       |   8 +-
 net/sunrpc/xprtrdma/Makefile               |   2 +-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  14 +-
 net/sunrpc/xprtrdma/svc_rdma_pcl.c         | 306 +++++++++++
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    | 314 ++++++-----
 net/sunrpc/xprtrdma/svc_rdma_rw.c          | 598 +++++++++++++++------
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      | 561 ++++++++++---------
 net/sunrpc/xprtrdma/svc_rdma_transport.c   |   2 +-
 17 files changed, 1488 insertions(+), 658 deletions(-)
 create mode 100644 include/linux/sunrpc/svc_rdma_pcl.h
 create mode 100644 net/sunrpc/xprtrdma/svc_rdma_pcl.c

--
Chuck Lever



* [PATCH 01/20] SUNRPC: Adjust synopsis of xdr_buf_subsegment()
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
@ 2020-10-26 18:53 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 02/20] svcrdma: Const-ify the xdr_buf arguments Chuck Lever
                   ` (19 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:53 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Clean up: This enables xdr_buf_subsegment()'s callers to pass in a
const pointer to the source buffer.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
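As a minimal illustration of what the adjusted synopsis permits (the
function below is hypothetical, not part of this patch): a caller that
only reads from an xdr_buf can now take it as const and still carve
out a sub-buffer.

static int example_peek_region(const struct xdr_buf *buf,
			       unsigned int offset, unsigned int len)
{
	struct xdr_buf subbuf;

	/* @buf is never modified; only @subbuf's pointers are set up */
	if (xdr_buf_subsegment(buf, &subbuf, offset, len))
		return -EMSGSIZE;

	/* ... read via subbuf.head, subbuf.pages, subbuf.tail ... */
	return 0;
}
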
 include/linux/sunrpc/xdr.h |    3 ++-
 net/sunrpc/xdr.c           |    5 ++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index 9548d075e06d..ec2a22ccdc2a 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -183,7 +183,8 @@ xdr_adjust_iovec(struct kvec *iov, __be32 *p)
  */
 extern void xdr_shift_buf(struct xdr_buf *, size_t);
 extern void xdr_buf_from_iov(struct kvec *, struct xdr_buf *);
-extern int xdr_buf_subsegment(struct xdr_buf *, struct xdr_buf *, unsigned int, unsigned int);
+extern int xdr_buf_subsegment(const struct xdr_buf *buf, struct xdr_buf *subbuf,
+			      unsigned int base, unsigned int len);
 extern void xdr_buf_trim(struct xdr_buf *, unsigned int);
 extern int read_bytes_from_xdr_buf(struct xdr_buf *, unsigned int, void *, unsigned int);
 extern int write_bytes_to_xdr_buf(struct xdr_buf *, unsigned int, void *, unsigned int);
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 71e03b930b70..28f81769a27c 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -1379,9 +1379,8 @@ EXPORT_SYMBOL_GPL(xdr_buf_from_iov);
  *
  * Returns -1 if base or length are out of bounds.
  */
-int
-xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,
-			unsigned int base, unsigned int len)
+int xdr_buf_subsegment(const struct xdr_buf *buf, struct xdr_buf *subbuf,
+		       unsigned int base, unsigned int len)
 {
 	subbuf->buflen = subbuf->len = len;
 	if (base < buf->head[0].iov_len) {




* [PATCH 02/20] svcrdma: Const-ify the xdr_buf arguments
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
  2020-10-26 18:53 ` [PATCH 01/20] SUNRPC: Adjust synopsis of xdr_buf_subsegment() Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 03/20] svcrdma: Refactor the RDMA Write path Chuck Lever
                   ` (18 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Clean up: Ensure the code in rw.c does not modify the argument, and
enable callers to also use "const struct xdr_buf *".

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_rw.c |    4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 80a0c0e87590..d8b2e22c56c1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -197,7 +197,7 @@ struct svc_rdma_write_info {
 	__be32			*wi_segs;
 
 	/* SGL constructor arguments */
-	struct xdr_buf		*wi_xdr;
+	const struct xdr_buf	*wi_xdr;
 	unsigned char		*wi_base;
 	unsigned int		wi_next_off;
 
@@ -405,7 +405,7 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 				    struct svc_rdma_rw_ctxt *ctxt)
 {
 	unsigned int sge_no, sge_bytes, page_off, page_no;
-	struct xdr_buf *xdr = info->wi_xdr;
+	const struct xdr_buf *xdr = info->wi_xdr;
 	struct scatterlist *sg;
 	struct page **page;
 




* [PATCH 03/20] svcrdma: Refactor the RDMA Write path
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
  2020-10-26 18:53 ` [PATCH 01/20] SUNRPC: Adjust synopsis of xdr_buf_subsegment() Chuck Lever
  2020-10-26 18:54 ` [PATCH 02/20] svcrdma: Const-ify the xdr_buf arguments Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload() Chuck Lever
                   ` (17 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor for subsequent changes.

Constify the xdr_buf argument to ensure the code here does not
modify it, and to enable callers to pass in a
"const struct xdr_buf *".

At the same time, rename the helper functions to reflect that they
emit RDMA Writes, not RDMA Sends, and add documenting comments.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_rw.c |   56 +++++++++++++++++++++++--------------
 1 file changed, 35 insertions(+), 21 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index d8b2e22c56c1..03c32b441d32 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -493,27 +493,42 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
 	return -E2BIG;
 }
 
-/* Send one of an xdr_buf's kvecs by itself. To send a Reply
- * chunk, the whole RPC Reply is written back to the client.
- * This function writes either the head or tail of the xdr_buf
- * containing the Reply.
+/**
+ * svc_rdma_iov_write - Construct RDMA Writes from an iov
+ * @info: pointer to write arguments
+ * @iov: kvec to write
+ *
+ * Returns:
+ *   On success, returns zero
+ *   %-E2BIG if the client-provided Write chunk is too small
+ *   %-ENOMEM if a resource has been exhausted
+ *   %-EIO if an rdma-rw error occurred
  */
-static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
-				  struct kvec *vec)
+static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
+			      const struct kvec *iov)
 {
-	info->wi_base = vec->iov_base;
+	info->wi_base = iov->iov_base;
 	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
-				     vec->iov_len);
+				     iov->iov_len);
 }
 
-/* Send an xdr_buf's page list by itself. A Write chunk is just
- * the page list. A Reply chunk is @xdr's head, page list, and
- * tail. This function is shared between the two types of chunk.
+/**
+ * svc_rdma_pages_write - Construct RDMA Writes from pages
+ * @info: pointer to write arguments
+ * @xdr: xdr_buf with pages to write
+ * @offset: offset into the content of @xdr
+ * @length: number of bytes to write
+ *
+ * Returns:
+ *   On success, returns zero
+ *   %-E2BIG if the client-provided Write chunk is too small
+ *   %-ENOMEM if a resource has been exhausted
+ *   %-EIO if an rdma-rw error occurred
  */
-static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
-				      struct xdr_buf *xdr,
-				      unsigned int offset,
-				      unsigned long length)
+static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
+				const struct xdr_buf *xdr,
+				unsigned int offset,
+				unsigned long length)
 {
 	info->wi_xdr = xdr;
 	info->wi_next_off = offset - xdr->head[0].iov_len;
@@ -550,7 +565,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
+	ret = svc_rdma_pages_write(info, xdr, offset, length);
 	if (ret < 0)
 		goto out_err;
 
@@ -590,7 +605,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+	ret = svc_rdma_iov_write(info, &xdr->head[0]);
 	if (ret < 0)
 		goto out_err;
 	consumed = xdr->head[0].iov_len;
@@ -599,16 +614,15 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	 * client did not provide Write chunks.
 	 */
 	if (!rctxt->rc_write_list && xdr->page_len) {
-		ret = svc_rdma_send_xdr_pagelist(info, xdr,
-						 xdr->head[0].iov_len,
-						 xdr->page_len);
+		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
+					   xdr->page_len);
 		if (ret < 0)
 			goto out_err;
 		consumed += xdr->page_len;
 	}
 
 	if (xdr->tail[0].iov_len) {
-		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
+		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
 		if (ret < 0)
 			goto out_err;
 		consumed += xdr->tail[0].iov_len;




* [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload()
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (2 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 03/20] svcrdma: Refactor the RDMA Write path Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-27 20:53   ` J. Bruce Fields
  2020-10-26 18:54 ` [PATCH 05/20] NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders Chuck Lever
                   ` (16 subsequent siblings)
  20 siblings, 1 reply; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Clean up: "result payload" is a less confusing name for these
payloads. "READ payload" reflects only the NFS usage.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 fs/nfsd/nfs4xdr.c                        |    2 +-
 include/linux/sunrpc/svc.h               |    6 +++---
 include/linux/sunrpc/svc_rdma.h          |    4 ++--
 include/linux/sunrpc/svc_xprt.h          |    4 ++--
 net/sunrpc/svc.c                         |   11 ++++++-----
 net/sunrpc/svcsock.c                     |    8 ++++----
 net/sunrpc/xprtrdma/svc_rdma_sendto.c    |    8 ++++----
 net/sunrpc/xprtrdma/svc_rdma_transport.c |    2 +-
 8 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 833a2c64dfe8..7e24fb3ca36e 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -3829,7 +3829,7 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
 	read->rd_length = maxcount;
 	if (nfserr)
 		return nfserr;
-	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
+	if (svc_encode_result_payload(resp->rqstp, starting_len + 8, maxcount))
 		return nfserr_io;
 	xdr_truncate_encode(xdr, starting_len + 8 + xdr_align_size(maxcount));
 
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 386628b36bc7..c220b734fa69 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -519,9 +519,9 @@ void		   svc_wake_up(struct svc_serv *);
 void		   svc_reserve(struct svc_rqst *rqstp, int space);
 struct svc_pool *  svc_pool_for_cpu(struct svc_serv *serv, int cpu);
 char *		   svc_print_addr(struct svc_rqst *, char *, size_t);
-int		   svc_encode_read_payload(struct svc_rqst *rqstp,
-					   unsigned int offset,
-					   unsigned int length);
+int		   svc_encode_result_payload(struct svc_rqst *rqstp,
+					     unsigned int offset,
+					     unsigned int length);
 unsigned int	   svc_fill_write_vector(struct svc_rqst *rqstp,
 					 struct page **pages,
 					 struct kvec *first, size_t total);
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 9dc3a3b88391..2b870a3f391b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -207,8 +207,8 @@ extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 				    struct svc_rdma_recv_ctxt *rctxt,
 				    int status);
 extern int svc_rdma_sendto(struct svc_rqst *);
-extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
-				 unsigned int length);
+extern int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
+				   unsigned int length);
 
 /* svc_rdma_transport.c */
 extern struct svc_xprt_class svc_rdma_class;
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index aca35ab5cff2..92455e0d5244 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -21,8 +21,8 @@ struct svc_xprt_ops {
 	int		(*xpo_has_wspace)(struct svc_xprt *);
 	int		(*xpo_recvfrom)(struct svc_rqst *);
 	int		(*xpo_sendto)(struct svc_rqst *);
-	int		(*xpo_read_payload)(struct svc_rqst *, unsigned int,
-					    unsigned int);
+	int		(*xpo_result_payload)(struct svc_rqst *, unsigned int,
+					      unsigned int);
 	void		(*xpo_release_rqst)(struct svc_rqst *);
 	void		(*xpo_detach)(struct svc_xprt *);
 	void		(*xpo_free)(struct svc_xprt *);
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index c211b607239e..b41500645c3f 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1622,7 +1622,7 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
 EXPORT_SYMBOL_GPL(svc_max_payload);
 
 /**
- * svc_encode_read_payload - mark a range of bytes as a READ payload
+ * svc_encode_result_payload - mark a range of bytes as a result payload
  * @rqstp: svc_rqst to operate on
  * @offset: payload's byte offset in rqstp->rq_res
  * @length: size of payload, in bytes
@@ -1630,12 +1630,13 @@ EXPORT_SYMBOL_GPL(svc_max_payload);
  * Returns zero on success, or a negative errno if a permanent
  * error occurred.
  */
-int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
-			    unsigned int length)
+int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
+			      unsigned int length)
 {
-	return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
+	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
+							   length);
 }
-EXPORT_SYMBOL_GPL(svc_encode_read_payload);
+EXPORT_SYMBOL_GPL(svc_encode_result_payload);
 
 /**
  * svc_fill_write_vector - Construct data argument for VFS write call
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index c2752e2b9ce3..b248f2349437 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -181,8 +181,8 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
 	}
 }
 
-static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
-				 unsigned int length)
+static int svc_sock_result_payload(struct svc_rqst *rqstp, unsigned int offset,
+				   unsigned int length)
 {
 	return 0;
 }
@@ -635,7 +635,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
 	.xpo_create = svc_udp_create,
 	.xpo_recvfrom = svc_udp_recvfrom,
 	.xpo_sendto = svc_udp_sendto,
-	.xpo_read_payload = svc_sock_read_payload,
+	.xpo_result_payload = svc_sock_result_payload,
 	.xpo_release_rqst = svc_udp_release_rqst,
 	.xpo_detach = svc_sock_detach,
 	.xpo_free = svc_sock_free,
@@ -1123,7 +1123,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
 	.xpo_create = svc_tcp_create,
 	.xpo_recvfrom = svc_tcp_recvfrom,
 	.xpo_sendto = svc_tcp_sendto,
-	.xpo_read_payload = svc_sock_read_payload,
+	.xpo_result_payload = svc_sock_result_payload,
 	.xpo_release_rqst = svc_tcp_release_rqst,
 	.xpo_detach = svc_tcp_sock_detach,
 	.xpo_free = svc_sock_free,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index c3d588b149aa..c8411b4f3492 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -979,19 +979,19 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 }
 
 /**
- * svc_rdma_read_payload - special processing for a READ payload
+ * svc_rdma_result_payload - special processing for a result payload
  * @rqstp: svc_rqst to operate on
  * @offset: payload's byte offset in @xdr
  * @length: size of payload, in bytes
  *
  * Returns zero on success.
  *
- * For the moment, just record the xdr_buf location of the READ
+ * For the moment, just record the xdr_buf location of the result
  * payload. svc_rdma_sendto will use that location later when
  * we actually send the payload.
  */
-int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
-			  unsigned int length)
+int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
+			    unsigned int length)
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fb044792b571..afba4e9d5425 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -80,7 +80,7 @@ static const struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_create = svc_rdma_create,
 	.xpo_recvfrom = svc_rdma_recvfrom,
 	.xpo_sendto = svc_rdma_sendto,
-	.xpo_read_payload = svc_rdma_read_payload,
+	.xpo_result_payload = svc_rdma_result_payload,
 	.xpo_release_rqst = svc_rdma_release_rqst,
 	.xpo_detach = svc_rdma_detach,
 	.xpo_free = svc_rdma_free,




* [PATCH 05/20] NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (3 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload() Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 06/20] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
                   ` (15 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Have the NFSD encoders annotate the boundaries of every
direct-data-placement eligible result data payload. Then change
svcrdma to use that annotation instead of xdr->page_len when
handling Write chunks.

For NFSv4 on RDMA, this makes it possible to recognize multiple
result payloads per compound. That is a prerequisite for supporting
multiple Write chunks per RPC transaction.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
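A rough sketch of what this enables (the function, offsets, and status
codes below are a hypothetical encoder fragment, not part of this
patch): with per-payload annotation, an encoder that produces two
DDP-eligible results in a single compound can mark each byte range
separately, instead of the transport assuming that the xdr_buf's page
list is one payload.

static __be32 example_mark_two_results(struct svc_rqst *rqstp,
				       unsigned int off1, unsigned int len1,
				       unsigned int off2, unsigned int len2)
{
	/* Each call records one result payload's byte range in rq_res */
	if (svc_encode_result_payload(rqstp, off1, len1))
		return nfserr_io;
	if (svc_encode_result_payload(rqstp, off2, len2))
		return nfserr_io;
	return nfs_ok;
}
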
 fs/nfsd/nfs3xdr.c                     |    4 ++++
 fs/nfsd/nfs4xdr.c                     |    3 +++
 fs/nfsd/nfsxdr.c                      |    4 ++++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   24 +++++++-----------------
 4 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/fs/nfsd/nfs3xdr.c b/fs/nfsd/nfs3xdr.c
index 9c23b6acf234..f38cd31dbbec 100644
--- a/fs/nfsd/nfs3xdr.c
+++ b/fs/nfsd/nfs3xdr.c
@@ -720,6 +720,8 @@ nfs3svc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
 			*p = 0;
 			rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
 		}
+		svc_encode_result_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+					  resp->len);
 		return 1;
 	} else
 		return xdr_ressize_check(rqstp, p);
@@ -746,6 +748,8 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
 			*p = 0;
 			rqstp->rq_res.tail[0].iov_len = 4 - (resp->count & 3);
 		}
+		svc_encode_result_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+					  resp->count);
 		return 1;
 	} else
 		return xdr_ressize_check(rqstp, p);
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 7e24fb3ca36e..e56f0385022c 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -3777,6 +3777,8 @@ static __be32 nfsd4_encode_splice_read(
 		buf->page_len = 0;
 		return nfserr;
 	}
+	svc_encode_result_payload(read->rd_rqstp, buf->head[0].iov_len,
+				  maxcount);
 
 	*(p++) = htonl(eof);
 	*(p++) = htonl(maxcount);
@@ -3921,6 +3923,7 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd
 		xdr_truncate_encode(xdr, length_offset);
 		return nfserr;
 	}
+	svc_encode_result_payload(readlink->rl_rqstp, length_offset, maxcount);
 
 	wire_count = htonl(maxcount);
 	write_bytes_to_xdr_buf(xdr->buf, length_offset, &wire_count, 4);
diff --git a/fs/nfsd/nfsxdr.c b/fs/nfsd/nfsxdr.c
index 8a288c8fcd57..a23ea58a098e 100644
--- a/fs/nfsd/nfsxdr.c
+++ b/fs/nfsd/nfsxdr.c
@@ -483,6 +483,8 @@ nfssvc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
 		*p = 0;
 		rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
 	}
+	svc_encode_result_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+				  resp->len);
 	return 1;
 }
 
@@ -507,6 +509,8 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
 		*p = 0;
 		rqstp->rq_res.tail[0].iov_len = 4 - (resp->count&3);
 	}
+	svc_encode_result_payload(rqstp, rqstp->rq_res.head[0].iov_len,
+				  resp->count);
 	return 1;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index c8411b4f3492..d6436c13d5c4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -448,7 +448,6 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
  * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
  * @rctxt: Reply context with information about the RPC Call
  * @sctxt: Send context for the RPC Reply
- * @length: size in bytes of the payload in the first Write chunk
  *
  * The client provides a Write chunk list in the Call message. Fill
  * in the segments in the first Write chunk in the Reply's transport
@@ -465,12 +464,12 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
  */
 static ssize_t
 svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
-			   struct svc_rdma_send_ctxt *sctxt,
-			   unsigned int length)
+			   struct svc_rdma_send_ctxt *sctxt)
 {
 	ssize_t len, ret;
 
-	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
+					  rctxt->rc_read_payload_length);
 	if (ret < 0)
 		return ret;
 	len = ret;
@@ -923,21 +922,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err0;
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
-		unsigned long offset;
-		unsigned int length;
-
-		if (rctxt->rc_read_payload_length) {
-			offset = rctxt->rc_read_payload_offset;
-			length = rctxt->rc_read_payload_length;
-		} else {
-			offset = xdr->head[0].iov_len;
-			length = xdr->page_len;
-		}
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
-						length);
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
+						rctxt->rc_read_payload_offset,
+						rctxt->rc_read_payload_length);
 		if (ret < 0)
 			goto err2;
-		if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+		if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
 			goto err0;
 	} else {
 		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)




* [PATCH 06/20] svcrdma: Post RDMA Writes while XDR encoding replies
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (4 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 05/20] NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 07/20] svcrdma: Clean up svc_rdma_encode_reply_chunk() Chuck Lever
                   ` (14 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

The only RPC/RDMA ordering requirement between RDMA Writes and RDMA
Sends is that the responder must post the Writes on the Send queue
before posting the Send that conveys the RPC Reply for that Write
payload.

The Linux NFS server implementation now has a transport method that
can post result payload Writes earlier than svc_rdma_sendto:

   ->xpo_result_payload()

This gets RDMA Writes going earlier so they are more likely to be
complete at the remote end before the Send completes.

Some care must be taken with pulled-up Replies. We don't want to
push the Write chunk and then send the same payload data via Send.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
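For orientation, a minimal sketch of the byte accounting behind the
new svc_rdma_xb_write() (the helper below is hypothetical and only
sums the regions): the head kvec, page list, and tail kvec of an
xdr_buf are written in turn, so a fully successful chunk write
consumes exactly xdr->len bytes. That is why svc_rdma_send_write_chunk()
now treats any other return value as an error.

static unsigned int example_xb_write_length(const struct xdr_buf *xdr)
{
	unsigned int len = 0;

	len += xdr->head[0].iov_len;	/* head kvec */
	len += xdr->page_len;		/* page list */
	len += xdr->tail[0].iov_len;	/* tail kvec */

	/* For a fully encoded reply buffer, this equals xdr->len */
	return len;
}
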
 include/linux/sunrpc/svc_rdma.h       |    4 +-
 net/sunrpc/xprtrdma/svc_rdma_rw.c     |   52 +++++++++++++++++++++++------
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   59 ++++++++++++++++++---------------
 3 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 2b870a3f391b..f5a3c852bb90 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -183,9 +183,7 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
 				    struct svc_rqst *rqstp,
 				    struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
-				     __be32 *wr_ch, struct xdr_buf *xdr,
-				     unsigned int offset,
-				     unsigned long length);
+				     __be32 *wr_ch, const struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     const struct svc_rdma_recv_ctxt *rctxt,
 				     struct xdr_buf *xdr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 03c32b441d32..d732785d0380 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -536,13 +536,49 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
 				     length);
 }
 
+/**
+ * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
+ * @xdr: xdr_buf to write
+ * @info: pointer to write arguments
+ *
+ * Returns:
+ *   On success, returns zero
+ *   %-E2BIG if the client-provided Write chunk is too small
+ *   %-ENOMEM if a resource has been exhausted
+ *   %-EIO if an rdma-rw error occurred
+ */
+static int svc_rdma_xb_write(const struct xdr_buf *xdr,
+			     struct svc_rdma_write_info *info)
+{
+	int ret;
+
+	if (xdr->head[0].iov_len) {
+		ret = svc_rdma_iov_write(info, &xdr->head[0]);
+		if (ret < 0)
+			return ret;
+	}
+
+	if (xdr->page_len) {
+		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
+					   xdr->page_len);
+		if (ret < 0)
+			return ret;
+	}
+
+	if (xdr->tail[0].iov_len) {
+		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
+		if (ret < 0)
+			return ret;
+	}
+
+	return xdr->len;
+}
+
 /**
  * svc_rdma_send_write_chunk - Write all segments in a Write chunk
  * @rdma: controlling RDMA transport
  * @wr_ch: Write chunk provided by client
  * @xdr: xdr_buf containing the data payload
- * @offset: payload's byte offset in @xdr
- * @length: size of payload, in bytes
  *
  * Returns a non-negative number of bytes the chunk consumed, or
  *	%-E2BIG if the payload was larger than the Write chunk,
@@ -552,21 +588,17 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
  *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
 int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
-			      struct xdr_buf *xdr,
-			      unsigned int offset, unsigned long length)
+			      const struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
 	int ret;
 
-	if (!length)
-		return 0;
-
 	info = svc_rdma_write_info_alloc(rdma, wr_ch);
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_pages_write(info, xdr, offset, length);
-	if (ret < 0)
+	ret = svc_rdma_xb_write(xdr, info);
+	if (ret != xdr->len)
 		goto out_err;
 
 	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
@@ -574,7 +606,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 		goto out_err;
 
 	trace_svcrdma_send_write_chunk(xdr->page_len);
-	return length;
+	return xdr->len;
 
 out_err:
 	svc_rdma_write_info_free(info);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index d6436c13d5c4..fb6ba1177fd8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -468,11 +468,14 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
 {
 	ssize_t len, ret;
 
-	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
-					  rctxt->rc_read_payload_length);
-	if (ret < 0)
-		return ret;
-	len = ret;
+	len = 0;
+	if (rctxt->rc_write_list) {
+		ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
+						  rctxt->rc_read_payload_length);
+		if (ret < 0)
+			return ret;
+		len = ret;
+	}
 
 	/* Terminate the Write list */
 	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
@@ -556,11 +559,13 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 				    const struct svc_rdma_recv_ctxt *rctxt,
 				    struct xdr_buf *xdr)
 {
+	bool write_chunk_present = rctxt && rctxt->rc_write_list;
 	int elements;
 
 	/* For small messages, copying bytes is cheaper than DMA mapping.
 	 */
-	if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+	if (!write_chunk_present &&
+	    sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
 		return true;
 
 	/* Check whether the xdr_buf has more elements than can
@@ -893,9 +898,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	__be32 *rdma_argp = rctxt->rc_recv_buf;
-	__be32 *wr_lst = rctxt->rc_write_list;
 	__be32 *rp_ch = rctxt->rc_reply_chunk;
-	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
 	__be32 *p;
 	int ret;
@@ -920,19 +923,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 
 	if (svc_rdma_encode_read_list(sctxt) < 0)
 		goto err0;
-	if (wr_lst) {
-		/* XXX: Presume the client sent only one Write chunk */
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
-						rctxt->rc_read_payload_offset,
-						rctxt->rc_read_payload_length);
-		if (ret < 0)
-			goto err2;
-		if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
-			goto err0;
-	} else {
-		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
-			goto err0;
-	}
+	if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
+		goto err0;
 	if (rp_ch) {
 		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
 		if (ret < 0)
@@ -974,16 +966,25 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
  * @offset: payload's byte offset in @xdr
  * @length: size of payload, in bytes
  *
- * Returns zero on success.
- *
- * For the moment, just record the xdr_buf location of the result
- * payload. svc_rdma_sendto will use that location later when
- * we actually send the payload.
+ * Return values:
+ *   On success, returns the number of bytes in the payload
+ *   If nothing needs to be done, returns zero
+ *   %-EMSGSIZE on XDR buffer overflow
+ *   %-E2BIG if the payload was larger than the Write chunk
+ *   %-EINVAL if client provided too many segments
+ *   %-ENOMEM if rdma_rw context pool was exhausted
+ *   %-ENOTCONN if posting failed (connection is lost)
+ *   %-EIO if rdma_rw initialization failed (DMA mapping, etc)
  */
 int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
 			    unsigned int length)
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+	struct svcxprt_rdma *rdma;
+	struct xdr_buf subbuf;
+
+	if (!rctxt->rc_write_list || !length)
+		return 0;
 
 	/* XXX: Just one READ payload slot for now, since our
 	 * transport implementation currently supports only one
@@ -992,5 +993,9 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
 	rctxt->rc_read_payload_offset = offset;
 	rctxt->rc_read_payload_length = length;
 
-	return 0;
+	if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
+		return -EMSGSIZE;
+
+	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list, &subbuf);
 }




* [PATCH 07/20] svcrdma: Clean up svc_rdma_encode_reply_chunk()
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (5 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 06/20] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 08/20] svcrdma: Add a "parsed chunk list" data structure Chuck Lever
                   ` (13 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: Match the control flow of svc_rdma_encode_write_list().

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_rw.c     |    3 +++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |   23 +++++++++++------------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index d732785d0380..5f667d964cd6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -633,6 +633,9 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	struct svc_rdma_write_info *info;
 	int consumed, ret;
 
+	if (!rctxt->rc_reply_chunk)
+		return 0;
+
 	info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
 	if (!info)
 		return -ENOMEM;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index fb6ba1177fd8..3e7ba06788b0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -504,6 +504,9 @@ svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
 			    struct svc_rdma_send_ctxt *sctxt,
 			    unsigned int length)
 {
+	if (!rctxt->rc_reply_chunk)
+		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+
 	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
 					   length);
 }
@@ -898,7 +901,6 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	__be32 *rdma_argp = rctxt->rc_recv_buf;
-	__be32 *rp_ch = rctxt->rc_reply_chunk;
 	struct svc_rdma_send_ctxt *sctxt;
 	__be32 *p;
 	int ret;
@@ -916,25 +918,22 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 			      rpcrdma_fixed_maxsz * sizeof(*p));
 	if (!p)
 		goto err0;
+
+	ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
+	if (ret < 0)
+		goto err2;
+
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p   = rp_ch ? rdma_nomsg : rdma_msg;
+	*p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
 
 	if (svc_rdma_encode_read_list(sctxt) < 0)
 		goto err0;
 	if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
 		goto err0;
-	if (rp_ch) {
-		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
-		if (ret < 0)
-			goto err2;
-		if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
-			goto err0;
-	} else {
-		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
-			goto err0;
-	}
+	if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+		goto err0;
 
 	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 	if (ret < 0)




* [PATCH 08/20] svcrdma: Add a "parsed chunk list" data structure
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (6 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 07/20] svcrdma: Clean up svc_rdma_encode_reply_chunk() Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 09/20] svcrdma: Use parsed chunk lists to derive the inv_rkey Chuck Lever
                   ` (12 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

This simple data structure binds the location of each data payload
inside an RPC message to the chunk that will be used to push it to
or pull it from the client.

There are several benefits to this small additional overhead:

 * It enables support for more than one chunk in incoming Read and
   Write lists.

 * It translates the version-specific on-the-wire format into a
   generic in-memory structure, enabling support for multiple
   versions of the RPC/RDMA transport protocol.

 * It enables the server to re-organize a chunk list if it needs to
   adjust where Read chunk data lands in server memory without
   altering the contents of the XDR-encoded Receive buffer.

Construction of these lists is done while sanity checking each
incoming RPC/RDMA header. Subsequent patches will make use of the
generated data structures.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
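For orientation (the dump helper below is hypothetical, not part of
this patch), this is how code walks the generic in-memory
representation once a Read or Write list has been parsed into a
struct svc_rdma_pcl:

static void example_dump_pcl(struct svc_rdma_pcl *pcl)
{
	struct svc_rdma_chunk *chunk;
	struct svc_rdma_segment *segment;

	pcl_for_each_chunk(chunk, pcl) {
		pr_info("chunk: position=%u length=%u segcount=%u\n",
			chunk->ch_position, chunk->ch_length,
			chunk->ch_segcount);
		pcl_for_each_segment(segment, chunk)
			pr_info("  segment: %u@0x%016llx:0x%08x\n",
				segment->rs_length,
				(unsigned long long)segment->rs_offset,
				segment->rs_handle);
	}
}
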
 include/linux/sunrpc/svc_rdma.h         |   12 +
 include/linux/sunrpc/svc_rdma_pcl.h     |  128 +++++++++++++
 include/trace/events/rpcrdma.h          |   75 +++++++-
 net/sunrpc/xprtrdma/Makefile            |    2 
 net/sunrpc/xprtrdma/svc_rdma_pcl.c      |  306 +++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |  196 ++++++++++++--------
 6 files changed, 635 insertions(+), 84 deletions(-)
 create mode 100644 include/linux/sunrpc/svc_rdma_pcl.h
 create mode 100644 net/sunrpc/xprtrdma/svc_rdma_pcl.c

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f5a3c852bb90..a89d4209fe2a 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -47,6 +47,8 @@
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/rpc_rdma_cid.h>
+#include <linux/sunrpc/svc_rdma_pcl.h>
+
 #include <rdma/ib_verbs.h>
 #include <rdma/rdma_cm.h>
 
@@ -142,8 +144,18 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
+
+	struct svc_rdma_pcl	rc_call_pcl;
+
+	struct svc_rdma_pcl	rc_read_pcl;
+
 	__be32			*rc_write_list;
+	struct svc_rdma_chunk	*rc_cur_result_payload;
+	struct svc_rdma_pcl	rc_write_pcl;
+
 	__be32			*rc_reply_chunk;
+	struct svc_rdma_pcl	rc_reply_pcl;
+
 	unsigned int		rc_read_payload_offset;
 	unsigned int		rc_read_payload_length;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
diff --git a/include/linux/sunrpc/svc_rdma_pcl.h b/include/linux/sunrpc/svc_rdma_pcl.h
new file mode 100644
index 000000000000..7516ad0fae80
--- /dev/null
+++ b/include/linux/sunrpc/svc_rdma_pcl.h
@@ -0,0 +1,128 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Copyright (c) 2020, Oracle and/or its affiliates
+ */
+
+#ifndef SVC_RDMA_PCL_H
+#define SVC_RDMA_PCL_H
+
+#include <linux/list.h>
+
+struct svc_rdma_segment {
+	u32			rs_handle;
+	u32			rs_length;
+	u64			rs_offset;
+};
+
+struct svc_rdma_chunk {
+	struct list_head	ch_list;
+
+	u32			ch_position;
+	u32			ch_length;
+	u32			ch_payload_length;
+
+	u32			ch_segcount;
+	struct svc_rdma_segment	ch_segments[];
+};
+
+struct svc_rdma_pcl {
+	unsigned int		cl_count;
+	struct list_head	cl_chunks;
+};
+
+/**
+ * pcl_init - Initialize a parsed chunk list
+ * @pcl: parsed chunk list to initialize
+ *
+ */
+static inline void pcl_init(struct svc_rdma_pcl *pcl)
+{
+	INIT_LIST_HEAD(&pcl->cl_chunks);
+}
+
+/**
+ * pcl_is_empty - Return true if parsed chunk list is empty
+ * @pcl: parsed chunk list
+ *
+ */
+static inline bool pcl_is_empty(const struct svc_rdma_pcl *pcl)
+{
+	return list_empty(&pcl->cl_chunks);
+}
+
+/**
+ * pcl_first_chunk - Return first chunk in a parsed chunk list
+ * @pcl: parsed chunk list
+ *
+ * Returns the first chunk in the list, or NULL if the list is empty.
+ */
+static inline struct svc_rdma_chunk *
+pcl_first_chunk(const struct svc_rdma_pcl *pcl)
+{
+	if (pcl_is_empty(pcl))
+		return NULL;
+	return list_first_entry(&pcl->cl_chunks, struct svc_rdma_chunk,
+				ch_list);
+}
+
+/**
+ * pcl_next_chunk - Return next chunk in a parsed chunk list
+ * @pcl: a parsed chunk list
+ * @chunk: chunk in @pcl
+ *
+ * Returns the next chunk in the list, or NULL if @chunk is already last.
+ */
+static inline struct svc_rdma_chunk *
+pcl_next_chunk(const struct svc_rdma_pcl *pcl, struct svc_rdma_chunk *chunk)
+{
+	if (list_is_last(&chunk->ch_list, &pcl->cl_chunks))
+		return NULL;
+	return list_next_entry(chunk, ch_list);
+}
+
+/**
+ * pcl_for_each_chunk - Iterate over chunks in a parsed chunk list
+ * @pos: the loop cursor
+ * @pcl: a parsed chunk list
+ */
+#define pcl_for_each_chunk(pos, pcl) \
+	for (pos = list_first_entry(&(pcl)->cl_chunks, struct svc_rdma_chunk, ch_list); \
+	     &pos->ch_list != &(pcl)->cl_chunks; \
+	     pos = list_next_entry(pos, ch_list))
+
+/**
+ * pcl_for_each_segment - Iterate over segments in a parsed chunk
+ * @pos: the loop cursor
+ * @chunk: a parsed chunk
+ */
+#define pcl_for_each_segment(pos, chunk) \
+	for (pos = &(chunk)->ch_segments[0]; \
+	     pos <= &(chunk)->ch_segments[(chunk)->ch_segcount - 1]; \
+	     pos++)
+
+/**
+ * pcl_chunk_end_offset - Return offset of byte range following @chunk
+ * @chunk: chunk in @pcl
+ *
+ * Returns starting offset of the region just after @chunk
+ */
+static inline unsigned int
+pcl_chunk_end_offset(const struct svc_rdma_chunk *chunk)
+{
+	return xdr_align_size(chunk->ch_position + chunk->ch_payload_length);
+}
+
+struct svc_rdma_recv_ctxt;
+
+extern void pcl_free(struct svc_rdma_pcl *pcl);
+extern bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p);
+extern bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p);
+extern bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
+			    struct svc_rdma_pcl *pcl, __be32 *p);
+extern int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
+				   const struct xdr_buf *xdr,
+				   int (*actor)(const struct xdr_buf *,
+						void *),
+				   void *data);
+
+#endif	/* SVC_RDMA_PCL_H */
diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index bf1065772228..72b941aef43b 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1446,12 +1446,83 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
 				),					\
 				TP_ARGS(handle, length, offset))
 
-DEFINE_SEGMENT_EVENT(decode_wseg);
-DEFINE_SEGMENT_EVENT(encode_rseg);
 DEFINE_SEGMENT_EVENT(send_rseg);
 DEFINE_SEGMENT_EVENT(encode_wseg);
 DEFINE_SEGMENT_EVENT(send_wseg);
 
+TRACE_EVENT(svcrdma_decode_rseg,
+	TP_PROTO(
+		const struct rpc_rdma_cid *cid,
+		const struct svc_rdma_chunk *chunk,
+		const struct svc_rdma_segment *segment
+	),
+
+	TP_ARGS(cid, chunk, segment),
+
+	TP_STRUCT__entry(
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(u32, segno)
+		__field(u32, position)
+		__field(u32, handle)
+		__field(u32, length)
+		__field(u64, offset)
+	),
+
+	TP_fast_assign(
+		__entry->cq_id = cid->ci_queue_id;
+		__entry->completion_id = cid->ci_completion_id;
+		__entry->segno = chunk->ch_segcount;
+		__entry->position = chunk->ch_position;
+		__entry->handle = segment->rs_handle;
+		__entry->length = segment->rs_length;
+		__entry->offset = segment->rs_offset;
+	),
+
+	TP_printk("cq_id=%u cid=%d segno=%u position=%u %u@0x%016llx:0x%08x",
+		__entry->cq_id, __entry->completion_id,
+		__entry->segno, __entry->position, __entry->length,
+		(unsigned long long)__entry->offset, __entry->handle
+	)
+);
+
+TRACE_EVENT(svcrdma_decode_wseg,
+	TP_PROTO(
+		const struct rpc_rdma_cid *cid,
+		const struct svc_rdma_chunk *chunk,
+		u32 segno
+	),
+
+	TP_ARGS(cid, chunk, segno),
+
+	TP_STRUCT__entry(
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(u32, segno)
+		__field(u32, handle)
+		__field(u32, length)
+		__field(u64, offset)
+	),
+
+	TP_fast_assign(
+		const struct svc_rdma_segment *segment =
+			&chunk->ch_segments[segno];
+
+		__entry->cq_id = cid->ci_queue_id;
+		__entry->completion_id = cid->ci_completion_id;
+		__entry->segno = segno;
+		__entry->handle = segment->rs_handle;
+		__entry->length = segment->rs_length;
+		__entry->offset = segment->rs_offset;
+	),
+
+	TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
+		__entry->cq_id, __entry->completion_id,
+		__entry->segno, __entry->length,
+		(unsigned long long)__entry->offset, __entry->handle
+	)
+);
+
 DECLARE_EVENT_CLASS(svcrdma_chunk_event,
 	TP_PROTO(
 		u32 length
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 8ed0377d7a18..55b21bae866d 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
 rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
-	module.o
+	svc_rdma_pcl.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_pcl.c b/net/sunrpc/xprtrdma/svc_rdma_pcl.c
new file mode 100644
index 000000000000..b63cfeaa2923
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_pcl.c
@@ -0,0 +1,306 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2020 Oracle. All rights reserved.
+ */
+
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/rpc_rdma.h>
+
+#include "xprt_rdma.h"
+#include <trace/events/rpcrdma.h>
+
+/**
+ * pcl_free - Release all memory associated with a parsed chunk list
+ * @pcl: parsed chunk list
+ *
+ */
+void pcl_free(struct svc_rdma_pcl *pcl)
+{
+	while (!list_empty(&pcl->cl_chunks)) {
+		struct svc_rdma_chunk *chunk;
+
+		chunk = pcl_first_chunk(pcl);
+		list_del(&chunk->ch_list);
+		kfree(chunk);
+	}
+}
+
+static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
+{
+	struct svc_rdma_chunk *chunk;
+
+	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
+	if (!chunk)
+		return NULL;
+
+	chunk->ch_position = position;
+	chunk->ch_length = 0;
+	chunk->ch_payload_length = 0;
+	chunk->ch_segcount = 0;
+	return chunk;
+}
+
+static struct svc_rdma_chunk *
+pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
+{
+	struct svc_rdma_chunk *pos;
+
+	pcl_for_each_chunk(pos, pcl) {
+		if (pos->ch_position == position)
+			return pos;
+	}
+	return NULL;
+}
+
+static void pcl_insert_position(struct svc_rdma_pcl *pcl,
+				struct svc_rdma_chunk *chunk)
+{
+	struct svc_rdma_chunk *pos;
+
+	pcl_for_each_chunk(pos, pcl) {
+		if (pos->ch_position > chunk->ch_position)
+			break;
+	}
+	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
+	pcl->cl_count++;
+}
+
+static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
+				 struct svc_rdma_chunk *chunk,
+				 u32 handle, u32 length, u64 offset)
+{
+	struct svc_rdma_segment *segment;
+
+	segment = &chunk->ch_segments[chunk->ch_segcount];
+	segment->rs_handle = handle;
+	segment->rs_length = length;
+	segment->rs_offset = offset;
+
+	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);
+
+	chunk->ch_length += length;
+	chunk->ch_segcount++;
+}
+
+/**
+ * pcl_alloc_call - Construct a parsed chunk list for the Call body
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
+ *
+ * Assumptions:
+ * - The incoming Read list has already been sanity checked.
+ * - cl_count is already set to the number of segments in
+ *   the un-decoded list.
+ * - The list might not be in order by position.
+ *
+ * Return values:
+ *       %true: Parsed chunk list was successfully constructed, and
+ *              cl_count is updated to be the number of chunks (ie.
+ *              unique positions) in the Read list.
+ *      %false: Memory allocation failed.
+ */
+bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+	struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
+	unsigned int i, segcount = pcl->cl_count;
+
+	pcl->cl_count = 0;
+	for (i = 0; i < segcount; i++) {
+		struct svc_rdma_chunk *chunk;
+		u32 position, handle, length;
+		u64 offset;
+
+		p++;	/* skip the list discriminator */
+		p = xdr_decode_read_segment(p, &position, &handle,
+					    &length, &offset);
+		if (position != 0)
+			continue;
+
+		if (pcl_is_empty(pcl)) {
+			chunk = pcl_alloc_chunk(segcount, position);
+			if (!chunk)
+				return false;
+			pcl_insert_position(pcl, chunk);
+		} else {
+			chunk = list_first_entry(&pcl->cl_chunks,
+						 struct svc_rdma_chunk,
+						 ch_list);
+		}
+
+		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
+	}
+
+	return true;
+}
+
+/**
+ * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
+ *
+ * Assumptions:
+ * - The incoming Read list has already been sanity checked.
+ * - cl_count is already set to the number of segments in
+ *   the un-decoded list.
+ * - The list might not be in order by position.
+ *
+ * Return values:
+ *       %true: Parsed chunk list was successfully constructed, and
+ *              cl_count is updated to be the number of chunks (ie.
+ *              unique position values) in the Read list.
+ *      %false: Memory allocation failed.
+ *
+ * TODO:
+ * - Check for chunk range overlaps
+ */
+bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+	struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
+	unsigned int i, segcount = pcl->cl_count;
+
+	pcl->cl_count = 0;
+	for (i = 0; i < segcount; i++) {
+		struct svc_rdma_chunk *chunk;
+		u32 position, handle, length;
+		u64 offset;
+
+		p++;	/* skip the list discriminator */
+		p = xdr_decode_read_segment(p, &position, &handle,
+					    &length, &offset);
+		if (position == 0)
+			continue;
+
+		chunk = pcl_lookup_position(pcl, position);
+		if (!chunk) {
+			chunk = pcl_alloc_chunk(segcount, position);
+			if (!chunk)
+				return false;
+			pcl_insert_position(pcl, chunk);
+		}
+
+		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
+	}
+
+	return true;
+}
+
+/**
+ * pcl_alloc_write - Construct a parsed chunk list from a Write list
+ * @rctxt: Ingress receive context
+ * @pcl: Parsed chunk list to populate
+ * @p: Start of an un-decoded Write list
+ *
+ * Assumptions:
+ * - The incoming Write list has already been sanity checked, and
+ * - cl_count is set to the number of chunks in the un-decoded list.
+ *
+ * Return values:
+ *       %true: Parsed chunk list was successfully constructed.
+ *      %false: Memory allocation failed.
+ */
+bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
+		     struct svc_rdma_pcl *pcl, __be32 *p)
+{
+	struct svc_rdma_segment *segment;
+	struct svc_rdma_chunk *chunk;
+	unsigned int i, j;
+	u32 segcount;
+
+	for (i = 0; i < pcl->cl_count; i++) {
+		p++;	/* skip the list discriminator */
+		segcount = be32_to_cpup(p++);
+
+		chunk = pcl_alloc_chunk(segcount, 0);
+		if (!chunk)
+			return false;
+		list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
+
+		for (j = 0; j < segcount; j++) {
+			segment = &chunk->ch_segments[j];
+			p = xdr_decode_rdma_segment(p, &segment->rs_handle,
+						    &segment->rs_length,
+						    &segment->rs_offset);
+			trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
+
+			chunk->ch_length += segment->rs_length;
+			chunk->ch_segcount++;
+		}
+	}
+	return true;
+}
+
+static int pcl_process_region(const struct xdr_buf *xdr,
+			      unsigned int offset, unsigned int length,
+			      int (*actor)(const struct xdr_buf *, void *),
+			      void *data)
+{
+	struct xdr_buf subbuf;
+
+	if (!length)
+		return 0;
+	if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
+		return -EMSGSIZE;
+	return actor(&subbuf, data);
+}
+
+/**
+ * pcl_process_nonpayloads - Process non-payload regions inside @xdr
+ * @pcl: Chunk list to process
+ * @xdr: xdr_buf to process
+ * @actor: Function to invoke on each non-payload region
+ * @data: Arguments for @actor
+ *
+ * This mechanism must ignore not only result payloads that were already
+ * sent via RDMA Write, but also XDR padding for those payloads that
+ * the upper layer has added.
+ *
+ * Assumptions:
+ *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
+ *
+ * Returns:
+ *   On success, zero,
+ *   %-EMSGSIZE on XDR buffer overflow, or
+ *   The return value of @actor
+ */
+int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
+			    const struct xdr_buf *xdr,
+			    int (*actor)(const struct xdr_buf *, void *),
+			    void *data)
+{
+	struct svc_rdma_chunk *chunk, *next;
+	unsigned int start;
+	int ret;
+
+	chunk = pcl_first_chunk(pcl);
+
+	/* No result payloads were generated */
+	if (!chunk || !chunk->ch_payload_length)
+		return actor(xdr, data);
+
+	/* Process the region before the first result payload */
+	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
+	if (ret < 0)
+		return ret;
+
+	/* Process the regions between each middle result payload */
+	while ((next = pcl_next_chunk(pcl, chunk))) {
+		if (!next->ch_payload_length)
+			break;
+
+		start = pcl_chunk_end_offset(chunk);
+		ret = pcl_process_region(xdr, start, next->ch_position - start,
+					 actor, data);
+		if (ret < 0)
+			return ret;
+
+		chunk = next;
+	}
+
+	/* Process the region after the last result payload */
+	start = pcl_chunk_end_offset(chunk);
+	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
+	if (ret < 0)
+		return ret;
+
+	return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c6ea2903c21a..ec9d259b149c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -93,6 +93,7 @@
  * (see rdma_read_complete() below).
  */
 
+#include <linux/slab.h>
 #include <linux/spinlock.h>
 #include <asm/unaligned.h>
 #include <rdma/ib_verbs.h>
@@ -143,6 +144,10 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
 		goto fail2;
 
 	svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
+	pcl_init(&ctxt->rc_call_pcl);
+	pcl_init(&ctxt->rc_read_pcl);
+	pcl_init(&ctxt->rc_write_pcl);
+	pcl_init(&ctxt->rc_reply_pcl);
 
 	ctxt->rc_recv_wr.next = NULL;
 	ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
@@ -226,6 +231,11 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
 
+	pcl_free(&ctxt->rc_call_pcl);
+	pcl_free(&ctxt->rc_read_pcl);
+	pcl_free(&ctxt->rc_write_pcl);
+	pcl_free(&ctxt->rc_reply_pcl);
+
 	if (!ctxt->rc_temp)
 		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 	else
@@ -385,100 +395,123 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
 	arg->len = ctxt->rc_byte_len;
 }
 
-/* This accommodates the largest possible Write chunk.
- */
-#define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
-
-/* This accommodates the largest possible Position-Zero
- * Read chunk or Reply chunk.
- */
-#define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
-
-/* Sanity check the Read list.
+/**
+ * xdr_count_read_segments - Count number of Read segments in Read list
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
  *
- * Implementation limits:
- * - This implementation supports only one Read chunk.
+ * Before allocating anything, ensure the ingress Read list is safe
+ * to use.
  *
- * Sanity checks:
- * - Read list does not overflow Receive buffer.
- * - Segment size limited by largest NFS data payload.
- *
- * The segment count is limited to how many segments can
- * fit in the transport header without overflowing the
- * buffer. That's about 40 Read segments for a 1KB inline
- * threshold.
+ * The segment count is limited to how many segments can fit in the
+ * transport header without overflowing the buffer. That's about 40
+ * Read segments for a 1KB inline threshold.
  *
  * Return values:
- *       %true: Read list is valid. @rctxt's xdr_stream is updated
- *		to point to the first byte past the Read list.
- *      %false: Read list is corrupt. @rctxt's xdr_stream is left
- *		in an unknown state.
+ *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
+ *	    to the first byte past the Read list. rc_read_pcl and
+ *	    rc_call_pcl cl_count fields are set to the number of
+ *	    Read segments in the list.
+ *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
+ *	    unknown state.
  */
-static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
+static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
 {
-	u32 position, len;
-	bool first;
-	__be32 *p;
-
-	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
-	if (!p)
-		return false;
-
-	len = 0;
-	first = true;
+	rctxt->rc_call_pcl.cl_count = 0;
+	rctxt->rc_read_pcl.cl_count = 0;
 	while (xdr_item_is_present(p)) {
+		u32 position, handle, length;
+		u64 offset;
+
 		p = xdr_inline_decode(&rctxt->rc_stream,
 				      rpcrdma_readseg_maxsz * sizeof(*p));
 		if (!p)
 			return false;
 
-		if (first) {
-			position = be32_to_cpup(p);
-			first = false;
-		} else if (be32_to_cpup(p) != position) {
-			return false;
+		xdr_decode_read_segment(p, &position, &handle,
+					    &length, &offset);
+		if (position) {
+			if (position & 3)
+				return false;
+			++rctxt->rc_read_pcl.cl_count;
+		} else {
+			++rctxt->rc_call_pcl.cl_count;
 		}
-		p += 2;
-		len += be32_to_cpup(p);
 
 		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 		if (!p)
 			return false;
 	}
-	return len <= MAX_BYTES_SPECIAL_CHUNK;
+	return true;
 }
 
-/* The segment count is limited to how many segments can
- * fit in the transport header without overflowing the
- * buffer. That's about 60 Write segments for a 1KB inline
- * threshold.
+/* Sanity check the Read list.
+ *
+ * Sanity checks:
+ * - Read list does not overflow Receive buffer.
+ * - Chunk size limited by largest NFS data payload.
+ *
+ * Return values:
+ *   %true: Read list is valid. @rctxt's xdr_stream is updated
+ *	    to point to the first byte past the Read list.
+ *  %false: Read list is corrupt. @rctxt's xdr_stream is left
+ *	    in an unknown state.
  */
-static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
+static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
 {
-	u32 i, segcount, total;
 	__be32 *p;
 
 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 	if (!p)
 		return false;
-	segcount = be32_to_cpup(p);
+	if (!xdr_count_read_segments(rctxt, p))
+		return false;
+	if (!pcl_alloc_call(rctxt, p))
+		return false;
+	return pcl_alloc_read(rctxt, p);
+}
 
-	total = 0;
-	for (i = 0; i < segcount; i++) {
-		u32 handle, length;
-		u64 offset;
+static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
+{
+	u32 segcount;
+	__be32 *p;
 
-		p = xdr_inline_decode(&rctxt->rc_stream,
-				      rpcrdma_segment_maxsz * sizeof(*p));
-		if (!p)
-			return false;
+	if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
+		return false;
 
-		xdr_decode_rdma_segment(p, &handle, &length, &offset);
-		trace_svcrdma_decode_wseg(handle, length, offset);
+	/* A bogus segcount causes this buffer overflow check to fail. */
+	p = xdr_inline_decode(&rctxt->rc_stream,
+			      segcount * rpcrdma_segment_maxsz * sizeof(*p));
+	return p != NULL;
+}
 
-		total += length;
+/**
+ * xdr_count_write_chunks - Count number of Write chunks in Write list
+ * @rctxt: Received header and decoding state
+ * @p: start of an un-decoded Write list
+ *
+ * Before allocating anything, ensure the ingress Write list is
+ * safe to use.
+ *
+ * Return values:
+ *       %true: Write list is valid. @rctxt's xdr_stream is updated
+ *		to point to the first byte past the Write list, and
+ *		the number of Write chunks is in rc_write_pcl.cl_count.
+ *      %false: Write list is corrupt. @rctxt's xdr_stream is left
+ *		in an indeterminate state.
+ */
+static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+	rctxt->rc_write_pcl.cl_count = 0;
+	while (xdr_item_is_present(p)) {
+		if (!xdr_check_write_chunk(rctxt))
+			return false;
+		++rctxt->rc_write_pcl.cl_count;
+		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+		if (!p)
+			return false;
 	}
-	return total <= maxlen;
+	return true;
 }
 
 /* Sanity check the Write list.
@@ -498,24 +531,22 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
  */
 static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
 {
-	u32 chcount = 0;
 	__be32 *p;
 
 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 	if (!p)
 		return false;
-	rctxt->rc_write_list = p;
-	while (xdr_item_is_present(p)) {
-		if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
-			return false;
-		++chcount;
-		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
-		if (!p)
-			return false;
-	}
-	if (!chcount)
-		rctxt->rc_write_list = NULL;
-	return chcount < 2;
+
+	rctxt->rc_write_list = NULL;
+	if (!xdr_count_write_chunks(rctxt, p))
+		return false;
+	if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
+		return false;
+
+	if (!pcl_is_empty(&rctxt->rc_write_pcl))
+		rctxt->rc_write_list = p;
+	rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
+	return rctxt->rc_write_pcl.cl_count < 2;
 }
 
 /* Sanity check the Reply chunk.
@@ -537,13 +568,16 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 	if (!p)
 		return false;
+
 	rctxt->rc_reply_chunk = NULL;
-	if (xdr_item_is_present(p)) {
-		if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK))
-			return false;
-		rctxt->rc_reply_chunk = p;
-	}
-	return true;
+	if (!xdr_item_is_present(p))
+		return true;
+	if (!xdr_check_write_chunk(rctxt))
+		return false;
+
+	rctxt->rc_reply_chunk = p;
+	rctxt->rc_reply_pcl.cl_count = 1;
+	return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
 }
 
 /* RPC-over-RDMA Version One private extension: Remote Invalidation.



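For illustration, here is a minimal sketch of how the pcl_process_nonpayloads()
helper added above might be driven. The actor callback below
(count_nonpayload_bytes) is hypothetical and not part of this series; the real
callers appear in later patches.

/* Hypothetical actor: tally bytes that are not part of any result payload. */
static int count_nonpayload_bytes(const struct xdr_buf *xdr, void *data)
{
	unsigned int *total = data;

	/* @xdr is a sub-buffer covering one region of the message that
	 * is neither a result payload nor that payload's XDR padding. */
	*total += xdr->len;
	return 0;
}

	/* ... in a caller that holds the Write-list pcl and rq_res: */
	unsigned int total = 0;
	int ret;

	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, &rqstp->rq_res,
				      count_nonpayload_bytes, &total);
	if (ret < 0)
		return ret;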

* [PATCH 09/20] svcrdma: Use parsed chunk lists to derive the inv_rkey
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (7 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 08/20] svcrdma: Add a "parsed chunk list" data structure Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 10/20] svcrdma: Use parsed chunk lists to detect reverse direction replies Chuck Lever
                   ` (11 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: Don't duplicate header decoding smarts here. Instead, use
the new parsed chunk lists.
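
As a sketch only (pcl_fold_inv_rkey is hypothetical and not part of this
patch), the four per-list loops below could share one helper that folds a
parsed chunk list into the candidate R_key:

/* Hypothetical helper: returns false once a second distinct R_key is seen. */
static bool pcl_fold_inv_rkey(const struct svc_rdma_pcl *pcl, u32 *inv_rkey)
{
	struct svc_rdma_segment *segment;
	struct svc_rdma_chunk *chunk;

	pcl_for_each_chunk(chunk, pcl) {
		pcl_for_each_segment(segment, chunk) {
			if (*inv_rkey == 0)
				*inv_rkey = segment->rs_handle;
			else if (*inv_rkey != segment->rs_handle)
				return false;
		}
	}
	return true;
}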

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   67 ++++++++++++++-----------------
 1 file changed, 30 insertions(+), 37 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index ec9d259b149c..2755ca178b09 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -586,60 +586,53 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
  *
  * If there is exactly one distinct R_key in the received transport
  * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
- *
- * Perform this operation while the received transport header is
- * still in the CPU cache.
  */
 static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_recv_ctxt *ctxt)
 {
-	__be32 inv_rkey, *p;
-	u32 i, segcount;
+	struct svc_rdma_segment *segment;
+	struct svc_rdma_chunk *chunk;
+	u32 inv_rkey;
 
 	ctxt->rc_inv_rkey = 0;
 
 	if (!rdma->sc_snd_w_inv)
 		return;
 
-	inv_rkey = xdr_zero;
-	p = ctxt->rc_recv_buf;
-	p += rpcrdma_fixed_maxsz;
-
-	/* Read list */
-	while (xdr_item_is_present(p++)) {
-		p++;	/* position */
-		if (inv_rkey == xdr_zero)
-			inv_rkey = *p;
-		else if (inv_rkey != *p)
-			return;
-		p += 4;
+	inv_rkey = 0;
+	pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) {
+		pcl_for_each_segment(segment, chunk) {
+			if (inv_rkey == 0)
+				inv_rkey = segment->rs_handle;
+			else if (inv_rkey != segment->rs_handle)
+				return;
+		}
 	}
-
-	/* Write list */
-	while (xdr_item_is_present(p++)) {
-		segcount = be32_to_cpup(p++);
-		for (i = 0; i < segcount; i++) {
-			if (inv_rkey == xdr_zero)
-				inv_rkey = *p;
-			else if (inv_rkey != *p)
+	pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) {
+		pcl_for_each_segment(segment, chunk) {
+			if (inv_rkey == 0)
+				inv_rkey = segment->rs_handle;
+			else if (inv_rkey != segment->rs_handle)
 				return;
-			p += 4;
 		}
 	}
-
-	/* Reply chunk */
-	if (xdr_item_is_present(p++)) {
-		segcount = be32_to_cpup(p++);
-		for (i = 0; i < segcount; i++) {
-			if (inv_rkey == xdr_zero)
-				inv_rkey = *p;
-			else if (inv_rkey != *p)
+	pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) {
+		pcl_for_each_segment(segment, chunk) {
+			if (inv_rkey == 0)
+				inv_rkey = segment->rs_handle;
+			else if (inv_rkey != segment->rs_handle)
 				return;
-			p += 4;
 		}
 	}
-
-	ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
+	pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) {
+		pcl_for_each_segment(segment, chunk) {
+			if (inv_rkey == 0)
+				inv_rkey = segment->rs_handle;
+			else if (inv_rkey != segment->rs_handle)
+				return;
+		}
+	}
+	ctxt->rc_inv_rkey = inv_rkey;
 }
 
 /**




* [PATCH 10/20] svcrdma: Use parsed chunk lists to detect reverse direction replies
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (8 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 09/20] svcrdma: Use parsed chunk lists to derive the inv_rkey Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 11/20] svcrdma: Use parsed chunk lists to construct RDMA Writes Chuck Lever
                   ` (10 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: Don't duplicate header decoding smarts here. Instead, use
the new parsed chunk lists.

Note that the XID sanity test is also removed. The XID is already
looked up by the backchannel (callback) reply handler, which rejects
the reply if the XID is not recognized.
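
For context, the "*(p + 8)" test in the patch relies on the fixed header
layout that holds only when every chunk list is empty. The word offsets
below are reconstructed from the RPC-over-RDMA version 1 header definition
rather than taken from this patch, so treat them as a sketch:

/* Assumed layout of rc_recv_buf when all three chunk lists are empty:
 *
 *   words 0-3: rdma_xid, rdma_vers, rdma_credit, rdma_msg
 *   words 4-6: empty Read list, Write list, Reply chunk discriminators
 *   word  7:   XID of the embedded RPC message
 *   word  8:   RPC call direction (RPC_CALL or RPC_REPLY)
 */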

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h         |    1 +
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   29 ++++++++++++++---------------
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index a89d4209fe2a..74247a33b6c6 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -144,6 +144,7 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
+	__be32			rc_msgtype;
 
 	struct svc_rdma_pcl	rc_call_pcl;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 2755ca178b09..72b07e8aa3c9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -668,7 +668,8 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
 	if (*p != rpcrdma_version)
 		goto out_version;
 	p += 2;
-	switch (*p) {
+	rctxt->rc_msgtype = *p;
+	switch (rctxt->rc_msgtype) {
 	case rdma_msg:
 		break;
 	case rdma_nomsg:
@@ -762,30 +763,28 @@ static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
-static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
-					  __be32 *rdma_resp)
+static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
+						struct svc_rdma_recv_ctxt *rctxt)
 {
-	__be32 *p;
+	__be32 *p = rctxt->rc_recv_buf;
 
 	if (!xprt->xpt_bc_xprt)
 		return false;
 
-	p = rdma_resp + 3;
-	if (*p++ != rdma_msg)
+	if (rctxt->rc_msgtype != rdma_msg)
 		return false;
 
-	if (*p++ != xdr_zero)
+	if (!pcl_is_empty(&rctxt->rc_call_pcl))
+		return false;
+	if (!pcl_is_empty(&rctxt->rc_read_pcl))
 		return false;
-	if (*p++ != xdr_zero)
+	if (!pcl_is_empty(&rctxt->rc_write_pcl))
 		return false;
-	if (*p++ != xdr_zero)
+	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
 		return false;
 
-	/* XID sanity */
-	if (*p++ != *rdma_resp)
-		return false;
-	/* call direction */
-	if (*p == cpu_to_be32(RPC_CALL))
+	/* RPC call direction */
+	if (*(p + 8) == cpu_to_be32(RPC_CALL))
 		return false;
 
 	return true;
@@ -868,7 +867,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		goto out_drop;
 	rqstp->rq_xprt_hlen = ret;
 
-	if (svc_rdma_is_backchannel_reply(xprt, p))
+	if (svc_rdma_is_reverse_direction_reply(xprt, ctxt))
 		goto out_backchannel;
 
 	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);




* [PATCH 11/20] svcrdma: Use parsed chunk lists to construct RDMA Writes
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (9 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 10/20] svcrdma: Use parsed chunk lists to detect reverse direction replies Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:54 ` [PATCH 12/20] svcrdma: Use parsed chunk lists to encode Reply transport headers Chuck Lever
                   ` (9 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing RDMA Writes, use the new parsed chunk lists
for the Write list and Reply chunk, which are version-agnostic and
already XDR-decoded.
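
For reference, the parsed-chunk types that this patch starts consuming look
roughly like the sketch below. They are defined in svc_rdma_pcl.h, added
earlier in the series; the field list here is reconstructed from usage, so
field order and exact widths are assumptions.

/* Sketch reconstructed from usage in this series -- not the actual header. */
struct svc_rdma_segment {
	u32			rs_handle;
	u32			rs_length;
	u64			rs_offset;
};

struct svc_rdma_chunk {
	struct list_head	ch_list;		/* linked into cl_chunks */
	u32			ch_position;		/* XDR offset of the payload */
	u32			ch_length;		/* sum of segment lengths */
	u32			ch_payload_length;	/* bytes actually conveyed */
	u32			ch_segcount;
	struct svc_rdma_segment	ch_segments[];
};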

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h         |    5 +--
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |    1 -
 net/sunrpc/xprtrdma/svc_rdma_rw.c       |   47 +++++++++++++++----------------
 net/sunrpc/xprtrdma/svc_rdma_sendto.c   |   22 +++++++++------
 4 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 74247a33b6c6..d9148787efff 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -157,8 +157,6 @@ struct svc_rdma_recv_ctxt {
 	__be32			*rc_reply_chunk;
 	struct svc_rdma_pcl	rc_reply_pcl;
 
-	unsigned int		rc_read_payload_offset;
-	unsigned int		rc_read_payload_length;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
@@ -196,7 +194,8 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
 				    struct svc_rqst *rqstp,
 				    struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
-				     __be32 *wr_ch, const struct xdr_buf *xdr);
+				     const struct svc_rdma_chunk *chunk,
+				     const struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     const struct svc_rdma_recv_ctxt *rctxt,
 				     struct xdr_buf *xdr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 72b07e8aa3c9..7d44e9d2e7a3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -207,7 +207,6 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->rc_page_count = 0;
-	ctxt->rc_read_payload_length = 0;
 	return ctxt;
 
 out_empty:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 5f667d964cd6..05dd0896860f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -190,11 +190,11 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
  *  - Stores arguments for the SGL constructor functions
  */
 struct svc_rdma_write_info {
+	const struct svc_rdma_chunk	*wi_chunk;
+
 	/* write state of this chunk */
 	unsigned int		wi_seg_off;
 	unsigned int		wi_seg_no;
-	unsigned int		wi_nsegs;
-	__be32			*wi_segs;
 
 	/* SGL constructor arguments */
 	const struct xdr_buf	*wi_xdr;
@@ -205,7 +205,8 @@ struct svc_rdma_write_info {
 };
 
 static struct svc_rdma_write_info *
-svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
+			  const struct svc_rdma_chunk *chunk)
 {
 	struct svc_rdma_write_info *info;
 
@@ -213,10 +214,9 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
 	if (!info)
 		return info;
 
+	info->wi_chunk = chunk;
 	info->wi_seg_off = 0;
 	info->wi_seg_no = 0;
-	info->wi_nsegs = be32_to_cpup(++chunk);
-	info->wi_segs = ++chunk;
 	svc_rdma_cc_init(rdma, &info->wi_cc);
 	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 	return info;
@@ -443,40 +443,36 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
 {
 	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
 	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	const struct svc_rdma_segment *seg;
 	struct svc_rdma_rw_ctxt *ctxt;
-	__be32 *seg;
 	int ret;
 
-	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
 	do {
 		unsigned int write_len;
-		u32 handle, length;
 		u64 offset;
 
-		if (info->wi_seg_no >= info->wi_nsegs)
+		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
+		if (!seg)
 			goto out_overflow;
 
-		xdr_decode_rdma_segment(seg, &handle, &length, &offset);
-		offset += info->wi_seg_off;
-
-		write_len = min(remaining, length - info->wi_seg_off);
+		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
 		ctxt = svc_rdma_get_rw_ctxt(rdma,
 					    (write_len >> PAGE_SHIFT) + 2);
 		if (!ctxt)
 			return -ENOMEM;
 
 		constructor(info, write_len, ctxt);
-		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle,
+		offset = seg->rs_offset + info->wi_seg_off;
+		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
 					   DMA_TO_DEVICE);
 		if (ret < 0)
 			return -EIO;
 
-		trace_svcrdma_send_wseg(handle, write_len, offset);
+		trace_svcrdma_send_wseg(seg->rs_handle, write_len, offset);
 
 		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 		cc->cc_sqecount += ret;
-		if (write_len == length - info->wi_seg_off) {
-			seg += 4;
+		if (write_len == seg->rs_length - info->wi_seg_off) {
 			info->wi_seg_no++;
 			info->wi_seg_off = 0;
 		} else {
@@ -489,7 +485,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
 
 out_overflow:
 	trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
-				     info->wi_nsegs);
+				     info->wi_chunk->ch_segcount);
 	return -E2BIG;
 }
 
@@ -577,7 +573,7 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr,
 /**
  * svc_rdma_send_write_chunk - Write all segments in a Write chunk
  * @rdma: controlling RDMA transport
- * @wr_ch: Write chunk provided by client
+ * @chunk: Write chunk provided by the client
  * @xdr: xdr_buf containing the data payload
  *
  * Returns a non-negative number of bytes the chunk consumed, or
@@ -587,13 +583,14 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr,
  *	%-ENOTCONN if posting failed (connection is lost),
  *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
-int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+			      const struct svc_rdma_chunk *chunk,
 			      const struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
 	int ret;
 
-	info = svc_rdma_write_info_alloc(rdma, wr_ch);
+	info = svc_rdma_write_info_alloc(rdma, chunk);
 	if (!info)
 		return -ENOMEM;
 
@@ -631,12 +628,14 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 			      struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
+	struct svc_rdma_chunk *chunk;
 	int consumed, ret;
 
-	if (!rctxt->rc_reply_chunk)
+	if (pcl_is_empty(&rctxt->rc_reply_pcl))
 		return 0;
 
-	info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
+	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+	info = svc_rdma_write_info_alloc(rdma, chunk);
 	if (!info)
 		return -ENOMEM;
 
@@ -648,7 +647,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!rctxt->rc_write_list && xdr->page_len) {
+	if (pcl_is_empty(&rctxt->rc_write_pcl) && xdr->page_len) {
 		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
 					   xdr->page_len);
 		if (ret < 0)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 3e7ba06788b0..f697e79757a6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -466,12 +466,14 @@ static ssize_t
 svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
 			   struct svc_rdma_send_ctxt *sctxt)
 {
+	struct svc_rdma_chunk *chunk;
 	ssize_t len, ret;
 
 	len = 0;
 	if (rctxt->rc_write_list) {
+		chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
 		ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
-						  rctxt->rc_read_payload_length);
+						  chunk->ch_payload_length);
 		if (ret < 0)
 			return ret;
 		len = ret;
@@ -979,22 +981,24 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
 			    unsigned int length)
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+	struct svc_rdma_chunk *chunk;
 	struct svcxprt_rdma *rdma;
 	struct xdr_buf subbuf;
 
-	if (!rctxt->rc_write_list || !length)
+	chunk = rctxt->rc_cur_result_payload;
+	if (!length || !chunk)
 		return 0;
+	rctxt->rc_cur_result_payload =
+		pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
+	if (length > chunk->ch_length)
+		return -E2BIG;
 
-	/* XXX: Just one READ payload slot for now, since our
-	 * transport implementation currently supports only one
-	 * Write chunk.
-	 */
-	rctxt->rc_read_payload_offset = offset;
-	rctxt->rc_read_payload_length = length;
+	chunk->ch_position = offset;
+	chunk->ch_payload_length = length;
 
 	if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
 		return -EMSGSIZE;
 
 	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list, &subbuf);
+	return svc_rdma_send_write_chunk(rdma, chunk, &subbuf);
 }




* [PATCH 12/20] svcrdma: Use parsed chunk lists to encode Reply transport headers
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (10 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 11/20] svcrdma: Use parsed chunk lists to construct RDMA Writes Chuck Lever
@ 2020-10-26 18:54 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 13/20] svcrdma: Support multiple write chunks when pulling up Chuck Lever
                   ` (8 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:54 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing the egress RPC Reply transport header, use
the new parsed Write list and Reply chunk, which are version-
agnostic and already XDR decoded.
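
A worked example of the new encoding path, using invented numbers:

/* Suppose the client advertised a Write chunk with two 8192-byte
 * segments and the server's result payload is 10000 bytes.
 * svc_rdma_encode_write_chunk() starts with remaining = 10000:
 *
 *   segno 0: length = min(10000, 8192) = 8192, remaining = 1808
 *   segno 1: length = min(1808,  8192) = 1808, remaining = 0
 *
 * The Reply's copy of the chunk thus reports how many bytes were
 * actually conveyed in each segment.
 */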

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/trace/events/rpcrdma.h        |   37 +++++++++++-
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |  105 ++++++++++++++-------------------
 2 files changed, 80 insertions(+), 62 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 72b941aef43b..5218e0f9596a 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1447,9 +1447,44 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
 				TP_ARGS(handle, length, offset))
 
 DEFINE_SEGMENT_EVENT(send_rseg);
-DEFINE_SEGMENT_EVENT(encode_wseg);
 DEFINE_SEGMENT_EVENT(send_wseg);
 
+TRACE_EVENT(svcrdma_encode_wseg,
+	TP_PROTO(
+		const struct svc_rdma_send_ctxt *ctxt,
+		u32 segno,
+		u32 handle,
+		u32 length,
+		u64 offset
+	),
+
+	TP_ARGS(ctxt, segno, handle, length, offset),
+
+	TP_STRUCT__entry(
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(u32, segno)
+		__field(u32, handle)
+		__field(u32, length)
+		__field(u64, offset)
+	),
+
+	TP_fast_assign(
+		__entry->cq_id = ctxt->sc_cid.ci_queue_id;
+		__entry->completion_id = ctxt->sc_cid.ci_completion_id;
+		__entry->segno = segno;
+		__entry->handle = handle;
+		__entry->length = length;
+		__entry->offset = offset;
+	),
+
+	TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
+		__entry->cq_id, __entry->completion_id,
+		__entry->segno, __entry->length,
+		(unsigned long long)__entry->offset, __entry->handle
+	)
+);
+
 TRACE_EVENT(svcrdma_decode_rseg,
 	TP_PROTO(
 		const struct rpc_rdma_cid *cid,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index f697e79757a6..fd8d62b1e640 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
 
 /**
  * svc_rdma_encode_write_segment - Encode one Write segment
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
  * @remaining: remaining bytes of the payload left in the Write chunk
+ * @segno: which segment in the chunk
  *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
- *   that was consumed by the Write segment
+ *   that was consumed by the Write segment, and updates @remaining
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_segment(__be32 *src,
-					     struct svc_rdma_send_ctxt *sctxt,
-					     unsigned int *remaining)
+static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
+					     const struct svc_rdma_chunk *chunk,
+					     u32 *remaining, unsigned int segno)
 {
+	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
+	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
+	u32 length;
 	__be32 *p;
-	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
-	u32 handle, length;
-	u64 offset;
 
 	p = xdr_reserve_space(&sctxt->sc_stream, len);
 	if (!p)
 		return -EMSGSIZE;
 
-	xdr_decode_rdma_segment(src, &handle, &length, &offset);
-
-	if (*remaining < length) {
-		/* segment only partly filled */
-		length = *remaining;
-		*remaining = 0;
-	} else {
-		/* entire segment was consumed */
-		*remaining -= length;
-	}
-	xdr_encode_rdma_segment(p, handle, length, offset);
-
-	trace_svcrdma_encode_wseg(handle, length, offset);
+	length = min_t(u32, *remaining, segment->rs_length);
+	*remaining -= length;
+	xdr_encode_rdma_segment(p, segment->rs_handle, length,
+				segment->rs_offset);
+	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
+				  segment->rs_offset);
 	return len;
 }
 
 /**
  * svc_rdma_encode_write_chunk - Encode one Write chunk
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
- * @remaining: size in bytes of the payload in the Write chunk
+ * @chunk: Write chunk to push
  *
  * Copy a Write chunk from the Call transport header to the
  * Reply transport header. Update each segment's length field
@@ -411,33 +404,30 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
  *   that was consumed by the Write chunk
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
-					   struct svc_rdma_send_ctxt *sctxt,
-					   unsigned int remaining)
+static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
+					   const struct svc_rdma_chunk *chunk)
 {
-	unsigned int i, nsegs;
+	u32 remaining = chunk->ch_payload_length;
+	unsigned int segno;
 	ssize_t len, ret;
 
-	len = 0;
 	trace_svcrdma_encode_write_chunk(remaining);
 
-	src++;
+	len = 0;
 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
 	if (ret < 0)
-		return -EMSGSIZE;
+		return ret;
 	len += ret;
 
-	nsegs = be32_to_cpup(src++);
-	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
 	if (ret < 0)
-		return -EMSGSIZE;
+		return ret;
 	len += ret;
 
-	for (i = nsegs; i; i--) {
-		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+	for (segno = 0; segno < chunk->ch_segcount; segno++) {
+		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
 		if (ret < 0)
-			return -EMSGSIZE;
-		src += rpcrdma_segment_maxsz;
+			return ret;
 		len += ret;
 	}
 
@@ -449,34 +439,23 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
  * @rctxt: Reply context with information about the RPC Call
  * @sctxt: Send context for the RPC Reply
  *
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Write list
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t
-svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
-			   struct svc_rdma_send_ctxt *sctxt)
+static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
+					  struct svc_rdma_send_ctxt *sctxt)
 {
 	struct svc_rdma_chunk *chunk;
 	ssize_t len, ret;
 
 	len = 0;
-	if (rctxt->rc_write_list) {
-		chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
-		ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
-						  chunk->ch_payload_length);
+	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
 		if (ret < 0)
 			return ret;
-		len = ret;
+		len += ret;
 	}
 
 	/* Terminate the Write list */
@@ -493,24 +472,28 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
  * @sctxt: Send context for the RPC Reply
  * @length: size in bytes of the payload in the Reply chunk
  *
- * Assumptions:
- * - Reply can always fit in the client-provided Reply chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Reply chunk
  *   %-EMSGSIZE on XDR buffer overflow
+ *   %-E2BIG if the RPC message is larger than the Reply chunk
  */
 static ssize_t
-svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
 			    struct svc_rdma_send_ctxt *sctxt,
 			    unsigned int length)
 {
-	if (!rctxt->rc_reply_chunk)
+	struct svc_rdma_chunk *chunk;
+
+	if (pcl_is_empty(&rctxt->rc_reply_pcl))
 		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
 
-	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
-					   length);
+	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+	if (length > chunk->ch_length)
+		return -E2BIG;
+
+	chunk->ch_payload_length = length;
+	return svc_rdma_encode_write_chunk(sctxt, chunk);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -928,7 +911,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
+	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
 
 	if (svc_rdma_encode_read_list(sctxt) < 0)
 		goto err0;




* [PATCH 13/20] svcrdma: Support multiple write chunks when pulling up
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (11 preceding siblings ...)
  2020-10-26 18:54 ` [PATCH 12/20] svcrdma: Use parsed chunk lists to encode Reply transport headers Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 14/20] svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg() Chuck Lever
                   ` (7 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

When counting the number of SGEs needed to construct a Send request,
do not count result payloads. Likewise, when copying the Reply message
into the pull-up buffer, skip result payloads: they are conveyed via
RDMA Write and therefore must not also appear in the Send buffer.
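
To make the new accounting concrete, a hypothetical example (buffer sizes
are invented for illustration):

/* rq_res has a 120-byte head, an 8192-byte page list, and a 4-byte tail,
 * and the page list was returned as a result payload in a Write chunk.
 * pcl_process_nonpayloads() presents only the head and tail regions to
 * the callbacks added below, so:
 *
 *   svc_rdma_xb_count_sges(): pd_num_sges = 1 (transport header)
 *                                         + 1 (head) + 1 (tail) = 3
 *   svc_rdma_xb_linearize():  copies 124 bytes; the 8192 payload bytes
 *                             are never copied into the pull-up buffer.
 */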

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h            |    2 
 include/trace/events/rpcrdma.h             |   20 ++-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   14 +-
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |    9 +
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  188 +++++++++++++++++-----------
 5 files changed, 146 insertions(+), 87 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index d9148787efff..7090af1a9791 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -182,6 +182,8 @@ extern void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
 /* svc_rdma_recvfrom.c */
 extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma);
 extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_recv_ctxt *
+		svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma);
 extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_recv_ctxt *ctxt);
 extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma);
diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 5218e0f9596a..afc58accb9cf 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1805,20 +1805,30 @@ TRACE_EVENT(svcrdma_small_wrch_err,
 
 TRACE_EVENT(svcrdma_send_pullup,
 	TP_PROTO(
-		unsigned int len
+		const struct svc_rdma_send_ctxt *ctxt,
+		unsigned int msglen
 	),
 
-	TP_ARGS(len),
+	TP_ARGS(ctxt, msglen),
 
 	TP_STRUCT__entry(
-		__field(unsigned int, len)
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(unsigned int, hdrlen)
+		__field(unsigned int, msglen)
 	),
 
 	TP_fast_assign(
-		__entry->len = len;
+		__entry->cq_id = ctxt->sc_cid.ci_queue_id;
+		__entry->completion_id = ctxt->sc_cid.ci_completion_id;
+		__entry->hdrlen = ctxt->sc_hdrbuf.len,
+		__entry->msglen = msglen;
 	),
 
-	TP_printk("len=%u", __entry->len)
+	TP_printk("cq_id=%u cid=%d hdr=%u msg=%u (total %u)",
+		__entry->cq_id, __entry->completion_id,
+		__entry->hdrlen, __entry->msglen,
+		__entry->hdrlen + __entry->msglen)
 );
 
 TRACE_EVENT(svcrdma_send_err,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 5e7c4ba9e147..63f8be974df2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -74,11 +74,17 @@ void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
  */
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst,
-			      struct svc_rdma_send_ctxt *ctxt)
+			      struct svc_rdma_send_ctxt *sctxt)
 {
+	struct svc_rdma_recv_ctxt *rctxt;
 	int ret;
 
-	ret = svc_rdma_map_reply_msg(rdma, ctxt, NULL, &rqst->rq_snd_buf);
+	rctxt = svc_rdma_recv_ctxt_get(rdma);
+	if (!rctxt)
+		return -EIO;
+
+	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
+	svc_rdma_recv_ctxt_put(rdma, rctxt);
 	if (ret < 0)
 		return -EIO;
 
@@ -86,8 +92,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	 * the rq_buffer before all retransmits are complete.
 	 */
 	get_page(virt_to_page(rqst->rq_buffer));
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	return svc_rdma_send(rdma, ctxt);
+	sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	return svc_rdma_send(rdma, sctxt);
 }
 
 /* Server-side transport endpoint wants a whole page for its send
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 7d44e9d2e7a3..af32c3ad45a6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -194,8 +194,13 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 	}
 }
 
-static struct svc_rdma_recv_ctxt *
-svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
+/**
+ * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a recv_ctxt or (rarely) NULL if none are available.
+ */
+struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_recv_ctxt *ctxt;
 	struct llist_node *node;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index fd8d62b1e640..b21beaa0114e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -531,6 +531,45 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
 				     offset_in_page(base), len);
 }
 
+struct svc_rdma_pullup_data {
+	u8		*pd_dest;
+	unsigned int	pd_length;
+	unsigned int	pd_num_sges;
+};
+
+/**
+ * svc_rdma_xb_count_sges - Count how many SGEs will be needed
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Number of SGEs needed to Send the contents of @xdr inline
+ */
+static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
+				  void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int remaining;
+	unsigned long offset;
+
+	if (xdr->head[0].iov_len)
+		++args->pd_num_sges;
+
+	offset = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		++args->pd_num_sges;
+		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
+		offset = 0;
+	}
+
+	if (xdr->tail[0].iov_len)
+		++args->pd_num_sges;
+
+	args->pd_length += xdr->len;
+	return 0;
+}
+
 /**
  * svc_rdma_pull_up_needed - Determine whether to use pull-up
  * @rdma: controlling transport
@@ -539,50 +578,71 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  * @xdr: xdr_buf containing RPC message to transmit
  *
  * Returns:
- *	%true if pull-up must be used
- *	%false otherwise
+ *   %true if pull-up must be used
+ *   %false otherwise
  */
-static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-				    struct svc_rdma_send_ctxt *sctxt,
+static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
+				    const struct svc_rdma_send_ctxt *sctxt,
 				    const struct svc_rdma_recv_ctxt *rctxt,
-				    struct xdr_buf *xdr)
+				    const struct xdr_buf *xdr)
 {
-	bool write_chunk_present = rctxt && rctxt->rc_write_list;
-	int elements;
+	/* Resources needed for the transport header */
+	struct svc_rdma_pullup_data args = {
+		.pd_length	= sctxt->sc_hdrbuf.len,
+		.pd_num_sges	= 1,
+	};
+	int ret;
 
-	/* For small messages, copying bytes is cheaper than DMA mapping.
-	 */
-	if (!write_chunk_present &&
-	    sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				      svc_rdma_xb_count_sges, &args);
+	if (ret < 0)
+		return false;
+
+	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
 		return true;
+	return args.pd_num_sges >= rdma->sc_max_send_sges;
+}
 
-	/* Check whether the xdr_buf has more elements than can
-	 * fit in a single RDMA Send.
-	 */
-	/* xdr->head */
-	elements = 1;
-
-	/* xdr->pages */
-	if (!rctxt || !rctxt->rc_write_list) {
-		unsigned int remaining;
-		unsigned long pageoff;
-
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			++elements;
-			remaining -= min_t(u32, PAGE_SIZE - pageoff,
-					   remaining);
-			pageoff = 0;
-		}
+/**
+ * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
+ * @xdr: xdr_buf containing portion of an RPC message to copy
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Always zero.
+ */
+static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
+				 void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int len, remaining;
+	unsigned long pageoff;
+	struct page **ppages;
+
+	if (xdr->head[0].iov_len) {
+		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
+		args->pd_dest += xdr->head[0].iov_len;
 	}
 
-	/* xdr->tail */
-	if (xdr->tail[0].iov_len)
-		++elements;
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	pageoff = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
+		remaining -= len;
+		args->pd_dest += len;
+		pageoff = 0;
+		ppages++;
+	}
 
-	/* assume 1 SGE is needed for the transport header */
-	return elements >= rdma->sc_max_send_sges;
+	if (xdr->tail[0].iov_len) {
+		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+		args->pd_dest += xdr->tail[0].iov_len;
+	}
+
+	args->pd_length += xdr->len;
+	return 0;
 }
 
 /**
@@ -595,54 +655,30 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  * The device is not capable of sending the reply directly.
  * Assemble the elements of @xdr into the transport header buffer.
  *
- * Returns zero on success, or a negative errno on failure.
+ * Assumptions:
+ *  pull_up_needed has determined that @xdr will fit in the buffer.
+ *
+ * Returns:
+ *   %0 if pull-up was successful
+ *   %-EMSGSIZE if a buffer manipulation problem occurred
  */
-static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *sctxt,
 				      const struct svc_rdma_recv_ctxt *rctxt,
 				      const struct xdr_buf *xdr)
 {
-	unsigned char *dst, *tailbase;
-	unsigned int taillen;
-
-	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
-	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
-	dst += xdr->head[0].iov_len;
-
-	tailbase = xdr->tail[0].iov_base;
-	taillen = xdr->tail[0].iov_len;
-	if (rctxt && rctxt->rc_write_list) {
-		u32 xdrpad;
-
-		xdrpad = xdr_pad_size(xdr->page_len);
-		if (taillen && xdrpad) {
-			tailbase += xdrpad;
-			taillen -= xdrpad;
-		}
-	} else {
-		unsigned int len, remaining;
-		unsigned long pageoff;
-		struct page **ppages;
-
-		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			len = min_t(u32, PAGE_SIZE - pageoff, remaining);
-
-			memcpy(dst, page_address(*ppages) + pageoff, len);
-			remaining -= len;
-			dst += len;
-			pageoff = 0;
-			ppages++;
-		}
-	}
+	struct svc_rdma_pullup_data args = {
+		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
+	};
+	int ret;
 
-	if (taillen)
-		memcpy(dst, tailbase, taillen);
+	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				      svc_rdma_xb_linearize, &args);
+	if (ret < 0)
+		return ret;
 
-	sctxt->sc_sges[0].length += xdr->len;
-	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
+	trace_svcrdma_send_pullup(sctxt, args.pd_length);
 	return 0;
 }
 




* [PATCH 14/20] svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg()
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (12 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 13/20] svcrdma: Support multiple write chunks when pulling up Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 15/20] svcrdma: Support multiple Write chunks in svc_rdma_send_reply_chunk Chuck Lever
                   ` (6 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor: svc_rdma_map_reply_msg() is restructured to DMA map only
the parts of rq_res that do not contain a result payload.

This change has been tested to confirm that it does not cause a
regression in the no Write chunk and single Write chunk cases.
Multiple Write chunks have not been tested.
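
The restructured mapping path can be summarized by the call chain below
(a sketch inferred from the diff, not an authoritative flow):

/*
 * svc_rdma_map_reply_msg()
 *   -> pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, ...)
 *        -> svc_rdma_xb_dma_map()         once per non-payload region
 *             -> svc_rdma_iov_dma_map()   for the head and tail kvecs
 *             -> svc_rdma_page_dma_map()  for each page-list page
 */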

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h       |    2 
 include/trace/events/rpcrdma.h        |    1 
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |  174 +++++++++++++++++++--------------
 3 files changed, 100 insertions(+), 77 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 7090af1a9791..e09fafba00d7 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -213,7 +213,7 @@ extern int svc_rdma_send(struct svcxprt_rdma *rdma,
 extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *sctxt,
 				  const struct svc_rdma_recv_ctxt *rctxt,
-				  struct xdr_buf *xdr);
+				  const struct xdr_buf *xdr);
 extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 				    struct svc_rdma_send_ctxt *sctxt,
 				    struct svc_rdma_recv_ctxt *rctxt,
diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index afc58accb9cf..054dedd0280c 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1687,6 +1687,7 @@ DECLARE_EVENT_CLASS(svcrdma_dma_map_class,
 				TP_ARGS(rdma, dma_addr, length))
 
 DEFINE_SVC_DMA_EVENT(dma_map_page);
+DEFINE_SVC_DMA_EVENT(dma_map_err);
 DEFINE_SVC_DMA_EVENT(dma_unmap_page);
 
 TRACE_EVENT(svcrdma_dma_map_rw_err,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index b21beaa0114e..7d35bd6224ea 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -496,39 +496,111 @@ svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
 	return svc_rdma_encode_write_chunk(sctxt, chunk);
 }
 
-static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
-				 struct svc_rdma_send_ctxt *ctxt,
-				 struct page *page,
-				 unsigned long offset,
-				 unsigned int len)
+struct svc_rdma_map_data {
+	struct svcxprt_rdma		*md_rdma;
+	struct svc_rdma_send_ctxt	*md_ctxt;
+};
+
+/**
+ * svc_rdma_page_dma_map - DMA map one page
+ * @data: pointer to arguments
+ * @page: struct page to DMA map
+ * @offset: offset into the page
+ * @len: number of bytes to map
+ *
+ * Returns:
+ *   %0 if DMA mapping was successful
+ *   %-EIO if the page cannot be DMA mapped
+ */
+static int svc_rdma_page_dma_map(void *data, struct page *page,
+				 unsigned long offset, unsigned int len)
 {
+	struct svc_rdma_map_data *args = data;
+	struct svcxprt_rdma *rdma = args->md_rdma;
+	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
 	struct ib_device *dev = rdma->sc_cm_id->device;
 	dma_addr_t dma_addr;
 
+	++ctxt->sc_cur_sge_no;
+
 	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
-	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
 	if (ib_dma_mapping_error(dev, dma_addr))
 		goto out_maperr;
 
+	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
 	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
 	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
 	ctxt->sc_send_wr.num_sge++;
 	return 0;
 
 out_maperr:
+	trace_svcrdma_dma_map_err(rdma, dma_addr, len);
 	return -EIO;
 }
 
-/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
+/**
+ * svc_rdma_iov_dma_map - DMA map an iovec
+ * @data: pointer to arguments
+ * @iov: kvec to DMA map
+ *
+ * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
+ *
+ * Returns:
+ *   %0 if DMA mapping was successful
+ *   %-EIO if the iovec cannot be DMA mapped
  */
-static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
-				struct svc_rdma_send_ctxt *ctxt,
-				unsigned char *base,
-				unsigned int len)
+static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
 {
-	return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base),
-				     offset_in_page(base), len);
+	if (!iov->iov_len)
+		return 0;
+	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
+				     offset_in_page(iov->iov_base),
+				     iov->iov_len);
+}
+
+/**
+ * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   %0 if DMA mapping was successful
+ *   %-EIO if DMA mapping failed
+ *
+ * On failure, any DMA mappings that have been already done must be
+ * unmapped by the caller.
+ */
+static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
+{
+	unsigned int len, remaining;
+	unsigned long pageoff;
+	struct page **ppages;
+	int ret;
+
+	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
+	if (ret < 0)
+		return ret;
+
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	pageoff = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+
+		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
+		if (ret < 0)
+			return ret;
+
+		remaining -= len;
+		pageoff = 0;
+	}
+
+	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
+	if (ret < 0)
+		return ret;
+
+	return xdr->len;
 }
 
 struct svc_rdma_pullup_data {
@@ -688,22 +760,22 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
  * @rctxt: Write and Reply chunks provided by client
  * @xdr: prepared xdr_buf containing RPC message
  *
- * Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added. The Send WR's num_sge field is set.
+ * Returns:
+ *   %0 if DMA mapping was successful.
+ *   %-EMSGSIZE if a buffer manipulation problem occurred
+ *   %-EIO if DMA mapping failed
  *
- * Returns zero on success, or a negative errno on failure.
+ * The Send WR's num_sge field is set in all cases.
  */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 			   struct svc_rdma_send_ctxt *sctxt,
 			   const struct svc_rdma_recv_ctxt *rctxt,
-			   struct xdr_buf *xdr)
+			   const struct xdr_buf *xdr)
 {
-	unsigned int len, remaining;
-	unsigned long page_off;
-	struct page **ppages;
-	unsigned char *base;
-	u32 xdr_pad;
-	int ret;
+	struct svc_rdma_map_data args = {
+		.md_rdma	= rdma,
+		.md_ctxt	= sctxt,
+	};
 
 	/* Set up the (persistently-mapped) transport header SGE. */
 	sctxt->sc_send_wr.num_sge = 1;
@@ -712,7 +784,7 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	/* If there is a Reply chunk, nothing follows the transport
 	 * header, and we're done here.
 	 */
-	if (rctxt && rctxt->rc_reply_chunk)
+	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
 		return 0;
 
 	/* For pull-up, svc_rdma_send() will sync the transport header.
@@ -721,58 +793,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
 		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
 
-	++sctxt->sc_cur_sge_no;
-	ret = svc_rdma_dma_map_buf(rdma, sctxt,
-				   xdr->head[0].iov_base,
-				   xdr->head[0].iov_len);
-	if (ret < 0)
-		return ret;
-
-	/* If a Write chunk is present, the xdr_buf's page list
-	 * is not included inline. However the Upper Layer may
-	 * have added XDR padding in the tail buffer, and that
-	 * should not be included inline.
-	 */
-	if (rctxt && rctxt->rc_write_list) {
-		base = xdr->tail[0].iov_base;
-		len = xdr->tail[0].iov_len;
-		xdr_pad = xdr_pad_size(xdr->page_len);
-
-		if (len && xdr_pad) {
-			base += xdr_pad;
-			len -= xdr_pad;
-		}
-
-		goto tail;
-	}
-
-	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-	page_off = xdr->page_base & ~PAGE_MASK;
-	remaining = xdr->page_len;
-	while (remaining) {
-		len = min_t(u32, PAGE_SIZE - page_off, remaining);
-
-		++sctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
-					    page_off, len);
-		if (ret < 0)
-			return ret;
-
-		remaining -= len;
-		page_off = 0;
-	}
-
-	base = xdr->tail[0].iov_base;
-	len = xdr->tail[0].iov_len;
-tail:
-	if (len) {
-		++sctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
-		if (ret < 0)
-			return ret;
-	}
-
-	return 0;
+	return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				       svc_rdma_xb_dma_map, &args);
 }
 
 /* The svc_rqst and all resources it owns are released as soon as




* [PATCH 15/20] svcrdma: Support multiple Write chunks in svc_rdma_send_reply_chunk
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (13 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 14/20] svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg() Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 16/20] svcrdma: Remove chunk list pointers Chuck Lever
                   ` (5 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Refactor svc_rdma_send_reply_chunk() so that it Sends only the parts
of rq_res that do not contain a result payload.
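
The core of the change, shown here as a sketch because the reasoning is
easy to miss: the non-payload walk is keyed off the Write-list pcl even
though the data lands in the Reply chunk, so result payloads that were
already conveyed via Write chunks are excluded.

	/* From svc_rdma_send_reply_chunk(): skip regions of rq_res that
	 * the Write list already carried as result payloads. */
	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		goto out_err;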

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h   |    2 +-
 net/sunrpc/xprtrdma/svc_rdma_rw.c |   36 +++++++++---------------------------
 2 files changed, 10 insertions(+), 28 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index e09fafba00d7..85fbec47d4b5 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -200,7 +200,7 @@ extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 				     const struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     const struct svc_rdma_recv_ctxt *rctxt,
-				     struct xdr_buf *xdr);
+				     const struct xdr_buf *xdr);
 
 /* svc_rdma_sendto.c */
 extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 05dd0896860f..4efa1fa3f6fb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -535,7 +535,7 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
 /**
  * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
  * @xdr: xdr_buf to write
- * @info: pointer to write arguments
+ * @data: pointer to write arguments
  *
  * Returns:
  *   On succes, returns zero
@@ -543,9 +543,9 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
  *   %-ENOMEM if a resource has been exhausted
  *   %-EIO if an rdma-rw error occurred
  */
-static int svc_rdma_xb_write(const struct xdr_buf *xdr,
-			     struct svc_rdma_write_info *info)
+static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
 {
+	struct svc_rdma_write_info *info = data;
 	int ret;
 
 	if (xdr->head[0].iov_len) {
@@ -625,11 +625,11 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
  */
 int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 			      const struct svc_rdma_recv_ctxt *rctxt,
-			      struct xdr_buf *xdr)
+			      const struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
 	struct svc_rdma_chunk *chunk;
-	int consumed, ret;
+	int ret;
 
 	if (pcl_is_empty(&rctxt->rc_reply_pcl))
 		return 0;
@@ -639,35 +639,17 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_iov_write(info, &xdr->head[0]);
+	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				      svc_rdma_xb_write, info);
 	if (ret < 0)
 		goto out_err;
-	consumed = xdr->head[0].iov_len;
-
-	/* Send the page list in the Reply chunk only if the
-	 * client did not provide Write chunks.
-	 */
-	if (pcl_is_empty(&rctxt->rc_write_pcl) && xdr->page_len) {
-		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
-					   xdr->page_len);
-		if (ret < 0)
-			goto out_err;
-		consumed += xdr->page_len;
-	}
-
-	if (xdr->tail[0].iov_len) {
-		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
-		if (ret < 0)
-			goto out_err;
-		consumed += xdr->tail[0].iov_len;
-	}
 
 	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
 	if (ret < 0)
 		goto out_err;
 
-	trace_svcrdma_send_reply_chunk(consumed);
-	return consumed;
+	trace_svcrdma_send_reply_chunk(xdr->len);
+	return xdr->len;
 
 out_err:
 	svc_rdma_write_info_free(info);



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 16/20] svcrdma: Remove chunk list pointers
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (14 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 15/20] svcrdma: Support multiple Write chunks in svc_rdma_send_reply_chunk Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 17/20] svcrdma: Clean up chunk tracepoints Chuck Lever
                   ` (4 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

Clean up: These pointers are no longer used.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h         |    4 ----
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |    8 +-------
 2 files changed, 1 insertion(+), 11 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 85fbec47d4b5..6f247d043731 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -149,12 +149,8 @@ struct svc_rdma_recv_ctxt {
 	struct svc_rdma_pcl	rc_call_pcl;
 
 	struct svc_rdma_pcl	rc_read_pcl;
-
-	__be32			*rc_write_list;
 	struct svc_rdma_chunk	*rc_cur_result_payload;
 	struct svc_rdma_pcl	rc_write_pcl;
-
-	__be32			*rc_reply_chunk;
 	struct svc_rdma_pcl	rc_reply_pcl;
 
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index af32c3ad45a6..dd10b1de227d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -540,17 +540,13 @@ static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 	if (!p)
 		return false;
-
-	rctxt->rc_write_list = NULL;
 	if (!xdr_count_write_chunks(rctxt, p))
 		return false;
 	if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
 		return false;
 
-	if (!pcl_is_empty(&rctxt->rc_write_pcl))
-		rctxt->rc_write_list = p;
 	rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
-	return rctxt->rc_write_pcl.cl_count < 2;
+	return true;
 }
 
 /* Sanity check the Reply chunk.
@@ -573,13 +569,11 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
 	if (!p)
 		return false;
 
-	rctxt->rc_reply_chunk = NULL;
 	if (!xdr_item_is_present(p))
 		return true;
 	if (!xdr_check_write_chunk(rctxt))
 		return false;
 
-	rctxt->rc_reply_chunk = p;
 	rctxt->rc_reply_pcl.cl_count = 1;
 	return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
 }



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 17/20] svcrdma: Clean up chunk tracepoints
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (15 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 16/20] svcrdma: Remove chunk list pointers Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 18/20] svcrdma: Rename info::ri_chunklen Chuck Lever
                   ` (3 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

We already have trace_svcrdma_decode_rseg(), which records each
ingress Read segment. Instead of reporting those again when they
are about to be posted as RDMA Reads, let's fire one tracepoint
before posting each type of chunk.

So we'll get:

        nfsd-1998  [002]   321.666615: svcrdma_decode_rseg:  cq.id=4 cid=42 segno=0 position=0 192@0x013ca9ebfae14000:0xb0010b05
        nfsd-1998  [002]   321.666615: svcrdma_decode_rseg:  cq.id=4 cid=42 segno=1 position=0 7688@0x013ca9ebf914e000:0xb0010a05
        nfsd-1998  [002]   321.666615: svcrdma_decode_rseg:  cq.id=4 cid=42 segno=2 position=0 28@0x013ca9ebfae15000:0xb0010905
        nfsd-1998  [002]   321.666622: svcrdma_decode_rqst:  cq.id=4 cid=42 xid=0x013ca9eb vers=1 credits=128 proc=RDMA_NOMSG hdrlen=100

        nfsd-1998  [002]   321.666642: svcrdma_post_read_chunk: cq.id=3 cid=112 sqecount=3

kworker/2:1H-221   [002]   321.673949: svcrdma_wc_read:      cq.id=3 cid=112 status=SUCCESS (0/0x0)

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/trace/events/rpcrdma.h        |  110 ++++-----------------------------
 net/sunrpc/xprtrdma/svc_rdma_rw.c     |   27 ++++----
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |    2 -
 3 files changed, 26 insertions(+), 113 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 054dedd0280c..896aafc37b09 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1410,45 +1410,6 @@ DEFINE_BADREQ_EVENT(drop);
 DEFINE_BADREQ_EVENT(badproc);
 DEFINE_BADREQ_EVENT(parse);
 
-DECLARE_EVENT_CLASS(svcrdma_segment_event,
-	TP_PROTO(
-		u32 handle,
-		u32 length,
-		u64 offset
-	),
-
-	TP_ARGS(handle, length, offset),
-
-	TP_STRUCT__entry(
-		__field(u32, handle)
-		__field(u32, length)
-		__field(u64, offset)
-	),
-
-	TP_fast_assign(
-		__entry->handle = handle;
-		__entry->length = length;
-		__entry->offset = offset;
-	),
-
-	TP_printk("%u@0x%016llx:0x%08x",
-		__entry->length, (unsigned long long)__entry->offset,
-		__entry->handle
-	)
-);
-
-#define DEFINE_SEGMENT_EVENT(name)					\
-		DEFINE_EVENT(svcrdma_segment_event, svcrdma_##name,\
-				TP_PROTO(				\
-					u32 handle,			\
-					u32 length,			\
-					u64 offset			\
-				),					\
-				TP_ARGS(handle, length, offset))
-
-DEFINE_SEGMENT_EVENT(send_rseg);
-DEFINE_SEGMENT_EVENT(send_wseg);
-
 TRACE_EVENT(svcrdma_encode_wseg,
 	TP_PROTO(
 		const struct svc_rdma_send_ctxt *ctxt,
@@ -1558,62 +1519,6 @@ TRACE_EVENT(svcrdma_decode_wseg,
 	)
 );
 
-DECLARE_EVENT_CLASS(svcrdma_chunk_event,
-	TP_PROTO(
-		u32 length
-	),
-
-	TP_ARGS(length),
-
-	TP_STRUCT__entry(
-		__field(u32, length)
-	),
-
-	TP_fast_assign(
-		__entry->length = length;
-	),
-
-	TP_printk("length=%u",
-		__entry->length
-	)
-);
-
-#define DEFINE_CHUNK_EVENT(name)					\
-		DEFINE_EVENT(svcrdma_chunk_event, svcrdma_##name,	\
-				TP_PROTO(				\
-					u32 length			\
-				),					\
-				TP_ARGS(length))
-
-DEFINE_CHUNK_EVENT(send_pzr);
-DEFINE_CHUNK_EVENT(encode_write_chunk);
-DEFINE_CHUNK_EVENT(send_write_chunk);
-DEFINE_CHUNK_EVENT(encode_read_chunk);
-DEFINE_CHUNK_EVENT(send_reply_chunk);
-
-TRACE_EVENT(svcrdma_send_read_chunk,
-	TP_PROTO(
-		u32 length,
-		u32 position
-	),
-
-	TP_ARGS(length, position),
-
-	TP_STRUCT__entry(
-		__field(u32, length)
-		__field(u32, position)
-	),
-
-	TP_fast_assign(
-		__entry->length = length;
-		__entry->position = position;
-	),
-
-	TP_printk("length=%u position=%u",
-		__entry->length, __entry->position
-	)
-);
-
 DECLARE_EVENT_CLASS(svcrdma_error_event,
 	TP_PROTO(
 		__be32 xid
@@ -1936,7 +1841,7 @@ TRACE_EVENT(svcrdma_rq_post_err,
 	)
 );
 
-TRACE_EVENT(svcrdma_post_chunk,
+DECLARE_EVENT_CLASS(svcrdma_post_chunk_class,
 	TP_PROTO(
 		const struct rpc_rdma_cid *cid,
 		int sqecount
@@ -1962,6 +1867,19 @@ TRACE_EVENT(svcrdma_post_chunk,
 	)
 );
 
+#define DEFINE_POST_CHUNK_EVENT(name)					\
+		DEFINE_EVENT(svcrdma_post_chunk_class,			\
+				svcrdma_post_##name##_chunk,		\
+				TP_PROTO(				\
+					const struct rpc_rdma_cid *cid,	\
+					int sqecount			\
+				),					\
+				TP_ARGS(cid, sqecount))
+
+DEFINE_POST_CHUNK_EVENT(read);
+DEFINE_POST_CHUNK_EVENT(write);
+DEFINE_POST_CHUNK_EVENT(reply);
+
 DEFINE_COMPLETION_EVENT(svcrdma_wc_read);
 DEFINE_COMPLETION_EVENT(svcrdma_wc_write);
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 4efa1fa3f6fb..0de95207eaf1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -358,7 +358,6 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
 	do {
 		if (atomic_sub_return(cc->cc_sqecount,
 				      &rdma->sc_sq_avail) > 0) {
-			trace_svcrdma_post_chunk(&cc->cc_cid, cc->cc_sqecount);
 			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 			if (ret)
 				break;
@@ -468,8 +467,6 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
 		if (ret < 0)
 			return -EIO;
 
-		trace_svcrdma_send_wseg(seg->rs_handle, write_len, offset);
-
 		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 		cc->cc_sqecount += ret;
 		if (write_len == seg->rs_length - info->wi_seg_off) {
@@ -588,21 +585,22 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 			      const struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
+	struct svc_rdma_chunk_ctxt *cc;
 	int ret;
 
 	info = svc_rdma_write_info_alloc(rdma, chunk);
 	if (!info)
 		return -ENOMEM;
+	cc = &info->wi_cc;
 
 	ret = svc_rdma_xb_write(xdr, info);
 	if (ret != xdr->len)
 		goto out_err;
 
-	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
+	ret = svc_rdma_post_chunk_ctxt(cc);
 	if (ret < 0)
 		goto out_err;
-
-	trace_svcrdma_send_write_chunk(xdr->page_len);
 	return xdr->len;
 
 out_err:
@@ -628,6 +626,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 			      const struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
+	struct svc_rdma_chunk_ctxt *cc;
 	struct svc_rdma_chunk *chunk;
 	int ret;
 
@@ -638,17 +637,18 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	info = svc_rdma_write_info_alloc(rdma, chunk);
 	if (!info)
 		return -ENOMEM;
+	cc = &info->wi_cc;
 
 	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
 				      svc_rdma_xb_write, info);
 	if (ret < 0)
 		goto out_err;
 
-	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
+	ret = svc_rdma_post_chunk_ctxt(cc);
 	if (ret < 0)
 		goto out_err;
 
-	trace_svcrdma_send_reply_chunk(xdr->len);
 	return xdr->len;
 
 out_err:
@@ -735,10 +735,8 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 		if (ret < 0)
 			break;
 
-		trace_svcrdma_send_rseg(handle, length, offset);
 		info->ri_chunklen += length;
 	}
-
 	return ret;
 }
 
@@ -760,8 +758,6 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	if (ret < 0)
 		goto out;
 
-	trace_svcrdma_send_read_chunk(info->ri_chunklen, info->ri_position);
-
 	head->rc_hdr_count = 0;
 
 	/* Split the Receive buffer between the head and tail
@@ -816,8 +812,6 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 	if (ret < 0)
 		goto out;
 
-	trace_svcrdma_send_pzr(info->ri_chunklen);
-
 	head->rc_arg.len += info->ri_chunklen;
 	head->rc_arg.buflen += info->ri_chunklen;
 
@@ -874,6 +868,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 			     struct svc_rdma_recv_ctxt *head, __be32 *p)
 {
 	struct svc_rdma_read_info *info;
+	struct svc_rdma_chunk_ctxt *cc;
 	int ret;
 
 	/* The request (with page list) is constructed in
@@ -891,6 +886,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 	info = svc_rdma_read_info_alloc(rdma);
 	if (!info)
 		return -ENOMEM;
+	cc = &info->ri_cc;
 	info->ri_readctxt = head;
 	info->ri_pageno = 0;
 	info->ri_pageoff = 0;
@@ -903,7 +899,8 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 	if (ret < 0)
 		goto out_err;
 
-	ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
+	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
+	ret = svc_rdma_post_chunk_ctxt(cc);
 	if (ret < 0)
 		goto out_err;
 	svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7d35bd6224ea..035eb99b8ede 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -411,8 +411,6 @@ static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
 	unsigned int segno;
 	ssize_t len, ret;
 
-	trace_svcrdma_encode_write_chunk(remaining);
-
 	len = 0;
 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
 	if (ret < 0)



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 18/20] svcrdma: Rename info::ri_chunklen
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (16 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 17/20] svcrdma: Clean up chunk tracepoints Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 19/20] svcrdma: Use the new parsed chunk list when pulling Read chunks Chuck Lever
                   ` (2 subsequent siblings)
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

I'm about to change the purpose of ri_chunklen: Instead of tracking
the number of bytes in one Read chunk, it will track the total
number of bytes in the Read list. Rename it for clarity.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_rw.c |   31 +++++++++++++++----------------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 0de95207eaf1..104b1d5a2203 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -262,7 +262,7 @@ struct svc_rdma_read_info {
 	unsigned int			ri_position;
 	unsigned int			ri_pageno;
 	unsigned int			ri_pageoff;
-	unsigned int			ri_chunklen;
+	unsigned int			ri_totalbytes;
 
 	struct svc_rdma_chunk_ctxt	ri_cc;
 };
@@ -724,7 +724,6 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 	int ret;
 
 	ret = -EINVAL;
-	info->ri_chunklen = 0;
 	while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
 		u32 handle, length;
 		u64 offset;
@@ -735,7 +734,7 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 		if (ret < 0)
 			break;
 
-		info->ri_chunklen += length;
+		info->ri_totalbytes += length;
 	}
 	return ret;
 }
@@ -752,6 +751,8 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 					    __be32 *p)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+	struct xdr_buf *buf = &head->rc_arg;
+	unsigned int length;
 	int ret;
 
 	ret = svc_rdma_build_read_chunk(rqstp, info, p);
@@ -780,11 +781,10 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	 * Currently these chunks always start at page offset 0,
 	 * thus the rounded-up length never crosses a page boundary.
 	 */
-	info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
-
-	head->rc_arg.page_len = info->ri_chunklen;
-	head->rc_arg.len += info->ri_chunklen;
-	head->rc_arg.buflen += info->ri_chunklen;
+	length = XDR_QUADLEN(info->ri_totalbytes) << 2;
+	buf->page_len = length;
+	buf->len += length;
+	buf->buflen += length;
 
 out:
 	return ret;
@@ -806,22 +806,20 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 					__be32 *p)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+	struct xdr_buf *buf = &head->rc_arg;
 	int ret;
 
 	ret = svc_rdma_build_read_chunk(rqstp, info, p);
 	if (ret < 0)
 		goto out;
 
-	head->rc_arg.len += info->ri_chunklen;
-	head->rc_arg.buflen += info->ri_chunklen;
+	buf->len += info->ri_totalbytes;
+	buf->buflen += info->ri_totalbytes;
 
 	head->rc_hdr_count = 1;
-	head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]);
-	head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE,
-					     info->ri_chunklen);
-
-	head->rc_arg.page_len = info->ri_chunklen -
-				head->rc_arg.head[0].iov_len;
+	buf->head[0].iov_base = page_address(head->rc_pages[0]);
+	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
+	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
 
 out:
 	return ret;
@@ -890,6 +888,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 	info->ri_readctxt = head;
 	info->ri_pageno = 0;
 	info->ri_pageoff = 0;
+	info->ri_totalbytes = 0;
 
 	info->ri_position = be32_to_cpup(p + 1);
 	if (info->ri_position)



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 19/20] svcrdma: Use the new parsed chunk list when pulling Read chunks
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (17 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 18/20] svcrdma: Rename info::ri_chunklen Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-26 18:55 ` [PATCH 20/20] svcrdma: support multiple Read chunks per RPC Chuck Lever
  2020-10-27  6:08 ` [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Leon Romanovsky
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

As a pre-requisite for handling multiple Read chunks in each Read
list, convert svc_rdma_recv_read_chunk() to use the new parsed Read
chunk list.
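
For context, the wire-format segment walk becomes an iteration over the
parsed list; the core pattern (taken from the hunk below) is:

	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(info, segment);
		if (ret < 0)
			break;
		info->ri_totalbytes += segment->rs_length;
	}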

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h         |    6 +
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   16 +--
 net/sunrpc/xprtrdma/svc_rdma_rw.c       |  160 ++++++++++++++++++++-----------
 3 files changed, 111 insertions(+), 71 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 6f247d043731..294b56e61522 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -188,15 +188,15 @@ extern int svc_rdma_recvfrom(struct svc_rqst *);
 
 /* svc_rdma_rw.c */
 extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
-extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
-				    struct svc_rqst *rqstp,
-				    struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 				     const struct svc_rdma_chunk *chunk,
 				     const struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     const struct svc_rdma_recv_ctxt *rctxt,
 				     const struct xdr_buf *xdr);
+extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
+				      struct svc_rqst *rqstp,
+				      struct svc_rdma_recv_ctxt *head);
 
 /* svc_rdma_sendto.c */
 extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index dd10b1de227d..cbdb71247755 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -824,7 +824,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	struct svcxprt_rdma *rdma_xprt =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *ctxt;
-	__be32 *p;
 	int ret;
 
 	rqstp->rq_xprt_ctxt = NULL;
@@ -857,7 +856,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	rqstp->rq_respages = rqstp->rq_pages;
 	rqstp->rq_next_page = rqstp->rq_respages;
 
-	p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
 	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
 	if (ret < 0)
 		goto out_err;
@@ -870,9 +868,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 
 	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
 
-	p += rpcrdma_fixed_maxsz;
-	if (*p != xdr_zero)
-		goto out_readchunk;
+	if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
+	    !pcl_is_empty(&ctxt->rc_call_pcl))
+		goto out_readlist;
 
 complete:
 	rqstp->rq_xprt_ctxt = ctxt;
@@ -880,10 +878,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	svc_xprt_copy_addrs(rqstp, xprt);
 	return rqstp->rq_arg.len;
 
-out_readchunk:
-	ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
+out_readlist:
+	ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
 	if (ret < 0)
-		goto out_postfail;
+		goto out_readfail;
 	return 0;
 
 out_err:
@@ -891,7 +889,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 	return 0;
 
-out_postfail:
+out_readfail:
 	if (ret == -EINVAL)
 		svc_rdma_send_error(rdma_xprt, ctxt, ret);
 	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 104b1d5a2203..6ec7bdc7b4d3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -258,8 +258,8 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 /* State for pulling a Read chunk.
  */
 struct svc_rdma_read_info {
+	struct svc_rqst			*ri_rqst;
 	struct svc_rdma_recv_ctxt	*ri_readctxt;
-	unsigned int			ri_position;
 	unsigned int			ri_pageno;
 	unsigned int			ri_pageoff;
 	unsigned int			ri_totalbytes;
@@ -656,17 +656,29 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	return ret;
 }
 
+/**
+ * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
+ * @info: context for ongoing I/O
+ * @segment: co-ordinates of remote memory to be read
+ *
+ * Returns:
+ *   %0: the Read WR chain was constructed successfully
+ *   %-EINVAL: there were not enough rq_pages to finish
+ *   %-ENOMEM: allocating a local resources failed
+ *   %-EIO: a DMA mapping error occurred
+ */
 static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
-				       struct svc_rqst *rqstp,
-				       u32 rkey, u32 len, u64 offset)
+				       const struct svc_rdma_segment *segment)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+	struct svc_rqst *rqstp = info->ri_rqst;
 	struct svc_rdma_rw_ctxt *ctxt;
-	unsigned int sge_no, seg_len;
+	unsigned int sge_no, seg_len, len;
 	struct scatterlist *sg;
 	int ret;
 
+	len = segment->rs_length;
 	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 	if (!ctxt)
@@ -700,8 +712,8 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 			goto out_overrun;
 	}
 
-	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, offset, rkey,
-				   DMA_FROM_DEVICE);
+	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
+				   segment->rs_handle, DMA_FROM_DEVICE);
 	if (ret < 0)
 		return -EIO;
 
@@ -714,48 +726,63 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 	return -EINVAL;
 }
 
-/* Walk the segments in the Read chunk starting at @p and construct
- * RDMA Read operations to pull the chunk to the server.
+/**
+ * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
+ * @info: context for ongoing I/O
+ * @chunk: Read chunk to pull
+ *
+ * Return values:
+ *   %0: the Read WR chain was constructed successfully
+ *   %-EINVAL: there were not enough resources to finish
+ *   %-ENOMEM: allocating a local resources failed
+ *   %-EIO: a DMA mapping error occurred
  */
-static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
-				     struct svc_rdma_read_info *info,
-				     __be32 *p)
+static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
+				     const struct svc_rdma_chunk *chunk)
 {
+	const struct svc_rdma_segment *segment;
 	int ret;
 
 	ret = -EINVAL;
-	while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
-		u32 handle, length;
-		u64 offset;
-
-		p = xdr_decode_rdma_segment(p, &handle, &length, &offset);
-		ret = svc_rdma_build_read_segment(info, rqstp, handle, length,
-						  offset);
+	pcl_for_each_segment(segment, chunk) {
+		ret = svc_rdma_build_read_segment(info, segment);
 		if (ret < 0)
 			break;
-
-		info->ri_totalbytes += length;
+		info->ri_totalbytes += segment->rs_length;
 	}
 	return ret;
 }
 
-/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
- * data lands in the page list of head->rc_arg.pages.
+/**
+ * svc_rdma_read_data_items - Construct RDMA Reads to pull data item Read chunks
+ * @info: context for RDMA Reads
+ *
+ * The chunk data lands in the page list of head->rc_arg.pages.
  *
  * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
  * Therefore, XDR round-up of the Read chunk and trailing
  * inline content must both be added at the end of the pagelist.
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
-					    struct svc_rdma_read_info *info,
-					    __be32 *p)
+static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct xdr_buf *buf = &head->rc_arg;
+	struct svc_rdma_chunk *chunk;
 	unsigned int length;
 	int ret;
 
-	ret = svc_rdma_build_read_chunk(rqstp, info, p);
+	if (head->rc_read_pcl.cl_count > 1)
+		return -EINVAL;
+
+	chunk = pcl_first_chunk(&head->rc_read_pcl);
+	ret = svc_rdma_build_read_chunk(info, chunk);
 	if (ret < 0)
 		goto out;
 
@@ -766,11 +793,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	 * chunk is not included in either the pagelist or in
 	 * the tail.
 	 */
-	head->rc_arg.tail[0].iov_base =
-		head->rc_arg.head[0].iov_base + info->ri_position;
-	head->rc_arg.tail[0].iov_len =
-		head->rc_arg.head[0].iov_len - info->ri_position;
-	head->rc_arg.head[0].iov_len = info->ri_position;
+	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
+	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
+	buf->head[0].iov_len = chunk->ch_position;
 
 	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 	 *
@@ -790,26 +815,36 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	return ret;
 }
 
-/* Construct RDMA Reads to pull over a Position Zero Read chunk.
- * The start of the data lands in the first page just after
- * the Transport header, and the rest lands in the page list of
+/**
+ * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
+ * @info: context for RDMA Reads
+ *
+ * The start of the data lands in the first page just after the
+ * Transport header, and the rest lands in the page list of
  * head->rc_arg.pages.
  *
  * Assumptions:
- *	- A PZRC has an XDR-aligned length (no implicit round-up).
- *	- There can be no trailing inline content (IOW, we assume
- *	  a PZRC is never sent in an RDMA_MSG message, though it's
- *	  allowed by spec).
+ *	- A PZRC is never sent in an RDMA_MSG message, though it's
+ *	  allowed by spec.
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
-					struct svc_rdma_read_info *info,
-					__be32 *p)
+static int svc_rdma_read_special(struct svc_rdma_read_info *info)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct xdr_buf *buf = &head->rc_arg;
 	int ret;
 
-	ret = svc_rdma_build_read_chunk(rqstp, info, p);
+	if (head->rc_call_pcl.cl_count > 1)
+		return -EINVAL;
+
+	ret = svc_rdma_build_read_chunk(info,
+					pcl_first_chunk(&head->rc_call_pcl));
 	if (ret < 0)
 		goto out;
 
@@ -846,24 +881,31 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
 }
 
 /**
- * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * svc_rdma_process_read_list - Pull list of Read chunks from the client
  * @rdma: controlling RDMA transport
  * @rqstp: set of pages to use as Read sink buffers
  * @head: pages under I/O collect here
- * @p: pointer to start of Read chunk
  *
- * Returns:
- *	%0 if all needed RDMA Reads were posted successfully,
- *	%-EINVAL if client provided too many segments,
- *	%-ENOMEM if rdma_rw context pool was exhausted,
- *	%-ENOTCONN if posting failed (connection is lost),
- *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
+ * pull each Read chunk as they decode an incoming RPC message.
  *
- * Assumptions:
- * - All Read segments in @p have the same Position value.
+ * On Linux, however, the server needs to have a fully-constructed RPC
+ * message in rqstp->rq_arg when there is a positive return code from
+ * ->xpo_recvfrom. So the Read list is safety-checked immediately when
+ * it is received, then here the whole Read list is pulled all at once.
+ * The ingress RPC message is fully reconstructed once all associated
+ * RDMA Reads have completed.
+ *
+ * Return values:
+ *   %1: all needed RDMA Reads were posted successfully,
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
-			     struct svc_rdma_recv_ctxt *head, __be32 *p)
+int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
+			       struct svc_rqst *rqstp,
+			       struct svc_rdma_recv_ctxt *head)
 {
 	struct svc_rdma_read_info *info;
 	struct svc_rdma_chunk_ctxt *cc;
@@ -885,16 +927,16 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 	if (!info)
 		return -ENOMEM;
 	cc = &info->ri_cc;
+	info->ri_rqst = rqstp;
 	info->ri_readctxt = head;
 	info->ri_pageno = 0;
 	info->ri_pageoff = 0;
 	info->ri_totalbytes = 0;
 
-	info->ri_position = be32_to_cpup(p + 1);
-	if (info->ri_position)
-		ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
+	if (pcl_is_empty(&head->rc_call_pcl))
+		ret = svc_rdma_read_data_items(info);
 	else
-		ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+		ret = svc_rdma_read_special(info);
 	if (ret < 0)
 		goto out_err;
 
@@ -903,7 +945,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 	if (ret < 0)
 		goto out_err;
 	svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
-	return 0;
+	return 1;
 
 out_err:
 	svc_rdma_read_info_free(info);



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* [PATCH 20/20] svcrdma: support multiple Read chunks per RPC
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (18 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 19/20] svcrdma: Use the new parsed chunk list when pulling Read chunks Chuck Lever
@ 2020-10-26 18:55 ` Chuck Lever
  2020-10-27  6:08 ` [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Leon Romanovsky
  20 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-26 18:55 UTC (permalink / raw)
  To: linux-nfs, linux-rdma

An efficient way to handle multiple Read chunks is to post them all
together and then take a single completion. This is also how the
code is already structured: when the Read completion fires, all
portions of the incoming RPC message are available to be assembled.

The difficult problem is setting up the Read sink buffers so that
the server pulls the client's data into place, making subsequent
pull-up unnecessary. There are several cases:

* No Read chunks. No-op.

* One data item Read chunk. This is the fast case, where the inline
  part of the RPC-over-RDMA message becomes the head and tail, and
  the data item chunk is placed in buf->pages.

* A Position-zero Read chunk. Treated like TCP: the Read chunk is
  pulled into contiguous pages.

+ A Position-zero Read chunk with data item chunks. Treated like
  TCP: all of the Read chunks are pulled into contiguous pages.

+ Multiple data item chunks. Treated like TCP: the inline part is
  copied and the data item chunks are pulled into contiguous pages.

The "*" cases are already supported. This patch adds support for the
"+" cases.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/svc_rdma_rw.c |  236 +++++++++++++++++++++++++++++++++++--
 1 file changed, 222 insertions(+), 14 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 6ec7bdc7b4d3..12aa4c53b48f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -754,7 +754,121 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
 }
 
 /**
- * svc_rdma_read_data_items - Construct RDMA Reads to pull data item Read chunks
+ * svc_rdma_copy_inline_range - Copy part of the inline content into pages
+ * @info: context for RDMA Reads
+ * @offset: offset into the Receive buffer of region to copy
+ * @remaining: length of region to copy
+ *
+ * Take a page at a time from rqstp->rq_pages and copy the inline
+ * content from the Receive buffer into that page. Update
+ * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
+ * result will land contiguously with the copied content.
+ *
+ * Return values:
+ *   %0: Inline content was successfully copied
+ *   %-EINVAL: offset or length was incorrect
+ */
+static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
+				      unsigned int offset,
+				      unsigned int remaining)
+{
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+	unsigned char *dst, *src = head->rc_recv_buf;
+	struct svc_rqst *rqstp = info->ri_rqst;
+	unsigned int page_no, numpages;
+
+	numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
+	for (page_no = 0; page_no < numpages; page_no++) {
+		unsigned int page_len;
+
+		page_len = min_t(unsigned int, remaining,
+				 PAGE_SIZE - info->ri_pageoff);
+
+		head->rc_arg.pages[info->ri_pageno] =
+			rqstp->rq_pages[info->ri_pageno];
+		if (!info->ri_pageoff)
+			head->rc_page_count++;
+
+		dst = page_address(head->rc_arg.pages[info->ri_pageno]);
+		memcpy(dst + info->ri_pageno, src + offset, page_len);
+
+		info->ri_totalbytes += page_len;
+		info->ri_pageoff += page_len;
+		if (info->ri_pageoff == PAGE_SIZE) {
+			info->ri_pageno++;
+			info->ri_pageoff = 0;
+		}
+		remaining -= page_len;
+		offset += page_len;
+	}
+
+	return -EINVAL;
+}
+
+/**
+ * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
+ * @info: context for RDMA Reads
+ *
+ * The chunk data lands in head->rc_arg as a series of contiguous pages,
+ * like an incoming TCP call.
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
+{
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
+	struct svc_rdma_chunk *chunk, *next;
+	struct xdr_buf *buf = &head->rc_arg;
+	unsigned int start, length;
+	int ret;
+
+	start = 0;
+	chunk = pcl_first_chunk(pcl);
+	length = chunk->ch_position;
+	ret = svc_rdma_copy_inline_range(info, start, length);
+	if (ret < 0)
+		return ret;
+
+	pcl_for_each_chunk(chunk, pcl) {
+		ret = svc_rdma_build_read_chunk(info, chunk);
+		if (ret < 0)
+			return ret;
+
+		next = pcl_next_chunk(pcl, chunk);
+		if (!next)
+			break;
+
+		start += length;
+		length = next->ch_position - info->ri_totalbytes;
+		ret = svc_rdma_copy_inline_range(info, start, length);
+		if (ret < 0)
+			return ret;
+	}
+
+	start += length;
+	length = head->rc_byte_len - start;
+	ret = svc_rdma_copy_inline_range(info, start, length);
+	if (ret < 0)
+		return ret;
+
+	buf->len += info->ri_totalbytes;
+	buf->buflen += info->ri_totalbytes;
+
+	head->rc_hdr_count = 1;
+	buf->head[0].iov_base = page_address(head->rc_pages[0]);
+	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
+	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
+	return 0;
+}
+
+/**
+ * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
  * @info: context for RDMA Reads
  *
  * The chunk data lands in the page list of head->rc_arg.pages.
@@ -770,7 +884,7 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
  *   %-ENOTCONN: posting failed (connection is lost),
  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
+static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct xdr_buf *buf = &head->rc_arg;
@@ -778,9 +892,6 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
 	unsigned int length;
 	int ret;
 
-	if (head->rc_read_pcl.cl_count > 1)
-		return -EINVAL;
-
 	chunk = pcl_first_chunk(&head->rc_read_pcl);
 	ret = svc_rdma_build_read_chunk(info, chunk);
 	if (ret < 0)
@@ -815,6 +926,104 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
 	return ret;
 }
 
+/**
+ * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
+ * @info: context for RDMA Reads
+ * @chunk: parsed Call chunk to pull
+ * @offset: offset of region to pull
+ * @length: length of region to pull
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: there were not enough resources to finish
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
+				     const struct svc_rdma_chunk *chunk,
+				     unsigned int offset, unsigned int length)
+{
+	const struct svc_rdma_segment *segment;
+	int ret;
+
+	ret = -EINVAL;
+	pcl_for_each_segment(segment, chunk) {
+		struct svc_rdma_segment dummy;
+
+		if (offset > segment->rs_length) {
+			offset -= segment->rs_length;
+			continue;
+		}
+
+		dummy.rs_handle = segment->rs_handle;
+		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
+		dummy.rs_offset = segment->rs_offset + offset;
+
+		ret = svc_rdma_build_read_segment(info, &dummy);
+		if (ret < 0)
+			break;
+
+		info->ri_totalbytes += dummy.rs_length;
+		length -= dummy.rs_length;
+		offset = 0;
+	}
+	return ret;
+}
+
+/**
+ * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
+ * @info: context for RDMA Reads
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: there were not enough resources to finish
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
+ */
+static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
+{
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
+	const struct svc_rdma_chunk *call_chunk =
+			pcl_first_chunk(&head->rc_call_pcl);
+	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
+	struct svc_rdma_chunk *chunk, *next;
+	unsigned int start, length;
+	int ret;
+
+	if (pcl_is_empty(pcl))
+		return svc_rdma_build_read_chunk(info, call_chunk);
+
+	start = 0;
+	chunk = pcl_first_chunk(pcl);
+	length = chunk->ch_position;
+	ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
+	if (ret < 0)
+		return ret;
+
+	pcl_for_each_chunk(chunk, pcl) {
+		ret = svc_rdma_build_read_chunk(info, chunk);
+		if (ret < 0)
+			return ret;
+
+		next = pcl_next_chunk(pcl, chunk);
+		if (!next)
+			break;
+
+		start += length;
+		length = next->ch_position - info->ri_totalbytes;
+		ret = svc_rdma_read_chunk_range(info, call_chunk,
+						start, length);
+		if (ret < 0)
+			return ret;
+	}
+
+	start += length;
+	length = call_chunk->ch_length - start;
+	return svc_rdma_read_chunk_range(info, call_chunk, start, length);
+}
+
 /**
  * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
  * @info: context for RDMA Reads
@@ -834,17 +1043,13 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
  *   %-ENOTCONN: posting failed (connection is lost),
  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_read_special(struct svc_rdma_read_info *info)
+static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
 {
 	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct xdr_buf *buf = &head->rc_arg;
 	int ret;
 
-	if (head->rc_call_pcl.cl_count > 1)
-		return -EINVAL;
-
-	ret = svc_rdma_build_read_chunk(info,
-					pcl_first_chunk(&head->rc_call_pcl));
+	ret = svc_rdma_read_call_chunk(info);
 	if (ret < 0)
 		goto out;
 
@@ -933,9 +1138,12 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
 	info->ri_pageoff = 0;
 	info->ri_totalbytes = 0;
 
-	if (pcl_is_empty(&head->rc_call_pcl))
-		ret = svc_rdma_read_data_items(info);
-	else
+	if (pcl_is_empty(&head->rc_call_pcl)) {
+		if (head->rc_read_pcl.cl_count == 1)
+			ret = svc_rdma_read_data_item(info);
+		else
+			ret = svc_rdma_read_multiple_chunks(info);
+	} else
 		ret = svc_rdma_read_special(info);
 	if (ret < 0)
 		goto out_err;



^ permalink raw reply related	[flat|nested] 29+ messages in thread

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
                   ` (19 preceding siblings ...)
  2020-10-26 18:55 ` [PATCH 20/20] svcrdma: support multiple Read chunks per RPC Chuck Lever
@ 2020-10-27  6:08 ` Leon Romanovsky
  2020-10-27 13:24   ` Chuck Lever
  20 siblings, 1 reply; 29+ messages in thread
From: Leon Romanovsky @ 2020-10-27  6:08 UTC (permalink / raw)
  To: Chuck Lever; +Cc: linux-nfs, linux-rdma

On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
> This series implements support for multiple RPC/RDMA chunks per RPC
> transaction. This is one of the few remaining generalities that the
> Linux NFS/RDMA server implementation lacks.
>
> There is currently one known NFS/RDMA client implementation that can
> send multiple chunks per RPC, and that is Solaris. Multiple chunks
> are rare enough that the Linux NFS/RDMA implementation has been
> successful without this support for many years.

So why do we need it? Solaris is dead, and like you wrote Linux systems
work without this feature just fine, what are the benefits? Who will use it?

Thanks

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-27  6:08 ` [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Leon Romanovsky
@ 2020-10-27 13:24   ` Chuck Lever
  2020-10-27 17:25     ` J. Bruce Fields
  2020-10-28  7:16     ` Leon Romanovsky
  0 siblings, 2 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-27 13:24 UTC (permalink / raw)
  To: Leon Romanovsky; +Cc: Linux NFS Mailing List, linux-rdma

Hi Leon-

> On Oct 27, 2020, at 2:08 AM, Leon Romanovsky <leon@kernel.org> wrote:
> 
> On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
>> This series implements support for multiple RPC/RDMA chunks per RPC
>> transaction. This is one of the few remaining generalities that the
>> Linux NFS/RDMA server implementation lacks.
>> 
>> There is currently one known NFS/RDMA client implementation that can
>> send multiple chunks per RPC, and that is Solaris. Multiple chunks
>> are rare enough that the Linux NFS/RDMA implementation has been
>> successful without this support for many years.
> 
> So why do we need it? Solaris is dead, and like you wrote Linux systems
> work without this feature just fine, what are the benefits? Who will use it?

The Linux NFS implementation is living. We can add the ability
to provision multiple chunks per RPC to the Linux NFS client at
any time.

Likewise any actively developed NFS/RDMA implementation can add
this feature. The RPC/RDMA version 1 protocol does not have the
ability to communicate the maximum number of chunks the server
will accept per RPC.

Other server implementations do support multiple chunks per RPC.
The Linux NFS/RDMA server implementation has always been incomplete
in this regard.

And the Linux NFS server implementation (the non-transport specific
part) already supports multiple data payloads per NFSv4 COMPOUND.


Restoring a little more of the cover letter:

>> Along with multiple chunk support, this series adds the following
>> benefits:
>> 
>> - More robust input sanitization of RPC/RDMA headers
>> - An internal representation of chunks that is agnostic to their
>>  wire format

The Linux NFS/RDMA server implementation does need to have better
input sanitization.

And there is a version 2 of RPC/RDMA under active development:

https://datatracker.ietf.org/doc/draft-ietf-nfsv4-rpcrdma-version-two/

Having some protocol version agnosticism in our transport might
be necessary eventually.

--
Chuck Lever




^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-27 13:24   ` Chuck Lever
@ 2020-10-27 17:25     ` J. Bruce Fields
  2020-10-27 17:29       ` Chuck Lever
  2020-10-28  7:16     ` Leon Romanovsky
  1 sibling, 1 reply; 29+ messages in thread
From: J. Bruce Fields @ 2020-10-27 17:25 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Leon Romanovsky, Linux NFS Mailing List, linux-rdma

On Tue, Oct 27, 2020 at 09:24:54AM -0400, Chuck Lever wrote:
> Hi Leon-
> 
> > On Oct 27, 2020, at 2:08 AM, Leon Romanovsky <leon@kernel.org> wrote:
> > 
> > On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
> >> This series implements support for multiple RPC/RDMA chunks per RPC
> >> transaction. This is one of the few remaining generalities that the
> >> Linux NFS/RDMA server implementation lacks.
> >> 
> >> There is currently one known NFS/RDMA client implementation that can
> >> send multiple chunks per RPC, and that is Solaris. Multiple chunks
> >> are rare enough that the Linux NFS/RDMA implementation has been
> >> successful without this support for many years.
> > 
> > So why do we need it? Solaris is dead, and like you wrote Linux systems
> > work without this feature just fine, what are the benefits? Who will use it?
> 
> The Linux NFS implementation is living. We can add the ability
> to provision multiple chunks per RPC to the Linux NFS client at
> any time.
> 
> Likewise any actively developed NFS/RDMA implementation can add
> this feature. The RPC/RDMA version 1 protocol does not have the
> ability to communicate the maximum number of chunks the server
> will accept per RPC.
> 
> Other server implementations do support multiple chunks per RPC.
> The Linux NFS/RDMA server implementation has always been incomplete
> in this regard.

Can the client detect the server's lack of support and fall back, or
does the server's incompleteness violate the RFC in some way that can
actually cause a failure to interoperate?

--b.

> And the Linux NFS server implementation (the non-transport specific
> part) already supports multiple data payloads per NFSv4 COMPOUND.
> 
> 
> Restoring a little more of the cover letter:
> 
> >> Along with multiple chunk support, this series adds the following
> >> benefits:
> >> 
> >> - More robust input sanitization of RPC/RDMA headers
> >> - An internal representation of chunks that is agnostic to their
> >>  wire format
> 
> The Linux NFS/RDMA server implementation does need to have better
> input sanitization.
> 
> And there is a version 2 of RPC/RDMA under active development:
> 
> https://datatracker.ietf.org/doc/draft-ietf-nfsv4-rpcrdma-version-two/
> 
> Having some protocol version agnosticism in our transport might
> be necessary eventually.
> 
> --
> Chuck Lever
> 
> 

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-27 17:25     ` J. Bruce Fields
@ 2020-10-27 17:29       ` Chuck Lever
  0 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-27 17:29 UTC (permalink / raw)
  To: Bruce Fields; +Cc: Leon Romanovsky, Linux NFS Mailing List, linux-rdma



> On Oct 27, 2020, at 1:25 PM, bfields@fieldses.org wrote:
> 
> On Tue, Oct 27, 2020 at 09:24:54AM -0400, Chuck Lever wrote:
>> Hi Leon-
>> 
>>> On Oct 27, 2020, at 2:08 AM, Leon Romanovsky <leon@kernel.org> wrote:
>>> 
>>> On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
>>>> This series implements support for multiple RPC/RDMA chunks per RPC
>>>> transaction. This is one of the few remaining generalities that the
>>>> Linux NFS/RDMA server implementation lacks.
>>>> 
>>>> There is currently one known NFS/RDMA client implementation that can
>>>> send multiple chunks per RPC, and that is Solaris. Multiple chunks
>>>> are rare enough that the Linux NFS/RDMA implementation has been
>>>> successful without this support for many years.
>>> 
>>> So why do we need it? Solaris is dead, and like you wrote Linux systems
>>> work without this feature just fine, what are the benefits? Who will use it?
>> 
>> The Linux NFS implementation is living. We can add the ability
>> to provision multiple chunks per RPC to the Linux NFS client at
>> any time.
>> 
>> Likewise any actively developed NFS/RDMA implementation can add
>> this feature. The RPC/RDMA version 1 protocol does not have the
>> ability to communicate the maximum number of chunks the server
>> will accept per RPC.
>> 
>> Other server implementations do support multiple chunks per RPC.
>> The Linux NFS/RDMA server implementation has always been incomplete
>> in this regard.
> 
> Can the client detect the server's lack of support and fall back, or
> does the server's incompleteness violate the RFC in some way that can
> actually cause a failure to interoperate?

The latter. Currently the client has no way to detect that our server
does not comply with RFC 8166, which places no arbitrary limits on
the number of chunks per RPC.

If a client attempts to send more than one chunk, the RPC fails (or
worse). RPC/RDMA version 1 does not provide a way to indicate that
the failure was because the client sent too many chunks, so the
client has to terminate the RPC transaction with an error.


>> And the Linux NFS server implementation (the non-transport specific
>> part) already supports multiple data payloads per NFSv4 COMPOUND.
>> 
>> 
>> Restoring a little more of the cover letter:
>> 
>>>> Along with multiple chunk support, this series adds the following
>>>> benefits:
>>>> 
>>>> - More robust input sanitization of RPC/RDMA headers
>>>> - An internal representation of chunks that is agnostic to their
>>>> wire format
>> 
>> The Linux NFS/RDMA server implementation does need to have better
>> input sanitization.
>> 
>> And there is a version 2 of RPC/RDMA under active development:
>> 
>> https://datatracker.ietf.org/doc/draft-ietf-nfsv4-rpcrdma-version-two/
>> 
>> Having some protocol version agnosticism in our transport might
>> be necessary eventually.
>> 
>> --
>> Chuck Lever

--
Chuck Lever




^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload()
  2020-10-26 18:54 ` [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload() Chuck Lever
@ 2020-10-27 20:53   ` J. Bruce Fields
  2020-10-28 13:16     ` Chuck Lever
  0 siblings, 1 reply; 29+ messages in thread
From: J. Bruce Fields @ 2020-10-27 20:53 UTC (permalink / raw)
  To: Chuck Lever; +Cc: linux-nfs, linux-rdma

On Mon, Oct 26, 2020 at 02:54:14PM -0400, Chuck Lever wrote:
> Clean up: "result payload" is a less confusing name for these
> payloads. "READ payload" reflects only the NFS usage.
> 
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> ---
>  fs/nfsd/nfs4xdr.c                        |    2 +-
>  include/linux/sunrpc/svc.h               |    6 +++---
>  include/linux/sunrpc/svc_rdma.h          |    4 ++--
>  include/linux/sunrpc/svc_xprt.h          |    4 ++--
>  net/sunrpc/svc.c                         |   11 ++++++-----
>  net/sunrpc/svcsock.c                     |    8 ++++----
>  net/sunrpc/xprtrdma/svc_rdma_sendto.c    |    8 ++++----
>  net/sunrpc/xprtrdma/svc_rdma_transport.c |    2 +-
>  8 files changed, 23 insertions(+), 22 deletions(-)
> 
> diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
> index 833a2c64dfe8..7e24fb3ca36e 100644
> --- a/fs/nfsd/nfs4xdr.c
> +++ b/fs/nfsd/nfs4xdr.c
> @@ -3829,7 +3829,7 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
>  	read->rd_length = maxcount;
>  	if (nfserr)
>  		return nfserr;
> -	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
> +	if (svc_encode_result_payload(resp->rqstp, starting_len + 8, maxcount))
>  		return nfserr_io;

Why does this call check for an error return while the
svc_encode_result_payload() calls in the next patch don't?

--b.

>  	xdr_truncate_encode(xdr, starting_len + 8 + xdr_align_size(maxcount));
>  
> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
> index 386628b36bc7..c220b734fa69 100644
> --- a/include/linux/sunrpc/svc.h
> +++ b/include/linux/sunrpc/svc.h
> @@ -519,9 +519,9 @@ void		   svc_wake_up(struct svc_serv *);
>  void		   svc_reserve(struct svc_rqst *rqstp, int space);
>  struct svc_pool *  svc_pool_for_cpu(struct svc_serv *serv, int cpu);
>  char *		   svc_print_addr(struct svc_rqst *, char *, size_t);
> -int		   svc_encode_read_payload(struct svc_rqst *rqstp,
> -					   unsigned int offset,
> -					   unsigned int length);
> +int		   svc_encode_result_payload(struct svc_rqst *rqstp,
> +					     unsigned int offset,
> +					     unsigned int length);
>  unsigned int	   svc_fill_write_vector(struct svc_rqst *rqstp,
>  					 struct page **pages,
>  					 struct kvec *first, size_t total);
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 9dc3a3b88391..2b870a3f391b 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -207,8 +207,8 @@ extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
>  				    struct svc_rdma_recv_ctxt *rctxt,
>  				    int status);
>  extern int svc_rdma_sendto(struct svc_rqst *);
> -extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
> -				 unsigned int length);
> +extern int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
> +				   unsigned int length);
>  
>  /* svc_rdma_transport.c */
>  extern struct svc_xprt_class svc_rdma_class;
> diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
> index aca35ab5cff2..92455e0d5244 100644
> --- a/include/linux/sunrpc/svc_xprt.h
> +++ b/include/linux/sunrpc/svc_xprt.h
> @@ -21,8 +21,8 @@ struct svc_xprt_ops {
>  	int		(*xpo_has_wspace)(struct svc_xprt *);
>  	int		(*xpo_recvfrom)(struct svc_rqst *);
>  	int		(*xpo_sendto)(struct svc_rqst *);
> -	int		(*xpo_read_payload)(struct svc_rqst *, unsigned int,
> -					    unsigned int);
> +	int		(*xpo_result_payload)(struct svc_rqst *, unsigned int,
> +					      unsigned int);
>  	void		(*xpo_release_rqst)(struct svc_rqst *);
>  	void		(*xpo_detach)(struct svc_xprt *);
>  	void		(*xpo_free)(struct svc_xprt *);
> diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
> index c211b607239e..b41500645c3f 100644
> --- a/net/sunrpc/svc.c
> +++ b/net/sunrpc/svc.c
> @@ -1622,7 +1622,7 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
>  EXPORT_SYMBOL_GPL(svc_max_payload);
>  
>  /**
> - * svc_encode_read_payload - mark a range of bytes as a READ payload
> + * svc_encode_result_payload - mark a range of bytes as a result payload
>   * @rqstp: svc_rqst to operate on
>   * @offset: payload's byte offset in rqstp->rq_res
>   * @length: size of payload, in bytes
> @@ -1630,12 +1630,13 @@ EXPORT_SYMBOL_GPL(svc_max_payload);
>   * Returns zero on success, or a negative errno if a permanent
>   * error occurred.
>   */
> -int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
> -			    unsigned int length)
> +int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
> +			      unsigned int length)
>  {
> -	return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
> +	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
> +							   length);
>  }
> -EXPORT_SYMBOL_GPL(svc_encode_read_payload);
> +EXPORT_SYMBOL_GPL(svc_encode_result_payload);
>  
>  /**
>   * svc_fill_write_vector - Construct data argument for VFS write call
> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> index c2752e2b9ce3..b248f2349437 100644
> --- a/net/sunrpc/svcsock.c
> +++ b/net/sunrpc/svcsock.c
> @@ -181,8 +181,8 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
>  	}
>  }
>  
> -static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
> -				 unsigned int length)
> +static int svc_sock_result_payload(struct svc_rqst *rqstp, unsigned int offset,
> +				   unsigned int length)
>  {
>  	return 0;
>  }
> @@ -635,7 +635,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
>  	.xpo_create = svc_udp_create,
>  	.xpo_recvfrom = svc_udp_recvfrom,
>  	.xpo_sendto = svc_udp_sendto,
> -	.xpo_read_payload = svc_sock_read_payload,
> +	.xpo_result_payload = svc_sock_result_payload,
>  	.xpo_release_rqst = svc_udp_release_rqst,
>  	.xpo_detach = svc_sock_detach,
>  	.xpo_free = svc_sock_free,
> @@ -1123,7 +1123,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
>  	.xpo_create = svc_tcp_create,
>  	.xpo_recvfrom = svc_tcp_recvfrom,
>  	.xpo_sendto = svc_tcp_sendto,
> -	.xpo_read_payload = svc_sock_read_payload,
> +	.xpo_result_payload = svc_sock_result_payload,
>  	.xpo_release_rqst = svc_tcp_release_rqst,
>  	.xpo_detach = svc_tcp_sock_detach,
>  	.xpo_free = svc_sock_free,
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index c3d588b149aa..c8411b4f3492 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -979,19 +979,19 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
>  }
>  
>  /**
> - * svc_rdma_read_payload - special processing for a READ payload
> + * svc_rdma_result_payload - special processing for a result payload
>   * @rqstp: svc_rqst to operate on
>   * @offset: payload's byte offset in @xdr
>   * @length: size of payload, in bytes
>   *
>   * Returns zero on success.
>   *
> - * For the moment, just record the xdr_buf location of the READ
> + * For the moment, just record the xdr_buf location of the result
>   * payload. svc_rdma_sendto will use that location later when
>   * we actually send the payload.
>   */
> -int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
> -			  unsigned int length)
> +int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
> +			    unsigned int length)
>  {
>  	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
>  
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index fb044792b571..afba4e9d5425 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -80,7 +80,7 @@ static const struct svc_xprt_ops svc_rdma_ops = {
>  	.xpo_create = svc_rdma_create,
>  	.xpo_recvfrom = svc_rdma_recvfrom,
>  	.xpo_sendto = svc_rdma_sendto,
> -	.xpo_read_payload = svc_rdma_read_payload,
> +	.xpo_result_payload = svc_rdma_result_payload,
>  	.xpo_release_rqst = svc_rdma_release_rqst,
>  	.xpo_detach = svc_rdma_detach,
>  	.xpo_free = svc_rdma_free,
> 

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-27 13:24   ` Chuck Lever
  2020-10-27 17:25     ` J. Bruce Fields
@ 2020-10-28  7:16     ` Leon Romanovsky
  2020-10-28 13:10       ` Chuck Lever
  1 sibling, 1 reply; 29+ messages in thread
From: Leon Romanovsky @ 2020-10-28  7:16 UTC (permalink / raw)
  To: Chuck Lever; +Cc: Linux NFS Mailing List, linux-rdma

On Tue, Oct 27, 2020 at 09:24:54AM -0400, Chuck Lever wrote:
> Hi Leon-
>
> > On Oct 27, 2020, at 2:08 AM, Leon Romanovsky <leon@kernel.org> wrote:
> >
> > On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
> >> This series implements support for multiple RPC/RDMA chunks per RPC
> >> transaction. This is one of the few remaining generalities that the
> >> Linux NFS/RDMA server implementation lacks.
> >>
> >> There is currently one known NFS/RDMA client implementation that can
> >> send multiple chunks per RPC, and that is Solaris. Multiple chunks
> >> are rare enough that the Linux NFS/RDMA implementation has been
> >> successful without this support for many years.
> >
> > So why do we need it? Solaris is dead, and like you wrote Linux systems
> > work without this feature just fine, what are the benefits? Who will use it?
>
> The Linux NFS implementation is living. We can add the ability
> to provision multiple chunks per RPC to the Linux NFS client at
> any time.
>
> Likewise any actively developed NFS/RDMA implementation can add
> this feature. The RPC/RDMA version 1 protocol does not have the
> ability to communicate the maximum number of chunks the server
> will accept per RPC.
>
> Other server implementations do support multiple chunks per RPC.
> The Linux NFS/RDMA server implementation has always been incomplete
> in this regard.
>
> And the Linux NFS server implementation (the non-transport specific
> part) already supports multiple data payloads per NFSv4 COMPOUND.

Thanks, I just got a different feeling when I read the cover letter.
You presented it as if no one needs this feature.

Thanks

* Re: [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks
  2020-10-28  7:16     ` Leon Romanovsky
@ 2020-10-28 13:10       ` Chuck Lever
  0 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-28 13:10 UTC (permalink / raw)
  To: Leon Romanovsky; +Cc: Linux NFS Mailing List, linux-rdma



> On Oct 28, 2020, at 3:16 AM, Leon Romanovsky <leon@kernel.org> wrote:
> 
> On Tue, Oct 27, 2020 at 09:24:54AM -0400, Chuck Lever wrote:
>> Hi Leon-
>> 
>>> On Oct 27, 2020, at 2:08 AM, Leon Romanovsky <leon@kernel.org> wrote:
>>> 
>>> On Mon, Oct 26, 2020 at 02:53:53PM -0400, Chuck Lever wrote:
>>>> This series implements support for multiple RPC/RDMA chunks per RPC
>>>> transaction. This is one of the few remaining generalities that the
>>>> Linux NFS/RDMA server implementation lacks.
>>>> 
>>>> There is currently one known NFS/RDMA client implementation that can
>>>> send multiple chunks per RPC, and that is Solaris. Multiple chunks
>>>> are rare enough that the Linux NFS/RDMA implementation has been
>>>> successful without this support for many years.
>>> 
>>> So why do we need it? Solaris is dead, and like you wrote Linux systems
>>> work without this feature just fine, what are the benefits? Who will use it?
>> 
>> The Linux NFS implementation is living. We can add the ability
>> to provision multiple chunks per RPC to the Linux NFS client at
>> any time.
>> 
>> Likewise any actively developed NFS/RDMA implementation can add
>> this feature. The RPC/RDMA version 1 protocol does not have the
>> ability to communicate the maximum number of chunks the server
>> will accept per RPC.
>> 
>> Other server implementations do support multiple chunks per RPC.
>> The Linux NFS/RDMA server implementation has always been incomplete
>> in this regard.
>> 
>> And the Linux NFS server implementation (the non-transport specific
>> part) already supports multiple data payloads per NFSv4 COMPOUND.
> 
> Thanks, I just got a different feeling when I read the cover letter.
> You presented it as if no one needs this feature.

Understood. I'll incorporate a summary of the content of this thread
in the cover letter for the next version of the series.

--
Chuck Lever




* Re: [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload()
  2020-10-27 20:53   ` J. Bruce Fields
@ 2020-10-28 13:16     ` Chuck Lever
  0 siblings, 0 replies; 29+ messages in thread
From: Chuck Lever @ 2020-10-28 13:16 UTC (permalink / raw)
  To: Bruce Fields; +Cc: Linux NFS Mailing List, linux-rdma



> On Oct 27, 2020, at 4:53 PM, bfields@fieldses.org wrote:
> 
> On Mon, Oct 26, 2020 at 02:54:14PM -0400, Chuck Lever wrote:
>> Clean up: "result payload" is a less confusing name for these
>> payloads. "READ payload" reflects only the NFS usage.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
>> ---
>> fs/nfsd/nfs4xdr.c                        |    2 +-
>> include/linux/sunrpc/svc.h               |    6 +++---
>> include/linux/sunrpc/svc_rdma.h          |    4 ++--
>> include/linux/sunrpc/svc_xprt.h          |    4 ++--
>> net/sunrpc/svc.c                         |   11 ++++++-----
>> net/sunrpc/svcsock.c                     |    8 ++++----
>> net/sunrpc/xprtrdma/svc_rdma_sendto.c    |    8 ++++----
>> net/sunrpc/xprtrdma/svc_rdma_transport.c |    2 +-
>> 8 files changed, 23 insertions(+), 22 deletions(-)
>> 
>> diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
>> index 833a2c64dfe8..7e24fb3ca36e 100644
>> --- a/fs/nfsd/nfs4xdr.c
>> +++ b/fs/nfsd/nfs4xdr.c
>> @@ -3829,7 +3829,7 @@ static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
>> 	read->rd_length = maxcount;
>> 	if (nfserr)
>> 		return nfserr;
>> -	if (svc_encode_read_payload(resp->rqstp, starting_len + 8, maxcount))
>> +	if (svc_encode_result_payload(resp->rqstp, starting_len + 8, maxcount))
>> 		return nfserr_io;
> 
> Why does this call check for an error return while the
> svc_encode_result_payload() calls in the next patch don't?

Very likely an oversight. I will ensure the next patch
properly incorporates return code checking.
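
Roughly along these lines (a sketch only; payload_offset and the
resp fields are placeholders, and the real call sites in the v2/v3
read encoders will differ):

	/* Hypothetical: propagate the error instead of ignoring it.
	 * The old-style nfsd encoders signal failure by returning 0.
	 */
	if (svc_encode_result_payload(rqstp, payload_offset, resp->count))
		return 0;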


> 
> --b.
> 
>> 	xdr_truncate_encode(xdr, starting_len + 8 + xdr_align_size(maxcount));
>> 
>> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
>> index 386628b36bc7..c220b734fa69 100644
>> --- a/include/linux/sunrpc/svc.h
>> +++ b/include/linux/sunrpc/svc.h
>> @@ -519,9 +519,9 @@ void		   svc_wake_up(struct svc_serv *);
>> void		   svc_reserve(struct svc_rqst *rqstp, int space);
>> struct svc_pool *  svc_pool_for_cpu(struct svc_serv *serv, int cpu);
>> char *		   svc_print_addr(struct svc_rqst *, char *, size_t);
>> -int		   svc_encode_read_payload(struct svc_rqst *rqstp,
>> -					   unsigned int offset,
>> -					   unsigned int length);
>> +int		   svc_encode_result_payload(struct svc_rqst *rqstp,
>> +					     unsigned int offset,
>> +					     unsigned int length);
>> unsigned int	   svc_fill_write_vector(struct svc_rqst *rqstp,
>> 					 struct page **pages,
>> 					 struct kvec *first, size_t total);
>> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
>> index 9dc3a3b88391..2b870a3f391b 100644
>> --- a/include/linux/sunrpc/svc_rdma.h
>> +++ b/include/linux/sunrpc/svc_rdma.h
>> @@ -207,8 +207,8 @@ extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
>> 				    struct svc_rdma_recv_ctxt *rctxt,
>> 				    int status);
>> extern int svc_rdma_sendto(struct svc_rqst *);
>> -extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
>> -				 unsigned int length);
>> +extern int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
>> +				   unsigned int length);
>> 
>> /* svc_rdma_transport.c */
>> extern struct svc_xprt_class svc_rdma_class;
>> diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
>> index aca35ab5cff2..92455e0d5244 100644
>> --- a/include/linux/sunrpc/svc_xprt.h
>> +++ b/include/linux/sunrpc/svc_xprt.h
>> @@ -21,8 +21,8 @@ struct svc_xprt_ops {
>> 	int		(*xpo_has_wspace)(struct svc_xprt *);
>> 	int		(*xpo_recvfrom)(struct svc_rqst *);
>> 	int		(*xpo_sendto)(struct svc_rqst *);
>> -	int		(*xpo_read_payload)(struct svc_rqst *, unsigned int,
>> -					    unsigned int);
>> +	int		(*xpo_result_payload)(struct svc_rqst *, unsigned int,
>> +					      unsigned int);
>> 	void		(*xpo_release_rqst)(struct svc_rqst *);
>> 	void		(*xpo_detach)(struct svc_xprt *);
>> 	void		(*xpo_free)(struct svc_xprt *);
>> diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
>> index c211b607239e..b41500645c3f 100644
>> --- a/net/sunrpc/svc.c
>> +++ b/net/sunrpc/svc.c
>> @@ -1622,7 +1622,7 @@ u32 svc_max_payload(const struct svc_rqst *rqstp)
>> EXPORT_SYMBOL_GPL(svc_max_payload);
>> 
>> /**
>> - * svc_encode_read_payload - mark a range of bytes as a READ payload
>> + * svc_encode_result_payload - mark a range of bytes as a result payload
>>  * @rqstp: svc_rqst to operate on
>>  * @offset: payload's byte offset in rqstp->rq_res
>>  * @length: size of payload, in bytes
>> @@ -1630,12 +1630,13 @@ EXPORT_SYMBOL_GPL(svc_max_payload);
>>  * Returns zero on success, or a negative errno if a permanent
>>  * error occurred.
>>  */
>> -int svc_encode_read_payload(struct svc_rqst *rqstp, unsigned int offset,
>> -			    unsigned int length)
>> +int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
>> +			      unsigned int length)
>> {
>> -	return rqstp->rq_xprt->xpt_ops->xpo_read_payload(rqstp, offset, length);
>> +	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
>> +							   length);
>> }
>> -EXPORT_SYMBOL_GPL(svc_encode_read_payload);
>> +EXPORT_SYMBOL_GPL(svc_encode_result_payload);
>> 
>> /**
>>  * svc_fill_write_vector - Construct data argument for VFS write call
>> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
>> index c2752e2b9ce3..b248f2349437 100644
>> --- a/net/sunrpc/svcsock.c
>> +++ b/net/sunrpc/svcsock.c
>> @@ -181,8 +181,8 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
>> 	}
>> }
>> 
>> -static int svc_sock_read_payload(struct svc_rqst *rqstp, unsigned int offset,
>> -				 unsigned int length)
>> +static int svc_sock_result_payload(struct svc_rqst *rqstp, unsigned int offset,
>> +				   unsigned int length)
>> {
>> 	return 0;
>> }
>> @@ -635,7 +635,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
>> 	.xpo_create = svc_udp_create,
>> 	.xpo_recvfrom = svc_udp_recvfrom,
>> 	.xpo_sendto = svc_udp_sendto,
>> -	.xpo_read_payload = svc_sock_read_payload,
>> +	.xpo_result_payload = svc_sock_result_payload,
>> 	.xpo_release_rqst = svc_udp_release_rqst,
>> 	.xpo_detach = svc_sock_detach,
>> 	.xpo_free = svc_sock_free,
>> @@ -1123,7 +1123,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
>> 	.xpo_create = svc_tcp_create,
>> 	.xpo_recvfrom = svc_tcp_recvfrom,
>> 	.xpo_sendto = svc_tcp_sendto,
>> -	.xpo_read_payload = svc_sock_read_payload,
>> +	.xpo_result_payload = svc_sock_result_payload,
>> 	.xpo_release_rqst = svc_tcp_release_rqst,
>> 	.xpo_detach = svc_tcp_sock_detach,
>> 	.xpo_free = svc_sock_free,
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> index c3d588b149aa..c8411b4f3492 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> @@ -979,19 +979,19 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
>> }
>> 
>> /**
>> - * svc_rdma_read_payload - special processing for a READ payload
>> + * svc_rdma_result_payload - special processing for a result payload
>>  * @rqstp: svc_rqst to operate on
>>  * @offset: payload's byte offset in @xdr
>>  * @length: size of payload, in bytes
>>  *
>>  * Returns zero on success.
>>  *
>> - * For the moment, just record the xdr_buf location of the READ
>> + * For the moment, just record the xdr_buf location of the result
>>  * payload. svc_rdma_sendto will use that location later when
>>  * we actually send the payload.
>>  */
>> -int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
>> -			  unsigned int length)
>> +int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
>> +			    unsigned int length)
>> {
>> 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
>> 
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> index fb044792b571..afba4e9d5425 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> @@ -80,7 +80,7 @@ static const struct svc_xprt_ops svc_rdma_ops = {
>> 	.xpo_create = svc_rdma_create,
>> 	.xpo_recvfrom = svc_rdma_recvfrom,
>> 	.xpo_sendto = svc_rdma_sendto,
>> -	.xpo_read_payload = svc_rdma_read_payload,
>> +	.xpo_result_payload = svc_rdma_result_payload,
>> 	.xpo_release_rqst = svc_rdma_release_rqst,
>> 	.xpo_detach = svc_rdma_detach,
>> 	.xpo_free = svc_rdma_free,
>> 

--
Chuck Lever




Thread overview: 29+ messages
2020-10-26 18:53 [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Chuck Lever
2020-10-26 18:53 ` [PATCH 01/20] SUNRPC: Adjust synopsis of xdr_buf_subsegment() Chuck Lever
2020-10-26 18:54 ` [PATCH 02/20] svcrdma: Const-ify the xdr_buf arguments Chuck Lever
2020-10-26 18:54 ` [PATCH 03/20] svcrdma: Refactor the RDMA Write path Chuck Lever
2020-10-26 18:54 ` [PATCH 04/20] SUNRPC: Rename svc_encode_read_payload() Chuck Lever
2020-10-27 20:53   ` J. Bruce Fields
2020-10-28 13:16     ` Chuck Lever
2020-10-26 18:54 ` [PATCH 05/20] NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders Chuck Lever
2020-10-26 18:54 ` [PATCH 06/20] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
2020-10-26 18:54 ` [PATCH 07/20] svcrdma: Clean up svc_rdma_encode_reply_chunk() Chuck Lever
2020-10-26 18:54 ` [PATCH 08/20] svcrdma: Add a "parsed chunk list" data structure Chuck Lever
2020-10-26 18:54 ` [PATCH 09/20] svcrdma: Use parsed chunk lists to derive the inv_rkey Chuck Lever
2020-10-26 18:54 ` [PATCH 10/20] svcrdma: Use parsed chunk lists to detect reverse direction replies Chuck Lever
2020-10-26 18:54 ` [PATCH 11/20] svcrdma: Use parsed chunk lists to construct RDMA Writes Chuck Lever
2020-10-26 18:54 ` [PATCH 12/20] svcrdma: Use parsed chunk lists to encode Reply transport headers Chuck Lever
2020-10-26 18:55 ` [PATCH 13/20] svcrdma: Support multiple write chunks when pulling up Chuck Lever
2020-10-26 18:55 ` [PATCH 14/20] svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg() Chuck Lever
2020-10-26 18:55 ` [PATCH 15/20] svcrdma: Support multiple Write chunks in svc_rdma_send_reply_chunk Chuck Lever
2020-10-26 18:55 ` [PATCH 16/20] svcrdma: Remove chunk list pointers Chuck Lever
2020-10-26 18:55 ` [PATCH 17/20] svcrdma: Clean up chunk tracepoints Chuck Lever
2020-10-26 18:55 ` [PATCH 18/20] svcrdma: Rename info::ri_chunklen Chuck Lever
2020-10-26 18:55 ` [PATCH 19/20] svcrdma: Use the new parsed chunk list when pulling Read chunks Chuck Lever
2020-10-26 18:55 ` [PATCH 20/20] svcrdma: support multiple Read chunks per RPC Chuck Lever
2020-10-27  6:08 ` [PATCH 00/20] NFSD support for multiple RPC/RDMA chunks Leon Romanovsky
2020-10-27 13:24   ` Chuck Lever
2020-10-27 17:25     ` J. Bruce Fields
2020-10-27 17:29       ` Chuck Lever
2020-10-28  7:16     ` Leon Romanovsky
2020-10-28 13:10       ` Chuck Lever
