All of lore.kernel.org
 help / color / mirror / Atom feed
From: Ursula Braun <ubraun@linux.ibm.com>
To: davem@davemloft.net
Cc: netdev@vger.kernel.org, linux-s390@vger.kernel.org,
	schwidefsky@de.ibm.com, heiko.carstens@de.ibm.com,
	raspl@linux.ibm.com, ubraun@linux.ibm.com
Subject: [PATCH net-next 4/4] smc: add support for splice()
Date: Thu,  3 May 2018 18:12:39 +0200	[thread overview]
Message-ID: <20180503161239.71747-5-ubraun@linux.ibm.com> (raw)
In-Reply-To: <20180503161239.71747-1-ubraun@linux.ibm.com>

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Provide an implementation for splice() when we are using SMC. See
smc_splice_read() for further details.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/af_smc.c |  38 +++++++++++--
 net/smc/smc.h    |   3 +
 net/smc/smc_rx.c | 163 +++++++++++++++++++++++++++++++++++++++++++++++++------
 net/smc/smc_rx.h |   6 +-
 4 files changed, 185 insertions(+), 25 deletions(-)

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 747fdf1a2d6f..553fe4eb4066 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -1166,10 +1166,12 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		goto out;
 	}
 
-	if (smc->use_fallback)
+	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags);
-	else
-		rc = smc_rx_recvmsg(smc, msg, len, flags);
+	} else {
+		msg->msg_namelen = 0;
+		rc = smc_rx_recvmsg(smc, msg, NULL, len, flags);
+	}
 
 out:
 	release_sock(sk);
@@ -1446,9 +1448,15 @@ static ssize_t smc_sendpage(struct socket *sock, struct page *page,
 	return rc;
 }
 
+/* Map the affected portions of the rmbe into an spd, note the number of bytes
+ * to splice in conn->splice_pending, and press 'go'. Delays consumer cursor
+ * updates till whenever a respective page has been fully processed.
+ * Note that subsequent recv() calls have to wait till all splice() processing
+ * completed.
+ */
 static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 			       struct pipe_inode_info *pipe, size_t len,
-				    unsigned int flags)
+			       unsigned int flags)
 {
 	struct sock *sk = sock->sk;
 	struct smc_sock *smc;
@@ -1456,16 +1464,34 @@ static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 
 	smc = smc_sk(sk);
 	lock_sock(sk);
-	if ((sk->sk_state != SMC_ACTIVE) && (sk->sk_state != SMC_CLOSED))
+
+	if (sk->sk_state == SMC_INIT ||
+	    sk->sk_state == SMC_LISTEN ||
+	    sk->sk_state == SMC_CLOSED)
+		goto out;
+
+	if (sk->sk_state == SMC_PEERFINCLOSEWAIT) {
+		rc = 0;
 		goto out;
+	}
+
 	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->splice_read(smc->clcsock, ppos,
 						    pipe, len, flags);
 	} else {
-		rc = -EOPNOTSUPP;
+		if (*ppos) {
+			rc = -ESPIPE;
+			goto out;
+		}
+		if (flags & SPLICE_F_NONBLOCK)
+			flags = MSG_DONTWAIT;
+		else
+			flags = 0;
+		rc = smc_rx_recvmsg(smc, NULL, pipe, len, flags);
 	}
 out:
 	release_sock(sk);
+
 	return rc;
 }
 
diff --git a/net/smc/smc.h b/net/smc/smc.h
index 2405e889b93d..ec209cd48d42 100644
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -164,6 +164,9 @@ struct smc_connection {
 	atomic_t		bytes_to_rcv;	/* arrived data,
 						 * not yet received
 						 */
+	atomic_t		splice_pending;	/* number of spliced bytes
+						 * pending processing
+						 */
 #ifndef KERNEL_HAS_ATOMIC64
 	spinlock_t		acurs_lock;	/* protect cursors */
 #endif
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 7b64bee656e8..ed45569289f5 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -43,6 +43,116 @@ static void smc_rx_wake_up(struct sock *sk)
 	rcu_read_unlock();
 }
 
