All of lore.kernel.org
 help / color / mirror / Atom feed
* Re: [MPTCP] [RFC PATCH v4 16/17] mptcp: Implement MPTCP receive path
@ 2018-11-30 22:33 Mat Martineau
  0 siblings, 0 replies; 2+ messages in thread
From: Mat Martineau @ 2018-11-30 22:33 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 4566 bytes --]


On Fri, 30 Nov 2018, Mat Martineau wrote:

> Parses incoming DSS options and populates outgoing MPTCP ACK
> fields. Clone incoming skbs containing MPTCP headers and propagate them
> to the MPTCP socket using the error queue. This avoids the need for TCP
> coalesce/collapse bypass for MPTCP, although cloning incoming skbs does
> prevent some coalescing.
>
> Keeping DSS map handling separate from data reading makes it easier to
> add multiple-subflow receiving, since the receive code can determine
> which subflow contains relevant data or data that needs to be purged
> without touching the subflow data queues.
>
> MPTCP ACK values are now populated from an atomic value stored in the
> connection socket rather than carried in the extended control block,
> which makes use of the most up-to-date sequence number and allows the
> MPTCP ACK to be populated on TCP ACK packets that have no payload.
>
> Signed-off-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>
> ---
> include/linux/tcp.h   |  12 ++
> include/net/mptcp.h   |  43 ++++-
> net/ipv4/tcp_input.c  |   4 +
> net/ipv4/tcp_output.c | 120 +++++++-------
> net/mptcp/options.c   |  98 +++++++++++-
> net/mptcp/protocol.c  | 360 +++++++++++++++++++++++++++++++++++++++---
> net/mptcp/subflow.c   |  29 +++-
> 7 files changed, 574 insertions(+), 92 deletions(-)
>

...

> diff --git a/include/net/mptcp.h b/include/net/mptcp.h
> index b7a392652aed..d3b7083859b7 100644
> --- a/include/net/mptcp.h
> +++ b/include/net/mptcp.h
> @@ -38,14 +38,14 @@
> /* MPTCP connection sock */
> struct mptcp_sock {
> 	/* inet_connection_sock must be the first member */
> -	struct	inet_connection_sock sk;
> -	u64	local_key;
> -	u64	remote_key;
> -	u64	write_seq;
> -	u64	ack_seq;
> -	u32	token;
> -	struct	socket *connection_list; /* @@ needs to be a list */
> -	struct	socket *subflow; /* outgoing connect, listener or !mp_capable */
> +	struct inet_connection_sock sk;
> +	u64		local_key;
> +	u64		remote_key;
> +	u64		write_seq;
> +	atomic64_t	ack_seq;
> +	u32		token;
> +	struct socket	*connection_list; /* @@ needs to be a list */
> +	struct socket	*subflow; /* outgoing connect, listener or !mp_capable */
> };
>
> static inline struct mptcp_sock *mptcp_sk(const struct sock *sk)
> @@ -81,16 +81,23 @@ struct subflow_sock {
> 	/* tcp_sock must be the first member */
> 	struct	tcp_sock sk;
> 	u64	local_key;
> +	u64	map_seq;
> +	u32	map_subflow_seq;
> 	u32	token;
> 	u64	idsn;
> 	u64	remote_key;
> 	u32	rel_write_seq;
> +	u32	ssn_offset;
> +	u16	map_dll;
> 	bool	request_mptcp;	// send MP_CAPABLE
> 	bool	checksum;
> 	bool	version;
> 	bool	mp_capable;	// remote is MPTCP capable
> 	bool	fourth_ack;	// send initial DSS
> +	bool	conn_finished;
> +	bool	map_valid;
> 	struct	sock *conn;	// parent mptcp_sock
> +	void	(*tcp_sk_data_ready)(struct sock *sk);
> };
>
> static inline struct subflow_sock *subflow_sk(const struct sock *sk)
> @@ -98,6 +105,16 @@ static inline struct subflow_sock *subflow_sk(const struct sock *sk)
> 	return (struct subflow_sock *)sk;
> }
>
> +static inline struct subflow_sock *subflow_tp(const struct tcp_sock *tp)
> +{
> +	return (struct subflow_sock *)tp;
> +}
> +
> +static inline struct sock *sock_sk(const struct subflow_sock *sk)
> +{
> +	return (struct sock *)sk;
> +}
> +
> struct subflow_request_sock {
> 	struct	tcp_request_sock sk;
> 	u8	mp_capable : 1,
> @@ -109,6 +126,7 @@ struct subflow_request_sock {
> 	u32	token;
> 	u64	idsn;
> 	u64	remote_key;
> +	u32	ssn_offset;
> };
>
> static inline
> @@ -137,6 +155,10 @@ void mptcp_get_options(const struct sk_buff *skb,
> 		       struct tcp_options_received *options);
>
> void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to);
> +void mptcp_cb_set(struct sk_buff *skb, struct mptcp_skb_cb *mcb);
> +
> +void mptcp_attach_dss(struct sock *sk, struct sk_buff *original_skb,
> +		      struct tcp_options_received *opt_rx);

I got an automated kbuild email - I didn't define a stub for this when 
CONFIG_MPTCP is not set...

>
> extern const struct tcp_request_sock_ops tcp_request_sock_ipv4_ops;
>
> @@ -197,5 +219,10 @@ void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to)
> {
> }
>
> +static inline void mptcp_queue_headers(struct sock *sk,
> +				       struct sk_buff *original_skb)
> +{
> +}
> +

