From: Trond Myklebust <trondmy@gmail.com>
To: linux-nfs@vger.kernel.org
Subject: [PATCH v3 40/44] SUNRPC: Simplify TCP receive code by switching to using iterators
Date: Mon, 17 Sep 2018 09:03:31 -0400
Message-ID: <20180917130335.112832-41-trond.myklebust@hammerspace.com>
In-Reply-To: <20180917130335.112832-40-trond.myklebust@hammerspace.com>

Most of this code should also be reusable with other socket types.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
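Not part of the diff, but for context: the series replaces the
tcp_read_sock()/xdr_skb_read_bits() state machine with sock_recvmsg()
reading into iov_iter-backed msghdrs (kvec iterators for the head/tail,
bvec iterators for the page array). A minimal sketch of that pattern,
using the same iov_iter_kvec() signature this patch uses; the helper
name is invented purely for illustration:

	/* Needs <linux/net.h>, <linux/uio.h>, <linux/socket.h>. */
	static ssize_t example_recv_into_kvec(struct socket *sock,
					      void *buf, size_t len)
	{
		struct kvec kvec = { .iov_base = buf, .iov_len = len };
		struct msghdr msg = { 0 };

		/* Aim msg.msg_iter at the single kvec segment. */
		iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &kvec, 1, len);
		/* >0: bytes copied, 0: peer closed, <0: error (e.g. -EAGAIN) */
		return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
	}
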
 include/linux/sunrpc/xprtsock.h |  19 +-
 include/trace/events/sunrpc.h   |  15 +-
 net/sunrpc/xprtsock.c           | 694 +++++++++++++++-----------------
 3 files changed, 335 insertions(+), 393 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 005cfb6e7238..458bfe0137f5 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -31,15 +31,16 @@ struct sock_xprt {
 	 * State of TCP reply receive
 	 */
 	struct {
-		__be32		fraghdr,
+		struct {
+			__be32	fraghdr,
 				xid,
 				calldir;
+		} __attribute__((packed));
 
 		u32		offset,
 				len;
 
-		unsigned long	copied,
-				flags;
+		unsigned long	copied;
 	} recv;
 
 	/*
@@ -76,21 +77,9 @@ struct sock_xprt {
 	void			(*old_error_report)(struct sock *);
 };
 
-/*
- * TCP receive state flags
- */
-#define TCP_RCV_LAST_FRAG	(1UL << 0)
-#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
-#define TCP_RCV_COPY_XID	(1UL << 2)
-#define TCP_RCV_COPY_DATA	(1UL << 3)
-#define TCP_RCV_READ_CALLDIR	(1UL << 4)
-#define TCP_RCV_COPY_CALLDIR	(1UL << 5)
-
 /*
  * TCP RPC flags
  */
-#define TCP_RPC_REPLY		(1UL << 6)
-
 #define XPRT_SOCK_CONNECTING	1U
 #define XPRT_SOCK_DATA_READY	(2)
 #define XPRT_SOCK_UPD_TIMEOUT	(3)
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index 0aa347194e0f..19e08d12696c 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready,
 			__get_str(port), __entry->err, __entry->total)
 );
 
-#define rpc_show_sock_xprt_flags(flags) \
-	__print_flags(flags, "|", \
-		{ TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \
-		{ TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \
-		{ TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \
-		{ TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \
-		{ TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \
-		{ TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \
-		{ TCP_RPC_REPLY, "TCP_RPC_REPLY" })
-
 TRACE_EVENT(xs_tcp_data_recv,
 	TP_PROTO(struct sock_xprt *xs),
 
@@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv,
 		__string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR])
 		__string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT])
 		__field(u32, xid)
-		__field(unsigned long, flags)
 		__field(unsigned long, copied)
 		__field(unsigned int, reclen)
 		__field(unsigned long, offset)
@@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv,
 		__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
 		__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
 		__entry->xid = be32_to_cpu(xs->recv.xid);
-		__entry->flags = xs->recv.flags;
 		__entry->copied = xs->recv.copied;
 		__entry->reclen = xs->recv.len;
 		__entry->offset = xs->recv.offset;
 	),
 
-	TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
+	TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu",
 			__get_str(addr), __get_str(port), __entry->xid,
-			rpc_show_sock_xprt_flags(__entry->flags),
 			__entry->copied, __entry->reclen, __entry->offset)
 );
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index f16406228ead..5269ad98bb08 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -47,13 +47,13 @@
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <linux/bvec.h>
+#include <linux/uio.h>
 
 #include <trace/events/sunrpc.h>
 
 #include "sunrpc.h"
 
-#define RPC_TCP_READ_CHUNK_SZ	(3*512*1024)
-
 static void xs_close(struct rpc_xprt *xprt);
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
 		struct socket *sock);
@@ -325,6 +325,320 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 		}
 }
 
+static size_t
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
+{
+	size_t i,n;
+
+	if (!(buf->flags & XDRBUF_SPARSE_PAGES))
+		return want;
+	if (want > buf->page_len)
+		want = buf->page_len;
+	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	for (i = 0; i < n; i++) {
+		if (buf->pages[i])
+			continue;
+		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
+		if (!buf->pages[i]) {
+			buf->page_len = (i * PAGE_SIZE) - buf->page_base;
+			return buf->page_len;
+		}
+	}
+	return want;
+}
+
+static ssize_t
+xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
+{
+	ssize_t ret;
+	if (seek != 0)
+		iov_iter_advance(&msg->msg_iter, seek);
+	ret = sock_recvmsg(sock, msg, flags);
+	return ret > 0 ? ret + seek : ret;
+}
+
+static ssize_t
+xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct kvec *kvec, size_t count, size_t seek)
+{
+	iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct bio_vec *bvec, unsigned long nr, size_t count,
+		size_t seek)
+{
+	iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
+		size_t count)
+{
+	struct kvec kvec = { 0 };
+	return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
+}
+
+static ssize_t
+xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
+		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
+{
+	size_t want, seek_init = seek, offset = 0;
+	ssize_t ret;
+
+	if (seek < buf->head[0].iov_len) {
+		want = min_t(size_t, count, buf->head[0].iov_len);
+		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+		seek = 0;
+	} else {
+		seek -= buf->head[0].iov_len;
+		offset += buf->head[0].iov_len;
+	}
+	if (buf->page_len && seek < buf->page_len) {
+		want = min_t(size_t, count - offset, buf->page_len);
+		want = xs_alloc_sparse_pages(buf, want, GFP_NOWAIT);
+		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
+				xdr_buf_pagecount(buf),
+				want + buf->page_base,
+				seek + buf->page_base);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+		seek = 0;
+	} else {
+		seek -= buf->page_len;
+		offset += buf->page_len;
+	}
+	if (buf->tail[0].iov_len && seek < buf->tail[0].iov_len) {
+		want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+	} else
+		offset += buf->tail[0].iov_len;
+	ret = -EMSGSIZE;
+	msg->msg_flags |= MSG_TRUNC;
+out:
+	*read = offset - seek_init;
+	return ret;
+eagain:
+	ret = -EAGAIN;
+	goto out;
+sock_err:
+	offset += seek;
+	goto out;
+}
+
+static void
+xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
+{
+	if (!transport->recv.copied) {
+		if (buf->head[0].iov_len >= transport->recv.offset)
+			memcpy(buf->head[0].iov_base,
+					&transport->recv.xid,
+					transport->recv.offset);
+		transport->recv.copied = transport->recv.offset;
+	}
+}
+
+static bool
+xs_read_stream_request_done(struct sock_xprt *transport)
+{
+	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
+}
+
+static ssize_t
+xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, struct rpc_rqst *req)
+{
+	struct xdr_buf *buf = &req->rq_private_buf;
+	size_t want, read;
+	ssize_t ret;
+
+	xs_read_header(transport, buf);
+
+	want = transport->recv.len - transport->recv.offset;
+	ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+			transport->recv.copied + want, transport->recv.copied,
+			&read);
+	transport->recv.offset += read;
+	transport->recv.copied += read;
+	if (transport->recv.offset == transport->recv.len) {
+		if (xs_read_stream_request_done(transport))
+			msg->msg_flags |= MSG_EOR;
+		return transport->recv.copied;
+	}
+
+	switch (ret) {
+	case -EMSGSIZE:
+		return transport->recv.copied;
+	case 0:
+		return -ESHUTDOWN;
+	default:
+		if (ret < 0)
+			return ret;
+	}
+	return -EAGAIN;
+}
+
+static size_t
+xs_read_stream_headersize(bool isfrag)
+{
+	if (isfrag)
+		return sizeof(__be32);
+	return 3 * sizeof(__be32);
+}
+
+static ssize_t
+xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, size_t want, size_t seek)
+{
+	struct kvec kvec = {
+		.iov_base = &transport->recv.fraghdr,
+		.iov_len = want,
+	};
+	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
+}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret;
+
+	/* Look up and lock the request corresponding to the given XID */
+	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
+	if (!req) {
+		printk(KERN_WARNING "Callback slot table overflowed\n");
+		return -ESHUTDOWN;
+	}
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_bc_request(req, ret);
+
+	return ret;
+}
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	return -ESHUTDOWN;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+static ssize_t
+xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret = 0;
+
+	/* Look up and lock the request corresponding to the given XID */
+	spin_lock(&xprt->queue_lock);
+	req = xprt_lookup_rqst(xprt, transport->recv.xid);
+	if (!req) {
+		msg->msg_flags |= MSG_TRUNC;
+		goto out;
+	}
+	xprt_pin_rqst(req);
+	spin_unlock(&xprt->queue_lock);
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+
+	spin_lock(&xprt->queue_lock);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_rqst(req->rq_task, ret);
+	xprt_unpin_rqst(req);
+out:
+	spin_unlock(&xprt->queue_lock);
+	return ret;
+}
+
+static ssize_t
+xs_read_stream(struct sock_xprt *transport, int flags)
+{
+	struct msghdr msg = { 0 };
+	size_t want, read = 0;
+	ssize_t ret = 0;
+
+	if (transport->recv.len == 0) {
+		want = xs_read_stream_headersize(transport->recv.copied != 0);
+		ret = xs_read_stream_header(transport, &msg, flags, want,
+				transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset = ret;
+		if (ret != want) {
+			ret = -EAGAIN;
+			goto out_err;
+		}
+		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
+			RPC_FRAGMENT_SIZE_MASK;
+		transport->recv.offset -= sizeof(transport->recv.fraghdr);
+		read = ret;
+	}
+
+	switch (be32_to_cpu(transport->recv.calldir)) {
+	case RPC_CALL:
+		ret = xs_read_stream_call(transport, &msg, flags);
+		break;
+	case RPC_REPLY:
+		ret = xs_read_stream_reply(transport, &msg, flags);
+	}
+	if (msg.msg_flags & MSG_TRUNC) {
+		transport->recv.calldir = cpu_to_be32(-1);
+		transport->recv.copied = -1;
+	}
+	if (ret < 0)
+		goto out_err;
+	read += ret;
+	if (transport->recv.offset < transport->recv.len) {
+		ret = xs_read_discard(transport->sock, &msg, flags,
+				transport->recv.len - transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset += ret;
+		read += ret;
+	}
+	if (xs_read_stream_request_done(transport)) {
+		trace_xs_tcp_data_recv(transport);
+		transport->recv.copied = 0;
+	}
+	transport->recv.offset = 0;
+	transport->recv.len = 0;
+	return read;
+out_err:
+	switch (ret) {
+	case 0:
+	case -ESHUTDOWN:
+		xprt_force_disconnect(&transport->xprt);
+		return -ESHUTDOWN;
+	}
+	return ret;
+}
+
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -484,6 +798,12 @@ static int xs_nospace(struct rpc_rqst *req)
 	return ret;
 }
 
+static void
+xs_stream_prepare_request(struct rpc_rqst *req)
+{
+	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+}
+
 /*
  * Determine if the previous message in the stream was aborted before it
  * could complete transmission.
@@ -1157,263 +1477,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
 	xprt_force_disconnect(xprt);
 }
 
-static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	size_t len, used;
-	char *p;
-
-	p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
-	len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-
-	transport->recv.len = ntohl(transport->recv.fraghdr);
-	if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
-		transport->recv.flags |= TCP_RCV_LAST_FRAG;
-	else
-		transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
-	transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
-
-	transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
-	transport->recv.offset = 0;
-
-	/* Sanity check of the record length */
-	if (unlikely(transport->recv.len < 8)) {
-		dprintk("RPC:       invalid TCP record fragment length\n");
-		xs_tcp_force_close(xprt);
-		return;
-	}
-	dprintk("RPC:       reading TCP record fragment of length %d\n",
-			transport->recv.len);
-}
-
-static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
-{
-	if (transport->recv.offset == transport->recv.len) {
-		transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
-		transport->recv.offset = 0;
-		if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
-			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-			transport->recv.flags |= TCP_RCV_COPY_XID;
-			transport->recv.copied = 0;
-		}
-	}
-}
-
-static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	char *p;
-
-	len = sizeof(transport->recv.xid) - transport->recv.offset;
-	dprintk("RPC:       reading XID (%zu bytes)\n", len);
-	p = ((char *) &transport->recv.xid) + transport->recv.offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-	transport->recv.flags &= ~TCP_RCV_COPY_XID;
-	transport->recv.flags |= TCP_RCV_READ_CALLDIR;
-	transport->recv.copied = 4;
-	dprintk("RPC:       reading %s XID %08x\n",
-			(transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
-							      : "request with",
-			ntohl(transport->recv.xid));
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
-				       struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	u32 offset;
-	char *p;
-
-	/*
-	 * We want transport->recv.offset to be 8 at the end of this routine
-	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
-	 * When this function is called for the first time,
-	 * transport->recv.offset is 4 (after having already read the xid).
-	 */
-	offset = transport->recv.offset - sizeof(transport->recv.xid);
-	len = sizeof(transport->recv.calldir) - offset;
-	dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-	p = ((char *) &transport->recv.calldir) + offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-	transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
-	/*
-	 * We don't yet have the XDR buffer, so we will write the calldir
-	 * out after we get the buffer from the 'struct rpc_rqst'
-	 */
-	switch (ntohl(transport->recv.calldir)) {
-	case RPC_REPLY:
-		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-		transport->recv.flags |= TCP_RCV_COPY_DATA;
-		transport->recv.flags |= TCP_RPC_REPLY;
-		break;
-	case RPC_CALL:
-		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-		transport->recv.flags |= TCP_RCV_COPY_DATA;
-		transport->recv.flags &= ~TCP_RPC_REPLY;
-		break;
-	default:
-		dprintk("RPC:       invalid request message type\n");
-		xs_tcp_force_close(&transport->xprt);
-	}
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
-				     struct xdr_skb_reader *desc,
-				     struct rpc_rqst *req)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_buf *rcvbuf;
-	size_t len;
-	ssize_t r;
-
-	rcvbuf = &req->rq_private_buf;
-
-	if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
-		/*
-		 * Save the RPC direction in the XDR buffer
-		 */
-		memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
-			&transport->recv.calldir,
-			sizeof(transport->recv.calldir));
-		transport->recv.copied += sizeof(transport->recv.calldir);
-		transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
-	}
-
-	len = desc->count;
-	if (len > transport->recv.len - transport->recv.offset)
-		desc->count = transport->recv.len - transport->recv.offset;
-	r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
-					  desc, xdr_skb_read_bits);
-
-	if (desc->count) {
-		/* Error when copying to the receive buffer,
-		 * usually because we weren't able to allocate
-		 * additional buffer pages. All we can do now
-		 * is turn off TCP_RCV_COPY_DATA, so the request
-		 * will not receive any additional updates,
-		 * and time out.
-		 * Any remaining data from this record will
-		 * be discarded.
-		 */
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-		dprintk("RPC:       XID %08x truncated request\n",
-				ntohl(transport->recv.xid));
-		dprintk("RPC:       xprt = %p, recv.copied = %lu, "
-				"recv.offset = %u, recv.len = %u\n",
-				xprt, transport->recv.copied,
-				transport->recv.offset, transport->recv.len);
-		return;
-	}
-
-	transport->recv.copied += r;
-	transport->recv.offset += r;
-	desc->count = len - r;
-
-	dprintk("RPC:       XID %08x read %zd bytes\n",
-			ntohl(transport->recv.xid), r);
-	dprintk("RPC:       xprt = %p, recv.copied = %lu, recv.offset = %u, "
-			"recv.len = %u\n", xprt, transport->recv.copied,
-			transport->recv.offset, transport->recv.len);
-
-	if (transport->recv.copied == req->rq_private_buf.buflen)
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	else if (transport->recv.offset == transport->recv.len) {
-		if (transport->recv.flags & TCP_RCV_LAST_FRAG)
-			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-/*
- * Finds the request corresponding to the RPC xid and invokes the common
- * tcp read code to read the data.
- */
-static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
-				    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
-
-	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->queue_lock);
-	req = xprt_lookup_rqst(xprt, transport->recv.xid);
-	if (!req) {
-		dprintk("RPC:       XID %08x request not found!\n",
-				ntohl(transport->recv.xid));
-		spin_unlock(&xprt->queue_lock);
-		return -1;
-	}
-	xprt_pin_rqst(req);
-	spin_unlock(&xprt->queue_lock);
-
-	xs_tcp_read_common(xprt, desc, req);
-
-	spin_lock(&xprt->queue_lock);
-	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-		xprt_complete_rqst(req->rq_task, transport->recv.copied);
-	xprt_unpin_rqst(req);
-	spin_unlock(&xprt->queue_lock);
-	return 0;
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-/*
- * Obtains an rpc_rqst previously allocated and invokes the common
- * tcp read code to read the data.  The result is placed in the callback
- * queue.
- * If we're unable to obtain the rpc_rqst we schedule the closing of the
- * connection and return -1.
- */
-static int xs_tcp_read_callback(struct rpc_xprt *xprt,
-				       struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	/* Look up the request corresponding to the given XID */
-	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
-	if (req == NULL) {
-		printk(KERN_WARNING "Callback slot table overflowed\n");
-		xprt_force_disconnect(xprt);
-		return -1;
-	}
-
-	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
-	xs_tcp_read_common(xprt, desc, req);
-
-	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-		xprt_complete_bc_request(req, transport->recv.copied);
-
-	return 0;
-}
-
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	return (transport->recv.flags & TCP_RPC_REPLY) ?
-		xs_tcp_read_reply(xprt, desc) :
-		xs_tcp_read_callback(xprt, desc);
-}
-
 static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 {
 	int ret;
@@ -1429,106 +1493,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 {
 	return PAGE_SIZE;
 }
-#else
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	return xs_tcp_read_reply(xprt, desc);
-}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-/*
- * Read data off the transport.  This can be either an RPC_CALL or an
- * RPC_REPLY.  Relay the processing to helper functions.
- */
-static void xs_tcp_read_data(struct rpc_xprt *xprt,
-				    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	if (_xs_tcp_read_data(xprt, desc) == 0)
-		xs_tcp_check_fraghdr(transport);
-	else {
-		/*
-		 * The transport_lock protects the request handling.
-		 * There's no need to hold it to update the recv.flags.
-		 */
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len;
-
-	len = transport->recv.len - transport->recv.offset;
-	if (len > desc->count)
-		len = desc->count;
-	desc->count -= len;
-	desc->offset += len;
-	transport->recv.offset += len;
-	dprintk("RPC:       discarded %zu bytes\n", len);
-	xs_tcp_check_fraghdr(transport);
-}
-
-static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
-{
-	struct rpc_xprt *xprt = rd_desc->arg.data;
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_skb_reader desc = {
-		.skb	= skb,
-		.offset	= offset,
-		.count	= len,
-	};
-	size_t ret;
-
-	dprintk("RPC:       xs_tcp_data_recv started\n");
-	do {
-		trace_xs_tcp_data_recv(transport);
-		/* Read in a new fragment marker if necessary */
-		/* Can we ever really expect to get completely empty fragments? */
-		if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
-			xs_tcp_read_fraghdr(xprt, &desc);
-			continue;
-		}
-		/* Read in the xid if necessary */
-		if (transport->recv.flags & TCP_RCV_COPY_XID) {
-			xs_tcp_read_xid(transport, &desc);
-			continue;
-		}
-		/* Read in the call/reply flag */
-		if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
-			xs_tcp_read_calldir(transport, &desc);
-			continue;
-		}
-		/* Read in the request data */
-		if (transport->recv.flags & TCP_RCV_COPY_DATA) {
-			xs_tcp_read_data(xprt, &desc);
-			continue;
-		}
-		/* Skip over any trailing bytes on short reads */
-		xs_tcp_read_discard(transport, &desc);
-	} while (desc.count);
-	ret = len - desc.count;
-	if (ret < rd_desc->count)
-		rd_desc->count -= ret;
-	else
-		rd_desc->count = 0;
-	trace_xs_tcp_data_recv(transport);
-	dprintk("RPC:       xs_tcp_data_recv done\n");
-	return ret;
-}
-
 static void xs_tcp_data_receive(struct sock_xprt *transport)
 {
 	struct rpc_xprt *xprt = &transport->xprt;
 	struct sock *sk;
-	read_descriptor_t rd_desc = {
-		.arg.data = xprt,
-	};
-	unsigned long total = 0;
-	int read = 0;
+	size_t read = 0;
+	ssize_t ret = 0;
 
 restart:
 	mutex_lock(&transport->recv_mutex);
@@ -1536,18 +1508,12 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 	if (sk == NULL)
 		goto out;
 
-	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	for (;;) {
-		rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
-		lock_sock(sk);
-		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		if (rd_desc.count != 0 || read < 0) {
-			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
-			release_sock(sk);
+		clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+		ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL);
+		if (ret < 0)
 			break;
-		}
-		release_sock(sk);
-		total += read;
+		read += ret;
 		if (need_resched()) {
 			mutex_unlock(&transport->recv_mutex);
 			cond_resched();
@@ -1558,7 +1524,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 		queue_work(xprtiod_workqueue, &transport->recv_worker);
 out:
 	mutex_unlock(&transport->recv_mutex);
-	trace_xs_tcp_data_ready(xprt, read, total);
+	trace_xs_tcp_data_ready(xprt, ret, read);
 }
 
 static void xs_tcp_data_receive_workfn(struct work_struct *work)
@@ -2380,7 +2346,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 	transport->recv.offset = 0;
 	transport->recv.len = 0;
 	transport->recv.copied = 0;
-	transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 	transport->xmit.offset = 0;
 
 	/* Tell the socket layer to start connecting... */
@@ -2802,6 +2767,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
 	.connect		= xs_connect,
 	.buf_alloc		= rpc_malloc,
 	.buf_free		= rpc_free,
+	.prepare_request	= xs_stream_prepare_request,
 	.send_request		= xs_tcp_send_request,
 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
 	.close			= xs_tcp_shutdown,
-- 
2.17.1
