* [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
@ 2007-07-11 21:08 Talpey, Thomas
  2007-07-13 16:35 ` Chuck Lever
  0 siblings, 1 reply; 8+ messages in thread
From: Talpey, Thomas @ 2007-07-11 21:08 UTC (permalink / raw)
  To: nfs

RPCRDMA: rpc rdma protocol implementation

This implements the marshaling and unmarshaling of the rpcrdma transport
headers. Connection management is also addressed.
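
For reviewers unfamiliar with the wire format, here is a rough standalone
sketch (not part of the patch) of the fixed 28-byte header that
rpcrdma_marshal_req() emits for a chunk-less RDMA_MSG. Only the field order
and sizes come from the code below; the RDMA_MSG value, xid and credit
numbers are illustrative assumptions.

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>

#define RDMA_MSG 0  /* assumed protocol value for "message with inline body" */

int main(void)
{
    /* made-up values; xid arrives already in network order in the code */
    uint32_t xid = htonl(0x12345678);
    uint32_t credits = 32;              /* rb_max_requests */
    uint32_t hdr[7];

    hdr[0] = xid;                       /* rm_xid (not swapped again) */
    hdr[1] = htonl(1);                  /* rm_vers */
    hdr[2] = htonl(credits);            /* rm_credit */
    hdr[3] = htonl(RDMA_MSG);           /* rm_type */
    hdr[4] = 0;                         /* NULL read chunk list */
    hdr[5] = 0;                         /* NULL write chunk list */
    hdr[6] = 0;                         /* no reply chunk */

    printf("header is %zu bytes on the wire\n", sizeof(hdr));  /* 28 */

    unsigned char *p = (unsigned char *)hdr;
    for (size_t i = 0; i < sizeof(hdr); i++)
        printf("%02x%s", (unsigned)p[i], (i % 4 == 3) ? "\n" : " ");
    return 0;
}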

Signed-off-by: Tom Talpey <talpey@netapp.com>

---

Index: linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
===================================================================
--- linux-2.6.22.orig/net/sunrpc/xprtrdma/rpc_rdma.c
+++ linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1,9 +1,882 @@
 /*
- * Placeholders for subsequent patches
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rpc_rdma.c
+ *
+ * This file contains the guts of the RPC RDMA protocol, and
+ * does marshaling/unmarshaling, etc. It is also where interfacing
+ * to the Linux RPC framework lives.
  */
 
 #include "xprt_rdma.h"
 
-void rpcrdma_conn_func(struct rpcrdma_ep *a) { }
-void rpcrdma_reply_handler(struct rpcrdma_rep *a) { }
-int rpcrdma_marshal_req(struct rpc_rqst *a) { return EINVAL; }
+#include <linux/nfs2.h>
+#include <linux/nfs3.h>
+#include <linux/nfs4.h>
+
+#include <linux/highmem.h>
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY       RPCDBG_TRANS
+#endif
+
+enum rpcrdma_chunktype {
+       rpcrdma_noch = 0,
+       rpcrdma_readch,
+       rpcrdma_areadch,
+       rpcrdma_writech,
+       rpcrdma_replych
+};
+
+#ifdef RPC_DEBUG
+static const char transfertypes[][12] = {
+       "pure inline",  /* no chunks */
+       " read chunk",  /* some argument via rdma read */
+       "*read chunk",  /* entire request via rdma read */
+       "write chunk",  /* some result via rdma write */
+       "reply chunk"   /* entire reply via rdma write */
+};
+#endif
+
+/*
+ * Chunk assembly from upper layer xdr_buf.
+ *
+ * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
+ * elements. Segments are then coalesced when registered, if possible
+ * within the selected memreg mode.
+ *
+ * Note, this routine is never called if the connection's memory
+ * registration strategy is 0 (bounce buffers).
+ */
+
+static int
+rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int first,
+       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+{
+       int len, n = 0, p;
+
+       if (first == 0 && xdrbuf->head[0].iov_len) {
+               seg[n].mr_page = NULL;
+               seg[n].mr_offset = xdrbuf->head[0].iov_base;
+               seg[n].mr_len = xdrbuf->head[0].iov_len;
+               ++n;
+       }
+
+       if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
+               if (n == nsegs)
+                       return 0;
+               seg[n].mr_page = xdrbuf->pages[0];
+               seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
+               seg[n].mr_len = PAGE_SIZE - xdrbuf->page_base;
+               len = xdrbuf->page_len - seg[n].mr_len;
+               ++n;
+               p = 1;
+               while (len > 0) {
+                       if (n == nsegs)
+                               return 0;
+                       seg[n].mr_page = xdrbuf->pages[p];
+                       seg[n].mr_offset = NULL;
+                       seg[n].mr_len = ((len > PAGE_SIZE) ? PAGE_SIZE : len);
+                       len -= seg[n].mr_len;
+                       ++n;
+                       ++p;
+               }
+       }
+
+       if (xdrbuf->tail[0].iov_len && type != rpcrdma_writech) {
+               if (n == nsegs)
+                       return 0;
+               seg[n].mr_page = NULL;
+               seg[n].mr_offset = xdrbuf->tail[0].iov_base;
+               seg[n].mr_len = xdrbuf->tail[0].iov_len;
+               ++n;
+       }
+       return n;
+}
+
+/*
+ * Create read/write chunk lists, and reply chunks, for RDMA
+ *
+ *   Assume check against THRESHOLD has been done, and chunks are required.
+ *   Assume only encoding one list entry for read|write chunks. The NFSv3
+ *     protocol is simple enough to allow this as it only has a single "bulk
+ *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
+ *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
+ *
+ * When used for a single reply chunk (which is a special write
+ * chunk used for the entire reply, rather than just the data), it
+ * is used primarily for READDIR and READLINK which would otherwise
+ * be severely size-limited by a small rdma inline read max. The server
+ * response will come back as an RDMA Write, followed by a message
+ * of type RDMA_NOMSG carrying the xid and length. As a result, reply
+ * chunks do not provide data alignment; however, they do not require
+ * "fixup" (moving the response to the upper layer buffer) either.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ */
+
+static unsigned int
+rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
+               struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+{
+       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
+       int nsegs, nchunks = 0;
+       int pos;
+       struct rpcrdma_mr_seg *seg = req->rl_segments;
+       struct rpcrdma_read_chunk *cur_rchunk = NULL;
+       struct rpcrdma_write_array *warray = NULL;
+       struct rpcrdma_write_chunk *cur_wchunk = NULL;
+       u32 *iptr = headerp->rm_body.rm_chunks;
+
+       if (type == rpcrdma_readch || type == rpcrdma_areadch) {
+               /* a read chunk - server will RDMA Read our memory */
+               cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
+       } else {
+               /* a write or reply chunk - server will RDMA Write our memory */
+               *iptr++ = xdr_zero;     /* encode a NULL read chunk list */
+               if (type == rpcrdma_replych)
+                       *iptr++ = xdr_zero;     /* a NULL write chunk list */
+               warray = (struct rpcrdma_write_array *) iptr;
+               cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
+       }
+
+       if (type == rpcrdma_replych || type == rpcrdma_areadch)
+               pos = 0;
+       else
+               pos = target->head[0].iov_len;
+
+       nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+       if (nsegs == 0)
+               return 0;
+
+       do {
+               /* bind/register the memory, then build chunk from result. */
+               int n = rpcrdma_register_external(seg, nsegs,
+                                               cur_wchunk != NULL, r_xprt);
+               if (n <= 0)
+                       goto out;
+               if (cur_rchunk) {       /* read */
+                       cur_rchunk->rc_discrim = xdr_one;
+                       /* all read chunks have the same "position" */
+                       cur_rchunk->rc_position = htonl(pos);
+                       cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
+                       cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+                       xdr_encode_hyper((u32 *)&cur_rchunk->rc_target.rs_offset,
+                                       seg->mr_base);
+                       dprintk("RPC:       %s: read chunk "
+                               "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
+                               seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
+                               n < nsegs ? "more" : "last");
+                       cur_rchunk++;
+                       r_xprt->rx_stats.read_chunk_count++;
+               } else {                /* write/reply */
+                       cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
+                       cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+                       xdr_encode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset,
+                                       seg->mr_base);
+                       dprintk("RPC:       %s: %s chunk "
+                               "elem %d@0x%llx:0x%x (%s)\n", __func__,
+                               (type == rpcrdma_replych) ? "reply" : "write",
+                               seg->mr_len, seg->mr_base, seg->mr_rkey,
+                               n < nsegs ? "more" : "last");
+                       cur_wchunk++;
+                       if (type == rpcrdma_replych)
+                               r_xprt->rx_stats.reply_chunk_count++;
+                       else
+                               r_xprt->rx_stats.write_chunk_count++;
+                       r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+               }
+               nchunks++;
+               seg   += n;
+               nsegs -= n;
+       } while (nsegs);
+
+       /* success. all failures return above */
+       req->rl_nchunks = nchunks;
+
+       BUG_ON(nchunks == 0);
+
+       /*
+       * finish off header. If write, marshal discrim and nchunks.
+       */
+       if (cur_rchunk) {
+               iptr = (u32 *) cur_rchunk;
+               *iptr++ = xdr_zero;     /* finish the read chunk list */
+               *iptr++ = xdr_zero;     /* encode a NULL write chunk list */
+               *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
+       } else {
+               warray->wc_discrim = xdr_one;
+               warray->wc_nchunks = htonl(nchunks);
+               iptr = (u32 *) cur_wchunk;
+               if (type == rpcrdma_writech) {
+                       *iptr++ = xdr_zero; /* finish the write chunk list */
+                       *iptr++ = xdr_zero; /* encode a NULL reply chunk */
+               }
+       }
+
+       /*
+       * Return header size.
+       */
+       return (unsigned char *)iptr - (unsigned char *)headerp;
+
+out:
+       for (pos = 0; nchunks--; )
+               pos += rpcrdma_deregister_external(
+                               &req->rl_segments[pos], r_xprt, NULL);
+       return 0;
+}
+
+/*
+ * Copy write data inline.
+ * This function is used for "small" requests. Data which is passed
+ * to RPC via iovecs (or page list) is copied directly into the
+ * pre-registered memory buffer for this request. For small amounts
+ * of data, this is efficient. The cutoff value is tunable.
+ */
+static int
+rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+{
+       int i, npages, curlen;
+       int copy_len;
+       unsigned char *srcp, *destp;
+
+       destp = rqst->rq_svec[0].iov_base;
+       curlen = rqst->rq_svec[0].iov_len;
+       destp += curlen;
+       /*
+       * Do optional padding where it makes sense. Alignment of write
+       * payload can help the server, if our setting is accurate.
+       */
+       pad -= (curlen + 36 /*sizeof(struct rpcrdma_msg_padded)*/);
+       if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
+               pad = 0;        /* don't pad this request */
+
+       dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
+               __func__, pad, destp, rqst->rq_slen, curlen);
+
+       copy_len = rqst->rq_snd_buf.page_len;
+       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += copy_len;
+       npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base + copy_len) >> PAGE_SHIFT;
+       for (i = 0; copy_len && i < npages; i++) {
+               if (i == 0)
+                       curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
+               else
+                       curlen = PAGE_SIZE;
+               if (curlen > copy_len)
+                       curlen = copy_len;
+               dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
+                       __func__, i, destp, copy_len, curlen);
+               srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
+                                       KM_SKB_SUNRPC_DATA);
+               if (i == 0)
+                       memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
+               else
+                       memcpy(destp, srcp, curlen);
+               kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
+               rqst->rq_svec[0].iov_len += curlen;
+               destp += curlen;
+               copy_len -= curlen;
+       }
+       if (rqst->rq_snd_buf.tail[0].iov_len) {
+               curlen = rqst->rq_snd_buf.tail[0].iov_len;
+               if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
+                       memcpy(destp, rqst->rq_snd_buf.tail[0].iov_base, curlen);
+                       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += curlen;
+               }
+               dprintk("RPC:       %s: tail destp 0x%p len %d curlen %d\n",
+                       __func__, destp, copy_len, curlen);
+               rqst->rq_svec[0].iov_len += curlen;
+       }
+       /* header now contains entire send message */
+       return pad;
+}
+
+/*
+ * Totally imperfect, temporary attempt to detect nfs reads...
+ * e.g. establish a hint via xdr_inline_pages, etc.
+ */
+static int
+is_nfs_read(struct rpc_rqst *rqst)
+{
+       u32 *p;
+
+       if (rqst->rq_task->tk_client->cl_prog != NFS_PROGRAM)
+               return 0;
+       switch (rqst->rq_task->tk_client->cl_vers) {
+       case 4:
+               /* Must dig into the COMPOUND. */
+               /* Back up from the end of what a read request would be */
+               /* PUTFH, fh, OP_READ, stateid(16), offset(8), count(4) */
+               p = (u32 *)(rqst->rq_snd_buf.head[0].iov_base +
+                           rqst->rq_snd_buf.head[0].iov_len);
+               /* test read and count */
+               return (rqst->rq_snd_buf.head[0].iov_len > 40 &&
+                       p[-8] == __constant_htonl(OP_READ) &&
+                       p[-1] == htonl(rqst->rq_rcv_buf.page_len));
+       case 3:
+               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ;
+       case 2:
+               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFSPROC_READ;
+       }
+       return 0;
+}
+
+/*
+ * Marshal a request: the primary job of this routine is to choose
+ * the transfer modes. See comments below.
+ *
+ * Uses multiple RDMA IOVs for a request:
+ *  [0] -- RPC RDMA header, which uses memory from the *start* of the
+ *         preregistered buffer that already holds the RPC data in
+ *         its middle.
+ *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
+ *  [2] -- optional padding.
+ *  [3] -- if padded, header only in [1] and data here.
+ */
+
+int
+rpcrdma_marshal_req(struct rpc_rqst *rqst)
+{
+       struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+       char *base;
+       size_t hdrlen, rpclen, padlen;
+       enum rpcrdma_chunktype rtype, wtype;
+       struct rpcrdma_msg *headerp;
+
+       /*
+       * rpclen gets amount of data in first buffer, which is the
+       * pre-registered buffer.
+       */
+       base = rqst->rq_svec[0].iov_base;
+       rpclen = rqst->rq_svec[0].iov_len;
+
+       /* build RDMA header in private area at front */
+       headerp = (struct rpcrdma_msg *) req->rl_base;
+       /* don't htonl XID, it's already done in request */
+       headerp->rm_xid = rqst->rq_xid;
+       headerp->rm_vers = xdr_one;
+       headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
+       headerp->rm_type = __constant_htonl(RDMA_MSG);
+
+       /*
+       * Chunks needed for results?
+       *
+       * o If the expected result is under the inline threshold, all ops
+       *   return as inline (but see later).
+       * o Large non-read ops return as a single reply chunk.
+       * o Large read ops return data as write chunk(s), header as inline.
+       *
+       * Note: the NFS code sending down multiple result segments implies
+       * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
+       */
+
+       /*
+       * This code can handle read chunks, write chunks OR reply
+       * chunks -- only one type. If the request is too big to fit
+       * inline, then we will choose read chunks. If the request is
+       * a READ, then use write chunks to separate the file data
+       * into pages; otherwise use reply chunks.
+       */
+       if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
+               wtype = rpcrdma_noch;
+       else if (rqst->rq_rcv_buf.page_len == 0)
+               wtype = rpcrdma_replych;
+       else if (is_nfs_read(rqst))
+               wtype = rpcrdma_writech;
+       else
+               wtype = rpcrdma_replych;
+
+       /*
+       * Chunks needed for arguments?
+       *
+       * o If the total request is under the inline threshold, all ops
+       *   are sent as inline.
+       * o Large non-write ops are sent with the entire message as a
+       *   single read chunk (protocol 0-position special case).
+       * o Large write ops transmit data as read chunk(s), header as
+       *   inline.
+       *
+       * Note: the NFS code sending down multiple argument segments
+       * implies the op is a write.
+       * TBD check NFSv4 setacl
+       */
+       if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+               rtype = rpcrdma_noch;
+       else if (rqst->rq_snd_buf.page_len == 0)
+               rtype = rpcrdma_areadch;
+       else
+               rtype = rpcrdma_readch;
+
+       /* The following simplification is not true forever */
+       if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
+               wtype = rpcrdma_noch;
+       BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+
+       if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
+           (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
+               /* forced to "pure inline"? */
+               dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
+                       __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
+               return -1;
+       }
+
+       hdrlen = 28; /*sizeof *headerp;*/
+       padlen = 0;
+
+       /*
+       * Pull up any extra send data into the preregistered buffer.
+       * When padding is in use and applies to the transfer, insert
+       * it and change the message type.
+       */
+       if (rtype == rpcrdma_noch) {
+
+               padlen = rpcrdma_inline_pullup(rqst, RPCRDMA_INLINE_PAD_VALUE(rqst));
+
+               if (padlen) {
+                       headerp->rm_type = __constant_htonl(RDMA_MSGP);
+                       headerp->rm_body.rm_padded.rm_align =
+                               htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+                       headerp->rm_body.rm_padded.rm_thresh =
+                               __constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
+                       headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
+                       headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
+                       headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
+                       hdrlen += 2 * sizeof (u32);     /* extra words in padhdr */
+                       BUG_ON(wtype != rpcrdma_noch);
+
+               } else {
+                       headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+                       headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+                       headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+                       /* new length after pullup */
+                       rpclen = rqst->rq_svec[0].iov_len;
+                       /*
+                       * Currently we try to not actually use read inline.
+                       * Reply chunks have the desirable property that
+                       * they land, packed, directly in the target buffers
+                       * without headers, so they require no fixup. The
+                       * additional RDMA Write op sends the same amount
+                       * of data, streams on-the-wire and adds no overhead
+                       * on receive. Therefore, we request a reply chunk
+                       * for non-writes wherever feasible and efficient.
+                       */
+                       if (wtype == rpcrdma_noch &&
+                           r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+                               wtype = rpcrdma_replych;
+               }
+       }
+
+       /*
+       * Marshal chunks. This routine will return the header length
+       * consumed by marshaling.
+       */
+       if (rtype != rpcrdma_noch) {
+               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, headerp, rtype);
+               wtype = rtype;  /* simplify dprintk */
+
+       } else if (wtype != rpcrdma_noch) {
+               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, headerp, wtype);
+       }
+
+       if (hdrlen == 0)
+               return -1;
+
+       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
+               "                   headerp 0x%p base 0x%p lkey 0x%x\n",
+               __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+               headerp, base, req->rl_iov.lkey);
+
+       /*
+       * initialize send_iov's - normally only two: rdma chunk header and
+       * single preregistered RPC header buffer, but if padding is present,
+       * then use a preregistered (and zeroed) pad buffer between the RPC
+       * header and any write data. In all non-rdma cases, any following
+       * data has been copied into the RPC header buffer.
+       */
+       req->rl_send_iov[0].addr = req->rl_iov.addr;
+       req->rl_send_iov[0].length = hdrlen;
+       req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+
+       req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+       req->rl_send_iov[1].length = rpclen;
+       req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+
+       req->rl_niovs = 2;
+
+       if (padlen) {
+               struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+               req->rl_send_iov[2].addr = ep->rep_pad.addr;
+               req->rl_send_iov[2].length = padlen;
+               req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+
+               req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
+               req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
+               req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+
+               req->rl_niovs = 4;
+       }
+
+       return 0;
+}
+
+/*
+ * Chase down a received write or reply chunklist to get length
+ * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
+ */
+static int
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
+{
+       unsigned int i, total_len;
+       struct rpcrdma_write_chunk *cur_wchunk;
+
+       i = ntohl(**iptrp);     /* get array count */
+       if (i > max) {
+               return -1;
+       }
+       cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
+       total_len = 0;
+       while (i--) {
+               ifdebug(FACILITY) {
+                       u64 off;
+                       xdr_decode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset, &off);
+                       dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
+                               __func__, ntohl(cur_wchunk->wc_target.rs_length),
+                               off, ntohl(cur_wchunk->wc_target.rs_handle));
+               }
+               total_len += ntohl(cur_wchunk->wc_target.rs_length);
+               ++cur_wchunk;
+       }
+       /* check and adjust for properly terminated write chunk */
+       if (wrchunk) {
+               u32 *w = (u32 *) cur_wchunk;
+               if (*w++ != xdr_zero)
+                       return -1;
+               cur_wchunk = (struct rpcrdma_write_chunk *) w;
+       }
+       if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+               return -1;
+
+       *iptrp = (u32 *) cur_wchunk;
+       return total_len;
+}
+
+/*
+ * Scatter inline received data back into provided iov's.
+ */
+static void
+rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
+{
+       int i, npages, curlen, olen;
+       char *destp;
+
+       curlen = rqst->rq_rcv_buf.head[0].iov_len;
+       if (curlen > copy_len) {        /* write chunk header fixup */
+               curlen = copy_len;
+               rqst->rq_rcv_buf.head[0].iov_len = curlen;
+       }
+
+       dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
+               __func__, srcp, copy_len, curlen);
+
+       /* Shift pointer for first receive segment only */
+       rqst->rq_rcv_buf.head[0].iov_base = srcp;
+       srcp += curlen;
+       copy_len -= curlen;
+
+       olen = copy_len;
+       i = 0;
+       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
+       if (copy_len && rqst->rq_rcv_buf.page_len) {
+               npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
+                       rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
+               for ( ; i < npages; i++) {
+                       if (i == 0)
+                               curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
+                       else
+                               curlen = PAGE_SIZE;
+                       if (curlen > copy_len)
+                               curlen = copy_len;
+                       dprintk("RPC:       %s: page %d srcp 0x%p len %d curlen %d\n",
+                               __func__, i, srcp, copy_len, curlen);
+                       destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
+                                               KM_SKB_SUNRPC_DATA);
+                       if (i == 0)
+                               memcpy(destp + rqst->rq_rcv_buf.page_base,
+                                               srcp, curlen);
+                       else
+                               memcpy(destp, srcp, curlen);
+                       flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
+                       kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
+                       srcp += curlen;
+                       if ((copy_len -= curlen) == 0)
+                               break;
+               }
+               rqst->rq_rcv_buf.page_len = olen - copy_len;
+       } else
+               rqst->rq_rcv_buf.page_len = 0;
+
+       if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
+               curlen = copy_len;
+               if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
+                       curlen = rqst->rq_rcv_buf.tail[0].iov_len;
+               if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
+                       memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
+               dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
+                       __func__, srcp, copy_len, curlen);
+               rqst->rq_rcv_buf.tail[0].iov_len = curlen;
+               copy_len -= curlen; ++i;
+       } else
+               rqst->rq_rcv_buf.tail[0].iov_len = 0;
+
+       if (copy_len)
+               dprintk("RPC:       %s: %d bytes in %d extra segments (%d lost)\n",
+                       __func__, olen, i, copy_len);
+
+       /* TBD avoid a warning from call_decode() */
+       rqst->rq_private_buf = rqst->rq_rcv_buf;
+}
+
+/*
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
+ */
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+       struct rpc_xprt *xprt = ep->rep_xprt;
+
+       spin_lock_bh(&xprt->transport_lock);
+       if (ep->rep_connected > 0) {
+               if (!xprt_test_and_set_connected(xprt))
+                       xprt_wake_pending_tasks(xprt, 0);
+       } else {
+               if (xprt_test_and_clear_connected(xprt))
+                       xprt_wake_pending_tasks(xprt, ep->rep_connected);
+       }
+       spin_unlock_bh(&xprt->transport_lock);
+}
+
+/*
+ * This function is called when the memory window unbind we are waiting
+ * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ */
+static void
+rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+{
+       wake_up(&rep->rr_unbind);
+}
+
+/*
+ * Called as a tasklet to do req/reply match and complete a request
+ * Errors must result in the RPC task either being awakened, or
+ * allowed to timeout, to discover the errors at that time.
+ */
+void
+rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+{
+       struct rpcrdma_msg *headerp;
+       struct rpcrdma_req *req;
+       struct rpc_rqst *rqst;
+       struct rpc_xprt *xprt = rep->rr_xprt;
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       u32 *iptr;
+       int i, rdmalen, status;
+
+       /* Check status. If bad, signal disconnect and return rep to pool */
+       if (rep->rr_len == ~0U) {
+               rpcrdma_recv_buffer_put(rep);
+               if (r_xprt->rx_ep.rep_connected == 1) {
+                       r_xprt->rx_ep.rep_connected = -EIO;
+                       rpcrdma_conn_func(&r_xprt->rx_ep);
+               }
+               return;
+       }
+       if (rep->rr_len < 28) {
+               dprintk("RPC:       %s: short/invalid reply\n", __func__);
+               goto repost;
+       }
+       headerp = (struct rpcrdma_msg *) rep->rr_base;
+       if (headerp->rm_vers != xdr_one) {
+               dprintk("RPC:       %s: invalid version %d\n",
+                       __func__, ntohl(headerp->rm_vers));
+               goto repost;
+       }
+
+       /* Get XID and try for a match. */
+       spin_lock(&xprt->transport_lock);
+       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+       if (rqst == NULL) {
+               spin_unlock(&xprt->transport_lock);
+               dprintk("RPC:       %s: reply 0x%p failed "
+                       "to match any request xid 0x%08x len %d\n",
+                       __func__, rep, headerp->rm_xid, rep->rr_len);
+repost:
+               rep->rr_func = rpcrdma_reply_handler;
+               if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+                       rpcrdma_recv_buffer_put(rep);
+
+               return;
+       }
+
+       /* get request object */
+       req = rpcr_to_rdmar(rqst);
+
+       dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
+               "                   RPC request 0x%p xid 0x%08x\n",
+                       __func__, rep, req, rqst, headerp->rm_xid);
+
+       BUG_ON(!req || req->rl_reply);
+
+       /* from here on, the reply is no longer an orphan */
+       req->rl_reply = rep;
+
+       /* check for expected message types */
+       /* The order of some of these tests is important. */
+       switch (headerp->rm_type) {
+       case __constant_htonl(RDMA_MSG):
+               /* never expect read chunks */
+               /* never expect reply chunks (two ways to check) */
+               /* never expect write chunks without having offered RDMA */
+               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+                   (headerp->rm_body.rm_chunks[1] == xdr_zero &&
+                    headerp->rm_body.rm_chunks[2] != xdr_zero) ||
+                   (headerp->rm_body.rm_chunks[1] != xdr_zero &&
+                    req->rl_nchunks == 0)) {
+                       goto badheader;
+               }
+               if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
+                       /* count any expected write chunks in read reply */
+                       /* start at write chunk array count */
+                       iptr = &headerp->rm_body.rm_chunks[2];
+                       rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 1, &iptr);
+                       /* check for validity, and no reply chunk after */
+                       if (rdmalen < 0 || *iptr++ != xdr_zero) {
+                               goto badheader;
+                       }
+                       rep->rr_len -=
+                           ((unsigned char *)iptr - (unsigned char *)headerp);
+                       status = rep->rr_len + rdmalen;
+                       r_xprt->rx_stats.total_rdma_reply += rdmalen;
+               } else {
+                       /* else ordinary inline */
+                       iptr = (u32 *)((unsigned char *)headerp + 28);
+                       rep->rr_len -= 28; /*sizeof *headerp;*/
+                       status = rep->rr_len;
+               }
+               /* Fix up the rpc results for upper layer */
+               rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
+               break;
+
+       case __constant_htonl(RDMA_NOMSG):
+               /* never expect read or write chunks, always reply chunks */
+               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+                   headerp->rm_body.rm_chunks[1] != xdr_zero ||
+                   headerp->rm_body.rm_chunks[2] != xdr_one ||
+                   req->rl_nchunks == 0) {
+                       goto badheader;
+               }
+               iptr = (u32 *)((unsigned char *)headerp + 28);
+               rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+               if (rdmalen < 0) {
+                       goto badheader;
+               }
+               r_xprt->rx_stats.total_rdma_reply += rdmalen;
+               /* Reply chunk buffer already is the reply vector - no fixup. */
+               status = rdmalen;
+               break;
+
+       default:
+       badheader:
+               dprintk("%s: invalid rpcrdma reply header (type %d):"
+                               " chunks[012] == %d %d %d expected chunks <= %d\n",
+                               __func__, ntohl(headerp->rm_type),
+                               headerp->rm_body.rm_chunks[0],
+                               headerp->rm_body.rm_chunks[1],
+                               headerp->rm_body.rm_chunks[2],
+                               req->rl_nchunks);
+               status = -EIO;
+               r_xprt->rx_stats.bad_reply_count++;
+               break;
+       }
+
+       /* If using mw bind, start the deregister process now. */
+       /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
+       if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
+       case RPCRDMA_MEMWINDOWS:
+               for (i = 0; req->rl_nchunks-- > 1; )
+                       i += rpcrdma_deregister_external(
+                               &req->rl_segments[i], r_xprt, NULL);
+               /* Optionally wait (not here) for unbinds to complete */
+               rep->rr_func = rpcrdma_unbind_func;
+               (void) rpcrdma_deregister_external(&req->rl_segments[i], r_xprt, rep);
+               break;
+       case RPCRDMA_MEMWINDOWS_ASYNC:
+               for (i = 0; req->rl_nchunks--; )
+                       i += rpcrdma_deregister_external(
+                               &req->rl_segments[i], r_xprt, NULL);
+               break;
+       default:
+               break;
+       }
+
+       dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
+                       __func__, xprt, rqst, status);
+       xprt_complete_rqst(rqst->rq_task, status);
+       spin_unlock(&xprt->transport_lock);
+}
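
To make the transfer-mode selection easier to review, here is a rough
userspace restatement (not part of the patch) of the rtype/wtype choice
made in rpcrdma_marshal_req(). The inline threshold numbers are invented
and the "is this an NFS READ" test is reduced to a flag; only the initial
selection is shown, and the later adjustments (the rtype/wtype
simplification and the reply-chunk upgrade after pullup) are left out.

#include <stdio.h>

enum chunktype { noch, readch, areadch, writech, replych };

static const char *names[] = {
    "pure inline", "read chunk", "*read chunk", "write chunk", "reply chunk"
};

/* Results: inline if small, write chunks for READ data, else a reply chunk */
static enum chunktype choose_wtype(size_t rcv_buflen, size_t rcv_pagelen,
                                   int is_nfs_read, size_t inline_read_max)
{
    if (rcv_buflen <= inline_read_max)
        return noch;
    if (rcv_pagelen == 0)
        return replych;
    return is_nfs_read ? writech : replych;
}

/* Arguments: inline if small, whole-message read chunk if no page data */
static enum chunktype choose_rtype(size_t snd_len, size_t snd_pagelen,
                                   size_t inline_write_max)
{
    if (snd_len <= inline_write_max)
        return noch;
    if (snd_pagelen == 0)
        return areadch;
    return readch;
}

int main(void)
{
    /* e.g. a 64KB NFS READ against 1KB inline thresholds */
    enum chunktype w = choose_wtype(65536 + 512, 65536, 1, 1024);
    enum chunktype r = choose_rtype(200, 0, 1024);

    printf("args sent as %s, results returned as %s\n", names[r], names[w]);
    return 0;
}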




* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-07-11 21:08 [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling Talpey, Thomas
@ 2007-07-13 16:35 ` Chuck Lever
  2007-07-13 16:50   ` Talpey, Thomas
  0 siblings, 1 reply; 8+ messages in thread
From: Chuck Lever @ 2007-07-13 16:35 UTC (permalink / raw)
  To: Talpey, Thomas; +Cc: nfs


Hi Tom!


Talpey, Thomas wrote:
> RPCRDMA: rpc rdma protocol implementation
> 
> This implements the marshaling and unmarshaling of the rpcrdma transport
> headers. Connection management is also addressed.
> 
> Signed-off-by: Tom Talpey <talpey@netapp.com>
> 
> ---
> 
> Index: linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> ===================================================================
> --- linux-2.6.22.orig/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -1,9 +1,882 @@
>  /*
> - * Placeholders for subsequent patches
> + * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
> + *
> + * This software is available to you under a choice of one of two
> + * licenses.  You may choose to be licensed under the terms of the GNU
> + * General Public License (GPL) Version 2, available from the file
> + * COPYING in the main directory of this source tree, or the BSD-type
> + * license below:
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + *      Redistributions of source code must retain the above copyright
> + *      notice, this list of conditions and the following disclaimer.
> + *
> + *      Redistributions in binary form must reproduce the above
> + *      copyright notice, this list of conditions and the following
> + *      disclaimer in the documentation and/or other materials provided
> + *      with the distribution.
> + *
> + *      Neither the name of the Network Appliance, Inc. nor the names of
> + *      its contributors may be used to endorse or promote products
> + *      derived from this software without specific prior written
> + *      permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +/*
> + * rpc_rdma.c
> + *
> + * This file contains the guts of the RPC RDMA protocol, and
> + * does marshaling/unmarshaling, etc. It is also where interfacing
> + * to the Linux RPC framework lives.
>   */
>  
>  #include "xprt_rdma.h"
>  
> -void rpcrdma_conn_func(struct rpcrdma_ep *a) { }
> -void rpcrdma_reply_handler(struct rpcrdma_rep *a) { }
> -int rpcrdma_marshal_req(struct rpc_rqst *a) { return EINVAL; }
> +#include <linux/nfs2.h>
> +#include <linux/nfs3.h>
> +#include <linux/nfs4.h>

I haven't looked closely at this yet, but is there really a dependency 
in here on NFS?  I don't see NFS dependencies in other parts of the RPC 
client or server, save the legacy debugging interface 
(/proc/sys/sunrpc/nfs_debug and friends).


> +#include <linux/highmem.h>
> +
> +#ifdef RPC_DEBUG
> +# define RPCDBG_FACILITY       RPCDBG_TRANS
> +#endif
> +
> +enum rpcrdma_chunktype {
> +       rpcrdma_noch = 0,
> +       rpcrdma_readch,
> +       rpcrdma_areadch,
> +       rpcrdma_writech,
> +       rpcrdma_replych
> +};
> +
> +#ifdef RPC_DEBUG
> +static const char transfertypes[][12] = {
> +       "pure inline",  /* no chunks */
> +       " read chunk",  /* some argument via rdma read */
> +       "*read chunk",  /* entire request via rdma read */
> +       "write chunk",  /* some result via rdma write */
> +       "reply chunk"   /* entire reply via rdma write */
> +};
> +#endif
> +
> +/*
> + * Chunk assembly from upper layer xdr_buf.
> + *
> + * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
> + * elements. Segments are then coalesced when registered, if possible
> + * within the selected memreg mode.
> + *
> + * Note, this routine is never called if the connection's memory
> + * registration strategy is 0 (bounce buffers).
> + */
> +
> +static int
> +rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int first,
> +       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
> +{
> +       int len, n = 0, p;
> +
> +       if (first == 0 && xdrbuf->head[0].iov_len) {
> +               seg[n].mr_page = NULL;
> +               seg[n].mr_offset = xdrbuf->head[0].iov_base;
> +               seg[n].mr_len = xdrbuf->head[0].iov_len;
> +               ++n;
> +       }
> +
> +       if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
> +               if (n == nsegs)
> +                       return 0;
> +               seg[n].mr_page = xdrbuf->pages[0];
> +               seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
> +               seg[n].mr_len = PAGE_SIZE - xdrbuf->page_base;
> +               len = xdrbuf->page_len - seg[n].mr_len;
> +               ++n;
> +               p = 1;
> +               while (len > 0) {
> +                       if (n == nsegs)
> +                               return 0;
> +                       seg[n].mr_page = xdrbuf->pages[p];
> +                       seg[n].mr_offset = NULL;
> +                       seg[n].mr_len = ((len > PAGE_SIZE) ? PAGE_SIZE : len);
> +                       len -= seg[n].mr_len;
> +                       ++n;
> +                       ++p;
> +               }
> +       }
> +
> +       if (xdrbuf->tail[0].iov_len && type != rpcrdma_writech) {
> +               if (n == nsegs)
> +                       return 0;
> +               seg[n].mr_page = NULL;
> +               seg[n].mr_offset = xdrbuf->tail[0].iov_base;
> +               seg[n].mr_len = xdrbuf->tail[0].iov_len;
> +               ++n;
> +       }
> +       return n;
> +}
> +
> +/*
> + * Create read/write chunk lists, and reply chunks, for RDMA
> + *
> + *   Assume check against THRESHOLD has been done, and chunks are required.
> + *   Assume only encoding one list entry for read|write chunks. The NFSv3
> + *     protocol is simple enough to allow this as it only has a single "bulk
> + *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
> + *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
> + *
> + * When used for a single reply chunk (which is a special write
> + * chunk used for the entire reply, rather than just the data), it
> + * is used primarily for READDIR and READLINK which would otherwise
> + * be severely size-limited by a small rdma inline read max. The server
> + * response will come back as an RDMA Write, followed by a message
> + * of type RDMA_NOMSG carrying the xid and length. As a result, reply
> + * chunks do not provide data alignment; however, they do not require
> + * "fixup" (moving the response to the upper layer buffer) either.
> + *
> + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
> + *
> + *  Read chunklist (a linked list):
> + *   N elements, position P (same P for all chunks of same arg!):
> + *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
> + *
> + *  Write chunklist (a list of (one) counted array):
> + *   N elements:
> + *    1 - N - HLOO - HLOO - ... - HLOO - 0
> + *
> + *  Reply chunk (a counted array):
> + *   N elements:
> + *    1 - N - HLOO - HLOO - ... - HLOO
> + */
> +
> +static unsigned int
> +rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
> +               struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
> +{
> +       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
> +       int nsegs, nchunks = 0;
> +       int pos;
> +       struct rpcrdma_mr_seg *seg = req->rl_segments;
> +       struct rpcrdma_read_chunk *cur_rchunk = NULL;
> +       struct rpcrdma_write_array *warray = NULL;
> +       struct rpcrdma_write_chunk *cur_wchunk = NULL;
> +       u32 *iptr = headerp->rm_body.rm_chunks;
> +
> +       if (type == rpcrdma_readch || type == rpcrdma_areadch) {
> +               /* a read chunk - server will RDMA Read our memory */
> +               cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
> +       } else {
> +               /* a write or reply chunk - server will RDMA Write our memory */
> +               *iptr++ = xdr_zero;     /* encode a NULL read chunk list */
> +               if (type == rpcrdma_replych)
> +                       *iptr++ = xdr_zero;     /* a NULL write chunk list */
> +               warray = (struct rpcrdma_write_array *) iptr;
> +               cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
> +       }
> +
> +       if (type == rpcrdma_replych || type == rpcrdma_areadch)
> +               pos = 0;
> +       else
> +               pos = target->head[0].iov_len;
> +
> +       nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
> +       if (nsegs == 0)
> +               return 0;
> +
> +       do {
> +               /* bind/register the memory, then build chunk from result. */
> +               int n = rpcrdma_register_external(seg, nsegs,
> +                                               cur_wchunk != NULL, r_xprt);
> +               if (n <= 0)
> +                       goto out;
> +               if (cur_rchunk) {       /* read */
> +                       cur_rchunk->rc_discrim = xdr_one;
> +                       /* all read chunks have the same "position" */
> +                       cur_rchunk->rc_position = htonl(pos);
> +                       cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
> +                       cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
> +                       xdr_encode_hyper((u32 *)&cur_rchunk->rc_target.rs_offset,
> +                                       seg->mr_base);
> +                       dprintk("RPC:       %s: read chunk "
> +                               "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
> +                               seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
> +                               n < nsegs ? "more" : "last");
> +                       cur_rchunk++;
> +                       r_xprt->rx_stats.read_chunk_count++;
> +               } else {                /* write/reply */
> +                       cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
> +                       cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
> +                       xdr_encode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset,
> +                                       seg->mr_base);
> +                       dprintk("RPC:       %s: %s chunk "
> +                               "elem %d@0x%llx:0x%x (%s)\n", __func__,
> +                               (type == rpcrdma_replych) ? "reply" : "write",
> +                               seg->mr_len, seg->mr_base, seg->mr_rkey,
> +                               n < nsegs ? "more" : "last");
> +                       cur_wchunk++;
> +                       if (type == rpcrdma_replych)
> +                               r_xprt->rx_stats.reply_chunk_count++;
> +                       else
> +                               r_xprt->rx_stats.write_chunk_count++;
> +                       r_xprt->rx_stats.total_rdma_request += seg->mr_len;
> +               }
> +               nchunks++;
> +               seg   += n;
> +               nsegs -= n;
> +       } while (nsegs);
> +
> +       /* success. all failures return above */
> +       req->rl_nchunks = nchunks;
> +
> +       BUG_ON(nchunks == 0);
> +
> +       /*
> +       * finish off header. If write, marshal discrim and nchunks.
> +       */
> +       if (cur_rchunk) {
> +               iptr = (u32 *) cur_rchunk;
> +               *iptr++ = xdr_zero;     /* finish the read chunk list */
> +               *iptr++ = xdr_zero;     /* encode a NULL write chunk list */
> +               *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
> +       } else {
> +               warray->wc_discrim = xdr_one;
> +               warray->wc_nchunks = htonl(nchunks);
> +               iptr = (u32 *) cur_wchunk;
> +               if (type == rpcrdma_writech) {
> +                       *iptr++ = xdr_zero; /* finish the write chunk list */
> +                       *iptr++ = xdr_zero; /* encode a NULL reply chunk */
> +               }
> +       }
> +
> +       /*
> +       * Return header size.
> +       */
> +       return (unsigned char *)iptr - (unsigned char *)headerp;
> +
> +out:
> +       for (pos = 0; nchunks--; )
> +               pos += rpcrdma_deregister_external(
> +                               &req->rl_segments[pos], r_xprt, NULL);
> +       return 0;
> +}
> +
> +/*
> + * Copy write data inline.
> + * This function is used for "small" requests. Data which is passed
> + * to RPC via iovecs (or page list) is copied directly into the
> + * pre-registered memory buffer for this request. For small amounts
> + * of data, this is efficient. The cutoff value is tunable.
> + */
> +static int
> +rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
> +{
> +       int i, npages, curlen;
> +       int copy_len;
> +       unsigned char *srcp, *destp;
> +
> +       destp = rqst->rq_svec[0].iov_base;
> +       curlen = rqst->rq_svec[0].iov_len;
> +       destp += curlen;
> +       /*
> +       * Do optional padding where it makes sense. Alignment of write
> +       * payload can help the server, if our setting is accurate.
> +       */
> +       pad -= (curlen + 36 /*sizeof(struct rpcrdma_msg_padded)*/);
> +       if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
> +               pad = 0;        /* don't pad this request */
> +
> +       dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
> +               __func__, pad, destp, rqst->rq_slen, curlen);
> +
> +       copy_len = rqst->rq_snd_buf.page_len;
> +       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += copy_len;
> +       npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base + copy_len) >> PAGE_SHIFT;
> +       for (i = 0; copy_len && i < npages; i++) {
> +               if (i == 0)
> +                       curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
> +               else
> +                       curlen = PAGE_SIZE;
> +               if (curlen > copy_len)
> +                       curlen = copy_len;
> +               dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
> +                       __func__, i, destp, copy_len, curlen);
> +               srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
> +                                       KM_SKB_SUNRPC_DATA);
> +               if (i == 0)
> +                       memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
> +               else
> +                       memcpy(destp, srcp, curlen);
> +               kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
> +               rqst->rq_svec[0].iov_len += curlen;
> +               destp += curlen;
> +               copy_len -= curlen;
> +       }
> +       if (rqst->rq_snd_buf.tail[0].iov_len) {
> +               curlen = rqst->rq_snd_buf.tail[0].iov_len;
> +               if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
> +                       memcpy(destp, rqst->rq_snd_buf.tail[0].iov_base, curlen);
> +                       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += curlen;
> +               }
> +               dprintk("RPC:       %s: tail destp 0x%p len %d curlen %d\n",
> +                       __func__, destp, copy_len, curlen);
> +               rqst->rq_svec[0].iov_len += curlen;
> +       }
> +       /* header now contains entire send message */
> +       return pad;
> +}
> +
> +/*
> + * Totally imperfect, temporary attempt to detect nfs reads...
> + * e.g. establish a hint via xdr_inline_pages, etc.
> + */
> +static int
> +is_nfs_read(struct rpc_rqst *rqst)
> +{
> +       u32 *p;
> +
> +       if (rqst->rq_task->tk_client->cl_prog != NFS_PROGRAM)
> +               return 0;
> +       switch (rqst->rq_task->tk_client->cl_vers) {
> +       case 4:
> +               /* Must dig into the COMPOUND. */
> +               /* Back up from the end of what a read request would be */
> +               /* PUTFH, fh, OP_READ, stateid(16), offset(8), count(4) */
> +               p = (u32 *)(rqst->rq_snd_buf.head[0].iov_base +
> +                           rqst->rq_snd_buf.head[0].iov_len);
> +               /* test read and count */
> +               return (rqst->rq_snd_buf.head[0].iov_len > 40 &&
> +                       p[-8] == __constant_htonl(OP_READ) &&
> +                       p[-1] == htonl(rqst->rq_rcv_buf.page_len));
> +       case 3:
> +               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ;
> +       case 2:
> +               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFSPROC_READ;
> +       }
> +       return 0;
> +}
> +
> +/*
> + * Marshal a request: the primary job of this routine is to choose
> + * the transfer modes. See comments below.
> + *
> + * Uses multiple RDMA IOVs for a request:
> + *  [0] -- RPC RDMA header, which uses memory from the *start* of the
> + *         preregistered buffer that already holds the RPC data in
> + *         its middle.
> + *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
> + *  [2] -- optional padding.
> + *  [3] -- if padded, header only in [1] and data here.
> + */
> +
> +int
> +rpcrdma_marshal_req(struct rpc_rqst *rqst)
> +{
> +       struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> +       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> +       char *base;
> +       size_t hdrlen, rpclen, padlen;
> +       enum rpcrdma_chunktype rtype, wtype;
> +       struct rpcrdma_msg *headerp;
> +
> +       /*
> +       * rpclen gets amount of data in first buffer, which is the
> +       * pre-registered buffer.
> +       */
> +       base = rqst->rq_svec[0].iov_base;
> +       rpclen = rqst->rq_svec[0].iov_len;
> +
> +       /* build RDMA header in private area at front */
> +       headerp = (struct rpcrdma_msg *) req->rl_base;
> +       /* don't htonl XID, it's already done in request */
> +       headerp->rm_xid = rqst->rq_xid;
> +       headerp->rm_vers = xdr_one;
> +       headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
> +       headerp->rm_type = __constant_htonl(RDMA_MSG);
> +
> +       /*
> +       * Chunks needed for results?
> +       *
> +       * o If the expected result is under the inline threshold, all ops
> +       *   return as inline (but see later).
> +       * o Large non-read ops return as a single reply chunk.
> +       * o Large read ops return data as write chunk(s), header as inline.
> +       *
> +       * Note: the NFS code sending down multiple result segments implies
> +       * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
> +       */
> +
> +       /*
> +       * This code can handle read chunks, write chunks OR reply
> +       * chunks -- only one type. If the request is too big to fit
> +       * inline, then we will choose read chunks. If the request is
> +       * a READ, then use write chunks to separate the file data
> +       * into pages; otherwise use reply chunks.
> +       */
> +       if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
> +               wtype = rpcrdma_noch;
> +       else if (rqst->rq_rcv_buf.page_len == 0)
> +               wtype = rpcrdma_replych;
> +       else if (is_nfs_read(rqst))
> +               wtype = rpcrdma_writech;
> +       else
> +               wtype = rpcrdma_replych;
> +
> +       /*
> +       * Chunks needed for arguments?
> +       *
> +       * o If the total request is under the inline threshold, all ops
> +       *   are sent as inline.
> +       * o Large non-write ops are sent with the entire message as a
> +       *   single read chunk (protocol 0-position special case).
> +       * o Large write ops transmit data as read chunk(s), header as
> +       *   inline.
> +       *
> +       * Note: the NFS code sending down multiple argument segments
> +       * implies the op is a write.
> +       * TBD check NFSv4 setacl
> +       */
> +       if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
> +               rtype = rpcrdma_noch;
> +       else if (rqst->rq_snd_buf.page_len == 0)
> +               rtype = rpcrdma_areadch;
> +       else
> +               rtype = rpcrdma_readch;
> +
> +       /* The following simplification is not true forever */
> +       if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
> +               wtype = rpcrdma_noch;
> +       BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
> +
> +       if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
> +           (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
> +               /* forced to "pure inline"? */
> +               dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
> +                       __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
> +               return -1;
> +       }
> +
> +       hdrlen = 28; /*sizeof *headerp;*/
> +       padlen = 0;
> +
> +       /*
> +       * Pull up any extra send data into the preregistered buffer.
> +       * When padding is in use and applies to the transfer, insert
> +       * it and change the message type.
> +       */
> +       if (rtype == rpcrdma_noch) {
> +
> +               padlen = rpcrdma_inline_pullup(rqst, RPCRDMA_INLINE_PAD_VALUE(rqst));
> +
> +               if (padlen) {
> +                       headerp->rm_type = __constant_htonl(RDMA_MSGP);
> +                       headerp->rm_body.rm_padded.rm_align =
> +                               htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
> +                       headerp->rm_body.rm_padded.rm_thresh =
> +                               __constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
> +                       headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
> +                       headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
> +                       headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
> +                       hdrlen += 2 * sizeof (u32);     /* extra words in padhdr */
> +                       BUG_ON(wtype != rpcrdma_noch);
> +
> +               } else {
> +                       headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
> +                       headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
> +                       headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
> +                       /* new length after pullup */
> +                       rpclen = rqst->rq_svec[0].iov_len;
> +                       /*
> +                       * Currently we try to not actually use read inline.
> +                       * Reply chunks have the desirable property that
> +                       * they land, packed, directly in the target buffers
> +                       * without headers, so they require no fixup. The
> +                       * additional RDMA Write op sends the same amount
> +                       * of data, streams on-the-wire and adds no overhead
> +                       * on receive. Therefore, we request a reply chunk
> +                       * for non-writes wherever feasible and efficient.
> +                       */
> +                       if (wtype == rpcrdma_noch &&
> +                           r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
> +                               wtype = rpcrdma_replych;
> +               }
> +       }
> +
> +       /*
> +       * Marshal chunks. This routine will return the header length
> +       * consumed by marshaling.
> +       */
> +       if (rtype != rpcrdma_noch) {
> +               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, headerp, rtype);
> +               wtype = rtype;  /* simplify dprintk */
> +
> +       } else if (wtype != rpcrdma_noch) {
> +               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, headerp, wtype);
> +       }
> +
> +       if (hdrlen == 0)
> +               return -1;
> +
> +       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
> +               "                   headerp 0x%p base 0x%p lkey 0x%x\n",
> +               __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
> +               headerp, base, req->rl_iov.lkey);
> +
> +       /*
> +       * initialize send_iov's - normally only two: rdma chunk header and
> +       * single preregistered RPC header buffer, but if padding is present,
> +       * then use a preregistered (and zeroed) pad buffer between the RPC
> +       * header and any write data. In all non-rdma cases, any following
> +       * data has been copied into the RPC header buffer.
> +       */
> +       req->rl_send_iov[0].addr = req->rl_iov.addr;
> +       req->rl_send_iov[0].length = hdrlen;
> +       req->rl_send_iov[0].lkey = req->rl_iov.lkey;
> +
> +       req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
> +       req->rl_send_iov[1].length = rpclen;
> +       req->rl_send_iov[1].lkey = req->rl_iov.lkey;
> +
> +       req->rl_niovs = 2;
> +
> +       if (padlen) {
> +               struct rpcrdma_ep *ep = &r_xprt->rx_ep;
> +
> +               req->rl_send_iov[2].addr = ep->rep_pad.addr;
> +               req->rl_send_iov[2].length = padlen;
> +               req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
> +
> +               req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
> +               req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
> +               req->rl_send_iov[3].lkey = req->rl_iov.lkey;
> +
> +               req->rl_niovs = 4;
> +       }
> +
> +       return 0;
> +}
> +
> +/*
> + * Chase down a received write or reply chunklist to get length
> + * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
> + */
> +static int
> +rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
> +{
> +       unsigned int i, total_len;
> +       struct rpcrdma_write_chunk *cur_wchunk;
> +
> +       i = ntohl(**iptrp);     /* get array count */
> +       if (i > max) {
> +               return -1;
> +       }
> +       cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
> +       total_len = 0;
> +       while (i--) {
> +               ifdebug(FACILITY) {
> +                       u64 off;
> +                       xdr_decode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset, &off);
> +                       dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
> +                               __func__, ntohl(cur_wchunk->wc_target.rs_length),
> +                               off, ntohl(cur_wchunk->wc_target.rs_handle));
> +               }
> +               total_len += ntohl(cur_wchunk->wc_target.rs_length);
> +               ++cur_wchunk;
> +       }
> +       /* check and adjust for properly terminated write chunk */
> +       if (wrchunk) {
> +               u32 *w = (u32 *) cur_wchunk;
> +               if (*w++ != xdr_zero)
> +                       return -1;
> +               cur_wchunk = (struct rpcrdma_write_chunk *) w;
> +       }
> +       if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
> +               return -1;
> +
> +       *iptrp = (u32 *) cur_wchunk;
> +       return total_len;
> +}
> +
> +/*
> + * Scatter inline received data back into provided iov's.
> + */
> +static void
> +rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
> +{
> +       int i, npages, curlen, olen;
> +       char *destp;
> +
> +       curlen = rqst->rq_rcv_buf.head[0].iov_len;
> +       if (curlen > copy_len) {        /* write chunk header fixup */
> +               curlen = copy_len;
> +               rqst->rq_rcv_buf.head[0].iov_len = curlen;
> +       }
> +
> +       dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
> +               __func__, srcp, copy_len, curlen);
> +
> +       /* Shift pointer for first receive segment only */
> +       rqst->rq_rcv_buf.head[0].iov_base = srcp;
> +       srcp += curlen;
> +       copy_len -= curlen;
> +
> +       olen = copy_len;
> +       i = 0;
> +       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
> +       if (copy_len && rqst->rq_rcv_buf.page_len) {
> +               npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
> +                       rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
> +               for ( ; i < npages; i++) {
> +                       if (i == 0)
> +                               curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
> +                       else
> +                               curlen = PAGE_SIZE;
> +                       if (curlen > copy_len)
> +                               curlen = copy_len;
> +                       dprintk("RPC:       %s: page %d srcp 0x%p len %d curlen %d\n",
> +                               __func__, i, srcp, copy_len, curlen);
> +                       destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
> +                                               KM_SKB_SUNRPC_DATA);
> +                       if (i == 0)
> +                               memcpy(destp + rqst->rq_rcv_buf.page_base,
> +                                               srcp, curlen);
> +                       else
> +                               memcpy(destp, srcp, curlen);
> +                       flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
> +                       kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
> +                       srcp += curlen;
> +                       if ((copy_len -= curlen) == 0)
> +                               break;
> +               }
> +               rqst->rq_rcv_buf.page_len = olen - copy_len;
> +       } else
> +               rqst->rq_rcv_buf.page_len = 0;
> +
> +       if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
> +               curlen = copy_len;
> +               if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
> +                       curlen = rqst->rq_rcv_buf.tail[0].iov_len;
> +               if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
> +                       memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
> +               dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
> +                       __func__, srcp, copy_len, curlen);
> +               rqst->rq_rcv_buf.tail[0].iov_len = curlen;
> +               copy_len -= curlen; ++i;
> +       } else
> +               rqst->rq_rcv_buf.tail[0].iov_len = 0;
> +
> +       if (copy_len)
> +               dprintk("RPC:       %s: %d bytes in %d extra segments (%d lost)\n",
> +                       __func__, olen, i, copy_len);
> +
> +       /* TBD avoid a warning from call_decode() */
> +       rqst->rq_private_buf = rqst->rq_rcv_buf;
> +}
> +
> +/*
> + * This function is called when an async event is posted to
> + * the connection which changes the connection state. All it
> + * does at this point is mark the connection up/down, the rpc
> + * timers do the rest.
> + */
> +void
> +rpcrdma_conn_func(struct rpcrdma_ep *ep)
> +{
> +       struct rpc_xprt *xprt = ep->rep_xprt;
> +
> +       spin_lock_bh(&xprt->transport_lock);
> +       if (ep->rep_connected > 0) {
> +               if (!xprt_test_and_set_connected(xprt))
> +                       xprt_wake_pending_tasks(xprt, 0);
> +       } else {
> +               if (xprt_test_and_clear_connected(xprt))
> +                       xprt_wake_pending_tasks(xprt, ep->rep_connected);
> +       }
> +       spin_unlock_bh(&xprt->transport_lock);
> +}
> +
> +/*
> + * This function is called when memory window unbind which we are waiting
> + * for completes. Just use rr_func (zeroed by upcall) to signal completion.
> + */
> +static void
> +rpcrdma_unbind_func(struct rpcrdma_rep *rep)
> +{
> +       wake_up(&rep->rr_unbind);
> +}
> +
> +/*
> + * Called as a tasklet to do req/reply match and complete a request
> + * Errors must result in the RPC task either being awakened, or
> + * allowed to timeout, to discover the errors at that time.
> + */
> +void
> +rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> +{
> +       struct rpcrdma_msg *headerp;
> +       struct rpcrdma_req *req;
> +       struct rpc_rqst *rqst;
> +       struct rpc_xprt *xprt = rep->rr_xprt;
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> +       u32 *iptr;
> +       int i, rdmalen, status;
> +
> +       /* Check status. If bad, signal disconnect and return rep to pool */
> +       if (rep->rr_len == ~0U) {
> +               rpcrdma_recv_buffer_put(rep);
> +               if (r_xprt->rx_ep.rep_connected == 1) {
> +                       r_xprt->rx_ep.rep_connected = -EIO;
> +                       rpcrdma_conn_func(&r_xprt->rx_ep);
> +               }
> +               return;
> +       }
> +       if (rep->rr_len < 28) {
> +               dprintk("RPC:       %s: short/invalid reply\n", __func__);
> +               goto repost;
> +       }
> +       headerp = (struct rpcrdma_msg *) rep->rr_base;
> +       if (headerp->rm_vers != xdr_one) {
> +               dprintk("RPC:       %s: invalid version %d\n",
> +                       __func__, ntohl(headerp->rm_vers));
> +               goto repost;
> +       }
> +
> +       /* Get XID and try for a match. */
> +       spin_lock(&xprt->transport_lock);
> +       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
> +       if (rqst == NULL) {
> +               spin_unlock(&xprt->transport_lock);
> +               dprintk("RPC:       %s: reply 0x%p failed "
> +                       "to match any request xid 0x%08x len %d\n",
> +                       __func__, rep, headerp->rm_xid, rep->rr_len);
> +repost:
> +               rep->rr_func = rpcrdma_reply_handler;
> +               if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> +                       rpcrdma_recv_buffer_put(rep);
> +
> +               return;
> +       }
> +
> +       /* get request object */
> +       req = rpcr_to_rdmar(rqst);
> +
> +       dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
> +               "                   RPC request 0x%p xid 0x%08x\n",
> +                       __func__, rep, req, rqst, headerp->rm_xid);
> +
> +       BUG_ON(!req || req->rl_reply);
> +
> +       /* from here on, the reply is no longer an orphan */
> +       req->rl_reply = rep;
> +
> +       /* check for expected message types */
> +       /* The order of some of these tests is important. */
> +       switch (headerp->rm_type) {
> +       case __constant_htonl(RDMA_MSG):
> +               /* never expect read chunks */
> +               /* never expect reply chunks (two ways to check) */
> +               /* never expect write chunks without having offered RDMA */
> +               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
> +                   (headerp->rm_body.rm_chunks[1] == xdr_zero &&
> +                    headerp->rm_body.rm_chunks[2] != xdr_zero) ||
> +                   (headerp->rm_body.rm_chunks[1] != xdr_zero &&
> +                    req->rl_nchunks == 0)) {
> +                       goto badheader;
> +               }
> +               if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
> +                       /* count any expected write chunks in read reply */
> +                       /* start at write chunk array count */
> +                       iptr = &headerp->rm_body.rm_chunks[2];
> +                       rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 1, &iptr);
> +                       /* check for validity, and no reply chunk after */
> +                       if (rdmalen < 0 || *iptr++ != xdr_zero) {
> +                               goto badheader;
> +                       }
> +                       rep->rr_len -=
> +                           ((unsigned char *)iptr - (unsigned char *)headerp);
> +                       status = rep->rr_len + rdmalen;
> +                       r_xprt->rx_stats.total_rdma_reply += rdmalen;
> +               } else {
> +                       /* else ordinary inline */
> +                       iptr = (u32 *)((unsigned char *)headerp + 28);
> +                       rep->rr_len -= 28; /*sizeof *headerp;*/
> +                       status = rep->rr_len;
> +               }
> +               /* Fix up the rpc results for upper layer */
> +               rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
> +               break;
> +
> +       case __constant_htonl(RDMA_NOMSG):
> +               /* never expect read or write chunks, always reply chunks */
> +               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
> +                   headerp->rm_body.rm_chunks[1] != xdr_zero ||
> +                   headerp->rm_body.rm_chunks[2] != xdr_one ||
> +                   req->rl_nchunks == 0) {
> +                       goto badheader;
> +               }
> +               iptr = (u32 *)((unsigned char *)headerp + 28);
> +               rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
> +               if (rdmalen < 0) {
> +                       goto badheader;
> +               }
> +               r_xprt->rx_stats.total_rdma_reply += rdmalen;
> +               /* Reply chunk buffer already is the reply vector - no fixup. */
> +               status = rdmalen;
> +               break;
> +
> +       default:
> +       badheader:
> +               dprintk("%s: invalid rpcrdma reply header (type %d):"
> +                               " chunks[012] == %d %d %d expected chunks <= %d\n",
> +                               __func__, ntohl(headerp->rm_type),
> +                               headerp->rm_body.rm_chunks[0],
> +                               headerp->rm_body.rm_chunks[1],
> +                               headerp->rm_body.rm_chunks[2],
> +                               req->rl_nchunks);
> +               status = -EIO;
> +               r_xprt->rx_stats.bad_reply_count++;
> +               break;
> +       }
> +
> +       /* If using mw bind, start the deregister process now. */
> +       /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
> +       if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
> +       case RPCRDMA_MEMWINDOWS:
> +               for (i = 0; req->rl_nchunks-- > 1; )
> +                       i += rpcrdma_deregister_external(
> +                               &req->rl_segments[i], r_xprt, NULL);
> +               /* Optionally wait (not here) for unbinds to complete */
> +               rep->rr_func = rpcrdma_unbind_func;
> +               (void) rpcrdma_deregister_external(&req->rl_segments[i], r_xprt, rep);
> +               break;
> +       case RPCRDMA_MEMWINDOWS_ASYNC:
> +               for (i = 0; req->rl_nchunks--; )
> +                       i += rpcrdma_deregister_external(
> +                               &req->rl_segments[i], r_xprt, NULL);
> +               break;
> +       default:
> +               break;
> +       }
> +
> +       dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
> +                       __func__, xprt, rqst, status);
> +       xprt_complete_rqst(rqst->rq_task, status);
> +       spin_unlock(&xprt->transport_lock);
> +}
> 
> 


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-07-13 16:35 ` Chuck Lever
@ 2007-07-13 16:50   ` Talpey, Thomas
  2007-07-13 17:11     ` Chuck Lever
  0 siblings, 1 reply; 8+ messages in thread
From: Talpey, Thomas @ 2007-07-13 16:50 UTC (permalink / raw)
  To: chuck.lever; +Cc: nfs

At 12:35 PM 7/13/2007, Chuck Lever wrote:
>Talpey, Thomas wrote:
>> RPCRDMA: rpc rdma protocol implementation
>> +#include <linux/nfs2.h>
>> +#include <linux/nfs3.h>
>> +#include <linux/nfs4.h>
>
>I haven't looked closely at this yet, but is there really a dependency 
>in here on NFS?  I don't see NFS dependencies in other parts of the RPC 
>client or server, save the legacy debugging interface 
>(/proc/sys/sunrpc/nfs_debug and friends).
>

It's right here:

>> +/*
>> + * Totally imperfect, temporary attempt to detect nfs reads...
>> + * e.g. establish a hint via xdr_inline_pages, etc.
>> + */
>> +static int
>> +is_nfs_read(struct rpc_rqst *rqst)
>> +{

In which the code peeks into the rpc request to divine its origin.
NFS reads prefer a slightly different RDMA handling, because they
(ideally) transfer directly into the buffer cache, or other pagelist
buffers such as those passed by direct I/O. Ops such as readdir, however,
prefer a contiguous transfer.

It's actually fairly clean in the case of NFSv3 and NFSv2; the nfs
#includes are simply there to define the procedure numbers. However,
for NFSv4 there is a bit of ... uncleanliness.

The alternative is mentioned, and would involve marking pagelists
built in xdr_inline_pages(); this of course would also require changes to
the NFS-layer callers. I am prepared to do that, pending the outcome
of these comments.
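
For illustration only, a minimal sketch of that alternative, built on the
existing xdr_inline_pages(); the extra "bulk" argument and the xdr_buf
flag it sets (called page_is_bulk here) are assumptions, not existing
interfaces:

/*
 * Hypothetical wrapper: the ULP flags a pagelist as bulk data at the
 * point where it hands the pages to the XDR layer. Only
 * xdr_inline_pages() itself exists today; the rest is a sketch.
 */
static inline void
xdr_inline_pages_hint(struct xdr_buf *xdr, unsigned int offset,
		      struct page **pages, unsigned int base,
		      unsigned int len, int bulk)
{
	xdr_inline_pages(xdr, offset, pages, base, len);
	xdr->page_is_bulk = bulk;	/* hypothetical xdr_buf member */
}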

The other integration concern is how to pass any RDMA parameters
at mount time, which of course you're well aware of. :-) I see Trond
has submitted some of your queued changes for 2.6.23, so I expect
to make use of them for the next patch revision, post .23-rc1.

Tom.


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-07-13 16:50   ` Talpey, Thomas
@ 2007-07-13 17:11     ` Chuck Lever
  2007-07-13 17:28       ` Talpey, Thomas
  2007-08-24 17:12       ` Talpey, Thomas
  0 siblings, 2 replies; 8+ messages in thread
From: Chuck Lever @ 2007-07-13 17:11 UTC (permalink / raw)
  To: Talpey, Thomas; +Cc: nfs

Talpey, Thomas wrote:
> At 12:35 PM 7/13/2007, Chuck Lever wrote:
>> Talpey, Thomas wrote:
>>> RPCRDMA: rpc rdma protocol implementation
>>> +#include <linux/nfs2.h>
>>> +#include <linux/nfs3.h>
>>> +#include <linux/nfs4.h>
>> I haven't looked closely at this yet, but is there really a dependency 
>> in here on NFS?  I don't see NFS dependencies in other parts of the RPC 
>> client or server, save the legacy debugging interface 
>> (/proc/sys/sunrpc/nfs_debug and friends).
>>
> 
> It's right here:
> 
>>> +/*
>>> + * Totally imperfect, temporary attempt to detect nfs reads...
>>> + * e.g. establish a hint via xdr_inline_pages, etc.
>>> + */
>>> +static int
>>> +is_nfs_read(struct rpc_rqst *rqst)
>>> +{
> 
> In which the code peeks into the rpc request to divine its origin.
> NFS reads prefer a slightly different RDMA handling, because they
> (ideally) transfer directly into the buffer cache, or other pagelist
> buffers such as those passed by direct I/O. Ops such as readdir, however,
> prefer a contiguous transfer.

Well, the other "read-like" operations (like readdir or readlink) use a 
single page cache page, as far as I recall.  In fact, there is some 
desire to support multi-page readdir and readdirplus operations at some 
point.

> The alternative is mentioned, and would involve marking pagelists
> built in xdr_inline_pages(); this of course would also require changes to
> the NFS-layer callers. I am prepared to do that, pending the outcome
> of these comments.

I would humbly prefer the clean alternative: I think several other 
operations can use this.  Seems like the distinction is the operations 
that read data (like readdir, readlink, read) and those that read 
metadata (getattr).

The ULP should provide a hint on each of these.  Possibly you could hack 
the nfs_procedures tables (which is an RPC client data structure) to 
provide the hint.
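
For illustration, one minimal form of such a per-procedure hint; the
p_bulkdata bit is purely hypothetical and does not exist in struct
rpc_procinfo today:

/*
 * Sketch only: the transport consults a hypothetical per-procedure
 * flag instead of parsing the request to guess what it carries.
 */
static int
rqst_reply_is_bulk(struct rpc_rqst *rqst)
{
	return rqst->rq_task->tk_msg.rpc_proc->p_bulkdata;
}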


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-07-13 17:11     ` Chuck Lever
@ 2007-07-13 17:28       ` Talpey, Thomas
  2007-08-24 17:12       ` Talpey, Thomas
  1 sibling, 0 replies; 8+ messages in thread
From: Talpey, Thomas @ 2007-07-13 17:28 UTC (permalink / raw)
  To: chuck.lever; +Cc: nfs

At 01:11 PM 7/13/2007, Chuck Lever wrote:
>Well, the other "read-like" operations (like readdir or readlink) use a 
>single page cache page, as far as I recall.  In fact, there is some 
>desire to support multi-page readdir and readdirplus operations at some 
>point.

Doesn't help, as a hint, since read can also operate on just a page.

>I would humbly prefer the clean alternative: I think several other 
>operations can use this.  Seems like the distinction is the operations 
>that read data (like readdir, readlink, read) and those that read 
>metadata (getattr).

Me too. But the distinction is finer than that. Basically, what RPC/RDMA
needs to know is whether the rpc result destined for the pagelist is bulk
data or encoded. Readdir, readdirplus, and readlink are all in the latter
category.

The other wrinkle is NFSv4 COMPOUND, which can wrap any number
of these. At the moment, the XDR interface doesn't support a COMPOUND
that would pass down multiple pagelists (e.g. more than one OP_READ),
so that's not an issue. This is the reason for the "simplification" comments
you may see in the code (and it's a good thing IMO).

>The ULP should provide a hint on each of these.  Possibly you could hack 
>the nfs_procedures tables (which is an RPC client data structure) to 
>provide the hint.

That would be simpler than extending the xdr_buf, but it might make
things difficult for NFSv4, which only ever uses one procedure.

Tom.


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-07-13 17:11     ` Chuck Lever
  2007-07-13 17:28       ` Talpey, Thomas
@ 2007-08-24 17:12       ` Talpey, Thomas
  2007-08-24 18:44         ` Chuck Lever
  1 sibling, 1 reply; 8+ messages in thread
From: Talpey, Thomas @ 2007-08-24 17:12 UTC (permalink / raw)
  To: chuck.lever; +Cc: nfs

At 01:11 PM 7/13/2007, Chuck Lever wrote:
>Talpey, Thomas wrote:
>> The alternative is mentioned, and would involve marking pagelists
>> built in xdr_inline_pages(); this of course would also require changes to
>> the NFS-layer callers. I am prepared to do that, pending the outcome
>> of these comments.
>
>I would humbly prefer the clean alternative: I think several other 
>operations can use this.  Seems like the distinction is the operations 
>that read data (like readdir, readlink, read) and those that read 
>metadata (getattr).
>
>The ULP should provide a hint on each of these.  Possibly you could hack 
>the nfs_procedures tables (which is an RPC client data structure) to 
>provide the hint.

So, to clean this up, I looked into hacking this flag into the rpc_procinfo
as you suggested. It works, but I think it's too high up in the layering.
The issue is that we want to stamp each buffer with its bulk-data disposition,
not the entire procedure. For example, there might in the future be more than
one READ in an NFSv4 COMPOUND.

What do you think of the following? There are some data movers in xdr.c
that might peek at this flag for hints too; I haven't gone there yet.

Tom.

Index: linux-2.6.22/fs/nfs/nfs2xdr.c
===================================================================
--- linux-2.6.22.orig/fs/nfs/nfs2xdr.c
+++ linux-2.6.22/fs/nfs/nfs2xdr.c
@@ -251,6 +251,7 @@ nfs_xdr_readargs(struct rpc_rqst *req, _
 	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS_readres_sz) << 2;
 	xdr_inline_pages(&req->rq_rcv_buf, replen,
 			 args->pages, args->pgbase, count);
+	req->rq_rcv_buf.page_is_bulk = 1;
 	return 0;
 }
 
Index: linux-2.6.22/fs/nfs/nfs3xdr.c
===================================================================
--- linux-2.6.22.orig/fs/nfs/nfs3xdr.c
+++ linux-2.6.22/fs/nfs/nfs3xdr.c
@@ -346,6 +346,7 @@ nfs3_xdr_readargs(struct rpc_rqst *req, 
 	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS3_readres_sz) << 2;
 	xdr_inline_pages(&req->rq_rcv_buf, replen,
 			 args->pages, args->pgbase, count);
+	req->rq_rcv_buf.page_is_bulk = 1;
 	return 0;
 }
 
Index: linux-2.6.22/fs/nfs/nfs4xdr.c
===================================================================
--- linux-2.6.22.orig/fs/nfs/nfs4xdr.c
+++ linux-2.6.22/fs/nfs/nfs4xdr.c
@@ -1857,6 +1857,7 @@ static int nfs4_xdr_enc_read(struct rpc_
 	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS4_dec_read_sz) << 2;
 	xdr_inline_pages(&req->rq_rcv_buf, replen,
 			 args->pages, args->pgbase, args->count);
+	req->rq_rcv_buf.page_is_bulk = 1;
 out:
 	return status;
 }
Index: linux-2.6.22/include/linux/sunrpc/xdr.h
===================================================================
--- linux-2.6.22.orig/include/linux/sunrpc/xdr.h
+++ linux-2.6.22/include/linux/sunrpc/xdr.h
@@ -70,7 +70,8 @@ struct xdr_buf {
 
 	struct page **	pages;		/* Array of contiguous pages */
 	unsigned int	page_base,	/* Start of page data */
-			page_len;	/* Length of page data */
+			page_len,	/* Length of page data */
+			page_is_bulk;	/* Page(s) hold bulk data only */
 
 	unsigned int	buflen,		/* Total length of storage buffer */
 			len;		/* Length of XDR encoded message */
Index: linux-2.6.22/net/sunrpc/clnt.c
===================================================================
--- linux-2.6.22.orig/net/sunrpc/clnt.c
+++ linux-2.6.22/net/sunrpc/clnt.c
@@ -871,6 +871,7 @@ rpc_xdr_buf_init(struct xdr_buf *buf, vo
 	buf->head[0].iov_len = len;
 	buf->tail[0].iov_len = 0;
 	buf->page_len = 0;
+	buf->page_is_bulk = 0;
 	buf->len = 0;
 	buf->buflen = len;
 }
Index: linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
===================================================================
--- linux-2.6.22.orig/net/sunrpc/xprtrdma/rpc_rdma.c
+++ linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -47,10 +47,6 @@
 
 #include "xprt_rdma.h"
 
-#include <linux/nfs2.h>
-#include <linux/nfs3.h>
-#include <linux/nfs4.h>
-
 #include <linux/highmem.h>
 
 #ifdef RPC_DEBUG
@@ -351,37 +347,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *r
 }
 
 /*
- * Totally imperfect, temporary attempt to detect nfs reads...
- * e.g. establish a hint via xdr_inline_pages, etc.
- */
-static int
-is_nfs_read(struct rpc_rqst *rqst)
-{
-	u32 *p;
-
-	if (rqst->rq_task->tk_client->cl_prog != NFS_PROGRAM)
-		return 0;
-	switch (rqst->rq_task->tk_client->cl_vers) {
-	case 4:
-		/* Must dig into the COMPOUND. */
-		/* Back up from the end of what a read request would be */
-		/* PUTFH, fh, OP_READ, stateid(16), offset(8), count(4) */
-		p = (u32 *)(rqst->rq_snd_buf.head[0].iov_base +
-			    rqst->rq_snd_buf.head[0].iov_len);
-		/* test read and count */
-		return (rqst->rq_snd_buf.head[0].iov_len > 40 &&
-			p[-8] == __constant_htonl(OP_READ) &&
-
-			p[-1] == htonl(rqst->rq_rcv_buf.page_len));
-	case 3:
-		return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ;
-	case 2:
-		return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFSPROC_READ;
-	}
-	return 0;
-}
-
-/*
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
@@ -443,7 +408,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqs
 		wtype = rpcrdma_noch;
 	else if (rqst->rq_rcv_buf.page_len == 0)
 		wtype = rpcrdma_replych;
-	else if (is_nfs_read(rqst))
+	else if (rqst->rq_rcv_buf.page_is_bulk)
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-08-24 17:12       ` Talpey, Thomas
@ 2007-08-24 18:44         ` Chuck Lever
  2007-08-24 19:38           ` Talpey, Thomas
  0 siblings, 1 reply; 8+ messages in thread
From: Chuck Lever @ 2007-08-24 18:44 UTC (permalink / raw)
  To: Talpey, Thomas; +Cc: nfs

Talpey, Thomas wrote:
> At 01:11 PM 7/13/2007, Chuck Lever wrote:
>> Talpey, Thomas wrote:
>>> The alternative is mentioned, and would involve marking pagelists
>>> built in xdr_inline_pages(); this of course would also require changes to
>>> the NFS-layer callers. I am prepared to do that, pending the outcome
>>> of these comments.
>> I would humbly prefer the clean alternative: I think several other 
>> operations can use this.  Seems like the distinction is the operations 
>> that read data (like readdir, readlink, read) and those that read 
>> metadata (getattr).
>>
>> The ULP should provide a hint on each of these.  Possibly you could hack 
>> the nfs_procedures tables (which is an RPC client data structure) to 
>> provide the hint.
> 
> So, to clean this up, I looked into hacking this flag into the rpc_procinfo
> as you suggested. It works, but I think it's too high up in the layering.
> The issue is that we want to stamp each buffer with its bulk-data disposition,
> not the entire procedure. For example, there might in the future be more than
> one READ in an NFSv4 COMPOUND.
> 
> What do you think of the following? There are some data movers in xdr.c
> that might peek at this flag for hints too; I haven't gone there yet.

I like this much better than what was there.  The NFS client tells the 
transport layer exactly which pages are bulk, instead of having the 
transport guess.

The name "page_is_bulk", however, implies that you are marking the pages,
when really you are marking the buffer.  Marking the whole buffer is
probably correct, but then you should rename the flag for clarity.

> Index: linux-2.6.22/fs/nfs/nfs2xdr.c
> ===================================================================
> --- linux-2.6.22.orig/fs/nfs/nfs2xdr.c
> +++ linux-2.6.22/fs/nfs/nfs2xdr.c
> @@ -251,6 +251,7 @@ nfs_xdr_readargs(struct rpc_rqst *req, _
>  	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS_readres_sz) << 2;
>  	xdr_inline_pages(&req->rq_rcv_buf, replen,
>  			 args->pages, args->pgbase, count);
> +	req->rq_rcv_buf.page_is_bulk = 1;
>  	return 0;
>  }
>  
> Index: linux-2.6.22/fs/nfs/nfs3xdr.c
> ===================================================================
> --- linux-2.6.22.orig/fs/nfs/nfs3xdr.c
> +++ linux-2.6.22/fs/nfs/nfs3xdr.c
> @@ -346,6 +346,7 @@ nfs3_xdr_readargs(struct rpc_rqst *req, 
>  	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS3_readres_sz) << 2;
>  	xdr_inline_pages(&req->rq_rcv_buf, replen,
>  			 args->pages, args->pgbase, count);
> +	req->rq_rcv_buf.page_is_bulk = 1;
>  	return 0;
>  }
>  
> Index: linux-2.6.22/fs/nfs/nfs4xdr.c
> ===================================================================
> --- linux-2.6.22.orig/fs/nfs/nfs4xdr.c
> +++ linux-2.6.22/fs/nfs/nfs4xdr.c
> @@ -1857,6 +1857,7 @@ static int nfs4_xdr_enc_read(struct rpc_
>  	replen = (RPC_REPHDRSIZE + auth->au_rslack + NFS4_dec_read_sz) << 2;
>  	xdr_inline_pages(&req->rq_rcv_buf, replen,
>  			 args->pages, args->pgbase, args->count);
> +	req->rq_rcv_buf.page_is_bulk = 1;
>  out:
>  	return status;
>  }
> Index: linux-2.6.22/include/linux/sunrpc/xdr.h
> ===================================================================
> --- linux-2.6.22.orig/include/linux/sunrpc/xdr.h
> +++ linux-2.6.22/include/linux/sunrpc/xdr.h
> @@ -70,7 +70,8 @@ struct xdr_buf {
>  
>  	struct page **	pages;		/* Array of contiguous pages */
>  	unsigned int	page_base,	/* Start of page data */
> -			page_len;	/* Length of page data */
> +			page_len,	/* Length of page data */
> +			page_is_bulk;	/* Page(s) hold bulk data only */
>  
>  	unsigned int	buflen,		/* Total length of storage buffer */
>  			len;		/* Length of XDR encoded message */
> Index: linux-2.6.22/net/sunrpc/clnt.c
> ===================================================================
> --- linux-2.6.22.orig/net/sunrpc/clnt.c
> +++ linux-2.6.22/net/sunrpc/clnt.c
> @@ -871,6 +871,7 @@ rpc_xdr_buf_init(struct xdr_buf *buf, vo
>  	buf->head[0].iov_len = len;
>  	buf->tail[0].iov_len = 0;
>  	buf->page_len = 0;
> +	buf->page_is_bulk = 0;
>  	buf->len = 0;
>  	buf->buflen = len;
>  }
> Index: linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> ===================================================================
> --- linux-2.6.22.orig/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -47,10 +47,6 @@
>  
>  #include "xprt_rdma.h"
>  
> -#include <linux/nfs2.h>
> -#include <linux/nfs3.h>
> -#include <linux/nfs4.h>
> -
>  #include <linux/highmem.h>
>  
>  #ifdef RPC_DEBUG
> @@ -351,37 +347,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *r
>  }
>  
>  /*
> - * Totally imperfect, temporary attempt to detect nfs reads...
> - * e.g. establish a hint via xdr_inline_pages, etc.
> - */
> -static int
> -is_nfs_read(struct rpc_rqst *rqst)
> -{
> -	u32 *p;
> -
> -	if (rqst->rq_task->tk_client->cl_prog != NFS_PROGRAM)
> -		return 0;
> -	switch (rqst->rq_task->tk_client->cl_vers) {
> -	case 4:
> -		/* Must dig into the COMPOUND. */
> -		/* Back up from the end of what a read request would be */
> -		/* PUTFH, fh, OP_READ, stateid(16), offset(8), count(4) */
> -		p = (u32 *)(rqst->rq_snd_buf.head[0].iov_base +
> -			    rqst->rq_snd_buf.head[0].iov_len);
> -		/* test read and count */
> -		return (rqst->rq_snd_buf.head[0].iov_len > 40 &&
> -			p[-8] == __constant_htonl(OP_READ) &&
> -
> -			p[-1] == htonl(rqst->rq_rcv_buf.page_len));
> -	case 3:
> -		return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ;
> -	case 2:
> -		return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFSPROC_READ;
> -	}
> -	return 0;
> -}
> -
> -/*
>   * Marshal a request: the primary job of this routine is to choose
>   * the transfer modes. See comments below.
>   *
> @@ -443,7 +408,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqs
>  		wtype = rpcrdma_noch;
>  	else if (rqst->rq_rcv_buf.page_len == 0)
>  		wtype = rpcrdma_replych;
> -	else if (is_nfs_read(rqst))
> +	else if (rqst->rq_rcv_buf.page_is_bulk)
>  		wtype = rpcrdma_writech;
>  	else
>  		wtype = rpcrdma_replych;


* Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
  2007-08-24 18:44         ` Chuck Lever
@ 2007-08-24 19:38           ` Talpey, Thomas
  0 siblings, 0 replies; 8+ messages in thread
From: Talpey, Thomas @ 2007-08-24 19:38 UTC (permalink / raw)
  To: chuck.lever; +Cc: nfs

At 02:44 PM 8/24/2007, Chuck Lever wrote:
>The name "page_is_bulk", however, implies that you are marking the pages,
>when really you are marking the buffer.  Marking the whole buffer is
>probably correct, but then you should rename the flag for clarity.

Roger. The intent is actually to just mark the "pages" array though.
The head and tail are used for headers and dribble/roundup, so they can't
be bulk. For instance, they're not well-aligned nor do they have round
sizes. Maybe... page_data_is_bulk? Should it be a bit flag perhaps?

One other comment while I have the floor. The comment in xdr.h says
"array of contiguous pages". The pages themselves aren't actually
contiguous, though when used for bulk data they do represent a
set of contiguous file blocks. Should that comment be tweaked, do
you think?
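
For discussion, a rough sketch of how those two thoughts might read
together; the XDRBUF_BULK_PAGES name and the "flags" member are
assumptions, not existing identifiers, and the xdr.h comment could then
say something like "pages covering a contiguous range of data" rather
than "Array of contiguous pages":

/* Sketch only: a bit flag in place of the page_is_bulk word. */
#define XDRBUF_BULK_PAGES	(1U << 0)	/* pagelist holds bulk data only */

static inline void
xdr_mark_bulk_pages(struct xdr_buf *buf)
{
	buf->flags |= XDRBUF_BULK_PAGES;	/* "flags" is a hypothetical member */
}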

Tom.

>> Index: linux-2.6.22/include/linux/sunrpc/xdr.h
>> ===================================================================
>> --- linux-2.6.22.orig/include/linux/sunrpc/xdr.h
>> +++ linux-2.6.22/include/linux/sunrpc/xdr.h
>> @@ -70,7 +70,8 @@ struct xdr_buf {
>>  
>>  	struct page **	pages;		/* Array of contiguous pages */
>>  	unsigned int	page_base,	/* Start of page data */
>> -			page_len;	/* Length of page data */
>> +			page_len,	/* Length of page data */
>> +			page_is_bulk;	/* Page(s) hold bulk data only */
>>  
>>  	unsigned int	buflen,		/* Total length of storage buffer */
>>  			len;		/* Length of XDR encoded message */