...because mptcp_attach_dss should replace this stub.

> #endif /* CONFIG_MPTCP */
> #endif /* __NET_MPTCP_H */

...

--
Mat Martineau
Intel OTC

^ permalink raw reply	[flat|nested] 2+ messages in thread

* [MPTCP] [RFC PATCH v4 16/17] mptcp: Implement MPTCP receive path
@ 2018-11-30 20:11 Mat Martineau
  0 siblings, 0 replies; 2+ messages in thread
From: Mat Martineau @ 2018-11-30 20:11 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 28469 bytes --]

Parses incoming DSS options and populates outgoing MPTCP ACK
fields. Clone incoming skbs containing MPTCP headers and propagate them
to the MPTCP socket using the error queue. This avoids the need for TCP
coalesce/collapse bypass for MPTCP, although cloning incoming skbs does
prevent some coalescing.

Keeping DSS map handling separate from data reading makes it easier to
add multiple-subflow receiving, since the receive code can determine
which subflow contains relevant data or data that needs to be purged
without touching the subflow data queues.

MPTCP ACK values are now populated from an atomic value stored in the
connection socket rather than carried in the extended control block,
which makes use of the most up-to-date sequence number and allows the
MPTCP ACK to be populated on TCP ACK packets that have no payload.

Signed-off-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>
---
 include/linux/tcp.h   |  12 ++
 include/net/mptcp.h   |  43 ++++-
 net/ipv4/tcp_input.c  |   4 +
 net/ipv4/tcp_output.c | 120 +++++++-------
 net/mptcp/options.c   |  98 +++++++++++-
 net/mptcp/protocol.c  | 360 +++++++++++++++++++++++++++++++++++++++---
 net/mptcp/subflow.c   |  29 +++-
 7 files changed, 574 insertions(+), 92 deletions(-)

diff --git a/include/linux/tcp.h b/include/linux/tcp.h
index b54ab3b5546a..635c4c9d4d8b 100644
--- a/include/linux/tcp.h
+++ b/include/linux/tcp.h
@@ -112,6 +112,18 @@ struct tcp_options_received {
 		u8      flags;
 		u64     sndr_key;
 		u64     rcvr_key;
+		u64	ack;
+		u64	seq;
+		u32	subflow_seq;
+		u16	dll;
+		__sum16	checksum;
+		u8	use_ack:1,
+			ack64:1,
+			use_map:1,
+			dsn64:1,
+			use_checksum:1,
+			data_fin:1,
+			__unused:2;
 	} mptcp;
 
 };
diff --git a/include/net/mptcp.h b/include/net/mptcp.h
index b7a392652aed..d3b7083859b7 100644
--- a/include/net/mptcp.h
+++ b/include/net/mptcp.h
@@ -38,14 +38,14 @@
 /* MPTCP connection sock */
 struct mptcp_sock {
 	/* inet_connection_sock must be the first member */
-	struct	inet_connection_sock sk;
-	u64	local_key;
-	u64	remote_key;
-	u64	write_seq;
-	u64	ack_seq;
-	u32	token;
-	struct	socket *connection_list; /* @@ needs to be a list */
-	struct	socket *subflow; /* outgoing connect, listener or !mp_capable */
+	struct inet_connection_sock sk;
+	u64		local_key;
+	u64		remote_key;
+	u64		write_seq;
+	atomic64_t	ack_seq;
+	u32		token;
+	struct socket	*connection_list; /* @@ needs to be a list */
+	struct socket	*subflow; /* outgoing connect, listener or !mp_capable */
 };
 
 static inline struct mptcp_sock *mptcp_sk(const struct sock *sk)
@@ -81,16 +81,23 @@ struct subflow_sock {
 	/* tcp_sock must be the first member */
 	struct	tcp_sock sk;
 	u64	local_key;
+	u64	map_seq;
+	u32	map_subflow_seq;
 	u32	token;
 	u64	idsn;
 	u64	remote_key;
 	u32	rel_write_seq;
+	u32	ssn_offset;
+	u16	map_dll;
 	bool	request_mptcp;	// send MP_CAPABLE
 	bool	checksum;
 	bool	version;
 	bool	mp_capable;	// remote is MPTCP capable
 	bool	fourth_ack;	// send initial DSS
+	bool	conn_finished;
+	bool	map_valid;
 	struct	sock *conn;	// parent mptcp_sock
+	void	(*tcp_sk_data_ready)(struct sock *sk);
 };
 
 static inline struct subflow_sock *subflow_sk(const struct sock *sk)
@@ -98,6 +105,16 @@ static inline struct subflow_sock *subflow_sk(const struct sock *sk)
 	return (struct subflow_sock *)sk;
 }
 
+static inline struct subflow_sock *subflow_tp(const struct tcp_sock *tp)
+{
+	return (struct subflow_sock *)tp;
+}
+
+static inline struct sock *sock_sk(const struct subflow_sock *sk)
+{
+	return (struct sock *)sk;
+}
+
 struct subflow_request_sock {
 	struct	tcp_request_sock sk;
 	u8	mp_capable : 1,
@@ -109,6 +126,7 @@ struct subflow_request_sock {
 	u32	token;
 	u64	idsn;
 	u64	remote_key;
+	u32	ssn_offset;
 };
 
 static inline
@@ -137,6 +155,10 @@ void mptcp_get_options(const struct sk_buff *skb,
 		       struct tcp_options_received *options);
 
 void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to);
