* [PATCH bpf-next v2 1/2] xsk: update rings for load-acquire/store-release barriers
2021-03-05 9:41 [PATCH bpf-next v2 0/2] load-acquire/store-release barriers for AF_XDP rings Björn Töpel
@ 2021-03-05 9:41 ` Björn Töpel
2021-03-05 9:41 ` [PATCH bpf-next v2 2/2] libbpf, xsk: add libbpf_smp_store_release libbpf_smp_load_acquire Björn Töpel
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Björn Töpel @ 2021-03-05 9:41 UTC (permalink / raw)
To: ast, daniel, netdev, bpf
Cc: Björn Töpel, magnus.karlsson, jonathan.lemon, maximmi,
andrii, toke, will, paulmck, stern
From: Björn Töpel <bjorn.topel@intel.com>
Currently, the AF_XDP rings uses general smp_{r,w,}mb() barriers on
the kernel-side. On most modern architectures
load-acquire/store-release barriers perform better, and results in
simpler code for circular ring buffers.
This change updates the XDP socket rings to use
load-acquire/store-release barriers.
It is important to note that changing from the old smp_{r,w,}mb()
barriers, to load-acquire/store-release barriers does not break
compatibility. The old semantics work with the new one, and vice
versa.
As pointed out by "Documentation/memory-barriers.txt" in the "SMP
BARRIER PAIRING" section:
"General barriers pair with each other, though they also pair with
most other types of barriers, albeit without multicopy atomicity.
An acquire barrier pairs with a release barrier, but both may also
pair with other barriers, including of course general barriers."
How different barriers behaves and pairs is outlined in
"tools/memory-model/Documentation/cheatsheet.txt".
In order to make sure that compatibility is not broken, LKMM herd7
based litmus tests can be constructed and verified.
We generalize the XDP socket ring to a one entry ring, and create two
scenarios; One where the ring is full, where only the consumer can
proceed, followed by the producer. One where the ring is empty, where
only the producer can proceed, followed by the consumer. Each scenario
is then expanded to four different tests: general producer/general
consumer, general producer/acqrel consumer, acqrel producer/general
consumer, acqrel producer/acqrel consumer. In total eight tests.
The empty ring test:
C spsc-rb+empty
// Simple one entry ring:
// prod cons allowed action prod cons
// 0 0 => prod => 1 0
// 0 1 => cons => 0 0
// 1 0 => cons => 1 1
// 1 1 => prod => 0 1
{}
// We start at prod==0, cons==0, data==0, i.e. nothing has been
// written to the ring. From here only the producer can start, and
// should write 1. Afterwards, consumer can continue and read 1 to
// data. Can we enter state prod==1, cons==1, but consumer observed
// the incorrect value of 0?
P0(int *prod, int *cons, int *data)
{
... producer
}
P1(int *prod, int *cons, int *data)
{
... consumer
}
exists( 1:d=0 /\ prod=1 /\ cons=1 );
The full ring test:
C spsc-rb+full
// Simple one entry ring:
// prod cons allowed action prod cons
// 0 0 => prod => 1 0
// 0 1 => cons => 0 0
// 1 0 => cons => 1 1
// 1 1 => prod => 0 1
{ prod = 1; }
// We start at prod==1, cons==0, data==1, i.e. producer has
// written 0, so from here only the consumer can start, and should
// consume 0. Afterwards, producer can continue and write 1 to
// data. Can we enter state prod==0, cons==1, but consumer observed
// the write of 1?
P0(int *prod, int *cons, int *data)
{
... producer
}
P1(int *prod, int *cons, int *data)
{
... consumer
}
exists( 1:d=1 /\ prod=0 /\ cons=1 );
where P0 and P1 are:
P0(int *prod, int *cons, int *data)
{
int p;
p = READ_ONCE(*prod);
if (READ_ONCE(*cons) == p) {
WRITE_ONCE(*data, 1);
smp_wmb();
WRITE_ONCE(*prod, p ^ 1);
}
}
P0(int *prod, int *cons, int *data)
{
int p;
p = READ_ONCE(*prod);
if (READ_ONCE(*cons) == p) {
WRITE_ONCE(*data, 1);
smp_store_release(prod, p ^ 1);
}
}
P1(int *prod, int *cons, int *data)
{
int c;
int d = -1;
c = READ_ONCE(*cons);
if (READ_ONCE(*prod) != c) {
smp_rmb();
d = READ_ONCE(*data);
smp_mb();
WRITE_ONCE(*cons, c ^ 1);
}
}
P1(int *prod, int *cons, int *data)
{
int c;
int d = -1;
c = READ_ONCE(*cons);
if (smp_load_acquire(prod) != c) {
d = READ_ONCE(*data);
smp_store_release(cons, c ^ 1);
}
}
The full LKMM litmus tests are found at [1].
On x86-64 systems the l2fwd AF_XDP xdpsock sample performance
increases by 1%. This is mostly due to that the smp_mb() is removed,
which is a relatively expensive operation on these
platforms. Weakly-ordered platforms, such as ARM64 might benefit even
more.
[1] https://github.com/bjoto/litmus-xsk
Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
---
net/xdp/xsk_queue.h | 30 +++++++++++++-----------------
1 file changed, 13 insertions(+), 17 deletions(-)
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..2ac3802c2cd7 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@ struct xsk_queue {
u64 queue_empty_descs;
};
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings are a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
*
* producer consumer
*
- * if (LOAD ->consumer) { LOAD ->producer
- * (A) smp_rmb() (C)
+ * if (LOAD ->consumer) { (A) LOAD.acq ->producer (C)
* STORE $data LOAD $data
- * smp_wmb() (B) smp_mb() (D)
- * STORE ->producer STORE ->consumer
+ * STORE.rel ->producer (B) STORE.rel ->consumer (D)
* }
*
* (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@ struct xsk_queue {
*
* (A) is a control dependency that separates the load of ->consumer
* from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
*
* (D) protects the load of the data to be observed to happen after the
* store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
static inline void __xskq_cons_release(struct xsk_queue *q)
{
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cached_cons);
+ smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
}
static inline void __xskq_cons_peek(struct xsk_queue *q)
{
/* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- smp_rmb(); /* C, matches B */
+ q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
}
static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- smp_wmb(); /* B, matches C */
-
- WRITE_ONCE(q->ring->producer, idx);
+ smp_store_release(&q->ring->producer, idx); /* B, matches C */
}
static inline void xskq_prod_submit(struct xsk_queue *q)
--
2.27.0
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH bpf-next v2 2/2] libbpf, xsk: add libbpf_smp_store_release libbpf_smp_load_acquire
2021-03-05 9:41 [PATCH bpf-next v2 0/2] load-acquire/store-release barriers for AF_XDP rings Björn Töpel
2021-03-05 9:41 ` [PATCH bpf-next v2 1/2] xsk: update rings for load-acquire/store-release barriers Björn Töpel
@ 2021-03-05 9:41 ` Björn Töpel
2021-03-05 13:50 ` [PATCH bpf-next v2 0/2] load-acquire/store-release barriers for AF_XDP rings Toke Høiland-Jørgensen
2021-03-07 3:40 ` patchwork-bot+netdevbpf
3 siblings, 0 replies; 5+ messages in thread
From: Björn Töpel @ 2021-03-05 9:41 UTC (permalink / raw)
To: ast, daniel, netdev, bpf
Cc: Björn Töpel, magnus.karlsson, jonathan.lemon, maximmi,
andrii, toke, will, paulmck, stern
From: Björn Töpel <bjorn.topel@intel.com>
Now that the AF_XDP rings have load-acquire/store-release semantics,
move libbpf to that as well.
The library-internal libbpf_smp_{load_acquire,store_release} are only
valid for 32-bit words on ARM64.
Also, remove the barriers that are no longer in use.
Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
---
tools/lib/bpf/libbpf_util.h | 72 +++++++++++++++++++++++++------------
tools/lib/bpf/xsk.h | 17 +++------
2 files changed, 55 insertions(+), 34 deletions(-)
diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h
index 59c779c5790c..94a0d7bb6f3c 100644
--- a/tools/lib/bpf/libbpf_util.h
+++ b/tools/lib/bpf/libbpf_util.h
@@ -5,6 +5,7 @@
#define __LIBBPF_LIBBPF_UTIL_H
#include <stdbool.h>
+#include <linux/compiler.h>
#ifdef __cplusplus
extern "C" {
@@ -15,29 +16,56 @@ extern "C" {
* application that uses libbpf.
*/
#if defined(__i386__) || defined(__x86_64__)
-# define libbpf_smp_rmb() asm volatile("" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("" : : : "memory")
-# define libbpf_smp_mb() \
- asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
-/* Hinders stores to be observed before older loads. */
-# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
+# define libbpf_smp_store_release(p, v) \
+ do { \
+ asm volatile("" : : : "memory"); \
+ WRITE_ONCE(*p, v); \
+ } while (0)
+# define libbpf_smp_load_acquire(p) \
+ ({ \
+ typeof(*p) ___p1 = READ_ONCE(*p); \
+ asm volatile("" : : : "memory"); \
+ ___p1; \
+ })
#elif defined(__aarch64__)
-# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#elif defined(__arm__)
-/* These are only valid for armv7 and above */
-# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#else
-/* Architecture missing native barrier functions. */
-# define libbpf_smp_rmb() __sync_synchronize()
-# define libbpf_smp_wmb() __sync_synchronize()
-# define libbpf_smp_mb() __sync_synchronize()
-# define libbpf_smp_rwmb() __sync_synchronize()
+# define libbpf_smp_store_release(p, v) \
+ asm volatile ("stlr %w1, %0" : "=Q" (*p) : "r" (v) : "memory")
+# define libbpf_smp_load_acquire(p) \
+ ({ \
+ typeof(*p) ___p1; \
+ asm volatile ("ldar %w0, %1" \
+ : "=r" (___p1) : "Q" (*p) : "memory"); \
+ __p1; \
+ })
+#elif defined(__riscv)
+# define libbpf_smp_store_release(p, v) \
+ do { \
+ asm volatile ("fence rw,w" : : : "memory"); \
+ WRITE_ONCE(*p, v); \
+ } while (0)
+# define libbpf_smp_load_acquire(p) \
+ ({ \
+ typeof(*p) ___p1 = READ_ONCE(*p); \
+ asm volatile ("fence r,rw" : : : "memory"); \
+ ___p1; \
+ })
+#endif
+
+#ifndef libbpf_smp_store_release
+#define libbpf_smp_store_release(p, v) \
+ do { \
+ __sync_synchronize(); \
+ WRITE_ONCE(*p, v); \
+ } while (0)
+#endif
+
+#ifndef libbpf_smp_load_acquire
+#define libbpf_smp_load_acquire(p) \
+ ({ \
+ typeof(*p) ___p1 = READ_ONCE(*p); \
+ __sync_synchronize(); \
+ ___p1; \
+ })
#endif
#ifdef __cplusplus
diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h
index e9f121f5d129..a9fdea87b5cd 100644
--- a/tools/lib/bpf/xsk.h
+++ b/tools/lib/bpf/xsk.h
@@ -96,7 +96,8 @@ static inline __u32 xsk_prod_nb_free(struct xsk_ring_prod *r, __u32 nb)
* this function. Without this optimization it whould have been
* free_entries = r->cached_prod - r->cached_cons + r->size.
*/
- r->cached_cons = *r->consumer + r->size;
+ r->cached_cons = libbpf_smp_load_acquire(r->consumer);
+ r->cached_cons += r->size;
return r->cached_cons - r->cached_prod;
}
@@ -106,7 +107,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
__u32 entries = r->cached_prod - r->cached_cons;
if (entries == 0) {
- r->cached_prod = *r->producer;
+ r->cached_prod = libbpf_smp_load_acquire(r->producer);
entries = r->cached_prod - r->cached_cons;
}
@@ -129,9 +130,7 @@ static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, __u32 nb)
/* Make sure everything has been written to the ring before indicating
* this to the kernel by writing the producer pointer.
*/
- libbpf_smp_wmb();
-
- *prod->producer += nb;
+ libbpf_smp_store_release(prod->producer, *prod->producer + nb);
}
static inline __u32 xsk_ring_cons__peek(struct xsk_ring_cons *cons, __u32 nb, __u32 *idx)
@@ -139,11 +138,6 @@ static inline __u32 xsk_ring_cons__peek(struct xsk_ring_cons *cons, __u32 nb, __
__u32 entries = xsk_cons_nb_avail(cons, nb);
if (entries > 0) {
- /* Make sure we do not speculatively read the data before
- * we have received the packet buffers from the ring.
- */
- libbpf_smp_rmb();
-
*idx = cons->cached_cons;
cons->cached_cons += entries;
}
@@ -161,9 +155,8 @@ static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, __u32 nb)
/* Make sure data has been read before indicating we are done
* with the entries by updating the consumer pointer.
*/
- libbpf_smp_rwmb();
+ libbpf_smp_store_release(cons->consumer, *cons->consumer + nb);
- *cons->consumer += nb;
}
static inline void *xsk_umem__get_data(void *umem_area, __u64 addr)
--
2.27.0
^ permalink raw reply related [flat|nested] 5+ messages in thread