* [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type
@ 2022-08-18 22:12 David Vernet
  2022-08-18 22:12 ` [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
                   ` (4 more replies)
  0 siblings, 5 replies; 16+ messages in thread
From: David Vernet @ 2022-08-18 22:12 UTC (permalink / raw)
  To: bpf, ast, andrii, daniel
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

This patch set defines a new map type, BPF_MAP_TYPE_USER_RINGBUF, which
provides single-user-space-producer / single-kernel-consumer semantics over
a ringbuffer.  Along with the new map type, a helper function called
bpf_user_ringbuf_drain() is added. It allows a BPF program to specify a
callback with the following signature, which the helper invokes for each
drained sample:

long callback_fn(struct bpf_dynptr *dynptr, void *context);

The program can then use the bpf_dynptr_read() or bpf_dynptr_data() helper
functions to safely read the sample from the dynptr. There are currently no
helpers available to determine the size of the sample, but one could easily
be added if required.
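
As a rough sketch of the BPF side (the sample layout and callback name here
are illustrative only, not part of this series), a drain callback might look
like:

struct sample {
	int pid;
	char comm[16];
};

static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	struct sample s;

	/* Copy the sample out of the ringbuffer; fails if the published
	 * sample is shorter than sizeof(s).
	 */
	if (bpf_dynptr_read(&s, sizeof(s), dynptr, 0, 0))
		return 0;

	/* ... process s ... */

	return 0;	/* 0 == continue draining, 1 == stop early */
}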

On the user-space side, libbpf has been updated to export a new
'struct user_ring_buffer' type, along with the following symbols:

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
					 __u32 size, int timeout_ms);
void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__free(struct user_ring_buffer *rb);

These symbols are exported for inclusion in libbpf version 1.0.0.
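
As an illustrative sketch of the producer side (the sample layout and
function name below are hypothetical, not part of the patch set):

/* Hypothetical application-defined sample layout. */
struct sample {
	int pid;
};

static int publish_one_sample(int map_fd)
{
	struct user_ring_buffer *rb;
	struct sample *s;
	int err = 0;

	rb = user_ring_buffer__new(map_fd, NULL);
	if (!rb)
		return -errno;

	s = user_ring_buffer__reserve(rb, sizeof(*s));
	if (!s) {
		/* errno is E2BIG or ENODATA */
		err = -errno;
		goto out;
	}

	s->pid = getpid();
	user_ring_buffer__submit(rb, s);
out:
	user_ring_buffer__free(rb);
	return err;
}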

Signed-off-by: David Vernet <void@manifault.com>
--
v2 -> v3:
- Lots of formatting fixes, such as keeping things on one line if they fit
  within 100 characters, and removing some extraneous newlines. Applies
  to all diffs in the patch-set. (Andrii)
- Renamed ring_buffer_user__* symbols to user_ring_buffer__*. (Andrii)
- Added a missing smp_mb__before_atomic() in
  __bpf_user_ringbuf_sample_release(). (Hao)
- Restructure how and when notification events are sent from the kernel to
  the user-space producers via the .map_poll() callback for the
  BPF_MAP_TYPE_USER_RINGBUF map. Before, we only sent a notification when
  the ringbuffer was fully drained. Now, we guarantee user-space that
  we'll send an event at least once per bpf_user_ringbuf_drain(), as long
  as at least one sample was drained, and BPF_RB_NO_WAKEUP was not passed.
  As a heuristic, we also send a notification event any time a sample being
  drained causes the ringbuffer to no longer be full. (Andrii)
- Continuing on the above point, updated
  user_ring_buffer__reserve_blocking() to loop around epoll_wait() until a
  sufficiently large sample is found. (Andrii)
- Communicate BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT in sample
  headers. The ringbuffer implementation still only supports
  single-producer semantics, but we can now add synchronization support in
  user_ring_buffer__reserve(), and will automatically get multi-producer
  semantics. (Andrii)
- Updated some commit summaries, specifically adding more details where
  warranted. (Andrii)
- Improved function documentation for bpf_user_ringbuf_drain(), more
  clearly explaining all function arguments and return types, as well as
  the semantics for waking up user-space producers.
- Add function header comments for user_ring_buffer__reserve{_blocking}().
  (Andrii)
- Round all samples up to 8 bytes in the user-space producer, and enforce
  that all samples are properly aligned in the kernel. (Andrii)
- Added testcases that verify that bpf_user_ringbuf_drain() properly
  validates samples, and returns error conditions if any invalid samples
  are encountered. (Andrii)
- Move atomic_t busy field out of the consumer page, and into the
  struct bpf_ringbuf. (Andrii)
- Split ringbuf_map_{mmap, poll}_{kern, user}() into separate
  implementations. (Andrii)
- Don't silently consume errors in bpf_user_ringbuf_drain(). (Andrii)
- Remove magic number of samples (4096) from bpf_user_ringbuf_drain(),
  and instead use BPF_MAX_USER_RINGBUF_SAMPLES macro, which allows
  128k samples. (Andrii)
- Remove MEM_ALLOC modifier from PTR_TO_DYNPTR register in verifier, and
  instead rely solely on the register being PTR_TO_DYNPTR. (Andrii)
- Move freeing of atomic_t busy bit to before we invoke irq_work_queue() in
  __bpf_user_ringbuf_sample_release(). (Andrii)
- Only check for the BPF_RB_NO_WAKEUP flag in bpf_user_ringbuf_drain().
- Remove libbpf function names from kernel smp_{load, store}* comments in
  the kernel. (Andrii)
- Don't use double-underscore naming convention in libbpf functions.
  (Andrii)
- Use proper __u32 and __u64 for types where we need to guarantee their
  size. (Andrii)

v1 -> v2:
- Following Joanne landing 883743422ced ("bpf: Fix ref_obj_id for dynptr
  data slices in verifier") [0], removed [PATCH 1/5] bpf: Clear callee
  saved regs after updating REG0 [1]. (Joanne)
- Following the above adjustment, updated check_helper_call() to not store
  a reference for bpf_dynptr_data() if the register containing the dynptr
  is of type MEM_ALLOC. (Joanne)
- Fixed casting issue pointed out by kernel test robot by adding a missing
  (uintptr_t) cast. (lkp)

[0] https://lore.kernel.org/all/20220809214055.4050604-1-joannelkoong@gmail.com/
[1] https://lore.kernel.org/all/20220808155341.2479054-1-void@manifault.com/

David Vernet (4):
  bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  bpf: Add bpf_user_ringbuf_drain() helper
  bpf: Add libbpf logic for user-space ring buffer
  selftests/bpf: Add selftests validating the user ringbuf

 include/linux/bpf.h                           |  11 +-
 include/linux/bpf_types.h                     |   1 +
 include/uapi/linux/bpf.h                      |  37 +
 kernel/bpf/helpers.c                          |   2 +
 kernel/bpf/ringbuf.c                          | 272 ++++++-
 kernel/bpf/verifier.c                         |  73 +-
 tools/include/uapi/linux/bpf.h                |  37 +
 tools/lib/bpf/libbpf.c                        |  11 +-
 tools/lib/bpf/libbpf.h                        |  21 +
 tools/lib/bpf/libbpf.map                      |   6 +
 tools/lib/bpf/libbpf_probes.c                 |   1 +
 tools/lib/bpf/ringbuf.c                       | 327 ++++++++
 .../selftests/bpf/prog_tests/user_ringbuf.c   | 715 ++++++++++++++++++
 .../selftests/bpf/progs/user_ringbuf_fail.c   | 177 +++++
 .../bpf/progs/user_ringbuf_success.c          | 220 ++++++
 .../testing/selftests/bpf/test_user_ringbuf.h |  35 +
 16 files changed, 1924 insertions(+), 22 deletions(-)
 create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
 create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h

-- 
2.37.1



* [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
@ 2022-08-18 22:12 ` David Vernet
  2022-08-24 20:52   ` Andrii Nakryiko
  2022-08-18 22:12 ` [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-18 22:12 UTC (permalink / raw)
  To: bpf, ast, andrii, daniel
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

We want to support a ringbuf map type where samples are published from
user-space, to be consumed by BPF programs. BPF currently supports a kernel
-> user-space circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type.
We'll need to define a new map type for user-space -> kernel, as none of
the helpers exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space
producer ringbuffer, and we'll want to add one or more helper functions
that would not apply for a kernel-producer ringbuffer.

This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
definition. The map type is useless in its current form, as there is no way
to access or use it until we add one or more BPF helpers. A follow-on patch
will therefore add a new helper function that allows BPF programs to run
callbacks on samples that are published to the ringbuffer.

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf_types.h      |  1 +
 include/uapi/linux/bpf.h       |  1 +
 kernel/bpf/ringbuf.c           | 62 ++++++++++++++++++++++++++++++----
 kernel/bpf/verifier.c          |  3 ++
 tools/include/uapi/linux/bpf.h |  1 +
 tools/lib/bpf/libbpf.c         |  1 +
 6 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h
index 2b9112b80171..2c6a4f2562a7 100644
--- a/include/linux/bpf_types.h
+++ b/include/linux/bpf_types.h
@@ -126,6 +126,7 @@ BPF_MAP_TYPE(BPF_MAP_TYPE_STRUCT_OPS, bpf_struct_ops_map_ops)
 #endif
 BPF_MAP_TYPE(BPF_MAP_TYPE_RINGBUF, ringbuf_map_ops)
 BPF_MAP_TYPE(BPF_MAP_TYPE_BLOOM_FILTER, bloom_filter_map_ops)
+BPF_MAP_TYPE(BPF_MAP_TYPE_USER_RINGBUF, user_ringbuf_map_ops)
 
 BPF_LINK_TYPE(BPF_LINK_TYPE_RAW_TRACEPOINT, raw_tracepoint)
 BPF_LINK_TYPE(BPF_LINK_TYPE_TRACING, tracing)
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 934a2a8beb87..3aee7681fa68 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index b483aea35f41..0a8de712ecbe 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,10 +38,27 @@ struct bpf_ringbuf {
 	struct page **pages;
 	int nr_pages;
 	spinlock_t spinlock ____cacheline_aligned_in_smp;
-	/* Consumer and producer counters are put into separate pages to allow
-	 * mapping consumer page as r/w, but restrict producer page to r/o.
-	 * This protects producer position from being modified by user-space
-	 * application and ruining in-kernel position tracking.
+	/* Consumer and producer counters are put into separate pages to
+	 * allow each position to be mapped with different permissions.
+	 * This prevents a user-space application from modifying the
+	 * position and ruining in-kernel tracking. The permissions of the
+	 * pages depend on who is producing samples: user-space or the
+	 * kernel.
+	 *
+	 * Kernel-producer
+	 * ---------------
+	 * The producer position and data pages are mapped as r/o in
+	 * userspace. For this approach, bits in the header of samples are
+	 * used to signal to user-space, and to other producers, whether a
+	 * sample is currently being written.
+	 *
+	 * User-space producer
+	 * -------------------
+	 * Only the page containing the consumer position is mapped r/o in
+	 * user-space. User-space producers also use bits of the header to
+	 * communicate to the kernel, but the kernel must carefully check and
+	 * validate each sample to ensure that they're correctly formatted, and
+	 * fully contained within the ringbuffer.
 	 */
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
@@ -224,7 +241,7 @@ static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
 	return -ENOTSUPP;
 }
 
-static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
+static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
 {
 	struct bpf_ringbuf_map *rb_map;
 
@@ -242,6 +259,26 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
 				   vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	struct bpf_ringbuf_map *rb_map;
+
+	rb_map = container_of(map, struct bpf_ringbuf_map, map);
+
+	if (vma->vm_flags & VM_WRITE) {
+		if (vma->vm_pgoff == 0)
+			/* Disallow writable mappings to the consumer pointer,
+			 * and allow writable mappings to both the producer
+			 * position, and the ring buffer data itself.
+			 */
+			return -EPERM;
+	} else {
+		vma->vm_flags &= ~VM_MAYWRITE;
+	}
+	/* remap_vmalloc_range() checks size and offset constraints */
+	return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
+}
+
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
 	unsigned long cons_pos, prod_pos;
@@ -269,7 +306,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
-	.map_mmap = ringbuf_map_mmap,
+	.map_mmap = ringbuf_map_mmap_kern,
 	.map_poll = ringbuf_map_poll,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
@@ -278,6 +315,19 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_btf_id = &ringbuf_map_btf_ids[0],
 };
 
+BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
+const struct bpf_map_ops user_ringbuf_map_ops = {
+	.map_meta_equal = bpf_map_meta_equal,
+	.map_alloc = ringbuf_map_alloc,
+	.map_free = ringbuf_map_free,
+	.map_mmap = ringbuf_map_mmap_user,
+	.map_lookup_elem = ringbuf_map_lookup_elem,
+	.map_update_elem = ringbuf_map_update_elem,
+	.map_delete_elem = ringbuf_map_delete_elem,
+	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_btf_id = &user_ringbuf_map_btf_ids[0],
+};
+
 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
  * calculate offset from record metadata to ring buffer in pages, rounded
  * down. This page offset is stored as part of record metadata and allows to
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 2c1f8069f7b7..970ec5c7ce05 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -6202,6 +6202,8 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		    func_id != BPF_FUNC_ringbuf_discard_dynptr)
 			goto error;
 		break;
+	case BPF_MAP_TYPE_USER_RINGBUF:
+		goto error;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -12681,6 +12683,7 @@ static int check_map_prog_compatibility(struct bpf_verifier_env *env,
 			}
 			break;
 		case BPF_MAP_TYPE_RINGBUF:
+		case BPF_MAP_TYPE_USER_RINGBUF:
 		case BPF_MAP_TYPE_INODE_STORAGE:
 		case BPF_MAP_TYPE_SK_STORAGE:
 		case BPF_MAP_TYPE_TASK_STORAGE:
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index 1d6085e15fc8..25da0d4c7e14 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 3ad139285fad..6b580ba027ba 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -163,6 +163,7 @@ static const char * const map_type_name[] = {
 	[BPF_MAP_TYPE_INODE_STORAGE]		= "inode_storage",
 	[BPF_MAP_TYPE_TASK_STORAGE]		= "task_storage",
 	[BPF_MAP_TYPE_BLOOM_FILTER]		= "bloom_filter",
+	[BPF_MAP_TYPE_USER_RINGBUF]             = "user_ringbuf",
 };
 
 static const char * const prog_type_name[] = {
-- 
2.37.1



* [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
  2022-08-18 22:12 ` [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
@ 2022-08-18 22:12 ` David Vernet
  2022-08-24 21:22   ` Andrii Nakryiko
  2022-08-18 22:12 ` [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-18 22:12 UTC (permalink / raw)
  To: bpf, ast, andrii, daniel
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

In a prior change, we added a new BPF_MAP_TYPE_USER_RINGBUF map type which
will allow user-space applications to publish messages to a ringbuffer that
is consumed by a BPF program in kernel-space. In order for this map type to
be useful, it will require a BPF helper function that BPF programs can
invoke to drain samples from the ringbuffer, and invoke callbacks on those
samples. This change adds that capability via a new BPF helper function:

bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx,
                       u64 flags)

BPF programs may invoke this function to run callback_fn() on a series of
samples in the ringbuffer. callback_fn() has the following signature:

long callback_fn(struct bpf_dynptr *dynptr, void *context);

Samples are provided to the callback in the form of struct bpf_dynptr *'s,
which the program can read using BPF helper functions for querying
struct bpf_dynptr's.
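
As a sketch of how a BPF program might invoke the helper (the attach point
and all names below are illustrative; user_ringbuf is assumed to be declared
as a BPF_MAP_TYPE_USER_RINGBUF map, as in the previous patch):

static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	/* Read the sample with bpf_dynptr_read() or bpf_dynptr_data(). */
	return 0;	/* 0 == continue draining, 1 == stop early */
}

SEC("tp/syscalls/sys_enter_getpgid")
int drain_user_ringbuf(void *ctx)
{
	long drained;

	/* Returns the number of drained samples, or a negative error. */
	drained = bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);
	bpf_printk("drained %ld samples", drained);

	return 0;
}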

In order to support bpf_user_ringbuf_drain(), a new PTR_TO_DYNPTR register
type is added to the verifier to reflect a dynptr that was allocated by
a helper function and passed to a BPF program. Unlike PTR_TO_STACK
dynptrs which are allocated on the stack by a BPF program, PTR_TO_DYNPTR
dynptrs need not use reference tracking, as the BPF helper is trusted to
properly free the dynptr before returning. The verifier currently only
supports PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL.

Note that while the corresponding user-space libbpf logic will be added in
a subsequent patch, this patch does contain an implementation of the
.map_poll() callback for BPF_MAP_TYPE_USER_RINGBUF maps. This .map_poll()
callback guarantees that an epoll-waiting user-space producer will
receive at least one event notification whenever at least one sample is
drained in an invocation of bpf_user_ringbuf_drain(), provided that the
function is not invoked with the BPF_RB_NO_WAKEUP flag.

Sending an event notification for every sample is not an option, as it
could cause the system to hang due to invoking irq_work_queue() in
too-frequent succession. To optimize for the common case, however,
bpf_user_ringbuf_drain() will also send an event notification whenever a
drained sample causes the ringbuffer to no longer be full. This heuristic
may not help some user-space producers: because samples can vary in size,
there may still not be enough space in the ringbuffer for the producer's
next sample, even after the drained sample that caused the buffer to no
longer be full. In that case, the producer may have to wait until
bpf_user_ringbuf_drain() returns to receive an event notification.

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf.h            |  11 +-
 include/uapi/linux/bpf.h       |  36 ++++++
 kernel/bpf/helpers.c           |   2 +
 kernel/bpf/ringbuf.c           | 210 ++++++++++++++++++++++++++++++++-
 kernel/bpf/verifier.c          |  72 +++++++++--
 tools/include/uapi/linux/bpf.h |  36 ++++++
 6 files changed, 352 insertions(+), 15 deletions(-)

diff --git a/include/linux/bpf.h b/include/linux/bpf.h
index a627a02cf8ab..515d712fd4a5 100644
--- a/include/linux/bpf.h
+++ b/include/linux/bpf.h
@@ -401,7 +401,7 @@ enum bpf_type_flag {
 	/* DYNPTR points to memory local to the bpf program. */
 	DYNPTR_TYPE_LOCAL	= BIT(8 + BPF_BASE_TYPE_BITS),
 
-	/* DYNPTR points to a ringbuf record. */
+	/* DYNPTR points to a kernel-produced ringbuf record. */
 	DYNPTR_TYPE_RINGBUF	= BIT(9 + BPF_BASE_TYPE_BITS),
 
 	/* Size is known at compile time. */
@@ -606,6 +606,7 @@ enum bpf_reg_type {
 	PTR_TO_MEM,		 /* reg points to valid memory region */
 	PTR_TO_BUF,		 /* reg points to a read/write buffer */
 	PTR_TO_FUNC,		 /* reg points to a bpf program function */
+	PTR_TO_DYNPTR,		 /* reg points to a dynptr */
 	__BPF_REG_TYPE_MAX,
 
 	/* Extended reg_types. */
@@ -1333,6 +1334,11 @@ struct bpf_array {
 #define BPF_MAP_CAN_READ	BIT(0)
 #define BPF_MAP_CAN_WRITE	BIT(1)
 
+/* Maximum number of user-producer ringbuffer samples that can be drained in
+ * a call to bpf_user_ringbuf_drain().
+ */
+#define BPF_MAX_USER_RINGBUF_SAMPLES BIT(17)
+
 static inline u32 bpf_map_flags_to_cap(struct bpf_map *map)
 {
 	u32 access_flags = map->map_flags & (BPF_F_RDONLY_PROG | BPF_F_WRONLY_PROG);
@@ -2411,6 +2417,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
 extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
 extern const struct bpf_func_proto bpf_set_retval_proto;
 extern const struct bpf_func_proto bpf_get_retval_proto;
+extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
 
 const struct bpf_func_proto *tracing_prog_func_proto(
   enum bpf_func_id func_id, const struct bpf_prog *prog);
@@ -2555,7 +2562,7 @@ enum bpf_dynptr_type {
 	BPF_DYNPTR_TYPE_INVALID,
 	/* Points to memory that is local to the bpf program */
 	BPF_DYNPTR_TYPE_LOCAL,
-	/* Underlying data is a ringbuf record */
+	/* Underlying data is a kernel-produced ringbuf record */
 	BPF_DYNPTR_TYPE_RINGBUF,
 };
 
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 3aee7681fa68..25c599d9adf8 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -5356,6 +5356,41 @@ union bpf_attr {
  *	Return
  *		Current *ktime*.
  *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample:
+ *
+ *		long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx);
+ *
+ *		If **callback_fn** returns 0, the helper will continue to try
+ *		and drain the next sample, up to a maximum of
+ *		BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1,
+ *		the helper will skip the rest of the samples and return. Other
+ *		return values are not used now, and will be rejected by the
+ *		verifier.
+ *	Return
+ *		The number of drained samples if no error was encountered while
+ *		draining samples. If a user-space producer was epoll-waiting on
+ *		this map, and at least one sample was drained, they will
+ *		receive an event notification notifying them of available space
+ *		in the ringbuffer. If the BPF_RB_NO_WAKEUP flag is passed to
+ *		this function, no wakeup notification will be sent. If there
+ *		are no samples in the ringbuffer, 0 is returned.
+ *
+ *		On failure, the returned value is one of the following:
+ *
+ *		**-EBUSY** if the ringbuffer is contended, and another calling
+ *		context was concurrently draining the ringbuffer.
+ *
+ *		**-EINVAL** if user-space is not properly tracking the
+ *		ringbuffer due to the producer position not being aligned to 8
+ *		bytes, a sample not being aligned to 8 bytes, the producer
+ *		position not matching the advertised length of a sample, or the
+ *		sample size being larger than the ringbuffer.
+ *
+ *		**-E2BIG** if user-space has tried to publish a sample that
+ *		cannot fit within a struct bpf_dynptr.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5567,6 +5602,7 @@ union bpf_attr {
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
 	FN(ktime_get_tai_ns),		\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
index 3c1b9bbcf971..9141eae0ca67 100644
--- a/kernel/bpf/helpers.c
+++ b/kernel/bpf/helpers.c
@@ -1661,6 +1661,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
 		return &bpf_dynptr_write_proto;
 	case BPF_FUNC_dynptr_data:
 		return &bpf_dynptr_data_proto;
+	case BPF_FUNC_user_ringbuf_drain:
+		return &bpf_user_ringbuf_drain_proto;
 	default:
 		break;
 	}
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index 0a8de712ecbe..3818398e57de 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,6 +38,22 @@ struct bpf_ringbuf {
 	struct page **pages;
 	int nr_pages;
 	spinlock_t spinlock ____cacheline_aligned_in_smp;
+	/* For user-space producer ringbuffers, an atomic_t busy bit is used to
+	 * synchronize access to the ringbuffer in the kernel, rather than the
+	 * spinlock that is used for kernel-producer ringbuffers. This is done
+	 * because the ringbuffer must hold a lock across a BPF program's
+	 * callback:
+	 *
+	 *    __bpf_user_ringbuf_peek() // lock acquired
+	 * -> program callback_fn()
+	 * -> __bpf_user_ringbuf_sample_release() // lock released
+	 *
+	 * It is unsafe and incorrect to hold an IRQ spinlock across what could
+	 * be a long execution window, so we instead simply disallow concurrent
+	 * access to the ringbuffer by kernel consumers, and return -EBUSY from
+	 * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
+	 */
+	atomic_t busy ____cacheline_aligned_in_smp;
 	/* Consumer and producer counters are put into separate pages to
 	 * allow each position to be mapped with different permissions.
 	 * This prevents a user-space application from modifying the
@@ -153,6 +169,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 		return NULL;
 
 	spin_lock_init(&rb->spinlock);
+	atomic_set(&rb->busy, 0);
 	init_waitqueue_head(&rb->waitq);
 	init_irq_work(&rb->work, bpf_ringbuf_notify);
 
@@ -288,8 +305,13 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 	return prod_pos - cons_pos;
 }
 
-static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
-				 struct poll_table_struct *pts)
+static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
+{
+	return rb->mask + 1;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
 {
 	struct bpf_ringbuf_map *rb_map;
 
@@ -301,13 +323,26 @@ static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
 	return 0;
 }
 
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	struct bpf_ringbuf_map *rb_map;
+
+	rb_map = container_of(map, struct bpf_ringbuf_map, map);
+	poll_wait(filp, &rb_map->rb->waitq, pts);
+
+	if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
+		return  EPOLLOUT | EPOLLWRNORM;
+	return 0;
+}
+
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
 const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_kern,
-	.map_poll = ringbuf_map_poll,
+	.map_poll = ringbuf_map_poll_kern,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -321,6 +356,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_user,
+	.map_poll = ringbuf_map_poll_user,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -362,7 +398,7 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 		return NULL;
 
 	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
-	if (len > rb->mask + 1)
+	if (len > ringbuf_total_data_sz(rb))
 		return NULL;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
@@ -509,7 +545,7 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 	case BPF_RB_AVAIL_DATA:
 		return ringbuf_avail_data_sz(rb);
 	case BPF_RB_RING_SIZE:
-		return rb->mask + 1;
+		return ringbuf_total_data_sz(rb);
 	case BPF_RB_CONS_POS:
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
@@ -603,3 +639,167 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
 	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
 	.arg2_type	= ARG_ANYTHING,
 };
+
+static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
+{
+	int err, busy = 0;
+	u32 hdr_len, sample_len, total_len, flags, *hdr;
+	u64 cons_pos, prod_pos;
+
+	/* If another consumer is already consuming a sample, wait for them to finish. */
+	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+		return -EBUSY;
+
+retry:
+	/* Synchronizes with smp_store_release() in user-space producer. */
+	prod_pos = smp_load_acquire(&rb->producer_pos);
+	if (prod_pos % 8) {
+		err = -EINVAL;
+		goto err_unlock;
+	}
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
+	cons_pos = smp_load_acquire(&rb->consumer_pos);
+	if (cons_pos >= prod_pos) {
+		err = -ENODATA;
+		goto err_unlock;
+	}
+
+	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
+	/* Synchronizes with smp_store_release() in user-space producer. */
+	hdr_len = smp_load_acquire(hdr);
+	flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);
+	sample_len = hdr_len & ~flags;
+	total_len = sample_len + BPF_RINGBUF_HDR_SZ;
+
+	/* Validate the sample header before doing anything else. Even if
+	 * BPF_RINGBUF_BUSY_BIT or BPF_RINGBUF_DISCARD_BIT are set, user-space
+	 * is expected to set a valid length in the header for the sample.
+	 */
+	/* The sample length must be aligned to 8-bytes. */
+	if (sample_len % 8) {
+		err = -EINVAL;
+		goto err_unlock;
+	}
+
+	/* The sample must fit within the region advertised by the producer position. */
+	if (total_len > prod_pos - cons_pos) {
+		err = -EINVAL;
+		goto err_unlock;
+	}
+
+	/* The sample must fit within the data region of the ring buffer. */
+	if (total_len > ringbuf_total_data_sz(rb)) {
+		err = -EINVAL;
+		goto err_unlock;
+	}
+
+	/* The sample must fit into a struct bpf_dynptr. */
+	err = bpf_dynptr_check_size(sample_len);
+	if (err)
+		goto err_unlock;
+
+	if (flags & BPF_RINGBUF_DISCARD_BIT) {
+		/* If the discard bit is set, the sample should be ignored, and
+		 * we can instead try to read the next one.
+		 *
+		 * Synchronizes with smp_load_acquire() in the user-space
+		 * producer, and smp_load_acquire() in
+		 * __bpf_user_ringbuf_peek() above.
+		 */
+		smp_store_release(&rb->consumer_pos, cons_pos + total_len);
+		goto retry;
+	}
+
+	if (flags & BPF_RINGBUF_BUSY_BIT) {
+		err = -ENODATA;
+		goto err_unlock;
+	}
+
+	*sample = (void *)((uintptr_t)rb->data +
+			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+	*size = sample_len;
+	return 0;
+
+err_unlock:
+	atomic_set(&rb->busy, 0);
+	return err;
+}
+
+static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
+{
+	u64 producer_pos, consumer_pos;
+
+	/* Synchronizes with smp_store_release() in user-space producer. */
+	producer_pos = smp_load_acquire(&rb->producer_pos);
+
+	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
+	 * prevents another task from writing to consumer_pos after it was read
+	 * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
+	 */
+	consumer_pos = rb->consumer_pos;
+	 /* Synchronizes with smp_load_acquire() in user-space producer. */
+	smp_store_release(&rb->consumer_pos, consumer_pos + size + BPF_RINGBUF_HDR_SZ);
+
+	/* Prevent the clearing of the busy-bit from being reordered before the
+	 * storing of the updated rb->consumer_pos value.
+	 */
+	smp_mb__before_atomic();
+	atomic_set(&rb->busy, 0);
+
+	if (!(flags & BPF_RB_NO_WAKEUP)) {
+		/* As a heuristic, if the previously consumed sample caused the
+		 * ringbuffer to no longer be full, send an event notification
+		 * to any user-space producer that is epoll-waiting.
+		 */
+		if (producer_pos - consumer_pos == ringbuf_total_data_sz(rb))
+			irq_work_queue(&rb->work);
+
+	}
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+	   void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+	struct bpf_ringbuf *rb;
+	long num_samples = 0, ret = 0;
+	bpf_callback_t callback = (bpf_callback_t)callback_fn;
+	u64 wakeup_flags = BPF_RB_NO_WAKEUP;
+
+	if (unlikely(flags & ~wakeup_flags))
+		return -EINVAL;
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+	do {
+		int err;
+		u32 size;
+		void *sample;
+		struct bpf_dynptr_kern dynptr;
+
+		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
+		if (err) {
+			ret = err != -ENODATA ? err : num_samples;
+			goto schedule_work_return;
+		}
+
+		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
+		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
+		__bpf_user_ringbuf_sample_release(rb, size, flags);
+		num_samples++;
+	} while (num_samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0);
+	ret = num_samples;
+
+schedule_work_return:
+	if (!(flags & BPF_RB_NO_WAKEUP) && num_samples > 0)
+		irq_work_queue(&rb->work);
+	return ret;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+	.func		= bpf_user_ringbuf_drain,
+	.ret_type	= RET_INTEGER,
+	.arg1_type	= ARG_CONST_MAP_PTR,
+	.arg2_type	= ARG_PTR_TO_FUNC,
+	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
+	.arg4_type	= ARG_ANYTHING,
+};
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 970ec5c7ce05..0aa8da73708e 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -561,6 +561,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
 		[PTR_TO_BUF]		= "buf",
 		[PTR_TO_FUNC]		= "func",
 		[PTR_TO_MAP_KEY]	= "map_key",
+		[PTR_TO_DYNPTR]		= "dynptr_ptr",
 	};
 
 	if (type & PTR_MAYBE_NULL) {
@@ -5662,6 +5663,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
 static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
+static const struct bpf_reg_types dynptr_types = {
+	.types = {
+		PTR_TO_STACK,
+		PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL,
+	}
+};
 
 static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_MAP_KEY]		= &map_key_value_types,
@@ -5688,7 +5695,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_CONST_STR]		= &const_str_ptr_types,
 	[ARG_PTR_TO_TIMER]		= &timer_types,
 	[ARG_PTR_TO_KPTR]		= &kptr_types,
-	[ARG_PTR_TO_DYNPTR]		= &stack_ptr_types,
+	[ARG_PTR_TO_DYNPTR]		= &dynptr_types,
 };
 
 static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
@@ -6031,6 +6038,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
 		err = check_mem_size_reg(env, reg, regno, true, meta);
 		break;
 	case ARG_PTR_TO_DYNPTR:
+		/* We only need to check for initialized / uninitialized helper
+		 * dynptr args if the dynptr is not PTR_TO_DYNPTR, as the
+		 * assumption is that if it is, that a helper function
+		 * initialized the dynptr on behalf of the BPF program.
+		 */
+		if (base_type(reg->type) == PTR_TO_DYNPTR)
+			break;
 		if (arg_type & MEM_UNINIT) {
 			if (!is_dynptr_reg_valid_uninit(env, reg)) {
 				verbose(env, "Dynptr has to be an uninitialized dynptr\n");
@@ -6203,7 +6217,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 			goto error;
 		break;
 	case BPF_MAP_TYPE_USER_RINGBUF:
-		goto error;
+		if (func_id != BPF_FUNC_user_ringbuf_drain)
+			goto error;
+		break;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -6323,6 +6339,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		if (map->map_type != BPF_MAP_TYPE_RINGBUF)
 			goto error;
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
+			goto error;
+		break;
 	case BPF_FUNC_get_stackid:
 		if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
 			goto error;
@@ -6878,6 +6898,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
 	return 0;
 }
 
+static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
+					   struct bpf_func_state *caller,
+					   struct bpf_func_state *callee,
+					   int insn_idx)
+{
+	/* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
+	 *			  callback_ctx, u64 flags);
+	 * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
+	 */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
+	callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL;
+	__mark_reg_known_zero(&callee->regs[BPF_REG_1]);
+	callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
+
+	/* unused */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
+
+	callee->in_callback_fn = true;
+	return 0;
+}
+
 static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
 {
 	struct bpf_verifier_state *state = env->cur_state;
@@ -7156,7 +7199,7 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 	struct bpf_reg_state *regs;
 	struct bpf_call_arg_meta meta;
 	int insn_idx = *insn_idx_p;
-	bool changes_data;
+	bool changes_data, helper_allocated_dynptr;
 	int i, err, func_id;
 
 	/* find function prototype */
@@ -7323,22 +7366,35 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 		}
 		break;
 	case BPF_FUNC_dynptr_data:
+		helper_allocated_dynptr = false;
 		for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
 			if (arg_type_is_dynptr(fn->arg_type[i])) {
-				if (meta.ref_obj_id) {
-					verbose(env, "verifier internal error: meta.ref_obj_id already set\n");
+				struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
+
+				if (helper_allocated_dynptr || meta.ref_obj_id) {
+					verbose(env, "verifier internal error: multiple dynptrs not supported\n");
 					return -EFAULT;
 				}
-				/* Find the id of the dynptr we're tracking the reference of */
-				meta.ref_obj_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
+
+				if (base_type(reg->type) == PTR_TO_DYNPTR)
+					helper_allocated_dynptr = true;
+				else
+					/* Find the id of the dynptr we're
+					 * tracking the reference of
+					 */
+					meta.ref_obj_id = stack_slot_get_id(env, reg);
 				break;
 			}
 		}
-		if (i == MAX_BPF_FUNC_REG_ARGS) {
+		if (!helper_allocated_dynptr && i == MAX_BPF_FUNC_REG_ARGS) {
 			verbose(env, "verifier internal error: no dynptr in bpf_dynptr_data()\n");
 			return -EFAULT;
 		}
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
+					set_user_ringbuf_callback_state);
+		break;
 	}
 
 	if (err)
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index 25da0d4c7e14..bf6a0bd05eb2 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -5356,6 +5356,41 @@ union bpf_attr {
  *	Return
  *		Current *ktime*.
  *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample:
+ *
+ *		long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx);
+ *
+ *		If **callback_fn** returns 0, the helper will continue to try
+ *		and drain the next sample, up to a maximum of
+ *		BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1,
+ *		the helper will skip the rest of the samples and return. Other
+ *		return values are not used now, and will be rejected by the
+ *		verifier.
+ *	Return
+ *		The number of drained samples if no error was encountered while
+ *		draining samples. If a user-space producer was epoll-waiting on
+ *		this map, and at least one sample was drained, they will
+ *		receive an event notification notifying them of available space
+ *		in the ringbuffer. If the BPF_RB_NO_WAKEUP flag is passed to
+ *		this function, no wakeup notification will be sent. If there
+ *		are no samples in the ringbuffer, 0 is returned.
+ *
+ *		On failure, the returned value is one of the following:
+ *
+ *		**-EBUSY** if the ringbuffer is contended, and another calling
+ *		context was concurrently draining the ringbuffer.
+ *
+ *		**-EINVAL** if user-space is not properly tracking the
+ *		ringbuffer due to the producer position not being aligned to 8
+ *		bytes, a sample not being aligned to 8 bytes, the producer
+ *		position not matching the advertised length of a sample, or the
+ *		sample size being larger than the ringbuffer.
+ *
+ *		**-E2BIG** if user-space has tried to publish a sample that
+ *		cannot fit within a struct bpf_dynptr.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5567,6 +5602,7 @@ union bpf_attr {
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
 	FN(ktime_get_tai_ns),		\
+	FN(bpf_user_ringbuf_drain),	\
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
-- 
2.37.1



* [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer
  2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
  2022-08-18 22:12 ` [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
  2022-08-18 22:12 ` [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-18 22:12 ` David Vernet
  2022-08-24 21:58   ` Andrii Nakryiko
  2022-08-18 22:12 ` [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
  2022-08-24 17:38 ` [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type Daniel Borkmann
  4 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-18 22:12 UTC (permalink / raw)
  To: bpf, ast, andrii, daniel
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

Now that all of the logic is in place in the kernel to support user-space
produced ringbuffers, we can add the user-space logic to libbpf. This
patch therefore adds the following public symbols to libbpf:

struct user_ring_buffer *
user_ring_buffer__new(int map_fd,
		      const struct user_ring_buffer_opts *opts);
void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
                                         __u32 size, int timeout_ms);
void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__free(struct user_ring_buffer *rb);

A user-space producer must first create a struct user_ring_buffer * object
with user_ring_buffer__new(), and can then reserve samples in the
ringbuffer using one of the following two symbols:

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
                                         __u32 size, int timeout_ms);

With user_ring_buffer__reserve(), a pointer to a region of @size bytes in
the ringbuffer is returned if sufficient space is available in the buffer.
user_ring_buffer__reserve_blocking() provides similar semantics, but will
block for up to @timeout_ms in epoll_wait() if there is insufficient space
in the buffer. This function has the guarantee from the kernel that it will
receive at least one event notification per invocation of
bpf_user_ringbuf_drain(), provided that at least one sample is drained and
the BPF program did not pass the BPF_RB_NO_WAKEUP flag to
bpf_user_ringbuf_drain().

Once a sample is reserved, it must either be committed to the ringbuffer
with user_ring_buffer__submit(), or discarded with
user_ring_buffer__discard().
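
As a hedged sketch of the blocking path (assuming rb was created with
user_ring_buffer__new(), and struct sample is an application-defined type
as in the cover letter):

struct sample *s;

/* Block for up to 1000ms waiting for the kernel to drain enough space. */
s = user_ring_buffer__reserve_blocking(rb, sizeof(*s), 1000);
if (!s) {
	/* errno is E2BIG, ENODATA, EINVAL, or an epoll/clock error. */
	return -errno;
}

s->pid = getpid();
user_ring_buffer__submit(rb, s);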

Signed-off-by: David Vernet <void@manifault.com>
---
 tools/lib/bpf/libbpf.c        |  10 +-
 tools/lib/bpf/libbpf.h        |  21 +++
 tools/lib/bpf/libbpf.map      |   6 +
 tools/lib/bpf/libbpf_probes.c |   1 +
 tools/lib/bpf/ringbuf.c       | 327 ++++++++++++++++++++++++++++++++++
 5 files changed, 363 insertions(+), 2 deletions(-)

diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 6b580ba027ba..588cf0474743 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -2373,6 +2373,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
 	return sz;
 }
 
+static bool map_is_ringbuf(const struct bpf_map *map)
+{
+	return map->def.type == BPF_MAP_TYPE_RINGBUF ||
+	       map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
+}
+
 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
 {
 	map->def.type = def->map_type;
@@ -2387,7 +2393,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
 	map->btf_value_type_id = def->value_type_id;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	if (def->parts & MAP_DEF_MAP_TYPE)
@@ -4370,7 +4376,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
 	map->def.max_entries = max_entries;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	return 0;
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 88a1ac34b12a..2902661bc27d 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
 
 /* Ring buffer APIs */
 struct ring_buffer;
+struct user_ring_buffer;
 
 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
 
@@ -1030,6 +1031,26 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
+struct user_ring_buffer_opts {
+	size_t sz; /* size of this struct, for forward/backward compatibility */
+};
+
+#define user_ring_buffer_opts__last_field sz
+
+LIBBPF_API struct user_ring_buffer *
+user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
+LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb,
+					   __u32 size);
+
+LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
+						    __u32 size,
+						    int timeout_ms);
+LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb,
+					 void *sample);
+LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb,
+					  void *sample);
+LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
+
 /* Perf buffer APIs */
 struct perf_buffer;
 
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 2b928dc21af0..40c83563f90a 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -367,4 +367,10 @@ LIBBPF_1.0.0 {
 		libbpf_bpf_map_type_str;
 		libbpf_bpf_prog_type_str;
 		perf_buffer__buffer;
+		user_ring_buffer__discard;
+		user_ring_buffer__free;
+		user_ring_buffer__new;
+		user_ring_buffer__reserve;
+		user_ring_buffer__reserve_blocking;
+		user_ring_buffer__submit;
 };
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 6d495656f554..f3a8e8e74eb8 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
 			return btf_fd;
 		break;
 	case BPF_MAP_TYPE_RINGBUF:
+	case BPF_MAP_TYPE_USER_RINGBUF:
 		key_size = 0;
 		value_size = 0;
 		max_entries = 4096;
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..bf57088917c2 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -16,6 +16,7 @@
 #include <asm/barrier.h>
 #include <sys/mman.h>
 #include <sys/epoll.h>
+#include <time.h>
 
 #include "libbpf.h"
 #include "libbpf_internal.h"
@@ -39,6 +40,23 @@ struct ring_buffer {
 	int ring_cnt;
 };
 
+struct user_ring_buffer {
+	struct epoll_event event;
+	unsigned long *consumer_pos;
+	unsigned long *producer_pos;
+	void *data;
+	unsigned long mask;
+	size_t page_size;
+	int map_fd;
+	int epoll_fd;
+};
+
+/* 8-byte ring buffer header structure */
+struct ringbuf_hdr {
+	__u32 len;
+	__u32 pad;
+};
+
 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
 {
 	if (r->consumer_pos) {
@@ -300,3 +318,312 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 {
 	return rb->epoll_fd;
 }
+
+static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
+{
+	if (rb->consumer_pos) {
+		munmap(rb->consumer_pos, rb->page_size);
+		rb->consumer_pos = NULL;
+	}
+	if (rb->producer_pos) {
+		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
+		rb->producer_pos = NULL;
+	}
+}
+
+void user_ring_buffer__free(struct user_ring_buffer *rb)
+{
+	if (!rb)
+		return;
+
+	user_ringbuf_unmap_ring(rb);
+
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb);
+}
+
+static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
+{
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	void *tmp;
+	struct epoll_event *rb_epoll;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err);
+		return libbpf_err(err);
+	}
+
+	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
+		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
+		return libbpf_err(-EINVAL);
+	}
+
+	rb->map_fd = map_fd;
+	rb->mask = info.max_entries - 1;
+
+	/* Map read-only consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+	rb->consumer_pos = tmp;
+
+	/* Map read-write the producer page and data pages. We map the data
+	 * region as twice the total size of the ringbuffer to allow the simple
+	 * reading and writing of samples that wrap around the end of the
+	 * buffer.  See the kernel implementation for details.
+	 */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	rb->producer_pos = tmp;
+	rb->data = tmp + rb->page_size;
+
+	rb_epoll = &rb->event;
+	rb_epoll->events = EPOLLOUT;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
+		return libbpf_err(err);
+	}
+
+	return 0;
+}
+
+struct user_ring_buffer *
+user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
+{
+	struct user_ring_buffer *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, ring_buffer_opts))
+		return errno = EINVAL, NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return errno = ENOMEM, NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
+		goto err_out;
+	}
+
+	err = user_ringbuf_map(rb, map_fd);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	user_ring_buffer__free(rb);
+	return errno = -err, NULL;
+}
+
+static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
+{
+	__u32 new_len;
+	struct ringbuf_hdr *hdr;
+
+	/* All samples are aligned to 8 bytes, so the header will only ever
+	 * wrap around the back of the ringbuffer if the sample is at the
+	 * very beginning of the ringbuffer.
+	 */
+	if (sample == rb->data)
+		hdr = rb->data + (rb->mask - BPF_RINGBUF_HDR_SZ + 1);
+	else
+		hdr = sample - BPF_RINGBUF_HDR_SZ;
+
+	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
+	if (discard)
+		new_len |= BPF_RINGBUF_DISCARD_BIT;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
+}
+
+/* Discard a previously reserved sample into the ring buffer.  It is not
+ * necessary to synchronize amongst multiple producers when invoking this
+ * function.
+ */
+void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
+{
+	user_ringbuf__commit(rb, sample, true);
+}
+
+/* Submit a previously reserved sample into the ring buffer. It is not
+ * necessary to synchronize amongst multiple producers when invoking this
+ * function.
+ */
+void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
+{
+	user_ringbuf__commit(rb, sample, false);
+}
+
+/* Reserve a pointer to a sample in the user ring buffer. This function is
+ * *not* thread safe, and callers must synchronize accessing this function if
+ * there are multiple producers.
+ *
+ * If a size is requested that is larger than the size of the entire
+ * ringbuffer, errno is set to E2BIG and NULL is returned. If the ringbuffer
+ * could accommodate the size, but currently does not have enough space, errno
+ * is set to ENODATA and NULL is returned.
+ *
+ * Otherwise, a pointer to the sample is returned. After initializing the
+ * sample, callers must invoke user_ring_buffer__submit() to post the sample to
+ * the kernel. Otherwise, the sample must be freed with
+ * user_ring_buffer__discard().
+ */
+void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
+{
+	__u32 avail_size, total_size, max_size;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	__u64 cons_pos, prod_pos;
+	struct ringbuf_hdr *hdr;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	cons_pos = smp_load_acquire(rb->consumer_pos);
+	/* Synchronizes with smp_store_release() in user_ringbuf__commit() */
+	prod_pos = smp_load_acquire(rb->producer_pos);
+
+	/* Round up size to a multiple of 8. */
+	size = (size + 7) / 8 * 8;
+	max_size = rb->mask + 1;
+	avail_size = max_size - (prod_pos - cons_pos);
+	total_size = size + BPF_RINGBUF_HDR_SZ;
+
+	if (total_size > max_size)
+		return errno = E2BIG, NULL;
+
+	if (avail_size < total_size)
+		return errno = ENODATA, NULL;
+
+	hdr = rb->data + (prod_pos & rb->mask);
+	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
+	hdr->pad = 0;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
+	 * the kernel.
+	 */
+	smp_store_release(rb->producer_pos, prod_pos + total_size);
+
+	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
+}
+
+static int ms_elapsed_timespec(const struct timespec *start, const struct timespec *end)
+{
+	int total, ns_per_ms = 1000000, ns_per_s = ns_per_ms * 1000;
+
+	if (end->tv_sec > start->tv_sec) {
+		total = 1000 * (end->tv_sec - start->tv_sec);
+		total += (end->tv_nsec + (ns_per_s - start->tv_nsec)) / ns_per_ms;
+	} else {
+		total = (end->tv_nsec - start->tv_nsec) / ns_per_ms;
+	}
+
+	return total;
+}
+
+/* Reserve a record in the ringbuffer, possibly blocking for up to @timeout_ms
+ * until a sample becomes available.  This function is *not* thread safe, and
+ * callers must synchronize accessing this function if there are multiple
+ * producers.
+ *
+ * If @timeout_ms is -1, the function will block indefinitely until a sample
+ * becomes available. Otherwise, @timeout_ms must be non-negative, or errno
+ * will be set to EINVAL, and NULL will be returned. If @timeout_ms is 0,
+ * no blocking will occur and the function will return immediately after
+ * attempting to reserve a sample.
+ *
+ * If @size is larger than the size of the entire ringbuffer, errno is set to
+ * E2BIG and NULL is returned. If the ringbuffer could accommodate @size, but
+ * currently does not have enough space, the caller will block until at most
+ * @timeout_ms has elapsed. If insufficient space is available at that time,
+ * errno will be set to ENODATA, and NULL will be returned.
+ *
+ * The kernel guarantees that it will wake up this thread to check if
+ * sufficient space is available in the ringbuffer at least once per invocation
+ * of the bpf_ringbuf_drain() helper function, provided that at least one
+ * sample is consumed, and the BPF program did not invoke the function with
+ * BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the kernel does
+ * not guarantee this.
+ *
+ * If space for a sample of size @size becomes available within @timeout_ms, a
+ * pointer to the sample is returned. After initializing the sample, callers
+ * must invoke user_ring_buffer__submit() to post the sample to the ringbuffer.
+ * Otherwise, the sample must be freed with user_ring_buffer__discard().
+ */
+void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
+{
+	int ms_elapsed = 0, err;
+	struct timespec start;
+
+	if (timeout_ms < 0 && timeout_ms != -1)
+		return errno = EINVAL, NULL;
+
+	if (timeout_ms != -1) {
+		err = clock_gettime(CLOCK_MONOTONIC, &start);
+		if (err)
+			return NULL;
+	}
+
+	do {
+		int cnt, ms_remaining = timeout_ms - ms_elapsed;
+		void *sample;
+		struct timespec curr;
+
+		sample = user_ring_buffer__reserve(rb, size);
+		if (sample)
+			return sample;
+		else if (errno != ENODATA)
+			return NULL;
+
+		/* The kernel guarantees at least one event notification
+		 * delivery whenever at least one sample is drained from the
+		 * ringbuffer in an invocation of bpf_user_ringbuf_drain().
+		 * Additional events may be delivered at any time, but only one
+		 * event is guaranteed per bpf_user_ringbuf_drain() invocation,
+		 * provided that a sample is drained, and the BPF program did
+		 * not pass BPF_RB_NO_WAKEUP to bpf_user_ringbuf_drain().
+		 */
+		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
+		if (cnt < 0)
+			return NULL;
+
+		if (timeout_ms == -1)
+			continue;
+
+		err = clock_gettime(CLOCK_MONOTONIC, &curr);
+		if (err)
+			return NULL;
+
+		ms_elapsed = ms_elapsed_timespec(&start, &curr);
+	} while (ms_elapsed <= timeout_ms);
+
+	errno = ENODATA;
+	return NULL;
+}
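
For reference, a minimal user-space producer built on the API above could look
like the sketch below. This is illustrative only and not part of the patch:
'struct my_sample' is a placeholder for an application-defined type, and
'map_fd' is assumed to be the fd of a BPF_MAP_TYPE_USER_RINGBUF map (e.g.
obtained via bpf_map__fd()):

#include <errno.h>
#include <unistd.h>
#include <bpf/libbpf.h>

struct my_sample {
	int pid;
	long value;
};

static int publish_one_sample(int map_fd, long value)
{
	struct user_ring_buffer *rb;
	struct my_sample *sample;
	int err = 0;

	rb = user_ring_buffer__new(map_fd, NULL);
	if (!rb)
		return -errno;

	/* Block for up to 1s waiting for the kernel to drain enough space. */
	sample = user_ring_buffer__reserve_blocking(rb, sizeof(*sample), 1000);
	if (!sample) {
		err = -errno;
		goto out;
	}

	sample->pid = getpid();
	sample->value = value;
	user_ring_buffer__submit(rb, sample);

out:
	user_ring_buffer__free(rb);
	return err;
}
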
-- 
2.37.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf
  2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
                   ` (2 preceding siblings ...)
  2022-08-18 22:12 ` [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
@ 2022-08-18 22:12 ` David Vernet
  2022-08-24 22:03   ` Andrii Nakryiko
  2022-08-24 17:38 ` [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type Daniel Borkmann
  4 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-18 22:12 UTC (permalink / raw)
  To: bpf, ast, andrii, daniel
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

This change includes selftests that validate the expected behavior and
APIs of the new BPF_MAP_TYPE_USER_RINGBUF map type.

Signed-off-by: David Vernet <void@manifault.com>
---
 .../selftests/bpf/prog_tests/user_ringbuf.c   | 755 ++++++++++++++++++
 .../selftests/bpf/progs/user_ringbuf_fail.c   | 177 ++++
 .../bpf/progs/user_ringbuf_success.c          | 220 +++++
 .../testing/selftests/bpf/test_user_ringbuf.h |  35 +
 4 files changed, 1187 insertions(+)
 create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
 create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h

diff --git a/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
new file mode 100644
index 000000000000..83a0e1fad4a1
--- /dev/null
+++ b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
@@ -0,0 +1,755 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#define _GNU_SOURCE
+#include <linux/compiler.h>
+#include <linux/ring_buffer.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <sys/sysinfo.h>
+#include <test_progs.h>
+#include <uapi/linux/bpf.h>
+#include <unistd.h>
+
+#include "user_ringbuf_fail.skel.h"
+#include "user_ringbuf_success.skel.h"
+
+#include "../test_user_ringbuf.h"
+
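+/* 'duration' is referenced by the CHECK() macro defined in test_progs.h. */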
+static int duration;
+static size_t log_buf_sz = 1 << 20; /* 1 MB */
+static char obj_log_buf[1048576];
+static const long c_sample_size = sizeof(struct sample) + BPF_RINGBUF_HDR_SZ;
+static const long c_ringbuf_size = 1 << 12; /* 1 small page */
+static const long c_max_entries = c_ringbuf_size / c_sample_size;
+
+static void drain_current_samples(void)
+{
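+	/* The test's BPF skeleton attaches a program to the getpgid() fentry
+	 * which calls bpf_user_ringbuf_drain(), so issuing this syscall from
+	 * the test process causes the kernel to drain any pending samples.
+	 */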
+	syscall(__NR_getpgid);
+}
+
+static int write_samples(struct user_ring_buffer *ringbuf, uint32_t num_samples)
+{
+	int i, err = 0;
+
+	/* Write some number of samples to the ring buffer. */
+	for (i = 0; i < num_samples; i++) {
+		struct sample *entry;
+		int read;
+
+		entry = user_ring_buffer__reserve(ringbuf, sizeof(*entry));
+		if (!entry) {
+			err = -errno;
+			goto done;
+		}
+
+		entry->pid = getpid();
+		entry->seq = i;
+		entry->value = i * i;
+
+		read = snprintf(entry->comm, sizeof(entry->comm), "%u", i);
+		if (read <= 0) {
+			/* Only invoke CHECK on the error path to avoid spamming
+			 * logs with mostly success messages.
+			 */
+			CHECK(read <= 0, "snprintf_comm",
+			      "Failed to write index %d to comm\n", i);
+			err = read;
+			user_ring_buffer__discard(ringbuf, entry);
+			goto done;
+		}
+
+		user_ring_buffer__submit(ringbuf, entry);
+	}
+
+done:
+	drain_current_samples();
+
+	return err;
+}
+
+static struct user_ringbuf_success *open_load_ringbuf_skel(void)
+{
+	struct user_ringbuf_success *skel;
+	int err;
+
+	skel = user_ringbuf_success__open();
+	if (CHECK(!skel, "skel_open", "skeleton open failed\n"))
+		return NULL;
+
+	err = bpf_map__set_max_entries(skel->maps.user_ringbuf, c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = bpf_map__set_max_entries(skel->maps.kernel_ringbuf, c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = user_ringbuf_success__load(skel);
+	if (CHECK(err != 0, "skel_load", "skeleton load failed\n"))
+		goto cleanup;
+
+	return skel;
+
+cleanup:
+	user_ringbuf_success__destroy(skel);
+	return NULL;
+}
+
+static void test_user_ringbuf_mappings(void)
+{
+	int err, rb_fd;
+	int page_size = getpagesize();
+	void *mmap_ptr;
+	struct user_ringbuf_success *skel;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return;
+
+	rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+	/* cons_pos can be mapped R/O, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ, MAP_SHARED, rb_fd, 0);
+	ASSERT_OK_PTR(mmap_ptr, "ro_cons_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_WRITE), "write_cons_pos_protect");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_cons_pos_protect");
+	ASSERT_ERR_PTR(mremap(mmap_ptr, 0, 4 * page_size, MREMAP_MAYMOVE), "wr_prod_pos");
+	err = -errno;
+	ASSERT_EQ(err, -EPERM, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_ro_cons");
+
+	/* prod_pos can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+			rb_fd, page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_prod_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_prod_pos_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_prod");
+
+	/* data pages can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_WRITE, MAP_SHARED, rb_fd,
+			2 * page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_data");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_data_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "exec_data_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_data");
+
+	user_ringbuf_success__destroy(skel);
+}
+
+static int load_skel_create_ringbufs(struct user_ringbuf_success **skel_out,
+				     struct ring_buffer **kern_ringbuf_out,
+				     ring_buffer_sample_fn callback,
+				     struct user_ring_buffer **user_ringbuf_out)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer *kern_ringbuf = NULL;
+	struct user_ring_buffer *user_ringbuf = NULL;
+	int err = -ENOMEM, rb_fd;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return err;
+
+	/* only trigger BPF program for current process */
+	skel->bss->pid = getpid();
+
+	if (kern_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.kernel_ringbuf);
+		kern_ringbuf = ring_buffer__new(rb_fd, callback, skel, NULL);
+		if (CHECK(!kern_ringbuf, "kern_ringbuf_create", "failed to create kern ringbuf\n"))
+			goto cleanup;
+
+		*kern_ringbuf_out = kern_ringbuf;
+	}
+
+	if (user_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+		user_ringbuf = user_ring_buffer__new(rb_fd, NULL);
+		if (CHECK(!user_ringbuf, "user_ringbuf_create", "failed to create user ringbuf\n"))
+			goto cleanup;
+
+		*user_ringbuf_out = user_ringbuf;
+		ASSERT_EQ(skel->bss->read, 0, "no_reads_after_load");
+	}
+
+	err = user_ringbuf_success__attach(skel);
+	if (CHECK(err != 0, "skel_attach", "skeleton attachment failed: %d\n", err))
+		goto cleanup;
+
+	*skel_out = skel;
+	return 0;
+
+cleanup:
+	if (kern_ringbuf_out)
+		*kern_ringbuf_out = NULL;
+	if (user_ringbuf_out)
+		*user_ringbuf_out = NULL;
+	ring_buffer__free(kern_ringbuf);
+	user_ring_buffer__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+	return err;
+}
+
+static int load_skel_create_user_ringbuf(struct user_ringbuf_success **skel_out,
+					 struct user_ring_buffer **ringbuf_out)
+{
+	return load_skel_create_ringbufs(skel_out, NULL, NULL, ringbuf_out);
+}
+
+static void manually_write_test_invalid_sample(struct user_ringbuf_success *skel,
+					       __u32 size, __u64 producer_pos)
+{
+	void *data_ptr;
+	__u64 *producer_pos_ptr;
+	int rb_fd, page_size = getpagesize();
+
+	rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_before_bad_sample");
+
+	/* Map the producer_pos as RW. */
+	producer_pos_ptr = mmap(NULL, page_size, PROT_READ | PROT_WRITE,
+				MAP_SHARED, rb_fd, page_size);
+	ASSERT_OK_PTR(producer_pos_ptr, "producer_pos_ptr");
+
+	/* Map the data pages as RW. */
+	data_ptr = mmap(NULL, page_size, PROT_WRITE, MAP_SHARED, rb_fd, 2 * page_size);
+	ASSERT_OK_PTR(data_ptr, "rw_data");
+
+	memset(data_ptr, 0, BPF_RINGBUF_HDR_SZ);
+	*(__u32 *)data_ptr = size;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in the kernel. */
+	smp_store_release(producer_pos_ptr, producer_pos + BPF_RINGBUF_HDR_SZ);
+
+	drain_current_samples();
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_after_bad_sample");
+	ASSERT_EQ(skel->bss->err, -EINVAL, "err_after_bad_sample");
+
+	ASSERT_OK(munmap(producer_pos_ptr, page_size), "unmap_producer_pos");
+	ASSERT_OK(munmap(data_ptr, page_size), "unmap_data_ptr");
+}
+
+static void test_user_ringbuf_post_misaligned(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+	__u32 size = (1 << 5) + 7;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "misaligned_skel", "Failed to create skel\n"))
+		return;
+
+	manually_write_test_invalid_sample(skel, size, size);
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_post_producer_wrong_offset(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+	__u32 size = (1 << 5);
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "wrong_offset_skel", "Failed to create skel\n"))
+		return;
+
+	manually_write_test_invalid_sample(skel, size, size - 8);
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_post_larger_than_ringbuf_sz(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+	__u32 size = c_ringbuf_size;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "huge_sample_skel", "Failed to create skel\n"))
+		return;
+
+	manually_write_test_invalid_sample(skel, size, size);
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_basic(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "ringbuf_basic_skel", "Failed to create skel\n"))
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	err = write_samples(ringbuf, 2);
+	if (CHECK(err, "write_samples", "failed to write samples: %d\n", err))
+		goto cleanup;
+
+	ASSERT_EQ(skel->bss->read, 2, "num_samples_read_after");
+
+cleanup:
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_sample_full_ringbuffer(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+	void *sample;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "ringbuf_full_sample_skel", "Failed to create skel\n"))
+		return;
+
+	sample = user_ring_buffer__reserve(ringbuf, c_ringbuf_size - BPF_RINGBUF_HDR_SZ);
+	if (CHECK(sample == NULL, "full_sample", "failed to create big sample: %d\n", errno))
+		goto cleanup;
+
+	user_ring_buffer__submit(ringbuf, sample);
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+	drain_current_samples();
+	ASSERT_EQ(skel->bss->read, 1, "num_samples_read_after");
+
+cleanup:
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_post_alignment_autoadjust(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	struct sample *sample;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (CHECK(err, "ringbuf_align_autoadjust_skel", "Failed to create skel\n"))
+		return;
+
+	/* libbpf should automatically round any sample up to an 8-byte alignment. */
+	sample = user_ring_buffer__reserve(ringbuf, sizeof(*sample) + 1);
+	CHECK(sample == NULL, "reserve_autoaligned",
+	      "Failed to properly handle autoaligned size\n");
+	user_ring_buffer__submit(ringbuf, sample);
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+	drain_current_samples();
+	ASSERT_EQ(skel->bss->read, 1, "num_samples_read_after");
+
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_overfill(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	err = write_samples(ringbuf, c_max_entries * 5);
+	CHECK(!err, "write_samples", "Incorrectly overcommitted ringbuffer\n");
+	ASSERT_EQ(errno, ENODATA, "too_many_samples_posted");
+	ASSERT_EQ(skel->bss->read, c_max_entries, "max_entries");
+
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_discards_properly_ignored(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err, num_discarded = 0;
+	__u64 *token;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	while (1) {
+		/* Write samples until the buffer is full. */
+		token = user_ring_buffer__reserve(ringbuf, sizeof(*token));
+		if (!token)
+			break;
+
+		user_ring_buffer__discard(ringbuf, token);
+		num_discarded++;
+	}
+
+	if (!ASSERT_GE(num_discarded, 0, "num_discarded"))
+		goto cleanup;
+
+	/* Should not read any samples, as they are all discarded. */
+	ASSERT_EQ(skel->bss->read, 0, "num_pre_kick");
+	drain_current_samples();
+	ASSERT_EQ(skel->bss->read, 0, "num_post_kick");
+
+	/* Now that the ringbuffer has been drained, we should be able to reserve another token. */
+	token = user_ring_buffer__reserve(ringbuf, sizeof(*token));
+
+	if (CHECK(!token, "new_token", "Failed to get entry after drain\n"))
+		goto cleanup;
+
+	user_ring_buffer__discard(ringbuf, token);
+cleanup:
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_loop(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	uint32_t total_samples = 8192;
+	uint32_t remaining_samples = total_samples;
+	int err;
+
+	BUILD_BUG_ON(total_samples <= c_max_entries);
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	do  {
+		uint32_t curr_samples;
+
+		curr_samples = remaining_samples > c_max_entries
+			? c_max_entries : remaining_samples;
+		err = write_samples(ringbuf, curr_samples);
+		if (err != 0) {
+			/* Perform CHECK inside of if statement to avoid
+			 * flooding logs on the success path.
+			 */
+			CHECK(err, "write_samples", "failed to write sample batch: %d\n", err);
+			goto cleanup;
+		}
+
+		remaining_samples -= curr_samples;
+		ASSERT_EQ(skel->bss->read, total_samples - remaining_samples,
+			  "current_batched_entries");
+	} while (remaining_samples > 0);
+	ASSERT_EQ(skel->bss->read, total_samples, "total_batched_entries");
+
+cleanup:
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static int send_test_message(struct user_ring_buffer *ringbuf,
+			     enum test_msg_op op, s64 operand_64,
+			     s32 operand_32)
+{
+	struct test_msg *msg;
+
+	msg = user_ring_buffer__reserve(ringbuf, sizeof(*msg));
+	if (!msg) {
+		/* Only invoke CHECK on the error path to avoid spamming
+		 * logs with mostly success messages.
+		 */
+		CHECK(!msg, "reserve_msg", "Failed to reserve message\n");
+		return -ENOMEM;
+	}
+
+	msg->msg_op = op;
+
+	switch (op) {
+	case TEST_MSG_OP_INC64:
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		break;
+	default:
+		PRINT_FAIL("Invalid operand %d\n", op);
+		user_ring_buffer__discard(ringbuf, msg);
+		return -EINVAL;
+	}
+
+	user_ring_buffer__submit(ringbuf, msg);
+
+	return 0;
+}
+
+static void kick_kernel_read_messages(void)
+{
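+	/* The getcwd() fentry program drains the user ringbuffer and then
+	 * publishes response messages to the kernel ringbuffer.
+	 */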
+	syscall(__NR_getcwd);
+}
+
+static int handle_kernel_msg(void *ctx, void *data, size_t len)
+{
+	struct user_ringbuf_success *skel = ctx;
+	struct test_msg *msg = data;
+
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		skel->bss->user_mutated += msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_INC32:
+		skel->bss->user_mutated += msg->operand_32;
+		return 0;
+	case TEST_MSG_OP_MUL64:
+		skel->bss->user_mutated *= msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_MUL32:
+		skel->bss->user_mutated *= msg->operand_32;
+		return 0;
+	default:
+		fprintf(stderr, "Invalid operand %d\n", msg->msg_op);
+		return -EINVAL;
+	}
+}
+
+static void drain_kernel_messages_buffer(struct ring_buffer *kern_ringbuf)
+{
+	int err;
+
+	err = ring_buffer__consume(kern_ringbuf);
+	if (err < 0)
+		/* Only check on failure to avoid spamming success logs. */
+		CHECK(err < 0, "consume_kern_ringbuf", "Failed to consume kernel ringbuf\n");
+}
+
+static void test_user_ringbuf_msg_protocol(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *user_ringbuf;
+	struct ring_buffer *kern_ringbuf;
+	int err, i;
+	__u64 expected_kern = 0;
+
+	err = load_skel_create_ringbufs(&skel, &kern_ringbuf, handle_kernel_msg, &user_ringbuf);
+	if (CHECK(err, "create_ringbufs", "Failed to create ringbufs: %d\n", err))
+		return;
+
+	for (i = 0; i < 64; i++) {
+		enum test_msg_op op = i % TEST_MSG_OP_NUM_OPS;
+		__u64 operand_64 = TEST_OP_64;
+		__u32 operand_32 = TEST_OP_32;
+
+		err = send_test_message(user_ringbuf, op, operand_64, operand_32);
+		if (err) {
+			CHECK(err, "send_test_message", "Failed to send test message\n");
+			goto cleanup;
+		}
+
+		switch (op) {
+		case TEST_MSG_OP_INC64:
+			expected_kern += operand_64;
+			break;
+		case TEST_MSG_OP_INC32:
+			expected_kern += operand_32;
+			break;
+		case TEST_MSG_OP_MUL64:
+			expected_kern *= operand_64;
+			break;
+		case TEST_MSG_OP_MUL32:
+			expected_kern *= operand_32;
+			break;
+		default:
+			PRINT_FAIL("Unexpected op %d\n", op);
+			goto cleanup;
+		}
+
+		if (i % 8 == 0) {
+			kick_kernel_read_messages();
+			ASSERT_EQ(skel->bss->kern_mutated, expected_kern, "expected_kern");
+			ASSERT_EQ(skel->bss->err, 0, "bpf_prog_err");
+			drain_kernel_messages_buffer(kern_ringbuf);
+		}
+	}
+
+cleanup:
+	ring_buffer__free(kern_ringbuf);
+	user_ring_buffer__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void *kick_kernel_cb(void *arg)
+{
+	/* Kick the kernel, causing it to drain the ringbuffer and then wake up
+	 * the test thread waiting on epoll.
+	 */
+	syscall(__NR_getrlimit);
+
+	return NULL;
+}
+
+static int spawn_kick_thread_for_poll(void)
+{
+	pthread_t thread;
+
+	return pthread_create(&thread, NULL, kick_kernel_cb, NULL);
+}
+
+static void test_user_ringbuf_blocking_reserve(void)
+{
+	struct user_ringbuf_success *skel;
+	struct user_ring_buffer *ringbuf;
+	int err, num_written = 0;
+	__u64 *token;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	while (1) {
+		/* Write samples until the buffer is full. */
+		token = user_ring_buffer__reserve(ringbuf, sizeof(*token));
+		if (!token)
+			break;
+
+		*token = 0xdeadbeef;
+
+		user_ring_buffer__submit(ringbuf, token);
+		num_written++;
+	}
+
+	if (!ASSERT_GE(num_written, 0, "num_written"))
+		goto cleanup;
+
+	/* Should not have read any samples until the kernel is kicked. */
+	ASSERT_EQ(skel->bss->read, 0, "num_pre_kick");
+
+	/* We should time out after 1 second, without getting a sample. */
+	token = user_ring_buffer__reserve_blocking(ringbuf, sizeof(*token), 1000);
+	if (!ASSERT_EQ(token, NULL, "pre_kick_timeout_token"))
+		goto cleanup;
+
+	err = spawn_kick_thread_for_poll();
+	if (!ASSERT_EQ(err, 0, "deferred_kick_thread"))
+		goto cleanup;
+
+	/* After spawning another thread that asynchronously kicks the kernel to
+	 * drain the messages, we're able to block and successfully get a
+	 * sample once we receive an event notification.
+	 */
+	token = user_ring_buffer__reserve_blocking(ringbuf, sizeof(*token), 10000);
+
+	if (CHECK(!token, "block_token", "Failed to block for a ringbuf entry\n"))
+		goto cleanup;
+
+	ASSERT_GT(skel->bss->read, 0, "num_post_kill");
+	ASSERT_LE(skel->bss->read, num_written, "num_post_kill");
+	ASSERT_EQ(skel->bss->err, 0, "err_post_poll");
+	user_ring_buffer__discard(ringbuf, token);
+
+cleanup:
+	user_ring_buffer__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static struct {
+	const char *prog_name;
+	const char *expected_err_msg;
+} failure_tests[] = {
+	/* failure cases */
+	{"user_ringbuf_callback_bad_access1", "negative offset dynptr_ptr ptr"},
+	{"user_ringbuf_callback_bad_access2", "dereference of modified dynptr_ptr ptr"},
+	{"user_ringbuf_callback_write_forbidden", "invalid mem access 'dynptr_ptr'"},
+	{"user_ringbuf_callback_null_context_write", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_null_context_read", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_discard_dynptr", "arg 1 is an unacquired reference"},
+	{"user_ringbuf_callback_submit_dynptr", "arg 1 is an unacquired reference"},
+	{"user_ringbuf_callback_invalid_return", "At callback return the register R0 has value"},
+};
+
+#define SUCCESS_TEST(_func) { _func, #_func }
+
+static struct {
+	void (*test_callback)(void);
+	const char *test_name;
+} success_tests[] = {
+	SUCCESS_TEST(test_user_ringbuf_mappings),
+	SUCCESS_TEST(test_user_ringbuf_post_misaligned),
+	SUCCESS_TEST(test_user_ringbuf_post_producer_wrong_offset),
+	SUCCESS_TEST(test_user_ringbuf_post_larger_than_ringbuf_sz),
+	SUCCESS_TEST(test_user_ringbuf_basic),
+	SUCCESS_TEST(test_user_ringbuf_sample_full_ringbuffer),
+	SUCCESS_TEST(test_user_ringbuf_post_alignment_autoadjust),
+	SUCCESS_TEST(test_user_ringbuf_overfill),
+	SUCCESS_TEST(test_user_ringbuf_discards_properly_ignored),
+	SUCCESS_TEST(test_user_ringbuf_loop),
+	SUCCESS_TEST(test_user_ringbuf_msg_protocol),
+	SUCCESS_TEST(test_user_ringbuf_blocking_reserve),
+};
+
+static void verify_fail(const char *prog_name, const char *expected_err_msg)
+{
+	LIBBPF_OPTS(bpf_object_open_opts, opts);
+	struct bpf_program *prog;
+	struct user_ringbuf_fail *skel;
+	int err;
+
+	opts.kernel_log_buf = obj_log_buf;
+	opts.kernel_log_size = log_buf_sz;
+	opts.kernel_log_level = 1;
+
+	skel = user_ringbuf_fail__open_opts(&opts);
+	if (!ASSERT_OK_PTR(skel, "user_ringbuf_fail__open_opts"))
+		goto cleanup;
+
+	prog = bpf_object__find_program_by_name(skel->obj, prog_name);
+	if (!ASSERT_OK_PTR(prog, "bpf_object__find_program_by_name"))
+		goto cleanup;
+
+	bpf_program__set_autoload(prog, true);
+
+	bpf_map__set_max_entries(skel->maps.user_ringbuf, getpagesize());
+
+	err = user_ringbuf_fail__load(skel);
+	if (!ASSERT_ERR(err, "unexpected load success"))
+		goto cleanup;
+
+	if (!ASSERT_OK_PTR(strstr(obj_log_buf, expected_err_msg), "expected_err_msg")) {
+		fprintf(stderr, "Expected err_msg: %s\n", expected_err_msg);
+		fprintf(stderr, "Verifier output: %s\n", obj_log_buf);
+	}
+
+cleanup:
+	user_ringbuf_fail__destroy(skel);
+}
+
+void test_user_ringbuf(void)
+{
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(success_tests); i++) {
+		if (!test__start_subtest(success_tests[i].test_name))
+			continue;
+
+		success_tests[i].test_callback();
+	}
+
+	for (i = 0; i < ARRAY_SIZE(failure_tests); i++) {
+		if (!test__start_subtest(failure_tests[i].prog_name))
+			continue;
+
+		verify_fail(failure_tests[i].prog_name, failure_tests[i].expected_err_msg);
+	}
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
new file mode 100644
index 000000000000..82aba4529aa9
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
@@ -0,0 +1,177 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+static long
+bad_access1(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr - 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr - 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read before the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access1(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access1, NULL, 0);
+
+	return 0;
+}
+
+static long
+bad_access2(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr + 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr + 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read past the end of the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access2(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access2, NULL, 0);
+
+	return 0;
+}
+
+static long
+write_forbidden(struct bpf_dynptr *dynptr, void *context)
+{
+	*((long *)dynptr) = 0;
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to write to that pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_write_forbidden(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, write_forbidden, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_write(struct bpf_dynptr *dynptr, void *context)
+{
+	*((__u64 *)context) = 0;
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to write
+ * to the NULL context pointer that was passed to the helper.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_write(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_write, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_read(struct bpf_dynptr *dynptr, void *context)
+{
+	__u64 id = *((__u64 *)context);
+
+	bpf_printk("Read id %lu\n", id);
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to read
+ * from the NULL context pointer that was passed to the helper.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_read(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_read, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_discard_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_discard_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to discard
+ * the dynptr passed to it, as the dynptr is an unacquired reference owned by
+ * the helper.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_discard_dynptr(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, try_discard_dynptr, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_submit_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_submit_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to submit
+ * the dynptr passed to it, as the dynptr is an unacquired reference owned by
+ * the helper.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_submit_dynptr(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, try_submit_dynptr, NULL, 0);
+
+	return 0;
+}
+
+static long
+invalid_drain_callback_return(struct bpf_dynptr *dynptr, void *context)
+{
+	return 2;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() may only return 0 or 1; any
+ * other return value should be rejected by the verifier.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_invalid_return(void *ctx)
+{
+	bpf_user_ringbuf_drain(&user_ringbuf, invalid_drain_callback_return, NULL, 0);
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_success.c b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
new file mode 100644
index 000000000000..ab8035a47da5
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+#include "../test_user_ringbuf.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_RINGBUF);
+} kernel_ringbuf SEC(".maps");
+
+/* inputs */
+int pid, err, val;
+
+int read = 0;
+
+/* Counter used for end-to-end protocol test */
+__u64 kern_mutated = 0;
+__u64 user_mutated = 0;
+__u64 expected_user_mutated = 0;
+
+static int
+is_test_process(void)
+{
+	int cur_pid = bpf_get_current_pid_tgid() >> 32;
+
+	return cur_pid == pid;
+}
+
+static long
+record_sample(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample = NULL;
+	struct sample stack_sample;
+	int status;
+	static int num_calls;
+
+	if (num_calls++ % 2 == 0) {
+		status = bpf_dynptr_read(&stack_sample, sizeof(stack_sample),
+					 dynptr, 0, 0);
+		if (status) {
+			bpf_printk("bpf_dynptr_read() failed: %d\n", status);
+			err = 1;
+			return 0;
+		}
+	} else {
+		sample = bpf_dynptr_data(dynptr, 0, sizeof(*sample));
+		if (!sample) {
+			bpf_printk("Unexpectedly failed to get sample\n");
+			err = 2;
+			return 0;
+		}
+		stack_sample = *sample;
+	}
+
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+static void
+handle_sample_msg(const struct test_msg *msg)
+{
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		kern_mutated += msg->operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		kern_mutated += msg->operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		kern_mutated *= msg->operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		kern_mutated *= msg->operand_32;
+		break;
+	default:
+		bpf_printk("Unrecognized op %d\n", msg->msg_op);
+		err = 2;
+	}
+}
+
+static long
+read_protocol_msg(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct test_msg *msg = NULL;
+
+	msg = bpf_dynptr_data(dynptr, 0, sizeof(*msg));
+	if (!msg) {
+		err = 1;
+		bpf_printk("Unexpectedly failed to get msg\n");
+		return 0;
+	}
+
+	handle_sample_msg(msg);
+
+	return 0;
+}
+
+static int publish_next_kern_msg(__u32 index, void *context)
+{
+	struct test_msg *msg = NULL;
+	int operand_64 = TEST_OP_64;
+	int operand_32 = TEST_OP_32;
+
+	msg = bpf_ringbuf_reserve(&kernel_ringbuf, sizeof(*msg), 0);
+	if (!msg) {
+		err = 4;
+		return 1;
+	}
+
+	switch (index % TEST_MSG_OP_NUM_OPS) {
+	case TEST_MSG_OP_INC64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_INC64;
+		expected_user_mutated += operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_INC32;
+		expected_user_mutated += operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_MUL64;
+		expected_user_mutated *= operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_MUL32;
+		expected_user_mutated *= operand_32;
+		break;
+	default:
+		bpf_ringbuf_discard(msg, 0);
+		err = 5;
+		return 1;
+	}
+
+	bpf_ringbuf_submit(msg, 0);
+
+	return 0;
+}
+
+static void
+publish_kern_messages(void)
+{
+	if (expected_user_mutated != user_mutated) {
+		bpf_printk("%d != %d\n", expected_user_mutated, user_mutated);
+		err = 3;
+		return;
+	}
+
+	bpf_loop(8, publish_next_kern_msg, NULL, 0);
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getcwd")
+int test_user_ringbuf_protocol(void *ctx)
+{
+	long status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	status = bpf_user_ringbuf_drain(&user_ringbuf, read_protocol_msg, NULL, 0);
+	if (status < 0) {
+		bpf_printk("Drain returned: %ld\n", status);
+		err = 1;
+		return 0;
+	}
+
+	publish_kern_messages();
+
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getpgid")
+int test_user_ringbuf(void *ctx)
+{
+	int status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	err = bpf_user_ringbuf_drain(&user_ringbuf, record_sample, NULL, 0);
+
+	return 0;
+}
+
+static long
+do_nothing_cb(struct bpf_dynptr *dynptr, void *context)
+{
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getrlimit")
+int test_user_ringbuf_epoll(void *ctx)
+{
+	long num_samples;
+
+	if (!is_test_process())
+		return 0;
+
+	num_samples = bpf_user_ringbuf_drain(&user_ringbuf, do_nothing_cb, NULL,
+					     0);
+	if (num_samples <= 0)
+		err = 1;
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/test_user_ringbuf.h b/tools/testing/selftests/bpf/test_user_ringbuf.h
new file mode 100644
index 000000000000..1643b4d59ba7
--- /dev/null
+++ b/tools/testing/selftests/bpf/test_user_ringbuf.h
@@ -0,0 +1,35 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#ifndef _TEST_USER_RINGBUF_H
+#define _TEST_USER_RINGBUF_H
+
+#define TEST_OP_64 4
+#define TEST_OP_32 2
+
+enum test_msg_op {
+	TEST_MSG_OP_INC64,
+	TEST_MSG_OP_INC32,
+	TEST_MSG_OP_MUL64,
+	TEST_MSG_OP_MUL32,
+
+	/* Must come last. */
+	TEST_MSG_OP_NUM_OPS,
+};
+
+struct test_msg {
+	enum test_msg_op msg_op;
+	union {
+		__s64 operand_64;
+		__s32 operand_32;
+	};
+};
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+#endif /* _TEST_USER_RINGBUF_H */
-- 
2.37.1


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type
  2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
                   ` (3 preceding siblings ...)
  2022-08-18 22:12 ` [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
@ 2022-08-24 17:38 ` Daniel Borkmann
  2022-08-30 13:50   ` David Vernet
  4 siblings, 1 reply; 16+ messages in thread
From: Daniel Borkmann @ 2022-08-24 17:38 UTC (permalink / raw)
  To: David Vernet, bpf, ast, andrii
  Cc: kernel-team, martin.lau, song, yhs, john.fastabend, kpsingh, sdf,
	haoluo, jolsa, joannelkoong, tj, linux-kernel

Hey David,

On 8/19/22 12:12 AM, David Vernet wrote:
> This patch set defines a new map type, BPF_MAP_TYPE_USER_RINGBUF, which
> provides single-user-space-producer / single-kernel-consumer semantics over
> a ringbuffer.  Along with the new map type, a helper function called
> bpf_user_ringbuf_drain() is added which allows a BPF program to specify a
> callback with the following signature, to which samples are posted by the
> helper:

Looks like this series fail BPF CI, ptal:

https://github.com/kernel-patches/bpf/runs/7996821883?check_suite_focus=true

   [...]
   bpftool_checks - Running bpftool checks...
   Comparing /tmp/work/bpf/bpf/tools/include/uapi/linux/bpf.h (bpf_map_type) and /tmp/work/bpf/bpf/tools/bpf/bpftool/map.c (do_help() TYPE): {'user_ringbuf'}
   Comparing /tmp/work/bpf/bpf/tools/include/uapi/linux/bpf.h (bpf_map_type) and /tmp/work/bpf/bpf/tools/bpf/bpftool/Documentation/bpftool-map.rst (TYPE): {'user_ringbuf'}
   bpftool checks returned 1.
   [...]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-18 22:12 ` [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
@ 2022-08-24 20:52   ` Andrii Nakryiko
  0 siblings, 0 replies; 16+ messages in thread
From: Andrii Nakryiko @ 2022-08-24 20:52 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Thu, Aug 18, 2022 at 3:12 PM David Vernet <void@manifault.com> wrote:
>
> We want to support a ringbuf map type where samples are published from
> user-space, to be consumed by BPF programs. BPF currently supports a kernel
> -> user-space circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type.
> We'll need to define a new map type for user-space -> kernel, as none of
> the helpers exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space
> producer ringbuffer, and we'll want to add one or more helper functions
> that would not apply for a kernel-producer ringbuffer.
>
> This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
> definition. The map type is useless in its current form, as there is no way
> to access or use it for anything until we one or more BPF helpers. A
> follow-on patch will therefore add a new helper function that allows BPF
> programs to run callbacks on samples that are published to the ringbuffer.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---

LGTM.

Acked-by: Andrii Nakryiko <andrii@kernel.org>

>  include/linux/bpf_types.h      |  1 +
>  include/uapi/linux/bpf.h       |  1 +
>  kernel/bpf/ringbuf.c           | 62 ++++++++++++++++++++++++++++++----
>  kernel/bpf/verifier.c          |  3 ++
>  tools/include/uapi/linux/bpf.h |  1 +
>  tools/lib/bpf/libbpf.c         |  1 +
>  6 files changed, 63 insertions(+), 6 deletions(-)

[...]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-18 22:12 ` [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-24 21:22   ` Andrii Nakryiko
  2022-08-30 13:28     ` David Vernet
  0 siblings, 1 reply; 16+ messages in thread
From: Andrii Nakryiko @ 2022-08-24 21:22 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Thu, Aug 18, 2022 at 3:12 PM David Vernet <void@manifault.com> wrote:
>
> In a prior change, we added a new BPF_MAP_TYPE_USER_RINGBUF map type which
> will allow user-space applications to publish messages to a ringbuffer that
> is consumed by a BPF program in kernel-space. In order for this map-type to
> be useful, it will require a BPF helper function that BPF programs can
> invoke to drain samples from the ringbuffer, and invoke callbacks on those
> samples. This change adds that capability via a new BPF helper function:
>
> bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx,
>                        u64 flags)
>
> BPF programs may invoke this function to run callback_fn() on a series of
> samples in the ringbuffer. callback_fn() has the following signature:
>
> long callback_fn(struct bpf_dynptr *dynptr, void *context);
>
> Samples are provided to the callback in the form of struct bpf_dynptr *'s,
> which the program can read using BPF helper functions for querying
> struct bpf_dynptr's.
>
> In order to support bpf_ringbuf_drain(), a new PTR_TO_DYNPTR register
> type is added to the verifier to reflect a dynptr that was allocated by
> a helper function and passed to a BPF program. Unlike PTR_TO_STACK
> dynptrs which are allocated on the stack by a BPF program, PTR_TO_DYNPTR
> dynptrs need not use reference tracking, as the BPF helper is trusted to
> properly free the dynptr before returning. The verifier currently only
> supports PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL.
>
> Note that while the corresponding user-space libbpf logic will be added in
> a subsequent patch, this patch does contain an implementation of the
> .map_poll() callback for BPF_MAP_TYPE_USER_RINGBUF maps. This .map_poll()
> callback guarantees that an epoll-waiting user-space producer will
> receive at least one event notification whenever at least one sample is
> drained in an invocation of bpf_user_ringbuf_drain(), provided that the
> function is not invoked with the BPF_RB_NO_WAKEUP flag.
>
> Sending an event notification for every sample is not an option, as it
> could cause the system to hang due to invoking irq_work_queue() in
> too-frequent succession. So as to try and optimize for the common case,
> however, bpf_user_ringbuf_drain() will also send an event notification
> whenever a sample being drained causes the ringbuffer to no longer be
> full. This heuristic may not help some user-space producers, as a
> producer can publish samples of varying size, and there may not be
> enough space in the ringbuffer after the first sample is drained which
> causes it to no longer be full. In this case, the producer may have to
> wait until bpf_ringbuf_drain() returns to receive an event notification.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  include/linux/bpf.h            |  11 +-
>  include/uapi/linux/bpf.h       |  36 ++++++
>  kernel/bpf/helpers.c           |   2 +
>  kernel/bpf/ringbuf.c           | 210 ++++++++++++++++++++++++++++++++-
>  kernel/bpf/verifier.c          |  72 +++++++++--
>  tools/include/uapi/linux/bpf.h |  36 ++++++
>  6 files changed, 352 insertions(+), 15 deletions(-)
>
> diff --git a/include/linux/bpf.h b/include/linux/bpf.h
> index a627a02cf8ab..515d712fd4a5 100644
> --- a/include/linux/bpf.h
> +++ b/include/linux/bpf.h
> @@ -401,7 +401,7 @@ enum bpf_type_flag {
>         /* DYNPTR points to memory local to the bpf program. */
>         DYNPTR_TYPE_LOCAL       = BIT(8 + BPF_BASE_TYPE_BITS),
>
> -       /* DYNPTR points to a ringbuf record. */
> +       /* DYNPTR points to a kernel-produced ringbuf record. */
>         DYNPTR_TYPE_RINGBUF     = BIT(9 + BPF_BASE_TYPE_BITS),
>
>         /* Size is known at compile time. */
> @@ -606,6 +606,7 @@ enum bpf_reg_type {
>         PTR_TO_MEM,              /* reg points to valid memory region */
>         PTR_TO_BUF,              /* reg points to a read/write buffer */
>         PTR_TO_FUNC,             /* reg points to a bpf program function */
> +       PTR_TO_DYNPTR,           /* reg points to a dynptr */
>         __BPF_REG_TYPE_MAX,
>
>         /* Extended reg_types. */
> @@ -1333,6 +1334,11 @@ struct bpf_array {
>  #define BPF_MAP_CAN_READ       BIT(0)
>  #define BPF_MAP_CAN_WRITE      BIT(1)
>
> +/* Maximum number of user-producer ringbuffer samples that can be drained in
> + * a call to bpf_user_ringbuf_drain().
> + */
> +#define BPF_MAX_USER_RINGBUF_SAMPLES BIT(17)

nit: I don't think using BIT() is appropriate here. 128 * 1024 would
be better, IMO. This is not inherently required to be a single bit
constant.

> +
>  static inline u32 bpf_map_flags_to_cap(struct bpf_map *map)
>  {
>         u32 access_flags = map->map_flags & (BPF_F_RDONLY_PROG | BPF_F_WRONLY_PROG);
> @@ -2411,6 +2417,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
>  extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
>  extern const struct bpf_func_proto bpf_set_retval_proto;
>  extern const struct bpf_func_proto bpf_get_retval_proto;
> +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
>
>  const struct bpf_func_proto *tracing_prog_func_proto(
>    enum bpf_func_id func_id, const struct bpf_prog *prog);
> @@ -2555,7 +2562,7 @@ enum bpf_dynptr_type {
>         BPF_DYNPTR_TYPE_INVALID,
>         /* Points to memory that is local to the bpf program */
>         BPF_DYNPTR_TYPE_LOCAL,
> -       /* Underlying data is a ringbuf record */
> +       /* Underlying data is a kernel-produced ringbuf record */
>         BPF_DYNPTR_TYPE_RINGBUF,
>  };
>
> diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> index 3aee7681fa68..25c599d9adf8 100644
> --- a/include/uapi/linux/bpf.h
> +++ b/include/uapi/linux/bpf.h
> @@ -5356,6 +5356,41 @@ union bpf_attr {
>   *     Return
>   *             Current *ktime*.
>   *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample:
> + *
> + *             long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx);
> + *
> + *             If **callback_fn** returns 0, the helper will continue to try
> + *             and drain the next sample, up to a maximum of
> + *             BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1,
> + *             the helper will skip the rest of the samples and return. Other
> + *             return values are not used now, and will be rejected by the
> + *             verifier.
> + *     Return
> + *             The number of drained samples if no error was encountered while
> + *             draining samples. If a user-space producer was epoll-waiting on
> + *             this map, and at least one sample was drained, they will
> + *             receive an event notification notifying them of available space
> + *             in the ringbuffer. If the BPF_RB_NO_WAKEUP flag is passed to
> + *             this function, no wakeup notification will be sent. If there
> + *             are no samples in the ringbuffer, 0 is returned.
> + *
> + *             On failure, the returned value is one of the following:
> + *
> + *             **-EBUSY** if the ringbuffer is contended, and another calling
> + *             context was concurrently draining the ringbuffer.
> + *
> + *             **-EINVAL** if user-space is not properly tracking the
> + *             ringbuffer due to the producer position not being aligned to 8

s/ringbuffer/ring buffer/ everywhere to be more human-readable and
consistent with bpf_ringbuf_xxx() descriptions?

> + *             bytes, a sample not being aligned to 8 bytes, the producer
> + *             position not matching the advertised length of a sample, or the
> + *             sample size being larger than the ringbuffer.
> + *
> + *             **-E2BIG** if user-space has tried to publish a sample that
> + *             cannot fit within a struct bpf_dynptr.


"sample size being larger than the ringbuffer" is documented above for
-EINVAL, so it's ambiguous if it's E2BIG or EINVAL?

>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5567,6 +5602,7 @@ union bpf_attr {
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
>         FN(ktime_get_tai_ns),           \
> +       FN(user_ringbuf_drain),         \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
> index 3c1b9bbcf971..9141eae0ca67 100644
> --- a/kernel/bpf/helpers.c
> +++ b/kernel/bpf/helpers.c
> @@ -1661,6 +1661,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
>                 return &bpf_dynptr_write_proto;
>         case BPF_FUNC_dynptr_data:
>                 return &bpf_dynptr_data_proto;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               return &bpf_user_ringbuf_drain_proto;
>         default:
>                 break;
>         }
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index 0a8de712ecbe..3818398e57de 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -38,6 +38,22 @@ struct bpf_ringbuf {
>         struct page **pages;
>         int nr_pages;
>         spinlock_t spinlock ____cacheline_aligned_in_smp;
> +       /* For user-space producer ringbuffers, an atomic_t busy bit is used to
> +        * synchronize access to the ringbuffer in the kernel, rather than the
> +        * spinlock that is used for kernel-producer ringbuffers. This is done
> +        * because the ringbuffer must hold a lock across a BPF program's

ditto about ringbuffer -> ring buffer (though here it's probably fine
to just use short ringbuf), Gmail also doesn't like "ringbuffer" ;)

> +        * callback:
> +        *
> +        *    __bpf_user_ringbuf_peek() // lock acquired
> +        * -> program callback_fn()
> +        * -> __bpf_user_ringbuf_sample_release() // lock released
> +        *
> +        * It is unsafe and incorrect to hold an IRQ spinlock across what could
> +        * be a long execution window, so we instead simply disallow concurrent
> +        * access to the ringbuffer by kernel consumers, and return -EBUSY from
> +        * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
> +        */

[...]

> +       if (flags & BPF_RINGBUF_DISCARD_BIT) {
> +               /* If the discard bit is set, the sample should be ignored, and
> +                * we can instead try to read the next one.
> +                *
> +                * Synchronizes with smp_load_acquire() in the user-space
> +                * producer, and smp_load_acquire() in
> +                * __bpf_user_ringbuf_peek() above.
> +                */
> +               smp_store_release(&rb->consumer_pos, cons_pos + total_len);
> +               goto retry;

so given fast enough user-space producer, we can make kernel spend a
lot of time looping and retrying here if we just commit discarded
samples. And we won't be taking into account
BPF_MAX_USER_RINGBUF_SAMPLES for those discards. That seems like a bit
of a hole in the logic... would it be better to return with -EAGAIN
for discard samples and let drain logic skip over them?

> +       }
> +
> +       if (flags & BPF_RINGBUF_BUSY_BIT) {
> +               err = -ENODATA;
> +               goto err_unlock;
> +       }
> +
> +       *sample = (void *)((uintptr_t)rb->data +
> +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> +       *size = sample_len;
> +       return 0;
> +
> +err_unlock:
> +       atomic_set(&rb->busy, 0);
> +       return err;
> +}
> +
> +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> +{
> +       u64 producer_pos, consumer_pos;
> +
> +       /* Synchronizes with smp_store_release() in user-space producer. */
> +       producer_pos = smp_load_acquire(&rb->producer_pos);
> +
> +       /* Using smp_load_acquire() is unnecessary here, as the busy-bit
> +        * prevents another task from writing to consumer_pos after it was read
> +        * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
> +        */
> +       consumer_pos = rb->consumer_pos;
> +        /* Synchronizes with smp_load_acquire() in user-space producer. */
> +       smp_store_release(&rb->consumer_pos, consumer_pos + size + BPF_RINGBUF_HDR_SZ);
> +
> +       /* Prevent the clearing of the busy-bit from being reordered before the
> +        * storing of the updated rb->consumer_pos value.
> +        */
> +       smp_mb__before_atomic();
> +       atomic_set(&rb->busy, 0);
> +
> +       if (!(flags & BPF_RB_NO_WAKEUP)) {
> +               /* As a heuristic, if the previously consumed sample caused the
> +                * ringbuffer to no longer be full, send an event notification
> +                * to any user-space producer that is epoll-waiting.
> +                */
> +               if (producer_pos - consumer_pos == ringbuf_total_data_sz(rb))

I'm a bit confused here. This will be true only if user-space producer
filled out entire ringbuf data *exactly* to the last byte with a
single record. Or am I misunderstanding this?

If my understanding is correct, how is this a realistic use case and
how does this heuristic help at all?

> +                       irq_work_queue(&rb->work);
> +
> +       }
> +}
> +
> +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> +          void *, callback_fn, void *, callback_ctx, u64, flags)
> +{
> +       struct bpf_ringbuf *rb;
> +       long num_samples = 0, ret = 0;
> +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> +       u64 wakeup_flags = BPF_RB_NO_WAKEUP;
> +
> +       if (unlikely(flags & ~wakeup_flags))

hm... so if we specify BPF_RB_FORCE_WAKEUP we'll reject this? Why? Why
not allow both? And why use u64 variable to store BPF_RB_NO_WAKEUP
constant, just use constant right here?

> +               return -EINVAL;
> +
> +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> +       do {
> +               int err;
> +               u32 size;
> +               void *sample;
> +               struct bpf_dynptr_kern dynptr;
> +

[...]

> @@ -7323,22 +7366,35 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                 }
>                 break;
>         case BPF_FUNC_dynptr_data:
> +               helper_allocated_dynptr = false;
>                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
>                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> -                               if (meta.ref_obj_id) {
> -                                       verbose(env, "verifier internal error: meta.ref_obj_id already set\n");
> +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> +
> +                               if (helper_allocated_dynptr || meta.ref_obj_id) {
> +                                       verbose(env, "verifier internal error: multiple dynptrs not supported\n");
>                                         return -EFAULT;
>                                 }
> -                               /* Find the id of the dynptr we're tracking the reference of */
> -                               meta.ref_obj_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> +
> +                               if (base_type(reg->type) == PTR_TO_DYNPTR)
> +                                       helper_allocated_dynptr = true;
> +                               else
> +                                       /* Find the id of the dynptr we're
> +                                        * tracking the reference of
> +                                        */
> +                                       meta.ref_obj_id = stack_slot_get_id(env, reg);
>                                 break;
>                         }
>                 }
> -               if (i == MAX_BPF_FUNC_REG_ARGS) {
> +               if (!helper_allocated_dynptr && i == MAX_BPF_FUNC_REG_ARGS) {

we still expect to get to break in the loop above, right? so there is
no need to special-case !helper_allocated_dynptr, is there?

>                         verbose(env, "verifier internal error: no dynptr in bpf_dynptr_data()\n");
>                         return -EFAULT;
>                 }
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> +                                       set_user_ringbuf_callback_state);
> +               break;
>         }
>
>         if (err)

[...]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer
  2022-08-18 22:12 ` [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
@ 2022-08-24 21:58   ` Andrii Nakryiko
  2022-08-30 13:42     ` David Vernet
  0 siblings, 1 reply; 16+ messages in thread
From: Andrii Nakryiko @ 2022-08-24 21:58 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Thu, Aug 18, 2022 at 3:12 PM David Vernet <void@manifault.com> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ringbuffers, we can add the user-space logic to libbpf. This
> patch therefore adds the following public symbols to libbpf:
>
> struct user_ring_buffer *
> user_ring_buffer__new(int map_fd,
>                       const struct user_ring_buffer_opts *opts);
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
> void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__free(struct user_ring_buffer *rb);
>
> A user-space producer must first create a struct user_ring_buffer * object
> with user_ring_buffer__new(), and can then reserve samples in the
> ringbuffer using one of the following two symbols:
>
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
>                                          __u32 size, int timeout_ms);
>
> With user_ring_buffer__reserve(), a pointer to an @size region of the
> ringbuffer will be returned if sufficient space is available in the buffer.
> user_ring_buffer__reserve_blocking() provides similar semantics, but will
> block for up to @timeout_ms in epoll_wait if there is insufficient space in
> the buffer. This function has the guarantee from the kernel that it will
> receive at least one event-notification per invocation to
> bpf_ringbuf_drain(), provided that at least one sample is drained, and the
> BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().
>
> Once a sample is reserved, it must either be committed to the ringbuffer
> with user_ring_buffer__submit(), or discarded with
> user_ring_buffer__discard().
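
For context, a minimal producer flow with these APIs would look roughly like
the sketch below; struct my_sample and the error handling are purely
illustrative:

	struct user_ring_buffer *rb = user_ring_buffer__new(map_fd, NULL);
	struct my_sample *s = user_ring_buffer__reserve(rb, sizeof(*s));

	if (s) {
		s->value = 42;
		user_ring_buffer__submit(rb, s); /* or user_ring_buffer__discard(rb, s) */
	}
	user_ring_buffer__free(rb);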
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  tools/lib/bpf/libbpf.c        |  10 +-
>  tools/lib/bpf/libbpf.h        |  21 +++
>  tools/lib/bpf/libbpf.map      |   6 +
>  tools/lib/bpf/libbpf_probes.c |   1 +
>  tools/lib/bpf/ringbuf.c       | 327 ++++++++++++++++++++++++++++++++++
>  5 files changed, 363 insertions(+), 2 deletions(-)
>

[...]

> +LIBBPF_API struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb,
> +                                          __u32 size);
> +
> +LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> +                                                   __u32 size,
> +                                                   int timeout_ms);
> +LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb,
> +                                        void *sample);
> +LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb,
> +                                         void *sample);
> +LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
> +

Let's make sure that all the relevant comments and description of
inputs/outputs/errors are documented here. These doccomments go to
https://libbpf.readthedocs.io/en/latest/api.html
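
e.g., in the doccomment style libbpf.h already uses (wording below is just a
placeholder):

/**
 * @brief **user_ring_buffer__reserve()** reserves a sample in the user ring
 * buffer.
 * @param rb A user ring buffer created with user_ring_buffer__new().
 * @param size The size of the sample, in bytes.
 * @return A pointer to the reserved sample; NULL and errno set on failure.
 */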


also, please make sure that declarations that fit within 100
characters stay on single line, it's much more readable that way

>  /* Perf buffer APIs */
>  struct perf_buffer;
>
> diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
> index 2b928dc21af0..40c83563f90a 100644
> --- a/tools/lib/bpf/libbpf.map
> +++ b/tools/lib/bpf/libbpf.map
> @@ -367,4 +367,10 @@ LIBBPF_1.0.0 {

now that 1.0 is released, this will have to go into a new LIBBPF_1.1.0
section (which inherits from LIBBPF_1.0.0)
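
i.e., roughly:

LIBBPF_1.1.0 {
	global:
		user_ring_buffer__discard;
		user_ring_buffer__free;
		user_ring_buffer__new;
		user_ring_buffer__reserve;
		user_ring_buffer__reserve_blocking;
		user_ring_buffer__submit;
} LIBBPF_1.0.0;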

>                 libbpf_bpf_map_type_str;
>                 libbpf_bpf_prog_type_str;
>                 perf_buffer__buffer;
> +               user_ring_buffer__discard;
> +               user_ring_buffer__free;
> +               user_ring_buffer__new;
> +               user_ring_buffer__reserve;
> +               user_ring_buffer__reserve_blocking;
> +               user_ring_buffer__submit;
>  };

[...]

> +       /* Map read-write the producer page and data pages. We map the data
> +        * region as twice the total size of the ringbuffer to allow the simple
> +        * reading and writing of samples that wrap around the end of the
> +        * buffer.  See the kernel implementation for details.
> +        */
> +       tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
> +                  PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
> +       if (tmp == MAP_FAILED) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       rb->producer_pos = tmp;
> +       rb->data = tmp + rb->page_size;
> +
> +       rb_epoll = &rb->event;
> +       rb_epoll->events = EPOLLOUT;
> +       if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
> +               return libbpf_err(err);

this is internal helper function, so there is no need to use
libbpf_err() helpers, just return errors directly. Only user-facing
functions should make sure to set both errno and return error

> +       }
> +
> +       return 0;
> +}
> +
> +struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
> +{
> +       struct user_ring_buffer *rb;
> +       int err;
> +
> +       if (!OPTS_VALID(opts, ring_buffer_opts))

user_ring_buffer_opts

> +               return errno = EINVAL, NULL;
> +
> +       rb = calloc(1, sizeof(*rb));
> +       if (!rb)
> +               return errno = ENOMEM, NULL;
> +
> +       rb->page_size = getpagesize();
> +
> +       rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
> +       if (rb->epoll_fd < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
> +               goto err_out;
> +       }
> +
> +       err = user_ringbuf_map(rb, map_fd);
> +       if (err)
> +               goto err_out;
> +
> +       return rb;
> +
> +err_out:
> +       user_ring_buffer__free(rb);
> +       return errno = -err, NULL;
> +}
> +
> +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
> +{
> +       __u32 new_len;
> +       struct ringbuf_hdr *hdr;
> +
> +       /* All samples are aligned to 8 bytes, so the header will only ever
> +        * wrap around the back of the ringbuffer if the sample is at the
> +        * very beginning of the ringbuffer.
> +        */
> +       if (sample == rb->data)
> +               hdr = rb->data + (rb->mask - BPF_RINGBUF_HDR_SZ + 1);
> +       else
> +               hdr = sample - BPF_RINGBUF_HDR_SZ;

let's avoid extra if in a hot path?

hdr = rb->data + (rb->mask + 1 + (sample - rb->data) -
BPF_RINGBUF_HDR_SZ) & rb->mask;
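
with the mask applied before adding to rb->data, of course, since & binds
more loosely than +:

	hdr = rb->data + ((rb->mask + 1 + (sample - rb->data) -
			   BPF_RINGBUF_HDR_SZ) & rb->mask);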

> +
> +       new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
> +       if (discard)
> +               new_len |= BPF_RINGBUF_DISCARD_BIT;
> +
> +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> +        * the kernel.
> +        */
> +       __atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
> +}
> +
> +/* Discard a previously reserved sample into the ring buffer.  It is not
> + * necessary to synchronize amongst multiple producers when invoking this
> + * function.
> + */
> +void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
> +{
> +       user_ringbuf__commit(rb, sample, true);
> +}
> +
> +/* Submit a previously reserved sample into the ring buffer. It is not
> + * necessary to synchronize amongst multiple producers when invoking this
> + * function.
> + */
> +void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
> +{
> +       user_ringbuf__commit(rb, sample, false);
> +}
> +
> +/* Reserve a pointer to a sample in the user ring buffer. This function is
> + * *not* thread safe, and callers must synchronize accessing this function if
> + * there are multiple producers.
> + *
> + * If a size is requested that is larger than the size of the entire
> + * ringbuffer, errno is set to E2BIG and NULL is returned. If the ringbuffer
> + * could accommodate the size, but currently does not have enough space, errno
> + * is set to ENODATA and NULL is returned.

ENOSPC seems more appropriate for such a situation?

> + *
> + * Otherwise, a pointer to the sample is returned. After initializing the
> + * sample, callers must invoke user_ring_buffer__submit() to post the sample to
> + * the kernel. Otherwise, the sample must be freed with
> + * user_ring_buffer__discard().
> + */

usual complaints about "ringbuffer", feels like a typo


> +void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> +{
> +       __u32 avail_size, total_size, max_size;
> +       /* 64-bit to avoid overflow in case of extreme application behavior */
> +       __u64 cons_pos, prod_pos;
> +       struct ringbuf_hdr *hdr;
> +
> +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
> +        * the kernel.
> +        */
> +       cons_pos = smp_load_acquire(rb->consumer_pos);
> +       /* Synchronizes with smp_store_release() in user_ringbuf__commit() */
> +       prod_pos = smp_load_acquire(rb->producer_pos);
> +
> +       /* Round up size to a multiple of 8. */
> +       size = (size + 7) / 8 * 8;
> +       max_size = rb->mask + 1;
> +       avail_size = max_size - (prod_pos - cons_pos);
> +       total_size = size + BPF_RINGBUF_HDR_SZ;
> +
> +       if (total_size > max_size)
> +               return errno = E2BIG, NULL;
> +
> +       if (avail_size < total_size)
> +               return errno = ENODATA, NULL;
> +
> +       hdr = rb->data + (prod_pos & rb->mask);
> +       hdr->len = size | BPF_RINGBUF_BUSY_BIT;

so I double-checked what kernel ringbuf is doing with size. We still
record exact user-requested size in header, but all the logic knows
that it has to be rounded up to closest 8. I think that's best
behavior because it preserves user-supplied information exactly. So if
I wanted to reserve and communicate a 4-byte sample to my consumers,
they should see that there are 4 bytes of data available, not 8. So
let's do the same here?

We still should validate that all the positions are multiples of 8, of
course, as you do in this revision.
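
Concretely, I'd expect the reserve path to keep the exact size in the header
while still rounding for the position math, roughly (untested sketch):

	__u32 rounded_size = (size + 7) / 8 * 8;
	__u32 total_size = rounded_size + BPF_RINGBUF_HDR_SZ;

	/* ... size/space checks use total_size ... */

	hdr = rb->data + (prod_pos & rb->mask);
	/* record the exact user-requested size, not the rounded one */
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	smp_store_release(rb->producer_pos, prod_pos + total_size);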

> +       hdr->pad = 0;
> +
> +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> +        * the kernel.
> +        */
> +       smp_store_release(rb->producer_pos, prod_pos + total_size);
> +
> +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> +}
> +
> +static int ms_elapsed_timespec(const struct timespec *start, const struct timespec *end)
> +{
> +       int total, ns_per_ms = 1000000, ns_per_s = ns_per_ms * 1000;
> +
> +       if (end->tv_sec > start->tv_sec) {
> +               total = 1000 * (end->tv_sec - start->tv_sec);
> +               total += (end->tv_nsec + (ns_per_s - start->tv_nsec)) / ns_per_ms;
> +       } else {
> +               total = (end->tv_nsec - start->tv_nsec) / ns_per_ms;
> +       }
> +

hm... this seems overengineered, tbh

u64 start_ns = (u64)start->tv_sec * 1000000000 + start->tv_nsec;
u64 end_ns = (u64)end->tv_sec * 1000000000 + end->tv_nsec;

return (end_ns - start_ns) / 1000000;

?

> +       return total;
> +}
> +
> +/* Reserve a record in the ringbuffer, possibly blocking for up to @timeout_ms
> + * until a sample becomes available.  This function is *not* thread safe, and
> + * callers must synchronize accessing this function if there are multiple
> + * producers.
> + *
> + * If @timeout_ms is -1, the function will block indefinitely until a sample
> + * becomes available. Otherwise, @timeout_ms must be non-negative, or errno
> + * will be set to EINVAL, and NULL will be returned. If @timeout_ms is 0,
> + * no blocking will occur and the function will return immediately after
> + * attempting to reserve a sample.
> + *
> + * If @size is larger than the size of the entire ringbuffer, errno is set to
> + * E2BIG and NULL is returned. If the ringbuffer could accommodate @size, but
> + * currently does not have enough space, the caller will block until at most
> + * @timeout_ms has elapsed. If insufficient space is available at that time,
> + * errno will be set to ENODATA, and NULL will be returned.

ENOSPC?

> + *
> + * The kernel guarantees that it will wake up this thread to check if
> + * sufficient space is available in the ringbuffer at least once per invocation
> + * of the bpf_ringbuf_drain() helper function, provided that at least one
> + * sample is consumed, and the BPF program did not invoke the function with
> + * BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the kernel does
> + * not guarantee this.
> + *
> + * When a sample of size @size is found within @timeout_ms, a pointer to the
> + * sample is returned. After initializing the sample, callers must invoke
> + * user_ring_buffer__submit() to post the sample to the ringbuffer. Otherwise,
> + * the sample must be freed with user_ring_buffer__discard().
> + */

so comments like this should go into doccomments for this functions in libbpf.h

> +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> +{
> +       int ms_elapsed = 0, err;
> +       struct timespec start;
> +
> +       if (timeout_ms < 0 && timeout_ms != -1)
> +               return errno = EINVAL, NULL;
> +
> +       if (timeout_ms != -1) {
> +               err = clock_gettime(CLOCK_MONOTONIC, &start);
> +               if (err)
> +                       return NULL;
> +       }
> +
> +       do {
> +               int cnt, ms_remaining = timeout_ms - ms_elapsed;

let's max(0, timeout_ms - ms_elapsed) to avoid negative ms_remaining
in some edge timing cases

> +               void *sample;
> +               struct timespec curr;
> +
> +               sample = user_ring_buffer__reserve(rb, size);
> +               if (sample)
> +                       return sample;
> +               else if (errno != ENODATA)
> +                       return NULL;
> +
> +               /* The kernel guarantees at least one event notification
> +                * delivery whenever at least one sample is drained from the
> +                * ringbuffer in an invocation to bpf_ringbuf_drain(). Other
> +                * additional events may be delivered at any time, but only one
> +                * event is guaranteed per bpf_ringbuf_drain() invocation,
> +                * provided that a sample is drained, and the BPF program did
> +                * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> +                */
> +               cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> +               if (cnt < 0)
> +                       return NULL;
> +
> +               if (timeout_ms == -1)
> +                       continue;
> +
> +               err = clock_gettime(CLOCK_MONOTONIC, &curr);
> +               if (err)
> +                       return NULL;
> +
> +               ms_elapsed = ms_elapsed_timespec(&start, &curr);
> +       } while (ms_elapsed <= timeout_ms);

let's simplify all the time keeping to use nanosecond timestamps and
only convert to ms when calling epoll_wait()? Then you can just have a
tiny helper to convert timespec to nanosecond ts ((u64)ts.tv_sec *
1000000000 + ts.tv_nsec) and compare u64s directly. WDYT?
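
e.g., something like (name is just a suggestion):

static __u64 timespec_to_ns(const struct timespec *ts)
{
	return (__u64)ts->tv_sec * 1000000000 + ts->tv_nsec;
}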

> +
> +       errno = ENODATA;
> +       return NULL;
> +}
> --
> 2.37.1
>

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf
  2022-08-18 22:12 ` [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
@ 2022-08-24 22:03   ` Andrii Nakryiko
  2022-08-30 13:46     ` David Vernet
  0 siblings, 1 reply; 16+ messages in thread
From: Andrii Nakryiko @ 2022-08-24 22:03 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Thu, Aug 18, 2022 at 3:12 PM David Vernet <void@manifault.com> wrote:
>
> This change includes selftests that validate the expected behavior and
> APIs of the new BPF_MAP_TYPE_USER_RINGBUF map type.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  .../selftests/bpf/prog_tests/user_ringbuf.c   | 755 ++++++++++++++++++
>  .../selftests/bpf/progs/user_ringbuf_fail.c   | 177 ++++
>  .../bpf/progs/user_ringbuf_success.c          | 220 +++++
>  .../testing/selftests/bpf/test_user_ringbuf.h |  35 +
>  4 files changed, 1187 insertions(+)
>  create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
>  create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
>  create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
>  create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h
>

[...]

> +       /* Write some number of samples to the ring buffer. */
> +       for (i = 0; i < num_samples; i++) {
> +               struct sample *entry;
> +               int read;
> +
> +               entry = user_ring_buffer__reserve(ringbuf, sizeof(*entry));
> +               if (!entry) {
> +                       err = -errno;
> +                       goto done;
> +               }
> +
> +               entry->pid = getpid();
> +               entry->seq = i;
> +               entry->value = i * i;
> +
> +               read = snprintf(entry->comm, sizeof(entry->comm), "%u", i);
> +               if (read <= 0) {
> +                       /* Only invoke CHECK on the error path to avoid spamming
> +                        * logs with mostly success messages.
> +                        */
> +                       CHECK(read <= 0, "snprintf_comm",
> +                             "Failed to write index %d to comm\n", i);

please, no CHECK() use in new tests, we have ASSERT_xxx() covering all
common cases
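
e.g., something along these lines (assuming ASSERT_GT() from test_progs.h):

		if (!ASSERT_GT(read, 0, "snprintf_comm")) {
			err = read;
			user_ring_buffer__discard(ringbuf, entry);
			goto done;
		}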

> +                       err = read;
> +                       user_ring_buffer__discard(ringbuf, entry);
> +                       goto done;
> +               }
> +
> +               user_ring_buffer__submit(ringbuf, entry);
> +       }
> +

[...]

> +static long
> +bad_access1(struct bpf_dynptr *dynptr, void *context)
> +{
> +       const struct sample *sample;
> +
> +       sample = bpf_dynptr_data(dynptr - 1, 0, sizeof(*sample));
> +       bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr - 1);
> +
> +       return 0;
> +}
> +
> +/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
> + * not be able to read before the pointer.
> + */
> +SEC("?raw_tp/sys_nanosleep")

there is no sys_nanosleep raw tracepoint, use SEC("?raw_tp") to
specify type, that's enough

> +int user_ringbuf_callback_bad_access1(void *ctx)
> +{
> +       bpf_user_ringbuf_drain(&user_ringbuf, bad_access1, NULL, 0);
> +
> +       return 0;
> +}
> +

[...]

> diff --git a/tools/testing/selftests/bpf/test_user_ringbuf.h b/tools/testing/selftests/bpf/test_user_ringbuf.h
> new file mode 100644
> index 000000000000..1643b4d59ba7
> --- /dev/null
> +++ b/tools/testing/selftests/bpf/test_user_ringbuf.h

nit: I'd probably put it under progs/test_user_ringbuf.h so it's
closer to BPF source code. As it is right now, it's neither near
user-space part of tests nor near BPF part.

> @@ -0,0 +1,35 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
> +
> +#ifndef _TEST_USER_RINGBUF_H
> +#define _TEST_USER_RINGBUF_H
> +
> +#define TEST_OP_64 4
> +#define TEST_OP_32 2
> +
> +enum test_msg_op {
> +       TEST_MSG_OP_INC64,
> +       TEST_MSG_OP_INC32,
> +       TEST_MSG_OP_MUL64,
> +       TEST_MSG_OP_MUL32,
> +
> +       // Must come last.
> +       TEST_MSG_OP_NUM_OPS,
> +};
> +
> +struct test_msg {
> +       enum test_msg_op msg_op;
> +       union {
> +               __s64 operand_64;
> +               __s32 operand_32;
> +       };
> +};
> +
> +struct sample {
> +       int pid;
> +       int seq;
> +       long value;
> +       char comm[16];
> +};
> +
> +#endif /* _TEST_USER_RINGBUF_H */
> --
> 2.37.1
>

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-24 21:22   ` Andrii Nakryiko
@ 2022-08-30 13:28     ` David Vernet
  2022-09-09 22:45       ` Andrii Nakryiko
  0 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-30 13:28 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Wed, Aug 24, 2022 at 02:22:44PM -0700, Andrii Nakryiko wrote:
> > +/* Maximum number of user-producer ringbuffer samples that can be drained in
> > + * a call to bpf_user_ringbuf_drain().
> > + */
> > +#define BPF_MAX_USER_RINGBUF_SAMPLES BIT(17)
> 
> nit: I don't think using BIT() is appropriate here. 128 * 1024 would
> be better, IMO. This is not inherently required to be a single bit
> constant.

No problem, updated.

> > +
> >  static inline u32 bpf_map_flags_to_cap(struct bpf_map *map)
> >  {
> >         u32 access_flags = map->map_flags & (BPF_F_RDONLY_PROG | BPF_F_WRONLY_PROG);
> > @@ -2411,6 +2417,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
> >  extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
> >  extern const struct bpf_func_proto bpf_set_retval_proto;
> >  extern const struct bpf_func_proto bpf_get_retval_proto;
> > +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
> >
> >  const struct bpf_func_proto *tracing_prog_func_proto(
> >    enum bpf_func_id func_id, const struct bpf_prog *prog);
> > @@ -2555,7 +2562,7 @@ enum bpf_dynptr_type {
> >         BPF_DYNPTR_TYPE_INVALID,
> >         /* Points to memory that is local to the bpf program */
> >         BPF_DYNPTR_TYPE_LOCAL,
> > -       /* Underlying data is a ringbuf record */
> > +       /* Underlying data is a kernel-produced ringbuf record */
> >         BPF_DYNPTR_TYPE_RINGBUF,
> >  };
> >
> > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > index 3aee7681fa68..25c599d9adf8 100644
> > --- a/include/uapi/linux/bpf.h
> > +++ b/include/uapi/linux/bpf.h
> > @@ -5356,6 +5356,41 @@ union bpf_attr {
> >   *     Return
> >   *             Current *ktime*.
> >   *
> > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > + *     Description
> > + *             Drain samples from the specified user ringbuffer, and invoke the
> > + *             provided callback for each such sample:
> > + *
> > + *             long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx);
> > + *
> > + *             If **callback_fn** returns 0, the helper will continue to try
> > + *             and drain the next sample, up to a maximum of
> > + *             BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1,
> > + *             the helper will skip the rest of the samples and return. Other
> > + *             return values are not used now, and will be rejected by the
> > + *             verifier.
> > + *     Return
> > + *             The number of drained samples if no error was encountered while
> > + *             draining samples. If a user-space producer was epoll-waiting on
> > + *             this map, and at least one sample was drained, they will
> > + *             receive an event notification notifying them of available space
> > + *             in the ringbuffer. If the BPF_RB_NO_WAKEUP flag is passed to
> > + *             this function, no wakeup notification will be sent. If there
> > + *             are no samples in the ringbuffer, 0 is returned.
> > + *
> > + *             On failure, the returned value is one of the following:
> > + *
> > + *             **-EBUSY** if the ringbuffer is contended, and another calling
> > + *             context was concurrently draining the ringbuffer.
> > + *
> > + *             **-EINVAL** if user-space is not properly tracking the
> > + *             ringbuffer due to the producer position not being aligned to 8
> 
> s/ringbuffer/ring buffer/ everywhere to be more human-readable and
> consistent with bpf_ringbuf_xxx() descriptions?

Done.

> > + *             bytes, a sample not being aligned to 8 bytes, the producer
> > + *             position not matching the advertised length of a sample, or the
> > + *             sample size being larger than the ringbuffer.
> > + *
> > + *             **-E2BIG** if user-space has tried to publish a sample that
> > + *             cannot fit within a struct bpf_dynptr.
> 
> "sample size being larger than the ringbuffer" is documented above for
> -EINVAL, so it's ambiguous if it's E2BIG or EINVAL?

Good point. I'll return -E2BIG for both cases.

> >   */
> >  #define __BPF_FUNC_MAPPER(FN)          \
> >         FN(unspec),                     \
> > @@ -5567,6 +5602,7 @@ union bpf_attr {
> >         FN(tcp_raw_check_syncookie_ipv4),       \
> >         FN(tcp_raw_check_syncookie_ipv6),       \
> >         FN(ktime_get_tai_ns),           \
> > +       FN(user_ringbuf_drain),         \
> >         /* */
> >
> >  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> > diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
> > index 3c1b9bbcf971..9141eae0ca67 100644
> > --- a/kernel/bpf/helpers.c
> > +++ b/kernel/bpf/helpers.c
> > @@ -1661,6 +1661,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
> >                 return &bpf_dynptr_write_proto;
> >         case BPF_FUNC_dynptr_data:
> >                 return &bpf_dynptr_data_proto;
> > +       case BPF_FUNC_user_ringbuf_drain:
> > +               return &bpf_user_ringbuf_drain_proto;
> >         default:
> >                 break;
> >         }
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index 0a8de712ecbe..3818398e57de 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> > @@ -38,6 +38,22 @@ struct bpf_ringbuf {
> >         struct page **pages;
> >         int nr_pages;
> >         spinlock_t spinlock ____cacheline_aligned_in_smp;
> > +       /* For user-space producer ringbuffers, an atomic_t busy bit is used to
> > +        * synchronize access to the ringbuffer in the kernel, rather than the
> > +        * spinlock that is used for kernel-producer ringbuffers. This is done
> > +        * because the ringbuffer must hold a lock across a BPF program's
> 
> ditto about ringbuffer -> ring buffer (though here it's probably fine
> to just use short ringbuf), Gmail also doesn't like "ringbuffer" ;)

I won't pretend to be better at spelling than an AI ;-). Updated to
ring buffer here and in the rest of the file.

> > +        * callback:
> > +        *
> > +        *    __bpf_user_ringbuf_peek() // lock acquired
> > +        * -> program callback_fn()
> > +        * -> __bpf_user_ringbuf_sample_release() // lock released
> > +        *
> > +        * It is unsafe and incorrect to hold an IRQ spinlock across what could
> > +        * be a long execution window, so we instead simply disallow concurrent
> > +        * access to the ringbuffer by kernel consumers, and return -EBUSY from
> > +        * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
> > +        */
> 
> [...]
> 
> > +       if (flags & BPF_RINGBUF_DISCARD_BIT) {
> > +               /* If the discard bit is set, the sample should be ignored, and
> > +                * we can instead try to read the next one.
> > +                *
> > +                * Synchronizes with smp_load_acquire() in the user-space
> > +                * producer, and smp_load_acquire() in
> > +                * __bpf_user_ringbuf_peek() above.
> > +                */
> > +               smp_store_release(&rb->consumer_pos, cons_pos + total_len);
> > +               goto retry;
> 
> so given fast enough user-space producer, we can make kernel spend a
> lot of time looping and retrying here if we just commit discarded
> samples. And we won't be taking into account
> BPF_MAX_USER_RINGBUF_SAMPLES for those discards. That seems like a bit
> of a hole in the logic... would it be better to return with -EAGAIN
> for discard samples and let drain logic skip over them?

I like that idea. I originally had us loop over
BPF_MAX_USER_RINGBUF_SAMPLES times, but then decided to change it to this
as it seemed like a simpler and sufficient solution. In hindsight I agree
that it's incorrect to potentially allow user-space to cause the kernel to
busy loop like this. I'll update this to return -EAGAIN and let the drain
logic deal with it.
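
Something along these lines in the drain loop, perhaps (exact bookkeeping
still TBD):

		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
		if (err == -EAGAIN) {
			/* Discarded sample: skip it, but still count it
			 * against BPF_MAX_USER_RINGBUF_SAMPLES so user-space
			 * can't force an unbounded loop.
			 */
			num_samples++;
			continue;
		}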

> > +       }
> > +
> > +       if (flags & BPF_RINGBUF_BUSY_BIT) {
> > +               err = -ENODATA;
> > +               goto err_unlock;
> > +       }
> > +
> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +       return 0;
> > +
> > +err_unlock:
> > +       atomic_set(&rb->busy, 0);
> > +       return err;
> > +}
> > +
> > +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> > +{
> > +       u64 producer_pos, consumer_pos;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space producer. */
> > +       producer_pos = smp_load_acquire(&rb->producer_pos);
> > +
> > +       /* Using smp_load_acquire() is unnecessary here, as the busy-bit
> > +        * prevents another task from writing to consumer_pos after it was read
> > +        * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
> > +        */
> > +       consumer_pos = rb->consumer_pos;
> > +        /* Synchronizes with smp_load_acquire() in user-space producer. */
> > +       smp_store_release(&rb->consumer_pos, consumer_pos + size + BPF_RINGBUF_HDR_SZ);
> > +
> > +       /* Prevent the clearing of the busy-bit from being reordered before the
> > +        * storing of the updated rb->consumer_pos value.
> > +        */
> > +       smp_mb__before_atomic();
> > +       atomic_set(&rb->busy, 0);
> > +
> > +       if (!(flags & BPF_RB_NO_WAKEUP)) {
> > +               /* As a heuristic, if the previously consumed sample caused the
> > +                * ringbuffer to no longer be full, send an event notification
> > +                * to any user-space producer that is epoll-waiting.
> > +                */
> > +               if (producer_pos - consumer_pos == ringbuf_total_data_sz(rb))
> 
> I'm a bit confused here. This will be true only if user-space producer
> filled out entire ringbuf data *exactly* to the last byte with a
> single record. Or am I misunderstanding this?

I think you're misunderstanding. This will indeed only be true if the ring
buffer was full (to the last byte as you said) before the last sample was
consumed, but it doesn't have to have been filled with a single record.
We're just checking that producer_pos - consumer_pos is the total size of
the ring buffer, but there can be many samples between consumer_pos and
producer_pos for that to be the case.

> If my understanding is correct, how is this a realistic use case and
> how does this heuristic help at all?

Though I think you may have misunderstood the heuristic, some more
explanation is probably warranted nonetheless. This heuristic being useful
relies on two assumptions:

1. It will be common for user-space to publish statically sized samples.

I think this one is pretty unambiguously true, especially considering that
BPF_MAP_TYPE_RINGBUF was put to great use with statically sized samples for
quite some time. I'm open to hearing why that might not be the case.

2. The size of the ring buffer is a multiple of the size of a sample.

This one I think is a bit less clear. Users can always size the ring buffer
to make sure this will be the case, but whether or not that will be
commonly done is another story.

I'm fine with removing this heuristic for now if it's unclear that it's
serving a common use-case. We can always add it back in later if we want
to.

> > +                       irq_work_queue(&rb->work);
> > +
> > +       }
> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +       struct bpf_ringbuf *rb;
> > +       long num_samples = 0, ret = 0;
> > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP;
> > +
> > +       if (unlikely(flags & ~wakeup_flags))
> 
> hm... so if we specify BPF_RB_FORCE_WAKEUP we'll reject this? Why? Why
> not allow both? And why use u64 variable to store BPF_RB_NO_WAKEUP
> constant, just use constant right here?

I thought it was prudent to reject it because I observed the kernel hang if
we scheduled too much work in irq_work_queue() while draining samples. I'm
now unable to repro this even after aggressively writing samples and
scheduling work with irq_work_queue(), so let me remove this restriction.
I'm not sure what's changed, but I was able to reproduce this 100% of the
time before, and now not at all. Perhaps there was some other issue in the
kernel that's since been fixed.

> > +               return -EINVAL;
> > +
> > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +       do {
> > +               int err;
> > +               u32 size;
> > +               void *sample;
> > +               struct bpf_dynptr_kern dynptr;
> > +
> 
> [...]
> 
> > @@ -7323,22 +7366,35 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >                 }
> >                 break;
> >         case BPF_FUNC_dynptr_data:
> > +               helper_allocated_dynptr = false;
> >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > -                               if (meta.ref_obj_id) {
> > -                                       verbose(env, "verifier internal error: meta.ref_obj_id already set\n");
> > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > +
> > +                               if (helper_allocated_dynptr || meta.ref_obj_id) {
> > +                                       verbose(env, "verifier internal error: multiple dynptrs not supported\n");
> >                                         return -EFAULT;
> >                                 }
> > -                               /* Find the id of the dynptr we're tracking the reference of */
> > -                               meta.ref_obj_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > +
> > +                               if (base_type(reg->type) == PTR_TO_DYNPTR)
> > +                                       helper_allocated_dynptr = true;
> > +                               else
> > +                                       /* Find the id of the dynptr we're
> > +                                        * tracking the reference of
> > +                                        */
> > +                                       meta.ref_obj_id = stack_slot_get_id(env, reg);
> >                                 break;
> >                         }
> >                 }
> > -               if (i == MAX_BPF_FUNC_REG_ARGS) {
> > +               if (!helper_allocated_dynptr && i == MAX_BPF_FUNC_REG_ARGS) {
> 
> we still expect to get to break in the loop above, right? so there is
> no need to special-case !helper_allocated_dynptr, is there?

Yes, sorry about that silly mistake, not sure what I was thinking. I'll fix
this in V4.

> >                         verbose(env, "verifier internal error: no dynptr in bpf_dynptr_data()\n");
> >                         return -EFAULT;
> >                 }
> >                 break;
> > +       case BPF_FUNC_user_ringbuf_drain:
> > +               err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> > +                                       set_user_ringbuf_callback_state);
> > +               break;
> >         }
> >
> >         if (err)
> 
> [...]

Thanks,
David

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer
  2022-08-24 21:58   ` Andrii Nakryiko
@ 2022-08-30 13:42     ` David Vernet
  2022-09-09 22:50       ` Andrii Nakryiko
  0 siblings, 1 reply; 16+ messages in thread
From: David Vernet @ 2022-08-30 13:42 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Wed, Aug 24, 2022 at 02:58:31PM -0700, Andrii Nakryiko wrote:

[...]

> > +LIBBPF_API struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb,
> > +                                          __u32 size);
> > +
> > +LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> > +                                                   __u32 size,
> > +                                                   int timeout_ms);
> > +LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb,
> > +                                        void *sample);
> > +LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb,
> > +                                         void *sample);
> > +LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
> > +
> 
> Let's make sure that all the relevant comments and description of
> inputs/outputs/errors are documented here. These doccomments go to
> https://libbpf.readthedocs.io/en/latest/api.html

No problem, I'll add these to the docs.

> also, please make sure that declarations that fit within 100
> characters stay on single line, it's much more readable that way

My mistake, will fix.

> >  /* Perf buffer APIs */
> >  struct perf_buffer;
> >
> > diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
> > index 2b928dc21af0..40c83563f90a 100644
> > --- a/tools/lib/bpf/libbpf.map
> > +++ b/tools/lib/bpf/libbpf.map
> > @@ -367,4 +367,10 @@ LIBBPF_1.0.0 {
> 
> now that 1.0 is released, this will have to go into a new LIBBPF_1.1.0
> section (which inherits from LIBBPF_1.0.0)

Sounds good, I'll do that in v4.

[...]

> > +       /* Map read-write the producer page and data pages. We map the data
> > +        * region as twice the total size of the ringbuffer to allow the simple
> > +        * reading and writing of samples that wrap around the end of the
> > +        * buffer.  See the kernel implementation for details.
> > +        */
> > +       tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
> > +                  PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
> > +       if (tmp == MAP_FAILED) {
> > +               err = -errno;
> > +               pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
> > +                       map_fd, err);
> > +               return libbpf_err(err);
> > +       }
> > +
> > +       rb->producer_pos = tmp;
> > +       rb->data = tmp + rb->page_size;
> > +
> > +       rb_epoll = &rb->event;
> > +       rb_epoll->events = EPOLLOUT;
> > +       if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
> > +               err = -errno;
> > +               pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
> > +               return libbpf_err(err);
> 
> this is internal helper function, so there is no need to use
> libbpf_err() helpers, just return errors directly. Only user-facing
> functions should make sure to set both errno and return error

Will fix.

> > +       }
> > +
> > +       return 0;
> > +}
> > +
> > +struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
> > +{
> > +       struct user_ring_buffer *rb;
> > +       int err;
> > +
> > +       if (!OPTS_VALID(opts, ring_buffer_opts))
> 
> user_ring_buffer_opts

Good catch, will fix.

> > +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
> > +{
> > +       __u32 new_len;
> > +       struct ringbuf_hdr *hdr;
> > +
> > +       /* All samples are aligned to 8 bytes, so the header will only ever
> > +        * wrap around the back of the ringbuffer if the sample is at the
> > +        * very beginning of the ringbuffer.
> > +        */
> > +       if (sample == rb->data)
> > +               hdr = rb->data + (rb->mask - BPF_RINGBUF_HDR_SZ + 1);
> > +       else
> > +               hdr = sample - BPF_RINGBUF_HDR_SZ;
> 
> let's avoid extra if in a hot path?
> 
> hdr = rb->data + (rb->mask + 1 + (sample - rb->data) -
> BPF_RINGBUF_HDR_SZ) & rb->mask;

Nice idea, will do.

> > +
> > +       new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
> > +       if (discard)
> > +               new_len |= BPF_RINGBUF_DISCARD_BIT;
> > +
> > +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> > +        * the kernel.
> > +        */
> > +       __atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
> > +}
> > +
> > +/* Discard a previously reserved sample into the ring buffer.  It is not
> > + * necessary to synchronize amongst multiple producers when invoking this
> > + * function.
> > + */
> > +void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
> > +{
> > +       user_ringbuf__commit(rb, sample, true);
> > +}
> > +
> > +/* Submit a previously reserved sample into the ring buffer. It is not
> > + * necessary to synchronize amongst multiple producers when invoking this
> > + * function.
> > + */
> > +void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
> > +{
> > +       user_ringbuf__commit(rb, sample, false);
> > +}
> > +
> > +/* Reserve a pointer to a sample in the user ring buffer. This function is
> > + * *not* thread safe, and callers must synchronize accessing this function if
> > + * there are multiple producers.
> > + *
> > + * If a size is requested that is larger than the size of the entire
> > + * ringbuffer, errno is set to E2BIG and NULL is returned. If the ringbuffer
> > + * could accommodate the size, but currently does not have enough space, errno
> > + * is set to ENODATA and NULL is returned.
> 
> ENOSPC seems more appropriate for such a situation?

For the latter? Hmmm, yeah I suppose I agree, I'll make that adjustment.

> > + *
> > + * Otherwise, a pointer to the sample is returned. After initializing the
> > + * sample, callers must invoke user_ring_buffer__submit() to post the sample to
> > + * the kernel. Otherwise, the sample must be freed with
> > + * user_ring_buffer__discard().
> > + */
> 
> usual complaints about "ringbuffer", feels like a typo

No problem, I'll fix that here and in the rest of the patch-set.

> > +void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
> > +{
> > +       __u32 avail_size, total_size, max_size;
> > +       /* 64-bit to avoid overflow in case of extreme application behavior */
> > +       __u64 cons_pos, prod_pos;
> > +       struct ringbuf_hdr *hdr;
> > +
> > +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
> > +        * the kernel.
> > +        */
> > +       cons_pos = smp_load_acquire(rb->consumer_pos);
> > +       /* Synchronizes with smp_store_release() in user_ringbuf__commit() */
> > +       prod_pos = smp_load_acquire(rb->producer_pos);
> > +
> > +       /* Round up size to a multiple of 8. */
> > +       size = (size + 7) / 8 * 8;
> > +       max_size = rb->mask + 1;
> > +       avail_size = max_size - (prod_pos - cons_pos);
> > +       total_size = size + BPF_RINGBUF_HDR_SZ;
> > +
> > +       if (total_size > max_size)
> > +               return errno = E2BIG, NULL;
> > +
> > +       if (avail_size < total_size)
> > +               return errno = ENODATA, NULL;
> > +
> > +       hdr = rb->data + (prod_pos & rb->mask);
> > +       hdr->len = size | BPF_RINGBUF_BUSY_BIT;
> 
> so I double-checked what kernel ringbuf is doing with size. We still
> record exact user-requested size in header, but all the logic knows
> that it has to be rounded up to closest 8. I think that's best
> behavior because it preserves user-supplied information exactly. So if
> I wanted to reserve and communicate a 4-byte sample to my consumers,
> they should see that there are 4 bytes of data available, not 8. So
> let's do the same here?

No problem, I'll do this in v4.

> We still should validate that all the positions are multiples of 8, of
> course, as you do in this revision.

Ack.

> > +       hdr->pad = 0;
> > +
> > +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
> > +        * the kernel.
> > +        */
> > +       smp_store_release(rb->producer_pos, prod_pos + total_size);
> > +
> > +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> > +}
> > +
> > +static int ms_elapsed_timespec(const struct timespec *start, const struct timespec *end)
> > +{
> > +       int total, ns_per_ms = 1000000, ns_per_s = ns_per_ms * 1000;
> > +
> > +       if (end->tv_sec > start->tv_sec) {
> > +               total = 1000 * (end->tv_sec - start->tv_sec);
> > +               total += (end->tv_nsec + (ns_per_s - start->tv_nsec)) / ns_per_ms;
> > +       } else {
> > +               total = (end->tv_nsec - start->tv_nsec) / ns_per_ms;
> > +       }
> > +
> 
> hm... this seems overengineered, tbh
> 
> u64 start_ns = (u64)start->tv_sec * 1000000000 + start->tv_nsec;
> u64 end_ns = (u64)end->tv_sec * 1000000000 + end->tv_nsec;
> 
> return (end_ns - start_ns) / 1000000;
> 
> ?

Yeah, this is much simpler. Thanks for the suggestion.

> > +       return total;
> > +}
> > +
> > +/* Reserve a record in the ringbuffer, possibly blocking for up to @timeout_ms
> > + * until a sample becomes available.  This function is *not* thread safe, and
> > + * callers must synchronize accessing this function if there are multiple
> > + * producers.
> > + *
> > + * If @timeout_ms is -1, the function will block indefinitely until a sample
> > + * becomes available. Otherwise, @timeout_ms must be non-negative, or errno
> > + * will be set to EINVAL, and NULL will be returned. If @timeout_ms is 0,
> > + * no blocking will occur and the function will return immediately after
> > + * attempting to reserve a sample.
> > + *
> > + * If @size is larger than the size of the entire ringbuffer, errno is set to
> > + * E2BIG and NULL is returned. If the ringbuffer could accommodate @size, but
> > + * currently does not have enough space, the caller will block until at most
> > + * @timeout_ms has elapsed. If insufficient space is available at that time,
> > + * errno will be set to ENODATA, and NULL will be returned.
> 
> ENOSPC?

Ack.

> > + *
> > + * The kernel guarantees that it will wake up this thread to check if
> > + * sufficient space is available in the ringbuffer at least once per invocation
> > + * of the bpf_ringbuf_drain() helper function, provided that at least one
> > + * sample is consumed, and the BPF program did not invoke the function with
> > + * BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the kernel does
> > + * not guarantee this.
> > + *
> > + * When a sample of size @size is found within @timeout_ms, a pointer to the
> > + * sample is returned. After initializing the sample, callers must invoke
> > + * user_ring_buffer__submit() to post the sample to the ringbuffer. Otherwise,
> > + * the sample must be freed with user_ring_buffer__discard().
> > + */
> 
> so comments like this should go into doccomments for this functions in libbpf.h

Ack.

> > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> > +{
> > +       int ms_elapsed = 0, err;
> > +       struct timespec start;
> > +
> > +       if (timeout_ms < 0 && timeout_ms != -1)
> > +               return errno = EINVAL, NULL;
> > +
> > +       if (timeout_ms != -1) {
> > +               err = clock_gettime(CLOCK_MONOTONIC, &start);
> > +               if (err)
> > +                       return NULL;
> > +       }
> > +
> > +       do {
> > +               int cnt, ms_remaining = timeout_ms - ms_elapsed;
> 
> let's max(0, timeout_ms - ms_elapsed) to avoid negative ms_remaining
> in some edge timing cases

We actually want to have a negative ms_remaining if timeout_ms is -1. -1
in epoll_wait() specifies an infinite timeout. If we were to round up to
0, it wouldn't block at all.
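
Something like this should cover both the infinite timeout and the edge case
you mentioned (sketch):

		if (timeout_ms == -1)
			ms_remaining = -1;
		else
			ms_remaining = ms_elapsed < timeout_ms ?
				       timeout_ms - ms_elapsed : 0;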

> > +               void *sample;
> > +               struct timespec curr;
> > +
> > +               sample = user_ring_buffer__reserve(rb, size);
> > +               if (sample)
> > +                       return sample;
> > +               else if (errno != ENODATA)
> > +                       return NULL;
> > +
> > +               /* The kernel guarantees at least one event notification
> > +                * delivery whenever at least one sample is drained from the
> > +                * ringbuffer in an invocation to bpf_ringbuf_drain(). Other
> > +                * additional events may be delivered at any time, but only one
> > +                * event is guaranteed per bpf_ringbuf_drain() invocation,
> > +                * provided that a sample is drained, and the BPF program did
> > +                * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > +                */
> > +               cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > +               if (cnt < 0)
> > +                       return NULL;
> > +
> > +               if (timeout_ms == -1)
> > +                       continue;
> > +
> > +               err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > +               if (err)
> > +                       return NULL;
> > +
> > +               ms_elapsed = ms_elapsed_timespec(&start, &curr);
> > +       } while (ms_elapsed <= timeout_ms);
> 
> let's simplify all the time keeping to use nanosecond timestamps and
> only convert to ms when calling epoll_wait()? Then you can just have a
> tiny helper to convert timespec to nanosecond ts ((u64)ts.tv_sec *
> 1000000000 + ts.tv_nsec) and compare u64s directly. WDYT?

Sounds like an improvement to me!

Thanks,
David

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf
  2022-08-24 22:03   ` Andrii Nakryiko
@ 2022-08-30 13:46     ` David Vernet
  0 siblings, 0 replies; 16+ messages in thread
From: David Vernet @ 2022-08-30 13:46 UTC (permalink / raw)
  To: Andrii Nakryiko
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Wed, Aug 24, 2022 at 03:03:54PM -0700, Andrii Nakryiko wrote:

[...]

> > +                       CHECK(read <= 0, "snprintf_comm",
> > +                             "Failed to write index %d to comm\n", i);
> 
> please, no CHECK() use in new tests, we have ASSERT_xxx() covering all
> common cases

No problem, I'll make this change in v4. Unless I missed something,
CHECK() isn't documented as deprecated in test_progs.h. I'll take care
of adding that in a separate change.

> > +static long
> > +bad_access1(struct bpf_dynptr *dynptr, void *context)
> > +{
> > +       const struct sample *sample;
> > +
> > +       sample = bpf_dynptr_data(dynptr - 1, 0, sizeof(*sample));
> > +       bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr - 1);
> > +
> > +       return 0;
> > +}
> > +
> > +/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
> > + * not be able to read before the pointer.
> > + */
> > +SEC("?raw_tp/sys_nanosleep")
> 
> there is no sys_nanosleep raw tracepoint, use SEC("?raw_tp") to
> specify type, that's enough

Got it, thanks for catching that. Will fix.

> > diff --git a/tools/testing/selftests/bpf/test_user_ringbuf.h b/tools/testing/selftests/bpf/test_user_ringbuf.h
> > new file mode 100644
> > index 000000000000..1643b4d59ba7
> > --- /dev/null
> > +++ b/tools/testing/selftests/bpf/test_user_ringbuf.h
> 
> nit: I'd probably put it under progs/test_user_ringbuf.h so it's
> closer to BPF source code. As it is right now, it's neither near
> user-space part of tests nor near BPF part.

Fair enough, will fix.

Thanks,
David

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type
  2022-08-24 17:38 ` [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type Daniel Borkmann
@ 2022-08-30 13:50   ` David Vernet
  0 siblings, 0 replies; 16+ messages in thread
From: David Vernet @ 2022-08-30 13:50 UTC (permalink / raw)
  To: Daniel Borkmann
  Cc: bpf, ast, andrii, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Wed, Aug 24, 2022 at 07:38:34PM +0200, Daniel Borkmann wrote:

Hey Daniel,

> On 8/19/22 12:12 AM, David Vernet wrote:
> > This patch set defines a new map type, BPF_MAP_TYPE_USER_RINGBUF, which
> > provides single-user-space-producer / single-kernel-consumer semantics over
> > a ringbuffer.  Along with the new map type, a helper function called
> > bpf_user_ringbuf_drain() is added which allows a BPF program to specify a
> > callback with the following signature, to which samples are posted by the
> > helper:
> 
> Looks like this series fail BPF CI, ptal:
> 
> https://github.com/kernel-patches/bpf/runs/7996821883?check_suite_focus=true

Thanks for the heads up. I'll make sure these are fixed before
submitting v4 by following the instructions in [0].

[0]: https://github.com/qmonnet/whirl-offload/blob/wip/bpf-ci-check/_posts/2022-08-05-bpf-ci-check.md#running-the-bpf-ci-on-your-own-github-linux-fork.

Thanks,
David

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-30 13:28     ` David Vernet
@ 2022-09-09 22:45       ` Andrii Nakryiko
  0 siblings, 0 replies; 16+ messages in thread
From: Andrii Nakryiko @ 2022-09-09 22:45 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Tue, Aug 30, 2022 at 6:28 AM David Vernet <void@manifault.com> wrote:
>
> On Wed, Aug 24, 2022 at 02:22:44PM -0700, Andrii Nakryiko wrote:
> > > +/* Maximum number of user-producer ringbuffer samples that can be drained in
> > > + * a call to bpf_user_ringbuf_drain().
> > > + */
> > > +#define BPF_MAX_USER_RINGBUF_SAMPLES BIT(17)
> >
> > nit: I don't think using BIT() is appropriate here. 128 * 1024 would
> > be better, IMO. This is not inherently required to be a single bit
> > constant.
>
> No problem, updated.
>
> > > +
> > >  static inline u32 bpf_map_flags_to_cap(struct bpf_map *map)
> > >  {
> > >         u32 access_flags = map->map_flags & (BPF_F_RDONLY_PROG | BPF_F_WRONLY_PROG);
> > > @@ -2411,6 +2417,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
> > >  extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
> > >  extern const struct bpf_func_proto bpf_set_retval_proto;
> > >  extern const struct bpf_func_proto bpf_get_retval_proto;
> > > +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
> > >

[...]

> > > +
> > > +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
> > > +{
> > > +       u64 producer_pos, consumer_pos;
> > > +
> > > +       /* Synchronizes with smp_store_release() in user-space producer. */
> > > +       producer_pos = smp_load_acquire(&rb->producer_pos);
> > > +
> > > +       /* Using smp_load_acquire() is unnecessary here, as the busy-bit
> > > +        * prevents another task from writing to consumer_pos after it was read
> > > +        * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
> > > +        */
> > > +       consumer_pos = rb->consumer_pos;
> > > +        /* Synchronizes with smp_load_acquire() in user-space producer. */
> > > +       smp_store_release(&rb->consumer_pos, consumer_pos + size + BPF_RINGBUF_HDR_SZ);
> > > +
> > > +       /* Prevent the clearing of the busy-bit from being reordered before the
> > > +        * storing of the updated rb->consumer_pos value.
> > > +        */
> > > +       smp_mb__before_atomic();
> > > +       atomic_set(&rb->busy, 0);
> > > +
> > > +       if (!(flags & BPF_RB_NO_WAKEUP)) {
> > > +               /* As a heuristic, if the previously consumed sample caused the
> > > +                * ringbuffer to no longer be full, send an event notification
> > > +                * to any user-space producer that is epoll-waiting.
> > > +                */
> > > +               if (producer_pos - consumer_pos == ringbuf_total_data_sz(rb))
> >
> > I'm a bit confused here. This will be true only if user-space producer
> > filled out entire ringbuf data *exactly* to the last byte with a
> > single record. Or am I misunderstanding this?
>
> I think you're misunderstanding. This will indeed only be true if the ring
> buffer was full (to the last byte as you said) before the last sample was
> consumed, but it doesn't have to have been filled with a single record.
> We're just checking that producer_pos - consumer_pos is the total size of
> the ring buffer, but there can be many samples between consumer_pos and
> producer_pos for that to be the case.

You are right, never mind the single-sample part; I don't think that's the
important point anyway (it just surprised me and makes the scenario even
less realistic).

>
> > If my understanding is correct, how is this a realistic use case and
> > how does this heuristic help at all?
>
> Though I think you may have misunderstood the heuristic, some more
> explanation is probably warranted nonetheless. This heuristic being useful
> relies on two assumptions:
>
> 1. It will be common for user-space to publish statically sized samples.
>
> I think this one is pretty unambiguously true, especially considering that
> BPF_MAP_TYPE_RINGBUF was put to great use with statically sized samples for
> quite some time. I'm open to hearing why that might not be the case.

True, the majority of use cases for BPF ringbuf have been fixed-size,
thanks to the convenience of the reserve/commit API. But the data
structure itself allows variable-sized samples, there are use cases doing
this, and with dynptr it's now easier to handle variable-sized samples
efficiently. So special-casing fixed-size samples is a bit off, especially
considering #2.

>
> 2. The size of the ring buffer is a multiple of the size of a sample.
>
> This one I think is a bit less clear. Users can always size the ring buffer
> to make sure this will be the case, but whether or not that will be
> commonly done is another story.

So I'm almost certain this won't be the case. I don't think anyone is
going to track the exact size of their sample struct (which will most
probably change over time), and sizing the ringbuf to be both a power-of-2
multiple of page_size *and* a multiple of sizeof(struct my_ringbuf_sample)
is something I don't see anyone doing.

>
> I'm fine with removing this heuristic for now if it's unclear that it's
> serving a common use-case. We can always add it back in later if we want
> to.

Yes, this looks quite out of place with a bunch of optimistic but
unrealistic assumptions. Doing one notification after drain will be
fine for now, IMO.
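
For reference, a minimal sketch of that simpler policy, reusing the names
from the quoted patch (num_samples, flags, rb); exactly where this would
land in v4+ is an assumption, not the final code:

	/* Sketch only: notify the blocked user-space producer at most once
	 * per drain, after the drain loop has finished, instead of using a
	 * per-sample "ring just stopped being full" heuristic.
	 */
	if (!(flags & BPF_RB_NO_WAKEUP) && num_samples > 0)
		irq_work_queue(&rb->work);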

>
> > > +                       irq_work_queue(&rb->work);
> > > +
> > > +       }
> > > +}
> > > +
> > > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > > +{
> > > +       struct bpf_ringbuf *rb;
> > > +       long num_samples = 0, ret = 0;
> > > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP;
> > > +
> > > +       if (unlikely(flags & ~wakeup_flags))
> >

[...]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer
  2022-08-30 13:42     ` David Vernet
@ 2022-09-09 22:50       ` Andrii Nakryiko
  0 siblings, 0 replies; 16+ messages in thread
From: Andrii Nakryiko @ 2022-09-09 22:50 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, ast, andrii, daniel, kernel-team, martin.lau, song, yhs,
	john.fastabend, kpsingh, sdf, haoluo, jolsa, joannelkoong, tj,
	linux-kernel

On Tue, Aug 30, 2022 at 6:42 AM David Vernet <void@manifault.com> wrote:
>
> On Wed, Aug 24, 2022 at 02:58:31PM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > +LIBBPF_API struct user_ring_buffer *
> > > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> > > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb,
> > > +                                          __u32 size);
> > > +
> > > +LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> > > +                                                   __u32 size,
> > > +                                                   int timeout_ms);
> > > +LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb,
> > > +                                        void *sample);
> > > +LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb,
> > > +                                         void *sample);
> > > +LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb);
> > > +
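
As an aside, here is a minimal user-space usage sketch of the API declared
above; the "my_ringbuf" map name and the 64-byte sample size are
illustrative assumptions, and error handling is trimmed:

	#include <errno.h>
	#include <string.h>
	#include <bpf/libbpf.h>

	/* Sketch: publish a single zero-filled sample to the kernel. */
	static int publish_one(struct bpf_object *obj)
	{
		struct user_ring_buffer *rb;
		void *sample;
		int map_fd;

		map_fd = bpf_object__find_map_fd_by_name(obj, "my_ringbuf");
		if (map_fd < 0)
			return map_fd;

		rb = user_ring_buffer__new(map_fd, NULL);
		if (!rb)
			return -errno;

		/* Block for up to 100ms waiting for 64 bytes of free space. */
		sample = user_ring_buffer__reserve_blocking(rb, 64, 100);
		if (!sample) {
			user_ring_buffer__free(rb);
			return -errno;
		}

		memset(sample, 0, 64);	/* fill in the payload here */
		user_ring_buffer__submit(rb, sample);
		user_ring_buffer__free(rb);
		return 0;
	}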

[...]

> > > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> > > +{
> > > +       int ms_elapsed = 0, err;
> > > +       struct timespec start;
> > > +
> > > +       if (timeout_ms < 0 && timeout_ms != -1)
> > > +               return errno = EINVAL, NULL;
> > > +
> > > +       if (timeout_ms != -1) {
> > > +               err = clock_gettime(CLOCK_MONOTONIC, &start);
> > > +               if (err)
> > > +                       return NULL;
> > > +       }
> > > +
> > > +       do {
> > > +               int cnt, ms_remaining = timeout_ms - ms_elapsed;
> >
> > let's max(0, timeout_ms - ms_elapsed) to avoid negative ms_remaining
> > in some edge timing cases
>
> We actually want to have a negative ms_remaining if timeout_ms is -1. -1
> in epoll_wait() specifies an infinite timeout. If we were to round up to
> 0, it wouldn't block at all.

Then I think it's better to special-case timeout_ms == -1. My worry here,
as I mentioned, is an edge case in timing where ms_elapsed ends up larger
than timeout_ms, ms_remaining goes negative, and we stay blocked for a
long time.

So I think it's best to pass `timeout_ms < 0 ? -1 : ms_remaining` and
still apply the max(), as in the sketch below. But I haven't checked v5
yet, so if you already addressed this, it's fine.
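
A sketch of what that computation could look like inside the loop (the
open-coded clamp is an assumption about how it might be written; nothing
here dictates a particular max() helper):

	/* Sketch: only -1 (infinite) may be passed to epoll_wait() as a
	 * negative timeout; clamp everything else to zero.
	 */
	int ms_remaining;

	if (timeout_ms < 0) {
		ms_remaining = -1;	/* block indefinitely */
	} else {
		ms_remaining = timeout_ms - ms_elapsed;
		if (ms_remaining < 0)
			ms_remaining = 0;	/* already past the deadline */
	}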


>
> > > +               void *sample;
> > > +               struct timespec curr;
> > > +
> > > +               sample = user_ring_buffer__reserve(rb, size);
> > > +               if (sample)
> > > +                       return sample;
> > > +               else if (errno != ENODATA)
> > > +                       return NULL;
> > > +
> > > +               /* The kernel guarantees at least one event notification
> > > +                * delivery whenever at least one sample is drained from the
> > > +                * ringbuffer in an invocation to bpf_ringbuf_drain(). Other
> > > +                * additional events may be delivered at any time, but only one
> > > +                * event is guaranteed per bpf_ringbuf_drain() invocation,
> > > +                * provided that a sample is drained, and the BPF program did
> > > +                * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > > +                */
> > > +               cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > > +               if (cnt < 0)
> > > +                       return NULL;
> > > +
> > > +               if (timeout_ms == -1)
> > > +                       continue;
> > > +
> > > +               err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > > +               if (err)
> > > +                       return NULL;
> > > +
> > > +               ms_elapsed = ms_elapsed_timespec(&start, &curr);
> > > +       } while (ms_elapsed <= timeout_ms);
> >
> > let's simplify all the time keeping to use nanosecond timestamps and
> > only convert to ms when calling epoll_wait()? Then you can just have a
> > tiny helper to convert timespec to nanosecond ts ((u64)ts.tv_sec *
> > 1000000000 + ts.tv_nsec) and compare u64s directly. WDYT?
>
> Sounds like an improvement to me!
>
> Thanks,
> David
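
A sketch of what the nanosecond-based bookkeeping could look like; the
helper names, the explicit epoll_fd parameter (struct user_ring_buffer is
opaque outside libbpf), and the ETIMEDOUT errno are assumptions, not the
final v4+ code:

	#include <errno.h>
	#include <time.h>
	#include <sys/epoll.h>
	#include <bpf/libbpf.h>

	/* Sketch: current CLOCK_MONOTONIC time as a u64 nanosecond count. */
	static __u64 monotonic_ns(void)
	{
		struct timespec ts;

		clock_gettime(CLOCK_MONOTONIC, &ts);
		return (__u64)ts.tv_sec * 1000000000ULL + (__u64)ts.tv_nsec;
	}

	/* Block until 'size' bytes can be reserved or 'timeout_ms' elapses,
	 * tracking elapsed time in ns and converting to ms only for
	 * epoll_wait().
	 */
	static void *reserve_blocking_ns(struct user_ring_buffer *rb,
					 int epoll_fd, __u32 size, int timeout_ms)
	{
		__u64 start_ns = monotonic_ns();
		__u64 timeout_ns = (__u64)timeout_ms * 1000000ULL;
		struct epoll_event event;

		for (;;) {
			__u64 elapsed_ns;
			int ms_remaining;
			void *sample;

			sample = user_ring_buffer__reserve(rb, size);
			if (sample)
				return sample;
			if (errno != ENODATA)
				return NULL;

			elapsed_ns = monotonic_ns() - start_ns;
			if (timeout_ms >= 0 && elapsed_ns >= timeout_ns)
				return errno = ETIMEDOUT, NULL;

			/* Convert to ms only at the epoll_wait() boundary. */
			ms_remaining = timeout_ms < 0 ? -1 :
				       (int)((timeout_ns - elapsed_ns) / 1000000ULL);

			if (epoll_wait(epoll_fd, &event, 1, ms_remaining) < 0)
				return NULL;
		}
	}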

^ permalink raw reply	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2022-09-09 22:51 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-08-18 22:12 [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
2022-08-18 22:12 ` [PATCH v3 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
2022-08-24 20:52   ` Andrii Nakryiko
2022-08-18 22:12 ` [PATCH v3 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
2022-08-24 21:22   ` Andrii Nakryiko
2022-08-30 13:28     ` David Vernet
2022-09-09 22:45       ` Andrii Nakryiko
2022-08-18 22:12 ` [PATCH v3 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
2022-08-24 21:58   ` Andrii Nakryiko
2022-08-30 13:42     ` David Vernet
2022-09-09 22:50       ` Andrii Nakryiko
2022-08-18 22:12 ` [PATCH v3 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
2022-08-24 22:03   ` Andrii Nakryiko
2022-08-30 13:46     ` David Vernet
2022-08-24 17:38 ` [PATCH v3 0/4] bpf: Add user-space-publisher ringbuffer map type Daniel Borkmann
2022-08-30 13:50   ` David Vernet

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).