+void mptcp_cb_set(struct sk_buff *skb, struct mptcp_skb_cb *mcb);
+
+void mptcp_attach_dss(struct sock *sk, struct sk_buff *original_skb,
+		      struct tcp_options_received *opt_rx);
 
 extern const struct tcp_request_sock_ops tcp_request_sock_ipv4_ops;
 
@@ -197,5 +219,10 @@ void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to)
 {
 }
 
+static inline void mptcp_queue_headers(struct sock *sk,
+				       struct sk_buff *original_skb)
+{
+}
+
 #endif /* CONFIG_MPTCP */
 #endif /* __NET_MPTCP_H */
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index fca76bbaa5c1..0c048ac88f09 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -5655,6 +5655,10 @@ void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
 	/* Process urgent data. */
 	tcp_urg(sk, skb, th);
 
+	/* Prepare MPTCP sequence data */
+	if (tcp_sk(sk)->is_mptcp)
+		mptcp_attach_dss(sk, skb, &tp->rx_opt);
+
 	/* step 7: process the segment text */
 	tcp_data_queue(sk, skb);
 
diff --git a/net/ipv4/tcp_output.c b/net/ipv4/tcp_output.c
index 9ab6f455fc41..b36031464d21 100644
--- a/net/ipv4/tcp_output.c
+++ b/net/ipv4/tcp_output.c
@@ -418,8 +418,8 @@ static inline bool tcp_urg_mode(const struct tcp_sock *tp)
 #define OPTION_MPTCP_MPC_SYN	(1 << 0)
 #define OPTION_MPTCP_MPC_SYNACK	(1 << 1)
 #define OPTION_MPTCP_MPC_ACK	(1 << 2)
-#define OPTION_MPTCP_MPC_DSS_MAP	(1 << 6)
-#define OPTION_MPTCP_MPC_DSS_ACK	(1 << 7)
+#define OPTION_MPTCP_DSS_MAP	(1 << 6)
+#define OPTION_MPTCP_DSS_ACK	(1 << 7)
 
 struct tcp_out_options {
 	u16 options;		/* bit field of OPTION_* */
@@ -471,16 +471,12 @@ static void mptcp_options_write(__be32 *ptr, struct sk_buff *skb,
 		}
 	}
 
