From: Chuck Lever <chuck.lever@oracle.com>
To: bfields@fieldses.org
Cc: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org
Subject: [PATCH RFC 9/9] svcrdma: Add data structure to track READ payloads
Date: Fri, 14 Feb 2020 10:50:29 -0500 [thread overview]
Message-ID: <20200214155029.3848.86626.stgit@klimt.1015granger.net> (raw)
In-Reply-To: <20200214151427.3848.49739.stgit@klimt.1015granger.net>
The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
Write chunk are so rare that there has never been a strong need to
support them. However, we are aware of at least one existing NFS client
implementation that can generate such requests, so let's dig in.
Allocate a data structure at Receive time to keep track of each
READ payload and the Write chunk used to convey it.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
include/linux/sunrpc/svc_rdma.h | 15 +++-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 2 -
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 31 +++++++--
net/sunrpc/xprtrdma/svc_rdma_rw.c | 2 -
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 94 +++++++++++++---------------
5 files changed, 80 insertions(+), 64 deletions(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c1c4563066d9..85e6b281a39b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -124,6 +124,12 @@ enum {
#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
+struct svc_rdma_payload {
+ __be32 *ra_chunk;
+ unsigned int ra_offset;
+ unsigned int ra_length;
+};
+
struct svc_rdma_recv_ctxt {
struct llist_node rc_node;
struct list_head rc_list;
@@ -137,10 +143,10 @@ struct svc_rdma_recv_ctxt {
unsigned int rc_page_count;
unsigned int rc_hdr_count;
u32 rc_inv_rkey;
- __be32 *rc_write_list;
+ struct svc_rdma_payload *rc_read_payloads;
__be32 *rc_reply_chunk;
- unsigned int rc_read_payload_offset;
- unsigned int rc_read_payload_length;
+ unsigned int rc_num_write_chunks;
+ unsigned int rc_cur_payload;
struct page *rc_pages[RPCSVC_MAXPAGES];
};
@@ -193,7 +199,8 @@ extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
unsigned int len);
extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst);
+ struct xdr_buf *xdr,
+ unsigned int num_read_payloads);
extern int svc_rdma_sendto(struct svc_rqst *);
extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 908e78bb87c6..3b1baf15a1b7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
{
int ret;
- ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+ ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, 0);
if (ret < 0)
return -EIO;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 91abe08f7d75..85b8dd8ae772 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,7 +193,9 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
out:
ctxt->rc_page_count = 0;
- ctxt->rc_read_payload_length = 0;
+ ctxt->rc_num_write_chunks = 0;
+ ctxt->rc_cur_payload = 0;
+ ctxt->rc_read_payloads = NULL;
return ctxt;
out_empty:
@@ -216,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
-
+ kfree(ctxt->rc_read_payloads);
+ ctxt->rc_read_payloads = NULL;
if (!ctxt->rc_temp)
llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
else
@@ -452,9 +455,10 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
struct svc_rdma_recv_ctxt *ctxt)
{
- u32 chcount;
+ u32 chcount, segcount;
+ __be32 *saved = p;
+ int i;
- ctxt->rc_write_list = p;
chcount = 0;
while (*p++ != xdr_zero) {
p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -463,8 +467,22 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
if (chcount++ > 1)
return NULL;
}
+ ctxt->rc_num_write_chunks = chcount;
if (!chcount)
- ctxt->rc_write_list = NULL;
+ return p;
+
+ ctxt->rc_read_payloads = kcalloc(sizeof(struct svc_rdma_payload),
+ chcount, GFP_KERNEL);
+ if (!ctxt->rc_read_payloads)
+ return NULL;
+
+ i = 0;
+ p = saved;
+ while (*p++ != xdr_zero) {
+ ctxt->rc_read_payloads[i++].ra_chunk = p - 1;
+ segcount = be32_to_cpup(p++);
+ p += segcount * rpcrdma_segment_maxsz;
+ }
return p;
}
@@ -484,8 +502,9 @@ static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
if (!p)
return NULL;
- } else
+ } else {
ctxt->rc_reply_chunk = NULL;
+ }
return p;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index ca9d414bef9d..740ea4ee251d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -574,7 +574,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
/* Send the page list in the Reply chunk only if the
* client did not provide Write chunks.
*/
- if (!rctxt->rc_write_list && xdr->page_len) {
+ if (!rctxt->rc_num_write_chunks && xdr->page_len) {
ret = svc_rdma_send_xdr_pagelist(info, xdr,
xdr->head[0].iov_len,
xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7349a3f9aa5d..378a24b666bb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -366,10 +366,10 @@ static __be32 *xdr_encode_read_list(__be32 *p)
* transport header. Each segment's length field is updated to
* reflect number of bytes consumed in the segment.
*
- * Returns number of segments in this chunk.
+ * Returns a pointer to the position to encode the next chunk.
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
- unsigned int remaining)
+static __be32 *xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+ unsigned int length)
{
unsigned int i, nsegs;
u32 seg_len;
@@ -386,15 +386,15 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
*dst++ = *src++;
/* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
+ seg_len = be32_to_cpup(src);
+ if (length >= seg_len) {
/* entire segment was consumed */
*dst = *src;
- remaining -= seg_len;
+ length -= seg_len;
} else {
/* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
+ *dst = cpu_to_be32(length);
+ length = 0;
}
dst++; src++;
@@ -403,38 +403,25 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
*dst++ = *src++;
}
- return nsegs;
+ return dst;
}
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- * - Client has provided only one Write chunk
+/* The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
*/
static __be32 *xdr_encode_write_list(__be32 *p,
const struct svc_rdma_recv_ctxt *rctxt)
{
- unsigned int consumed, nsegs;
- __be32 *q;
-
- q = rctxt->rc_write_list;
- if (!q)
- goto out;
-
- consumed = rctxt->rc_read_payload_length;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
- }
+ unsigned int i;
- /* Terminate Write list */
-out:
- *p++ = xdr_zero;
+ for (i = 0; i < rctxt->rc_num_write_chunks; i++)
+ p = xdr_encode_write_chunk(p,
+ rctxt->rc_read_payloads[i].ra_chunk,
+ rctxt->rc_read_payloads[i].ra_length);
+ *p++ = xdr_zero; /* Terminate Write list */
return p;
}
@@ -519,7 +506,7 @@ void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct xdr_buf *xdr,
- __be32 *wr_lst)
+ unsigned int num_write_chunks)
{
int elements;
@@ -535,7 +522,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
elements = 1;
/* xdr->pages */
- if (!wr_lst) {
+ if (!num_write_chunks) {
unsigned int remaining;
unsigned long pageoff;
@@ -563,7 +550,8 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
*/
static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct xdr_buf *xdr,
+ unsigned int num_write_chunks)
{
unsigned char *dst, *tailbase;
unsigned int taillen;
@@ -576,7 +564,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
tailbase = xdr->tail[0].iov_base;
taillen = xdr->tail[0].iov_len;
- if (wr_lst) {
+ if (num_write_chunks) {
u32 xdrpad;
xdrpad = xdr_padsize(xdr->page_len);
@@ -619,7 +607,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
* @rdma: controlling transport
* @ctxt: send_ctxt for the Send WR
* @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+ * @num_read_payloads: count of separate READ payloads to send
*
* Load the xdr_buf into the ctxt's sge array, and DMA map each
* element as it is added.
@@ -628,7 +616,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
*/
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct xdr_buf *xdr, unsigned int num_read_payloads)
{
unsigned int len, remaining;
unsigned long page_off;
@@ -637,8 +625,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
u32 xdr_pad;
int ret;
- if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
- return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+ if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, num_read_payloads))
+ return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, num_read_payloads);
++ctxt->sc_cur_sge_no;
ret = svc_rdma_dma_map_buf(rdma, ctxt,
@@ -647,12 +635,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
if (ret < 0)
return ret;
- /* If a Write chunk is present, the xdr_buf's page list
+ /* If Write chunks are present, the xdr_buf's page list
* is not included inline. However the Upper Layer may
* have added XDR padding in the tail buffer, and that
* should not be included inline.
*/
- if (wr_lst) {
+ if (num_read_payloads) {
base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
xdr_pad = xdr_padsize(xdr->page_len);
@@ -741,7 +729,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
if (!rctxt->rc_reply_chunk) {
ret = svc_rdma_map_reply_msg(rdma, sctxt,
&rqstp->rq_res,
- rctxt->rc_write_list);
+ rctxt->rc_cur_payload);
if (ret < 0)
return ret;
}
@@ -885,18 +873,20 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svcxprt_rdma *rdma;
+ unsigned int i;
- if (!rctxt->rc_write_list)
+ if (!rctxt->rc_num_write_chunks)
return 0;
- /* XXX: Just one READ payload slot for now, since our
- * transport implementation currently supports only one
- * Write chunk.
- */
- rctxt->rc_read_payload_offset = offset;
- rctxt->rc_read_payload_length = length;
+ if (rctxt->rc_cur_payload > rctxt->rc_num_write_chunks)
+ return -ENOENT;
+ i = rctxt->rc_cur_payload++;
+
+ rctxt->rc_read_payloads[i].ra_offset = offset;
+ rctxt->rc_read_payloads[i].ra_length = length;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
- return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+ return svc_rdma_send_write_chunk(rdma,
+ rctxt->rc_read_payloads[i].ra_chunk,
&rqstp->rq_res, offset, length);
}
prev parent reply other threads:[~2020-02-14 15:50 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2020-02-14 15:49 [PATCH RFC 0/9] Address bugzilla 198053 and more Chuck Lever
2020-02-14 15:49 ` [PATCH RFC 1/9] nfsd: Fix NFSv4 READ on RDMA when using readv Chuck Lever
2020-02-14 15:49 ` [PATCH RFC 2/9] NFSD: Clean up nfsd4_encode_readv Chuck Lever
2020-02-14 15:49 ` [PATCH RFC 3/9] svcrdma: Avoid DMA mapping small RPC Replies Chuck Lever
2020-02-14 15:50 ` [PATCH RFC 4/9] NFSD: Invoke svc_encode_read_payload in "read" NFSD encoders Chuck Lever
2020-02-14 15:50 ` [PATCH RFC 5/9] svcrdma: Add trace point to examine client-provided write segment Chuck Lever
2020-02-14 15:50 ` [PATCH RFC 6/9] svcrdma: De-duplicate code that locates Write and Reply chunks Chuck Lever
2020-02-14 15:50 ` [PATCH RFC 7/9] svcrdma: Post RDMA Writes while XDR encoding replies Chuck Lever
2020-02-14 15:50 ` [PATCH RFC 8/9] svcrdma: Refactor svc_rdma_sendto() Chuck Lever
2020-02-14 15:50 ` Chuck Lever [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20200214155029.3848.86626.stgit@klimt.1015granger.net \
--to=chuck.lever@oracle.com \
--cc=bfields@fieldses.org \
--cc=linux-nfs@vger.kernel.org \
--cc=linux-rdma@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).