All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH net-next 0/4] net/smc: splice implementation
@ 2018-05-03 16:12 Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
                   ` (4 more replies)
  0 siblings, 5 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Ursula Braun <ursula.braun@de.ibm.com>

Dave,

Stefan comes up with an smc implementation for splice(). The first
three patches are preparational patches, the 4th patch implements
splice().

Thanks, Ursula

Stefan Raspl (4):
  smc: simplify abort logic
  smc: make smc_rx_wait_data() generic
  smc: allocate RMBs as compound pages
  smc: add support for splice()

 net/smc/af_smc.c   |  40 +++++++++--
 net/smc/smc.h      |   3 +
 net/smc/smc_core.c |  18 ++---
 net/smc/smc_core.h |   1 +
 net/smc/smc_rx.c   | 200 +++++++++++++++++++++++++++++++++++++++++++----------
 net/smc/smc_rx.h   |  12 +++-
 6 files changed, 219 insertions(+), 55 deletions(-)

-- 
2.13.5

^ permalink raw reply	[flat|nested] 6+ messages in thread

* [PATCH net-next 1/4] smc: simplify abort logic
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Some of the conditions to exit recv() are common in two pathes - cleaning up
code by moving the check up so we have it only once.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/smc_rx.c | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index af851d8df1f8..def33fb29ac9 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -112,26 +112,22 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		if (atomic_read(&conn->bytes_to_rcv))
 			goto copy;
 
+		if (sk->sk_shutdown & RCV_SHUTDOWN ||
+		    smc_cdc_rxed_any_close_or_senddone(conn) ||
+		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
+			break;
+
 		if (read_done) {
 			if (sk->sk_err ||
 			    sk->sk_state == SMC_CLOSED ||
-			    sk->sk_shutdown & RCV_SHUTDOWN ||
 			    !timeo ||
-			    signal_pending(current) ||
-			    smc_cdc_rxed_any_close_or_senddone(conn) ||
-			    conn->local_tx_ctrl.conn_state_flags.
-			    peer_conn_abort)
+			    signal_pending(current))
 				break;
 		} else {
 			if (sk->sk_err) {
 				read_done = sock_error(sk);
 				break;
 			}
-			if (sk->sk_shutdown & RCV_SHUTDOWN ||
-			    smc_cdc_rxed_any_close_or_senddone(conn) ||
-			    conn->local_tx_ctrl.conn_state_flags.
-			    peer_conn_abort)
-				break;
 			if (sk->sk_state == SMC_CLOSED) {
 				if (!sock_flag(sk, SOCK_DONE)) {
 					/* This occurs when user tries to read
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Turn smc_rx_wait_data into a generic function that can be used at various
instances to wait on traffic to complete with varying criteria.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/af_smc.c |  2 +-
 net/smc/smc_rx.c | 21 +++++++++++----------
 net/smc/smc_rx.h |  8 +++++++-
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 823ea3371575..747fdf1a2d6f 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -1092,7 +1092,7 @@ static int smc_accept(struct socket *sock, struct socket *new_sock,
 			release_sock(clcsk);
 		} else if (!atomic_read(&smc_sk(nsk)->conn.bytes_to_rcv)) {
 			lock_sock(nsk);
-			smc_rx_wait_data(smc_sk(nsk), &timeo);
+			smc_rx_wait(smc_sk(nsk), &timeo, smc_rx_data_available);
 			release_sock(nsk);
 		}
 	}
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index def33fb29ac9..7b64bee656e8 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -22,11 +22,10 @@
 #include "smc_tx.h" /* smc_tx_consumer_update() */
 #include "smc_rx.h"
 
-/* callback implementation for sk.sk_data_ready()
- * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data().
+/* callback implementation to wakeup consumers blocked with smc_rx_wait().
  * indirectly called by smc_cdc_msg_recv_action().
  */
-static void smc_rx_data_ready(struct sock *sk)
+static void smc_rx_wake_up(struct sock *sk)
 {
 	struct socket_wq *wq;
 
@@ -47,25 +46,27 @@ static void smc_rx_data_ready(struct sock *sk)
 /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  *   @smc    smc socket
  *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
+ *   @fcrit  add'l criterion to evaluate as function pointer
  * Returns:
  * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
  * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
  */
-int smc_rx_wait_data(struct smc_sock *smc, long *timeo)
+int smc_rx_wait(struct smc_sock *smc, long *timeo,
+		int (*fcrit)(struct smc_connection *conn))
 {
 	DEFINE_WAIT_FUNC(wait, woken_wake_function);
 	struct smc_connection *conn = &smc->conn;
 	struct sock *sk = &smc->sk;
 	int rc;
 
-	if (atomic_read(&conn->bytes_to_rcv))
+	if (fcrit(conn))
 		return 1;
 	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
 	add_wait_queue(sk_sleep(sk), &wait);
 	rc = sk_wait_event(sk, timeo,
 			   sk->sk_err ||
 			   sk->sk_shutdown & RCV_SHUTDOWN ||
-			   atomic_read(&conn->bytes_to_rcv) ||
+			   fcrit(conn) ||
 			   smc_cdc_rxed_any_close_or_senddone(conn),
 			   &wait);
 	remove_wait_queue(sk_sleep(sk), &wait);
@@ -146,14 +147,14 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 				return -EAGAIN;
 		}
 
-		if (!atomic_read(&conn->bytes_to_rcv)) {
-			smc_rx_wait_data(smc, &timeo);
+		if (!smc_rx_data_available(conn)) {
+			smc_rx_wait(smc, &timeo, smc_rx_data_available);
 			continue;
 		}
 
 copy:
 		/* initialize variables for 1st iteration of subsequent loop */
-		/* could be just 1 byte, even after smc_rx_wait_data above */
+		/* could be just 1 byte, even after waiting on data above */
 		readable = atomic_read(&conn->bytes_to_rcv);
 		/* not more than what user space asked for */
 		copylen = min_t(size_t, read_remaining, readable);
@@ -213,5 +214,5 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 /* Initialize receive properties on connection establishment. NB: not __init! */
 void smc_rx_init(struct smc_sock *smc)
 {
-	smc->sk.sk_data_ready = smc_rx_data_ready;
+	smc->sk.sk_data_ready = smc_rx_wake_up;
 }
diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h
index 0b75a6b470e6..8f9f00997641 100644
--- a/net/smc/smc_rx.h
+++ b/net/smc/smc_rx.h
@@ -20,6 +20,12 @@
 void smc_rx_init(struct smc_sock *smc);
 int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		   int flags);
-int smc_rx_wait_data(struct smc_sock *smc, long *timeo);
+int smc_rx_wait(struct smc_sock *smc, long *timeo,
+		int (*fcrit)(struct smc_connection *conn));
+static inline int smc_rx_data_available(struct smc_connection *conn)
+{
+	return atomic_read(&conn->bytes_to_rcv);
+}
+
 
 #endif /* SMC_RX_H */
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 3/4] smc: allocate RMBs as compound pages
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
  2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Preparatory work for splice() support.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/smc_core.c | 18 +++++++++---------
 net/smc/smc_core.h |  1 +
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c
index 1f3ea62fac5c..4051fb504393 100644
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -274,8 +274,8 @@ static void smc_buf_free(struct smc_buf_desc *buf_desc, struct smc_link *lnk,
 				    DMA_TO_DEVICE);
 	}
 	sg_free_table(&buf_desc->sgt[SMC_SINGLE_LINK]);
-	if (buf_desc->cpu_addr)
-		free_pages((unsigned long)buf_desc->cpu_addr, buf_desc->order);
+	if (buf_desc->pages)
+		__free_pages(buf_desc->pages, buf_desc->order);
 	kfree(buf_desc);
 }
 
@@ -550,16 +550,16 @@ static struct smc_buf_desc *smc_new_buf_create(struct smc_link_group *lgr,
 	if (!buf_desc)
 		return ERR_PTR(-ENOMEM);
 
-	buf_desc->cpu_addr =
-		(void *)__get_free_pages(GFP_KERNEL | __GFP_NOWARN |
-					 __GFP_NOMEMALLOC |
-					 __GFP_NORETRY | __GFP_ZERO,
-					 get_order(bufsize));
-	if (!buf_desc->cpu_addr) {
+	buf_desc->order = get_order(bufsize);
+	buf_desc->pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
+				      __GFP_NOMEMALLOC | __GFP_COMP |
+				      __GFP_NORETRY | __GFP_ZERO,
+				      buf_desc->order);
+	if (!buf_desc->pages) {
 		kfree(buf_desc);
 		return ERR_PTR(-EAGAIN);
 	}
-	buf_desc->order = get_order(bufsize);
+	buf_desc->cpu_addr = (void *)page_address(buf_desc->pages);
 
 	/* build the sg table from the pages */
 	lnk = &lgr->lnk[SMC_SINGLE_LINK];
diff --git a/net/smc/smc_core.h b/net/smc/smc_core.h
index 97339f03ba79..cdf800f1fae7 100644
--- a/net/smc/smc_core.h
+++ b/net/smc/smc_core.h
@@ -120,6 +120,7 @@ struct smc_link {
 struct smc_buf_desc {
 	struct list_head	list;
 	void			*cpu_addr;	/* virtual address of buffer */
+	struct page		*pages;
 	struct sg_table		sgt[SMC_LINKS_PER_LGR_MAX];/* virtual buffer */
 	struct ib_mr		*mr_rx[SMC_LINKS_PER_LGR_MAX];
 						/* for rmb only: memory region
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 4/4] smc: add support for splice()
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
                   ` (2 preceding siblings ...)
  2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Provide an implementation for splice() when we are using SMC. See