-	if ((OPTION_MPTCP_MPC_DSS_MAP |
-	     OPTION_MPTCP_MPC_DSS_ACK) & opts->suboptions) {
-		struct mptcp_skb_cb *mcb = mptcp_skb_priv_cb(skb);
-		bool write_ack = (OPTION_MPTCP_MPC_DSS_ACK & opts->suboptions) && skb->priv_used && mcb->use_ack;
-		bool write_map = (OPTION_MPTCP_MPC_DSS_MAP & opts->suboptions) && skb->priv_used && mcb->use_map;
+	if ((OPTION_MPTCP_DSS_MAP | OPTION_MPTCP_DSS_ACK) & opts->suboptions) {
+		bool write_ack = !!(OPTION_MPTCP_DSS_ACK & opts->suboptions);
+		bool write_map = (OPTION_MPTCP_DSS_MAP & opts->suboptions) &&
+			skb->priv_used && mptcp_skb_priv_cb(skb)->use_map;
 		u8 flags = 0;
 		u8 len = 4;
-		u8 *p = (u8 *)ptr;
-
-		BUG_ON(!write_ack && !write_map);
 
 		if (write_ack) {
 			len += 8;
@@ -488,46 +484,60 @@ static void mptcp_options_write(__be32 *ptr, struct sk_buff *skb,
 		}
 
 		if (write_map) {
+			pr_debug("Updating DSS length and flags for map");
 			len += 14;
 
-			if (mcb->use_checksum)
+			if (mptcp_skb_priv_cb(skb)->use_checksum)
 				len += 2;
 
 			/* Use only 64-bit mapping flags for now, add
 			 * support for optional 32-bit mappings later.
 			 */
 			flags |= 0x0c;
-			if (mcb->data_fin)
+			if (mptcp_skb_priv_cb(skb)->data_fin)
 				flags |= 0x10;
 		}
 
-		*p++ = 0x1e; // TCP option: Multipath TCP
-		*p++ = len;  // length
-		*p++ = 0x20; // subtype=DSS
-		*p++ = flags;
+		*ptr++ = htonl((0x1e << 24) |  // TCP option: Multipath TCP
+			       (len  << 16) |  // length
+			       (0x20 <<  8) |  // subtype=DSS
+			       (flags));
 
 		if (write_ack) {
-			*(__be64 *)p = cpu_to_be64(mcb->data_ack);
-			p += 8;
+			struct mptcp_sock *msk = mptcp_sk(subflow_tp(tp)->conn);
+			u64 ack_seq;
+			__be64 ack;
+
+			if (msk) {
+				ack_seq = atomic64_read(&msk->ack_seq);
+			} else {
+				crypto_key_sha1(subflow_tp(tp)->remote_key,
+						NULL, &ack_seq);
+				ack_seq++;
+			}
+
+			pr_debug("ack=%llu", ack_seq);
+			ack = cpu_to_be64(ack_seq);
+			memcpy((u8 *) ptr, (u8 *) &ack, 8);
+			ptr += 2;
 		}
 
 		if (write_map) {
-			*(__be64 *)p = cpu_to_be64(mcb->data_seq);
-			p += 8;
-
-			*(__be32 *)p = htonl(mcb->subflow_seq);
-			p += 4;
+			struct mptcp_skb_cb *mcb = mptcp_skb_priv_cb(skb);
+			u16 checksum;
+			__be64 dss;
 
-			*(__be16 *)p = htons(mcb->dll);
-			p += 2;
+			pr_debug("Writing map values");
+			dss = cpu_to_be64(mcb->data_seq);
+			memcpy((u8 *) ptr, &dss, 8);
+			ptr += 2;
+			*ptr++ = htonl(mcb->subflow_seq);
 
-			if (mcb->use_checksum) {
-				*(__be16 *)p = htons(mcb->checksum);
-				p += 2;
-			} else {
-				*p++ = TCPOPT_NOP;
-				*p++ = TCPOPT_NOP;
-			}
+			if (mcb->use_checksum)
+				checksum = (__force u16) mcb->checksum;
+			else
+				checksum = TCPOPT_NOP << 8 | TCPOPT_NOP;
+			*ptr++ = htonl(mcb->dll << 16 | checksum);
 		}
 	}
 #endif
@@ -894,50 +904,43 @@ static unsigned int tcp_established_options(struct sock *sk, struct sk_buff *skb
 				opts->rcvr_key = remote_key;
 				size += TCPOLEN_MPTCP_MPC_ACK;
 			}
-		} else if (subflow_sk(sk)->mp_capable && skb && skb->priv_used) {
-			struct mptcp_skb_cb *cb;
+		} else if (subflow_sk(sk)->mp_capable && skb) {
 			unsigned int dss_size = 0;
-			u16 options = 0;
+			unsigned int ack_size = 8;
+			u16 suboptions = 0;
 
-			cb = mptcp_skb_priv_cb(skb);
-
-			if (cb->use_map) {
+			if (skb->priv_used && mptcp_skb_priv_cb(skb)->use_map) {
 				unsigned int map_size = 18;
 
-				if (cb->use_checksum)
+				if (mptcp_skb_priv_cb(skb)->use_checksum)
 					map_size += 2;
 
 				if (map_size <= remaining) {
 					remaining -= map_size;
 					dss_size = map_size;
-					opts->options |= OPTION_MPTCP;
-					opts->suboptions = OPTION_MPTCP_MPC_DSS_MAP;
+					suboptions = OPTION_MPTCP_DSS_MAP;
 				} else {
 					WARN(1, "MPTCP: Map dropped");
 				}
 			}
 
-			if (cb->use_ack) {
-				unsigned int ack_size = 8;
-
-				/* Add kind/length/subtype/flag
-				 * overhead if mapping not populated
-				 */
-				if (dss_size == 0)
-					ack_size += 4;
+			/* Add kind/length/subtype/flag
+			 * overhead if mapping not populated
+			 */
+			if (dss_size == 0)
+				ack_size += 4;
 
-				if (ack_size <= remaining) {
-					dss_size += ack_size;
-					opts->options |= OPTION_MPTCP;
-					opts->suboptions |= OPTION_MPTCP_MPC_DSS_ACK;
-				} else {
-					WARN(1, "MPTCP: Ack dropped");
-				}
+			if (ack_size <= remaining) {
+				dss_size += ack_size;
+				suboptions |= OPTION_MPTCP_DSS_ACK;
+			} else {
+				WARN(1, "MPTCP: Ack dropped");
 			}
 
 			if (dss_size) {
 				size += ALIGN(dss_size, 4);
-				opts->options |= options;
+				opts->options |= OPTION_MPTCP;
+				opts->suboptions = suboptions;
 			}
 		}
 	}
@@ -3833,6 +3836,9 @@ void __tcp_send_ack(struct sock *sk, u32 rcv_nxt)
 	skb_set_tcp_pure_ack(buff);
 
 	/* Send it off, this clears delayed acks for us. */
+	if (tcp_sk(sk)->is_mptcp)
+		pr_debug("mptcp sk=%p", sk);
+
 	__tcp_transmit_skb(sk, buff, 0, (__force gfp_t)0, rcv_nxt);
 }
 EXPORT_SYMBOL_GPL(__tcp_send_ack);
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 266a9f7fed0d..eea1a5802483 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -21,8 +21,8 @@ void mptcp_parse_option(const unsigned char *ptr, int opsize,
 			struct tcp_options_received *opt_rx)
 {
 	u8 subtype;
+	int expected_opsize;
 
-	opsize -= 2;
 	subtype = *ptr++;
 
 	/* MPTCPOPT_MP_CAPABLE
@@ -81,6 +81,68 @@ void mptcp_parse_option(const unsigned char *ptr, int opsize,
 	case 0x20:
 		pr_debug("DSS");
 		opt_rx->mptcp.dss = 1;
+
+		opt_rx->mptcp.flags = (*ptr++) & 0x1F;
+		opt_rx->mptcp.data_fin = (opt_rx->mptcp.flags & 0x10) != 0;
+		opt_rx->mptcp.dsn64 = (opt_rx->mptcp.flags & 0x08) != 0;
+		opt_rx->mptcp.use_map = (opt_rx->mptcp.flags & 0x04) != 0;
+		opt_rx->mptcp.ack64 = (opt_rx->mptcp.flags & 0x02) != 0;
+		opt_rx->mptcp.use_ack = (opt_rx->mptcp.flags & 0x01);
+
+		pr_debug("data_fin=%d dsn64=%d use_map=%d ack64=%d use_ack=%d",
+			 opt_rx->mptcp.data_fin, opt_rx->mptcp.dsn64,
+			 opt_rx->mptcp.use_map, opt_rx->mptcp.ack64,
+			 opt_rx->mptcp.use_ack);
+
+		expected_opsize = 0;
+
+		if (opt_rx->mptcp.use_ack) {
+			expected_opsize = 4;
+			if (opt_rx->mptcp.ack64)
+				expected_opsize += 4;
+
+			if (opsize < expected_opsize)
+				break;
+
+			if (opt_rx->mptcp.ack64) {
+				opt_rx->mptcp.ack = get_unaligned_be64(ptr);
+				ptr += 8;
+			} else {
+				opt_rx->mptcp.ack = get_unaligned_be32(ptr);
+				ptr += 4;
+			}
+
+			pr_debug("ack=%llu", opt_rx->mptcp.ack);
+		}
+
+		if (opt_rx->mptcp.use_map) {
+			expected_opsize += 12;
+			if (opt_rx->mptcp.dsn64)
+				expected_opsize += 4;
+
+			if (opsize < expected_opsize)
+				break;
+
+			if (opt_rx->mptcp.dsn64) {
+				opt_rx->mptcp.seq = get_unaligned_be64(ptr);
+				ptr += 8;
+			} else {
+				opt_rx->mptcp.seq = get_unaligned_be32(ptr);
+				ptr += 4;
+			}
+
+			opt_rx->mptcp.subflow_seq = get_unaligned_be32(ptr);
+			ptr += 4;
+
+			opt_rx->mptcp.dll = get_unaligned_be16(ptr);
+			ptr += 2;
+
+			opt_rx->mptcp.checksum = get_unaligned_be16(ptr);
+
+			pr_debug("seq=%llu subflow_seq=%u dll=%u ck=%u",
+				 opt_rx->mptcp.seq, opt_rx->mptcp.subflow_seq,
+				 opt_rx->mptcp.dll, opt_rx->mptcp.checksum);
+		}
 		break;
 
 	/* MPTCPOPT_ADD_ADDR
@@ -204,3 +266,37 @@ unsigned int mptcp_synack_options(struct request_sock *req, u64 *local_key,
 	}
 	return subflow_req->mp_capable;
 }
+
+void mptcp_attach_dss(struct sock *sk, struct sk_buff *skb,
+		      struct tcp_options_received *opt_rx)
+{
+	struct mptcp_skb_cb *mcb;
+
+	if (!opt_rx->mptcp.dss)
+		return;
+
+	mcb = kzalloc(sizeof(*mcb), GFP_ATOMIC);
+	if (!mcb)
+		return;
+
+	if (opt_rx->mptcp.use_map) {
+		mcb->data_seq = opt_rx->mptcp.seq;
+		mcb->subflow_seq = opt_rx->mptcp.subflow_seq;
+		mcb->dll = opt_rx->mptcp.dll;
+		mcb->checksum = opt_rx->mptcp.checksum;
+		mcb->use_map = 1;
+		mcb->dsn64 = opt_rx->mptcp.dsn64;
+		mcb->use_checksum = opt_rx->mptcp.use_checksum;
+	}
+
+	if (opt_rx->mptcp.use_ack) {
+		mcb->data_ack = opt_rx->mptcp.ack;
+		mcb->use_ack = 1;
+		mcb->ack64 = opt_rx->mptcp.ack64;
+	}
+
+	mcb->data_fin = opt_rx->mptcp.data_fin;
+
+	refcount_set(&mcb->refcnt, 1);
+	mptcp_cb_set(skb, mcb);
+}
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index f14793c40077..117c83eab584 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -16,6 +16,8 @@
 #include <linux/kernel.h>
 #include <linux/module.h>
 #include <linux/netdevice.h>
+#include <linux/sched/signal.h>
+#include <linux/atomic.h>
 #include <net/sock.h>
 #include <net/inet_common.h>
 #include <net/inet_hashtables.h>
@@ -23,6 +25,12 @@
 #include <net/tcp.h>
 #include <net/mptcp.h>
 
+static inline bool before64(__u64 seq1, __u64 seq2)
+{
+	return (__s64)(seq1-seq2) < 0;
+}
+#define after64(seq2, seq1)	before64(seq1, seq2)
+
 void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to)
 {
 	struct mptcp_skb_cb *mcb = from->priv;
@@ -48,6 +56,16 @@ static void mptcp_cb_destroy(struct sk_buff *skb)
 	skb->priv_used = 0;
 }
 
+void mptcp_cb_set(struct sk_buff *skb, struct mptcp_skb_cb *mcb)
+{
+	if (skb->priv_used)
+		mptcp_cb_destroy(skb);
+
+	skb->priv = mcb;
+	skb->priv_destructor = mptcp_cb_destroy;
+	skb->priv_used = 1;
+}
+
 static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
@@ -126,15 +144,12 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	skb = tcp_write_queue_tail(ssk);
 
 	refcount_set(&mcb->refcnt, 1);
-	mcb->data_ack = msk->ack_seq;
 	mcb->data_seq = msk->write_seq;
 	mcb->subflow_seq = subflow_sk(ssk)->rel_write_seq;
 	mcb->dll = ret;
 	mcb->checksum = 0xbeef;
 	mcb->use_map = 1;
 	mcb->dsn64 = 1;
-	mcb->use_ack = 1;
-	mcb->ack64 = 1;
 
 	if (mcb->use_map) {
 		pr_debug("data_seq=%llu subflow_seq=%u "
@@ -144,9 +159,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			 mcb->dsn64);
 	}
 
-	skb->priv = mcb;
-	skb->priv_destructor = mptcp_cb_destroy;
-	skb->priv_used = 1;
+	mptcp_cb_set(skb, mcb);
 
 	/* mcb memory is now owned by skb */
 	mcb = NULL;
@@ -164,21 +177,304 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	return ret;
 }
 
+struct mptcp_read_arg {
+	struct msghdr *msg;
+};
+
+static u64 expand_seq(u64 old_seq, u16 old_dll, u64 seq)
+{
+	if ((u32)seq == (u32)old_seq)
+		return old_seq;
+
+	/* Assume map covers data not mapped yet. */
+	return seq | ((old_seq + old_dll + 1) & ~0xFFFFFFFFULL);
+}
+
+static u64 get_mapped_dsn(struct subflow_sock *subflow)
+{
+	u32 map_offset = (tcp_sk(sock_sk(subflow))->copied_seq -
+			  subflow->ssn_offset -
+			  subflow->map_subflow_seq);
+
+	return subflow->map_seq + map_offset;
+}
+
+static int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
+			    unsigned int offset, size_t len)
+{
+	struct mptcp_read_arg *arg = desc->arg.data;
+	size_t copy_len;
+
+	copy_len = min(desc->count, len);
+
+	if (likely(arg->msg)) {
+		int err;
+
+		err = skb_copy_datagram_msg(skb, offset, arg->msg, copy_len);
+		if (err) {
+			pr_debug("error path");
+			desc->error = err;
+			return err;
+		}
+	} else {
+		pr_debug("Flushing skb payload");
+	}
+
+	// MSG_PEEK support? Other flags? MSG_TRUNC?
+
+	desc->count -= copy_len;
+
+	pr_debug("consumed %zu bytes, %zu left", copy_len, desc->count);
+	return copy_len;
+}
+
+static int mptcp_flush_actor(read_descriptor_t *desc, struct sk_buff *skb,
+			     unsigned int offset, size_t len)
+{
+	pr_debug("Flushing one skb with %zu of %zu bytes remaining",
+		 len, len + offset);
+
+	desc->count = 0;
+
+	return len;
+}
+
+enum mapping_status {
+	MAPPING_ADDED,
+	MAPPING_MISSING,
+	MAPPING_EMPTY,
+	MAPPING_DATA_FIN
+};
+
+static enum mapping_status mptcp_get_mapping(struct sock *ssk)
+{
+	struct subflow_sock *subflow = subflow_sk(ssk);
+	struct mptcp_skb_cb *mcb;
+	struct sk_buff *skb;
+
+	skb = skb_peek(&ssk->sk_receive_queue);
+	if (!skb) {
+		pr_debug("Empty queue");
+		return MAPPING_EMPTY;
+	} else if (!skb->priv_used) {
+		/* This is expected for non-DSS data packets */
+		return MAPPING_MISSING;
+	}
+
+	mcb = mptcp_skb_priv_cb(skb);
+
+	if (!mcb->use_map)
+		return MAPPING_MISSING;
+
+	pr_debug("seq=%llu is64=%d ssn=%u dll=%u ck=%u",
+		 mcb->data_seq, mcb->dsn64, mcb->subflow_seq, mcb->dll,
+		 mcb->checksum);
+
+	if (mcb->dll == 0) {
+		pr_err("Infinite mapping not handled");
+	} else if (mcb->subflow_seq == 0 &&
+		   mcb->data_fin == 1) {
+		pr_debug("DATA_FIN with no payload");
+		return MAPPING_DATA_FIN;
+	}
+
+	if (subflow->map_valid)
+		pr_warn("Replaced mapping before it was done");
+
+	if (!mcb->dsn64) {
+		subflow->map_seq = expand_seq(subflow->map_seq,
+					      subflow->map_dll,
+					      mcb->data_seq);
+		pr_debug("expanded seq=%llu", subflow->map_seq);
+	} else {
+		subflow->map_seq = mcb->data_seq;
+	}
+
+	subflow->map_subflow_seq = mcb->subflow_seq;
+	subflow->map_dll = mcb->dll;
+	subflow->map_valid = true;
+	pr_debug("new map seq=%llu subflow_seq=%u dll=%u",
+		 subflow->map_seq, subflow->map_subflow_seq,
+		 subflow->map_dll);
+
+	return MAPPING_ADDED;
+}
+
 static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 			 int nonblock, int flags, int *addr_len)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
