* [PATCH net-next v2 1/3] net/sock: factor out dequeue/peek with offset code
2017-05-16 9:20 [PATCH net-next v2 0/3] udp: scalability improvements Paolo Abeni
@ 2017-05-16 9:20 ` Paolo Abeni
2017-05-17 2:46 ` [net-next,v2,1/3] " Andrei Vagin
2017-05-16 9:20 ` [PATCH net-next v2 2/3] udp: use a separate rx queue for packet reception Paolo Abeni
From: Paolo Abeni @ 2017-05-16 9:20 UTC
To: netdev; +Cc: David S. Miller, Eric Dumazet
Factor the dequeue/peek-with-offset code out of
__skb_try_recv_datagram() into a new helper,
__skb_try_recv_from_queue(), and update __sk_queue_drop_skb() to
work on the specified queue. This will let the UDP protocol use an
additional private rx queue in a later patch.
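For reference, the new helper expects the queue lock to be already
held by the caller; a minimal caller sketch (the wrapper name is
illustrative, not part of this patch):

/* Sketch only: walk 'queue' under its lock; the helper either
 * peeks (MSG_PEEK) or unlinks the first eligible skb.
 */
static struct sk_buff *recv_one_sketch(struct sock *sk,
				       struct sk_buff_head *queue,
				       unsigned int flags,
				       int *off, int *err)
{
	struct sk_buff *skb, *last;
	int peeked = 0;

	spin_lock_bh(&queue->lock);
	skb = __skb_try_recv_from_queue(sk, queue, flags,
					NULL /* no destructor */,
					&peeked, off, err, &last);
	spin_unlock_bh(&queue->lock);
	/* NULL when empty, or an ERR_PTR() from skb_set_peeked() */
	return skb;
}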
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
---
include/linux/skbuff.h | 7 ++++
include/net/sock.h | 4 +--
net/core/datagram.c | 90 ++++++++++++++++++++++++++++----------------------
3 files changed, 60 insertions(+), 41 deletions(-)
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index a098d95..bfc7892 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -3056,6 +3056,13 @@ static inline void skb_frag_list_init(struct sk_buff *skb)
int __skb_wait_for_more_packets(struct sock *sk, int *err, long *timeo_p,
const struct sk_buff *skb);
+struct sk_buff *__skb_try_recv_from_queue(struct sock *sk,
+ struct sk_buff_head *queue,
+ unsigned int flags,
+ void (*destructor)(struct sock *sk,
+ struct sk_buff *skb),
+ int *peeked, int *off, int *err,
+ struct sk_buff **last);
struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb),
diff --git a/include/net/sock.h b/include/net/sock.h
index 66349e4..49d226f 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -2035,8 +2035,8 @@ void sk_reset_timer(struct sock *sk, struct timer_list *timer,
void sk_stop_timer(struct sock *sk, struct timer_list *timer);
-int __sk_queue_drop_skb(struct sock *sk, struct sk_buff *skb,
- unsigned int flags,
+int __sk_queue_drop_skb(struct sock *sk, struct sk_buff_head *sk_queue,
+ struct sk_buff *skb, unsigned int flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb));
int __sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb);
diff --git a/net/core/datagram.c b/net/core/datagram.c
index db1866f2..a4592b4 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -161,6 +161,43 @@ static struct sk_buff *skb_set_peeked(struct sk_buff *skb)
return skb;
}
+struct sk_buff *__skb_try_recv_from_queue(struct sock *sk,
+ struct sk_buff_head *queue,
+ unsigned int flags,
+ void (*destructor)(struct sock *sk,
+ struct sk_buff *skb),
+ int *peeked, int *off, int *err,
+ struct sk_buff **last)
+{
+ struct sk_buff *skb;
+
+ *last = queue->prev;
+ skb_queue_walk(queue, skb) {
+ if (flags & MSG_PEEK) {
+ if (*off >= skb->len && (skb->len || *off ||
+ skb->peeked)) {
+ *off -= skb->len;
+ continue;
+ }
+ if (!skb->len) {
+ skb = skb_set_peeked(skb);
+ if (unlikely(IS_ERR(skb))) {
+ *err = PTR_ERR(skb);
+ return skb;
+ }
+ }
+ *peeked = 1;
+ atomic_inc(&skb->users);
+ } else {
+ __skb_unlink(skb, queue);
+ if (destructor)
+ destructor(sk, skb);
+ }
+ return skb;
+ }
+ return NULL;
+}
+
/**
* __skb_try_recv_datagram - Receive a datagram skbuff
* @sk: socket
@@ -216,46 +253,20 @@ struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags,
*peeked = 0;
do {
+ int _off = *off;
+
/* Again only user level code calls this function, so nothing
* interrupt level will suddenly eat the receive_queue.
*
* Look at current nfs client by the way...
* However, this function was correct in any case. 8)
*/
- int _off = *off;
-
- *last = (struct sk_buff *)queue;
spin_lock_irqsave(&queue->lock, cpu_flags);
- skb_queue_walk(queue, skb) {
- *last = skb;
- if (flags & MSG_PEEK) {
- if (_off >= skb->len && (skb->len || _off ||
- skb->peeked)) {
- _off -= skb->len;
- continue;
- }
- if (!skb->len) {
- skb = skb_set_peeked(skb);
- if (IS_ERR(skb)) {
- error = PTR_ERR(skb);
- spin_unlock_irqrestore(&queue->lock,
- cpu_flags);
- goto no_packet;
- }
- }
- *peeked = 1;
- atomic_inc(&skb->users);
- } else {
- __skb_unlink(skb, queue);
- if (destructor)
- destructor(sk, skb);
- }
- spin_unlock_irqrestore(&queue->lock, cpu_flags);
- *off = _off;
- return skb;
- }
-
+ skb = __skb_try_recv_from_queue(sk, queue, flags, destructor,
+ peeked, &_off, err, last);
spin_unlock_irqrestore(&queue->lock, cpu_flags);
+ if (skb)
+ return skb;
if (!sk_can_busy_loop(sk))
break;
@@ -335,8 +346,8 @@ void __skb_free_datagram_locked(struct sock *sk, struct sk_buff *skb, int len)
}
EXPORT_SYMBOL(__skb_free_datagram_locked);
-int __sk_queue_drop_skb(struct sock *sk, struct sk_buff *skb,
- unsigned int flags,
+int __sk_queue_drop_skb(struct sock *sk, struct sk_buff_head *sk_queue,
+ struct sk_buff *skb, unsigned int flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb))
{
@@ -344,15 +355,15 @@ int __sk_queue_drop_skb(struct sock *sk, struct sk_buff *skb,
if (flags & MSG_PEEK) {
err = -ENOENT;
- spin_lock_bh(&sk->sk_receive_queue.lock);
- if (skb == skb_peek(&sk->sk_receive_queue)) {
- __skb_unlink(skb, &sk->sk_receive_queue);
+ spin_lock_bh(&sk_queue->lock);
+ if (skb == skb_peek(sk_queue)) {
+ __skb_unlink(skb, sk_queue);
atomic_dec(&skb->users);
if (destructor)
destructor(sk, skb);
err = 0;
}
- spin_unlock_bh(&sk->sk_receive_queue.lock);
+ spin_unlock_bh(&sk_queue->lock);
}
atomic_inc(&sk->sk_drops);
@@ -383,7 +394,8 @@ EXPORT_SYMBOL(__sk_queue_drop_skb);
int skb_kill_datagram(struct sock *sk, struct sk_buff *skb, unsigned int flags)
{
- int err = __sk_queue_drop_skb(sk, skb, flags, NULL);
+ int err = __sk_queue_drop_skb(sk, &sk->sk_receive_queue, skb, flags,
+ NULL);
kfree_skb(skb);
sk_mem_reclaim_partial(sk);
--
2.9.3
* Re: [net-next,v2,1/3] net/sock: factor out dequeue/peek with offset code
2017-05-16 9:20 ` [PATCH net-next v2 1/3] net/sock: factor out dequeue/peek with offset code Paolo Abeni
@ 2017-05-17 2:46 ` Andrei Vagin
From: Andrei Vagin @ 2017-05-17 2:46 UTC
To: Paolo Abeni; +Cc: netdev, David S. Miller, Eric Dumazet
On Tue, May 16, 2017 at 11:20:13AM +0200, Paolo Abeni wrote:
> And update __sk_queue_drop_skb() to work on the specified queue.
> This will help the udp protocol to use an additional private
> rx queue in a later patch.
CRIU tests fail with this patch:
recvmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="packet dgram right\0", iov_len=212960}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, MSG_PEEK|MSG_DONTWAIT) = 19 <0.000048>
writev(9, [{iov_base="\4\0\0\0", iov_len=4}, {iov_base="\10\t\20\23", iov_len=4}], 2) = 8 <0.000085>
write(9, "packet dgram right\0", 19) = 19 <0.000062>
recvmsg(14, {msg_namelen=0}, MSG_PEEK|MSG_DONTWAIT) = -1 EFAULT (Bad address) <0.000046>
Without this patch, strace looks like this:
recvmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="packet dgram right\0", iov_len=212960}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, MSG_PEEK|MSG_DONTWAIT) = 19 <0.000024>
writev(9, [{iov_base="\4\0\0\0", iov_len=4}, {iov_base="\10\t\20\23", iov_len=4}], 2) = 8 <0.000037>
write(9, "packet dgram right\0", 19) = 19 <0.000030>
recvmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="packet dgram left\0", iov_len=212960}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, MSG_PEEK|MSG_DONTWAIT) = 18 <0.000023>
writev(9, [{iov_base="\4\0\0\0", iov_len=4}, {iov_base="\10\t\20\22", iov_len=4}], 2) = 8 <0.000030>
write(9, "packet dgram left\0", 18) = 18 <0.000030>
recvmsg(14, {msg_namelen=0}, MSG_PEEK|MSG_DONTWAIT) = -1 EAGAIN (Resource temporarily unavailable) <0.000023>
https://travis-ci.org/avagin/criu/jobs/232990442
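For context, the test appears to drain the socket by peeking packet
after packet, roughly as follows (a guess at the pattern from the
strace above, not the actual CRIU code; the SO_PEEK_OFF use is an
assumption):

/* Userspace sketch (assumed, not the actual CRIU code): with
 * SO_PEEK_OFF enabled, each MSG_PEEK recv returns the next queued
 * datagram; an empty queue should end with EAGAIN, not EFAULT.
 */
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>

static int drain_by_peek(int fd)
{
	char buf[4096];
	int off = 0;
	ssize_t n;

	if (setsockopt(fd, SOL_SOCKET, SO_PEEK_OFF, &off, sizeof(off)) < 0)
		return -errno;
	while ((n = recv(fd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT)) > 0)
		printf("peeked %zd bytes\n", n);
	return n < 0 ? -errno : 0;
}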
[...]
* [PATCH net-next v2 2/3] udp: use a separate rx queue for packet reception
2017-05-16 9:20 [PATCH net-next v2 0/3] udp: scalability improvements Paolo Abeni
2017-05-16 9:20 ` [PATCH net-next v2 1/3] net/sock: factor out dequeue/peek with offset code Paolo Abeni
@ 2017-05-16 9:20 ` Paolo Abeni
2017-05-16 9:20 ` [PATCH net-next v2 3/3] udp: keep the sk_receive_queue held when splicing Paolo Abeni
2017-05-16 19:41 ` [PATCH net-next v2 0/3] udp: scalability improvements David Miller
From: Paolo Abeni @ 2017-05-16 9:20 UTC
To: netdev; +Cc: David S. Miller, Eric Dumazet
Under UDP flood, the sk_receive_queue spinlock is heavily contended.
This patch tries to reduce the contention by adding a second receive
queue to UDP sockets; recvmsg() looks first in that queue and, only
if it is empty, tries to fetch the data from sk_receive_queue. The
latter is spliced into the newly added queue every time the receive
path has to acquire the sk_receive_queue lock.
The accounting of forward allocated memory is still protected by the
sk_receive_queue lock, so udp_rmem_release() needs to acquire
both locks when the forward deficit is flushed.
In specific scenarios we can end up acquiring and releasing the
sk_receive_queue lock multiple times; that will be addressed by
the next patch.
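The consumer-side idiom, in isolation, is roughly the following
(a sketch only; the function name is illustrative and the memory
accounting is elided):

/* Sketch only: producers append to @sk_queue; the consumer drains a
 * private @reader_queue and refills it with one bulk splice, so the
 * contended sk_receive_queue lock is taken once per burst instead of
 * once per packet.
 */
static struct sk_buff *two_queue_dequeue(struct sk_buff_head *sk_queue,
					 struct sk_buff_head *reader_queue)
{
	struct sk_buff *skb;

	spin_lock_bh(&reader_queue->lock);
	skb = __skb_dequeue(reader_queue);
	if (!skb && !skb_queue_empty(sk_queue)) {
		spin_lock(&sk_queue->lock);
		skb_queue_splice_tail_init(sk_queue, reader_queue);
		spin_unlock(&sk_queue->lock);
		skb = __skb_dequeue(reader_queue);
	}
	spin_unlock_bh(&reader_queue->lock);
	return skb;
}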
Suggested-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
---
include/linux/udp.h | 3 ++
include/net/udp.h | 9 +---
include/net/udplite.h | 2 +-
net/ipv4/udp.c | 138 ++++++++++++++++++++++++++++++++++++++++++++------
net/ipv6/udp.c | 3 +-
5 files changed, 131 insertions(+), 24 deletions(-)
diff --git a/include/linux/udp.h b/include/linux/udp.h
index 6cb4061..eaea63b 100644
--- a/include/linux/udp.h
+++ b/include/linux/udp.h
@@ -80,6 +80,9 @@ struct udp_sock {
struct sk_buff *skb,
int nhoff);
+ /* udp_recvmsg tries to use this before splicing sk_receive_queue */
+ struct sk_buff_head reader_queue ____cacheline_aligned_in_smp;
+
/* This field is dirtied by udp_recvmsg() */
int forward_deficit;
};
diff --git a/include/net/udp.h b/include/net/udp.h
index 3391dbd..1468dbd 100644
--- a/include/net/udp.h
+++ b/include/net/udp.h
@@ -249,13 +249,8 @@ void udp_destruct_sock(struct sock *sk);
void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
-static inline struct sk_buff *
-__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
- int *off, int *err)
-{
- return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
- udp_skb_destructor, peeked, off, err);
-}
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+ int noblock, int *peeked, int *off, int *err);
static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
int noblock, int *err)
{
diff --git a/include/net/udplite.h b/include/net/udplite.h
index ea34052..b7a18f6 100644
--- a/include/net/udplite.h
+++ b/include/net/udplite.h
@@ -26,8 +26,8 @@ static __inline__ int udplite_getfrag(void *from, char *to, int offset,
/* Designate sk as UDP-Lite socket */
static inline int udplite_sk_init(struct sock *sk)
{
+ udp_init_sock(sk);
udp_sk(sk)->pcflag = UDPLITE_BIT;
- sk->sk_destruct = udp_destruct_sock;
return 0;
}
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index ea6e4cf..492c76b 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1167,19 +1167,24 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
static void udp_rmem_release(struct sock *sk, int size, int partial)
{
struct udp_sock *up = udp_sk(sk);
+ struct sk_buff_head *sk_queue;
int amt;
if (likely(partial)) {
up->forward_deficit += size;
size = up->forward_deficit;
if (size < (sk->sk_rcvbuf >> 2) &&
- !skb_queue_empty(&sk->sk_receive_queue))
+ !skb_queue_empty(&up->reader_queue))
return;
} else {
size += up->forward_deficit;
}
up->forward_deficit = 0;
+ /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+ sk_queue = &sk->sk_receive_queue;
+ spin_lock(&sk_queue->lock);
+
sk->sk_forward_alloc += size;
amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
atomic_sub(size, &sk->sk_rmem_alloc);
+
+ /* this can save us from acquiring the rx queue lock on next receive */
+ skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+ spin_unlock(&sk_queue->lock);
}
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
* Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
* This avoids a cache line miss while receive_queue lock is held.
* Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
void udp_destruct_sock(struct sock *sk)
{
/* reclaim completely the forward allocated memory */
+ struct udp_sock *up = udp_sk(sk);
unsigned int total = 0;
struct sk_buff *skb;
- while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+ skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+ while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
total += skb->truesize;
kfree_skb(skb);
}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
int udp_init_sock(struct sock *sk)
{
+ skb_queue_head_init(&udp_sk(sk)->reader_queue);
sk->sk_destruct = udp_destruct_sock;
return 0;
}
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
}
EXPORT_SYMBOL_GPL(skb_consume_udp);
+static struct sk_buff *__first_packet_length(struct sock *sk,
+ struct sk_buff_head *rcvq,
+ int *total)
+{
+ struct sk_buff *skb;
+
+ while ((skb = skb_peek(rcvq)) != NULL &&
+ udp_lib_checksum_complete(skb)) {
+ __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+ IS_UDPLITE(sk));
+ __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+ IS_UDPLITE(sk));
+ atomic_inc(&sk->sk_drops);
+ __skb_unlink(skb, rcvq);
+ *total += skb->truesize;
+ kfree_skb(skb);
+ }
+ return skb;
+}
+
/**
* first_packet_length - return length of first packet in receive queue
* @sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
*/
static int first_packet_length(struct sock *sk)
{
- struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+ struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+ struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
struct sk_buff *skb;
int total = 0;
int res;
spin_lock_bh(&rcvq->lock);
- while ((skb = skb_peek(rcvq)) != NULL &&
- udp_lib_checksum_complete(skb)) {
- __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
- IS_UDPLITE(sk));
- __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
- IS_UDPLITE(sk));
- atomic_inc(&sk->sk_drops);
- __skb_unlink(skb, rcvq);
- total += skb->truesize;
- kfree_skb(skb);
+ skb = __first_packet_length(sk, rcvq, &total);
+ if (!skb && !skb_queue_empty(sk_queue)) {
+ spin_lock(&sk_queue->lock);
+ skb_queue_splice_tail_init(sk_queue, rcvq);
+ spin_unlock(&sk_queue->lock);
+
+ skb = __first_packet_length(sk, rcvq, &total);
}
res = skb ? skb->len : -1;
if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
}
EXPORT_SYMBOL(udp_ioctl);
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+ int noblock, int *peeked, int *off, int *err)
+{
+ struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+ struct sk_buff_head *queue;
+ struct sk_buff *last;
+ long timeo;
+ int error;
+
+ queue = &udp_sk(sk)->reader_queue;
+ flags |= noblock ? MSG_DONTWAIT : 0;
+ timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ do {
+ struct sk_buff *skb;
+
+ error = sock_error(sk);
+ if (error)
+ break;
+
+ error = -EAGAIN;
+ *peeked = 0;
+ do {
+ int _off = *off;
+
+ spin_lock_bh(&queue->lock);
+ skb = __skb_try_recv_from_queue(sk, queue, flags,
+ udp_skb_destructor,
+ peeked, &_off, err,
+ &last);
+ if (skb) {
+ spin_unlock_bh(&queue->lock);
+ *off = _off;
+ return skb;
+ }
+
+ if (skb_queue_empty(sk_queue)) {
+ spin_unlock_bh(&queue->lock);
+ goto busy_check;
+ }
+
+ /* refill the reader queue and walk it again */
+ _off = *off;
+ spin_lock(&sk_queue->lock);
+ skb_queue_splice_tail_init(sk_queue, queue);
+ spin_unlock(&sk_queue->lock);
+
+ skb = __skb_try_recv_from_queue(sk, queue, flags,
+ udp_skb_destructor,
+ peeked, &_off, err,
+ &last);
+ spin_unlock_bh(&queue->lock);
+ if (skb) {
+ *off = _off;
+ return skb;
+ }
+
+busy_check:
+ if (!sk_can_busy_loop(sk))
+ break;
+
+ sk_busy_loop(sk, flags & MSG_DONTWAIT);
+ } while (!skb_queue_empty(sk_queue));
+
+ /* sk_queue is empty, reader_queue may contain peeked packets */
+ } while (timeo &&
+ !__skb_wait_for_more_packets(sk, &error, &timeo,
+ (struct sk_buff *)sk_queue));
+
+ *err = error;
+ return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
/*
* This should be easy, if there is something there we
* return it, otherwise we block.
@@ -1490,7 +1594,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
return err;
csum_copy_err:
- if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+ if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+ udp_skb_destructor)) {
UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
unsigned int mask = datagram_poll(file, sock, wait);
struct sock *sk = sock->sk;
+ if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+ mask |= POLLIN | POLLRDNORM;
+
sock_rps_record_flow(sk);
/* Check for false positives due to checksum errors */
diff --git a/net/ipv6/udp.c b/net/ipv6/udp.c
index 04862ab..f78fdf8 100644
--- a/net/ipv6/udp.c
+++ b/net/ipv6/udp.c
@@ -455,7 +455,8 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
return err;
csum_copy_err:
- if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+ if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+ udp_skb_destructor)) {
if (is_udp4) {
UDP_INC_STATS(sock_net(sk),
UDP_MIB_CSUMERRORS, is_udplite);
--
2.9.3
* [PATCH net-next v2 3/3] udp: keep the sk_receive_queue held when splicing
2017-05-16 9:20 [PATCH net-next v2 0/3] udp: scalability improvements Paolo Abeni
2017-05-16 9:20 ` [PATCH net-next v2 1/3] net/sock: factor out dequeue/peek with offset code Paolo Abeni
2017-05-16 9:20 ` [PATCH net-next v2 2/3] udp: use a separate rx queue for packet reception Paolo Abeni
@ 2017-05-16 9:20 ` Paolo Abeni
2017-05-16 14:12 ` Eric Dumazet
2017-05-16 19:41 ` [PATCH net-next v2 0/3] udp: scalability improvements David Miller
From: Paolo Abeni @ 2017-05-16 9:20 UTC
To: netdev; +Cc: David S. Miller, Eric Dumazet
On packet reception, when we are forced to splice the
sk_receive_queue, we can keep the related lock held, so that
we avoid re-acquiring it if forward memory scheduling is
required.
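The refill path then looks roughly like this (a sketch; the wrapper
name is hypothetical, the real change is in the diff below):

/* Sketch only: both locks are held across the second queue walk, so
 * the skb destructor (udp_skb_dtor_locked) can flush the forward
 * deficit without taking the sk_receive_queue lock a second time.
 * The caller already holds queue->lock (the reader_queue lock).
 */
static struct sk_buff *refill_and_recv(struct sock *sk,
				       struct sk_buff_head *sk_queue,
				       struct sk_buff_head *queue,
				       unsigned int flags, int *peeked,
				       int *off, int *err,
				       struct sk_buff **last)
{
	struct sk_buff *skb;

	spin_lock(&sk_queue->lock);
	skb_queue_splice_tail_init(sk_queue, queue);
	skb = __skb_try_recv_from_queue(sk, queue, flags,
					udp_skb_dtor_locked, peeked,
					off, err, last);
	spin_unlock(&sk_queue->lock);
	return skb;
}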
v1 -> v2:
the rx_queue_lock_held param in udp_rmem_release() is
now a bool
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
net/ipv4/udp.c | 36 ++++++++++++++++++++++++++----------
1 file changed, 26 insertions(+), 10 deletions(-)
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index 492c76b..7bd56c9 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1164,7 +1164,8 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
}
/* fully reclaim rmem/fwd memory allocated for skb */
-static void udp_rmem_release(struct sock *sk, int size, int partial)
+static void udp_rmem_release(struct sock *sk, int size, int partial,
+ bool rx_queue_lock_held)
{
struct udp_sock *up = udp_sk(sk);
struct sk_buff_head *sk_queue;
@@ -1181,9 +1182,13 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
}
up->forward_deficit = 0;
- /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+ /* acquire the sk_receive_queue for fwd allocated memory scheduling,
+ * if the caller doesn't hold it already
+ */
sk_queue = &sk->sk_receive_queue;
- spin_lock(&sk_queue->lock);
+ if (!rx_queue_lock_held)
+ spin_lock(&sk_queue->lock);
+
sk->sk_forward_alloc += size;
amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
@@ -1197,7 +1202,8 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
/* this can save us from acquiring the rx queue lock on next receive */
skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
- spin_unlock(&sk_queue->lock);
+ if (!rx_queue_lock_held)
+ spin_unlock(&sk_queue->lock);
}
/* Note: called with reader_queue.lock held.
@@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
*/
void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
{
- udp_rmem_release(sk, skb->dev_scratch, 1);
+ udp_rmem_release(sk, skb->dev_scratch, 1, false);
}
EXPORT_SYMBOL(udp_skb_destructor);
+/* as above, but the caller holds the rx queue lock, too */
+void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
+{
+ udp_rmem_release(sk, skb->dev_scratch, 1, true);
+}
+
/* Idea of busylocks is to let producers grab an extra spinlock
* to relieve pressure on the receive_queue spinlock shared by consumer.
* Under flood, this means that only one producer can be in line
@@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk)
total += skb->truesize;
kfree_skb(skb);
}
- udp_rmem_release(sk, total, 0);
+ udp_rmem_release(sk, total, 0, true);
inet_sock_destruct(sk);
}
@@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk)
}
res = skb ? skb->len : -1;
if (total)
- udp_rmem_release(sk, total, 1);
+ udp_rmem_release(sk, total, 1, false);
spin_unlock_bh(&rcvq->lock);
return res;
}
@@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
goto busy_check;
}
- /* refill the reader queue and walk it again */
+ /* refill the reader queue and walk it again
+ * keep both queues locked to avoid re-acquiring
+ * the sk_receive_queue lock if fwd memory scheduling
+ * is needed.
+ */
_off = *off;
spin_lock(&sk_queue->lock);
skb_queue_splice_tail_init(sk_queue, queue);
- spin_unlock(&sk_queue->lock);
skb = __skb_try_recv_from_queue(sk, queue, flags,
- udp_skb_destructor,
+ udp_skb_dtor_locked,
peeked, &_off, err,
&last);
+ spin_unlock(&sk_queue->lock);
spin_unlock_bh(&queue->lock);
if (skb) {
*off = _off;
--
2.9.3
* Re: [PATCH net-next v2 0/3] udp: scalability improvements
2017-05-16 9:20 [PATCH net-next v2 0/3] udp: scalability improvements Paolo Abeni
2017-05-16 9:20 ` [PATCH net-next v2 3/3] udp: keep the sk_receive_queue held when splicing Paolo Abeni
@ 2017-05-16 19:41 ` David Miller
From: David Miller @ 2017-05-16 19:41 UTC
To: pabeni; +Cc: netdev, edumazet
From: Paolo Abeni <pabeni@redhat.com>
Date: Tue, 16 May 2017 11:20:12 +0200
> This patch series implements an idea suggested by Eric Dumazet to
> reduce the contention of the udp sk_receive_queue lock when the socket is
> under flood.
Series applied, thanks a lot.