smc_splice_read() for further details.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/af_smc.c |  38 +++++++++++--
 net/smc/smc.h    |   3 +
 net/smc/smc_rx.c | 163 +++++++++++++++++++++++++++++++++++++++++++++++++------
 net/smc/smc_rx.h |   6 +-
 4 files changed, 185 insertions(+), 25 deletions(-)

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 747fdf1a2d6f..553fe4eb4066 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -1166,10 +1166,12 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		goto out;
 	}
 
-	if (smc->use_fallback)
+	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags);
-	else
-		rc = smc_rx_recvmsg(smc, msg, len, flags);
+	} else {
+		msg->msg_namelen = 0;
+		rc = smc_rx_recvmsg(smc, msg, NULL, len, flags);
+	}
 
 out:
 	release_sock(sk);
@@ -1446,9 +1448,15 @@ static ssize_t smc_sendpage(struct socket *sock, struct page *page,
 	return rc;
 }
 
+/* Map the affected portions of the rmbe into an spd, note the number of bytes
+ * to splice in conn->splice_pending, and press 'go'. Delays consumer cursor
+ * updates till whenever a respective page has been fully processed.
+ * Note that subsequent recv() calls have to wait till all splice() processing
+ * completed.
+ */
 static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 			       struct pipe_inode_info *pipe, size_t len,