-	struct socket *subflow;
+	struct subflow_sock *subflow;
+	struct mptcp_read_arg arg;
+	read_descriptor_t desc;
+	struct tcp_sock *tp;
+	struct sock *ssk;
+	int copied = 0;
+	long timeo;
+
+	if (!msk->connection_list) {
+		pr_debug("fallback-read subflow=%p", msk->subflow->sk);
+		return sock_recvmsg(msk->subflow, msg, flags);
+	}
+
+	ssk = msk->connection_list->sk;
+	subflow = subflow_sk(ssk);
+	tp = tcp_sk(ssk);
+
+	desc.arg.data = &arg;
+	desc.error = 0;
+
+	timeo = sock_rcvtimeo(sk, nonblock);
+
+	len = min_t(size_t, len, INT_MAX);
+
+	while (copied < len) {
+		enum mapping_status status;
+		size_t discard_len = 0;
+		int bytes_read;
+		u64 ack_seq;
+		u64 old_ack;
+		u32 ssn;
+
+		status = mptcp_get_mapping(ssk);
+
+		if (status == MAPPING_ADDED) {
+			/* Common case, but nothing to do here */
+		} else if (status == MAPPING_MISSING) {
+			if (!subflow->map_valid) {
+				pr_debug("Mapping missing, trying next skb");
+
+				arg.msg = NULL;
+				desc.count = SIZE_MAX;
+
+				bytes_read = tcp_read_sock(ssk, &desc,
+							   mptcp_flush_actor);
+
+				if (bytes_read < 0)
+					break;
+				else
+					continue;
+			}
+		} else if (status == MAPPING_EMPTY) {
+			goto wait_for_data;
+		} else if (status == MAPPING_DATA_FIN) {
+			/* TODO: Handle according to RFC 6824 */
+			if (!copied) {
+				pr_err("Can't read after DATA_FIN");
+				copied = -ENOTCONN;
+			}
+
+			break;
+		}
+
+		ssn = tcp_sk(ssk)->copied_seq - subflow->ssn_offset;
+		old_ack = atomic64_read(&msk->ack_seq);
+
+		if (unlikely(before(ssn, subflow->map_subflow_seq))) {
+			/* Mapping covers data later in the subflow stream,
+			 * discard unmapped data. */
+			pr_debug("Mapping covers data later in stream");
+			discard_len = subflow->map_subflow_seq - ssn;
+		} else if (unlikely(!before(ssn, (subflow->map_subflow_seq +
+						  subflow->map_dll)))) {
+			/* Mapping ends earlier in the subflow stream.
+			 * Invalidate the mapping and try again.
+			 */
+			subflow->map_valid = false;
+			pr_debug("Invalid mapping ssn=%d map_seq=%d map_dll=%d",
+				 ssn, subflow->map_subflow_seq, subflow->map_dll);
+			continue;
+		} else {
+			ack_seq = get_mapped_dsn(subflow);
+
+			if (before64(ack_seq, old_ack)) {
+				/* Mapping covers data already received,
+				 * discard data in the current mapping
+				 * and invalidate the map
+				 */
+				u64 map_end_dsn = subflow->map_seq +
+					subflow->map_dll;
+				discard_len = min(map_end_dsn - ack_seq,
+						  old_ack - ack_seq);
+				subflow->map_valid = false;
+				pr_debug("Duplicate MPTCP data found");
+			}
+		}
+
+		if (discard_len) {
+			/* Discard data for the current mapping.
+			 */
+			pr_debug("Discard %zu bytes", discard_len);
+
+			arg.msg = NULL;
+			desc.count = discard_len;
+
+			bytes_read = tcp_read_sock(ssk, &desc,
+						   mptcp_read_actor);
+
+			if (bytes_read < 0)
+				break;
+			else if (bytes_read == discard_len)
+				continue;
+			else
+				goto wait_for_data;
+		}
+
+		/* Read mapped data */
+		desc.count = ssn - subflow->map_subflow_seq + subflow->map_dll;
+		arg.msg = msg;
+		bytes_read = tcp_read_sock(ssk, &desc, mptcp_read_actor);
+		if (bytes_read < 0)
+			break;
+
+		/* Refresh current MPTCP sequence number based on subflow seq */
+		ack_seq = get_mapped_dsn(subflow);
+
+		if (before64(old_ack, ack_seq)) {
+			atomic64_set(&msk->ack_seq, ack_seq);
+		}
+
+		if (!before(tcp_sk(ssk)->copied_seq - subflow->ssn_offset,
+			    subflow->map_subflow_seq + subflow->map_dll)) {
+			subflow->map_valid = false;
+			pr_debug("Done with mapping: seq=%u dll=%u",
+				 subflow->map_subflow_seq, subflow->map_dll);
+		}
+
+		copied += bytes_read;
+
+wait_for_data:
+		if (copied)
+			break;
+
+		if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
+			pr_err("Urgent data present, cannot proceed");
+			break;
+		}
+
+		if (ssk->sk_err || ssk->sk_state == TCP_CLOSE ||
+		    (ssk->sk_shutdown & RCV_SHUTDOWN) || !timeo ||
+		    signal_pending(current)) {
+			pr_debug("nonblock or error");
+			break;
+		}
+
+		/* Handle blocking and retry read if needed.
+		 *
+		 * Wait on MPTCP sock, the subflow will notify via data ready.
+		 */
+
+		pr_debug("block");
+		release_sock(ssk);
+		sk_wait_data(sk, &timeo, NULL);
+		lock_sock(ssk);
 
