* [MPTCP] [PATCH net-next 0/6] mptcp: avoid workqueue usage for data
@ 2020-11-27 10:10 ` Paolo Abeni
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

The current locking scheme used to protect the MPTCP data-path
requires using the MPTCP workqueue to process incoming data
whenever the trylock on the msk socket lock fails.

The above poses scalability limits and introduces random delays
in MPTCP-level acks.

With this series we use a single spinlock to protect the MPTCP
data-path, removing the need for workqueue and delayed ack usage.

This additionally reduces the number of atomic operations required
per packet and considerably cleans up the poll/wake-up code.
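
Roughly speaking (a sketch only, not part of the patches; the helper
names below are the ones removed or introduced later in the series),
the data-ready path changes from a trylock-or-defer pattern to an
unconditional, short spinlock section:

	/* before: processing deferred to the workqueue on contention */
	if (!spin_trylock_bh(&sk->sk_lock.slock))
		return false;	/* the MPTCP workqueue will do the job */

	/* after: always acquire the msk data spinlock and move skbs */
	mptcp_data_lock(sk);
	__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
	mptcp_data_unlock(sk);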

Paolo Abeni (6):
  mptcp: open code mptcp variant for lock_sock
  mptcp: implement wmem reservation
  mptcp: protect the rx path with the msk socket spinlock
  mptcp: allocate TX skbs in msk context
  mptcp: avoid a few atomic ops in the rx path
  mptcp: use mptcp release_cb for delayed tasks

 include/net/sock.h     |   1 +
 net/core/sock.c        |   2 +-
 net/mptcp/mptcp_diag.c |   2 +-
 net/mptcp/options.c    |  47 +--
 net/mptcp/protocol.c   | 733 ++++++++++++++++++++++++++++++-----------
 net/mptcp/protocol.h   |  34 +-
 net/mptcp/subflow.c    |  14 +-
 7 files changed, 598 insertions(+), 235 deletions(-)

-- 
2.26.2

* [MPTCP] [PATCH net-next 1/6] mptcp: open code mptcp variant for lock_sock
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

This allows invoking an additional callback under the
socket spinlock.

It will be used by the next patches to avoid additional
spinlock contention.
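
For illustration only, a minimal usage sketch (the callback argument
shown here, __mptcp_wmem_reserve(), is added by the next patch):

	/* run the callback under sk_lock.slock, right before this
	 * context takes ownership of the socket lock
	 */
	mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len));

	/* ... critical section owning the socket lock ... */

	release_sock(sk);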

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 include/net/sock.h   |  1 +
 net/core/sock.c      |  2 +-
 net/mptcp/protocol.h | 13 +++++++++++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/include/net/sock.h b/include/net/sock.h
index 80469c2c448d..f59764614e30 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -1590,6 +1590,7 @@ static inline void lock_sock(struct sock *sk)
 	lock_sock_nested(sk, 0);
 }
 
+void __lock_sock(struct sock *sk);
 void __release_sock(struct sock *sk);
 void release_sock(struct sock *sk);
 
diff --git a/net/core/sock.c b/net/core/sock.c
index 9badbe7bb4e4..f0f096852876 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -2486,7 +2486,7 @@ bool sk_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
 }
 EXPORT_SYMBOL(sk_page_frag_refill);
 
-static void __lock_sock(struct sock *sk)
+void __lock_sock(struct sock *sk)
 	__releases(&sk->sk_lock.slock)
 	__acquires(&sk->sk_lock.slock)
 {
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 82d5626323b1..6abac8238de3 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -253,6 +253,19 @@ struct mptcp_sock {
 	} rcvq_space;
 };
 
+#define mptcp_lock_sock(___sk, cb) do {					\
+	struct sock *__sk = (___sk); /* silence macro reuse warning */	\
+	might_sleep();							\
+	spin_lock_bh(&__sk->sk_lock.slock);				\
+	if (__sk->sk_lock.owned)					\
+		__lock_sock(__sk);					\
+	cb;								\
+	__sk->sk_lock.owned = 1;					\
+	spin_unlock(&__sk->sk_lock.slock);				\
+	mutex_acquire(&__sk->sk_lock.dep_map, 0, 0, _RET_IP_);		\
+	local_bh_enable();						\
+} while (0)
+
 #define mptcp_for_each_subflow(__msk, __subflow)			\
 	list_for_each_entry(__subflow, &((__msk)->conn_list), node)
 
-- 
2.26.2

* [MPTCP] [PATCH net-next 2/6] mptcp: implement wmem reservation
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

This leverages the previous commit to reserve the wmem
required for the sendmsg() operation when the msk socket
lock is first acquired.
Some heuristics are used to get a reasonable [over]estimate of
the whole memory required. If we can't forward-allocate that
amount, fall back to a reasonably small chunk; if even that
fails, enter the wait-for-memory path.

When sendmsg() needs more memory it looks at wmem_reserved
first and, if that is exhausted, moves more space from
sk_forward_alloc.

The reserved memory is not persistent and is released at the
next socket unlock via the release_cb().

Overall this will simplify the next patch.
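
A condensed sketch of the resulting sendmsg() flow, using the helpers
added below (error handling and the actual copy loop omitted):

	/* reserve an [over]estimate of the needed wmem upfront; the
	 * estimate adds roughly one struct mptcp_data_frag per page
	 */
	mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len));

	/* per fragment: consume the reservation, topping it up from
	 * sk_forward_alloc when needed
	 */
	if (!mptcp_wmem_alloc(sk, psize + frag_truesize))
		goto wait_for_memory;

	/* any leftover reservation flows back to sk_forward_alloc in
	 * release_cb() at unlock time
	 */
	release_sock(sk);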

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/protocol.c | 92 ++++++++++++++++++++++++++++++++++++++++----
 net/mptcp/protocol.h |  1 +
 2 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 16e9cb1c79cc..07fe484eefd1 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -873,6 +873,81 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
 		df->data_seq + df->data_len == msk->write_seq;
 }
 
+static int mptcp_wmem_with_overhead(int size)
+{
+	return size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+}
+
+static void __mptcp_wmem_reserve(struct sock *sk, int size)
+{
+	int amount = mptcp_wmem_with_overhead(size);
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	WARN_ON_ONCE(msk->wmem_reserved);
+	if (amount <= sk->sk_forward_alloc)
+		goto reserve;
+
+	/* under memory pressure try to reserve at most a single page
+	 * otherwise try to reserve the full estimate and fallback
+	 * to a single page before entering the error path
+	 */
+	if ((tcp_under_memory_pressure(sk) && amount > PAGE_SIZE) ||
+	    !sk_wmem_schedule(sk, amount)) {
+		if (amount <= PAGE_SIZE)
+			goto nomem;
+
+		amount = PAGE_SIZE;
+		if (!sk_wmem_schedule(sk, amount))
+			goto nomem;
+	}
+
+reserve:
+	msk->wmem_reserved = amount;
+	sk->sk_forward_alloc -= amount;
+	return;
+
+nomem:
+	/* we will wait for memory on next allocation */
+	msk->wmem_reserved = -1;
+}
+
+static void __mptcp_update_wmem(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	if (!msk->wmem_reserved)
+		return;
+
+	if (msk->wmem_reserved < 0)
+		msk->wmem_reserved = 0;
+	if (msk->wmem_reserved > 0) {
+		sk->sk_forward_alloc += msk->wmem_reserved;
+		msk->wmem_reserved = 0;
+	}
+}
+
+static bool mptcp_wmem_alloc(struct sock *sk, int size)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	/* check for pre-existing error condition */
+	if (msk->wmem_reserved < 0)
+		return false;
+
+	if (msk->wmem_reserved >= size)
+		goto account;
+
+	if (!sk_wmem_schedule(sk, size))
+		return false;
+
+	sk->sk_forward_alloc -= size;
+	msk->wmem_reserved += size;
+
+account:
+	msk->wmem_reserved -= size;
+	return true;
+}
+
 static void dfrag_uncharge(struct sock *sk, int len)
 {
 	sk_mem_uncharge(sk, len);
@@ -930,7 +1005,7 @@ static void mptcp_clean_una(struct sock *sk)
 	}
 
 out:
-	if (cleaned)
+	if (cleaned && tcp_under_memory_pressure(sk))
 		sk_mem_reclaim_partial(sk);
 }
 
@@ -1307,7 +1382,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL))
 		return -EOPNOTSUPP;
 
-	lock_sock(sk);
+	mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len));
 
 	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
 
@@ -1356,11 +1431,12 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		offset = dfrag->offset + dfrag->data_len;
 		psize = pfrag->size - offset;
 		psize = min_t(size_t, psize, msg_data_left(msg));
-		if (!sk_wmem_schedule(sk, psize + frag_truesize))
+		if (!mptcp_wmem_alloc(sk, psize + frag_truesize))
 			goto wait_for_memory;
 
 		if (copy_page_from_iter(dfrag->page, offset, psize,
 					&msg->msg_iter) != psize) {
+			msk->wmem_reserved += psize + frag_truesize;
 			ret = -EFAULT;
 			goto out;
 		}
@@ -1376,7 +1452,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		 * Note: we charge such data both to sk and ssk
 		 */
 		sk_wmem_queued_add(sk, frag_truesize);
-		sk->sk_forward_alloc -= frag_truesize;
 		if (!dfrag_collapsed) {
 			get_page(dfrag->page);
 			list_add_tail(&dfrag->list, &msk->rtx_queue);
@@ -2003,6 +2078,7 @@ static int __mptcp_init_sock(struct sock *sk)
 	INIT_WORK(&msk->work, mptcp_worker);
 	msk->out_of_order_queue = RB_ROOT;
 	msk->first_pending = NULL;
+	msk->wmem_reserved = 0;
 
 	msk->ack_hint = NULL;
 	msk->first = NULL;
@@ -2197,6 +2273,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
 
 	sk->sk_prot->destroy(sk);
 
+	WARN_ON_ONCE(msk->wmem_reserved);
 	sk_stream_kill_queues(sk);
 	xfrm_sk_free_policy(sk);
 	sk_refcnt_debug_release(sk);
@@ -2542,13 +2619,14 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
 
 #define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)
 
-/* this is very alike tcp_release_cb() but we must handle differently a
- * different set of events
- */
+/* processes deferred events and flush wmem */
 static void mptcp_release_cb(struct sock *sk)
 {
 	unsigned long flags, nflags;
 
+	/* clear any wmem reservation and errors */
+	__mptcp_update_wmem(sk);
+
 	do {
 		flags = sk->sk_tsq_flags;
 		if (!(flags & MPTCP_DEFERRED_ALL))
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 6abac8238de3..4cf355076e35 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -218,6 +218,7 @@ struct mptcp_sock {
 	u64		ack_seq;
 	u64		rcv_wnd_sent;
 	u64		rcv_data_fin_seq;
+	int		wmem_reserved;
 	struct sock	*last_snd;
 	int		snd_burst;
 	int		old_wspace;
-- 
2.26.2

* [MPTCP] [PATCH net-next 3/6] mptcp: protect the rx path with the msk socket spinlock
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

The msk socket spinlock is currently used only to protect the
'owned' flag inside the socket lock itself. With this patch, we
extend its scope to protect the whole msk receive path and
sk_forward_alloc.

Given the above, we can always move data into the msk receive
queue (and OoO queue) from the subflow.

We leverage the previous commit so that the tx path needs to
acquire the spinlock only when moving forward-allocated memory.

recvmsg() must now explicitly acquire the socket spinlock
when moving skbs out of sk_receive_queue. To reduce the number of
lock operations required, we use a second rx queue and splice the
former into the latter in mptcp_lock_sock(). Additionally,
rmem-allocated memory is bulk-freed via release_cb().
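
A condensed sketch of the resulting receive flow, using the helpers
added below (simplified, copy-to-user and error handling omitted):

	/* splice sk_receive_queue into the msk-private queue while the
	 * data spinlock is held, then dequeue without further locking
	 */
	mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk));

	while ((skb = skb_peek(&msk->receive_queue)) != NULL) {
		/* defer the rmem accounting, bulk-released later */
		skb->destructor = NULL;
		msk->rmem_released += skb->truesize;
		__skb_unlink(skb, &msk->receive_queue);
		__kfree_skb(skb);
	}

	/* release_cb() -> __mptcp_update_rmem() returns the accumulated
	 * rmem_released to the socket accounting
	 */
	release_sock(sk);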

Acked-by: Florian Westphal <fw@strlen.de>
Co-developed-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/protocol.c | 149 +++++++++++++++++++++++++++++--------------
 net/mptcp/protocol.h |   5 ++
 2 files changed, 107 insertions(+), 47 deletions(-)

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 07fe484eefd1..2f40882c4279 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -453,15 +453,15 @@ static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk)
 
 static void mptcp_cleanup_rbuf(struct mptcp_sock *msk)
 {
+	struct sock *ack_hint = READ_ONCE(msk->ack_hint);
 	struct mptcp_subflow_context *subflow;
 
 	/* if the hinted ssk is still active, try to use it */
-	if (likely(msk->ack_hint)) {
+	if (likely(ack_hint)) {
 		mptcp_for_each_subflow(msk, subflow) {
 			struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
 
-			if (msk->ack_hint == ssk &&
-			    mptcp_subflow_cleanup_rbuf(ssk))
+			if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk))
 				return;
 		}
 	}
@@ -614,13 +614,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
 			break;
 		}
 	} while (more_data_avail);
-	msk->ack_hint = ssk;
+	WRITE_ONCE(msk->ack_hint, ssk);
 
 	*bytes += moved;
 	return done;
 }
 
-static bool mptcp_ofo_queue(struct mptcp_sock *msk)
+static bool __mptcp_ofo_queue(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
 	struct sk_buff *skb, *tail;
@@ -666,34 +666,27 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk)
 /* In most cases we will be able to lock the mptcp socket.  If its already
  * owned, we need to defer to the work queue to avoid ABBA deadlock.
  */
-static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
 {
 	struct sock *sk = (struct sock *)msk;
 	unsigned int moved = 0;
 
-	if (READ_ONCE(sk->sk_lock.owned))
-		return false;
-
-	if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
-		return false;
-
-	/* must re-check after taking the lock */
-	if (!READ_ONCE(sk->sk_lock.owned)) {
-		__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
-		mptcp_ofo_queue(msk);
+	if (inet_sk_state_load(sk) == TCP_CLOSE)
+		return;
 
-		/* If the moves have caught up with the DATA_FIN sequence number
-		 * it's time to ack the DATA_FIN and change socket state, but
-		 * this is not a good place to change state. Let the workqueue
-		 * do it.
-		 */
-		if (mptcp_pending_data_fin(sk, NULL))
-			mptcp_schedule_work(sk);
-	}
+	mptcp_data_lock(sk);
 
-	spin_unlock_bh(&sk->sk_lock.slock);
+	__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+	__mptcp_ofo_queue(msk);
 
-	return moved > 0;
+	/* If the moves have caught up with the DATA_FIN sequence number
+	 * it's time to ack the DATA_FIN and change socket state, but
+	 * this is not a good place to change state. Let the workqueue
+	 * do it.
+	 */
+	if (mptcp_pending_data_fin(sk, NULL))
+		mptcp_schedule_work(sk);
+	mptcp_data_unlock(sk);
 }
 
 void mptcp_data_ready(struct sock *sk, struct sock *ssk)
@@ -937,17 +930,30 @@ static bool mptcp_wmem_alloc(struct sock *sk, int size)
 	if (msk->wmem_reserved >= size)
 		goto account;
 
-	if (!sk_wmem_schedule(sk, size))
+	mptcp_data_lock(sk);
+	if (!sk_wmem_schedule(sk, size)) {
+		mptcp_data_unlock(sk);
 		return false;
+	}
 
 	sk->sk_forward_alloc -= size;
 	msk->wmem_reserved += size;
+	mptcp_data_unlock(sk);
 
 account:
 	msk->wmem_reserved -= size;
 	return true;
 }
 
+static void mptcp_wmem_uncharge(struct sock *sk, int size)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	if (msk->wmem_reserved < 0)
+		msk->wmem_reserved = 0;
+	msk->wmem_reserved += size;
+}
+
 static void dfrag_uncharge(struct sock *sk, int len)
 {
 	sk_mem_uncharge(sk, len);
@@ -976,6 +982,7 @@ static void mptcp_clean_una(struct sock *sk)
 	if (__mptcp_check_fallback(msk))
 		atomic64_set(&msk->snd_una, msk->snd_nxt);
 
+	mptcp_data_lock(sk);
 	snd_una = atomic64_read(&msk->snd_una);
 
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
@@ -1007,6 +1014,7 @@ static void mptcp_clean_una(struct sock *sk)
 out:
 	if (cleaned && tcp_under_memory_pressure(sk))
 		sk_mem_reclaim_partial(sk);
+	mptcp_data_unlock(sk);
 }
 
 static void mptcp_clean_una_wakeup(struct sock *sk)
@@ -1436,7 +1444,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 
 		if (copy_page_from_iter(dfrag->page, offset, psize,
 					&msg->msg_iter) != psize) {
-			msk->wmem_reserved += psize + frag_truesize;
+			mptcp_wmem_uncharge(sk, psize + frag_truesize);
 			ret = -EFAULT;
 			goto out;
 		}
@@ -1502,11 +1510,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
 				struct msghdr *msg,
 				size_t len)
 {
-	struct sock *sk = (struct sock *)msk;
 	struct sk_buff *skb;
 	int copied = 0;
 
-	while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
+	while ((skb = skb_peek(&msk->receive_queue)) != NULL) {
 		u32 offset = MPTCP_SKB_CB(skb)->offset;
 		u32 data_len = skb->len - offset;
 		u32 count = min_t(size_t, len - copied, data_len);
@@ -1526,7 +1533,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
 			break;
 		}
 
-		__skb_unlink(skb, &sk->sk_receive_queue);
+		/* we will bulk release the skb memory later */
+		skb->destructor = NULL;
+		msk->rmem_released += skb->truesize;
+		__skb_unlink(skb, &msk->receive_queue);
 		__kfree_skb(skb);
 
 		if (copied >= len)
@@ -1634,25 +1644,47 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied)
 	msk->rcvq_space.time = mstamp;
 }
 
+static void __mptcp_update_rmem(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	if (!msk->rmem_released)
+		return;
+
+	atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc);
+	sk_mem_uncharge(sk, msk->rmem_released);
+	msk->rmem_released = 0;
+}
+
+static void __mptcp_splice_receive_queue(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue);
+}
+
 static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
 {
+	struct sock *sk = (struct sock *)msk;
 	unsigned int moved = 0;
-	bool done;
-
-	/* avoid looping forever below on racing close */
-	if (((struct sock *)msk)->sk_state == TCP_CLOSE)
-		return false;
+	bool ret, done;
 
 	__mptcp_flush_join_list(msk);
 	do {
 		struct sock *ssk = mptcp_subflow_recv_lookup(msk);
 		bool slowpath;
 
-		if (!ssk)
+		/* we can have data pending in the subflows only if the msk
+		 * receive buffer was full at subflow_data_ready() time,
+		 * that is an unlikely slow path.
+		 */
+		if (likely(!ssk))
 			break;
 
 		slowpath = lock_sock_fast(ssk);
+		mptcp_data_lock(sk);
 		done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+		mptcp_data_unlock(sk);
 		if (moved && rcv) {
 			WRITE_ONCE(msk->rmem_pending, min(rcv, moved));
 			tcp_cleanup_rbuf(ssk, 1);
@@ -1661,11 +1693,19 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
 		unlock_sock_fast(ssk, slowpath);
 	} while (!done);
 
-	if (mptcp_ofo_queue(msk) || moved > 0) {
-		mptcp_check_data_fin((struct sock *)msk);
-		return true;
+	/* acquire the data lock only if some input data is pending */
+	ret = moved > 0;
+	if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) ||
+	    !skb_queue_empty_lockless(&sk->sk_receive_queue)) {
+		mptcp_data_lock(sk);
+		__mptcp_update_rmem(sk);
+		ret |= __mptcp_ofo_queue(msk);
+		__mptcp_splice_receive_queue(sk);
+		mptcp_data_unlock(sk);
 	}
-	return false;
+	if (ret)
+		mptcp_check_data_fin((struct sock *)msk);
+	return !skb_queue_empty(&msk->receive_queue);
 }
 
 static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
@@ -1679,7 +1719,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 	if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
 		return -EOPNOTSUPP;
 
-	lock_sock(sk);
+	mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk));
 	if (unlikely(sk->sk_state == TCP_LISTEN)) {
 		copied = -ENOTCONN;
 		goto out_err;
@@ -1689,7 +1729,6 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 
 	len = min_t(size_t, len, INT_MAX);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
-	__mptcp_flush_join_list(msk);
 
 	for (;;) {
 		int bytes_read, old_space;
@@ -1703,7 +1742,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 
 		copied += bytes_read;
 
-		if (skb_queue_empty(&sk->sk_receive_queue) &&
+		if (skb_queue_empty(&msk->receive_queue) &&
 		    __mptcp_move_skbs(msk, len - copied))
 			continue;
 
@@ -1734,8 +1773,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 			if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
 				mptcp_check_for_eof(msk);
 
-			if (sk->sk_shutdown & RCV_SHUTDOWN)
+			if (sk->sk_shutdown & RCV_SHUTDOWN) {
+				/* race breaker: the shutdown could be after the
+				 * previous receive queue check
+				 */
+				if (__mptcp_move_skbs(msk, len - copied))
+					continue;
 				break;
+			}
 
 			if (sk->sk_state == TCP_CLOSE) {
 				copied = -ENOTCONN;
@@ -1757,7 +1802,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 		mptcp_wait_data(sk, &timeo);
 	}
 
-	if (skb_queue_empty(&sk->sk_receive_queue)) {
+	if (skb_queue_empty_lockless(&sk->sk_receive_queue) &&
+	    skb_queue_empty(&msk->receive_queue)) {
 		/* entire backlog drained, clear DATA_READY. */
 		clear_bit(MPTCP_DATA_READY, &msk->flags);
 
@@ -1773,7 +1819,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 out_err:
 	pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d",
 		 msk, test_bit(MPTCP_DATA_READY, &msk->flags),
-		 skb_queue_empty(&sk->sk_receive_queue), copied);
+		 skb_queue_empty_lockless(&sk->sk_receive_queue), copied);
 	mptcp_rcv_space_adjust(msk, copied);
 
 	release_sock(sk);
@@ -2076,9 +2122,11 @@ static int __mptcp_init_sock(struct sock *sk)
 	INIT_LIST_HEAD(&msk->join_list);
 	INIT_LIST_HEAD(&msk->rtx_queue);
 	INIT_WORK(&msk->work, mptcp_worker);
+	__skb_queue_head_init(&msk->receive_queue);
 	msk->out_of_order_queue = RB_ROOT;
 	msk->first_pending = NULL;
 	msk->wmem_reserved = 0;
+	msk->rmem_released = 0;
 
 	msk->ack_hint = NULL;
 	msk->first = NULL;
@@ -2274,6 +2322,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
 	sk->sk_prot->destroy(sk);
 
 	WARN_ON_ONCE(msk->wmem_reserved);
+	WARN_ON_ONCE(msk->rmem_released);
 	sk_stream_kill_queues(sk);
 	xfrm_sk_free_policy(sk);
 	sk_refcnt_debug_release(sk);
@@ -2491,6 +2540,11 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
 
 void mptcp_destroy_common(struct mptcp_sock *msk)
 {
+	struct sock *sk = (struct sock *)msk;
+
+	/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
+	skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
+
 	skb_rbtree_purge(&msk->out_of_order_queue);
 	mptcp_token_destroy(msk);
 	mptcp_pm_free_anno_list(msk);
@@ -2626,6 +2680,7 @@ static void mptcp_release_cb(struct sock *sk)
 
 	/* clear any wmem reservation and errors */
 	__mptcp_update_wmem(sk);
+	__mptcp_update_rmem(sk);
 
 	do {
 		flags = sk->sk_tsq_flags;
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 4cf355076e35..fe2efd923c5c 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -227,6 +227,7 @@ struct mptcp_sock {
 	unsigned long	timer_ival;
 	u32		token;
 	int		rmem_pending;
+	int		rmem_released;
 	unsigned long	flags;
 	bool		can_ack;
 	bool		fully_established;
@@ -238,6 +239,7 @@ struct mptcp_sock {
 	struct work_struct work;
 	struct sk_buff  *ooo_last_skb;
 	struct rb_root  out_of_order_queue;
+	struct sk_buff_head receive_queue;
 	struct list_head conn_list;
 	struct list_head rtx_queue;
 	struct mptcp_data_frag *first_pending;
@@ -267,6 +269,9 @@ struct mptcp_sock {
 	local_bh_enable();						\
 } while (0)
 
+#define mptcp_data_lock(sk) spin_lock_bh(&(sk)->sk_lock.slock)
+#define mptcp_data_unlock(sk) spin_unlock_bh(&(sk)->sk_lock.slock)
+
 #define mptcp_for_each_subflow(__msk, __subflow)			\
 	list_for_each_entry(__subflow, &((__msk)->conn_list), node)
 
-- 
2.26.2

* [MPTCP] [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

Move the TX skb allocation into the mptcp_sendmsg() scope,
and tentatively pre-allocate a number of skbs proportional
to the sendmsg() length.

Use the ssk tx skb cache so that the subflow does not need to
allocate skbs itself.

This allows removing the msk skb extension cache and makes
the later patches possible.
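
A condensed sketch of the resulting tx flow, using the helpers added
below (simplified, error handling and the copy loop omitted):

	/* mptcp_sendmsg(): pre-allocate the skbs likely needed for this
	 * chunk and account them as wmem before copying the data
	 */
	__skb_queue_head_init(&skbs);
	if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts))
		goto wait_for_memory;
	if (!mptcp_wmem_alloc(sk, total_ts)) {
		__skb_queue_purge(&skbs);
		goto wait_for_memory;
	}
	skb_queue_splice_tail(&skbs, &msk->skb_tx_cache);

	/* mptcp_push_pending(): hand a cached skb to the subflow before
	 * building each fragment, so the subflow never allocates
	 */
	if (!mptcp_alloc_tx_skb(sk, ssk)) {
		mptcp_push_release(sk, ssk, &info);
		goto out;
	}
	ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);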

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/protocol.c | 248 ++++++++++++++++++++++++++++++++++++-------
 net/mptcp/protocol.h |   4 +-
 2 files changed, 210 insertions(+), 42 deletions(-)

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 2f40882c4279..75b4c4c50dbb 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -818,16 +818,6 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk)
 	mptcp_close_wake_up(sk);
 }
 
-static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
-{
-	const struct sock *sk = (const struct sock *)msk;
-
-	if (!msk->cached_ext)
-		msk->cached_ext = __skb_ext_alloc(sk->sk_allocation);
-
-	return !!msk->cached_ext;
-}
-
 static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
 {
 	struct mptcp_subflow_context *subflow;
@@ -866,14 +856,22 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
 		df->data_seq + df->data_len == msk->write_seq;
 }
 
-static int mptcp_wmem_with_overhead(int size)
+static int mptcp_wmem_with_overhead(struct sock *sk, int size)
 {
-	return size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	int ret, skbs;
+
+	ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+	skbs = (msk->tx_pending_data + size) / msk->size_goal_cache;
+	if (skbs < msk->skb_tx_cache.qlen)
+		return ret;
+
+	return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER);
 }
 
 static void __mptcp_wmem_reserve(struct sock *sk, int size)
 {
-	int amount = mptcp_wmem_with_overhead(size);
+	int amount = mptcp_wmem_with_overhead(sk, size);
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
 	WARN_ON_ONCE(msk->wmem_reserved);
@@ -954,6 +952,25 @@ static void mptcp_wmem_uncharge(struct sock *sk, int size)
 	msk->wmem_reserved += size;
 }
 
+static void mptcp_mem_reclaim_partial(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	/* if we are experiencing a transint allocation error,
+	 * the forward allocation memory has been already
+	 * released
+	 */
+	if (msk->wmem_reserved < 0)
+		return;
+
+	mptcp_data_lock(sk);
+	sk->sk_forward_alloc += msk->wmem_reserved;
+	sk_mem_reclaim_partial(sk);
+	msk->wmem_reserved = sk->sk_forward_alloc;
+	sk->sk_forward_alloc = 0;
+	mptcp_data_unlock(sk);
+}
+
 static void dfrag_uncharge(struct sock *sk, int len)
 {
 	sk_mem_uncharge(sk, len);
@@ -1030,19 +1047,12 @@ static void mptcp_clean_una_wakeup(struct sock *sk)
 	}
 }
 
-/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
- * data
- */
-static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+static void mptcp_enter_memory_pressure(struct sock *sk)
 {
 	struct mptcp_subflow_context *subflow;
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	bool first = true;
 
-	if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
-					pfrag, sk->sk_allocation)))
-		return true;
-
 	sk_stream_moderate_sndbuf(sk);
 	mptcp_for_each_subflow(msk, subflow) {
 		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
@@ -1052,6 +1062,18 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
 		sk_stream_moderate_sndbuf(ssk);
 		first = false;
 	}
+}
+
+/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
+ * data
+ */
+static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+{
+	if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
+					pfrag, sk->sk_allocation)))
+		return true;
+
+	mptcp_enter_memory_pressure(sk);
 	return false;
 }
 
@@ -1098,6 +1120,128 @@ static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq,
 	return avail_size;
 }
 
+static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
+{
+	struct skb_ext *mpext = __skb_ext_alloc(gfp);
+
+	if (!mpext)
+		return false;
+	__skb_ext_set(skb, SKB_EXT_MPTCP, mpext);
+	return true;
+}
+
+static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk)
+{
+	struct sk_buff *skb;
+
+	skb = alloc_skb_fclone(MAX_TCP_HEADER, sk->sk_allocation);
+	if (likely(skb)) {
+		if (likely(__mptcp_add_ext(skb, sk->sk_allocation))) {
+			skb_reserve(skb, MAX_TCP_HEADER);
+			skb->reserved_tailroom = skb->end - skb->tail;
+			return skb;
+		}
+		__kfree_skb(skb);
+	} else {
+		mptcp_enter_memory_pressure(sk);
+	}
+	return NULL;
+}
+
+static bool mptcp_tx_cache_refill(struct sock *sk, int size,
+				  struct sk_buff_head *skbs, int *total_ts)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct sk_buff *skb;
+	int space_needed;
+
+	if (unlikely(tcp_under_memory_pressure(sk))) {
+		mptcp_mem_reclaim_partial(sk);
+
+		/* under pressure pre-allocate at most a single skb */
+		if (msk->skb_tx_cache.qlen)
+			return true;
+		space_needed = msk->size_goal_cache;
+	} else {
+		space_needed = msk->tx_pending_data + size -
+			       msk->skb_tx_cache.qlen * msk->size_goal_cache;
+	}
+
+	while (space_needed > 0) {
+		skb = __mptcp_do_alloc_tx_skb(sk);
+		if (unlikely(!skb)) {
+			/* under memory pressure, try to pass the caller a
+			 * single skb to allow forward progress
+			 */
+			while (skbs->qlen > 1) {
+				skb = __skb_dequeue_tail(skbs);
+				__kfree_skb(skb);
+			}
+			return skbs->qlen > 0;
+		}
+
+		*total_ts += skb->truesize;
+		__skb_queue_tail(skbs, skb);
+		space_needed -= msk->size_goal_cache;
+	}
+	return true;
+}
+
+static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct sk_buff *skb;
+
+	if (ssk->sk_tx_skb_cache) {
+		skb = ssk->sk_tx_skb_cache;
+		if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
+			     !__mptcp_add_ext(skb, sk->sk_allocation)))
+			return false;
+		return true;
+	}
+
+	skb = skb_peek(&msk->skb_tx_cache);
+	if (skb) {
+		if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+			skb = __skb_dequeue(&msk->skb_tx_cache);
+			if (WARN_ON_ONCE(!skb))
+				return false;
+
+			mptcp_wmem_uncharge(sk, skb->truesize);
+			ssk->sk_tx_skb_cache = skb;
+			return true;
+		}
+
+		/* over memory limit, no point to try to allocate a new skb */
+		return false;
+	}
+
+	skb = __mptcp_do_alloc_tx_skb(sk);
+	if (!skb)
+		return false;
+
+	if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+		ssk->sk_tx_skb_cache = skb;
+		return true;
+	}
+	kfree_skb(skb);
+	return false;
+}
+
+static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk)
+{
+	return !ssk->sk_tx_skb_cache &&
+	       !skb_peek(&mptcp_sk(sk)->skb_tx_cache) &&
+	       tcp_under_memory_pressure(sk);
+}
+
+static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+{
+	if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
+		mptcp_mem_reclaim_partial(sk);
+	return __mptcp_alloc_tx_skb(sk, ssk);
+}
+
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 			      struct mptcp_data_frag *dfrag,
 			      struct mptcp_sendmsg_info *info)
@@ -1109,7 +1253,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	struct sk_buff *skb, *tail;
 	bool can_collapse = false;
 	int avail_size;
-	size_t ret;
+	size_t ret = 0;
 
 	pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d",
 		 msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent);
@@ -1117,6 +1261,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	/* compute send limit */
 	info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags);
 	avail_size = info->size_goal;
+	msk->size_goal_cache = info->size_goal;
 	skb = tcp_write_queue_tail(ssk);
 	if (skb) {
 		/* Limit the write to the size available in the
@@ -1165,8 +1310,11 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 		goto out;
 	}
 
-	mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext);
-	msk->cached_ext = NULL;
+	mpext = skb_ext_find(tail, SKB_EXT_MPTCP);
+	if (WARN_ON_ONCE(!mpext)) {
+		/* should never reach here, stream corrupted */
+		return -EINVAL;
+	}
 
 	memset(mpext, 0, sizeof(*mpext));
 	mpext->data_seq = data_seq;
@@ -1239,9 +1387,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
 	sock_owned_by_me((struct sock *)msk);
 
 	*sndbuf = 0;
-	if (!mptcp_ext_cache_refill(msk))
-		return NULL;
-
 	if (__mptcp_check_fallback(msk)) {
 		if (!msk->first)
 			return NULL;
@@ -1350,6 +1495,15 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 			if (ssk != prev_ssk || !prev_ssk)
 				lock_sock(ssk);
 
+			/* keep it simple and always provide a new skb for the
+			 * subflow, even if we will not use it when collapsing
+			 * on the pending one
+			 */
+			if (!mptcp_alloc_tx_skb(sk, ssk)) {
+				mptcp_push_release(sk, ssk, &info);
+				goto out;
+			}
+
 			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
 			if (ret <= 0) {
 				mptcp_push_release(sk, ssk, &info);
@@ -1360,6 +1514,7 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 			dfrag->already_sent += ret;
 			msk->snd_nxt += ret;
 			msk->snd_burst -= ret;
+			msk->tx_pending_data -= ret;
 			copied += ret;
 			len -= ret;
 		}
@@ -1404,8 +1559,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	mptcp_clean_una(sk);
 
 	while (msg_data_left(msg)) {
+		int total_ts, frag_truesize = 0;
 		struct mptcp_data_frag *dfrag;
-		int frag_truesize = 0;
+		struct sk_buff_head skbs;
 		bool dfrag_collapsed;
 		size_t psize, offset;
 
@@ -1439,9 +1595,17 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		offset = dfrag->offset + dfrag->data_len;
 		psize = pfrag->size - offset;
 		psize = min_t(size_t, psize, msg_data_left(msg));
-		if (!mptcp_wmem_alloc(sk, psize + frag_truesize))
+		total_ts = psize + frag_truesize;
+		__skb_queue_head_init(&skbs);
+		if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts))
 			goto wait_for_memory;
 
+		if (!mptcp_wmem_alloc(sk, total_ts)) {
+			__skb_queue_purge(&skbs);
+			goto wait_for_memory;
+		}
+
+		skb_queue_splice_tail(&skbs, &msk->skb_tx_cache);
 		if (copy_page_from_iter(dfrag->page, offset, psize,
 					&msg->msg_iter) != psize) {
 			mptcp_wmem_uncharge(sk, psize + frag_truesize);
@@ -1470,8 +1634,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			 dfrag->data_seq, dfrag->data_len, dfrag->already_sent,
 			 !dfrag_collapsed);
 
-		if (!mptcp_ext_cache_refill(msk))
-			goto wait_for_memory;
 		continue;
 
 wait_for_memory:
@@ -1483,8 +1645,10 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			goto out;
 	}
 
-	if (copied)
+	if (copied) {
+		msk->tx_pending_data += copied;
 		mptcp_push_pending(sk, msg->msg_flags);
+	}
 
 out:
 	release_sock(sk);
@@ -2072,9 +2236,6 @@ static void mptcp_worker(struct work_struct *work)
 	if (!dfrag)
 		goto unlock;
 
-	if (!mptcp_ext_cache_refill(msk))
-		goto reset_unlock;
-
 	ssk = mptcp_subflow_get_retrans(msk);
 	if (!ssk)
 		goto reset_unlock;
@@ -2085,6 +2246,9 @@ static void mptcp_worker(struct work_struct *work)
 	info.sent = 0;
 	info.limit = dfrag->already_sent;
 	while (info.sent < dfrag->already_sent) {
+		if (!mptcp_alloc_tx_skb(sk, ssk))
+			break;
+
 		ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
 		if (ret <= 0)
 			break;
@@ -2092,9 +2256,6 @@ static void mptcp_worker(struct work_struct *work)
 		MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
 		copied += ret;
 		info.sent += ret;
-
-		if (!mptcp_ext_cache_refill(msk))
-			break;
 	}
 	if (copied)
 		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
@@ -2123,10 +2284,13 @@ static int __mptcp_init_sock(struct sock *sk)
 	INIT_LIST_HEAD(&msk->rtx_queue);
 	INIT_WORK(&msk->work, mptcp_worker);
 	__skb_queue_head_init(&msk->receive_queue);
+	__skb_queue_head_init(&msk->skb_tx_cache);
 	msk->out_of_order_queue = RB_ROOT;
 	msk->first_pending = NULL;
 	msk->wmem_reserved = 0;
 	msk->rmem_released = 0;
+	msk->tx_pending_data = 0;
+	msk->size_goal_cache = TCP_BASE_MSS;
 
 	msk->ack_hint = NULL;
 	msk->first = NULL;
@@ -2170,12 +2334,17 @@ static void __mptcp_clear_xmit(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_data_frag *dtmp, *dfrag;
+	struct sk_buff *skb;
 
 	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
 
 	WRITE_ONCE(msk->first_pending, NULL);
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
 		dfrag_clear(sk, dfrag);
+	while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) {
+		sk->sk_forward_alloc += skb->truesize;
+		kfree_skb(skb);
+	}
 }
 
 static void mptcp_cancel_work(struct sock *sk)
@@ -2554,9 +2723,6 @@ static void mptcp_destroy(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (msk->cached_ext)
-		__skb_ext_put(msk->cached_ext);
-
 	mptcp_destroy_common(msk);
 	sk_sockets_allocated_dec(sk);
 }
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index fe2efd923c5c..97c1e5dcb3e2 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -240,11 +240,13 @@ struct mptcp_sock {
 	struct sk_buff  *ooo_last_skb;
 	struct rb_root  out_of_order_queue;
 	struct sk_buff_head receive_queue;
+	struct sk_buff_head skb_tx_cache;	/* this is wmem accounted */
+	int		tx_pending_data;
+	int		size_goal_cache;
 	struct list_head conn_list;
 	struct list_head rtx_queue;
 	struct mptcp_data_frag *first_pending;
 	struct list_head join_list;
-	struct skb_ext	*cached_ext;	/* for the next sendmsg */
 	struct socket	*subflow; /* outgoing connect/listener/!mp_capable */
 	struct sock	*first;
 	struct mptcp_pm_data	pm;
-- 
2.26.2

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context
@ 2020-11-27 10:10 ` Paolo Abeni
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: netdev; +Cc: Jakub Kicinski, mptcp, Eric Dumazet

Move the TX skb allocation into the mptcp_sendmsg() scope, and
tentatively pre-allocate a number of skbs proportional to the
sendmsg() length.

Use the ssk tx skb cache to avoid the skb allocation in the subflow
context.

This allows removing the msk skb extension cache and makes the later
patches possible.
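
A condensed, standalone sketch of the sizing logic introduced below in
mptcp_wmem_with_overhead() may help while reading the diff; the
function and parameter names here are made up for illustration only
and are not part of the patch:

	/* forward-allocated memory needed for a sendmsg() call: the data
	 * itself, the dfrag header overhead, plus one MAX_TCP_HEADER-sized
	 * skb per size_goal worth of data not already covered by the msk
	 * tx skb cache; size_goal is assumed non-zero (the patch
	 * initializes size_goal_cache to TCP_BASE_MSS)
	 */
	static int wmem_with_overhead_sketch(int size, int tx_pending,
					     int size_goal, int cached_skbs)
	{
		int ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
		int skbs = (tx_pending + size) / size_goal;

		if (skbs < cached_skbs)
			return ret;
		return ret + (skbs - cached_skbs) * SKB_TRUESIZE(MAX_TCP_HEADER);
	}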

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/protocol.c | 248 ++++++++++++++++++++++++++++++++++++-------
 net/mptcp/protocol.h |   4 +-
 2 files changed, 210 insertions(+), 42 deletions(-)

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 2f40882c4279..75b4c4c50dbb 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -818,16 +818,6 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk)
 	mptcp_close_wake_up(sk);
 }
 
-static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
-{
-	const struct sock *sk = (const struct sock *)msk;
-
-	if (!msk->cached_ext)
-		msk->cached_ext = __skb_ext_alloc(sk->sk_allocation);
-
-	return !!msk->cached_ext;
-}
-
 static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
 {
 	struct mptcp_subflow_context *subflow;
@@ -866,14 +856,22 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
 		df->data_seq + df->data_len == msk->write_seq;
 }
 
-static int mptcp_wmem_with_overhead(int size)
+static int mptcp_wmem_with_overhead(struct sock *sk, int size)
 {
-	return size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	int ret, skbs;
+
+	ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
+	skbs = (msk->tx_pending_data + size) / msk->size_goal_cache;
+	if (skbs < msk->skb_tx_cache.qlen)
+		return ret;
+
+	return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER);
 }
 
 static void __mptcp_wmem_reserve(struct sock *sk, int size)
 {
-	int amount = mptcp_wmem_with_overhead(size);
+	int amount = mptcp_wmem_with_overhead(sk, size);
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
 	WARN_ON_ONCE(msk->wmem_reserved);
@@ -954,6 +952,25 @@ static void mptcp_wmem_uncharge(struct sock *sk, int size)
 	msk->wmem_reserved += size;
 }
 
+static void mptcp_mem_reclaim_partial(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	/* if we are experiencing a transient allocation error,
+	 * the forward allocation memory has been already
+	 * released
+	 */
+	if (msk->wmem_reserved < 0)
+		return;
+
+	mptcp_data_lock(sk);
+	sk->sk_forward_alloc += msk->wmem_reserved;
+	sk_mem_reclaim_partial(sk);
+	msk->wmem_reserved = sk->sk_forward_alloc;
+	sk->sk_forward_alloc = 0;
+	mptcp_data_unlock(sk);
+}
+
 static void dfrag_uncharge(struct sock *sk, int len)
 {
 	sk_mem_uncharge(sk, len);
@@ -1030,19 +1047,12 @@ static void mptcp_clean_una_wakeup(struct sock *sk)
 	}
 }
 
-/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
- * data
- */
-static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+static void mptcp_enter_memory_pressure(struct sock *sk)
 {
 	struct mptcp_subflow_context *subflow;
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	bool first = true;
 
-	if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
-					pfrag, sk->sk_allocation)))
-		return true;
-
 	sk_stream_moderate_sndbuf(sk);
 	mptcp_for_each_subflow(msk, subflow) {
 		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
@@ -1052,6 +1062,18 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
 		sk_stream_moderate_sndbuf(ssk);
 		first = false;
 	}
+}
+
+/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
+ * data
+ */
+static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+{
+	if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
+					pfrag, sk->sk_allocation)))
+		return true;
+
+	mptcp_enter_memory_pressure(sk);
 	return false;
 }
 
@@ -1098,6 +1120,128 @@ static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq,
 	return avail_size;
 }
 
+static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
+{
+	struct skb_ext *mpext = __skb_ext_alloc(gfp);
+
+	if (!mpext)
+		return false;
+	__skb_ext_set(skb, SKB_EXT_MPTCP, mpext);
+	return true;
+}
+
+static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk)
+{
+	struct sk_buff *skb;
+
+	skb = alloc_skb_fclone(MAX_TCP_HEADER, sk->sk_allocation);
+	if (likely(skb)) {
+		if (likely(__mptcp_add_ext(skb, sk->sk_allocation))) {
+			skb_reserve(skb, MAX_TCP_HEADER);
+			skb->reserved_tailroom = skb->end - skb->tail;
+			return skb;
+		}
+		__kfree_skb(skb);
+	} else {
+		mptcp_enter_memory_pressure(sk);
+	}
+	return NULL;
+}
+
+static bool mptcp_tx_cache_refill(struct sock *sk, int size,
+				  struct sk_buff_head *skbs, int *total_ts)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct sk_buff *skb;
+	int space_needed;
+
+	if (unlikely(tcp_under_memory_pressure(sk))) {
+		mptcp_mem_reclaim_partial(sk);
+
+		/* under pressure pre-allocate at most a single skb */
+		if (msk->skb_tx_cache.qlen)
+			return true;
+		space_needed = msk->size_goal_cache;
+	} else {
+		space_needed = msk->tx_pending_data + size -
+			       msk->skb_tx_cache.qlen * msk->size_goal_cache;
+	}
+
+	while (space_needed > 0) {
+		skb = __mptcp_do_alloc_tx_skb(sk);
+		if (unlikely(!skb)) {
+			/* under memory pressure, try to pass the caller a
+			 * single skb to allow forward progress
+			 */
+			while (skbs->qlen > 1) {
+				skb = __skb_dequeue_tail(skbs);
+				__kfree_skb(skb);
+			}
+			return skbs->qlen > 0;
+		}
+
+		*total_ts += skb->truesize;
+		__skb_queue_tail(skbs, skb);
+		space_needed -= msk->size_goal_cache;
+	}
+	return true;
+}
+
+static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct sk_buff *skb;
+
+	if (ssk->sk_tx_skb_cache) {
+		skb = ssk->sk_tx_skb_cache;
+		if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
+			     !__mptcp_add_ext(skb, sk->sk_allocation)))
+			return false;
+		return true;
+	}
+
+	skb = skb_peek(&msk->skb_tx_cache);
+	if (skb) {
+		if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+			skb = __skb_dequeue(&msk->skb_tx_cache);
+			if (WARN_ON_ONCE(!skb))
+				return false;
+
+			mptcp_wmem_uncharge(sk, skb->truesize);
+			ssk->sk_tx_skb_cache = skb;
+			return true;
+		}
+
+		/* over memory limit, no point to try to allocate a new skb */
+		return false;
+	}
+
+	skb = __mptcp_do_alloc_tx_skb(sk);
+	if (!skb)
+		return false;
+
+	if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
+		ssk->sk_tx_skb_cache = skb;
+		return true;
+	}
+	kfree_skb(skb);
+	return false;
+}
+
+static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk)
+{
+	return !ssk->sk_tx_skb_cache &&
+	       !skb_peek(&mptcp_sk(sk)->skb_tx_cache) &&
+	       tcp_under_memory_pressure(sk);
+}
+
+static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+{
+	if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
+		mptcp_mem_reclaim_partial(sk);
+	return __mptcp_alloc_tx_skb(sk, ssk);
+}
+
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 			      struct mptcp_data_frag *dfrag,
 			      struct mptcp_sendmsg_info *info)
@@ -1109,7 +1253,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	struct sk_buff *skb, *tail;
 	bool can_collapse = false;
 	int avail_size;
-	size_t ret;
+	size_t ret = 0;
 
 	pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d",
 		 msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent);
@@ -1117,6 +1261,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	/* compute send limit */
 	info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags);
 	avail_size = info->size_goal;
+	msk->size_goal_cache = info->size_goal;
 	skb = tcp_write_queue_tail(ssk);
 	if (skb) {
 		/* Limit the write to the size available in the
@@ -1165,8 +1310,11 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 		goto out;
 	}
 
-	mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext);
-	msk->cached_ext = NULL;
+	mpext = skb_ext_find(tail, SKB_EXT_MPTCP);
+	if (WARN_ON_ONCE(!mpext)) {
+		/* should never reach here, stream corrupted */
+		return -EINVAL;
+	}
 
 	memset(mpext, 0, sizeof(*mpext));
 	mpext->data_seq = data_seq;
@@ -1239,9 +1387,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
 	sock_owned_by_me((struct sock *)msk);
 
 	*sndbuf = 0;
-	if (!mptcp_ext_cache_refill(msk))
-		return NULL;
-
 	if (__mptcp_check_fallback(msk)) {
 		if (!msk->first)
 			return NULL;
@@ -1350,6 +1495,15 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 			if (ssk != prev_ssk || !prev_ssk)
 				lock_sock(ssk);
 
+			/* keep it simple and always provide a new skb for the
+			 * subflow, even if we will not use it when collapsing
+			 * on the pending one
+			 */
+			if (!mptcp_alloc_tx_skb(sk, ssk)) {
+				mptcp_push_release(sk, ssk, &info);
+				goto out;
+			}
+
 			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
 			if (ret <= 0) {
 				mptcp_push_release(sk, ssk, &info);
@@ -1360,6 +1514,7 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 			dfrag->already_sent += ret;
 			msk->snd_nxt += ret;
 			msk->snd_burst -= ret;
+			msk->tx_pending_data -= ret;
 			copied += ret;
 			len -= ret;
 		}
@@ -1404,8 +1559,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	mptcp_clean_una(sk);
 
 	while (msg_data_left(msg)) {
+		int total_ts, frag_truesize = 0;
 		struct mptcp_data_frag *dfrag;
-		int frag_truesize = 0;
+		struct sk_buff_head skbs;
 		bool dfrag_collapsed;
 		size_t psize, offset;
 
@@ -1439,9 +1595,17 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		offset = dfrag->offset + dfrag->data_len;
 		psize = pfrag->size - offset;
 		psize = min_t(size_t, psize, msg_data_left(msg));
-		if (!mptcp_wmem_alloc(sk, psize + frag_truesize))
+		total_ts = psize + frag_truesize;
+		__skb_queue_head_init(&skbs);
+		if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts))
 			goto wait_for_memory;
 
+		if (!mptcp_wmem_alloc(sk, total_ts)) {
+			__skb_queue_purge(&skbs);
+			goto wait_for_memory;
+		}
+
+		skb_queue_splice_tail(&skbs, &msk->skb_tx_cache);
 		if (copy_page_from_iter(dfrag->page, offset, psize,
 					&msg->msg_iter) != psize) {
 			mptcp_wmem_uncharge(sk, psize + frag_truesize);
@@ -1470,8 +1634,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			 dfrag->data_seq, dfrag->data_len, dfrag->already_sent,
 			 !dfrag_collapsed);
 
-		if (!mptcp_ext_cache_refill(msk))
-			goto wait_for_memory;
 		continue;
 
 wait_for_memory:
@@ -1483,8 +1645,10 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			goto out;
 	}
 
-	if (copied)
+	if (copied) {
+		msk->tx_pending_data += copied;
 		mptcp_push_pending(sk, msg->msg_flags);
+	}
 
 out:
 	release_sock(sk);
@@ -2072,9 +2236,6 @@ static void mptcp_worker(struct work_struct *work)
 	if (!dfrag)
 		goto unlock;
 
-	if (!mptcp_ext_cache_refill(msk))
-		goto reset_unlock;
-
 	ssk = mptcp_subflow_get_retrans(msk);
 	if (!ssk)
 		goto reset_unlock;
@@ -2085,6 +2246,9 @@ static void mptcp_worker(struct work_struct *work)
 	info.sent = 0;
 	info.limit = dfrag->already_sent;
 	while (info.sent < dfrag->already_sent) {
+		if (!mptcp_alloc_tx_skb(sk, ssk))
+			break;
+
 		ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
 		if (ret <= 0)
 			break;
@@ -2092,9 +2256,6 @@ static void mptcp_worker(struct work_struct *work)
 		MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
 		copied += ret;
 		info.sent += ret;
-
-		if (!mptcp_ext_cache_refill(msk))
-			break;
 	}
 	if (copied)
 		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
@@ -2123,10 +2284,13 @@ static int __mptcp_init_sock(struct sock *sk)
 	INIT_LIST_HEAD(&msk->rtx_queue);
 	INIT_WORK(&msk->work, mptcp_worker);
 	__skb_queue_head_init(&msk->receive_queue);
+	__skb_queue_head_init(&msk->skb_tx_cache);
 	msk->out_of_order_queue = RB_ROOT;
 	msk->first_pending = NULL;
 	msk->wmem_reserved = 0;
 	msk->rmem_released = 0;
+	msk->tx_pending_data = 0;
+	msk->size_goal_cache = TCP_BASE_MSS;
 
 	msk->ack_hint = NULL;
 	msk->first = NULL;
@@ -2170,12 +2334,17 @@ static void __mptcp_clear_xmit(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_data_frag *dtmp, *dfrag;
+	struct sk_buff *skb;
 
 	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
 
 	WRITE_ONCE(msk->first_pending, NULL);
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
 		dfrag_clear(sk, dfrag);
+	while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) {
+		sk->sk_forward_alloc += skb->truesize;
+		kfree_skb(skb);
+	}
 }
 
 static void mptcp_cancel_work(struct sock *sk)
@@ -2554,9 +2723,6 @@ static void mptcp_destroy(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (msk->cached_ext)
-		__skb_ext_put(msk->cached_ext);
-
 	mptcp_destroy_common(msk);
 	sk_sockets_allocated_dec(sk);
 }
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index fe2efd923c5c..97c1e5dcb3e2 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -240,11 +240,13 @@ struct mptcp_sock {
 	struct sk_buff  *ooo_last_skb;
 	struct rb_root  out_of_order_queue;
 	struct sk_buff_head receive_queue;
+	struct sk_buff_head skb_tx_cache;	/* this is wmem accounted */
+	int		tx_pending_data;
+	int		size_goal_cache;
 	struct list_head conn_list;
 	struct list_head rtx_queue;
 	struct mptcp_data_frag *first_pending;
 	struct list_head join_list;
-	struct skb_ext	*cached_ext;	/* for the next sendmsg */
 	struct socket	*subflow; /* outgoing connect/listener/!mp_capable */
 	struct sock	*first;
 	struct mptcp_pm_data	pm;
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [MPTCP] [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 8132 bytes --]

By extending the data_lock scope in mptcp_incoming_options() we can
use it to protect both snd_una and wnd_end.
In the typical case, this means a single atomic operation instead of
two.
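
A minimal sketch of the resulting update scheme, condensed from the
ack_update_msk() hunk below (all identifiers are taken from the patch;
it omits the "ACK for data not sent yet" check and the worker
scheduling, so it is not a drop-in replacement):

	/* snd_una and wnd_end are now plain u64 fields: the two cmpxchg
	 * retry loops become simple compare-and-store operations inside
	 * a single locked section
	 */
	mptcp_data_lock(sk);
	new_snd_una = expand_ack(msk->snd_una, mp_opt->data_ack, mp_opt->ack64);
	new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;

	if (after64(new_wnd_end, msk->wnd_end))
		msk->wnd_end = new_wnd_end;

	if (after64(new_snd_una, msk->snd_una)) {
		msk->snd_una = new_snd_una;
		__mptcp_data_acked(sk);
	}
	mptcp_data_unlock(sk);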

Acked-by: Florian Westphal <fw(a)strlen.de>
Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
---
 net/mptcp/mptcp_diag.c |  2 +-
 net/mptcp/options.c    | 33 +++++++++++++--------------------
 net/mptcp/protocol.c   | 34 ++++++++++++++++------------------
 net/mptcp/protocol.h   |  8 ++++----
 4 files changed, 34 insertions(+), 43 deletions(-)

diff --git a/net/mptcp/mptcp_diag.c b/net/mptcp/mptcp_diag.c
index 5f390a97f556..b70ae4ba3000 100644
--- a/net/mptcp/mptcp_diag.c
+++ b/net/mptcp/mptcp_diag.c
@@ -140,7 +140,7 @@ static void mptcp_diag_get_info(struct sock *sk, struct inet_diag_msg *r,
 	info->mptcpi_flags = flags;
 	info->mptcpi_token = READ_ONCE(msk->token);
 	info->mptcpi_write_seq = READ_ONCE(msk->write_seq);
-	info->mptcpi_snd_una = atomic64_read(&msk->snd_una);
+	info->mptcpi_snd_una = READ_ONCE(msk->snd_una);
 	info->mptcpi_rcv_nxt = READ_ONCE(msk->ack_seq);
 	unlock_sock_fast(sk, slow);
 }
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 8a59b3e44599..3986454a0340 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -833,15 +833,17 @@ static void ack_update_msk(struct mptcp_sock *msk,
 			   const struct sock *ssk,
 			   struct mptcp_options_received *mp_opt)
 {
-	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
-	u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
-	u64 snd_nxt = READ_ONCE(msk->snd_nxt);
+	u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
 	struct sock *sk = (struct sock *)msk;
+	u64 old_snd_una;
+
+	mptcp_data_lock(sk);
 
 	/* avoid ack expansion on update conflict, to reduce the risk of
 	 * wrongly expanding to a future ack sequence number, which is way
 	 * more dangerous than missing an ack
 	 */
+	old_snd_una = msk->snd_una;
 	new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);
 
 	/* ACK for data not even sent yet? Ignore. */
@@ -850,26 +852,17 @@ static void ack_update_msk(struct mptcp_sock *msk,
 
 	new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;
 
-	while (after64(new_wnd_end, old_wnd_end)) {
-		wnd_end = old_wnd_end;
-		old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
-					       new_wnd_end);
-		if (old_wnd_end == wnd_end) {
-			if (mptcp_send_head(sk))
-				mptcp_schedule_work(sk);
-			break;
-		}
+	if (after64(new_wnd_end, msk->wnd_end)) {
+		msk->wnd_end = new_wnd_end;
+		if (mptcp_send_head(sk))
+			mptcp_schedule_work(sk);
 	}
 
-	while (after64(new_snd_una, old_snd_una)) {
-		snd_una = old_snd_una;
-		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
-					       new_snd_una);
-		if (old_snd_una == snd_una) {
-			mptcp_data_acked(sk);
-			break;
-		}
+	if (after64(new_snd_una, old_snd_una)) {
+		msk->snd_una = new_snd_una;
+		__mptcp_data_acked(sk);
 	}
+	mptcp_data_unlock(sk);
 }
 
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit)
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 75b4c4c50dbb..51f92f3096bf 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -60,7 +60,7 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
 /* Returns end sequence number of the receiver's advertised window */
 static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
 {
-	return atomic64_read(&msk->wnd_end);
+	return READ_ONCE(msk->wnd_end);
 }
 
 static bool mptcp_is_tcpsk(struct sock *sk)
@@ -358,7 +358,7 @@ static void mptcp_check_data_fin_ack(struct sock *sk)
 	/* Look for an acknowledged DATA_FIN */
 	if (((1 << sk->sk_state) &
 	     (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
-	    msk->write_seq == atomic64_read(&msk->snd_una)) {
+	    msk->write_seq == READ_ONCE(msk->snd_una)) {
 		mptcp_stop_timer(sk);
 
 		WRITE_ONCE(msk->snd_data_fin_enable, 0);
@@ -764,7 +764,7 @@ bool mptcp_schedule_work(struct sock *sk)
 	return false;
 }
 
-void mptcp_data_acked(struct sock *sk)
+void __mptcp_data_acked(struct sock *sk)
 {
 	mptcp_reset_timer(sk);
 
@@ -997,11 +997,11 @@ static void mptcp_clean_una(struct sock *sk)
 	 * plain TCP
 	 */
 	if (__mptcp_check_fallback(msk))
-		atomic64_set(&msk->snd_una, msk->snd_nxt);
+		msk->snd_una = READ_ONCE(msk->snd_nxt);
 
-	mptcp_data_lock(sk);
-	snd_una = atomic64_read(&msk->snd_una);
 
+	mptcp_data_lock(sk);
+	snd_una = msk->snd_una;
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
 		if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
 			break;
@@ -1282,10 +1282,12 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	/* Zero window and all data acked? Probe. */
 	avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size);
 	if (avail_size == 0) {
-		if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt)
+		u64 snd_una = READ_ONCE(msk->snd_una);
+
+		if (skb || snd_una != msk->snd_nxt)
 			return 0;
 		zero_window_probe = true;
-		data_seq = atomic64_read(&msk->snd_una) - 1;
+		data_seq = snd_una - 1;
 		avail_size = 1;
 	}
 
@@ -1994,12 +1996,8 @@ static void mptcp_retransmit_handler(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) {
-		mptcp_stop_timer(sk);
-	} else {
-		set_bit(MPTCP_WORK_RTX, &msk->flags);
-		mptcp_schedule_work(sk);
-	}
+	set_bit(MPTCP_WORK_RTX, &msk->flags);
+	mptcp_schedule_work(sk);
 }
 
 static void mptcp_retransmit_timer(struct timer_list *t)
@@ -2621,8 +2619,8 @@ struct sock *mptcp_sk_clone(const struct sock *sk,
 
 	msk->write_seq = subflow_req->idsn + 1;
 	msk->snd_nxt = msk->write_seq;
-	atomic64_set(&msk->snd_una, msk->write_seq);
-	atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd);
+	msk->snd_una = msk->write_seq;
+	msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd;
 
 	if (mp_opt->mp_capable) {
 		msk->can_ack = true;
@@ -2658,7 +2656,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
 	if (msk->rcvq_space.space == 0)
 		msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
 
-	atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
+	WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
 }
 
 static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
@@ -2918,7 +2916,7 @@ void mptcp_finish_connect(struct sock *ssk)
 	WRITE_ONCE(msk->ack_seq, ack_seq);
 	WRITE_ONCE(msk->rcv_wnd_sent, ack_seq);
 	WRITE_ONCE(msk->can_ack, 1);
-	atomic64_set(&msk->snd_una, msk->write_seq);
+	WRITE_ONCE(msk->snd_una, msk->write_seq);
 
 	mptcp_pm_new_connection(msk, 0);
 
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 97c1e5dcb3e2..3c07aafde10e 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -222,8 +222,8 @@ struct mptcp_sock {
 	struct sock	*last_snd;
 	int		snd_burst;
 	int		old_wspace;
-	atomic64_t	snd_una;
-	atomic64_t	wnd_end;
+	u64		snd_una;
+	u64		wnd_end;
 	unsigned long	timer_ival;
 	u32		token;
 	int		rmem_pending;
@@ -321,7 +321,7 @@ static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una)))
+	if (!before64(msk->snd_nxt, READ_ONCE(msk->snd_una)))
 		return NULL;
 
 	return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list);
@@ -495,7 +495,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
 void mptcp_data_ready(struct sock *sk, struct sock *ssk);
 bool mptcp_finish_join(struct sock *sk);
 bool mptcp_schedule_work(struct sock *sk);
-void mptcp_data_acked(struct sock *sk);
+void __mptcp_data_acked(struct sock *sk);
 void mptcp_subflow_eof(struct sock *sk);
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
 void __mptcp_flush_join_list(struct mptcp_sock *msk);
-- 
2.26.2

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path
@ 2020-11-27 10:10 ` Paolo Abeni
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: netdev; +Cc: Jakub Kicinski, mptcp, Eric Dumazet

By extending the data_lock scope in mptcp_incoming_options() we can
use it to protect both snd_una and wnd_end.
In the typical case, this means a single atomic operation instead of
two.
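
Note that the fields are still read outside the msk data lock in a few
spots (e.g. mptcp_diag_get_info() and mptcp_wnd_end() in the hunks
below), hence the switch from atomic64_read() to READ_ONCE(). A short
sketch of the resulting access pattern (new_snd_una is a local used
only for illustration):

	/* writer side: plain store, serialized by the msk data lock */
	mptcp_data_lock(sk);
	msk->snd_una = new_snd_una;
	mptcp_data_unlock(sk);

	/* lockless reader side, e.g. diag or the xmit path checks */
	u64 snd_una = READ_ONCE(msk->snd_una);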

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/mptcp_diag.c |  2 +-
 net/mptcp/options.c    | 33 +++++++++++++--------------------
 net/mptcp/protocol.c   | 34 ++++++++++++++++------------------
 net/mptcp/protocol.h   |  8 ++++----
 4 files changed, 34 insertions(+), 43 deletions(-)

diff --git a/net/mptcp/mptcp_diag.c b/net/mptcp/mptcp_diag.c
index 5f390a97f556..b70ae4ba3000 100644
--- a/net/mptcp/mptcp_diag.c
+++ b/net/mptcp/mptcp_diag.c
@@ -140,7 +140,7 @@ static void mptcp_diag_get_info(struct sock *sk, struct inet_diag_msg *r,
 	info->mptcpi_flags = flags;
 	info->mptcpi_token = READ_ONCE(msk->token);
 	info->mptcpi_write_seq = READ_ONCE(msk->write_seq);
-	info->mptcpi_snd_una = atomic64_read(&msk->snd_una);
+	info->mptcpi_snd_una = READ_ONCE(msk->snd_una);
 	info->mptcpi_rcv_nxt = READ_ONCE(msk->ack_seq);
 	unlock_sock_fast(sk, slow);
 }
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 8a59b3e44599..3986454a0340 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -833,15 +833,17 @@ static void ack_update_msk(struct mptcp_sock *msk,
 			   const struct sock *ssk,
 			   struct mptcp_options_received *mp_opt)
 {
-	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
-	u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
-	u64 snd_nxt = READ_ONCE(msk->snd_nxt);
+	u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
 	struct sock *sk = (struct sock *)msk;
+	u64 old_snd_una;
+
+	mptcp_data_lock(sk);
 
 	/* avoid ack expansion on update conflict, to reduce the risk of
 	 * wrongly expanding to a future ack sequence number, which is way
 	 * more dangerous than missing an ack
 	 */
+	old_snd_una = msk->snd_una;
 	new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);
 
 	/* ACK for data not even sent yet? Ignore. */
@@ -850,26 +852,17 @@ static void ack_update_msk(struct mptcp_sock *msk,
 
 	new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;
 
-	while (after64(new_wnd_end, old_wnd_end)) {
-		wnd_end = old_wnd_end;
-		old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
-					       new_wnd_end);
-		if (old_wnd_end == wnd_end) {
-			if (mptcp_send_head(sk))
-				mptcp_schedule_work(sk);
-			break;
-		}
+	if (after64(new_wnd_end, msk->wnd_end)) {
+		msk->wnd_end = new_wnd_end;
+		if (mptcp_send_head(sk))
+			mptcp_schedule_work(sk);
 	}
 
-	while (after64(new_snd_una, old_snd_una)) {
-		snd_una = old_snd_una;
-		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
-					       new_snd_una);
-		if (old_snd_una == snd_una) {
-			mptcp_data_acked(sk);
-			break;
-		}
+	if (after64(new_snd_una, old_snd_una)) {
+		msk->snd_una = new_snd_una;
+		__mptcp_data_acked(sk);
 	}
+	mptcp_data_unlock(sk);
 }
 
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit)
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 75b4c4c50dbb..51f92f3096bf 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -60,7 +60,7 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
 /* Returns end sequence number of the receiver's advertised window */
 static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
 {
-	return atomic64_read(&msk->wnd_end);
+	return READ_ONCE(msk->wnd_end);
 }
 
 static bool mptcp_is_tcpsk(struct sock *sk)
@@ -358,7 +358,7 @@ static void mptcp_check_data_fin_ack(struct sock *sk)
 	/* Look for an acknowledged DATA_FIN */
 	if (((1 << sk->sk_state) &
 	     (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
-	    msk->write_seq == atomic64_read(&msk->snd_una)) {
+	    msk->write_seq == READ_ONCE(msk->snd_una)) {
 		mptcp_stop_timer(sk);
 
 		WRITE_ONCE(msk->snd_data_fin_enable, 0);
@@ -764,7 +764,7 @@ bool mptcp_schedule_work(struct sock *sk)
 	return false;
 }
 
-void mptcp_data_acked(struct sock *sk)
+void __mptcp_data_acked(struct sock *sk)
 {
 	mptcp_reset_timer(sk);
 
@@ -997,11 +997,11 @@ static void mptcp_clean_una(struct sock *sk)
 	 * plain TCP
 	 */
 	if (__mptcp_check_fallback(msk))
-		atomic64_set(&msk->snd_una, msk->snd_nxt);
+		msk->snd_una = READ_ONCE(msk->snd_nxt);
 
-	mptcp_data_lock(sk);
-	snd_una = atomic64_read(&msk->snd_una);
 
+	mptcp_data_lock(sk);
+	snd_una = msk->snd_una;
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
 		if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
 			break;
@@ -1282,10 +1282,12 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	/* Zero window and all data acked? Probe. */
 	avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size);
 	if (avail_size == 0) {
-		if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt)
+		u64 snd_una = READ_ONCE(msk->snd_una);
+
+		if (skb || snd_una != msk->snd_nxt)
 			return 0;
 		zero_window_probe = true;
-		data_seq = atomic64_read(&msk->snd_una) - 1;
+		data_seq = snd_una - 1;
 		avail_size = 1;
 	}
 
@@ -1994,12 +1996,8 @@ static void mptcp_retransmit_handler(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) {
-		mptcp_stop_timer(sk);
-	} else {
-		set_bit(MPTCP_WORK_RTX, &msk->flags);
-		mptcp_schedule_work(sk);
-	}
+	set_bit(MPTCP_WORK_RTX, &msk->flags);
+	mptcp_schedule_work(sk);
 }
 
 static void mptcp_retransmit_timer(struct timer_list *t)
@@ -2621,8 +2619,8 @@ struct sock *mptcp_sk_clone(const struct sock *sk,
 
 	msk->write_seq = subflow_req->idsn + 1;
 	msk->snd_nxt = msk->write_seq;
-	atomic64_set(&msk->snd_una, msk->write_seq);
-	atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd);
+	msk->snd_una = msk->write_seq;
+	msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd;
 
 	if (mp_opt->mp_capable) {
 		msk->can_ack = true;
@@ -2658,7 +2656,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
 	if (msk->rcvq_space.space == 0)
 		msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
 
-	atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
+	WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
 }
 
 static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
@@ -2918,7 +2916,7 @@ void mptcp_finish_connect(struct sock *ssk)
 	WRITE_ONCE(msk->ack_seq, ack_seq);
 	WRITE_ONCE(msk->rcv_wnd_sent, ack_seq);
 	WRITE_ONCE(msk->can_ack, 1);
-	atomic64_set(&msk->snd_una, msk->write_seq);
+	WRITE_ONCE(msk->snd_una, msk->write_seq);
 
 	mptcp_pm_new_connection(msk, 0);
 
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 97c1e5dcb3e2..3c07aafde10e 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -222,8 +222,8 @@ struct mptcp_sock {
 	struct sock	*last_snd;
 	int		snd_burst;
 	int		old_wspace;
-	atomic64_t	snd_una;
-	atomic64_t	wnd_end;
+	u64		snd_una;
+	u64		wnd_end;
 	unsigned long	timer_ival;
 	u32		token;
 	int		rmem_pending;
@@ -321,7 +321,7 @@ static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una)))
+	if (!before64(msk->snd_nxt, READ_ONCE(msk->snd_una)))
 		return NULL;
 
 	return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list);
@@ -495,7 +495,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
 void mptcp_data_ready(struct sock *sk, struct sock *ssk);
 bool mptcp_finish_join(struct sock *sk);
 bool mptcp_schedule_work(struct sock *sk);
-void mptcp_data_acked(struct sock *sk);
+void __mptcp_data_acked(struct sock *sk);
 void mptcp_subflow_eof(struct sock *sk);
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
 void __mptcp_flush_join_list(struct mptcp_sock *msk);
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [MPTCP] [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-27 10:10 ` Paolo Abeni
  -1 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 17930 bytes --]

Some tasks triggered by the subflow receive path require access to
the msk socket status, specifically mptcp_clean_una() and
mptcp_push_pending().

We have almost everything in place to defer such tasks to the msk
release_cb when the msk sock is owned.

Since the worker is no longer used to clean the acked data, for
fallback sockets we need to flush it explicitly.

As an added bonus we can move the wake-up code into
__mptcp_clean_una(), simplify mptcp_poll() considerably and move the
timer update under the data lock.

The worker is now used only to process and send DATA_FIN packets and
to perform the MPTCP-level retransmissions.
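
The core deferral pattern, excerpted and condensed from the hunks
below (all names come from the patch): the helper runs directly when
the msk socket lock is free, otherwise it records a flag that
mptcp_release_cb() consumes once the owner releases the lock.

	void __mptcp_data_acked(struct sock *sk)
	{
		if (!sock_owned_by_user(sk))
			__mptcp_clean_una(sk);
		else
			set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags);

		if (mptcp_pending_data_fin_ack(sk))
			mptcp_schedule_work(sk);
	}

	/* later, from mptcp_release_cb(), with sk_lock.slock held */
	if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags))
		__mptcp_clean_una(sk);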

Acked-by: Florian Westphal <fw(a)strlen.de>
Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
---
 net/mptcp/options.c  |  18 +++-
 net/mptcp/protocol.c | 250 ++++++++++++++++++++++++++-----------------
 net/mptcp/protocol.h |   3 +
 net/mptcp/subflow.c  |  14 +--
 4 files changed, 168 insertions(+), 117 deletions(-)

diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 3986454a0340..6b7b4b67f18c 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -830,7 +830,7 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
 }
 
 static void ack_update_msk(struct mptcp_sock *msk,
-			   const struct sock *ssk,
+			   struct sock *ssk,
 			   struct mptcp_options_received *mp_opt)
 {
 	u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
@@ -854,8 +854,7 @@ static void ack_update_msk(struct mptcp_sock *msk,
 
 	if (after64(new_wnd_end, msk->wnd_end)) {
 		msk->wnd_end = new_wnd_end;
-		if (mptcp_send_head(sk))
-			mptcp_schedule_work(sk);
+		__mptcp_wnd_updated(sk, ssk);
 	}
 
 	if (after64(new_snd_una, old_snd_una)) {
@@ -915,8 +914,19 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
 	struct mptcp_options_received mp_opt;
 	struct mptcp_ext *mpext;
 
-	if (__mptcp_check_fallback(msk))
+	if (__mptcp_check_fallback(msk)) {
+		/* Keep it simple and unconditionally trigger send data cleanup and
+		 * pending queue spooling. We will need to acquire the data lock
+		 * for more accurate checks, and once the lock is acquired, such
+		 * helpers are cheap.
+		 */
+		mptcp_data_lock(subflow->conn);
+		if (mptcp_send_head(subflow->conn))
+			__mptcp_wnd_updated(subflow->conn, sk);
+		__mptcp_data_acked(subflow->conn);
+		mptcp_data_unlock(subflow->conn);
 		return;
+	}
 
 	mptcp_get_options(skb, &mp_opt);
 	if (!check_fully_established(msk, sk, subflow, skb, &mp_opt))
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 51f92f3096bf..221f7cdd416b 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -348,17 +348,22 @@ static void mptcp_close_wake_up(struct sock *sk)
 		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
 }
 
-static void mptcp_check_data_fin_ack(struct sock *sk)
+static bool mptcp_pending_data_fin_ack(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (__mptcp_check_fallback(msk))
-		return;
+	return !__mptcp_check_fallback(msk) &&
+	       ((1 << sk->sk_state) &
+		(TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
+	       msk->write_seq == READ_ONCE(msk->snd_una);
+}
+
+static void mptcp_check_data_fin_ack(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
 
 	/* Look for an acknowledged DATA_FIN */
-	if (((1 << sk->sk_state) &
-	     (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
-	    msk->write_seq == READ_ONCE(msk->snd_una)) {
+	if (mptcp_pending_data_fin_ack(sk)) {
 		mptcp_stop_timer(sk);
 
 		WRITE_ONCE(msk->snd_data_fin_enable, 0);
@@ -764,16 +769,6 @@ bool mptcp_schedule_work(struct sock *sk)
 	return false;
 }
 
-void __mptcp_data_acked(struct sock *sk)
-{
-	mptcp_reset_timer(sk);
-
-	if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) ||
-	     mptcp_send_head(sk) ||
-	     (inet_sk_state_load(sk) != TCP_ESTABLISHED)))
-		mptcp_schedule_work(sk);
-}
-
 void mptcp_subflow_eof(struct sock *sk)
 {
 	if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags))
@@ -986,7 +981,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag)
 	put_page(dfrag->page);
 }
 
-static void mptcp_clean_una(struct sock *sk)
+static void __mptcp_clean_una(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_data_frag *dtmp, *dfrag;
@@ -999,8 +994,6 @@ static void mptcp_clean_una(struct sock *sk)
 	if (__mptcp_check_fallback(msk))
 		msk->snd_una = READ_ONCE(msk->snd_nxt);
 
-
-	mptcp_data_lock(sk);
 	snd_una = msk->snd_una;
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
 		if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
@@ -1029,21 +1022,25 @@ static void mptcp_clean_una(struct sock *sk)
 	}
 
 out:
-	if (cleaned && tcp_under_memory_pressure(sk))
-		sk_mem_reclaim_partial(sk);
-	mptcp_data_unlock(sk);
-}
-
-static void mptcp_clean_una_wakeup(struct sock *sk)
-{
-	struct mptcp_sock *msk = mptcp_sk(sk);
+	if (cleaned) {
+		if (tcp_under_memory_pressure(sk)) {
+			__mptcp_update_wmem(sk);
+			sk_mem_reclaim_partial(sk);
+		}
 
-	mptcp_clean_una(sk);
+		if (sk_stream_is_writeable(sk)) {
+			/* pairs with memory barrier in mptcp_poll */
+			smp_mb();
+			if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags))
+				sk_stream_write_space(sk);
+		}
+	}
 
-	/* Only wake up writers if a subflow is ready */
-	if (sk_stream_is_writeable(sk)) {
-		clear_bit(MPTCP_NOSPACE, &msk->flags);
-		sk_stream_write_space(sk);
+	if (snd_una == READ_ONCE(msk->snd_nxt)) {
+		if (msk->timer_ival)
+			mptcp_stop_timer(sk);
+	} else {
+		mptcp_reset_timer(sk);
 	}
 }
 
@@ -1130,13 +1127,13 @@ static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
 	return true;
 }
 
-static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk)
+static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp)
 {
 	struct sk_buff *skb;
 
-	skb = alloc_skb_fclone(MAX_TCP_HEADER, sk->sk_allocation);
+	skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp);
 	if (likely(skb)) {
-		if (likely(__mptcp_add_ext(skb, sk->sk_allocation))) {
+		if (likely(__mptcp_add_ext(skb, gfp))) {
 			skb_reserve(skb, MAX_TCP_HEADER);
 			skb->reserved_tailroom = skb->end - skb->tail;
 			return skb;
@@ -1168,7 +1165,7 @@ static bool mptcp_tx_cache_refill(struct sock *sk, int size,
 	}
 
 	while (space_needed > 0) {
-		skb = __mptcp_do_alloc_tx_skb(sk);
+		skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation);
 		if (unlikely(!skb)) {
 			/* under memory pressure, try to pass the caller a
 			 * single skb to allow forward progress
@@ -1187,7 +1184,7 @@ static bool mptcp_tx_cache_refill(struct sock *sk, int size,
 	return true;
 }
 
-static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct sk_buff *skb;
@@ -1195,7 +1192,7 @@ static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 	if (ssk->sk_tx_skb_cache) {
 		skb = ssk->sk_tx_skb_cache;
 		if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
-			     !__mptcp_add_ext(skb, sk->sk_allocation)))
+			     !__mptcp_add_ext(skb, gfp)))
 			return false;
 		return true;
 	}
@@ -1216,7 +1213,7 @@ static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 		return false;
 	}
 
-	skb = __mptcp_do_alloc_tx_skb(sk);
+	skb = __mptcp_do_alloc_tx_skb(sk, gfp);
 	if (!skb)
 		return false;
 
@@ -1239,7 +1236,7 @@ static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 {
 	if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
 		mptcp_mem_reclaim_partial(sk);
-	return __mptcp_alloc_tx_skb(sk, ssk);
+	return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation);
 }
 
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
@@ -1340,31 +1337,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	return ret;
 }
 
-static void mptcp_nospace(struct mptcp_sock *msk)
-{
-	struct mptcp_subflow_context *subflow;
-
-	set_bit(MPTCP_NOSPACE, &msk->flags);
-	smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
-
-	mptcp_for_each_subflow(msk, subflow) {
-		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
-		bool ssk_writeable = sk_stream_is_writeable(ssk);
-		struct socket *sock = READ_ONCE(ssk->sk_socket);
-
-		if (ssk_writeable || !sock)
-			continue;
-
-		/* enables ssk->write_space() callbacks */
-		set_bit(SOCK_NOSPACE, &sock->flags);
-	}
-
-	/* mptcp_data_acked() could run just before we set the NOSPACE bit,
-	 * so explicitly check for snd_una value
-	 */
-	mptcp_clean_una((struct sock *)msk);
-}
-
 #define MPTCP_SEND_BURST_SIZE		((1 << 16) - \
 					 sizeof(struct tcphdr) - \
 					 MAX_TCP_OPTION_SPACE - \
@@ -1536,6 +1508,63 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 	}
 }
 
+static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct mptcp_sendmsg_info info;
+	struct mptcp_data_frag *dfrag;
+	int len, copied = 0;
+
+	info.flags = 0;
+	while ((dfrag = mptcp_send_head(sk))) {
+		info.sent = dfrag->already_sent;
+		info.limit = dfrag->data_len;
+		len = dfrag->data_len - dfrag->already_sent;
+		while (len > 0) {
+			int ret = 0;
+
+			/* do auto tuning */
+			if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
+			    ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf))
+				WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf);
+
+			if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) {
+				__mptcp_update_wmem(sk);
+				sk_mem_reclaim_partial(sk);
+			}
+			if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC))
+				goto out;
+
+			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+			if (ret <= 0)
+				goto out;
+
+			info.sent += ret;
+			dfrag->already_sent += ret;
+			msk->snd_nxt += ret;
+			msk->snd_burst -= ret;
+			msk->tx_pending_data -= ret;
+			copied += ret;
+			len -= ret;
+		}
+		WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
+	}
+
+out:
+	/* __mptcp_alloc_tx_skb could have released some wmem and we are
+	 * not going to flush it via release_sock()
+	 */
+	__mptcp_update_wmem(sk);
+	if (copied) {
+		mptcp_set_timeout(sk, ssk);
+		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
+			 info.size_goal);
+		if (msk->snd_data_fin_enable &&
+		    msk->snd_nxt + 1 == msk->write_seq)
+			mptcp_schedule_work(sk);
+	}
+}
+
 static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
@@ -1558,7 +1587,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	}
 
 	pfrag = sk_page_frag(sk);
-	mptcp_clean_una(sk);
 
 	while (msg_data_left(msg)) {
 		int total_ts, frag_truesize = 0;
@@ -1578,11 +1606,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		dfrag = mptcp_pending_tail(sk);
 		dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
 		if (!dfrag_collapsed) {
-			if (!sk_stream_memory_free(sk)) {
-				mptcp_push_pending(sk, msg->msg_flags);
-				if (!sk_stream_memory_free(sk))
-					goto wait_for_memory;
-			}
+			if (!sk_stream_memory_free(sk))
+				goto wait_for_memory;
+
 			if (!mptcp_page_frag_refill(sk, pfrag))
 				goto wait_for_memory;
 
@@ -1639,9 +1665,8 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		continue;
 
 wait_for_memory:
-		mptcp_nospace(msk);
-		if (mptcp_timer_pending(sk))
-			mptcp_reset_timer(sk);
+		set_bit(MPTCP_NOSPACE, &msk->flags);
+		mptcp_push_pending(sk, msg->msg_flags);
 		ret = sk_stream_wait_memory(sk, &timeo);
 		if (ret)
 			goto out;
@@ -2198,21 +2223,18 @@ static void mptcp_worker(struct work_struct *work)
 	if (unlikely(state == TCP_CLOSE))
 		goto unlock;
 
-	mptcp_clean_una_wakeup(sk);
 	mptcp_check_data_fin_ack(sk);
 	__mptcp_flush_join_list(msk);
 	if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
 		__mptcp_close_subflow(msk);
 
-	if (mptcp_send_head(sk))
-		mptcp_push_pending(sk, 0);
-
 	if (msk->pm.status)
 		pm_work(msk);
 
 	if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
 		mptcp_check_for_eof(msk);
 
+	__mptcp_check_send_data_fin(sk);
 	mptcp_check_data_fin(sk);
 
 	/* if the msk data is completely acked, or the socket timedout,
@@ -2334,8 +2356,6 @@ static void __mptcp_clear_xmit(struct sock *sk)
 	struct mptcp_data_frag *dtmp, *dfrag;
 	struct sk_buff *skb;
 
-	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
-
 	WRITE_ONCE(msk->first_pending, NULL);
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
 		dfrag_clear(sk, dfrag);
@@ -2477,7 +2497,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
 	spin_unlock_bh(&msk->join_list_lock);
 	list_splice_init(&msk->conn_list, &conn_list);
 
-	__mptcp_clear_xmit(sk);
+	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
 	sk_stop_timer(sk, &sk->sk_timer);
 	msk->pm.status = 0;
 
@@ -2709,6 +2729,8 @@ void mptcp_destroy_common(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
 
+	__mptcp_clear_xmit(sk);
+
 	/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
 	skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
 
@@ -2835,6 +2857,28 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
 	return -EOPNOTSUPP;
 }
 
+void __mptcp_data_acked(struct sock *sk)
+{
+	if (!sock_owned_by_user(sk))
+		__mptcp_clean_una(sk);
+	else
+		set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags);
+
+	if (mptcp_pending_data_fin_ack(sk))
+		mptcp_schedule_work(sk);
+}
+
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk)
+{
+	if (!mptcp_send_head(sk))
+		return;
+
+	if (!sock_owned_by_user(sk))
+		__mptcp_subflow_push_pending(sk, ssk);
+	else
+		set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags);
+}
+
 #define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)
 
 /* processes deferred events and flush wmem */
@@ -2842,6 +2886,25 @@ static void mptcp_release_cb(struct sock *sk)
 {
 	unsigned long flags, nflags;
 
+	/* push_pending may touch wmem_reserved, do it before the later
+	 * cleanup
+	 */
+	if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags))
+		__mptcp_clean_una(sk);
+	if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) {
+		/* mptcp_push_pending() acquires the subflow socket lock
+		 *
+		 * 1) can't be invoked in atomic scope
+		 * 2) must avoid ABBA deadlock with msk socket spinlock: the RX
+		 *    datapath acquires the msk socket spinlock while holding
+		 *    the subflow socket lock
+		 */
+
+		spin_unlock_bh(&sk->sk_lock.slock);
+		mptcp_push_pending(sk, 0);
+		spin_lock_bh(&sk->sk_lock.slock);
+	}
+
 	/* clear any wmem reservation and errors */
 	__mptcp_update_wmem(sk);
 	__mptcp_update_rmem(sk);
@@ -3177,24 +3240,9 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk)
 	       0;
 }
 
-static bool __mptcp_check_writeable(struct mptcp_sock *msk)
-{
-	struct sock *sk = (struct sock *)msk;
-	bool mptcp_writable;
-
-	mptcp_clean_una(sk);
-	mptcp_writable = sk_stream_is_writeable(sk);
-	if (!mptcp_writable)
-		mptcp_nospace(msk);
-
-	return mptcp_writable;
-}
-
 static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
-	__poll_t ret = 0;
-	bool slow;
 
 	if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN))
 		return 0;
@@ -3202,12 +3250,12 @@ static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
 	if (sk_stream_is_writeable(sk))
 		return EPOLLOUT | EPOLLWRNORM;
 
-	slow = lock_sock_fast(sk);
-	if (__mptcp_check_writeable(msk))
-		ret = EPOLLOUT | EPOLLWRNORM;
+	set_bit(MPTCP_NOSPACE, &msk->flags);
+	smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
+	if (sk_stream_is_writeable(sk))
+		return EPOLLOUT | EPOLLWRNORM;
 
-	unlock_sock_fast(sk, slow);
-	return ret;
+	return 0;
 }
 
 static __poll_t mptcp_poll(struct file *file, struct socket *sock,
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 3c07aafde10e..fc56e730fb35 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -91,6 +91,8 @@
 #define MPTCP_WORK_EOF		3
 #define MPTCP_FALLBACK_DONE	4
 #define MPTCP_WORK_CLOSE_SUBFLOW 5
+#define MPTCP_PUSH_PENDING	6
+#define MPTCP_CLEAN_UNA		7
 
 static inline bool before64(__u64 seq1, __u64 seq2)
 {
@@ -495,6 +497,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
 void mptcp_data_ready(struct sock *sk, struct sock *ssk);
 bool mptcp_finish_join(struct sock *sk);
 bool mptcp_schedule_work(struct sock *sk);
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk);
 void __mptcp_data_acked(struct sock *sk);
 void mptcp_subflow_eof(struct sock *sk);
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index 4d8abff1be18..e867ccf3adc0 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -996,19 +996,9 @@ static void subflow_data_ready(struct sock *sk)
 		mptcp_data_ready(parent, sk);
 }
 
-static void subflow_write_space(struct sock *sk)
+static void subflow_write_space(struct sock *ssk)
 {
-	struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
-	struct socket *sock = READ_ONCE(sk->sk_socket);
-	struct sock *parent = subflow->conn;
-
-	if (!sk_stream_is_writeable(sk))
-		return;
-
-	if (sock && sk_stream_is_writeable(parent))
-		clear_bit(SOCK_NOSPACE, &sock->flags);
-
-	sk_stream_write_space(parent);
+	/* we take action in __mptcp_clean_una() */
 }
 
 static struct inet_connection_sock_af_ops *
-- 
2.26.2

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks
@ 2020-11-27 10:10 ` Paolo Abeni
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Abeni @ 2020-11-27 10:10 UTC (permalink / raw)
  To: netdev; +Cc: Jakub Kicinski, mptcp, Eric Dumazet

Some tasks triggered by the subflow receive path require access to
the msk socket status, specifically mptcp_clean_una() and
mptcp_push_pending().

We have almost everything in place to defer such tasks to the msk
release_cb when the msk sock is owned.

Since the worker is no longer used to clean the acked data, for
fallback sockets we need to flush it explicitly.

As an added bonus we can move the wake-up code into
__mptcp_clean_una(), simplify mptcp_poll() considerably and move the
timer update under the data lock.

The worker is now used only to process and send DATA_FIN packets and
to perform the MPTCP-level retransmissions.
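
The other deferred task, spooling the pending data, needs the subflow
socket lock and cannot run in atomic context; to avoid an ABBA
deadlock with the msk socket spinlock, mptcp_release_cb() temporarily
drops the spinlock around it, as excerpted from the
mptcp_release_cb() hunk of this patch:

	if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) {
		/* mptcp_push_pending() acquires the subflow socket lock,
		 * so it cannot run while holding sk_lock.slock
		 */
		spin_unlock_bh(&sk->sk_lock.slock);
		mptcp_push_pending(sk, 0);
		spin_lock_bh(&sk->sk_lock.slock);
	}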

Acked-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 net/mptcp/options.c  |  18 +++-
 net/mptcp/protocol.c | 250 ++++++++++++++++++++++++++-----------------
 net/mptcp/protocol.h |   3 +
 net/mptcp/subflow.c  |  14 +--
 4 files changed, 168 insertions(+), 117 deletions(-)

diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 3986454a0340..6b7b4b67f18c 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -830,7 +830,7 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
 }
 
 static void ack_update_msk(struct mptcp_sock *msk,
-			   const struct sock *ssk,
+			   struct sock *ssk,
 			   struct mptcp_options_received *mp_opt)
 {
 	u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
@@ -854,8 +854,7 @@ static void ack_update_msk(struct mptcp_sock *msk,
 
 	if (after64(new_wnd_end, msk->wnd_end)) {
 		msk->wnd_end = new_wnd_end;
-		if (mptcp_send_head(sk))
-			mptcp_schedule_work(sk);
+		__mptcp_wnd_updated(sk, ssk);
 	}
 
 	if (after64(new_snd_una, old_snd_una)) {
@@ -915,8 +914,19 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
 	struct mptcp_options_received mp_opt;
 	struct mptcp_ext *mpext;
 
-	if (__mptcp_check_fallback(msk))
+	if (__mptcp_check_fallback(msk)) {
+		/* Keep it simple and unconditionally trigger send data cleanup and
+		 * pending queue spooling. We will need to acquire the data lock
+		 * for more accurate checks, and once the lock is acquired, such
+		 * helpers are cheap.
+		 */
+		mptcp_data_lock(subflow->conn);
+		if (mptcp_send_head(subflow->conn))
+			__mptcp_wnd_updated(subflow->conn, sk);
+		__mptcp_data_acked(subflow->conn);
+		mptcp_data_unlock(subflow->conn);
 		return;
+	}
 
 	mptcp_get_options(skb, &mp_opt);
 	if (!check_fully_established(msk, sk, subflow, skb, &mp_opt))
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 51f92f3096bf..221f7cdd416b 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -348,17 +348,22 @@ static void mptcp_close_wake_up(struct sock *sk)
 		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
 }
 
-static void mptcp_check_data_fin_ack(struct sock *sk)
+static bool mptcp_pending_data_fin_ack(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 
-	if (__mptcp_check_fallback(msk))
-		return;
+	return !__mptcp_check_fallback(msk) &&
+	       ((1 << sk->sk_state) &
+		(TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
+	       msk->write_seq == READ_ONCE(msk->snd_una);
+}
+
+static void mptcp_check_data_fin_ack(struct sock *sk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
 
 	/* Look for an acknowledged DATA_FIN */
-	if (((1 << sk->sk_state) &
-	     (TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
-	    msk->write_seq == READ_ONCE(msk->snd_una)) {
+	if (mptcp_pending_data_fin_ack(sk)) {
 		mptcp_stop_timer(sk);
 
 		WRITE_ONCE(msk->snd_data_fin_enable, 0);
@@ -764,16 +769,6 @@ bool mptcp_schedule_work(struct sock *sk)
 	return false;
 }
 
-void __mptcp_data_acked(struct sock *sk)
-{
-	mptcp_reset_timer(sk);
-
-	if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) ||
-	     mptcp_send_head(sk) ||
-	     (inet_sk_state_load(sk) != TCP_ESTABLISHED)))
-		mptcp_schedule_work(sk);
-}
-
 void mptcp_subflow_eof(struct sock *sk)
 {
 	if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags))
@@ -986,7 +981,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag)
 	put_page(dfrag->page);
 }
 
-static void mptcp_clean_una(struct sock *sk)
+static void __mptcp_clean_una(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_data_frag *dtmp, *dfrag;
@@ -999,8 +994,6 @@ static void mptcp_clean_una(struct sock *sk)
 	if (__mptcp_check_fallback(msk))
 		msk->snd_una = READ_ONCE(msk->snd_nxt);
 
-
-	mptcp_data_lock(sk);
 	snd_una = msk->snd_una;
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
 		if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
@@ -1029,21 +1022,25 @@ static void mptcp_clean_una(struct sock *sk)
 	}
 
 out:
-	if (cleaned && tcp_under_memory_pressure(sk))
-		sk_mem_reclaim_partial(sk);
-	mptcp_data_unlock(sk);
-}
-
-static void mptcp_clean_una_wakeup(struct sock *sk)
-{
-	struct mptcp_sock *msk = mptcp_sk(sk);
+	if (cleaned) {
+		if (tcp_under_memory_pressure(sk)) {
+			__mptcp_update_wmem(sk);
+			sk_mem_reclaim_partial(sk);
+		}
 
-	mptcp_clean_una(sk);
+		if (sk_stream_is_writeable(sk)) {
+			/* pairs with memory barrier in mptcp_poll */
+			smp_mb();
+			if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags))
+				sk_stream_write_space(sk);
+		}
+	}
 
-	/* Only wake up writers if a subflow is ready */
-	if (sk_stream_is_writeable(sk)) {
-		clear_bit(MPTCP_NOSPACE, &msk->flags);
-		sk_stream_write_space(sk);
+	if (snd_una == READ_ONCE(msk->snd_nxt)) {
+		if (msk->timer_ival)
+			mptcp_stop_timer(sk);
+	} else {
+		mptcp_reset_timer(sk);
 	}
 }
 
@@ -1130,13 +1127,13 @@ static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
 	return true;
 }
 
-static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk)
+static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp)
 {
 	struct sk_buff *skb;
 
-	skb = alloc_skb_fclone(MAX_TCP_HEADER, sk->sk_allocation);
+	skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp);
 	if (likely(skb)) {
-		if (likely(__mptcp_add_ext(skb, sk->sk_allocation))) {
+		if (likely(__mptcp_add_ext(skb, gfp))) {
 			skb_reserve(skb, MAX_TCP_HEADER);
 			skb->reserved_tailroom = skb->end - skb->tail;
 			return skb;
@@ -1168,7 +1165,7 @@ static bool mptcp_tx_cache_refill(struct sock *sk, int size,
 	}
 
 	while (space_needed > 0) {
-		skb = __mptcp_do_alloc_tx_skb(sk);
+		skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation);
 		if (unlikely(!skb)) {
 			/* under memory pressure, try to pass the caller a
 			 * single skb to allow forward progress
@@ -1187,7 +1184,7 @@ static bool mptcp_tx_cache_refill(struct sock *sk, int size,
 	return true;
 }
 
-static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
+static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct sk_buff *skb;
@@ -1195,7 +1192,7 @@ static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 	if (ssk->sk_tx_skb_cache) {
 		skb = ssk->sk_tx_skb_cache;
 		if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
-			     !__mptcp_add_ext(skb, sk->sk_allocation)))
+			     !__mptcp_add_ext(skb, gfp)))
 			return false;
 		return true;
 	}
@@ -1216,7 +1213,7 @@ static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 		return false;
 	}
 
-	skb = __mptcp_do_alloc_tx_skb(sk);
+	skb = __mptcp_do_alloc_tx_skb(sk, gfp);
 	if (!skb)
 		return false;
 
@@ -1239,7 +1236,7 @@ static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
 {
 	if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
 		mptcp_mem_reclaim_partial(sk);
-	return __mptcp_alloc_tx_skb(sk, ssk);
+	return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation);
 }
 
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
@@ -1340,31 +1337,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	return ret;
 }
 
-static void mptcp_nospace(struct mptcp_sock *msk)
-{
-	struct mptcp_subflow_context *subflow;
-
-	set_bit(MPTCP_NOSPACE, &msk->flags);
-	smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
-
-	mptcp_for_each_subflow(msk, subflow) {
-		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
-		bool ssk_writeable = sk_stream_is_writeable(ssk);
-		struct socket *sock = READ_ONCE(ssk->sk_socket);
-
-		if (ssk_writeable || !sock)
-			continue;
-
-		/* enables ssk->write_space() callbacks */
-		set_bit(SOCK_NOSPACE, &sock->flags);
-	}
-
-	/* mptcp_data_acked() could run just before we set the NOSPACE bit,
-	 * so explicitly check for snd_una value
-	 */
-	mptcp_clean_una((struct sock *)msk);
-}
-
 #define MPTCP_SEND_BURST_SIZE		((1 << 16) - \
 					 sizeof(struct tcphdr) - \
 					 MAX_TCP_OPTION_SPACE - \
@@ -1536,6 +1508,63 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
 	}
 }
 
+static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct mptcp_sendmsg_info info;
+	struct mptcp_data_frag *dfrag;
+	int len, copied = 0;
+
+	info.flags = 0;
+	while ((dfrag = mptcp_send_head(sk))) {
+		info.sent = dfrag->already_sent;
+		info.limit = dfrag->data_len;
+		len = dfrag->data_len - dfrag->already_sent;
+		while (len > 0) {
+			int ret = 0;
+
+			/* do auto tuning */
+			if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
+			    ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf))
+				WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf);
+
+			if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) {
+				__mptcp_update_wmem(sk);
+				sk_mem_reclaim_partial(sk);
+			}
+			if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC))
+				goto out;
+
+			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+			if (ret <= 0)
+				goto out;
+
+			info.sent += ret;
+			dfrag->already_sent += ret;
+			msk->snd_nxt += ret;
+			msk->snd_burst -= ret;
+			msk->tx_pending_data -= ret;
+			copied += ret;
+			len -= ret;
+		}
+		WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
+	}
+
+out:
+	/* __mptcp_alloc_tx_skb could have released some wmem and we are
+	 * not going to flush it via release_sock()
+	 */
+	__mptcp_update_wmem(sk);
+	if (copied) {
+		mptcp_set_timeout(sk, ssk);
+		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
+			 info.size_goal);
+		if (msk->snd_data_fin_enable &&
+		    msk->snd_nxt + 1 == msk->write_seq)
+			mptcp_schedule_work(sk);
+	}
+}
+
 static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
@@ -1558,7 +1587,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	}
 
 	pfrag = sk_page_frag(sk);
-	mptcp_clean_una(sk);
 
 	while (msg_data_left(msg)) {
 		int total_ts, frag_truesize = 0;
@@ -1578,11 +1606,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		dfrag = mptcp_pending_tail(sk);
 		dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
 		if (!dfrag_collapsed) {
-			if (!sk_stream_memory_free(sk)) {
-				mptcp_push_pending(sk, msg->msg_flags);
-				if (!sk_stream_memory_free(sk))
-					goto wait_for_memory;
-			}
+			if (!sk_stream_memory_free(sk))
+				goto wait_for_memory;
+
 			if (!mptcp_page_frag_refill(sk, pfrag))
 				goto wait_for_memory;
 
@@ -1639,9 +1665,8 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		continue;
 
 wait_for_memory:
-		mptcp_nospace(msk);
-		if (mptcp_timer_pending(sk))
-			mptcp_reset_timer(sk);
+		set_bit(MPTCP_NOSPACE, &msk->flags);
+		mptcp_push_pending(sk, msg->msg_flags);
 		ret = sk_stream_wait_memory(sk, &timeo);
 		if (ret)
 			goto out;
@@ -2198,21 +2223,18 @@ static void mptcp_worker(struct work_struct *work)
 	if (unlikely(state == TCP_CLOSE))
 		goto unlock;
 
-	mptcp_clean_una_wakeup(sk);
 	mptcp_check_data_fin_ack(sk);
 	__mptcp_flush_join_list(msk);
 	if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
 		__mptcp_close_subflow(msk);
 
-	if (mptcp_send_head(sk))
-		mptcp_push_pending(sk, 0);
-
 	if (msk->pm.status)
 		pm_work(msk);
 
 	if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
 		mptcp_check_for_eof(msk);
 
+	__mptcp_check_send_data_fin(sk);
 	mptcp_check_data_fin(sk);
 
 	/* if the msk data is completely acked, or the socket timedout,
@@ -2334,8 +2356,6 @@ static void __mptcp_clear_xmit(struct sock *sk)
 	struct mptcp_data_frag *dtmp, *dfrag;
 	struct sk_buff *skb;
 
-	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
-
 	WRITE_ONCE(msk->first_pending, NULL);
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
 		dfrag_clear(sk, dfrag);
@@ -2477,7 +2497,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
 	spin_unlock_bh(&msk->join_list_lock);
 	list_splice_init(&msk->conn_list, &conn_list);
 
-	__mptcp_clear_xmit(sk);
+	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
 	sk_stop_timer(sk, &sk->sk_timer);
 	msk->pm.status = 0;
 
@@ -2709,6 +2729,8 @@ void mptcp_destroy_common(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
 
+	__mptcp_clear_xmit(sk);
+
 	/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
 	skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
 
@@ -2835,6 +2857,28 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
 	return -EOPNOTSUPP;
 }
 
+void __mptcp_data_acked(struct sock *sk)
+{
+	if (!sock_owned_by_user(sk))
+		__mptcp_clean_una(sk);
+	else
+		set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags);
+
+	if (mptcp_pending_data_fin_ack(sk))
+		mptcp_schedule_work(sk);
+}
+
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk)
+{
+	if (!mptcp_send_head(sk))
+		return;
+
+	if (!sock_owned_by_user(sk))
+		__mptcp_subflow_push_pending(sk, ssk);
+	else
+		set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags);
+}
+
 #define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)
 
 /* processes deferred events and flush wmem */
@@ -2842,6 +2886,25 @@ static void mptcp_release_cb(struct sock *sk)
 {
 	unsigned long flags, nflags;
 
+	/* push_pending may touch wmem_reserved, do it before the later
+	 * cleanup
+	 */
+	if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags))
+		__mptcp_clean_una(sk);
+	if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) {
+		/* mptcp_push_pending() acquires the subflow socket lock
+		 *
+		 * 1) can't be invoked in atomic scope
+		 * 2) must avoid ABBA deadlock with msk socket spinlock: the RX
+		 *    datapath acquires the msk socket spinlock while holding
+		 *    the subflow socket lock
+		 */
+
+		spin_unlock_bh(&sk->sk_lock.slock);
+		mptcp_push_pending(sk, 0);
+		spin_lock_bh(&sk->sk_lock.slock);
+	}
+
 	/* clear any wmem reservation and errors */
 	__mptcp_update_wmem(sk);
 	__mptcp_update_rmem(sk);
@@ -3177,24 +3240,9 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk)
 	       0;
 }
 
-static bool __mptcp_check_writeable(struct mptcp_sock *msk)
-{
-	struct sock *sk = (struct sock *)msk;
-	bool mptcp_writable;
-
-	mptcp_clean_una(sk);
-	mptcp_writable = sk_stream_is_writeable(sk);
-	if (!mptcp_writable)
-		mptcp_nospace(msk);
-
-	return mptcp_writable;
-}
-
 static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
 {
 	struct sock *sk = (struct sock *)msk;
-	__poll_t ret = 0;
-	bool slow;
 
 	if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN))
 		return 0;
@@ -3202,12 +3250,12 @@ static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
 	if (sk_stream_is_writeable(sk))
 		return EPOLLOUT | EPOLLWRNORM;
 
-	slow = lock_sock_fast(sk);
-	if (__mptcp_check_writeable(msk))
-		ret = EPOLLOUT | EPOLLWRNORM;
+	set_bit(MPTCP_NOSPACE, &msk->flags);
+	smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
+	if (sk_stream_is_writeable(sk))
+		return EPOLLOUT | EPOLLWRNORM;
 
-	unlock_sock_fast(sk, slow);
-	return ret;
+	return 0;
 }
 
 static __poll_t mptcp_poll(struct file *file, struct socket *sock,
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 3c07aafde10e..fc56e730fb35 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -91,6 +91,8 @@
 #define MPTCP_WORK_EOF		3
 #define MPTCP_FALLBACK_DONE	4
 #define MPTCP_WORK_CLOSE_SUBFLOW 5
+#define MPTCP_PUSH_PENDING	6
+#define MPTCP_CLEAN_UNA		7
 
 static inline bool before64(__u64 seq1, __u64 seq2)
 {
@@ -495,6 +497,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
 void mptcp_data_ready(struct sock *sk, struct sock *ssk);
 bool mptcp_finish_join(struct sock *sk);
 bool mptcp_schedule_work(struct sock *sk);
+void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk);
 void __mptcp_data_acked(struct sock *sk);
 void mptcp_subflow_eof(struct sock *sk);
 bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index 4d8abff1be18..e867ccf3adc0 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -996,19 +996,9 @@ static void subflow_data_ready(struct sock *sk)
 		mptcp_data_ready(parent, sk);
 }
 
-static void subflow_write_space(struct sock *sk)
+static void subflow_write_space(struct sock *ssk)
 {
-	struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
-	struct socket *sock = READ_ONCE(sk->sk_socket);
-	struct sock *parent = subflow->conn;
-
-	if (!sk_stream_is_writeable(sk))
-		return;
-
-	if (sock && sk_stream_is_writeable(parent))
-		clear_bit(SOCK_NOSPACE, &sock->flags);
-
-	sk_stream_write_space(parent);
+	/* we take action in __mptcp_clean_una() */
 }
 
 static struct inet_connection_sock_af_ops *
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 28+ messages in thread
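
The pattern at the heart of this patch (run the task right away when the msk
socket is not owned, otherwise set a flag so that mptcp_release_cb() runs it
at unlock time) can be sketched in userspace C. The sketch below is only an
analogy under assumed names (fake_msk, data_acked, release_owned): a pthread
mutex stands in for socket ownership and C11 atomics stand in for the
set_bit()/test_and_clear_bit() flag handling; it is not the kernel API.

/* Hypothetical userspace analogy of the "run now or defer to release_cb"
 * pattern used above; not kernel code.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define FLAG_CLEAN_UNA		(1u << 0)
#define FLAG_PUSH_PENDING	(1u << 1)

struct fake_msk {
	pthread_mutex_t owner;		/* stands in for the msk socket lock */
	atomic_uint deferred;		/* stands in for the deferral bits   */
};

static void clean_una(struct fake_msk *msk)	{ (void)msk; puts("clean acked data"); }
static void push_pending(struct fake_msk *msk)	{ (void)msk; puts("push pending data"); }

/* "subflow" side: do the work only if nobody owns the socket,
 * otherwise record it for the owner
 */
static void data_acked(struct fake_msk *msk)
{
	if (pthread_mutex_trylock(&msk->owner) == 0) {
		clean_una(msk);
		pthread_mutex_unlock(&msk->owner);
	} else {
		atomic_fetch_or(&msk->deferred, FLAG_CLEAN_UNA);
	}
}

/* owner side, analogous to mptcp_release_cb(): flush whatever was
 * deferred while the lock was held, then release it
 */
static void release_owned(struct fake_msk *msk)
{
	unsigned int flags = atomic_exchange(&msk->deferred, 0);

	if (flags & FLAG_CLEAN_UNA)
		clean_una(msk);
	if (flags & FLAG_PUSH_PENDING)
		push_pending(msk);
	pthread_mutex_unlock(&msk->owner);
}

int main(void)
{
	struct fake_msk msk = { .owner = PTHREAD_MUTEX_INITIALIZER };

	pthread_mutex_lock(&msk.owner);		/* user owns the socket */
	data_acked(&msk);			/* gets deferred        */
	atomic_fetch_or(&msk.deferred, FLAG_PUSH_PENDING);
	release_owned(&msk);			/* deferred work runs   */

	data_acked(&msk);			/* lock is free: runs inline */
	return 0;
}

The trylock is a simplification: the real code tests sock_owned_by_user()
while already holding the msk data spinlock, but the decision to either run
the helper directly or hand it to the release callback is the same.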

* [MPTCP] Re: [PATCH net-next 1/6] mptcp: open code mptcp variant for lock_sock
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:43 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 567 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> This allows invoking an additional callback under the
> socket spin lock.
>
> Will be used by the next patches to avoid additional
> spin lock contention.
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> include/net/sock.h   |  1 +
> net/core/sock.c      |  2 +-
> net/mptcp/protocol.h | 13 +++++++++++++
> 3 files changed, 15 insertions(+), 1 deletion(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 1/6] mptcp: open code mptcp variant for lock_sock
@ 2020-11-30 23:43 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> This allows invoking an additional callback under the
> socket spin lock.
>
> Will be used by the next patches to avoid additional
> spin lock contention.
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> include/net/sock.h   |  1 +
> net/core/sock.c      |  2 +-
> net/mptcp/protocol.h | 13 +++++++++++++
> 3 files changed, 15 insertions(+), 1 deletion(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
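
As a rough, userspace-only illustration of what "an additional callback under
the socket spin lock" enables, the hedged sketch below runs a caller-supplied
hook while the low-level lock is still held and before ownership is handed to
the caller. All names here (fake_sock, lock_with_hook, splice_rx_queue) are
invented and a pthread spinlock merely stands in for sk_lock.slock; the real
helper added by the patch is mptcp_lock_sock() in net/mptcp/protocol.h.

/* Hypothetical sketch: a lock helper that runs a hook under the low-level
 * lock before handing ownership to the caller; userspace analogy only.
 */
#include <pthread.h>
#include <stdio.h>

struct fake_sock {
	pthread_spinlock_t slock;	/* stands in for sk_lock.slock */
	int owned;			/* stands in for sk_lock.owned */
};

static void lock_with_hook(struct fake_sock *sk,
			   void (*hook)(struct fake_sock *))
{
	pthread_spin_lock(&sk->slock);
	if (hook)
		hook(sk);		/* e.g. splice a pending queue */
	sk->owned = 1;
	pthread_spin_unlock(&sk->slock);
}

static void unlock_sock(struct fake_sock *sk)
{
	pthread_spin_lock(&sk->slock);
	sk->owned = 0;
	pthread_spin_unlock(&sk->slock);
}

static void splice_rx_queue(struct fake_sock *sk)
{
	(void)sk;
	puts("splice the pending receive queue under the spinlock");
}

int main(void)
{
	struct fake_sock sk = { .owned = 0 };

	pthread_spin_init(&sk.slock, PTHREAD_PROCESS_PRIVATE);
	lock_with_hook(&sk, splice_rx_queue);
	/* ... the caller now owns the socket ... */
	unlock_sock(&sk);
	pthread_spin_destroy(&sk.slock);
	return 0;
}

Later patches in the series use exactly this hook point to splice the
subflow-filled receive queue into a private one, as patch 3/6 describes.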

* [MPTCP] Re: [PATCH net-next 2/6] mptcp: implement wmem reservation
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:43 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 1075 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> This leverages the previous commit to reserve the wmem
> required for the sendmsg() operation when the msk socket
> lock is first acquired.
> Some heuristics are used to get a reasonable [over] estimation of
> the whole memory required. If we can't forward alloc that amount,
> fall back to a reasonably small chunk; if even that fails, enter
> the wait-for-memory path.
>
> When sendmsg() needs more memory it looks at wmem_reserved
> first and if that is exhausted, move more space from
> sk_forward_alloc.
>
> The reserved memory is not persistent and is released at the
> next socket unlock via the release_cb().
>
> Overall this will simplify the next patch.
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> net/mptcp/protocol.c | 92 ++++++++++++++++++++++++++++++++++++++++----
> net/mptcp/protocol.h |  1 +
> 2 files changed, 86 insertions(+), 7 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 2/6] mptcp: implement wmem reservation
@ 2020-11-30 23:43 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> This leverages the previous commit to reserve the wmem
> required for the sendmsg() operation when the msk socket
> lock is first acquired.
> Some heuristics are used to get a reasonable [over] estimation of
> the whole memory required. If we can't forward alloc that amount,
> fall back to a reasonably small chunk; if even that fails, enter
> the wait-for-memory path.
>
> When sendmsg() needs more memory it looks at wmem_reserved
> first and if that is exhausted, move more space from
> sk_forward_alloc.
>
> The reserved memory is not persistent and is released at the
> next socket unlock via the release_cb().
>
> Overall this will simplify the next patch.
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> net/mptcp/protocol.c | 92 ++++++++++++++++++++++++++++++++++++++++----
> net/mptcp/protocol.h |  1 +
> 2 files changed, 86 insertions(+), 7 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
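
The reservation life cycle described above can be condensed into a small
sketch: reserve on lock, consume during sendmsg(), give back whatever is left
at unlock. Everything below is invented for illustration (fake_msk,
reserve_wmem, charge_wmem, release_wmem); the real code operates on
sk_forward_alloc under the msk locks and also deals with memory pressure,
which is omitted here.

/* Hypothetical sketch of the "reserve on lock, consume, release on unlock"
 * write-memory scheme; userspace analogy, not the kernel accounting code.
 */
#include <stdio.h>

struct fake_msk {
	long forward_alloc;	/* memory already charged to the socket  */
	long wmem_reserved;	/* slice set aside for this sendmsg call */
};

/* on lock: try to set aside an over-estimate, else a small fallback chunk */
static void reserve_wmem(struct fake_msk *msk, long estimate, long fallback)
{
	long want = estimate <= msk->forward_alloc ? estimate : fallback;

	if (want > msk->forward_alloc)
		want = msk->forward_alloc;
	msk->forward_alloc -= want;
	msk->wmem_reserved = want;
}

/* sendmsg() path: use the reservation first, then top up from forward_alloc */
static int charge_wmem(struct fake_msk *msk, long size)
{
	if (msk->wmem_reserved >= size) {
		msk->wmem_reserved -= size;
		return 0;
	}
	size -= msk->wmem_reserved;
	msk->wmem_reserved = 0;
	if (msk->forward_alloc < size)
		return -1;		/* caller enters the wait-for-memory path */
	msk->forward_alloc -= size;
	return 0;
}

/* at unlock (release_cb): the reservation is not persistent */
static void release_wmem(struct fake_msk *msk)
{
	msk->forward_alloc += msk->wmem_reserved;
	msk->wmem_reserved = 0;
}

int main(void)
{
	struct fake_msk msk = { .forward_alloc = 1 << 16 };

	reserve_wmem(&msk, 3 * 4096, 4096);
	charge_wmem(&msk, 4096);
	release_wmem(&msk);
	printf("fwd=%ld reserved=%ld\n", msk.forward_alloc, msk.wmem_reserved);
	return 0;
}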

* [MPTCP] Re: [PATCH net-next 3/6] mptcp: protect the rx path with the msk socket spinlock
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:43 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 1275 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> Such spinlock is currently used only to protect the 'owned'
> flag inside the socket lock itself. With this patch, we extend
> its scope to protect the whole msk receive path and
> sk_forward_memory.
>
> Given the above, we can always move data into the msk receive
> queue (and OoO queue) from the subflow.
>
> We leverage the previous commit, so that we need to acquire the
> spinlock in the tx path only when moving fwd memory.
>
> recvmsg() must now explicitly acquire the socket spinlock
> when moving skbs out of sk_receive_queue. To reduce the number of
> lock operations required, we use a second rx queue and splice the
> former into the latter in mptcp_lock_sock(). Additionally,
> rmem-allocated memory is bulk-freed via release_cb().
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Co-developed-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> net/mptcp/protocol.c | 149 +++++++++++++++++++++++++++++--------------
> net/mptcp/protocol.h |   5 ++
> 2 files changed, 107 insertions(+), 47 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 3/6] mptcp: protect the rx path with the msk socket spinlock
@ 2020-11-30 23:43 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:43 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> Such spinlock is currently used only to protect the 'owned'
> flag inside the socket lock itself. With this patch, we extend
> its scope to protect the whole msk receive path and
> sk_forward_memory.
>
> Given the above, we can always move data into the msk receive
> queue (and OoO queue) from the subflow.
>
> We leverage the previous commit, so that we need to acquire the
> spinlock in the tx path only when moving fwd memory.
>
> recvmsg() must now explicitly acquire the socket spinlock
> when moving skbs out of sk_receive_queue. To reduce the number of
> lock operations required, we use a second rx queue and splice the
> former into the latter in mptcp_lock_sock(). Additionally,
> rmem-allocated memory is bulk-freed via release_cb().
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Co-developed-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> net/mptcp/protocol.c | 149 +++++++++++++++++++++++++++++--------------
> net/mptcp/protocol.h |   5 ++
> 2 files changed, 107 insertions(+), 47 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
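
The two-queue trick mentioned above (append under the spinlock on the subflow
side, splice everything into a private queue once per lock acquisition, then
drain without the lock) can be sketched as below. This is a userspace analogy
with invented names (shared_rx, private_rx, recvmsg_drain); the kernel works
on sk_buff lists, uses the msk data spinlock and bulk-frees rmem via
release_cb() instead.

/* Hypothetical sketch of the two receive queue scheme; not kernel code. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pkt {
	struct pkt *next;
	int seq;
};

struct queue {
	struct pkt *head, *tail;
};

static void queue_push(struct queue *q, struct pkt *p)
{
	p->next = NULL;
	if (q->tail)
		q->tail->next = p;
	else
		q->head = p;
	q->tail = p;
}

/* move everything from src to the tail of dst in O(1) */
static void queue_splice_tail(struct queue *dst, struct queue *src)
{
	if (!src->head)
		return;
	if (dst->tail)
		dst->tail->next = src->head;
	else
		dst->head = src->head;
	dst->tail = src->tail;
	src->head = src->tail = NULL;
}

static pthread_mutex_t data_lock = PTHREAD_MUTEX_INITIALIZER;
static struct queue shared_rx;	/* filled by the "subflow", lock protected */
static struct queue private_rx;	/* drained by the reader, no lock needed   */

static void subflow_data_ready(int seq)
{
	struct pkt *p = malloc(sizeof(*p));

	if (!p)
		return;
	p->seq = seq;
	pthread_mutex_lock(&data_lock);
	queue_push(&shared_rx, p);
	pthread_mutex_unlock(&data_lock);
}

static void recvmsg_drain(void)
{
	struct pkt *p;

	pthread_mutex_lock(&data_lock);	/* one lock round-trip per batch */
	queue_splice_tail(&private_rx, &shared_rx);
	pthread_mutex_unlock(&data_lock);

	while ((p = private_rx.head)) {	/* consumed without the lock */
		private_rx.head = p->next;
		printf("consumed %d\n", p->seq);
		free(p);
	}
	private_rx.tail = NULL;
}

int main(void)
{
	for (int i = 0; i < 3; i++)
		subflow_data_ready(i);
	recvmsg_drain();
	return 0;
}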

* [MPTCP] Re: [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:44 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:44 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 719 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> Move the TX skb allocation into mptcp_sendmsg() scope,
> and tentatively pre-allocate a number of skbs proportional
> to the sendmsg() length.
>
> Use the ssk tx skb cache to avoid the allocation at the subflow level.
>
> This allows removing the msk skb extension cache and makes
> the later patches possible.
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> net/mptcp/protocol.c | 248 ++++++++++++++++++++++++++++++++++++-------
> net/mptcp/protocol.h |   4 +-
> 2 files changed, 210 insertions(+), 42 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context
@ 2020-11-30 23:44 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:44 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> Move the TX skb allocation into mptcp_sendmsg() scope,
> and tentatively pre-allocate a number of skbs proportional
> to the sendmsg() length.
>
> Use the ssk tx skb cache to avoid the allocation at the subflow level.
>
> This allows removing the msk skb extension cache and makes
> the later patches possible.
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> net/mptcp/protocol.c | 248 ++++++++++++++++++++++++++++++++++++-------
> net/mptcp/protocol.h |   4 +-
> 2 files changed, 210 insertions(+), 42 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
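
A minimal sketch of the pre-allocation idea, assuming an invented tx_cache
structure and a made-up SEG_SIZE: the sendmsg() path refills a per-socket
cache with a number of buffers proportional to the message length, and the
transmit path only consumes cached entries, so it never allocates in a
context where allocation is inconvenient. The real skbs, sizes and cache
placement (ssk->sk_tx_skb_cache) differ; this only mirrors the shape of the
mechanism.

/* Hypothetical sketch of proportional buffer pre-allocation; not kernel code. */
#include <stdio.h>
#include <stdlib.h>

#define SEG_SIZE 1400			/* assumed payload per buffer */

struct txbuf {
	struct txbuf *next;
	char data[SEG_SIZE];
};

struct tx_cache {
	struct txbuf *head;
	int count;
};

/* called in sendmsg() context, where sleeping allocations are fine */
static int tx_cache_refill(struct tx_cache *c, size_t msg_len)
{
	int need = (int)((msg_len + SEG_SIZE - 1) / SEG_SIZE);

	while (c->count < need) {
		struct txbuf *b = calloc(1, sizeof(*b));

		if (!b)
			return c->count ? 0 : -1; /* keep what we already have */
		b->next = c->head;
		c->head = b;
		c->count++;
	}
	return 0;
}

/* the transmit path never allocates: it only consumes cached buffers */
static struct txbuf *tx_cache_get(struct tx_cache *c)
{
	struct txbuf *b = c->head;

	if (b) {
		c->head = b->next;
		c->count--;
	}
	return b;
}

int main(void)
{
	struct tx_cache cache = { 0 };
	struct txbuf *b;

	if (tx_cache_refill(&cache, 4000) < 0)
		return 1;
	while ((b = tx_cache_get(&cache))) {
		/* would be filled with user data and handed to the subflow */
		free(b);
	}
	return 0;
}

As the mptcp_tx_cache_refill() hunk earlier in this thread shows, the real
refill may fall back to handing the caller a single skb under memory
pressure so that forward progress is still possible.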

* [MPTCP] Re: [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:44 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:44 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 689 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> By extending the data_lock scope in mptcp_incoming_options()
> we can use it to protect both snd_una and wnd_end.
> In the typical case, we will have a single atomic op instead of 2.
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> net/mptcp/mptcp_diag.c |  2 +-
> net/mptcp/options.c    | 33 +++++++++++++--------------------
> net/mptcp/protocol.c   | 34 ++++++++++++++++------------------
> net/mptcp/protocol.h   |  8 ++++----
> 4 files changed, 34 insertions(+), 43 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path
@ 2020-11-30 23:44 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:44 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> By extending the data_lock scope in mptcp_incoming_options()
> we can use it to protect both snd_una and wnd_end.
> In the typical case, we will have a single atomic op instead of 2.
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> net/mptcp/mptcp_diag.c |  2 +-
> net/mptcp/options.c    | 33 +++++++++++++--------------------
> net/mptcp/protocol.c   | 34 ++++++++++++++++------------------
> net/mptcp/protocol.h   |  8 ++++----
> 4 files changed, 34 insertions(+), 43 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
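
A hedged before/after sketch of the "single atomic op instead of 2" point:
when each field is a standalone atomic, every incoming ack pays at least two
atomic operations, while moving both fields under a lock that the path
already takes anyway turns the updates into ordinary stores. The names and
the pthread mutex below are userspace stand-ins, and plain greater-than
comparisons replace the kernel's wraparound-safe after64().

/* Hypothetical sketch, not the kernel code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

/* "before": each field is its own atomic */
static _Atomic uint64_t snd_una_atomic;
static _Atomic uint64_t wnd_end_atomic;

static void ack_update_atomics(uint64_t una, uint64_t wnd)
{
	atomic_store(&snd_una_atomic, una);	/* atomic op #1 */
	atomic_store(&wnd_end_atomic, wnd);	/* atomic op #2 */
}

/* "after": both fields live under the data lock the caller already holds
 * on this path, so they can be plain integers
 */
static pthread_mutex_t data_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t snd_una;
static uint64_t wnd_end;

static void ack_update_locked(uint64_t una, uint64_t wnd)
{
	pthread_mutex_lock(&data_lock);
	if (una > snd_una)			/* kernel uses after64() */
		snd_una = una;
	if (wnd > wnd_end)
		wnd_end = wnd;
	pthread_mutex_unlock(&data_lock);
}

int main(void)
{
	ack_update_atomics(100, 200);
	ack_update_locked(100, 200);
	return 0;
}

In userspace the mutex is of course not free either; the point carried over
from the patch is that the lock is taken on that path anyway, so the per-ack
cost drops to the one acquisition.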

* [MPTCP] Re: [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-11-30 23:45 ` Mat Martineau
  -1 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:45 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 1156 bytes --]

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> We have some tasks triggered by the subflow receive path
> which require access to the msk socket status, specifically:
> mptcp_clean_una() and mptcp_push_pending().
>
> We have almost everything in place to defer such tasks to the
> msk release_cb when the msk sock is owned.
>
> Since the worker is no longer used to clean the acked data,
> fallback sockets now need an explicit flush.
>
> As an added bonus we can move the wake-up code into __mptcp_clean_una(),
> considerably simplify mptcp_poll() and move the timer update under
> the data lock.
>
> The worker is now used only to process and send DATA_FIN
> packets and do the mptcp-level retransmissions.
>
> Acked-by: Florian Westphal <fw(a)strlen.de>
> Signed-off-by: Paolo Abeni <pabeni(a)redhat.com>
> ---
> net/mptcp/options.c  |  18 +++-
> net/mptcp/protocol.c | 250 ++++++++++++++++++++++++++-----------------
> net/mptcp/protocol.h |   3 +
> net/mptcp/subflow.c  |  14 +--
> 4 files changed, 168 insertions(+), 117 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks
@ 2020-11-30 23:45 ` Mat Martineau
  0 siblings, 0 replies; 28+ messages in thread
From: Mat Martineau @ 2020-11-30 23:45 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, Jakub Kicinski, mptcp, Eric Dumazet

On Fri, 27 Nov 2020, Paolo Abeni wrote:

> We have some tasks triggered by the subflow receive path
> which require access to the msk socket status, specifically:
> mptcp_clean_una() and mptcp_push_pending().
>
> We have almost everything in place to defer such tasks to the
> msk release_cb when the msk sock is owned.
>
> Since the worker is no longer used to clean the acked data,
> fallback sockets now need an explicit flush.
>
> As an added bonus we can move the wake-up code into __mptcp_clean_una(),
> considerably simplify mptcp_poll() and move the timer update under
> the data lock.
>
> The worker is now used only to process and send DATA_FIN
> packets and do the mptcp-level retransmissions.
>
> Acked-by: Florian Westphal <fw@strlen.de>
> Signed-off-by: Paolo Abeni <pabeni@redhat.com>
> ---
> net/mptcp/options.c  |  18 +++-
> net/mptcp/protocol.c | 250 ++++++++++++++++++++++++++-----------------
> net/mptcp/protocol.h |   3 +
> net/mptcp/subflow.c  |  14 +--
> 4 files changed, 168 insertions(+), 117 deletions(-)

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>

--
Mat Martineau
Intel

^ permalink raw reply	[flat|nested] 28+ messages in thread
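
Besides the deferral machinery, the patch leans on a classic "set the flag,
barrier, re-check" dance (visible in mptcp_check_writeable() and
__mptcp_clean_una() in the diff) to avoid a lost wakeup between poll() and
the write-space path. The sketch below is a userspace analogy with invented
names; C11 fences stand in for smp_mb()/smp_mb__after_atomic() and an atomic
counter stands in for the real writeable-space check.

/* Hypothetical sketch of the lost-wakeup avoidance; not kernel code. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool nospace;		/* MPTCP_NOSPACE analogue */
static atomic_long free_space;		/* writeable budget analogue */

/* poll() side: declare interest first, then look at the state again */
static bool check_writeable(void)
{
	if (atomic_load(&free_space) > 0)
		return true;

	atomic_store(&nospace, true);
	atomic_thread_fence(memory_order_seq_cst);	/* pairs with the acker */
	/* re-check: space may have been freed just before the flag was set */
	return atomic_load(&free_space) > 0;
}

/* clean-una side: free space first, then wake whoever asked for it */
static void space_freed(long amount)
{
	atomic_fetch_add(&free_space, amount);
	atomic_thread_fence(memory_order_seq_cst);	/* pairs with poll() */
	if (atomic_exchange(&nospace, false))
		puts("wake up the writer");
}

int main(void)
{
	printf("writeable: %d\n", check_writeable());	/* no space yet */
	space_freed(4096);		/* NOSPACE was set, so this wakes the writer */
	printf("writeable: %d\n", check_writeable());	/* now writeable */
	return 0;
}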

* [MPTCP] Re: [PATCH net-next 0/6] mptcp: avoid workqueue usage for data
  2020-11-27 10:10 ` Paolo Abeni
@ 2020-12-01  2:34 ` Jakub Kicinski
  -1 siblings, 0 replies; 28+ messages in thread
From: Jakub Kicinski @ 2020-12-01  2:34 UTC (permalink / raw)
  To: mptcp

[-- Attachment #1: Type: text/plain, Size: 618 bytes --]

On Fri, 27 Nov 2020 11:10:21 +0100 Paolo Abeni wrote:
> The current locking schema used to protect the MPTCP data-path
> requires the usage of the MPTCP workqueue to process the incoming
> data, depending on trylock result.
> 
> The above poses scalability limits and introduces random delays
> in MPTCP-level acks.
> 
> With this series we use a single spinlock to protect the MPTCP
> data-path, removing the need for workqueue and delayed ack usage.
> 
> This additionally reduces the number of atomic operations required
> per packet and cleans-up considerably the poll/wake-up code.

Applied, thanks!

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [PATCH net-next 0/6] mptcp: avoid workqueue usage for data
@ 2020-12-01  2:34 ` Jakub Kicinski
  0 siblings, 0 replies; 28+ messages in thread
From: Jakub Kicinski @ 2020-12-01  2:34 UTC (permalink / raw)
  To: Paolo Abeni; +Cc: netdev, mptcp, Eric Dumazet

On Fri, 27 Nov 2020 11:10:21 +0100 Paolo Abeni wrote:
> The current locking schema used to protect the MPTCP data-path
> requires the usage of the MPTCP workqueue to process the incoming
> data, depending on trylock result.
> 
> The above poses scalability limits and introduces random delays
> in MPTCP-level acks.
> 
> With this series we use a single spinlock to protect the MPTCP
> data-path, removing the need for workqueue and delayed ack usage.
> 
> This additionally reduces the number of atomic operations required
> per packet and cleans-up considerably the poll/wake-up code.

Applied, thanks!

^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2020-12-01  2:34 UTC | newest]

Thread overview: 28+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-11-30 23:45 [MPTCP] Re: [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks Mat Martineau
2020-11-30 23:45 ` Mat Martineau
  -- strict thread matches above, loose matches on Subject: below --
2020-12-01  2:34 [MPTCP] Re: [PATCH net-next 0/6] mptcp: avoid workqueue usage for data Jakub Kicinski
2020-12-01  2:34 ` Jakub Kicinski
2020-11-30 23:44 [MPTCP] Re: [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path Mat Martineau
2020-11-30 23:44 ` Mat Martineau
2020-11-30 23:44 [MPTCP] Re: [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context Mat Martineau
2020-11-30 23:44 ` Mat Martineau
2020-11-30 23:43 [MPTCP] Re: [PATCH net-next 3/6] mptcp: protect the rx path with the msk socket spinlock Mat Martineau
2020-11-30 23:43 ` Mat Martineau
2020-11-30 23:43 [MPTCP] Re: [PATCH net-next 2/6] mptcp: implement wmem reservation Mat Martineau
2020-11-30 23:43 ` Mat Martineau
2020-11-30 23:43 [MPTCP] Re: [PATCH net-next 1/6] mptcp: open code mptcp variant for lock_sock Mat Martineau
2020-11-30 23:43 ` Mat Martineau
2020-11-27 10:10 [MPTCP] [PATCH net-next 6/6] mptcp: use mptcp release_cb for delayed tasks Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 5/6] mptcp: avoid a few atomic ops in the rx path Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 4/6] mptcp: allocate TX skbs in msk context Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 3/6] mptcp: protect the rx path with the msk socket spinlock Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 2/6] mptcp: implement wmem reservation Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 1/6] mptcp: open code mptcp variant for lock_sock Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
2020-11-27 10:10 [MPTCP] [PATCH net-next 0/6] mptcp: avoid workqueue usage for data Paolo Abeni
2020-11-27 10:10 ` Paolo Abeni