-				    unsigned int flags)
+			       unsigned int flags)
 {
 	struct sock *sk = sock->sk;
 	struct smc_sock *smc;
@@ -1456,16 +1464,34 @@ static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 
 	smc = smc_sk(sk);
 	lock_sock(sk);
-	if ((sk->sk_state != SMC_ACTIVE) && (sk->sk_state != SMC_CLOSED))
+
+	if (sk->sk_state == SMC_INIT ||
+	    sk->sk_state == SMC_LISTEN ||
+	    sk->sk_state == SMC_CLOSED)
+		goto out;
+
+	if (sk->sk_state == SMC_PEERFINCLOSEWAIT) {
+		rc = 0;
 		goto out;
+	}
+
 	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->splice_read(smc->clcsock, ppos,
 						    pipe, len, flags);
 	} else {
-		rc = -EOPNOTSUPP;
+		if (*ppos) {
+			rc = -ESPIPE;
+			goto out;
+		}
+		if (flags & SPLICE_F_NONBLOCK)
+			flags = MSG_DONTWAIT;
+		else
+			flags = 0;
+		rc = smc_rx_recvmsg(smc, NULL, pipe, len, flags);
 	}
 out:
 	release_sock(sk);
+
 	return rc;
 }
 
diff --git a/net/smc/smc.h b/net/smc/smc.h
index 2405e889b93d..ec209cd48d42 100644
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -164,6 +164,9 @@ struct smc_connection {
 	atomic_t		bytes_to_rcv;	/* arrived data,
 						 * not yet received
 						 */
+	atomic_t		splice_pending;	/* number of spliced bytes
+						 * pending processing
+						 */
 #ifndef KERNEL_HAS_ATOMIC64
 	spinlock_t		acurs_lock;	/* protect cursors */
 #endif
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 7b64bee656e8..ed45569289f5 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -43,6 +43,116 @@ static void smc_rx_wake_up(struct sock *sk)
 	rcu_read_unlock();
 }
 