-	if (msk->connection_list) {
-		subflow = msk->connection_list;
-		pr_debug("conn_list->subflow=%p", subflow->sk);
-	} else {
-		subflow = msk->subflow;
-		pr_debug("subflow=%p", subflow->sk);
 	}
 
-	return sock_recvmsg(subflow, msg, flags);
+	release_sock(ssk);
+	release_sock(sk);
+
+	return copied;
 }
 
 static int mptcp_init_sock(struct sock *sk)
@@ -234,18 +530,27 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
 	subflow->conn = mp->sk;
 
 	if (subflow->mp_capable) {
+		u64 ack_seq;
+
+		msk->remote_key = subflow->remote_key;
 		msk->local_key = subflow->local_key;
 		msk->token = subflow->token;
+		pr_debug("token=%u", msk->token);
 		token_update_accept(new_sock->sk, mp->sk);
+		msk->connection_list = new_sock;
+
+		crypto_key_sha1(msk->remote_key, NULL, &ack_seq);
 		msk->write_seq = subflow->idsn + 1;
+		ack_seq++;
+		atomic64_set(&msk->ack_seq, ack_seq);
+		subflow->map_seq = ack_seq;
+		subflow->map_subflow_seq = 1;
 		subflow->rel_write_seq = 1;
-		msk->remote_key = subflow->remote_key;
-		crypto_key_sha1(msk->remote_key, NULL, &msk->ack_seq);
-		msk->ack_seq++;
-		msk->connection_list = new_sock;
+		subflow->conn = mp->sk;
 	} else {
 		msk->subflow = new_sock;
 	}
