All of lore.kernel.org
 help / color / mirror / Atom feed
From: Paolo Abeni <pabeni@redhat.com>
To: netdev@vger.kernel.org
Cc: "David S. Miller" <davem@davemloft.net>,
	Eric Dumazet <edumazet@google.com>
Subject: [PATCH net-next 2/3] udp: use a separate rx queue for packet reception
Date: Mon, 15 May 2017 11:01:43 +0200	[thread overview]
Message-ID: <e97d459f9b0af848acde4a51d1ac434b15e5a6b7.1494837879.git.pabeni@redhat.com> (raw)
In-Reply-To: <cover.1494837879.git.pabeni@redhat.com>

Under UDP flood the sk_receive_queue spinlock is heavily contended.
This patch tries to reduce the contention on that lock by adding a
second receive queue to UDP sockets; recvmsg() looks in that queue
first and, only if it is empty, tries to fetch the data from
sk_receive_queue. The latter is spliced into the newly added
queue every time the receive path has to acquire the
sk_receive_queue lock.

The accounting of forward-allocated memory is still protected by
the sk_receive_queue lock, so udp_rmem_release() needs to acquire
both locks when the forward deficit is flushed.

In specific scenarios we can end up acquiring and releasing the
sk_receive_queue lock multiple times; that will be addressed by
the next patch.

Suggested-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
 include/linux/udp.h   |   3 ++
 include/net/udp.h     |   9 +---
 include/net/udplite.h |   2 +-
 net/ipv4/udp.c        | 138 ++++++++++++++++++++++++++++++++++++++++++++------
 net/ipv6/udp.c        |   3 +-
 5 files changed, 131 insertions(+), 24 deletions(-)

diff --git a/include/linux/udp.h b/include/linux/udp.h
index 6cb4061..eaea63b 100644
--- a/include/linux/udp.h
+++ b/include/linux/udp.h
@@ -80,6 +80,9 @@ struct udp_sock {
 						struct sk_buff *skb,
 						int nhoff);
 
+	/* udp_recvmsg try to use this before splicing sk_receive_queue */
+	struct sk_buff_head	reader_queue ____cacheline_aligned_in_smp;
+
 	/* This field is dirtied by udp_recvmsg() */
 	int		forward_deficit;
 };
diff --git a/include/net/udp.h b/include/net/udp.h
index 3391dbd..1468dbd 100644
--- a/include/net/udp.h
+++ b/include/net/udp.h
@@ -249,13 +249,8 @@ void udp_destruct_sock(struct sock *sk);
 void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
