* [PATCH net-next v2 0/3] strp: Stream parser for messages
@ 2016-08-01 16:33 Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 1/3] strparser: " Tom Herbert
                   ` (3 more replies)
  0 siblings, 4 replies; 7+ messages in thread
From: Tom Herbert @ 2016-08-01 16:33 UTC (permalink / raw)
  To: davem, netdev; +Cc: kernel-team

This patch set introduces a utility for parsing application layer
protocol messages in a TCP stream. This is a generalization of the
mechanism implemented in the Kernel Connection Multiplexor (KCM).

This patch set adapts KCM to use the strparser. We expect that kTLS
can use this mechanism also. RDS would probably be another candidate
to use a common stream parsing mechanism.

The API includes a context structure, a set of callbacks, utility
functions, and a data ready function. The callbacks include
a parse_msg function that is called to perform parsing (e.g.
BPF parsing in case of KCM), and a rcv_msg function that is called
when a full message has been completed.

For strparser we specify the return codes from the parser to allow
the backend to indicate that control of the socket should be
transferred back to userspace to handle some exceptions in the
stream. The return values are:

      >0 : indicates length of successfully parsed message
       0  : indicates more data must be received to parse the message
       -ESTRPIPE : current message should not be processed by the
          kernel, return control of the socket to userspace which
          can proceed to read the messages itself
       other < 0 : Error in parsing, give control back to userspace
          assuming that synchronization is lost and the stream
          is unrecoverable (application expected to close TCP socket)
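
As a rough illustration of these return conventions, here is a userspace
sketch (not kernel code) of a parse function for a hypothetical protocol
whose messages start with a 4-byte big-endian length covering the whole
message. The name example_parse_msg, the header layout, the
EXAMPLE_MAX_LEN limit, and the rule that a zero length field means "hand
the stream to userspace" are all assumptions made for the example. A real
parse_msg callback receives an skbuff rather than a flat buffer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

#ifndef ESTRPIPE
#define ESTRPIPE 86	/* Linux value; defined here only for portability */
#endif

#define EXAMPLE_HDR_LEN 4u		/* hypothetical 4-byte length prefix */
#define EXAMPLE_MAX_LEN (1u << 20)	/* hypothetical sanity limit */

/* Hypothetical parser: returns the message length, 0 if more data is
 * needed, -ESTRPIPE to hand the stream to userspace, or another negative
 * errno on a parse error.
 */
static int example_parse_msg(const uint8_t *buf, size_t avail)
{
	uint32_t len;

	assert(buf != NULL);

	if (avail < EXAMPLE_HDR_LEN)
		return 0;		/* header incomplete, need more data */

	len = ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
	      ((uint32_t)buf[2] << 8) | (uint32_t)buf[3];

	if (len == 0)
		return -ESTRPIPE;	/* assumed "userspace handles this" marker */

	if (len < EXAMPLE_HDR_LEN || len > EXAMPLE_MAX_LEN)
		return -EPROTO;		/* length cannot be valid, sync is lost */

	return (int)len;		/* total length, header included */
}
```

Note that the function reports only the length. Waiting for the rest of
the message body is left to the caller, which matches the split between
parse_msg and rcv_msg described above.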

There is one issue I haven't been able to fully resolve. If parse_msg
returns -ESTRPIPE (wants control back to userspace) the parser may
already have consumed some bytes of the message. There is no way to
put bytes back into the TCP receive queue and tcp_read_sock does not
provide an easy way to peek at messages. In lieu of a better solution, we
return ENODATA on the socket to indicate that the data stream is
unrecoverable (application needs to close socket). This condition
should only happen if an application layer message header is split
across two skbuffs and parsing just the first skbuff wasn't sufficient
to determine that transfer to userspace is needed.

This patch set contains:

  - strparser implementation
  - changes to kcm to use strparser
  - strparser.txt documentation

v2:
  - Add copyright notice to C files
  - Remove GPL module license from strparser.c
  - Add report of rxpause

Tested:
  - Ran a KCM thrash test for 24 hours. No behavioral or performance
    differences observed.


Tom Herbert (3):
  strparser: Stream parser for messages
  kcm: Use stream parser
  strparser: Documentation

 Documentation/networking/strparser.txt | 147 ++++++++++
 include/net/kcm.h                      |  37 +--
 include/net/strparser.h                | 147 ++++++++++
 net/Kconfig                            |   1 +
 net/Makefile                           |   1 +
 net/ipv6/ila/ila_common.c              |   1 -
 net/kcm/Kconfig                        |   1 +
 net/kcm/kcmproc.c                      |  44 ++-
 net/kcm/kcmsock.c                      | 464 +++++-------------------------
 net/strparser/Kconfig                  |   4 +
 net/strparser/Makefile                 |   1 +
 net/strparser/strparser.c              | 500 +++++++++++++++++++++++++++++++++
 12 files changed, 916 insertions(+), 432 deletions(-)
 create mode 100644 Documentation/networking/strparser.txt
 create mode 100644 include/net/strparser.h
 create mode 100644 net/strparser/Kconfig
 create mode 100644 net/strparser/Makefile
 create mode 100644 net/strparser/strparser.c

-- 
2.8.0.rc2


* [PATCH net-next v2 1/3] strparser: Stream parser for messages
  2016-08-01 16:33 [PATCH net-next v2 0/3] strp: Stream parser for messages Tom Herbert
@ 2016-08-01 16:33 ` Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 2/3] kcm: Use stream parser Tom Herbert
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 7+ messages in thread
From: Tom Herbert @ 2016-08-01 16:33 UTC (permalink / raw)
  To: davem, netdev; +Cc: kernel-team

This patch introduces a utility for parsing application layer protocol
messages in a TCP stream. This is a generalization of the mechanism
implemented in the Kernel Connection Multiplexor (KCM).

The API includes a context structure, a set of callbacks, utility
functions, and a data ready function.

A stream parser instance is defined by a strparser structure that
is bound to a TCP socket. The function to initialize the structure
is:

int strp_init(struct strparser *strp, struct sock *csk,
              struct strp_callbacks *cb);

csk is the TCP socket being bound to and cb are the parser callbacks.

A parser is bound to a TCP socket by setting data_ready function to
strp_tcp_data_ready so that all receive indications on the socket
go through the parser. This assumes that sk_user_data is set to
the strparser structure.

There are four callbacks.
 - parse_msg is called to parse the message (returns length or error).
 - rcv_msg is called when a complete message has been received
 - read_sock_done is called when data_ready function exits
 - abort_parser is called to abort the parser
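
In this patch, strp_init rejects a callback set that lacks parse_msg or
rcv_msg and substitutes defaults for the other two. The userspace sketch
below models that validate-and-default pattern with a reduced callback
table. All names in it (example_callbacks, example_init, and the stub
callbacks) are invented for the illustration and the signatures are
simplified; abort_parser is left out to keep it short.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical, reduced callback table */
struct example_callbacks {
	int (*parse_msg)(const void *data, size_t len);	/* required */
	void (*rcv_msg)(const void *data, size_t len);		/* required */
	int (*read_sock_done)(int err);				/* optional */
};

/* Default used when the caller supplies no read_sock_done */
static int example_default_read_sock_done(int err)
{
	return err;
}

/* Stub callbacks so the table can be filled in */
static int example_parse(const void *data, size_t len)
{
	(void)data;
	return (int)len;
}

static void example_rcv(const void *data, size_t len)
{
	(void)data;
	(void)len;
}

/* Copy the callbacks, refusing incomplete sets and filling in defaults */
static int example_init(struct example_callbacks *dst,
			const struct example_callbacks *cb)
{
	assert(dst != NULL);

	if (!cb || !cb->parse_msg || !cb->rcv_msg)
		return -EINVAL;

	dst->parse_msg = cb->parse_msg;
	dst->rcv_msg = cb->rcv_msg;
	dst->read_sock_done = cb->read_sock_done ?
		cb->read_sock_done : example_default_read_sock_done;

	return 0;
}
```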

The input to parse_msg is an skbuff which contains next message under
construction. The backend processing of parse_msg will parse the
application layer protocol headers to determine the length of
the message in the stream. The possible return values are:

   >0 : indicates length of successfully parsed message
   0  : indicates more data must be received to parse the message
   -ESTRPIPE : current message should not be processed by the
      kernel, return control of the socket to userspace which
      can proceed to read the messages itself
   other < 0 : Error in parsing, give control back to userspace
      assuming that synchronization is lost and the stream
      is unrecoverable (application expected to close TCP socket)

In the case of an error return (< 0), strparser will stop the parser
and report an error to userspace. The application must deal
with the error. To handle the error, the strparser is unbound
from the TCP socket. If the error indicates that the stream on the
TCP socket is at a recoverable point (ESTRPIPE), then the application
can read the TCP socket to process the stream. Once the application
has dealt with the exceptions in the stream, it may again bind the
socket to a strparser to continue data operations.

Note that ENODATA may be returned to the application. In this case
parse_msg returned -ESTRPIPE, however strparser was unable to maintain
synchronization of the stream (i.e. some of the message in question
was already read by the parser).
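
The mapping described in this note can be modelled as below. This is a
userspace sketch of the decision, not the strparser code itself.
example_final_error and its parameters are names invented for the
example, with accum_len standing for the number of bytes of the current
message that the parser has already consumed from the socket.

```c
#include <assert.h>
#include <errno.h>

#ifndef ESTRPIPE
#define ESTRPIPE 86	/* Linux value; defined here only for portability */
#endif

/* Hypothetical helper: pick the error that ends up reported for a
 * failed parse, given how much of the message was already consumed.
 */
static int example_final_error(int parse_ret, int accum_len)
{
	assert(parse_ret < 0);	/* only meaningful for error returns */

	if (parse_ret == -ESTRPIPE && accum_len > 0)
		return -ENODATA;	/* bytes already consumed, no clean hand-off */

	return parse_ret;	/* clean hand-off or an ordinary parse error */
}
```

With no bytes consumed, -ESTRPIPE passes through unchanged and the
application can resume reading at a message boundary.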

strp_pause and strp_unpause are used to provide flow control. For
instance, if rcv_msg is called but the upper layer can't immediately
consume the message it can hold the message and pause strparser.
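
A userspace model of this flow control might look like the following.
Everything here is hypothetical: example_parser stands in for the
rx_paused flag, example_upper is an upper layer with a two-entry queue,
and messages are reduced to integer ids. It only illustrates the
pause-when-full, unpause-on-consume pattern.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define EXAMPLE_QUEUE_MAX 2	/* hypothetical upper-layer queue depth */

/* Hypothetical stand-in for the parser's pause flag */
struct example_parser {
	bool rx_paused;
};

/* Hypothetical upper layer with a small bounded queue of message ids */
struct example_upper {
	struct example_parser *parser;
	int queue[EXAMPLE_QUEUE_MAX];
	size_t count;
};

/* Models rcv_msg: hold the message, pause the parser once the queue is full */
static void example_rcv_msg(struct example_upper *up, int msg_id)
{
	assert(up->count < EXAMPLE_QUEUE_MAX);

	up->queue[up->count++] = msg_id;
	if (up->count == EXAMPLE_QUEUE_MAX)
		up->parser->rx_paused = true;	/* models strp_pause() */
}

/* Models the consumer: take the oldest message and resume parsing */
static int example_consume(struct example_upper *up)
{
	int msg_id;
	size_t i;

	assert(up->count > 0);

	msg_id = up->queue[0];
	for (i = 1; i < up->count; i++)
		up->queue[i - 1] = up->queue[i];
	up->count--;
	up->parser->rx_paused = false;	/* models strp_unpause() */

	return msg_id;
}
```

The header in this patch also declares strp_check_rcv, which queues the
receive work. A caller would presumably invoke it after unpausing so that
data already queued on the socket gets parsed, but that step is not
modelled here.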

Signed-off-by: Tom Herbert <tom@herbertland.com>
---
 include/net/strparser.h   | 147 ++++++++++++++
 net/Kconfig               |   1 +
 net/Makefile              |   1 +
 net/strparser/Kconfig     |   4 +
 net/strparser/Makefile    |   1 +
 net/strparser/strparser.c | 500 ++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 654 insertions(+)
 create mode 100644 include/net/strparser.h
 create mode 100644 net/strparser/Kconfig
 create mode 100644 net/strparser/Makefile
 create mode 100644 net/strparser/strparser.c

diff --git a/include/net/strparser.h b/include/net/strparser.h
new file mode 100644
index 0000000..42b964d
--- /dev/null
+++ b/include/net/strparser.h
@@ -0,0 +1,147 @@
+/*
+ * Stream Parser
+ *
+ * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ */
+
+#ifndef __NET_STRPARSER_H_
+#define __NET_STRPARSER_H_
+
+#include <linux/skbuff.h>
+#include <net/sock.h>
+
+#define STRP_STATS_ADD(stat, count) ((stat) += (count))
+#define STRP_STATS_INCR(stat) ((stat)++)
+
+struct strp_stats {
+	unsigned long long rx_msgs;
+	unsigned long long rx_bytes;
+	unsigned int rx_mem_fail;
+	unsigned int rx_need_more_hdr;
+	unsigned int rx_msg_too_big;
+	unsigned int rx_msg_timeouts;
+	unsigned int rx_bad_hdr_len;
+};
+
+struct strp_aggr_stats {
+	unsigned long long rx_msgs;
+	unsigned long long rx_bytes;
+	unsigned int rx_mem_fail;
+	unsigned int rx_need_more_hdr;
+	unsigned int rx_msg_too_big;
+	unsigned int rx_msg_timeouts;
+	unsigned int rx_bad_hdr_len;
+	unsigned int rx_aborts;
+	unsigned int rx_interrupted;
+	unsigned int rx_unrecov_intr;
+};
+
+struct strparser;
+
+/* Callbacks are called with lock held for the attached socket */
+struct strp_callbacks {
+	int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
+	void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
+	int (*read_sock_done)(struct strparser *strp, int err);
+	void (*abort_parser)(struct strparser *strp, int err);
+};
+
+struct strp_rx_msg {
+	int full_len;
+	int offset;
+};
+
+static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb)
+{
+	return (struct strp_rx_msg *)((void *)skb->cb +
+		offsetof(struct qdisc_skb_cb, data));
+}
+
+/* Structure for an attached lower socket */
+struct strparser {
+	struct sock *sk;
+
+	u32 rx_stopped : 1;
+	u32 rx_paused : 1;
+	u32 rx_aborted : 1;
+	u32 rx_interrupted : 1;
+	u32 rx_unrecov_intr : 1;
+
+	struct sk_buff **rx_skb_nextp;
+	struct timer_list rx_msg_timer;
+	struct sk_buff *rx_skb_head;
+	unsigned int rx_need_bytes;
+	struct delayed_work rx_delayed_work;
+	struct work_struct rx_work;
+	struct strp_stats stats;
+	struct strp_callbacks cb;
+};
+
+/* Must be called with lock held for attached socket */
+static inline void strp_pause(struct strparser *strp)
+{
+	strp->rx_paused = 1;
+}
+
+/* May be called without holding lock for attached socket */
+static inline void strp_unpause(struct strparser *strp)
+{
+	strp->rx_paused = 0;
+}
+
+static inline void save_strp_stats(struct strparser *strp,
+				   struct strp_aggr_stats *agg_stats)
+{
+	/* Save psock statistics in the mux when psock is being unattached. */
+
+#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat +=		\
+				 strp->stats._stat)
+	SAVE_PSOCK_STATS(rx_msgs);
+	SAVE_PSOCK_STATS(rx_bytes);
+	SAVE_PSOCK_STATS(rx_mem_fail);
+	SAVE_PSOCK_STATS(rx_need_more_hdr);
+	SAVE_PSOCK_STATS(rx_msg_too_big);
+	SAVE_PSOCK_STATS(rx_msg_timeouts);
+	SAVE_PSOCK_STATS(rx_bad_hdr_len);
+#undef SAVE_PSOCK_STATS
+
+	if (strp->rx_aborted)
+		agg_stats->rx_aborts++;
+	if (strp->rx_interrupted)
+		agg_stats->rx_interrupted++;
+	if (strp->rx_unrecov_intr)
+		agg_stats->rx_unrecov_intr++;
+}
+
+static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
+					struct strp_aggr_stats *agg_stats)
+{
+#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
+	SAVE_PSOCK_STATS(rx_msgs);
+	SAVE_PSOCK_STATS(rx_bytes);
+	SAVE_PSOCK_STATS(rx_mem_fail);
+	SAVE_PSOCK_STATS(rx_need_more_hdr);
+	SAVE_PSOCK_STATS(rx_msg_too_big);
+	SAVE_PSOCK_STATS(rx_msg_timeouts);
+	SAVE_PSOCK_STATS(rx_bad_hdr_len);
+	SAVE_PSOCK_STATS(rx_aborts);
+	SAVE_PSOCK_STATS(rx_interrupted);
+	SAVE_PSOCK_STATS(rx_unrecov_intr);
+#undef SAVE_PSOCK_STATS
+
+}
+
+void strp_done(struct strparser *strp);
+void strp_stop(struct strparser *strp);
+void strp_check_rcv(struct strparser *strp);
+int strp_init(struct strparser *strp, struct sock *csk,
+	      struct strp_callbacks *cb);
+
+void strp_tcp_data_ready(struct sock *sk);
+
+
+#endif /* __NET_STRPARSER_H_ */
diff --git a/net/Kconfig b/net/Kconfig
index c2cdbce..7b6cd34 100644
--- a/net/Kconfig
+++ b/net/Kconfig
@@ -369,6 +369,7 @@ source "net/irda/Kconfig"
 source "net/bluetooth/Kconfig"
 source "net/rxrpc/Kconfig"
 source "net/kcm/Kconfig"
+source "net/strparser/Kconfig"
 
 config FIB_RULES
 	bool
diff --git a/net/Makefile b/net/Makefile
index 9bd20bb..4cafaa2 100644
--- a/net/Makefile
+++ b/net/Makefile
@@ -35,6 +35,7 @@ obj-$(CONFIG_BT)		+= bluetooth/
 obj-$(CONFIG_SUNRPC)		+= sunrpc/
 obj-$(CONFIG_AF_RXRPC)		+= rxrpc/
 obj-$(CONFIG_AF_KCM)		+= kcm/
+obj-$(CONFIG_STREAM_PARSER)	+= strparser/
 obj-$(CONFIG_ATM)		+= atm/
 obj-$(CONFIG_L2TP)		+= l2tp/
 obj-$(CONFIG_DECNET)		+= decnet/
diff --git a/net/strparser/Kconfig b/net/strparser/Kconfig
new file mode 100644
index 0000000..6cff3f6
--- /dev/null
+++ b/net/strparser/Kconfig
@@ -0,0 +1,4 @@
+
+config STREAM_PARSER
+	tristate
+	default n
diff --git a/net/strparser/Makefile b/net/strparser/Makefile
new file mode 100644
index 0000000..858a126
--- /dev/null
+++ b/net/strparser/Makefile
@@ -0,0 +1 @@
+obj-$(CONFIG_STREAM_PARSER) += strparser.o
diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c
new file mode 100644
index 0000000..dc0a1ec
--- /dev/null
+++ b/net/strparser/strparser.c
@@ -0,0 +1,500 @@
+/*
+ * Stream Parser
+ *
+ * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ */
+
+#include <linux/bpf.h>
+#include <linux/errno.h>
+#include <linux/errqueue.h>
+#include <linux/file.h>
+#include <linux/in.h>
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/netdevice.h>
+#include <linux/poll.h>
+#include <linux/rculist.h>
+#include <linux/skbuff.h>
+#include <linux/socket.h>
+#include <linux/uaccess.h>
+#include <linux/workqueue.h>
+#include <net/strparser.h>
+#include <net/netns/generic.h>
+#include <net/sock.h>
+#include <net/tcp.h>
+
+static struct workqueue_struct *strp_wq;
+
+struct _strp_rx_msg {
+	/* Internal cb structure. struct strp_rx_msg must be first for passing
+	 * to upper layer.
+	 */
+	struct strp_rx_msg strp;
+	int accum_len;
+	int early_eaten;
+};
+
+static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb)
+{
+	return (struct _strp_rx_msg *)((void *)skb->cb +
+		offsetof(struct qdisc_skb_cb, data));
+}
+
+/* Lower lock held */
+static void strp_abort_rx_strp(struct strparser *strp, int err)
+{
+	struct sock *csk = strp->sk;
+
+	/* Unrecoverable error in receive */
+
+	del_timer(&strp->rx_msg_timer);
+
+	if (strp->rx_stopped)
+		return;
+
+	strp->rx_stopped = 1;
+
+	/* Report an error on the lower socket */
+	csk->sk_err = err;
+	csk->sk_error_report(csk);
+}
+
+static void strp_start_rx_timer(struct strparser *strp)
+{
+	if (strp->sk->sk_rcvtimeo)
+		mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo);
+}
+
+/* Lower lock held */
+void strp_parser_err(struct strparser *strp, int err, read_descriptor_t *desc)
+{
+	desc->error = err;
+	kfree_skb(strp->rx_skb_head);
+	strp->rx_skb_head = NULL;
+	strp_abort_rx_strp(strp, err);
+}
+
+/* Lower socket lock held */
+static int strp_tcp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
+			 unsigned int orig_offset, size_t orig_len)
+{
+	struct strparser *strp = (struct strparser *)desc->arg.data;
+	struct _strp_rx_msg *rxm;
+	struct sk_buff *head, *skb;
+	size_t eaten = 0, cand_len;
+	ssize_t extra;
+	int err;
+	bool cloned_orig = false;
+
+	if (strp->rx_paused)
+		return 0;
+
+	head = strp->rx_skb_head;
+	if (head) {
+		/* Message already in progress */
+
+		rxm = _strp_rx_msg(head);
+		if (unlikely(rxm->early_eaten)) {
+			/* Already some number of bytes on the receive sock
+			 * data saved in rx_skb_head, just indicate they
+			 * are consumed.
+			 */
+			eaten = orig_len <= rxm->early_eaten ?
+				orig_len : rxm->early_eaten;
+			rxm->early_eaten -= eaten;
+
+			return eaten;
+		}
+
+		if (unlikely(orig_offset)) {
+			/* Getting data with a non-zero offset when a message is
+			 * in progress is not expected. If it does happen, we
+			 * need to clone and pull since we can't deal with
+			 * offsets in the skbs for a message except in the head.
+			 */
+			orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
+			if (!orig_skb) {
+				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				desc->error = -ENOMEM;
+				return 0;
+			}
+			if (!pskb_pull(orig_skb, orig_offset)) {
+				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				kfree_skb(orig_skb);
+				desc->error = -ENOMEM;
+				return 0;
+			}
+			cloned_orig = true;
+			orig_offset = 0;
+		}
+
+		if (!strp->rx_skb_nextp) {
+			/* We are going to append to the frag_list of head.
+			 * Need to unshare the frag_list.
+			 */
+			err = skb_unclone(head, GFP_ATOMIC);
+			if (err) {
+				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				desc->error = err;
+				return 0;
+			}
+
+			if (unlikely(skb_shinfo(head)->frag_list)) {
+				/* We can't append to an sk_buff that already
+				 * has a frag_list. We create a new head, point
+				 * the frag_list of that to the old head, and
+				 * then are able to use the old head->next for
+				 * appending to the message.
+				 */
+				if (WARN_ON(head->next)) {
+					desc->error = -EINVAL;
+					return 0;
+				}
+
+				skb = alloc_skb(0, GFP_ATOMIC);
+				if (!skb) {
+					STRP_STATS_INCR(strp->stats.rx_mem_fail);
+					desc->error = -ENOMEM;
+					return 0;
+				}
+				skb->len = head->len;
+				skb->data_len = head->len;
+				skb->truesize = head->truesize;
+				*_strp_rx_msg(skb) = *_strp_rx_msg(head);
+				strp->rx_skb_nextp = &head->next;
+				skb_shinfo(skb)->frag_list = head;
+				strp->rx_skb_head = skb;
+				head = skb;
+			} else {
+				strp->rx_skb_nextp =
+				    &skb_shinfo(head)->frag_list;
+			}
+		}
+	}
+
+	while (eaten < orig_len) {
+		/* Always clone since we will consume something */
+		skb = skb_clone(orig_skb, GFP_ATOMIC);
+		if (!skb) {
+			STRP_STATS_INCR(strp->stats.rx_mem_fail);
+			desc->error = -ENOMEM;
+			break;
+		}
+
+		cand_len = orig_len - eaten;
+
+		head = strp->rx_skb_head;
+		if (!head) {
+			head = skb;
+			strp->rx_skb_head = head;
+			/* Will set rx_skb_nextp on next packet if needed */
+			strp->rx_skb_nextp = NULL;
+			rxm = _strp_rx_msg(head);
+			memset(rxm, 0, sizeof(*rxm));
+			rxm->strp.offset = orig_offset + eaten;
+		} else {
+			/* Unclone since we may be appending to an skb that we
+			 * already share a frag_list with.
+			 */
+			err = skb_unclone(skb, GFP_ATOMIC);
+			if (err) {
+				STRP_STATS_INCR(strp->stats.rx_mem_fail);
+				desc->error = err;
+				break;
+			}
+
+			rxm = _strp_rx_msg(head);
+			*strp->rx_skb_nextp = skb;
+			strp->rx_skb_nextp = &skb->next;
+			head->data_len += skb->len;
+			head->len += skb->len;
+			head->truesize += skb->truesize;
+		}
+
+		if (!rxm->strp.full_len) {
+			ssize_t len;
+
+			len = (*strp->cb.parse_msg)(strp, head);
+
+			if (!len) {
+				/* Need more header to determine length */
+				if (!rxm->accum_len) {
+					/* Start RX timer for new message */
+					strp_start_rx_timer(strp);
+				}
+				rxm->accum_len += cand_len;
+				eaten += cand_len;
+				STRP_STATS_INCR(strp->stats.rx_need_more_hdr);
+				WARN_ON(eaten != orig_len);
+				break;
+			} else if (len < 0) {
+				if (len == -ESTRPIPE && rxm->accum_len) {
+					len = -ENODATA;
+					strp->rx_unrecov_intr = 1;
+				} else {
+					strp->rx_interrupted = 1;
+				}
+				strp_parser_err(strp, len, desc);
+				break;
+			} else if (len > strp->sk->sk_rcvbuf) {
+				/* Message length exceeds maximum allowed */
+				STRP_STATS_INCR(strp->stats.rx_msg_too_big);
+				strp_parser_err(strp, -EMSGSIZE, desc);
+				break;
+			} else if (len <= (ssize_t)head->len -
+					  skb->len - rxm->strp.offset) {
+				/* Length must be into new skb (and also
+				 * greater than zero)
+				 */
+				STRP_STATS_INCR(strp->stats.rx_bad_hdr_len);
+				strp_parser_err(strp, -EPROTO, desc);
+				break;
+			}
+
+			rxm->strp.full_len = len;
+		}
+
+		extra = (ssize_t)(rxm->accum_len + cand_len) -
+			rxm->strp.full_len;
+
+		if (extra < 0) {
+			/* Message not complete yet. */
+			if (rxm->strp.full_len - rxm->accum_len >
+			    tcp_inq(strp->sk)) {
+				/* Don't have the whole message in the socket
+				 * buffer. Set strp->rx_need_bytes to wait for
+				 * the rest of the message. Also, set "early
+				 * eaten" since we've already buffered the skb
+				 * but don't consume yet per tcp_read_sock.
+				 */
+
+				if (!rxm->accum_len) {
+					/* Start RX timer for new message */
+					strp_start_rx_timer(strp);
+				}
+
+				strp->rx_need_bytes = rxm->strp.full_len -
+						       rxm->accum_len;
+				rxm->accum_len += cand_len;
+				rxm->early_eaten = cand_len;
+				STRP_STATS_ADD(strp->stats.rx_bytes, cand_len);
+				desc->count = 0; /* Stop reading socket */
+				break;
+			}
+			rxm->accum_len += cand_len;
+			eaten += cand_len;
+			WARN_ON(eaten != orig_len);
+			break;
+		}
+
+		/* Positive extra indicates more bytes than needed for the
+		 * message
+		 */
+
+		WARN_ON(extra > cand_len);
+
+		eaten += (cand_len - extra);
+
+		/* Hurray, we have a new message! */
+		del_timer(&strp->rx_msg_timer);
+		strp->rx_skb_head = NULL;
+		STRP_STATS_INCR(strp->stats.rx_msgs);
+
+		/* Give skb to upper layer */
+		strp->cb.rcv_msg(strp, head);
+
+		if (unlikely(strp->rx_paused)) {
+			/* Upper layer paused strp */
+			break;
+		}
+	}
+
+	if (cloned_orig)
+		kfree_skb(orig_skb);
+
+	STRP_STATS_ADD(strp->stats.rx_bytes, eaten);
+
+	return eaten;
+}
+
+static int default_read_sock_done(struct strparser *strp, int err)
+{
+	return err;
+}
+
+/* Called with lock held on lower socket */
+static int strp_tcp_read_sock(struct strparser *strp)
+{
+	read_descriptor_t desc;
+
+	desc.arg.data = strp;
+	desc.error = 0;
+	desc.count = 1; /* give more than one skb per call */
+
+	/* sk should be locked here, so okay to do tcp_read_sock */
+	tcp_read_sock(strp->sk, &desc, strp_tcp_recv);
+
+	desc.error = strp->cb.read_sock_done(strp, desc.error);
+
+	return desc.error;
+}
+
+/* Lower sock lock held */
+void strp_tcp_data_ready(struct sock *sk)
+{
+	struct strparser *strp;
+
+	read_lock_bh(&sk->sk_callback_lock);
+
+	strp = (struct strparser *)sk->sk_user_data;
+	if (unlikely(!strp || strp->rx_stopped))
+		goto out;
+
+	if (strp->rx_paused)
+		goto out;
+
+	if (strp->rx_need_bytes) {
+		if (tcp_inq(sk) >= strp->rx_need_bytes)
+			strp->rx_need_bytes = 0;
+		else
+			goto out;
+	}
+
+	if (strp_tcp_read_sock(strp) == -ENOMEM)
+		queue_delayed_work(strp_wq, &strp->rx_delayed_work, 0);
+
+out:
+	read_unlock_bh(&sk->sk_callback_lock);
+}
+EXPORT_SYMBOL(strp_tcp_data_ready);
+
+static void do_strp_rx_work(struct strparser *strp)
+{
+	read_descriptor_t rd_desc;
+	struct sock *csk = strp->sk;
+
+	/* We need the read lock to synchronize with strp_tcp_data_ready. We
+	 * need the socket lock for calling tcp_read_sock.
+	 */
+	lock_sock(csk);
+	read_lock_bh(&csk->sk_callback_lock);
+
+	if (unlikely(csk->sk_user_data != strp))
+		goto out;
+
+	if (unlikely(strp->rx_stopped))
+		goto out;
+
+	if (strp->rx_paused)
+		goto out;
+
+	rd_desc.arg.data = strp;
+
+	if (strp_tcp_read_sock(strp) == -ENOMEM)
+		queue_delayed_work(strp_wq, &strp->rx_delayed_work, 0);
+
+out:
+	read_unlock_bh(&csk->sk_callback_lock);
+	release_sock(csk);
+}
+
+static void strp_rx_work(struct work_struct *w)
+{
+	do_strp_rx_work(container_of(w, struct strparser, rx_work));
+}
+
+static void strp_rx_delayed_work(struct work_struct *w)
+{
+	do_strp_rx_work(container_of(w, struct strparser,
+				     rx_delayed_work.work));
+}
+
+static void strp_rx_msg_timeout(unsigned long arg)
+{
+	struct strparser *strp = (struct strparser *)arg;
+
+	/* Message assembly timed out */
+	STRP_STATS_INCR(strp->stats.rx_msg_timeouts);
+	lock_sock(strp->sk);
+	strp_abort_rx_strp(strp, ETIMEDOUT);
+	release_sock(strp->sk);
+}
+
+void strp_queue_work(struct strparser *strp)
+{
+	queue_work(strp_wq, &strp->rx_work);
+}
+
+int strp_init(struct strparser *strp, struct sock *csk,
+	      struct strp_callbacks *cb)
+{
+	if (!cb || !cb->rcv_msg || !cb->parse_msg)
+		return -EINVAL;
+
+	memset(strp, 0, sizeof(*strp));
+
+	strp->sk = csk;
+
+	setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout,
+		    (unsigned long)strp);
+
+	INIT_WORK(&strp->rx_work, strp_rx_work);
+	INIT_DELAYED_WORK(&strp->rx_delayed_work, strp_rx_delayed_work);
+
+	strp->cb.rcv_msg = cb->rcv_msg;
+	strp->cb.parse_msg = cb->parse_msg;
+	strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
+	strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp;
+
+	return 0;
+}
+EXPORT_SYMBOL(strp_init);
+
+/* strp must already be stopped so that strp_tcp_recv will no longer be called.
+ * Note that strp_done is not called with the lower socket held.
+ */
+void strp_done(struct strparser *strp)
+{
+	WARN_ON(!strp->rx_stopped);
+
+	del_timer_sync(&strp->rx_msg_timer);
+	cancel_work_sync(&strp->rx_work);
+	cancel_delayed_work_sync(&strp->rx_delayed_work);
+
+	if (strp->rx_skb_head) {
+		kfree_skb(strp->rx_skb_head);
+		strp->rx_skb_head = NULL;
+	}
+}
+EXPORT_SYMBOL(strp_done);
+
+void strp_stop(struct strparser *strp)
+{
+	strp->rx_stopped = 1;
+}
+EXPORT_SYMBOL(strp_stop);
+
+void strp_check_rcv(struct strparser *strp)
+{
+	queue_work(strp_wq, &strp->rx_work);
+}
+EXPORT_SYMBOL(strp_check_rcv);
+
+static int __init strp_mod_init(void)
+{
+	strp_wq = create_singlethread_workqueue("kstrp");
+
+	return 0;
+}
+
+static void __exit strp_mod_exit(void)
+{
+}
+module_init(strp_mod_init);
+module_exit(strp_mod_exit);
+
-- 
2.8.0.rc2


* [PATCH net-next v2 2/3] kcm: Use stream parser
  2016-08-01 16:33 [PATCH net-next v2 0/3] strp: Stream parser for messages Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 1/3] strparser: " Tom Herbert
@ 2016-08-01 16:33 ` Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 3/3] strparser: Documentation Tom Herbert
  2016-08-01 20:27 ` [PATCH net-next v2 0/3] strp: Stream parser for messages David Miller
  3 siblings, 0 replies; 7+ messages in thread
From: Tom Herbert @ 2016-08-01 16:33 UTC (permalink / raw)
  To: davem, netdev; +Cc: kernel-team

Adapt KCM to use the stream parser. This mostly involves removing
the RX handling and setting up the strparser using the interface.

Signed-off-by: Tom Herbert <tom@herbertland.com>
---
 include/net/kcm.h         |  37 +---
 net/ipv6/ila/ila_common.c |   1 -
 net/kcm/Kconfig           |   1 +
 net/kcm/kcmproc.c         |  44 +++--
 net/kcm/kcmsock.c         | 464 ++++++++--------------------------------------
 5 files changed, 115 insertions(+), 432 deletions(-)

diff --git a/include/net/kcm.h b/include/net/kcm.h
index 2840b58..2a89658 100644
--- a/include/net/kcm.h
+++ b/include/net/kcm.h
@@ -13,6 +13,7 @@
 
 #include <linux/skbuff.h>
 #include <net/sock.h>
+#include <net/strparser.h>
 #include <uapi/linux/kcm.h>
 
 extern unsigned int kcm_net_id;
@@ -21,16 +22,8 @@ extern unsigned int kcm_net_id;
 #define KCM_STATS_INCR(stat) ((stat)++)
 
 struct kcm_psock_stats {
-	unsigned long long rx_msgs;
-	unsigned long long rx_bytes;
 	unsigned long long tx_msgs;
 	unsigned long long tx_bytes;
-	unsigned int rx_aborts;
-	unsigned int rx_mem_fail;
-	unsigned int rx_need_more_hdr;
-	unsigned int rx_msg_too_big;
-	unsigned int rx_msg_timeouts;
-	unsigned int rx_bad_hdr_len;
 	unsigned long long reserved;
 	unsigned long long unreserved;
 	unsigned int tx_aborts;
@@ -64,13 +57,6 @@ struct kcm_tx_msg {
 	struct sk_buff *last_skb;
 };
 
-struct kcm_rx_msg {
-	int full_len;
-	int accum_len;
-	int offset;
-	int early_eaten;
-};
-
 /* Socket structure for KCM client sockets */
 struct kcm_sock {
 	struct sock sk;
@@ -87,6 +73,7 @@ struct kcm_sock {
 	struct work_struct tx_work;
 	struct list_head wait_psock_list;
 	struct sk_buff *seq_skb;
+	u32 tx_stopped : 1;
 
 	/* Don't use bit fields here, these are set under different locks */
 	bool tx_wait;
@@ -104,11 +91,11 @@ struct bpf_prog;
 /* Structure for an attached lower socket */
 struct kcm_psock {
 	struct sock *sk;
+	struct strparser strp;
 	struct kcm_mux *mux;
 	int index;
 
 	u32 tx_stopped : 1;
-	u32 rx_stopped : 1;
 	u32 done : 1;
 	u32 unattaching : 1;
 
@@ -121,18 +108,12 @@ struct kcm_psock {
 	struct kcm_psock_stats stats;
 
 	/* Receive */
-	struct sk_buff *rx_skb_head;
-	struct sk_buff **rx_skb_nextp;
-	struct sk_buff *ready_rx_msg;
 	struct list_head psock_ready_list;
-	struct work_struct rx_work;
-	struct delayed_work rx_delayed_work;
 	struct bpf_prog *bpf_prog;
 	struct kcm_sock *rx_kcm;
 	unsigned long long saved_rx_bytes;
 	unsigned long long saved_rx_msgs;
-	struct timer_list rx_msg_timer;
-	unsigned int rx_need_bytes;
+	struct sk_buff *ready_rx_msg;
 
 	/* Transmit */
 	struct kcm_sock *tx_kcm;
@@ -146,6 +127,7 @@ struct kcm_net {
 	struct mutex mutex;
 	struct kcm_psock_stats aggregate_psock_stats;
 	struct kcm_mux_stats aggregate_mux_stats;
+	struct strp_aggr_stats aggregate_strp_stats;
 	struct list_head mux_list;
 	int count;
 };
@@ -163,6 +145,7 @@ struct kcm_mux {
 
 	struct kcm_mux_stats stats;
 	struct kcm_psock_stats aggregate_psock_stats;
+	struct strp_aggr_stats aggregate_strp_stats;
 
 	/* Receive */
 	spinlock_t rx_lock ____cacheline_aligned_in_smp;
@@ -190,14 +173,6 @@ static inline void aggregate_psock_stats(struct kcm_psock_stats *stats,
 	/* Save psock statistics in the mux when psock is being unattached. */
 
 #define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
-	SAVE_PSOCK_STATS(rx_msgs);
-	SAVE_PSOCK_STATS(rx_bytes);
-	SAVE_PSOCK_STATS(rx_aborts);
-	SAVE_PSOCK_STATS(rx_mem_fail);
-	SAVE_PSOCK_STATS(rx_need_more_hdr);
-	SAVE_PSOCK_STATS(rx_msg_too_big);
-	SAVE_PSOCK_STATS(rx_msg_timeouts);
-	SAVE_PSOCK_STATS(rx_bad_hdr_len);
 	SAVE_PSOCK_STATS(tx_msgs);
 	SAVE_PSOCK_STATS(tx_bytes);
 	SAVE_PSOCK_STATS(reserved);
diff --git a/net/ipv6/ila/ila_common.c b/net/ipv6/ila/ila_common.c
index ec9efbc..aba0998 100644
--- a/net/ipv6/ila/ila_common.c
+++ b/net/ipv6/ila/ila_common.c
@@ -172,6 +172,5 @@ static void __exit ila_fini(void)
 
 module_init(ila_init);
 module_exit(ila_fini);
-MODULE_ALIAS_RTNL_LWT(ILA);
 MODULE_AUTHOR("Tom Herbert <tom@herbertland.com>");
 MODULE_LICENSE("GPL");
diff --git a/net/kcm/Kconfig b/net/kcm/Kconfig
index 5db94d9..87fca36 100644
--- a/net/kcm/Kconfig
+++ b/net/kcm/Kconfig
@@ -3,6 +3,7 @@ config AF_KCM
 	tristate "KCM sockets"
 	depends on INET
 	select BPF_SYSCALL
+	select STREAM_PARSER
 	---help---
 	  KCM (Kernel Connection Multiplexor) sockets provide a method
 	  for multiplexing messages of a message based application
diff --git a/net/kcm/kcmproc.c b/net/kcm/kcmproc.c
index 16c2e03..47e4453 100644
--- a/net/kcm/kcmproc.c
+++ b/net/kcm/kcmproc.c
@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
 	seq_printf(seq,
 		   "   psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
 		   psock->index,
-		   psock->stats.rx_msgs,
-		   psock->stats.rx_bytes,
+		   psock->strp.stats.rx_msgs,
+		   psock->strp.stats.rx_bytes,
 		   psock->stats.tx_msgs,
 		   psock->stats.tx_bytes,
 		   psock->sk->sk_receive_queue.qlen,
@@ -170,9 +170,12 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
 	if (psock->tx_stopped)
 		seq_puts(seq, "TxStop ");
 
-	if (psock->rx_stopped)
+	if (psock->strp.rx_stopped)
 		seq_puts(seq, "RxStop ");
 
+	if (psock->strp.rx_paused)
+		seq_puts(seq, "RxPause ");
+
 	if (psock->tx_kcm)
 		seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
 
@@ -275,6 +278,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 {
 	struct kcm_psock_stats psock_stats;
 	struct kcm_mux_stats mux_stats;
+	struct strp_aggr_stats strp_stats;
 	struct kcm_mux *mux;
 	struct kcm_psock *psock;
 	struct net *net = seq->private;
@@ -282,20 +286,28 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 
 	memset(&mux_stats, 0, sizeof(mux_stats));
 	memset(&psock_stats, 0, sizeof(psock_stats));
+	memset(&strp_stats, 0, sizeof(strp_stats));
 
 	mutex_lock(&knet->mutex);
 
 	aggregate_mux_stats(&knet->aggregate_mux_stats, &mux_stats);
 	aggregate_psock_stats(&knet->aggregate_psock_stats,
 			      &psock_stats);
+	aggregate_strp_stats(&knet->aggregate_strp_stats,
+			     &strp_stats);
 
 	list_for_each_entry_rcu(mux, &knet->mux_list, kcm_mux_list) {
 		spin_lock_bh(&mux->lock);
 		aggregate_mux_stats(&mux->stats, &mux_stats);
 		aggregate_psock_stats(&mux->aggregate_psock_stats,
 				      &psock_stats);
-		list_for_each_entry(psock, &mux->psocks, psock_list)
+		aggregate_strp_stats(&mux->aggregate_strp_stats,
+				     &strp_stats);
+		list_for_each_entry(psock, &mux->psocks, psock_list) {
 			aggregate_psock_stats(&psock->stats, &psock_stats);
+			save_strp_stats(&psock->strp, &strp_stats);
+		}
+
 		spin_unlock_bh(&mux->lock);
 	}
 
@@ -328,7 +340,7 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 		   mux_stats.rx_ready_drops);
 
 	seq_printf(seq,
-		   "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
+		   "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
 		   "Psock",
 		   "RX-Msgs",
 		   "RX-Bytes",
@@ -337,6 +349,8 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 		   "Reserved",
 		   "Unreserved",
 		   "RX-Aborts",
+		   "RX-Intr",
+		   "RX-Unrecov",
 		   "RX-MemFail",
 		   "RX-NeedMor",
 		   "RX-BadLen",
@@ -345,20 +359,22 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
 		   "TX-Aborts");
 
 	seq_printf(seq,
-		   "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
+		   "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
 		   "",
-		   psock_stats.rx_msgs,
-		   psock_stats.rx_bytes,
+		   strp_stats.rx_msgs,
+		   strp_stats.rx_bytes,
 		   psock_stats.tx_msgs,
 		   psock_stats.tx_bytes,
 		   psock_stats.reserved,
 		   psock_stats.unreserved,
-		   psock_stats.rx_aborts,
-		   psock_stats.rx_mem_fail,
-		   psock_stats.rx_need_more_hdr,
-		   psock_stats.rx_bad_hdr_len,
-		   psock_stats.rx_msg_too_big,
-		   psock_stats.rx_msg_timeouts,
+		   strp_stats.rx_aborts,
+		   strp_stats.rx_interrupted,
+		   strp_stats.rx_unrecov_intr,
+		   strp_stats.rx_mem_fail,
+		   strp_stats.rx_need_more_hdr,
+		   strp_stats.rx_bad_hdr_len,
+		   strp_stats.rx_msg_too_big,
+		   strp_stats.rx_msg_timeouts,
 		   psock_stats.tx_aborts);
 
 	return 0;
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index cb39e05..9d582d3 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -1,3 +1,13 @@
+/*
+ * Kernel Connection Multiplexor
+ *
+ * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ */
+
 #include <linux/bpf.h>
 #include <linux/errno.h>
 #include <linux/errqueue.h>
@@ -35,38 +45,12 @@ static inline struct kcm_tx_msg *kcm_tx_msg(struct sk_buff *skb)
 	return (struct kcm_tx_msg *)skb->cb;
 }
 
-static inline struct kcm_rx_msg *kcm_rx_msg(struct sk_buff *skb)
-{
-	return (struct kcm_rx_msg *)((void *)skb->cb +
-				     offsetof(struct qdisc_skb_cb, data));
-}
-
 static void report_csk_error(struct sock *csk, int err)
 {
 	csk->sk_err = EPIPE;
 	csk->sk_error_report(csk);
 }
 
-/* Callback lock held */
-static void kcm_abort_rx_psock(struct kcm_psock *psock, int err,
-			       struct sk_buff *skb)
-{
-	struct sock *csk = psock->sk;
-
-	/* Unrecoverable error in receive */
-
-	del_timer(&psock->rx_msg_timer);
-
-	if (psock->rx_stopped)
-		return;
-
-	psock->rx_stopped = 1;
-	KCM_STATS_INCR(psock->stats.rx_aborts);
-
-	/* Report an error on the lower socket */
-	report_csk_error(csk, err);
-}
-
 static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
 			       bool wakeup_kcm)
 {
@@ -109,12 +93,13 @@ static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
 static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
 				    struct kcm_psock *psock)
 {
-	KCM_STATS_ADD(mux->stats.rx_bytes,
-		      psock->stats.rx_bytes - psock->saved_rx_bytes);
+	STRP_STATS_ADD(mux->stats.rx_bytes,
+		       psock->strp.stats.rx_bytes -
+		       psock->saved_rx_bytes);
 	mux->stats.rx_msgs +=
-		psock->stats.rx_msgs - psock->saved_rx_msgs;
-	psock->saved_rx_msgs = psock->stats.rx_msgs;
-	psock->saved_rx_bytes = psock->stats.rx_bytes;
+		psock->strp.stats.rx_msgs - psock->saved_rx_msgs;
+	psock->saved_rx_msgs = psock->strp.stats.rx_msgs;
+	psock->saved_rx_bytes = psock->strp.stats.rx_bytes;
 }
 
 static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
@@ -167,11 +152,11 @@ static void kcm_rcv_ready(struct kcm_sock *kcm)
 		 */
 		list_del(&psock->psock_ready_list);
 		psock->ready_rx_msg = NULL;
-
 		/* Commit clearing of ready_rx_msg for queuing work */
 		smp_mb();
 
-		queue_work(kcm_wq, &psock->rx_work);
+		strp_unpause(&psock->strp);
+		strp_check_rcv(&psock->strp);
 	}
 
 	/* Buffer limit is okay now, add to ready list */
@@ -285,6 +270,7 @@ static struct kcm_sock *reserve_rx_kcm(struct kcm_psock *psock,
 
 	if (list_empty(&mux->kcm_rx_waiters)) {
 		psock->ready_rx_msg = head;
+		strp_pause(&psock->strp);
 		list_add_tail(&psock->psock_ready_list,
 			      &mux->psocks_ready);
 		spin_unlock_bh(&mux->rx_lock);
@@ -353,343 +339,43 @@ static void unreserve_rx_kcm(struct kcm_psock *psock,
 	spin_unlock_bh(&mux->rx_lock);
 }
 
-static void kcm_start_rx_timer(struct kcm_psock *psock)
+/* Called with lower sock held */
+static void kcm_rcv_strparser(struct strparser *strp, struct sk_buff *skb)
 {
-	if (psock->sk->sk_rcvtimeo)
-		mod_timer(&psock->rx_msg_timer, psock->sk->sk_rcvtimeo);
-}
-
-/* Macro to invoke filter function. */
-#define KCM_RUN_FILTER(prog, ctx) \
-	(*prog->bpf_func)(ctx, prog->insnsi)
-
-/* Lower socket lock held */
-static int kcm_tcp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
-			unsigned int orig_offset, size_t orig_len)
-{
-	struct kcm_psock *psock = (struct kcm_psock *)desc->arg.data;
-	struct kcm_rx_msg *rxm;
+	struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
 	struct kcm_sock *kcm;
-	struct sk_buff *head, *skb;
-	size_t eaten = 0, cand_len;
-	ssize_t extra;
-	int err;
-	bool cloned_orig = false;
-
-	if (psock->ready_rx_msg)
-		return 0;
-
-	head = psock->rx_skb_head;
-	if (head) {
-		/* Message already in progress */
-
-		rxm = kcm_rx_msg(head);
-		if (unlikely(rxm->early_eaten)) {
-			/* Already some number of bytes on the receive sock
-			 * data saved in rx_skb_head, just indicate they
-			 * are consumed.
-			 */
-			eaten = orig_len <= rxm->early_eaten ?
-				orig_len : rxm->early_eaten;
-			rxm->early_eaten -= eaten;
-
-			return eaten;
-		}
-
-		if (unlikely(orig_offset)) {
-			/* Getting data with a non-zero offset when a message is
-			 * in progress is not expected. If it does happen, we
-			 * need to clone and pull since we can't deal with
-			 * offsets in the skbs for a message expect in the head.
-			 */
-			orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
-			if (!orig_skb) {
-				KCM_STATS_INCR(psock->stats.rx_mem_fail);
-				desc->error = -ENOMEM;
-				return 0;
-			}
-			if (!pskb_pull(orig_skb, orig_offset)) {
-				KCM_STATS_INCR(psock->stats.rx_mem_fail);
-				kfree_skb(orig_skb);
-				desc->error = -ENOMEM;
-				return 0;
-			}
-			cloned_orig = true;
-			orig_offset = 0;
-		}
-
-		if (!psock->rx_skb_nextp) {
-			/* We are going to append to the frags_list of head.
-			 * Need to unshare the frag_list.
-			 */
-			err = skb_unclone(head, GFP_ATOMIC);
-			if (err) {
-				KCM_STATS_INCR(psock->stats.rx_mem_fail);
-				desc->error = err;
-				return 0;
-			}
-
-			if (unlikely(skb_shinfo(head)->frag_list)) {
-				/* We can't append to an sk_buff that already
-				 * has a frag_list. We create a new head, point
-				 * the frag_list of that to the old head, and
-				 * then are able to use the old head->next for
-				 * appending to the message.
-				 */
-				if (WARN_ON(head->next)) {
-					desc->error = -EINVAL;
-					return 0;
-				}
-
-				skb = alloc_skb(0, GFP_ATOMIC);
-				if (!skb) {
-					KCM_STATS_INCR(psock->stats.rx_mem_fail);
-					desc->error = -ENOMEM;
-					return 0;
-				}
-				skb->len = head->len;
-				skb->data_len = head->len;
-				skb->truesize = head->truesize;
-				*kcm_rx_msg(skb) = *kcm_rx_msg(head);
-				psock->rx_skb_nextp = &head->next;
-				skb_shinfo(skb)->frag_list = head;
-				psock->rx_skb_head = skb;
-				head = skb;
-			} else {
-				psock->rx_skb_nextp =
-				    &skb_shinfo(head)->frag_list;
-			}
-		}
-	}
-
-	while (eaten < orig_len) {
-		/* Always clone since we will consume something */
-		skb = skb_clone(orig_skb, GFP_ATOMIC);
-		if (!skb) {
-			KCM_STATS_INCR(psock->stats.rx_mem_fail);
-			desc->error = -ENOMEM;
-			break;
-		}
-
-		cand_len = orig_len - eaten;
-
-		head = psock->rx_skb_head;
-		if (!head) {
-			head = skb;
-			psock->rx_skb_head = head;
-			/* Will set rx_skb_nextp on next packet if needed */
-			psock->rx_skb_nextp = NULL;
-			rxm = kcm_rx_msg(head);
-			memset(rxm, 0, sizeof(*rxm));
-			rxm->offset = orig_offset + eaten;
-		} else {
-			/* Unclone since we may be appending to an skb that we
-			 * already share a frag_list with.
-			 */
-			err = skb_unclone(skb, GFP_ATOMIC);
-			if (err) {
-				KCM_STATS_INCR(psock->stats.rx_mem_fail);
-				desc->error = err;
-				break;
-			}
-
-			rxm = kcm_rx_msg(head);
-			*psock->rx_skb_nextp = skb;
-			psock->rx_skb_nextp = &skb->next;
-			head->data_len += skb->len;
-			head->len += skb->len;
-			head->truesize += skb->truesize;
-		}
-
-		if (!rxm->full_len) {
-			ssize_t len;
-
-			len = KCM_RUN_FILTER(psock->bpf_prog, head);
-
-			if (!len) {
-				/* Need more header to determine length */
-				if (!rxm->accum_len) {
-					/* Start RX timer for new message */
-					kcm_start_rx_timer(psock);
-				}
-				rxm->accum_len += cand_len;
-				eaten += cand_len;
-				KCM_STATS_INCR(psock->stats.rx_need_more_hdr);
-				WARN_ON(eaten != orig_len);
-				break;
-			} else if (len > psock->sk->sk_rcvbuf) {
-				/* Message length exceeds maximum allowed */
-				KCM_STATS_INCR(psock->stats.rx_msg_too_big);
-				desc->error = -EMSGSIZE;
-				psock->rx_skb_head = NULL;
-				kcm_abort_rx_psock(psock, EMSGSIZE, head);
-				break;
-			} else if (len <= (ssize_t)head->len -
-					  skb->len - rxm->offset) {
-				/* Length must be into new skb (and also
-				 * greater than zero)
-				 */
-				KCM_STATS_INCR(psock->stats.rx_bad_hdr_len);
-				desc->error = -EPROTO;
-				psock->rx_skb_head = NULL;
-				kcm_abort_rx_psock(psock, EPROTO, head);
-				break;
-			}
-
-			rxm->full_len = len;
-		}
-
-		extra = (ssize_t)(rxm->accum_len + cand_len) - rxm->full_len;
-
-		if (extra < 0) {
-			/* Message not complete yet. */
-			if (rxm->full_len - rxm->accum_len >
-			    tcp_inq(psock->sk)) {
-				/* Don't have the whole messages in the socket
-				 * buffer. Set psock->rx_need_bytes to wait for
-				 * the rest of the message. Also, set "early
-				 * eaten" since we've already buffered the skb
-				 * but don't consume yet per tcp_read_sock.
-				 */
-
-				if (!rxm->accum_len) {
-					/* Start RX timer for new message */
-					kcm_start_rx_timer(psock);
-				}
-
-				psock->rx_need_bytes = rxm->full_len -
-						       rxm->accum_len;
-				rxm->accum_len += cand_len;
-				rxm->early_eaten = cand_len;
-				KCM_STATS_ADD(psock->stats.rx_bytes, cand_len);
-				desc->count = 0; /* Stop reading socket */
-				break;
-			}
-			rxm->accum_len += cand_len;
-			eaten += cand_len;
-			WARN_ON(eaten != orig_len);
-			break;
-		}
-
-		/* Positive extra indicates ore bytes than needed for the
-		 * message
-		 */
-
-		WARN_ON(extra > cand_len);
-
-		eaten += (cand_len - extra);
-
-		/* Hurray, we have a new message! */
-		del_timer(&psock->rx_msg_timer);
-		psock->rx_skb_head = NULL;
-		KCM_STATS_INCR(psock->stats.rx_msgs);
 
 try_queue:
-		kcm = reserve_rx_kcm(psock, head);
-		if (!kcm) {
-			/* Unable to reserve a KCM, message is held in psock. */
-			break;
-		}
-
-		if (kcm_queue_rcv_skb(&kcm->sk, head)) {
-			/* Should mean socket buffer full */
-			unreserve_rx_kcm(psock, false);
-			goto try_queue;
-		}
+	kcm = reserve_rx_kcm(psock, skb);
+	if (!kcm) {
+		/* Unable to reserve a KCM, message is held in psock and strp
+		 * is paused.
+		 */
+		return;
 	}
 
-	if (cloned_orig)
-		kfree_skb(orig_skb);
-
-	KCM_STATS_ADD(psock->stats.rx_bytes, eaten);
-
-	return eaten;
-}
-
-/* Called with lock held on lower socket */
-static int psock_tcp_read_sock(struct kcm_psock *psock)
-{
-	read_descriptor_t desc;
-
-	desc.arg.data = psock;
-	desc.error = 0;
-	desc.count = 1; /* give more than one skb per call */
-
-	/* sk should be locked here, so okay to do tcp_read_sock */
-	tcp_read_sock(psock->sk, &desc, kcm_tcp_recv);
-
-	unreserve_rx_kcm(psock, true);
-
-	return desc.error;
-}
-
-/* Lower sock lock held */
-static void psock_tcp_data_ready(struct sock *sk)
-{
-	struct kcm_psock *psock;
-
-	read_lock_bh(&sk->sk_callback_lock);
-
-	psock = (struct kcm_psock *)sk->sk_user_data;
-	if (unlikely(!psock || psock->rx_stopped))
-		goto out;
-
-	if (psock->ready_rx_msg)
-		goto out;
-
-	if (psock->rx_need_bytes) {
-		if (tcp_inq(sk) >= psock->rx_need_bytes)
-			psock->rx_need_bytes = 0;
-		else
-			goto out;
+	if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
+		/* Should mean socket buffer full */
+		unreserve_rx_kcm(psock, false);
+		goto try_queue;
 	}
-
-	if (psock_tcp_read_sock(psock) == -ENOMEM)
-		queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);
-
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
 }
 
-static void do_psock_rx_work(struct kcm_psock *psock)
+static int kcm_parse_func_strparser(struct strparser *strp, struct sk_buff *skb)
 {
-	read_descriptor_t rd_desc;
-	struct sock *csk = psock->sk;
-
-	/* We need the read lock to synchronize with psock_tcp_data_ready. We
-	 * need the socket lock for calling tcp_read_sock.
-	 */
-	lock_sock(csk);
-	read_lock_bh(&csk->sk_callback_lock);
+	struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
+	struct bpf_prog *prog = psock->bpf_prog;
 
-	if (unlikely(csk->sk_user_data != psock))
-		goto out;
-
-	if (unlikely(psock->rx_stopped))
-		goto out;
-
-	if (psock->ready_rx_msg)
-		goto out;
-
-	rd_desc.arg.data = psock;
-
-	if (psock_tcp_read_sock(psock) == -ENOMEM)
-		queue_delayed_work(kcm_wq, &psock->rx_delayed_work, 0);
-
-out:
-	read_unlock_bh(&csk->sk_callback_lock);
-	release_sock(csk);
+	return (*prog->bpf_func)(skb, prog->insnsi);
 }
 
-static void psock_rx_work(struct work_struct *w)
+static int kcm_read_sock_done(struct strparser *strp, int err)
 {
-	do_psock_rx_work(container_of(w, struct kcm_psock, rx_work));
-}
+	struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
 
-static void psock_rx_delayed_work(struct work_struct *w)
-{
-	do_psock_rx_work(container_of(w, struct kcm_psock,
-				      rx_delayed_work.work));
+	unreserve_rx_kcm(psock, true);
+
+	return err;
 }
 
 static void psock_tcp_state_change(struct sock *sk)
@@ -705,22 +391,25 @@ static void psock_tcp_state_change(struct sock *sk)
 static void psock_tcp_write_space(struct sock *sk)
 {
 	struct kcm_psock *psock;
+	struct strparser *strp;
 	struct kcm_mux *mux;
 	struct kcm_sock *kcm;
 
 	read_lock_bh(&sk->sk_callback_lock);
 
-	psock = (struct kcm_psock *)sk->sk_user_data;
-	if (unlikely(!psock))
+	strp = (struct strparser *)sk->sk_user_data;
+	if (unlikely(!strp))
 		goto out;
 
+	psock = container_of(strp, struct kcm_psock, strp);
+
 	mux = psock->mux;
 
 	spin_lock_bh(&mux->lock);
 
 	/* Check if the socket is reserved so someone is waiting for sending. */
 	kcm = psock->tx_kcm;
-	if (kcm)
+	if (kcm && !unlikely(kcm->tx_stopped))
 		queue_work(kcm_wq, &kcm->tx_work);
 
 	spin_unlock_bh(&mux->lock);
@@ -1411,7 +1100,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
 	struct kcm_sock *kcm = kcm_sk(sk);
 	int err = 0;
 	long timeo;
-	struct kcm_rx_msg *rxm;
+	struct strp_rx_msg *rxm;
 	int copied = 0;
 	struct sk_buff *skb;
 
@@ -1425,7 +1114,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
 
 	/* Okay, have a message on the receive queue */
 
-	rxm = kcm_rx_msg(skb);
+	rxm = strp_rx_msg(skb);
 
 	if (len > rxm->full_len)
 		len = rxm->full_len;
@@ -1481,7 +1170,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
 	struct sock *sk = sock->sk;
 	struct kcm_sock *kcm = kcm_sk(sk);
 	long timeo;
-	struct kcm_rx_msg *rxm;
+	struct strp_rx_msg *rxm;
 	int err = 0;
 	ssize_t copied;
 	struct sk_buff *skb;
@@ -1498,7 +1187,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
 
 	/* Okay, have a message on the receive queue */
 
-	rxm = kcm_rx_msg(skb);
+	rxm = strp_rx_msg(skb);
 
 	if (len > rxm->full_len)
 		len = rxm->full_len;
@@ -1674,15 +1363,6 @@ static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
 	spin_unlock_bh(&mux->rx_lock);
 }
 
-static void kcm_rx_msg_timeout(unsigned long arg)
-{
-	struct kcm_psock *psock = (struct kcm_psock *)arg;
-
-	/* Message assembly timed out */
-	KCM_STATS_INCR(psock->stats.rx_msg_timeouts);
-	kcm_abort_rx_psock(psock, ETIMEDOUT, NULL);
-}
-
 static int kcm_attach(struct socket *sock, struct socket *csock,
 		      struct bpf_prog *prog)
 {
@@ -1692,6 +1372,7 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
 	struct kcm_psock *psock = NULL, *tpsock;
 	struct list_head *head;
 	int index = 0;
+	struct strp_callbacks cb;
 
 	if (csock->ops->family != PF_INET &&
 	    csock->ops->family != PF_INET6)
@@ -1713,11 +1394,12 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
 	psock->sk = csk;
 	psock->bpf_prog = prog;
 
-	setup_timer(&psock->rx_msg_timer, kcm_rx_msg_timeout,
-		    (unsigned long)psock);
+	cb.rcv_msg = kcm_rcv_strparser;
+	cb.abort_parser = NULL;
+	cb.parse_msg = kcm_parse_func_strparser;
+	cb.read_sock_done = kcm_read_sock_done;
 
-	INIT_WORK(&psock->rx_work, psock_rx_work);
-	INIT_DELAYED_WORK(&psock->rx_delayed_work, psock_rx_delayed_work);
+	strp_init(&psock->strp, csk, &cb);
 
 	sock_hold(csk);
 
@@ -1725,8 +1407,8 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
 	psock->save_data_ready = csk->sk_data_ready;
 	psock->save_write_space = csk->sk_write_space;
 	psock->save_state_change = csk->sk_state_change;
-	csk->sk_user_data = psock;
-	csk->sk_data_ready = psock_tcp_data_ready;
+	csk->sk_user_data = &psock->strp;
+	csk->sk_data_ready = strp_tcp_data_ready;
 	csk->sk_write_space = psock_tcp_write_space;
 	csk->sk_state_change = psock_tcp_state_change;
 	write_unlock_bh(&csk->sk_callback_lock);
@@ -1750,7 +1432,7 @@ static int kcm_attach(struct socket *sock, struct socket *csock,
 	spin_unlock_bh(&mux->lock);
 
 	/* Schedule RX work in case there are already bytes queued */
-	queue_work(kcm_wq, &psock->rx_work);
+	strp_check_rcv(&psock->strp);
 
 	return 0;
 }
@@ -1785,6 +1467,7 @@ out:
 	return err;
 }
 
+/* Lower socket lock held */
 static void kcm_unattach(struct kcm_psock *psock)
 {
 	struct sock *csk = psock->sk;
@@ -1798,7 +1481,7 @@ static void kcm_unattach(struct kcm_psock *psock)
 	csk->sk_data_ready = psock->save_data_ready;
 	csk->sk_write_space = psock->save_write_space;
 	csk->sk_state_change = psock->save_state_change;
-	psock->rx_stopped = 1;
+	strp_stop(&psock->strp);
 
 	if (WARN_ON(psock->rx_kcm)) {
 		write_unlock_bh(&csk->sk_callback_lock);
@@ -1821,18 +1504,14 @@ static void kcm_unattach(struct kcm_psock *psock)
 
 	write_unlock_bh(&csk->sk_callback_lock);
 
-	del_timer_sync(&psock->rx_msg_timer);
-	cancel_work_sync(&psock->rx_work);
-	cancel_delayed_work_sync(&psock->rx_delayed_work);
+	strp_done(&psock->strp);
 
 	bpf_prog_put(psock->bpf_prog);
 
-	kfree_skb(psock->rx_skb_head);
-	psock->rx_skb_head = NULL;
-
 	spin_lock_bh(&mux->lock);
 
 	aggregate_psock_stats(&psock->stats, &mux->aggregate_psock_stats);
+	save_strp_stats(&psock->strp, &mux->aggregate_strp_stats);
 
 	KCM_STATS_INCR(mux->stats.psock_unattach);
 
@@ -1915,6 +1594,7 @@ static int kcm_unattach_ioctl(struct socket *sock, struct kcm_unattach *info)
 
 		spin_unlock_bh(&mux->lock);
 
+		/* Lower socket lock should already be held */
 		kcm_unattach(psock);
 
 		err = 0;
@@ -2059,8 +1739,11 @@ static void release_mux(struct kcm_mux *mux)
 	/* Release psocks */
 	list_for_each_entry_safe(psock, tmp_psock,
 				 &mux->psocks, psock_list) {
-		if (!WARN_ON(psock->unattaching))
+		if (!WARN_ON(psock->unattaching)) {
+			lock_sock(psock->strp.sk);
 			kcm_unattach(psock);
+			release_sock(psock->strp.sk);
+		}
 	}
 
 	if (WARN_ON(mux->psocks_cnt))
@@ -2072,6 +1755,8 @@ static void release_mux(struct kcm_mux *mux)
 	aggregate_mux_stats(&mux->stats, &knet->aggregate_mux_stats);
 	aggregate_psock_stats(&mux->aggregate_psock_stats,
 			      &knet->aggregate_psock_stats);
+	aggregate_strp_stats(&mux->aggregate_strp_stats,
+			     &knet->aggregate_strp_stats);
 	list_del_rcu(&mux->kcm_mux_list);
 	knet->count--;
 	mutex_unlock(&knet->mutex);
@@ -2151,6 +1836,13 @@ static int kcm_release(struct socket *sock)
 	 * it will just return.
 	 */
 	__skb_queue_purge(&sk->sk_write_queue);
+
+	/* Set tx_stopped. This is checked when psock is bound to a kcm and we
+	 * get a writespace callback. This prevents further work being queued
+	 * from the callback (unbinding the psock occurs after canceling work).
+	 */
+	kcm->tx_stopped = 1;
+
 	release_sock(sk);
 
 	spin_lock_bh(&mux->lock);
-- 
2.8.0.rc2
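
Note on the callback changes above: each strparser callback is handed only
the struct strparser pointer and recovers its kcm_psock with container_of.
That works because the strparser is embedded in the psock rather than
referenced by pointer. A minimal userspace sketch of the pattern follows.
The demo_* names are hypothetical stand-ins, not the kernel structures, and
the macro is a simplified form of the kernel's container_of without its
type checking.

```c
#include <stddef.h>

/* Simplified userspace stand-in for the kernel's container_of(). */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical stand-ins; these are not the real kernel structures. */
struct demo_strparser {
	int rx_paused;
};

struct demo_psock {
	int index;
	struct demo_strparser strp;	/* embedded, not a pointer */
};

/* Shaped like a strparser callback: only the parser pointer is passed in,
 * and the enclosing structure is recovered from it.
 */
static int demo_psock_index(struct demo_strparser *strp)
{
	struct demo_psock *psock = container_of(strp, struct demo_psock, strp);

	return psock->index;
}
```

Passing &psock.strp for a demo_psock whose index is 7 yields 7. The same
reasoning is why sk_user_data can hold the strparser pointer while the other
socket callbacks still reach the psock.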


* [PATCH net-next v2 3/3] strparser: Documentation
  2016-08-01 16:33 [PATCH net-next v2 0/3] strp: Stream parser for messages Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 1/3] strparser: " Tom Herbert
  2016-08-01 16:33 ` [PATCH net-next v2 2/3] kcm: Use stream parser Tom Herbert
@ 2016-08-01 16:33 ` Tom Herbert
  2016-08-01 20:27 ` [PATCH net-next v2 0/3] strp: Stream parser for messages David Miller
  3 siblings, 0 replies; 7+ messages in thread
From: Tom Herbert @ 2016-08-01 16:33 UTC (permalink / raw)
  To: davem, netdev; +Cc: kernel-team

Signed-off-by: Tom Herbert <tom@herbertland.com>
---
 Documentation/networking/strparser.txt | 147 +++++++++++++++++++++++++++++++++
 1 file changed, 147 insertions(+)
 create mode 100644 Documentation/networking/strparser.txt

diff --git a/Documentation/networking/strparser.txt b/Documentation/networking/strparser.txt
new file mode 100644
index 0000000..abfef72
--- /dev/null
+++ b/Documentation/networking/strparser.txt
@@ -0,0 +1,147 @@
+Stream Parser
+-------------
+
+The stream parser (strparser) is a utility that parses messages of an
+application layer protocol running over a TCP connection. The stream
+parser works in conjunction with an upper layer in the kernel to provide
+kernel support for application layer messages. For instance, Kernel
+Connection Multiplexor (KCM) uses the Stream Parser to parse messages
+using a BPF program.
+
+Interface
+---------
+
+The API includes a context structure, a set of callbacks, utility
+functions, and a data_ready function. The callbacks include
+a parse_msg function that is called to perform parsing (e.g.
+BPF parsing in case of KCM), and a rcv_msg function that is called
+when a full message has been completed.
+
+A stream parser can be instantiated for a TCP connection. This is done
+by:
+
+strp_init(struct strparser *strp, struct sock *csk,
+	  struct strp_callbacks *cb)
+
+strp is a struct of type strparser that is allocated by the upper layer.
+csk is the TCP socket associated with the stream parser. cb holds the
+callbacks that the stream parser invokes.
+
+Callbacks
+---------
+
+There are four callbacks:
+
+int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
+
+    parse_msg is called to determine the length of the next message
+    in the stream. The upper layer must implement this function. It
+    should parse the sk_buff as containing the headers for the
+    next application layer message in the stream.
+
+    The skb->cb in the input skb is a struct strp_rx_msg. Only
+    the offset field is relevant in parse_msg and gives the offset
+    where the message starts in the skb.
+
+    The return values of this function are:
+
+    >0 : indicates length of successfully parsed message
+    0  : indicates more data must be received to parse the message
+    -ESTRPIPE : current message should not be processed by the
+          kernel, return control of the socket to userspace which
+          can proceed to read the messages itself
+    other < 0 : Error in parsing, give control back to userspace
+          assuming that synchronization is lost and the stream
+          is unrecoverable (application expected to close TCP socket)
+
+    In the case that an error is returned (return value is less than
+    zero) the stream parser will set the error on the TCP socket and wake
+    it up. If parse_msg returned -ESTRPIPE and the stream parser had
+    previously read some bytes for the current message, then the error
+    set on the attached socket is ENODATA since the stream is
+    unrecoverable in that case.
+
+void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
+
+    rcv_msg is called when a full message has been received and
+    is queued. The callee must consume the sk_buff; it can
+    call strp_pause to prevent any further messages from being
+    received in rcv_msg (see strp_pause below). This callback
+    must be set.
+
+    The skb->cb in the input skb is a struct strp_rx_msg. This
+    struct contains two fields: offset and full_len. Offset is
+    where the message starts in the skb, and full_len is the
+    length of the message. skb->len - offset may be greater
+    than full_len since strparser does not trim the skb.
+
+int (*read_sock_done)(struct strparser *strp, int err);
+
+     read_sock_done is called when the stream parser is done reading
+     the TCP socket. The stream parser may read multiple messages
+     in a loop and this function allows cleanup to occur when exiting
+     the loop. If the callback is not set (NULL in strp_init) a
+     default function is used.
+
+void (*abort_parser)(struct strparser *strp, int err);
+
+     This function is called when the stream parser encounters an error
+     in parsing. The default function stops the stream parser for the
+     TCP socket and sets the error in the socket. The default function
+     can be changed by setting the callback to non-NULL in strp_init.
+
+data_ready
+----------
+
+strp_tcp_data_ready is the TCP data ready function that can be used with
+the stream parser. When the upper layer attaches the TCP socket it
+can set sk_data_ready to this function (after strp_init). sk_user_data
+must be set to the stream parser structure to use this (container_of
+can be used in the other socket callbacks to get the upper layer
+structure from the stream parser structure).
+
+Functions
+---------
+
+The upper layer uses strp_pause and strp_unpause to flow control the
+stream parser. This is needed for instance when the upper layer can't
+immediately process a received message and has to hold it.
+
+strp_stop is called to completely stop stream parser operations. This
+is called internally when the stream parser encounters an error, and
+it is called from the upper layer when unattaching a TCP socket.
+
+strp_done is called to unattach the stream parser from the TCP socket.
+This must be called after the stream parser has been stopped.
+
+strp_check_rcv is called to check for new messages on the socket. This
+is normally called at initialization of a stream parser instance
+or after strp_unpause.
+
+Statistics
+----------
+
+Various counters are kept for each stream parser for a TCP socket.
+These are in the strp_stats structure. strp_aggr_stats is a convenience
+structure for accumulating statistics for multiple stream parser
+instances. save_strp_stats and aggregate_strp_stats are helper functions
+to save and aggregate statistics.
+
+Message assembly limits
+-----------------------
+
+The stream parser provides mechanisms to limit the resources consumed by
+message assembly.
+
+A timer is set when assembly starts for a new message. The message
+timeout is taken from sk_rcvtimeo for the associated TCP socket. If the
+timer fires before assembly completes the stream parser is aborted
+and the ETIMEDOUT error is set on the TCP socket.
+
+Message length is limited to the receive buffer size of the associated
+TCP socket. If the length returned by parse_msg is greater than
+the socket buffer size then the stream parser is aborted with
+EMSGSIZE error set on the TCP socket. Note that this makes the
+maximum size of receive skbuffs for a socket with a stream parser
+equal to 2*sk_rcvbuf of the TCP socket.
+
-- 
2.8.0.rc2
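
Note on the parse_msg return values documented above: they can be
illustrated with a userspace model. The sketch below assumes a made-up
framing in which every message starts with a 2-byte big-endian length that
counts the header itself. demo_parse_msg and DEMO_HDR_LEN are hypothetical
names, the function works on a flat buffer instead of an sk_buff, and the
-ESTRPIPE case is not modeled.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_HDR_LEN 2	/* hypothetical framing: 2-byte big-endian length */

/*
 * Userspace model of the parse_msg return contract:
 *   > 0  total length of the message (header plus payload)
 *   0    the header is not fully available yet, more data is needed
 *   < 0  parse error (here -EPROTO for a length shorter than the header)
 *
 * offset plays the role of strp_rx_msg->offset: where the message starts.
 */
static int demo_parse_msg(const uint8_t *buf, size_t avail, size_t offset)
{
	size_t len;

	if (offset > avail || avail - offset < DEMO_HDR_LEN)
		return 0;

	len = ((size_t)buf[offset] << 8) | buf[offset + 1];
	if (len < DEMO_HDR_LEN)
		return -EPROTO;

	return (int)len;
}
```

The returned length may exceed the bytes currently available. In that case
the parser knows how long the message is and keeps accumulating data until
the full message has arrived, which is when rcv_msg would be called.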


* Re: [PATCH net-next v2 0/3] strp: Stream parser for messages
  2016-08-01 16:33 [PATCH net-next v2 0/3] strp: Stream parser for messages Tom Herbert
                   ` (2 preceding siblings ...)
  2016-08-01 16:33 ` [PATCH net-next v2 3/3] strparser: Documentation Tom Herbert
@ 2016-08-01 20:27 ` David Miller
  2016-08-01 20:38   ` Tom Herbert
  3 siblings, 1 reply; 7+ messages in thread
From: David Miller @ 2016-08-01 20:27 UTC (permalink / raw)
  To: tom; +Cc: netdev, kernel-team

From: Tom Herbert <tom@herbertland.com>
Date: Mon, 1 Aug 2016 09:33:17 -0700

>   - Remove GPL module license from strparser.c

Why?


* Re: [PATCH net-next v2 0/3] strp: Stream parser for messages
  2016-08-01 20:27 ` [PATCH net-next v2 0/3] strp: Stream parser for messages David Miller
@ 2016-08-01 20:38   ` Tom Herbert
  2016-08-01 20:46     ` David Miller
  0 siblings, 1 reply; 7+ messages in thread
From: Tom Herbert @ 2016-08-01 20:38 UTC (permalink / raw)
  To: David Miller; +Cc: Linux Kernel Network Developers, Kernel Team

On Mon, Aug 1, 2016 at 1:27 PM, David Miller <davem@davemloft.net> wrote:
> From: Tom Herbert <tom@herbertland.com>
> Date: Mon, 1 Aug 2016 09:33:17 -0700
>
>>   - Remove GPL module license from strparser.c
>
> Why?

It looked more consistent with other modules. Is GPL module license
preferred then?

Tom


* Re: [PATCH net-next v2 0/3] strp: Stream parser for messages
  2016-08-01 20:38   ` Tom Herbert
@ 2016-08-01 20:46     ` David Miller
  0 siblings, 0 replies; 7+ messages in thread
From: David Miller @ 2016-08-01 20:46 UTC (permalink / raw)
  To: tom; +Cc: netdev, kernel-team

From: Tom Herbert <tom@herbertland.com>
Date: Mon, 1 Aug 2016 13:38:46 -0700

> On Mon, Aug 1, 2016 at 1:27 PM, David Miller <davem@davemloft.net> wrote:
>> From: Tom Herbert <tom@herbertland.com>
>> Date: Mon, 1 Aug 2016 09:33:17 -0700
>>
>>>   - Remove GPL module license from strparser.c
>>
>> Why?
> 
> It looked more consistent with other modules. Is GPL module license
> preferred then?

Yes.

Since this is a facility which is largely unique to the Linux kernel,
things linking to these symbols are almost certainly derived works.