+	inet_sk_state_store(sk, TCP_ESTABLISHED);
 
 	return mp->sk;
 }
@@ -323,17 +628,24 @@ void mptcp_finish_connect(struct sock *sk, int mp_capable)
 	pr_debug("msk=%p", msk);
 
 	if (mp_capable) {
+		u64 ack_seq;
+
+		msk->remote_key = subflow->remote_key;
 		msk->local_key = subflow->local_key;
 		msk->token = subflow->token;
-		msk->write_seq = subflow->idsn + 1;
-		subflow->rel_write_seq = 1;
-		msk->remote_key = subflow->remote_key;
-		crypto_key_sha1(msk->remote_key, NULL, &msk->ack_seq);
-		msk->ack_seq++;
+		pr_debug("token=%u", msk->token);
 		msk->connection_list = msk->subflow;
 		msk->subflow = NULL;
+
+		crypto_key_sha1(msk->remote_key, NULL, &ack_seq);
+		msk->write_seq = subflow->idsn + 1;
+		ack_seq++;
+		atomic64_set(&msk->ack_seq, ack_seq);
+		subflow->map_seq = ack_seq;
+		subflow->map_subflow_seq = 1;
+		subflow->rel_write_seq = 1;
 	}
-	sk->sk_state = TCP_ESTABLISHED;
+	inet_sk_state_store(sk, TCP_ESTABLISHED);
 }
 
 static int subflow_create(struct sock *sock)
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index e0b4aa8fe51d..3c9dca4be26a 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -107,6 +107,8 @@ static void subflow_v4_init_req(struct request_sock *req,
 		subflow_req->remote_key = rx_opt.mptcp.sndr_key;
 		pr_debug("remote_key=%llu", subflow_req->remote_key);
 		token_new_request(req, skb);
+		pr_debug("syn seq=%u", TCP_SKB_CB(skb)->seq);
+		subflow_req->ssn_offset = TCP_SKB_CB(skb)->seq;
 	} else {
 		subflow_req->mp_capable = 0;
 	}