+/* Update consumer cursor
+ *   @conn   connection to update
+ *   @cons   consumer cursor
+ *   @len    number of Bytes consumed
+ */
+static void smc_rx_update_consumer(struct smc_connection *conn,
+				   union smc_host_cursor cons, size_t len)
+{
+	smc_curs_add(conn->rmbe_size, &cons, len);
+	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
+		       conn);
+	/* send consumer cursor update if required */
+	/* similar to advertising new TCP rcv_wnd if required */
+	smc_tx_consumer_update(conn);
+}
+
+struct smc_spd_priv {
+	struct smc_sock *smc;
+	size_t		 len;
+};
+
+static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
+				    struct pipe_buffer *buf)
+{
+	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
+	struct smc_sock *smc = priv->smc;
+	struct smc_connection *conn;
+	union smc_host_cursor cons;
+	struct sock *sk = &smc->sk;
+
+	if (sk->sk_state == SMC_CLOSED ||
+	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
+	    sk->sk_state == SMC_APPFINCLOSEWAIT)
+		goto out;
+	conn = &smc->conn;
+	lock_sock(sk);
+	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+		       conn);
+	smc_rx_update_consumer(conn, cons, priv->len);
+	release_sock(sk);
+	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
+		smc_rx_wake_up(sk);
+out:
+	kfree(priv);
+	put_page(buf->page);
+	sock_put(sk);
+}
+
+static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
+				   struct pipe_buffer *buf)
+{
+	return 1;
+}
+
+static const struct pipe_buf_operations smc_pipe_ops = {
+	.can_merge = 0,
+	.confirm = generic_pipe_buf_confirm,
+	.release = smc_rx_pipe_buf_release,
+	.steal = smc_rx_pipe_buf_nosteal,
+	.get = generic_pipe_buf_get
+};
+
+static void smc_rx_spd_release(struct splice_pipe_desc *spd,
+			       unsigned int i)
+{
+	put_page(spd->pages[i]);
+}
+
+static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
+			 struct smc_sock *smc)
+{
+	struct splice_pipe_desc spd;
+	struct partial_page partial;
+	struct smc_spd_priv *priv;
+	struct page *page;
+	int bytes;
+
+	page = virt_to_page(smc->conn.rmb_desc->cpu_addr);
+	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
+	if (!priv)
+		return -ENOMEM;
+	priv->len = len;
+	priv->smc = smc;
+	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
+	partial.len = len;
+	partial.private = (unsigned long)priv;
+
+	spd.nr_pages_max = 1;
+	spd.nr_pages = 1;
+	spd.pages = &page;
+	spd.partial = &partial;
+	spd.ops = &smc_pipe_ops;
+	spd.spd_release = smc_rx_spd_release;
+
+	bytes = splice_to_pipe(pipe, &spd);
+	if (bytes > 0) {
+		sock_hold(&smc->sk);
+		get_page(smc->conn.rmb_desc->pages);
+		atomic_add(bytes, &smc->conn.splice_pending);
+	}
+
+	return bytes;
+}
+
+static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
+{
+	return atomic_read(&conn->bytes_to_rcv) &&
+	       !atomic_read(&conn->splice_pending);
+}
+
 /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  *   @smc    smc socket
  *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
@@ -74,19 +184,25 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
 	return rc;
 }
 
