* [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy
@ 2017-07-28 23:22 Tom Herbert
  2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
                   ` (3 more replies)
  0 siblings, 4 replies; 11+ messages in thread
From: Tom Herbert @ 2017-07-28 23:22 UTC
  To: netdev; +Cc: rohit, john.fastabend, Tom Herbert

This patch set contains some general infrastructure enhancements that
will be used by kernel proxy and zero proxy.

The changes are:
  - proto_ops: Add locked versions of sendmsg and sendpage
  - skb_send_sock: Allow sending an skb on a socket within the
    kernel
  - Generalize strparser. Allow it to be used in other contexts than
    just in the read_sock path. This will be used in the transmit
    path of zero proxy.

Some nice future work (which I've been discussing with John Fastabend)
will be to extend some of the related functions to allow gifting of
skbs. We should be able to do that with skb_send_sock and strp_process.
I'd also like this feature in the read_sock callback.

Tested: Ran modified kernel without incident. Tested new functionality
using zero proxy (in development).


Tom Herbert (3):
  proto_ops: Add locked held versions of sendmsg and sendpage
  skbuff: Function to send an skbuf on a socket
  strparser: Generalize strparser

 Documentation/networking/strparser.txt | 207 +++++++++++++++-------
 include/linux/net.h                    |  12 ++
 include/linux/skbuff.h                 |   3 +
 include/net/sock.h                     |   3 +
 include/net/strparser.h                | 119 +++++++------
 include/net/tcp.h                      |   3 +
 net/core/skbuff.c                      | 101 +++++++++++
 net/core/sock.c                        |  22 +++
 net/ipv4/af_inet.c                     |   2 +
 net/ipv4/tcp.c                         |  39 ++--
 net/kcm/kcmproc.c                      |  34 ++--
 net/kcm/kcmsock.c                      |  38 ++--
 net/socket.c                           |  27 +++
 net/strparser/strparser.c              | 313 ++++++++++++++++++++-------------
 14 files changed, 623 insertions(+), 300 deletions(-)

-- 
2.11.0

* [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage
  2017-07-28 23:22 [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy Tom Herbert
@ 2017-07-28 23:22 ` Tom Herbert
  2017-08-03 19:49   ` John Fastabend
  2017-08-03 20:21   ` John Fastabend
  2017-07-28 23:22 ` [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket Tom Herbert
                   ` (2 subsequent siblings)
  3 siblings, 2 replies; 11+ messages in thread
From: Tom Herbert @ 2017-07-28 23:22 UTC
  To: netdev; +Cc: rohit, john.fastabend, Tom Herbert

Add new proto_ops sendmsg_locked and sendpage_locked that can be
called when the socket lock is already held. Correspondingly, add
kernel_sendmsg_locked and kernel_sendpage_locked as front end
functions.

These functions will be used in zero proxy so that we can take
the socket lock in a ULP sendmsg/sendpage and then directly call the
backend transport proto_ops functions.

Signed-off-by: Tom Herbert <tom@quantonium.net>
---
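Note: the locked/unlocked split described above can be illustrated
outside the kernel. The following is a minimal user-space sketch, not
the code in this patch. Every demo_* name is hypothetical, and an
integer flag stands in for the socket lock. It shows the pattern only:
the core send path assumes the lock is held, the plain wrapper takes
and releases the lock around it, and an upper layer that already holds
the lock calls the locked variant directly.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct sock: a lock flag plus a byte counter. */
struct demo_sock {
	int locked;		/* 1 while the "socket lock" is held */
	size_t bytes_sent;	/* total bytes accepted so far */
};

void demo_lock(struct demo_sock *sk)
{
	assert(!sk->locked);	/* the lock is not recursive */
	sk->locked = 1;
}

void demo_release(struct demo_sock *sk)
{
	assert(sk->locked);
	sk->locked = 0;
}

/* Core send path: the caller must already hold the lock. */
int demo_sendmsg_locked(struct demo_sock *sk, size_t size)
{
	assert(sk->locked);
	sk->bytes_sent += size;
	return (int)size;
}

/* Convenience wrapper: take the lock, call the locked variant, release. */
int demo_sendmsg(struct demo_sock *sk, size_t size)
{
	int ret;

	demo_lock(sk);
	ret = demo_sendmsg_locked(sk, size);
	demo_release(sk);

	return ret;
}

/* An upper layer that holds the lock across two sends. */
int demo_ulp_send_two(struct demo_sock *sk, size_t a, size_t b)
{
	int ret;

	demo_lock(sk);
	ret = demo_sendmsg_locked(sk, a);
	if (ret >= 0)
		ret += demo_sendmsg_locked(sk, b);
	demo_release(sk);

	return ret;
}
```

Calling demo_sendmsg from inside demo_ulp_send_two would trip the
assertion in demo_lock, which is the user-space analogue of the
self-deadlock that the _locked variants are meant to avoid.
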
 include/linux/net.h | 12 ++++++++++++
 include/net/sock.h  |  3 +++
 include/net/tcp.h   |  3 +++
 net/core/sock.c     | 22 ++++++++++++++++++++++
 net/ipv4/af_inet.c  |  2 ++
 net/ipv4/tcp.c      | 39 ++++++++++++++++++++++++++-------------
 net/socket.c        | 27 +++++++++++++++++++++++++++
 7 files changed, 95 insertions(+), 13 deletions(-)

diff --git a/include/linux/net.h b/include/linux/net.h
index dda2cc939a53..b5c15b31709b 100644
--- a/include/linux/net.h
+++ b/include/linux/net.h
@@ -190,8 +190,16 @@ struct proto_ops {
 				       struct pipe_inode_info *pipe, size_t len, unsigned int flags);
 	int		(*set_peek_off)(struct sock *sk, int val);
 	int		(*peek_len)(struct socket *sock);
+
+	/* The following functions are called internally by kernel with
+	 * sock lock already held.
+	 */
 	int		(*read_sock)(struct sock *sk, read_descriptor_t *desc,
 				     sk_read_actor_t recv_actor);
+	int		(*sendpage_locked)(struct sock *sk, struct page *page,
+					   int offset, size_t size, int flags);
+	int		(*sendmsg_locked)(struct sock *sk, struct msghdr *msg,
+					  size_t size);
 };
 
 #define DECLARE_SOCKADDR(type, dst, src)	\
@@ -279,6 +287,8 @@ do {									\
 
 int kernel_sendmsg(struct socket *sock, struct msghdr *msg, struct kvec *vec,
 		   size_t num, size_t len);
+int kernel_sendmsg_locked(struct sock *sk, struct msghdr *msg,
+			  struct kvec *vec, size_t num, size_t len);
 int kernel_recvmsg(struct socket *sock, struct msghdr *msg, struct kvec *vec,
 		   size_t num, size_t len, int flags);
 
@@ -297,6 +307,8 @@ int kernel_setsockopt(struct socket *sock, int level, int optname, char *optval,
 		      unsigned int optlen);
 int kernel_sendpage(struct socket *sock, struct page *page, int offset,
 		    size_t size, int flags);
+int kernel_sendpage_locked(struct sock *sk, struct page *page, int offset,
+			   size_t size, int flags);
 int kernel_sock_ioctl(struct socket *sock, int cmd, unsigned long arg);
 int kernel_sock_shutdown(struct socket *sock, enum sock_shutdown_cmd how);
 
diff --git a/include/net/sock.h b/include/net/sock.h
index 7c0632c7e870..393c38e9f6aa 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -1582,11 +1582,14 @@ int sock_no_shutdown(struct socket *, int);
 int sock_no_getsockopt(struct socket *, int , int, char __user *, int __user *);
 int sock_no_setsockopt(struct socket *, int, int, char __user *, unsigned int);
 int sock_no_sendmsg(struct socket *, struct msghdr *, size_t);
+int sock_no_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t len);
 int sock_no_recvmsg(struct socket *, struct msghdr *, size_t, int);
 int sock_no_mmap(struct file *file, struct socket *sock,
 		 struct vm_area_struct *vma);
 ssize_t sock_no_sendpage(struct socket *sock, struct page *page, int offset,
 			 size_t size, int flags);
+ssize_t sock_no_sendpage_locked(struct sock *sk, struct page *page,
+				int offset, size_t size, int flags);
 
 /*
  * Functions to fill in entries in struct proto_ops when a protocol
diff --git a/include/net/tcp.h b/include/net/tcp.h
index 12d68335acd4..1af32cceda58 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -351,8 +351,11 @@ int tcp_v4_rcv(struct sk_buff *skb);
 
 int tcp_v4_tw_remember_stamp(struct inet_timewait_sock *tw);
 int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);
+int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size);
 int tcp_sendpage(struct sock *sk, struct page *page, int offset, size_t size,
 		 int flags);
+int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
+			size_t size, int flags);
 ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
 		 size_t size, int flags);
 void tcp_release_cb(struct sock *sk);
diff --git a/net/core/sock.c b/net/core/sock.c
index ac2a404c73eb..742f68c9c84a 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -2500,6 +2500,12 @@ int sock_no_sendmsg(struct socket *sock, struct msghdr *m, size_t len)
 }
 EXPORT_SYMBOL(sock_no_sendmsg);
 
+int sock_no_sendmsg_locked(struct sock *sk, struct msghdr *m, size_t len)
+{
+	return -EOPNOTSUPP;
+}
+EXPORT_SYMBOL(sock_no_sendmsg_locked);
+
 int sock_no_recvmsg(struct socket *sock, struct msghdr *m, size_t len,
 		    int flags)
 {
@@ -2528,6 +2534,22 @@ ssize_t sock_no_sendpage(struct socket *sock, struct page *page, int offset, siz
 }
 EXPORT_SYMBOL(sock_no_sendpage);
 
+ssize_t sock_no_sendpage_locked(struct sock *sk, struct page *page,
+				int offset, size_t size, int flags)
+{
+	ssize_t res;
+	struct msghdr msg = {.msg_flags = flags};
+	struct kvec iov;
+	char *kaddr = kmap(page);
+
+	iov.iov_base = kaddr + offset;
+	iov.iov_len = size;
+	res = kernel_sendmsg_locked(sk, &msg, &iov, 1, size);
+	kunmap(page);
+	return res;
+}
+EXPORT_SYMBOL(sock_no_sendpage_locked);
+
 /*
  *	Default Socket Callbacks
  */
diff --git a/net/ipv4/af_inet.c b/net/ipv4/af_inet.c
index 5ce44fb7d498..f0103ffe1cdb 100644
--- a/net/ipv4/af_inet.c
+++ b/net/ipv4/af_inet.c
@@ -944,6 +944,8 @@ const struct proto_ops inet_stream_ops = {
 	.sendpage	   = inet_sendpage,
 	.splice_read	   = tcp_splice_read,
 	.read_sock	   = tcp_read_sock,
+	.sendmsg_locked    = tcp_sendmsg_locked,
+	.sendpage_locked   = tcp_sendpage_locked,
 	.peek_len	   = tcp_peek_len,
 #ifdef CONFIG_COMPAT
 	.compat_setsockopt = compat_sock_common_setsockopt,
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index 71ce33decd97..ae39926b98b4 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -1034,23 +1034,29 @@ ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
 }
 EXPORT_SYMBOL_GPL(do_tcp_sendpages);
 
-int tcp_sendpage(struct sock *sk, struct page *page, int offset,
-		 size_t size, int flags)
+int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
+			size_t size, int flags)
 {
-	ssize_t res;
-
 	if (!(sk->sk_route_caps & NETIF_F_SG) ||
 	    !sk_check_csum_caps(sk))
 		return sock_no_sendpage(sk->sk_socket, page, offset, size,
 					flags);
 
-	lock_sock(sk);
-
 	tcp_rate_check_app_limited(sk);  /* is sending application-limited? */
 
-	res = do_tcp_sendpages(sk, page, offset, size, flags);
+	return do_tcp_sendpages(sk, page, offset, size, flags);
+}
+
+int tcp_sendpage(struct sock *sk, struct page *page, int offset,
+		 size_t size, int flags)
+{
+	int ret;
+
+	lock_sock(sk);
+	ret = tcp_sendpage_locked(sk, page, offset, size, flags);
 	release_sock(sk);
-	return res;
+
+	return ret;
 }
 EXPORT_SYMBOL(tcp_sendpage);
 
@@ -1144,7 +1150,7 @@ static int tcp_sendmsg_fastopen(struct sock *sk, struct msghdr *msg,
 	return err;
 }
 
-int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
+int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
 {
 	struct tcp_sock *tp = tcp_sk(sk);
 	struct sk_buff *skb;
@@ -1155,8 +1161,6 @@ int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
 	bool sg;
 	long timeo;
 
-	lock_sock(sk);
-
 	flags = msg->msg_flags;
 	if (unlikely(flags & MSG_FASTOPEN || inet_sk(sk)->defer_connect)) {
 		err = tcp_sendmsg_fastopen(sk, msg, &copied_syn, size);
@@ -1365,7 +1369,6 @@ int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
 		tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
 	}
 out_nopush:
-	release_sock(sk);
 	return copied + copied_syn;
 
 do_fault:
@@ -1389,9 +1392,19 @@ int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
 		sk->sk_write_space(sk);
 		tcp_chrono_stop(sk, TCP_CHRONO_SNDBUF_LIMITED);
 	}
-	release_sock(sk);
 	return err;
 }
+
+int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
+{
+	int ret;
+
+	lock_sock(sk);
+	ret = tcp_sendmsg_locked(sk, msg, size);
+	release_sock(sk);
+
+	return ret;
+}
 EXPORT_SYMBOL(tcp_sendmsg);
 
 /*
diff --git a/net/socket.c b/net/socket.c
index 79d9bb964cd8..c0a12ad39610 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -652,6 +652,20 @@ int kernel_sendmsg(struct socket *sock, struct msghdr *msg,
 }
 EXPORT_SYMBOL(kernel_sendmsg);
 
+int kernel_sendmsg_locked(struct sock *sk, struct msghdr *msg,
+			  struct kvec *vec, size_t num, size_t size)
+{
+	struct socket *sock = sk->sk_socket;
+
+	if (!sock->ops->sendmsg_locked)
+		return sock_no_sendmsg_locked(sk, msg, size);
+
+	iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC, vec, num, size);
+
+	return sock->ops->sendmsg_locked(sk, msg, msg_data_left(msg));
+}
+EXPORT_SYMBOL(kernel_sendmsg_locked);
+
 static bool skb_is_err_queue(const struct sk_buff *skb)
 {
 	/* pkt_type of skbs enqueued on the error queue are set to
@@ -3375,6 +3389,19 @@ int kernel_sendpage(struct socket *sock, struct page *page, int offset,
 }
 EXPORT_SYMBOL(kernel_sendpage);
 
+int kernel_sendpage_locked(struct sock *sk, struct page *page, int offset,
+			   size_t size, int flags)
+{
+	struct socket *sock = sk->sk_socket;
+
+	if (sock->ops->sendpage_locked)
+		return sock->ops->sendpage_locked(sk, page, offset, size,
+						  flags);
+
+	return sock_no_sendpage_locked(sk, page, offset, size, flags);
+}
+EXPORT_SYMBOL(kernel_sendpage_locked);
+
 int kernel_sock_ioctl(struct socket *sock, int cmd, unsigned long arg)
 {
 	mm_segment_t oldfs = get_fs();
-- 
2.11.0

* [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket
  2017-07-28 23:22 [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy Tom Herbert
  2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
@ 2017-07-28 23:22 ` Tom Herbert
  2017-08-03 19:51   ` John Fastabend
  2017-07-28 23:22 ` [PATCH net-next 3/3] strparser: Generalize strparser Tom Herbert
  2017-08-01 22:26 ` [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy David Miller
  3 siblings, 1 reply; 11+ messages in thread
From: Tom Herbert @ 2017-07-28 23:22 UTC
  To: netdev; +Cc: rohit, john.fastabend, Tom Herbert

Add skb_send_sock to send an skbuff on a socket within the kernel.
Arguments include an offset so that an skbuff can be sent in multiple
calls (e.g. when the send buffer limit is hit).

Signed-off-by: Tom Herbert <tom@quantonium.net>
---
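Note: the offset argument makes a send resumable. The following is a
minimal user-space sketch of that idea, not the kernel code. The
demo_sink type, its room field and both demo_* functions are
hypothetical. It borrows one convention from skb_send_sock_locked: the
function returns the number of bytes sent, and returns the error only
when nothing at all was sent.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical sink that accepts a limited number of bytes. */
struct demo_sink {
	char buf[64];
	size_t used;	/* bytes stored so far */
	size_t room;	/* bytes the sink will accept before it is "full" */
};

/* Copy up to len bytes; return bytes accepted, or -1 if nothing fit. */
int demo_sink_write(struct demo_sink *s, const char *data, int len)
{
	size_t n = (size_t)len;

	if (n > s->room)
		n = s->room;
	if (n > sizeof(s->buf) - s->used)
		n = sizeof(s->buf) - s->used;
	if (n == 0)
		return -1;

	memcpy(s->buf + s->used, data, n);
	s->used += n;
	s->room -= n;
	return (int)n;
}

/*
 * Send data[offset, offset + len). Return the number of bytes sent, or
 * the error from the sink if no bytes were sent at all.
 */
int demo_send(struct demo_sink *s, const char *data, int offset, int len)
{
	int orig_len = len;
	int ret = 0;

	assert(offset >= 0 && len >= 0);

	while (len > 0) {
		ret = demo_sink_write(s, data + offset, len);
		if (ret <= 0)
			break;
		offset += ret;
		len -= ret;
	}

	return orig_len == len ? ret : orig_len - len;
}
```

A caller that gets a short count advances its offset by the return
value and calls again once the sink has room, passing the remaining
length.
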
 include/linux/skbuff.h |   3 ++
 net/core/skbuff.c      | 101 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+)

diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 4093552be1de..18e76bf9574e 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -3113,6 +3113,9 @@ __wsum skb_copy_and_csum_bits(const struct sk_buff *skb, int offset, u8 *to,
 int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
 		    struct pipe_inode_info *pipe, unsigned int len,
 		    unsigned int flags);
+int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
+			 int len);
+int skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset, int len);
 void skb_copy_and_csum_dev(const struct sk_buff *skb, u8 *to);
 unsigned int skb_zerocopy_headlen(const struct sk_buff *from);
 int skb_zerocopy(struct sk_buff *to, struct sk_buff *from,
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index c27da51d14e4..9c0e015ff3fe 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -1982,6 +1982,107 @@ int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
 }
 EXPORT_SYMBOL_GPL(skb_splice_bits);
 
+/* Send skb data on a socket. Socket must be locked. */
+int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
+			 int len)
+{
+	unsigned int orig_len = len;
+	struct sk_buff *head = skb;
+	unsigned short fragidx;
+	int slen, ret;
+
+do_frag_list:
+
+	/* Deal with head data */
+	while (offset < skb_headlen(skb) && len) {
+		struct kvec kv;
+		struct msghdr msg;
+
+		slen = min_t(int, len, skb_headlen(skb) - offset);
+		kv.iov_base = skb->data + offset;
+		kv.iov_len = slen;
+		memset(&msg, 0, sizeof(msg));
+
+		ret = kernel_sendmsg_locked(sk, &msg, &kv, 1, slen);
+		if (ret <= 0)
+			goto error;
+
+		offset += ret;
+		len -= ret;
+	}
+
+	/* All the data was skb head? */
+	if (!len)
+		goto out;
+
+	/* Make offset relative to start of frags */
+	offset -= skb_headlen(skb);
+
+	/* Find where we are in frag list */
+	for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags; fragidx++) {
+		skb_frag_t *frag  = &skb_shinfo(skb)->frags[fragidx];
+
+		if (offset < frag->size)
+			break;
+
+		offset -= frag->size;
+	}
+
+	for (; len && fragidx < skb_shinfo(skb)->nr_frags; fragidx++) {
+		skb_frag_t *frag  = &skb_shinfo(skb)->frags[fragidx];
+
+		slen = min_t(size_t, len, frag->size - offset);
+
+		while (slen) {
+			ret = kernel_sendpage_locked(sk, frag->page.p,
+						     frag->page_offset + offset,
+						     slen, MSG_DONTWAIT);
+			if (ret <= 0)
+				goto error;
+
+			len -= ret;
+			offset += ret;
+			slen -= ret;
+		}
+
+		offset = 0;
+	}
+
+	if (len) {
+		/* Process any frag lists */
+
+		if (skb == head) {
+			if (skb_has_frag_list(skb)) {
+				skb = skb_shinfo(skb)->frag_list;
+				goto do_frag_list;
+			}
+		} else if (skb->next) {
+			skb = skb->next;
+			goto do_frag_list;
+		}
+	}
+
+out:
+	return orig_len - len;
+
+error:
+	return orig_len == len ? ret : orig_len - len;
+}
+EXPORT_SYMBOL_GPL(skb_send_sock_locked);
+
+/* Send skb data on a socket. */
+int skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset, int len)
+{
+	int ret = 0;
+
+	lock_sock(sk);
+	ret = skb_send_sock_locked(sk, skb, offset, len);
+	release_sock(sk);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(skb_send_sock);
+
 /**
  *	skb_store_bits - store bits from kernel buffer to skb
  *	@skb: destination buffer
-- 
2.11.0

* [PATCH net-next 3/3] strparser: Generalize strparser
  2017-07-28 23:22 [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy Tom Herbert
  2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
  2017-07-28 23:22 ` [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket Tom Herbert
@ 2017-07-28 23:22 ` Tom Herbert
  2017-10-19 17:42   ` Eric Dumazet
  2017-08-01 22:26 ` [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy David Miller
  3 siblings, 1 reply; 11+ messages in thread
From: Tom Herbert @ 2017-07-28 23:22 UTC
  To: netdev; +Cc: rohit, john.fastabend, Tom Herbert

Generalize strparser so that it can be used for more than just the
read_sock path. strparser will also be used in the send path with
zero proxy. The primary change is to create a strp_process function
that performs the critical processing on skbs. The documentation
is also updated to reflect the new uses.

Signed-off-by: Tom Herbert <tom@quantonium.net>
---
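Note: general mode amounts to feeding arbitrary chunks of a byte stream
to a parser that delivers whole messages through callbacks. The
following is a minimal user-space sketch of that idea, not the
strparser implementation. All demo_* names are hypothetical, the
callbacks are only loosely modelled on parse_msg and rcv_msg, and the
framing (first byte holds the total message length, header included)
is an assumption made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

struct demo_parser;

struct demo_callbacks {
	/* Return full message length, 0 if more bytes are needed, <0 on error. */
	int (*parse_msg)(const unsigned char *buf, size_t len);
	/* Called once for each complete message. */
	void (*rcv_msg)(struct demo_parser *p, const unsigned char *msg,
			size_t len);
};

struct demo_parser {
	unsigned char buf[256];	/* message being assembled */
	size_t have;		/* bytes currently in buf */
	size_t need;		/* full length of current message, 0 if unknown */
	size_t last_len;	/* length of the last delivered message */
	unsigned int msgs;	/* messages delivered */
	struct demo_callbacks cb;
};

/* Assumed framing: byte 0 is the total message length, header included. */
int demo_parse_len1(const unsigned char *buf, size_t len)
{
	if (len == 0)
		return 0;
	return buf[0] ? buf[0] : -1;
}

void demo_rcv(struct demo_parser *p, const unsigned char *msg, size_t len)
{
	(void)msg;
	p->last_len = len;
}

/* Feed one chunk. Return bytes consumed, or a negative value on error. */
int demo_process(struct demo_parser *p, const unsigned char *data, size_t len,
		 size_t max_msg_size)
{
	size_t off = 0;

	assert(p && data);

	while (off < len) {
		if (!p->need) {
			int full;

			if (p->have == sizeof(p->buf))
				return -1;
			/* Length unknown: take one byte, then ask parse_msg. */
			p->buf[p->have++] = data[off++];
			full = p->cb.parse_msg(p->buf, p->have);
			if (full < 0)
				return full;
			if (full == 0)
				continue;
			if ((size_t)full > max_msg_size ||
			    (size_t)full > sizeof(p->buf) ||
			    (size_t)full < p->have)
				return -1;
			p->need = (size_t)full;
		} else {
			/* Length known: copy as much of the body as is here. */
			size_t n = p->need - p->have;

			if (n > len - off)
				n = len - off;
			memcpy(p->buf + p->have, data + off, n);
			p->have += n;
			off += n;
		}

		if (p->need && p->have == p->need) {
			p->cb.rcv_msg(p, p->buf, p->need);
			p->msgs++;
			p->have = 0;
			p->need = 0;
		}
	}

	return (int)off;
}
```

State for a partly assembled message is kept in the parser between
calls, so a message may span any number of chunks. The sketch omits the
timeout, pause and locking callbacks that the real interface provides.
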
 Documentation/networking/strparser.txt | 207 +++++++++++++++-------
 include/net/strparser.h                | 119 +++++++------
 net/kcm/kcmproc.c                      |  34 ++--
 net/kcm/kcmsock.c                      |  38 ++--
 net/strparser/strparser.c              | 313 ++++++++++++++++++++-------------
 5 files changed, 424 insertions(+), 287 deletions(-)

diff --git a/Documentation/networking/strparser.txt b/Documentation/networking/strparser.txt
index a0bf573dfa61..fe01302471ae 100644
--- a/Documentation/networking/strparser.txt
+++ b/Documentation/networking/strparser.txt
@@ -1,45 +1,107 @@
-Stream Parser
--------------
+Stream Parser (strparser)
+
+Introduction
+============
 
 The stream parser (strparser) is a utility that parses messages of an
-application layer protocol running over a TCP connection. The stream
+application layer protocol running over a data stream. The stream
 parser works in conjunction with an upper layer in the kernel to provide
 kernel support for application layer messages. For instance, Kernel
 Connection Multiplexor (KCM) uses the Stream Parser to parse messages
 using a BPF program.
 
+The strparser works in one of two modes: receive callback or general
+mode.
+
+In receive callback mode, the strparser is called from the data_ready
+callback of a TCP socket. Messages are parsed and delivered as they are
+received on the socket.
+
+In general mode, a sequence of skbs is fed to strparser from an
+outside source. Messages are parsed and delivered as the sequence is
+processed. This mode allows strparser to be applied to arbitrary
+streams of data.
+
 Interface
----------
+=========
 
 The API includes a context structure, a set of callbacks, utility
-functions, and a data_ready function. The callbacks include
-a parse_msg function that is called to perform parsing (e.g.
-BPF parsing in case of KCM), and a rcv_msg function that is called
-when a full message has been completed.
+functions, and a data_ready function for receive callback mode. The
+callbacks include a parse_msg function that is called to perform
+parsing (e.g.  BPF parsing in case of KCM), and a rcv_msg function
+that is called when a full message has been completed.
 
-A stream parser can be instantiated for a TCP connection. This is done
-by:
+Functions
+=========
 
-strp_init(struct strparser *strp, struct sock *csk,
+strp_init(struct strparser *strp, struct sock *sk,
 	  struct strp_callbacks *cb)
 
-strp is a struct of type strparser that is allocated by the upper layer.
-csk is the TCP socket associated with the stream parser. Callbacks are
-called by the stream parser.
+     Called to initialize a stream parser. strp is a struct of type
+     strparser that is allocated by the upper layer. sk is the TCP
+     socket associated with the stream parser for use with receive
+     callback mode; in general mode this is set to NULL. Callbacks
+     are called by the stream parser (the callbacks are listed below).
+
+void strp_pause(struct strparser *strp)
+
+     Temporarily pause a stream parser. Message parsing is suspended
+     and no new messages are delivered to the upper layer.
+
+void strp_unpause(struct strparser *strp)
+
+     Unpause a paused stream parser.
+
+void strp_stop(struct strparser *strp);
+
+     strp_stop is called to completely stop stream parser operations.
+     This is called internally when the stream parser encounters an
+     error, and it is called from the upper layer to stop parsing
+     operations.
+
+void strp_done(struct strparser *strp);
+
+     strp_done is called to release any resources held by the stream
+     parser instance. This must be called after the stream processor
+     has been stopped.
+
+int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
+		 unsigned int orig_offset, size_t orig_len,
+		 size_t max_msg_size, long timeo)
+
+    strp_process is called in general mode for a stream parser to
+    parse an sk_buff. The number of bytes processed or a negative
+    error number is returned. Note that strp_process does not
+    consume the sk_buff. max_msg_size is the maximum message size the
+    stream parser will parse. timeo is the timeout for completing a message.
+
+void strp_data_ready(struct strparser *strp);
+
+    The upper layer calls strp_data_ready when data is ready on
+    the lower socket for strparser to process. This should be called
+    from a data_ready callback that is set on the socket. Note that
+    the maximum message size is the limit of the receive socket
+    buffer and the message timeout is the receive timeout for the socket.
+
+void strp_check_rcv(struct strparser *strp);
+
+    strp_check_rcv is called to check for new messages on the socket.
+    This is normally called at initialization of a stream parser
+    instance or after strp_unpause.
 
 Callbacks
----------
+=========
 
-There are four callbacks:
+There are six callbacks:
 
 int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
 
     parse_msg is called to determine the length of the next message
     in the stream. The upper layer must implement this function. It
     should parse the sk_buff as containing the headers for the
-    next application layer messages in the stream.
+    next application layer message in the stream.
 
-    The skb->cb in the input skb is a struct strp_rx_msg. Only
+    The skb->cb in the input skb is a struct strp_msg. Only
     the offset field is relevant in parse_msg and gives the offset
     where the message starts in the skb.
 
@@ -50,26 +112,41 @@ int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
     -ESTRPIPE : current message should not be processed by the
           kernel, return control of the socket to userspace which
           can proceed to read the messages itself
-    other < 0 : Error is parsing, give control back to userspace
+    other < 0 : Error in parsing, give control back to userspace
           assuming that synchronization is lost and the stream
           is unrecoverable (application expected to close TCP socket)
 
     In the case that an error is returned (return value is less than
-    zero) the stream parser will set the error on TCP socket and wake
-    it up. If parse_msg returned -ESTRPIPE and the stream parser had
-    previously read some bytes for the current message, then the error
-    set on the attached socket is ENODATA since the stream is
-    unrecoverable in that case.
+    zero) and the parser is in receive callback mode, then it will set
+    the error on TCP socket and wake it up. If parse_msg returned
+    -ESTRPIPE and the stream parser had previously read some bytes for
+    the current message, then the error set on the attached socket is
+    ENODATA since the stream is unrecoverable in that case.
+
+void (*lock)(struct strparser *strp)
+
+    The lock callback is called to lock the strp structure when
+    the strparser is performing an asynchronous operation (such as
+    processing a timeout). In receive callback mode the default
+    function is to lock_sock for the associated socket. In general
+    mode the callback must be set appropriately.
+
+void (*unlock)(struct strparser *strp)
+
+    The unlock callback is called to release the lock obtained
+    by the lock callback. In receive callback mode the default
+    function is release_sock for the associated socket. In general
+    mode the callback must be set appropriately.
 
 void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
 
     rcv_msg is called when a full message has been received and
     is queued. The callee must consume the sk_buff; it can
     call strp_pause to prevent any further messages from being
-    received in rcv_msg (see strp_pause below). This callback
+    received in rcv_msg (see strp_pause above). This callback
     must be set.
 
-    The skb->cb in the input skb is a struct strp_rx_msg. This
+    The skb->cb in the input skb is a struct strp_msg. This
     struct contains two fields: offset and full_len. Offset is
     where the message starts in the skb, and full_len is the
     the length of the message. skb->len - offset may be greater
@@ -78,59 +155,53 @@ void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
 int (*read_sock_done)(struct strparser *strp, int err);
 
      read_sock_done is called when the stream parser is done reading
-     the TCP socket. The stream parser may read multiple messages
-     in a loop and this function allows cleanup to occur when existing
-     the loop. If the callback is not set (NULL in strp_init) a
-     default function is used.
+     the TCP socket in receive callback mode. The stream parser may
+     read multiple messages in a loop and this function allows cleanup
+     to occur when exiting the loop. If the callback is not set (NULL
+     in strp_init) a default function is used.
 
 void (*abort_parser)(struct strparser *strp, int err);
 
      This function is called when stream parser encounters an error
-     in parsing. The default function stops the stream parser for the
-     TCP socket and sets the error in the socket. The default function
-     can be changed by setting the callback to non-NULL in strp_init.
+     in parsing. The default function stops the stream parser and
+     sets the error in the socket if the parser is in receive callback
+     mode. The default function can be changed by setting the callback
+     to non-NULL in strp_init.
 
-Functions
----------
+Statistics
+==========
 
-The upper layer calls strp_tcp_data_ready when data is ready on the lower
-socket for strparser to process. This should be called from a data_ready
-callback that is set on the socket.
+Various counters are kept for each stream parser instance. These are in
+the strp_stats structure. strp_aggr_stats is a convenience structure for
+accumulating statistics for multiple stream parser instances.
+save_strp_stats and aggregate_strp_stats are helper functions to save
+and aggregate statistics.
 
-strp_stop is called to completely stop stream parser operations. This
-is called internally when the stream parser encounters an error, and
-it is called from the upper layer when unattaching a TCP socket.
+Message assembly limits
+=======================
 
-strp_done is called to unattach the stream parser from the TCP socket.
-This must be called after the stream processor has be stopped.
+The stream parser provides mechanisms to limit the resources consumed by
+message assembly.
 
-strp_check_rcv is called to check for new messages on the socket. This
-is normally called at initialization of the a stream parser instance
-of after strp_unpause.
+A timer is set when assembly starts for a new message. In receive
+callback mode the message timeout is taken from rcvtime for the
+associated TCP socket. In general mode, the timeout is passed as an
+argument in strp_process. If the timer fires before assembly completes
+the stream parser is aborted and the ETIMEDOUT error is set on the TCP
+socket if in receive callback mode.
 
-Statistics
-----------
+In receive callback mode, message length is limited to the receive
+buffer size of the associated TCP socket. If the length returned by
+parse_msg is greater than the socket buffer size then the stream parser
+is aborted with EMSGSIZE error set on the TCP socket. Note that this
+makes the maximum size of receive skbuffs for a socket with a stream
+parser to be 2*sk_rcvbuf of the TCP socket.
 
-Various counters are kept for each stream parser for a TCP socket.
-These are in the strp_stats structure. strp_aggr_stats is a convenience
-structure for accumulating statistics for multiple stream parser
-instances. save_strp_stats and aggregate_strp_stats are helper functions
-to save and aggregate statistics.
+In general mode the message length limit is passed in as an argument
+to strp_process.
 
-Message assembly limits
------------------------
+Author
+======
 
-The stream parser provide mechanisms to limit the resources consumed by
-message assembly.
+Tom Herbert (tom@quantonium.net)
 
-A timer is set when assembly starts for a new message. The message
-timeout is taken from rcvtime for the associated TCP socket. If the
-timer fires before assembly completes the stream parser is aborted
-and the ETIMEDOUT error is set on the TCP socket.
-
-Message length is limited to the receive buffer size of the associated
-TCP socket. If the length returned by parse_msg is greater than
-the socket buffer size then the stream parser is aborted with
-EMSGSIZE error set on the TCP socket. Note that this makes the
-maximum size of receive skbuffs for a socket with a stream parser
-to be 2*sk_rcvbuf of the TCP socket.
diff --git a/include/net/strparser.h b/include/net/strparser.h
index 0c28ad97c52f..4fe966a0ad92 100644
--- a/include/net/strparser.h
+++ b/include/net/strparser.h
@@ -18,26 +18,26 @@
 #define STRP_STATS_INCR(stat) ((stat)++)
 
 struct strp_stats {
-	unsigned long long rx_msgs;
-	unsigned long long rx_bytes;
-	unsigned int rx_mem_fail;
-	unsigned int rx_need_more_hdr;
-	unsigned int rx_msg_too_big;
-	unsigned int rx_msg_timeouts;
-	unsigned int rx_bad_hdr_len;
+	unsigned long long msgs;
+	unsigned long long bytes;
+	unsigned int mem_fail;
+	unsigned int need_more_hdr;
+	unsigned int msg_too_big;
+	unsigned int msg_timeouts;
+	unsigned int bad_hdr_len;
 };
 
 struct strp_aggr_stats {
-	unsigned long long rx_msgs;
-	unsigned long long rx_bytes;
-	unsigned int rx_mem_fail;
-	unsigned int rx_need_more_hdr;
-	unsigned int rx_msg_too_big;
-	unsigned int rx_msg_timeouts;
-	unsigned int rx_bad_hdr_len;
-	unsigned int rx_aborts;
-	unsigned int rx_interrupted;
-	unsigned int rx_unrecov_intr;
+	unsigned long long msgs;
+	unsigned long long bytes;
+	unsigned int mem_fail;
+	unsigned int need_more_hdr;
+	unsigned int msg_too_big;
+	unsigned int msg_timeouts;
+	unsigned int bad_hdr_len;
+	unsigned int aborts;
+	unsigned int interrupted;
+	unsigned int unrecov_intr;
 };
 
 struct strparser;
@@ -48,16 +48,18 @@ struct strp_callbacks {
 	void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
 	int (*read_sock_done)(struct strparser *strp, int err);
 	void (*abort_parser)(struct strparser *strp, int err);
+	void (*lock)(struct strparser *strp);
+	void (*unlock)(struct strparser *strp);
 };
 
-struct strp_rx_msg {
+struct strp_msg {
 	int full_len;
 	int offset;
 };
 
-static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb)
+static inline struct strp_msg *strp_msg(struct sk_buff *skb)
 {
-	return (struct strp_rx_msg *)((void *)skb->cb +
+	return (struct strp_msg *)((void *)skb->cb +
 		offsetof(struct qdisc_skb_cb, data));
 }
 
@@ -65,18 +67,18 @@ static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb)
 struct strparser {
 	struct sock *sk;
 
-	u32 rx_stopped : 1;
-	u32 rx_paused : 1;
-	u32 rx_aborted : 1;
-	u32 rx_interrupted : 1;
-	u32 rx_unrecov_intr : 1;
-
-	struct sk_buff **rx_skb_nextp;
-	struct timer_list rx_msg_timer;
-	struct sk_buff *rx_skb_head;
-	unsigned int rx_need_bytes;
-	struct delayed_work rx_delayed_work;
-	struct work_struct rx_work;
+	u32 stopped : 1;
+	u32 paused : 1;
+	u32 aborted : 1;
+	u32 interrupted : 1;
+	u32 unrecov_intr : 1;
+
+	struct sk_buff **skb_nextp;
+	struct timer_list msg_timer;
+	struct sk_buff *skb_head;
+	unsigned int need_bytes;
+	struct delayed_work delayed_work;
+	struct work_struct work;
 	struct strp_stats stats;
 	struct strp_callbacks cb;
 };
@@ -84,7 +86,7 @@ struct strparser {
 /* Must be called with lock held for attached socket */
 static inline void strp_pause(struct strparser *strp)
 {
-	strp->rx_paused = 1;
+	strp->paused = 1;
 }
 
 /* May be called without holding lock for attached socket */
@@ -97,37 +99,37 @@ static inline void save_strp_stats(struct strparser *strp,
 
 #define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat +=		\
 				 strp->stats._stat)
-	SAVE_PSOCK_STATS(rx_msgs);
-	SAVE_PSOCK_STATS(rx_bytes);
-	SAVE_PSOCK_STATS(rx_mem_fail);
-	SAVE_PSOCK_STATS(rx_need_more_hdr);
-	SAVE_PSOCK_STATS(rx_msg_too_big);
-	SAVE_PSOCK_STATS(rx_msg_timeouts);
-	SAVE_PSOCK_STATS(rx_bad_hdr_len);
+	SAVE_PSOCK_STATS(msgs);
+	SAVE_PSOCK_STATS(bytes);
+	SAVE_PSOCK_STATS(mem_fail);
+	SAVE_PSOCK_STATS(need_more_hdr);
+	SAVE_PSOCK_STATS(msg_too_big);
+	SAVE_PSOCK_STATS(msg_timeouts);
+	SAVE_PSOCK_STATS(bad_hdr_len);
 #undef SAVE_PSOCK_STATS
 
-	if (strp->rx_aborted)
-		agg_stats->rx_aborts++;
-	if (strp->rx_interrupted)
-		agg_stats->rx_interrupted++;
-	if (strp->rx_unrecov_intr)
-		agg_stats->rx_unrecov_intr++;
+	if (strp->aborted)
+		agg_stats->aborts++;
+	if (strp->interrupted)
+		agg_stats->interrupted++;
+	if (strp->unrecov_intr)
+		agg_stats->unrecov_intr++;
 }
 
 static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
 					struct strp_aggr_stats *agg_stats)
 {
 #define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
-	SAVE_PSOCK_STATS(rx_msgs);
-	SAVE_PSOCK_STATS(rx_bytes);
-	SAVE_PSOCK_STATS(rx_mem_fail);
-	SAVE_PSOCK_STATS(rx_need_more_hdr);
-	SAVE_PSOCK_STATS(rx_msg_too_big);
-	SAVE_PSOCK_STATS(rx_msg_timeouts);
-	SAVE_PSOCK_STATS(rx_bad_hdr_len);
-	SAVE_PSOCK_STATS(rx_aborts);
-	SAVE_PSOCK_STATS(rx_interrupted);
-	SAVE_PSOCK_STATS(rx_unrecov_intr);
+	SAVE_PSOCK_STATS(msgs);
+	SAVE_PSOCK_STATS(bytes);
+	SAVE_PSOCK_STATS(mem_fail);
+	SAVE_PSOCK_STATS(need_more_hdr);
+	SAVE_PSOCK_STATS(msg_too_big);
+	SAVE_PSOCK_STATS(msg_timeouts);
+	SAVE_PSOCK_STATS(bad_hdr_len);
+	SAVE_PSOCK_STATS(aborts);
+	SAVE_PSOCK_STATS(interrupted);
+	SAVE_PSOCK_STATS(unrecov_intr);
 #undef SAVE_PSOCK_STATS
 
 }
@@ -135,8 +137,11 @@ static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
 void strp_done(struct strparser *strp);
 void strp_stop(struct strparser *strp);
 void strp_check_rcv(struct strparser *strp);
-int strp_init(struct strparser *strp, struct sock *csk,
+int strp_init(struct strparser *strp, struct sock *sk,
 	      struct strp_callbacks *cb);
 void strp_data_ready(struct strparser *strp);
+int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
+		 unsigned int orig_offset, size_t orig_len,
+		 size_t max_msg_size, long timeo);
 
 #endif /* __NET_STRPARSER_H_ */
diff --git a/net/kcm/kcmproc.c b/net/kcm/kcmproc.c
index c343ac60bf50..c748e8a6a72c 100644
--- a/net/kcm/kcmproc.c
+++ b/net/kcm/kcmproc.c
@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
 	seq_printf(seq,
 		   "   psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
 		   psock->index,
-		   psock->strp.stats.rx_msgs,
-		   psock->strp.stats.rx_bytes,
+		   psock->strp.stats.msgs,
+		   psock->strp.stats.bytes,
 		   psock->stats.tx_msgs,
 		   psock->stats.tx_bytes,
 		   psock->sk->sk_receive_queue.qlen,
@@ -170,22 +170,22 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
 	if (psock->tx_stopped)
 		seq_puts(seq, "TxStop ");
 
-	if (psock->strp.rx_stopped)
+	if (psock->strp.stopped)
 		seq_puts(seq, "RxStop ");
 
 	if (psock->tx_kcm)
 		seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
 
-	if (!psock->strp.rx_paused && !psock->ready_rx_msg) {
+	if (!psock->strp.paused && !psock->ready_rx_msg) {
 		if (psock->sk->sk_receive_queue.qlen) {
-			if (psock->strp.rx_need_bytes)
+			if (psock->strp.need_bytes)
 				seq_printf(seq, "RxWait=%u ",
-					   psock->strp.rx_need_bytes);
+					   psock->strp.need_bytes);
 			else
 				seq_printf(seq, "RxWait ");
 		}
 	} else  {
-		if (psock->strp.rx_paused)
+		if (psock->strp.paused)
 			seq_puts(seq, "RxPause ");
 
 		if (psock->ready_rx_msg)
@@ -371,20 +371,20 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 	seq_printf(seq,
 		   "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
 		   "",
-		   strp_stats.rx_msgs,
-		   strp_stats.rx_bytes,
+		   strp_stats.msgs,
+		   strp_stats.bytes,
 		   psock_stats.tx_msgs,
 		   psock_stats.tx_bytes,
 		   psock_stats.reserved,
 		   psock_stats.unreserved,
-		   strp_stats.rx_aborts,
-		   strp_stats.rx_interrupted,
-		   strp_stats.rx_unrecov_intr,
-		   strp_stats.rx_mem_fail,
-		   strp_stats.rx_need_more_hdr,
-		   strp_stats.rx_bad_hdr_len,
-		   strp_stats.rx_msg_too_big,
-		   strp_stats.rx_msg_timeouts,
+		   strp_stats.aborts,
+		   strp_stats.interrupted,
+		   strp_stats.unrecov_intr,
+		   strp_stats.mem_fail,
+		   strp_stats.need_more_hdr,
+		   strp_stats.bad_hdr_len,
+		   strp_stats.msg_too_big,
+		   strp_stats.msg_timeouts,
 		   psock_stats.tx_aborts);
 
 	return 0;
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index da49191f7ad0..88ce73288247 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -96,12 +96,12 @@ static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
 				    struct kcm_psock *psock)
 {
 	STRP_STATS_ADD(mux->stats.rx_bytes,
-		       psock->strp.stats.rx_bytes -
+		       psock->strp.stats.bytes -
 		       psock->saved_rx_bytes);
 	mux->stats.rx_msgs +=
-		psock->strp.stats.rx_msgs - psock->saved_rx_msgs;
-	psock->saved_rx_msgs = psock->strp.stats.rx_msgs;
-	psock->saved_rx_bytes = psock->strp.stats.rx_bytes;
+		psock->strp.stats.msgs - psock->saved_rx_msgs;
+	psock->saved_rx_msgs = psock->strp.stats.msgs;
+	psock->saved_rx_bytes = psock->strp.stats.bytes;
 }
 
 static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
@@ -1118,7 +1118,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
 	struct kcm_sock *kcm = kcm_sk(sk);
 	int err = 0;
 	long timeo;
-	struct strp_rx_msg *rxm;
+	struct strp_msg *stm;
 	int copied = 0;
 	struct sk_buff *skb;
 
@@ -1132,26 +1132,26 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
 
 	/* Okay, have a message on the receive queue */
 
-	rxm = strp_rx_msg(skb);
+	stm = strp_msg(skb);
 
-	if (len > rxm->full_len)
-		len = rxm->full_len;
+	if (len > stm->full_len)
+		len = stm->full_len;
 
-	err = skb_copy_datagram_msg(skb, rxm->offset, msg, len);
+	err = skb_copy_datagram_msg(skb, stm->offset, msg, len);
 	if (err < 0)
 		goto out;
 
 	copied = len;
 	if (likely(!(flags & MSG_PEEK))) {
 		KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
-		if (copied < rxm->full_len) {
+		if (copied < stm->full_len) {
 			if (sock->type == SOCK_DGRAM) {
 				/* Truncated message */
 				msg->msg_flags |= MSG_TRUNC;
 				goto msg_finished;
 			}
-			rxm->offset += copied;
-			rxm->full_len -= copied;
+			stm->offset += copied;
+			stm->full_len -= copied;
 		} else {
 msg_finished:
 			/* Finished with message */
@@ -1175,7 +1175,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
 	struct sock *sk = sock->sk;
 	struct kcm_sock *kcm = kcm_sk(sk);
 	long timeo;
-	struct strp_rx_msg *rxm;
+	struct strp_msg *stm;
 	int err = 0;
 	ssize_t copied;
 	struct sk_buff *skb;
@@ -1192,12 +1192,12 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
 
 	/* Okay, have a message on the receive queue */
 
-	rxm = strp_rx_msg(skb);
+	stm = strp_msg(skb);
 
-	if (len > rxm->full_len)
-		len = rxm->full_len;
+	if (len > stm->full_len)
+		len = stm->full_len;
 
-	copied = skb_splice_bits(skb, sk, rxm->offset, pipe, len, flags);
+	copied = skb_splice_bits(skb, sk, stm->offset, pipe, len, flags);
 	if (copied < 0) {
 		err = copied;
 		goto err_out;
@@ -1205,8 +1205,8 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
 
 	KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
 
-	rxm->offset += copied;
-	rxm->full_len -= copied;
+	stm->offset += copied;
+	stm->full_len -= copied;
 
 	/* We have no way to return MSG_EOR. If all the bytes have been
 	 * read we still leave the message in the receive socket buffer.
diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c
index b5c279b22680..0d18fbc6f870 100644
--- a/net/strparser/strparser.c
+++ b/net/strparser/strparser.c
@@ -29,44 +29,46 @@
 
 static struct workqueue_struct *strp_wq;
 
-struct _strp_rx_msg {
-	/* Internal cb structure. struct strp_rx_msg must be first for passing
+struct _strp_msg {
+	/* Internal cb structure. struct strp_msg must be first for passing
 	 * to upper layer.
 	 */
-	struct strp_rx_msg strp;
+	struct strp_msg strp;
 	int accum_len;
 	int early_eaten;
 };
 
-static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb)
+static inline struct _strp_msg *_strp_msg(struct sk_buff *skb)
 {
-	return (struct _strp_rx_msg *)((void *)skb->cb +
+	return (struct _strp_msg *)((void *)skb->cb +
 		offsetof(struct qdisc_skb_cb, data));
 }
 
 /* Lower lock held */
-static void strp_abort_rx_strp(struct strparser *strp, int err)
+static void strp_abort_strp(struct strparser *strp, int err)
 {
-	struct sock *csk = strp->sk;
-
 	/* Unrecoverable error in receive */
 
-	del_timer(&strp->rx_msg_timer);
+	del_timer(&strp->msg_timer);
 
-	if (strp->rx_stopped)
+	if (strp->stopped)
 		return;
 
-	strp->rx_stopped = 1;
+	strp->stopped = 1;
+
+	if (strp->sk) {
+		struct sock *sk = strp->sk;
 
-	/* Report an error on the lower socket */
-	csk->sk_err = err;
-	csk->sk_error_report(csk);
+		/* Report an error on the lower socket */
+		sk->sk_err = err;
+		sk->sk_error_report(sk);
+	}
 }
 
-static void strp_start_rx_timer(struct strparser *strp)
+static void strp_start_timer(struct strparser *strp, long timeo)
 {
-	if (strp->sk->sk_rcvtimeo)
-		mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo);
+	if (timeo)
+		mod_timer(&strp->msg_timer, timeo);
 }
 
 /* Lower lock held */
@@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err,
 			    read_descriptor_t *desc)
 {
 	desc->error = err;
-	kfree_skb(strp->rx_skb_head);
-	strp->rx_skb_head = NULL;
+	kfree_skb(strp->skb_head);
+	strp->skb_head = NULL;
 	strp->cb.abort_parser(strp, err);
 }
 
 static inline int strp_peek_len(struct strparser *strp)
 {
-	struct socket *sock = strp->sk->sk_socket;
+	if (strp->sk) {
+		struct socket *sock = strp->sk->sk_socket;
+
+		return sock->ops->peek_len(sock);
+	}
+
+	/* If we don't have an associated socket there's nothing to peek.
+	 * Return int max to avoid stopping the strparser.
+	 */
 
-	return sock->ops->peek_len(sock);
+	return INT_MAX;
 }
 
 /* Lower socket lock held */
-static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
-		     unsigned int orig_offset, size_t orig_len)
+static int __strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
+		       unsigned int orig_offset, size_t orig_len,
+		       size_t max_msg_size, long timeo)
 {
 	struct strparser *strp = (struct strparser *)desc->arg.data;
-	struct _strp_rx_msg *rxm;
+	struct _strp_msg *stm;
 	struct sk_buff *head, *skb;
 	size_t eaten = 0, cand_len;
 	ssize_t extra;
 	int err;
 	bool cloned_orig = false;
 
-	if (strp->rx_paused)
+	if (strp->paused)
 		return 0;
 
-	head = strp->rx_skb_head;
+	head = strp->skb_head;
 	if (head) {
 		/* Message already in progress */
 
-		rxm = _strp_rx_msg(head);
-		if (unlikely(rxm->early_eaten)) {
+		stm = _strp_msg(head);
+		if (unlikely(stm->early_eaten)) {
 			/* Already some number of bytes on the receive sock
-			 * data saved in rx_skb_head, just indicate they
+			 * data saved in skb_head, just indicate they
 			 * are consumed.
 			 */
-			eaten = orig_len <= rxm->early_eaten ?
-				orig_len : rxm->early_eaten;
-			rxm->early_eaten -= eaten;
+			eaten = orig_len <= stm->early_eaten ?
+				orig_len : stm->early_eaten;
+			stm->early_eaten -= eaten;
 
 			return eaten;
 		}
@@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 			 */
 			orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
 			if (!orig_skb) {
-				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				STRP_STATS_INCR(strp->stats.mem_fail);
 				desc->error = -ENOMEM;
 				return 0;
 			}
 			if (!pskb_pull(orig_skb, orig_offset)) {
-				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				STRP_STATS_INCR(strp->stats.mem_fail);
 				kfree_skb(orig_skb);
 				desc->error = -ENOMEM;
 				return 0;
@@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 			orig_offset = 0;
 		}
 
-		if (!strp->rx_skb_nextp) {
+		if (!strp->skb_nextp) {
 			/* We are going to append to the frags_list of head.
 			 * Need to unshare the frag_list.
 			 */
 			err = skb_unclone(head, GFP_ATOMIC);
 			if (err) {
-				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				STRP_STATS_INCR(strp->stats.mem_fail);
 				desc->error = err;
 				return 0;
 			}
@@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 
 				skb = alloc_skb(0, GFP_ATOMIC);
 				if (!skb) {
-					STRP_STATS_INCR(strp->stats.rx_mem_fail);
+					STRP_STATS_INCR(strp->stats.mem_fail);
 					desc->error = -ENOMEM;
 					return 0;
 				}
 				skb->len = head->len;
 				skb->data_len = head->len;
 				skb->truesize = head->truesize;
-				*_strp_rx_msg(skb) = *_strp_rx_msg(head);
-				strp->rx_skb_nextp = &head->next;
+				*_strp_msg(skb) = *_strp_msg(head);
+				strp->skb_nextp = &head->next;
 				skb_shinfo(skb)->frag_list = head;
-				strp->rx_skb_head = skb;
+				strp->skb_head = skb;
 				head = skb;
 			} else {
-				strp->rx_skb_nextp =
+				strp->skb_nextp =
 				    &skb_shinfo(head)->frag_list;
 			}
 		}
@@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 		/* Always clone since we will consume something */
 		skb = skb_clone(orig_skb, GFP_ATOMIC);
 		if (!skb) {
-			STRP_STATS_INCR(strp->stats.rx_mem_fail);
+			STRP_STATS_INCR(strp->stats.mem_fail);
 			desc->error = -ENOMEM;
 			break;
 		}
 
 		cand_len = orig_len - eaten;
 
-		head = strp->rx_skb_head;
+		head = strp->skb_head;
 		if (!head) {
 			head = skb;
-			strp->rx_skb_head = head;
-			/* Will set rx_skb_nextp on next packet if needed */
-			strp->rx_skb_nextp = NULL;
-			rxm = _strp_rx_msg(head);
-			memset(rxm, 0, sizeof(*rxm));
-			rxm->strp.offset = orig_offset + eaten;
+			strp->skb_head = head;
+			/* Will set skb_nextp on next packet if needed */
+			strp->skb_nextp = NULL;
+			stm = _strp_msg(head);
+			memset(stm, 0, sizeof(*stm));
+			stm->strp.offset = orig_offset + eaten;
 		} else {
 			/* Unclone since we may be appending to an skb that we
 			 * already share a frag_list with.
 			 */
 			err = skb_unclone(skb, GFP_ATOMIC);
 			if (err) {
-				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				STRP_STATS_INCR(strp->stats.mem_fail);
 				desc->error = err;
 				break;
 			}
 
-			rxm = _strp_rx_msg(head);
-			*strp->rx_skb_nextp = skb;
-			strp->rx_skb_nextp = &skb->next;
+			stm = _strp_msg(head);
+			*strp->skb_nextp = skb;
+			strp->skb_nextp = &skb->next;
 			head->data_len += skb->len;
 			head->len += skb->len;
 			head->truesize += skb->truesize;
 		}
 
-		if (!rxm->strp.full_len) {
+		if (!stm->strp.full_len) {
 			ssize_t len;
 
 			len = (*strp->cb.parse_msg)(strp, head);
 
 			if (!len) {
 				/* Need more header to determine length */
-				if (!rxm->accum_len) {
+				if (!stm->accum_len) {
 					/* Start RX timer for new message */
-					strp_start_rx_timer(strp);
+					strp_start_timer(strp, timeo);
 				}
-				rxm->accum_len += cand_len;
+				stm->accum_len += cand_len;
 				eaten += cand_len;
-				STRP_STATS_INCR(strp->stats.rx_need_more_hdr);
+				STRP_STATS_INCR(strp->stats.need_more_hdr);
 				WARN_ON(eaten != orig_len);
 				break;
 			} else if (len < 0) {
-				if (len == -ESTRPIPE && rxm->accum_len) {
+				if (len == -ESTRPIPE && stm->accum_len) {
 					len = -ENODATA;
-					strp->rx_unrecov_intr = 1;
+					strp->unrecov_intr = 1;
 				} else {
-					strp->rx_interrupted = 1;
+					strp->interrupted = 1;
 				}
 				strp_parser_err(strp, len, desc);
 				break;
-			} else if (len > strp->sk->sk_rcvbuf) {
+			} else if (len > max_msg_size) {
 				/* Message length exceeds maximum allowed */
-				STRP_STATS_INCR(strp->stats.rx_msg_too_big);
+				STRP_STATS_INCR(strp->stats.msg_too_big);
 				strp_parser_err(strp, -EMSGSIZE, desc);
 				break;
 			} else if (len <= (ssize_t)head->len -
-					  skb->len - rxm->strp.offset) {
+					  skb->len - stm->strp.offset) {
 				/* Length must be into new skb (and also
 				 * greater than zero)
 				 */
-				STRP_STATS_INCR(strp->stats.rx_bad_hdr_len);
+				STRP_STATS_INCR(strp->stats.bad_hdr_len);
 				strp_parser_err(strp, -EPROTO, desc);
 				break;
 			}
 
-			rxm->strp.full_len = len;
+			stm->strp.full_len = len;
 		}
 
-		extra = (ssize_t)(rxm->accum_len + cand_len) -
-			rxm->strp.full_len;
+		extra = (ssize_t)(stm->accum_len + cand_len) -
+			stm->strp.full_len;
 
 		if (extra < 0) {
 			/* Message not complete yet. */
-			if (rxm->strp.full_len - rxm->accum_len >
+			if (stm->strp.full_len - stm->accum_len >
 			    strp_peek_len(strp)) {
-				/* Don't have the whole messages in the socket
-				 * buffer. Set strp->rx_need_bytes to wait for
+				/* Don't have the whole message in the socket
+				 * buffer. Set strp->need_bytes to wait for
 				 * the rest of the message. Also, set "early
 				 * eaten" since we've already buffered the skb
 				 * but don't consume yet per strp_read_sock.
 				 */
 
-				if (!rxm->accum_len) {
+				if (!stm->accum_len) {
 					/* Start RX timer for new message */
-					strp_start_rx_timer(strp);
+					strp_start_timer(strp, timeo);
 				}
 
-				strp->rx_need_bytes = rxm->strp.full_len -
-						       rxm->accum_len;
-				rxm->accum_len += cand_len;
-				rxm->early_eaten = cand_len;
-				STRP_STATS_ADD(strp->stats.rx_bytes, cand_len);
+				strp->need_bytes = stm->strp.full_len -
+						       stm->accum_len;
+				stm->accum_len += cand_len;
+				stm->early_eaten = cand_len;
+				STRP_STATS_ADD(strp->stats.bytes, cand_len);
 				desc->count = 0; /* Stop reading socket */
 				break;
 			}
-			rxm->accum_len += cand_len;
+			stm->accum_len += cand_len;
 			eaten += cand_len;
 			WARN_ON(eaten != orig_len);
 			break;
@@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 		eaten += (cand_len - extra);
 
 		/* Hurray, we have a new message! */
-		del_timer(&strp->rx_msg_timer);
-		strp->rx_skb_head = NULL;
-		STRP_STATS_INCR(strp->stats.rx_msgs);
+		del_timer(&strp->msg_timer);
+		strp->skb_head = NULL;
+		STRP_STATS_INCR(strp->stats.msgs);
 
 		/* Give skb to upper layer */
 		strp->cb.rcv_msg(strp, head);
 
-		if (unlikely(strp->rx_paused)) {
+		if (unlikely(strp->paused)) {
 			/* Upper layer paused strp */
 			break;
 		}
@@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
 	if (cloned_orig)
 		kfree_skb(orig_skb);
 
-	STRP_STATS_ADD(strp->stats.rx_bytes, eaten);
+	STRP_STATS_ADD(strp->stats.bytes, eaten);
 
 	return eaten;
 }
 
+int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
+		 unsigned int orig_offset, size_t orig_len,
+		 size_t max_msg_size, long timeo)
+{
+	read_descriptor_t desc; /* Dummy arg to strp_recv */
+
+	desc.arg.data = strp;
+
+	return __strp_recv(&desc, orig_skb, orig_offset, orig_len,
+			   max_msg_size, timeo);
+}
+EXPORT_SYMBOL_GPL(strp_process);
+
+static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
+		     unsigned int orig_offset, size_t orig_len)
+{
+	struct strparser *strp = (struct strparser *)desc->arg.data;
+
+	return __strp_recv(desc, orig_skb, orig_offset, orig_len,
+			   strp->sk->sk_rcvbuf, strp->sk->sk_rcvtimeo);
+}
+
 static int default_read_sock_done(struct strparser *strp, int err)
 {
 	return err;
@@ -355,101 +388,129 @@ static int strp_read_sock(struct strparser *strp)
 /* Lower sock lock held */
 void strp_data_ready(struct strparser *strp)
 {
-	if (unlikely(strp->rx_stopped))
+	if (unlikely(strp->stopped))
 		return;
 
-	/* This check is needed to synchronize with do_strp_rx_work.
-	 * do_strp_rx_work acquires a process lock (lock_sock) whereas
+	/* This check is needed to synchronize with do_strp_work.
+	 * do_strp_work acquires a process lock (lock_sock) whereas
 	 * the lock held here is bh_lock_sock. The two locks can be
 	 * held by different threads at the same time, but bh_lock_sock
 	 * allows a thread in BH context to safely check if the process
 	 * lock is held. In this case, if the lock is held, queue work.
 	 */
 	if (sock_owned_by_user(strp->sk)) {
-		queue_work(strp_wq, &strp->rx_work);
+		queue_work(strp_wq, &strp->work);
 		return;
 	}
 
-	if (strp->rx_paused)
+	if (strp->paused)
 		return;
 
-	if (strp->rx_need_bytes) {
-		if (strp_peek_len(strp) >= strp->rx_need_bytes)
-			strp->rx_need_bytes = 0;
+	if (strp->need_bytes) {
+		if (strp_peek_len(strp) >= strp->need_bytes)
+			strp->need_bytes = 0;
 		else
 			return;
 	}
 
 	if (strp_read_sock(strp) == -ENOMEM)
-		queue_work(strp_wq, &strp->rx_work);
+		queue_work(strp_wq, &strp->work);
 }
 EXPORT_SYMBOL_GPL(strp_data_ready);
 
-static void do_strp_rx_work(struct strparser *strp)
+static void do_strp_work(struct strparser *strp)
 {
 	read_descriptor_t rd_desc;
-	struct sock *csk = strp->sk;
 
 	/* We need the read lock to synchronize with strp_data_ready. We
 	 * need the socket lock for calling strp_read_sock.
 	 */
-	lock_sock(csk);
+	strp->cb.lock(strp);
 
-	if (unlikely(strp->rx_stopped))
+	if (unlikely(strp->stopped))
 		goto out;
 
-	if (strp->rx_paused)
+	if (strp->paused)
 		goto out;
 
 	rd_desc.arg.data = strp;
 
 	if (strp_read_sock(strp) == -ENOMEM)
-		queue_work(strp_wq, &strp->rx_work);
+		queue_work(strp_wq, &strp->work);
 
 out:
-	release_sock(csk);
+	strp->cb.unlock(strp);
 }
 
-static void strp_rx_work(struct work_struct *w)
+static void strp_work(struct work_struct *w)
 {
-	do_strp_rx_work(container_of(w, struct strparser, rx_work));
+	do_strp_work(container_of(w, struct strparser, work));
 }
 
-static void strp_rx_msg_timeout(unsigned long arg)
+static void strp_msg_timeout(unsigned long arg)
 {
 	struct strparser *strp = (struct strparser *)arg;
 
 	/* Message assembly timed out */
-	STRP_STATS_INCR(strp->stats.rx_msg_timeouts);
-	lock_sock(strp->sk);
+	STRP_STATS_INCR(strp->stats.msg_timeouts);
+	strp->cb.lock(strp);
 	strp->cb.abort_parser(strp, ETIMEDOUT);
+	strp->cb.unlock(strp);
+}
+
+static void strp_sock_lock(struct strparser *strp)
+{
+	lock_sock(strp->sk);
+}
+
+static void strp_sock_unlock(struct strparser *strp)
+{
 	release_sock(strp->sk);
 }
 
-int strp_init(struct strparser *strp, struct sock *csk,
+int strp_init(struct strparser *strp, struct sock *sk,
 	      struct strp_callbacks *cb)
 {
-	struct socket *sock = csk->sk_socket;
 
 	if (!cb || !cb->rcv_msg || !cb->parse_msg)
 		return -EINVAL;
 
-	if (!sock->ops->read_sock || !sock->ops->peek_len)
-		return -EAFNOSUPPORT;
+	/* The sk (sock) arg determines the mode of the stream parser.
+	 *
+	 * If the sock is set then the strparser is in receive callback mode.
+	 * The upper layer calls strp_data_ready to kick receive processing
+	 * and strparser calls the read_sock function on the socket to
+	 * get packets.
+	 *
+	 * If the sock is not set then the strparser is in general mode.
+	 * The upper layer calls strp_process for each skb to be parsed.
+	 */
 
-	memset(strp, 0, sizeof(*strp));
+	if (sk) {
+		struct socket *sock = sk->sk_socket;
 
-	strp->sk = csk;
+		if (!sock->ops->read_sock || !sock->ops->peek_len)
+			return -EAFNOSUPPORT;
+	} else {
+		if (!cb->lock || !cb->unlock)
+			return -EINVAL;
+	}
 
-	setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout,
-		    (unsigned long)strp);
+	memset(strp, 0, sizeof(*strp));
 
-	INIT_WORK(&strp->rx_work, strp_rx_work);
+	strp->sk = sk;
 
+	strp->cb.lock = cb->lock ? : strp_sock_lock;
+	strp->cb.unlock = cb->unlock ? : strp_sock_unlock;
 	strp->cb.rcv_msg = cb->rcv_msg;
 	strp->cb.parse_msg = cb->parse_msg;
 	strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
-	strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp;
+	strp->cb.abort_parser = cb->abort_parser ? : strp_abort_strp;
+
+	setup_timer(&strp->msg_timer, strp_msg_timeout,
+		    (unsigned long)strp);
+
+	INIT_WORK(&strp->work, strp_work);
 
 	return 0;
 }
@@ -457,12 +518,12 @@ EXPORT_SYMBOL_GPL(strp_init);
 
 void strp_unpause(struct strparser *strp)
 {
-	strp->rx_paused = 0;
+	strp->paused = 0;
 
-	/* Sync setting rx_paused with RX work */
+	/* Sync setting paused with RX work */
 	smp_mb();
 
-	queue_work(strp_wq, &strp->rx_work);
+	queue_work(strp_wq, &strp->work);
 }
 EXPORT_SYMBOL_GPL(strp_unpause);
 
@@ -471,27 +532,27 @@ EXPORT_SYMBOL_GPL(strp_unpause);
  */
 void strp_done(struct strparser *strp)
 {
-	WARN_ON(!strp->rx_stopped);
+	WARN_ON(!strp->stopped);
 
-	del_timer_sync(&strp->rx_msg_timer);
-	cancel_work_sync(&strp->rx_work);
+	del_timer_sync(&strp->msg_timer);
+	cancel_work_sync(&strp->work);
 
-	if (strp->rx_skb_head) {
-		kfree_skb(strp->rx_skb_head);
-		strp->rx_skb_head = NULL;
+	if (strp->skb_head) {
+		kfree_skb(strp->skb_head);
+		strp->skb_head = NULL;
 	}
 }
 EXPORT_SYMBOL_GPL(strp_done);
 
 void strp_stop(struct strparser *strp)
 {
-	strp->rx_stopped = 1;
+	strp->stopped = 1;
 }
 EXPORT_SYMBOL_GPL(strp_stop);
 
 void strp_check_rcv(struct strparser *strp)
 {
-	queue_work(strp_wq, &strp->rx_work);
+	queue_work(strp_wq, &strp->work);
 }
 EXPORT_SYMBOL_GPL(strp_check_rcv);
 
-- 
2.11.0
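
For readers following the general mode described in the strp_init comment above (no socket; the upper layer calls strp_process for each skb), here is a minimal userspace sketch of the same idea: a caller feeds arbitrary byte chunks, a parse_msg callback reports the full message length once enough header has arrived, and rcv_msg is invoked for each completed message. Everything named model_* is hypothetical and exists only for illustration. The real strparser chains skbs rather than copying into a flat buffer, and it also handles locking, timers and pausing, which this sketch omits.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MODEL_MAX_MSG 32

/* Hypothetical userspace analogue of a stream parser that is fed chunks
 * directly by its caller instead of reading from a socket.
 */
struct model_strp {
	unsigned char buf[MODEL_MAX_MSG]; /* message being assembled */
	size_t accum_len;                 /* bytes accumulated so far */
	size_t full_len;                  /* 0 until parse_msg reports it */
	unsigned int msgs;                /* completed messages */
	size_t last_len;                  /* length of the last message */
	/* Returns the full message length, or 0 if more header is needed. */
	size_t (*parse_msg)(const unsigned char *buf, size_t len);
	void (*rcv_msg)(struct model_strp *strp, const unsigned char *msg,
			size_t len);
};

/* Example parser: 2-byte big-endian length that includes the header. */
static size_t model_parse_len16(const unsigned char *buf, size_t len)
{
	if (len < 2)
		return 0;
	return ((size_t)buf[0] << 8) | buf[1];
}

/* Example receiver: only records the length of the delivered message. */
static void model_record(struct model_strp *strp, const unsigned char *msg,
			 size_t len)
{
	(void)msg;
	strp->last_len = len;
}

/* Feed one chunk. Returns the number of bytes consumed, or -1 if the
 * reported length is too big or smaller than the header already read.
 */
static int model_strp_process(struct model_strp *strp,
			      const unsigned char *data, size_t len)
{
	size_t eaten = 0;

	assert(strp && strp->parse_msg && strp->rcv_msg);

	while (eaten < len) {
		if (!strp->full_len) {
			size_t full;

			/* Header phase: add one byte, then ask the parser. */
			if (strp->accum_len == MODEL_MAX_MSG)
				return -1;
			strp->buf[strp->accum_len++] = data[eaten++];
			full = strp->parse_msg(strp->buf, strp->accum_len);
			if (!full)
				continue; /* need more header */
			if (full > MODEL_MAX_MSG || full < strp->accum_len)
				return -1;
			strp->full_len = full;
		} else {
			/* Body phase: copy as much as this chunk provides. */
			size_t take = strp->full_len - strp->accum_len;

			if (take > len - eaten)
				take = len - eaten;
			memcpy(strp->buf + strp->accum_len, data + eaten, take);
			strp->accum_len += take;
			eaten += take;
		}

		if (strp->accum_len == strp->full_len) {
			/* Message complete: hand it to the upper layer. */
			strp->rcv_msg(strp, strp->buf, strp->full_len);
			strp->msgs++;
			strp->accum_len = 0;
			strp->full_len = 0;
		}
	}

	return (int)eaten;
}
```

A caller would zero-initialize a struct model_strp, set the two callbacks, and call model_strp_process once per chunk; message boundaries do not need to line up with chunk boundaries.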


* Re: [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy
  2017-07-28 23:22 [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy Tom Herbert
                   ` (2 preceding siblings ...)
  2017-07-28 23:22 ` [PATCH net-next 3/3] strparser: Generalize strparser Tom Herbert
@ 2017-08-01 22:26 ` David Miller
  3 siblings, 0 replies; 11+ messages in thread
From: David Miller @ 2017-08-01 22:26 UTC (permalink / raw)
  To: tom; +Cc: netdev, rohit, john.fastabend

From: Tom Herbert <tom@quantonium.net>
Date: Fri, 28 Jul 2017 16:22:40 -0700

> This patch set contains some general infrastructure enhancements that
> will be used by kernel proxy and zero proxy.
> 
> The changes are:
>   - proto_ops: Add locked versions of sendmsg and sendpage
>   - skb_send_sock: Allow sending an skb on a socket within the
>     kernel
>   - Generalize strparser. Allow it to be used in other contexts than
>     just in the read_sock path. This will be used in the transmit
>     path of zero proxy.
> 
> Some nice future work (which I've been discussing with John Fastabend)
> will be to make some of the related functions allow gifting of skbs.
> We should be able to do that with skb_send_sock and strp_process. I'd
> also like this feature in the read_sock callback.
> 
> Tested: Ran modified kernel without incident. Tested new functionality
> using zero proxy (in development).

Series applied, thanks Tom.


* Re: [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage
  2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
@ 2017-08-03 19:49   ` John Fastabend
  2017-08-03 20:21   ` John Fastabend
  1 sibling, 0 replies; 11+ messages in thread
From: John Fastabend @ 2017-08-03 19:49 UTC (permalink / raw)
  To: Tom Herbert, netdev; +Cc: rohit

On 07/28/2017 04:22 PM, Tom Herbert wrote:
> Add new proto_ops sendmsg_locked and sendpage_locked that can be
> called when the socket lock is already held. Correspondingly, add
> kernel_sendmsg_locked and kernel_sendpage_locked as front end
> functions.
> 
> These functions will be used in zero proxy so that we can take
> the socket lock in a ULP sendmsg/sendpage and then directly call the
> backend transport proto_ops functions.
> 
> Signed-off-by: Tom Herbert <tom@quantonium.net>
> ---

[...]

> diff --git a/net/socket.c b/net/socket.c
> index 79d9bb964cd8..c0a12ad39610 100644
> --- a/net/socket.c
> +++ b/net/socket.c
> @@ -652,6 +652,20 @@ int kernel_sendmsg(struct socket *sock, struct msghdr *msg,
>  }
>  EXPORT_SYMBOL(kernel_sendmsg);
>  
> +int kernel_sendmsg_locked(struct sock *sk, struct msghdr *msg,
> +			  struct kvec *vec, size_t num, size_t size)
> +{
> +	struct socket *sock = sk->sk_socket;
> +
> +	if (!sock->ops->sendmsg_locked)
> +		sock_no_sendmsg_locked(sk, msg, size);
> +

Should be

		return sock_no_sendmsg_locked(sk, msg, size);


> +	iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC, vec, num, size);
> +
> +	return sock->ops->sendmsg_locked(sk, msg, msg_data_left(msg));

Otherwise this is a NULL pointer dereference.

> +}
> +EXPORT_SYMBOL(kernel_sendmsg_locked);
> +
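
For illustration, a minimal userspace sketch of the fallback pattern with the return in place. The model_* types and functions are hypothetical stand-ins for struct proto_ops, struct socket and sock_no_sendmsg_locked, not kernel APIs; the point is only that the front end must return the fallback's result rather than fall through to a call through a NULL function pointer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-ins for struct proto_ops and struct socket. */
struct model_ops {
	int (*sendmsg_locked)(const void *buf, size_t len); /* may be NULL */
};

struct model_sock {
	const struct model_ops *ops;
};

/* Plays the role of sock_no_sendmsg_locked(): operation unsupported. */
static int model_no_sendmsg_locked(const void *buf, size_t len)
{
	(void)buf;
	(void)len;
	return -EOPNOTSUPP;
}

/* Front end: return the fallback's result so that the NULL callback
 * is never invoked.
 */
static int model_sendmsg_locked(const struct model_sock *sock,
				const void *buf, size_t len)
{
	assert(sock && sock->ops);

	if (!sock->ops->sendmsg_locked)
		return model_no_sendmsg_locked(buf, len);

	return sock->ops->sendmsg_locked(buf, len);
}

/* Example transport that accepts everything it is given. */
static int model_accept_all(const void *buf, size_t len)
{
	(void)buf;
	return (int)len;
}
```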


Thanks,
John


* Re: [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket
  2017-07-28 23:22 ` [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket Tom Herbert
@ 2017-08-03 19:51   ` John Fastabend
  0 siblings, 0 replies; 11+ messages in thread
From: John Fastabend @ 2017-08-03 19:51 UTC (permalink / raw)
  To: Tom Herbert, netdev; +Cc: rohit

On 07/28/2017 04:22 PM, Tom Herbert wrote:
> Add skb_send_sock to send an skbuff on a socket within the kernel.
> Arguments include an offset so that an skbuf might be sent in multiple
> calls (e.g. when the send buffer limit is hit).
> 
> Signed-off-by: Tom Herbert <tom@quantonium.net>
> ---

[...]

> +/* Send skb data on a socket. Socket must be locked. */
> +int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
> +			 int len)
> +{
> +	unsigned int orig_len = len;
> +	struct sk_buff *head = skb;
> +	unsigned short fragidx;
> +	int slen, ret;
> +
> +do_frag_list:
> +
> +	/* Deal with head data */
> +	while (offset < skb_headlen(skb) && len) {
> +		struct kvec kv;
> +		struct msghdr msg;
> +
> +		slen = min_t(int, len, skb_headlen(skb) - offset);
> +		kv.iov_base = skb->data + offset;
> +		kv.iov_len = len;
                            ^^^^^^

This should be slen, right?

> +		memset(&msg, 0, sizeof(msg));
> +
> +		ret = kernel_sendmsg_locked(sk, &msg, &kv, 1, slen);
> +		if (ret <= 0)
> +			goto error;
> +
> +		offset += ret;
> +		len -= ret;
> +	}
> +
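
As a rough userspace sketch of the head-data loop with the per-call length bounded by slen: model_send is a hypothetical sink standing in for kernel_sendmsg_locked that accepts at most SINK_MAX bytes per call, and the other model_* names are likewise made up for this example. Each iteration offers only the bytes that remain in the head area, then advances by whatever the sink actually took.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical sink: accepts at most SINK_MAX bytes per call and appends
 * them to a capture buffer.
 */
#define SINK_MAX 4

struct model_sink {
	char data[64];
	size_t used;
};

static int model_send(struct model_sink *sink, const char *buf, size_t len)
{
	size_t n = len < SINK_MAX ? len : SINK_MAX;

	if (sink->used + n > sizeof(sink->data))
		return -1;
	memcpy(sink->data + sink->used, buf, n);
	sink->used += n;
	return (int)n;
}

/* Send up to len bytes of the head area starting at offset. Each call is
 * bounded by slen, the bytes remaining in the head, never by len alone.
 * Returns the number of bytes sent or a negative error.
 */
static int model_send_head(struct model_sink *sink, const char *head,
			   size_t headlen, size_t offset, size_t len)
{
	size_t orig_len = len;

	assert(sink && head);

	while (offset < headlen && len) {
		size_t slen = headlen - offset;
		int ret;

		if (slen > len)
			slen = len;

		ret = model_send(sink, head + offset, slen);
		if (ret <= 0)
			return ret ? ret : -1;

		/* Partial sends are fine: advance by what was accepted. */
		offset += (size_t)ret;
		len -= (size_t)ret;
	}

	return (int)(orig_len - len);
}
```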

Thanks,
John


* Re: [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage
  2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
  2017-08-03 19:49   ` John Fastabend
@ 2017-08-03 20:21   ` John Fastabend
  2017-08-03 21:07     ` Tom Herbert
  1 sibling, 1 reply; 11+ messages in thread
From: John Fastabend @ 2017-08-03 20:21 UTC (permalink / raw)
  To: Tom Herbert, netdev; +Cc: rohit

On 07/28/2017 04:22 PM, Tom Herbert wrote:
> Add new proto_ops sendmsg_locked and sendpage_locked that can be
> called when the socket lock is already held. Correspondingly, add
> kernel_sendmsg_locked and kernel_sendpage_locked as front end
> functions.
> 
> These functions will be used in zero proxy so that we can take
> the socket lock in a ULP sendmsg/sendpage and then directly call the
> backend transport proto_ops functions.
> 

[...]

>  
> +int kernel_sendpage_locked(struct sock *sk, struct page *page, int offset,
> +			   size_t size, int flags)
> +{
> +	struct socket *sock = sk->sk_socket;
> +
> +	if (sock->ops->sendpage_locked)
> +		return sock->ops->sendpage_locked(sk, page, offset, size,
> +						  flags);
> +
> +	return sock_no_sendpage_locked(sk, page, offset, size, flags);
> +}

How about just returning EOPNOTSUPP here and forcing implementations to do both
sendmsg and sendpage? The only implementation of these callbacks already does
this. And if it's any other socket it will just wind its way through a few
layers of calls before returning EOPNOTSUPP.
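
A rough userspace sketch of that suggestion, assuming cut-down stand-ins for
the kernel types: struct sock_model, struct proto_ops_model,
sendpage_locked_model() and toy_sendpage_locked() are all hypothetical names
used only for illustration. It shows the shape of the idea, which is to
return -EOPNOTSUPP directly when the callback is missing rather than falling
through to a generic fallback.

```c
#include <errno.h>
#include <stddef.h>

struct sock_model;

/* Only the callback relevant here; the kernel's proto_ops has many more. */
struct proto_ops_model {
	int (*sendpage_locked)(struct sock_model *sk, const void *page,
			       int offset, size_t size, int flags);
};

/* Cut-down stand-in for a socket: just a pointer to its ops table. */
struct sock_model {
	const struct proto_ops_model *ops;
};

/* Toy transport callback that claims to have sent every byte. */
static int toy_sendpage_locked(struct sock_model *sk, const void *page,
			       int offset, size_t size, int flags)
{
	(void)sk; (void)page; (void)offset; (void)flags;
	return (int)size;
}

/*
 * Front end: call the transport's callback if it has one, otherwise
 * fail immediately with -EOPNOTSUPP instead of using a fallback path.
 */
static int sendpage_locked_model(struct sock_model *sk, const void *page,
				 int offset, size_t size, int flags)
{
	if (!sk->ops->sendpage_locked)
		return -EOPNOTSUPP;

	return sk->ops->sendpage_locked(sk, page, offset, size, flags);
}
```

A socket whose ops table leaves sendpage_locked NULL gets -EOPNOTSUPP from
the front end; one that sets it (here to toy_sendpage_locked) is called
directly.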

.John


* Re: [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage
  2017-08-03 20:21   ` John Fastabend
@ 2017-08-03 21:07     ` Tom Herbert
  0 siblings, 0 replies; 11+ messages in thread
From: Tom Herbert @ 2017-08-03 21:07 UTC (permalink / raw)
  To: John Fastabend; +Cc: Tom Herbert, Linux Kernel Network Developers, Rohit Seth

On Thu, Aug 3, 2017 at 1:21 PM, John Fastabend <john.fastabend@gmail.com> wrote:
> On 07/28/2017 04:22 PM, Tom Herbert wrote:
>> Add new proto_ops sendmsg_locked and sendpage_locked that can be
>> called when the socket lock is already held. Correspondingly, add
>> kernel_sendmsg_locked and kernel_sendpage_locked as front end
>> functions.
>>
>> These functions will be used in zero proxy so that we can take
>> the socket lock in a ULP sendmsg/sendpage and then directly call the
>> backend transport proto_ops functions.
>>
>
> [...]
>
>>
>> +int kernel_sendpage_locked(struct sock *sk, struct page *page, int offset,
>> +                        size_t size, int flags)
>> +{
>> +     struct socket *sock = sk->sk_socket;
>> +
>> +     if (sock->ops->sendpage_locked)
>> +             return sock->ops->sendpage_locked(sk, page, offset, size,
>> +                                               flags);
>> +
>> +     return sock_no_sendpage_locked(sk, page, offset, size, flags);
>> +}
>
> How about just returning EOPNOTSUPP here and forcing implementations to do both
> sendmsg and sendpage? The only implementation of these callbacks already does
> this. And if it's any other socket it will just wind its way through a few
> layers of calls before returning EOPNOTSUPP.
>
Seems reasonable, but we should probably make the same change to
kernel_sendpage to be consistent.

Tom

> .John


* Re: [PATCH net-next 3/3] strparser: Generalize strparser
  2017-07-28 23:22 ` [PATCH net-next 3/3] strparser: Generalize strparser Tom Herbert
@ 2017-10-19 17:42   ` Eric Dumazet
       [not found]     ` <CALx6S3765-5oT6UtSQVts4KwWz3F3ub2vhDWO7YemYZPPM0EjA@mail.gmail.com>
  0 siblings, 1 reply; 11+ messages in thread
From: Eric Dumazet @ 2017-10-19 17:42 UTC (permalink / raw)
  To: Tom Herbert; +Cc: netdev, rohit, john.fastabend

On Fri, 2017-07-28 at 16:22 -0700, Tom Herbert wrote:
> Generalize strparser from more than just being used in conjunction
> with read_sock. strparser will also be used in the send path with
> zero proxy. The primary change is to create strp_process function
> that performs the critical processing on skbs. The documentation
> is also updated to reflect the new uses.
> 
> Signed-off-by: Tom Herbert <tom@quantonium.net>
> ---
>  Documentation/networking/strparser.txt | 207 +++++++++++++++-------
>  include/net/strparser.h                | 119 +++++++------
>  net/kcm/kcmproc.c                      |  34 ++--
>  net/kcm/kcmsock.c                      |  38 ++--
>  net/strparser/strparser.c              | 313 ++++++++++++++++++++-------------
>  5 files changed, 424 insertions(+), 287 deletions(-)

Just found this gem :

static void strp_msg_timeout(unsigned long arg)
{
        struct strparser *strp = (struct strparser *)arg;

        /* Message assembly timed out */
        STRP_STATS_INCR(strp->stats.msg_timeouts);
        strp->cb.lock(strp);
        strp->cb.abort_parser(strp, ETIMEDOUT);
        strp->cb.unlock(strp);
}

static void strp_sock_lock(struct strparser *strp)
{
        lock_sock(strp->sk);
}

static void strp_sock_unlock(struct strparser *strp)
{
        release_sock(strp->sk);
}


A timer runs from BH, and from this interrupt context it is absolutely
illegal to call lock_sock() (and release_sock()).

Please fix, thanks !
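
One possible way to avoid taking a sleeping lock from the timer is to have
the timer handler only queue work and to do the locked part later from
process context. The following is a userspace model of that pattern, not
kernel code and not necessarily the fix that was applied. struct strp_model
and every model_* function are hypothetical; in_softirq is a plain flag that
stands in for "running in BH context".

```c
#include <errno.h>
#include <stdbool.h>

/* Toy parser state, just enough to model the timeout path. */
struct strp_model {
	bool in_softirq;	/* true while the "timer handler" runs */
	bool work_pending;	/* timeout work queued but not yet run */
	bool locked;		/* models the socket lock being held */
	int aborted_err;	/* 0 until the parser has been aborted */
};

/* Models a lock that may sleep: refuses to be taken in softirq. */
static bool model_lock(struct strp_model *s)
{
	if (s->in_softirq)
		return false;	/* would be sleeping in atomic context */
	s->locked = true;
	return true;
}

static void model_unlock(struct strp_model *s)
{
	s->locked = false;
}

/* Timer handler: runs in softirq, so it only queues the work. */
static void model_timer_fired(struct strp_model *s)
{
	s->in_softirq = true;
	s->work_pending = true;	/* defer; never touch the lock here */
	s->in_softirq = false;
}

/* Worker: runs later in process context, where the lock is allowed. */
static void model_run_pending_work(struct strp_model *s)
{
	if (!s->work_pending)
		return;
	s->work_pending = false;

	if (!model_lock(s))
		return;
	s->aborted_err = ETIMEDOUT;	/* stands in for abort_parser() */
	model_unlock(s);
}
```

In this model the abort only happens once model_run_pending_work() runs, so
the timeout is handled slightly later than the timer expiry, in exchange for
never taking the lock in softirq.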


* Re: [PATCH net-next 3/3] strparser: Generalize strparser
       [not found]     ` <CALx6S3765-5oT6UtSQVts4KwWz3F3ub2vhDWO7YemYZPPM0EjA@mail.gmail.com>
@ 2017-10-19 20:18       ` John Fastabend
  0 siblings, 0 replies; 11+ messages in thread
From: John Fastabend @ 2017-10-19 20:18 UTC (permalink / raw)
  To: Tom Herbert, Eric Dumazet
  Cc: Tom Herbert, Linux Kernel Network Developers, Rohit Seth

On 10/19/2017 12:51 PM, Tom Herbert wrote:
> On Thu, Oct 19, 2017 at 10:42 AM, Eric Dumazet <eric.dumazet@gmail.com>
> wrote:
> 
>> On Fri, 2017-07-28 at 16:22 -0700, Tom Herbert wrote:
>>> Generalize strparser from more than just being used in conjunction
>>> with read_sock. strparser will also be used in the send path with
>>> zero proxy. The primary change is to create strp_process function
>>> that performs the critical processing on skbs. The documentation
>>> is also updated to reflect the new uses.
>>>
>>> Signed-off-by: Tom Herbert <tom@quantonium.net>
>>> ---
>>>  Documentation/networking/strparser.txt | 207 +++++++++++++++-------
>>>  include/net/strparser.h                | 119 +++++++------
>>>  net/kcm/kcmproc.c                      |  34 ++--
>>>  net/kcm/kcmsock.c                      |  38 ++--
>>>  net/strparser/strparser.c              | 313 ++++++++++++++++++++-------------
>>>  5 files changed, 424 insertions(+), 287 deletions(-)
>>
>> Just found this gem :
>>
>> static void strp_msg_timeout(unsigned long arg)
>> {
>>         struct strparser *strp = (struct strparser *)arg;
>>
>>         /* Message assembly timed out */
>>         STRP_STATS_INCR(strp->stats.msg_timeouts);
>>         strp->cb.lock(strp);
>>         strp->cb.abort_parser(strp, ETIMEDOUT);
>>         strp->cb.unlock(strp);
>> }
>>
>> static void strp_sock_lock(struct strparser *strp)
>> {
>>         lock_sock(strp->sk);
>> }
>>
>> static void strp_sock_unlock(struct strparser *strp)
>> {
>>         release_sock(strp->sk);
>> }
>>
>>
>> A timer runs from BH, and from this interrupt context it is absolutely
>> illegal to call lock_sock() (and release_sock()).
>>
>> Please fix, thanks !
>>
> 
> Nice catch! I'll fix it.
> 
Can the lock/release be removed, with the strp stopped bit using atomics instead?

It looks like sk_error_report can be called without sock lock.
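
As a sketch of what an atomic stopped bit could look like, here is a
userspace model using C11 atomics. struct strp_flag_model and
model_abort_parser() are hypothetical names, and the kernel would use its
own atomic or bitop primitives rather than <stdatomic.h>. The idea shown is
that an atomic exchange lets exactly one caller win the transition to
"stopped" without holding a lock.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Toy parser state: an atomic stopped flag plus the recorded error. */
struct strp_flag_model {
	atomic_bool stopped;
	int last_err;
};

/*
 * Mark the parser stopped. atomic_exchange() returns the previous value,
 * so only the first caller sees "false" and records its error code.
 * Returns true for the caller that performed the stop, false otherwise.
 */
static bool model_abort_parser(struct strp_flag_model *s, int err)
{
	if (atomic_exchange(&s->stopped, true))
		return false;	/* someone else already stopped it */

	s->last_err = err;	/* written once, by the single winner */
	return true;
}
```

A second abort attempt returns false and leaves last_err untouched, which is
the property a lock around the stopped check would otherwise provide.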

Thanks,
John


Thread overview: 11+ messages
2017-07-28 23:22 [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy Tom Herbert
2017-07-28 23:22 ` [PATCH net-next 1/3] proto_ops: Add locked held versions of sendmsg and sendpage Tom Herbert
2017-08-03 19:49   ` John Fastabend
2017-08-03 20:21   ` John Fastabend
2017-08-03 21:07     ` Tom Herbert
2017-07-28 23:22 ` [PATCH net-next 2/3] skbuff: Function to send an skbuf on a socket Tom Herbert
2017-08-03 19:51   ` John Fastabend
2017-07-28 23:22 ` [PATCH net-next 3/3] strparser: Generalize strparser Tom Herbert
2017-10-19 17:42   ` Eric Dumazet
     [not found]     ` <CALx6S3765-5oT6UtSQVts4KwWz3F3ub2vhDWO7YemYZPPM0EjA@mail.gmail.com>
2017-10-19 20:18       ` John Fastabend
2017-08-01 22:26 ` [PATCH net-next 0/3] net: Infrastructure changes for [kz]proxy David Miller