@@ -120,10 +122,15 @@ static void subflow_finish_connect(struct sock *sk, const struct sk_buff *skb)
 
 	pr_debug("subflow=%p", subflow);
 
-	if (subflow->conn) {
+	if (!subflow->conn_finished) {
 		pr_debug("remote_key=%llu", subflow->remote_key);
 		mptcp_finish_connect(subflow->conn, subflow->mp_capable);
-		subflow->conn = NULL;
+		subflow->conn_finished = 1;
+
+		if (skb) {
+			pr_debug("synack seq=%u", TCP_SKB_CB(skb)->seq);
+			subflow->ssn_offset = TCP_SKB_CB(skb)->seq;
+		}
 	}
 }
 
@@ -172,7 +179,9 @@ static struct sock *subflow_syn_recv_sock(const struct sock *sk,
 			subflow->fourth_ack = 1;
 			subflow->remote_key = subflow_req->remote_key;
 			subflow->local_key = subflow_req->local_key;
+			subflow->ssn_offset = subflow_req->ssn_offset;
 			subflow->token = subflow_req->token;
+			subflow->idsn = subflow_req->idsn;
 			pr_debug("token=%u", subflow->token);
 			token_new_accept(child);
 		} else {
@@ -202,6 +211,20 @@ const struct inet_connection_sock_af_ops subflow_specific = {
 	.mtu_reduced	   = tcp_v4_mtu_reduced,
 };
 
+static void subflow_data_ready(struct sock *sk)
+{
+	struct subflow_sock *subflow = subflow_sk(sk);
+	struct sock *parent = subflow->conn;
+
+	pr_debug("sk=%p", sk);
+	subflow->tcp_sk_data_ready(sk);
+
+	if (parent) {
+		pr_debug("parent=%p", parent);
+		parent->sk_data_ready(parent);
+	}
+}
+
 static int subflow_init_sock(struct sock *sk)
 {
 	struct subflow_sock *subflow = subflow_sk(sk);
@@ -215,6 +238,8 @@ static int subflow_init_sock(struct sock *sk)
 	if (!err) { // @@ AND mptcp is enabled
 		tsk->is_mptcp = 1;
 		icsk->icsk_af_ops = &subflow_specific;
+		subflow->tcp_sk_data_ready = sk->sk_data_ready;
+		sk->sk_data_ready = subflow_data_ready;
 	}
 
 	return err;
-- 
2.19.1


^ permalink raw reply related	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2018-11-30 22:33 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-11-30 22:33 [MPTCP] [RFC PATCH v4 16/17] mptcp: Implement MPTCP receive path Mat Martineau
  -- strict thread matches above, loose matches on Subject: below --
2018-11-30 20:11 Mat Martineau

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.