+/* Update consumer cursor
+ *   @conn   connection to update
+ *   @cons   consumer cursor
+ *   @len    number of Bytes consumed
+ */
+static void smc_rx_update_consumer(struct smc_connection *conn,
+				   union smc_host_cursor cons, size_t len)
+{
+	smc_curs_add(conn->rmbe_size, &cons, len);
+	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
+		       conn);
+	/* send consumer cursor update if required */
+	/* similar to advertising new TCP rcv_wnd if required */
+	smc_tx_consumer_update(conn);
+}
+
+struct smc_spd_priv {
+	struct smc_sock *smc;
+	size_t		 len;
+};
+
+static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
+				    struct pipe_buffer *buf)
+{
+	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
+	struct smc_sock *smc = priv->smc;
+	struct smc_connection *conn;
+	union smc_host_cursor cons;
+	struct sock *sk = &smc->sk;
+
+	if (sk->sk_state == SMC_CLOSED ||
+	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
+	    sk->sk_state == SMC_APPFINCLOSEWAIT)
+		goto out;
+	conn = &smc->conn;
+	lock_sock(sk);
+	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+		       conn);
+	smc_rx_update_consumer(conn, cons, priv->len);
+	release_sock(sk);
+	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
+		smc_rx_wake_up(sk);
+out:
+	kfree(priv);
+	put_page(buf->page);
+	sock_put(sk);
+}
+
+static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
+				   struct pipe_buffer *buf)
+{
+	return 1;
+}
+
+static const struct pipe_buf_operations smc_pipe_ops = {
+	.can_merge = 0,
+	.confirm = generic_pipe_buf_confirm,
+	.release = smc_rx_pipe_buf_release,
+	.steal = smc_rx_pipe_buf_nosteal,
+	.get = generic_pipe_buf_get
+};
+
+static void smc_rx_spd_release(struct splice_pipe_desc *spd,
+			       unsigned int i)
+{
+	put_page(spd->pages[i]);
+}
+
+static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
+			 struct smc_sock *smc)
+{
+	struct splice_pipe_desc spd;
+	struct partial_page partial;
+	struct smc_spd_priv *priv;
+	struct page *page;
+	int bytes;
+
+	page = virt_to_page(smc->conn.rmb_desc->cpu_addr);
+	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
+	if (!priv)
+		return -ENOMEM;
+	priv->len = len;
+	priv->smc = smc;
+	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
+	partial.len = len;
+	partial.private = (unsigned long)priv;
+
+	spd.nr_pages_max = 1;
+	spd.nr_pages = 1;
+	spd.pages = &page;
+	spd.partial = &partial;
+	spd.ops = &smc_pipe_ops;
+	spd.spd_release = smc_rx_spd_release;
+
+	bytes = splice_to_pipe(pipe, &spd);
+	if (bytes > 0) {
+		sock_hold(&smc->sk);
+		get_page(smc->conn.rmb_desc->pages);
+		atomic_add(bytes, &smc->conn.splice_pending);
+	}
+
+	return bytes;
+}
+
+static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
+{
+	return atomic_read(&conn->bytes_to_rcv) &&
+	       !atomic_read(&conn->splice_pending);
+}
+
 /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  *   @smc    smc socket
  *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
@@ -74,19 +184,25 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
 	return rc;
 }
 
