bpf.vger.kernel.org archive mirror
* [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type
@ 2022-08-11 23:49 David Vernet
  2022-08-11 23:49 ` [PATCH v2 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
From: David Vernet @ 2022-08-11 23:49 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

This patch set defines a new map type, BPF_MAP_TYPE_USER_RINGBUF, which
provides single-user-space-producer / single-kernel-consumer semantics over
a ringbuffer.  Along with the new map type, a helper function called
bpf_user_ringbuf_drain() is added which allows a BPF program to specify a
callback with the following signature, to which samples are posted by the
helper:

void (struct bpf_dynptr *dynptr, void *context);

The program can then use the bpf_dynptr_read() or bpf_dynptr_data() helper
functions to safely read the sample from the dynptr. There are currently no
helpers available to determine the size of the sample, but one could easily
be added if required.
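
As a rough sketch of how this fits together on the BPF side (the map name,
callback name, and attach point below are arbitrary and not part of this
series; the callback is written to return 0, which the drain loop in the
kernel implementation treats as "keep draining"):

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

char LICENSE[] SEC("license") = "GPL";

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 4096);
} user_ringbuf SEC(".maps");

long samples_read;

/* Invoked once per drained sample. Return 0 to continue draining. */
static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	__u64 value;

	/* Copy the first 8 bytes of the sample out of the dynptr. */
	if (bpf_dynptr_read(&value, sizeof(value), dynptr, 0, 0))
		return 0;

	samples_read++;
	return 0;
}

SEC("tp/syscalls/sys_enter_getpgid")
int drain_user_samples(void *ctx)
{
	bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);
	return 0;
}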

On the user-space side, libbpf has been updated to export a new
'struct ring_buffer_user' type, along with the following symbols:

struct ring_buffer_user *
ring_buffer_user__new(int map_fd,
                      const struct ring_buffer_user_opts *opts);
void ring_buffer_user__free(struct ring_buffer_user *rb);
void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size);
void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
			     int timeout_ms);
void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample);
void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample);

These symbols are exported for inclusion in libbpf version 1.0.0.
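
As a rough user-space usage sketch (the sample struct and names below are
illustrative only, and error handling is abbreviated):

#include <errno.h>
#include <linux/types.h>
#include <bpf/libbpf.h>

/* Hypothetical sample layout, shared with the BPF program. */
struct my_msg {
	__u64 value;
};

static int publish_one(int map_fd)
{
	struct ring_buffer_user *rb;
	struct my_msg *msg;

	rb = ring_buffer_user__new(map_fd, NULL);
	if (!rb)
		return -errno;

	/* Reserve space for one sample; NULL means the buffer is full. */
	msg = ring_buffer_user__reserve(rb, sizeof(*msg));
	if (!msg) {
		ring_buffer_user__free(rb);
		return -ENOSPC;
	}

	msg->value = 42;
	ring_buffer_user__submit(rb, msg);

	/* The sample is consumed the next time a BPF program calls
	 * bpf_user_ringbuf_drain() on this map.
	 */
	ring_buffer_user__free(rb);
	return 0;
}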

Note that this patch set does not yet include the ability to kick the
kernel from user-space to have it drain messages. The selftests included
in this patch set currently just use progs with syscall hooks to "kick"
the kernel and have it drain samples from a user-producer ringbuffer, but
being able to kick the kernel through some other mechanism that doesn't
rely on such hooks would be very useful as well. The intention is for this
to be separate from BPF iters, which are meant for extracting data from
the kernel; this would instead be a way to drive logic in BPF from
user-space, for example by running a callback (possibly with data) for
each sample that's published to a BPF program from user-space. I'm
planning to add this in a future patch set.

Signed-off-by: David Vernet <void@manifault.com>
--
v1 -> v2:
- Following Joanne landing 883743422ced ("bpf: Fix ref_obj_id for dynptr
  data slices in verifier") [0], removed [PATCH 1/5] bpf: Clear callee
  saved regs after updating REG0 [1].
- Following the above adjustment, updated check_helper_call() to not store
  a reference for bpf_dynptr_data() if the register containing the dynptr
  is of type MEM_ALLOC.
- Fixed casting issue pointed out by kernel test robot by adding a missing
  (uintptr_t) cast.

[0] https://lore.kernel.org/all/20220809214055.4050604-1-joannelkoong@gmail.com/
[1] https://lore.kernel.org/all/20220808155341.2479054-1-void@manifault.com/

David Vernet (4):
  bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  bpf: Add bpf_user_ringbuf_drain() helper
  bpf: Add libbpf logic for user-space ring buffer
  selftests/bpf: Add selftests validating the user ringbuf

 include/linux/bpf.h                           |   6 +-
 include/linux/bpf_types.h                     |   1 +
 include/uapi/linux/bpf.h                      |   8 +
 kernel/bpf/helpers.c                          |   2 +
 kernel/bpf/ringbuf.c                          | 232 ++++++-
 kernel/bpf/verifier.c                         |  59 +-
 tools/include/uapi/linux/bpf.h                |   8 +
 tools/lib/bpf/libbpf.c                        |  11 +-
 tools/lib/bpf/libbpf.h                        |  19 +
 tools/lib/bpf/libbpf.map                      |   6 +
 tools/lib/bpf/libbpf_probes.c                 |   1 +
 tools/lib/bpf/ringbuf.c                       | 216 +++++++
 .../selftests/bpf/prog_tests/user_ringbuf.c   | 587 ++++++++++++++++++
 .../selftests/bpf/progs/user_ringbuf_fail.c   | 174 ++++++
 .../bpf/progs/user_ringbuf_success.c          | 220 +++++++
 .../testing/selftests/bpf/test_user_ringbuf.h |  35 ++
 16 files changed, 1566 insertions(+), 19 deletions(-)
 create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
 create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h

-- 
2.37.1



* [PATCH v2 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF map type
  2022-08-11 23:49 [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
@ 2022-08-11 23:49 ` David Vernet
  2022-08-11 23:49 ` [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
From: David Vernet @ 2022-08-11 23:49 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

We want to support a ringbuf map type where samples are published from
user-space to BPF programs. BPF currently supports a kernel -> user-space
circular ringbuffer via the BPF_MAP_TYPE_RINGBUF map type. We'll need to
define a new map type for user-space -> kernel, as none of the helpers
exported for BPF_MAP_TYPE_RINGBUF will apply to a user-space producer
ringbuffer, and we'll want to add one or more helper functions that would
not apply for a kernel-producer ringbuffer.

This patch therefore adds a new BPF_MAP_TYPE_USER_RINGBUF map type
definition. The map type is useless in its current form, as there is no way
to access or use it for anything until we add more BPF helpers. A follow-on
patch will therefore add a new helper function that allows BPF programs to
run callbacks on samples that are published to the ringbuffer.
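
For reference, declaring such a map in a BPF program looks the same as for
BPF_MAP_TYPE_RINGBUF, just with the new type (a sketch; the map name and
size are arbitrary). As with BPF_MAP_TYPE_RINGBUF, max_entries is the size
of the data region in bytes, and must be a page-aligned power of two:

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 256 * 1024);
} user_ringbuf SEC(".maps");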

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf_types.h      |  1 +
 include/uapi/linux/bpf.h       |  1 +
 kernel/bpf/ringbuf.c           | 70 +++++++++++++++++++++++++++++-----
 kernel/bpf/verifier.c          |  3 ++
 tools/include/uapi/linux/bpf.h |  1 +
 tools/lib/bpf/libbpf.c         |  1 +
 6 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h
index 2b9112b80171..2c6a4f2562a7 100644
--- a/include/linux/bpf_types.h
+++ b/include/linux/bpf_types.h
@@ -126,6 +126,7 @@ BPF_MAP_TYPE(BPF_MAP_TYPE_STRUCT_OPS, bpf_struct_ops_map_ops)
 #endif
 BPF_MAP_TYPE(BPF_MAP_TYPE_RINGBUF, ringbuf_map_ops)
 BPF_MAP_TYPE(BPF_MAP_TYPE_BLOOM_FILTER, bloom_filter_map_ops)
+BPF_MAP_TYPE(BPF_MAP_TYPE_USER_RINGBUF, user_ringbuf_map_ops)
 
 BPF_LINK_TYPE(BPF_LINK_TYPE_RAW_TRACEPOINT, raw_tracepoint)
 BPF_LINK_TYPE(BPF_LINK_TYPE_TRACING, tracing)
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 7d1e2794d83e..b8ec9e741a43 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index b483aea35f41..c0f3bca4bb09 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,12 +38,32 @@ struct bpf_ringbuf {
 	struct page **pages;
 	int nr_pages;
 	spinlock_t spinlock ____cacheline_aligned_in_smp;
-	/* Consumer and producer counters are put into separate pages to allow
-	 * mapping consumer page as r/w, but restrict producer page to r/o.
-	 * This protects producer position from being modified by user-space
-	 * application and ruining in-kernel position tracking.
+	/* Consumer and producer counters are put into separate pages to
+	 * allow each position to be mapped with different permissions.
+	 * This prevents a user-space application from modifying the
+	 * position and ruining in-kernel tracking. The permissions of the
+	 * pages depend on who is producing samples: user-space or the
+	 * kernel.
+	 *
+	 * Kernel-producer
+	 * ---------------
+	 * The producer position and data pages are mapped as r/o in
+	 * userspace. For this approach, bits in the header of samples are
+	 * used to signal to user-space, and to other producers, whether a
+	 * sample is currently being written.
+	 *
+	 * User-space producer
+	 * -------------------
+	 * Only the page containing the consumer position, and whether the
+	 * ringbuffer is currently being consumed via a 'busy' bit, are
+	 * mapped r/o in user-space. Sample headers may not be used to
+	 * communicate any information between kernel consumers, as a
+	 * user-space application could modify its contents at any time.
 	 */
-	unsigned long consumer_pos __aligned(PAGE_SIZE);
+	struct {
+		unsigned long consumer_pos;
+		atomic_t busy;
+	} __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
 	char data[] __aligned(PAGE_SIZE);
 };
@@ -141,6 +161,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 
 	rb->mask = data_sz - 1;
 	rb->consumer_pos = 0;
+	atomic_set(&rb->busy, 0);
 	rb->producer_pos = 0;
 
 	return rb;
@@ -224,15 +245,23 @@ static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
 	return -ENOTSUPP;
 }
 
-static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
+static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma,
+			    bool kernel_producer)
 {
 	struct bpf_ringbuf_map *rb_map;
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 
 	if (vma->vm_flags & VM_WRITE) {
-		/* allow writable mapping for the consumer_pos only */
-		if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
+		if (kernel_producer) {
+			/* allow writable mapping for the consumer_pos only */
+			if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
+				return -EPERM;
+		/* For user ringbufs, disallow writable mappings to the
+		 * consumer pointer, and allow writable mappings to both the
+		 * producer position, and the ring buffer data itself.
+		 */
+		} else if (vma->vm_pgoff == 0)
 			return -EPERM;
 	} else {
 		vma->vm_flags &= ~VM_MAYWRITE;
@@ -242,6 +271,16 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
 				   vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	return ringbuf_map_mmap(map, vma, true);
+}
+
+static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	return ringbuf_map_mmap(map, vma, false);
+}
+
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
 	unsigned long cons_pos, prod_pos;
@@ -269,7 +308,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
-	.map_mmap = ringbuf_map_mmap,
+	.map_mmap = ringbuf_map_mmap_kern,
 	.map_poll = ringbuf_map_poll,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
@@ -278,6 +317,19 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_btf_id = &ringbuf_map_btf_ids[0],
 };
 
+BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
+const struct bpf_map_ops user_ringbuf_map_ops = {
+	.map_meta_equal = bpf_map_meta_equal,
+	.map_alloc = ringbuf_map_alloc,
+	.map_free = ringbuf_map_free,
+	.map_mmap = ringbuf_map_mmap_user,
+	.map_lookup_elem = ringbuf_map_lookup_elem,
+	.map_update_elem = ringbuf_map_update_elem,
+	.map_delete_elem = ringbuf_map_delete_elem,
+	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_btf_id = &user_ringbuf_map_btf_ids[0],
+};
+
 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
  * calculate offset from record metadata to ring buffer in pages, rounded
  * down. This page offset is stored as part of record metadata and allows to
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 2c1f8069f7b7..970ec5c7ce05 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -6202,6 +6202,8 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		    func_id != BPF_FUNC_ringbuf_discard_dynptr)
 			goto error;
 		break;
+	case BPF_MAP_TYPE_USER_RINGBUF:
+		goto error;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -12681,6 +12683,7 @@ static int check_map_prog_compatibility(struct bpf_verifier_env *env,
 			}
 			break;
 		case BPF_MAP_TYPE_RINGBUF:
+		case BPF_MAP_TYPE_USER_RINGBUF:
 		case BPF_MAP_TYPE_INODE_STORAGE:
 		case BPF_MAP_TYPE_SK_STORAGE:
 		case BPF_MAP_TYPE_TASK_STORAGE:
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index e174ad28aeb7..ee04b71969b4 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -909,6 +909,7 @@ enum bpf_map_type {
 	BPF_MAP_TYPE_INODE_STORAGE,
 	BPF_MAP_TYPE_TASK_STORAGE,
 	BPF_MAP_TYPE_BLOOM_FILTER,
+	BPF_MAP_TYPE_USER_RINGBUF,
 };
 
 /* Note that tracing related programs such as
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 917d975bd4c6..244d7b883dc8 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -163,6 +163,7 @@ static const char * const map_type_name[] = {
 	[BPF_MAP_TYPE_INODE_STORAGE]		= "inode_storage",
 	[BPF_MAP_TYPE_TASK_STORAGE]		= "task_storage",
 	[BPF_MAP_TYPE_BLOOM_FILTER]		= "bloom_filter",
+	[BPF_MAP_TYPE_USER_RINGBUF]             = "user_ringbuf",
 };
 
 static const char * const prog_type_name[] = {
-- 
2.37.1



* [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-11 23:49 [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
  2022-08-11 23:49 ` [PATCH v2 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
@ 2022-08-11 23:49 ` David Vernet
  2022-08-15 21:23   ` Hao Luo
  2022-08-11 23:49 ` [PATCH v2 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
From: David Vernet @ 2022-08-11 23:49 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
helper function that allows BPF programs to drain samples from the ring
buffer, and invoke a callback for each. This patch adds a new
bpf_user_ringbuf_drain() helper that provides this abstraction.

To support this, we also need to add a new PTR_TO_DYNPTR
register type to reflect a dynptr that was allocated by a helper function
and passed to a BPF program. The verifier currently only supports
PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
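
To illustrate the contract being added here: the callback receives the
dynptr in R1 and the caller's context pointer in R2, and the drain loop
stops early if the callback returns a non-zero value. A sketch (names and
attach point are arbitrary and not part of this patch):

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

char LICENSE[] SEC("license") = "GPL";

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 4096);
} user_ringbuf SEC(".maps");

long seen;

/* Stop draining after 32 samples by returning a non-zero value. */
static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	if (++seen >= 32)
		return 1;
	return 0;
}

SEC("tp/syscalls/sys_enter_getcwd")
int drain_limited(void *ctx)
{
	/* BPF_RB_NO_WAKEUP skips waking user-space pollers after draining. */
	bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL,
			       BPF_RB_NO_WAKEUP);
	return 0;
}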

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf.h            |   6 +-
 include/uapi/linux/bpf.h       |   7 ++
 kernel/bpf/helpers.c           |   2 +
 kernel/bpf/ringbuf.c           | 157 +++++++++++++++++++++++++++++++--
 kernel/bpf/verifier.c          |  67 ++++++++++++--
 tools/include/uapi/linux/bpf.h |   7 ++
 6 files changed, 233 insertions(+), 13 deletions(-)

diff --git a/include/linux/bpf.h b/include/linux/bpf.h
index a627a02cf8ab..32ddfd1e7c1b 100644
--- a/include/linux/bpf.h
+++ b/include/linux/bpf.h
@@ -401,7 +401,7 @@ enum bpf_type_flag {
 	/* DYNPTR points to memory local to the bpf program. */
 	DYNPTR_TYPE_LOCAL	= BIT(8 + BPF_BASE_TYPE_BITS),
 
-	/* DYNPTR points to a ringbuf record. */
+	/* DYNPTR points to a kernel-produced ringbuf record. */
 	DYNPTR_TYPE_RINGBUF	= BIT(9 + BPF_BASE_TYPE_BITS),
 
 	/* Size is known at compile time. */
@@ -606,6 +606,7 @@ enum bpf_reg_type {
 	PTR_TO_MEM,		 /* reg points to valid memory region */
 	PTR_TO_BUF,		 /* reg points to a read/write buffer */
 	PTR_TO_FUNC,		 /* reg points to a bpf program function */
+	PTR_TO_DYNPTR,		 /* reg points to a dynptr */
 	__BPF_REG_TYPE_MAX,
 
 	/* Extended reg_types. */
@@ -2411,6 +2412,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
 extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
 extern const struct bpf_func_proto bpf_set_retval_proto;
 extern const struct bpf_func_proto bpf_get_retval_proto;
+extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
 
 const struct bpf_func_proto *tracing_prog_func_proto(
   enum bpf_func_id func_id, const struct bpf_prog *prog);
@@ -2555,7 +2557,7 @@ enum bpf_dynptr_type {
 	BPF_DYNPTR_TYPE_INVALID,
 	/* Points to memory that is local to the bpf program */
 	BPF_DYNPTR_TYPE_LOCAL,
-	/* Underlying data is a ringbuf record */
+	/* Underlying data is a kernel-produced ringbuf record */
 	BPF_DYNPTR_TYPE_RINGBUF,
 };
 
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index b8ec9e741a43..54188906c9fc 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -5354,6 +5354,12 @@ union bpf_attr {
  *	Return
  *		Current *ktime*.
  *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		An error if a sample could not be drained.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5565,6 +5571,7 @@ union bpf_attr {
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
 	FN(ktime_get_tai_ns),		\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
index 3c1b9bbcf971..9141eae0ca67 100644
--- a/kernel/bpf/helpers.c
+++ b/kernel/bpf/helpers.c
@@ -1661,6 +1661,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
 		return &bpf_dynptr_write_proto;
 	case BPF_FUNC_dynptr_data:
 		return &bpf_dynptr_data_proto;
+	case BPF_FUNC_user_ringbuf_drain:
+		return &bpf_user_ringbuf_drain_proto;
 	default:
 		break;
 	}
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index c0f3bca4bb09..73fa6ed12052 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -291,16 +291,33 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 }
 
 static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
-				 struct poll_table_struct *pts)
+				 struct poll_table_struct *pts,
+				 bool kernel_producer)
 {
 	struct bpf_ringbuf_map *rb_map;
+	bool buffer_empty;
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 	poll_wait(filp, &rb_map->rb->waitq, pts);
 
-	if (ringbuf_avail_data_sz(rb_map->rb))
-		return EPOLLIN | EPOLLRDNORM;
-	return 0;
+	buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
+
+	if (kernel_producer)
+		return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
+	else
+		return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, true);
+}
+
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, false);
 }
 
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
@@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_kern,
-	.map_poll = ringbuf_map_poll,
+	.map_poll = ringbuf_map_poll_kern,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_user,
+	.map_poll = ringbuf_map_poll_user,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
 	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
 	.arg2_type	= ARG_ANYTHING,
 };
+
+static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
+				   u32 *size)
+{
+	unsigned long cons_pos, prod_pos;
+	u32 sample_len, total_len;
+	u32 *hdr;
+	int err;
+	int busy = 0;
+
+	/* If another consumer is already consuming a sample, wait for them to
+	 * finish.
+	 */
+	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+		return -EBUSY;
+
+	/* Synchronizes with smp_store_release() in user-space. */
+	prod_pos = smp_load_acquire(&rb->producer_pos);
+	/* Synchronizes with smp_store_release() in
+	 * __bpf_user_ringbuf_sample_release().
+	 */
+	cons_pos = smp_load_acquire(&rb->consumer_pos);
+	if (cons_pos >= prod_pos) {
+		atomic_set(&rb->busy, 0);
+		return -ENODATA;
+	}
+
+	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
+	sample_len = *hdr;
+
+	/* Check that the sample can fit into a dynptr. */
+	err = bpf_dynptr_check_size(sample_len);
+	if (err) {
+		atomic_set(&rb->busy, 0);
+		return err;
+	}
+
+	/* Check that the sample fits within the region advertised by the
+	 * consumer position.
+	 */
+	total_len = sample_len + BPF_RINGBUF_HDR_SZ;
+	if (total_len > prod_pos - cons_pos) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* Check that the sample fits within the data region of the ring buffer.
+	 */
+	if (total_len > rb->mask + 1) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* consumer_pos is updated when the sample is released.
+	 */
+
+	*sample = (void *)((uintptr_t)rb->data +
+			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+	*size = sample_len;
+
+	return 0;
+}
+
+static void
+__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
+				  u64 flags)
+{
+
+
+	/* To release the ringbuffer, just increment the producer position to
+	 * signal that a new sample can be consumed. The busy bit is cleared by
+	 * userspace when posting a new sample to the ringbuffer.
+	 */
+	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
+			  BPF_RINGBUF_HDR_SZ);
+
+	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
+		irq_work_queue(&rb->work);
+
+	atomic_set(&rb->busy, 0);
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+	   void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+	struct bpf_ringbuf *rb;
+	long num_samples = 0, ret = 0;
+	int err;
+	bpf_callback_t callback = (bpf_callback_t)callback_fn;
+	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
+
+	if (unlikely(flags & ~wakeup_flags))
+		return -EINVAL;
+
+	/* The two wakeup flags are mutually exclusive. */
+	if (unlikely((flags & wakeup_flags) == wakeup_flags))
+		return -EINVAL;
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+	do {
+		u32 size;
+		void *sample;
+
+		err = __bpf_user_ringbuf_poll(rb, &sample, &size);
+
+		if (!err) {
+			struct bpf_dynptr_kern dynptr;
+
+			bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
+					0, size);
+			ret = callback((uintptr_t)&dynptr,
+				       (uintptr_t)callback_ctx, 0, 0, 0);
+
+			__bpf_user_ringbuf_sample_release(rb, size, flags);
+			num_samples++;
+		}
+	} while (err == 0 && num_samples < 4096 && ret == 0);
+
+	return num_samples;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+	.func		= bpf_user_ringbuf_drain,
+	.ret_type	= RET_INTEGER,
+	.arg1_type	= ARG_CONST_MAP_PTR,
+	.arg2_type	= ARG_PTR_TO_FUNC,
+	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
+	.arg4_type	= ARG_ANYTHING,
+};
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 970ec5c7ce05..211322b3317b 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -561,6 +561,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
 		[PTR_TO_BUF]		= "buf",
 		[PTR_TO_FUNC]		= "func",
 		[PTR_TO_MAP_KEY]	= "map_key",
+		[PTR_TO_DYNPTR]		= "dynptr_ptr",
 	};
 
 	if (type & PTR_MAYBE_NULL) {
@@ -5662,6 +5663,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
 static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
+static const struct bpf_reg_types dynptr_types = {
+	.types = {
+		PTR_TO_STACK,
+		PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
+	}
+};
 
 static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_MAP_KEY]		= &map_key_value_types,
@@ -5688,7 +5695,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_CONST_STR]		= &const_str_ptr_types,
 	[ARG_PTR_TO_TIMER]		= &timer_types,
 	[ARG_PTR_TO_KPTR]		= &kptr_types,
-	[ARG_PTR_TO_DYNPTR]		= &stack_ptr_types,
+	[ARG_PTR_TO_DYNPTR]		= &dynptr_types,
 };
 
 static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
@@ -6031,6 +6038,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
 		err = check_mem_size_reg(env, reg, regno, true, meta);
 		break;
 	case ARG_PTR_TO_DYNPTR:
+		/* We only need to check for initialized / uninitialized helper
+		 * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
+		 * is that if it is, that a helper function initialized the
+		 * dynptr on behalf of the BPF program.
+		 */
+		if (reg->type & MEM_ALLOC)
+			break;
 		if (arg_type & MEM_UNINIT) {
 			if (!is_dynptr_reg_valid_uninit(env, reg)) {
 				verbose(env, "Dynptr has to be an uninitialized dynptr\n");
@@ -6203,7 +6217,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 			goto error;
 		break;
 	case BPF_MAP_TYPE_USER_RINGBUF:
-		goto error;
+		if (func_id != BPF_FUNC_user_ringbuf_drain)
+			goto error;
+		break;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -6323,6 +6339,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		if (map->map_type != BPF_MAP_TYPE_RINGBUF)
 			goto error;
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
+			goto error;
+		break;
 	case BPF_FUNC_get_stackid:
 		if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
 			goto error;
@@ -6878,6 +6898,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
 	return 0;
 }
 
+static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
+					   struct bpf_func_state *caller,
+					   struct bpf_func_state *callee,
+					   int insn_idx)
+{
+	/* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
+	 *			  callback_ctx, u64 flags);
+	 * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
+	 */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
+	callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
+	__mark_reg_known_zero(&callee->regs[BPF_REG_1]);
+	callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
+
+	/* unused */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
+
+	callee->in_callback_fn = true;
+	return 0;
+}
+
 static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
 {
 	struct bpf_verifier_state *state = env->cur_state;
@@ -7156,7 +7199,7 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 	struct bpf_reg_state *regs;
 	struct bpf_call_arg_meta meta;
 	int insn_idx = *insn_idx_p;
-	bool changes_data;
+	bool changes_data, mem_alloc_dynptr;
 	int i, err, func_id;
 
 	/* find function prototype */
@@ -7323,22 +7366,34 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 		}
 		break;
 	case BPF_FUNC_dynptr_data:
+		mem_alloc_dynptr = false;
 		for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
 			if (arg_type_is_dynptr(fn->arg_type[i])) {
+				struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
+
 				if (meta.ref_obj_id) {
 					verbose(env, "verifier internal error: meta.ref_obj_id already set\n");
 					return -EFAULT;
 				}
-				/* Find the id of the dynptr we're tracking the reference of */
-				meta.ref_obj_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
+
+				mem_alloc_dynptr = reg->type & MEM_ALLOC;
+				if (!mem_alloc_dynptr)
+					/* Find the id of the dynptr we're
+					 * tracking the reference of
+					 */
+					meta.ref_obj_id = stack_slot_get_id(env, reg);
 				break;
 			}
 		}
-		if (i == MAX_BPF_FUNC_REG_ARGS) {
+		if (!mem_alloc_dynptr && i == MAX_BPF_FUNC_REG_ARGS) {
 			verbose(env, "verifier internal error: no dynptr in bpf_dynptr_data()\n");
 			return -EFAULT;
 		}
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
+					set_user_ringbuf_callback_state);
+		break;
 	}
 
 	if (err)
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index ee04b71969b4..76909f43fc0e 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -5354,6 +5354,12 @@ union bpf_attr {
  *	Return
  *		Current *ktime*.
  *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		An error if a sample could not be drained.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5565,6 +5571,7 @@ union bpf_attr {
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
 	FN(ktime_get_tai_ns),		\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
-- 
2.37.1



* [PATCH v2 3/4] bpf: Add libbpf logic for user-space ring buffer
  2022-08-11 23:49 [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
  2022-08-11 23:49 ` [PATCH v2 1/4] bpf: Define new BPF_MAP_TYPE_USER_RINGBUF " David Vernet
  2022-08-11 23:49 ` [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-11 23:49 ` David Vernet
  2022-08-11 23:49 ` [PATCH v2 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
  2022-08-11 23:59 ` [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
From: David Vernet @ 2022-08-11 23:49 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

Now that all of the logic is in place in the kernel to support user-space
produced ringbuffers, we can add the user-space logic to libbpf.
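
As a rough sketch of how a user-space producer might use this API (the
sample struct and names are illustrative, and error handling is
abbreviated), blocking in ring_buffer_user__poll() when the buffer is full
until the kernel drains it:

#include <errno.h>
#include <linux/types.h>
#include <bpf/libbpf.h>

/* Hypothetical sample layout, shared with the BPF program. */
struct my_msg {
	__u64 value;
};

static int publish_many(struct ring_buffer_user *rb, int count)
{
	int i;

	for (i = 0; i < count; i++) {
		struct my_msg *msg;

		msg = ring_buffer_user__reserve(rb, sizeof(*msg));
		if (!msg) {
			/* Buffer full: block for up to 5s until the kernel
			 * drains it; on success, poll returns a freshly
			 * reserved sample.
			 */
			msg = ring_buffer_user__poll(rb, sizeof(*msg), 5000);
			if (!msg)
				return -ENOSPC;
		}

		msg->value = i;
		ring_buffer_user__submit(rb, msg);
	}

	return 0;
}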

Signed-off-by: David Vernet <void@manifault.com>
---
 kernel/bpf/ringbuf.c          |   7 +-
 tools/lib/bpf/libbpf.c        |  10 +-
 tools/lib/bpf/libbpf.h        |  19 +++
 tools/lib/bpf/libbpf.map      |   6 +
 tools/lib/bpf/libbpf_probes.c |   1 +
 tools/lib/bpf/ringbuf.c       | 216 ++++++++++++++++++++++++++++++++++
 6 files changed, 256 insertions(+), 3 deletions(-)

diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index 73fa6ed12052..18ae9c419df0 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
 	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
 		return -EBUSY;
 
-	/* Synchronizes with smp_store_release() in user-space. */
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 * in user-space.
+	 */
 	prod_pos = smp_load_acquire(&rb->producer_pos);
 	/* Synchronizes with smp_store_release() in
 	 * __bpf_user_ringbuf_sample_release().
@@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
 	/* To release the ringbuffer, just increment the producer position to
 	 * signal that a new sample can be consumed. The busy bit is cleared by
 	 * userspace when posting a new sample to the ringbuffer.
+	 *
+	 * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
+	 * in user-space.
 	 */
 	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
 			  BPF_RINGBUF_HDR_SZ);
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 244d7b883dc8..97bbde53128e 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -2370,6 +2370,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
 	return sz;
 }
 
+static bool map_is_ringbuf(const struct bpf_map *map)
+{
+	return map->def.type == BPF_MAP_TYPE_RINGBUF ||
+	       map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
+}
+
 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
 {
 	map->def.type = def->map_type;
@@ -2384,7 +2390,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
 	map->btf_value_type_id = def->value_type_id;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	if (def->parts & MAP_DEF_MAP_TYPE)
@@ -4366,7 +4372,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
 	map->def.max_entries = max_entries;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	return 0;
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 61493c4cddac..6d1d0539b08d 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
 
 /* Ring buffer APIs */
 struct ring_buffer;
+struct ring_buffer_user;
 
 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
 
@@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
+struct ring_buffer_user_opts {
+	size_t sz; /* size of this struct, for forward/backward compatibility */
+};
+
+#define ring_buffer_user_opts__last_field sz
+
+LIBBPF_API struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts);
+LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb,
+					   uint32_t size);
+LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb,
+					uint32_t size, int timeout_ms);
+LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb,
+					 void *sample);
+LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb,
+					  void *sample);
+LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb);
+
 /* Perf buffer APIs */
 struct perf_buffer;
 
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 119e6e1ea7f1..8db11040df1b 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -365,4 +365,10 @@ LIBBPF_1.0.0 {
 		libbpf_bpf_map_type_str;
 		libbpf_bpf_prog_type_str;
 		perf_buffer__buffer;
+		ring_buffer_user__discard;
+		ring_buffer_user__free;
+		ring_buffer_user__new;
+		ring_buffer_user__poll;
+		ring_buffer_user__reserve;
+		ring_buffer_user__submit;
 };
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 6d495656f554..f3a8e8e74eb8 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
 			return btf_fd;
 		break;
 	case BPF_MAP_TYPE_RINGBUF:
+	case BPF_MAP_TYPE_USER_RINGBUF:
 		key_size = 0;
 		value_size = 0;
 		max_entries = 4096;
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..86e3c11d8486 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -39,6 +39,17 @@ struct ring_buffer {
 	int ring_cnt;
 };
 
+struct ring_buffer_user {
+	struct epoll_event event;
+	unsigned long *consumer_pos;
+	unsigned long *producer_pos;
+	void *data;
+	unsigned long mask;
+	size_t page_size;
+	int map_fd;
+	int epoll_fd;
+};
+
 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
 {
 	if (r->consumer_pos) {
@@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 {
 	return rb->epoll_fd;
 }
+
+static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)
+{
+	if (rb->consumer_pos) {
+		munmap(rb->consumer_pos, rb->page_size);
+		rb->consumer_pos = NULL;
+	}
+	if (rb->producer_pos) {
+		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
+		rb->producer_pos = NULL;
+	}
+}
+
+void ring_buffer_user__free(struct ring_buffer_user *rb)
+{
+	if (!rb)
+		return;
+
+	__user_ringbuf_unmap_ring(rb);
+
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb);
+}
+
+static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
+{
+
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	void *tmp;
+	struct epoll_event *rb_epoll;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
+		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n",
+			map_fd);
+		return libbpf_err(-EINVAL);
+	}
+
+	rb->map_fd = map_fd;
+	rb->mask = info.max_entries - 1;
+
+	/* Map read-only consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+	rb->consumer_pos = tmp;
+
+	/* Map read-write the producer page and data pages. We map the data
+	 * region as twice the total size of the ringbuffer to allow the simple
+	 * reading and writing of samples that wrap around the end of the
+	 * buffer.  See the kernel implementation for details.
+	 */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	rb->producer_pos = tmp;
+	rb->data = tmp + rb->page_size;
+
+	rb_epoll = &rb->event;
+	rb_epoll->events = EPOLLOUT;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	return 0;
+}
+
+struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts)
+{
+	struct ring_buffer_user *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, ring_buffer_user_opts))
+		return errno = EINVAL, NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return errno = ENOMEM, NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to create epoll instance: %d\n",
+			err);
+		goto err_out;
+	}
+
+	err = __ring_buffer_user_map(rb, map_fd);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	ring_buffer_user__free(rb);
+	return errno = -err, NULL;
+}
+
+static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
+{
+	uint32_t *hdr;
+	uint32_t total_len;
+	unsigned long prod_pos;
+
+	prod_pos = *rb->producer_pos;
+	hdr = rb->data + (prod_pos & rb->mask);
+
+	total_len = *hdr + BPF_RINGBUF_HDR_SZ;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	smp_store_release(rb->producer_pos, prod_pos + total_len);
+}
+
+/* Discard a previously reserved sample into the ring buffer. Because the user
+ * ringbuffer is assumed to be single producer, this can simply be a no-op, and
+ * the producer pointer is left in the same place as when it was reserved.
+ */
+void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
+{}
+
+/* Submit a previously reserved sample into the ring buffer.
+ */
+void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
+{
+	__ring_buffer_user__commit(rb);
+}
+
+/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
+ * thread safe, and the ring-buffer supports only a single producer.
+ */
+void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
+{
+	uint32_t *hdr;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	size_t avail_size, total_size, max_size;
+	unsigned long cons_pos, prod_pos;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	cons_pos = smp_load_acquire(rb->consumer_pos);
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 */
+	prod_pos = smp_load_acquire(rb->producer_pos);
+
+	max_size = rb->mask + 1;
+	avail_size = max_size - (prod_pos - cons_pos);
+	total_size = size + BPF_RINGBUF_HDR_SZ;
+
+	if (total_size > max_size || avail_size < total_size)
+		return NULL;
+
+	hdr = rb->data + (prod_pos & rb->mask);
+	*hdr = size;
+
+	/* Producer pos is updated when a sample is submitted. */
+
+	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
+}
+
+/* Poll for available space in the ringbuffer, and reserve a record when it
+ * becomes available.
+ */
+void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
+			     int timeout_ms)
+{
+	int cnt;
+
+	cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
+	if (cnt < 0)
+		return NULL;
+
+	return ring_buffer_user__reserve(rb, size);
+}
-- 
2.37.1



* [PATCH v2 4/4] selftests/bpf: Add selftests validating the user ringbuf
  2022-08-11 23:49 [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
  2022-08-11 23:49 ` [PATCH v2 3/4] bpf: Add libbpf logic for user-space ring buffer David Vernet
@ 2022-08-11 23:49 ` David Vernet
  2022-08-11 23:59 ` [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
From: David Vernet @ 2022-08-11 23:49 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

This change includes selftests that validate the expected behavior and
APIs of the new BPF_MAP_TYPE_USER_RINGBUF map type.

Signed-off-by: David Vernet <void@manifault.com>
---
 .../selftests/bpf/prog_tests/user_ringbuf.c   | 587 ++++++++++++++++++
 .../selftests/bpf/progs/user_ringbuf_fail.c   | 174 ++++++
 .../bpf/progs/user_ringbuf_success.c          | 220 +++++++
 .../testing/selftests/bpf/test_user_ringbuf.h |  35 ++
 4 files changed, 1016 insertions(+)
 create mode 100644 tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
 create mode 100644 tools/testing/selftests/bpf/progs/user_ringbuf_success.c
 create mode 100644 tools/testing/selftests/bpf/test_user_ringbuf.h

diff --git a/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
new file mode 100644
index 000000000000..d62adb74da73
--- /dev/null
+++ b/tools/testing/selftests/bpf/prog_tests/user_ringbuf.c
@@ -0,0 +1,587 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#define _GNU_SOURCE
+#include <linux/compiler.h>
+#include <linux/ring_buffer.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <sys/sysinfo.h>
+#include <test_progs.h>
+#include <unistd.h>
+
+#include "user_ringbuf_fail.skel.h"
+#include "user_ringbuf_success.skel.h"
+
+#include "../test_user_ringbuf.h"
+
+static int duration;
+static size_t log_buf_sz = 1 << 20; /* 1 MB */
+static char obj_log_buf[1048576];
+static const long c_sample_size = sizeof(struct sample) + BPF_RINGBUF_HDR_SZ;
+static const long c_ringbuf_size = 1 << 12; /* 1 small page */
+static const long c_max_entries = c_ringbuf_size / c_sample_size;
+
+static void drain_current_samples(void)
+{
+	syscall(__NR_getpgid);
+}
+
+static int write_samples(struct ring_buffer_user *ringbuf, uint32_t num_samples)
+{
+	int i, err = 0;
+
+	/* Write some number of samples to the ring buffer. */
+	for (i = 0; i < num_samples; i++) {
+		struct sample *entry;
+		int read;
+
+		entry = ring_buffer_user__reserve(ringbuf, sizeof(*entry));
+		if (!entry) {
+			err = -ENOMEM;
+			goto done;
+		}
+
+		entry->pid = getpid();
+		entry->seq = i;
+		entry->value = i * i;
+
+		read = snprintf(entry->comm, sizeof(entry->comm), "%u", i);
+		if (read <= 0) {
+			/* Only invoke CHECK on the error path to avoid spamming
+			 * logs with mostly success messages.
+			 */
+			CHECK(read <= 0, "snprintf_comm",
+			      "Failed to write index %d to comm\n", i);
+			err = read;
+			ring_buffer_user__discard(ringbuf, entry);
+			goto done;
+		}
+
+		ring_buffer_user__submit(ringbuf, entry);
+	}
+
+done:
+	drain_current_samples();
+
+	return err;
+}
+
+static struct user_ringbuf_success*
+open_load_ringbuf_skel(void)
+{
+	struct user_ringbuf_success *skel;
+	int err;
+
+	skel = user_ringbuf_success__open();
+	if (CHECK(!skel, "skel_open", "skeleton open failed\n"))
+		return NULL;
+
+	err = bpf_map__set_max_entries(skel->maps.user_ringbuf,
+				       c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = bpf_map__set_max_entries(skel->maps.kernel_ringbuf,
+				       c_ringbuf_size);
+	if (CHECK(err != 0, "set_max_entries", "set max entries failed: %d\n", err))
+		goto cleanup;
+
+	err = user_ringbuf_success__load(skel);
+	if (CHECK(err != 0, "skel_load", "skeleton load failed\n"))
+		goto cleanup;
+
+	return skel;
+
+cleanup:
+	user_ringbuf_success__destroy(skel);
+	return NULL;
+}
+
+static void test_user_ringbuf_mappings(void)
+{
+	int err, rb_fd;
+	int page_size = getpagesize();
+	void *mmap_ptr;
+	struct user_ringbuf_success *skel;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return;
+
+	rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+	/* cons_pos can be mapped R/O, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ, MAP_SHARED, rb_fd, 0);
+	ASSERT_OK_PTR(mmap_ptr, "ro_cons_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_WRITE), "write_cons_pos_protect");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_cons_pos_protect");
+	ASSERT_ERR_PTR(mremap(mmap_ptr, 0, 4 * page_size, MREMAP_MAYMOVE), "wr_prod_pos");
+	err = -errno;
+	ASSERT_EQ(err, -EPERM, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_ro_cons");
+
+	/* prod_pos can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+			rb_fd, page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_prod_pos");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_prod_pos_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "wr_prod_pos_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_prod");
+
+	/* data pages can be mapped RW, can't add +X with mprotect. */
+	mmap_ptr = mmap(NULL, page_size, PROT_WRITE, MAP_SHARED, rb_fd,
+			2 * page_size);
+	ASSERT_OK_PTR(mmap_ptr, "rw_data");
+	ASSERT_ERR(mprotect(mmap_ptr, page_size, PROT_EXEC), "exec_data_protect");
+	err = -errno;
+	ASSERT_EQ(err, -EACCES, "exec_data_err");
+	ASSERT_OK(munmap(mmap_ptr, page_size), "unmap_rw_data");
+
+	user_ringbuf_success__destroy(skel);
+}
+
+static int load_skel_create_ringbufs(struct user_ringbuf_success **skel_out,
+				     struct ring_buffer **kern_ringbuf_out,
+				     ring_buffer_sample_fn callback,
+				     struct ring_buffer_user **user_ringbuf_out)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer *kern_ringbuf = NULL;
+	struct ring_buffer_user *user_ringbuf = NULL;
+	int err = -ENOMEM, rb_fd;
+
+	skel = open_load_ringbuf_skel();
+	if (!skel)
+		return err;
+
+	/* only trigger BPF program for current process */
+	skel->bss->pid = getpid();
+
+	if (kern_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.kernel_ringbuf);
+		kern_ringbuf = ring_buffer__new(rb_fd, callback, skel, NULL);
+		if (CHECK(!kern_ringbuf, "kern_ringbuf_create",
+					"failed to create kern ringbuf\n"))
+			goto cleanup;
+
+		*kern_ringbuf_out = kern_ringbuf;
+	}
+
+	if (user_ringbuf_out) {
+		rb_fd = bpf_map__fd(skel->maps.user_ringbuf);
+		user_ringbuf = ring_buffer_user__new(rb_fd, NULL);
+		if (CHECK(!user_ringbuf, "user_ringbuf_create",
+			  "failed to create user ringbuf\n"))
+			goto cleanup;
+
+		*user_ringbuf_out = user_ringbuf;
+		ASSERT_EQ(skel->bss->read, 0, "no_reads_after_load");
+	}
+
+	err = user_ringbuf_success__attach(skel);
+	if (CHECK(err != 0, "skel_attach", "skeleton attachment failed: %d\n",
+		  err))
+		goto cleanup;
+
+	*skel_out = skel;
+	return 0;
+
+cleanup:
+	if (kern_ringbuf_out)
+		*kern_ringbuf_out = NULL;
+	if (user_ringbuf_out)
+		*user_ringbuf_out = NULL;
+	ring_buffer__free(kern_ringbuf);
+	ring_buffer_user__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+	return err;
+}
+
+static int
+load_skel_create_user_ringbuf(struct user_ringbuf_success **skel_out,
+			      struct ring_buffer_user **ringbuf_out)
+{
+	return load_skel_create_ringbufs(skel_out, NULL, NULL, ringbuf_out);
+}
+
+static void test_user_ringbuf_commit(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	err = write_samples(ringbuf, 2);
+	if (CHECK(err, "write_samples", "failed to write samples: %d\n", err))
+		goto cleanup;
+
+	ASSERT_EQ(skel->bss->read, 2, "num_samples_read_after");
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_fill(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	err = write_samples(ringbuf, c_max_entries * 5);
+	ASSERT_EQ(err, -ENOMEM, "too_many_samples_posted");
+	ASSERT_EQ(skel->bss->read, c_max_entries, "max_entries");
+
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void test_user_ringbuf_loop(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	uint32_t total_samples = 8192;
+	uint32_t remaining_samples = total_samples;
+	int err;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	do  {
+		uint32_t curr_samples;
+
+		curr_samples = remaining_samples > c_max_entries
+			? c_max_entries : remaining_samples;
+		err = write_samples(ringbuf, curr_samples);
+		if (err != 0) {
+			/* Perform CHECK inside of if statement to avoid
+			 * flooding logs on the success path.
+			 */
+			CHECK(err, "write_samples",
+					"failed to write sample batch: %d\n", err);
+			goto cleanup;
+		}
+
+		remaining_samples -= curr_samples;
+		ASSERT_EQ(skel->bss->read, total_samples - remaining_samples,
+			  "current_batched_entries");
+	} while (remaining_samples > 0);
+	ASSERT_EQ(skel->bss->read, total_samples, "total_batched_entries");
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static int send_test_message(struct ring_buffer_user *ringbuf,
+			     enum test_msg_op op, s64 operand_64,
+			     s32 operand_32)
+{
+	struct test_msg *msg;
+
+	msg = ring_buffer_user__reserve(ringbuf, sizeof(*msg));
+	if (!msg) {
+		/* Only invoke CHECK on the error path to avoid spamming
+		 * logs with mostly success messages.
+		 */
+		CHECK(!msg, "reserve_msg",
+		      "Failed to reserve message\n");
+		return -ENOMEM;
+	}
+
+	msg->msg_op = op;
+
+	switch (op) {
+	case TEST_MSG_OP_INC64:
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		break;
+	default:
+		PRINT_FAIL("Invalid operand %d\n", op);
+		ring_buffer_user__discard(ringbuf, msg);
+		return -EINVAL;
+	}
+
+	ring_buffer_user__submit(ringbuf, msg);
+
+	return 0;
+}
+
+static void kick_kernel_read_messages(void)
+{
+	syscall(__NR_getcwd);
+}
+
+static int handle_kernel_msg(void *ctx, void *data, size_t len)
+{
+	struct user_ringbuf_success *skel = ctx;
+	struct test_msg *msg = data;
+
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		skel->bss->user_mutated += msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_INC32:
+		skel->bss->user_mutated += msg->operand_32;
+		return 0;
+	case TEST_MSG_OP_MUL64:
+		skel->bss->user_mutated *= msg->operand_64;
+		return 0;
+	case TEST_MSG_OP_MUL32:
+		skel->bss->user_mutated *= msg->operand_32;
+		return 0;
+	default:
+		fprintf(stderr, "Invalid operand %d\n", msg->msg_op);
+		return -EINVAL;
+	}
+}
+
+static void drain_kernel_messages_buffer(struct ring_buffer *kern_ringbuf)
+{
+	int err;
+
+	err = ring_buffer__consume(kern_ringbuf);
+	if (err < 0)
+		/* Only check in failure to avoid spamming success logs. */
+		CHECK(err < 0, "consume_kern_ringbuf",
+		      "Failed to consume kernel ringbuf\n");
+}
+
+static void test_user_ringbuf_msg_protocol(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *user_ringbuf;
+	struct ring_buffer *kern_ringbuf;
+	int err, i;
+	__u64 expected_kern = 0;
+
+	err = load_skel_create_ringbufs(&skel, &kern_ringbuf, handle_kernel_msg,
+					&user_ringbuf);
+	if (CHECK(err, "create_ringbufs", "Failed to create ringbufs: %d\n",
+		  err))
+		return;
+
+	for (i = 0; i < 64; i++) {
+		enum test_msg_op op = i % TEST_MSG_OP_NUM_OPS;
+		__u64 operand_64 = TEST_OP_64;
+		__u32 operand_32 = TEST_OP_32;
+
+		err = send_test_message(user_ringbuf, op, operand_64,
+					operand_32);
+		if (err) {
+			CHECK(err, "send_test_message",
+			"Failed to send test message\n");
+			goto cleanup;
+		}
+
+		switch (op) {
+		case TEST_MSG_OP_INC64:
+			expected_kern += operand_64;
+			break;
+		case TEST_MSG_OP_INC32:
+			expected_kern += operand_32;
+			break;
+		case TEST_MSG_OP_MUL64:
+			expected_kern *= operand_64;
+			break;
+		case TEST_MSG_OP_MUL32:
+			expected_kern *= operand_32;
+			break;
+		default:
+			PRINT_FAIL("Unexpected op %d\n", op);
+			goto cleanup;
+		}
+
+		if (i % 8 == 0) {
+			kick_kernel_read_messages();
+			ASSERT_EQ(skel->bss->kern_mutated, expected_kern,
+				  "expected_kern");
+			ASSERT_EQ(skel->bss->err, 0, "bpf_prog_err");
+			drain_kernel_messages_buffer(kern_ringbuf);
+		}
+	}
+
+cleanup:
+	ring_buffer__free(kern_ringbuf);
+	ring_buffer_user__free(user_ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static void *kick_kernel_cb(void *arg)
+{
+	/* Sleep to better exercise the path for the main thread waiting in
+	 * poll_wait().
+	 */
+	sleep(1);
+
+	/* Kick the kernel, causing it to drain the ringbuffer and then wake up
+	 * the test thread waiting on epoll.
+	 */
+	syscall(__NR_getrlimit);
+
+	return NULL;
+}
+
+static int spawn_kick_thread_for_poll(void)
+{
+	pthread_t thread;
+
+	return pthread_create(&thread, NULL, kick_kernel_cb, NULL);
+}
+
+static void test_user_ringbuf_poll_wait(void)
+{
+	struct user_ringbuf_success *skel;
+	struct ring_buffer_user *ringbuf;
+	int err, num_written = 0;
+	__u64 *token;
+
+	err = load_skel_create_user_ringbuf(&skel, &ringbuf);
+	if (err)
+		return;
+
+	ASSERT_EQ(skel->bss->read, 0, "num_samples_read_before");
+
+	while (1) {
+		/* Write samples until the buffer is full. */
+		token = ring_buffer_user__reserve(ringbuf, sizeof(*token));
+		if (!token)
+			break;
+
+		*token = 0xdeadbeef;
+
+		ring_buffer_user__submit(ringbuf, token);
+		num_written++;
+	}
+
+	if (!ASSERT_GE(num_written, 0, "num_written"))
+		goto cleanup;
+
+	/* Should not have read any samples until the kernel is kicked. */
+	ASSERT_EQ(skel->bss->read, 0, "num_pre_kick");
+
+	token = ring_buffer_user__poll(ringbuf, sizeof(*token), 1000);
+	if (!ASSERT_EQ(token, NULL, "pre_kick_timeout_token"))
+		goto cleanup;
+
+	err = spawn_kick_thread_for_poll();
+	if (!ASSERT_EQ(err, 0, "deferred_kick_thread"))
+		goto cleanup;
+
+	token = ring_buffer_user__poll(ringbuf, sizeof(*token), 10000);
+	if (!token) {
+		PRINT_FAIL("Failed to poll for user ringbuf entry\n");
+		goto cleanup;
+	}
+
+	ASSERT_EQ(skel->bss->read, num_written, "num_post_kick");
+	ASSERT_EQ(skel->bss->err, 0, "err_post_poll");
+	ring_buffer_user__discard(ringbuf, token);
+
+cleanup:
+	ring_buffer_user__free(ringbuf);
+	user_ringbuf_success__destroy(skel);
+}
+
+static struct {
+	const char *prog_name;
+	const char *expected_err_msg;
+} failure_tests[] = {
+	/* failure cases */
+	{"user_ringbuf_callback_bad_access1", "negative offset alloc_dynptr_ptr ptr"},
+	{"user_ringbuf_callback_bad_access2", "dereference of modified alloc_dynptr_ptr ptr"},
+	{"user_ringbuf_callback_write_forbidden", "invalid mem access 'alloc_dynptr_ptr'"},
+	{"user_ringbuf_callback_null_context_write", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_null_context_read", "invalid mem access 'scalar'"},
+	{"user_ringbuf_callback_discard_dynptr", "arg 1 is an unacquired reference"},
+	{"user_ringbuf_callback_submit_dynptr", "arg 1 is an unacquired reference"},
+};
+
+#define SUCCESS_TEST(_func) { _func, #_func }
+
+static struct {
+	void (*test_callback)(void);
+	const char *test_name;
+} success_tests[] = {
+	SUCCESS_TEST(test_user_ringbuf_mappings),
+	SUCCESS_TEST(test_user_ringbuf_commit),
+	SUCCESS_TEST(test_user_ringbuf_fill),
+	SUCCESS_TEST(test_user_ringbuf_loop),
+	SUCCESS_TEST(test_user_ringbuf_msg_protocol),
+	SUCCESS_TEST(test_user_ringbuf_poll_wait),
+};
+
+static void verify_fail(const char *prog_name, const char *expected_err_msg)
+{
+	LIBBPF_OPTS(bpf_object_open_opts, opts);
+	struct bpf_program *prog;
+	struct user_ringbuf_fail *skel;
+	int err;
+
+	opts.kernel_log_buf = obj_log_buf;
+	opts.kernel_log_size = log_buf_sz;
+	opts.kernel_log_level = 1;
+
+	skel = user_ringbuf_fail__open_opts(&opts);
+	if (!ASSERT_OK_PTR(skel, "user_ringbuf_fail__open_opts"))
+		goto cleanup;
+
+	prog = bpf_object__find_program_by_name(skel->obj, prog_name);
+	if (!ASSERT_OK_PTR(prog, "bpf_object__find_program_by_name"))
+		goto cleanup;
+
+	bpf_program__set_autoload(prog, true);
+
+	bpf_map__set_max_entries(skel->maps.user_ringbuf, getpagesize());
+
+	err = user_ringbuf_fail__load(skel);
+	if (!ASSERT_ERR(err, "unexpected load success"))
+		goto cleanup;
+
+	if (!ASSERT_OK_PTR(strstr(obj_log_buf, expected_err_msg), "expected_err_msg")) {
+		fprintf(stderr, "Expected err_msg: %s\n", expected_err_msg);
+		fprintf(stderr, "Verifier output: %s\n", obj_log_buf);
+	}
+
+cleanup:
+	user_ringbuf_fail__destroy(skel);
+}
+
+void test_user_ringbuf(void)
+{
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(success_tests); i++) {
+		if (!test__start_subtest(success_tests[i].test_name))
+			continue;
+
+		success_tests[i].test_callback();
+	}
+
+	for (i = 0; i < ARRAY_SIZE(failure_tests); i++) {
+		if (!test__start_subtest(failure_tests[i].prog_name))
+			continue;
+
+		verify_fail(failure_tests[i].prog_name,
+			    failure_tests[i].expected_err_msg);
+	}
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
new file mode 100644
index 000000000000..237df8ca637a
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_fail.c
@@ -0,0 +1,174 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+static long
+bad_access1(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr - 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr - 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read before the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access1(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access1, NULL, 0);
+
+	return 0;
+}
+
+static long
+bad_access2(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample;
+
+	sample = bpf_dynptr_data(dynptr + 1, 0, sizeof(*sample));
+	bpf_printk("Was able to pass bad pointer %lx\n", (__u64)dynptr + 1);
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to read past the end of the pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_bad_access2(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, bad_access2, NULL, 0);
+
+	return 0;
+}
+
+static long
+write_forbidden(struct bpf_dynptr *dynptr, void *context)
+{
+	*((long *)dynptr) = 0;
+
+	return 0;
+}
+
+/* A callback that accesses a dynptr in a bpf_user_ringbuf_drain callback should
+ * not be able to write to that pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_write_forbidden(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, write_forbidden, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_write(struct bpf_dynptr *dynptr, void *context)
+{
+	*((__u64 *)context) = 0;
+
+	return 0;
+}
+
+/* A callback that is invoked by bpf_user_ringbuf_drain() with a NULL context
+ * pointer should not be able to write through that context pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_write(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_write, NULL, 0);
+
+	return 0;
+}
+
+static long
+null_context_read(struct bpf_dynptr *dynptr, void *context)
+{
+	__u64 id = *((__u64 *)context);
+
+	bpf_printk("Read id %lu\n", id);
+
+	return 0;
+}
+
+/* A callback that is invoked by bpf_user_ringbuf_drain() with a NULL context
+ * pointer should not be able to read through that context pointer.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_null_context_read(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, null_context_read, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_discard_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_discard_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to discard
+ * the dynptr passed to it, as the dynptr is not an acquired reference.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_discard_dynptr(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, try_discard_dynptr, NULL, 0);
+
+	return 0;
+}
+
+static long
+try_submit_dynptr(struct bpf_dynptr *dynptr, void *context)
+{
+	bpf_ringbuf_submit_dynptr(dynptr, 0);
+
+	return 0;
+}
+
+/* A callback invoked by bpf_user_ringbuf_drain() should not be able to submit
+ * the dynptr passed to it, as the dynptr is not an acquired reference.
+ */
+SEC("?raw_tp/sys_nanosleep")
+int user_ringbuf_callback_submit_dynptr(void *ctx)
+{
+	struct bpf_dynptr ptr;
+
+	bpf_user_ringbuf_drain(&user_ringbuf, try_submit_dynptr, NULL, 0);
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/progs/user_ringbuf_success.c b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
new file mode 100644
index 000000000000..ab8035a47da5
--- /dev/null
+++ b/tools/testing/selftests/bpf/progs/user_ringbuf_success.c
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#include <linux/bpf.h>
+#include <bpf/bpf_helpers.h>
+#include "bpf_misc.h"
+#include "../test_user_ringbuf.h"
+
+char _license[] SEC("license") = "GPL";
+
+struct {
+	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
+} user_ringbuf SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_RINGBUF);
+} kernel_ringbuf SEC(".maps");
+
+/* inputs */
+int pid, err, val;
+
+int read = 0;
+
+/* Counter used for end-to-end protocol test */
+__u64 kern_mutated = 0;
+__u64 user_mutated = 0;
+__u64 expected_user_mutated = 0;
+
+static int
+is_test_process(void)
+{
+	int cur_pid = bpf_get_current_pid_tgid() >> 32;
+
+	return cur_pid == pid;
+}
+
+static long
+record_sample(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct sample *sample = NULL;
+	struct sample stack_sample;
+	int status;
+	static int num_calls;
+
+	if (num_calls++ % 2 == 0) {
+		status = bpf_dynptr_read(&stack_sample, sizeof(stack_sample),
+					 dynptr, 0, 0);
+		if (status) {
+			bpf_printk("bpf_dynptr_read() failed: %d\n", status);
+			err = 1;
+			return 0;
+		}
+	} else {
+		sample = bpf_dynptr_data(dynptr, 0, sizeof(*sample));
+		if (!sample) {
+			bpf_printk("Unexpectedly failed to get sample\n");
+			err = 2;
+			return 0;
+		}
+		stack_sample = *sample;
+	}
+
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+static void
+handle_sample_msg(const struct test_msg *msg)
+{
+	switch (msg->msg_op) {
+	case TEST_MSG_OP_INC64:
+		kern_mutated += msg->operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		kern_mutated += msg->operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		kern_mutated *= msg->operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		kern_mutated *= msg->operand_32;
+		break;
+	default:
+		bpf_printk("Unrecognized op %d\n", msg->msg_op);
+		err = 2;
+	}
+}
+
+static long
+read_protocol_msg(struct bpf_dynptr *dynptr, void *context)
+{
+	const struct test_msg *msg = NULL;
+
+	msg = bpf_dynptr_data(dynptr, 0, sizeof(*msg));
+	if (!msg) {
+		err = 1;
+		bpf_printk("Unexpectedly failed to get msg\n");
+		return 0;
+	}
+
+	handle_sample_msg(msg);
+
+	return 0;
+}
+
+static int publish_next_kern_msg(__u32 index, void *context)
+{
+	struct test_msg *msg = NULL;
+	int operand_64 = TEST_OP_64;
+	int operand_32 = TEST_OP_32;
+
+	msg = bpf_ringbuf_reserve(&kernel_ringbuf, sizeof(*msg), 0);
+	if (!msg) {
+		err = 4;
+		return 1;
+	}
+
+	switch (index % TEST_MSG_OP_NUM_OPS) {
+	case TEST_MSG_OP_INC64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_INC64;
+		expected_user_mutated += operand_64;
+		break;
+	case TEST_MSG_OP_INC32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_INC32;
+		expected_user_mutated += operand_32;
+		break;
+	case TEST_MSG_OP_MUL64:
+		msg->operand_64 = operand_64;
+		msg->msg_op = TEST_MSG_OP_MUL64;
+		expected_user_mutated *= operand_64;
+		break;
+	case TEST_MSG_OP_MUL32:
+		msg->operand_32 = operand_32;
+		msg->msg_op = TEST_MSG_OP_MUL32;
+		expected_user_mutated *= operand_32;
+		break;
+	default:
+		bpf_ringbuf_discard(msg, 0);
+		err = 5;
+		return 1;
+	}
+
+	bpf_ringbuf_submit(msg, 0);
+
+	return 0;
+}
+
+static void
+publish_kern_messages(void)
+{
+	if (expected_user_mutated != user_mutated) {
+		bpf_printk("%lu != %lu\n", expected_user_mutated, user_mutated);
+		err = 3;
+		return;
+	}
+
+	bpf_loop(8, publish_next_kern_msg, NULL, 0);
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getcwd")
+int test_user_ringbuf_protocol(void *ctx)
+{
+	long status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	status = bpf_user_ringbuf_drain(&user_ringbuf, read_protocol_msg, NULL, 0);
+	if (status < 0) {
+		bpf_printk("Drain returned: %ld\n", status);
+		err = 1;
+		return 0;
+	}
+
+	publish_kern_messages();
+
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getpgid")
+int test_user_ringbuf(void *ctx)
+{
+	int status = 0;
+	struct sample *sample = NULL;
+	struct bpf_dynptr ptr;
+
+	if (!is_test_process())
+		return 0;
+
+	err = bpf_user_ringbuf_drain(&user_ringbuf, record_sample, NULL, 0);
+
+	return 0;
+}
+
+static long
+do_nothing_cb(struct bpf_dynptr *dynptr, void *context)
+{
+	__sync_fetch_and_add(&read, 1);
+	return 0;
+}
+
+SEC("fentry/" SYS_PREFIX "sys_getrlimit")
+int test_user_ringbuf_epoll(void *ctx)
+{
+	long num_samples;
+
+	if (!is_test_process())
+		return 0;
+
+	num_samples = bpf_user_ringbuf_drain(&user_ringbuf, do_nothing_cb, NULL,
+					     0);
+	if (num_samples <= 0)
+		err = 1;
+
+	return 0;
+}
diff --git a/tools/testing/selftests/bpf/test_user_ringbuf.h b/tools/testing/selftests/bpf/test_user_ringbuf.h
new file mode 100644
index 000000000000..1643b4d59ba7
--- /dev/null
+++ b/tools/testing/selftests/bpf/test_user_ringbuf.h
@@ -0,0 +1,35 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/* Copyright (c) 2022 Meta Platforms, Inc. and affiliates. */
+
+#ifndef _TEST_USER_RINGBUF_H
+#define _TEST_USER_RINGBUF_H
+
+#define TEST_OP_64 4
+#define TEST_OP_32 2
+
+enum test_msg_op {
+	TEST_MSG_OP_INC64,
+	TEST_MSG_OP_INC32,
+	TEST_MSG_OP_MUL64,
+	TEST_MSG_OP_MUL32,
+
+	/* Must come last. */
+	TEST_MSG_OP_NUM_OPS,
+};
+
+struct test_msg {
+	enum test_msg_op msg_op;
+	union {
+		__s64 operand_64;
+		__s32 operand_32;
+	};
+};
+
+struct sample {
+	int pid;
+	int seq;
+	long value;
+	char comm[16];
+};
+
+#endif /* _TEST_USER_RINGBUF_H */
-- 
2.37.1



* Re: [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type
  2022-08-11 23:49 [PATCH v2 0/4] bpf: Add user-space-publisher ringbuffer map type David Vernet
                   ` (3 preceding siblings ...)
  2022-08-11 23:49 ` [PATCH v2 4/4] selftests/bpf: Add selftests validating the user ringbuf David Vernet
@ 2022-08-11 23:59 ` David Vernet
  4 siblings, 0 replies; 9+ messages in thread
From: David Vernet @ 2022-08-11 23:59 UTC (permalink / raw)
  To: bpf, andrii, ast, daniel
  Cc: haoluo, joannelkoong, john.fastabend, jolsa, kpsingh,
	linux-kernel, martin.lau, sdf, song, yhs, kernel-team, tj

On Thu, Aug 11, 2022 at 06:49:37PM -0500, David Vernet wrote:
> This patch set defines a new map type, BPF_MAP_TYPE_USER_RINGBUF, which
> provides single-user-space-producer / single-kernel-consumer semantics over
> a ringbuffer.  Along with the new map type, a helper function called
> bpf_user_ringbuf_drain() is added which allows a BPF program to specify a
> callback with the following signature, to which samples are posted by the
> helper:

[...]

I just noticed that Andrii left some comments on v1 about 20 mins ago.
Please disregard this version -- I'll apply Andrii's comments and then send
out a follow on v3 version.


* Re: [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-11 23:49 ` [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper David Vernet
@ 2022-08-15 21:23   ` Hao Luo
  2022-08-16 13:20     ` David Vernet
  0 siblings, 1 reply; 9+ messages in thread
From: Hao Luo @ 2022-08-15 21:23 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, andrii, ast, daniel, joannelkoong, john.fastabend, jolsa,
	kpsingh, linux-kernel, martin.lau, sdf, song, yhs, kernel-team,
	tj

On Thu, Aug 11, 2022 at 4:50 PM David Vernet <void@manifault.com> wrote:
>
> Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> helper function that allows BPF programs to drain samples from the ring
> buffer, and invoke a callback for each. This patch adds a new
> bpf_user_ringbuf_drain() helper that provides this abstraction.
>
> In order to support this, we needed to also add a new PTR_TO_DYNPTR
> register type to reflect a dynptr that was allocated by a helper function
> and passed to a BPF program. The verifier currently only supports
> PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
[...]
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index c0f3bca4bb09..73fa6ed12052 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
[...]
> +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> +                                  u32 *size)
> +{
> +       unsigned long cons_pos, prod_pos;
> +       u32 sample_len, total_len;
> +       u32 *hdr;
> +       int err;
> +       int busy = 0;
> +
> +       /* If another consumer is already consuming a sample, wait for them to
> +        * finish.
> +        */
> +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> +               return -EBUSY;
> +
> +       /* Synchronizes with smp_store_release() in user-space. */
> +       prod_pos = smp_load_acquire(&rb->producer_pos);
> +       /* Synchronizes with smp_store_release() in
> +        * __bpf_user_ringbuf_sample_release().
> +        */
> +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> +       if (cons_pos >= prod_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -ENODATA;
> +       }
> +
> +       hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> +       sample_len = *hdr;
> +

rb->data and rb->mask better be protected by READ_ONCE.

> +       /* Check that the sample can fit into a dynptr. */
> +       err = bpf_dynptr_check_size(sample_len);
> +       if (err) {
> +               atomic_set(&rb->busy, 0);
> +               return err;
> +       }
> +
> +       /* Check that the sample fits within the region advertised by the
> +        * consumer position.
> +        */
> +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> +       if (total_len > prod_pos - cons_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* Check that the sample fits within the data region of the ring buffer.
> +        */
> +       if (total_len > rb->mask + 1) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* consumer_pos is updated when the sample is released.
> +        */
> +
> +       *sample = (void *)((uintptr_t)rb->data +
> +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> +       *size = sample_len;
> +
> +       return 0;
> +}
> +
> +static void
> +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> +                                 u64 flags)
> +{
> +
> +
> +       /* To release the ringbuffer, just increment the producer position to
> +        * signal that a new sample can be consumed. The busy bit is cleared by
> +        * userspace when posting a new sample to the ringbuffer.
> +        */
> +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> +                         BPF_RINGBUF_HDR_SZ);
> +
> +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> +               irq_work_queue(&rb->work);
> +
> +       atomic_set(&rb->busy, 0);
> +}

atomic_set() doesn't imply a barrier, so it could be observed before the
smp_store_release(). The paired smp_load_acquire() could then observe
rb->busy == 0 while still seeing the old consumer_pos. At a minimum, you
need smp_mb__before_atomic() as a barrier before the atomic_set(), or
smp_wmb() to ensure all _writes_ are visible by the time rb->busy == 0
is observed.

Similarly rb->work could be observed before smp_store_release.

Is it possible for __bpf_user_ringbuf_sample_release to be called
concurrently? If yes, there are races: because the load of
rb->consumer_pos is not protected by smp_load_acquire, it is not
synchronized with this smp_store_release. Concurrent calls to
__bpf_user_ringbuf_sample_release could therefore leave both threads
reading stale consumer_pos values.
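
For concreteness, a minimal sketch of the ordering this suggests
(illustrative only, not the posted patch; whether atomic_set() should
also move ahead of irq_work_queue() is discussed separately later in
the thread):

static void
__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
				  u64 flags)
{
	/* Pairs with the smp_load_acquire() of consumer_pos in
	 * __bpf_user_ringbuf_poll().
	 */
	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
			  BPF_RINGBUF_HDR_SZ);

	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);

	/* Order the consumer_pos update (and the irq_work enqueue) before
	 * other CPUs can observe busy == 0 and begin a new drain.
	 */
	smp_mb__before_atomic();
	atomic_set(&rb->busy, 0);
}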



> +
> +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> +          void *, callback_fn, void *, callback_ctx, u64, flags)
> +{
> +       struct bpf_ringbuf *rb;
> +       long num_samples = 0, ret = 0;
> +       int err;
> +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> +
> +       if (unlikely(flags & ~wakeup_flags))
> +               return -EINVAL;
> +
> +       /* The two wakeup flags are mutually exclusive. */
> +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> +               return -EINVAL;
> +
> +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> +       do {
> +               u32 size;
> +               void *sample;
> +
> +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> +
> +               if (!err) {
> +                       struct bpf_dynptr_kern dynptr;
> +
> +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> +                                       0, size);
> +                       ret = callback((uintptr_t)&dynptr,
> +                                      (uintptr_t)callback_ctx, 0, 0, 0);
> +
> +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> +                       num_samples++;
> +               }
> +       } while (err == 0 && num_samples < 4096 && ret == 0);
> +
> +       return num_samples;
> +}
> +
> +const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
> +       .func           = bpf_user_ringbuf_drain,
> +       .ret_type       = RET_INTEGER,
> +       .arg1_type      = ARG_CONST_MAP_PTR,
> +       .arg2_type      = ARG_PTR_TO_FUNC,
> +       .arg3_type      = ARG_PTR_TO_STACK_OR_NULL,
> +       .arg4_type      = ARG_ANYTHING,
> +};
> diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> index 970ec5c7ce05..211322b3317b 100644
> --- a/kernel/bpf/verifier.c
> +++ b/kernel/bpf/verifier.c
> @@ -561,6 +561,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
>                 [PTR_TO_BUF]            = "buf",
>                 [PTR_TO_FUNC]           = "func",
>                 [PTR_TO_MAP_KEY]        = "map_key",
> +               [PTR_TO_DYNPTR]         = "dynptr_ptr",
>         };
>
>         if (type & PTR_MAYBE_NULL) {
> @@ -5662,6 +5663,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
>  static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
> +static const struct bpf_reg_types dynptr_types = {
> +       .types = {
> +               PTR_TO_STACK,
> +               PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
> +       }
> +};
>
>  static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_MAP_KEY]            = &map_key_value_types,
> @@ -5688,7 +5695,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_CONST_STR]          = &const_str_ptr_types,
>         [ARG_PTR_TO_TIMER]              = &timer_types,
>         [ARG_PTR_TO_KPTR]               = &kptr_types,
> -       [ARG_PTR_TO_DYNPTR]             = &stack_ptr_types,
> +       [ARG_PTR_TO_DYNPTR]             = &dynptr_types,
>  };
>
>  static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
> @@ -6031,6 +6038,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
>                 err = check_mem_size_reg(env, reg, regno, true, meta);
>                 break;
>         case ARG_PTR_TO_DYNPTR:
> +               /* We only need to check for initialized / uninitialized helper
> +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> +                * is that if it is, that a helper function initialized the
> +                * dynptr on behalf of the BPF program.
> +                */
> +               if (reg->type & MEM_ALLOC)
> +                       break;
>                 if (arg_type & MEM_UNINIT) {
>                         if (!is_dynptr_reg_valid_uninit(env, reg)) {
>                                 verbose(env, "Dynptr has to be an uninitialized dynptr\n");
> @@ -6203,7 +6217,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                         goto error;
>                 break;
>         case BPF_MAP_TYPE_USER_RINGBUF:
> -               goto error;
> +               if (func_id != BPF_FUNC_user_ringbuf_drain)
> +                       goto error;
> +               break;
>         case BPF_MAP_TYPE_STACK_TRACE:
>                 if (func_id != BPF_FUNC_get_stackid)
>                         goto error;
> @@ -6323,6 +6339,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                 if (map->map_type != BPF_MAP_TYPE_RINGBUF)
>                         goto error;
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
> +                       goto error;
> +               break;
>         case BPF_FUNC_get_stackid:
>                 if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
>                         goto error;
> @@ -6878,6 +6898,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
>         return 0;
>  }
>
> +static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
> +                                          struct bpf_func_state *caller,
> +                                          struct bpf_func_state *callee,
> +                                          int insn_idx)
> +{
> +       /* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
> +        *                        callback_ctx, u64 flags);
> +        * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
> +        */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
> +       callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
> +       __mark_reg_known_zero(&callee->regs[BPF_REG_1]);
> +       callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
> +
> +       /* unused */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
> +
> +       callee->in_callback_fn = true;
> +       return 0;
> +}
> +
>  static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
>  {
>         struct bpf_verifier_state *state = env->cur_state;
> @@ -7156,7 +7199,7 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>         struct bpf_reg_state *regs;
>         struct bpf_call_arg_meta meta;
>         int insn_idx = *insn_idx_p;
> -       bool changes_data;
> +       bool changes_data, mem_alloc_dynptr;
>         int i, err, func_id;
>
>         /* find function prototype */
> @@ -7323,22 +7366,34 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                 }
>                 break;
>         case BPF_FUNC_dynptr_data:
> +               mem_alloc_dynptr = false;
>                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
>                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> +
>                                 if (meta.ref_obj_id) {
>                                         verbose(env, "verifier internal error: meta.ref_obj_id already set\n");
>                                         return -EFAULT;
>                                 }
> -                               /* Find the id of the dynptr we're tracking the reference of */
> -                               meta.ref_obj_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> +
> +                               mem_alloc_dynptr = reg->type & MEM_ALLOC;
> +                               if (!mem_alloc_dynptr)
> +                                       /* Find the id of the dynptr we're
> +                                        * tracking the reference of
> +                                        */
> +                                       meta.ref_obj_id = stack_slot_get_id(env, reg);
>                                 break;
>                         }
>                 }
> -               if (i == MAX_BPF_FUNC_REG_ARGS) {
> +               if (!mem_alloc_dynptr && i == MAX_BPF_FUNC_REG_ARGS) {
>                         verbose(env, "verifier internal error: no dynptr in bpf_dynptr_data()\n");
>                         return -EFAULT;
>                 }
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> +                                       set_user_ringbuf_callback_state);
> +               break;
>         }
>
>         if (err)
> diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
> index ee04b71969b4..76909f43fc0e 100644
> --- a/tools/include/uapi/linux/bpf.h
> +++ b/tools/include/uapi/linux/bpf.h
> @@ -5354,6 +5354,12 @@ union bpf_attr {
>   *     Return
>   *             Current *ktime*.
>   *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample.
> + *     Return
> + *             An error if a sample could not be drained.
>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5565,6 +5571,7 @@ union bpf_attr {
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
>         FN(ktime_get_tai_ns),           \
> +       FN(bpf_user_ringbuf_drain),     \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> --
> 2.37.1
>


* Re: [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-15 21:23   ` Hao Luo
@ 2022-08-16 13:20     ` David Vernet
  2022-08-16 18:24       ` Hao Luo
  0 siblings, 1 reply; 9+ messages in thread
From: David Vernet @ 2022-08-16 13:20 UTC (permalink / raw)
  To: Hao Luo
  Cc: bpf, andrii, ast, daniel, joannelkoong, john.fastabend, jolsa,
	kpsingh, linux-kernel, martin.lau, sdf, song, yhs, kernel-team,
	tj

On Mon, Aug 15, 2022 at 02:23:04PM -0700, Hao Luo wrote:

Hi Hao,

Thanks for the review.

> On Thu, Aug 11, 2022 at 4:50 PM David Vernet <void@manifault.com> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> >
> > Signed-off-by: David Vernet <void@manifault.com>
> > ---
> [...]
> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index c0f3bca4bb09..73fa6ed12052 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> [...]
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +                                  u32 *size)
> > +{
> > +       unsigned long cons_pos, prod_pos;
> > +       u32 sample_len, total_len;
> > +       u32 *hdr;
> > +       int err;
> > +       int busy = 0;
> > +
> > +       /* If another consumer is already consuming a sample, wait for them to
> > +        * finish.
> > +        */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> > +       /* Synchronizes with smp_store_release() in
> > +        * __bpf_user_ringbuf_sample_release().
> > +        */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -ENODATA;
> > +       }
> > +
> > +       hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > +       sample_len = *hdr;
> > +
>
> rb->data and rb->mask better be protected by READ_ONCE.

Could you please clarify about the behavior you're protecting against here?
We're just calculating an offset from rb->data, and both rb->data and
rb->mask are set only once when the ringbuffer is first created in
bpf_ringbuf_area_alloc(). I'm not following what we'd be protecting against
by making these volatile, though I freely admit that I may be missing some
weird possible behavior in the compiler.

For what it's worth, in a follow-on version of the patch, I've updated this
read of the sample len to be an smp_load_acquire() to accommodate Andrii's
suggestion [0] that we should support using the busy bit and discard bit in
the header from the get-go, as we do with BPF_MAP_TYPE_RINGBUF ringbuffers.

[0]: https://lore.kernel.org/all/CAEf4BzYVLgd=rHaxzZjyv0WJBzBpMqGSStgVhXG9XOHpB7qDRQ@mail.gmail.com/
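
As a rough illustration of that direction (only a sketch, under the
assumption that the user ringbuf reuses the existing
BPF_RINGBUF_BUSY_BIT / BPF_RINGBUF_DISCARD_BIT header flags from the
UAPI; this is not the actual follow-on code):

	/* Synchronizes with the smp_store_release() that user-space performs
	 * on the sample header when committing the sample.
	 */
	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
	sample_len = smp_load_acquire(hdr);

	/* The producer has reserved, but not yet committed, this sample. */
	if (sample_len & BPF_RINGBUF_BUSY_BIT) {
		atomic_set(&rb->busy, 0);
		return -ENODATA;
	}

	/* A discarded sample carries no data, but its space must still be
	 * released by advancing consumer_pos past it.
	 */
	if (sample_len & BPF_RINGBUF_DISCARD_BIT) {
		/* skip this sample and continue scanning */
	}

	sample_len &= ~(BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);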

> > +       /* Check that the sample can fit into a dynptr. */
> > +       err = bpf_dynptr_check_size(sample_len);
> > +       if (err) {
> > +               atomic_set(&rb->busy, 0);
> > +               return err;
> > +       }
> > +
> > +       /* Check that the sample fits within the region advertised by the
> > +        * consumer position.
> > +        */
> > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> > +       if (total_len > prod_pos - cons_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* Check that the sample fits within the data region of the ring buffer.
> > +        */
> > +       if (total_len > rb->mask + 1) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* consumer_pos is updated when the sample is released.
> > +        */
> > +
> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +                                 u64 flags)
> > +{
> > +
> > +
> > +       /* To release the ringbuffer, just increment the producer position to
> > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > +        * userspace when posting a new sample to the ringbuffer.
> > +        */
> > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +                         BPF_RINGBUF_HDR_SZ);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> > +               irq_work_queue(&rb->work);
> > +
> > +       atomic_set(&rb->busy, 0);
> > +}
> 
> atomic_set() doesn't imply barrier, so it could be observed before
> smp_store_release(). So the paired smp_load_acquire could observe
> rb->busy == 0 while seeing the old consumer_pos. At least, you need
> smp_mb__before_atomic() as a barrier before atomic_set. Or smp_wmb()
> to ensure all _writes_ complete when see rb->busy == 0.

Thanks for catching this. I should have been more careful to not assume the
semantics of atomic_set(), and I see now that you're of course correct that
it's just a WRITE_ONCE() and has no implications at all w.r.t. memory or
compiler barriers. I'll fix this in the follow-on version, and will give
another closer read over memory-barriers.txt and atomic_t.txt.

> Similarly rb->work could be observed before smp_store_release.

Yes, in the follow-on version I'll move the atomic_set() to before the
irq_work_queue() invocation (per Andrii's comment in [1], though that
discussion is still ongoing), and will add the missing
smp_mb__before_atomic(). Thanks again for catching this.

[1]: https://lore.kernel.org/all/CAEf4BzZ-m-AUX+1+CGr7nMxMDnT=fjkn8DP9nP21Uts1y7fMyg@mail.gmail.com/

> Is it possible for __bpf_user_ringbuf_sample_release to be called
> concurrently? If yes, there are races. Because the load of
> rb->consumer_pos is not protected by smp_load_acquire, they are not
> synchronized with this smp_store_release. Concurrently calling
> __bpf_user_ringbuf_sample_release may cause both threads getting stale
> consumer_pos values.

If we add smp_mb__before_atomic() per your proposed fix above, I don't
believe this is an issue. __bpf_user_ringbuf_sample_release() should only
be invoked when a caller has an unreleased sample, and that can only happen
in a serial context due to the protection afforded by the atomic busy bit.

A single caller will not see a stale value, as they must first invoke
__bpf_user_ringbuf_peek(), and then invoke
__bpf_user_ringbuf_sample_release() with the sample they received. The
consumer pos they read in __bpf_user_ringbuf_sample_release() was already
smp_load_acquire()'d in __bpf_user_ringbuf_peek(), so they shouldn't see a
stale value there. We could certainly add another smp_load_acquire() here
for safety, but it would be redundant.

Thanks,
David


* Re: [PATCH v2 2/4] bpf: Add bpf_user_ringbuf_drain() helper
  2022-08-16 13:20     ` David Vernet
@ 2022-08-16 18:24       ` Hao Luo
  0 siblings, 0 replies; 9+ messages in thread
From: Hao Luo @ 2022-08-16 18:24 UTC (permalink / raw)
  To: David Vernet
  Cc: bpf, andrii, ast, daniel, joannelkoong, john.fastabend, jolsa,
	kpsingh, linux-kernel, martin.lau, sdf, song, yhs, kernel-team,
	tj

On Tue, Aug 16, 2022 at 6:20 AM David Vernet <void@manifault.com> wrote:
>
> On Mon, Aug 15, 2022 at 02:23:04PM -0700, Hao Luo wrote:
>
> Hi Hao,
>
> Thanks for the review.
>
> > On Thu, Aug 11, 2022 at 4:50 PM David Vernet <void@manifault.com> wrote:
> > >
> > > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > > helper function that allows BPF programs to drain samples from the ring
> > > buffer, and invoke a callback for each. This patch adds a new
> > > bpf_user_ringbuf_drain() helper that provides this abstraction.
> > >
> > > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > > register type to reflect a dynptr that was allocated by a helper function
> > > and passed to a BPF program. The verifier currently only supports
> > > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> > >
> > > Signed-off-by: David Vernet <void@manifault.com>
> > > ---
> > [...]
> > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > > index c0f3bca4bb09..73fa6ed12052 100644
> > > --- a/kernel/bpf/ringbuf.c
> > > +++ b/kernel/bpf/ringbuf.c
> > [...]
> > > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > > +                                  u32 *size)
> > > +{
> > > +       unsigned long cons_pos, prod_pos;
> > > +       u32 sample_len, total_len;
> > > +       u32 *hdr;
> > > +       int err;
> > > +       int busy = 0;
> > > +
> > > +       /* If another consumer is already consuming a sample, wait for them to
> > > +        * finish.
> > > +        */
> > > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > > +               return -EBUSY;
> > > +
> > > +       /* Synchronizes with smp_store_release() in user-space. */
> > > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> > > +       /* Synchronizes with smp_store_release() in
> > > +        * __bpf_user_ringbuf_sample_release().
> > > +        */
> > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > +       if (cons_pos >= prod_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -ENODATA;
> > > +       }
> > > +
> > > +       hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
> > > +       sample_len = *hdr;
> > > +
> >
> > rb->data and rb->mask better be protected by READ_ONCE.
>
> Could you please clarify about the behavior you're protecting against here?
> We're just calculating an offset from rb->data, and both rb->data and
> rb->mask are set only once when the ringbuffer is first created in
> bpf_ringbuf_area_alloc(). I'm not following what we'd be protecting against
> by making these volatile, though I freely admit that I may be missing some
> weird possible behavior in the compiler.
>

Sorry, I missed the fact that rb->data and rb->mask are set only once.
I thought rb->data also moved somewhere.

My mental model is: normally, for accesses to shared data that aren't
protected by a clear critical section (locks, mutexes, etc.), I would
wrap them in READ_ONCE() and WRITE_ONCE(). But here, since the fields
are read-only, it should be OK IMHO.

> For what it's worth, in a follow-on version of the patch, I've updated this
> read of the sample len to be an smp_load_acquire() to accommodate Andrii's
> suggestion [0] that we should support using the busy bit and discard bit in
> the header from the get-go, as we do with BPF_MAP_TYPE_RINGBUF ringbuffers.
>
> [0]: https://lore.kernel.org/all/CAEf4BzYVLgd=rHaxzZjyv0WJBzBpMqGSStgVhXG9XOHpB7qDRQ@mail.gmail.com/
>
> > > +       /* Check that the sample can fit into a dynptr. */
> > > +       err = bpf_dynptr_check_size(sample_len);
> > > +       if (err) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return err;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the region advertised by the
> > > +        * consumer position.
> > > +        */
> > > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> > > +       if (total_len > prod_pos - cons_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the data region of the ring buffer.
> > > +        */
> > > +       if (total_len > rb->mask + 1) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* consumer_pos is updated when the sample is released.
> > > +        */
> > > +
> > > +       *sample = (void *)((uintptr_t)rb->data +
> > > +                          (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > > +       *size = sample_len;
> > > +
> > > +       return 0;
> > > +}
> > > +
> > > +static void
> > > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > > +                                 u64 flags)
> > > +{
> > > +
> > > +
> > > +       /* To release the ringbuffer, just increment the producer position to
> > > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > > +        * userspace when posting a new sample to the ringbuffer.
> > > +        */
> > > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > > +                         BPF_RINGBUF_HDR_SZ);
> > > +
> > > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> > > +               irq_work_queue(&rb->work);
> > > +
> > > +       atomic_set(&rb->busy, 0);
> > > +}
> >
> > atomic_set() doesn't imply barrier, so it could be observed before
> > smp_store_release(). So the paired smp_load_acquire could observe
> > rb->busy == 0 while seeing the old consumer_pos. At least, you need
> > smp_mb__before_atomic() as a barrier before atomic_set. Or smp_wmb()
> > to ensure all _writes_ complete when see rb->busy == 0.
>
> Thanks for catching this. I should have been more careful to not assume the
> semantics of atomic_set(), and I see now that you're of course correct that
> it's just a WRITE_ONCE() and has no implications at all w.r.t. memory or
> compiler barriers. I'll fix this in the follow-on version, and will give
> another closer read over memory-barriers.txt and atomic_t.txt.
>

No problem. These things are tricky.

> > Similarly rb->work could be observed before smp_store_release.
>
> Yes, in the follow-on version I'll move the atomic_set() to before the
> irq_work_queue() invocation (per Andrii's comment in [1], though that
> discussion is still ongoing), and will add the missing
> smp_mb__before_atomic(). Thanks again for catching this.
>
> [1]: https://lore.kernel.org/all/CAEf4BzZ-m-AUX+1+CGr7nMxMDnT=fjkn8DP9nP21Uts1y7fMyg@mail.gmail.com/
>
> > Is it possible for __bpf_user_ringbuf_sample_release to be called
> > concurrently? If yes, there are races. Because the load of
> > rb->consumer_pos is not protected by smp_load_acquire, they are not
> > synchronized with this smp_store_release. Concurrently calling
> > __bpf_user_ringbuf_sample_release may cause both threads getting stale
> > consumer_pos values.
>
> If we add smp_mb__before_atomic() per your proposed fix above, I don't
> believe this is an issue. __bpf_user_ringbuf_sample_release() should only
> be invoked when a caller has an unreleased sample, and that can only happen
> in a serial context due to the protection afforded by the atomic busy bit.
>

Right. I gave it more thought after posting the comment yesterday.
There are two parts to the synchronization: synchronization among
multiple producers, and synchronization between producer and consumer.
It looks like multiple producers are serialized by the atomic busy bit;
we just need to fix the barrier. The synchronization between producer
and consumer is weaker, using lockless primitives like
store_release/load_acquire, and that should be fine.


> A single caller will not see a stale value, as they must first invoke
> __bpf_user_ringbuf_peek(), and then invoke
> __bpf_user_ringbuf_sample_release() with the sample they received. The
> consumer pos they read in __bpf_user_ringbuf_sample_release() was already
> smp_load_acquire()'d in __bpf_user_ringbuf_peek(), so they shouldn't see a
> stale value there. We could certainly add another smp_load_acquire() here
> for safety, but it would be redundant.
>
> Thanks,
> David