-/* rcvbuf consumer: main API called by socket layer.
- * called under sk lock.
+/* smc_rx_recvmsg - receive data from RMBE
+ * @msg:	copy data to receive buffer
+ * @pipe:	copy data to pipe if set - indicates splice() call
+ *
+ * rcvbuf consumer: main API called by socket layer.
+ * Called under sk lock.
  */
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags)
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags)
 {
 	size_t copylen, read_done = 0, read_remaining = len;
 	size_t chunk_len, chunk_off, chunk_len_sum;
 	struct smc_connection *conn = &smc->conn;
+	int (*func)(struct smc_connection *conn);
 	union smc_host_cursor cons;
 	int readable, chunk;
 	char *rcvbuf_base;
 	struct sock *sk;
+	int splbytes;
 	long timeo;
 	int target;		/* Read at least these many bytes */
 	int rc;
@@ -102,12 +218,11 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
-	msg->msg_namelen = 0;
 	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
 	rcvbuf_base = conn->rmb_desc->cpu_addr;
 
 	do { /* while (read_remaining) */
-		if (read_done >= target)
+		if (read_done >= target || (pipe && read_done))
 			break;
 
 		if (atomic_read(&conn->bytes_to_rcv))
@@ -156,11 +271,24 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		/* initialize variables for 1st iteration of subsequent loop */
 		/* could be just 1 byte, even after waiting on data above */
 		readable = atomic_read(&conn->bytes_to_rcv);
+		splbytes = atomic_read(&conn->splice_pending);
+		if (!readable || (msg && splbytes)) {
+			if (splbytes)
+				func = smc_rx_data_available_and_no_splice_pend;
+			else
+				func = smc_rx_data_available;
+			smc_rx_wait(smc, &timeo, func);
+			continue;
+		}
+
 		/* not more than what user space asked for */
 		copylen = min_t(size_t, read_remaining, readable);
 		smc_curs_write(&cons,
 			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
 			       conn);
+		/* subsequent splice() calls pick up where previous left */
+		if (splbytes)
+			smc_curs_add(conn->rmbe_size, &cons, splbytes);
 		/* determine chunks where to read from rcvbuf */
 		/* either unwrapped case, or 1st chunk of wrapped case */
 		chunk_len = min_t(size_t,
@@ -170,9 +298,16 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		smc_rmb_sync_sg_for_cpu(conn);
 		for (chunk = 0; chunk < 2; chunk++) {
 			if (!(flags & MSG_TRUNC)) {
-				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off,
-						   chunk_len);
-				if (rc) {
+				if (msg) {
+					rc = memcpy_to_msg(msg, rcvbuf_base +
+							   chunk_off,
+							   chunk_len);
+				} else {
+					rc = smc_rx_splice(pipe, rcvbuf_base +
+							chunk_off, chunk_len,
+							smc);
+				}
+				if (rc < 0) {
 					if (!read_done)
 						read_done = -EFAULT;
 					smc_rmb_sync_sg_for_device(conn);
@@ -193,18 +328,13 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 
 		/* update cursors */
 		if (!(flags & MSG_PEEK)) {
-			smc_curs_add(conn->rmbe_size, &cons, copylen);
 			/* increased in recv tasklet smc_cdc_msg_rcv() */
 			smp_mb__before_atomic();
 			atomic_sub(copylen, &conn->bytes_to_rcv);
 			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
 			smp_mb__after_atomic();
-			smc_curs_write(&conn->local_tx_ctrl.cons,
-				       smc_curs_read(&cons, conn),
-				       conn);
-			/* send consumer cursor update if required */
-			/* similar to advertising new TCP rcv_wnd if required */
-			smc_tx_consumer_update(conn);
+			if (msg)
+				smc_rx_update_consumer(conn, cons, copylen);
 		}
 	} while (read_remaining);
 out:
@@ -215,4 +345,5 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 void smc_rx_init(struct smc_sock *smc)
 {
 	smc->sk.sk_data_ready = smc_rx_wake_up;
+	atomic_set(&smc->conn.splice_pending, 0);
 }
diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h
index 8f9f00997641..db823c97d824 100644
--- a/net/smc/smc_rx.h
+++ b/net/smc/smc_rx.h
@@ -18,8 +18,9 @@
 #include "smc.h"
 
 void smc_rx_init(struct smc_sock *smc);
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags);
+
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags);
 int smc_rx_wait(struct smc_sock *smc, long *timeo,
 		int (*fcrit)(struct smc_connection *conn));
 static inline int smc_rx_data_available(struct smc_connection *conn)
@@ -27,5 +28,4 @@ static inline int smc_rx_data_available(struct smc_connection *conn)
 	return atomic_read(&conn->bytes_to_rcv);
 }
 
-
 #endif /* SMC_RX_H */
-- 
2.13.5

  parent reply	other threads:[~2018-05-03 16:12 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
2018-05-03 16:12 ` Ursula Braun [this message]
2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180503161239.71747-5-ubraun@linux.ibm.com \
    --to=ubraun@linux.ibm.com \
    --cc=davem@davemloft.net \
    --cc=heiko.carstens@de.ibm.com \
    --cc=linux-s390@vger.kernel.org \
    --cc=netdev@vger.kernel.org \
    --cc=raspl@linux.ibm.com \
    --cc=schwidefsky@de.ibm.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.