-static inline struct sk_buff *
-__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
-	       int *off, int *err)
-{
-	return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-				   udp_skb_destructor, peeked, off, err);
-}
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err);
 static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
 					   int noblock, int *err)
 {
diff --git a/include/net/udplite.h b/include/net/udplite.h
index ea34052..b7a18f6 100644
--- a/include/net/udplite.h
+++ b/include/net/udplite.h
@@ -26,8 +26,8 @@ static __inline__ int udplite_getfrag(void *from, char *to, int  offset,
 /* Designate sk as UDP-Lite socket */
 static inline int udplite_sk_init(struct sock *sk)
 {
+	udp_init_sock(sk);
 	udp_sk(sk)->pcflag = UDPLITE_BIT;
-	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
 
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index ea6e4cf..492c76b 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1167,19 +1167,24 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
 	struct udp_sock *up = udp_sk(sk);
+	struct sk_buff_head *sk_queue;
 	int amt;
 
 	if (likely(partial)) {
 		up->forward_deficit += size;
 		size = up->forward_deficit;
 		if (size < (sk->sk_rcvbuf >> 2) &&
-		    !skb_queue_empty(&sk->sk_receive_queue))
+		    !skb_queue_empty(&up->reader_queue))
 			return;
 	} else {
 		size += up->forward_deficit;
 	}
 	up->forward_deficit = 0;
 
+	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	sk_queue = &sk->sk_receive_queue;
+	spin_lock(&sk_queue->lock);
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
 	atomic_sub(size, &sk->sk_rmem_alloc);
+
+	/* this can save us from acquiring the rx queue lock on next receive */
+	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+	spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
 	/* reclaim completely the forward allocated memory */
+	struct udp_sock *up = udp_sk(sk);
 	unsigned int total = 0;
 	struct sk_buff *skb;
 
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+	while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+	skb_queue_head_init(&udp_sk(sk)->reader_queue);
 	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+					     struct sk_buff_head *rcvq,
+					     int *total)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_peek(rcvq)) != NULL &&
+	       udp_lib_checksum_complete(skb)) {
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+				IS_UDPLITE(sk));
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+				IS_UDPLITE(sk));
+		atomic_inc(&sk->sk_drops);
+		__skb_unlink(skb, rcvq);
+		*total += skb->truesize;
+		kfree_skb(skb);
+	}
+	return skb;
+}
+
 /**
  *	first_packet_length	- return length of first packet in receive queue
  *	@sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-	struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+	struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
 	struct sk_buff *skb;
 	int total = 0;
 	int res;
 
 	spin_lock_bh(&rcvq->lock);
-	while ((skb = skb_peek(rcvq)) != NULL &&
-		udp_lib_checksum_complete(skb)) {
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-				IS_UDPLITE(sk));
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-				IS_UDPLITE(sk));
-		atomic_inc(&sk->sk_drops);
-		__skb_unlink(skb, rcvq);
-		total += skb->truesize;
-		kfree_skb(skb);
+	skb = __first_packet_length(sk, rcvq, &total);
+	if (!skb && !skb_queue_empty(sk_queue)) {
+		spin_lock(&sk_queue->lock);
+		skb_queue_splice_tail_init(sk_queue, rcvq);
+		spin_unlock(&sk_queue->lock);
+
+		skb = __first_packet_length(sk, rcvq, &total);
 	}
 	res = skb ? skb->len : -1;
 	if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err)
+{
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+	struct sk_buff_head *queue;
+	struct sk_buff *last;
+	long timeo;
+	int error;
+
+	queue = &udp_sk(sk)->reader_queue;
+	flags |= noblock ? MSG_DONTWAIT : 0;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	do {
+		struct sk_buff *skb;
+
+		error = sock_error(sk);
+		if (error)
+			break;
+
+		error = -EAGAIN;
+		*peeked = 0;
+		do {
+			int _off = *off;
+
+			spin_lock_bh(&queue->lock);
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			if (skb) {
+				spin_unlock_bh(&queue->lock);
+				*off = _off;
+				return skb;
+			}
+
+			if (skb_queue_empty(sk_queue)) {
+				spin_unlock_bh(&queue->lock);
+				goto busy_check;
+			}
+
+			/* refill the reader queue and walk it again */
+			_off = *off;
+			spin_lock(&sk_queue->lock);
+			skb_queue_splice_tail_init(sk_queue, queue);
+			spin_unlock(&sk_queue->lock);
+
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			spin_unlock_bh(&queue->lock);
+			if (skb) {
+				*off = _off;
+				return skb;
+			}
+
+busy_check:
+			if (!sk_can_busy_loop(sk))
+				break;
+
+			sk_busy_loop(sk, flags & MSG_DONTWAIT);
+		} while (!skb_queue_empty(sk_queue));
+
+		/* sk_queue is empty, reader_queue may contain peeked packets */
+	} while (timeo &&
+		 !__skb_wait_for_more_packets(sk, &error, &timeo,
+					      (struct sk_buff *)sk_queue));
+
+	*err = error;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  * 	This should be easy, if there is something there we
  * 	return it, otherwise we block.
@@ -1490,7 +1594,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
 	}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 
+	if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+		mask |= POLLIN | POLLRDNORM;
+
 	sock_rps_record_flow(sk);
 
 	/* Check for false positives due to checksum errors */
diff --git a/net/ipv6/udp.c b/net/ipv6/udp.c
index 04862ab..f78fdf8 100644
--- a/net/ipv6/udp.c
+++ b/net/ipv6/udp.c
@@ -455,7 +455,8 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		if (is_udp4) {
 			UDP_INC_STATS(sock_net(sk),
 				      UDP_MIB_CSUMERRORS, is_udplite);
-- 
2.9.3

  parent reply	other threads:[~2017-05-15  9:03 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-05-15  9:01 [PATCH net-next 0/3] udp: scalability improvements Paolo Abeni
2017-05-15  9:01 ` [PATCH net-next 1/3] net/sock: factor out dequeue/peek with offset code Paolo Abeni
2017-05-15 16:02   ` Eric Dumazet
2018-10-23  4:49   ` Alexei Starovoitov
2018-10-23  7:28     ` Paolo Abeni
2018-10-24 21:23       ` Alexei Starovoitov
2017-05-15  9:01 ` Paolo Abeni [this message]
2017-05-15 16:10   ` [PATCH net-next 2/3] udp: use a separate rx queue for packet reception Eric Dumazet
2017-05-15  9:01 ` [PATCH net-next 3/3] udp: keep the sk_receive_queue held when splicing Paolo Abeni
2017-05-15 16:11   ` Eric Dumazet

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=e97d459f9b0af848acde4a51d1ac434b15e5a6b7.1494837879.git.pabeni@redhat.com \
    --to=pabeni@redhat.com \
    --cc=davem@davemloft.net \
    --cc=edumazet@google.com \
    --cc=netdev@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.