-/* rcvbuf consumer: main API called by socket layer.
- * called under sk lock.
+/* smc_rx_recvmsg - receive data from RMBE
+ * @msg:	copy data to receive buffer
+ * @pipe:	copy data to pipe if set - indicates splice() call
+ *
+ * rcvbuf consumer: main API called by socket layer.
+ * Called under sk lock.
  */
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags)
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags)
 {
 	size_t copylen, read_done = 0, read_remaining = len;
 	size_t chunk_len, chunk_off, chunk_len_sum;
 	struct smc_connection *conn = &smc->conn;
+	int (*func)(struct smc_connection *conn);
 	union smc_host_cursor cons;
 	int readable, chunk;
 	char *rcvbuf_base;
 	struct sock *sk;
+	int splbytes;
 	long timeo;
 	int target;		/* Read at least these many bytes */
 	int rc;
@@ -102,12 +218,11 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
-	msg->msg_namelen = 0;
 	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
 	rcvbuf_base = conn->rmb_desc->cpu_addr;
 
 	do { /* while (read_remaining) */
-		if (read_done >= target)
+		if (read_done >= target || (pipe && read_done))
 			break;
 
 		if (atomic_read(&conn->bytes_to_rcv))
@@ -156,11 +271,24 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		/* initialize variables for 1st iteration of subsequent loop */
 		/* could be just 1 byte, even after waiting on data above */
 		readable = atomic_read(&conn->bytes_to_rcv);
+		splbytes = atomic_read(&conn->splice_pending);
+		if (!readable || (msg && splbytes)) {
+			if (splbytes)
+				func = smc_rx_data_available_and_no_splice_pend;
+			else
+				func = smc_rx_data_available;
+			smc_rx_wait(smc, &timeo, func);
+			continue;
+		}
+
 		/* not more than what user space asked for */
 		copylen = min_t(size_t, read_remaining, readable);
 		smc_curs_write(&cons,
 			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
 			       conn);
+		/* subsequent splice() calls pick up where previous left */
+		if (splbytes)
+			smc_curs_add(conn->rmbe_size, &cons, splbytes);
 		/* determine chunks where to read from rcvbuf */
 		/* either unwrapped case, or 1st chunk of wrapped case */
 		chunk_len = min_t(size_t,
@@ -170,9 +298,16 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		smc_rmb_sync_sg_for_cpu(conn);
 		for (chunk = 0; chunk < 2; chunk++) {
 			if (!(flags & MSG_TRUNC)) {
-				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off,
-						   chunk_len);
-				if (rc) {
+				if (msg) {
+					rc = memcpy_to_msg(msg, rcvbuf_base +
+							   chunk_off,
+							   chunk_len);
+				} else {
+					rc = smc_rx_splice(pipe, rcvbuf_base +
+							chunk_off, chunk_len,
+							smc);
+				}
+				if (rc < 0) {
 					if (!read_done)
 						read_done = -EFAULT;
 					smc_rmb_sync_sg_for_device(conn);
@@ -193,18 +328,13 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 
 		/* update cursors */
 		if (!(flags & MSG_PEEK)) {
-			smc_curs_add(conn->rmbe_size, &cons, copylen);
 			/* increased in recv tasklet smc_cdc_msg_rcv() */
 			smp_mb__before_atomic();
 			atomic_sub(copylen, &conn->bytes_to_rcv);
 			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
 			smp_mb__after_atomic();
-			smc_curs_write(&conn->local_tx_ctrl.cons,
-				       smc_curs_read(&cons, conn),
-				       conn);
-			/* send consumer cursor update if required */
-			/* similar to advertising new TCP rcv_wnd if required */
-			smc_tx_consumer_update(conn);
+			if (msg)
+				smc_rx_update_consumer(conn, cons, copylen);
 		}
 	} while (read_remaining);
 out:
@@ -215,4 +345,5 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 void smc_rx_init(struct smc_sock *smc)
 {
 	smc->sk.sk_data_ready = smc_rx_wake_up;
+	atomic_set(&smc->conn.splice_pending, 0);
 }
diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h
index 8f9f00997641..db823c97d824 100644
--- a/net/smc/smc_rx.h
+++ b/net/smc/smc_rx.h
@@ -18,8 +18,9 @@
 #include "smc.h"
 
 void smc_rx_init(struct smc_sock *smc);
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags);
+
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags);
 int smc_rx_wait(struct smc_sock *smc, long *timeo,
 		int (*fcrit)(struct smc_connection *conn));
 static inline int smc_rx_data_available(struct smc_connection *conn)
@@ -27,5 +28,4 @@ static inline int smc_rx_data_available(struct smc_connection *conn)
 	return atomic_read(&conn->bytes_to_rcv);
 }
 
-
 #endif /* SMC_RX_H */
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* Re: [PATCH net-next 0/4] net/smc: splice implementation
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
                   ` (3 preceding siblings ...)
  2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
@ 2018-05-03 20:31 ` David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: David Miller @ 2018-05-03 20:31 UTC (permalink / raw)
  To: ubraun; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl

From: Ursula Braun <ubraun@linux.ibm.com>
Date: Thu,  3 May 2018 18:12:35 +0200

> From: Ursula Braun <ursula.braun@de.ibm.com>
> 
> Dave,
> 
> Stefan comes up with an smc implementation for splice(). The first
> three patches are preparational patches, the 4th patch implements
> splice().

Doesn't look too bad :)

Series applied, thanks.

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2018-05-03 20:31 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.